Main.c 19.9 KB
Newer Older
1
2
3
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
iker_martin's avatar
iker_martin committed
4
#include <fcntl.h>
5
#include <unistd.h>
iker_martin's avatar
iker_martin committed
6
#include <sys/stat.h>
7
8
#include "process_stage.h"
#include "Main_datatypes.h"
9
#include "configuration.h"
10
#include "../IOcodes/results.h"
11
#include "../malleability/CommDist.h"
12
13
#include "../malleability/malleabilityManager.h"
#include "../malleability/malleabilityStates.h"
14

15
16
#define DR_MAX_SIZE 1000000000

iker_martin's avatar
iker_martin committed
17
int work();
18
double iterate(int async_comm);
19
20
double iterate_relaxed(double *time, double *times_stages);
double iterate_rigid(double *time, double *times_stages);
21

22
void init_group_struct(char *argv[], int argc, int myId, int numP);
23
void init_application();
24
void obtain_op_times();
25
26
void free_application_data();

27
void print_general_info(int myId, int grp, int numP);
28
int print_local_results();
29
int print_final_results();
iker_martin's avatar
iker_martin committed
30
int create_out_file(char *nombre, int *ptr, int newstdout);
31

32
33
34
35

void init_originals();
void init_targets();

iker_martin's avatar
iker_martin committed
36
37
configuration *config_file;
group_data *group;
38
results_data *results;
39
MPI_Comm comm, new_comm;
40
int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis
41

42
int main(int argc, char *argv[]) {
43
    int numP, myId, res;
iker_martin's avatar
iker_martin committed
44
    int req;
45
    int im_child;
46

47
    int num_cpus, num_nodes;
48
    char *nodelist = NULL;
49
    num_cpus = 20; //FIXME NUMERO MAGICO //TODO Usar openMP para obtener el valor con un pragma
50
51
52
    if (argc >= 5) {
      nodelist = argv[3];
      num_nodes = atoi(argv[4]);
53
      num_cpus = num_nodes * num_cpus;
54
55
    }

56
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &req);
iker_martin's avatar
iker_martin committed
57
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);
58
    MPI_Comm_size(MPI_COMM_WORLD, &numP);
59
    comm = MPI_COMM_WORLD;
60
    new_comm = MPI_COMM_NULL;
iker_martin's avatar
iker_martin committed
61

62
63
    if(req != MPI_THREAD_MULTIPLE) {
      printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE);
64
65
      fflush(stdout);
      MPI_Abort(MPI_COMM_WORLD, -50);
66
67
    }

68
    init_group_struct(argv, argc, myId, numP);
69
    im_child = MAM_Init(myId, numP, ROOT, comm, argv[0], nodelist, num_cpus, num_nodes);
70

71
    if(!im_child) { //TODO REFACTOR Simplificar inicio
72
      init_application();
73
      init_originals();
74

75
      MPI_Barrier(comm);
76
      results->exec_start = MPI_Wtime();
77
78
    } else { //Init targets
      init_targets();
79
80
    }

81
82
83
    //
    // EMPIEZA LA EJECUCION-------------------------------
    //
84
    do {
85
86
      MPI_Comm_size(comm, &(group->numP));
      MPI_Comm_rank(comm, &(group->myId));
87

88
      if(group->grp != 0) {
89
        obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
90
        MAM_Retrieve_times(&results->spawn_time[group->grp - 1], &results->sync_time[group->grp - 1], &results->async_time[group->grp - 1], &results->malleability_time[group->grp - 1]);
91
      }
92

93
      if(config_file->n_groups != group->grp + 1) { //TODO Llevar a otra funcion
94
        MAM_Set_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss, 
95
			config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
96
        MAM_Set_target_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
97

98
        if(group->grp != 0) {
99
100
          malleability_modify_data(&(group->grp), 0, 1, MPI_INT, 1, 0);
          malleability_modify_data(&(group->iter_start), 2, 1, MPI_INT, 1, 0);
101
        }
102
      }
103
104

      res = work();
105

106
      if(res == MAM_ZOMBIE) break;
107
      if(res==1) { // Se ha llegado al final de la aplicacion
108
        MPI_Barrier(comm);
109
110
        results->exec_time = MPI_Wtime() - results->exec_start - results->wasted_time;
      }
111

112
      print_local_results();
113
      reset_results_index(results);
114

115
116
117
118
119
120
      if(res!=1) {
        if(comm != MPI_COMM_WORLD) MPI_Comm_free(&comm);
        comm = new_comm;
      }
      group->grp = group->grp + 1;
    } while(config_file->n_groups > group->grp && config_file->groups[group->grp].sm == MALL_SPAWN_MERGE);
121

122
123
124
    //
    // TERMINA LA EJECUCION ----------------------------------------------------------
    //
125
    print_final_results(); // Pasado este punto ya no pueden escribir los procesos
126

127
    MPI_Barrier(comm);
128
129
130
131
    if(comm != MPI_COMM_WORLD && comm != MPI_COMM_NULL) {
      MPI_Comm_free(&comm);
    }

132
    if(group->myId == ROOT && config_file->groups[group->grp].sm == MALL_SPAWN_MERGE) {
133
134
      MPI_Abort(MPI_COMM_WORLD, -100);
    }
135
    free_application_data();
136

137
138
139
140
141
    MPI_Finalize();
    return 0;
}

/*
142
143
144
145
146
147
148
149
150
 * Función de trabajo principal.
 *
 * Incializa los datos para realizar el computo y a continuacion
 * pasa a realizar "maxiter" iteraciones de computo.
 *
 * Terminadas las iteraciones realiza el redimensionado de procesos.
 * Si el redimensionado se realiza de forma asincrona se 
 * siguen realizando iteraciones de computo hasta que termine la 
 * comunicacion asincrona y realizar entonces la sincrona.
151
152
153
154
 *
 * Si el grupo de procesos es el ultimo que va a ejecutar, se devuelve
 * el valor 1 para indicar que no se va a seguir trabajando con nuevos grupos
 * de procesos. En caso contrario se devuelve 0.
155
 */
iker_martin's avatar
iker_martin committed
156
int work() {
157
  int iter, maxiter, state, res, commited, targets_qty;
158
  int wait_completed = MAM_CHECK_COMPLETION;
159

160
  maxiter = config_file->groups[group->grp].iters;
161
  state = MAM_NOT_STARTED;
162

163
  res = 0;
164
  for(iter=group->iter_start; iter < maxiter; iter++) {
165
    iterate(state);
166
  }
167

168
  if(config_file->n_groups != group->grp + 1)
169
    MAM_Checkpoint(&state, wait_completed);
170

171
  iter = 0;
172
  while(state == MAM_PENDING) {
173
    if(group->grp+1 < config_file->n_groups && iter < config_file->groups[group->grp+1].iters) {
174
      iterate(state);
175
176
      iter++;
      group->iter_start = iter;
177
    } else { wait_completed = MAM_WAIT_COMPLETION; }
178
    MAM_Checkpoint(&state, wait_completed);
179
  }
180

181
182
183
184
185
186
  // This function causes an overhead in the recorded time for last group
  compute_results_iter(results, group->myId, group->numP, ROOT, config_file->n_stages, config_file->capture_method, comm);
  if(config_file->n_groups == group->grp + 1) { res=1; }
  else {
    MAM_Get_comm(&new_comm, &targets_qty);
    send_config_file(config_file, ROOT, new_comm);
187
    results_comm(results, ROOT, config_file->n_resizes, new_comm);
188
189
190
    MPI_Comm_free(&new_comm);
    MAM_Commit(&commited, &new_comm); 
  }
191
  if(state == MAM_ZOMBIE) res=state;
192
  return res;
193
194
}

195
196
197
198
199
200
201
202
203
204

/////////////////////////////////////////
/////////////////////////////////////////
//COMPUTE FUNCTIONS
/////////////////////////////////////////
/////////////////////////////////////////


/*
 * Simula la ejecucción de una iteración de computo en la aplicación
205
206
 * que dura al menos un tiempo determinado por la suma de todas las
 * etapas definidas en la configuracion.
207
 */
208
double iterate(int async_comm) {
209
  double time, *times_stages_aux;
210
  size_t i;
211
212
  double aux = 0;

213
  times_stages_aux = malloc(config_file->n_stages * sizeof(double));
214

215
  if(config_file->rigid_times) {
216
    aux = iterate_rigid(&time, times_stages_aux);
217
  } else {
218
    aux = iterate_relaxed(&time, times_stages_aux);
219
220
  }

221
  // Se esta realizando una redistribucion de datos asincrona
222
  if(async_comm == MAM_PENDING) { 
223
  // TODO Que diferencie entre ambas en el IO
224
    results->iters_async += 1;
225
226
  }

227
  // TODO Pasar el resto de este código a results.c
228
  if(results->iter_index == results->iters_size) { // Aumentar tamaño de ambos vectores de resultados
229
    realloc_results_iters(results, config_file->n_stages, results->iters_size + 100);
230
  }
231
  results->iters_time[results->iter_index] = time;
232
  for(i=0; i < config_file->n_stages; i++) {
233
    results->stage_times[i][results->iter_index] = times_stages_aux[i];
234
  }
235
  results->iter_index = results->iter_index + 1;
236
  // TODO Pasar hasta aqui
237

238
  free(times_stages_aux);
239

240
  return aux;
241
242
}

243
244
245

/*
 * Performs an iteration. The gathered times for iterations
246
 * and stages could be IMPRECISE in order to ensure the 
247
248
249
250
251
 * global execution time is precise.
 */
double iterate_relaxed(double *time, double *times_stages) {
  size_t i;
  double start_time, start_time_stage, aux=0;
252
  start_time = MPI_Wtime(); // Imprecise timings
253
254

  for(i=0; i < config_file->n_stages; i++) {
255
    start_time_stage = MPI_Wtime(); 
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
    aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
    times_stages[i] = MPI_Wtime() - start_time_stage;
  }

  *time = MPI_Wtime() - start_time; // Guardar tiempos
  return aux;
}

/*
 * Performs an iteration. The gathered times for iterations
 * and stages are ensured to be precise but the global 
 * execution time could be imprecise.
 */
double iterate_rigid(double *time, double *times_stages) {
  size_t i;
  double start_time, start_time_stage, aux=0;

  MPI_Barrier(comm);
  start_time = MPI_Wtime();

  for(i=0; i < config_file->n_stages; i++) {
    start_time_stage = MPI_Wtime();
    aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
279
    MPI_Barrier(comm);
280
281
282
    times_stages[i] = MPI_Wtime() - start_time_stage;
  }

283
  MPI_Barrier(comm);
284
285
286
287
  *time = MPI_Wtime() - start_time; // Guardar tiempos
  return aux;
}

288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
//======================================================||
//======================================================||
//=============INIT/FREE/PRINT FUNCTIONS================||
//======================================================||
//======================================================||

/*
 * Muestra datos generales sobre los procesos, su grupo,
 * en que nodo residen y la version de MPI utilizada.
 */
void print_general_info(int myId, int grp, int numP) {
  int len;
  char *name = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  char *version = malloc(MPI_MAX_LIBRARY_VERSION_STRING * sizeof(char));
  MPI_Get_processor_name(name, &len);
  MPI_Get_library_version(version, &len);
  printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, grp, numP, name, version);

  free(name);
  free(version);
}

310

311
312
313
/*
 * Pide al proceso raiz imprimir los datos sobre las iteraciones realizadas por el grupo de procesos.
 */
314
315
int print_local_results() {
  int ptr_local, ptr_out, err;
316
317
  char *file_name;

318
  //compute_results_iter(results, group->myId, group->numP, ROOT, config_file->n_stages, config_file->capture_method, comm);
319
  if(group->myId == ROOT) {
320
321
    ptr_out = dup(1);

322
323
324
325
326
327
328
329
    file_name = NULL;
    file_name = malloc(40 * sizeof(char));
    if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
    err = snprintf(file_name, 40, "R%d_G%dNP%dID%d.out", run_id, group->grp, group->numP, group->myId);
    if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero
    create_out_file(file_name, &ptr_local, 1);
  
    print_config_group(config_file, group->grp);
330
    print_iter_results(*results);
331
    print_stage_results(*results, config_file->n_stages);
332
333
    free(file_name);

334
    fflush(stdout);
335
336
    close(1);
    dup(ptr_out);
337
    close(ptr_out);
338
339
340
341
342
343
344
345
346
  }
  return 0;
}

/*
 * Si es el ultimo grupo de procesos, pide al proceso raiz mostrar los datos obtenidos de tiempo de ejecucion, creacion de procesos
 * y las comunicaciones.
 */
int print_final_results() {
347
  int ptr_global, err, ptr_out;
348
349
350
351
  char *file_name;

  if(group->myId == ROOT) {

352
    if(config_file->n_groups == group->grp) {
353
354
355
356
357
358
      file_name = NULL;
      file_name = malloc(20 * sizeof(char));
      if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
      err = snprintf(file_name, 20, "R%d_Global.out", run_id);
      if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

359
      ptr_out = dup(1);
360
      create_out_file(file_name, &ptr_global, 1);
361
362
      print_config(config_file);
      print_global_results(*results, config_file->n_resizes);
363
      fflush(stdout);
364
      free(file_name);
365
366
367

      close(1);
      dup(ptr_out);
368
369
370
371
372
373
374
375
376
    }
  }
  return 0;
}

/*
 * Inicializa la estructura group
 */
void init_group_struct(char *argv[], int argc, int myId, int numP) {
377
  group = malloc(sizeof(group_data));
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
  group->myId        = myId;
  group->numP        = numP;
  group->grp         = 0;
  group->iter_start  = 0;
  group->argc        = argc;
  group->argv        = argv;
}

/*
 * Inicializa los datos para este grupo de procesos.
 *
 * En caso de ser el primer grupo de procesos, lee el fichero de configuracion
 * e inicializa los vectores de comunicacion.
 *
 * En caso de ser otro grupo de procesos entra a la funcion "Sons_init()" donde
 * se comunican con los padres para inicializar sus datos.
 */
void init_application() {
396
397
  int i, last_index;

398
399
400
401
402
403
404
405
  if(group->argc < 2) {
    printf("Falta el fichero de configuracion. Uso:\n./programa config.ini id\nEl argumento numerico id es opcional\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
  }
  if(group->argc > 2) {
    run_id = atoi(group->argv[2]);
  }

406
  init_config(group->argv[1], &config_file);
407
  results = malloc(sizeof(results_data));
408
  init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
409
  if(config_file->sdr) {
410
411
412
413
414
415
416
417
418
419
    group->sync_data_groups = config_file->sdr % DR_MAX_SIZE ? config_file->sdr/DR_MAX_SIZE+1 : config_file->sdr/DR_MAX_SIZE;
    group->sync_qty = (int *) malloc(group->sync_data_groups * sizeof(int));
    group->sync_array = (char **) malloc(group->sync_data_groups * sizeof(char *));
    last_index = group->sync_data_groups-1; 
    for(i=0; i<last_index; i++) {
      group->sync_qty[i] = DR_MAX_SIZE;
      malloc_comm_array(&(group->sync_array[i]), group->sync_qty[i], group->myId, group->numP);
    }
    group->sync_qty[last_index] = config_file->sdr % DR_MAX_SIZE ? config_file->sdr % DR_MAX_SIZE : DR_MAX_SIZE;
    malloc_comm_array(&(group->sync_array[last_index]), group->sync_qty[last_index], group->myId, group->numP);
420
  }
421

422
  if(config_file->adr) {
423
424
425
426
427
428
429
430
431
432
    group->async_data_groups = config_file->adr % DR_MAX_SIZE ? config_file->adr/DR_MAX_SIZE+1 : config_file->adr/DR_MAX_SIZE;
    group->async_qty = (int *) malloc(group->async_data_groups * sizeof(int));
    group->async_array = (char **) malloc(group->async_data_groups * sizeof(char *));
    last_index = group->async_data_groups-1; 
    for(i=0; i<last_index; i++) {
      group->async_qty[i] = DR_MAX_SIZE;
      malloc_comm_array(&(group->async_array[i]), group->async_qty[i], group->myId, group->numP);
    }
    group->async_qty[last_index] = config_file->adr % DR_MAX_SIZE ? config_file->adr % DR_MAX_SIZE : DR_MAX_SIZE;
    malloc_comm_array(&(group->async_array[last_index]), group->async_qty[last_index], group->myId, group->numP);
433
  }
434
435

  obtain_op_times(1);
436
437
438
439
}

/*
 * Obtiene cuanto tiempo es necesario para realizar una operacion de PI
440
441
442
443
444
445
446
447
 *
 * Si compute esta a 1 se considera que se esta inicializando el entorno
 * y realizará trabajo extra.
 *
 * Si compute esta a 0 se considera un entorno inicializado y solo hay que
 * realizar algunos cambios de reserva de memoria. Si es necesario recalcular
 * algo se obtiene el total de tiempo utilizado en dichas tareas y se resta
 * al tiempo total de ejecucion.
448
 */
449
void obtain_op_times(int compute) {
450
  size_t i;
451
  double time = 0;
452
  for(i=0; i<config_file->n_stages; i++) {
453
    time+=init_stage(config_file, i, *group, comm, compute);
454
  }
455
  if(!compute) {results->wasted_time += time;}
456
457
458
459
460
461
}

/*
 * Libera toda la memoria asociada con la aplicacion
 */
void free_application_data() {
462
463
464
465
466
467
468
469
470
  size_t i;

  if(config_file->sdr && group->sync_array != NULL) {
    for(i=0; i<group->sync_data_groups; i++) {
      free(group->sync_array[i]);
      group->sync_array[i] = NULL;
    }
    free(group->sync_qty);
    group->sync_qty = NULL;
471
    free(group->sync_array);
472
473
    group->sync_array = NULL;

474
  }
475
476
477
478
479
480
481
  if(config_file->adr && group->async_array != NULL) {
    for(i=0; i<group->async_data_groups; i++) {
      free(group->async_array[i]);
      group->async_array[i] = NULL;
    }
    free(group->async_qty);
    group->async_qty = NULL;
482
    free(group->async_array);
483
    group->async_array = NULL;
484
  }
485
  MAM_Finalize();
486

487
488
489
  free_results_data(results, config_file->n_stages);
  free(results);

490
  free_config(config_file);
491
  
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
  free(group);
}


/* 
 * Función para crear un fichero con el nombre pasado como argumento.
 * Si el nombre ya existe, se escribe la informacion a continuacion.
 *
 * El proceso que llama a la función pasa a tener como salida estandar
 * dicho fichero si el valor "newstdout" es verdadero.
 *
 */
int create_out_file(char *nombre, int *ptr, int newstdout) {
  int err;

  *ptr = open(nombre, O_WRONLY | O_CREAT | O_APPEND, 0644);
  if(*ptr < 0) return -1; // No ha sido posible crear el fichero

  if(newstdout) {
    err = close(1);
    if(err < 0) return -2; // No es posible modificar la salida estandar
    err = dup(*ptr);
    if(err < 0) return -3; // No es posible modificar la salida estandar
  }

  return 0;
}
519
520
521
522
523
524
525
526
527
528
529
530


//======================================================||
//======================================================||
//================ INIT MALLEABILITY ===================||
//======================================================||
//======================================================||

void init_originals() {
  size_t i;

  if(config_file->n_groups > 1) {
531
    MAM_Set_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss, 
532
      config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
533
    MAM_Set_target_number(config_file->groups[group->grp+1].procs);
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553

    malleability_add_data(&(group->grp), 1, MPI_INT, 1, 0);
    malleability_add_data(&run_id, 1, MPI_INT, 1, 0);
    malleability_add_data(&(group->iter_start), 1, MPI_INT, 1, 0);

    if(config_file->sdr) {
      for(i=0; i<group->sync_data_groups; i++) {
        malleability_add_data(group->sync_array[i], group->sync_qty[i], MPI_CHAR, 0, 0);
      }
    }
    if(config_file->adr) {
      for(i=0; i<group->async_data_groups; i++) {
        malleability_add_data(group->async_array[i], group->async_qty[i], MPI_CHAR, 0, 1);
      }
    }
  }
}

void init_targets() {
  int commited, targets_qty;
554
555
  size_t i, entries;
  void *value = NULL;
556
557

  MAM_Get_comm(&new_comm, &targets_qty);
558
559
560
561
562

  malleability_get_data(&value, 0, 1, 0);
  group->grp = *((int *)value);
  group->grp = group->grp + 1;

563
  recv_config_file(ROOT, new_comm, &config_file);
564
565
566
  results = malloc(sizeof(results_data));
  init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
  results_comm(results, ROOT, config_file->n_resizes, new_comm);
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
  MPI_Comm_free(&new_comm);
  
  MAM_Commit(&commited, &comm);

  // TODO Refactor - Que sea una unica funcion
  // Obtiene las variables que van a utilizar los hijos

  malleability_get_data(&value, 1, 1, 0);
  run_id = *((int *)value);
      
  malleability_get_data(&value, 2, 1, 0);
  group->iter_start = *((int *)value);

  if(config_file->sdr) {
    malleability_get_entries(&entries, 0, 0);
    group->sync_qty = (int *) malloc(entries * sizeof(int));
    group->sync_array = (char **) malloc(entries * sizeof(char *));
    for(i=0; i<entries; i++) {
      malleability_get_data(&value, i, 0, 0);
      group->sync_array[i] = (char *)value;
      group->sync_qty[i] = DR_MAX_SIZE;
    }
    group->sync_qty[entries-1] = config_file->sdr % DR_MAX_SIZE ? config_file->sdr % DR_MAX_SIZE : DR_MAX_SIZE;
    group->sync_data_groups = entries;
  }
  if(config_file->adr) {
    malleability_get_entries(&entries, 0, 1);
    group->async_qty = (int *) malloc(entries * sizeof(int));
    group->async_array = (char **) malloc(entries * sizeof(char *));
    for(i=0; i<entries; i++) {
      malleability_get_data(&value, i, 0, 1);
      group->async_array[i] = (char *)value;
      group->async_qty[i] = DR_MAX_SIZE;
    }
    group->async_qty[entries-1] = config_file->adr % DR_MAX_SIZE ? config_file->adr % DR_MAX_SIZE : DR_MAX_SIZE;
    group->async_data_groups = entries;
603
  }
604
}