#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

//#include "malleabilityManager.h"
#include "MAM.h"
#include "malleabilityStates.h"
#include "malleabilityDataStructures.h"
#include "malleabilityTypes.h"
#include "malleabilityZombies.h"
#include "malleabilityTimes.h"
#include "malleabilityRMS.h"
#include "MAM_Init_Configuration.h"
#include "spawn_methods/GenericSpawn.h"
#include "CommDist.h"

#define MALLEABILITY_USE_SYNCHRONOUS 0
#define MALLEABILITY_USE_ASYNCHRONOUS 1

18
void MAM_Commit(int *mam_state);
19
20
21
22

void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous);
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous);

23

24
25
int MAM_St_rms(int *mam_state);
int MAM_St_spawn_start();
26
27
28
int MAM_St_spawn_pending(int wait_completed);
int MAM_St_red_start();
int MAM_St_red_pending(int *mam_state, int wait_completed);
29
int MAM_St_user_start(int *mam_state);
30
31
32
33
34
35
36
37
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args);
int MAM_St_user_completed();
int MAM_St_spawn_adapt_pending(int wait_completed);
int MAM_St_spawn_adapted(int *mam_state);
int MAM_St_red_completed(int *mam_state);
int MAM_St_completed(int *mam_state);


38
void Children_init(void (*user_function)(void *), void *user_args);
39
40
int spawn_step();
int start_redistribution();
41
int check_redistribution(int wait_completed);
42
int end_redistribution();
iker_martin's avatar
iker_martin committed
43
int shrink_redistribution();
44
45

int thread_creation();
46
int thread_check(int wait_completed);
47
void* thread_async_work();
48

49
void print_comms_state();
50
void malleability_comms_update(MPI_Comm comm);
51

52
int MAM_I_convert_key(char *key);
53
void MAM_I_create_user_struct(int is_children_group);
54

55
56
57
58
59
malleability_data_t *rep_s_data;
malleability_data_t *dist_s_data;
malleability_data_t *rep_a_data;
malleability_data_t *dist_a_data;

60
61
mam_user_reconf_t *user_reconf;

62
/*
63
64
65
66
67
68
69
70
 * Inicializa la reserva de memoria para el modulo de maleabilidad
 * creando todas las estructuras necesarias y copias de comunicadores
 * para no interferir en la aplicación.
 *
 * Si es llamada por un grupo de procesos creados de forma dinámica,
 * inicializan la comunicacion con sus padres. En este caso, al terminar 
 * la comunicacion los procesos hijo estan preparados para ejecutar la
 * aplicacion.
71
 */
72
int MAM_Init(int root, MPI_Comm *comm, char *name_exec, void (*user_function)(void *), void *user_args) {
73
74
75
76
  MPI_Comm dup_comm, thread_comm;

  mall_conf = (malleability_config_t *) malloc(sizeof(malleability_config_t));
  mall = (malleability_t *) malloc(sizeof(malleability_t));
77
78
79
80
81
82
83
84
  user_reconf = (mam_user_reconf_t *) malloc(sizeof(mam_user_reconf_t));

  MPI_Comm_rank(*comm, &(mall->myId));
  MPI_Comm_size(*comm, &(mall->numP));

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Initializing MaM", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
  #endif
85

86
87
88
89
90
  rep_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));

91
92
93
94
  MPI_Comm_dup(*comm, &dup_comm);
  MPI_Comm_dup(*comm, &thread_comm);
  MPI_Comm_set_name(dup_comm, "MAM_MAIN");
  MPI_Comm_set_name(thread_comm, "MAM_THREAD");
95
96

  mall->root = root;
iker_martin's avatar
iker_martin committed
97
  mall->root_parents = root;
98
  mall->zombie = 0;
99
  mall->comm = dup_comm;
100
  mall->thread_comm = thread_comm;
101
102
  mall->user_comm = comm; 
  mall->tmp_comm = MPI_COMM_NULL;
103

104
  mall->name_exec = name_exec;
105
  mall->nodelist = NULL;
106
  mall->nodelist_len = 0;
107
108
109
110
111
112

  rep_s_data->entries = 0;
  rep_a_data->entries = 0;
  dist_s_data->entries = 0;
  dist_a_data->entries = 0;

113
  state = MALL_NOT_STARTED;
114

115
  MAM_Init_configuration();
116
  zombies_service_init();
117
  init_malleability_times();
118
  MAM_Def_main_datatype();
119

120
121
  // Si son el primer grupo de procesos, obtienen los datos de los padres
  MPI_Comm_get_parent(&(mall->intercomm));
122
  if(mall->intercomm != MPI_COMM_NULL) { 
123
    Children_init(user_function, user_args);
124
    return MALLEABILITY_CHILDREN;
125
  }
126

127
  MAM_check_hosts();
128
  MAM_Set_initial_configuration();
iker_martin's avatar
iker_martin committed
129

130
131
132
133
134
  #if USE_MAL_BARRIERS && USE_MAL_DEBUG
    if(mall->myId == mall->root)
      printf("MaM: Using barriers to record times.\n");
  #endif

135
  #if USE_MAL_DEBUG
136
    DEBUG_FUNC("MaM has been initialized correctly as parents", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
137
138
  #endif

139
  return MALLEABILITY_NOT_CHILDREN;
140
141
}

142
143
144
145
146
/*
 * Elimina toda la memoria reservado por el modulo
 * de maleabilidad y asegura que los zombies
 * despierten si los hubiese.
 */
147
void MAM_Finalize() {	  
148
149
150
151
152
153
154
155
156
  free_malleability_data_struct(rep_s_data);
  free_malleability_data_struct(rep_a_data);
  free_malleability_data_struct(dist_s_data);
  free_malleability_data_struct(dist_a_data);

  free(rep_s_data);
  free(rep_a_data);
  free(dist_s_data);
  free(dist_a_data);
157
  if(mall->nodelist != NULL) free(mall->nodelist);
158

159
  MAM_Free_main_datatype();
160
  free_malleability_times();
161
162
  if(mall->comm != MPI_COMM_WORLD && mall->comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->comm));
  if(mall->thread_comm != MPI_COMM_WORLD && mall->thread_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->thread_comm));
163
  if(mall->intercomm != MPI_COMM_WORLD && mall->intercomm != MPI_COMM_NULL) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge
164
165
  free(mall);
  free(mall_conf);
166
  free(user_reconf);
iker_martin's avatar
iker_martin committed
167
168
169
170

  zombies_awake();
  zombies_service_free();

171
  state = MALL_UNRESERVED;
172
173
}

174
175
/* 
 * TODO Reescribir
176
177
178
179
 * Comprueba el estado de la maleabilidad. Intenta avanzar en la misma
 * si es posible. Funciona como una máquina de estados.
 * Retorna el estado de la maleabilidad concreto y modifica el argumento
 * "mam_state" a uno generico.
180
 *
181
182
 * El argumento "wait_completed" se utiliza para esperar a la finalización de
 * las tareas llevadas a cabo por parte de MAM.
183
184
 *
 */
185
int MAM_Checkpoint(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
186
  int call_checkpoint = 0;
187

188
  //TODO This could be changed to an array with the functions to call in each case
189
190
  switch(state) {
    case MALL_UNRESERVED:
191
      *mam_state = MAM_UNRESERVED;
192
193
      break;
    case MALL_NOT_STARTED:
194
195
196
197
      call_checkpoint = MAM_St_rms(mam_state);
      break;
    case MALL_RMS_COMPLETED:
      call_checkpoint = MAM_St_spawn_start();
198
      break;
199

200
    case MALL_SPAWN_PENDING: // Comprueba si el spawn ha terminado
201
    case MALL_SPAWN_SINGLE_PENDING:
202
      call_checkpoint = MAM_St_spawn_pending(wait_completed);
203
      break;
204

205
206
    case MALL_SPAWN_ADAPT_POSTPONE:
    case MALL_SPAWN_COMPLETED:
207
      call_checkpoint = MAM_St_red_start();
208
      break;
209

210
    case MALL_DIST_PENDING:
211
      call_checkpoint = MAM_St_red_pending(mam_state, wait_completed);
212
213
      break;

214
215
216
217
    case MALL_USER_START:
      call_checkpoint = MAM_St_user_start(mam_state);
      break;

218
219
    case MALL_USER_PENDING:
      call_checkpoint = MAM_St_user_pending(mam_state, wait_completed, user_function, user_args);
220
      break;
221

222
223
    case MALL_USER_COMPLETED:
      call_checkpoint = MAM_St_user_completed();
224
      break;
225

226
227
    case MALL_SPAWN_ADAPT_PENDING:
      call_checkpoint = MAM_St_spawn_adapt_pending(wait_completed);
228
229
      break;

230
231
232
    case MALL_SPAWN_ADAPTED:
    case MALL_DIST_COMPLETED:
      call_checkpoint = MAM_St_completed(mam_state);
233
234
      break;
  }
235

236
237
  if(call_checkpoint) { MAM_Checkpoint(mam_state, wait_completed, user_function, user_args); }
  if(state > MALL_NOT_STARTED && state < MALL_COMPLETED) *mam_state = MAM_PENDING;
238
239
240
  return state;
}

241
242
243
/*
 * TODO
 */
244
245
void MAM_Resume_redistribution(int *mam_state) {
  state = MALL_USER_COMPLETED;
246
  if(mam_state != NULL) *mam_state = MAM_PENDING;
247
248
249
250
251
}

/*
 * TODO
 */
252
void MAM_Commit(int *mam_state) {
253
  int zombies = 0;
254
  #if USE_MAL_DEBUG
255
    if(mall->myId == mall->root){ DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); } fflush(stdout);
256
257
  #endif

258
259
260
  // Get times before commiting
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) {
    // This communication is only needed when a root process will become a zombie
261
    malleability_times_broadcast(mall->root_collectives);
262
  }
263

264
  // Free unneded communicators
265
266
  if(mall->tmp_comm != MPI_COMM_WORLD && mall->tmp_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->tmp_comm));
  if(*(mall->user_comm) != MPI_COMM_WORLD && *(mall->user_comm) != MPI_COMM_NULL) MPI_Comm_free(mall->user_comm);
267
268
269
270
271
272
273
274

  // Zombies treatment
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) {
    MPI_Allreduce(&mall->zombie, &zombies, 1, MPI_INT, MPI_MAX, mall->comm);
    if(zombies) {
      zombies_collect_suspended(mall->comm);
    }
  }
275

276
  // Zombies KILL
277
  if(mall->zombie) {
278
    #if USE_MAL_DEBUG >= 2
279
280
      DEBUG_FUNC("Is terminating as zombie", mall->myId, mall->numP); fflush(stdout);
    #endif
281
    MAM_Finalize();
282
    MPI_Finalize();
283
284
285
    exit(0);
  }

286
287
288
289
  // Reset/Free communicators
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { malleability_comms_update(mall->intercomm); }
  if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge

iker_martin's avatar
iker_martin committed
290
291
  MPI_Comm_rank(mall->comm, &mall->myId);
  MPI_Comm_size(mall->comm, &mall->numP);
292
293
  mall->root = mall_conf->spawn_method == MALL_SPAWN_BASELINE ? mall->root : mall->root_parents;
  mall->root_parents = mall->root;
294
  state = MALL_NOT_STARTED;
295
  if(mam_state != NULL) *mam_state = MAM_COMPLETED;
296

297
  // Set new communicator
298
299
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *(mall->user_comm) = MPI_COMM_WORLD; }
  else if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MPI_Comm_dup(mall->comm, mall->user_comm); }
300
  #if USE_MAL_DEBUG
301
    if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout);
302
  #endif
303
304
305
306
307

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_end = MPI_Wtime();
308
309
}

310
/*
311
312
313
314
315
316
317
318
319
 * This function adds data to a data structure based on whether the operation is synchronous or asynchronous,
 * and whether the data is replicated or distributed. It takes the following parameters:
 * - data: a pointer to the data to be added
 * - index: a pointer to a size_t variable where the index of the added data will be stored
 * - total_qty: the amount of elements in data
 * - type: the MPI datatype of the data
 * - is_replicated: a flag indicating whether the data is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
 * Finally, it updates the index with the index of the last added data if index is not NULL.
320
 */
321
322
void MAM_Data_add(void *data, size_t *index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
  size_t total_reqs = 0, returned_index;
323

324
  if(is_constant) { //Async
325
    if(is_replicated) {
326
      total_reqs = 1;
327
      add_data(data, total_qty, type, total_reqs, rep_a_data);
328
      returned_index = rep_a_data->entries-1;
329
    } else {
330
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
331
        total_reqs = 1;
332
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
333
        total_reqs = mall->numC;
334
      } 
335
336
      
      add_data(data, total_qty, type, total_reqs, dist_a_data);
337
      returned_index = dist_a_data->entries-1;
338
    }
339
  } else { //Sync
340
341
    if(is_replicated) {
      add_data(data, total_qty, type, total_reqs, rep_s_data);
342
      returned_index = rep_s_data->entries-1;
343
344
    } else {
      add_data(data, total_qty, type, total_reqs, dist_s_data);
345
      returned_index = dist_s_data->entries-1;
346
    }
347
  }
348
349

  if(index != NULL) *index = returned_index;
350
351
}

352
/*
353
354
355
356
357
358
359
360
 * This function modifies a data entry to a data structure based on whether the operation is synchronous or asynchronous,
 * and whether the data is replicated or distributed. It takes the following parameters:
 * - data: a pointer to the data to be added
 * - index: a value indicating which entry will be modified
 * - total_qty: the amount of elements in data
 * - type: the MPI datatype of the data
 * - is_replicated: a flag indicating whether the data is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
361
 */
362
void MAM_Data_modify(void *data, size_t index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
363
364
  size_t total_reqs = 0;

365
366
  if(is_constant) {
    if(is_replicated) {
367
      total_reqs = 1;
368
369
370
      modify_data(data, index, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ??? 
    } else {    
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
371
        total_reqs = 1;
372
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
373
374
375
        total_reqs = mall->numC;
      }
      
376
      modify_data(data, index, total_qty, type, total_reqs, dist_a_data);
377
    }
378
379
380
381
382
383
  } else {
    if(is_replicated) {
      modify_data(data, index, total_qty, type, total_reqs, rep_s_data);
    } else {
      modify_data(data, index, total_qty, type, total_reqs, dist_s_data);
    }
384
385
386
  }
}

387
/*
388
389
390
391
392
 * This functions returns how many data entries are available for one of the specific data structures.
 * It takes the following parameters:
 * - is_replicated: a flag indicating whether the structure is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
 * - entries: a pointer where the amount of entries will be stored
393
 */
394
void MAM_Data_get_entries(int is_replicated, int is_constant, size_t *entries){
395
396
397
  
  if(is_constant) {
    if(is_replicated) {
398
      *entries = rep_a_data->entries;
399
    } else {
400
      *entries = dist_a_data->entries;
401
402
403
    }
  } else {
    if(is_replicated) {
404
      *entries = rep_s_data->entries;
405
    } else {
406
      *entries = dist_s_data->entries;
407
408
409
410
411
    }
  }
}

/*
412
413
414
415
416
417
418
419
 * This function returns a data entry to a data structure based on whether the operation is synchronous or asynchronous,
 * and whether the data is replicated or distributed. It takes the following parameters:
 * - index: a value indicating which entry will be modified
 * - is_replicated: a flag indicating whether the data is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
 * - data: a pointer where the data will be stored. The user must free it
 * - total_qty: the amount of elements in data for all ranks
 * - local_qty: the amount of elements in data for this rank
420
 */
421
void MAM_Data_get_pointer(void **data, size_t index, size_t *total_qty, MPI_Datatype *type, int is_replicated, int is_constant) {
422
423
424
425
  malleability_data_t *data_struct;

  if(is_constant) {
    if(is_replicated) {
426
      data_struct = rep_a_data;
427
    } else {
428
      data_struct = dist_a_data;
429
430
431
    }
  } else {
    if(is_replicated) {
432
      data_struct = rep_s_data;
433
    } else {
434
      data_struct = dist_s_data;
435
436
437
    }
  }

438
  *data = data_struct->arrays[index];
439
440
441
  *total_qty = data_struct->qty[index];
  *type = data_struct->types[index];
  //get_block_dist(qty, mall->myId, mall->numP, &dist_data); //FIXME Asegurar que numP es correcto
442
443
}

444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
/*
 * @brief Returns a structure to perform data redistribution during a reconfiguration.
 *
 * This function is intended to be called when the state of MaM is MALL_USER_PENDING only. 
 * It is designed to provide the necessary information for the user to perform data redistribution.
 *
 * Parameters:
 *   - mam_user_reconf_t *reconf_info: A pointer to a mam_user_reconf_t structure where the function will store the required information for data redistribution.
 *
 * Return Value:
 *   - MAM_OK: If the function successfully retrieves the reconfiguration information.
 *   - MALL_DENIED: If the function is called when the state of the MaM is not MALL_USER_PENDING.
 */
int MAM_Get_Reconf_Info(mam_user_reconf_t *reconf_info) {
  if(state != MALL_USER_PENDING) return MALL_DENIED;

  *reconf_info = *user_reconf;
  return MAM_OK;
}

/*
 * @brief Returns the times used for the different steps of the last reconfiguration.
 *
 * This function is intended to be called when a reconfiguration has ended.
 * It forwards its arguments, unchanged, to MAM_I_retrieve_times.
 *
 * Parameters:
 *  - double *sp_time:   A pointer where the spawn time will be saved.
 *  - double *sy_time:   A pointer where the synchronous data redistribution time will be saved.
 *  - double *asy_time:  A pointer where the asynchronous data redistribution time will be saved.
 *  - double *mall_time: A pointer where the malleability time will be saved.
 */
void MAM_Retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
  MAM_I_retrieve_times(sp_time, sy_time, asy_time, mall_time);
}
479
480
481
482
483
484
485
486
487
488
489
490
491

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//================DATA COMMUNICATION====================||
//======================================================||
//======================================================||

/*
 * Funcion generalizada para enviar datos desde los hijos.
 * La asincronizidad se refiere a si el hilo padre e hijo lo hacen
 * de forma bloqueante o no. El padre puede tener varios hilos.
 */
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
492
  size_t i;
493
  void *aux_send, *aux_recv;
494
495
496

  if(is_asynchronous) {
    for(i=0; i < data_struct->entries; i++) {
497
      aux_send = data_struct->arrays[i];
498
      aux_recv = NULL;
499
      async_communication_start(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN,  
500
		      mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
501
      if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
502
503
504
    }
  } else {
    for(i=0; i < data_struct->entries; i++) {
505
      aux_send = data_struct->arrays[i];
506
      aux_recv = NULL;
507
      sync_communication(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall->intercomm);
508
      if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
509
510
511
512
513
514
515
516
517
518
    }
  }
}

/*
 * Generic function to receive every entry of "data_struct" from the other
 * group of processes through "mall->intercomm". The caller takes part in the
 * communication as MALLEABILITY_CHILDREN and provides no send buffer.
 *
 * Parameters:
 *  - numP_parents: number of processes in the sending group.
 *  - data_struct: set of entries to receive.
 *  - is_asynchronous: if non-zero the communications are only started
 *    (async_communication_start), otherwise they are performed in a blocking
 *    way (sync_communication).
 *
 * The array stored in each entry is replaced by the buffer handed back by
 * the communication routine.
 */
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
  size_t i;
  void *aux, *aux_s = NULL;

  for(i=0; i < data_struct->entries; i++) {
    aux = data_struct->arrays[i];
    if(is_asynchronous) {
      async_communication_start(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->numP, numP_parents, MALLEABILITY_CHILDREN,
                      mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
    } else {
      sync_communication(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall->intercomm);
    }
    data_struct->arrays[i] = aux;
  }
}

538
539
540
541
542
543

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//====================MAM STAGES========================||
//======================================================||
//======================================================||
544
545
546
547
//======================================================||
//======================================================||
//======================================================||
//======================================================||
548

549
int MAM_St_rms(int *mam_state) {
550
551
552
553
554
  reset_malleability_times();
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_start = MPI_Wtime();
555
556
557
558
559
560
561

  *mam_state = MAM_NOT_STARTED;
  state = MALL_RMS_COMPLETED;
  MAM_Check_configuration();
  mall->wait_targets_posted = 0;

  //if(CHECK_RMS()) {return MALL_DENIED;}    
562
563
  return 1;
}
564

565
int MAM_St_spawn_start() {
566
  mall->num_parents = mall->numP;
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
  state = spawn_step();
  //FIXME Esto es necesario pero feo
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->myId >= mall->numC){ mall->zombie = 1; }
  else if(mall_conf->spawn_method == MALL_SPAWN_BASELINE){ mall->zombie = 1; }

  if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE){
    return 1;
  }
  return 0;
}

/*
 * Checks whether a spawn running in the background has finished.
 * When it has (state MALL_SPAWN_COMPLETED or MALL_SPAWN_ADAPTED), the spawn
 * time is recorded as the time elapsed since the malleability start.
 *
 * Returns 1 if the spawn has finished, 0 if it is still pending.
 */
int MAM_St_spawn_pending(int wait_completed) {
  state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);

  // Still pending: nothing to record yet
  if (state != MALL_SPAWN_COMPLETED && state != MALL_SPAWN_ADAPTED) {
    return 0;
  }

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
  return 1;
}

int MAM_St_red_start() {
591
592
593
594
595
596
  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
    mall->root_collectives = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    mall->root_collectives = mall->root;
  }

597
598
599
600
601
  state = start_redistribution();
  return 1;
}

int MAM_St_red_pending(int *mam_state, int wait_completed) {
602
  if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
603
604
605
606
607
608
    state = thread_check(wait_completed);
  } else {
    state = check_redistribution(wait_completed);
  }

  if(state != MALL_DIST_PENDING) { 
609
    state = MALL_USER_START;
610
611
612
613
614
    return 1;
  }
  return 0;
}

615
616
617
618
619
620
621
622
623
624
625
626
int MAM_St_user_start(int *mam_state) {
  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
    MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->tmp_comm); //El que pone 0 va primero
  } else {
    MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
  }
  MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
  state = MALL_USER_PENDING;
  *mam_state = MAM_USER_PENDING;
  return 1;
}

627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Starting USER redistribution", mall->myId, mall->numP); fflush(stdout);
  #endif
  if(user_function != NULL) {
    MAM_I_create_user_struct(MALLEABILITY_NOT_CHILDREN);
    do {
      user_function(user_args);
    } while(wait_completed && state == MALL_USER_PENDING);
  } else {
    MAM_Resume_redistribution(mam_state);
  }

  if(state != MALL_USER_PENDING) {
    #if USE_MAL_DEBUG
      if(mall->myId == mall->root) DEBUG_FUNC("Ended USER redistribution", mall->myId, mall->numP); fflush(stdout);
    #endif
    return 1;
  }
  return 0;
}

/*
 * Stage executed once the user redistribution has been completed: the new
 * state is whatever end_redistribution() returns.
 *
 * Always returns 1, so that MAM_Checkpoint() tries the next stage.
 */
int MAM_St_user_completed() {
  state = end_redistribution();
  return 1;
}

int MAM_St_spawn_adapt_pending(int wait_completed) {
655
  wait_completed = 1;
656
657
658
659
660
661
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_start = MPI_Wtime();
  unset_spawn_postpone_flag(state);
  state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);
662
663
/* TODO Comentar problema, basicamente indicar que no es posible de la forma actual
 * Ademas es solo para una operación que hemos visto como "extremadamente" rápida
664
  if(!MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, NULL)) {
665
666
667
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->comm);
    #endif
668
    mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->spawn_start;
669
670
671
    return 1;
  }
  return 0;
672
673
674
675
676
677
  */
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->spawn_start;
  return 1;
678
679
680
}

/*
 * Last stage of a reconfiguration: commits it through MAM_Commit().
 *
 * Parameters:
 *  - mam_state: forwarded to MAM_Commit().
 *
 * Always returns 0, there is no further stage to try.
 */
int MAM_St_completed(int *mam_state) {
  MAM_Commit(mam_state);
  return 0;
}


686
687
688
689
690
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================CHILDREN=========================||
//======================================================||
//======================================================||
691
692
693
694
//======================================================||
//======================================================||
//======================================================||
//======================================================||
695
696
697
698
699
700
/*
 * Inicializacion de los datos de los hijos.
 * En la misma se reciben datos de los padres: La configuracion
 * de la ejecucion a realizar; y los datos a recibir de los padres
 * ya sea de forma sincrona, asincrona o ambas.
 */
701
void Children_init(void (*user_function)(void *), void *user_args) {
702
  size_t i;
703

704
  #if USE_MAL_DEBUG
705
    DEBUG_FUNC("MaM will now initialize spawned processes", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
706
707
  #endif

iker_martin's avatar
iker_martin committed
708
  malleability_connect_children(mall->comm, &(mall->intercomm));
709
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { // For Merge Method, these processes will be added
iker_martin's avatar
iker_martin committed
710
711
    MPI_Comm_rank(mall->intercomm, &mall->myId);
    MPI_Comm_size(mall->intercomm, &mall->numP);
712
  }
713
  mall->root_collectives = mall->root_parents;
714

715
  #if USE_MAL_DEBUG
716
    DEBUG_FUNC("Spawned have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
717
718
  #endif

719
  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN);
720
  if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos
721
    #if USE_MAL_DEBUG >= 2
722
      DEBUG_FUNC("Spawned start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
723
    #endif
724
725
726
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
727

728
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
iker_martin's avatar
iker_martin committed
729
      recv_data(mall->num_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
730
      for(i=0; i<rep_a_data->entries; i++) {
731
        MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm);
732
      } 
733
    } else {
iker_martin's avatar
iker_martin committed
734
      recv_data(mall->num_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS); 
735

736
      for(i=0; i<rep_a_data->entries; i++) {
737
        MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm, &(rep_a_data->requests[i][0]));
738
      } 
739
      #if USE_MAL_DEBUG >= 2
740
        DEBUG_FUNC("Spawned started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
741
      #endif
742

743
      for(i=0; i<rep_a_data->entries; i++) {
744
        async_communication_wait(rep_a_data->requests[i], rep_a_data->request_qty[i]);
745
      }
746
      for(i=0; i<dist_a_data->entries; i++) {
747
        async_communication_wait(dist_a_data->requests[i], dist_a_data->request_qty[i]);
748
      }
749
750
751
752
753
754
      if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
        MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
        mall->wait_targets_posted = 1;
        MPI_Wait(&mall->wait_targets, MPI_STATUS_IGNORE);
      }

755
      #if USE_MAL_DEBUG >= 2
756
        DEBUG_FUNC("Spawned waited for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
757
      #endif
758
      for(i=0; i<dist_a_data->entries; i++) {
759
        async_communication_end(dist_a_data->requests[i], dist_a_data->request_qty[i], &(dist_a_data->windows[i]));
760
      }
761
      for(i=0; i<rep_a_data->entries; i++) {
762
        async_communication_end(rep_a_data->requests[i], rep_a_data->request_qty[i], &(rep_a_data->windows[i]));
763
      }
764
    }
765

766
767
768
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
769
    mall_conf->times->async_end= MPI_Wtime(); // Obtener timestamp de cuando termina comm asincrona
770
  }
771
  #if USE_MAL_DEBUG
772
    DEBUG_FUNC("Spawned have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
773
  #endif
774

775
  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
776
777
778
779
780
781
782
783
784
785
786
    MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->tmp_comm); //El que pone 0 va primero
  } else {
    MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
  }
  MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
  if(user_function != NULL) {
    state = MALL_USER_PENDING;
    MAM_I_create_user_struct(MALLEABILITY_CHILDREN);
    user_function(user_args);
  }

787
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN);
788
  if(dist_s_data->entries || rep_s_data->entries) { // Recibir datos sincronos
789
790
791
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
iker_martin's avatar
iker_martin committed
792
    recv_data(mall->num_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
793
794

    for(i=0; i<rep_s_data->entries; i++) {
795
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_collectives, mall->intercomm);
796
    } 
797
798
799
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
800
    mall_conf->times->sync_end = MPI_Wtime(); // Obtener timestamp de cuando termina comm sincrona
801
  }
802
  #if USE_MAL_DEBUG
803
    DEBUG_FUNC("Targets have completed synchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
804
  #endif
805

806
  MAM_Commit(NULL);
807

808
  #if USE_MAL_DEBUG
809
    DEBUG_FUNC("MaM has been initialized correctly for new ranks", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
810
  #endif
811
812
813
814
815
816
817
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================PARENTS==========================||
//======================================================||
//======================================================||
818
819
//======================================================||
//======================================================||
820
821
822
823
824
825

/*
 * Se encarga de realizar la creacion de los procesos hijos.
 * Si se pide en segundo plano devuelve el estado actual.
 */
int spawn_step(){
826
827
828
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
829
  mall_conf->times->spawn_start = MPI_Wtime();
830
 
iker_martin's avatar
iker_martin committed
831
  state = init_spawn(mall->thread_comm, &(mall->intercomm));
832

833
  if(!MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, NULL)) {
834
835
836
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
837
      mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
838
839
840
841
  }
  return state;
}

842

843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
/*
 * Comienza la redistribucion de los datos con el nuevo grupo de procesos.
 *
 * Primero se envia la configuracion a utilizar al nuevo grupo de procesos y a continuacion
 * se realiza el envio asincrono y/o sincrono si lo hay.
 *
 * En caso de que haya comunicacion asincrona, se comienza y se termina la funcion 
 * indicando que se ha comenzado un envio asincrono.
 *
 * Si no hay comunicacion asincrono se pasa a realizar la sincrona si la hubiese.
 *
 * Finalmente se envian datos sobre los resultados a los hijos y se desconectan ambos
 * grupos de procesos.
 */
int start_redistribution() {
858
  size_t i;
859

860
  if(mall->intercomm == MPI_COMM_NULL) {
861
862
    // Si no tiene comunicador creado, se debe a que se ha pospuesto el Spawn
    //   y se trata del spawn Merge Shrink
863
    MPI_Comm_dup(mall->comm, &(mall->intercomm));
864
  }
865

866
  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN);
867
  if(dist_a_data->entries || rep_a_data->entries) { // Enviar datos asincronos
868
869
870
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
871
    mall_conf->times->async_start = MPI_Wtime();
872
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
873
874
875
      return thread_creation();
    } else {
      send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
876
      for(i=0; i<rep_a_data->entries; i++) {
877
        MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm, &(rep_a_data->requests[i][0]));
878
      } 
879
880
881
882
883

      if(mall->zombie && MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
        MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
        mall->wait_targets_posted = 1;
      }
884
      return MALL_DIST_PENDING; 
885
886
    }
  } 
887
  return MALL_USER_START;
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
}


/*
 * Comprueba si la redistribucion asincrona ha terminado. 
 * Si no ha terminado la funcion termina indicandolo, en caso contrario,
 * se continua con la comunicacion sincrona, el envio de resultados y
 * se desconectan los grupos de procesos.
 *
 * Esta funcion permite dos modos de funcionamiento al comprobar si la
 * comunicacion asincrona ha terminado.
 * Si se utiliza el modo "MAL_USE_NORMAL" o "MAL_USE_POINT", se considera 
 * terminada cuando los padres terminan de enviar.
 * Si se utiliza el modo "MAL_USE_IBARRIER", se considera terminada cuando
 * los hijos han terminado de recibir.
903
 * //FIXME Modificar para que se tenga en cuenta rep_a_data
904
 */
905
int check_redistribution(int wait_completed) {
906
  int completed, local_completed, all_completed;
907
  size_t i, req_qty;
908
  MPI_Request *req_completed;
909
910
  MPI_Win window;
  local_completed = 1;
911
  #if USE_MAL_DEBUG >= 2
912
    DEBUG_FUNC("Sources are testing for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
913
  #endif
914

915
  if(wait_completed) {
916
917
918
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL) && !mall->wait_targets_posted) {
      MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
      mall->wait_targets_posted = 1;
919
920
921
922
    }
    for(i=0; i<dist_a_data->entries; i++) {
      req_completed = dist_a_data->requests[i];
      req_qty = dist_a_data->request_qty[i];
923
      async_communication_wait(req_completed, req_qty);
924
    }
925
926
927
    for(i=0; i<rep_a_data->entries; i++) {
      req_completed = rep_a_data->requests[i];
      req_qty = rep_a_data->request_qty[i];
928
      async_communication_wait(req_completed, req_qty);
929
    }
930
931

    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) { MPI_Wait(&mall->wait_targets, MPI_STATUS_IGNORE); }
932
  } else {
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
    if(mall->wait_targets_posted) { 
      MPI_Test(&mall->wait_targets, &local_completed, MPI_STATUS_IGNORE); 
    } else {
      for(i=0; i<dist_a_data->entries; i++) {
        req_completed = dist_a_data->requests[i];
        req_qty = dist_a_data->request_qty[i];
        completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, req_completed, req_qty);
        local_completed = local_completed && completed;
      }
      for(i=0; i<rep_a_data->entries; i++) {
        req_completed = rep_a_data->requests[i];
        req_qty = rep_a_data->request_qty[i];
        completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, req_completed, req_qty);
        local_completed = local_completed && completed;
      }

      if(local_completed && MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
        MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
        mall->wait_targets_posted = 1;
        MPI_Test(&mall->wait_targets, &local_completed, MPI_STATUS_IGNORE); //TODO - Figure out if last process takes profit from calling here
      }
954
    }
955
956
957
958
959
960
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Sources will now check a global decision", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif

    MPI_Allreduce(&local_completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
    if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
961
962
  }

963
  #if USE_MAL_DEBUG >= 2
964
    DEBUG_FUNC("Sources sent asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
965
  #endif
966

967
968
969
970
  for(i=0; i<dist_a_data->entries; i++) {
    req_completed = dist_a_data->requests[i];
    req_qty = dist_a_data->request_qty[i];
    window = dist_a_data->windows[i];
971
    async_communication_end(req_completed, req_qty, &window);
972
  }
973
974
975
976
  for(i=0; i<rep_a_data->entries; i++) {
    req_completed = rep_a_data->requests[i];
    req_qty = rep_a_data->request_qty[i];
    window = rep_a_data->windows[i];
977
    async_communication_end(req_completed, req_qty, &window);
978
  }
979

980
981
982
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
983
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
984
  return MALL_USER_PENDING;
985
986
987
988
989
990
991
992
993
994
995
}

/*
 * Termina la redistribución de los datos con los hijos, comprobando
 * si se han realizado iteraciones con comunicaciones en segundo plano
 * y enviando cuantas iteraciones se han realizado a los hijos.
 *
 * Además se realizan las comunicaciones síncronas se las hay.
 * Finalmente termina enviando los datos temporales a los hijos.
 */ 
int end_redistribution() {
996
  size_t i;
997
  int local_state;
998

999
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN);
1000
  if(dist_s_data->entries || rep_s_data->entries) { // Enviar datos sincronos
1001
1002
1003
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
1004
    mall_conf->times->sync_start = MPI_Wtime();
1005
1006
1007
    send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    for(i=0; i<rep_s_data->entries; i++) {
1008
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_collectives, mall->intercomm);
1009
1010
    }

1011
1012
1013
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
1014
    if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->sync_end = MPI_Wtime(); // Merge method only
1015
  }
iker_martin's avatar
iker_martin committed
1016

1017
  local_state = MALL_DIST_COMPLETED;
1018
1019
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->numP > mall->numC) { // Merge Shrink
    local_state = MALL_SPAWN_ADAPT_PENDING;
1020
  }
1021

1022
  return local_state;
1023
1024
1025
1026
1027
1028
1029
1030
1031
}

// TODO MOVER A OTRO LADO??
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//===============COMM PARENTS THREADS===================||
//======================================================||
//======================================================||

1032
1033

// State of the background (threaded) redistribution: set to MALL_DIST_PENDING
// by thread_creation(), set to MALL_DIST_COMPLETED by thread_async_work() and
// read by thread_check().
int comm_state; //FIXME Use a handler
1034
1035
1036
1037
/*
 * Crea una hebra para ejecutar una comunicación en segundo plano.
 */
int thread_creation() {
1038
  comm_state = MALL_DIST_PENDING;
1039
1040
1041
1042
1043
  if(pthread_create(&(mall->async_thread), NULL, thread_async_work, NULL)) {
    printf("Error al crear el hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
1044
  return comm_state;
1045
1046
1047
1048
1049
1050
1051
1052
}

/*
 * Comprobación por parte de una hebra maestra que indica
 * si una hebra esclava ha terminado su comunicación en segundo plano.
 *
 * El estado de la comunicación es devuelto al finalizar la función. 
 */
1053
int thread_check(int wait_completed) {
1054
  int all_completed = 0;
1055

1056
1057
1058
1059
1060
1061
1062
1063
  if(wait_completed && comm_state == MALL_DIST_PENDING) {
    if(pthread_join(mall->async_thread, NULL)) {
      printf("Error al esperar al hilo\n");
      MPI_Abort(MPI_COMM_WORLD, -1);
      return -2;
    } 
  }

1064
  // Comprueba que todos los hilos han terminado la distribucion (Mismo valor en commAsync)
1065
  MPI_Allreduce(&comm_state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
1066
  if(all_completed != MALL_DIST_COMPLETED) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
1067
1068
1069
1070
1071
1072

  if(pthread_join(mall->async_thread, NULL)) {
    printf("Error al esperar al hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -2;
  } 
1073
1074
1075
1076

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
1077
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
1078
  return MALL_USER_PENDING;
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
}


/*
 * Función ejecutada por una hebra.
 * Ejecuta una comunicación síncrona con los hijos que
 * para el usuario se puede considerar como en segundo plano.
 *
 * Cuando termina la comunicación la hebra maestra puede comprobarlo
 * por el valor "commAsync".
 */
1090
void* thread_async_work() {
1091
1092
  size_t i;

1093
  send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
1094
  for(i=0; i<rep_a_data->entries; i++) {
1095
    MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm);
1096
  } 
1097
  comm_state = MALL_DIST_COMPLETED;
1098
1099
  pthread_exit(NULL);
}
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111


//==============================================================================
/*
 * Muestra por pantalla el estado actual de todos los comunicadores
 */
void print_comms_state() {
  int tester;
  char *test = malloc(MPI_MAX_OBJECT_NAME * sizeof(char));

  MPI_Comm_get_name(mall->comm, test, &tester);
  printf("P%d Comm=%d Name=%s\n", mall->myId, mall->comm, test);
1112
1113
  MPI_Comm_get_name(*(mall->user_comm), test, &tester);
  printf("P%d Comm=%d Name=%s\n", mall->myId, *(mall->user_comm), test);
1114
1115
1116
1117
1118
1119
  if(mall->intercomm != MPI_COMM_NULL) {
    MPI_Comm_get_name(mall->intercomm, test, &tester);
    printf("P%d Comm=%d Name=%s\n", mall->myId, mall->intercomm, test);
  }
  free(test);
}
1120

1121
1122
1123
/*
 * Función solo necesaria en Merge
 */
1124
1125
1126
1127
1128
1129
1130
void malleability_comms_update(MPI_Comm comm) {
  if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
  if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));

  MPI_Comm_dup(comm, &(mall->thread_comm));
  MPI_Comm_dup(comm, &(mall->comm));

1131
1132
1133
1134
1135
1136
1137
  MPI_Comm_set_name(mall->thread_comm, "MAM_THREAD");
  MPI_Comm_set_name(mall->comm, "MAM_MAIN");
}

/*
 * TODO Por hacer
 */
1138
void MAM_I_create_user_struct(int is_children_group) {
1139
1140
1141
  user_reconf->comm = mall->tmp_comm;

  if(is_children_group) {
1142
    user_reconf->rank_state = MAM_PROC_NEW_RANK;
iker_martin's avatar
iker_martin committed
1143
    user_reconf->numS = mall->num_parents;
1144
    user_reconf->numT = mall->numP;
1145
1146
1147
  } else {
    user_reconf->numS = mall->numP;
    user_reconf->numT = mall->numC;
1148
1149
    if(mall->zombie) user_reconf->rank_state = MAM_PROC_ZOMBIE;
    else user_reconf->rank_state = MAM_PROC_CONTINUE;
1150
1151
  }
}