malleabilityManager.c 34.8 KB
Newer Older
1
#include <pthread.h>
2
#include <string.h>
3
4
#include "malleabilityManager.h"
#include "malleabilityStates.h"
5
#include "malleabilityDataStructures.h"
6
#include "malleabilityTypes.h"
iker_martin's avatar
iker_martin committed
7
#include "malleabilityZombies.h"
8
#include "malleabilityTimes.h"
9
#include "spawn_methods/GenericSpawn.h"
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include "CommDist.h"

#define MALLEABILITY_USE_SYNCHRONOUS 0
#define MALLEABILITY_USE_ASYNCHRONOUS 1


void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous);
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous);

void Children_init();
int spawn_step();
int start_redistribution();
int check_redistribution();
int end_redistribution();
iker_martin's avatar
iker_martin committed
24
int shrink_redistribution();
25

26
27
28
void comm_node_data(int rootBcast, int is_child_group);
void def_nodeinfo_type(MPI_Datatype *node_type);

29
30
int thread_creation();
int thread_check();
31
void* thread_async_work();
32

33
void print_comms_state();
34
void malleability_comms_update(MPI_Comm comm);
35

36
int state = MALL_UNRESERVED; //FIXME Mover a otro lado
37
38
39
40
41
42

malleability_data_t *rep_s_data;
malleability_data_t *dist_s_data;
malleability_data_t *rep_a_data;
malleability_data_t *dist_a_data;

43
/*
 * Initializes the memory reserved by the malleability module, creating
 * every needed structure and duplicating communicators so the module
 * does not interfere with the application's own communication.
 *
 * If called by a group of dynamically spawned processes, it instead
 * initializes the communication with their parents; once that finishes
 * the child processes are ready to execute the application.
 *
 * Returns MALLEABILITY_CHILDREN for spawned processes and
 * MALLEABILITY_NOT_CHILDREN for the original group.
 */
int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes) {
  MPI_Comm dup_comm, thread_comm;

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Initializing MaM", myId, numP); fflush(stdout); MPI_Barrier(comm);
  #endif

  // Module-wide state plus the four data-description structures
  // (replicated/distributed x synchronous/asynchronous).
  // NOTE(review): malloc results are unchecked — assumes allocation succeeds.
  mall_conf = (malleability_config_t *) malloc(sizeof(malleability_config_t));
  mall = (malleability_t *) malloc(sizeof(malleability_t));

  rep_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));

  mall->dup_user_comm = 0;
  // Private duplicates of the user's communicator: one for regular MaM
  // traffic and one reserved for the spawn helper thread.
  MPI_Comm_dup(comm, &dup_comm);
  MPI_Comm_dup(comm, &thread_comm);
  MPI_Comm_set_name(dup_comm, "MPI_COMM_MALL");
  MPI_Comm_set_name(thread_comm, "MPI_COMM_MALL_THREAD");

  mall->myId = myId;
  mall->numP = numP;
  mall->root = root;
  mall->comm = dup_comm;
  mall->thread_comm = thread_comm;
  mall->user_comm = comm;

  mall->name_exec = name_exec;   // Borrowed pointers: caller keeps ownership
  mall->nodelist = nodelist;
  mall->num_cpus = num_cpus;
  mall->num_nodes = num_nodes;

  rep_s_data->entries = 0;
  rep_a_data->entries = 0;
  dist_s_data->entries = 0;
  dist_a_data->entries = 0;

  state = MALL_NOT_STARTED;

  zombies_service_init();
  init_malleability_times();

  // If this group was spawned dynamically, receive setup data from the
  // parents and return immediately flagged as children.
  MPI_Comm_get_parent(&(mall->intercomm));
  if(mall->intercomm != MPI_COMM_NULL ) { 
    Children_init();
    return MALLEABILITY_CHILDREN;
  }

  #if USE_MAL_BARRIERS && USE_MAL_DEBUG
    if(mall->myId == mall->root)
      printf("MaM: Using barriers to record times.\n");
  #endif

  if(nodelist != NULL) { //TODO To be deprecated by using Slurm or else statement
    mall->nodelist_len = strlen(nodelist);
  } else { // If no nodelist is detected, get it from the actual run
    // This allocation is owned by MaM (unlike the borrowed parameter above).
    mall->nodelist = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
    MPI_Get_processor_name(mall->nodelist, &mall->nodelist_len);
    //TODO Get name of each process and create real nodelist
  }

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM has been initialized correctly as parents", myId, numP); fflush(stdout); MPI_Barrier(comm);
  #endif

  return MALLEABILITY_NOT_CHILDREN;
}

123
124
125
126
127
/*
 * Releases all memory reserved by the malleability module and makes
 * sure any zombie (suspended) processes are woken up before the
 * zombie service itself is torn down.
 */
void free_malleability() {	  
  free_malleability_data_struct(rep_s_data);
  free_malleability_data_struct(rep_a_data);
  free_malleability_data_struct(dist_s_data);
  free_malleability_data_struct(dist_a_data);

  free(rep_s_data);
  free(rep_a_data);
  free(dist_s_data);
  free(dist_a_data);

  free_malleability_times();
  // The duplicated communicators are never MPI_COMM_WORLD, but guard anyway.
  if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
  if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
  free(mall);
  free(mall_conf);

  // Wake suspended processes last — presumably they no longer need the
  // structures freed above; TODO confirm zombies_* do not touch `mall`.
  zombies_awake();
  zombies_service_free();

  state = MALL_UNRESERVED;
}

151
152
/* 
 * TODO Rewrite (original note: "TODO Reescribir")
 * Performs the process-resizing steps on the parents' side.
 *
 * The new processes are created with the chosen physical distribution
 * and then the application data is transmitted to them.
 *
 * If there is asynchronous data to transmit, the transfer is started
 * and the function returns; the caller must invoke it again to check
 * whether the transfer has finished.
 *
 * Synchronous data, if any, is only sent after the asynchronous phase.
 * If there is only synchronous data it is sent right after spawning,
 * and finally the two process groups disconnect.
 *
 * Implemented as a state machine driven by the module-global `state`;
 * several cases recurse into malleability_checkpoint() to advance
 * through consecutive states in a single call. Returns the new state.
 */
int malleability_checkpoint() {
  double end_real_time;

  switch(state) {
    case MALL_UNRESERVED:
      break;
    case MALL_NOT_STARTED:
      reset_malleability_times();
      // Check whether a resize has to be performed
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->malleability_start = MPI_Wtime();
      //if(CHECK_RMS()) {return MALL_DENIED;}

      state = spawn_step();

      // Spawn may complete synchronously; fall through to redistribution.
      if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE){
        malleability_checkpoint();
      }
      break;

    case MALL_SPAWN_PENDING: // Check whether the spawn has finished and start redistribution
    case MALL_SPAWN_SINGLE_PENDING:
      state = check_spawn_state(&(mall->intercomm), mall->comm, &end_real_time);
      if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPTED) {
        #if USE_MAL_BARRIERS
          MPI_Barrier(mall->comm);
        #endif
        mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
        malleability_checkpoint();
      }
      break;

    case MALL_SPAWN_ADAPT_POSTPONE:
    case MALL_SPAWN_COMPLETED:
      state = start_redistribution();
      malleability_checkpoint();
      break;

    case MALL_DIST_PENDING:
      // Thread-based redistribution is polled differently from the
      // MPI-request-based one.
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
        state = thread_check();
      } else {
        state = check_redistribution();
      }
      if(state != MALL_DIST_PENDING) { 
        malleability_checkpoint();
      }
      break;

    case MALL_SPAWN_ADAPT_PENDING:
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->spawn_start = MPI_Wtime();
      unset_spawn_postpone_flag(state);
      state = check_spawn_state(&(mall->intercomm), mall->comm, &end_real_time);

      if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
        #if USE_MAL_BARRIERS
          MPI_Barrier(mall->comm);
        #endif
        mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
        malleability_checkpoint();
      }
      break;

    case MALL_SPAWN_ADAPTED:
      state = shrink_redistribution();
      malleability_checkpoint();
      break;

    case MALL_DIST_COMPLETED: //TODO Isn't this quite ugly? (original: "No es esto muy feo?")
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->malleability_end = MPI_Wtime();
      state = MALL_COMPLETED;
      break;
  }
  return state;
}

// Funciones solo necesarias por el benchmark
//-------------------------------------------------------------------------------------------------------------
// Records the current benchmark group number (benchmark-only helper).
void set_benchmark_grp(int grp) {
  mall_conf->grp = grp;
}

// Stores a borrowed pointer to the benchmark configuration; the caller
// keeps ownership of *config_file (benchmark-only helper).
void set_benchmark_configuration(configuration *config_file) {
  mall_conf->config_file = config_file;
}

264
// Returns (via out-parameter) the stored benchmark configuration pointer.
// No copy is made; ownership stays with whoever set it.
void get_benchmark_configuration(configuration **config_file) {
  *config_file = mall_conf->config_file;
}

268
269
// Thin wrapper exposing the internal timing results to the benchmark:
// spawn, synchronous-redistribution, asynchronous-redistribution and
// total malleability times, each written through its out-parameter.
void malleability_retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
  malleability_I_retrieve_times(sp_time, sy_time, asy_time, mall_time);
}
//-------------------------------------------------------------------------------------------------------------

273
/*
 * Stores the reconfiguration parameters chosen by the user: how to spawn
 * processes (method, strategies, physical distribution) and how to
 * redistribute data (method, strategies).
 *
 * RMA-based redistribution methods require the IBARRIER strategy to know
 * when the transfer has completed, so it is appended automatically when
 * missing.
 */
void set_malleability_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies) {
  int uses_rma, has_ibarrier;

  mall_conf->spawn_method = spawn_method;
  mall_conf->spawn_strategies = spawn_strategies;
  mall_conf->spawn_dist = spawn_dist;
  mall_conf->red_method = red_method;
  mall_conf->red_strategies = red_strategies;

  uses_rma = (mall_conf->red_method == MALL_RED_RMA_LOCK) || (mall_conf->red_method == MALL_RED_RMA_LOCKALL);
  has_ibarrier = malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL);
  if(uses_rma && !has_ibarrier) {
    malleability_red_add_strat(&(mall_conf->red_strategies), MALL_RED_IBARRIER);
  }
}

/*
 * To be deprecated
288
 * Tiene que ser llamado despues de setear la config
289
290
 */
void set_children_number(int numC){
291
  if((mall_conf->spawn_method == MALL_SPAWN_MERGE) && (numC >= mall->numP)) {
292
293
294
295
296
    mall->numC = numC;
    mall->numC_spawned = numC - mall->numP;

    if(numC == mall->numP) { // Migrar
      mall->numC_spawned = numC;
297
      mall_conf->spawn_method = MALL_SPAWN_BASELINE;
298
299
300
301
302
303
304
305
306
307
308
    }
  } else {
    mall->numC = numC;
    mall->numC_spawned = numC;
  }
}

/*
 * Hands the application its communicator. If a resize invalidated the
 * previous user communicator (dup_user_comm flag set), a fresh duplicate
 * of the module communicator replaces it first.
 */
void get_malleability_user_comm(MPI_Comm *comm) {
  if(!mall->dup_user_comm) {
    *comm = mall->user_comm;
    return;
  }

  // Refresh the user communicator after a reconfiguration.
  if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm));
  MPI_Comm_dup(mall->comm, &(mall->user_comm));
  MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MALL_USER");
  mall->dup_user_comm = 0;

  *comm = mall->user_comm;
}

/*
 * Anyade a la estructura concreta de datos elegida
 * el nuevo set de datos "data" de un total de "total_qty" elementos.
 *
 * Los datos variables se tienen que anyadir cuando quieran ser mandados, no antes
 *
 * Mas informacion en la funcion "add_data".
325
326
 *
 * //FIXME Si es constante se debería ir a asincrono, no sincrono
327
 */
328
void malleability_add_data(void *data, size_t total_qty, int type, int is_replicated, int is_constant) {
329
  size_t total_reqs = 0;
330
331
332

  if(is_constant) {
    if(is_replicated) {
333
      add_data(data, total_qty, type, total_reqs, rep_s_data);
334
    } else {
335
      add_data(data, total_qty, type, total_reqs, dist_s_data);
336
337
338
    }
  } else {
    if(is_replicated) {
339
      add_data(data, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ??? 
340
    } else {
341
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
342
        total_reqs = 1;
343
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
344
        total_reqs = mall->numC;
345
      }
346
347
348
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
        total_reqs++;
      }
349
350
351
352
353
354
      
      add_data(data, total_qty, type, total_reqs, dist_a_data);
    }
  }
}

355
356
357
358
359
360
361
/*
 * Modifica en la estructura concreta de datos elegida en el indice "index"
 * con el set de datos "data" de un total de "total_qty" elementos.
 *
 * Los datos variables se tienen que modificar cuando quieran ser mandados, no antes
 *
 * Mas informacion en la funcion "modify_data".
362
 * //FIXME Si es constante se debería ir a asincrono, no sincrono
363
364
 */
void malleability_modify_data(void *data, size_t index, size_t total_qty, int type, int is_replicated, int is_constant) {
365
366
  size_t total_reqs = 0;

367
368
  if(is_constant) {
    if(is_replicated) {
369
      modify_data(data, index, total_qty, type, total_reqs, rep_s_data);
370
    } else {
371
      modify_data(data, index, total_qty, type, total_reqs, dist_s_data);
372
373
374
    }
  } else {
    if(is_replicated) {
375
376
377
      modify_data(data, index, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ??? 
    } else {    
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
378
        total_reqs = 1;
379
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
380
381
        total_reqs = mall->numC;
      }
382
383
384
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
        total_reqs++;
      }
385
      
386
      modify_data(data, index, total_qty, type, total_reqs, dist_a_data);
387
388
389
390
    }
  }
}

391
392
393
/*
 * Devuelve el numero de entradas para la estructura de descripcion de 
 * datos elegida.
394
 * //FIXME Si es constante se debería ir a asincrono, no sincrono
395
 */
396
void malleability_get_entries(size_t *entries, int is_replicated, int is_constant){
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
  
  if(is_constant) {
    if(is_replicated) {
      *entries = rep_s_data->entries;
    } else {
      *entries = dist_s_data->entries;
    }
  } else {
    if(is_replicated) {
      *entries = rep_a_data->entries;
    } else {
      *entries = dist_a_data->entries;
    }
  }
}

/*
 * Devuelve el elemento de la lista "index" al usuario.
 * La devolución es en el mismo orden que lo han metido los padres
 * con la funcion "malleability_add_data()".
 * Es tarea del usuario saber el tipo de esos datos.
 * TODO Refactor a que sea automatico
419
 * //FIXME Si es constante se debería ir a asincrono, no sincrono
420
 */
421
void malleability_get_data(void **data, size_t index, int is_replicated, int is_constant) {
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
  malleability_data_t *data_struct;

  if(is_constant) {
    if(is_replicated) {
      data_struct = rep_s_data;
    } else {
      data_struct = dist_s_data;
    }
  } else {
    if(is_replicated) {
      data_struct = rep_a_data;
    } else {
      data_struct = dist_a_data;
    }
  }

438
  *data = data_struct->arrays[index];
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
}


//======================================================||
//================PRIVATE FUNCTIONS=====================||
//================DATA COMMUNICATION====================||
//======================================================||
//======================================================||

/*
 * Generalized function to send the registered data sets to the children.
 * "Asynchronous" refers to whether parent and child perform the transfer
 * in a blocking fashion or not; the parent may use several threads.
 */
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
  size_t idx;
  char *out_buf, *in_buf;

  for(idx = 0; idx < data_struct->entries; idx++) {
    out_buf = (char *) data_struct->arrays[idx]; //TODO Check that it really is a char
    in_buf = NULL;

    if(is_asynchronous) {
      async_communication_start(out_buf, &in_buf, data_struct->qty[idx], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall_conf->red_strategies, 
		      mall->intercomm, &(data_struct->requests[idx]), &(data_struct->request_qty[idx]), &(data_struct->windows[idx]));
    } else {
      sync_communication(out_buf, &in_buf, data_struct->qty[idx], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall->intercomm);
    }

    // Merge methods may hand back a replacement buffer for this entry.
    if(in_buf != NULL) data_struct->arrays[idx] = (void *) in_buf;
  }
}

/*
 * Generalized function to receive the registered data sets from the
 * parents. "Asynchronous" refers to whether parent and child perform the
 * transfer in a blocking fashion or not; the parent may use several threads.
 */
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
  size_t idx;
  char *in_buf, unused_send; // unused_send: placeholder send buffer on the receiving side

  for(idx = 0; idx < data_struct->entries; idx++) {
    in_buf = (char *) data_struct->arrays[idx]; //TODO Check that it really is a char

    if(is_asynchronous) {
      async_communication_start(&unused_send, &in_buf, data_struct->qty[idx], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall_conf->red_strategies, 
		      mall->intercomm, &(data_struct->requests[idx]), &(data_struct->request_qty[idx]), &(data_struct->windows[idx]));
    } else {
      sync_communication(&unused_send, &in_buf, data_struct->qty[idx], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall->intercomm);
    }

    // The communication layer may have (re)allocated the receive buffer.
    data_struct->arrays[idx] = (void *) in_buf;
  }
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================CHILDREN=========================||
//======================================================||
//======================================================||
/*
 * Initialization of the child processes' data.
 * They receive from the parents: the configuration of the execution to
 * perform, and the data to be redistributed — synchronously,
 * asynchronously, or both.
 */
void Children_init() {
  size_t i;
  int numP_parents, root_parents;
  int is_intercomm;

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM will now initialize children", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  malleability_connect_children(mall->myId, mall->numP, mall->root, mall->comm, &numP_parents, &root_parents, &(mall->intercomm));
  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
  if(!is_intercomm) { // For intracommunicators (Merge), these processes are
                      // added to the parents' group: refresh rank and size.
    MPI_Comm_rank(mall->intercomm, &(mall->myId));
    MPI_Comm_size(mall->intercomm, &(mall->numP));
  }

  // Receive execution configuration and redistribution settings.
  recv_config_file(mall->root, mall->intercomm, &(mall_conf->config_file));
  comm_node_data(root_parents, MALLEABILITY_CHILDREN);
  MPI_Bcast(&(mall_conf->red_method), 1, MPI_INT, root_parents, mall->intercomm);
  MPI_Bcast(&(mall_conf->red_strategies), 1, MPI_INT, root_parents, mall->intercomm);

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Children have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // ---- Asynchronous data redistribution ----
  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
  if(dist_a_data->entries || rep_a_data->entries) { // Receive asynchronous data
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Children start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif

    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
      // Thread strategy: the parents send from a thread, the children
      // simply receive in a blocking way.
      recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
    } else {
      recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS); 

      #if USE_MAL_DEBUG >= 2
        DEBUG_FUNC("Children started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
      #endif

      // Wait for every entry's requests, then release the resources.
      for(i=0; i<dist_a_data->entries; i++) {
        async_communication_wait(mall_conf->red_strategies, mall->intercomm, dist_a_data->requests[i], dist_a_data->request_qty[i]);
      }
      #if USE_MAL_DEBUG >= 2
        DEBUG_FUNC("Children waited for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
      #endif
      for(i=0; i<dist_a_data->entries; i++) {
        async_communication_end(mall_conf->red_method, mall_conf->red_strategies, dist_a_data->requests[i], dist_a_data->request_qty[i], &(dist_a_data->windows[i]));
      }
    }

    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->async_end= MPI_Wtime(); // Timestamp: asynchronous communication finished
  }
  #if USE_MAL_DEBUG
    DEBUG_FUNC("Children have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // ---- Synchronous data redistribution ----
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
  if(dist_s_data->entries || rep_s_data->entries) { // Receive synchronous data
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    // TODO Create a specific function and add it for the async case too
    // TODO Take type and qty into account
    for(i=0; i<rep_s_data->entries; i++) {
      MPI_Datatype datatype;
      if(rep_s_data->types[i] == MAL_INT) {
        datatype = MPI_INT;
      } else {
        datatype = MPI_CHAR; // Fallback: any non-int entry is broadcast as raw chars
      }
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, root_parents, mall->intercomm);
    } 
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->sync_end = MPI_Wtime(); // Timestamp: synchronous communication finished
  }
  #if USE_MAL_DEBUG
    DEBUG_FUNC("Children have completed synchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // Share the timing results of this transmission.
  malleability_times_broadcast(mall->root);
  if(!is_intercomm) {
    malleability_comms_update(mall->intercomm);
  }
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_end = MPI_Wtime(); // Timestamp: malleability finished
  MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error in OpenMPI + Merge

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM has been initialized correctly as children", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================PARENTS==========================||
//======================================================||
//======================================================||

/*
 * Performs the creation of the child processes.
 * If a background (pthread) spawn was requested the function returns the
 * current spawn state immediately; otherwise the spawn time is recorded
 * before returning.
 */
int spawn_step(){
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_start = MPI_Wtime();
 
  state = init_spawn(mall->name_exec, mall->num_cpus, mall->num_nodes, mall->nodelist, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_method, mall_conf->spawn_strategies, mall->thread_comm, &(mall->intercomm));

  // Only a blocking (non-pthread) spawn has finished here, so only then
  // can the total spawn time be measured.
  if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
  }
  return state;
}

644

645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
/*
 * Starts the data redistribution towards the new group of processes.
 *
 * First the configuration to use is sent to the new group, then the
 * asynchronous and/or synchronous transfers are performed, if any.
 *
 * If there is asynchronous communication it is started and the function
 * returns MALL_DIST_PENDING (or the thread-creation state).
 *
 * If there is no asynchronous communication, the synchronous phase is
 * carried out directly via end_redistribution(), which also sends the
 * result data to the children and disconnects both process groups.
 */
int start_redistribution() {
  int rootBcast, is_intercomm;

  is_intercomm = 0;
  if(mall->intercomm != MPI_COMM_NULL) {
    MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
  } else { 
    // No communicator yet: the spawn was postponed (Merge Shrink case),
    // so reuse the current communicator.
    MPI_Comm_dup(mall->comm, &(mall->intercomm));
  }

  // On an intercommunicator, collective roots use MPI_ROOT/MPI_PROC_NULL
  // on the sending side; on an intracommunicator the plain root rank works.
  if(is_intercomm) {
    rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    rootBcast = mall->root;
  }

  send_config_file(mall_conf->config_file, rootBcast, mall->intercomm);
  comm_node_data(rootBcast, MALLEABILITY_NOT_CHILDREN);
  MPI_Bcast(&(mall_conf->red_method), 1, MPI_INT, rootBcast, mall->intercomm);
  MPI_Bcast(&(mall_conf->red_strategies), 1, MPI_INT, rootBcast, mall->intercomm);

  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
  if(dist_a_data->entries || rep_a_data->entries) { // Send asynchronous data
    //FIXME Replicated data (rep_a_data) is not sent
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->async_start = MPI_Wtime();
    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
      return thread_creation();
    } else {
      send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
      return MALL_DIST_PENDING; 
    }
  } 
  return end_redistribution();
}


/*
 * Checks whether the asynchronous redistribution has finished.
 * If not, returns MALL_DIST_PENDING; otherwise it continues with the
 * synchronous communication, sends the results and disconnects the
 * process groups (via end_redistribution()).
 *
 * Two completion criteria exist depending on the configured mode:
 * with "MAL_USE_NORMAL"/"MAL_USE_POINT" the redistribution is considered
 * finished when the parents finish sending; with "MAL_USE_IBARRIER" it
 * is finished when the children have finished receiving.
 * //FIXME Should also take rep_a_data into account
 */
int check_redistribution() {
  int is_intercomm, completed, local_completed, all_completed;
  size_t i, req_qty;
  MPI_Request *req_completed;
  MPI_Win window;
  local_completed = 1;

  #if USE_MAL_DEBUG >= 2
    DEBUG_FUNC("Originals are checking for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // Local test: every entry of every request array must have completed.
  for(i=0; i<dist_a_data->entries; i++) {
    req_completed = dist_a_data->requests[i];
    req_qty = dist_a_data->request_qty[i];
    completed = async_communication_check(mall->myId, MALLEABILITY_NOT_CHILDREN, mall_conf->red_strategies, mall->intercomm, req_completed, req_qty);
    local_completed = local_completed && completed;
  }
  #if USE_MAL_DEBUG >= 2
    DEBUG_FUNC("Originals will now check a global decision", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // Global decision: MPI_MIN over 0/1 flags == logical AND across ranks.
  MPI_Allreduce(&local_completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
  if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 

  #if USE_MAL_DEBUG >= 2
    DEBUG_FUNC("Originals sent asyncrhonous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // Release requests/windows of every completed transfer.
  for(i=0; i<dist_a_data->entries; i++) {
    req_completed = dist_a_data->requests[i];
    req_qty = dist_a_data->request_qty[i];
    window = dist_a_data->windows[i];
    async_communication_end(mall_conf->red_method, mall_conf->red_strategies, req_completed, req_qty, &window);
  }

  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
  if(!is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
  return end_redistribution();
}


/*
 * Finishes the data redistribution with the children.
 *
 * The synchronous communications, if any, are performed here, including
 * the broadcast of replicated synchronous data, and the timing results
 * are sent to the children.
 *
 * Returns MALL_DIST_COMPLETED, or MALL_SPAWN_ADAPT_PENDING when a Merge
 * Shrink still requires extra work (see shrink_redistribution()).
 */ 
int end_redistribution() {
  size_t i;
  int is_intercomm, rootBcast, local_state;

  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
  // Intercommunicator collectives need MPI_ROOT/MPI_PROC_NULL on the
  // sending side; intracommunicators use the plain root rank.
  if(is_intercomm) {
    rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    rootBcast = mall->root;
  }
  
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
  if(dist_s_data->entries || rep_s_data->entries) { // Send synchronous data
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->sync_start = MPI_Wtime();
    send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    // TODO Create a specific function and add it for the async case too
    // TODO Take the type into account
    for(i=0; i<rep_s_data->entries; i++) {
      MPI_Datatype datatype;
      if(rep_s_data->types[i] == MAL_INT) {
        datatype = MPI_INT;
      } else {
        datatype = MPI_CHAR; // Fallback: any non-int entry is broadcast as raw chars
      }
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, rootBcast, mall->intercomm);
    } 
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    if(!is_intercomm) mall_conf->times->sync_end = MPI_Wtime(); // Merge method only
  }

  malleability_times_broadcast(rootBcast);

  local_state = MALL_DIST_COMPLETED;
  if(!is_intercomm) { // Merge Spawn
    if(mall->numP < mall->numC) { // Expand
      malleability_comms_update(mall->intercomm);
    } else { // Shrink || Merge Shrink requires additional steps
      local_state = MALL_SPAWN_ADAPT_PENDING;
    }
  }

  if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) {
    MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error in OpenMPI + Merge
  }

  return local_state;
}

818
819
820
821

///=============================================
///=============================================
///=============================================
822
//TODO Add comment
iker_martin's avatar
iker_martin committed
823
int shrink_redistribution() {
824
825
826
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->comm);
    #endif
827
    double time_extra = MPI_Wtime();
828

829
830
831
    //TODO Create Commit function. Processes can perform tasks before that. Then call again Malleability to commit the change
    MPI_Abort(MPI_COMM_WORLD, -20); //                                                         (void *) mall_conf->results
    zombies_collect_suspended(mall->user_comm, mall->myId, mall->numP, mall->numC, mall->root, NULL, mall_conf->config_file->n_stages, mall_conf->config_file->capture_method);
iker_martin's avatar
iker_martin committed
832
833
    
    if(mall->myId < mall->numC) {
834
      if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); //FIXME Modificar a que se pida pro el usuario el cambio y se llama a comms_update
835
      if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
836
      mall->dup_user_comm = 1;
837
838
839
840

      MPI_Comm_dup(mall->intercomm, &(mall->thread_comm));
      MPI_Comm_dup(mall->intercomm, &(mall->comm));

841
842
843
      MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MALL_THREAD");
      MPI_Comm_set_name(mall->comm, "MPI_COMM_MALL");

844
845
      MPI_Comm_free(&(mall->intercomm));

846
847
848
849

      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
850
      mall_conf->times->spawn_time += MPI_Wtime() - time_extra;
851
      return MALL_DIST_COMPLETED;
iker_martin's avatar
iker_martin committed
852
    } else {
853
      return MALL_ZOMBIE;
iker_martin's avatar
iker_martin committed
854
855
856
    }
}

857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=================COMM NODE INFO ======================||
//======================================================||
//======================================================||
//TODO Add comment
void comm_node_data(int rootBcast, int is_child_group) {
  MPI_Datatype node_type;

  def_nodeinfo_type(&node_type);
  MPI_Bcast(mall, 1, node_type, rootBcast, mall->intercomm);

  if(is_child_group) {
    mall->nodelist = malloc((mall->nodelist_len+1) * sizeof(char));
    mall->nodelist[mall->nodelist_len] = '\0';
  }
  MPI_Bcast(mall->nodelist, mall->nodelist_len, MPI_CHAR, rootBcast, mall->intercomm);

  MPI_Type_free(&node_type);
}

/*
 * Builds and commits an MPI derived datatype describing the three integer
 * fields of the global "mall" structure exchanged between process groups:
 * num_cpus, num_nodes and nodelist_len.
 *
 * The caller owns the committed type and must release it with MPI_Type_free.
 */
void def_nodeinfo_type(MPI_Datatype *node_type) {
  int counts = 3;
  int blocklengths[3] = {1, 1, 1};
  MPI_Datatype types[3] = {MPI_INT, MPI_INT, MPI_INT};
  MPI_Aint displs[3], base_addr;

  // Displacements are taken relative to the start of the structure
  MPI_Get_address(mall, &base_addr);
  MPI_Get_address(&(mall->num_cpus), &displs[0]);
  MPI_Get_address(&(mall->num_nodes), &displs[1]);
  MPI_Get_address(&(mall->nodelist_len), &displs[2]);
  displs[0] -= base_addr;
  displs[1] -= base_addr;
  displs[2] -= base_addr;

  MPI_Type_create_struct(counts, blocklengths, displs, types, node_type);
  MPI_Type_commit(node_type);
}

901
902
903
904
905
906
907
// TODO MOVER A OTRO LADO??
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//===============COMM PARENTS THREADS===================||
//======================================================||
//======================================================||

908
909

int comm_state; //FIXME Usar un handler
910
911
912
913
/*
 * Crea una hebra para ejecutar una comunicación en segundo plano.
 */
int thread_creation() {
914
  comm_state = MALL_DIST_PENDING;
915
916
917
918
919
  if(pthread_create(&(mall->async_thread), NULL, thread_async_work, NULL)) {
    printf("Error al crear el hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
920
  return comm_state;
921
922
923
924
925
926
927
928
929
}

/*
 * Comprobación por parte de una hebra maestra que indica
 * si una hebra esclava ha terminado su comunicación en segundo plano.
 *
 * El estado de la comunicación es devuelto al finalizar la función. 
 */
int thread_check() {
930
  int all_completed = 0, is_intercomm;
931
932

  // Comprueba que todos los hilos han terminado la distribucion (Mismo valor en commAsync)
933
  MPI_Allreduce(&comm_state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
934
935
  if(all_completed != MALL_DIST_COMPLETED) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
  //FIXME No se tiene en cuenta el estado MALL_APP_ENDED
936
937
938
939
940
941

  if(pthread_join(mall->async_thread, NULL)) {
    printf("Error al esperar al hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -2;
  } 
942
  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
943
944
945
946

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
947
  if(!is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
948
949
950
951
952
953
954
955
956
957
958
959
  return end_redistribution();
}


/*
 * Función ejecutada por una hebra.
 * Ejecuta una comunicación síncrona con los hijos que
 * para el usuario se puede considerar como en segundo plano.
 *
 * Cuando termina la comunicación la hebra maestra puede comprobarlo
 * por el valor "commAsync".
 */
960
void* thread_async_work() {
961
  send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
962
  comm_state = MALL_DIST_COMPLETED;
963
964
  pthread_exit(NULL);
}
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984


//==============================================================================
/*
 * Debug helper: prints the current state (handle and name) of every
 * communicator managed by the malleability module.
 */
void print_comms_state() {
  int name_len;
  char comm_name[MPI_MAX_OBJECT_NAME]; // Fixed upper bound; avoids an unchecked malloc

  // NOTE(review): printing an MPI_Comm with %d is non-portable — it is a
  // pointer type under Open MPI and an int under MPICH. Format kept for
  // output compatibility; confirm the target MPI before relying on it.
  MPI_Comm_get_name(mall->comm, comm_name, &name_len);
  printf("P%d Comm=%d Name=%s\n", mall->myId, mall->comm, comm_name);
  MPI_Comm_get_name(mall->user_comm, comm_name, &name_len);
  printf("P%d Comm=%d Name=%s\n", mall->myId, mall->user_comm, comm_name);
  if(mall->intercomm != MPI_COMM_NULL) {
    MPI_Comm_get_name(mall->intercomm, comm_name, &name_len);
    printf("P%d Comm=%d Name=%s\n", mall->myId, mall->intercomm, comm_name);
  }
}
985
986
987
988
989
990
991
992
993
994
995
996
997
998

/*
 * Replaces the three communicators used by the malleability module
 * (thread_comm, comm and user_comm) with fresh duplicates of "comm".
 * The previous communicators are freed first, except MPI_COMM_WORLD,
 * which must never be freed.
 *
 * NOTE(review): assumes "comm" is not one of the communicators being
 * freed (callers pass mall->intercomm) — confirm before reordering.
 */
void malleability_comms_update(MPI_Comm comm) {
  if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
  if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
  if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm)); //TODO Isn't this dangerous?

  MPI_Comm_dup(comm, &(mall->thread_comm));
  MPI_Comm_dup(comm, &(mall->comm));
  MPI_Comm_dup(comm, &(mall->user_comm)); 

  MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MALL_THREAD");
  MPI_Comm_set_name(mall->comm, "MPI_COMM_MALL");
  MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MALL_USER");
}
999