"Results/DataRedist/Asynch/config124.ini" did not exist on "afb370032ef7447a057f6b43e9f796588d84e52c"
malleabilityManager.c 40.5 KB
Newer Older
1
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

//#include "malleabilityManager.h"
#include "MAM.h"
#include "malleabilityStates.h"
#include "malleabilityDataStructures.h"
#include "malleabilityTypes.h"
#include "malleabilityZombies.h"
#include "malleabilityTimes.h"
#include "malleabilityRMS.h"
#include "MAM_Init_Configuration.h"
#include "spawn_methods/GenericSpawn.h"
#include "CommDist.h"

#define MALLEABILITY_USE_SYNCHRONOUS 0
#define MALLEABILITY_USE_ASYNCHRONOUS 1

18
void MAM_Commit(int *mam_state);
19
20
21
22

void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous);
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous);

23

24
25
int MAM_St_rms(int *mam_state);
int MAM_St_spawn_start();
26
27
28
29
30
31
32
33
34
35
36
int MAM_St_spawn_pending(int wait_completed);
int MAM_St_red_start();
int MAM_St_red_pending(int *mam_state, int wait_completed);
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args);
int MAM_St_user_completed();
int MAM_St_spawn_adapt_pending(int wait_completed);
int MAM_St_spawn_adapted(int *mam_state);
int MAM_St_red_completed(int *mam_state);
int MAM_St_completed(int *mam_state);


37
void Children_init(void (*user_function)(void *), void *user_args);
38
39
int spawn_step();
int start_redistribution();
40
int check_redistribution(int wait_completed);
41
int end_redistribution();
iker_martin's avatar
iker_martin committed
42
int shrink_redistribution();
43
44

int thread_creation();
45
int thread_check(int wait_completed);
46
void* thread_async_work();
47

48
void print_comms_state();
49
void malleability_comms_update(MPI_Comm comm);
50

51
int MAM_I_convert_key(char *key);
52
void MAM_I_create_user_struct(int is_children_group);
53

54
55
56
57
58
malleability_data_t *rep_s_data;
malleability_data_t *dist_s_data;
malleability_data_t *rep_a_data;
malleability_data_t *dist_a_data;

59
60
mam_user_reconf_t *user_reconf;

61
/*
62
63
64
65
66
67
68
69
 * Inicializa la reserva de memoria para el modulo de maleabilidad
 * creando todas las estructuras necesarias y copias de comunicadores
 * para no interferir en la aplicación.
 *
 * Si es llamada por un grupo de procesos creados de forma dinámica,
 * inicializan la comunicacion con sus padres. En este caso, al terminar 
 * la comunicacion los procesos hijo estan preparados para ejecutar la
 * aplicacion.
70
 */
71
int MAM_Init(int root, MPI_Comm *comm, char *name_exec, void (*user_function)(void *), void *user_args) {
72
73
74
75
  MPI_Comm dup_comm, thread_comm;

  mall_conf = (malleability_config_t *) malloc(sizeof(malleability_config_t));
  mall = (malleability_t *) malloc(sizeof(malleability_t));
76
77
78
79
80
81
82
83
  user_reconf = (mam_user_reconf_t *) malloc(sizeof(mam_user_reconf_t));

  MPI_Comm_rank(*comm, &(mall->myId));
  MPI_Comm_size(*comm, &(mall->numP));

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Initializing MaM", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
  #endif
84

85
86
87
88
89
  rep_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));

90
91
92
93
  MPI_Comm_dup(*comm, &dup_comm);
  MPI_Comm_dup(*comm, &thread_comm);
  MPI_Comm_set_name(dup_comm, "MAM_MAIN");
  MPI_Comm_set_name(thread_comm, "MAM_THREAD");
94
95

  mall->root = root;
iker_martin's avatar
iker_martin committed
96
  mall->root_parents = root;
97
  mall->zombie = 0;
98
  mall->comm = dup_comm;
99
  mall->thread_comm = thread_comm;
100
101
  mall->user_comm = comm; 
  mall->tmp_comm = MPI_COMM_NULL;
102

103
  mall->name_exec = name_exec;
104
  mall->nodelist = NULL;
105
  mall->nodelist_len = 0;
106
107
108
109
110
111

  rep_s_data->entries = 0;
  rep_a_data->entries = 0;
  dist_s_data->entries = 0;
  dist_a_data->entries = 0;

112
  state = MALL_NOT_STARTED;
113

114
  MAM_Init_configuration();
115
  zombies_service_init();
116
  init_malleability_times();
117
  MAM_Def_main_datatype();
118

119
120
  // Si son el primer grupo de procesos, obtienen los datos de los padres
  MPI_Comm_get_parent(&(mall->intercomm));
121
  if(mall->intercomm != MPI_COMM_NULL) { 
122
    Children_init(user_function, user_args);
123
    return MALLEABILITY_CHILDREN;
124
  }
125

126
  MAM_check_hosts();
127
  MAM_Set_initial_configuration();
iker_martin's avatar
iker_martin committed
128

129
130
131
132
133
  #if USE_MAL_BARRIERS && USE_MAL_DEBUG
    if(mall->myId == mall->root)
      printf("MaM: Using barriers to record times.\n");
  #endif

134
  #if USE_MAL_DEBUG
135
    DEBUG_FUNC("MaM has been initialized correctly as parents", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
136
137
  #endif

138
  return MALLEABILITY_NOT_CHILDREN;
139
140
}

141
142
143
144
145
/*
 * Elimina toda la memoria reservado por el modulo
 * de maleabilidad y asegura que los zombies
 * despierten si los hubiese.
 */
146
void MAM_Finalize() {	  
147
148
149
150
151
152
153
154
155
  free_malleability_data_struct(rep_s_data);
  free_malleability_data_struct(rep_a_data);
  free_malleability_data_struct(dist_s_data);
  free_malleability_data_struct(dist_a_data);

  free(rep_s_data);
  free(rep_a_data);
  free(dist_s_data);
  free(dist_a_data);
156
  if(mall->nodelist != NULL) free(mall->nodelist);
157

158
  MAM_Free_main_datatype();
159
  free_malleability_times();
160
161
  if(mall->comm != MPI_COMM_WORLD && mall->comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->comm));
  if(mall->thread_comm != MPI_COMM_WORLD && mall->thread_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->thread_comm));
162
  if(mall->intercomm != MPI_COMM_WORLD && mall->intercomm != MPI_COMM_NULL) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge
163
164
  free(mall);
  free(mall_conf);
165
  free(user_reconf);
iker_martin's avatar
iker_martin committed
166
167
168
169

  zombies_awake();
  zombies_service_free();

170
  state = MALL_UNRESERVED;
171
172
}

173
174
/* 
 * TODO Reescribir
175
176
177
178
 * Comprueba el estado de la maleabilidad. Intenta avanzar en la misma
 * si es posible. Funciona como una máquina de estados.
 * Retorna el estado de la maleabilidad concreto y modifica el argumento
 * "mam_state" a uno generico.
179
 *
180
181
 * El argumento "wait_completed" se utiliza para esperar a la finalización de
 * las tareas llevadas a cabo por parte de MAM.
182
183
 *
 */
184
int MAM_Checkpoint(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
185
  int call_checkpoint = 0;
186

187
  //TODO This could be changed to an array with the functions to call in each case
188
189
  switch(state) {
    case MALL_UNRESERVED:
190
      *mam_state = MAM_UNRESERVED;
191
192
      break;
    case MALL_NOT_STARTED:
193
194
195
196
      call_checkpoint = MAM_St_rms(mam_state);
      break;
    case MALL_RMS_COMPLETED:
      call_checkpoint = MAM_St_spawn_start();
197
      break;
198

199
    case MALL_SPAWN_PENDING: // Comprueba si el spawn ha terminado
200
    case MALL_SPAWN_SINGLE_PENDING:
201
      call_checkpoint = MAM_St_spawn_pending(wait_completed);
202
      break;
203

204
205
    case MALL_SPAWN_ADAPT_POSTPONE:
    case MALL_SPAWN_COMPLETED:
206
      call_checkpoint = MAM_St_red_start();
207
      break;
208

209
    case MALL_DIST_PENDING:
210
      call_checkpoint = MAM_St_red_pending(mam_state, wait_completed);
211
212
      break;

213
214
    case MALL_USER_PENDING:
      call_checkpoint = MAM_St_user_pending(mam_state, wait_completed, user_function, user_args);
215
      break;
216

217
218
    case MALL_USER_COMPLETED:
      call_checkpoint = MAM_St_user_completed();
219
      break;
220

221
222
    case MALL_SPAWN_ADAPT_PENDING:
      call_checkpoint = MAM_St_spawn_adapt_pending(wait_completed);
223
224
      break;

225
226
227
    case MALL_SPAWN_ADAPTED:
    case MALL_DIST_COMPLETED:
      call_checkpoint = MAM_St_completed(mam_state);
228
229
      break;
  }
230

231
232
  if(call_checkpoint) { MAM_Checkpoint(mam_state, wait_completed, user_function, user_args); }
  if(state > MALL_NOT_STARTED && state < MALL_COMPLETED) *mam_state = MAM_PENDING;
233
234
235
  return state;
}

236
237
238
/*
 * TODO
 */
239
240
241
242
243
244
245
246
/*
 * Marks the user stage of the reconfiguration as finished: the internal
 * state becomes MALL_USER_COMPLETED and the generic state written to
 * "mam_state" is MAM_PENDING.
 */
void MAM_Resume_redistribution(int *mam_state) {
  *mam_state = MAM_PENDING;
  state = MALL_USER_COMPLETED;
}

/*
 * TODO
 */
247
void MAM_Commit(int *mam_state) {
248
  int zombies = 0;
249
  #if USE_MAL_DEBUG
250
    if(mall->myId == mall->root){ DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); } fflush(stdout);
251
252
  #endif

253
254
255
  // Get times before commiting
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) {
    // This communication is only needed when a root process will become a zombie
256
    malleability_times_broadcast(mall->root_collectives);
257
  }
258

259
  // Free unneded communicators
260
261
  if(mall->tmp_comm != MPI_COMM_WORLD && mall->tmp_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->tmp_comm));
  if(*(mall->user_comm) != MPI_COMM_WORLD && *(mall->user_comm) != MPI_COMM_NULL) MPI_Comm_free(mall->user_comm);
262
263
264
265
266
267
268
269

  // Zombies treatment
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) {
    MPI_Allreduce(&mall->zombie, &zombies, 1, MPI_INT, MPI_MAX, mall->comm);
    if(zombies) {
      zombies_collect_suspended(mall->comm);
    }
  }
270

271
  // Zombies KILL
272
  if(mall->zombie) {
273
    #if USE_MAL_DEBUG >= 2
274
275
      DEBUG_FUNC("Is terminating as zombie", mall->myId, mall->numP); fflush(stdout);
    #endif
276
    MAM_Finalize();
277
    MPI_Finalize();
278
279
280
    exit(0);
  }

281
282
283
284
  // Reset/Free communicators
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { malleability_comms_update(mall->intercomm); }
  if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge

iker_martin's avatar
iker_martin committed
285
286
  MPI_Comm_rank(mall->comm, &mall->myId);
  MPI_Comm_size(mall->comm, &mall->numP);
287
288
  mall->root = mall_conf->spawn_method == MALL_SPAWN_BASELINE ? mall->root : mall->root_parents;
  mall->root_parents = mall->root;
289
  state = MALL_NOT_STARTED;
290
  if(mam_state != NULL) *mam_state = MAM_COMPLETED;
291

292
  // Set new communicator
293
294
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *(mall->user_comm) = MPI_COMM_WORLD; }
  else if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MPI_Comm_dup(mall->comm, mall->user_comm); }
295
  #if USE_MAL_DEBUG
296
    if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout);
297
  #endif
298
299
300
301
302

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_end = MPI_Wtime();
303
304
}

305
/*
306
307
308
309
310
311
312
313
314
 * This function adds data to a data structure based on whether the operation is synchronous or asynchronous,
 * and whether the data is replicated or distributed. It takes the following parameters:
 * - data: a pointer to the data to be added
 * - index: a pointer to a size_t variable where the index of the added data will be stored
 * - total_qty: the amount of elements in data
 * - type: the MPI datatype of the data
 * - is_replicated: a flag indicating whether the data is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
 * Finally, it updates the index with the index of the last added data if index is not NULL.
315
 */
316
317
void MAM_Data_add(void *data, size_t *index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
  size_t total_reqs = 0, returned_index;
318

319
  if(is_constant) { //Async
320
    if(is_replicated) {
321
      total_reqs = 1;
322
      add_data(data, total_qty, type, total_reqs, rep_a_data);
323
      returned_index = rep_a_data->entries-1;
324
    } else {
325
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
326
        total_reqs = 1;
327
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
328
        total_reqs = mall->numC;
329
      } 
330
331
      
      add_data(data, total_qty, type, total_reqs, dist_a_data);
332
      returned_index = dist_a_data->entries-1;
333
    }
334
  } else { //Sync
335
336
    if(is_replicated) {
      add_data(data, total_qty, type, total_reqs, rep_s_data);
337
      returned_index = rep_s_data->entries-1;
338
339
    } else {
      add_data(data, total_qty, type, total_reqs, dist_s_data);
340
      returned_index = dist_s_data->entries-1;
341
    }
342
  }
343
344

  if(index != NULL) *index = returned_index;
345
346
}

347
/*
348
349
350
351
352
353
354
355
 * This function modifies a data entry to a data structure based on whether the operation is synchronous or asynchronous,
 * and whether the data is replicated or distributed. It takes the following parameters:
 * - data: a pointer to the data to be added
 * - index: a value indicating which entry will be modified
 * - total_qty: the amount of elements in data
 * - type: the MPI datatype of the data
 * - is_replicated: a flag indicating whether the data is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
356
 */
357
void MAM_Data_modify(void *data, size_t index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
358
359
  size_t total_reqs = 0;

360
361
  if(is_constant) {
    if(is_replicated) {
362
      total_reqs = 1;
363
364
365
      modify_data(data, index, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ??? 
    } else {    
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
366
        total_reqs = 1;
367
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
368
369
370
        total_reqs = mall->numC;
      }
      
371
      modify_data(data, index, total_qty, type, total_reqs, dist_a_data);
372
    }
373
374
375
376
377
378
  } else {
    if(is_replicated) {
      modify_data(data, index, total_qty, type, total_reqs, rep_s_data);
    } else {
      modify_data(data, index, total_qty, type, total_reqs, dist_s_data);
    }
379
380
381
  }
}

382
/*
383
384
385
386
387
 * This functions returns how many data entries are available for one of the specific data structures.
 * It takes the following parameters:
 * - is_replicated: a flag indicating whether the structure is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
 * - entries: a pointer where the amount of entries will be stored
388
 */
389
void MAM_Data_get_entries(int is_replicated, int is_constant, size_t *entries){
390
391
392
  
  if(is_constant) {
    if(is_replicated) {
393
      *entries = rep_a_data->entries;
394
    } else {
395
      *entries = dist_a_data->entries;
396
397
398
    }
  } else {
    if(is_replicated) {
399
      *entries = rep_s_data->entries;
400
    } else {
401
      *entries = dist_s_data->entries;
402
403
404
405
406
    }
  }
}

/*
407
408
409
410
411
412
413
414
 * This function returns a data entry to a data structure based on whether the operation is synchronous or asynchronous,
 * and whether the data is replicated or distributed. It takes the following parameters:
 * - index: a value indicating which entry will be modified
 * - is_replicated: a flag indicating whether the data is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
 * - data: a pointer where the data will be stored. The user must free it
 * - total_qty: the amount of elements in data for all ranks
 * - local_qty: the amount of elements in data for this rank
415
 */
416
void MAM_Data_get_pointer(void **data, size_t index, size_t *total_qty, MPI_Datatype *type, int is_replicated, int is_constant) {
417
418
419
420
  malleability_data_t *data_struct;

  if(is_constant) {
    if(is_replicated) {
421
      data_struct = rep_a_data;
422
    } else {
423
      data_struct = dist_a_data;
424
425
426
    }
  } else {
    if(is_replicated) {
427
      data_struct = rep_s_data;
428
    } else {
429
      data_struct = dist_s_data;
430
431
432
    }
  }

433
  *data = data_struct->arrays[index];
434
435
436
  *total_qty = data_struct->qty[index];
  *type = data_struct->types[index];
  //get_block_dist(qty, mall->myId, mall->numP, &dist_data); //FIXME Asegurar que numP es correcto
437
438
}

439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
/*
 * @brief Gives the user the structure needed to redistribute data during a reconfiguration.
 *
 * The information is only available while the state of MaM is MALL_USER_PENDING;
 * in any other state nothing is written.
 *
 * Parameters:
 *   - mam_user_reconf_t *reconf_info: Destination structure; it receives a copy of the
 *     reconfiguration information kept by MaM.
 *
 * Return Value:
 *   - MAM_OK: The reconfiguration information has been copied into reconf_info.
 *   - MALL_DENIED: The state of MaM is not MALL_USER_PENDING.
 */
int MAM_Get_Reconf_Info(mam_user_reconf_t *reconf_info) {
  int result = MALL_DENIED;

  if(state == MALL_USER_PENDING) {
    *reconf_info = *user_reconf;
    result = MAM_OK;
  }
  return result;
}

/*
 * @brief Returns the times used for the different steps of the last reconfiguration.
 *
 * This function is intended to be called when a reconfiguration has ended.
 * It only forwards its arguments to MAM_I_retrieve_times.
 *
 * Parameters:
 *  - double *sp_time:   A pointer where the spawn time will be saved.
 *  - double *sy_time:   A pointer where the synchronous data redistribution time will be saved.
 *  - double *asy_time:  A pointer where the asynchronous data redistribution time will be saved.
 *  - double *mall_time: A pointer where the malleability time will be saved.
 */
void MAM_Retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
  MAM_I_retrieve_times(
    sp_time,
    sy_time,
    asy_time,
    mall_time
  );
}
474
475
476
477
478
479
480
481
482
483
484
485
486

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//================DATA COMMUNICATION====================||
//======================================================||
//======================================================||

/*
 * Generic function to send every entry of "data_struct" through the
 * intercommunicator. The calling group takes the MALLEABILITY_NOT_CHILDREN role.
 *
 * "is_asynchronous" selects whether each entry is sent with the non-blocking
 * routine (async_communication_start) or with the blocking one
 * (sync_communication).
 *
 * If the communication routine hands back a buffer through its receive
 * argument, that buffer replaces the array stored for the entry.
 *
 * Parameters:
 *   - numP_children: amount of processes in the group that receives the data
 *   - data_struct: structure whose entries are sent
 *   - is_asynchronous: non-zero to use the non-blocking communication
 */
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
  size_t i;
  void *aux_send, *aux_recv;

  for(i=0; i < data_struct->entries; i++) {
    aux_send = data_struct->arrays[i];
    aux_recv = NULL;
    if(is_asynchronous) {
      async_communication_start(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN,
		      mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
    } else {
      sync_communication(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall->intercomm);
    }
    if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
  }
}

/*
 * Generic function to receive every entry of "data_struct" through the
 * intercommunicator. The calling group takes the MALLEABILITY_CHILDREN role
 * and never provides a send buffer.
 *
 * "is_asynchronous" selects whether each entry is received with the
 * non-blocking routine (async_communication_start) or with the blocking one
 * (sync_communication).
 *
 * The buffer left by the communication routine in its receive argument is
 * always stored back as the array of the entry.
 *
 * Parameters:
 *   - numP_parents: amount of processes in the group that sends the data
 *   - data_struct: structure whose entries are received
 *   - is_asynchronous: non-zero to use the non-blocking communication
 */
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
  size_t i;
  void *aux, *aux_s = NULL;

  for(i=0; i < data_struct->entries; i++) {
    aux = data_struct->arrays[i];
    if(is_asynchronous) {
      async_communication_start(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->numP, numP_parents, MALLEABILITY_CHILDREN,
		      mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
    } else {
      sync_communication(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall->intercomm);
    }
    data_struct->arrays[i] = aux;
  }
}

533
534
535
536
537
538

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//====================MAM STAGES========================||
//======================================================||
//======================================================||
//======================================================||
//======================================================||
//======================================================||
//======================================================||
543

544
int MAM_St_rms(int *mam_state) {
545
546
547
548
549
  reset_malleability_times();
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_start = MPI_Wtime();
550
551
552
553
554
555
556

  *mam_state = MAM_NOT_STARTED;
  state = MALL_RMS_COMPLETED;
  MAM_Check_configuration();
  mall->wait_targets_posted = 0;

  //if(CHECK_RMS()) {return MALL_DENIED;}    
557
558
  return 1;
}
559

560
/*
 * Stage that starts the spawn of the new group of processes.
 * Stores the current amount of processes as the amount of parents, runs
 * the spawn step and flags which processes will become zombies:
 *   - Merge method: ranks equal to or greater than mall->numC.
 *   - Baseline method: every process.
 *
 * Returns 1 if the spawn step left the state as MALL_SPAWN_COMPLETED or
 * MALL_SPAWN_ADAPT_POSTPONE (the next stage can be tried), 0 otherwise.
 */
int MAM_St_spawn_start() {
  mall->num_parents = mall->numP;
  state = spawn_step();
  //FIXME This is necessary but ugly
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->myId >= mall->numC){ mall->zombie = 1; }
  else if(mall_conf->spawn_method == MALL_SPAWN_BASELINE){ mall->zombie = 1; }

  if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE){
    return 1;
  }
  return 0;
}

/*
 * Stage that checks whether a pending spawn has finished.
 * The state is refreshed with check_spawn_state; once it reports
 * MALL_SPAWN_COMPLETED or MALL_SPAWN_ADAPTED the spawn time is recorded.
 *
 * Returns 1 when the spawn has finished, 0 while it is still pending.
 */
int MAM_St_spawn_pending(int wait_completed) {
  state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);

  // Nothing else to do while the spawn has not finished
  if (state != MALL_SPAWN_COMPLETED && state != MALL_SPAWN_ADAPTED) {
    return 0;
  }

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
  return 1;
}

int MAM_St_red_start() {
586
587
588
589
590
591
  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
    mall->root_collectives = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    mall->root_collectives = mall->root;
  }

592
593
594
595
596
  state = start_redistribution();
  return 1;
}

int MAM_St_red_pending(int *mam_state, int wait_completed) {
597
  if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
598
599
600
601
602
603
    state = thread_check(wait_completed);
  } else {
    state = check_redistribution(wait_completed);
  }

  if(state != MALL_DIST_PENDING) { 
604
    if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
      MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->tmp_comm); //El que pone 0 va primero
    } else {
      MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
    }
    MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
    state = MALL_USER_PENDING;
    *mam_state = MAM_USER_PENDING;
    return 1;
  }
  return 0;
}

/*
 * Stage in which the user redistributes data.
 *
 * Without a user function the redistribution is resumed right away.
 * With a user function, the user structure is created and the function is
 * called once, and called again for as long as "wait_completed" is set and
 * the state is still MALL_USER_PENDING.
 *
 * Returns 1 if the state has left MALL_USER_PENDING, 0 otherwise.
 */
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Starting USER redistribution", mall->myId, mall->numP); fflush(stdout);
  #endif

  if(user_function == NULL) {
    MAM_Resume_redistribution(mam_state);
  } else {
    MAM_I_create_user_struct(MALLEABILITY_NOT_CHILDREN);
    for(;;) {
      user_function(user_args);
      // Repeat only when asked to wait and the user has not finished yet
      if(!wait_completed || state != MALL_USER_PENDING) break;
    }
  }

  if(state == MALL_USER_PENDING) {
    return 0;
  }

  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Ended USER redistribution", mall->myId, mall->numP); fflush(stdout);
  #endif
  return 1;
}

/*
 * Stage run once the user stage has finished: ends the redistribution and
 * stores its result as the new state.
 *
 * Always returns 1 so that MAM_Checkpoint tries the next stage immediately.
 */
int MAM_St_user_completed() {
  const int call_again = 1;

  state = end_redistribution();

  return call_again;
}

int MAM_St_spawn_adapt_pending(int wait_completed) {
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_start = MPI_Wtime();
  unset_spawn_postpone_flag(state);
  state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);

652
  if(!MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, NULL)) {
653
654
655
656
657
658
659
660
661
662
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->comm);
    #endif
    mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
    return 1;
  }
  return 0;
}

/*
 * Final stage of a reconfiguration: commits it through MAM_Commit.
 *
 * Always returns 0, so MAM_Checkpoint does not try another stage.
 */
int MAM_St_completed(int *mam_state) {
  MAM_Commit(mam_state);
  return 0;
}


668
669
670
671
672
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================CHILDREN=========================||
//======================================================||
//======================================================||
//======================================================||
//======================================================||
//======================================================||
//======================================================||
677
678
679
680
681
682
/*
 * Inicializacion de los datos de los hijos.
 * En la misma se reciben datos de los padres: La configuracion
 * de la ejecucion a realizar; y los datos a recibir de los padres
 * ya sea de forma sincrona, asincrona o ambas.
 */
683
void Children_init(void (*user_function)(void *), void *user_args) {
684
  size_t i;
685

686
  #if USE_MAL_DEBUG
687
    DEBUG_FUNC("MaM will now initialize spawned processes", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
688
689
  #endif

iker_martin's avatar
iker_martin committed
690
  malleability_connect_children(mall->comm, &(mall->intercomm));
691
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { // For Merge Method, these processes will be added
iker_martin's avatar
iker_martin committed
692
693
    MPI_Comm_rank(mall->intercomm, &mall->myId);
    MPI_Comm_size(mall->intercomm, &mall->numP);
694
  }
695
  mall->root_collectives = mall->root_parents;
696

697
  #if USE_MAL_DEBUG
698
    DEBUG_FUNC("Spawned have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
699
700
  #endif

701
  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN);
702
  if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos
703
    #if USE_MAL_DEBUG >= 2
704
      DEBUG_FUNC("Spawned start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
705
    #endif
706
707
708
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
709

710
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
iker_martin's avatar
iker_martin committed
711
      recv_data(mall->num_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
712
      for(i=0; i<rep_a_data->entries; i++) {
713
        MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm);
714
      } 
715
    } else {
iker_martin's avatar
iker_martin committed
716
      recv_data(mall->num_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS); 
717

718
      for(i=0; i<rep_a_data->entries; i++) {
719
        MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm, &(rep_a_data->requests[i][0]));
720
      } 
721
      #if USE_MAL_DEBUG >= 2
722
        DEBUG_FUNC("Spawned started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
723
      #endif
724

725
      for(i=0; i<rep_a_data->entries; i++) {
726
        async_communication_wait(rep_a_data->requests[i], rep_a_data->request_qty[i]);
727
      }
728
      for(i=0; i<dist_a_data->entries; i++) {
729
        async_communication_wait(dist_a_data->requests[i], dist_a_data->request_qty[i]);
730
      }
731
732
733
734
735
736
      if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
        MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
        mall->wait_targets_posted = 1;
        MPI_Wait(&mall->wait_targets, MPI_STATUS_IGNORE);
      }

737
      #if USE_MAL_DEBUG >= 2
738
        DEBUG_FUNC("Spawned waited for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
739
      #endif
740
      for(i=0; i<dist_a_data->entries; i++) {
741
        async_communication_end(dist_a_data->requests[i], dist_a_data->request_qty[i], &(dist_a_data->windows[i]));
742
      }
743
      for(i=0; i<rep_a_data->entries; i++) {
744
        async_communication_end(rep_a_data->requests[i], rep_a_data->request_qty[i], &(rep_a_data->windows[i]));
745
      }
746
    }
747

748
749
750
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
751
    mall_conf->times->async_end= MPI_Wtime(); // Obtener timestamp de cuando termina comm asincrona
752
  }
753
  #if USE_MAL_DEBUG
754
    DEBUG_FUNC("Spawned have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
755
  #endif
756

757
  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
758
759
760
761
762
763
764
765
766
767
768
    MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->tmp_comm); //El que pone 0 va primero
  } else {
    MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
  }
  MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
  if(user_function != NULL) {
    state = MALL_USER_PENDING;
    MAM_I_create_user_struct(MALLEABILITY_CHILDREN);
    user_function(user_args);
  }

769
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN);
770
  if(dist_s_data->entries || rep_s_data->entries) { // Recibir datos sincronos
771
772
773
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
iker_martin's avatar
iker_martin committed
774
    recv_data(mall->num_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
775
776

    for(i=0; i<rep_s_data->entries; i++) {
777
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_collectives, mall->intercomm);
778
    } 
779
780
781
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
782
    mall_conf->times->sync_end = MPI_Wtime(); // Obtener timestamp de cuando termina comm sincrona
783
  }
784
  #if USE_MAL_DEBUG
785
    DEBUG_FUNC("Targets have completed synchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
786
  #endif
787

788
  MAM_Commit(NULL);
789

790
  #if USE_MAL_DEBUG
791
    DEBUG_FUNC("MaM has been initialized correctly for new ranks", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
792
  #endif
793
794
795
796
797
798
799
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================PARENTS==========================||
//======================================================||
//======================================================||
800
801
//======================================================||
//======================================================||
802
803
804
805
806
807

/*
 * Se encarga de realizar la creacion de los procesos hijos.
 * Si se pide en segundo plano devuelve el estado actual.
 */
int spawn_step(){
808
809
810
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
811
  mall_conf->times->spawn_start = MPI_Wtime();
812
 
iker_martin's avatar
iker_martin committed
813
  state = init_spawn(mall->thread_comm, &(mall->intercomm));
814

815
  if(!MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, NULL)) {
816
817
818
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
819
      mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
820
821
822
823
  }
  return state;
}

824

825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
/*
 * Comienza la redistribucion de los datos con el nuevo grupo de procesos.
 *
 * Primero se envia la configuracion a utilizar al nuevo grupo de procesos y a continuacion
 * se realiza el envio asincrono y/o sincrono si lo hay.
 *
 * En caso de que haya comunicacion asincrona, se comienza y se termina la funcion 
 * indicando que se ha comenzado un envio asincrono.
 *
 * Si no hay comunicacion asincrono se pasa a realizar la sincrona si la hubiese.
 *
 * Finalmente se envian datos sobre los resultados a los hijos y se desconectan ambos
 * grupos de procesos.
 */
int start_redistribution() {
840
  size_t i;
841

842
  if(mall->intercomm == MPI_COMM_NULL) {
843
844
    // Si no tiene comunicador creado, se debe a que se ha pospuesto el Spawn
    //   y se trata del spawn Merge Shrink
845
    MPI_Comm_dup(mall->comm, &(mall->intercomm));
846
  }
847

848
  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN);
849
  if(dist_a_data->entries || rep_a_data->entries) { // Enviar datos asincronos
850
851
852
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
853
    mall_conf->times->async_start = MPI_Wtime();
854
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
855
856
857
      return thread_creation();
    } else {
      send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
858
      for(i=0; i<rep_a_data->entries; i++) {
859
        MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm, &(rep_a_data->requests[i][0]));
860
      } 
861
862
863
864
865

      if(mall->zombie && MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
        MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
        mall->wait_targets_posted = 1;
      }
866
      return MALL_DIST_PENDING; 
867
868
    }
  } 
869
  return MALL_USER_PENDING;
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
}


/*
 * Comprueba si la redistribucion asincrona ha terminado. 
 * Si no ha terminado la funcion termina indicandolo, en caso contrario,
 * se continua con la comunicacion sincrona, el envio de resultados y
 * se desconectan los grupos de procesos.
 *
 * Esta funcion permite dos modos de funcionamiento al comprobar si la
 * comunicacion asincrona ha terminado.
 * Si se utiliza el modo "MAL_USE_NORMAL" o "MAL_USE_POINT", se considera 
 * terminada cuando los padres terminan de enviar.
 * Si se utiliza el modo "MAL_USE_IBARRIER", se considera terminada cuando
 * los hijos han terminado de recibir.
885
 * //FIXME Modificar para que se tenga en cuenta rep_a_data
886
 */
887
int check_redistribution(int wait_completed) {
888
  int completed, local_completed, all_completed;
889
  size_t i, req_qty;
890
  MPI_Request *req_completed;
891
892
  MPI_Win window;
  local_completed = 1;
893
  #if USE_MAL_DEBUG >= 2
894
    DEBUG_FUNC("Sources are testing for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
895
  #endif
896

897
  if(wait_completed) {
898
899
900
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL) && !mall->wait_targets_posted) {
      MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
      mall->wait_targets_posted = 1;
901
902
903
904
    }
    for(i=0; i<dist_a_data->entries; i++) {
      req_completed = dist_a_data->requests[i];
      req_qty = dist_a_data->request_qty[i];
905
      async_communication_wait(req_completed, req_qty);
906
    }
907
908
909
    for(i=0; i<rep_a_data->entries; i++) {
      req_completed = rep_a_data->requests[i];
      req_qty = rep_a_data->request_qty[i];
910
      async_communication_wait(req_completed, req_qty);
911
    }
912
913

    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) { MPI_Wait(&mall->wait_targets, MPI_STATUS_IGNORE); }
914
  } else {
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
    if(mall->wait_targets_posted) { 
      MPI_Test(&mall->wait_targets, &local_completed, MPI_STATUS_IGNORE); 
    } else {
      for(i=0; i<dist_a_data->entries; i++) {
        req_completed = dist_a_data->requests[i];
        req_qty = dist_a_data->request_qty[i];
        completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, req_completed, req_qty);
        local_completed = local_completed && completed;
      }
      for(i=0; i<rep_a_data->entries; i++) {
        req_completed = rep_a_data->requests[i];
        req_qty = rep_a_data->request_qty[i];
        completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, req_completed, req_qty);
        local_completed = local_completed && completed;
      }

      if(local_completed && MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
        MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
        mall->wait_targets_posted = 1;
        MPI_Test(&mall->wait_targets, &local_completed, MPI_STATUS_IGNORE); //TODO - Figure out if last process takes profit from calling here
      }
936
    }
937
938
939
940
941
942
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Sources will now check a global decision", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif

    MPI_Allreduce(&local_completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
    if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
943
944
  }

945
  #if USE_MAL_DEBUG >= 2
946
    DEBUG_FUNC("Sources sent asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
947
  #endif
948

949
950
951
952
  for(i=0; i<dist_a_data->entries; i++) {
    req_completed = dist_a_data->requests[i];
    req_qty = dist_a_data->request_qty[i];
    window = dist_a_data->windows[i];
953
    async_communication_end(req_completed, req_qty, &window);
954
  }
955
956
957
958
  for(i=0; i<rep_a_data->entries; i++) {
    req_completed = rep_a_data->requests[i];
    req_qty = rep_a_data->request_qty[i];
    window = rep_a_data->windows[i];
959
    async_communication_end(req_completed, req_qty, &window);
960
  }
961

962
963
964
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
965
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
966
  return MALL_USER_PENDING;
967
968
969
970
971
972
973
974
975
976
977
}

/*
 * Termina la redistribución de los datos con los hijos, comprobando
 * si se han realizado iteraciones con comunicaciones en segundo plano
 * y enviando cuantas iteraciones se han realizado a los hijos.
 *
 * Además se realizan las comunicaciones síncronas se las hay.
 * Finalmente termina enviando los datos temporales a los hijos.
 */ 
int end_redistribution() {
978
  size_t i;
979
  int local_state;
980

981
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN);
982
  if(dist_s_data->entries || rep_s_data->entries) { // Enviar datos sincronos
983
984
985
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
986
    mall_conf->times->sync_start = MPI_Wtime();
987
988
989
    send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    for(i=0; i<rep_s_data->entries; i++) {
990
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_collectives, mall->intercomm);
991
992
    }

993
994
995
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
996
    if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->sync_end = MPI_Wtime(); // Merge method only
997
  }
iker_martin's avatar
iker_martin committed
998

999
  local_state = MALL_DIST_COMPLETED;
1000
1001
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->numP > mall->numC) { // Merge Shrink
    local_state = MALL_SPAWN_ADAPT_PENDING;
1002
  }
1003

1004
  return local_state;
1005
1006
1007
1008
1009
1010
1011
1012
1013
}

// TODO MOVER A OTRO LADO??
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//===============COMM PARENTS THREADS===================||
//======================================================||
//======================================================||

1014
1015

// State of the redistribution performed by the auxiliary thread.
// Set to MALL_DIST_PENDING in thread_creation(), set to MALL_DIST_COMPLETED
// at the end of thread_async_work() and read in thread_check().
int comm_state; //FIXME Use a handler
1016
1017
1018
1019
/*
 * Crea una hebra para ejecutar una comunicación en segundo plano.
 */
int thread_creation() {
1020
  comm_state = MALL_DIST_PENDING;
1021
1022
1023
1024
1025
  if(pthread_create(&(mall->async_thread), NULL, thread_async_work, NULL)) {
    printf("Error al crear el hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
1026
  return comm_state;
1027
1028
1029
1030
1031
1032
1033
1034
}

/*
 * Comprobación por parte de una hebra maestra que indica
 * si una hebra esclava ha terminado su comunicación en segundo plano.
 *
 * El estado de la comunicación es devuelto al finalizar la función. 
 */
1035
int thread_check(int wait_completed) {
1036
  int all_completed = 0;
1037

1038
1039
1040
1041
1042
1043
1044
1045
  if(wait_completed && comm_state == MALL_DIST_PENDING) {
    if(pthread_join(mall->async_thread, NULL)) {
      printf("Error al esperar al hilo\n");
      MPI_Abort(MPI_COMM_WORLD, -1);
      return -2;
    } 
  }

1046
  // Comprueba que todos los hilos han terminado la distribucion (Mismo valor en commAsync)
1047
  MPI_Allreduce(&comm_state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
1048
  if(all_completed != MALL_DIST_COMPLETED) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
1049
1050
1051
1052
1053
1054

  if(pthread_join(mall->async_thread, NULL)) {
    printf("Error al esperar al hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -2;
  } 
1055
1056
1057
1058

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
1059
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
1060
  return MALL_USER_PENDING;
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
}


/*
 * Función ejecutada por una hebra.
 * Ejecuta una comunicación síncrona con los hijos que
 * para el usuario se puede considerar como en segundo plano.
 *
 * Cuando termina la comunicación la hebra maestra puede comprobarlo
 * por el valor "commAsync".
 */
1072
void* thread_async_work() {
1073
1074
  size_t i;

1075
  send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
1076
  for(i=0; i<rep_a_data->entries; i++) {
1077
    MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm);
1078
  } 
1079
  comm_state = MALL_DIST_COMPLETED;
1080
1081
  pthread_exit(NULL);
}
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093


//==============================================================================
/*
 * Muestra por pantalla el estado actual de todos los comunicadores
 */
void print_comms_state() {
  int tester;
  char *test = malloc(MPI_MAX_OBJECT_NAME * sizeof(char));

  MPI_Comm_get_name(mall->comm, test, &tester);
  printf("P%d Comm=%d Name=%s\n", mall->myId, mall->comm, test);
1094
1095
  MPI_Comm_get_name(*(mall->user_comm), test, &tester);
  printf("P%d Comm=%d Name=%s\n", mall->myId, *(mall->user_comm), test);
1096
1097
1098
1099
1100
1101
  if(mall->intercomm != MPI_COMM_NULL) {
    MPI_Comm_get_name(mall->intercomm, test, &tester);
    printf("P%d Comm=%d Name=%s\n", mall->myId, mall->intercomm, test);
  }
  free(test);
}
1102

1103
1104
1105
/*
 * Función solo necesaria en Merge
 */
1106
1107
1108
1109
1110
1111
1112
void malleability_comms_update(MPI_Comm comm) {
  if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
  if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));

  MPI_Comm_dup(comm, &(mall->thread_comm));
  MPI_Comm_dup(comm, &(mall->comm));

1113
1114
1115
1116
1117
1118
1119
  MPI_Comm_set_name(mall->thread_comm, "MAM_THREAD");
  MPI_Comm_set_name(mall->comm, "MAM_MAIN");
}

/*
 * TODO Por hacer
 */
1120
void MAM_I_create_user_struct(int is_children_group) {
1121
1122
1123
  user_reconf->comm = mall->tmp_comm;

  if(is_children_group) {
1124
    user_reconf->rank_state = MAM_PROC_NEW_RANK;
iker_martin's avatar
iker_martin committed
1125
1126
1127
    user_reconf->numS = mall->num_parents;
    if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) user_reconf->numT = mall->numP;
    else user_reconf->numT = mall->num_parents + mall->numP;
1128
1129
1130
  } else {
    user_reconf->numS = mall->numP;
    user_reconf->numT = mall->numC;
1131
1132
    if(mall->zombie) user_reconf->rank_state = MAM_PROC_ZOMBIE;
    else user_reconf->rank_state = MAM_PROC_CONTINUE;
1133
1134
  }
}