malleabilityManager.c 37 KB
Newer Older
1
#include <pthread.h>
2
#include <string.h>
3
4
//#include "malleabilityManager.h"
#include "MAM.h"
5
#include "malleabilityStates.h"
6
#include "malleabilityDataStructures.h"
7
#include "malleabilityTypes.h"
iker_martin's avatar
iker_martin committed
8
#include "malleabilityZombies.h"
9
#include "malleabilityTimes.h"
10
#include "malleabilityRMS.h"
11
#include "MAM_Init_Configuration.h"
12
#include "spawn_methods/GenericSpawn.h"
13
14
15
16
17
#include "CommDist.h"

#define MALLEABILITY_USE_SYNCHRONOUS 0
#define MALLEABILITY_USE_ASYNCHRONOUS 1

18
void MAM_Commit(int *mam_state, int is_children_group);
19
20
21
22

void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous);
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous);

23

24
25
int MAM_St_rms(int *mam_state);
int MAM_St_spawn_start();
26
27
28
29
30
31
32
33
34
35
36
int MAM_St_spawn_pending(int wait_completed);
int MAM_St_red_start();
int MAM_St_red_pending(int *mam_state, int wait_completed);
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args);
int MAM_St_user_completed();
int MAM_St_spawn_adapt_pending(int wait_completed);
int MAM_St_spawn_adapted(int *mam_state);
int MAM_St_red_completed(int *mam_state);
int MAM_St_completed(int *mam_state);


37
void Children_init(void (*user_function)(void *), void *user_args);
38
39
int spawn_step();
int start_redistribution();
40
int check_redistribution(int wait_completed);
41
int end_redistribution();
iker_martin's avatar
iker_martin committed
42
int shrink_redistribution();
43
44

int thread_creation();
45
int thread_check(int wait_completed);
46
void* thread_async_work();
47

48
void print_comms_state();
49
void malleability_comms_update(MPI_Comm comm);
50

51
int MAM_I_convert_key(char *key);
52
void MAM_I_create_user_struct(int is_children_group);
53

54
55
56
57
58
malleability_data_t *rep_s_data;
malleability_data_t *dist_s_data;
malleability_data_t *rep_a_data;
malleability_data_t *dist_a_data;

59
60
mam_user_reconf_t *user_reconf;

61
/*
 * Initializes the memory reserved for the malleability module,
 * creating every internal structure and duplicating the user's
 * communicators so MaM does not interfere with the application.
 *
 * If called by a group of dynamically spawned processes, they
 * initialize communication with their parents instead; once that
 * communication finishes, the children are ready to run the
 * application.
 *
 * Returns MALLEABILITY_CHILDREN when the caller is a spawned process
 * (it has an MPI parent), MALLEABILITY_NOT_CHILDREN otherwise.
 */
int MAM_Init(int root, MPI_Comm *comm, char *name_exec, void (*user_function)(void *), void *user_args) {
  MPI_Comm dup_comm, thread_comm;

  // NOTE(review): malloc results are not checked; on OOM the next access
  // crashes. Policy to confirm — an explicit abort-on-OOM wrapper would
  // make the intent visible.
  mall_conf = (malleability_config_t *) malloc(sizeof(malleability_config_t));
  mall = (malleability_t *) malloc(sizeof(malleability_t));
  user_reconf = (mam_user_reconf_t *) malloc(sizeof(mam_user_reconf_t));

  MPI_Comm_rank(*comm, &(mall->myId));
  MPI_Comm_size(*comm, &(mall->numP));

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Initializing MaM", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
  #endif

  // One descriptor per data category: replicated/distributed x sync/async.
  rep_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));

  // Private copies of the user's communicator so MaM traffic never
  // collides with application traffic.
  MPI_Comm_dup(*comm, &dup_comm);
  MPI_Comm_dup(*comm, &thread_comm);
  MPI_Comm_set_name(dup_comm, "MAM_MAIN");
  MPI_Comm_set_name(thread_comm, "MAM_THREAD");

  mall->root = root;
  mall->root_parents = -1;   // -1 marks "no parent group yet"
  mall->zombie = 0;
  mall->comm = dup_comm;
  mall->thread_comm = thread_comm;
  mall->user_comm = comm; 
  mall->tmp_comm = MPI_COMM_NULL;

  mall->name_exec = name_exec; // Not copied: caller keeps ownership of the string
  mall->nodelist = NULL;

  rep_s_data->entries = 0;
  rep_a_data->entries = 0;
  dist_s_data->entries = 0;
  dist_a_data->entries = 0;

  state = MALL_NOT_STARTED;

  MAM_Init_configuration();
  zombies_service_init();
  init_malleability_times();
  MAM_Def_main_datatype();

  // If this process was spawned dynamically it has an MPI parent:
  // run the children initialization path and return early.
  MPI_Comm_get_parent(&(mall->intercomm));
  if(mall->intercomm != MPI_COMM_NULL) { 
    Children_init(user_function, user_args);
    return MALLEABILITY_CHILDREN;
  }

  // First (original) group of processes only.
  MAM_check_hosts();
  MAM_Check_configuration();

  #if USE_MAL_BARRIERS && USE_MAL_DEBUG
    if(mall->myId == mall->root)
      printf("MaM: Using barriers to record times.\n");
  #endif

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM has been initialized correctly as parents", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
  #endif

  return MALLEABILITY_NOT_CHILDREN;
}

140
141
142
143
144
/*
 * Elimina toda la memoria reservado por el modulo
 * de maleabilidad y asegura que los zombies
 * despierten si los hubiese.
 */
145
void MAM_Finalize() {	  
146
147
148
149
150
151
152
153
154
155
  free_malleability_data_struct(rep_s_data);
  free_malleability_data_struct(rep_a_data);
  free_malleability_data_struct(dist_s_data);
  free_malleability_data_struct(dist_a_data);

  free(rep_s_data);
  free(rep_a_data);
  free(dist_s_data);
  free(dist_a_data);

156
  MAM_Free_main_datatype();
157
  free_malleability_times();
158
159
  if(mall->comm != MPI_COMM_WORLD && mall->comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->comm));
  if(mall->thread_comm != MPI_COMM_WORLD && mall->thread_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->thread_comm));
160
  if(mall->intercomm != MPI_COMM_WORLD && mall->intercomm != MPI_COMM_NULL) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge
161
162
  free(mall);
  free(mall_conf);
163
  free(user_reconf);
iker_martin's avatar
iker_martin committed
164
165
166
167

  zombies_awake();
  zombies_service_free();

168
  state = MALL_UNRESERVED;
169
170
}

171
172
/* 
 * TODO Reescribir
173
174
175
176
 * Comprueba el estado de la maleabilidad. Intenta avanzar en la misma
 * si es posible. Funciona como una máquina de estados.
 * Retorna el estado de la maleabilidad concreto y modifica el argumento
 * "mam_state" a uno generico.
177
 *
178
179
 * El argumento "wait_completed" se utiliza para esperar a la finalización de
 * las tareas llevadas a cabo por parte de MAM.
180
181
 *
 */
182
int MAM_Checkpoint(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
183
  int call_checkpoint = 0;
184
185
186

  switch(state) {
    case MALL_UNRESERVED:
187
      *mam_state = MAM_UNRESERVED;
188
189
      break;
    case MALL_NOT_STARTED:
190
191
192
193
      call_checkpoint = MAM_St_rms(mam_state);
      break;
    case MALL_RMS_COMPLETED:
      call_checkpoint = MAM_St_spawn_start();
194
      break;
195

196
    case MALL_SPAWN_PENDING: // Comprueba si el spawn ha terminado
197
    case MALL_SPAWN_SINGLE_PENDING:
198
      call_checkpoint = MAM_St_spawn_pending(wait_completed);
199
      break;
200

201
202
    case MALL_SPAWN_ADAPT_POSTPONE:
    case MALL_SPAWN_COMPLETED:
203
      call_checkpoint = MAM_St_red_start();
204
      break;
205

206
    case MALL_DIST_PENDING:
207
      call_checkpoint = MAM_St_red_pending(mam_state, wait_completed);
208
209
      break;

210
211
    case MALL_USER_PENDING:
      call_checkpoint = MAM_St_user_pending(mam_state, wait_completed, user_function, user_args);
212
      break;
213

214
215
    case MALL_USER_COMPLETED:
      call_checkpoint = MAM_St_user_completed();
216
      break;
217

218
219
    case MALL_SPAWN_ADAPT_PENDING:
      call_checkpoint = MAM_St_spawn_adapt_pending(wait_completed);
220
221
      break;

222
223
224
    case MALL_SPAWN_ADAPTED:
    case MALL_DIST_COMPLETED:
      call_checkpoint = MAM_St_completed(mam_state);
225
226
      break;
  }
227

228
229
  if(call_checkpoint) { MAM_Checkpoint(mam_state, wait_completed, user_function, user_args); }
  if(state > MALL_NOT_STARTED && state < MALL_COMPLETED) *mam_state = MAM_PENDING;
230
231
232
  return state;
}

233
234
235
/*
 * TODO
 */
236
237
238
239
240
241
242
243
244
245
void MAM_Resume_redistribution(int *mam_state) {
  state = MALL_USER_COMPLETED;
  *mam_state = MAM_PENDING;
}

/*
 * Commits the reconfiguration: broadcasts timing data if needed,
 * releases temporary communicators, suspends or terminates zombie
 * processes, refreshes this process' rank/size and root, and finally
 * publishes the new user communicator.
 *
 * mam_state may be NULL (children path); rootBcast is the root rank to
 * use for the broadcast (MPI_ROOT/MPI_PROC_NULL on intercommunicators).
 * NOTE(review): the prototype names this parameter is_children_group —
 * the definition's name rootBcast matches actual usage.
 */
void MAM_Commit(int *mam_state, int rootBcast) {
  int zombies = 0;
  #if USE_MAL_DEBUG
    if(mall->myId == mall->root){ DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); } fflush(stdout);
  #endif

  // Get times before commiting
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) {
    // This communication is only needed when a root process will become a zombie
    malleability_times_broadcast(rootBcast);
  }

  // Free unneded communicators
  if(mall->tmp_comm != MPI_COMM_WORLD && mall->tmp_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->tmp_comm));
  if(*(mall->user_comm) != MPI_COMM_WORLD && *(mall->user_comm) != MPI_COMM_NULL) MPI_Comm_free(mall->user_comm);

  // Zombies treatment: in merge mode, agree (max-reduce) on whether any
  // rank is a zombie and suspend them collectively.
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) {
    MPI_Allreduce(&mall->zombie, &zombies, 1, MPI_INT, MPI_MAX, mall->comm);
    if(zombies) {
      zombies_collect_suspended(mall->comm);
    }
  }

  // Zombies KILL: this rank leaves the application entirely.
  if(mall->zombie) {
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Is terminating as zombie", mall->myId, mall->numP); fflush(stdout);
    #endif
    MAM_Finalize();
    MPI_Finalize();
    exit(0);
  }

  // Reset/Free communicators
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { malleability_comms_update(mall->intercomm); }
  if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error in OpenMPI + Merge

  // Rank/size may have changed after the reconfiguration.
  MPI_Comm_rank(mall->comm, &(mall->myId));
  MPI_Comm_size(mall->comm, &(mall->numP));
  // Adopt the parents' root if one was recorded, then clear the marker.
  mall->root = mall->root_parents == -1 ? mall->root : mall->root_parents;
  mall->root_parents = -1;
  state = MALL_NOT_STARTED;
  if(mam_state != NULL) *mam_state = MAM_COMPLETED;

  // Set new communicator for the user
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *(mall->user_comm) = MPI_COMM_WORLD; }
  else if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MPI_Comm_dup(mall->comm, mall->user_comm); }
  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout);
  #endif

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_end = MPI_Wtime();
}

302
303
304
305
306
307
308
/*
 * Copies the current user-reconfiguration descriptor into reconf_info.
 * Only valid while a user redistribution is pending; otherwise the
 * request is denied and reconf_info is left untouched.
 */
int MAM_Get_Reconf_Info(mam_user_reconf_t *reconf_info) {
  if(state == MALL_USER_PENDING) {
    *reconf_info = *user_reconf;
    return 0;
  }
  return MALL_DENIED;
}

309
310
/*
 * Retrieves the times of the last reconfiguration: spawn time,
 * synchronous and asynchronous redistribution times, and total
 * malleability time. Thin wrapper over MAM_I_retrieve_times.
 */
void MAM_Retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
  MAM_I_retrieve_times(sp_time, sy_time, asy_time, mall_time);
}

/*
 * Anyade a la estructura concreta de datos elegida
 * el nuevo set de datos "data" de un total de "total_qty" elementos.
 *
 * Los datos variables se tienen que anyadir cuando quieran ser mandados, no antes
 *
 * Mas informacion en la funcion "add_data".
320
 *
321
 */
322
void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
323
  size_t total_reqs = 0;
324

325
  if(is_constant) { //Async
326
    if(is_replicated) {
327
      total_reqs = 1;
328
      add_data(data, total_qty, type, total_reqs, rep_a_data);
329
    } else {
330
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
331
        total_reqs = 1;
332
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
333
        total_reqs = mall->numC;
334
335
      } 
      if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
336
337
        total_reqs++;
      }
338
339
340
      
      add_data(data, total_qty, type, total_reqs, dist_a_data);
    }
341
  } else { //Sync
342
343
344
345
346
    if(is_replicated) {
      add_data(data, total_qty, type, total_reqs, rep_s_data);
    } else {
      add_data(data, total_qty, type, total_reqs, dist_s_data);
    }
347
348
349
  }
}

350
351
352
353
354
355
356
357
/*
 * Modifica en la estructura concreta de datos elegida en el indice "index"
 * con el set de datos "data" de un total de "total_qty" elementos.
 *
 * Los datos variables se tienen que modificar cuando quieran ser mandados, no antes
 *
 * Mas informacion en la funcion "modify_data".
 */
358
void malleability_modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
359
360
  size_t total_reqs = 0;

361
362
  if(is_constant) {
    if(is_replicated) {
363
      total_reqs = 1;
364
365
366
      modify_data(data, index, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ??? 
    } else {    
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
367
        total_reqs = 1;
368
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
369
370
        total_reqs = mall->numC;
      }
371
      if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
372
373
        total_reqs++;
      }
374
      
375
      modify_data(data, index, total_qty, type, total_reqs, dist_a_data);
376
    }
377
378
379
380
381
382
  } else {
    if(is_replicated) {
      modify_data(data, index, total_qty, type, total_reqs, rep_s_data);
    } else {
      modify_data(data, index, total_qty, type, total_reqs, dist_s_data);
    }
383
384
385
  }
}

386
387
388
389
/*
 * Devuelve el numero de entradas para la estructura de descripcion de 
 * datos elegida.
 */
390
void malleability_get_entries(size_t *entries, int is_replicated, int is_constant){
391
392
393
  
  if(is_constant) {
    if(is_replicated) {
394
      *entries = rep_a_data->entries;
395
    } else {
396
      *entries = dist_a_data->entries;
397
398
399
    }
  } else {
    if(is_replicated) {
400
      *entries = rep_s_data->entries;
401
    } else {
402
      *entries = dist_s_data->entries;
403
404
405
406
407
408
409
410
411
412
413
    }
  }
}

/*
 * Devuelve el elemento de la lista "index" al usuario.
 * La devolución es en el mismo orden que lo han metido los padres
 * con la funcion "malleability_add_data()".
 * Es tarea del usuario saber el tipo de esos datos.
 * TODO Refactor a que sea automatico
 */
414
void malleability_get_data(void **data, size_t index, int is_replicated, int is_constant) {
415
416
417
418
  malleability_data_t *data_struct;

  if(is_constant) {
    if(is_replicated) {
419
      data_struct = rep_a_data;
420
    } else {
421
      data_struct = dist_a_data;
422
423
424
    }
  } else {
    if(is_replicated) {
425
      data_struct = rep_s_data;
426
    } else {
427
      data_struct = dist_s_data;
428
429
430
    }
  }

431
  *data = data_struct->arrays[index];
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
}


//======================================================||
//================PRIVATE FUNCTIONS=====================||
//================DATA COMMUNICATION====================||
//======================================================||
//======================================================||

/*
 * Generic send of every entry in data_struct towards the children.
 * "Asynchronous" refers to whether the transfer is started without
 * blocking (requests/windows are recorded per entry) or completed
 * synchronously in place.
 */
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
  size_t idx;
  void *out_buf, *in_buf;

  if(is_asynchronous) {
    for(idx = 0; idx < data_struct->entries; idx++) {
      out_buf = data_struct->arrays[idx];
      in_buf = NULL;
      async_communication_start(out_buf, &in_buf, data_struct->qty[idx], data_struct->types[idx], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN,  
		      mall->intercomm, &(data_struct->requests[idx]), &(data_struct->request_qty[idx]), &(data_struct->windows[idx]));
      // The helper may hand back a replacement buffer; adopt it if so.
      if(in_buf != NULL) data_struct->arrays[idx] = in_buf;
    }
  } else {
    for(idx = 0; idx < data_struct->entries; idx++) {
      out_buf = data_struct->arrays[idx];
      in_buf = NULL;
      sync_communication(out_buf, &in_buf, data_struct->qty[idx], data_struct->types[idx], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall->intercomm);
      if(in_buf != NULL) data_struct->arrays[idx] = in_buf;
    }
  }
}

/*
 * Generic receive of every entry in data_struct from the parents.
 * "Asynchronous" refers to whether the transfer is started without
 * blocking (requests/windows are recorded per entry) or completed
 * synchronously in place.
 */
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
  size_t idx;
  void *recv_buf;
  void *unused_send = NULL; // Receivers pass no send buffer

  if(is_asynchronous) {
    for(idx = 0; idx < data_struct->entries; idx++) {
      recv_buf = data_struct->arrays[idx];
      async_communication_start(unused_send, &recv_buf, data_struct->qty[idx], data_struct->types[idx], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN,
		      mall->intercomm, &(data_struct->requests[idx]), &(data_struct->request_qty[idx]), &(data_struct->windows[idx]));
      data_struct->arrays[idx] = recv_buf;
    }
  } else {
    for(idx = 0; idx < data_struct->entries; idx++) {
      recv_buf = data_struct->arrays[idx];
      sync_communication(unused_send, &recv_buf, data_struct->qty[idx], data_struct->types[idx], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall->intercomm);
      data_struct->arrays[idx] = recv_buf;
    }
  }
}

493
494
495
496
497
498
499

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//====================MAM STAGES========================||
//======================================================||
//======================================================||

500
/*
 * First stage of a reconfiguration: resets timing data, records the
 * malleability start timestamp and advances to MALL_RMS_COMPLETED.
 * Always returns 1 so the state machine continues immediately.
 */
int MAM_St_rms(int *mam_state) {
  *mam_state = MAM_NOT_STARTED;
  state = MALL_RMS_COMPLETED;
  reset_malleability_times();
  // Check whether a resize has to be performed
      
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_start = MPI_Wtime();
  //if(CHECK_RMS()) {return MALL_DENIED;}
  return 1;
}
513

514
/*
 * Starts the spawn of the new process group and marks which current
 * ranks will become zombies: in merge mode only the ranks beyond the
 * new size numC; in baseline mode every source rank (the spawned group
 * replaces them all).
 *
 * Returns 1 when the spawn finished (or was postponed) so the state
 * machine can advance immediately, 0 while it is still pending.
 */
int MAM_St_spawn_start() {
  state = spawn_step();
  //FIXME This is needed but ugly
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->myId >= mall->numC){ mall->zombie = 1; }
  else if(mall_conf->spawn_method == MALL_SPAWN_BASELINE){ mall->zombie = 1; }

  if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE){
    return 1;
  }
  return 0;
}

/*
 * Polls (or waits for, when wait_completed is set) a pending spawn.
 * When the spawn finishes, records the spawn time and returns 1 so the
 * state machine advances; returns 0 while it remains pending.
 */
int MAM_St_spawn_pending(int wait_completed) {
  state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);
  if(state != MALL_SPAWN_COMPLETED && state != MALL_SPAWN_ADAPTED) {
    return 0;
  }

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
  return 1;
}

/*
 * Begins the data redistribution towards the new group. Always returns
 * 1 so MAM_Checkpoint re-enters the state machine with whatever state
 * start_redistribution() produced.
 */
int MAM_St_red_start() {
  state = start_redistribution();
  return 1;
}

/*
 * Polls a pending asynchronous redistribution, via the helper thread
 * when the PTHREAD strategy is active or directly otherwise.
 *
 * Once the redistribution is done, builds the temporary communicator
 * handed to the user stage (merge of the intercommunicator, or a plain
 * dup for intracommunicators) and moves to MALL_USER_PENDING.
 * Returns 1 when it advanced, 0 while still pending.
 */
int MAM_St_red_pending(int *mam_state, int wait_completed) {
  if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
    state = thread_check(wait_completed);
  } else {
    state = check_redistribution(wait_completed);
  }

  if(state != MALL_DIST_PENDING) { 
    if(mall->is_intercomm) {
      MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->tmp_comm); // Group passing 0 goes first
    } else {
      MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
    }
    MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
    state = MALL_USER_PENDING;
    *mam_state = MAM_USER_PENDING;
    return 1;
  }
  return 0;
}

/*
 * Runs the user-defined redistribution stage. The user callback is
 * expected to eventually call MAM_Resume_redistribution(), which
 * flips `state` away from MALL_USER_PENDING; with wait_completed set
 * the callback is re-invoked until that happens. Without a callback
 * the stage is skipped by resuming immediately.
 * Returns 1 when the user stage finished, 0 while still pending.
 */
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Starting USER redistribution", mall->myId, mall->numP); fflush(stdout);
  #endif
  if(user_function != NULL) {
    MAM_I_create_user_struct(MALLEABILITY_NOT_CHILDREN);
    do {
      user_function(user_args);
    } while(wait_completed && state == MALL_USER_PENDING);
  } else {
    MAM_Resume_redistribution(mam_state);
  }

  if(state != MALL_USER_PENDING) {
    #if USE_MAL_DEBUG
      if(mall->myId == mall->root) DEBUG_FUNC("Ended USER redistribution", mall->myId, mall->numP); fflush(stdout);
    #endif
    return 1;
  }
  return 0;
}

/*
 * After the user stage: performs the final (synchronous) part of the
 * redistribution and returns 1 so the state machine advances with the
 * state produced by end_redistribution().
 */
int MAM_St_user_completed() {
  state = end_redistribution();
  return 1;
}

/*
 * Resumes a spawn that was postponed until after the redistribution
 * (adaptation step): clears the postpone flag and checks the spawn
 * state again. Without the PTHREAD spawn strategy the wait is
 * blocking here, so the spawn time is recorded and 1 is returned;
 * with it, 0 is returned and completion is detected in a later call.
 */
int MAM_St_spawn_adapt_pending(int wait_completed) {
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_start = MPI_Wtime();
  unset_spawn_postpone_flag(state);
  state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);

  if(!MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, NULL)) {
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->comm);
    #endif
    mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
    return 1;
  }
  return 0;
}

/*
 * Final stage: commits the reconfiguration. On an intercommunicator
 * only the root broadcasts (MPI_ROOT) and the rest pass MPI_PROC_NULL;
 * on an intracommunicator every rank names the same root.
 * Returns 0: the state machine does not re-enter after a commit.
 */
int MAM_St_completed(int *mam_state) {
  int rootBcast = mall->is_intercomm
      ? (mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL)
      : mall->root;

  MAM_Commit(mam_state, rootBcast);
  return 0;
}


622
623
624
625
626
627
628
629
630
631
632
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================CHILDREN=========================||
//======================================================||
//======================================================||
/*
 * Initialization for spawned (children) processes.
 * Receives from the parents: the execution configuration and the
 * registered data, synchronously, asynchronously or both. Mirrors the
 * parents' send protocol in start_redistribution()/end_redistribution().
 */
void Children_init(void (*user_function)(void *), void *user_args) {
  size_t i;
  int numP_parents;

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM will now initialize children", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // Connect to the parents and learn their size/root.
  malleability_connect_children(mall->myId, mall->numP, mall->root, mall->comm, &numP_parents, &mall->root_parents, &(mall->intercomm));
  MPI_Comm_test_inter(mall->intercomm, &mall->is_intercomm);
  if(!mall->is_intercomm) { // For intracommunicators, these processes will be added
    MPI_Comm_rank(mall->intercomm, &(mall->myId));
    MPI_Comm_size(mall->intercomm, &(mall->numP));
  }

  MAM_Comm_main_structures(mall->root_parents);

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Targets have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, mall->root_parents, mall->intercomm);
  if(dist_a_data->entries || rep_a_data->entries) { // Receive asynchronous data
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Children start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif

    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
      // Parents send from a helper thread; children can receive synchronously.
      recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
      for(i=0; i<rep_a_data->entries; i++) {
        MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_parents, mall->intercomm);
      } 
    } else {
      recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS); 

      for(i=0; i<rep_a_data->entries; i++) {
        MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_parents, mall->intercomm, &(rep_a_data->requests[i][0]));
      } 
      #if USE_MAL_DEBUG >= 2
        DEBUG_FUNC("Targets started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
      #endif

      int post_ibarrier = 0; 
      // FIXME Ibarrier cannot be used right now. Really only one ibarrier for everything is needed
      // Replicated data is waited on without an ibarrier; the optional
      // targets-ibarrier applies only to the distributed entries below.
      for(i=0; i<rep_a_data->entries; i++) {
        async_communication_wait(mall->intercomm, rep_a_data->requests[i], rep_a_data->request_qty[i], post_ibarrier);
      }
      if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) { post_ibarrier=1; }
      for(i=0; i<dist_a_data->entries; i++) {
        async_communication_wait(mall->intercomm, dist_a_data->requests[i], dist_a_data->request_qty[i], post_ibarrier);
      }
      #if USE_MAL_DEBUG >= 2
        DEBUG_FUNC("Targets waited for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
      #endif
      for(i=0; i<dist_a_data->entries; i++) {
        async_communication_end(dist_a_data->requests[i], dist_a_data->request_qty[i], &(dist_a_data->windows[i]));
      }
      for(i=0; i<rep_a_data->entries; i++) {
        async_communication_end(rep_a_data->requests[i], rep_a_data->request_qty[i], &(rep_a_data->windows[i]));
      }
    }

    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->async_end= MPI_Wtime(); // Timestamp of asynchronous communication end
  }
  #if USE_MAL_DEBUG
    DEBUG_FUNC("Targets have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // Build the temporary communicator handed to the user stage.
  if(mall->is_intercomm) {
    MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->tmp_comm); // Group passing 0 goes first
  } else {
    MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
  }
  MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
  mall->numC = numP_parents;
  if(user_function != NULL) {
    state = MALL_USER_PENDING;
    MAM_I_create_user_struct(MALLEABILITY_CHILDREN);
    user_function(user_args);
  }

  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, mall->root_parents, mall->intercomm);
  if(dist_s_data->entries || rep_s_data->entries) { // Receive synchronous data
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    for(i=0; i<rep_s_data->entries; i++) {
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_parents, mall->intercomm);
    } 
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->sync_end = MPI_Wtime(); // Timestamp of synchronous communication end
  }
  #if USE_MAL_DEBUG
    DEBUG_FUNC("Targets have completed synchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  MAM_Commit(NULL, mall->root_parents);

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM has been initialized correctly for new ranks", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================PARENTS==========================||
//======================================================||
//======================================================||

/*
 * Performs the creation of the child processes.
 * If the spawn runs in the background (PTHREAD strategy) the current
 * state is returned while the spawn proceeds; otherwise the call
 * blocks and the spawn time is recorded here.
 */
int spawn_step(){
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_start = MPI_Wtime();
 
  state = init_spawn(mall->name_exec, mall->myId, mall->numP, mall->numC, mall->root, mall->thread_comm, &(mall->intercomm));

  if(!MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, NULL)) {
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
  }
  return state;
}

773

774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
/*
 * Starts the data redistribution with the new group of processes.
 *
 * First the configuration is sent to the new group, then the
 * asynchronous and/or synchronous sends are performed, if any.
 *
 * If there is asynchronous communication it is only started here and
 * the function returns MALL_DIST_PENDING (or delegates to a helper
 * thread with the PTHREAD strategy). Otherwise the flow goes straight
 * to the user stage (MALL_USER_PENDING).
 */
int start_redistribution() {
  int rootBcast;
  size_t i;

  mall->is_intercomm = 0;
  if(mall->intercomm != MPI_COMM_NULL) {
    MPI_Comm_test_inter(mall->intercomm, &mall->is_intercomm);
  } else { 
    // No communicator was created: the spawn was postponed and this is
    // the merge-shrink case, so reuse the current communicator.
    MPI_Comm_dup(mall->comm, &(mall->intercomm));
  }

  // Intercommunicator broadcast semantics: root sends with MPI_ROOT,
  // the remaining source ranks pass MPI_PROC_NULL.
  if(mall->is_intercomm) {
    rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    rootBcast = mall->root;
  }

  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE || mall->numP <= mall->numC) { MAM_Comm_main_structures(rootBcast); }

  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
  if(dist_a_data->entries || rep_a_data->entries) { // Send asynchronous data
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->async_start = MPI_Wtime();
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
      // Hand the whole asynchronous send to a helper thread.
      return thread_creation();
    } else {
      send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
      for(i=0; i<rep_a_data->entries; i++) { //FIXME Ibarrier does not work with rep_a_data
        MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], rootBcast, mall->intercomm, &(rep_a_data->requests[i][0]));
      } 
      return MALL_DIST_PENDING; 
    }
  } 
  return MALL_USER_PENDING;
}


/*
 * Comprueba si la redistribucion asincrona ha terminado. 
 * Si no ha terminado la funcion termina indicandolo, en caso contrario,
 * se continua con la comunicacion sincrona, el envio de resultados y
 * se desconectan los grupos de procesos.
 *
 * Esta funcion permite dos modos de funcionamiento al comprobar si la
 * comunicacion asincrona ha terminado.
 * Si se utiliza el modo "MAL_USE_NORMAL" o "MAL_USE_POINT", se considera 
 * terminada cuando los padres terminan de enviar.
 * Si se utiliza el modo "MAL_USE_IBARRIER", se considera terminada cuando
 * los hijos han terminado de recibir.
841
 * //FIXME Modificar para que se tenga en cuenta rep_a_data
842
 */
843
int check_redistribution(int wait_completed) {
844
  int completed, local_completed, all_completed, post_ibarrier;
845
  size_t i, req_qty;
846
  MPI_Request *req_completed;
847
  MPI_Win window;
848
  post_ibarrier = 0;
849
  local_completed = 1;
850
  #if USE_MAL_DEBUG >= 2
851
    DEBUG_FUNC("Sources are testing for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
852
  #endif
853

854
  if(wait_completed) {
855
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
856
      if( mall->is_intercomm || mall->myId >= mall->numC) {
857
858
859
860
861
862
863
864
        post_ibarrier=1;
      }
    }
    for(i=0; i<dist_a_data->entries; i++) {
      req_completed = dist_a_data->requests[i];
      req_qty = dist_a_data->request_qty[i];
      async_communication_wait(mall->intercomm, req_completed, req_qty, post_ibarrier);
    }
865
866
867
868
869
    for(i=0; i<rep_a_data->entries; i++) {
      req_completed = rep_a_data->requests[i];
      req_qty = rep_a_data->request_qty[i];
      async_communication_wait(mall->intercomm, req_completed, req_qty, 0); //FIXME Ibarrier does not work with rep_a_data
    }
870
871
872
873
  } else {
    for(i=0; i<dist_a_data->entries; i++) {
      req_completed = dist_a_data->requests[i];
      req_qty = dist_a_data->request_qty[i];
874
      completed = async_communication_check(mall->myId, MALLEABILITY_NOT_CHILDREN, mall->intercomm, req_completed, req_qty);
875
876
      local_completed = local_completed && completed;
    }
877
878
879
    for(i=0; i<rep_a_data->entries; i++) { //FIXME Ibarrier does not work with rep_a_data
      req_completed = rep_a_data->requests[i];
      req_qty = rep_a_data->request_qty[i];
880
      completed = async_communication_check(mall->myId, MALLEABILITY_NOT_CHILDREN, mall->intercomm, req_completed, req_qty);
881
882
      local_completed = local_completed && completed;
    }
883
884
885
886
887
888
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Sources will now check a global decision", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif

    MPI_Allreduce(&local_completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
    if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
889
890
  }

891
  #if USE_MAL_DEBUG >= 2
892
    DEBUG_FUNC("Sources sent asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
893
  #endif
894

895
896
897
898
  for(i=0; i<dist_a_data->entries; i++) {
    req_completed = dist_a_data->requests[i];
    req_qty = dist_a_data->request_qty[i];
    window = dist_a_data->windows[i];
899
    async_communication_end(req_completed, req_qty, &window);
900
  }
901
902
903
904
  for(i=0; i<rep_a_data->entries; i++) {
    req_completed = rep_a_data->requests[i];
    req_qty = rep_a_data->request_qty[i];
    window = rep_a_data->windows[i];
905
    async_communication_end(req_completed, req_qty, &window);
906
  }
907

908
909
910
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
911
912
  if(!mall->is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
  return MALL_USER_PENDING;
913
914
915
916
917
918
919
920
921
922
923
924
}


/*
 * Termina la redistribución de los datos con los hijos, comprobando
 * si se han realizado iteraciones con comunicaciones en segundo plano
 * y enviando cuantas iteraciones se han realizado a los hijos.
 *
 * Además se realizan las comunicaciones síncronas se las hay.
 * Finalmente termina enviando los datos temporales a los hijos.
 */ 
int end_redistribution() {
925
  size_t i;
926
  int rootBcast, local_state;
927

928
  if(mall->is_intercomm) {
929
930
931
932
933
    rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    rootBcast = mall->root;
  }
  
934
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
935
  if(dist_s_data->entries || rep_s_data->entries) { // Enviar datos sincronos
936
937
938
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
939
    mall_conf->times->sync_start = MPI_Wtime();
940
941
942
    send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    for(i=0; i<rep_s_data->entries; i++) {
943
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], rootBcast, mall->intercomm);
944
    } 
945
946
947
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
948
    if(!mall->is_intercomm) mall_conf->times->sync_end = MPI_Wtime(); // Merge method only
949
  }
iker_martin's avatar
iker_martin committed
950

951
  local_state = MALL_DIST_COMPLETED;
952
  if(!mall->is_intercomm) { // Merge Spawn
953
    if(mall->numP > mall->numC) { // Shrink || Merge Shrink requiere de mas tareas
954
955
956
      local_state = MALL_SPAWN_ADAPT_PENDING;
    }
  }
957

958
  return local_state;
959
960
961
962
963
964
965
966
967
}

// TODO Move this somewhere else??
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//===============COMM PARENTS THREADS===================||
//======================================================||
//======================================================||

// State of the background redistribution, written by the worker thread and
// read by the master thread. NOTE(review): access is not synchronized —
// confirm whether an atomic/handler is needed. //FIXME Use a handler
int comm_state;
970
971
972
973
/*
 * Crea una hebra para ejecutar una comunicación en segundo plano.
 */
int thread_creation() {
974
  comm_state = MALL_DIST_PENDING;
975
976
977
978
979
  if(pthread_create(&(mall->async_thread), NULL, thread_async_work, NULL)) {
    printf("Error al crear el hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
980
  return comm_state;
981
982
983
984
985
986
987
988
}

/*
 * Comprobación por parte de una hebra maestra que indica
 * si una hebra esclava ha terminado su comunicación en segundo plano.
 *
 * El estado de la comunicación es devuelto al finalizar la función. 
 */
989
int thread_check(int wait_completed) {
990
  int all_completed = 0;
991

992
993
994
995
996
997
998
999
  if(wait_completed && comm_state == MALL_DIST_PENDING) {
    if(pthread_join(mall->async_thread, NULL)) {
      printf("Error al esperar al hilo\n");
      MPI_Abort(MPI_COMM_WORLD, -1);
      return -2;
    } 
  }

1000
  // Comprueba que todos los hilos han terminado la distribucion (Mismo valor en commAsync)
1001
  MPI_Allreduce(&comm_state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
1002
1003
  if(all_completed != MALL_DIST_COMPLETED) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
  //FIXME No se tiene en cuenta el estado MALL_APP_ENDED
1004
1005
1006
1007
1008
1009

  if(pthread_join(mall->async_thread, NULL)) {
    printf("Error al esperar al hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -2;
  } 
1010
1011
1012
1013

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
1014
  if(!mall->is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
  return end_redistribution();
}


/*
 * Función ejecutada por una hebra.
 * Ejecuta una comunicación síncrona con los hijos que
 * para el usuario se puede considerar como en segundo plano.
 *
 * Cuando termina la comunicación la hebra maestra puede comprobarlo
 * por el valor "commAsync".
 */
1027
void* thread_async_work() {
1028
1029
1030
1031
1032
1033
1034
1035
1036
  int rootBcast;
  size_t i;

  if(mall->is_intercomm) {
    rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    rootBcast = mall->root;
  }

1037
  send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
1038
1039
1040
  for(i=0; i<rep_a_data->entries; i++) {
    MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], rootBcast, mall->intercomm);
  } 
1041
  comm_state = MALL_DIST_COMPLETED;
1042
1043
  pthread_exit(NULL);
}
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055


//==============================================================================
/*
 * Muestra por pantalla el estado actual de todos los comunicadores
 */
void print_comms_state() {
  int tester;
  char *test = malloc(MPI_MAX_OBJECT_NAME * sizeof(char));

  MPI_Comm_get_name(mall->comm, test, &tester);
  printf("P%d Comm=%d Name=%s\n", mall->myId, mall->comm, test);
1056
1057
  MPI_Comm_get_name(*(mall->user_comm), test, &tester);
  printf("P%d Comm=%d Name=%s\n", mall->myId, *(mall->user_comm), test);
1058
1059
1060
1061
1062
1063
  if(mall->intercomm != MPI_COMM_NULL) {
    MPI_Comm_get_name(mall->intercomm, test, &tester);
    printf("P%d Comm=%d Name=%s\n", mall->myId, mall->intercomm, test);
  }
  free(test);
}
1064

1065
1066
1067
/*
 * Función solo necesaria en Merge
 */
1068
1069
1070
1071
1072
1073
1074
void malleability_comms_update(MPI_Comm comm) {
  if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
  if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));

  MPI_Comm_dup(comm, &(mall->thread_comm));
  MPI_Comm_dup(comm, &(mall->comm));

1075
1076
1077
1078
1079
1080
1081
  MPI_Comm_set_name(mall->thread_comm, "MAM_THREAD");
  MPI_Comm_set_name(mall->comm, "MAM_MAIN");
}

/*
 * TODO Por hacer
 */
1082
void MAM_I_create_user_struct(int is_children_group) {
1083
1084
1085
  user_reconf->comm = mall->tmp_comm;

  if(is_children_group) {
1086
    user_reconf->rank_state = MAM_PROC_NEW_RANK;
1087
1088
1089
1090
1091
1092
    user_reconf->numS = mall->numC;
    if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) user_reconf->numT = mall->numC;
    else user_reconf->numT = mall->numC + mall->numP;
  } else {
    user_reconf->numS = mall->numP;
    user_reconf->numT = mall->numC;
1093
1094
    if(mall->zombie) user_reconf->rank_state = MAM_PROC_ZOMBIE;
    else user_reconf->rank_state = MAM_PROC_CONTINUE;
1095
1096
  }
}