Main.c 15.9 KB
Newer Older
1
2
3
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
iker_martin's avatar
iker_martin committed
4
#include <fcntl.h>
5
#include <unistd.h>
iker_martin's avatar
iker_martin committed
6
#include <sys/stat.h>
7
8
#include "process_stage.h"
#include "Main_datatypes.h"
9
#include "configuration.h"
10
#include "../IOcodes/results.h"
11
#include "../malleability/CommDist.h"
12
13
#include "../malleability/malleabilityManager.h"
#include "../malleability/malleabilityStates.h"
14

iker_martin's avatar
iker_martin committed
15
int work();
16
double iterate(int async_comm);
17
18
double iterate_relaxed(double *time, double *times_stages);
double iterate_rigid(double *time, double *times_stages);
19

20
void init_group_struct(char *argv[], int argc, int myId, int numP);
21
void init_application();
22
void obtain_op_times();
23
24
void free_application_data();

25
void print_general_info(int myId, int grp, int numP);
26
int print_local_results();
27
int print_final_results();
iker_martin's avatar
iker_martin committed
28
int create_out_file(char *nombre, int *ptr, int newstdout);
29

iker_martin's avatar
iker_martin committed
30
31
configuration *config_file;
group_data *group;
32
results_data *results;
33
MPI_Comm comm;
34
int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis
35

36
int main(int argc, char *argv[]) {
37
    int numP, myId, res;
iker_martin's avatar
iker_martin committed
38
    int req;
39
    int im_child;
40

41
42
43
44
45
46
47
48
    //FIXME El codigo no es capaz de hacer mas de una redistribucion - Arreglar malleabilityTypes.c
    int num_cpus, num_nodes; //nodelist_len; //FIXME Eliminar cuando se utilice Slurm
    char *nodelist = NULL;
    num_cpus = 20; //FIXME NUMERO MAGICO
    if (argc >= 5) {
      nodelist = argv[3];
      //nodelist_len = strlen(nodelist);
      num_nodes = atoi(argv[4]);
49
      num_cpus = num_nodes * num_cpus;
50
51
    }

52
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &req);
53
    MPI_Comm_size(MPI_COMM_WORLD, &numP);
iker_martin's avatar
iker_martin committed
54
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);
55
    comm = MPI_COMM_WORLD;
iker_martin's avatar
iker_martin committed
56

57
58
59
60
    if(req != MPI_THREAD_MULTIPLE) {
      printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE);
    }

61
    init_group_struct(argv, argc, myId, numP);
62
    im_child = init_malleability(myId, numP, ROOT, comm, argv[0], nodelist, num_cpus, num_nodes);
63

64
    if(!im_child) { //TODO REFACTOR Simplificar inicio
65
66
      init_application();

67
      set_benchmark_grp(group->grp);
68
69
70
      set_benchmark_configuration(config_file);
      set_benchmark_results(results);

71
      MPI_Barrier(comm);
72
      results->exec_start = MPI_Wtime();
73
    } else { //Init hijos
74

75
      get_malleability_user_comm(&comm);
76
77
      get_benchmark_configuration(&config_file);
      get_benchmark_results(&results);
78
      set_results_post_reconfig(results, group->grp, config_file->sdr, config_file->adr); //TODO Cambio al añadir nueva redistribucion
79

80
81
      // TODO Refactor - Que sea una unica funcion
      // Obtiene las variables que van a utilizar los hijos
82
83
84
85
86
87
88
      void *value = NULL;
      malleability_get_data(&value, 0, 1, 1);
      group->grp = *((int *)value);
      free(value);
      malleability_get_data(&value, 1, 1, 1);
      run_id = *((int *)value);
      free(value);
89
90
91
92
      
      malleability_get_data(&value, 2, 1, 1);
      group->iter_start = *((int *)value);
      free(value);
93

94
95
96
97
98
99
100
101
102
103
104
105
      //FIXME Eliminar cuando se utilice SLURM
      /*
      malleability_get_data(&value, 4, 1, 1);
      num_nodes = *((int *)value);
      free(value);

      malleability_get_data(&value, 5, 1, 1);
      nodelist = (char *)value;
      //free(value);
      nodelist_len = strlen(nodelist);
      */

106
      group->grp = group->grp + 1;
107
108
    }

109
110
111
    //
    // EMPIEZA LA EJECUCION-------------------------------
    //
112
113
    group->grp = group->grp - 1; // TODO REFACTOR???
    do {
114

115
116
      group->grp = group->grp + 1;
      set_benchmark_grp(group->grp);
117
      if(group->grp != 0) {
118
        obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
119
      }
120

121
      if(config_file->n_resizes != group->grp + 1) { //TODO Llevar a otra funcion
122
123
124
        set_malleability_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss, 
			config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].at, -1);
        set_children_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
125
126
127
128

        if(group->grp == 0) {
          malleability_add_data(&(group->grp), 1, MAL_INT, 1, 1);
          malleability_add_data(&run_id, 1, MAL_INT, 1, 1);
129
          malleability_add_data(&(group->iter_start), 1, MAL_INT, 1, 1);
130
131
132
133

	  //FIXME Eliminar cuando se utilice SLURM
          //malleability_add_data(&num_nodes, 1, MAL_INT, 1, 1);
          //malleability_add_data(&nodelist, nodelist_len, MAL_CHAR, 1, 1);
134
        }
135
      }
136
137

      res = work();
138
      if(res == MALL_ZOMBIE) break;
139

140
141
142
143
      get_malleability_user_comm(&comm);
      MPI_Comm_size(comm, &(group->numP));
      MPI_Comm_rank(comm, &(group->myId));

144
      print_local_results();
145
      reset_results_index(results);
146
    } while(config_file->n_resizes > group->grp + 1 && config_file->groups[group->grp].sm == MALL_SPAWN_MERGE);
147

148
149
150
151
    //
    // TERMINA LA EJECUCION ----------------------------------------------------------
    //

152

153
    if(res==1) { // Se ha llegado al final de la aplicacion
154
      MPI_Barrier(comm); // TODO Posible error al utilizar SHRINK
155
      results->exec_time = MPI_Wtime() - results->exec_start - results->wasted_time;
156
    }
157
    print_final_results(); // Pasado este punto ya no pueden escribir los procesos
158
159
160
161
162

    if(comm != MPI_COMM_WORLD && comm != MPI_COMM_NULL) {
      MPI_Comm_free(&comm);
    }

163
    if(group->myId == ROOT && config_file->groups[group->grp].sm == MALL_SPAWN_MERGE) {
164
165
      MPI_Abort(MPI_COMM_WORLD, -100);
    }
166
    free_application_data(); //FIXME Error al liberar memoria de SDR/ADR
167

168
    MPI_Finalize();
169

170
171
172
173
    return 0;
}

/*
174
175
176
177
178
179
180
181
182
 * Función de trabajo principal.
 *
 * Incializa los datos para realizar el computo y a continuacion
 * pasa a realizar "maxiter" iteraciones de computo.
 *
 * Terminadas las iteraciones realiza el redimensionado de procesos.
 * Si el redimensionado se realiza de forma asincrona se 
 * siguen realizando iteraciones de computo hasta que termine la 
 * comunicacion asincrona y realizar entonces la sincrona.
183
184
185
186
 *
 * Si el grupo de procesos es el ultimo que va a ejecutar, se devuelve
 * el valor 1 para indicar que no se va a seguir trabajando con nuevos grupos
 * de procesos. En caso contrario se devuelve 0.
187
 */
iker_martin's avatar
iker_martin committed
188
int work() {
189
  int iter, maxiter, state, res;
190

191
  maxiter = config_file->groups[group->grp].iters;
192
  state = MALL_NOT_STARTED;
193

194
  res = 0;
195
  for(iter=group->iter_start; iter < maxiter; iter++) {
196
    iterate(state);
197
  }
198

199
  if(config_file->n_resizes != group->grp + 1)
200
201
    state = malleability_checkpoint();

202
  iter = 0;
203
  while(state == MALL_DIST_PENDING || state == MALL_SPAWN_PENDING || state == MALL_SPAWN_SINGLE_PENDING || state == MALL_SPAWN_ADAPT_POSTPONE) {
204
    if(iter < config_file->groups[group->grp+1].iters) {
205
      iterate(state);
206
207
208
      iter++;
      group->iter_start = iter;
    }
209
    state = malleability_checkpoint();
210
  }
211

212
  
213
  if(config_file->n_resizes - 1 == group->grp) res=1;
214
  if(state == MALL_ZOMBIE) res=state;
215
  return res;
216
217
}

218
219
220
221
222
223
224
225
226
227

/////////////////////////////////////////
/////////////////////////////////////////
//COMPUTE FUNCTIONS
/////////////////////////////////////////
/////////////////////////////////////////


/*
 * Simula la ejecucción de una iteración de computo en la aplicación
228
229
 * que dura al menos un tiempo determinado por la suma de todas las
 * etapas definidas en la configuracion.
230
 */
231
double iterate(int async_comm) {
232
  double time, *times_stages_aux;
233
  size_t i;
234
235
  double aux = 0;

236
  times_stages_aux = malloc(config_file->n_stages * sizeof(double));
237

238
239
240
241
  if(config_file->rigid_times) {
    iterate_relaxed(&time, times_stages_aux);
  } else {
    iterate_rigid(&time, times_stages_aux);
242
243
  }

244
245
  // Se esta realizando una redistribucion de datos asincrona
  if(async_comm == MALL_DIST_PENDING || async_comm == MALL_SPAWN_PENDING || async_comm == MALL_SPAWN_SINGLE_PENDING) { 
246
  // TODO Que diferencie entre ambas en el IO
247
    results->iters_async += 1;
248
249
250
  }

  if(results->iter_index == results->iters_size) { // Aumentar tamaño de ambos vectores de resultados
251
    realloc_results_iters(results, config_file->n_stages, results->iters_size + 100);
252
  }
253
  results->iters_time[results->iter_index] = time;
254
  for(i=0; i < config_file->n_stages; i++) {
255
    results->stage_times[i][results->iter_index] = times_stages_aux[i];
256
  }
257
  results->iter_index = results->iter_index + 1;
258

259
  free(times_stages_aux);
260

261
  return aux;
262
263
}

264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307

/*
 * Performs an iteration. The gathered times for iterations
 * and stages could be imprecise in order to ensure the 
 * global execution time is precise.
 */
double iterate_relaxed(double *time, double *times_stages) {
  size_t i;
  double start_time, start_time_stage, aux=0;
  start_time = MPI_Wtime();

  for(i=0; i < config_file->n_stages; i++) {
    start_time_stage = MPI_Wtime();
    aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
    times_stages[i] = MPI_Wtime() - start_time_stage;
  }

  *time = MPI_Wtime() - start_time; // Guardar tiempos
  return aux;
}

/*
 * Performs an iteration. The gathered times for iterations
 * and stages are ensured to be precise but the global 
 * execution time could be imprecise.
 */
double iterate_rigid(double *time, double *times_stages) {
  size_t i;
  double start_time, start_time_stage, aux=0;

  MPI_Barrier(comm);
  start_time = MPI_Wtime();

  for(i=0; i < config_file->n_stages; i++) {
    start_time_stage = MPI_Wtime();
    aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
    MPI_Barrier(comm);
    times_stages[i] = MPI_Wtime() - start_time_stage;
  }

  *time = MPI_Wtime() - start_time; // Guardar tiempos
  return aux;
}

308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
//======================================================||
//======================================================||
//=============INIT/FREE/PRINT FUNCTIONS================||
//======================================================||
//======================================================||

/*
 * Muestra datos generales sobre los procesos, su grupo,
 * en que nodo residen y la version de MPI utilizada.
 */
void print_general_info(int myId, int grp, int numP) {
  int len;
  char *name = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  char *version = malloc(MPI_MAX_LIBRARY_VERSION_STRING * sizeof(char));
  MPI_Get_processor_name(name, &len);
  MPI_Get_library_version(version, &len);
  printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, grp, numP, name, version);

  free(name);
  free(version);
}

330

331
332
333
/*
 * Pide al proceso raiz imprimir los datos sobre las iteraciones realizadas por el grupo de procesos.
 */
334
335
int print_local_results() {
  int ptr_local, ptr_out, err;
336
337
  char *file_name;

338
  compute_results_iter(results, group->myId, ROOT, comm);
339
  if(group->myId == ROOT) {
340
341
    ptr_out = dup(1);

342
343
344
345
346
347
348
349
    file_name = NULL;
    file_name = malloc(40 * sizeof(char));
    if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
    err = snprintf(file_name, 40, "R%d_G%dNP%dID%d.out", run_id, group->grp, group->numP, group->myId);
    if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero
    create_out_file(file_name, &ptr_local, 1);
  
    print_config_group(config_file, group->grp);
350
    print_iter_results(*results);
351
    print_stage_results(*results, config_file->n_stages);
352
353
    free(file_name);

354
    fflush(stdout);
355
356
357
358
359
360
361
362
363
364
365
    close(1);
    dup(ptr_out);
  }
  return 0;
}

/*
 * Si es el ultimo grupo de procesos, pide al proceso raiz mostrar los datos obtenidos de tiempo de ejecucion, creacion de procesos
 * y las comunicaciones.
 */
int print_final_results() {
366
  int ptr_global, err, ptr_out;
367
368
369
370
  char *file_name;

  if(group->myId == ROOT) {

371
    if(group->grp == config_file->n_resizes -1) {
372
373
374
375
376
377
      file_name = NULL;
      file_name = malloc(20 * sizeof(char));
      if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
      err = snprintf(file_name, 20, "R%d_Global.out", run_id);
      if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

378
      ptr_out = dup(1);
379
      create_out_file(file_name, &ptr_global, 1);
380
381
      print_config(config_file);
      print_global_results(*results, config_file->n_resizes);
382
      fflush(stdout);
383
      free(file_name);
384
385
386

      close(1);
      dup(ptr_out);
387
388
389
390
391
392
393
394
395
    }
  }
  return 0;
}

/*
 * Inicializa la estructura group
 */
void init_group_struct(char *argv[], int argc, int myId, int numP) {
396
  group = malloc(sizeof(group_data));
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
  group->myId        = myId;
  group->numP        = numP;
  group->grp         = 0;
  group->iter_start  = 0;
  group->argc        = argc;
  group->argv        = argv;
}

/*
 * Inicializa los datos para este grupo de procesos.
 *
 * En caso de ser el primer grupo de procesos, lee el fichero de configuracion
 * e inicializa los vectores de comunicacion.
 *
 * En caso de ser otro grupo de procesos entra a la funcion "Sons_init()" donde
 * se comunican con los padres para inicializar sus datos.
 */
void init_application() {
  if(group->argc < 2) {
    printf("Falta el fichero de configuracion. Uso:\n./programa config.ini id\nEl argumento numerico id es opcional\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
  }
  if(group->argc > 2) {
    run_id = atoi(group->argv[2]);
  }

423
424
  //config_file = read_ini_file(group->argv[1]);
  init_config(group->argv[1], &config_file);
425
  results = malloc(sizeof(results_data));
426
  init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
427
428
429
430
431
432
  if(config_file->sdr) {
    malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
  }
  if(config_file->adr) {
    malloc_comm_array(&(group->async_array), config_file->adr , group->myId, group->numP);
  }
433
434

  int message_tam = 100000000;
435
436
437
  config_file->latency_m = latency(group->myId, group->numP, comm);
  config_file->bw_m = bandwidth(group->myId, group->numP, comm, config_file->latency_m, message_tam);

438
  obtain_op_times(1);
439
440
441
442
}

/*
 * Obtiene cuanto tiempo es necesario para realizar una operacion de PI
443
444
445
446
447
448
449
450
 *
 * Si compute esta a 1 se considera que se esta inicializando el entorno
 * y realizará trabajo extra.
 *
 * Si compute esta a 0 se considera un entorno inicializado y solo hay que
 * realizar algunos cambios de reserva de memoria. Si es necesario recalcular
 * algo se obtiene el total de tiempo utilizado en dichas tareas y se resta
 * al tiempo total de ejecucion.
451
 */
452
void obtain_op_times(int compute) {
453
  size_t i;
454
  double time = 0;
455
  for(i=0; i<config_file->n_stages; i++) {
456
    time+=init_stage(config_file, i, *group, comm, compute);
457
  }
458
  if(!compute) {results->wasted_time += time;}
459
460
461
462
463
464
465
466
467
468
469
470
471
}

/*
 * Libera toda la memoria asociada con la aplicacion
 */
void free_application_data() {
  if(config_file->sdr) {
    free(group->sync_array);
  }
  if(config_file->adr) {
    free(group->async_array);
  }
  
472
473
  free_malleability();

474
  if(group->grp == 0) { //FIXME Revisar porque cuando es diferente a 0 no funciona
475
    free_results_data(results, config_file->n_stages);
476
    free(results);
477
  }
478
  free_config(config_file);
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
  free(group);
}


/* 
 * Función para crear un fichero con el nombre pasado como argumento.
 * Si el nombre ya existe, se escribe la informacion a continuacion.
 *
 * El proceso que llama a la función pasa a tener como salida estandar
 * dicho fichero si el valor "newstdout" es verdadero.
 *
 */
int create_out_file(char *nombre, int *ptr, int newstdout) {
  int err;

  *ptr = open(nombre, O_WRONLY | O_CREAT | O_APPEND, 0644);
  if(*ptr < 0) return -1; // No ha sido posible crear el fichero

  if(newstdout) {
    err = close(1);
    if(err < 0) return -2; // No es posible modificar la salida estandar
    err = dup(*ptr);
    if(err < 0) return -3; // No es posible modificar la salida estandar
  }

  return 0;
}