malleabilityManager.c 37.2 KB
Newer Older
1
#include <pthread.h>
2
#include <string.h>
3
4
//#include "malleabilityManager.h"
#include "MAM.h"
5
#include "malleabilityStates.h"
6
#include "malleabilityDataStructures.h"
7
#include "malleabilityTypes.h"
iker_martin's avatar
iker_martin committed
8
#include "malleabilityZombies.h"
9
#include "malleabilityTimes.h"
10
#include "malleabilityRMS.h"
11
#include "MAM_Init_Configuration.h"
12
#include "spawn_methods/GenericSpawn.h"
13
14
15
16
17
#include "CommDist.h"

#define MALLEABILITY_USE_SYNCHRONOUS 0
#define MALLEABILITY_USE_ASYNCHRONOUS 1

18
void MAM_Commit(int *mam_state);
19
20
21
22

void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous);
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous);

23

24
25
int MAM_St_rms(int *mam_state);
int MAM_St_spawn_start();
26
27
28
29
30
31
32
33
34
35
36
int MAM_St_spawn_pending(int wait_completed);
int MAM_St_red_start();
int MAM_St_red_pending(int *mam_state, int wait_completed);
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args);
int MAM_St_user_completed();
int MAM_St_spawn_adapt_pending(int wait_completed);
int MAM_St_spawn_adapted(int *mam_state);
int MAM_St_red_completed(int *mam_state);
int MAM_St_completed(int *mam_state);


37
void Children_init(void (*user_function)(void *), void *user_args);
38
39
int spawn_step();
int start_redistribution();
40
int check_redistribution(int wait_completed);
41
int end_redistribution();
iker_martin's avatar
iker_martin committed
42
int shrink_redistribution();
43
44

int thread_creation();
45
int thread_check(int wait_completed);
46
void* thread_async_work();
47

48
void print_comms_state();
49
void malleability_comms_update(MPI_Comm comm);
50

51
int MAM_I_convert_key(char *key);
52
void MAM_I_create_user_struct(int is_children_group);
53

54
55
56
57
58
malleability_data_t *rep_s_data;
malleability_data_t *dist_s_data;
malleability_data_t *rep_a_data;
malleability_data_t *dist_a_data;

59
60
mam_user_reconf_t *user_reconf;

61
/*
62
63
64
65
66
67
68
69
 * Inicializa la reserva de memoria para el modulo de maleabilidad
 * creando todas las estructuras necesarias y copias de comunicadores
 * para no interferir en la aplicación.
 *
 * Si es llamada por un grupo de procesos creados de forma dinámica,
 * inicializan la comunicacion con sus padres. En este caso, al terminar 
 * la comunicacion los procesos hijo estan preparados para ejecutar la
 * aplicacion.
70
 */
71
int MAM_Init(int root, MPI_Comm *comm, char *name_exec, void (*user_function)(void *), void *user_args) {
72
73
74
75
  MPI_Comm dup_comm, thread_comm;

  mall_conf = (malleability_config_t *) malloc(sizeof(malleability_config_t));
  mall = (malleability_t *) malloc(sizeof(malleability_t));
76
77
78
79
80
81
82
83
  user_reconf = (mam_user_reconf_t *) malloc(sizeof(mam_user_reconf_t));

  MPI_Comm_rank(*comm, &(mall->myId));
  MPI_Comm_size(*comm, &(mall->numP));

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Initializing MaM", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
  #endif
84

85
86
87
88
89
  rep_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));

90
91
92
93
  MPI_Comm_dup(*comm, &dup_comm);
  MPI_Comm_dup(*comm, &thread_comm);
  MPI_Comm_set_name(dup_comm, "MAM_MAIN");
  MPI_Comm_set_name(thread_comm, "MAM_THREAD");
94
95

  mall->root = root;
iker_martin's avatar
iker_martin committed
96
  mall->root_parents = root;
97
  mall->zombie = 0;
98
  mall->comm = dup_comm;
99
  mall->thread_comm = thread_comm;
100
101
  mall->user_comm = comm; 
  mall->tmp_comm = MPI_COMM_NULL;
102

103
  mall->name_exec = name_exec;
104
  mall->nodelist = NULL;
105
  mall->nodelist_len = 0;
106
107
108
109
110
111

  rep_s_data->entries = 0;
  rep_a_data->entries = 0;
  dist_s_data->entries = 0;
  dist_a_data->entries = 0;

112
  state = MALL_NOT_STARTED;
113

114
  MAM_Init_configuration();
115
  zombies_service_init();
116
  init_malleability_times();
117
  MAM_Def_main_datatype();
118

119
120
  // Si son el primer grupo de procesos, obtienen los datos de los padres
  MPI_Comm_get_parent(&(mall->intercomm));
121
  if(mall->intercomm != MPI_COMM_NULL) { 
122
    Children_init(user_function, user_args);
123
    return MALLEABILITY_CHILDREN;
124
  }
125

126
  MAM_check_hosts();
127
  MAM_Set_initial_configuration();
iker_martin's avatar
iker_martin committed
128

129
130
131
132
133
  #if USE_MAL_BARRIERS && USE_MAL_DEBUG
    if(mall->myId == mall->root)
      printf("MaM: Using barriers to record times.\n");
  #endif

134
  #if USE_MAL_DEBUG
135
    DEBUG_FUNC("MaM has been initialized correctly as parents", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
136
137
  #endif

138
  return MALLEABILITY_NOT_CHILDREN;
139
140
}

141
142
143
144
145
/*
 * Elimina toda la memoria reservado por el modulo
 * de maleabilidad y asegura que los zombies
 * despierten si los hubiese.
 */
146
void MAM_Finalize() {	  
147
148
149
150
151
152
153
154
155
  free_malleability_data_struct(rep_s_data);
  free_malleability_data_struct(rep_a_data);
  free_malleability_data_struct(dist_s_data);
  free_malleability_data_struct(dist_a_data);

  free(rep_s_data);
  free(rep_a_data);
  free(dist_s_data);
  free(dist_a_data);
156
  if(mall->nodelist != NULL) free(mall->nodelist);
157

158
  MAM_Free_main_datatype();
159
  free_malleability_times();
160
161
  if(mall->comm != MPI_COMM_WORLD && mall->comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->comm));
  if(mall->thread_comm != MPI_COMM_WORLD && mall->thread_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->thread_comm));
162
  if(mall->intercomm != MPI_COMM_WORLD && mall->intercomm != MPI_COMM_NULL) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge
163
164
  free(mall);
  free(mall_conf);
165
  free(user_reconf);
iker_martin's avatar
iker_martin committed
166
167
168
169

  zombies_awake();
  zombies_service_free();

170
  state = MALL_UNRESERVED;
171
172
}

173
174
/* 
 * TODO Reescribir
175
176
177
178
 * Comprueba el estado de la maleabilidad. Intenta avanzar en la misma
 * si es posible. Funciona como una máquina de estados.
 * Retorna el estado de la maleabilidad concreto y modifica el argumento
 * "mam_state" a uno generico.
179
 *
180
181
 * El argumento "wait_completed" se utiliza para esperar a la finalización de
 * las tareas llevadas a cabo por parte de MAM.
182
183
 *
 */
184
int MAM_Checkpoint(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
185
  int call_checkpoint = 0;
186

187
  //TODO This could be changed to an array with the functions to call in each case
188
189
  switch(state) {
    case MALL_UNRESERVED:
190
      *mam_state = MAM_UNRESERVED;
191
192
      break;
    case MALL_NOT_STARTED:
193
194
195
196
      call_checkpoint = MAM_St_rms(mam_state);
      break;
    case MALL_RMS_COMPLETED:
      call_checkpoint = MAM_St_spawn_start();
197
      break;
198

199
    case MALL_SPAWN_PENDING: // Comprueba si el spawn ha terminado
200
    case MALL_SPAWN_SINGLE_PENDING:
201
      call_checkpoint = MAM_St_spawn_pending(wait_completed);
202
      break;
203

204
205
    case MALL_SPAWN_ADAPT_POSTPONE:
    case MALL_SPAWN_COMPLETED:
206
      call_checkpoint = MAM_St_red_start();
207
      break;
208

209
    case MALL_DIST_PENDING:
210
      call_checkpoint = MAM_St_red_pending(mam_state, wait_completed);
211
212
      break;

213
214
    case MALL_USER_PENDING:
      call_checkpoint = MAM_St_user_pending(mam_state, wait_completed, user_function, user_args);
215
      break;
216

217
218
    case MALL_USER_COMPLETED:
      call_checkpoint = MAM_St_user_completed();
219
      break;
220

221
222
    case MALL_SPAWN_ADAPT_PENDING:
      call_checkpoint = MAM_St_spawn_adapt_pending(wait_completed);
223
224
      break;

225
226
227
    case MALL_SPAWN_ADAPTED:
    case MALL_DIST_COMPLETED:
      call_checkpoint = MAM_St_completed(mam_state);
228
229
      break;
  }
230

231
232
  if(call_checkpoint) { MAM_Checkpoint(mam_state, wait_completed, user_function, user_args); }
  if(state > MALL_NOT_STARTED && state < MALL_COMPLETED) *mam_state = MAM_PENDING;
233
234
235
  return state;
}

236
237
238
/*
 * TODO
 */
239
240
241
242
243
244
245
246
void MAM_Resume_redistribution(int *mam_state) {
  state = MALL_USER_COMPLETED;
  *mam_state = MAM_PENDING;
}

/*
 * TODO
 */
247
void MAM_Commit(int *mam_state) {
248
  int zombies = 0;
249
  #if USE_MAL_DEBUG
250
    if(mall->myId == mall->root){ DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); } fflush(stdout);
251
252
  #endif

253
254
255
  // Get times before commiting
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) {
    // This communication is only needed when a root process will become a zombie
256
    malleability_times_broadcast(mall->root_collectives);
257
  }
258

259
  // Free unneded communicators
260
261
  if(mall->tmp_comm != MPI_COMM_WORLD && mall->tmp_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->tmp_comm));
  if(*(mall->user_comm) != MPI_COMM_WORLD && *(mall->user_comm) != MPI_COMM_NULL) MPI_Comm_free(mall->user_comm);
262
263
264
265
266
267
268
269

  // Zombies treatment
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) {
    MPI_Allreduce(&mall->zombie, &zombies, 1, MPI_INT, MPI_MAX, mall->comm);
    if(zombies) {
      zombies_collect_suspended(mall->comm);
    }
  }
270

271
  // Zombies KILL
272
  if(mall->zombie) {
273
    #if USE_MAL_DEBUG >= 2
274
275
      DEBUG_FUNC("Is terminating as zombie", mall->myId, mall->numP); fflush(stdout);
    #endif
276
    MAM_Finalize();
277
    MPI_Finalize();
278
279
280
    exit(0);
  }

281
282
283
284
  // Reset/Free communicators
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { malleability_comms_update(mall->intercomm); }
  if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge

iker_martin's avatar
iker_martin committed
285
286
  MPI_Comm_rank(mall->comm, &mall->myId);
  MPI_Comm_size(mall->comm, &mall->numP);
287
288
  mall->root = mall_conf->spawn_method == MALL_SPAWN_BASELINE ? mall->root : mall->root_parents;
  mall->root_parents = mall->root;
289
  state = MALL_NOT_STARTED;
290
  if(mam_state != NULL) *mam_state = MAM_COMPLETED;
291

292
  // Set new communicator
293
294
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *(mall->user_comm) = MPI_COMM_WORLD; }
  else if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MPI_Comm_dup(mall->comm, mall->user_comm); }
295
  #if USE_MAL_DEBUG
296
    if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout);
297
  #endif
298
299
300
301
302

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_end = MPI_Wtime();
303
304
}

305
306
307
308
309
310
311
/*
 * Copies the current reconfiguration description into *reconf_info for the
 * user callback. Only valid while the user redistribution is pending;
 * returns MALL_DENIED otherwise, 0 on success.
 */
int MAM_Get_Reconf_Info(mam_user_reconf_t *reconf_info) {
  if(state != MALL_USER_PENDING) return MALL_DENIED;

  *reconf_info = *user_reconf;
  return 0;
}

312
313
/*
 * Retrieves the timing results of the last reconfiguration:
 * spawn, synchronous, asynchronous and total malleability times.
 * Thin wrapper over the internal MAM_I_retrieve_times helper.
 */
void MAM_Retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
  MAM_I_retrieve_times(sp_time, sy_time, asy_time, mall_time);
}

/*
 * Anyade a la estructura concreta de datos elegida
 * el nuevo set de datos "data" de un total de "total_qty" elementos.
 *
 * Los datos variables se tienen que anyadir cuando quieran ser mandados, no antes
 *
 * Mas informacion en la funcion "add_data".
323
 *
324
 */
325
void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
326
  size_t total_reqs = 0;
327

328
  if(is_constant) { //Async
329
    if(is_replicated) {
330
      total_reqs = 1;
331
      add_data(data, total_qty, type, total_reqs, rep_a_data);
332
    } else {
333
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
334
        total_reqs = 1;
335
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
336
        total_reqs = mall->numC;
337
      } 
338
339
340
      
      add_data(data, total_qty, type, total_reqs, dist_a_data);
    }
341
  } else { //Sync
342
343
344
345
346
    if(is_replicated) {
      add_data(data, total_qty, type, total_reqs, rep_s_data);
    } else {
      add_data(data, total_qty, type, total_reqs, dist_s_data);
    }
347
348
349
  }
}

350
351
352
353
354
355
356
357
/*
 * Modifica en la estructura concreta de datos elegida en el indice "index"
 * con el set de datos "data" de un total de "total_qty" elementos.
 *
 * Los datos variables se tienen que modificar cuando quieran ser mandados, no antes
 *
 * Mas informacion en la funcion "modify_data".
 */
358
void malleability_modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
359
360
  size_t total_reqs = 0;

361
362
  if(is_constant) {
    if(is_replicated) {
363
      total_reqs = 1;
364
365
366
      modify_data(data, index, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ??? 
    } else {    
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
367
        total_reqs = 1;
368
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
369
370
371
        total_reqs = mall->numC;
      }
      
372
      modify_data(data, index, total_qty, type, total_reqs, dist_a_data);
373
    }
374
375
376
377
378
379
  } else {
    if(is_replicated) {
      modify_data(data, index, total_qty, type, total_reqs, rep_s_data);
    } else {
      modify_data(data, index, total_qty, type, total_reqs, dist_s_data);
    }
380
381
382
  }
}

383
384
385
386
/*
 * Devuelve el numero de entradas para la estructura de descripcion de 
 * datos elegida.
 */
387
void malleability_get_entries(size_t *entries, int is_replicated, int is_constant){
388
389
390
  
  if(is_constant) {
    if(is_replicated) {
391
      *entries = rep_a_data->entries;
392
    } else {
393
      *entries = dist_a_data->entries;
394
395
396
    }
  } else {
    if(is_replicated) {
397
      *entries = rep_s_data->entries;
398
    } else {
399
      *entries = dist_s_data->entries;
400
401
402
403
404
405
406
407
408
409
410
    }
  }
}

/*
 * Devuelve el elemento de la lista "index" al usuario.
 * La devolución es en el mismo orden que lo han metido los padres
 * con la funcion "malleability_add_data()".
 * Es tarea del usuario saber el tipo de esos datos.
 * TODO Refactor a que sea automatico
 */
411
void malleability_get_data(void **data, size_t index, int is_replicated, int is_constant) {
412
413
414
415
  malleability_data_t *data_struct;

  if(is_constant) {
    if(is_replicated) {
416
      data_struct = rep_a_data;
417
    } else {
418
      data_struct = dist_a_data;
419
420
421
    }
  } else {
    if(is_replicated) {
422
      data_struct = rep_s_data;
423
    } else {
424
      data_struct = dist_s_data;
425
426
427
    }
  }

428
  *data = data_struct->arrays[index];
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
}


//======================================================||
//================PRIVATE FUNCTIONS=====================||
//================DATA COMMUNICATION====================||
//======================================================||
//======================================================||

/*
 * Generic routine to send every registered entry of "data_struct" from the
 * source group to "numP_children" target processes over mall->intercomm.
 * "is_asynchronous" selects non-blocking (async_communication_start) vs
 * blocking (sync_communication) transfers; either may hand back a
 * replacement buffer, which is stored back into the entry.
 */
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
  size_t entry;
  void *send_buf, *replacement;

  if(is_asynchronous) {
    for(entry = 0; entry < data_struct->entries; entry++) {
      send_buf = data_struct->arrays[entry];
      replacement = NULL;
      async_communication_start(send_buf, &replacement, data_struct->qty[entry], data_struct->types[entry], mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN,
		      mall->intercomm, &(data_struct->requests[entry]), &(data_struct->request_qty[entry]), &(data_struct->windows[entry]));
      if(replacement != NULL) data_struct->arrays[entry] = replacement;
    }
  } else {
    for(entry = 0; entry < data_struct->entries; entry++) {
      send_buf = data_struct->arrays[entry];
      replacement = NULL;
      sync_communication(send_buf, &replacement, data_struct->qty[entry], data_struct->types[entry], mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall->intercomm);
      if(replacement != NULL) data_struct->arrays[entry] = replacement;
    }
  }
}

/*
 * Generic routine for the target group to receive every registered entry of
 * "data_struct" from "numP_parents" source processes over mall->intercomm.
 * "is_asynchronous" selects non-blocking vs blocking reception; the helper
 * writes the received buffer through the second argument, which is stored
 * back into the entry.
 */
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
  size_t entry;
  void *recv_buf, *unused_send = NULL;

  if(is_asynchronous) {
    for(entry = 0; entry < data_struct->entries; entry++) {
      recv_buf = data_struct->arrays[entry];
      async_communication_start(unused_send, &recv_buf, data_struct->qty[entry], data_struct->types[entry], mall->numP, numP_parents, MALLEABILITY_CHILDREN,
		      mall->intercomm, &(data_struct->requests[entry]), &(data_struct->request_qty[entry]), &(data_struct->windows[entry]));
      data_struct->arrays[entry] = recv_buf;
    }
  } else {
    for(entry = 0; entry < data_struct->entries; entry++) {
      recv_buf = data_struct->arrays[entry];
      sync_communication(unused_send, &recv_buf, data_struct->qty[entry], data_struct->types[entry], mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall->intercomm);
      data_struct->arrays[entry] = recv_buf;
    }
  }
}

490
491
492
493
494
495

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//====================MAM STAGES========================||
//======================================================||
//======================================================||
496
497
498
499
//======================================================||
//======================================================||
//======================================================||
//======================================================||
500

501
int MAM_St_rms(int *mam_state) {
502
503
504
505
506
  reset_malleability_times();
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_start = MPI_Wtime();
507
508
509
510
511
512
513

  *mam_state = MAM_NOT_STARTED;
  state = MALL_RMS_COMPLETED;
  MAM_Check_configuration();
  mall->wait_targets_posted = 0;

  //if(CHECK_RMS()) {return MALL_DENIED;}    
514
515
  return 1;
}
516

517
/*
 * Starts the spawn of the new process group and flags which current
 * processes will become zombies (all of them with Baseline; the excess
 * ranks with Merge). Returns 1 when the spawn already finished or was
 * postponed, 0 when it is still pending.
 */
int MAM_St_spawn_start() {
  mall->num_parents = mall->numP;

  state = spawn_step();
  //FIXME This is needed but ugly
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->myId >= mall->numC){ mall->zombie = 1; }
  else if(mall_conf->spawn_method == MALL_SPAWN_BASELINE){ mall->zombie = 1; }

  return (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE) ? 1 : 0;
}

/*
 * Polls (or waits for, when wait_completed is set) the ongoing spawn.
 * Once it completes, records the spawn time and returns 1 so the state
 * machine advances; returns 0 while still pending.
 */
int MAM_St_spawn_pending(int wait_completed) {
  state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);
  if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPTED) {
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->comm);
    #endif
    mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
    return 1;
  }
  return 0;
}

int MAM_St_red_start() {
543
544
545
546
547
548
  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
    mall->root_collectives = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    mall->root_collectives = mall->root;
  }

549
550
551
552
553
  state = start_redistribution();
  return 1;
}

int MAM_St_red_pending(int *mam_state, int wait_completed) {
554
  if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
555
556
557
558
559
560
    state = thread_check(wait_completed);
  } else {
    state = check_redistribution(wait_completed);
  }

  if(state != MALL_DIST_PENDING) { 
561
    if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
      MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->tmp_comm); //El que pone 0 va primero
    } else {
      MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
    }
    MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
    state = MALL_USER_PENDING;
    *mam_state = MAM_USER_PENDING;
    return 1;
  }
  return 0;
}

/*
 * Runs the user redistribution callback, if provided. The callback is
 * expected to call MAM_Resume_redistribution() when done; with
 * wait_completed set, it is re-invoked until the state leaves
 * MALL_USER_PENDING. With no callback, the user stage is skipped.
 * Returns 1 once the user stage has finished, 0 while still pending.
 */
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Starting USER redistribution", mall->myId, mall->numP); fflush(stdout);
  #endif
  if(user_function != NULL) {
    MAM_I_create_user_struct(MALLEABILITY_NOT_CHILDREN);
    do {
      user_function(user_args);
    } while(wait_completed && state == MALL_USER_PENDING);
  } else {
    MAM_Resume_redistribution(mam_state);
  }

  if(state != MALL_USER_PENDING) {
    #if USE_MAL_DEBUG
      if(mall->myId == mall->root) DEBUG_FUNC("Ended USER redistribution", mall->myId, mall->numP); fflush(stdout);
    #endif
    return 1;
  }
  return 0;
}

/*
 * User stage finished: runs the final (synchronous) redistribution step
 * and always returns 1 so the state machine advances immediately.
 */
int MAM_St_user_completed() {
  state = end_redistribution();
  return 1;
}

int MAM_St_spawn_adapt_pending(int wait_completed) {
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_start = MPI_Wtime();
  unset_spawn_postpone_flag(state);
  state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);

609
  if(!MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, NULL)) {
610
611
612
613
614
615
616
617
618
619
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->comm);
    #endif
    mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
    return 1;
  }
  return 0;
}

/*
 * Terminal stage: commits the reconfiguration. Returns 0 — no further
 * checkpoint iteration is needed.
 */
int MAM_St_completed(int *mam_state) {
  MAM_Commit(mam_state);
  return 0;
}


625
626
627
628
629
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================CHILDREN=========================||
//======================================================||
//======================================================||
630
631
632
633
//======================================================||
//======================================================||
//======================================================||
//======================================================||
634
635
636
637
638
639
/*
 * Inicializacion de los datos de los hijos.
 * En la misma se reciben datos de los padres: La configuracion
 * de la ejecucion a realizar; y los datos a recibir de los padres
 * ya sea de forma sincrona, asincrona o ambas.
 */
640
void Children_init(void (*user_function)(void *), void *user_args) {
641
  size_t i;
642

643
  #if USE_MAL_DEBUG
644
    DEBUG_FUNC("MaM will now initialize spawned processes", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
645
646
  #endif

iker_martin's avatar
iker_martin committed
647
  malleability_connect_children(mall->comm, &(mall->intercomm));
648
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { // For Merge Method, these processes will be added
iker_martin's avatar
iker_martin committed
649
650
    MPI_Comm_rank(mall->intercomm, &mall->myId);
    MPI_Comm_size(mall->intercomm, &mall->numP);
651
  }
652
  mall->root_collectives = mall->root_parents;
653

654
  #if USE_MAL_DEBUG
655
    DEBUG_FUNC("Spawned have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
656
657
  #endif

658
  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN);
659
  if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos
660
    #if USE_MAL_DEBUG >= 2
661
      DEBUG_FUNC("Spawned start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
662
    #endif
663
664
665
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
666

667
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
iker_martin's avatar
iker_martin committed
668
      recv_data(mall->num_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
669
      for(i=0; i<rep_a_data->entries; i++) {
670
        MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm);
671
      } 
672
    } else {
iker_martin's avatar
iker_martin committed
673
      recv_data(mall->num_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS); 
674

675
      for(i=0; i<rep_a_data->entries; i++) {
676
        MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm, &(rep_a_data->requests[i][0]));
677
      } 
678
      #if USE_MAL_DEBUG >= 2
679
        DEBUG_FUNC("Spawned started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
680
      #endif
681

682
      for(i=0; i<rep_a_data->entries; i++) {
683
        async_communication_wait(rep_a_data->requests[i], rep_a_data->request_qty[i]);
684
      }
685
      for(i=0; i<dist_a_data->entries; i++) {
686
        async_communication_wait(dist_a_data->requests[i], dist_a_data->request_qty[i]);
687
      }
688
689
690
691
692
693
      if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
        MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
        mall->wait_targets_posted = 1;
        MPI_Wait(&mall->wait_targets, MPI_STATUS_IGNORE);
      }

694
      #if USE_MAL_DEBUG >= 2
695
        DEBUG_FUNC("Spawned waited for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
696
      #endif
697
      for(i=0; i<dist_a_data->entries; i++) {
698
        async_communication_end(dist_a_data->requests[i], dist_a_data->request_qty[i], &(dist_a_data->windows[i]));
699
      }
700
      for(i=0; i<rep_a_data->entries; i++) {
701
        async_communication_end(rep_a_data->requests[i], rep_a_data->request_qty[i], &(rep_a_data->windows[i]));
702
      }
703
    }
704

705
706
707
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
708
    mall_conf->times->async_end= MPI_Wtime(); // Obtener timestamp de cuando termina comm asincrona
709
  }
710
  #if USE_MAL_DEBUG
711
    DEBUG_FUNC("Spawned have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
712
  #endif
713

714
  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
715
716
717
718
719
720
721
722
723
724
725
    MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->tmp_comm); //El que pone 0 va primero
  } else {
    MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
  }
  MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
  if(user_function != NULL) {
    state = MALL_USER_PENDING;
    MAM_I_create_user_struct(MALLEABILITY_CHILDREN);
    user_function(user_args);
  }

726
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN);
727
  if(dist_s_data->entries || rep_s_data->entries) { // Recibir datos sincronos
728
729
730
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
iker_martin's avatar
iker_martin committed
731
    recv_data(mall->num_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
732
733

    for(i=0; i<rep_s_data->entries; i++) {
734
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_collectives, mall->intercomm);
735
    } 
736
737
738
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
739
    mall_conf->times->sync_end = MPI_Wtime(); // Obtener timestamp de cuando termina comm sincrona
740
  }
741
  #if USE_MAL_DEBUG
742
    DEBUG_FUNC("Targets have completed synchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
743
  #endif
744

745
  MAM_Commit(NULL);
746

747
  #if USE_MAL_DEBUG
748
    DEBUG_FUNC("MaM has been initialized correctly for new ranks", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
749
  #endif
750
751
752
753
754
755
756
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================PARENTS==========================||
//======================================================||
//======================================================||
757
758
//======================================================||
//======================================================||
759
760
761
762
763
764

/*
 * Se encarga de realizar la creacion de los procesos hijos.
 * Si se pide en segundo plano devuelve el estado actual.
 */
int spawn_step(){
765
766
767
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
768
  mall_conf->times->spawn_start = MPI_Wtime();
769
 
iker_martin's avatar
iker_martin committed
770
  state = init_spawn(mall->thread_comm, &(mall->intercomm));
771

772
  if(!MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, NULL)) {
773
774
775
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
776
      mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
777
778
779
780
  }
  return state;
}

781

782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
/*
 * Comienza la redistribucion de los datos con el nuevo grupo de procesos.
 *
 * Primero se envia la configuracion a utilizar al nuevo grupo de procesos y a continuacion
 * se realiza el envio asincrono y/o sincrono si lo hay.
 *
 * En caso de que haya comunicacion asincrona, se comienza y se termina la funcion 
 * indicando que se ha comenzado un envio asincrono.
 *
 * Si no hay comunicacion asincrono se pasa a realizar la sincrona si la hubiese.
 *
 * Finalmente se envian datos sobre los resultados a los hijos y se desconectan ambos
 * grupos de procesos.
 */
int start_redistribution() {
797
  size_t i;
798

799
  if(mall->intercomm == MPI_COMM_NULL) {
800
801
    // Si no tiene comunicador creado, se debe a que se ha pospuesto el Spawn
    //   y se trata del spawn Merge Shrink
802
    MPI_Comm_dup(mall->comm, &(mall->intercomm));
803
  }
804

805
  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN);
806
  if(dist_a_data->entries || rep_a_data->entries) { // Enviar datos asincronos
807
808
809
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
810
    mall_conf->times->async_start = MPI_Wtime();
811
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
812
813
814
      return thread_creation();
    } else {
      send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
815
      for(i=0; i<rep_a_data->entries; i++) {
816
        MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm, &(rep_a_data->requests[i][0]));
817
      } 
818
819
820
821
822

      if(mall->zombie && MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
        MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
        mall->wait_targets_posted = 1;
      }
823
      return MALL_DIST_PENDING; 
824
825
    }
  } 
826
  return MALL_USER_PENDING;
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
}


/*
 * Comprueba si la redistribucion asincrona ha terminado. 
 * Si no ha terminado la funcion termina indicandolo, en caso contrario,
 * se continua con la comunicacion sincrona, el envio de resultados y
 * se desconectan los grupos de procesos.
 *
 * Esta funcion permite dos modos de funcionamiento al comprobar si la
 * comunicacion asincrona ha terminado.
 * Si se utiliza el modo "MAL_USE_NORMAL" o "MAL_USE_POINT", se considera 
 * terminada cuando los padres terminan de enviar.
 * Si se utiliza el modo "MAL_USE_IBARRIER", se considera terminada cuando
 * los hijos han terminado de recibir.
842
 * //FIXME Modificar para que se tenga en cuenta rep_a_data
843
 */
844
int check_redistribution(int wait_completed) {
845
  int completed, local_completed, all_completed;
846
  size_t i, req_qty;
847
  MPI_Request *req_completed;
848
849
  MPI_Win window;
  local_completed = 1;
850
  #if USE_MAL_DEBUG >= 2
851
    DEBUG_FUNC("Sources are testing for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
852
  #endif
853

854
  if(wait_completed) {
855
856
857
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL) && !mall->wait_targets_posted) {
      MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
      mall->wait_targets_posted = 1;
858
859
860
861
    }
    for(i=0; i<dist_a_data->entries; i++) {
      req_completed = dist_a_data->requests[i];
      req_qty = dist_a_data->request_qty[i];
862
      async_communication_wait(req_completed, req_qty);
863
    }
864
865
866
    for(i=0; i<rep_a_data->entries; i++) {
      req_completed = rep_a_data->requests[i];
      req_qty = rep_a_data->request_qty[i];
867
      async_communication_wait(req_completed, req_qty);
868
    }
869
870

    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) { MPI_Wait(&mall->wait_targets, MPI_STATUS_IGNORE); }
871
  } else {
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
    if(mall->wait_targets_posted) { 
      MPI_Test(&mall->wait_targets, &local_completed, MPI_STATUS_IGNORE); 
    } else {
      for(i=0; i<dist_a_data->entries; i++) {
        req_completed = dist_a_data->requests[i];
        req_qty = dist_a_data->request_qty[i];
        completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, req_completed, req_qty);
        local_completed = local_completed && completed;
      }
      for(i=0; i<rep_a_data->entries; i++) {
        req_completed = rep_a_data->requests[i];
        req_qty = rep_a_data->request_qty[i];
        completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, req_completed, req_qty);
        local_completed = local_completed && completed;
      }

      if(local_completed && MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
        MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
        mall->wait_targets_posted = 1;
        MPI_Test(&mall->wait_targets, &local_completed, MPI_STATUS_IGNORE); //TODO - Figure out if last process takes profit from calling here
      }
893
    }
894
895
896
897
898
899
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Sources will now check a global decision", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif

    MPI_Allreduce(&local_completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
    if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
900
901
  }

902
  #if USE_MAL_DEBUG >= 2
903
    DEBUG_FUNC("Sources sent asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
904
  #endif
905

906
907
908
909
  for(i=0; i<dist_a_data->entries; i++) {
    req_completed = dist_a_data->requests[i];
    req_qty = dist_a_data->request_qty[i];
    window = dist_a_data->windows[i];
910
    async_communication_end(req_completed, req_qty, &window);
911
  }
912
913
914
915
  for(i=0; i<rep_a_data->entries; i++) {
    req_completed = rep_a_data->requests[i];
    req_qty = rep_a_data->request_qty[i];
    window = rep_a_data->windows[i];
916
    async_communication_end(req_completed, req_qty, &window);
917
  }
918

919
920
921
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
922
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
923
  return MALL_USER_PENDING;
924
925
926
927
928
929
930
931
932
933
934
}

/*
 * Termina la redistribución de los datos con los hijos, comprobando
 * si se han realizado iteraciones con comunicaciones en segundo plano
 * y enviando cuantas iteraciones se han realizado a los hijos.
 *
 * Además se realizan las comunicaciones síncronas se las hay.
 * Finalmente termina enviando los datos temporales a los hijos.
 */ 
int end_redistribution() {
935
  size_t i;
936
  int local_state;
937

938
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN);
939
  if(dist_s_data->entries || rep_s_data->entries) { // Enviar datos sincronos
940
941
942
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
943
    mall_conf->times->sync_start = MPI_Wtime();
944
945
946
    send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    for(i=0; i<rep_s_data->entries; i++) {
947
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_collectives, mall->intercomm);
948
    } 
949
950
951
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
952
    if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->sync_end = MPI_Wtime(); // Merge method only
953
  }
iker_martin's avatar
iker_martin committed
954

955
  local_state = MALL_DIST_COMPLETED;
956
957
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->numP > mall->numC) { // Merge Shrink
    local_state = MALL_SPAWN_ADAPT_PENDING;
958
  }
959

960
  return local_state;
961
962
963
964
965
966
967
968
969
}

// TODO MOVER A OTRO LADO??
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//===============COMM PARENTS THREADS===================||
//======================================================||
//======================================================||

970
971

int comm_state; //FIXME Usar un handler
972
973
974
975
/*
 * Crea una hebra para ejecutar una comunicación en segundo plano.
 */
int thread_creation() {
976
  comm_state = MALL_DIST_PENDING;
977
978
979
980
981
  if(pthread_create(&(mall->async_thread), NULL, thread_async_work, NULL)) {
    printf("Error al crear el hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
982
  return comm_state;
983
984
985
986
987
988
989
990
}

/*
 * Comprobación por parte de una hebra maestra que indica
 * si una hebra esclava ha terminado su comunicación en segundo plano.
 *
 * El estado de la comunicación es devuelto al finalizar la función. 
 */
991
int thread_check(int wait_completed) {
992
  int all_completed = 0;
993

994
995
996
997
998
999
1000
1001
  if(wait_completed && comm_state == MALL_DIST_PENDING) {
    if(pthread_join(mall->async_thread, NULL)) {
      printf("Error al esperar al hilo\n");
      MPI_Abort(MPI_COMM_WORLD, -1);
      return -2;
    } 
  }

1002
  // Comprueba que todos los hilos han terminado la distribucion (Mismo valor en commAsync)
1003
  MPI_Allreduce(&comm_state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
1004
  if(all_completed != MALL_DIST_COMPLETED) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
1005
1006
1007
1008
1009
1010

  if(pthread_join(mall->async_thread, NULL)) {
    printf("Error al esperar al hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -2;
  } 
1011
1012
1013
1014

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
1015
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
1016
  return MALL_USER_PENDING;
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
}


/*
 * Función ejecutada por una hebra.
 * Ejecuta una comunicación síncrona con los hijos que
 * para el usuario se puede considerar como en segundo plano.
 *
 * Cuando termina la comunicación la hebra maestra puede comprobarlo
 * por el valor "commAsync".
 */
1028
void* thread_async_work() {
1029
1030
  size_t i;

1031
  send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
1032
  for(i=0; i<rep_a_data->entries; i++) {
1033
    MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm);
1034
  } 
1035
  comm_state = MALL_DIST_COMPLETED;
1036
1037
  pthread_exit(NULL);
}
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049


//==============================================================================
/*
 * Muestra por pantalla el estado actual de todos los comunicadores
 */
void print_comms_state() {
  int tester;
  char *test = malloc(MPI_MAX_OBJECT_NAME * sizeof(char));

  MPI_Comm_get_name(mall->comm, test, &tester);
  printf("P%d Comm=%d Name=%s\n", mall->myId, mall->comm, test);
1050
1051
  MPI_Comm_get_name(*(mall->user_comm), test, &tester);
  printf("P%d Comm=%d Name=%s\n", mall->myId, *(mall->user_comm), test);
1052
1053
1054
1055
1056
1057
  if(mall->intercomm != MPI_COMM_NULL) {
    MPI_Comm_get_name(mall->intercomm, test, &tester);
    printf("P%d Comm=%d Name=%s\n", mall->myId, mall->intercomm, test);
  }
  free(test);
}
1058

1059
1060
1061
/*
 * Función solo necesaria en Merge
 */
1062
1063
1064
1065
1066
1067
1068
void malleability_comms_update(MPI_Comm comm) {
  if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
  if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));

  MPI_Comm_dup(comm, &(mall->thread_comm));
  MPI_Comm_dup(comm, &(mall->comm));

1069
1070
1071
1072
1073
1074
1075
  MPI_Comm_set_name(mall->thread_comm, "MAM_THREAD");
  MPI_Comm_set_name(mall->comm, "MAM_MAIN");
}

/*
 * TODO Por hacer
 */
1076
void MAM_I_create_user_struct(int is_children_group) {
1077
1078
1079
  user_reconf->comm = mall->tmp_comm;

  if(is_children_group) {
1080
    user_reconf->rank_state = MAM_PROC_NEW_RANK;
iker_martin's avatar
iker_martin committed
1081
1082
1083
    user_reconf->numS = mall->num_parents;
    if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) user_reconf->numT = mall->numP;
    else user_reconf->numT = mall->num_parents + mall->numP;
1084
1085
1086
  } else {
    user_reconf->numS = mall->numP;
    user_reconf->numT = mall->numC;
1087
1088
    if(mall->zombie) user_reconf->rank_state = MAM_PROC_ZOMBIE;
    else user_reconf->rank_state = MAM_PROC_CONTINUE;
1089
1090
  }
}