Main.c 14.4 KB
Newer Older
1
2
3
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
iker_martin's avatar
iker_martin committed
4
#include <fcntl.h>
5
#include <unistd.h>
iker_martin's avatar
iker_martin committed
6
#include <sys/stat.h>
7
8
#include "process_stage.h"
#include "Main_datatypes.h"
9
10
#include "../IOcodes/read_ini.h"
#include "../IOcodes/results.h"
11
#include "../malleability/CommDist.h"
12
13
#include "../malleability/malleabilityManager.h"
#include "../malleability/malleabilityStates.h"
14

iker_martin's avatar
iker_martin committed
15
int work();
16
double iterate(double *matrix, int n, int async_comm, int iter);
17

18
void init_group_struct(char *argv[], int argc, int myId, int numP);
19
void init_application();
20
void obtain_op_times();
21
22
void free_application_data();

23
void print_general_info(int myId, int grp, int numP);
24
int print_local_results();
25
int print_final_results();
iker_martin's avatar
iker_martin committed
26
int create_out_file(char *nombre, int *ptr, int newstdout);
27

iker_martin's avatar
iker_martin committed
28
29
configuration *config_file;
group_data *group;
30
results_data *results;
31
MPI_Comm comm;
32
int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis
33

34
int main(int argc, char *argv[]) {
35
    int numP, myId, res;
iker_martin's avatar
iker_martin committed
36
    int req;
37
    int im_child;
38

39
40
41
42
43
44
45
46
    //FIXME El codigo no es capaz de hacer mas de una redistribucion - Arreglar malleabilityTypes.c
    int num_cpus, num_nodes; //nodelist_len; //FIXME Eliminar cuando se utilice Slurm
    char *nodelist = NULL;
    num_cpus = 20; //FIXME NUMERO MAGICO
    if (argc >= 5) {
      nodelist = argv[3];
      //nodelist_len = strlen(nodelist);
      num_nodes = atoi(argv[4]);
47
      num_cpus = num_nodes * num_cpus;
48
49
    }

50
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &req);
51
    MPI_Comm_size(MPI_COMM_WORLD, &numP);
iker_martin's avatar
iker_martin committed
52
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);
53
    comm = MPI_COMM_WORLD;
iker_martin's avatar
iker_martin committed
54

55
56
57
58
    if(req != MPI_THREAD_MULTIPLE) {
      printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE);
    }

59
    init_group_struct(argv, argc, myId, numP);
60
    //FIXME No funciona en OpenMPI
61
    im_child = init_malleability(myId, numP, ROOT, comm, argv[0], nodelist, num_cpus, num_nodes);
62

63
    if(!im_child) { //TODO REFACTOR Simplificar inicio
64
65
      init_application();

66
      set_benchmark_grp(group->grp);
67
68
69
      set_benchmark_configuration(config_file);
      set_benchmark_results(results);

70
      MPI_Barrier(comm);
71
      results->exec_start = MPI_Wtime();
72
    } else { //Init hijos
73

74
      get_malleability_user_comm(&comm);
75
76
      get_benchmark_configuration(&config_file);
      get_benchmark_results(&results);
77
      set_results_post_reconfig(results, group->grp, config_file->sdr, config_file->adr); //TODO Cambio al añadir nueva redistribucion
78

79
80
      // TODO Refactor - Que sea una unica funcion
      // Obtiene las variables que van a utilizar los hijos
81
82
83
84
85
86
87
      void *value = NULL;
      malleability_get_data(&value, 0, 1, 1);
      group->grp = *((int *)value);
      free(value);
      malleability_get_data(&value, 1, 1, 1);
      run_id = *((int *)value);
      free(value);
88
89
90
91
      
      malleability_get_data(&value, 2, 1, 1);
      group->iter_start = *((int *)value);
      free(value);
92

93
94
95
96
97
98
99
100
101
102
103
104
      //FIXME Eliminar cuando se utilice SLURM
      /*
      malleability_get_data(&value, 4, 1, 1);
      num_nodes = *((int *)value);
      free(value);

      malleability_get_data(&value, 5, 1, 1);
      nodelist = (char *)value;
      //free(value);
      nodelist_len = strlen(nodelist);
      */

105
      group->grp = group->grp + 1;
106
107
    }

108
109
110
    //
    // EMPIEZA LA EJECUCION-------------------------------
    //
111
112
    group->grp = group->grp - 1; // TODO REFACTOR???
    do {
113

114
      group->grp = group->grp + 1;
115
      if(group->grp != 0) obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
116
117
118
119
120
      set_benchmark_grp(group->grp);
      get_malleability_user_comm(&comm);
      MPI_Comm_size(comm, &(group->numP));
      MPI_Comm_rank(comm, &(group->myId));

121
122
      if(config_file->n_resizes != group->grp + 1) { 
        set_malleability_configuration(config_file->sm, config_file->ss, config_file->phy_dist[group->grp+1], -1, config_file->at, -1);
123
124
125
126
127
        set_children_number(config_file->procs[group->grp+1]); // TODO TO BE DEPRECATED

        if(group->grp == 0) {
          malleability_add_data(&(group->grp), 1, MAL_INT, 1, 1);
          malleability_add_data(&run_id, 1, MAL_INT, 1, 1);
128
          malleability_add_data(&(group->iter_start), 1, MAL_INT, 1, 1);
129
130
131
132

	  //FIXME Eliminar cuando se utilice SLURM
          //malleability_add_data(&num_nodes, 1, MAL_INT, 1, 1);
          //malleability_add_data(&nodelist, nodelist_len, MAL_CHAR, 1, 1);
133
        }
134
      }
135
136

      res = work();
iker_martin's avatar
iker_martin committed
137
      if(res == MAL_ZOMBIE) break;
138
139

      print_local_results();
140
      reset_results_index(results);
141
    } while((config_file->n_resizes > group->grp + 1) && (config_file->sm == COMM_SPAWN_MERGE || config_file->sm == COMM_SPAWN_MERGE_PTHREAD));
142

143
144
145
146
    //
    // TERMINA LA EJECUCION ----------------------------------------------------------
    //

147

148
    if(res==1) { // Se ha llegado al final de la aplicacion
149
      MPI_Barrier(comm); // TODO Posible error al utilizar SHRINK
150
      results->exec_time = MPI_Wtime() - results->exec_start - results->wasted_time;
151
    }
152
    print_final_results(); // Pasado este punto ya no pueden escribir los procesos
153
154
155
156
157

    if(comm != MPI_COMM_WORLD && comm != MPI_COMM_NULL) {
      MPI_Comm_free(&comm);
    }

158
    if(group->myId == ROOT && (config_file->sm == COMM_SPAWN_MERGE || config_file->sm == COMM_SPAWN_MERGE_PTHREAD)) {
159
160
161
      MPI_Abort(MPI_COMM_WORLD, -100);
    }
    free_application_data();
162

163
    MPI_Finalize();
164

165
166
167
168
    return 0;
}

/*
169
170
171
172
173
174
175
176
177
 * Función de trabajo principal.
 *
 * Incializa los datos para realizar el computo y a continuacion
 * pasa a realizar "maxiter" iteraciones de computo.
 *
 * Terminadas las iteraciones realiza el redimensionado de procesos.
 * Si el redimensionado se realiza de forma asincrona se 
 * siguen realizando iteraciones de computo hasta que termine la 
 * comunicacion asincrona y realizar entonces la sincrona.
178
179
180
181
 *
 * Si el grupo de procesos es el ultimo que va a ejecutar, se devuelve
 * el valor 1 para indicar que no se va a seguir trabajando con nuevos grupos
 * de procesos. En caso contrario se devuelve 0.
182
 */
iker_martin's avatar
iker_martin committed
183
int work() {
184
  int iter, maxiter, state, res;
185
  double *matrix = NULL;
186

iker_martin's avatar
iker_martin committed
187
  maxiter = config_file->iters[group->grp];
188
  state = MAL_NOT_STARTED;
189
  
190
  res = 0;
191
  for(iter=group->iter_start; iter < maxiter; iter++) {
192
    iterate(matrix, config_file->granularity, state, iter);
193
  }
194

195
  if(config_file->n_resizes != group->grp + 1)
196
197
    state = malleability_checkpoint();

198
  iter = 0;
199
  while(state == MAL_DIST_PENDING || state == MAL_SPAWN_PENDING || state == MAL_SPAWN_SINGLE_PENDING) {
200
    if(iter < config_file->iters[group->grp+1]) {
201
      iterate(matrix, config_file->granularity, state, iter);
202
203
204
      iter++;
      group->iter_start = iter;
    }
205
    state = malleability_checkpoint();
206
  }
207

208
  
209
  if(config_file->n_resizes - 1 == group->grp) res=1;
iker_martin's avatar
iker_martin committed
210
  if(state == MAL_ZOMBIE) res=state;
211
  return res;
212
213
}

214
215
216
217
218
219
220
221
222
223
224
225

/////////////////////////////////////////
/////////////////////////////////////////
//COMPUTE FUNCTIONS
/////////////////////////////////////////
/////////////////////////////////////////


/*
 * Simula la ejecucción de una iteración de computo en la aplicación
 * que dura al menos un tiempo de "time" segundos.
 */
226
double iterate(double *matrix, int n, int async_comm, int iter) {
227
  double start_time, actual_time;
228
  int i, cnt_async = 0;
229
230
  double aux = 0;

231
  start_time = MPI_Wtime();
232

233
234
  for(i=0; i < config_file->n_stages; i++) {
    aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
235
236
237
238
  }

  actual_time = MPI_Wtime(); // Guardar tiempos
  // TODO Que diferencie entre ambas en el IO
239
  if(async_comm == MAL_DIST_PENDING || async_comm == MAL_SPAWN_PENDING || async_comm == MAL_SPAWN_SINGLE_PENDING) { // Se esta realizando una redistribucion de datos asincrona
240
    cnt_async=1;
241
242
243
244
245
246
  }

  if(results->iter_index == results->iters_size) { // Aumentar tamaño de ambos vectores de resultados
    realloc_results_iters(results, results->iters_size + 100);
  }
  results->iters_time[results->iter_index] = actual_time - start_time;
247
  results->iters_async += cnt_async;
248
  results->iter_index = results->iter_index + 1;
249
250

  return aux;
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
}

//======================================================||
//======================================================||
//=============INIT/FREE/PRINT FUNCTIONS================||
//======================================================||
//======================================================||

/*
 * Muestra datos generales sobre los procesos, su grupo,
 * en que nodo residen y la version de MPI utilizada.
 */
void print_general_info(int myId, int grp, int numP) {
  int len;
  char *name = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  char *version = malloc(MPI_MAX_LIBRARY_VERSION_STRING * sizeof(char));
  MPI_Get_processor_name(name, &len);
  MPI_Get_library_version(version, &len);
  printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, grp, numP, name, version);

  free(name);
  free(version);
}

275

276
277
278
/*
 * Pide al proceso raiz imprimir los datos sobre las iteraciones realizadas por el grupo de procesos.
 */
279
280
int print_local_results() {
  int ptr_local, ptr_out, err;
281
282
  char *file_name;

283
  compute_results_iter(results, group->myId, ROOT, comm);
284
  if(group->myId == ROOT) {
285
286
    ptr_out = dup(1);

287
288
289
290
291
292
293
294
    file_name = NULL;
    file_name = malloc(40 * sizeof(char));
    if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
    err = snprintf(file_name, 40, "R%d_G%dNP%dID%d.out", run_id, group->grp, group->numP, group->myId);
    if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero
    create_out_file(file_name, &ptr_local, 1);
  
    print_config_group(config_file, group->grp);
295
    print_iter_results(*results, config_file->iters[group->grp] - 1);
296
297
    free(file_name);

298
    fflush(stdout);
299
300
301
302
303
304
305
306
307
308
309
    close(1);
    dup(ptr_out);
  }
  return 0;
}

/*
 * Si es el ultimo grupo de procesos, pide al proceso raiz mostrar los datos obtenidos de tiempo de ejecucion, creacion de procesos
 * y las comunicaciones.
 */
int print_final_results() {
310
  int ptr_global, err, ptr_out;
311
312
313
314
  char *file_name;

  if(group->myId == ROOT) {

315
    if(group->grp == config_file->n_resizes -1) {
316
317
318
319
320
321
      file_name = NULL;
      file_name = malloc(20 * sizeof(char));
      if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
      err = snprintf(file_name, 20, "R%d_Global.out", run_id);
      if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

322
      ptr_out = dup(1);
323
324
      create_out_file(file_name, &ptr_global, 1);
      print_config(config_file, group->grp);
325
      print_global_results(*results, config_file->n_resizes);
326
      fflush(stdout);
327
      free(file_name);
328
329
330

      close(1);
      dup(ptr_out);
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
    }
  }
  return 0;
}

/*
 * Inicializa la estructura group
 */
void init_group_struct(char *argv[], int argc, int myId, int numP) {
  group = malloc(1 * sizeof(group_data));
  group->myId        = myId;
  group->numP        = numP;
  group->grp         = 0;
  group->iter_start  = 0;
  group->argc        = argc;
  group->argv        = argv;
}

/*
 * Inicializa los datos para este grupo de procesos.
 *
 * En caso de ser el primer grupo de procesos, lee el fichero de configuracion
 * e inicializa los vectores de comunicacion.
 *
 * En caso de ser otro grupo de procesos entra a la funcion "Sons_init()" donde
 * se comunican con los padres para inicializar sus datos.
 */
void init_application() {
  if(group->argc < 2) {
    printf("Falta el fichero de configuracion. Uso:\n./programa config.ini id\nEl argumento numerico id es opcional\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
  }
  if(group->argc > 2) {
    run_id = atoi(group->argv[2]);
  }

  config_file = read_ini_file(group->argv[1]);
  results = malloc(sizeof(results_data));
369
  init_results_data(results, config_file->n_resizes, config_file->iters[group->grp]);
370
371
372
373
374
375
  if(config_file->sdr) {
    malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
  }
  if(config_file->adr) {
    malloc_comm_array(&(group->async_array), config_file->adr , group->myId, group->numP);
  }
376
377

  int message_tam = 100000000;
378
  message_tam =     10240000;
379
  //for(int i=0; i<10; i++) {
380
381
  config_file->latency_m = latency(group->myId, group->numP, comm);
  config_file->bw_m = bandwidth(group->myId, group->numP, comm, config_file->latency_m, message_tam);
382
383
  //if(group->myId == ROOT) printf("numP=%d Lat=%lf Bw=%lf\n", group->numP, config_file->latency_m, config_file->bw_m);
  //}
384
  obtain_op_times(1);
385
386
387
388
}

/*
 * Obtiene cuanto tiempo es necesario para realizar una operacion de PI
389
390
391
392
393
394
395
396
 *
 * Si compute esta a 1 se considera que se esta inicializando el entorno
 * y realizará trabajo extra.
 *
 * Si compute esta a 0 se considera un entorno inicializado y solo hay que
 * realizar algunos cambios de reserva de memoria. Si es necesario recalcular
 * algo se obtiene el total de tiempo utilizado en dichas tareas y se resta
 * al tiempo total de ejecucion.
397
 */
398
399
void obtain_op_times(int compute) {
  int i;
400
  double time = 0;
401
  for(i=0; i<config_file->n_stages; i++) {
402
    time+=init_stage(config_file, i, *group, comm, compute);
403
  }
404
  if(!compute) results->wasted_time += time;
405
406
407
408
409
410
411
412
413
414
415
416
417
}

/*
 * Libera toda la memoria asociada con la aplicacion
 */
void free_application_data() {
  if(config_file->sdr) {
    free(group->sync_array);
  }
  if(config_file->adr) {
    free(group->async_array);
  }
  
418
419
420
  free_malleability();
  free_config(config_file);

421
  if(group->grp == 0) { //FIXME Revisar porque cuando es diferente a 0 no funciona
422
    free_results_data(results);
423
    free(results);
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
  }
  free(group);
}


/* 
 * Función para crear un fichero con el nombre pasado como argumento.
 * Si el nombre ya existe, se escribe la informacion a continuacion.
 *
 * El proceso que llama a la función pasa a tener como salida estandar
 * dicho fichero si el valor "newstdout" es verdadero.
 *
 */
int create_out_file(char *nombre, int *ptr, int newstdout) {
  int err;

  *ptr = open(nombre, O_WRONLY | O_CREAT | O_APPEND, 0644);
  if(*ptr < 0) return -1; // No ha sido posible crear el fichero

  if(newstdout) {
    err = close(1);
    if(err < 0) return -2; // No es posible modificar la salida estandar
    err = dup(*ptr);
    if(err < 0) return -3; // No es posible modificar la salida estandar
  }

  return 0;
}