Main.c 19.7 KB
Newer Older
1
2
3
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
iker_martin's avatar
iker_martin committed
4
#include <fcntl.h>
5
#include <unistd.h>
iker_martin's avatar
iker_martin committed
6
#include <sys/stat.h>
7
8
#include "process_stage.h"
#include "Main_datatypes.h"
9
#include "configuration.h"
10
#include "../IOcodes/results.h"
11
#include "../malleability/CommDist.h"
12
13
#include "../malleability/malleabilityManager.h"
#include "../malleability/malleabilityStates.h"
14

15
16
#define DR_MAX_SIZE 1000000000

iker_martin's avatar
iker_martin committed
17
int work();
18
double iterate(int async_comm);
19
20
double iterate_relaxed(double *time, double *times_stages);
double iterate_rigid(double *time, double *times_stages);
21

22
void init_group_struct(char *argv[], int argc, int myId, int numP);
23
void init_application();
24
void obtain_op_times();
25
26
void free_application_data();

27
void print_general_info(int myId, int grp, int numP);
28
int print_local_results();
29
int print_final_results();
iker_martin's avatar
iker_martin committed
30
int create_out_file(char *nombre, int *ptr, int newstdout);
31

32
33
34

void init_originals();
void init_targets();
35
void user_redistribution(void *args);
36

iker_martin's avatar
iker_martin committed
37
38
configuration *config_file;
group_data *group;
39
results_data *results;
40
MPI_Comm comm, new_comm;
41
int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis
42

43
int main(int argc, char *argv[]) {
44
    int numP, myId, res;
iker_martin's avatar
iker_martin committed
45
    int req;
46
    int im_child;
47

48
    int num_cpus, num_nodes;
49
    char *nodelist = NULL;
50
    num_cpus = 20; //FIXME NUMERO MAGICO //TODO Usar openMP para obtener el valor con un pragma
51
52
53
    if (argc >= 5) {
      nodelist = argv[3];
      num_nodes = atoi(argv[4]);
54
      num_cpus = num_nodes * num_cpus;
55
56
    }

57
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &req);
iker_martin's avatar
iker_martin committed
58
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);
59
    MPI_Comm_size(MPI_COMM_WORLD, &numP);
60
    comm = MPI_COMM_WORLD;
61
    new_comm = MPI_COMM_NULL;
iker_martin's avatar
iker_martin committed
62

63
64
    if(req != MPI_THREAD_MULTIPLE) {
      printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE);
65
66
      fflush(stdout);
      MPI_Abort(MPI_COMM_WORLD, -50);
67
68
    }

69
    init_group_struct(argv, argc, myId, numP);
70
    im_child = MAM_Init(ROOT, &comm, argv[0], nodelist, num_cpus, num_nodes, user_redistribution, NULL);
71

72
    if(!im_child) { //TODO REFACTOR Simplificar inicio
73
      init_application();
74
      init_originals();
75

76
      MPI_Barrier(comm);
77
      results->exec_start = MPI_Wtime();
78
79
    }

80
81
82
    //
    // EMPIEZA LA EJECUCION-------------------------------
    //
83
    do {
84
85
      MPI_Comm_size(comm, &(group->numP));
      MPI_Comm_rank(comm, &(group->myId));
86

87
      if(group->grp != 0) {
88
        obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
89
        MAM_Retrieve_times(&results->spawn_time[group->grp - 1], &results->sync_time[group->grp - 1], &results->async_time[group->grp - 1], &results->malleability_time[group->grp - 1]);
90
      }
91

92
      if(config_file->n_groups != group->grp + 1) { //TODO Llevar a otra funcion
93
        MAM_Set_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss, 
94
			config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
95
        MAM_Set_target_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
96

97
        if(group->grp != 0) {
98
99
          malleability_modify_data(&(group->grp), 0, 1, MPI_INT, 1, 0);
          malleability_modify_data(&(group->iter_start), 2, 1, MPI_INT, 1, 0);
100
        }
101
      }
102
103

      res = work();
104

105
      if(res==1) { // Se ha llegado al final de la aplicacion
106
        MPI_Barrier(comm);
107
        results->exec_time = MPI_Wtime() - results->exec_start - results->wasted_time;
108
        print_local_results();
109
      }
110
      
111

112
      reset_results_index(results);
113

114
      group->grp = group->grp + 1;
115
    } while(config_file->n_groups > group->grp);
116

117
118
    //
    // TERMINA LA EJECUCION ----------------------------------------------------------
119
    // 
120
    print_final_results(); // Pasado este punto ya no pueden escribir los procesos
121

122
    MPI_Barrier(comm);
123
124
125
126
    if(comm != MPI_COMM_WORLD && comm != MPI_COMM_NULL) {
      MPI_Comm_free(&comm);
    }

127
    if(group->myId == ROOT && config_file->groups[group->grp].sm == MALL_SPAWN_MERGE) {
128
129
      MPI_Abort(MPI_COMM_WORLD, -100);
    }
130
    free_application_data();
131

132
133
134
135
136
    MPI_Finalize();
    return 0;
}

/*
137
138
139
140
141
142
143
144
145
 * Función de trabajo principal.
 *
 * Incializa los datos para realizar el computo y a continuacion
 * pasa a realizar "maxiter" iteraciones de computo.
 *
 * Terminadas las iteraciones realiza el redimensionado de procesos.
 * Si el redimensionado se realiza de forma asincrona se 
 * siguen realizando iteraciones de computo hasta que termine la 
 * comunicacion asincrona y realizar entonces la sincrona.
146
147
148
149
 *
 * Si el grupo de procesos es el ultimo que va a ejecutar, se devuelve
 * el valor 1 para indicar que no se va a seguir trabajando con nuevos grupos
 * de procesos. En caso contrario se devuelve 0.
150
 */
iker_martin's avatar
iker_martin committed
151
int work() {
152
  int iter, maxiter, state, res;
153
  int wait_completed = MAM_CHECK_COMPLETION;
154

155
  maxiter = config_file->groups[group->grp].iters;
156
  state = MAM_NOT_STARTED;
157

158
  res = 0;
159
  for(iter=group->iter_start; iter < maxiter; iter++) {
160
    iterate(state);
161
  }
162

163
  if(config_file->n_groups != group->grp + 1)
164
    MAM_Checkpoint(&state, wait_completed, user_redistribution, NULL);
165

166
  iter = 0;
167
  while(state == MAM_PENDING || state == MAM_USER_PENDING) {
168
    if(group->grp+1 < config_file->n_groups && iter < config_file->groups[group->grp+1].iters) {
169
      iterate(state);
170
171
      iter++;
      group->iter_start = iter;
172
    } else { wait_completed = MAM_WAIT_COMPLETION; }
173
    MAM_Checkpoint(&state, wait_completed, user_redistribution, NULL);
174
  }
175

176
  if(config_file->n_groups == group->grp + 1) { res=1; }
177
  if(state == MAM_ZOMBIE) res=state;
178
  return res;
179
180
}

181
182
183
184
185
186
187
188
189
190

/////////////////////////////////////////
/////////////////////////////////////////
//COMPUTE FUNCTIONS
/////////////////////////////////////////
/////////////////////////////////////////


/*
 * Simula la ejecucción de una iteración de computo en la aplicación
191
192
 * que dura al menos un tiempo determinado por la suma de todas las
 * etapas definidas en la configuracion.
193
 */
194
double iterate(int async_comm) {
195
  double time, *times_stages_aux;
196
  size_t i;
197
198
  double aux = 0;

199
  times_stages_aux = malloc(config_file->n_stages * sizeof(double));
200

201
  if(config_file->rigid_times) {
202
    aux = iterate_rigid(&time, times_stages_aux);
203
  } else {
204
    aux = iterate_relaxed(&time, times_stages_aux);
205
206
  }

207
  // Se esta realizando una redistribucion de datos asincrona
208
  if(async_comm == MAM_PENDING) { 
209
    // TODO Que diferencie entre ambas en el IO
210
    results->iters_async += 1;
211
212
  }

213
  // TODO Pasar el resto de este código a results.c
214
  if(results->iter_index == results->iters_size) { // Aumentar tamaño de ambos vectores de resultados
215
    realloc_results_iters(results, config_file->n_stages, results->iters_size + 100);
216
  }
217
  results->iters_time[results->iter_index] = time;
218
  for(i=0; i < config_file->n_stages; i++) {
219
    results->stage_times[i][results->iter_index] = times_stages_aux[i];
220
  }
221
  results->iter_index = results->iter_index + 1;
222
  // TODO Pasar hasta aqui
223

224
  free(times_stages_aux);
225

226
  return aux;
227
228
}

229
230
231

/*
 * Performs an iteration. The gathered times for iterations
232
 * and stages could be IMPRECISE in order to ensure the 
233
234
235
236
237
 * global execution time is precise.
 */
double iterate_relaxed(double *time, double *times_stages) {
  size_t i;
  double start_time, start_time_stage, aux=0;
238
  start_time = MPI_Wtime(); // Imprecise timings
239
240

  for(i=0; i < config_file->n_stages; i++) {
241
    start_time_stage = MPI_Wtime(); 
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
    aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
    times_stages[i] = MPI_Wtime() - start_time_stage;
  }

  *time = MPI_Wtime() - start_time; // Guardar tiempos
  return aux;
}

/*
 * Performs an iteration. The gathered times for iterations
 * and stages are ensured to be precise but the global 
 * execution time could be imprecise.
 */
double iterate_rigid(double *time, double *times_stages) {
  size_t i;
  double start_time, start_time_stage, aux=0;

  MPI_Barrier(comm);
  start_time = MPI_Wtime();

  for(i=0; i < config_file->n_stages; i++) {
    start_time_stage = MPI_Wtime();
    aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
265
    MPI_Barrier(comm);
266
267
268
    times_stages[i] = MPI_Wtime() - start_time_stage;
  }

269
  MPI_Barrier(comm);
270
271
272
273
  *time = MPI_Wtime() - start_time; // Guardar tiempos
  return aux;
}

274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
//======================================================||
//======================================================||
//=============INIT/FREE/PRINT FUNCTIONS================||
//======================================================||
//======================================================||

/*
 * Muestra datos generales sobre los procesos, su grupo,
 * en que nodo residen y la version de MPI utilizada.
 */
void print_general_info(int myId, int grp, int numP) {
  int len;
  char *name = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  char *version = malloc(MPI_MAX_LIBRARY_VERSION_STRING * sizeof(char));
  MPI_Get_processor_name(name, &len);
  MPI_Get_library_version(version, &len);
  printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, grp, numP, name, version);

  free(name);
  free(version);
}

296

297
298
299
/*
 * Pide al proceso raiz imprimir los datos sobre las iteraciones realizadas por el grupo de procesos.
 */
300
301
int print_local_results() {
  int ptr_local, ptr_out, err;
302
303
  char *file_name;

304
305
  // This function causes an overhead in the recorded time for last group
  compute_results_iter(results, group->myId, group->numP, ROOT, config_file->n_stages, config_file->capture_method, comm);
306
  if(group->myId == ROOT) {
307
308
    ptr_out = dup(1);

309
310
311
312
313
314
315
316
    file_name = NULL;
    file_name = malloc(40 * sizeof(char));
    if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
    err = snprintf(file_name, 40, "R%d_G%dNP%dID%d.out", run_id, group->grp, group->numP, group->myId);
    if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero
    create_out_file(file_name, &ptr_local, 1);
  
    print_config_group(config_file, group->grp);
317
    print_iter_results(*results);
318
    print_stage_results(*results, config_file->n_stages);
319
320
    free(file_name);

321
    fflush(stdout);
322
323
    close(1);
    dup(ptr_out);
324
    close(ptr_out);
325
326
327
328
329
330
331
332
333
  }
  return 0;
}

/*
 * Si es el ultimo grupo de procesos, pide al proceso raiz mostrar los datos obtenidos de tiempo de ejecucion, creacion de procesos
 * y las comunicaciones.
 */
int print_final_results() {
334
  int ptr_global, err, ptr_out;
335
336
337
338
  char *file_name;

  if(group->myId == ROOT) {

339
    if(config_file->n_groups == group->grp) {
340
341
342
343
344
345
      file_name = NULL;
      file_name = malloc(20 * sizeof(char));
      if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
      err = snprintf(file_name, 20, "R%d_Global.out", run_id);
      if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

346
      ptr_out = dup(1);
347
      create_out_file(file_name, &ptr_global, 1);
348
349
      print_config(config_file);
      print_global_results(*results, config_file->n_resizes);
350
      fflush(stdout);
351
      free(file_name);
352
353
354

      close(1);
      dup(ptr_out);
355
356
357
358
359
360
361
362
363
    }
  }
  return 0;
}

/*
 * Inicializa la estructura group
 */
void init_group_struct(char *argv[], int argc, int myId, int numP) {
364
  group = malloc(sizeof(group_data));
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
  group->myId        = myId;
  group->numP        = numP;
  group->grp         = 0;
  group->iter_start  = 0;
  group->argc        = argc;
  group->argv        = argv;
}

/*
 * Inicializa los datos para este grupo de procesos.
 *
 * En caso de ser el primer grupo de procesos, lee el fichero de configuracion
 * e inicializa los vectores de comunicacion.
 *
 * En caso de ser otro grupo de procesos entra a la funcion "Sons_init()" donde
 * se comunican con los padres para inicializar sus datos.
 */
void init_application() {
383
384
  int i, last_index;

385
386
387
388
389
390
391
392
  if(group->argc < 2) {
    printf("Falta el fichero de configuracion. Uso:\n./programa config.ini id\nEl argumento numerico id es opcional\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
  }
  if(group->argc > 2) {
    run_id = atoi(group->argv[2]);
  }

393
  init_config(group->argv[1], &config_file);
394
  results = malloc(sizeof(results_data));
395
  init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
396
  if(config_file->sdr) {
397
398
399
400
401
402
403
404
405
406
    group->sync_data_groups = config_file->sdr % DR_MAX_SIZE ? config_file->sdr/DR_MAX_SIZE+1 : config_file->sdr/DR_MAX_SIZE;
    group->sync_qty = (int *) malloc(group->sync_data_groups * sizeof(int));
    group->sync_array = (char **) malloc(group->sync_data_groups * sizeof(char *));
    last_index = group->sync_data_groups-1; 
    for(i=0; i<last_index; i++) {
      group->sync_qty[i] = DR_MAX_SIZE;
      malloc_comm_array(&(group->sync_array[i]), group->sync_qty[i], group->myId, group->numP);
    }
    group->sync_qty[last_index] = config_file->sdr % DR_MAX_SIZE ? config_file->sdr % DR_MAX_SIZE : DR_MAX_SIZE;
    malloc_comm_array(&(group->sync_array[last_index]), group->sync_qty[last_index], group->myId, group->numP);
407
  }
408

409
  if(config_file->adr) {
410
411
412
413
414
415
416
417
418
419
    group->async_data_groups = config_file->adr % DR_MAX_SIZE ? config_file->adr/DR_MAX_SIZE+1 : config_file->adr/DR_MAX_SIZE;
    group->async_qty = (int *) malloc(group->async_data_groups * sizeof(int));
    group->async_array = (char **) malloc(group->async_data_groups * sizeof(char *));
    last_index = group->async_data_groups-1; 
    for(i=0; i<last_index; i++) {
      group->async_qty[i] = DR_MAX_SIZE;
      malloc_comm_array(&(group->async_array[i]), group->async_qty[i], group->myId, group->numP);
    }
    group->async_qty[last_index] = config_file->adr % DR_MAX_SIZE ? config_file->adr % DR_MAX_SIZE : DR_MAX_SIZE;
    malloc_comm_array(&(group->async_array[last_index]), group->async_qty[last_index], group->myId, group->numP);
420
  }
421
422

  obtain_op_times(1);
423
424
425
426
}

/*
 * Obtiene cuanto tiempo es necesario para realizar una operacion de PI
427
428
429
430
431
432
433
434
 *
 * Si compute esta a 1 se considera que se esta inicializando el entorno
 * y realizará trabajo extra.
 *
 * Si compute esta a 0 se considera un entorno inicializado y solo hay que
 * realizar algunos cambios de reserva de memoria. Si es necesario recalcular
 * algo se obtiene el total de tiempo utilizado en dichas tareas y se resta
 * al tiempo total de ejecucion.
435
 */
436
void obtain_op_times(int compute) {
437
  size_t i;
438
  double time = 0;
439
  for(i=0; i<config_file->n_stages; i++) {
440
    time+=init_stage(config_file, i, *group, comm, compute);
441
  }
442
  if(!compute) {results->wasted_time += time;}
443
444
445
446
447
448
}

/*
 * Libera toda la memoria asociada con la aplicacion
 */
void free_application_data() {
449
450
451
452
453
454
455
456
457
  size_t i;

  if(config_file->sdr && group->sync_array != NULL) {
    for(i=0; i<group->sync_data_groups; i++) {
      free(group->sync_array[i]);
      group->sync_array[i] = NULL;
    }
    free(group->sync_qty);
    group->sync_qty = NULL;
458
    free(group->sync_array);
459
460
    group->sync_array = NULL;

461
  }
462
463
464
465
466
467
468
  if(config_file->adr && group->async_array != NULL) {
    for(i=0; i<group->async_data_groups; i++) {
      free(group->async_array[i]);
      group->async_array[i] = NULL;
    }
    free(group->async_qty);
    group->async_qty = NULL;
469
    free(group->async_array);
470
    group->async_array = NULL;
471
  }
472
  MAM_Finalize();
473

474
475
476
  free_results_data(results, config_file->n_stages);
  free(results);

477
  free_config(config_file);
478
  
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
  free(group);
}


/* 
 * Función para crear un fichero con el nombre pasado como argumento.
 * Si el nombre ya existe, se escribe la informacion a continuacion.
 *
 * El proceso que llama a la función pasa a tener como salida estandar
 * dicho fichero si el valor "newstdout" es verdadero.
 *
 */
int create_out_file(char *nombre, int *ptr, int newstdout) {
  int err;

  *ptr = open(nombre, O_WRONLY | O_CREAT | O_APPEND, 0644);
  if(*ptr < 0) return -1; // No ha sido posible crear el fichero

  if(newstdout) {
    err = close(1);
    if(err < 0) return -2; // No es posible modificar la salida estandar
    err = dup(*ptr);
    if(err < 0) return -3; // No es posible modificar la salida estandar
  }

  return 0;
}
506
507
508
509
510
511
512
513
514
515
516
517


//======================================================||
//======================================================||
//================ INIT MALLEABILITY ===================||
//======================================================||
//======================================================||

void init_originals() {
  size_t i;

  if(config_file->n_groups > 1) {
518
    MAM_Set_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss, 
519
      config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
520
    MAM_Set_target_number(config_file->groups[group->grp+1].procs);
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539

    malleability_add_data(&(group->grp), 1, MPI_INT, 1, 0);
    malleability_add_data(&run_id, 1, MPI_INT, 1, 0);
    malleability_add_data(&(group->iter_start), 1, MPI_INT, 1, 0);

    if(config_file->sdr) {
      for(i=0; i<group->sync_data_groups; i++) {
        malleability_add_data(group->sync_array[i], group->sync_qty[i], MPI_CHAR, 0, 0);
      }
    }
    if(config_file->adr) {
      for(i=0; i<group->async_data_groups; i++) {
        malleability_add_data(group->async_array[i], group->async_qty[i], MPI_CHAR, 0, 1);
      }
    }
  }
}

void init_targets() {
540
541
  size_t i, entries;
  void *value = NULL;
542

543
544
545
546
  malleability_get_data(&value, 0, 1, 0);
  group->grp = *((int *)value);
  group->grp = group->grp + 1;

547
  recv_config_file(ROOT, new_comm, &config_file);
548
549
550
  results = malloc(sizeof(results_data));
  init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
  results_comm(results, ROOT, config_file->n_resizes, new_comm);
551

552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
  // TODO Refactor - Que sea una unica funcion
  // Obtiene las variables que van a utilizar los hijos

  malleability_get_data(&value, 1, 1, 0);
  run_id = *((int *)value);
      
  malleability_get_data(&value, 2, 1, 0);
  group->iter_start = *((int *)value);

  if(config_file->sdr) {
    malleability_get_entries(&entries, 0, 0);
    group->sync_qty = (int *) malloc(entries * sizeof(int));
    group->sync_array = (char **) malloc(entries * sizeof(char *));
    for(i=0; i<entries; i++) {
      malleability_get_data(&value, i, 0, 0);
      group->sync_array[i] = (char *)value;
      group->sync_qty[i] = DR_MAX_SIZE;
    }
    group->sync_qty[entries-1] = config_file->sdr % DR_MAX_SIZE ? config_file->sdr % DR_MAX_SIZE : DR_MAX_SIZE;
    group->sync_data_groups = entries;
  }
  if(config_file->adr) {
    malleability_get_entries(&entries, 0, 1);
    group->async_qty = (int *) malloc(entries * sizeof(int));
    group->async_array = (char **) malloc(entries * sizeof(char *));
    for(i=0; i<entries; i++) {
      malleability_get_data(&value, i, 0, 1);
      group->async_array[i] = (char *)value;
      group->async_qty[i] = DR_MAX_SIZE;
    }
    group->async_qty[entries-1] = config_file->adr % DR_MAX_SIZE ? config_file->adr % DR_MAX_SIZE : DR_MAX_SIZE;
    group->async_data_groups = entries;
584
  }
585
}
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604

void user_redistribution(void *args) {
  int commited;
  mam_user_reconf_t user_reconf;


  MAM_Get_Reconf_Info(&user_reconf);
  new_comm = user_reconf.comm;
  if(user_reconf.rank_state == 1) { //FIXME Crear MAM_NEW_RANK?
    init_targets();
  } else {
    send_config_file(config_file, ROOT, new_comm);
    results_comm(results, ROOT, config_file->n_resizes, new_comm);

    print_local_results();
  }

  MAM_Commit(&commited); 
}