Main.c 20 KB
Newer Older
1
2
3
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
iker_martin's avatar
iker_martin committed
4
#include <fcntl.h>
5
#include <unistd.h>
iker_martin's avatar
iker_martin committed
6
#include <sys/stat.h>
7
8
#include "process_stage.h"
#include "Main_datatypes.h"
9
#include "configuration.h"
10
#include "../IOcodes/results.h"
11
#include "../malleability/CommDist.h"
12
13
#include "../malleability/malleabilityManager.h"
#include "../malleability/malleabilityStates.h"
14

15
16
#define DR_MAX_SIZE 1000000000

iker_martin's avatar
iker_martin committed
17
int work();
18
double iterate(int async_comm);
19
20
double iterate_relaxed(double *time, double *times_stages);
double iterate_rigid(double *time, double *times_stages);
21

22
void init_group_struct(char *argv[], int argc, int myId, int numP);
23
void init_application();
24
void obtain_op_times();
25
26
void free_application_data();

27
void print_general_info(int myId, int grp, int numP);
28
int print_local_results();
29
int print_final_results();
iker_martin's avatar
iker_martin committed
30
int create_out_file(char *nombre, int *ptr, int newstdout);
31

32
33
34
35

void init_originals();
void init_targets();

iker_martin's avatar
iker_martin committed
36
37
configuration *config_file;
group_data *group;
38
results_data *results;
39
MPI_Comm comm, new_comm;
40
int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis
41

42
int main(int argc, char *argv[]) {
43
    int numP, myId, res;
iker_martin's avatar
iker_martin committed
44
    int req;
45
    int im_child;
46

47
    int num_cpus, num_nodes;
48
    char *nodelist = NULL;
49
    num_cpus = 20; //FIXME NUMERO MAGICO //TODO Usar openMP para obtener el valor con un pragma
50
51
52
    if (argc >= 5) {
      nodelist = argv[3];
      num_nodes = atoi(argv[4]);
53
      num_cpus = num_nodes * num_cpus;
54
55
    }

56
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &req);
iker_martin's avatar
iker_martin committed
57
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);
58
    MPI_Comm_size(MPI_COMM_WORLD, &numP);
59
    comm = MPI_COMM_WORLD;
60
    new_comm = MPI_COMM_NULL;
iker_martin's avatar
iker_martin committed
61

62
63
    if(req != MPI_THREAD_MULTIPLE) {
      printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE);
64
65
      fflush(stdout);
      MPI_Abort(MPI_COMM_WORLD, -50);
66
67
    }

68
    init_group_struct(argv, argc, myId, numP);
69
    im_child = init_malleability(myId, numP, ROOT, comm, argv[0], nodelist, num_cpus, num_nodes);
70

71
    if(!im_child) { //TODO REFACTOR Simplificar inicio
72
      init_application();
73
      init_originals();
74

75
      MPI_Barrier(comm);
76
      results->exec_start = MPI_Wtime();
77
78
    } else { //Init targets
      init_targets();
79
80
    }

81
82
83
    //
    // EMPIEZA LA EJECUCION-------------------------------
    //
84
    do {
85
86
      MPI_Comm_size(comm, &(group->numP));
      MPI_Comm_rank(comm, &(group->myId));
87

88
      if(group->grp != 0) {
89
        obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
90
        malleability_retrieve_times(&results->spawn_time[group->grp - 1], &results->sync_time[group->grp - 1], &results->async_time[group->grp - 1], &results->malleability_time[group->grp - 1]);
91
      }
92

93
      if(config_file->n_groups != group->grp + 1) { //TODO Llevar a otra funcion
94
        set_malleability_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss, 
95
			config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
96
        set_children_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
97

98
        if(group->grp != 0) {
99
100
          malleability_modify_data(&(group->grp), 0, 1, MPI_INT, 1, 0);
          malleability_modify_data(&(group->iter_start), 2, 1, MPI_INT, 1, 0);
101
        }
102
      }
103
104

      res = work();
105

106
      if(res == MAM_ZOMBIE) break;
107
      if(res==1) { // Se ha llegado al final de la aplicacion
108
        MPI_Barrier(comm);
109
110
        results->exec_time = MPI_Wtime() - results->exec_start - results->wasted_time;
      }
111

112
      print_local_results();
113
      reset_results_index(results);
114

115
116
117
118
119
120
      if(res!=1) {
        if(comm != MPI_COMM_WORLD) MPI_Comm_free(&comm);
        comm = new_comm;
      }
      group->grp = group->grp + 1;
    } while(config_file->n_groups > group->grp && config_file->groups[group->grp].sm == MALL_SPAWN_MERGE);
121

122
123
124
    //
    // TERMINA LA EJECUCION ----------------------------------------------------------
    //
125
    print_final_results(); // Pasado este punto ya no pueden escribir los procesos
126

127
    MPI_Barrier(comm);
128
129
130
131
    if(comm != MPI_COMM_WORLD && comm != MPI_COMM_NULL) {
      MPI_Comm_free(&comm);
    }

132
    if(group->myId == ROOT && config_file->groups[group->grp].sm == MALL_SPAWN_MERGE) {
133
134
      MPI_Abort(MPI_COMM_WORLD, -100);
    }
135
    free_application_data();
136

137
138
139
140
141
    MPI_Finalize();
    return 0;
}

/*
142
143
144
145
146
147
148
149
150
 * Función de trabajo principal.
 *
 * Incializa los datos para realizar el computo y a continuacion
 * pasa a realizar "maxiter" iteraciones de computo.
 *
 * Terminadas las iteraciones realiza el redimensionado de procesos.
 * Si el redimensionado se realiza de forma asincrona se 
 * siguen realizando iteraciones de computo hasta que termine la 
 * comunicacion asincrona y realizar entonces la sincrona.
151
152
153
154
 *
 * Si el grupo de procesos es el ultimo que va a ejecutar, se devuelve
 * el valor 1 para indicar que no se va a seguir trabajando con nuevos grupos
 * de procesos. En caso contrario se devuelve 0.
155
 */
iker_martin's avatar
iker_martin committed
156
int work() {
157
  int iter, maxiter, state, res, commited, targets_qty;
158
  int wait_completed = MAM_CHECK_COMPLETION;
159

160
  maxiter = config_file->groups[group->grp].iters;
161
  state = MAM_NOT_STARTED;
162

163
  res = 0;
164
  for(iter=group->iter_start; iter < maxiter; iter++) {
165
    iterate(state);
166
  }
167

168
  if(config_file->n_groups != group->grp + 1)
169
    malleability_checkpoint(&state, wait_completed);
170

171
  iter = 0;
172
  while(state == MAM_PENDING) {
173
    if(group->grp+1 < config_file->n_groups && iter < config_file->groups[group->grp+1].iters) {
174
      iterate(state);
175
176
      iter++;
      group->iter_start = iter;
177
178
    } else { wait_completed = MAM_WAIT_COMPLETION; }
    malleability_checkpoint(&state, wait_completed);
179
  }
180

181
182
183
184
185
186
187
188
189
  // This function causes an overhead in the recorded time for last group
  compute_results_iter(results, group->myId, group->numP, ROOT, config_file->n_stages, config_file->capture_method, comm);
  if(config_file->n_groups == group->grp + 1) { res=1; }
  else {
    MAM_Get_comm(&new_comm, &targets_qty);
    send_config_file(config_file, ROOT, new_comm);
    MPI_Comm_free(&new_comm);
    MAM_Commit(&commited, &new_comm); 
  }
190
  if(state == MAM_ZOMBIE) res=state;
191
  return res;
192
193
}

194
195
196
197
198
199
200
201
202
203

/////////////////////////////////////////
/////////////////////////////////////////
//COMPUTE FUNCTIONS
/////////////////////////////////////////
/////////////////////////////////////////


/*
 * Simula la ejecucción de una iteración de computo en la aplicación
204
205
 * que dura al menos un tiempo determinado por la suma de todas las
 * etapas definidas en la configuracion.
206
 */
207
double iterate(int async_comm) {
208
  double time, *times_stages_aux;
209
  size_t i;
210
211
  double aux = 0;

212
  times_stages_aux = malloc(config_file->n_stages * sizeof(double));
213

214
  if(config_file->rigid_times) {
215
    aux = iterate_rigid(&time, times_stages_aux);
216
  } else {
217
    aux = iterate_relaxed(&time, times_stages_aux);
218
219
  }

220
  // Se esta realizando una redistribucion de datos asincrona
221
  if(async_comm == MAM_PENDING) { 
222
  // TODO Que diferencie entre ambas en el IO
223
    results->iters_async += 1;
224
225
  }

226
  // TODO Pasar el resto de este código a results.c
227
  if(results->iter_index == results->iters_size) { // Aumentar tamaño de ambos vectores de resultados
228
    realloc_results_iters(results, config_file->n_stages, results->iters_size + 100);
229
  }
230
  results->iters_time[results->iter_index] = time;
231
  for(i=0; i < config_file->n_stages; i++) {
232
    results->stage_times[i][results->iter_index] = times_stages_aux[i];
233
  }
234
  results->iter_index = results->iter_index + 1;
235
  // TODO Pasar hasta aqui
236

237
  free(times_stages_aux);
238

239
  return aux;
240
241
}

242
243
244

/*
 * Performs an iteration. The gathered times for iterations
245
 * and stages could be IMPRECISE in order to ensure the 
246
247
248
249
250
 * global execution time is precise.
 */
double iterate_relaxed(double *time, double *times_stages) {
  size_t i;
  double start_time, start_time_stage, aux=0;
251
  start_time = MPI_Wtime(); // Imprecise timings
252
253

  for(i=0; i < config_file->n_stages; i++) {
254
    start_time_stage = MPI_Wtime(); 
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
    aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
    times_stages[i] = MPI_Wtime() - start_time_stage;
  }

  *time = MPI_Wtime() - start_time; // Guardar tiempos
  return aux;
}

/*
 * Performs an iteration. The gathered times for iterations
 * and stages are ensured to be precise but the global 
 * execution time could be imprecise.
 */
double iterate_rigid(double *time, double *times_stages) {
  size_t i;
  double start_time, start_time_stage, aux=0;

  MPI_Barrier(comm);
  start_time = MPI_Wtime();

  for(i=0; i < config_file->n_stages; i++) {
    start_time_stage = MPI_Wtime();
    aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
278
    MPI_Barrier(comm);
279
280
281
    times_stages[i] = MPI_Wtime() - start_time_stage;
  }

282
  MPI_Barrier(comm);
283
284
285
286
  *time = MPI_Wtime() - start_time; // Guardar tiempos
  return aux;
}

287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
//======================================================||
//======================================================||
//=============INIT/FREE/PRINT FUNCTIONS================||
//======================================================||
//======================================================||

/*
 * Muestra datos generales sobre los procesos, su grupo,
 * en que nodo residen y la version de MPI utilizada.
 */
void print_general_info(int myId, int grp, int numP) {
  int len;
  char *name = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  char *version = malloc(MPI_MAX_LIBRARY_VERSION_STRING * sizeof(char));
  MPI_Get_processor_name(name, &len);
  MPI_Get_library_version(version, &len);
  printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, grp, numP, name, version);

  free(name);
  free(version);
}

309

310
311
312
/*
 * Pide al proceso raiz imprimir los datos sobre las iteraciones realizadas por el grupo de procesos.
 */
313
314
int print_local_results() {
  int ptr_local, ptr_out, err;
315
316
  char *file_name;

317
  //compute_results_iter(results, group->myId, group->numP, ROOT, config_file->n_stages, config_file->capture_method, comm);
318
  if(group->myId == ROOT) {
319
320
    ptr_out = dup(1);

321
322
323
324
325
326
327
328
    file_name = NULL;
    file_name = malloc(40 * sizeof(char));
    if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
    err = snprintf(file_name, 40, "R%d_G%dNP%dID%d.out", run_id, group->grp, group->numP, group->myId);
    if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero
    create_out_file(file_name, &ptr_local, 1);
  
    print_config_group(config_file, group->grp);
329
    print_iter_results(*results);
330
    print_stage_results(*results, config_file->n_stages);
331
332
    free(file_name);

333
    fflush(stdout);
334
335
    close(1);
    dup(ptr_out);
336
    close(ptr_out);
337
338
339
340
341
342
343
344
345
  }
  return 0;
}

/*
 * Si es el ultimo grupo de procesos, pide al proceso raiz mostrar los datos obtenidos de tiempo de ejecucion, creacion de procesos
 * y las comunicaciones.
 */
int print_final_results() {
346
  int ptr_global, err, ptr_out;
347
348
349
350
  char *file_name;

  if(group->myId == ROOT) {

351
    if(config_file->n_groups == group->grp) {
352
353
354
355
356
357
      file_name = NULL;
      file_name = malloc(20 * sizeof(char));
      if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
      err = snprintf(file_name, 20, "R%d_Global.out", run_id);
      if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

358
      ptr_out = dup(1);
359
      create_out_file(file_name, &ptr_global, 1);
360
361
      print_config(config_file);
      print_global_results(*results, config_file->n_resizes);
362
      fflush(stdout);
363
      free(file_name);
364
365
366

      close(1);
      dup(ptr_out);
367
368
369
370
371
372
373
374
375
    }
  }
  return 0;
}

/*
 * Inicializa la estructura group
 */
void init_group_struct(char *argv[], int argc, int myId, int numP) {
376
  group = malloc(sizeof(group_data));
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
  group->myId        = myId;
  group->numP        = numP;
  group->grp         = 0;
  group->iter_start  = 0;
  group->argc        = argc;
  group->argv        = argv;
}

/*
 * Inicializa los datos para este grupo de procesos.
 *
 * En caso de ser el primer grupo de procesos, lee el fichero de configuracion
 * e inicializa los vectores de comunicacion.
 *
 * En caso de ser otro grupo de procesos entra a la funcion "Sons_init()" donde
 * se comunican con los padres para inicializar sus datos.
 */
void init_application() {
395
396
  int i, last_index;

397
398
399
400
401
402
403
404
  if(group->argc < 2) {
    printf("Falta el fichero de configuracion. Uso:\n./programa config.ini id\nEl argumento numerico id es opcional\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
  }
  if(group->argc > 2) {
    run_id = atoi(group->argv[2]);
  }

405
  init_config(group->argv[1], &config_file);
406
  results = malloc(sizeof(results_data));
407
  init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
408
  if(config_file->sdr) {
409
410
411
412
413
414
415
416
417
418
    group->sync_data_groups = config_file->sdr % DR_MAX_SIZE ? config_file->sdr/DR_MAX_SIZE+1 : config_file->sdr/DR_MAX_SIZE;
    group->sync_qty = (int *) malloc(group->sync_data_groups * sizeof(int));
    group->sync_array = (char **) malloc(group->sync_data_groups * sizeof(char *));
    last_index = group->sync_data_groups-1; 
    for(i=0; i<last_index; i++) {
      group->sync_qty[i] = DR_MAX_SIZE;
      malloc_comm_array(&(group->sync_array[i]), group->sync_qty[i], group->myId, group->numP);
    }
    group->sync_qty[last_index] = config_file->sdr % DR_MAX_SIZE ? config_file->sdr % DR_MAX_SIZE : DR_MAX_SIZE;
    malloc_comm_array(&(group->sync_array[last_index]), group->sync_qty[last_index], group->myId, group->numP);
419
  }
420

421
  if(config_file->adr) {
422
423
424
425
426
427
428
429
430
431
    group->async_data_groups = config_file->adr % DR_MAX_SIZE ? config_file->adr/DR_MAX_SIZE+1 : config_file->adr/DR_MAX_SIZE;
    group->async_qty = (int *) malloc(group->async_data_groups * sizeof(int));
    group->async_array = (char **) malloc(group->async_data_groups * sizeof(char *));
    last_index = group->async_data_groups-1; 
    for(i=0; i<last_index; i++) {
      group->async_qty[i] = DR_MAX_SIZE;
      malloc_comm_array(&(group->async_array[i]), group->async_qty[i], group->myId, group->numP);
    }
    group->async_qty[last_index] = config_file->adr % DR_MAX_SIZE ? config_file->adr % DR_MAX_SIZE : DR_MAX_SIZE;
    malloc_comm_array(&(group->async_array[last_index]), group->async_qty[last_index], group->myId, group->numP);
432
  }
433
434

  obtain_op_times(1);
435
436
437
438
}

/*
 * Obtiene cuanto tiempo es necesario para realizar una operacion de PI
439
440
441
442
443
444
445
446
 *
 * Si compute esta a 1 se considera que se esta inicializando el entorno
 * y realizará trabajo extra.
 *
 * Si compute esta a 0 se considera un entorno inicializado y solo hay que
 * realizar algunos cambios de reserva de memoria. Si es necesario recalcular
 * algo se obtiene el total de tiempo utilizado en dichas tareas y se resta
 * al tiempo total de ejecucion.
447
 */
448
void obtain_op_times(int compute) {
449
  size_t i;
450
  double time = 0;
451
  for(i=0; i<config_file->n_stages; i++) {
452
    time+=init_stage(config_file, i, *group, comm, compute);
453
  }
454
  if(!compute) {results->wasted_time += time;}
455
456
457
458
459
460
}

/*
 * Libera toda la memoria asociada con la aplicacion
 */
void free_application_data() {
461
462
463
464
465
466
467
468
469
  size_t i;

  if(config_file->sdr && group->sync_array != NULL) {
    for(i=0; i<group->sync_data_groups; i++) {
      free(group->sync_array[i]);
      group->sync_array[i] = NULL;
    }
    free(group->sync_qty);
    group->sync_qty = NULL;
470
    free(group->sync_array);
471
472
    group->sync_array = NULL;

473
  }
474
475
476
477
478
479
480
  if(config_file->adr && group->async_array != NULL) {
    for(i=0; i<group->async_data_groups; i++) {
      free(group->async_array[i]);
      group->async_array[i] = NULL;
    }
    free(group->async_qty);
    group->async_qty = NULL;
481
    free(group->async_array);
482
    group->async_array = NULL;
483
  }
484
485
  free_malleability();

486
487
488
  free_results_data(results, config_file->n_stages);
  free(results);

489
  free_config(config_file);
490
  
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
  free(group);
}


/* 
 * Función para crear un fichero con el nombre pasado como argumento.
 * Si el nombre ya existe, se escribe la informacion a continuacion.
 *
 * El proceso que llama a la función pasa a tener como salida estandar
 * dicho fichero si el valor "newstdout" es verdadero.
 *
 */
int create_out_file(char *nombre, int *ptr, int newstdout) {
  int err;

  *ptr = open(nombre, O_WRONLY | O_CREAT | O_APPEND, 0644);
  if(*ptr < 0) return -1; // No ha sido posible crear el fichero

  if(newstdout) {
    err = close(1);
    if(err < 0) return -2; // No es posible modificar la salida estandar
    err = dup(*ptr);
    if(err < 0) return -3; // No es posible modificar la salida estandar
  }

  return 0;
}
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606


//======================================================||
//======================================================||
//================ INIT MALLEABILITY ===================||
//======================================================||
//======================================================||

void init_originals() {
  size_t i;

  if(config_file->n_groups > 1) {
    set_malleability_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss, 
      config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
    set_children_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED

    malleability_add_data(&(group->grp), 1, MPI_INT, 1, 0);
    malleability_add_data(&run_id, 1, MPI_INT, 1, 0);
    malleability_add_data(&(group->iter_start), 1, MPI_INT, 1, 0);
    malleability_add_data(&(results->exec_start), 1, MPI_DOUBLE, 1, 0);

    if(config_file->sdr) {
      for(i=0; i<group->sync_data_groups; i++) {
        malleability_add_data(group->sync_array[i], group->sync_qty[i], MPI_CHAR, 0, 0);
      }
    }
    if(config_file->adr) {
      for(i=0; i<group->async_data_groups; i++) {
        malleability_add_data(group->async_array[i], group->async_qty[i], MPI_CHAR, 0, 1);
      }
    }
  }
}

void init_targets() {
  int commited, targets_qty;
  size_t i;

  MAM_Get_comm(&new_comm, &targets_qty);
  recv_config_file(ROOT, new_comm, &config_file);
  MPI_Comm_free(&new_comm);
  
  MAM_Commit(&commited, &comm);

  // TODO Refactor - Que sea una unica funcion
  // Obtiene las variables que van a utilizar los hijos
  void *value = NULL;
  size_t entries;
  malleability_get_data(&value, 0, 1, 0);
  group->grp = *((int *)value);

  malleability_get_data(&value, 1, 1, 0);
  run_id = *((int *)value);
      
  malleability_get_data(&value, 2, 1, 0);
  group->iter_start = *((int *)value);

  if(config_file->sdr) {
    malleability_get_entries(&entries, 0, 0);
    group->sync_qty = (int *) malloc(entries * sizeof(int));
    group->sync_array = (char **) malloc(entries * sizeof(char *));
    for(i=0; i<entries; i++) {
      malleability_get_data(&value, i, 0, 0);
      group->sync_array[i] = (char *)value;
      group->sync_qty[i] = DR_MAX_SIZE;
    }
    group->sync_qty[entries-1] = config_file->sdr % DR_MAX_SIZE ? config_file->sdr % DR_MAX_SIZE : DR_MAX_SIZE;
    group->sync_data_groups = entries;
  }
  if(config_file->adr) {
    malleability_get_entries(&entries, 0, 1);
    group->async_qty = (int *) malloc(entries * sizeof(int));
    group->async_array = (char **) malloc(entries * sizeof(char *));
    for(i=0; i<entries; i++) {
      malleability_get_data(&value, i, 0, 1);
      group->async_array[i] = (char *)value;
      group->async_qty[i] = DR_MAX_SIZE;
    }
    group->async_qty[entries-1] = config_file->adr % DR_MAX_SIZE ? config_file->adr % DR_MAX_SIZE : DR_MAX_SIZE;
    group->async_data_groups = entries;
    }

    group->grp = group->grp + 1;
    results = malloc(sizeof(results_data));
    init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);

    malleability_get_data(&value, 3, 1, 0);
    results->exec_start = *((double *)value);
}