Main.c 14.8 KB
Newer Older
1
2
3
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
iker_martin's avatar
iker_martin committed
4
#include <fcntl.h>
5
#include <unistd.h>
iker_martin's avatar
iker_martin committed
6
#include <sys/stat.h>
7
8
#include "process_stage.h"
#include "Main_datatypes.h"
9
10
#include "../IOcodes/read_ini.h"
#include "../IOcodes/results.h"
11
#include "../malleability/CommDist.h"
12
13
#include "../malleability/malleabilityManager.h"
#include "../malleability/malleabilityStates.h"
14

iker_martin's avatar
iker_martin committed
15
int work();
16
double iterate(double *matrix, int n, int async_comm, int iter);
17

18
void init_group_struct(char *argv[], int argc, int myId, int numP);
19
void init_application();
20
void obtain_op_times();
21
22
void free_application_data();

23
void print_general_info(int myId, int grp, int numP);
24
int print_local_results();
25
int print_final_results();
iker_martin's avatar
iker_martin committed
26
int create_out_file(char *nombre, int *ptr, int newstdout);
27

iker_martin's avatar
iker_martin committed
28
29
configuration *config_file;
group_data *group;
30
results_data *results;
31
MPI_Comm comm;
32
int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis
33

34
int main(int argc, char *argv[]) {
35
    int numP, myId, res;
iker_martin's avatar
iker_martin committed
36
    int req;
37
    int im_child;
38

39
40
41
42
43
44
45
46
    //FIXME El codigo no es capaz de hacer mas de una redistribucion - Arreglar malleabilityTypes.c
    int num_cpus, num_nodes; //nodelist_len; //FIXME Eliminar cuando se utilice Slurm
    char *nodelist = NULL;
    num_cpus = 20; //FIXME NUMERO MAGICO
    if (argc >= 5) {
      nodelist = argv[3];
      //nodelist_len = strlen(nodelist);
      num_nodes = atoi(argv[4]);
47
      num_cpus = num_nodes * num_cpus;
48
49
    }

50
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &req);
51
    MPI_Comm_size(MPI_COMM_WORLD, &numP);
iker_martin's avatar
iker_martin committed
52
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);
53
    comm = MPI_COMM_WORLD;
iker_martin's avatar
iker_martin committed
54

55
56
57
58
    if(req != MPI_THREAD_MULTIPLE) {
      printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE);
    }

59
    init_group_struct(argv, argc, myId, numP);
60
    //FIXME No funciona en OpenMPI
61
    im_child = init_malleability(myId, numP, ROOT, comm, argv[0], nodelist, num_cpus, num_nodes);
62

63
    if(!im_child) { //TODO REFACTOR Simplificar inicio
64
65
      init_application();

66
      set_benchmark_grp(group->grp);
67
68
69
      set_benchmark_configuration(config_file);
      set_benchmark_results(results);

70
      MPI_Barrier(comm);
71
      results->exec_start = MPI_Wtime();
72
    } else { //Init hijos
73

74
      get_malleability_user_comm(&comm);
75
76
      get_benchmark_configuration(&config_file);
      get_benchmark_results(&results);
77
      set_results_post_reconfig(results, group->grp, config_file->sdr, config_file->adr); //TODO Cambio al añadir nueva redistribucion
78

79
80
      // TODO Refactor - Que sea una unica funcion
      // Obtiene las variables que van a utilizar los hijos
81
82
83
84
85
86
87
      void *value = NULL;
      malleability_get_data(&value, 0, 1, 1);
      group->grp = *((int *)value);
      free(value);
      malleability_get_data(&value, 1, 1, 1);
      run_id = *((int *)value);
      free(value);
88
89
90
91
      
      malleability_get_data(&value, 2, 1, 1);
      group->iter_start = *((int *)value);
      free(value);
92

93
94
95
96
97
98
99
100
101
102
103
104
      //FIXME Eliminar cuando se utilice SLURM
      /*
      malleability_get_data(&value, 4, 1, 1);
      num_nodes = *((int *)value);
      free(value);

      malleability_get_data(&value, 5, 1, 1);
      nodelist = (char *)value;
      //free(value);
      nodelist_len = strlen(nodelist);
      */

105
      group->grp = group->grp + 1;
106
107
    }

108
109
110
    //
    // EMPIEZA LA EJECUCION-------------------------------
    //
111
112
    group->grp = group->grp - 1; // TODO REFACTOR???
    do {
113

114
115
116
117
118
      group->grp = group->grp + 1;
      set_benchmark_grp(group->grp);
      get_malleability_user_comm(&comm);
      MPI_Comm_size(comm, &(group->numP));
      MPI_Comm_rank(comm, &(group->myId));
119
120
      if(group->grp != 0) 
        obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
121

122
      if(config_file->n_resizes != group->grp + 1) { //TODO Llevar a otra funcion
123
        set_malleability_configuration(config_file->sm, config_file->ss, config_file->phy_dist[group->grp+1], config_file->at, -1);
124
125
126
127
128
        set_children_number(config_file->procs[group->grp+1]); // TODO TO BE DEPRECATED

        if(group->grp == 0) {
          malleability_add_data(&(group->grp), 1, MAL_INT, 1, 1);
          malleability_add_data(&run_id, 1, MAL_INT, 1, 1);
129
          malleability_add_data(&(group->iter_start), 1, MAL_INT, 1, 1);
130
131
132
133

	  //FIXME Eliminar cuando se utilice SLURM
          //malleability_add_data(&num_nodes, 1, MAL_INT, 1, 1);
          //malleability_add_data(&nodelist, nodelist_len, MAL_CHAR, 1, 1);
134
        }
135
      }
136
137

      res = work();
138
      if(res == MALL_ZOMBIE) break;
139
140

      print_local_results();
141
      reset_results_index(results);
142
    } while(config_file->n_resizes > group->grp + 1 && config_file->sm == MALL_SPAWN_MERGE);
143

144
145
146
147
    //
    // TERMINA LA EJECUCION ----------------------------------------------------------
    //

148

149
    if(res==1) { // Se ha llegado al final de la aplicacion
150
      MPI_Barrier(comm); // TODO Posible error al utilizar SHRINK
151
      results->exec_time = MPI_Wtime() - results->exec_start - results->wasted_time;
152
    }
153
    print_final_results(); // Pasado este punto ya no pueden escribir los procesos
154
155
156
157
158

    if(comm != MPI_COMM_WORLD && comm != MPI_COMM_NULL) {
      MPI_Comm_free(&comm);
    }

159
    if(group->myId == ROOT && config_file->sm == MALL_SPAWN_MERGE) {
160
161
162
      MPI_Abort(MPI_COMM_WORLD, -100);
    }
    free_application_data();
163

164
    MPI_Finalize();
165

166
167
168
169
    return 0;
}

/*
170
171
172
173
174
175
176
177
178
 * Función de trabajo principal.
 *
 * Incializa los datos para realizar el computo y a continuacion
 * pasa a realizar "maxiter" iteraciones de computo.
 *
 * Terminadas las iteraciones realiza el redimensionado de procesos.
 * Si el redimensionado se realiza de forma asincrona se 
 * siguen realizando iteraciones de computo hasta que termine la 
 * comunicacion asincrona y realizar entonces la sincrona.
179
180
181
182
 *
 * Si el grupo de procesos es el ultimo que va a ejecutar, se devuelve
 * el valor 1 para indicar que no se va a seguir trabajando con nuevos grupos
 * de procesos. En caso contrario se devuelve 0.
183
 */
iker_martin's avatar
iker_martin committed
184
int work() {
185
  int iter, maxiter, state, res;
186
  double *matrix = NULL;
187

iker_martin's avatar
iker_martin committed
188
  maxiter = config_file->iters[group->grp];
189
  state = MALL_NOT_STARTED;
190

191
  res = 0;
192
  for(iter=group->iter_start; iter < maxiter; iter++) {
193
    iterate(matrix, config_file->granularity, state, iter);
194
  }
195

196
  if(config_file->n_resizes != group->grp + 1)
197
198
    state = malleability_checkpoint();

199
  iter = 0;
200
  while(state == MALL_DIST_PENDING || state == MALL_SPAWN_PENDING || state == MALL_SPAWN_SINGLE_PENDING || state == MALL_SPAWN_ADAPT_POSTPONE) {
201
    if(iter < config_file->iters[group->grp+1]) {
202
      iterate(matrix, config_file->granularity, state, iter);
203
204
205
      iter++;
      group->iter_start = iter;
    }
206
    state = malleability_checkpoint();
207
  }
208

209
  
210
  if(config_file->n_resizes - 1 == group->grp) res=1;
211
  if(state == MALL_ZOMBIE) res=state;
212
  return res;
213
214
}

215
216
217
218
219
220
221
222
223
224
225
226

/////////////////////////////////////////
/////////////////////////////////////////
//COMPUTE FUNCTIONS
/////////////////////////////////////////
/////////////////////////////////////////


/*
 * Simula la ejecucción de una iteración de computo en la aplicación
 * que dura al menos un tiempo de "time" segundos.
 */
227
double iterate(double *matrix, int n, int async_comm, int iter) {
228
  double start_time, start_time_stage, actual_time, *times_stages;
229
  int i, cnt_async = 0;
230
231
  double aux = 0;

232
  times_stages = malloc(config_file->n_stages * sizeof(double));
233
  start_time = MPI_Wtime();
234

235
  for(i=0; i < config_file->n_stages; i++) {
236
    start_time_stage = MPI_Wtime();
237
    aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
238
    times_stages[i] = MPI_Wtime() - start_time_stage;
239
240
241
242
  }

  actual_time = MPI_Wtime(); // Guardar tiempos
  // TODO Que diferencie entre ambas en el IO
243
  if(async_comm == MALL_DIST_PENDING || async_comm == MALL_SPAWN_PENDING || async_comm == MALL_SPAWN_SINGLE_PENDING) { // Se esta realizando una redistribucion de datos asincrona
244
    cnt_async=1;
245
246
247
  }

  if(results->iter_index == results->iters_size) { // Aumentar tamaño de ambos vectores de resultados
248
    realloc_results_iters(results, config_file->n_stages, results->iters_size + 100);
249
250
  }
  results->iters_time[results->iter_index] = actual_time - start_time;
251
252
253
  for(i=0; i < config_file->n_stages; i++) {
    results->stage_times[i][results->iter_index] = times_stages[i];
  }
254
  results->iters_async += cnt_async;
255
  results->iter_index = results->iter_index + 1;
256

257
258
  free(times_stages);

259
  return aux;
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
}

//======================================================||
//======================================================||
//=============INIT/FREE/PRINT FUNCTIONS================||
//======================================================||
//======================================================||

/*
 * Muestra datos generales sobre los procesos, su grupo,
 * en que nodo residen y la version de MPI utilizada.
 */
void print_general_info(int myId, int grp, int numP) {
  int len;
  char *name = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  char *version = malloc(MPI_MAX_LIBRARY_VERSION_STRING * sizeof(char));
  MPI_Get_processor_name(name, &len);
  MPI_Get_library_version(version, &len);
  printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, grp, numP, name, version);

  free(name);
  free(version);
}

284

285
286
287
/*
 * Pide al proceso raiz imprimir los datos sobre las iteraciones realizadas por el grupo de procesos.
 */
288
289
int print_local_results() {
  int ptr_local, ptr_out, err;
290
291
  char *file_name;

292
  compute_results_iter(results, group->myId, ROOT, comm);
293
  if(group->myId == ROOT) {
294
295
    ptr_out = dup(1);

296
297
298
299
300
301
302
303
    file_name = NULL;
    file_name = malloc(40 * sizeof(char));
    if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
    err = snprintf(file_name, 40, "R%d_G%dNP%dID%d.out", run_id, group->grp, group->numP, group->myId);
    if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero
    create_out_file(file_name, &ptr_local, 1);
  
    print_config_group(config_file, group->grp);
304
305
    print_iter_results(*results);
    print_stage_results(*results, config_file->n_stages);
306
307
    free(file_name);

308
    fflush(stdout);
309
310
311
312
313
314
315
316
317
318
319
    close(1);
    dup(ptr_out);
  }
  return 0;
}

/*
 * Si es el ultimo grupo de procesos, pide al proceso raiz mostrar los datos obtenidos de tiempo de ejecucion, creacion de procesos
 * y las comunicaciones.
 */
int print_final_results() {
320
  int ptr_global, err, ptr_out;
321
322
323
324
  char *file_name;

  if(group->myId == ROOT) {

325
    if(group->grp == config_file->n_resizes -1) {
326
327
328
329
330
331
      file_name = NULL;
      file_name = malloc(20 * sizeof(char));
      if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
      err = snprintf(file_name, 20, "R%d_Global.out", run_id);
      if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

332
      ptr_out = dup(1);
333
334
      create_out_file(file_name, &ptr_global, 1);
      print_config(config_file, group->grp);
335
      print_global_results(*results, config_file->n_resizes);
336
      fflush(stdout);
337
      free(file_name);
338
339
340

      close(1);
      dup(ptr_out);
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
    }
  }
  return 0;
}

/*
 * Inicializa la estructura group
 */
void init_group_struct(char *argv[], int argc, int myId, int numP) {
  group = malloc(1 * sizeof(group_data));
  group->myId        = myId;
  group->numP        = numP;
  group->grp         = 0;
  group->iter_start  = 0;
  group->argc        = argc;
  group->argv        = argv;
}

/*
 * Inicializa los datos para este grupo de procesos.
 *
 * En caso de ser el primer grupo de procesos, lee el fichero de configuracion
 * e inicializa los vectores de comunicacion.
 *
 * En caso de ser otro grupo de procesos entra a la funcion "Sons_init()" donde
 * se comunican con los padres para inicializar sus datos.
 */
void init_application() {
  if(group->argc < 2) {
    printf("Falta el fichero de configuracion. Uso:\n./programa config.ini id\nEl argumento numerico id es opcional\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
  }
  if(group->argc > 2) {
    run_id = atoi(group->argv[2]);
  }

  config_file = read_ini_file(group->argv[1]);
  results = malloc(sizeof(results_data));
379
  init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->iters[group->grp]);
380
381
382
383
384
385
  if(config_file->sdr) {
    malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
  }
  if(config_file->adr) {
    malloc_comm_array(&(group->async_array), config_file->adr , group->myId, group->numP);
  }
386
387

  int message_tam = 100000000;
388
389
390
  for(int i=0; i<3; i++) {
    config_file->latency_m = latency(group->myId, group->numP, comm);
    config_file->bw_m = bandwidth(group->myId, group->numP, comm, config_file->latency_m, message_tam);
391
  //if(group->myId == ROOT) printf("numP=%d Lat=%lf Bw=%lf\n", group->numP, config_file->latency_m, config_file->bw_m);
392
  }
393
  obtain_op_times(1);
394
395
396
397
}

/*
 * Obtiene cuanto tiempo es necesario para realizar una operacion de PI
398
399
400
401
402
403
404
405
 *
 * Si compute esta a 1 se considera que se esta inicializando el entorno
 * y realizará trabajo extra.
 *
 * Si compute esta a 0 se considera un entorno inicializado y solo hay que
 * realizar algunos cambios de reserva de memoria. Si es necesario recalcular
 * algo se obtiene el total de tiempo utilizado en dichas tareas y se resta
 * al tiempo total de ejecucion.
406
 */
407
408
void obtain_op_times(int compute) {
  int i;
409
  double time = 0;
410
  for(i=0; i<config_file->n_stages; i++) {
411
    time+=init_stage(config_file, i, *group, comm, compute);
412
  }
413
  if(!compute) {results->wasted_time += time;}
414
415
416
417
418
419
420
421
422
423
424
425
426
}

/*
 * Libera toda la memoria asociada con la aplicacion
 */
void free_application_data() {
  if(config_file->sdr) {
    free(group->sync_array);
  }
  if(config_file->adr) {
    free(group->async_array);
  }
  
427
428
  free_malleability();

429
  if(group->grp == 0) { //FIXME Revisar porque cuando es diferente a 0 no funciona
430
    free_results_data(results, config_file->n_stages);
431
    free(results);
432
  }
433
  free_config(config_file);
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
  free(group);
}


/* 
 * Función para crear un fichero con el nombre pasado como argumento.
 * Si el nombre ya existe, se escribe la informacion a continuacion.
 *
 * El proceso que llama a la función pasa a tener como salida estandar
 * dicho fichero si el valor "newstdout" es verdadero.
 *
 */
int create_out_file(char *nombre, int *ptr, int newstdout) {
  int err;

  *ptr = open(nombre, O_WRONLY | O_CREAT | O_APPEND, 0644);
  if(*ptr < 0) return -1; // No ha sido posible crear el fichero

  if(newstdout) {
    err = close(1);
    if(err < 0) return -2; // No es posible modificar la salida estandar
    err = dup(*ptr);
    if(err < 0) return -3; // No es posible modificar la salida estandar
  }

  return 0;
}