#include <pthread.h>
#include <string.h>

#include "malleabilityManager.h"
#include "malleabilityStates.h"
#include "malleabilityDataStructures.h"
#include "malleabilityTypes.h"
#include "malleabilityZombies.h"
#include "malleabilityTimes.h"
#include "spawn_methods/GenericSpawn.h"
#include "CommDist.h"

#define MALLEABILITY_USE_SYNCHRONOUS 0
#define MALLEABILITY_USE_ASYNCHRONOUS 1


void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous);
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous);

void Children_init(void (*user_function)(void *), void *user_args);

int spawn_step();
int start_redistribution();
int check_redistribution(int wait_completed);
int end_redistribution();
int shrink_redistribution();

int thread_creation();
int thread_check(int wait_completed);
void* thread_async_work(void *arg);

void print_comms_state();
void malleability_comms_update(MPI_Comm comm);

int MAM_I_convert_key(char *key);
void MAM_I_create_user_struct(int mam_state, int is_children_group);

int state = MALL_UNRESERVED; //FIXME Move this elsewhere

malleability_data_t *rep_s_data;
malleability_data_t *dist_s_data;
malleability_data_t *rep_a_data;
malleability_data_t *dist_a_data;

mam_user_reconf_t *user_reconf;

/*
 * Initializes the memory reservation for the malleability module,
 * creating every required structure and duplicating communicators
 * so the module does not interfere with the application.
 *
 * If it is called by a dynamically spawned group of processes, it
 * initializes the communication with their parents. In that case,
 * once that communication finishes the child processes are ready
 * to run the application.
 */
int MAM_Init(int root, MPI_Comm *comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes, void (*user_function)(void *), void *user_args) {
  MPI_Comm dup_comm, thread_comm;

  mall_conf = (malleability_config_t *) malloc(sizeof(malleability_config_t));
  mall = (malleability_t *) malloc(sizeof(malleability_t));
  user_reconf = (mam_user_reconf_t *) malloc(sizeof(mam_user_reconf_t));

  MPI_Comm_rank(*comm, &(mall->myId));
  MPI_Comm_size(*comm, &(mall->numP));

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Initializing MaM", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
  #endif

  rep_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));

  MPI_Comm_dup(*comm, &dup_comm);
  MPI_Comm_dup(*comm, &thread_comm);
  MPI_Comm_set_name(dup_comm, "MAM_MAIN");
  MPI_Comm_set_name(thread_comm, "MAM_THREAD");

  mall->root = root;
  mall->root_parents = -1;
  mall->zombie = 0;
  mall->comm = dup_comm;
  mall->thread_comm = thread_comm;
  mall->user_comm = comm; 
  mall->tmp_comm = MPI_COMM_NULL;

  mall->name_exec = name_exec;
  mall->nodelist = nodelist;
  mall->num_cpus = num_cpus;
  mall->num_nodes = num_nodes;

  rep_s_data->entries = 0;
  rep_a_data->entries = 0;
  dist_s_data->entries = 0;
  dist_a_data->entries = 0;

  state = MALL_NOT_STARTED;

  zombies_service_init();
  init_malleability_times();
  MAM_Def_main_datatype();

  // If this group of processes was spawned dynamically, it obtains the data from its parents
  MPI_Comm_get_parent(&(mall->intercomm));
  if(mall->intercomm != MPI_COMM_NULL) { 
    Children_init(user_function, user_args);
    return MALLEABILITY_CHILDREN;
  }

  #if USE_MAL_BARRIERS && USE_MAL_DEBUG
    if(mall->myId == mall->root)
      printf("MaM: Using barriers to record times.\n");
  #endif

  if(nodelist != NULL) { //TODO To be deprecated by using Slurm or else statement
    mall->nodelist_len = strlen(nodelist);
  } else { // If no nodelist is detected, get it from the actual run
    mall->nodelist = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
    MPI_Get_processor_name(mall->nodelist, &mall->nodelist_len);
    //TODO Get name of each process and create real nodelist
  }

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM has been initialized correctly as parents", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
  #endif

  return MALLEABILITY_NOT_CHILDREN;
}
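
/*
 * Usage sketch (not compiled: the MAM_USAGE_SKETCHES guard is never defined).
 * It illustrates how an application might call MAM_Init right after MPI_Init,
 * based only on the signature above. The names example_user_func and
 * example_main are illustrative assumptions, not part of MaM.
 */
#ifdef MAM_USAGE_SKETCHES
static void example_user_func(void *args) { (void)args; /* redistribute user data here */ }

static int example_main(int argc, char *argv[]) {
  int role;
  MPI_Comm comm = MPI_COMM_WORLD; // MaM keeps a pointer to this communicator and updates it on commit
  MPI_Init(&argc, &argv);
  // Passing a NULL nodelist makes MaM fall back to MPI_Get_processor_name
  role = MAM_Init(0, &comm, argv[0], NULL, 0, 0, example_user_func, NULL);
  if(role == MALLEABILITY_CHILDREN) {
    // Dynamically spawned processes return here already connected to their parents
  }
  return role;
}
#endif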

/*
 * Frees all the memory reserved by the malleability module
 * and makes sure that any zombie processes are awakened.
 */
void MAM_Finalize() {
  free_malleability_data_struct(rep_s_data);
  free_malleability_data_struct(rep_a_data);
  free_malleability_data_struct(dist_s_data);
  free_malleability_data_struct(dist_a_data);

  free(rep_s_data);
  free(rep_a_data);
  free(dist_s_data);
  free(dist_a_data);

  MAM_Free_main_datatype();
  free_malleability_times();
  if(mall->comm != MPI_COMM_WORLD && mall->comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->comm));
  if(mall->thread_comm != MPI_COMM_WORLD && mall->thread_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->thread_comm));
  free(mall);
  free(mall_conf);
  free(user_reconf);

  zombies_awake();
  zombies_service_free();

  state = MALL_UNRESERVED;
}

/* 
 * TODO Rewrite
 * Checks the state of the malleability process and tries to advance it
 * when possible. It works as a state machine.
 * It returns the specific malleability state and sets the argument
 * "mam_state" to a generic one.
 *
 * The argument "wait_completed" is used to wait for the completion of
 * the tasks carried out by MaM.
 */
int MAM_Checkpoint(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
  int is_intercomm;

  switch(state) {
    case MALL_UNRESERVED:
      *mam_state = MAM_UNRESERVED;
      break;
    case MALL_NOT_STARTED:
      *mam_state = MAM_NOT_STARTED;
      reset_malleability_times();
      // Check whether a reconfiguration has to be performed

      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->malleability_start = MPI_Wtime();
      //if(CHECK_RMS()) {return MALL_DENIED;}

      state = spawn_step();

      if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE){
        MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
      }
      break;

    case MALL_SPAWN_PENDING: // Check whether the spawn has finished and start the redistribution
    case MALL_SPAWN_SINGLE_PENDING:
      state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);
      if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPTED) {
        #if USE_MAL_BARRIERS
          MPI_Barrier(mall->comm);
        #endif
        mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;

        MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
      }
      break;

    case MALL_SPAWN_ADAPT_POSTPONE:
    case MALL_SPAWN_COMPLETED:
      state = start_redistribution();
      MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
      break;

    case MALL_DIST_PENDING:
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
        state = thread_check(wait_completed);
      } else {
        state = check_redistribution(wait_completed);
      }
      if(state != MALL_DIST_PENDING) { 
        MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
      }
      break;

    case MALL_SPAWN_ADAPT_PENDING:
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->spawn_start = MPI_Wtime();
      unset_spawn_postpone_flag(state);
      state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);

      if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
        #if USE_MAL_BARRIERS
          MPI_Barrier(mall->comm);
        #endif
        mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
        MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
      }
      break;

    case MALL_SPAWN_ADAPTED: //FIXME Remove?
      state = shrink_redistribution();
      if(state == MALL_ZOMBIE) *mam_state = MAM_ZOMBIE; //TODO This line must not be removed
      MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
      break;

    case MALL_DIST_COMPLETED:
      MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
      if(is_intercomm) {
        MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->tmp_comm); // The group passing 0 goes first
      } else {
        MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
      }
      MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
      state = MALL_USER_PENDING;
      *mam_state = MAM_USER_PENDING;
      if(mall_conf->spawn_method == MALL_SPAWN_BASELINE){ mall->zombie = 1; }

      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->malleability_end = MPI_Wtime();
      MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
      break;

    case MALL_USER_PENDING:
      #if USE_MAL_DEBUG
        if(mall->myId == mall->root) DEBUG_FUNC("Starting USER redistribution", mall->myId, mall->numP); fflush(stdout);
      #endif
      if(user_function != NULL) {
        MAM_I_create_user_struct(*mam_state, MALLEABILITY_NOT_CHILDREN);
        user_function(user_args);
      } else {
        state = MALL_COMPLETED; //FIXME Should perform the synchronous redistribution instead
        *mam_state = MAM_COMPLETED; //FIXME Should perform the synchronous redistribution instead
      }
      if(state != MALL_USER_PENDING && state != MALL_NOT_STARTED) { // TODO Remove the second check once USER happens before the sync redistribution
        MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
      }
      if(state == MALL_NOT_STARTED) { //FIXME Ugly workaround, remove
        *mam_state = MAM_COMMITED;
      }
      break;

    case MALL_COMPLETED:
      MAM_Commit(mam_state);
      break;
  }

  if(state > MALL_ZOMBIE && state < MALL_COMPLETED) *mam_state = MAM_PENDING;
  return state;
}
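
/*
 * Usage sketch (not compiled: the MAM_USAGE_SKETCHES guard is never defined).
 * One plausible way to drive the state machine above once a reconfiguration
 * has been requested: call MAM_Checkpoint without waiting on every iteration
 * until the generic state reports commit (or zombie). The termination test and
 * the name example_drive_reconfiguration are illustrative assumptions.
 */
#ifdef MAM_USAGE_SKETCHES
static void example_drive_reconfiguration(void (*user_func)(void *), void *user_args) {
  int mam_state = MAM_NOT_STARTED;
  do {
    // ... one iteration of the application, overlapped with the reconfiguration ...
    MAM_Checkpoint(&mam_state, 0, user_func, user_args); // wait_completed=0: advance MaM without blocking
  } while(mam_state != MAM_COMMITED && mam_state != MAM_ZOMBIE);
  // Alternatively, a single call with wait_completed=1 blocks until MaM finishes its pending tasks
}
#endif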

/*
 * TODO
 */
void MAM_Commit(int *mam_state) {
  if(!(state == MALL_COMPLETED || state == MALL_ZOMBIE || state == MALL_USER_PENDING)) { //FIXME The last one should be removed
    *mam_state = MALL_DENIED;
    return;
  }
  #if USE_MAL_DEBUG
    if(mall->myId == mall->root){ DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); } fflush(stdout); MPI_Barrier(mall->intercomm);
  #endif

  // Zombies treatment
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) {
    int zombies;
    MPI_Allreduce(&state, &zombies, 1, MPI_INT, MPI_MIN, mall->intercomm);
    if(zombies == MALL_ZOMBIE) {
      zombies_collect_suspended(mall->comm, mall->myId, mall->numP, mall->numC, mall->root);
    }
  }

  // Reset/Free unneeded communicators
  if(mall->tmp_comm != MPI_COMM_WORLD && mall->tmp_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->tmp_comm));
  if(*(mall->user_comm) != MPI_COMM_WORLD && *(mall->user_comm) != MPI_COMM_NULL) MPI_Comm_free(mall->user_comm);
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { malleability_comms_update(mall->intercomm); }
  if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error in OpenMPI + Merge

  // Zombies KILL
  if(state == MALL_ZOMBIE || mall->zombie) {
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Is terminating as zombie", mall->myId, mall->numP); fflush(stdout);
    #endif
    MAM_Finalize();
    MPI_Finalize(); 
    exit(0);
  }

  MPI_Comm_rank(mall->comm, &(mall->myId));
  MPI_Comm_size(mall->comm, &(mall->numP));
  mall->root = mall->root_parents == -1 ? mall->root : mall->root_parents;
  mall->root_parents = -1;
  state = MALL_NOT_STARTED;
  if(mam_state != NULL) *mam_state = MAM_COMMITED;

  // Set new communicator
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *(mall->user_comm) = MPI_COMM_WORLD; }
  else if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MPI_Comm_dup(mall->comm, mall->user_comm); }
  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been committed", mall->myId, mall->numP); fflush(stdout);
  #endif
}

int MAM_Get_Reconf_Info(mam_user_reconf_t *reconf_info) {
  if(state != MALL_USER_PENDING) return MALL_DENIED;

  *reconf_info = *user_reconf;
  return 0;
}
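
/*
 * Usage sketch (not compiled: the MAM_USAGE_SKETCHES guard is never defined).
 * A user_function passed to MAM_Init/MAM_Checkpoint could query the pending
 * reconfiguration like this. Only the fields filled in by
 * MAM_I_create_user_struct (comm, rank_state, numS, numT) are used; the name
 * example_user_redistribution is an illustrative assumption.
 */
#ifdef MAM_USAGE_SKETCHES
static void example_user_redistribution(void *args) {
  int rank;
  mam_user_reconf_t info;
  (void) args;

  if(MAM_Get_Reconf_Info(&info) != 0) return; // Returns 0 only while MaM is in MALL_USER_PENDING

  MPI_Comm_rank(info.comm, &rank); // "MAM_USER_TMP": sources and targets together
  // Redistribute user-owned data from info.numS source ranks to info.numT target ranks here
}
#endif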

void MAM_Retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
  MAM_I_retrieve_times(sp_time, sy_time, asy_time, mall_time);
}

void MAM_Set_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies) {
  if(state > MALL_NOT_STARTED) return;

  mall_conf->spawn_method = spawn_method;
  mall_conf->spawn_strategies = spawn_strategies;
  mall_conf->spawn_dist = spawn_dist;
  mall_conf->red_method = red_method;
  mall_conf->red_strategies = red_strategies;

  if(!malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL) && 
	(mall_conf->red_method == MALL_RED_RMA_LOCK || mall_conf->red_method == MALL_RED_RMA_LOCKALL)) {
    malleability_red_add_strat(&(mall_conf->red_strategies), MALL_RED_IBARRIER);
  }
}

void MAM_Set_key_configuration(char *key, int required, int *provided) {
  int value = MAM_I_convert_key(key);
  *provided = required;
  switch(value) { //TODO Check whether "required" is a valid value for this key
    case MAM_SPAWN_METHOD_VALUE:
      mall_conf->spawn_method = required;
      break;
    case MAM_SPAWN_STRATEGIES_VALUE:
      malleability_spawn_add_strat(&(mall_conf->spawn_strategies), required);
      *provided = mall_conf->spawn_strategies;
      break;
    case MAM_PHYSICAL_DISTRIBUTION_VALUE:
      mall_conf->spawn_dist = required;
      break;
    case MAM_RED_METHOD_VALUE:
      mall_conf->red_method = required;
      break;
    case MAM_RED_STRATEGIES_VALUE:
      malleability_red_add_strat(&(mall_conf->red_strategies), required);
      *provided = mall_conf->red_strategies;
      break;
    case MALL_DENIED:
    default:
      printf("MAM: Key %s does not exist\n", key);
      *provided = MALL_DENIED;
      break;
  }

  if(!malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL) && 
	(mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL)) {
    malleability_red_add_strat(&(mall_conf->red_strategies), MALL_RED_IBARRIER);
  }
}
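
/*
 * Usage sketch (not compiled: the MAM_USAGE_SKETCHES guard is never defined).
 * Key-based configuration; the real key strings live in mam_key_names (see
 * MAM_I_convert_key below), so the key used here is purely hypothetical.
 */
#ifdef MAM_USAGE_SKETCHES
static void example_set_key(void) {
  int provided;
  // Hypothetical key name; on an unknown key MaM prints a message and stores MALL_DENIED in "provided"
  MAM_Set_key_configuration("example_key", MALL_RED_POINT, &provided);
  if(provided == MALL_DENIED) { /* the key was not recognised */ }
}
#endif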

/*
 * Has to be called after the configuration has been set
 */
void MAM_Set_target_number(int numC){
  if(state > MALL_NOT_STARTED) return;

  if((mall_conf->spawn_method == MALL_SPAWN_MERGE) && (numC >= mall->numP)) {
    mall->numC = numC;
    mall->numC_spawned = numC - mall->numP;

    if(numC == mall->numP) { // Migrate
      mall->numC_spawned = numC;
      mall_conf->spawn_method = MALL_SPAWN_BASELINE;
    }
  } else {
    mall->numC = numC;
    mall->numC_spawned = numC;
  }
}
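
/*
 * Usage sketch (not compiled: the MAM_USAGE_SKETCHES guard is never defined).
 * Configuring a reconfiguration before triggering it with MAM_Checkpoint. The
 * constants are the ones referenced in this file, but how the strategy and
 * spawn_dist arguments are encoded is defined in MaM's headers (not shown
 * here), so this exact combination is an illustrative assumption only.
 */
#ifdef MAM_USAGE_SKETCHES
static void example_configure(int new_size) {
  // Merge spawn driven by a helper pthread, baseline redistribution run in a thread
  MAM_Set_configuration(MALL_SPAWN_MERGE, MALL_SPAWN_PTHREAD, 0, MALL_RED_BASELINE, MALL_RED_THREAD);
  MAM_Set_target_number(new_size); // Must be called after the configuration has been set
}
#endif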

/*
 * Adds to the chosen data structure the new data set "data",
 * made up of a total of "total_qty" elements.
 *
 * Variable data has to be added right before it is sent, not earlier.
 *
 * More information in the function "add_data".
 */
void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
  size_t total_reqs = 0;

  if(is_constant) {
    if(is_replicated) {
      total_reqs = 1;
      add_data(data, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ??? 
    } else {
      if(mall_conf->red_method == MALL_RED_BASELINE) {
        total_reqs = 1;
      } else if(mall_conf->red_method == MALL_RED_POINT || mall_conf->red_method == MALL_RED_RMA_LOCK || mall_conf->red_method == MALL_RED_RMA_LOCKALL) {
        total_reqs = mall->numC;
      }
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
        total_reqs++;
      }

      add_data(data, total_qty, type, total_reqs, dist_a_data);
    }
  } else {
    if(is_replicated) {
      add_data(data, total_qty, type, total_reqs, rep_s_data);
    } else {
      add_data(data, total_qty, type, total_reqs, dist_s_data);
    }
  }
}
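
/*
 * Usage sketch (not compiled: the MAM_USAGE_SKETCHES guard is never defined).
 * Registering application data before a reconfiguration; the is_replicated and
 * is_constant flags select one of the four structures used above. The array
 * names are illustrative assumptions.
 */
#ifdef MAM_USAGE_SKETCHES
static void example_register_data(double *local_block, size_t local_qty, int *shared_params, size_t params_qty) {
  // Distributed array, redistributed through the asynchronous structures (is_constant=1)
  malleability_add_data(local_block, local_qty, MPI_DOUBLE, 0, 1);
  // Replicated parameters, broadcast during the synchronous step (is_constant=0)
  malleability_add_data(shared_params, params_qty, MPI_INT, 1, 0);
}
#endif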

/*
 * Modifies in the chosen data structure, at index "index",
 * the data set "data" made up of a total of "total_qty" elements.
 *
 * Variable data has to be modified right before it is sent, not earlier.
 *
 * More information in the function "modify_data".
 */
void malleability_modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
  size_t total_reqs = 0;

  if(is_constant) {
    if(is_replicated) {
      total_reqs = 1;
      modify_data(data, index, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ??? 
    } else {    
      if(mall_conf->red_method == MALL_RED_BASELINE) {
        total_reqs = 1;
      } else if(mall_conf->red_method == MALL_RED_POINT || mall_conf->red_method == MALL_RED_RMA_LOCK || mall_conf->red_method == MALL_RED_RMA_LOCKALL) {
        total_reqs = mall->numC;
      }
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
        total_reqs++;
      }

      modify_data(data, index, total_qty, type, total_reqs, dist_a_data);
    }
  } else {
    if(is_replicated) {
      modify_data(data, index, total_qty, type, total_reqs, rep_s_data);
    } else {
      modify_data(data, index, total_qty, type, total_reqs, dist_s_data);
    }
  }
}

/*
 * Returns the number of entries of the chosen data
 * description structure.
 */
void malleability_get_entries(size_t *entries, int is_replicated, int is_constant){
  if(is_constant) {
    if(is_replicated) {
      *entries = rep_a_data->entries;
    } else {
      *entries = dist_a_data->entries;
    }
  } else {
    if(is_replicated) {
      *entries = rep_s_data->entries;
    } else {
      *entries = dist_s_data->entries;
    }
  }
}

/*
 * Returns the element at position "index" of the list to the user.
 * Elements are returned in the same order in which the parents
 * added them with the function "malleability_add_data()".
 * It is up to the user to know the type of that data.
 * TODO Refactor so that this is automatic
 */
void malleability_get_data(void **data, size_t index, int is_replicated, int is_constant) {
  malleability_data_t *data_struct;

  if(is_constant) {
    if(is_replicated) {
      data_struct = rep_a_data;
    } else {
      data_struct = dist_a_data;
    }
  } else {
    if(is_replicated) {
      data_struct = rep_s_data;
    } else {
      data_struct = dist_s_data;
    }
  }

  *data = data_struct->arrays[index];
}
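
/*
 * Usage sketch (not compiled: the MAM_USAGE_SKETCHES guard is never defined).
 * How spawned processes could retrieve the redistributed data; the flags must
 * match the ones the parents used in malleability_add_data(). The variable
 * names are illustrative assumptions.
 */
#ifdef MAM_USAGE_SKETCHES
static void example_retrieve_data(void) {
  size_t entries, i;
  void *entry = NULL;

  malleability_get_entries(&entries, 0, 1);    // Distributed, asynchronous entries
  for(i = 0; i < entries; i++) {
    malleability_get_data(&entry, i, 0, 1);    // Same order as the parents added them
    // Cast "entry" to the type registered at index i and keep a reference to it
  }
}
#endif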


//======================================================||
//================PRIVATE FUNCTIONS=====================||
//================DATA COMMUNICATION====================||
//======================================================||
//======================================================||

/*
 * Generalized function used by the parents to send data to the children.
 * Asynchrony refers to whether the parent and child processes do it in a
 * blocking way or not. The parent may use several threads.
 */
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
  size_t i;
  void *aux_send, *aux_recv;

  if(is_asynchronous) {
    for(i=0; i < data_struct->entries; i++) {
      aux_send = data_struct->arrays[i];
      aux_recv = NULL;
      async_communication_start(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, 
		      mall_conf->red_strategies, mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
      if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
    }
  } else {
    for(i=0; i < data_struct->entries; i++) {
      aux_send = data_struct->arrays[i];
      aux_recv = NULL;
      sync_communication(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall->intercomm);
      if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
    }
  }
}

/*
 * Generalized function used by the children to receive data from the parents.
 * Asynchrony refers to whether the parent and child processes do it in a
 * blocking way or not. The parent may use several threads.
 */
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
  size_t i;
  void *aux, *aux_s = NULL;

  if(is_asynchronous) {
    for(i=0; i < data_struct->entries; i++) {
      aux = data_struct->arrays[i];
      async_communication_start(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall_conf->red_strategies, 
		      mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
      data_struct->arrays[i] = aux;
    }
  } else {
    for(i=0; i < data_struct->entries; i++) {
      aux = data_struct->arrays[i];
      sync_communication(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall->intercomm);
      data_struct->arrays[i] = aux;
    }
  }
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================CHILDREN=========================||
//======================================================||
//======================================================||
/*
 * Initialization of the children's data.
 * Here they receive data from the parents: the configuration of the
 * execution to perform, and the data to be received from the parents,
 * whether synchronously, asynchronously, or both.
 */
void Children_init(void (*user_function)(void *), void *user_args) {
  size_t i;
  int numP_parents, root_parents;
  int is_intercomm;

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM will now initialize children", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  malleability_connect_children(mall->myId, mall->numP, mall->root, mall->comm, &numP_parents, &root_parents, &(mall->intercomm));
  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
  if(!is_intercomm) { // For intracommunicators, these processes will be added
    MPI_Comm_rank(mall->intercomm, &(mall->myId));
    MPI_Comm_size(mall->intercomm, &(mall->numP));
  }

  MAM_Comm_main_structures(root_parents);

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Targets have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
  if(dist_a_data->entries || rep_a_data->entries) { // Receive asynchronous data
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Children start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif

    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
      recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
    } else {
      recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS); 

      //for(i=0; i<rep_a_data->entries; i++) {
      //  MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], root_parents, mall->intercomm, &(rep_a_data));
      //} 
      #if USE_MAL_DEBUG >= 2
        DEBUG_FUNC("Targets started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
      #endif

      int post_ibarrier = 0; 
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) { post_ibarrier=1; }
      for(i=0; i<dist_a_data->entries; i++) {
        async_communication_wait(mall->intercomm, dist_a_data->requests[i], dist_a_data->request_qty[i], post_ibarrier);
      }
      #if USE_MAL_DEBUG >= 2
        DEBUG_FUNC("Targets waited for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
      #endif
      for(i=0; i<dist_a_data->entries; i++) {
        async_communication_end(mall_conf->red_method, mall_conf->red_strategies, dist_a_data->requests[i], dist_a_data->request_qty[i], &(dist_a_data->windows[i]));
      }
    }

    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->async_end = MPI_Wtime(); // Timestamp of when the asynchronous communication ends
  }
  #if USE_MAL_DEBUG
    DEBUG_FUNC("Targets have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
  if(dist_s_data->entries || rep_s_data->entries) { // Receive synchronous data
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    // TODO Create a specific function and add it for the asynchronous case
    for(i=0; i<rep_s_data->entries; i++) {
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], root_parents, mall->intercomm);
    } 
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->sync_end = MPI_Wtime(); // Timestamp of when the synchronous communication ends
  }
  #if USE_MAL_DEBUG
    DEBUG_FUNC("Targets have completed synchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // Save the results of this transmission
  malleability_times_broadcast(mall->root);

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_end = MPI_Wtime(); // Timestamp of when malleability ends
  state = MALL_COMPLETED;

  if(is_intercomm) {
    MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->tmp_comm); // The group passing 0 goes first
  } else {
    MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
  }
  MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
  mall->numC = numP_parents;
  if(user_function != NULL) {
    state = MALL_USER_PENDING;
    MAM_I_create_user_struct(MAM_COMPLETED, MALLEABILITY_CHILDREN);
    user_function(user_args);
  } else {
    MAM_Commit(NULL);
  }

  #if USE_MAL_DEBUG
    if(mall->myId == mall->root){ DEBUG_FUNC("MaM has been initialized correctly as children", mall->myId, mall->numP); } fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================PARENTS==========================||
//======================================================||
//======================================================||

/*
 * Takes care of creating the child processes.
 * If the spawn was requested in the background, it returns the current state.
 */
int spawn_step(){
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_start = MPI_Wtime();
 
  state = init_spawn(mall->name_exec, mall->num_cpus, mall->num_nodes, mall->nodelist, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_method, mall_conf->spawn_strategies, mall->thread_comm, &(mall->intercomm));

  if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->comm);
    #endif
    mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
  }
  return state;
}

/*
 * Starts the redistribution of data towards the new group of processes.
 *
 * First, the configuration to be used is sent to the new group of processes,
 * and then the asynchronous and/or synchronous sends are performed, if any.
 *
 * If there is asynchronous communication, it is started and the function
 * returns indicating that an asynchronous send has begun.
 *
 * If there is no asynchronous communication, the synchronous one is
 * performed, if any.
 *
 * Finally, data about the results is sent to the children and both groups
 * of processes are disconnected.
 */
int start_redistribution() {
  int rootBcast, is_intercomm;

  is_intercomm = 0;
  if(mall->intercomm != MPI_COMM_NULL) {
    MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
  } else { 
    // If no communicator has been created, it is because the spawn has been
    //   postponed and this is the Merge Shrink spawn
    MPI_Comm_dup(mall->comm, &(mall->intercomm));
  }

  if(is_intercomm) {
    rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    rootBcast = mall->root;
  }

  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE || mall->numP <= mall->numC) { MAM_Comm_main_structures(rootBcast); }

  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
  if(dist_a_data->entries || rep_a_data->entries) { // Send asynchronous data
    //FIXME Replicated data (rep_a_data) is not sent
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->async_start = MPI_Wtime();
    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
      return thread_creation();
    } else {
      send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
      return MALL_DIST_PENDING; 
    }
  } 
  return end_redistribution();
}


/*
 * Checks whether the asynchronous redistribution has finished. 
 * If it has not finished, the function returns indicating so; otherwise,
 * it continues with the synchronous communication, the sending of results,
 * and the disconnection of both groups of processes.
 *
 * This function supports two modes of operation when checking whether the
 * asynchronous communication has finished.
 * With the "MAL_USE_NORMAL" or "MAL_USE_POINT" modes, it is considered 
 * finished when the parents have finished sending.
 * With the "MAL_USE_IBARRIER" mode, it is considered finished when the
 * children have finished receiving.
 * //FIXME Modify so that rep_a_data is taken into account
 */
int check_redistribution(int wait_completed) {
  int is_intercomm, completed, local_completed, all_completed, post_ibarrier;
  size_t i, req_qty;
  MPI_Request *req_completed;
  MPI_Win window;
  post_ibarrier = 0;
  local_completed = 1;
  #if USE_MAL_DEBUG >= 2
    DEBUG_FUNC("Sources are testing for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif
  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);

  if(wait_completed) {
    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
      if( is_intercomm || mall->myId >= mall->numC) {
        post_ibarrier=1;
      }
    }
    for(i=0; i<dist_a_data->entries; i++) {
      req_completed = dist_a_data->requests[i];
      req_qty = dist_a_data->request_qty[i];
      async_communication_wait(mall->intercomm, req_completed, req_qty, post_ibarrier);
    }
  } else {
    for(i=0; i<dist_a_data->entries; i++) {
      req_completed = dist_a_data->requests[i];
      req_qty = dist_a_data->request_qty[i];
      completed = async_communication_check(mall->myId, MALLEABILITY_NOT_CHILDREN, mall_conf->red_strategies, mall->intercomm, req_completed, req_qty);
      local_completed = local_completed && completed;
    }
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Sources will now check a global decision", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif

    MPI_Allreduce(&local_completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
    if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
  }

  #if USE_MAL_DEBUG >= 2
    DEBUG_FUNC("Sources sent asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  for(i=0; i<dist_a_data->entries; i++) {
    req_completed = dist_a_data->requests[i];
    req_qty = dist_a_data->request_qty[i];
    window = dist_a_data->windows[i];
    async_communication_end(mall_conf->red_method, mall_conf->red_strategies, req_completed, req_qty, &window);
  }

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
  if(!is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
  return end_redistribution();
}


/*
 * Finishes the data redistribution with the children, checking
 * whether iterations with background communications have been performed
 * and sending the number of completed iterations to the children.
 *
 * The synchronous communications are also performed, if there are any.
 * Finally, it ends by sending the temporary data to the children.
 */ 
int end_redistribution() {
  size_t i;
  int is_intercomm, rootBcast, local_state;

  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
  if(is_intercomm) {
    rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    rootBcast = mall->root;
  }
  
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
  if(dist_s_data->entries || rep_s_data->entries) { // Send synchronous data
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->sync_start = MPI_Wtime();
    send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    // TODO Create a specific function and add it for the asynchronous case
    for(i=0; i<rep_s_data->entries; i++) {
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], rootBcast, mall->intercomm);
    } 
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    if(!is_intercomm) mall_conf->times->sync_end = MPI_Wtime(); // Merge method only
  }

  malleability_times_broadcast(rootBcast);

  local_state = MALL_DIST_COMPLETED;
  if(!is_intercomm) { // Merge Spawn
    if(mall->numP > mall->numC) { // Shrink || Merge Shrink requires additional work
      local_state = MALL_SPAWN_ADAPT_PENDING;
    }
  }

  return local_state;
}

///=============================================
///=============================================
///=============================================
//TODO DEPRECATED
int shrink_redistribution() {
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->comm);
    #endif
    double time_extra = MPI_Wtime();

    MPI_Abort(MPI_COMM_WORLD, -20);
    zombies_collect_suspended(*(mall->user_comm), mall->myId, mall->numP, mall->numC, mall->root);
    
    if(mall->myId < mall->numC) {
      if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); //FIXME Change so that the user requests the update and comms_update is called
      if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));

      MPI_Comm_dup(mall->intercomm, &(mall->thread_comm));
      MPI_Comm_dup(mall->intercomm, &(mall->comm));

      MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MALL_THREAD");
      MPI_Comm_set_name(mall->comm, "MPI_COMM_MALL");

      MPI_Comm_free(&(mall->intercomm));

      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->spawn_time += MPI_Wtime() - time_extra;
      return MALL_DIST_COMPLETED;
    } else {
      return MALL_ZOMBIE;
    }
}

// TODO MOVE SOMEWHERE ELSE??
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//===============COMM PARENTS THREADS===================||
//======================================================||
//======================================================||

int comm_state; //FIXME Use a handler

/*
 * Creates a thread to run a communication in the background.
 */
int thread_creation() {
  comm_state = MALL_DIST_PENDING;
  if(pthread_create(&(mall->async_thread), NULL, thread_async_work, NULL)) {
    printf("Error while creating the thread\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
  return comm_state;
}

/*
 * Check performed by a master thread to find out whether a slave thread
 * has finished its background communication.
 *
 * The state of the communication is returned when the function finishes. 
 */
int thread_check(int wait_completed) {
  int all_completed = 0, is_intercomm, joined = 0;

  if(wait_completed && comm_state == MALL_DIST_PENDING) {
    if(pthread_join(mall->async_thread, NULL)) {
      printf("Error while joining the thread\n");
      MPI_Abort(MPI_COMM_WORLD, -1);
      return -2;
    } 
    joined = 1;
  }

  // Check that all threads have finished the distribution (same value in comm_state)
  MPI_Allreduce(&comm_state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
  if(all_completed != MALL_DIST_COMPLETED) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
  //FIXME The MALL_APP_ENDED state is not taken into account

  if(!joined && pthread_join(mall->async_thread, NULL)) { // Avoid joining the same thread twice
    printf("Error while joining the thread\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -2;
  } 
  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
  if(!is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
  return end_redistribution();
}


/*
 * Function run by a thread.
 * It performs a synchronous communication with the children which, from
 * the user's point of view, can be considered as running in the background.
 *
 * When the communication finishes, the master thread can check it
 * through the value of "comm_state".
 */
void* thread_async_work(void *arg) {
  (void) arg; // Unused; kept so the signature matches what pthread_create expects
  send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
  comm_state = MALL_DIST_COMPLETED;
  pthread_exit(NULL);
}


//==============================================================================
/*
 * Prints on screen the current state of all communicators
 */
void print_comms_state() {
  int tester;
  char *test = malloc(MPI_MAX_OBJECT_NAME * sizeof(char));

  MPI_Comm_get_name(mall->comm, test, &tester);
  printf("P%d Comm=%d Name=%s\n", mall->myId, mall->comm, test);
  MPI_Comm_get_name(*(mall->user_comm), test, &tester);
  printf("P%d Comm=%d Name=%s\n", mall->myId, *(mall->user_comm), test);
  if(mall->intercomm != MPI_COMM_NULL) {
    MPI_Comm_get_name(mall->intercomm, test, &tester);
    printf("P%d Comm=%d Name=%s\n", mall->myId, mall->intercomm, test);
  }
  free(test);
}

/*
 * Function only needed with the Merge method
 */
void malleability_comms_update(MPI_Comm comm) {
  if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
  if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));

  MPI_Comm_dup(comm, &(mall->thread_comm));
  MPI_Comm_dup(comm, &(mall->comm));

  MPI_Comm_set_name(mall->thread_comm, "MAM_THREAD");
  MPI_Comm_set_name(mall->comm, "MAM_MAIN");
}

/*
 * Converts the name of a Key to its value version
 */
int MAM_I_convert_key(char *key) {
  size_t i; 

  for(i=0; i<MAM_KEY_COUNT; i++) {
    if(strcmp(key, mam_key_names[i]) == 0) { // Equal
      return i;
    }
  }
  return MALL_DENIED;
}

/*
 * TODO To be done
 */
void MAM_I_create_user_struct(int mam_state, int is_children_group) {
  user_reconf->comm = mall->tmp_comm;
  user_reconf->rank_state = mam_state;

  if(is_children_group) {
    user_reconf->rank_state = is_children_group; //FIXME Choose a proper name
    user_reconf->numS = mall->numC;
    if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) user_reconf->numT = mall->numC;
    else user_reconf->numT = mall->numC + mall->numP;
  } else {
    user_reconf->numS = mall->numP;
    user_reconf->numT = mall->numC;
  }
}