"Results/DataRedist/Synch/config15.ini" did not exist on "8a9e883db484ea6df53e2e1218530e8f0ab7349e"
malleabilityManager.c 35.2 KB
Newer Older
1
#include <pthread.h>
2
#include <string.h>
3
4
#include "malleabilityManager.h"
#include "malleabilityStates.h"
5
#include "malleabilityDataStructures.h"
6
#include "malleabilityTypes.h"
iker_martin's avatar
iker_martin committed
7
#include "malleabilityZombies.h"
8
#include "malleabilityTimes.h"
9
#include "spawn_methods/GenericSpawn.h"
10
11
12
13
14
15
16
17
18
19
20
21
#include "CommDist.h"

#define MALLEABILITY_USE_SYNCHRONOUS 0
#define MALLEABILITY_USE_ASYNCHRONOUS 1


void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous);
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous);

void Children_init();
int spawn_step();
int start_redistribution();
int check_redistribution(int wait_completed);
int end_redistribution();
int shrink_redistribution();

int thread_creation();
int thread_check(int wait_completed);
void* thread_async_work();

void print_comms_state();
void malleability_comms_update(MPI_Comm comm);

int state = MALL_UNRESERVED; //FIXME Move this elsewhere

malleability_data_t *rep_s_data;
malleability_data_t *dist_s_data;
malleability_data_t *rep_a_data;
malleability_data_t *dist_a_data;

/*
 * Initializes the memory reservation for the malleability module,
 * creating all the required structures and copies of communicators
 * so the application is not interfered with.
 *
 * If it is called by a group of dynamically created processes, they
 * initialize the communication with their parents. In that case, once
 * the communication finishes the child processes are ready to run
 * the application.
 */
int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes) {
  MPI_Comm dup_comm, thread_comm;

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Initializing MaM", myId, numP); fflush(stdout); MPI_Barrier(comm);
  #endif

  mall_conf = (malleability_config_t *) malloc(sizeof(malleability_config_t));
  mall = (malleability_t *) malloc(sizeof(malleability_t));

  rep_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));

  MPI_Comm_dup(comm, &dup_comm);
  MPI_Comm_dup(comm, &thread_comm);
  MPI_Comm_set_name(dup_comm, "MPI_COMM_MAM");
  MPI_Comm_set_name(thread_comm, "MPI_COMM_MAM_THREAD");

  mall->myId = myId;
  mall->numP = numP;
  mall->root = root;
  mall->root_parents = -1;
  mall->comm = dup_comm;
  mall->thread_comm = thread_comm;
  mall->user_comm = MPI_COMM_NULL;

  mall->name_exec = name_exec;
  mall->nodelist = nodelist;
  mall->num_cpus = num_cpus;
  mall->num_nodes = num_nodes;

  rep_s_data->entries = 0;
  rep_a_data->entries = 0;
  dist_s_data->entries = 0;
  dist_a_data->entries = 0;

  state = MALL_NOT_STARTED;

  zombies_service_init();
  init_malleability_times();
  MAM_Def_main_datatype();

  // If these processes were created dynamically, they obtain the data from their parents
  MPI_Comm_get_parent(&(mall->intercomm));
  if(mall->intercomm != MPI_COMM_NULL ) { 
    Children_init();
    return MALLEABILITY_CHILDREN;
  }

  #if USE_MAL_BARRIERS && USE_MAL_DEBUG
    if(mall->myId == mall->root)
      printf("MaM: Using barriers to record times.\n");
  #endif

  if(nodelist != NULL) { //TODO To be deprecated by using Slurm or else statement
    mall->nodelist_len = strlen(nodelist);
  } else { // If no nodelist is detected, get it from the actual run
    mall->nodelist = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
    MPI_Get_processor_name(mall->nodelist, &mall->nodelist_len);
    //TODO Get name of each process and create real nodelist
  }
  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM has been initialized correctly as parents", myId, numP); fflush(stdout); MPI_Barrier(comm);
  #endif

  return MALLEABILITY_NOT_CHILDREN;
}
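
/*
 * Usage sketch (illustrative, not part of the module): how an application
 * might call init_malleability() and branch on its return value. Only
 * init_malleability(), free_malleability(), MALLEABILITY_CHILDREN and
 * MALLEABILITY_NOT_CHILDREN come from this file; user_main_loop() and the
 * argument choices are hypothetical.
 *
 *   int main(int argc, char *argv[]) {
 *     int myId, numP, is_children;
 *     MPI_Init(&argc, &argv);
 *     MPI_Comm_rank(MPI_COMM_WORLD, &myId);
 *     MPI_Comm_size(MPI_COMM_WORLD, &numP);
 *
 *     is_children = init_malleability(myId, numP, 0, MPI_COMM_WORLD, argv[0], NULL, 0, 0);
 *     if(is_children == MALLEABILITY_CHILDREN) {
 *       // Dynamically spawned processes return here already connected to their parents
 *     }
 *     user_main_loop();   // hypothetical application code
 *
 *     free_malleability();
 *     MPI_Finalize();
 *     return 0;
 *   }
 */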

/*
 * Frees all the memory reserved by the malleability module
 * and makes sure that any zombie processes are woken up.
 */
void free_malleability() {
  free_malleability_data_struct(rep_s_data);
  free_malleability_data_struct(rep_a_data);
  free_malleability_data_struct(dist_s_data);
  free_malleability_data_struct(dist_a_data);

  free(rep_s_data);
  free(rep_a_data);
  free(dist_s_data);
  free(dist_a_data);

  MAM_Free_main_datatype();
  free_malleability_times();
  if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
  if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
  free(mall);
  free(mall_conf);

  zombies_awake();
  zombies_service_free();

  state = MALL_UNRESERVED;
}

/* 
 * TODO Rewrite
 * The parents perform the process resizing.
 *
 * The new processes are created with the chosen physical distribution
 * and the information is then transmitted to them.
 *
 * If there is asynchronous data to transmit, its transmission is started
 * first and the function returns. The function has to be called again to
 * check whether the transmission has finished.
 *
 * If there is also synchronous data to send, it is not sent yet.
 *
 * If there is only synchronous data, it is sent after the processes have
 * been created and, finally, the two groups of processes are disconnected.
 */
int malleability_checkpoint(int *mam_state, int wait_completed) {
  int is_intercomm;

  switch(state) {
    case MALL_UNRESERVED:
      *mam_state = MAM_UNRESERVED;
      break;
    case MALL_NOT_STARTED:
      *mam_state = MAM_NOT_STARTED;
      reset_malleability_times();
      // Check whether a resize has to be performed
      
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->malleability_start = MPI_Wtime();
      //if(CHECK_RMS()) {return MALL_DENIED;}

      state = spawn_step();

      if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE){
        malleability_checkpoint(mam_state, wait_completed);
      }
      break;

    case MALL_SPAWN_PENDING: // Check whether the spawn has finished and start the redistribution
    case MALL_SPAWN_SINGLE_PENDING:
      state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);
      if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPTED) {
        #if USE_MAL_BARRIERS
          MPI_Barrier(mall->comm);
        #endif
        mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;

        malleability_checkpoint(mam_state, wait_completed);
      }
      break;
    case MALL_SPAWN_ADAPT_POSTPONE:
    case MALL_SPAWN_COMPLETED:
      state = start_redistribution();
      malleability_checkpoint(mam_state, wait_completed);
      break;

    case MALL_DIST_PENDING:
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
        state = thread_check(wait_completed);
      } else {
        state = check_redistribution(wait_completed);
      }
      if(state != MALL_DIST_PENDING) { 
        malleability_checkpoint(mam_state, wait_completed);
      }
      break;

    case MALL_SPAWN_ADAPT_PENDING:

      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->spawn_start = MPI_Wtime();
      unset_spawn_postpone_flag(state);
      state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);

      if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
        #if USE_MAL_BARRIERS
          MPI_Barrier(mall->comm);
        #endif
        mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
        malleability_checkpoint(mam_state, wait_completed);
      }
      break;

    case MALL_SPAWN_ADAPTED: //FIXME Remove?
      state = shrink_redistribution();
      if(state == MALL_ZOMBIE) *mam_state = MAM_ZOMBIE; //TODO This one must not be removed
      malleability_checkpoint(mam_state, wait_completed);
      break;

    case MALL_DIST_COMPLETED:
      MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
      if(is_intercomm) {
        MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->user_comm); // The group that passes 0 goes first
      } else {
        MPI_Comm_dup(mall->intercomm, &mall->user_comm);
      }
      MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MAM_USER");
      state = MALL_COMPLETED;
      *mam_state = MAM_COMPLETED;
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->malleability_end = MPI_Wtime();
      break;
  }

  if(state > MALL_ZOMBIE && state < MALL_COMPLETED) *mam_state = MAM_PENDING;
  return state;
}
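
/*
 * Usage sketch (illustrative, not part of the module): a typical polling loop
 * over malleability_checkpoint() from the application side, assuming the
 * configuration and the number of target processes have already been set.
 * MAM_COMPLETED and MAM_ZOMBIE come from this module; do_one_iteration() is a
 * hypothetical placeholder for the application's own work.
 *
 *   int mam_state;
 *   malleability_checkpoint(&mam_state, 0);      // start the reconfiguration
 *   while(mam_state != MAM_COMPLETED && mam_state != MAM_ZOMBIE) {
 *     do_one_iteration();                        // overlap work with the redistribution
 *     malleability_checkpoint(&mam_state, 0);    // poll until it finishes
 *   }
 */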

/*
 * Returns an intracommunicator to allow users to perform their
 * own redistributions. The user must free this communicator
 * when it is no longer needed.
 *
 * This is a blocking function; it must be called by all processes involved in the
 * reconfiguration.
 * TODO Create the communicator elsewhere and delete this on commit.
 *
 * The communicator is only returned if the state of the reconfiguration
 * is completed (MALL_COMPLETED / MAM_COMPLETED). Otherwise MALL_DENIED is obtained.
 */
int MAM_Get_comm(MPI_Comm *comm, int *targets_qty) {
  if(!(state == MALL_COMPLETED || state == MALL_ZOMBIE)) {
    return MALL_DENIED;
  }

  MPI_Comm_dup(mall->user_comm, comm);
  MPI_Comm_set_name(*comm, "MPI_MAM_DUP");
  *targets_qty = mall->numC;
  return 0;
}

/*
 * TODO
 */
void MAM_Commit(int *mam_state, MPI_Comm *new_comm) {
  if(!(state == MALL_COMPLETED || state == MALL_ZOMBIE)) {
    *mam_state = MALL_DENIED;
    return;
  }

  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // Zombies treatment
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) {
    int zombies;
    MPI_Allreduce(&state, &zombies, 1, MPI_INT, MPI_MIN, mall->intercomm);
    if(zombies == MALL_ZOMBIE) {
      zombies_collect_suspended(mall->comm, mall->myId, mall->numP, mall->numC, mall->root);
    }
  }
  // Reset/Free unneeded communicators
  if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm));
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { malleability_comms_update(mall->intercomm); }
  if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { 
    MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error in OpenMPI + Merge
  }

  MPI_Comm_rank(mall->comm, &(mall->myId));
  MPI_Comm_size(mall->comm, &(mall->numP));
  mall->root = mall->root_parents == -1 ? mall->root : mall->root_parents;
  mall->root_parents = -1;
  state = MALL_NOT_STARTED;
  *mam_state = MAM_COMMITED;

  // Set new communicator
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *new_comm = MPI_COMM_WORLD; }
  else if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MPI_Comm_dup(mall->comm, new_comm); }
  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif
}
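
/*
 * Usage sketch (illustrative, not part of the module): once
 * malleability_checkpoint() reports MAM_COMPLETED, the application can obtain
 * the communicator of the reconfigured group and commit the change.
 * MAM_Get_comm() and MAM_Commit() are the functions defined above; the
 * variable names are hypothetical.
 *
 *   MPI_Comm reconf_comm, new_comm;
 *   int targets, mam_state;
 *
 *   if(MAM_Get_comm(&reconf_comm, &targets) != MALL_DENIED) {
 *     // Optional user-driven redistribution over reconf_comm goes here
 *     MPI_Comm_free(&reconf_comm);             // must be freed by the user
 *   }
 *   MAM_Commit(&mam_state, &new_comm);         // mam_state becomes MAM_COMMITED
 *   // From this point on the application should use new_comm instead of its old communicator
 */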

void malleability_retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
  malleability_I_retrieve_times(sp_time, sy_time, asy_time, mall_time);
}

void set_malleability_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies) {
  if(state > MALL_NOT_STARTED) return;

  mall_conf->spawn_method = spawn_method;
  mall_conf->spawn_strategies = spawn_strategies;
  mall_conf->spawn_dist = spawn_dist;
  mall_conf->red_method = red_method;
  mall_conf->red_strategies = red_strategies;

  if(!malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL) && 
	(mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL)) {
    malleability_red_add_strat(&(mall_conf->red_strategies), MALL_RED_IBARRIER);
  }
}
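
/*
 * Usage sketch (illustrative, not part of the module): selecting a
 * reconfiguration policy before triggering a resize. The constants used here
 * are ones this file already references (MALL_SPAWN_MERGE, MALL_RED_BASELINE);
 * spawn_dist is a placeholder whose valid values are defined elsewhere in MaM,
 * and 0 is assumed here to mean "no additional strategies".
 *
 *   set_malleability_configuration(MALL_SPAWN_MERGE, 0, spawn_dist,
 *                                  MALL_RED_BASELINE, 0);
 *   set_children_number(new_group_size);   // to be deprecated, see below
 */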

/*
 * To be deprecated
 * Must be called after the configuration has been set
 */
void set_children_number(int numC){
  if(state > MALL_NOT_STARTED) return;

  if((mall_conf->spawn_method == MALL_SPAWN_MERGE) && (numC >= mall->numP)) {
    mall->numC = numC;
    mall->numC_spawned = numC - mall->numP;

    if(numC == mall->numP) { // Migrate
      mall->numC_spawned = numC;
      mall_conf->spawn_method = MALL_SPAWN_BASELINE;
    }
  } else {
    mall->numC = numC;
    mall->numC_spawned = numC;
  }
}

/*
 * Adds to the chosen data structure the new data set "data",
 * consisting of a total of "total_qty" elements.
 *
 * Variable data has to be added right before it is meant to be sent, not earlier.
 *
 * More information in the function "add_data".
 */
void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
  size_t total_reqs = 0;

  if(is_constant) {
    if(is_replicated) {
      total_reqs = 1;
      add_data(data, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ??? 
    } else {
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
        total_reqs = 1;
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
        total_reqs = mall->numC;
      }
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
        total_reqs++;
      }

      add_data(data, total_qty, type, total_reqs, dist_a_data);
    }
  } else {
    if(is_replicated) {
      add_data(data, total_qty, type, total_reqs, rep_s_data);
    } else {
      add_data(data, total_qty, type, total_reqs, dist_s_data);
    }
  }
}
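
/*
 * Usage sketch (illustrative, not part of the module): registering application
 * data before a reconfiguration. The array names and sizes are hypothetical;
 * the flags follow the (is_replicated, is_constant) parameters of
 * malleability_add_data() above, where constant data goes to the asynchronous
 * structures and variable data to the synchronous ones.
 *
 *   double *local_vec;    // block of a distributed vector, one part per process
 *   int     global_it;    // small replicated value every process keeps
 *   size_t  local_qty;    // number of elements owned by this process
 *
 *   // Distributed, constant data: redistributed asynchronously among targets
 *   malleability_add_data(local_vec, local_qty, MPI_DOUBLE, 0, 1);
 *   // Replicated, variable data: broadcast synchronously when the resize happens
 *   malleability_add_data(&global_it, 1, MPI_INT, 1, 0);
 */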

/*
 * Modifies, in the chosen data structure and at index "index",
 * the data set "data" of a total of "total_qty" elements.
 *
 * Variable data has to be modified right before it is meant to be sent, not earlier.
 *
 * More information in the function "modify_data".
 */
void malleability_modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
  size_t total_reqs = 0;

  if(is_constant) {
    if(is_replicated) {
      total_reqs = 1;
      modify_data(data, index, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ??? 
    } else {    
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
        total_reqs = 1;
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
        total_reqs = mall->numC;
      }
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
        total_reqs++;
      }

      modify_data(data, index, total_qty, type, total_reqs, dist_a_data);
    }
  } else {
    if(is_replicated) {
      modify_data(data, index, total_qty, type, total_reqs, rep_s_data);
    } else {
      modify_data(data, index, total_qty, type, total_reqs, dist_s_data);
    }
  }
}

/*
 * Returns the number of entries of the chosen data
 * description structure.
 */
void malleability_get_entries(size_t *entries, int is_replicated, int is_constant){
  
  if(is_constant) {
    if(is_replicated) {
      *entries = rep_a_data->entries;
    } else {
      *entries = dist_a_data->entries;
    }
  } else {
    if(is_replicated) {
      *entries = rep_s_data->entries;
    } else {
      *entries = dist_s_data->entries;
    }
  }
}

/*
 * Returns the list element at position "index" to the user.
 * The data is returned in the same order in which the parents
 * added it with the function "malleability_add_data()".
 * It is the user's responsibility to know the type of that data.
 * TODO Refactor so this is automatic
 */
void malleability_get_data(void **data, size_t index, int is_replicated, int is_constant) {
  malleability_data_t *data_struct;

  if(is_constant) {
    if(is_replicated) {
      data_struct = rep_a_data;
    } else {
      data_struct = dist_a_data;
    }
  } else {
    if(is_replicated) {
      data_struct = rep_s_data;
    } else {
      data_struct = dist_s_data;
    }
  }

  *data = data_struct->arrays[index];
}
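
/*
 * Usage sketch (illustrative, not part of the module): how the target
 * (children) processes can retrieve the redistributed data after
 * Children_init() has run. The flags mirror malleability_add_data();
 * the pointer name is hypothetical.
 *
 *   size_t entries, i;
 *   void *recv_array;
 *
 *   malleability_get_entries(&entries, 0, 1);    // distributed, constant entries
 *   for(i = 0; i < entries; i++) {
 *     malleability_get_data(&recv_array, i, 0, 1);
 *     // The user is responsible for knowing the real type of recv_array
 *   }
 */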


//======================================================||
//================PRIVATE FUNCTIONS=====================||
//================DATA COMMUNICATION====================||
//======================================================||
//======================================================||

/*
 * Generalized function to send data to the children.
 * "Asynchronous" here refers to whether the parent and child do it in a
 * blocking fashion or not. The parent may use several threads.
 */
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
  size_t i;
  void *aux_send, *aux_recv;

  if(is_asynchronous) {
    for(i=0; i < data_struct->entries; i++) {
      aux_send = data_struct->arrays[i];
      aux_recv = NULL;
      async_communication_start(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, 
		      mall_conf->red_strategies, mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
      if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
    }
  } else {
    for(i=0; i < data_struct->entries; i++) {
      aux_send = data_struct->arrays[i];
      aux_recv = NULL;
      sync_communication(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall->intercomm);
      if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
    }
  }
}

/*
 * Generalized function to receive data from the parents.
 * "Asynchronous" here refers to whether the parent and child do it in a
 * blocking fashion or not. The parent may use several threads.
 */
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
  size_t i;
  void *aux, *aux_s = NULL;

  if(is_asynchronous) {
    for(i=0; i < data_struct->entries; i++) {
      aux = data_struct->arrays[i];
      async_communication_start(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall_conf->red_strategies, 
		      mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
      data_struct->arrays[i] = aux;
    }
  } else {
    for(i=0; i < data_struct->entries; i++) {
      aux = data_struct->arrays[i];
      sync_communication(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall->intercomm);
      data_struct->arrays[i] = aux;
    }
  }
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================CHILDREN=========================||
//======================================================||
//======================================================||
/*
 * Initialization of the children's data.
 * Here they receive data from the parents: the configuration
 * of the execution to perform, and the data to receive from the
 * parents, whether synchronously, asynchronously or both.
 */
void Children_init() {
  size_t i;
  int numP_parents, root_parents;
  int is_intercomm;

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM will now initialize children", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  malleability_connect_children(mall->myId, mall->numP, mall->root, mall->comm, &numP_parents, &root_parents, &(mall->intercomm));
  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
  if(!is_intercomm) { // For intracommunicators, these processes will be added
    MPI_Comm_rank(mall->intercomm, &(mall->myId));
    MPI_Comm_size(mall->intercomm, &(mall->numP));
  }

  MAM_Comm_main_structures(root_parents);

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Targets have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
  if(dist_a_data->entries || rep_a_data->entries) { // Receive asynchronous data
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Children start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
      recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
    } else {
      recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS); 

      #if USE_MAL_DEBUG >= 2
        DEBUG_FUNC("Targets started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
      #endif

      int post_ibarrier = 0; 
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) { post_ibarrier=1; }
      for(i=0; i<dist_a_data->entries; i++) {
        async_communication_wait(mall->intercomm, dist_a_data->requests[i], dist_a_data->request_qty[i], post_ibarrier);
      }
      #if USE_MAL_DEBUG >= 2
        DEBUG_FUNC("Targets waited for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
      #endif
      for(i=0; i<dist_a_data->entries; i++) {
        async_communication_end(mall_conf->red_method, mall_conf->red_strategies, dist_a_data->requests[i], dist_a_data->request_qty[i], &(dist_a_data->windows[i]));
      }
    }

    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->async_end = MPI_Wtime(); // Timestamp when the asynchronous communication ends
  }
  #if USE_MAL_DEBUG
    DEBUG_FUNC("Targets have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
  if(dist_s_data->entries || rep_s_data->entries) { // Receive synchronous data
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    // TODO Create a specific function and add it for the asynchronous case
    for(i=0; i<rep_s_data->entries; i++) {
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], root_parents, mall->intercomm);
    }
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->sync_end = MPI_Wtime(); // Timestamp when the synchronous communication ends
  }
  #if USE_MAL_DEBUG
    DEBUG_FUNC("Targets have completed synchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // Save the results of this transmission
  malleability_times_broadcast(mall->root);

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_end = MPI_Wtime(); // Timestamp when the malleability step ends
  state = MALL_COMPLETED;

  if(is_intercomm) {
    MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->user_comm); // The group that passes 0 goes first
  } else {
    MPI_Comm_dup(mall->intercomm, &mall->user_comm);
  }
  MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MAM_USER");

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM has been initialized correctly as children", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================PARENTS==========================||
//======================================================||
//======================================================||

/*
 * Handles the creation of the child processes.
 * If it is requested in the background, it returns the current state.
 */
int spawn_step(){
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_start = MPI_Wtime();

  state = init_spawn(mall->name_exec, mall->num_cpus, mall->num_nodes, mall->nodelist, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_method, mall_conf->spawn_strategies, mall->thread_comm, &(mall->intercomm));
  if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
  }
  return state;
}

/*
 * Starts the data redistribution with the new group of processes.
 *
 * First the configuration to use is sent to the new group of processes, and then
 * the asynchronous and/or synchronous sends are performed, if there are any.
 *
 * If there is asynchronous communication, it is started and the function returns,
 * indicating that an asynchronous send has begun.
 *
 * If there is no asynchronous communication, the synchronous one is performed, if any.
 *
 * Finally, data about the results is sent to the children and both
 * groups of processes are disconnected.
 */
int start_redistribution() {
  int rootBcast, is_intercomm;

  is_intercomm = 0;
  if(mall->intercomm != MPI_COMM_NULL) {
    MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
  } else { 
    // If no communicator has been created, it is because the Spawn has been
    //   postponed and this is the Merge Shrink spawn
    MPI_Comm_dup(mall->comm, &(mall->intercomm));
  }

  if(is_intercomm) {
    rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    rootBcast = mall->root;
  }

  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE || mall->numP <= mall->numC) { MAM_Comm_main_structures(rootBcast); }

  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
  if(dist_a_data->entries || rep_a_data->entries) { // Send asynchronous data
    //FIXME Replicated data (rep_a_data) is not sent
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->async_start = MPI_Wtime();
    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
      return thread_creation();
    } else {
      send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
      return MALL_DIST_PENDING; 
    }
  } 
  return end_redistribution();
}


/*
 * Checks whether the asynchronous redistribution has finished.
 * If it has not finished, the function returns indicating so; otherwise,
 * the synchronous communication is carried out, the results are sent and
 * the process groups are disconnected.
 *
 * This function allows two operating modes when checking whether the
 * asynchronous communication has finished.
 * With the "MAL_USE_NORMAL" or "MAL_USE_POINT" modes, it is considered
 * finished when the parents have finished sending.
 * With the "MAL_USE_IBARRIER" mode, it is considered finished when the
 * children have finished receiving.
 * //FIXME Modify so that rep_a_data is taken into account
 */
int check_redistribution(int wait_completed) {
  int is_intercomm, completed, local_completed, all_completed, post_ibarrier;
  size_t i, req_qty;
  MPI_Request *req_completed;
  MPI_Win window;
  post_ibarrier = 0;
  local_completed = 1;
  #if USE_MAL_DEBUG >= 2
    DEBUG_FUNC("Sources are testing for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif
  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);

  if(wait_completed) {
    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
      if( is_intercomm || mall->myId >= mall->numC) {
        post_ibarrier=1;
      }
    }
    for(i=0; i<dist_a_data->entries; i++) {
      req_completed = dist_a_data->requests[i];
      req_qty = dist_a_data->request_qty[i];
      async_communication_wait(mall->intercomm, req_completed, req_qty, post_ibarrier);
    }
  } else {
    for(i=0; i<dist_a_data->entries; i++) {
      req_completed = dist_a_data->requests[i];
      req_qty = dist_a_data->request_qty[i];
      completed = async_communication_check(mall->myId, MALLEABILITY_NOT_CHILDREN, mall_conf->red_strategies, mall->intercomm, req_completed, req_qty);
      local_completed = local_completed && completed;
    }
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Sources will now check a global decision", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif

    MPI_Allreduce(&local_completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
    if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
  }

  #if USE_MAL_DEBUG >= 2
    DEBUG_FUNC("Sources sent asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  for(i=0; i<dist_a_data->entries; i++) {
    req_completed = dist_a_data->requests[i];
    req_qty = dist_a_data->request_qty[i];
    window = dist_a_data->windows[i];
    async_communication_end(mall_conf->red_method, mall_conf->red_strategies, req_completed, req_qty, &window);
  }

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
  if(!is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
  return end_redistribution();
}


/*
 * Ends the data redistribution with the children, checking whether
 * iterations with background communications have been performed
 * and sending the children how many iterations were carried out.
 *
 * The synchronous communications are also performed, if there are any.
 * Finally it finishes by sending the temporary data to the children.
 */ 
int end_redistribution() {
  size_t i;
  int is_intercomm, rootBcast, local_state;
  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
  if(is_intercomm) {
    rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    rootBcast = mall->root;
  }
  
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
  if(dist_s_data->entries || rep_s_data->entries) { // Send synchronous data
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->sync_start = MPI_Wtime();
    send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    // TODO Create a specific function and add it for the asynchronous case
    for(i=0; i<rep_s_data->entries; i++) {
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], rootBcast, mall->intercomm);
    }
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    if(!is_intercomm) mall_conf->times->sync_end = MPI_Wtime(); // Merge method only
  }

  malleability_times_broadcast(rootBcast);

  local_state = MALL_DIST_COMPLETED;
  if(!is_intercomm) { // Merge Spawn
    if(mall->numP > mall->numC) { // Shrink || Merge Shrink requires additional steps
      local_state = MALL_SPAWN_ADAPT_PENDING;
    }
  }

  return local_state;
}

///=============================================
///=============================================
///=============================================
//TODO DEPRECATED
int shrink_redistribution() {
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->comm);
    #endif
    double time_extra = MPI_Wtime();

    //TODO Create Commit function. Processes can perform tasks before that. Then call again Malleability to commit the change
    MPI_Abort(MPI_COMM_WORLD, -20);
    zombies_collect_suspended(mall->user_comm, mall->myId, mall->numP, mall->numC, mall->root);

    if(mall->myId < mall->numC) {
      if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); //FIXME Change so that the user requests the change and comms_update is called
      if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));

      MPI_Comm_dup(mall->intercomm, &(mall->thread_comm));
      MPI_Comm_dup(mall->intercomm, &(mall->comm));

      MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MALL_THREAD");
      MPI_Comm_set_name(mall->comm, "MPI_COMM_MALL");

      MPI_Comm_free(&(mall->intercomm));


      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->spawn_time += MPI_Wtime() - time_extra;
      return MALL_DIST_COMPLETED;
    } else {
      return MALL_ZOMBIE;
    }
}

// TODO MOVE THIS ELSEWHERE??
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//===============COMM PARENTS THREADS===================||
//======================================================||
//======================================================||


int comm_state; //FIXME Use a handler
/*
 * Creates a thread to run a communication in the background.
 */
int thread_creation() {
  comm_state = MALL_DIST_PENDING;
  if(pthread_create(&(mall->async_thread), NULL, thread_async_work, NULL)) {
    printf("Error al crear el hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
  return comm_state;
}

/*
 * Check performed by a master thread to find out whether a worker
 * thread has finished its background communication.
 *
 * The state of the communication is returned when the function finishes. 
 */
int thread_check(int wait_completed) {
  int all_completed = 0, is_intercomm;

  if(wait_completed && comm_state == MALL_DIST_PENDING) {
    if(pthread_join(mall->async_thread, NULL)) {
      printf("Error al esperar al hilo\n");
      MPI_Abort(MPI_COMM_WORLD, -1);
      return -2;
    } 
  }

  // Check that all threads have finished the distribution (same value in comm_state)
  MPI_Allreduce(&comm_state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
  if(all_completed != MALL_DIST_COMPLETED) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
  //FIXME The MALL_APP_ENDED state is not taken into account

  if(pthread_join(mall->async_thread, NULL)) {
    printf("Error al esperar al hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -2;
  } 
  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
  if(!is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
  return end_redistribution();
}


/*
 * Function executed by a thread.
 * It performs a synchronous communication with the children which,
 * from the user's point of view, can be considered as running in the background.
 *
 * When the communication finishes, the master thread can check it
 * through the value of "comm_state".
 */
void* thread_async_work() {
  send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
  comm_state = MALL_DIST_COMPLETED;
  pthread_exit(NULL);
}


//==============================================================================
/*
 * Prints the current state of all communicators.
 */
void print_comms_state() {
  int tester;
  char *test = malloc(MPI_MAX_OBJECT_NAME * sizeof(char));

  MPI_Comm_get_name(mall->comm, test, &tester);
  printf("P%d Comm=%d Name=%s\n", mall->myId, mall->comm, test);
  MPI_Comm_get_name(mall->user_comm, test, &tester);
  printf("P%d Comm=%d Name=%s\n", mall->myId, mall->user_comm, test);
  if(mall->intercomm != MPI_COMM_NULL) {
    MPI_Comm_get_name(mall->intercomm, test, &tester);
    printf("P%d Comm=%d Name=%s\n", mall->myId, mall->intercomm, test);
  }
  free(test);
}
/*
 * Function only needed for the Merge method.
 */
void malleability_comms_update(MPI_Comm comm) {
  if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
  if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));

  MPI_Comm_dup(comm, &(mall->thread_comm));
  MPI_Comm_dup(comm, &(mall->comm));

  MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MAM_THREAD");
  MPI_Comm_set_name(mall->comm, "MPI_COMM_MAM");
}