/* malleabilityManager.c — MaM (Malleability Manager): process malleability module built on MPI. */
1
#include <pthread.h>
2
#include <string.h>
3
4
#include "malleabilityManager.h"
#include "malleabilityStates.h"
5
#include "malleabilityDataStructures.h"
6
#include "malleabilityTypes.h"
iker_martin's avatar
iker_martin committed
7
#include "malleabilityZombies.h"
8
#include "malleabilityTimes.h"
9
#include "spawn_methods/GenericSpawn.h"
10
11
12
13
14
#include "CommDist.h"

#define MALLEABILITY_USE_SYNCHRONOUS 0
#define MALLEABILITY_USE_ASYNCHRONOUS 1

15
void MAM_Commit(int *mam_state, int is_children_group);
16
17
18
19

void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous);
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous);

20
21
22
23
24
25
26
27
28
29
30
31
32

int MAM_St_not_started(int *mam_state);
int MAM_St_spawn_pending(int wait_completed);
int MAM_St_red_start();
int MAM_St_red_pending(int *mam_state, int wait_completed);
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args);
int MAM_St_user_completed();
int MAM_St_spawn_adapt_pending(int wait_completed);
int MAM_St_spawn_adapted(int *mam_state);
int MAM_St_red_completed(int *mam_state);
int MAM_St_completed(int *mam_state);


33
void Children_init(void (*user_function)(void *), void *user_args);
34
35
int spawn_step();
int start_redistribution();
36
int check_redistribution(int wait_completed);
37
int end_redistribution();
iker_martin's avatar
iker_martin committed
38
int shrink_redistribution();
39
40

int thread_creation();
41
int thread_check(int wait_completed);
42
void* thread_async_work();
43

44
void print_comms_state();
45
void malleability_comms_update(MPI_Comm comm);
46

47
int MAM_I_convert_key(char *key);
48
void MAM_I_create_user_struct(int is_children_group);
49

50
int state = MALL_UNRESERVED; //FIXME Mover a otro lado
51
52
53
54
55
56

malleability_data_t *rep_s_data;
malleability_data_t *dist_s_data;
malleability_data_t *rep_a_data;
malleability_data_t *dist_a_data;

57
58
mam_user_reconf_t *user_reconf;

59
/*
60
61
62
63
64
65
66
67
 * Inicializa la reserva de memoria para el modulo de maleabilidad
 * creando todas las estructuras necesarias y copias de comunicadores
 * para no interferir en la aplicación.
 *
 * Si es llamada por un grupo de procesos creados de forma dinámica,
 * inicializan la comunicacion con sus padres. En este caso, al terminar 
 * la comunicacion los procesos hijo estan preparados para ejecutar la
 * aplicacion.
68
 */
69
/*
 * Initializes the malleability module: allocates the module-wide state
 * structures and duplicates the application's communicators so MaM does
 * not interfere with the application's own communication.
 *
 * If called by a dynamically spawned group of processes, it initializes
 * communication with the parents; when that finishes the child processes
 * are ready to run the application.
 *
 * Returns MALLEABILITY_CHILDREN for spawned processes and
 * MALLEABILITY_NOT_CHILDREN for the original group.
 */
int MAM_Init(int root, MPI_Comm *comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes, void (*user_function)(void *), void *user_args) {
  MPI_Comm dup_comm, thread_comm;

  // Module-wide state (file-scope globals). NOTE(review): mallocs are unchecked.
  mall_conf = (malleability_config_t *) malloc(sizeof(malleability_config_t));
  mall = (malleability_t *) malloc(sizeof(malleability_t));
  user_reconf = (mam_user_reconf_t *) malloc(sizeof(mam_user_reconf_t));

  MPI_Comm_rank(*comm, &(mall->myId));
  MPI_Comm_size(*comm, &(mall->numP));

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Initializing MaM", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
  #endif

  // Four data-description structures: replicated/distributed x sync/async.
  rep_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));

  // Private copies of the user communicator: one for MaM itself, one for
  // the helper thread used by threaded strategies.
  MPI_Comm_dup(*comm, &dup_comm);
  MPI_Comm_dup(*comm, &thread_comm);
  MPI_Comm_set_name(dup_comm, "MAM_MAIN");
  MPI_Comm_set_name(thread_comm, "MAM_THREAD");

  mall->root = root;
  mall->root_parents = -1;   // -1 means "no parent group"
  mall->zombie = 0;
  mall->comm = dup_comm;
  mall->thread_comm = thread_comm;
  mall->user_comm = comm; 
  mall->tmp_comm = MPI_COMM_NULL;
  mall->name_exec = name_exec;
  mall->nodelist = nodelist;
  mall->num_cpus = num_cpus;
  mall->num_nodes = num_nodes;

  rep_s_data->entries = 0;
  rep_a_data->entries = 0;
  dist_s_data->entries = 0;
  dist_a_data->entries = 0;

  state = MALL_NOT_STARTED;

  zombies_service_init();
  init_malleability_times();
  MAM_Def_main_datatype();

  // If this group was spawned by MPI_Comm_spawn, it has a parent
  // intercommunicator: run the children initialization path instead.
  MPI_Comm_get_parent(&(mall->intercomm));
  if(mall->intercomm != MPI_COMM_NULL ) { 
    Children_init(user_function, user_args);
    return MALLEABILITY_CHILDREN;
  }

  #if USE_MAL_BARRIERS && USE_MAL_DEBUG
    if(mall->myId == mall->root)
      printf("MaM: Using barriers to record times.\n");
  #endif

  if(nodelist != NULL) { //TODO To be deprecated by using Slurm or else statement
    mall->nodelist_len = strlen(nodelist);
  } else { // If no nodelist is detected, get it from the actual run
    // NOTE(review): this buffer is owned by MaM; the user-supplied nodelist
    // above is not. Ownership is not tracked — see MAM_Finalize.
    mall->nodelist = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
    MPI_Get_processor_name(mall->nodelist, &mall->nodelist_len);
    //TODO Get name of each process and create real nodelist
  }

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM has been initialized correctly as parents", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
  #endif

  return MALLEABILITY_NOT_CHILDREN;
}

144
145
146
147
148
/*
 * Elimina toda la memoria reservado por el modulo
 * de maleabilidad y asegura que los zombies
 * despierten si los hubiese.
 */
149
/*
 * Releases all memory reserved by the malleability module and makes sure
 * any suspended (zombie) processes are woken up before the service is
 * torn down.
 */
void MAM_Finalize() {	  
  // Free the contents of the four data-description structures, then the
  // structures themselves.
  free_malleability_data_struct(rep_s_data);
  free_malleability_data_struct(rep_a_data);
  free_malleability_data_struct(dist_s_data);
  free_malleability_data_struct(dist_a_data);

  free(rep_s_data);
  free(rep_a_data);
  free(dist_s_data);
  free(dist_a_data);

  MAM_Free_main_datatype();
  free_malleability_times();
  // Only free communicators MaM created itself; never MPI_COMM_WORLD/NULL.
  if(mall->comm != MPI_COMM_WORLD && mall->comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->comm));
  if(mall->thread_comm != MPI_COMM_WORLD && mall->thread_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->thread_comm));
  if(mall->intercomm != MPI_COMM_WORLD && mall->intercomm != MPI_COMM_NULL) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge
  // NOTE(review): mall->nodelist is malloc'd by MAM_Init when the user passed
  // no nodelist, and is never freed here (ownership is not tracked) — possible leak.
  free(mall);
  free(mall_conf);
  free(user_reconf);

  // Wake suspended zombie processes so they can terminate, then drop the service.
  zombies_awake();
  zombies_service_free();

  state = MALL_UNRESERVED;
}

175
176
/* 
 * TODO Reescribir
177
178
179
180
 * Comprueba el estado de la maleabilidad. Intenta avanzar en la misma
 * si es posible. Funciona como una máquina de estados.
 * Retorna el estado de la maleabilidad concreto y modifica el argumento
 * "mam_state" a uno generico.
181
 *
182
183
 * El argumento "wait_completed" se utiliza para esperar a la finalización de
 * las tareas llevadas a cabo por parte de MAM.
184
185
 *
 */
186
/*
 * Checks the malleability state and tries to advance it. Works as a state
 * machine: each case delegates to a MAM_St_* handler which may request an
 * immediate re-entry (call_checkpoint) to chain into the next stage.
 *
 * Returns the concrete internal state and writes a generic state into
 * "mam_state". "wait_completed" asks MaM to block until its pending tasks
 * finish. "user_function"/"user_args" are the user's redistribution hook.
 */
int MAM_Checkpoint(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
  int call_checkpoint = 0;   // set by handlers to chain into the next stage

  switch(state) {
    case MALL_UNRESERVED:
      *mam_state = MAM_UNRESERVED;
      break;
    case MALL_NOT_STARTED:
      call_checkpoint = MAM_St_not_started(mam_state);
      break;

    case MALL_SPAWN_PENDING: // Check whether the spawn has finished
    case MALL_SPAWN_SINGLE_PENDING:
      call_checkpoint = MAM_St_spawn_pending(wait_completed);
      break;

    case MALL_SPAWN_ADAPT_POSTPONE:
    case MALL_SPAWN_COMPLETED:
      call_checkpoint = MAM_St_red_start();
      break;

    case MALL_DIST_PENDING:
      call_checkpoint = MAM_St_red_pending(mam_state, wait_completed);
      break;

    case MALL_USER_PENDING:
      call_checkpoint = MAM_St_user_pending(mam_state, wait_completed, user_function, user_args);
      break;

    case MALL_USER_COMPLETED:
      call_checkpoint = MAM_St_user_completed();
      break;

    case MALL_SPAWN_ADAPT_PENDING:
      call_checkpoint = MAM_St_spawn_adapt_pending(wait_completed);
      break;

    case MALL_SPAWN_ADAPTED:
    case MALL_DIST_COMPLETED:
      call_checkpoint = MAM_St_completed(mam_state);
      break;
  }

  // Recurse to advance through consecutive stages in a single user call.
  if(call_checkpoint) { MAM_Checkpoint(mam_state, wait_completed, user_function, user_args); }
  if(state > MALL_NOT_STARTED && state < MALL_COMPLETED) *mam_state = MAM_PENDING;
  return state;
}

234
235
236
/*
 * TODO
 */
237
238
239
240
241
242
243
244
245
246
/*
 * Called by the user to signal that their redistribution hook has finished.
 * Moves the state machine out of MALL_USER_PENDING so the next
 * MAM_Checkpoint call can complete the reconfiguration.
 */
void MAM_Resume_redistribution(int *mam_state) {
  *mam_state = MAM_PENDING;
  state = MALL_USER_COMPLETED;
}

/*
 * TODO
 */
/*
 * Commits a finished reconfiguration: records times, frees/replaces the
 * communicators, terminates zombie processes (or suspends them for Merge),
 * and hands the new communicator back to the user.
 *
 * "rootBcast" is the root rank used for the times broadcast (MPI_ROOT /
 * MPI_PROC_NULL on intercommunicators). "mam_state" may be NULL (children path).
 */
void MAM_Commit(int *mam_state, int rootBcast) {
  int zombies = 0;
  #if USE_MAL_DEBUG
    if(mall->myId == mall->root){ DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); } fflush(stdout);
  #endif

  // Get times before commiting
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) {
    // This communication is only needed when a root process will become a zombie
    malleability_times_broadcast(rootBcast);
  }

  // Free unneeded communicators
  if(mall->tmp_comm != MPI_COMM_WORLD && mall->tmp_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->tmp_comm));
  if(*(mall->user_comm) != MPI_COMM_WORLD && *(mall->user_comm) != MPI_COMM_NULL) MPI_Comm_free(mall->user_comm);

  // Zombies treatment: with Merge, excess processes are suspended (not killed),
  // so they can be reused by a later expansion.
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) {
    MPI_Allreduce(&mall->zombie, &zombies, 1, MPI_INT, MPI_MAX, mall->comm);
    if(zombies) {
      zombies_collect_suspended(mall->comm);
    }
  }

  // Zombies KILL: baseline zombies tear everything down and exit.
  if(mall->zombie) {
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Is terminating as zombie", mall->myId, mall->numP); fflush(stdout);
    #endif
    MAM_Finalize();
    MPI_Finalize();
    exit(0);
  }

  // Reset/Free communicators
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { malleability_comms_update(mall->intercomm); }
  if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge

  MPI_Comm_rank(mall->comm, &(mall->myId));
  MPI_Comm_size(mall->comm, &(mall->numP));
  // Inherit the parents' root if there was a parent group, then clear it.
  mall->root = mall->root_parents == -1 ? mall->root : mall->root_parents;
  mall->root_parents = -1;
  state = MALL_NOT_STARTED;
  if(mam_state != NULL) *mam_state = MAM_COMPLETED;

  // Set new communicator for the user
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *(mall->user_comm) = MPI_COMM_WORLD; }
  else if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MPI_Comm_dup(mall->comm, mall->user_comm); }
  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout);
  #endif

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_end = MPI_Wtime();
}

303
304
305
306
307
308
309
/*
 * Copies the current reconfiguration description into "reconf_info".
 * Only valid while the user redistribution hook is pending; otherwise
 * returns MALL_DENIED and leaves the output untouched.
 */
int MAM_Get_Reconf_Info(mam_user_reconf_t *reconf_info) {
  if(state == MALL_USER_PENDING) {
    *reconf_info = *user_reconf;
    return 0;
  }
  return MALL_DENIED;
}

310
311
/*
 * Public accessor for MaM timing results: spawn, synchronous redistribution,
 * asynchronous redistribution, and total malleability time. Thin wrapper
 * over the internal times module.
 */
void MAM_Retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
  MAM_I_retrieve_times(sp_time, sy_time, asy_time, mall_time);
}

314
void MAM_Set_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies) {
315
316
  if(state > MALL_NOT_STARTED) return;

317
318
  mall_conf->spawn_method = spawn_method;
  mall_conf->spawn_strategies = spawn_strategies;
319
  mall_conf->spawn_dist = spawn_dist;
320
321
322
323
324
325
326
  mall_conf->red_method = red_method;
  mall_conf->red_strategies = red_strategies;

  if(!malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL) && 
	(mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL)) {
    malleability_red_add_strat(&(mall_conf->red_strategies), MALL_RED_IBARRIER);
  }
327
328
}

329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
/*
 * Sets a single configuration value identified by its string key.
 * On success "*provided" echoes the accepted value (or the resulting
 * strategy bitmask for strategy keys); on an unknown key it is set to
 * MALL_DENIED. RMA redistribution methods force the IBARRIER strategy.
 */
void MAM_Set_key_configuration(char *key, int required, int *provided) {
  int key_id = MAM_I_convert_key(key);
  int uses_rma;

  *provided = required;
  switch(key_id) { //TODO Comprobar si required existe para key
    case MAM_SPAWN_METHOD_VALUE:
      mall_conf->spawn_method = required;
      break;
    case MAM_SPAWN_STRATEGIES_VALUE:
      malleability_spawn_add_strat(&(mall_conf->spawn_strategies), required);
      *provided = mall_conf->spawn_strategies;
      break;
    case MAM_PHYSICAL_DISTRIBUTION_VALUE:
      mall_conf->spawn_dist = required;
      break;
    case MAM_RED_METHOD_VALUE:
      mall_conf->red_method = required;
      break;
    case MAM_RED_STRATEGIES_VALUE:
      malleability_red_add_strat(&(mall_conf->red_strategies), required);
      *provided = mall_conf->red_strategies;
      break;
    case MALL_DENIED: /* fallthrough */
    default:
      printf("MAM: Key %s does not exist\n", key);
      *provided = MALL_DENIED;
      break;
  }

  uses_rma = (mall_conf->red_method == MALL_RED_RMA_LOCK) || (mall_conf->red_method == MALL_RED_RMA_LOCKALL);
  if(uses_rma && !malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
    malleability_red_add_strat(&(mall_conf->red_strategies), MALL_RED_IBARRIER);
  }
}

363
/*
364
 * Tiene que ser llamado despues de setear la config
365
 */
366
void MAM_Set_target_number(int numC){
367
368
  if(state > MALL_NOT_STARTED) return;

369
  if((mall_conf->spawn_method == MALL_SPAWN_MERGE) && (numC >= mall->numP)) {
370
371
372
373
374
    mall->numC = numC;
    mall->numC_spawned = numC - mall->numP;

    if(numC == mall->numP) { // Migrar
      mall->numC_spawned = numC;
375
      mall_conf->spawn_method = MALL_SPAWN_BASELINE;
376
377
378
379
380
381
382
    }
  } else {
    mall->numC = numC;
    mall->numC_spawned = numC;
  }
}

383
384
385
386
387
388
389
/*
 * Anyade a la estructura concreta de datos elegida
 * el nuevo set de datos "data" de un total de "total_qty" elementos.
 *
 * Los datos variables se tienen que anyadir cuando quieran ser mandados, no antes
 *
 * Mas informacion en la funcion "add_data".
390
 *
391
 */
392
/*
 * Registers a new data set "data" of "total_qty" elements in the chosen
 * data-description structure (replicated/distributed x async/sync).
 *
 * "is_constant" selects the asynchronous structures; variable data must be
 * (re)added right before it is sent, not earlier. See "add_data" for details.
 */
void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
  size_t reqs = 0;

  if(!is_constant) { // Synchronous: no requests are tracked
    add_data(data, total_qty, type, reqs, is_replicated ? rep_s_data : dist_s_data);
    return;
  }

  // Asynchronous: precompute how many MPI requests this entry will need.
  if(is_replicated) {
    reqs = 1;
    add_data(data, total_qty, type, reqs, rep_a_data);
  } else {
    switch(mall_conf->red_method) {
      case MALL_RED_BASELINE:
        reqs = 1;
        break;
      case MALL_RED_POINT:
      case MALL_RED_RMA_LOCK:
      case MALL_RED_RMA_LOCKALL:
        reqs = mall->numC; // one request per target process
        break;
      default:
        break;
    }
    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
      reqs++; // extra slot for the closing ibarrier
    }
    add_data(data, total_qty, type, reqs, dist_a_data);
  }
}

420
421
422
423
424
425
426
427
/*
 * Modifica en la estructura concreta de datos elegida en el indice "index"
 * con el set de datos "data" de un total de "total_qty" elementos.
 *
 * Los datos variables se tienen que modificar cuando quieran ser mandados, no antes
 *
 * Mas informacion en la funcion "modify_data".
 */
428
/*
 * Replaces entry "index" of the chosen data-description structure with the
 * data set "data" of "total_qty" elements.
 *
 * Variable data must be modified right before it is sent, not earlier.
 * See "modify_data" for details.
 */
void malleability_modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
  size_t reqs = 0;

  if(!is_constant) { // Synchronous: no requests are tracked
    modify_data(data, index, total_qty, type, reqs, is_replicated ? rep_s_data : dist_s_data);
    return;
  }

  // Asynchronous: recompute the number of MPI requests for this entry.
  if(is_replicated) {
    reqs = 1;
    modify_data(data, index, total_qty, type, reqs, rep_a_data); //FIXME total_reqs==0 ??? 
  } else {
    switch(mall_conf->red_method) {
      case MALL_RED_BASELINE:
        reqs = 1;
        break;
      case MALL_RED_POINT:
      case MALL_RED_RMA_LOCK:
      case MALL_RED_RMA_LOCKALL:
        reqs = mall->numC; // one request per target process
        break;
      default:
        break;
    }
    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
      reqs++; // extra slot for the closing ibarrier
    }
    modify_data(data, index, total_qty, type, reqs, dist_a_data);
  }
}

456
457
458
459
/*
 * Devuelve el numero de entradas para la estructura de descripcion de 
 * datos elegida.
 */
460
/*
 * Returns (through "entries") the number of entries registered in the
 * selected data-description structure (replicated/distributed x async/sync).
 */
void malleability_get_entries(size_t *entries, int is_replicated, int is_constant){
  malleability_data_t *src;

  if(is_constant) {
    src = is_replicated ? rep_a_data : dist_a_data;
  } else {
    src = is_replicated ? rep_s_data : dist_s_data;
  }
  *entries = src->entries;
}

/*
 * Devuelve el elemento de la lista "index" al usuario.
 * La devolución es en el mismo orden que lo han metido los padres
 * con la funcion "malleability_add_data()".
 * Es tarea del usuario saber el tipo de esos datos.
 * TODO Refactor a que sea automatico
 */
484
/*
 * Returns (through "data") the element at position "index" of the selected
 * data-description structure, in the same order the parents registered it
 * with "malleability_add_data()". The caller is responsible for knowing the
 * element's actual type.
 * TODO Refactor so this is automatic.
 */
void malleability_get_data(void **data, size_t index, int is_replicated, int is_constant) {
  malleability_data_t *src;

  if(is_constant) {
    src = is_replicated ? rep_a_data : dist_a_data;
  } else {
    src = is_replicated ? rep_s_data : dist_s_data;
  }
  *data = src->arrays[index];
}


//======================================================||
//================PRIVATE FUNCTIONS=====================||
//================DATA COMMUNICATION====================||
//======================================================||
//======================================================||

/*
 * Funcion generalizada para enviar datos desde los hijos.
 * La asincronizidad se refiere a si el hilo padre e hijo lo hacen
 * de forma bloqueante o no. El padre puede tener varios hilos.
 */
/*
 * Generalized function to send data to the children.
 * "is_asynchronous" selects whether each entry is sent with the
 * non-blocking (async_communication_start) or blocking (sync_communication)
 * path; the parent side may use several threads.
 *
 * If the communication routine allocated a receive buffer (merge case),
 * the entry's array pointer is swapped to it.
 */
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
  size_t i;
  void *aux_send, *aux_recv;

  if(is_asynchronous) {
    for(i=0; i < data_struct->entries; i++) {
      aux_send = data_struct->arrays[i];
      aux_recv = NULL;
      async_communication_start(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, 
		      mall_conf->red_strategies, mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
      if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
    }
  } else {
    for(i=0; i < data_struct->entries; i++) {
      aux_send = data_struct->arrays[i];
      aux_recv = NULL;
      sync_communication(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall->intercomm);
      if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
    }
  }
}

/*
 * Funcion generalizada para recibir datos desde los hijos.
 * La asincronizidad se refiere a si el hilo padre e hijo lo hacen
 * de forma bloqueante o no. El padre puede tener varios hilos.
 */
/*
 * Generalized function to receive data from the parents.
 * "is_asynchronous" selects the non-blocking or blocking communication
 * path; the parent side may use several threads.
 *
 * The receive buffer pointer may be (re)allocated by the communication
 * routine, so the entry's array pointer is updated afterwards.
 */
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
  size_t i;
  void *aux, *aux_s = NULL;   // aux_s: unused send side on the children path

  if(is_asynchronous) {
    for(i=0; i < data_struct->entries; i++) {
      aux = data_struct->arrays[i];
      async_communication_start(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall_conf->red_strategies, 
		      mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
      data_struct->arrays[i] = aux;
    }
  } else {
    for(i=0; i < data_struct->entries; i++) {
      aux = data_struct->arrays[i];
      sync_communication(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall->intercomm);
      data_struct->arrays[i] = aux;
    }
  }
}

563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//====================MAM STAGES========================||
//======================================================||
//======================================================||

/*
 * First stage of the state machine: records the reconfiguration start time,
 * launches the spawn step and marks processes that will become zombies.
 * Returns 1 when MAM_Checkpoint should immediately re-enter (spawn already
 * finished or postponed), 0 when the spawn is still pending.
 */
int MAM_St_not_started(int *mam_state) {
  *mam_state = MAM_NOT_STARTED;
  reset_malleability_times();
  // Check whether a reconfiguration has to be performed
      
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_start = MPI_Wtime();
  //if(CHECK_RMS()) {return MALL_DENIED;}

  state = spawn_step();
  //FIXME This is needed but ugly: flag processes that will not survive the
  // reconfiguration (Merge shrink excess ranks, or every Baseline source rank).
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->myId >= mall->numC){ mall->zombie = 1; }
  else if(mall_conf->spawn_method == MALL_SPAWN_BASELINE){ mall->zombie = 1; }

  if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE){
    return 1;
  }
  return 0;
}

/*
 * Polls (or waits for, if "wait_completed") the background spawn. When the
 * spawn has finished, records the spawn time and returns 1 so the state
 * machine advances; otherwise returns 0.
 */
int MAM_St_spawn_pending(int wait_completed) {
  state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);
  if (state != MALL_SPAWN_COMPLETED && state != MALL_SPAWN_ADAPTED) {
    return 0; // still pending
  }

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
  return 1;
}

/*
 * Kicks off the data redistribution towards the new process group and
 * always asks the state machine to re-enter immediately.
 */
int MAM_St_red_start() {
  state = start_redistribution();
  return 1;
}

/*
 * Polls the asynchronous redistribution (thread-based or request-based).
 * When it completes, builds the temporary user communicator (merge for
 * intercommunicators, dup otherwise) and hands control to the user stage.
 * Returns 1 to re-enter the state machine, 0 while still pending.
 */
int MAM_St_red_pending(int *mam_state, int wait_completed) {
  if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
    state = thread_check(wait_completed);
  } else {
    state = check_redistribution(wait_completed);
  }

  if(state != MALL_DIST_PENDING) { 
    if(mall->is_intercomm) {
      MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->tmp_comm); // The group passing 0 goes first
    } else {
      MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
    }
    MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
    state = MALL_USER_PENDING;
    *mam_state = MAM_USER_PENDING;
    return 1;
  }
  return 0;
}

/*
 * Runs the user redistribution hook. If "wait_completed" is set, loops
 * until the user calls MAM_Resume_redistribution; otherwise the hook is
 * invoked once per checkpoint. With no hook, resumes immediately.
 * Returns 1 once the user stage has finished, 0 while still pending.
 */
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Starting USER redistribution", mall->myId, mall->numP); fflush(stdout);
  #endif
  if(user_function != NULL) {
    MAM_I_create_user_struct(MALLEABILITY_NOT_CHILDREN);
    do {
      user_function(user_args);
    } while(wait_completed && state == MALL_USER_PENDING); // user flips state via MAM_Resume_redistribution
  } else {
    MAM_Resume_redistribution(mam_state);
  }

  if(state != MALL_USER_PENDING) {
    #if USE_MAL_DEBUG
      if(mall->myId == mall->root) DEBUG_FUNC("Ended USER redistribution", mall->myId, mall->numP); fflush(stdout);
    #endif
    return 1;
  }
  return 0;
}

/*
 * Finishes the redistribution after the user stage and always asks the
 * state machine to re-enter immediately.
 */
int MAM_St_user_completed() {
  state = end_redistribution();
  return 1;
}

/*
 * Resumes a postponed spawn adaptation: clears the postpone flag and polls
 * the spawn again. For non-threaded spawns this completes synchronously and
 * the spawn time is recorded. Returns 1 to re-enter, 0 when a threaded
 * spawn is still pending.
 */
int MAM_St_spawn_adapt_pending(int wait_completed) {
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_start = MPI_Wtime();
  unset_spawn_postpone_flag(state);
  state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);

  if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->comm);
    #endif
    mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
    return 1;
  }
  return 0;
}

/*
 * Final stage: commits the reconfiguration. On intercommunicators the
 * broadcast root must be expressed as MPI_ROOT/MPI_PROC_NULL; on
 * intracommunicators the plain root rank is used. Never re-enters.
 */
int MAM_St_completed(int *mam_state) {
  int bcast_root;

  if(mall->is_intercomm) {
    bcast_root = (mall->myId == mall->root) ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    bcast_root = mall->root;
  }

  MAM_Commit(mam_state, bcast_root);
  return 0;
}


688
689
690
691
692
693
694
695
696
697
698
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================CHILDREN=========================||
//======================================================||
//======================================================||
/*
 * Inicializacion de los datos de los hijos.
 * En la misma se reciben datos de los padres: La configuracion
 * de la ejecucion a realizar; y los datos a recibir de los padres
 * ya sea de forma sincrona, asincrona o ambas.
 */
699
/*
 * Initialization of the spawned (children) processes.
 * Connects to the parents, receives the execution configuration, then the
 * asynchronous and/or synchronous data sets, runs the user redistribution
 * hook, and finally commits so the children are ready to run the application.
 */
void Children_init(void (*user_function)(void *), void *user_args) {
  size_t i;
  int numP_parents;

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM will now initialize children", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  malleability_connect_children(mall->myId, mall->numP, mall->root, mall->comm, &numP_parents, &mall->root_parents, &(mall->intercomm));
  MPI_Comm_test_inter(mall->intercomm, &mall->is_intercomm);
  if(!mall->is_intercomm) { // For intracommunicators, these processes will be added
    // Rank/size must be refreshed: children are part of the merged group.
    MPI_Comm_rank(mall->intercomm, &(mall->myId));
    MPI_Comm_size(mall->intercomm, &(mall->numP));
  }

  MAM_Comm_main_structures(mall->root_parents);

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Targets have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // --- Asynchronous data reception ---
  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, mall->root_parents, mall->intercomm);
  if(dist_a_data->entries || rep_a_data->entries) { // Receive asynchronous data
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Children start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif

    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
      // Thread strategy: the parents use a thread, children receive blocking.
      recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
      for(i=0; i<rep_a_data->entries; i++) {
        MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_parents, mall->intercomm);
      } 
    } else {
      recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS); 

      for(i=0; i<rep_a_data->entries; i++) {
        MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_parents, mall->intercomm, &(rep_a_data->requests[i][0]));
      } 
      #if USE_MAL_DEBUG >= 2
        DEBUG_FUNC("Targets started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
      #endif

      int post_ibarrier = 0; 
      // FIXME Does not allow the use of ibarrier right now. Really only one ibarrier is needed for all
      for(i=0; i<rep_a_data->entries; i++) {
        async_communication_wait(mall->intercomm, rep_a_data->requests[i], rep_a_data->request_qty[i], post_ibarrier);
      }
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) { post_ibarrier=1; }
      for(i=0; i<dist_a_data->entries; i++) {
        async_communication_wait(mall->intercomm, dist_a_data->requests[i], dist_a_data->request_qty[i], post_ibarrier);
      }
      #if USE_MAL_DEBUG >= 2
        DEBUG_FUNC("Targets waited for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
      #endif
      // Release requests/windows now that all transfers are complete.
      for(i=0; i<dist_a_data->entries; i++) {
        async_communication_end(mall_conf->red_method, mall_conf->red_strategies, dist_a_data->requests[i], dist_a_data->request_qty[i], &(dist_a_data->windows[i]));
      }
      for(i=0; i<rep_a_data->entries; i++) {
        async_communication_end(mall_conf->red_method, mall_conf->red_strategies, rep_a_data->requests[i], rep_a_data->request_qty[i], &(rep_a_data->windows[i]));
      }
    }

    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->async_end= MPI_Wtime(); // Timestamp when the asynchronous communication ends
  }
  #if USE_MAL_DEBUG
    DEBUG_FUNC("Targets have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // --- User redistribution stage ---
  if(mall->is_intercomm) {
    MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->tmp_comm); // The group passing 0 goes first
  } else {
    MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
  }
  MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
  mall->numC = numP_parents;
  if(user_function != NULL) {
    state = MALL_USER_PENDING;
    MAM_I_create_user_struct(MALLEABILITY_CHILDREN);
    user_function(user_args);
  }

  // --- Synchronous data reception ---
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, mall->root_parents, mall->intercomm);
  if(dist_s_data->entries || rep_s_data->entries) { // Receive synchronous data
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    for(i=0; i<rep_s_data->entries; i++) {
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_parents, mall->intercomm);
    } 
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->sync_end = MPI_Wtime(); // Timestamp when the synchronous communication ends
  }
  #if USE_MAL_DEBUG
    DEBUG_FUNC("Targets have completed synchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // Commit with the parents' root as broadcast root (children side).
  MAM_Commit(NULL, mall->root_parents);

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM has been initialized correctly for new ranks", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================PARENTS==========================||
//======================================================||
//======================================================||

/*
 * Se encarga de realizar la creacion de los procesos hijos.
 * Si se pide en segundo plano devuelve el estado actual.
 */
int spawn_step(){
823
824
825
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
826
  mall_conf->times->spawn_start = MPI_Wtime();
827
 
828
  state = init_spawn(mall->name_exec, mall->num_cpus, mall->num_nodes, mall->nodelist, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_method, mall_conf->spawn_strategies, mall->thread_comm, &(mall->intercomm));
829

830
  if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
831
832
833
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
834
      mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
835
836
837
838
  }
  return state;
}

839

840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
/*
 * Comienza la redistribucion de los datos con el nuevo grupo de procesos.
 *
 * Primero se envia la configuracion a utilizar al nuevo grupo de procesos y a continuacion
 * se realiza el envio asincrono y/o sincrono si lo hay.
 *
 * En caso de que haya comunicacion asincrona, se comienza y se termina la funcion 
 * indicando que se ha comenzado un envio asincrono.
 *
 * Si no hay comunicacion asincrono se pasa a realizar la sincrona si la hubiese.
 *
 * Finalmente se envian datos sobre los resultados a los hijos y se desconectan ambos
 * grupos de procesos.
 */
int start_redistribution() {
855
856
  int rootBcast;
  size_t i;
857

858
  mall->is_intercomm = 0;
859
  if(mall->intercomm != MPI_COMM_NULL) {
860
    MPI_Comm_test_inter(mall->intercomm, &mall->is_intercomm);
861
862
863
  } else { 
    // Si no tiene comunicador creado, se debe a que se ha pospuesto el Spawn
    //   y se trata del spawn Merge Shrink
864
    MPI_Comm_dup(mall->comm, &(mall->intercomm));
865
  }
866

867
  if(mall->is_intercomm) {
868
869
870
871
    rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    rootBcast = mall->root;
  }
872

873
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE || mall->numP <= mall->numC) { MAM_Comm_main_structures(rootBcast); }
874

875
  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
876
  if(dist_a_data->entries || rep_a_data->entries) { // Enviar datos asincronos
877
878
879
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
880
    mall_conf->times->async_start = MPI_Wtime();
881
    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
882
883
884
      return thread_creation();
    } else {
      send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
885
886
887
      for(i=0; i<rep_a_data->entries; i++) { //FIXME Ibarrier does not work with rep_a_data
        MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], rootBcast, mall->intercomm, &(rep_a_data->requests[i][0]));
      } 
888
      return MALL_DIST_PENDING; 
889
890
    }
  } 
891
  return MALL_USER_PENDING;
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
}


/*
 * Comprueba si la redistribucion asincrona ha terminado. 
 * Si no ha terminado la funcion termina indicandolo, en caso contrario,
 * se continua con la comunicacion sincrona, el envio de resultados y
 * se desconectan los grupos de procesos.
 *
 * Esta funcion permite dos modos de funcionamiento al comprobar si la
 * comunicacion asincrona ha terminado.
 * Si se utiliza el modo "MAL_USE_NORMAL" o "MAL_USE_POINT", se considera 
 * terminada cuando los padres terminan de enviar.
 * Si se utiliza el modo "MAL_USE_IBARRIER", se considera terminada cuando
 * los hijos han terminado de recibir.
907
 * //FIXME Modificar para que se tenga en cuenta rep_a_data
908
 */
909
int check_redistribution(int wait_completed) {
910
  int completed, local_completed, all_completed, post_ibarrier;
911
  size_t i, req_qty;
912
  MPI_Request *req_completed;
913
  MPI_Win window;
914
  post_ibarrier = 0;
915
  local_completed = 1;
916
  #if USE_MAL_DEBUG >= 2
917
    DEBUG_FUNC("Sources are testing for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
918
  #endif
919

920
  if(wait_completed) {
921
922
    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) { 
      if( mall->is_intercomm || mall->myId >= mall->numC) {
923
924
925
926
927
928
929
930
        post_ibarrier=1;
      }
    }
    for(i=0; i<dist_a_data->entries; i++) {
      req_completed = dist_a_data->requests[i];
      req_qty = dist_a_data->request_qty[i];
      async_communication_wait(mall->intercomm, req_completed, req_qty, post_ibarrier);
    }
931
932
933
934
935
    for(i=0; i<rep_a_data->entries; i++) {
      req_completed = rep_a_data->requests[i];
      req_qty = rep_a_data->request_qty[i];
      async_communication_wait(mall->intercomm, req_completed, req_qty, 0); //FIXME Ibarrier does not work with rep_a_data
    }
936
937
938
939
940
941
942
  } else {
    for(i=0; i<dist_a_data->entries; i++) {
      req_completed = dist_a_data->requests[i];
      req_qty = dist_a_data->request_qty[i];
      completed = async_communication_check(mall->myId, MALLEABILITY_NOT_CHILDREN, mall_conf->red_strategies, mall->intercomm, req_completed, req_qty);
      local_completed = local_completed && completed;
    }
943
944
945
946
947
948
    for(i=0; i<rep_a_data->entries; i++) { //FIXME Ibarrier does not work with rep_a_data
      req_completed = rep_a_data->requests[i];
      req_qty = rep_a_data->request_qty[i];
      completed = async_communication_check(mall->myId, MALLEABILITY_NOT_CHILDREN, mall_conf->red_strategies, mall->intercomm, req_completed, req_qty);
      local_completed = local_completed && completed;
    }
949
950
951
952
953
954
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Sources will now check a global decision", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif

    MPI_Allreduce(&local_completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
    if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
955
956
  }

957
  #if USE_MAL_DEBUG >= 2
958
    DEBUG_FUNC("Sources sent asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
959
  #endif
960

961
962
963
964
965
  for(i=0; i<dist_a_data->entries; i++) {
    req_completed = dist_a_data->requests[i];
    req_qty = dist_a_data->request_qty[i];
    window = dist_a_data->windows[i];
    async_communication_end(mall_conf->red_method, mall_conf->red_strategies, req_completed, req_qty, &window);
966
  }
967
968
969
970
971
972
  for(i=0; i<rep_a_data->entries; i++) {
    req_completed = rep_a_data->requests[i];
    req_qty = rep_a_data->request_qty[i];
    window = rep_a_data->windows[i];
    async_communication_end(mall_conf->red_method, mall_conf->red_strategies, req_completed, req_qty, &window);
  }
973

974
975
976
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
977
978
  if(!mall->is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
  return MALL_USER_PENDING;
979
980
981
982
983
984
985
986
987
988
989
990
}


/*
 * Termina la redistribución de los datos con los hijos, comprobando
 * si se han realizado iteraciones con comunicaciones en segundo plano
 * y enviando cuantas iteraciones se han realizado a los hijos.
 *
 * Además se realizan las comunicaciones síncronas se las hay.
 * Finalmente termina enviando los datos temporales a los hijos.
 */ 
int end_redistribution() {
991
  size_t i;
992
  int rootBcast, local_state;
993

994
  if(mall->is_intercomm) {
995
996
997
998
999
    rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    rootBcast = mall->root;
  }
  
1000
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
1001
  if(dist_s_data->entries || rep_s_data->entries) { // Enviar datos sincronos
1002
1003
1004
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
1005
    mall_conf->times->sync_start = MPI_Wtime();
1006
1007
1008
    send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    for(i=0; i<rep_s_data->entries; i++) {
1009
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], rootBcast, mall->intercomm);
1010
    } 
1011
1012
1013
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
1014
    if(!mall->is_intercomm) mall_conf->times->sync_end = MPI_Wtime(); // Merge method only
1015
  }
iker_martin's avatar
iker_martin committed
1016

1017
  local_state = MALL_DIST_COMPLETED;
1018
  if(!mall->is_intercomm) { // Merge Spawn
1019
    if(mall->numP > mall->numC) { // Shrink || Merge Shrink requiere de mas tareas
1020
1021
1022
      local_state = MALL_SPAWN_ADAPT_PENDING;
    }
  }
1023

1024
  return local_state;
1025
1026
1027
1028
1029
1030
1031
1032
1033
}

// TODO MOVER A OTRO LADO??
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//===============COMM PARENTS THREADS===================||
//======================================================||
//======================================================||

1034
1035

int comm_state; //FIXME Use a proper handler instead of a file-scope global
1036
1037
1038
1039
/*
 * Crea una hebra para ejecutar una comunicación en segundo plano.
 */
int thread_creation() {
1040
  comm_state = MALL_DIST_PENDING;
1041
1042
1043
1044
1045
  if(pthread_create(&(mall->async_thread), NULL, thread_async_work, NULL)) {
    printf("Error al crear el hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
1046
  return comm_state;
1047
1048
1049
1050
1051
1052
1053
1054
}

/*
 * Comprobación por parte de una hebra maestra que indica
 * si una hebra esclava ha terminado su comunicación en segundo plano.
 *
 * El estado de la comunicación es devuelto al finalizar la función. 
 */
1055
int thread_check(int wait_completed) {
1056
  int all_completed = 0;
1057

1058
1059
1060
1061
1062
1063
1064
1065
  if(wait_completed && comm_state == MALL_DIST_PENDING) {
    if(pthread_join(mall->async_thread, NULL)) {
      printf("Error al esperar al hilo\n");
      MPI_Abort(MPI_COMM_WORLD, -1);
      return -2;
    } 
  }

1066
  // Comprueba que todos los hilos han terminado la distribucion (Mismo valor en commAsync)
1067
  MPI_Allreduce(&comm_state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
1068
1069
  if(all_completed != MALL_DIST_COMPLETED) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
  //FIXME No se tiene en cuenta el estado MALL_APP_ENDED
1070
1071
1072
1073
1074
1075

  if(pthread_join(mall->async_thread, NULL)) {
    printf("Error al esperar al hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -2;
  } 
1076
1077
1078
1079

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
1080
  if(!mall->is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
  return end_redistribution();
}


/*
 * Función ejecutada por una hebra.
 * Ejecuta una comunicación síncrona con los hijos que
 * para el usuario se puede considerar como en segundo plano.
 *
 * Cuando termina la comunicación la hebra maestra puede comprobarlo
 * por el valor "commAsync".
 */
1093
void* thread_async_work() {
1094
1095
1096
1097
1098
1099
1100
1101
1102
  int rootBcast;
  size_t i;

  if(mall->is_intercomm) {
    rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    rootBcast = mall->root;
  }

1103
  send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
1104
1105
1106
  for(i=0; i<rep_a_data->entries; i++) {
    MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], rootBcast, mall->intercomm);
  } 
1107
  comm_state = MALL_DIST_COMPLETED;
1108
1109
  pthread_exit(NULL);
}
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121


//==============================================================================
/*
 * Muestra por pantalla el estado actual de todos los comunicadores
 */
void print_comms_state() {
  int tester;
  char *test = malloc(MPI_MAX_OBJECT_NAME * sizeof(char));

  MPI_Comm_get_name(mall->comm, test, &tester);
  printf("P%d Comm=%d Name=%s\n", mall->myId, mall->comm, test);
1122
1123
  MPI_Comm_get_name(*(mall->user_comm), test, &tester);
  printf("P%d Comm=%d Name=%s\n", mall->myId, *(mall->user_comm), test);
1124
1125
1126
1127
1128
1129
  if(mall->intercomm != MPI_COMM_NULL) {
    MPI_Comm_get_name(mall->intercomm, test, &tester);
    printf("P%d Comm=%d Name=%s\n", mall->myId, mall->intercomm, test);
  }
  free(test);
}
1130

1131
1132
1133
/*
 * Función solo necesaria en Merge
 */
1134
1135
1136
1137
1138
1139
1140
void malleability_comms_update(MPI_Comm comm) {
  if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
  if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));

  MPI_Comm_dup(comm, &(mall->thread_comm));
  MPI_Comm_dup(comm, &(mall->comm));

1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
  MPI_Comm_set_name(mall->thread_comm, "MAM_THREAD");
  MPI_Comm_set_name(mall->comm, "MAM_MAIN");
}

/*
 * Converts the name of a key to its numeric value.
 * Returns MALL_DENIED when the name does not match any known key.
 */
int MAM_I_convert_key(char *key) {
  size_t idx;

  for(idx = 0; idx < MAM_KEY_COUNT; idx++) {
    if(!strcmp(key, mam_key_names[idx])) return (int) idx; // Found a match
  }
  return MALL_DENIED;
}
1158

1159
1160
1161
/*
 * TODO Por hacer
 */
1162
void MAM_I_create_user_struct(int is_children_group) {
1163
1164
1165
  user_reconf->comm = mall->tmp_comm;

  if(is_children_group) {
1166
    user_reconf->rank_state = MAM_PROC_NEW_RANK;
1167
1168
1169
1170
1171
1172
    user_reconf->numS = mall->numC;
    if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) user_reconf->numT = mall->numC;
    else user_reconf->numT = mall->numC + mall->numP;
  } else {
    user_reconf->numS = mall->numP;
    user_reconf->numT = mall->numC;
1173
1174
    if(mall->zombie) user_reconf->rank_state = MAM_PROC_ZOMBIE;
    else user_reconf->rank_state = MAM_PROC_CONTINUE;
1175
1176
  }
}