Main.c 16.2 KB
Newer Older
1
2
3
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
iker_martin's avatar
iker_martin committed
4
#include <fcntl.h>
5
#include <unistd.h>
iker_martin's avatar
iker_martin committed
6
#include <sys/stat.h>
7
8
#include "process_stage.h"
#include "Main_datatypes.h"
9
#include "configuration.h"
10
#include "../IOcodes/results.h"
11
#include "../malleability/CommDist.h"
12
13
#include "../malleability/malleabilityManager.h"
#include "../malleability/malleabilityStates.h"
14

iker_martin's avatar
iker_martin committed
15
int work();
16
double iterate(int async_comm);
17
18
double iterate_relaxed(double *time, double *times_stages);
double iterate_rigid(double *time, double *times_stages);
19

20
void init_group_struct(char *argv[], int argc, int myId, int numP);
21
void init_application();
22
void obtain_op_times();
23
24
void free_application_data();

25
void print_general_info(int myId, int grp, int numP);
26
int print_local_results();
27
int print_final_results();
iker_martin's avatar
iker_martin committed
28
int create_out_file(char *nombre, int *ptr, int newstdout);
29

iker_martin's avatar
iker_martin committed
30
31
configuration *config_file;
group_data *group;
32
results_data *results;
33
MPI_Comm comm;
34
int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis
35

36
int main(int argc, char *argv[]) {
37
    int numP, myId, res;
iker_martin's avatar
iker_martin committed
38
    int req;
39
    int im_child;
40

41
    int num_cpus, num_nodes;
42
    char *nodelist = NULL;
43
    num_cpus = 20; //FIXME NUMERO MAGICO //TODO Usar openMP para obtener el valor con un pragma
44
45
46
    if (argc >= 5) {
      nodelist = argv[3];
      num_nodes = atoi(argv[4]);
47
      num_cpus = num_nodes * num_cpus;
48
49
    }

50
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &req);
iker_martin's avatar
iker_martin committed
51
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);
52
    MPI_Comm_size(MPI_COMM_WORLD, &numP);
53
    comm = MPI_COMM_WORLD;
iker_martin's avatar
iker_martin committed
54

55
56
57
58
    if(req != MPI_THREAD_MULTIPLE) {
      printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE);
    }

59
    init_group_struct(argv, argc, myId, numP);
60
    im_child = init_malleability(myId, numP, ROOT, comm, argv[0], nodelist, num_cpus, num_nodes);
61

62
    if(!im_child) { //TODO REFACTOR Simplificar inicio
63
64
      init_application();

65
      set_benchmark_grp(group->grp);
66
67
68
      set_benchmark_configuration(config_file);
      set_benchmark_results(results);

69
      set_children_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
70
71
72
      set_malleability_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss, 
	config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);

73
74
75
76
      malleability_add_data(&(group->grp), 1, MAL_INT, 1, 1);
      malleability_add_data(&run_id, 1, MAL_INT, 1, 1);
      malleability_add_data(&(group->iter_start), 1, MAL_INT, 1, 1);

77
78
79
80
81
82
83
      if(config_file->sdr) {
        malleability_add_data(group->sync_array, config_file->sdr, MAL_CHAR, 0, 1);
      }
      if(config_file->adr) {
        malleability_add_data(group->async_array, config_file->adr, MAL_CHAR, 0, 0);
      }

84
      MPI_Barrier(comm);
85
      results->exec_start = MPI_Wtime();
86
    } else { //Init hijos
87

88
      get_malleability_user_comm(&comm);
89
90
      get_benchmark_configuration(&config_file);
      get_benchmark_results(&results);
91

92
93
      // TODO Refactor - Que sea una unica funcion
      // Obtiene las variables que van a utilizar los hijos
94
95
96
      void *value = NULL;
      malleability_get_data(&value, 0, 1, 1);
      group->grp = *((int *)value);
97

98
99
      malleability_get_data(&value, 1, 1, 1);
      run_id = *((int *)value);
100
101
102
      
      malleability_get_data(&value, 2, 1, 1);
      group->iter_start = *((int *)value);
103

104
105
106
107
108
109
110
111
112
      if(config_file->sdr) {
        malleability_get_data(&value, 0, 0, 1);
        group->sync_array = (char *)value;
      }
      if(config_file->adr) {
        malleability_get_data(&value, 0, 0, 0);
        group->async_array = (char *)value;
      }

113
      group->grp = group->grp + 1;
114
115
    }

116
117
118
    //
    // EMPIEZA LA EJECUCION-------------------------------
    //
119
120
    group->grp = group->grp - 1; // TODO REFACTOR???
    do {
121
122
123
124

      get_malleability_user_comm(&comm);
      MPI_Comm_size(comm, &(group->numP));
      MPI_Comm_rank(comm, &(group->myId));
125
126
      group->grp = group->grp + 1;
      set_benchmark_grp(group->grp);
127
      if(group->grp != 0) {
128
129
        obtain_op_times(1); //Obtener los nuevos valores de tiempo para el computo
        set_results_post_reconfig(results, group->grp, config_file->sdr, config_file->adr);
130
      }
131

132
      if(config_file->n_groups != group->grp + 1) { //TODO Llevar a otra funcion
133
        set_malleability_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss, 
134
			config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
135
        set_children_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
136

137
138
        if(group->grp != 0) {
          malleability_modify_data(&(group->grp), 0, 1, MAL_INT, 1, 1);
139
        }
140
      }
141
142

      res = work();
143
      if(res == MALL_ZOMBIE) break;
144

145
146
147
148
      if(res==1) { // Se ha llegado al final de la aplicacion
        MPI_Barrier(comm); // TODO Posible error al utilizar SHRINK
        results->exec_time = MPI_Wtime() - results->exec_start - results->wasted_time;
      }
149
      print_local_results();
150
      reset_results_index(results);
151
    } while(config_file->n_groups > group->grp + 1 && config_file->groups[group->grp+1].sm == MALL_SPAWN_MERGE);
152

153
154
155
    //
    // TERMINA LA EJECUCION ----------------------------------------------------------
    //
156
    print_final_results(); // Pasado este punto ya no pueden escribir los procesos
157

158
    MPI_Barrier(comm);
159
160
161
162
    if(comm != MPI_COMM_WORLD && comm != MPI_COMM_NULL) {
      MPI_Comm_free(&comm);
    }

163
    if(group->myId == ROOT && config_file->groups[group->grp].sm == MALL_SPAWN_MERGE) {
164
165
      MPI_Abort(MPI_COMM_WORLD, -100);
    }
166
    free_application_data();
167

168
169
170
171
172
    MPI_Finalize();
    return 0;
}

/*
173
174
175
176
177
178
179
180
181
 * Función de trabajo principal.
 *
 * Incializa los datos para realizar el computo y a continuacion
 * pasa a realizar "maxiter" iteraciones de computo.
 *
 * Terminadas las iteraciones realiza el redimensionado de procesos.
 * Si el redimensionado se realiza de forma asincrona se 
 * siguen realizando iteraciones de computo hasta que termine la 
 * comunicacion asincrona y realizar entonces la sincrona.
182
183
184
185
 *
 * Si el grupo de procesos es el ultimo que va a ejecutar, se devuelve
 * el valor 1 para indicar que no se va a seguir trabajando con nuevos grupos
 * de procesos. En caso contrario se devuelve 0.
186
 */
iker_martin's avatar
iker_martin committed
187
int work() {
188
  int iter, maxiter, state, res;
189

190
  maxiter = config_file->groups[group->grp].iters;
191
  state = MALL_NOT_STARTED;
192

193
  res = 0;
194
  for(iter=group->iter_start; iter < maxiter; iter++) {
195
    iterate(state);
196
  }
197

198
  if(config_file->n_groups != group->grp + 1)
199
200
    state = malleability_checkpoint();

201
  iter = 0;
202
  while(state == MALL_DIST_PENDING || state == MALL_SPAWN_PENDING || state == MALL_SPAWN_SINGLE_PENDING || state == MALL_SPAWN_ADAPT_POSTPONE) {
203
    if(iter < config_file->groups[group->grp+1].iters) {
204
      iterate(state);
205
206
207
      iter++;
      group->iter_start = iter;
    }
208
    state = malleability_checkpoint();
209
  }
210

211
  
212
  if(config_file->n_groups == group->grp + 1) res=1;
213
  if(state == MALL_ZOMBIE) res=state;
214
  return res;
215
216
}

217
218
219
220
221
222
223
224
225
226

/////////////////////////////////////////
/////////////////////////////////////////
//COMPUTE FUNCTIONS
/////////////////////////////////////////
/////////////////////////////////////////


/*
 * Simula la ejecucción de una iteración de computo en la aplicación
227
228
 * que dura al menos un tiempo determinado por la suma de todas las
 * etapas definidas en la configuracion.
229
 */
230
double iterate(int async_comm) {
231
  double time, *times_stages_aux;
232
  size_t i;
233
234
  double aux = 0;

235
  times_stages_aux = malloc(config_file->n_stages * sizeof(double));
236

237
  if(config_file->rigid_times) {
238
    aux = iterate_rigid(&time, times_stages_aux);
239
  } else {
240
    aux = iterate_relaxed(&time, times_stages_aux);
241
242
  }

243
244
  // Se esta realizando una redistribucion de datos asincrona
  if(async_comm == MALL_DIST_PENDING || async_comm == MALL_SPAWN_PENDING || async_comm == MALL_SPAWN_SINGLE_PENDING) { 
245
  // TODO Que diferencie entre ambas en el IO
246
    results->iters_async += 1;
247
248
249
  }

  if(results->iter_index == results->iters_size) { // Aumentar tamaño de ambos vectores de resultados
250
    realloc_results_iters(results, config_file->n_stages, results->iters_size + 100);
251
  }
252
  results->iters_time[results->iter_index] = time;
253
  for(i=0; i < config_file->n_stages; i++) {
254
    results->stage_times[i][results->iter_index] = times_stages_aux[i];
255
  }
256
  results->iter_index = results->iter_index + 1;
257

258
  free(times_stages_aux);
259

260
  return aux;
261
262
}

263
264
265

/*
 * Performs an iteration. The gathered times for iterations
266
 * and stages could be IMPRECISE in order to ensure the 
267
268
269
270
271
 * global execution time is precise.
 */
double iterate_relaxed(double *time, double *times_stages) {
  size_t i;
  double start_time, start_time_stage, aux=0;
272
  start_time = MPI_Wtime(); // Imprecise timings
273
274

  for(i=0; i < config_file->n_stages; i++) {
275
    start_time_stage = MPI_Wtime(); 
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
    aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
    times_stages[i] = MPI_Wtime() - start_time_stage;
  }

  *time = MPI_Wtime() - start_time; // Guardar tiempos
  return aux;
}

/*
 * Performs an iteration. The gathered times for iterations
 * and stages are ensured to be precise but the global 
 * execution time could be imprecise.
 */
double iterate_rigid(double *time, double *times_stages) {
  size_t i;
  double start_time, start_time_stage, aux=0;

  MPI_Barrier(comm);
  start_time = MPI_Wtime();

  for(i=0; i < config_file->n_stages; i++) {
297
    MPI_Barrier(comm);
298
299
300
301
302
    start_time_stage = MPI_Wtime();
    aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
    times_stages[i] = MPI_Wtime() - start_time_stage;
  }

303
  MPI_Barrier(comm);
304
305
306
307
  *time = MPI_Wtime() - start_time; // Guardar tiempos
  return aux;
}

308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
//======================================================||
//======================================================||
//=============INIT/FREE/PRINT FUNCTIONS================||
//======================================================||
//======================================================||

/*
 * Muestra datos generales sobre los procesos, su grupo,
 * en que nodo residen y la version de MPI utilizada.
 */
void print_general_info(int myId, int grp, int numP) {
  int len;
  char *name = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  char *version = malloc(MPI_MAX_LIBRARY_VERSION_STRING * sizeof(char));
  MPI_Get_processor_name(name, &len);
  MPI_Get_library_version(version, &len);
  printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, grp, numP, name, version);

  free(name);
  free(version);
}

330

331
332
333
/*
 * Pide al proceso raiz imprimir los datos sobre las iteraciones realizadas por el grupo de procesos.
 */
334
335
int print_local_results() {
  int ptr_local, ptr_out, err;
336
337
  char *file_name;

338
  compute_results_iter(results, group->myId, group->numP, ROOT, comm);
339
  compute_results_stages(results, group->myId, group->numP, config_file->n_stages, ROOT, comm);
340
  if(group->myId == ROOT) {
341
342
    ptr_out = dup(1);

343
344
345
346
347
348
349
350
    file_name = NULL;
    file_name = malloc(40 * sizeof(char));
    if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
    err = snprintf(file_name, 40, "R%d_G%dNP%dID%d.out", run_id, group->grp, group->numP, group->myId);
    if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero
    create_out_file(file_name, &ptr_local, 1);
  
    print_config_group(config_file, group->grp);
351
    print_iter_results(*results);
352
    print_stage_results(*results, config_file->n_stages);
353
354
    free(file_name);

355
    fflush(stdout);
356
357
    close(1);
    dup(ptr_out);
358
    close(ptr_out);
359
360
361
362
363
364
365
366
367
  }
  return 0;
}

/*
 * Si es el ultimo grupo de procesos, pide al proceso raiz mostrar los datos obtenidos de tiempo de ejecucion, creacion de procesos
 * y las comunicaciones.
 */
int print_final_results() {
368
  int ptr_global, err, ptr_out;
369
370
371
372
  char *file_name;

  if(group->myId == ROOT) {

373
    if(config_file->n_groups == group->grp+1) {
374
375
376
377
378
379
      file_name = NULL;
      file_name = malloc(20 * sizeof(char));
      if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
      err = snprintf(file_name, 20, "R%d_Global.out", run_id);
      if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

380
      ptr_out = dup(1);
381
      create_out_file(file_name, &ptr_global, 1);
382
383
      print_config(config_file);
      print_global_results(*results, config_file->n_resizes);
384
      fflush(stdout);
385
      free(file_name);
386
387
388

      close(1);
      dup(ptr_out);
389
390
391
392
393
394
395
396
397
    }
  }
  return 0;
}

/*
 * Inicializa la estructura group
 */
void init_group_struct(char *argv[], int argc, int myId, int numP) {
398
  group = malloc(sizeof(group_data));
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
  group->myId        = myId;
  group->numP        = numP;
  group->grp         = 0;
  group->iter_start  = 0;
  group->argc        = argc;
  group->argv        = argv;
}

/*
 * Inicializa los datos para este grupo de procesos.
 *
 * En caso de ser el primer grupo de procesos, lee el fichero de configuracion
 * e inicializa los vectores de comunicacion.
 *
 * En caso de ser otro grupo de procesos entra a la funcion "Sons_init()" donde
 * se comunican con los padres para inicializar sus datos.
 */
void init_application() {
  if(group->argc < 2) {
    printf("Falta el fichero de configuracion. Uso:\n./programa config.ini id\nEl argumento numerico id es opcional\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
  }
  if(group->argc > 2) {
    run_id = atoi(group->argv[2]);
  }

425
  init_config(group->argv[1], &config_file);
426
  results = malloc(sizeof(results_data));
427
  init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
428
429
430
431
432
433
  if(config_file->sdr) {
    malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
  }
  if(config_file->adr) {
    malloc_comm_array(&(group->async_array), config_file->adr , group->myId, group->numP);
  }
434
435

  obtain_op_times(1);
436
437
438
439
}

/*
 * Obtiene cuanto tiempo es necesario para realizar una operacion de PI
440
441
442
443
444
445
446
447
 *
 * Si compute esta a 1 se considera que se esta inicializando el entorno
 * y realizará trabajo extra.
 *
 * Si compute esta a 0 se considera un entorno inicializado y solo hay que
 * realizar algunos cambios de reserva de memoria. Si es necesario recalcular
 * algo se obtiene el total de tiempo utilizado en dichas tareas y se resta
 * al tiempo total de ejecucion.
448
 */
449
void obtain_op_times(int compute) {
450
  size_t i;
451
  double time = 0;
452
  for(i=0; i<config_file->n_stages; i++) {
453
    time+=init_stage(config_file, i, *group, comm, compute);
454
  }
455
  if(!compute) {results->wasted_time += time;}
456
457
458
459
460
461
}

/*
 * Libera toda la memoria asociada con la aplicacion
 */
void free_application_data() {
462
463
464
	// FIXME ERROR para grupo 1 en adelante (0 Guay)
                                        if (group->grp==0){
  if(config_file->sdr && group->sync_array != NULL) {
465
    free(group->sync_array);
466
    group->sync_array = NULL;
467
  }
468
  if(config_file->adr && group->async_array != NULL) {
469
    free(group->async_array);
470
    group->async_array = NULL;
471
  }
472
                                        } 
473
474
  free_malleability();

475
476
477
  free_results_data(results, config_file->n_stages);
  free(results);

478
  free_config(config_file);
479
  
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
  free(group);
}


/* 
 * Función para crear un fichero con el nombre pasado como argumento.
 * Si el nombre ya existe, se escribe la informacion a continuacion.
 *
 * El proceso que llama a la función pasa a tener como salida estandar
 * dicho fichero si el valor "newstdout" es verdadero.
 *
 */
int create_out_file(char *nombre, int *ptr, int newstdout) {
  int err;

  *ptr = open(nombre, O_WRONLY | O_CREAT | O_APPEND, 0644);
  if(*ptr < 0) return -1; // No ha sido posible crear el fichero

  if(newstdout) {
    err = close(1);
    if(err < 0) return -2; // No es posible modificar la salida estandar
    err = dup(*ptr);
    if(err < 0) return -3; // No es posible modificar la salida estandar
  }

  return 0;
}