Main.c 14.2 KB
Newer Older
1
2
3
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
iker_martin's avatar
iker_martin committed
4
#include <fcntl.h>
5
#include <unistd.h>
iker_martin's avatar
iker_martin committed
6
#include <sys/stat.h>
7
8
#include "process_stage.h"
#include "Main_datatypes.h"
9
#include "../malleability/CommDist.h"
10
11
#include "../malleability/malleabilityManager.h"
#include "../malleability/malleabilityStates.h"
12

iker_martin's avatar
iker_martin committed
13
int work();
14
double iterate(double *matrix, int n, int async_comm, int iter);
15

16
void init_group_struct(char *argv[], int argc, int myId, int numP);
17
void init_application();
18
void obtain_op_times();
19
20
void free_application_data();

21
void print_general_info(int myId, int grp, int numP);
22
int print_local_results();
23
int print_final_results();
iker_martin's avatar
iker_martin committed
24
int create_out_file(char *nombre, int *ptr, int newstdout);
25

iker_martin's avatar
iker_martin committed
26
27
configuration *config_file;
group_data *group;
28
results_data *results;
29
MPI_Comm comm;
30
int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis
31

32
int main(int argc, char *argv[]) {
33
    int numP, myId, res;
iker_martin's avatar
iker_martin committed
34
    int req;
35
    int im_child;
36

37
38
39
40
41
42
43
44
    //FIXME El codigo no es capaz de hacer mas de una redistribucion - Arreglar malleabilityTypes.c
    int num_cpus, num_nodes; //nodelist_len; //FIXME Eliminar cuando se utilice Slurm
    char *nodelist = NULL;
    num_cpus = 20; //FIXME NUMERO MAGICO
    if (argc >= 5) {
      nodelist = argv[3];
      //nodelist_len = strlen(nodelist);
      num_nodes = atoi(argv[4]);
45
      num_cpus = num_nodes * num_cpus;
46
47
    }

48
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &req);
49
    MPI_Comm_size(MPI_COMM_WORLD, &numP);
iker_martin's avatar
iker_martin committed
50
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);
51
    comm = MPI_COMM_WORLD;
iker_martin's avatar
iker_martin committed
52

53
54
55
56
    if(req != MPI_THREAD_MULTIPLE) {
      printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE);
    }

57
    init_group_struct(argv, argc, myId, numP);
58
    //FIXME No funciona en OpenMPI
59
    im_child = init_malleability(myId, numP, ROOT, comm, argv[0], nodelist, num_cpus, num_nodes);
60

61
    if(!im_child) { //TODO REFACTOR Simplificar inicio
62
63
      init_application();

64
      set_benchmark_grp(group->grp);
65
66
67
      set_benchmark_configuration(config_file);
      set_benchmark_results(results);

68
      MPI_Barrier(comm);
69
      results->exec_start = MPI_Wtime();
70
    } else { //Init hijos
71

72
      get_malleability_user_comm(&comm);
73
74
      get_benchmark_configuration(&config_file);
      get_benchmark_results(&results);
75
      set_results_post_reconfig(results, group->grp, config_file->sdr, config_file->adr); //TODO Cambio al añadir nueva redistribucion
76

77
78
      // TODO Refactor - Que sea una unica funcion
      // Obtiene las variables que van a utilizar los hijos
79
80
81
82
83
84
85
      void *value = NULL;
      malleability_get_data(&value, 0, 1, 1);
      group->grp = *((int *)value);
      free(value);
      malleability_get_data(&value, 1, 1, 1);
      run_id = *((int *)value);
      free(value);
86
87
88
89
      
      malleability_get_data(&value, 2, 1, 1);
      group->iter_start = *((int *)value);
      free(value);
90

91
92
93
94
95
96
97
98
99
100
101
102
      //FIXME Eliminar cuando se utilice SLURM
      /*
      malleability_get_data(&value, 4, 1, 1);
      num_nodes = *((int *)value);
      free(value);

      malleability_get_data(&value, 5, 1, 1);
      nodelist = (char *)value;
      //free(value);
      nodelist_len = strlen(nodelist);
      */

103
      group->grp = group->grp + 1;
104
105
    }

106
107
108
    //
    // EMPIEZA LA EJECUCION-------------------------------
    //
109
110
    group->grp = group->grp - 1; // TODO REFACTOR???
    do {
111

112
      group->grp = group->grp + 1;
113
      if(group->grp != 0) obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
114
115
116
117
118
      set_benchmark_grp(group->grp);
      get_malleability_user_comm(&comm);
      MPI_Comm_size(comm, &(group->numP));
      MPI_Comm_rank(comm, &(group->myId));

119
120
      if(config_file->n_resizes != group->grp + 1) { 
        set_malleability_configuration(config_file->sm, config_file->ss, config_file->phy_dist[group->grp+1], -1, config_file->at, -1);
121
122
123
124
125
        set_children_number(config_file->procs[group->grp+1]); // TODO TO BE DEPRECATED

        if(group->grp == 0) {
          malleability_add_data(&(group->grp), 1, MAL_INT, 1, 1);
          malleability_add_data(&run_id, 1, MAL_INT, 1, 1);
126
          malleability_add_data(&(group->iter_start), 1, MAL_INT, 1, 1);
127
128
129
130

	  //FIXME Eliminar cuando se utilice SLURM
          //malleability_add_data(&num_nodes, 1, MAL_INT, 1, 1);
          //malleability_add_data(&nodelist, nodelist_len, MAL_CHAR, 1, 1);
131
        }
132
      }
133
134

      res = work();
iker_martin's avatar
iker_martin committed
135
      if(res == MAL_ZOMBIE) break;
136
137

      print_local_results();
138
      reset_results_index(results);
139
    } while((config_file->n_resizes > group->grp + 1) && (config_file->sm == COMM_SPAWN_MERGE || config_file->sm == COMM_SPAWN_MERGE_PTHREAD));
140

141
142
143
144
    //
    // TERMINA LA EJECUCION ----------------------------------------------------------
    //

145

146
    if(res==1) { // Se ha llegado al final de la aplicacion
147
      MPI_Barrier(comm); // TODO Posible error al utilizar SHRINK
148
      results->exec_time = MPI_Wtime() - results->exec_start - results->wasted_time;
149
    }
150
    print_final_results(); // Pasado este punto ya no pueden escribir los procesos
151
152
153
154
155

    if(comm != MPI_COMM_WORLD && comm != MPI_COMM_NULL) {
      MPI_Comm_free(&comm);
    }

156
    if(group->myId == ROOT && (config_file->sm == COMM_SPAWN_MERGE || config_file->sm == COMM_SPAWN_MERGE_PTHREAD)) {
157
158
159
      MPI_Abort(MPI_COMM_WORLD, -100);
    }
    free_application_data();
160

161
    MPI_Finalize();
162

163
164
165
166
    return 0;
}

/*
167
168
169
170
171
172
173
174
175
 * Función de trabajo principal.
 *
 * Incializa los datos para realizar el computo y a continuacion
 * pasa a realizar "maxiter" iteraciones de computo.
 *
 * Terminadas las iteraciones realiza el redimensionado de procesos.
 * Si el redimensionado se realiza de forma asincrona se 
 * siguen realizando iteraciones de computo hasta que termine la 
 * comunicacion asincrona y realizar entonces la sincrona.
176
177
178
179
 *
 * Si el grupo de procesos es el ultimo que va a ejecutar, se devuelve
 * el valor 1 para indicar que no se va a seguir trabajando con nuevos grupos
 * de procesos. En caso contrario se devuelve 0.
180
 */
iker_martin's avatar
iker_martin committed
181
int work() {
182
  int iter, maxiter, state, res;
183
  double *matrix = NULL;
184

iker_martin's avatar
iker_martin committed
185
  maxiter = config_file->iters[group->grp];
186
  state = MAL_NOT_STARTED;
187
  
188
  res = 0;
189
  for(iter=group->iter_start; iter < maxiter; iter++) {
190
    iterate(matrix, config_file->granularity, state, iter);
191
  }
192

193
  if(config_file->n_resizes != group->grp + 1)
194
195
    state = malleability_checkpoint();

196
  iter = 0;
197
  while(state == MAL_DIST_PENDING || state == MAL_SPAWN_PENDING || state == MAL_SPAWN_SINGLE_PENDING) {
198
    if(iter < config_file->iters[group->grp+1]) {
199
      iterate(matrix, config_file->granularity, state, iter);
200
201
202
      iter++;
      group->iter_start = iter;
    }
203
    state = malleability_checkpoint();
204
  }
205

206
  
207
  if(config_file->n_resizes - 1 == group->grp) res=1;
iker_martin's avatar
iker_martin committed
208
  if(state == MAL_ZOMBIE) res=state;
209
  return res;
210
211
}

212
213
214
215
216
217
218
219
220
221
222
223

/////////////////////////////////////////
/////////////////////////////////////////
//COMPUTE FUNCTIONS
/////////////////////////////////////////
/////////////////////////////////////////


/*
 * Simula la ejecucción de una iteración de computo en la aplicación
 * que dura al menos un tiempo de "time" segundos.
 */
224
double iterate(double *matrix, int n, int async_comm, int iter) {
225
  double start_time, actual_time;
226
  int i, cnt_async = 0;
227
228
  double aux = 0;

229
  start_time = MPI_Wtime();
230

231
232
  for(i=0; i < config_file->n_stages; i++) {
    aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
233
234
235
236
  }

  actual_time = MPI_Wtime(); // Guardar tiempos
  // TODO Que diferencie entre ambas en el IO
237
  if(async_comm == MAL_DIST_PENDING || async_comm == MAL_SPAWN_PENDING || async_comm == MAL_SPAWN_SINGLE_PENDING) { // Se esta realizando una redistribucion de datos asincrona
238
    cnt_async=1;
239
240
241
242
243
244
  }

  if(results->iter_index == results->iters_size) { // Aumentar tamaño de ambos vectores de resultados
    realloc_results_iters(results, results->iters_size + 100);
  }
  results->iters_time[results->iter_index] = actual_time - start_time;
245
  results->iters_async += cnt_async;
246
  results->iter_index = results->iter_index + 1;
247
248

  return aux;
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
}

//======================================================||
//======================================================||
//=============INIT/FREE/PRINT FUNCTIONS================||
//======================================================||
//======================================================||

/*
 * Muestra datos generales sobre los procesos, su grupo,
 * en que nodo residen y la version de MPI utilizada.
 */
void print_general_info(int myId, int grp, int numP) {
  int len;
  char *name = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  char *version = malloc(MPI_MAX_LIBRARY_VERSION_STRING * sizeof(char));
  MPI_Get_processor_name(name, &len);
  MPI_Get_library_version(version, &len);
  printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, grp, numP, name, version);

  free(name);
  free(version);
}

273

274
275
276
/*
 * Pide al proceso raiz imprimir los datos sobre las iteraciones realizadas por el grupo de procesos.
 */
277
278
int print_local_results() {
  int ptr_local, ptr_out, err;
279
280
  char *file_name;

281
  compute_results_iter(results, group->myId, ROOT, comm);
282
  if(group->myId == ROOT) {
283
284
    ptr_out = dup(1);

285
286
287
288
289
290
291
292
    file_name = NULL;
    file_name = malloc(40 * sizeof(char));
    if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
    err = snprintf(file_name, 40, "R%d_G%dNP%dID%d.out", run_id, group->grp, group->numP, group->myId);
    if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero
    create_out_file(file_name, &ptr_local, 1);
  
    print_config_group(config_file, group->grp);
293
    print_iter_results(*results, config_file->iters[group->grp] - 1);
294
295
    free(file_name);

296
    fflush(stdout);
297
298
299
300
301
302
303
304
305
306
307
    close(1);
    dup(ptr_out);
  }
  return 0;
}

/*
 * Si es el ultimo grupo de procesos, pide al proceso raiz mostrar los datos obtenidos de tiempo de ejecucion, creacion de procesos
 * y las comunicaciones.
 */
int print_final_results() {
308
  int ptr_global, err, ptr_out;
309
310
311
312
  char *file_name;

  if(group->myId == ROOT) {

313
    if(group->grp == config_file->n_resizes -1) {
314
315
316
317
318
319
      file_name = NULL;
      file_name = malloc(20 * sizeof(char));
      if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
      err = snprintf(file_name, 20, "R%d_Global.out", run_id);
      if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

320
      ptr_out = dup(1);
321
322
      create_out_file(file_name, &ptr_global, 1);
      print_config(config_file, group->grp);
323
      print_global_results(*results, config_file->n_resizes);
324
      fflush(stdout);
325
      free(file_name);
326
327
328

      close(1);
      dup(ptr_out);
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
    }
  }
  return 0;
}

/*
 * Inicializa la estructura group
 */
void init_group_struct(char *argv[], int argc, int myId, int numP) {
  group = malloc(1 * sizeof(group_data));
  group->myId        = myId;
  group->numP        = numP;
  group->grp         = 0;
  group->iter_start  = 0;
  group->argc        = argc;
  group->argv        = argv;
}

/*
 * Inicializa los datos para este grupo de procesos.
 *
 * En caso de ser el primer grupo de procesos, lee el fichero de configuracion
 * e inicializa los vectores de comunicacion.
 *
 * En caso de ser otro grupo de procesos entra a la funcion "Sons_init()" donde
 * se comunican con los padres para inicializar sus datos.
 */
void init_application() {
  if(group->argc < 2) {
    printf("Falta el fichero de configuracion. Uso:\n./programa config.ini id\nEl argumento numerico id es opcional\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
  }
  if(group->argc > 2) {
    run_id = atoi(group->argv[2]);
  }

  config_file = read_ini_file(group->argv[1]);
  results = malloc(sizeof(results_data));
367
  init_results_data(results, config_file->n_resizes, config_file->iters[group->grp]);
368
369
370
371
372
373
  if(config_file->sdr) {
    malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
  }
  if(config_file->adr) {
    malloc_comm_array(&(group->async_array), config_file->adr , group->myId, group->numP);
  }
374
375

  int message_tam = 100000000;
376
  message_tam =     10240000;
377
378
379
  config_file->latency_m = latency(group->myId, group->numP, comm);
  config_file->bw_m = bandwidth(group->myId, group->numP, comm, config_file->latency_m, message_tam);
  obtain_op_times(1);
380
381
382
383
}

/*
 * Obtiene cuanto tiempo es necesario para realizar una operacion de PI
384
385
386
387
388
389
390
391
 *
 * Si compute esta a 1 se considera que se esta inicializando el entorno
 * y realizará trabajo extra.
 *
 * Si compute esta a 0 se considera un entorno inicializado y solo hay que
 * realizar algunos cambios de reserva de memoria. Si es necesario recalcular
 * algo se obtiene el total de tiempo utilizado en dichas tareas y se resta
 * al tiempo total de ejecucion.
392
 */
393
394
void obtain_op_times(int compute) {
  int i;
395
  double time = 0;
396
  for(i=0; i<config_file->n_stages; i++) {
397
    time+=init_stage(config_file, i, *group, comm, compute);
398
  }
399
  if(!compute) results->wasted_time += time;
400
401
402
403
404
405
406
407
408
409
410
411
412
}

/*
 * Libera toda la memoria asociada con la aplicacion
 */
void free_application_data() {
  if(config_file->sdr) {
    free(group->sync_array);
  }
  if(config_file->adr) {
    free(group->async_array);
  }
  
413
414
415
  free_malleability();
  free_config(config_file);

416
  if(group->grp == 0) { //FIXME Revisar porque cuando es diferente a 0 no funciona
417
    free_results_data(results);
418
    free(results);
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
  }
  free(group);
}


/* 
 * Función para crear un fichero con el nombre pasado como argumento.
 * Si el nombre ya existe, se escribe la informacion a continuacion.
 *
 * El proceso que llama a la función pasa a tener como salida estandar
 * dicho fichero si el valor "newstdout" es verdadero.
 *
 */
int create_out_file(char *nombre, int *ptr, int newstdout) {
  int err;

  *ptr = open(nombre, O_WRONLY | O_CREAT | O_APPEND, 0644);
  if(*ptr < 0) return -1; // No ha sido posible crear el fichero

  if(newstdout) {
    err = close(1);
    if(err < 0) return -2; // No es posible modificar la salida estandar
    err = dup(*ptr);
    if(err < 0) return -3; // No es posible modificar la salida estandar
  }

  return 0;
}