Main.c 14.8 KB
Newer Older
1
2
3
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
iker_martin's avatar
iker_martin committed
4
#include <fcntl.h>
5
#include <unistd.h>
iker_martin's avatar
iker_martin committed
6
#include <sys/stat.h>
7
8
#include "process_stage.h"
#include "Main_datatypes.h"
9
10
#include "../IOcodes/read_ini.h"
#include "../IOcodes/results.h"
11
#include "../malleability/CommDist.h"
12
13
#include "../malleability/malleabilityManager.h"
#include "../malleability/malleabilityStates.h"
14

iker_martin's avatar
iker_martin committed
15
int work();
16
double iterate(double *matrix, int n, int async_comm, int iter);
17

18
void init_group_struct(char *argv[], int argc, int myId, int numP);
19
void init_application();
20
void obtain_op_times();
21
22
void free_application_data();

23
void print_general_info(int myId, int grp, int numP);
24
int print_local_results();
25
int print_final_results();
iker_martin's avatar
iker_martin committed
26
int create_out_file(char *nombre, int *ptr, int newstdout);
27

iker_martin's avatar
iker_martin committed
28
29
configuration *config_file;
group_data *group;
30
results_data *results;
31
MPI_Comm comm;
32
int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis
33

34
int main(int argc, char *argv[]) {
35
    int numP, myId, res;
iker_martin's avatar
iker_martin committed
36
    int req;
37
    int im_child;
38

39
40
41
42
43
44
45
46
    //FIXME El codigo no es capaz de hacer mas de una redistribucion - Arreglar malleabilityTypes.c
    int num_cpus, num_nodes; //nodelist_len; //FIXME Eliminar cuando se utilice Slurm
    char *nodelist = NULL;
    num_cpus = 20; //FIXME NUMERO MAGICO
    if (argc >= 5) {
      nodelist = argv[3];
      //nodelist_len = strlen(nodelist);
      num_nodes = atoi(argv[4]);
47
      num_cpus = num_nodes * num_cpus;
48
49
    }

50
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &req);
51
    MPI_Comm_size(MPI_COMM_WORLD, &numP);
iker_martin's avatar
iker_martin committed
52
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);
53
    comm = MPI_COMM_WORLD;
iker_martin's avatar
iker_martin committed
54

55
56
57
58
    if(req != MPI_THREAD_MULTIPLE) {
      printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE);
    }

59
    init_group_struct(argv, argc, myId, numP);
60
    //FIXME No funciona en OpenMPI
61
    im_child = init_malleability(myId, numP, ROOT, comm, argv[0], nodelist, num_cpus, num_nodes);
62

63
    if(!im_child) { //TODO REFACTOR Simplificar inicio
64
65
      init_application();

66
      set_benchmark_grp(group->grp);
67
68
69
      set_benchmark_configuration(config_file);
      set_benchmark_results(results);

70
      MPI_Barrier(comm);
71
      results->exec_start = MPI_Wtime();
72
    } else { //Init hijos
73

74
      get_malleability_user_comm(&comm);
75
76
      get_benchmark_configuration(&config_file);
      get_benchmark_results(&results);
77
      set_results_post_reconfig(results, group->grp, config_file->sdr, config_file->adr); //TODO Cambio al añadir nueva redistribucion
78

79
80
      // TODO Refactor - Que sea una unica funcion
      // Obtiene las variables que van a utilizar los hijos
81
82
83
84
85
86
87
      void *value = NULL;
      malleability_get_data(&value, 0, 1, 1);
      group->grp = *((int *)value);
      free(value);
      malleability_get_data(&value, 1, 1, 1);
      run_id = *((int *)value);
      free(value);
88
89
90
91
      
      malleability_get_data(&value, 2, 1, 1);
      group->iter_start = *((int *)value);
      free(value);
92

93
94
95
96
97
98
99
100
101
102
103
104
      //FIXME Eliminar cuando se utilice SLURM
      /*
      malleability_get_data(&value, 4, 1, 1);
      num_nodes = *((int *)value);
      free(value);

      malleability_get_data(&value, 5, 1, 1);
      nodelist = (char *)value;
      //free(value);
      nodelist_len = strlen(nodelist);
      */

105
      group->grp = group->grp + 1;
106
107
    }

108
109
110
    //
    // EMPIEZA LA EJECUCION-------------------------------
    //
111
112
    group->grp = group->grp - 1; // TODO REFACTOR???
    do {
113

114
115
      group->grp = group->grp + 1;
      set_benchmark_grp(group->grp);
116
      if(group->grp != 0) {
117
        obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
118
      }
119

120
      if(config_file->n_resizes != group->grp + 1) { //TODO Llevar a otra funcion
121
        set_malleability_configuration(config_file->sm, config_file->ss, config_file->phy_dist[group->grp+1], config_file->at, -1);
122
123
124
125
126
        set_children_number(config_file->procs[group->grp+1]); // TODO TO BE DEPRECATED

        if(group->grp == 0) {
          malleability_add_data(&(group->grp), 1, MAL_INT, 1, 1);
          malleability_add_data(&run_id, 1, MAL_INT, 1, 1);
127
          malleability_add_data(&(group->iter_start), 1, MAL_INT, 1, 1);
128
129
130
131

	  //FIXME Eliminar cuando se utilice SLURM
          //malleability_add_data(&num_nodes, 1, MAL_INT, 1, 1);
          //malleability_add_data(&nodelist, nodelist_len, MAL_CHAR, 1, 1);
132
        }
133
      }
134
135

      res = work();
136
      if(res == MALL_ZOMBIE) break;
137

138
139
140
141
      get_malleability_user_comm(&comm);
      MPI_Comm_size(comm, &(group->numP));
      MPI_Comm_rank(comm, &(group->myId));

142
      print_local_results();
143
      reset_results_index(results);
144
    } while(config_file->n_resizes > group->grp + 1 && config_file->sm == MALL_SPAWN_MERGE);
145

146
147
148
149
    //
    // TERMINA LA EJECUCION ----------------------------------------------------------
    //

150

151
    if(res==1) { // Se ha llegado al final de la aplicacion
152
      MPI_Barrier(comm); // TODO Posible error al utilizar SHRINK
153
      results->exec_time = MPI_Wtime() - results->exec_start - results->wasted_time;
154
    }
155
    print_final_results(); // Pasado este punto ya no pueden escribir los procesos
156
157
158
159
160

    if(comm != MPI_COMM_WORLD && comm != MPI_COMM_NULL) {
      MPI_Comm_free(&comm);
    }

161
    if(group->myId == ROOT && config_file->sm == MALL_SPAWN_MERGE) {
162
163
164
      MPI_Abort(MPI_COMM_WORLD, -100);
    }
    free_application_data();
165

166
    MPI_Finalize();
167

168
169
170
171
    return 0;
}

/*
172
173
174
175
176
177
178
179
180
 * Función de trabajo principal.
 *
 * Incializa los datos para realizar el computo y a continuacion
 * pasa a realizar "maxiter" iteraciones de computo.
 *
 * Terminadas las iteraciones realiza el redimensionado de procesos.
 * Si el redimensionado se realiza de forma asincrona se 
 * siguen realizando iteraciones de computo hasta que termine la 
 * comunicacion asincrona y realizar entonces la sincrona.
181
182
183
184
 *
 * Si el grupo de procesos es el ultimo que va a ejecutar, se devuelve
 * el valor 1 para indicar que no se va a seguir trabajando con nuevos grupos
 * de procesos. En caso contrario se devuelve 0.
185
 */
iker_martin's avatar
iker_martin committed
186
int work() {
187
  int iter, maxiter, state, res;
188
  double *matrix = NULL;
189

iker_martin's avatar
iker_martin committed
190
  maxiter = config_file->iters[group->grp];
191
  state = MALL_NOT_STARTED;
192

193
  res = 0;
194
  for(iter=group->iter_start; iter < maxiter; iter++) {
195
    iterate(matrix, config_file->granularity, state, iter);
196
  }
197

198
  if(config_file->n_resizes != group->grp + 1)
199
200
    state = malleability_checkpoint();

201
  iter = 0;
202
  while(state == MALL_DIST_PENDING || state == MALL_SPAWN_PENDING || state == MALL_SPAWN_SINGLE_PENDING || state == MALL_SPAWN_ADAPT_POSTPONE) {
203
    if(iter < config_file->iters[group->grp+1]) {
204
      iterate(matrix, config_file->granularity, state, iter);
205
206
207
      iter++;
      group->iter_start = iter;
    }
208
    state = malleability_checkpoint();
209
  }
210

211
  
212
  if(config_file->n_resizes - 1 == group->grp) res=1;
213
  if(state == MALL_ZOMBIE) res=state;
214
  return res;
215
216
}

217
218
219
220
221
222
223
224
225
226
227
228

/////////////////////////////////////////
/////////////////////////////////////////
//COMPUTE FUNCTIONS
/////////////////////////////////////////
/////////////////////////////////////////


/*
 * Simula la ejecucción de una iteración de computo en la aplicación
 * que dura al menos un tiempo de "time" segundos.
 */
229
double iterate(double *matrix, int n, int async_comm, int iter) {
230
  double start_time, start_time_stage, actual_time, *times_stages;
231
  int i, cnt_async = 0;
232
233
  double aux = 0;

234
  times_stages = malloc(config_file->n_stages * sizeof(double));
235
  start_time = MPI_Wtime();
236

237
  for(i=0; i < config_file->n_stages; i++) {
238
    start_time_stage = MPI_Wtime();
239
    aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
240
    times_stages[i] = MPI_Wtime() - start_time_stage;
241
242
243
244
  }

  actual_time = MPI_Wtime(); // Guardar tiempos
  // TODO Que diferencie entre ambas en el IO
245
  if(async_comm == MALL_DIST_PENDING || async_comm == MALL_SPAWN_PENDING || async_comm == MALL_SPAWN_SINGLE_PENDING) { // Se esta realizando una redistribucion de datos asincrona
246
    cnt_async=1;
247
248
249
  }

  if(results->iter_index == results->iters_size) { // Aumentar tamaño de ambos vectores de resultados
250
    realloc_results_iters(results, config_file->n_stages, results->iters_size + 100);
251
252
  }
  results->iters_time[results->iter_index] = actual_time - start_time;
253
254
255
  for(i=0; i < config_file->n_stages; i++) {
    results->stage_times[i][results->iter_index] = times_stages[i];
  }
256
  results->iters_async += cnt_async;
257
  results->iter_index = results->iter_index + 1;
258

259
260
  free(times_stages);

261
  return aux;
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
}

//======================================================||
//======================================================||
//=============INIT/FREE/PRINT FUNCTIONS================||
//======================================================||
//======================================================||

/*
 * Muestra datos generales sobre los procesos, su grupo,
 * en que nodo residen y la version de MPI utilizada.
 */
void print_general_info(int myId, int grp, int numP) {
  int len;
  char *name = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  char *version = malloc(MPI_MAX_LIBRARY_VERSION_STRING * sizeof(char));
  MPI_Get_processor_name(name, &len);
  MPI_Get_library_version(version, &len);
  printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, grp, numP, name, version);

  free(name);
  free(version);
}

286

287
288
289
/*
 * Pide al proceso raiz imprimir los datos sobre las iteraciones realizadas por el grupo de procesos.
 */
290
291
int print_local_results() {
  int ptr_local, ptr_out, err;
292
293
  char *file_name;

294
  compute_results_iter(results, group->myId, ROOT, comm);
295
  if(group->myId == ROOT) {
296
297
    ptr_out = dup(1);

298
299
300
301
302
303
304
305
    file_name = NULL;
    file_name = malloc(40 * sizeof(char));
    if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
    err = snprintf(file_name, 40, "R%d_G%dNP%dID%d.out", run_id, group->grp, group->numP, group->myId);
    if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero
    create_out_file(file_name, &ptr_local, 1);
  
    print_config_group(config_file, group->grp);
306
307
    print_iter_results(*results);
    print_stage_results(*results, config_file->n_stages);
308
309
    free(file_name);

310
    fflush(stdout);
311
312
313
314
315
316
317
318
319
320
321
    close(1);
    dup(ptr_out);
  }
  return 0;
}

/*
 * Si es el ultimo grupo de procesos, pide al proceso raiz mostrar los datos obtenidos de tiempo de ejecucion, creacion de procesos
 * y las comunicaciones.
 */
int print_final_results() {
322
  int ptr_global, err, ptr_out;
323
324
325
326
  char *file_name;

  if(group->myId == ROOT) {

327
    if(group->grp == config_file->n_resizes -1) {
328
329
330
331
332
333
      file_name = NULL;
      file_name = malloc(20 * sizeof(char));
      if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
      err = snprintf(file_name, 20, "R%d_Global.out", run_id);
      if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

334
      ptr_out = dup(1);
335
336
      create_out_file(file_name, &ptr_global, 1);
      print_config(config_file, group->grp);
337
      print_global_results(*results, config_file->n_resizes);
338
      fflush(stdout);
339
      free(file_name);
340
341
342

      close(1);
      dup(ptr_out);
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
    }
  }
  return 0;
}

/*
 * Inicializa la estructura group
 */
void init_group_struct(char *argv[], int argc, int myId, int numP) {
  group = malloc(1 * sizeof(group_data));
  group->myId        = myId;
  group->numP        = numP;
  group->grp         = 0;
  group->iter_start  = 0;
  group->argc        = argc;
  group->argv        = argv;
}

/*
 * Inicializa los datos para este grupo de procesos.
 *
 * En caso de ser el primer grupo de procesos, lee el fichero de configuracion
 * e inicializa los vectores de comunicacion.
 *
 * En caso de ser otro grupo de procesos entra a la funcion "Sons_init()" donde
 * se comunican con los padres para inicializar sus datos.
 */
void init_application() {
  if(group->argc < 2) {
    printf("Falta el fichero de configuracion. Uso:\n./programa config.ini id\nEl argumento numerico id es opcional\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
  }
  if(group->argc > 2) {
    run_id = atoi(group->argv[2]);
  }

  config_file = read_ini_file(group->argv[1]);
  results = malloc(sizeof(results_data));
381
  init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->iters[group->grp]);
382
383
384
385
386
387
  if(config_file->sdr) {
    malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
  }
  if(config_file->adr) {
    malloc_comm_array(&(group->async_array), config_file->adr , group->myId, group->numP);
  }
388
389

  int message_tam = 100000000;
390
391
392
  for(int i=0; i<3; i++) {
    config_file->latency_m = latency(group->myId, group->numP, comm);
    config_file->bw_m = bandwidth(group->myId, group->numP, comm, config_file->latency_m, message_tam);
393
  //if(group->myId == ROOT) printf("numP=%d Lat=%lf Bw=%lf\n", group->numP, config_file->latency_m, config_file->bw_m);
394
  }
395
  obtain_op_times(1);
396
397
398
399
}

/*
 * Obtiene cuanto tiempo es necesario para realizar una operacion de PI
400
401
402
403
404
405
406
407
 *
 * Si compute esta a 1 se considera que se esta inicializando el entorno
 * y realizará trabajo extra.
 *
 * Si compute esta a 0 se considera un entorno inicializado y solo hay que
 * realizar algunos cambios de reserva de memoria. Si es necesario recalcular
 * algo se obtiene el total de tiempo utilizado en dichas tareas y se resta
 * al tiempo total de ejecucion.
408
 */
409
410
void obtain_op_times(int compute) {
  int i;
411
  double time = 0;
412
  for(i=0; i<config_file->n_stages; i++) {
413
    time+=init_stage(config_file, i, *group, comm, compute);
414
  }
415
  if(!compute) {results->wasted_time += time;}
416
417
418
419
420
421
422
423
424
425
426
427
428
}

/*
 * Libera toda la memoria asociada con la aplicacion
 */
void free_application_data() {
  if(config_file->sdr) {
    free(group->sync_array);
  }
  if(config_file->adr) {
    free(group->async_array);
  }
  
429
430
  free_malleability();

431
  if(group->grp == 0) { //FIXME Revisar porque cuando es diferente a 0 no funciona
432
    free_results_data(results, config_file->n_stages);
433
    free(results);
434
  }
435
  free_config(config_file);
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
  free(group);
}


/* 
 * Función para crear un fichero con el nombre pasado como argumento.
 * Si el nombre ya existe, se escribe la informacion a continuacion.
 *
 * El proceso que llama a la función pasa a tener como salida estandar
 * dicho fichero si el valor "newstdout" es verdadero.
 *
 */
int create_out_file(char *nombre, int *ptr, int newstdout) {
  int err;

  *ptr = open(nombre, O_WRONLY | O_CREAT | O_APPEND, 0644);
  if(*ptr < 0) return -1; // No ha sido posible crear el fichero

  if(newstdout) {
    err = close(1);
    if(err < 0) return -2; // No es posible modificar la salida estandar
    err = dup(*ptr);
    if(err < 0) return -3; // No es posible modificar la salida estandar
  }

  return 0;
}