Main.c 14.6 KB
Newer Older
1
2
3
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
iker_martin's avatar
iker_martin committed
4
#include <fcntl.h>
5
#include <unistd.h>
iker_martin's avatar
iker_martin committed
6
#include <sys/stat.h>
7
#include "computing_func.h"
8
#include "../malleability/CommDist.h"
9
10
#include "../malleability/malleabilityManager.h"
#include "../malleability/malleabilityStates.h"
11
12
13

#define ROOT 0

iker_martin's avatar
iker_martin committed
14
int work();
15
void iterate(double *matrix, int n, int async_comm, int iter);
16

17
void init_group_struct(char *argv[], int argc, int myId, int numP);
18
void init_application();
19
void obtain_op_times();
20
21
void free_application_data();

22
void print_general_info(int myId, int grp, int numP);
23
int print_local_results();
24
int print_final_results();
iker_martin's avatar
iker_martin committed
25
int create_out_file(char *nombre, int *ptr, int newstdout);
26

iker_martin's avatar
iker_martin committed
27
28
29
30
typedef struct {
  int myId;
  int numP;
  int grp;
31
  int iter_start;
32
  int argc;
iker_martin's avatar
iker_martin committed
33

34
  int numS; // Cantidad de procesos hijos
iker_martin's avatar
iker_martin committed
35
  MPI_Comm children, parents;
36
37

  char *compute_comm_array;
iker_martin's avatar
iker_martin committed
38
  char **argv;
39
  char *sync_array, *async_array;
iker_martin's avatar
iker_martin committed
40
41
42
43
} group_data;

configuration *config_file;
group_data *group;
44
results_data *results;
45
MPI_Comm comm;
46
int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis
47

48
int main(int argc, char *argv[]) {
49
    int numP, myId, res;
iker_martin's avatar
iker_martin committed
50
    int req;
51
    int im_child;
52

53
54
55
56
57
58
59
60
    //FIXME El codigo no es capaz de hacer mas de una redistribucion - Arreglar malleabilityTypes.c
    int num_cpus, num_nodes; //nodelist_len; //FIXME Eliminar cuando se utilice Slurm
    char *nodelist = NULL;
    num_cpus = 20; //FIXME NUMERO MAGICO
    if (argc >= 5) {
      nodelist = argv[3];
      //nodelist_len = strlen(nodelist);
      num_nodes = atoi(argv[4]);
61
      num_cpus = num_nodes * num_cpus;
62
63
    }

64
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &req);
65
    MPI_Comm_size(MPI_COMM_WORLD, &numP);
iker_martin's avatar
iker_martin committed
66
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);
67
    comm = MPI_COMM_WORLD;
iker_martin's avatar
iker_martin committed
68

69
70
71
72
    if(req != MPI_THREAD_MULTIPLE) {
      printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE);
    }

73
    init_group_struct(argv, argc, myId, numP);
74
    //FIXME No funciona en OpenMPI
75
    im_child = init_malleability(myId, numP, ROOT, comm, argv[0], nodelist, num_cpus, num_nodes);
76

77
    if(!im_child) { //TODO REFACTOR Simplificar inicio
78
79
      init_application();

80
      set_benchmark_grp(group->grp);
81
82
83
      set_benchmark_configuration(config_file);
      set_benchmark_results(results);

84
      MPI_Barrier(comm);
85
      results->exec_start = MPI_Wtime();
86
    } else { //Init hijos
87

88
      get_malleability_user_comm(&comm);
89
90
      get_benchmark_configuration(&config_file);
      get_benchmark_results(&results);
91
      set_results_post_reconfig(results, group->grp, config_file->sdr, config_file->adr); //TODO Cambio al añadir nueva redistribucion
92

93
94
95
      if(config_file->comm_tam) {
        group->compute_comm_array = malloc(config_file->comm_tam * sizeof(char));
      }
96

97
98
      // TODO Refactor - Que sea una unica funcion
      // Obtiene las variables que van a utilizar los hijos
99
100
101
102
103
104
105
      void *value = NULL;
      malleability_get_data(&value, 0, 1, 1);
      group->grp = *((int *)value);
      free(value);
      malleability_get_data(&value, 1, 1, 1);
      run_id = *((int *)value);
      free(value);
106
107
108
109
      
      malleability_get_data(&value, 2, 1, 1);
      group->iter_start = *((int *)value);
      free(value);
110

111
112
113
114
115
116
117
118
119
120
121
122
      //FIXME Eliminar cuando se utilice SLURM
      /*
      malleability_get_data(&value, 4, 1, 1);
      num_nodes = *((int *)value);
      free(value);

      malleability_get_data(&value, 5, 1, 1);
      nodelist = (char *)value;
      //free(value);
      nodelist_len = strlen(nodelist);
      */

123
      group->grp = group->grp + 1;
124
125
    }

126
127
128
    //
    // EMPIEZA LA EJECUCION-------------------------------
    //
129
130
    group->grp = group->grp - 1; // TODO REFACTOR???
    do {
131

132
133
134
135
136
137
138
      group->grp = group->grp + 1;
      set_benchmark_grp(group->grp);
      get_malleability_user_comm(&comm);
      MPI_Comm_size(comm, &(group->numP));
      MPI_Comm_rank(comm, &(group->myId));

      if(config_file->resizes != group->grp + 1) { 
139
        set_malleability_configuration(config_file->cst, config_file->css, config_file->phy_dist[group->grp+1], -1, config_file->aib, -1);
140
141
142
143
144
        set_children_number(config_file->procs[group->grp+1]); // TODO TO BE DEPRECATED

        if(group->grp == 0) {
          malleability_add_data(&(group->grp), 1, MAL_INT, 1, 1);
          malleability_add_data(&run_id, 1, MAL_INT, 1, 1);
145
          malleability_add_data(&(group->iter_start), 1, MAL_INT, 1, 1);
146
147
148
149

	  //FIXME Eliminar cuando se utilice SLURM
          //malleability_add_data(&num_nodes, 1, MAL_INT, 1, 1);
          //malleability_add_data(&nodelist, nodelist_len, MAL_CHAR, 1, 1);
150
        }
151
      }
152
153

      res = work();
iker_martin's avatar
iker_martin committed
154
      if(res == MAL_ZOMBIE) break;
155
156

      print_local_results();
157
158
      reset_results_index(results);
    } while((config_file->resizes > group->grp + 1) && (config_file->cst == COMM_SPAWN_MERGE || config_file->cst == COMM_SPAWN_MERGE_PTHREAD));
159

160
161
162
163
    //
    // TERMINA LA EJECUCION ----------------------------------------------------------
    //

164

iker_martin's avatar
iker_martin committed
165
    if(res==1) { // Se he llegado al final de la aplicacion
166
      MPI_Barrier(comm); // TODO Posible error al utilizar SHRINK
167
      results->exec_time = MPI_Wtime() - results->exec_start;
168
    }
169
    print_final_results(); // Pasado este punto ya no pueden escribir los procesos
170
171
172
173
174

    if(comm != MPI_COMM_WORLD && comm != MPI_COMM_NULL) {
      MPI_Comm_free(&comm);
    }

175
176
177
178
    if(group->myId == ROOT && (config_file->cst == COMM_SPAWN_MERGE || config_file->cst == COMM_SPAWN_MERGE_PTHREAD)) {
      MPI_Abort(MPI_COMM_WORLD, -100);
    }
    free_application_data();
179
    MPI_Finalize();
180

181
182
183
184
    return 0;
}

/*
185
186
187
188
189
190
191
192
193
 * Función de trabajo principal.
 *
 * Incializa los datos para realizar el computo y a continuacion
 * pasa a realizar "maxiter" iteraciones de computo.
 *
 * Terminadas las iteraciones realiza el redimensionado de procesos.
 * Si el redimensionado se realiza de forma asincrona se 
 * siguen realizando iteraciones de computo hasta que termine la 
 * comunicacion asincrona y realizar entonces la sincrona.
194
195
196
197
 *
 * Si el grupo de procesos es el ultimo que va a ejecutar, se devuelve
 * el valor 1 para indicar que no se va a seguir trabajando con nuevos grupos
 * de procesos. En caso contrario se devuelve 0.
198
 */
iker_martin's avatar
iker_martin committed
199
int work() {
200
  int iter, maxiter, state, res;
201
  double *matrix = NULL;
202

iker_martin's avatar
iker_martin committed
203
  maxiter = config_file->iters[group->grp];
204
  //initMatrix(&matrix, config_file->matrix_tam);
205
  state = MAL_NOT_STARTED;
206
  
207
  res = 0;
208
  for(iter=group->iter_start; iter < maxiter; iter++) {
209
    iterate(matrix, config_file->matrix_tam, state, iter);
210
  }
211

212
  if(config_file->resizes != group->grp + 1)
213
214
    state = malleability_checkpoint();

215
  iter = 0;
216
  while(state == MAL_DIST_PENDING || state == MAL_SPAWN_PENDING || state == MAL_SPAWN_SINGLE_PENDING) {
217
    if(iter < config_file->iters[group->grp+1]) {
218
      iterate(matrix, config_file->matrix_tam, state, iter);
219
220
221
      iter++;
      group->iter_start = iter;
    }
222
    state = malleability_checkpoint();
223
  }
224
  
225
  if(config_file->resizes - 1 == group->grp) res=1;
iker_martin's avatar
iker_martin committed
226
  if(state == MAL_ZOMBIE) res=state;
227
  return res;
228
229
}

230
231
232
233
234
235
236
237
238
239
240
241

/////////////////////////////////////////
/////////////////////////////////////////
//COMPUTE FUNCTIONS
/////////////////////////////////////////
/////////////////////////////////////////


/*
 * Simula la ejecucción de una iteración de computo en la aplicación
 * que dura al menos un tiempo de "time" segundos.
 */
242
void iterate(double *matrix, int n, int async_comm, int iter) {
243
244
245
246
247
248
  double start_time, actual_time;
  double time = config_file->general_time * config_file->factors[group->grp];
  double Top = config_file->Top;
  int i, operations = 0;
  double aux = 0;

249
  start_time = MPI_Wtime();
250
251

  operations = time / Top; //FIXME Calcular una sola vez
252
  
253
254
255
  for(i=0; i < operations; i++) {
    aux += computePiSerial(n);
  }
256
257
258
259
260
261
262
263
264
265
266
  
  /*
  if(time >= 1) {
    sleep(time);
  }
  else {
    unsigned int sleep_time = time * 1000000;
    usleep(sleep_time);
  }
  */
  
267

268
  if(config_file->comm_tam) {
269
    MPI_Bcast(group->compute_comm_array, config_file->comm_tam, MPI_CHAR, ROOT, comm);
270
271
272
273
  }

  actual_time = MPI_Wtime(); // Guardar tiempos
  // TODO Que diferencie entre ambas en el IO
274
  if(async_comm == MAL_DIST_PENDING || async_comm == MAL_SPAWN_PENDING || async_comm == MAL_SPAWN_SINGLE_PENDING) { // Se esta realizando una redistribucion de datos asincrona
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
    operations=0;
  }

  if(results->iter_index == results->iters_size) { // Aumentar tamaño de ambos vectores de resultados
    realloc_results_iters(results, results->iters_size + 100);
  }
  results->iters_time[results->iter_index] = actual_time - start_time;
  results->iters_type[results->iter_index] = operations;
  results->iter_index = results->iter_index + 1;
}

//======================================================||
//======================================================||
//=============INIT/FREE/PRINT FUNCTIONS================||
//======================================================||
//======================================================||

/*
 * Muestra datos generales sobre los procesos, su grupo,
 * en que nodo residen y la version de MPI utilizada.
 */
void print_general_info(int myId, int grp, int numP) {
  int len;
  char *name = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  char *version = malloc(MPI_MAX_LIBRARY_VERSION_STRING * sizeof(char));
  MPI_Get_processor_name(name, &len);
  MPI_Get_library_version(version, &len);
  printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, grp, numP, name, version);

  free(name);
  free(version);
}

308

309
310
311
/*
 * Pide al proceso raiz imprimir los datos sobre las iteraciones realizadas por el grupo de procesos.
 */
312
313
int print_local_results() {
  int ptr_local, ptr_out, err;
314
315
  char *file_name;

316
  compute_results_iter(results, group->myId, ROOT, comm);
317
  if(group->myId == ROOT) {
318
319
    ptr_out = dup(1);

320
321
322
323
324
325
326
327
    file_name = NULL;
    file_name = malloc(40 * sizeof(char));
    if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
    err = snprintf(file_name, 40, "R%d_G%dNP%dID%d.out", run_id, group->grp, group->numP, group->myId);
    if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero
    create_out_file(file_name, &ptr_local, 1);
  
    print_config_group(config_file, group->grp);
328
    print_iter_results(*results, config_file->iters[group->grp] - 1);
329
330
    free(file_name);

331
    fflush(stdout);
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
    close(1);
    dup(ptr_out);
  }
  return 0;
}

/*
 * Si es el ultimo grupo de procesos, pide al proceso raiz mostrar los datos obtenidos de tiempo de ejecucion, creacion de procesos
 * y las comunicaciones.
 */
int print_final_results() {
  int ptr_global, err;
  char *file_name;

  if(group->myId == ROOT) {

348
349
350
351
352
353
354
355
356
357
    if(group->grp == config_file->resizes -1) {
      file_name = NULL;
      file_name = malloc(20 * sizeof(char));
      if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
      err = snprintf(file_name, 20, "R%d_Global.out", run_id);
      if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

      create_out_file(file_name, &ptr_global, 1);
      print_config(config_file, group->grp);
      print_global_results(*results, config_file->resizes);
358
      fflush(stdout);
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
      free(file_name);
      
    }
  }
  return 0;
}

/*
 * Inicializa la estructura group
 */
void init_group_struct(char *argv[], int argc, int myId, int numP) {
  group = malloc(1 * sizeof(group_data));
  group->myId        = myId;
  group->numP        = numP;
  group->grp         = 0;
  group->iter_start  = 0;
  group->argc        = argc;
  group->argv        = argv;
}

/*
 * Inicializa los datos para este grupo de procesos.
 *
 * En caso de ser el primer grupo de procesos, lee el fichero de configuracion
 * e inicializa los vectores de comunicacion.
 *
 * En caso de ser otro grupo de procesos entra a la funcion "Sons_init()" donde
 * se comunican con los padres para inicializar sus datos.
 */
void init_application() {
  if(group->argc < 2) {
    printf("Falta el fichero de configuracion. Uso:\n./programa config.ini id\nEl argumento numerico id es opcional\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
  }
  if(group->argc > 2) {
    run_id = atoi(group->argv[2]);
  }

  config_file = read_ini_file(group->argv[1]);
  results = malloc(sizeof(results_data));
  init_results_data(results, config_file->resizes, config_file->iters[group->grp]);
  if(config_file->comm_tam) {
    group->compute_comm_array = malloc(config_file->comm_tam * sizeof(char));
  }
  if(config_file->sdr) {
    malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
  }
  if(config_file->adr) {
    malloc_comm_array(&(group->async_array), config_file->adr , group->myId, group->numP);
  }
   
  obtain_op_times();
}

/*
 * Obtiene cuanto tiempo es necesario para realizar una operacion de PI
 */
void obtain_op_times() {
  double result, start_time = MPI_Wtime();
  int i, qty = 20000;
  result = 0;
  for(i=0; i<qty; i++) {
    result += computePiSerial(config_file->matrix_tam);
  }
  //printf("Creado Top con valor %lf\n", result);
  //fflush(stdout);

  config_file->Top = (MPI_Wtime() - start_time) / qty; //Tiempo de una operacion
427
  MPI_Bcast(&(config_file->Top), 1, MPI_DOUBLE, ROOT, comm);
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
}

/*
 * Libera toda la memoria asociada con la aplicacion
 */
void free_application_data() {
  if(config_file->comm_tam) {
    free(group->compute_comm_array);
  }
  if(config_file->sdr) {
    free(group->sync_array);
  }
  if(config_file->adr) {
    free(group->async_array);
  }
  
444
445
446
  free_malleability();
  free_config(config_file);

447
  if(group->grp == 0) { //FIXME Revisar porque cuando es diferente a 0 no funciona
448
    free_results_data(results);
449
    free(results);
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
  }
  free(group);
}


/* 
 * Función para crear un fichero con el nombre pasado como argumento.
 * Si el nombre ya existe, se escribe la informacion a continuacion.
 *
 * El proceso que llama a la función pasa a tener como salida estandar
 * dicho fichero si el valor "newstdout" es verdadero.
 *
 */
int create_out_file(char *nombre, int *ptr, int newstdout) {
  int err;

  *ptr = open(nombre, O_WRONLY | O_CREAT | O_APPEND, 0644);
  if(*ptr < 0) return -1; // No ha sido posible crear el fichero

  if(newstdout) {
    err = close(1);
    if(err < 0) return -2; // No es posible modificar la salida estandar
    err = dup(*ptr);
    if(err < 0) return -3; // No es posible modificar la salida estandar
  }

  return 0;
}