Main.c 17.3 KB
Newer Older
1
2
3
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
iker_martin's avatar
iker_martin committed
4
5
#include <fcntl.h>
#include <sys/stat.h>
6
#include "../IOcodes/read_ini.h"
7
#include "../IOcodes/results.h"
iker_martin's avatar
iker_martin committed
8
9
#include "../malleability/ProcessDist.h"
#include "../malleability/CommDist.h"
10
11
12

#define ROOT 0

iker_martin's avatar
iker_martin committed
13
14
15
int work();
void Sons_init();

16
int checkpoint(int iter, int state, MPI_Request **comm_req);
iker_martin's avatar
iker_martin committed
17
void TC(int numS);
18
19
int start_redistribution(int numS, MPI_Request **comm_req);
int check_redistribution(int iter, MPI_Request **comm_req);
iker_martin's avatar
iker_martin committed
20

21
void iterate(double *matrix, int n, int async_comm);
22
23
24
void computeMatrix(double *matrix, int n);
void initMatrix(double **matrix, int n);

25
void init_group_struct(char *argv[], int argc, int myId, int numP);
26
27
28
void init_application();
void free_application_data();

29
void print_general_info(int myId, int grp, int numP);
30
int print_final_results();
iker_martin's avatar
iker_martin committed
31
int create_out_file(char *nombre, int *ptr, int newstdout);
32

iker_martin's avatar
iker_martin committed
33
34
35
36
typedef struct {
  int myId;
  int numP;
  int grp;
37
  int iter_start;
38
  int argc;
iker_martin's avatar
iker_martin committed
39
40
41

  MPI_Comm children, parents;
  char **argv;
42
  char *sync_array, *async_array;
iker_martin's avatar
iker_martin committed
43
44
45
46
} group_data;

configuration *config_file;
group_data *group;
47
results_data *results;
48
int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis
49

50
int main(int argc, char *argv[]) {
51
    int numP, myId, res;
iker_martin's avatar
iker_martin committed
52
    int req;
53

iker_martin's avatar
iker_martin committed
54
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &req);
55
    MPI_Comm_size(MPI_COMM_WORLD, &numP);
iker_martin's avatar
iker_martin committed
56
57
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);

58
    init_group_struct(argv, argc, myId, numP);
59
    init_application();
iker_martin's avatar
iker_martin committed
60

61
62
63
    if(group->grp == 0) {
      MPI_Barrier(MPI_COMM_WORLD);
      results->exec_start = MPI_Wtime();
64
    }
65

66
    res = work();
67

68
69
70
    if(res) { // Se he llegado al final de la aplicacion
      MPI_Barrier(MPI_COMM_WORLD);
      results->exec_time = MPI_Wtime() - results->exec_start;
71
    }
72
73
74
    print_final_results();

    free_application_data();
75
76
77
78
79
    MPI_Finalize();
    return 0;
}

/*
80
81
82
83
84
85
86
87
88
 * Función de trabajo principal.
 *
 * Incializa los datos para realizar el computo y a continuacion
 * pasa a realizar "maxiter" iteraciones de computo.
 *
 * Terminadas las iteraciones realiza el redimensionado de procesos.
 * Si el redimensionado se realiza de forma asincrona se 
 * siguen realizando iteraciones de computo hasta que termine la 
 * comunicacion asincrona y realizar entonces la sincrona.
89
90
91
92
 *
 * Si el grupo de procesos es el ultimo que va a ejecutar, se devuelve
 * el valor 1 para indicar que no se va a seguir trabajando con nuevos grupos
 * de procesos. En caso contrario se devuelve 0.
93
 */
iker_martin's avatar
iker_martin committed
94
int work() {
95
  int iter, maxiter, state, res;
96
  double *matrix;
97
  MPI_Request *async_comm;
98

iker_martin's avatar
iker_martin committed
99
100
  maxiter = config_file->iters[group->grp];
  initMatrix(&matrix, config_file->matrix_tam);
101
  state = MAL_COMM_UNINITIALIZED;
iker_martin's avatar
iker_martin committed
102

103
  res = 0;
104
  for(iter=group->iter_start; iter < maxiter; iter++) {
105
    iterate(matrix, config_file->matrix_tam, state);
106
  }
107
  state = checkpoint(iter, state, &async_comm);
108
  
109
110
  iter = 0;
  while(state == MAL_ASYNC_PENDING) {
111
    iterate(matrix, config_file->matrix_tam, state);
112
    iter++;
113
    state = checkpoint(iter, state, &async_comm);
114
  }
115
  
116
117
  if(config_file->resizes - 1 == group->grp) res=1;
  return res;
118
119
}

120
121
122
123
124
125
126
127
/*
 * Se realiza el redimensionado de procesos por parte de los padres.
 *
 * Se crean los nuevos procesos con la distribucion fisica elegida y
 * a continuacion se transmite la informacion a los mismos.
 *
 * Si hay datos asincronos a transmitir, primero se comienza a
 * transmitir estos y se termina la funcion. Se tiene que comprobar con
128
 * llamando a la función de nuevo que se han terminado de enviar
129
130
131
132
133
134
 *
 * Si hay ademas datos sincronos a enviar, no se envian aun.
 *
 * Si solo hay datos sincronos se envian tras la creacion de los procesos
 * y finalmente se desconectan los dos grupos de procesos.
 */
135
136
137
138
139
140
141
int checkpoint(int iter, int state, MPI_Request **comm_req) {
  
  if(state == MAL_COMM_UNINITIALIZED) {
    // Comprobar si se tiene que realizar un redimensionado
    if(config_file->iters[group->grp] > iter || config_file->resizes == group->grp + 1) {return MAL_COMM_UNINITIALIZED;}

    int numS = config_file->procs[group->grp +1];
142
143

      results->spawn_start = MPI_Wtime();
144
    TC(numS);
145
      results->spawn_time[group->grp] = MPI_Wtime() - results->spawn_start;
146

147
148
149
150
151
    state = start_redistribution(numS, comm_req);

  } else if(MAL_ASYNC_PENDING) {
    state = check_redistribution(iter, comm_req);
  }
iker_martin's avatar
iker_martin committed
152

153
154
  return state;
}
iker_martin's avatar
iker_martin committed
155

156
157
158
159
160
161
162
/*
 * Se encarga de realizar la creacion de los procesos hijos.
 */
void TC(int numS){
  // Inicialización de la comunicación con SLURM
  int dist = config_file->phy_dist[group->grp +1];
  init_slurm_comm(group->argv, group->myId, numS, ROOT, dist, COMM_SPAWN_SERIAL);
iker_martin's avatar
iker_martin committed
163

164
165
166
167
168
169
170
  // Esperar a que la comunicación y creación de procesos
  // haya finalizado
  int test = -1;
  while(test != MPI_SUCCESS) {
    test = check_slurm_comm(group->myId, ROOT, MPI_COMM_WORLD, &(group->children));
  }
}
iker_martin's avatar
iker_martin committed
171

172
173
174
175
176
177
178
179
180
181
182
183
184
185
/*
 * Comienza la redistribucion de los datos con el nuevo grupo de procesos.
 *
 * Primero se envia la configuracion a utilizar al nuevo grupo de procesos y a continuacion
 * se realiza el envio asincrono y/o sincrono si lo hay.
 *
 * En caso de que haya comunicacion asincrona, se comienza y se termina la funcion 
 * indicando que se ha comenzado un envio asincrono.
 *
 * Si no hay comunicacion asincrono se pasa a realizar la sincrona si la hubiese.
 *
 * Finalmente se envian datos sobre los resultados a los hijos y se desconectan ambos
 * grupos de procesos.
 */
186
int start_redistribution(int numS, MPI_Request **comm_req) {
iker_martin's avatar
iker_martin committed
187
188
189
190
191
  int rootBcast = MPI_PROC_NULL;
  if(group->myId == ROOT) rootBcast = MPI_ROOT;

  // Enviar a los hijos que grupo de procesos son
  MPI_Bcast(&(group->grp), 1, MPI_INT, rootBcast, group->children);
192
  MPI_Bcast(&run_id, 1, MPI_INT, rootBcast, group->children);
iker_martin's avatar
iker_martin committed
193
194
  send_config_file(config_file, rootBcast, group->children);

195
  if(config_file->adr > 0) {
196
    results->async_start = MPI_Wtime();
197
198
199
    send_async(group->async_array, config_file->adr, group->myId, group->numP, ROOT, group->children, numS, comm_req, config_file->aib);
    return MAL_ASYNC_PENDING;
  } 
iker_martin's avatar
iker_martin committed
200
  if(config_file->sdr > 0) {
201
      results->sync_start = MPI_Wtime();
iker_martin's avatar
iker_martin committed
202
203
    send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
  }
204
205

  
206
  send_results(results, rootBcast, config_file->resizes, group->children);
iker_martin's avatar
iker_martin committed
207
208
209
  // Desconectar intercomunicador con los hijos
  MPI_Comm_disconnect(&(group->children));

210
  return MAL_COMM_COMPLETED;
iker_martin's avatar
iker_martin committed
211
212
}

213
214
215
216
217
218
219
220
221
222
223
224
225
/*
 * Comprueba si la redistribucion asincrona ha terminado. 
 * Si no ha terminado la funcion termina indicandolo, en caso contrario,
 * se continua con la comunicacion sincrona, el envio de resultados y
 * se desconectan los grupos de procesos.
 *
 * Esta funcion permite dos modos de funcionamiento al comprobar si la
 * comunicacion asincrona ha terminado.
 * Si se utiliza el modo "MAL_USE_NORMAL", se considera terminada cuando
 * los padres terminan de enviar.
 * Si se utiliza el modo "MAL_USE_IBARRIER", se considera terminada cuando
 * los hijos han terminado de recibir.
 */
226
227
228
229
230
231
int check_redistribution(int iter, MPI_Request **comm_req) {
  int completed, all_completed, test_err, iter_send;
  int numS = config_file->procs[group->grp +1];
  int rootBcast = MPI_PROC_NULL;
  MPI_Request *req_completed;
  if(group->myId == ROOT) rootBcast = MPI_ROOT;
iker_martin's avatar
iker_martin committed
232

233
234
  if(config_file->aib == MAL_USE_NORMAL) {
    req_completed = &(*comm_req)[0];
235
  } else { // MAL_USE_IBARRIER
236
    req_completed = &(*comm_req)[1];
iker_martin's avatar
iker_martin committed
237
  }
238
 
239
  
240
241
  test_err = MPI_Test(req_completed, &completed, MPI_STATUS_IGNORE);
  if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
242
    printf("P%d aborting -- Test Async\n", group->myId);
243
244
245
246
    MPI_Abort(MPI_COMM_WORLD, test_err);
  }

  MPI_Allreduce(&completed, &all_completed, 1, MPI_INT, MPI_MIN, MPI_COMM_WORLD);
247
  if(!all_completed) return MAL_ASYNC_PENDING; // Continue only if asynchronous send has ended 
248
249
250
251
252

  //MPI_Wait(req_completed, MPI_STATUS_IGNORE);
  if(config_file->aib == MAL_USE_IBARRIER) {
    MPI_Wait(&(*comm_req)[0], MPI_STATUS_IGNORE); // Indicar como completado el envio asincrono
  }
253
  
254
255
  iter_send = iter;
  MPI_Bcast(&iter_send, 1, MPI_INT, rootBcast, group->children);
256
  if(config_file->sdr > 0) { // Realizar envio sincrono
257
      results->sync_start = MPI_Wtime();
258
259
    send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
  }
260
  send_results(results, rootBcast, config_file->resizes, group->children);
261
262
263

  // Desconectar intercomunicador con los hijos
  MPI_Comm_disconnect(&(group->children));
264
  free(*comm_req);
265
  return MAL_COMM_COMPLETED;
iker_martin's avatar
iker_martin committed
266
267
}

268
269
270
271
272
273
/*
 * Inicializacion de los datos de los hijos.
 * En la misma se reciben datos de los padres: La configuracion
 * de la ejecucion a realizar; y los datos a recibir de los padres
 * ya sea de forma sincrona, asincrona o ambas.
 */
iker_martin's avatar
iker_martin committed
274
275
276
277
void Sons_init() {

  // Enviar a los hijos que grupo de procesos son
  MPI_Bcast(&(group->grp), 1, MPI_INT, ROOT, group->parents);
278
  MPI_Bcast(&run_id, 1, MPI_INT, ROOT, group->parents);
iker_martin's avatar
iker_martin committed
279
280
281
282
  group->grp++;

  config_file = recv_config_file(ROOT, group->parents);
  int numP_parents = config_file->procs[group->grp -1];
283
  init_results_data(&results, config_file->resizes - 1, config_file->iters[group->grp]);
iker_martin's avatar
iker_martin committed
284

285
286
  if(config_file->adr > 0) { // Recibir datos asincronos
    recv_async(&(group->async_array), config_file->adr, group->myId, group->numP, ROOT, group->parents, numP_parents, config_file->aib);
287
      results->async_time[group->grp] = MPI_Wtime();
288
289
    MPI_Bcast(&(group->iter_start), 1, MPI_INT, ROOT, group->parents);
  }
290
  if(config_file->sdr > 0) { // Recibir datos sincronos
iker_martin's avatar
iker_martin committed
291
    recv_sync(&(group->sync_array), config_file->sdr, group->myId, group->numP, ROOT, group->parents, numP_parents);
292
    results->sync_time[group->grp] = MPI_Wtime();
iker_martin's avatar
iker_martin committed
293
  }
294
295
296
297
298
299
300
301
302
303
304
305
306

  // Guardar los resultados de esta transmision
  recv_results(results, ROOT, config_file->resizes, group->parents);
  if(config_file->sdr > 0) { // Si no hay datos sincronos, el tiempo es 0
    results->sync_time[group->grp]  = MPI_Wtime() - results->sync_start;
  } else {
    results->sync_time[group->grp]  = 0;
  }
  if(config_file->adr > 0) { // Si no hay datos asincronos, el tiempo es 0
    results->async_time[group->grp]  = MPI_Wtime() - results->async_start;
  } else {
    results->async_time[group->grp]  = 0;
  }
iker_martin's avatar
iker_martin committed
307
308
309
310
311
312
313
314
315
316
317
318
319

  // Desconectar intercomunicador con los hijos
  MPI_Comm_disconnect(&(group->parents));
}


/////////////////////////////////////////
/////////////////////////////////////////
//COMPUTE FUNCTIONS
/////////////////////////////////////////
/////////////////////////////////////////


320
321
/*
 * Simula la ejecucción de una iteración de computo en la aplicación
322
 * que dura al menos un tiempo de "time" segundos.
323
 */
324
void iterate(double *matrix, int n, int async_comm) {
325
  double start_time, actual_time;
iker_martin's avatar
iker_martin committed
326
  double time = config_file->general_time * config_file->factors[group->grp];
327
  int i, operations = 0;
328
329

  start_time = actual_time = MPI_Wtime();
330
331
  if(async_comm == MAL_ASYNC_PENDING) { // Se esta realizando una redistribucion de datos asincrona
    operations = results->iters_type[config_file->iters[group->grp] - 1];
332
333
334
    for (i=0; i<operations; i++) {
      computeMatrix(matrix, n);
    }
335
    actual_time = MPI_Wtime(); // Guardar tiempos
336
337
    operations = 0;

338
  } else { // No hay redistribucion de datos actualmente	  
339
340
341
342
343
    while (actual_time - start_time < time) {
      computeMatrix(matrix, n);
      operations++;
      actual_time = MPI_Wtime(); // Guardar tiempos
    }
344
  }
345

346
347
348
  results->iters_time[results->iter_index] = actual_time - start_time;
  results->iters_type[results->iter_index] = operations;
  results->iter_index = results->iter_index + 1;
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
}

/*
 * Realiza una multiplicación de matrices de tamaño n
 */
void computeMatrix(double *matrix, int n) {
  int row, col, i, aux;

  for(row=0; i<n; row++) {
    /* COMPUTE */
    for(col=0; col<n; col++) {
      aux=0;
      for(i=0; i<n; i++) {
        aux += matrix[row*n + i] * matrix[i*n + col];
      }
    }
  }
}

/*
 * Init matrix
 */
void initMatrix(double **matrix, int n) {
  int i, j;

  // Init matrix
  if(matrix != NULL) {
    *matrix = malloc(n * n * sizeof(double));
    if(*matrix == NULL) { MPI_Abort(MPI_COMM_WORLD, -1);}
    for(i=0; i < n; i++) {
      for(j=0; j < n; j++) {
        (*matrix)[i*n + j] = i+j;
      }
    }
  }
}
385
386
387

//======================================================||
//======================================================||
388
//=============INIT/FREE/PRINT FUNCTIONS================||
389
390
391
//======================================================||
//======================================================||

392
393
394
395
/*
 * Muestra datos generales sobre los procesos, su grupo,
 * en que nodo residen y la version de MPI utilizada.
 */
396
397
398
399
400
401
402
403
404
405
406
407
void print_general_info(int myId, int grp, int numP) {
  int len;
  char *name = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  char *version = malloc(MPI_MAX_LIBRARY_VERSION_STRING * sizeof(char));
  MPI_Get_processor_name(name, &len);
  MPI_Get_library_version(version, &len);
  printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, grp, numP, name, version);

  free(name);
  free(version);
}

408
409
410
411
412
413
/*
 * Pide al proceso raiz imprimir los datos sobre las iteraciones realizadas por el grupo de procesos.
 *
 * Si es el ultimo grupo de procesos, muestra los datos obtenidos de tiempo de ejecucion, creacion de procesos
 * y las comunicaciones.
 */
414
int print_final_results() {
iker_martin's avatar
iker_martin committed
415
416
417
  int ptr_local, ptr_global, err;
  char *file_name;

418
  if(group->myId == ROOT) {
iker_martin's avatar
iker_martin committed
419
420
    file_name = NULL;
    file_name = malloc(40 * sizeof(char));
421
    if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
422
    err = snprintf(file_name, 40, "R%d_G%dNP%dID%d.out", run_id, group->grp, group->numP, group->myId);
423
    if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero
iker_martin's avatar
iker_martin committed
424
425
    create_out_file(file_name, &ptr_local, 1);
  
426
427
    print_config_group(config_file, group->grp);
    print_iter_results(results, config_file->iters[group->grp] -1);
iker_martin's avatar
iker_martin committed
428
    free(file_name);
429
430

    if(group->grp == config_file->resizes -1) {
iker_martin's avatar
iker_martin committed
431
432
      file_name = NULL;
      file_name = malloc(20 * sizeof(char));
433
      if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
434
      err = snprintf(file_name, 20, "R%d_Global.out", run_id);
435
      if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero
iker_martin's avatar
iker_martin committed
436
437

      create_out_file(file_name, &ptr_global, 1);
438
439
      print_config(config_file, group->grp);
      print_global_results(results, config_file->resizes);
iker_martin's avatar
iker_martin committed
440
441
      free(file_name);
      
442
443
    }
  }
444
  return 0;
445
446
447
448
449
}

/*
 * Inicializa la estructura group
 */
450
void init_group_struct(char *argv[], int argc, int myId, int numP) {
451
452
453
454
455
  group = malloc(1 * sizeof(group_data));
  group->myId        = myId;
  group->numP        = numP;
  group->grp         = 0;
  group->iter_start  = 0;
456
  group->argc        = argc;
457
458
  group->argv        = argv;
}
459

460
461
462
463
464
465
466
467
468
469
/*
 * Inicializa los datos para este grupo de procesos.
 *
 * En caso de ser el primer grupo de procesos, lee el fichero de configuracion
 * e inicializa los vectores de comunicacion.
 *
 * En caso de ser otro grupo de procesos entra a la funcion "Sons_init()" donde
 * se comunican con los padres para inicializar sus datos.
 */
void init_application() {
470

471
472
473
474
  MPI_Comm_get_parent(&(group->parents));
  if(group->parents != MPI_COMM_NULL ) { // Si son procesos hijos deben comunicarse con las padres
    Sons_init();
  } else { // Si son el primer grupo de procesos, recogen la configuracion inicial
475
476
477
478
479
480
481
482
483

    if(group->argc < 2) {
      printf("Falta el fichero de configuracion. Uso:\n./programa config.ini id\nEl argumento numerico id es opcional\n");
      exit(0);
    }
    if(group->argc > 2) {
      run_id = atoi(group->argv[2]);
    }

484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
    config_file = read_ini_file(group->argv[1]);
    init_results_data(&results, config_file->resizes, config_file->iters[group->grp]);
    if(config_file->sdr > 0) {
      malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
    }
    if(config_file->adr > 0) {
      malloc_comm_array(&(group->async_array), config_file->adr , group->myId, group->numP);
    }
  }
}

/*
 * Libera toda la memoria asociada con la aplicacion
 */
void free_application_data() {
  if(config_file->sdr > 0) {
    free(group->sync_array);
  }
  if(config_file->adr > 0) {
    free(group->async_array);
  }
  free(group);
  free_config(config_file);
  free_results_data(&results);
}
iker_martin's avatar
iker_martin committed
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533


/* 
 * Función para crear un fichero con el nombre pasado como argumento.
 * Si el nombre ya existe, se escribe la informacion a continuacion.
 *
 * El proceso que llama a la función pasa a tener como salida estandar
 * dicho fichero si el valor "newstdout" es verdadero.
 *
 */
int create_out_file(char *nombre, int *ptr, int newstdout) {
  int err;

  *ptr = open(nombre, O_WRONLY | O_CREAT | O_APPEND, 0644);
  if(*ptr < 0) return -1; // No ha sido posible crear el fichero

  if(newstdout) {
    err = close(1);
    if(err < 0) return -2; // No es posible modificar la salida estandar
    err = dup(*ptr);
    if(err < 0) return -3; // No es posible modificar la salida estandar
  }

  return 0;
}