Main.c 19.6 KB
Newer Older
1
2
3
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
iker_martin's avatar
iker_martin committed
4
#include <fcntl.h>
5
#include <unistd.h>
iker_martin's avatar
iker_martin committed
6
#include <sys/stat.h>
7
8
#include "process_stage.h"
#include "Main_datatypes.h"
9
#include "configuration.h"
10
#include "../IOcodes/results.h"
11
#include "../malleability/CommDist.h"
12
13
#include "../malleability/malleabilityManager.h"
#include "../malleability/malleabilityStates.h"
14

15
16
#define DR_MAX_SIZE 1000000000

iker_martin's avatar
iker_martin committed
17
int work();
18
double iterate(int async_comm);
19
20
double iterate_relaxed(double *time, double *times_stages);
double iterate_rigid(double *time, double *times_stages);
21

22
void init_group_struct(char *argv[], int argc, int myId, int numP);
23
void init_application();
24
void obtain_op_times();
25
26
void free_application_data();

27
void print_general_info(int myId, int grp, int numP);
28
int print_local_results();
29
int print_final_results();
iker_martin's avatar
iker_martin committed
30
int create_out_file(char *nombre, int *ptr, int newstdout);
31

32
33
34
35

void init_originals();
void init_targets();

iker_martin's avatar
iker_martin committed
36
37
configuration *config_file;
group_data *group;
38
results_data *results;
39
MPI_Comm comm, new_comm;
40
int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis
41

42
int main(int argc, char *argv[]) {
43
    int numP, myId, res;
iker_martin's avatar
iker_martin committed
44
    int req;
45
    int im_child;
46

47
    int num_cpus, num_nodes;
48
    char *nodelist = NULL;
49
    num_cpus = 20; //FIXME NUMERO MAGICO //TODO Usar openMP para obtener el valor con un pragma
50
51
52
    if (argc >= 5) {
      nodelist = argv[3];
      num_nodes = atoi(argv[4]);
53
      num_cpus = num_nodes * num_cpus;
54
55
    }

56
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &req);
iker_martin's avatar
iker_martin committed
57
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);
58
    MPI_Comm_size(MPI_COMM_WORLD, &numP);
59
    comm = MPI_COMM_WORLD;
60
    new_comm = MPI_COMM_NULL;
iker_martin's avatar
iker_martin committed
61

62
63
    if(req != MPI_THREAD_MULTIPLE) {
      printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE);
64
65
      fflush(stdout);
      MPI_Abort(MPI_COMM_WORLD, -50);
66
67
    }

68
    init_group_struct(argv, argc, myId, numP);
69
    im_child = MAM_Init(ROOT, comm, argv[0], nodelist, num_cpus, num_nodes);
70

71
    if(!im_child) { //TODO REFACTOR Simplificar inicio
72
      init_application();
73
      init_originals();
74

75
      MPI_Barrier(comm);
76
      results->exec_start = MPI_Wtime();
77
78
    } else { //Init targets
      init_targets();
79
80
    }

81
82
83
    //
    // EMPIEZA LA EJECUCION-------------------------------
    //
84
    do {
85

86
      if(group->grp != 0) {
87
        obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
88
        MAM_Retrieve_times(&results->spawn_time[group->grp - 1], &results->sync_time[group->grp - 1], &results->async_time[group->grp - 1], &results->malleability_time[group->grp - 1]);
89
      }
90

91
      if(config_file->n_groups != group->grp + 1) { //TODO Llevar a otra funcion
92
        MAM_Set_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss, 
93
			config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
94
        MAM_Set_target_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
95

96
        if(group->grp != 0) {
97
98
          malleability_modify_data(&(group->grp), 0, 1, MPI_INT, 1, 0);
          malleability_modify_data(&(group->iter_start), 2, 1, MPI_INT, 1, 0);
99
        }
100
      }
101
102

      res = work();
103

104
      if(res == MAM_ZOMBIE) break;
105
      if(res==1) { // Se ha llegado al final de la aplicacion
106
        MPI_Barrier(comm);
107
        results->exec_time = MPI_Wtime() - results->exec_start - results->wasted_time;
108
        print_local_results();
109
      }
110

111
      reset_results_index(results);
112

113
114
      group->grp = group->grp + 1;
    } while(config_file->n_groups > group->grp && config_file->groups[group->grp].sm == MALL_SPAWN_MERGE);
115

116
117
118
    //
    // TERMINA LA EJECUCION ----------------------------------------------------------
    //
119
    print_final_results(); // Pasado este punto ya no pueden escribir los procesos
120

121
    MPI_Barrier(comm);
122
123
124
125
    if(comm != MPI_COMM_WORLD && comm != MPI_COMM_NULL) {
      MPI_Comm_free(&comm);
    }

126
    if(group->myId == ROOT && config_file->groups[group->grp].sm == MALL_SPAWN_MERGE) {
127
128
      MPI_Abort(MPI_COMM_WORLD, -100);
    }
129
    free_application_data();
130

131
132
133
134
135
    MPI_Finalize();
    return 0;
}

/*
136
137
138
139
140
141
142
143
144
 * Función de trabajo principal.
 *
 * Incializa los datos para realizar el computo y a continuacion
 * pasa a realizar "maxiter" iteraciones de computo.
 *
 * Terminadas las iteraciones realiza el redimensionado de procesos.
 * Si el redimensionado se realiza de forma asincrona se 
 * siguen realizando iteraciones de computo hasta que termine la 
 * comunicacion asincrona y realizar entonces la sincrona.
145
146
147
148
 *
 * Si el grupo de procesos es el ultimo que va a ejecutar, se devuelve
 * el valor 1 para indicar que no se va a seguir trabajando con nuevos grupos
 * de procesos. En caso contrario se devuelve 0.
149
 */
iker_martin's avatar
iker_martin committed
150
int work() {
151
  int iter, maxiter, state, res, commited;
152
  int wait_completed = MAM_CHECK_COMPLETION;
153

154
  maxiter = config_file->groups[group->grp].iters;
155
  state = MAM_NOT_STARTED;
156

157
  res = 0;
158
  for(iter=group->iter_start; iter < maxiter; iter++) {
159
    iterate(state);
160
  }
161

162
  if(config_file->n_groups != group->grp + 1)
163
    MAM_Checkpoint(&state, wait_completed);
164

165
  iter = 0;
166
  while(state == MAM_PENDING) {
167
    if(group->grp+1 < config_file->n_groups && iter < config_file->groups[group->grp+1].iters) {
168
      iterate(state);
169
170
      iter++;
      group->iter_start = iter;
171
    } else { wait_completed = MAM_WAIT_COMPLETION; }
172
    MAM_Checkpoint(&state, wait_completed);
173
  }
174

175
176
177
178
  // This function causes an overhead in the recorded time for last group
  compute_results_iter(results, group->myId, group->numP, ROOT, config_file->n_stages, config_file->capture_method, comm);
  if(config_file->n_groups == group->grp + 1) { res=1; }
  else {
179
    MAM_Get_comm(&new_comm);
180
    send_config_file(config_file, ROOT, new_comm);
181
    results_comm(results, ROOT, config_file->n_resizes, new_comm);
182
183
184
185
    print_local_results();
    MAM_Commit(&commited, &comm); 
    MPI_Comm_size(comm, &(group->numP));
    MPI_Comm_rank(comm, &(group->myId));
186
  }
187
  if(state == MAM_ZOMBIE) res=state;
188
  return res;
189
190
}

191
192
193
194
195
196
197
198
199
200

/////////////////////////////////////////
/////////////////////////////////////////
//COMPUTE FUNCTIONS
/////////////////////////////////////////
/////////////////////////////////////////


/*
 * Simula la ejecucción de una iteración de computo en la aplicación
201
202
 * que dura al menos un tiempo determinado por la suma de todas las
 * etapas definidas en la configuracion.
203
 */
204
double iterate(int async_comm) {
205
  double time, *times_stages_aux;
206
  size_t i;
207
208
  double aux = 0;

209
  times_stages_aux = malloc(config_file->n_stages * sizeof(double));
210

211
  if(config_file->rigid_times) {
212
    aux = iterate_rigid(&time, times_stages_aux);
213
  } else {
214
    aux = iterate_relaxed(&time, times_stages_aux);
215
216
  }

217
  // Se esta realizando una redistribucion de datos asincrona
218
  if(async_comm == MAM_PENDING) { 
219
    // TODO Que diferencie entre ambas en el IO
220
    results->iters_async += 1;
221
222
  }

223
  // TODO Pasar el resto de este código a results.c
224
  if(results->iter_index == results->iters_size) { // Aumentar tamaño de ambos vectores de resultados
225
    realloc_results_iters(results, config_file->n_stages, results->iters_size + 100);
226
  }
227
  results->iters_time[results->iter_index] = time;
228
  for(i=0; i < config_file->n_stages; i++) {
229
    results->stage_times[i][results->iter_index] = times_stages_aux[i];
230
  }
231
  results->iter_index = results->iter_index + 1;
232
  // TODO Pasar hasta aqui
233

234
  free(times_stages_aux);
235

236
  return aux;
237
238
}

239
240
241

/*
 * Performs an iteration. The gathered times for iterations
242
 * and stages could be IMPRECISE in order to ensure the 
243
244
245
246
247
 * global execution time is precise.
 */
double iterate_relaxed(double *time, double *times_stages) {
  size_t i;
  double start_time, start_time_stage, aux=0;
248
  start_time = MPI_Wtime(); // Imprecise timings
249
250

  for(i=0; i < config_file->n_stages; i++) {
251
    start_time_stage = MPI_Wtime(); 
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
    aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
    times_stages[i] = MPI_Wtime() - start_time_stage;
  }

  *time = MPI_Wtime() - start_time; // Guardar tiempos
  return aux;
}

/*
 * Performs an iteration. The gathered times for iterations
 * and stages are ensured to be precise but the global 
 * execution time could be imprecise.
 */
double iterate_rigid(double *time, double *times_stages) {
  size_t i;
  double start_time, start_time_stage, aux=0;

  MPI_Barrier(comm);
  start_time = MPI_Wtime();

  for(i=0; i < config_file->n_stages; i++) {
    start_time_stage = MPI_Wtime();
    aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
275
    MPI_Barrier(comm);
276
277
278
    times_stages[i] = MPI_Wtime() - start_time_stage;
  }

279
  MPI_Barrier(comm);
280
281
282
283
  *time = MPI_Wtime() - start_time; // Guardar tiempos
  return aux;
}

284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
//======================================================||
//======================================================||
//=============INIT/FREE/PRINT FUNCTIONS================||
//======================================================||
//======================================================||

/*
 * Muestra datos generales sobre los procesos, su grupo,
 * en que nodo residen y la version de MPI utilizada.
 */
void print_general_info(int myId, int grp, int numP) {
  int len;
  char *name = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  char *version = malloc(MPI_MAX_LIBRARY_VERSION_STRING * sizeof(char));
  MPI_Get_processor_name(name, &len);
  MPI_Get_library_version(version, &len);
  printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, grp, numP, name, version);

  free(name);
  free(version);
}

306

307
308
309
/*
 * Pide al proceso raiz imprimir los datos sobre las iteraciones realizadas por el grupo de procesos.
 */
310
311
int print_local_results() {
  int ptr_local, ptr_out, err;
312
313
314
  char *file_name;

  if(group->myId == ROOT) {
315
316
    ptr_out = dup(1);

317
318
319
320
321
322
323
324
    file_name = NULL;
    file_name = malloc(40 * sizeof(char));
    if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
    err = snprintf(file_name, 40, "R%d_G%dNP%dID%d.out", run_id, group->grp, group->numP, group->myId);
    if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero
    create_out_file(file_name, &ptr_local, 1);
  
    print_config_group(config_file, group->grp);
325
    print_iter_results(*results);
326
    print_stage_results(*results, config_file->n_stages);
327
328
    free(file_name);

329
    fflush(stdout);
330
331
    close(1);
    dup(ptr_out);
332
    close(ptr_out);
333
334
335
336
337
338
339
340
341
  }
  return 0;
}

/*
 * Si es el ultimo grupo de procesos, pide al proceso raiz mostrar los datos obtenidos de tiempo de ejecucion, creacion de procesos
 * y las comunicaciones.
 */
int print_final_results() {
342
  int ptr_global, err, ptr_out;
343
344
345
346
  char *file_name;

  if(group->myId == ROOT) {

347
    if(config_file->n_groups == group->grp) {
348
349
350
351
352
353
      file_name = NULL;
      file_name = malloc(20 * sizeof(char));
      if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
      err = snprintf(file_name, 20, "R%d_Global.out", run_id);
      if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

354
      ptr_out = dup(1);
355
      create_out_file(file_name, &ptr_global, 1);
356
357
      print_config(config_file);
      print_global_results(*results, config_file->n_resizes);
358
      fflush(stdout);
359
      free(file_name);
360
361
362

      close(1);
      dup(ptr_out);
363
364
365
366
367
368
369
370
371
    }
  }
  return 0;
}

/*
 * Inicializa la estructura group
 */
void init_group_struct(char *argv[], int argc, int myId, int numP) {
372
  group = malloc(sizeof(group_data));
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
  group->myId        = myId;
  group->numP        = numP;
  group->grp         = 0;
  group->iter_start  = 0;
  group->argc        = argc;
  group->argv        = argv;
}

/*
 * Inicializa los datos para este grupo de procesos.
 *
 * En caso de ser el primer grupo de procesos, lee el fichero de configuracion
 * e inicializa los vectores de comunicacion.
 *
 * En caso de ser otro grupo de procesos entra a la funcion "Sons_init()" donde
 * se comunican con los padres para inicializar sus datos.
 */
void init_application() {
391
392
  int i, last_index;

393
394
395
396
397
398
399
400
  if(group->argc < 2) {
    printf("Falta el fichero de configuracion. Uso:\n./programa config.ini id\nEl argumento numerico id es opcional\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
  }
  if(group->argc > 2) {
    run_id = atoi(group->argv[2]);
  }

401
  init_config(group->argv[1], &config_file);
402
  results = malloc(sizeof(results_data));
403
  init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
404
  if(config_file->sdr) {
405
406
407
408
409
410
411
412
413
414
    group->sync_data_groups = config_file->sdr % DR_MAX_SIZE ? config_file->sdr/DR_MAX_SIZE+1 : config_file->sdr/DR_MAX_SIZE;
    group->sync_qty = (int *) malloc(group->sync_data_groups * sizeof(int));
    group->sync_array = (char **) malloc(group->sync_data_groups * sizeof(char *));
    last_index = group->sync_data_groups-1; 
    for(i=0; i<last_index; i++) {
      group->sync_qty[i] = DR_MAX_SIZE;
      malloc_comm_array(&(group->sync_array[i]), group->sync_qty[i], group->myId, group->numP);
    }
    group->sync_qty[last_index] = config_file->sdr % DR_MAX_SIZE ? config_file->sdr % DR_MAX_SIZE : DR_MAX_SIZE;
    malloc_comm_array(&(group->sync_array[last_index]), group->sync_qty[last_index], group->myId, group->numP);
415
  }
416

417
  if(config_file->adr) {
418
419
420
421
422
423
424
425
426
427
    group->async_data_groups = config_file->adr % DR_MAX_SIZE ? config_file->adr/DR_MAX_SIZE+1 : config_file->adr/DR_MAX_SIZE;
    group->async_qty = (int *) malloc(group->async_data_groups * sizeof(int));
    group->async_array = (char **) malloc(group->async_data_groups * sizeof(char *));
    last_index = group->async_data_groups-1; 
    for(i=0; i<last_index; i++) {
      group->async_qty[i] = DR_MAX_SIZE;
      malloc_comm_array(&(group->async_array[i]), group->async_qty[i], group->myId, group->numP);
    }
    group->async_qty[last_index] = config_file->adr % DR_MAX_SIZE ? config_file->adr % DR_MAX_SIZE : DR_MAX_SIZE;
    malloc_comm_array(&(group->async_array[last_index]), group->async_qty[last_index], group->myId, group->numP);
428
  }
429
430

  obtain_op_times(1);
431
432
433
434
}

/*
 * Obtiene cuanto tiempo es necesario para realizar una operacion de PI
435
436
437
438
439
440
441
442
 *
 * Si compute esta a 1 se considera que se esta inicializando el entorno
 * y realizará trabajo extra.
 *
 * Si compute esta a 0 se considera un entorno inicializado y solo hay que
 * realizar algunos cambios de reserva de memoria. Si es necesario recalcular
 * algo se obtiene el total de tiempo utilizado en dichas tareas y se resta
 * al tiempo total de ejecucion.
443
 */
444
void obtain_op_times(int compute) {
445
  size_t i;
446
  double time = 0;
447
  for(i=0; i<config_file->n_stages; i++) {
448
    time+=init_stage(config_file, i, *group, comm, compute);
449
  }
450
  if(!compute) {results->wasted_time += time;}
451
452
453
454
455
456
}

/*
 * Libera toda la memoria asociada con la aplicacion
 */
void free_application_data() {
457
458
459
460
461
462
463
464
465
  size_t i;

  if(config_file->sdr && group->sync_array != NULL) {
    for(i=0; i<group->sync_data_groups; i++) {
      free(group->sync_array[i]);
      group->sync_array[i] = NULL;
    }
    free(group->sync_qty);
    group->sync_qty = NULL;
466
    free(group->sync_array);
467
468
    group->sync_array = NULL;

469
  }
470
471
472
473
474
475
476
  if(config_file->adr && group->async_array != NULL) {
    for(i=0; i<group->async_data_groups; i++) {
      free(group->async_array[i]);
      group->async_array[i] = NULL;
    }
    free(group->async_qty);
    group->async_qty = NULL;
477
    free(group->async_array);
478
    group->async_array = NULL;
479
  }
480
  MAM_Finalize();
481

482
483
484
  free_results_data(results, config_file->n_stages);
  free(results);

485
  free_config(config_file);
486
  
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
  free(group);
}


/* 
 * Función para crear un fichero con el nombre pasado como argumento.
 * Si el nombre ya existe, se escribe la informacion a continuacion.
 *
 * El proceso que llama a la función pasa a tener como salida estandar
 * dicho fichero si el valor "newstdout" es verdadero.
 *
 */
int create_out_file(char *nombre, int *ptr, int newstdout) {
  int err;

  *ptr = open(nombre, O_WRONLY | O_CREAT | O_APPEND, 0644);
  if(*ptr < 0) return -1; // No ha sido posible crear el fichero

  if(newstdout) {
    err = close(1);
    if(err < 0) return -2; // No es posible modificar la salida estandar
    err = dup(*ptr);
    if(err < 0) return -3; // No es posible modificar la salida estandar
  }

  return 0;
}
514
515
516
517
518
519
520
521
522
523
524
525


//======================================================||
//======================================================||
//================ INIT MALLEABILITY ===================||
//======================================================||
//======================================================||

void init_originals() {
  size_t i;

  if(config_file->n_groups > 1) {
526
    MAM_Set_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss, 
527
      config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
528
    MAM_Set_target_number(config_file->groups[group->grp+1].procs);
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547

    malleability_add_data(&(group->grp), 1, MPI_INT, 1, 0);
    malleability_add_data(&run_id, 1, MPI_INT, 1, 0);
    malleability_add_data(&(group->iter_start), 1, MPI_INT, 1, 0);

    if(config_file->sdr) {
      for(i=0; i<group->sync_data_groups; i++) {
        malleability_add_data(group->sync_array[i], group->sync_qty[i], MPI_CHAR, 0, 0);
      }
    }
    if(config_file->adr) {
      for(i=0; i<group->async_data_groups; i++) {
        malleability_add_data(group->async_array[i], group->async_qty[i], MPI_CHAR, 0, 1);
      }
    }
  }
}

void init_targets() {
548
  int commited;
549
550
  size_t i, entries;
  void *value = NULL;
551

552
  MAM_Get_comm(&new_comm);
553
554
555
556
557

  malleability_get_data(&value, 0, 1, 0);
  group->grp = *((int *)value);
  group->grp = group->grp + 1;

558
  recv_config_file(ROOT, new_comm, &config_file);
559
560
561
  results = malloc(sizeof(results_data));
  init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
  results_comm(results, ROOT, config_file->n_resizes, new_comm);
562
563
  MAM_Commit(&commited, &comm);

564
565
566
  MPI_Comm_size(comm, &(group->numP));
  MPI_Comm_rank(comm, &(group->myId));

567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
  // TODO Refactor - Que sea una unica funcion
  // Obtiene las variables que van a utilizar los hijos

  malleability_get_data(&value, 1, 1, 0);
  run_id = *((int *)value);
      
  malleability_get_data(&value, 2, 1, 0);
  group->iter_start = *((int *)value);

  if(config_file->sdr) {
    malleability_get_entries(&entries, 0, 0);
    group->sync_qty = (int *) malloc(entries * sizeof(int));
    group->sync_array = (char **) malloc(entries * sizeof(char *));
    for(i=0; i<entries; i++) {
      malleability_get_data(&value, i, 0, 0);
      group->sync_array[i] = (char *)value;
      group->sync_qty[i] = DR_MAX_SIZE;
    }
    group->sync_qty[entries-1] = config_file->sdr % DR_MAX_SIZE ? config_file->sdr % DR_MAX_SIZE : DR_MAX_SIZE;
    group->sync_data_groups = entries;
  }
  if(config_file->adr) {
    malleability_get_entries(&entries, 0, 1);
    group->async_qty = (int *) malloc(entries * sizeof(int));
    group->async_array = (char **) malloc(entries * sizeof(char *));
    for(i=0; i<entries; i++) {
      malleability_get_data(&value, i, 0, 1);
      group->async_array[i] = (char *)value;
      group->async_qty[i] = DR_MAX_SIZE;
    }
    group->async_qty[entries-1] = config_file->adr % DR_MAX_SIZE ? config_file->adr % DR_MAX_SIZE : DR_MAX_SIZE;
    group->async_data_groups = entries;
599
  }
600
}