malleabilityManager.c 40.9 KB
Newer Older
1
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

//#include "malleabilityManager.h"
#include "MAM.h"
#include "malleabilityStates.h"
#include "malleabilityDataStructures.h"
#include "malleabilityTypes.h"
#include "malleabilityZombies.h"
#include "malleabilityTimes.h"
#include "malleabilityRMS.h"
#include "MAM_Init_Configuration.h"
#include "spawn_methods/GenericSpawn.h"
#include "CommDist.h"

#define MALLEABILITY_USE_SYNCHRONOUS 0
#define MALLEABILITY_USE_ASYNCHRONOUS 1

18
void MAM_Commit(int *mam_state);
19
20
21
22

void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous);
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous);

23

24
25
int MAM_St_rms(int *mam_state);
int MAM_St_spawn_start();
26
27
28
29
30
31
32
33
34
35
36
int MAM_St_spawn_pending(int wait_completed);
int MAM_St_red_start();
int MAM_St_red_pending(int *mam_state, int wait_completed);
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args);
int MAM_St_user_completed();
int MAM_St_spawn_adapt_pending(int wait_completed);
int MAM_St_spawn_adapted(int *mam_state);
int MAM_St_red_completed(int *mam_state);
int MAM_St_completed(int *mam_state);


37
void Children_init(void (*user_function)(void *), void *user_args);
38
39
int spawn_step();
int start_redistribution();
40
int check_redistribution(int wait_completed);
41
int end_redistribution();
iker_martin's avatar
iker_martin committed
42
int shrink_redistribution();
43
44

int thread_creation();
45
int thread_check(int wait_completed);
46
void* thread_async_work();
47

48
void print_comms_state();
49
void malleability_comms_update(MPI_Comm comm);
50

51
int MAM_I_convert_key(char *key);
52
void MAM_I_create_user_struct(int is_children_group);
53

54
55
56
57
58
malleability_data_t *rep_s_data;
malleability_data_t *dist_s_data;
malleability_data_t *rep_a_data;
malleability_data_t *dist_a_data;

59
60
mam_user_reconf_t *user_reconf;

61
/*
62
63
64
65
66
67
68
69
 * Inicializa la reserva de memoria para el modulo de maleabilidad
 * creando todas las estructuras necesarias y copias de comunicadores
 * para no interferir en la aplicación.
 *
 * Si es llamada por un grupo de procesos creados de forma dinámica,
 * inicializan la comunicacion con sus padres. En este caso, al terminar 
 * la comunicacion los procesos hijo estan preparados para ejecutar la
 * aplicacion.
70
 */
71
int MAM_Init(int root, MPI_Comm *comm, char *name_exec, void (*user_function)(void *), void *user_args) {
72
73
74
75
  MPI_Comm dup_comm, thread_comm;

  mall_conf = (malleability_config_t *) malloc(sizeof(malleability_config_t));
  mall = (malleability_t *) malloc(sizeof(malleability_t));
76
77
78
79
80
81
82
83
  user_reconf = (mam_user_reconf_t *) malloc(sizeof(mam_user_reconf_t));

  MPI_Comm_rank(*comm, &(mall->myId));
  MPI_Comm_size(*comm, &(mall->numP));

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Initializing MaM", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
  #endif
84

85
86
87
88
89
  rep_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));

90
91
92
93
  MPI_Comm_dup(*comm, &dup_comm);
  MPI_Comm_dup(*comm, &thread_comm);
  MPI_Comm_set_name(dup_comm, "MAM_MAIN");
  MPI_Comm_set_name(thread_comm, "MAM_THREAD");
94
95

  mall->root = root;
iker_martin's avatar
iker_martin committed
96
  mall->root_parents = root;
97
  mall->zombie = 0;
98
  mall->comm = dup_comm;
99
  mall->thread_comm = thread_comm;
100
101
  mall->user_comm = comm; 
  mall->tmp_comm = MPI_COMM_NULL;
102

103
  mall->name_exec = name_exec;
104
  mall->nodelist = NULL;
105
  mall->nodelist_len = 0;
106
107
108
109
110
111

  rep_s_data->entries = 0;
  rep_a_data->entries = 0;
  dist_s_data->entries = 0;
  dist_a_data->entries = 0;

112
  state = MALL_NOT_STARTED;
113

114
  MAM_Init_configuration();
115
  zombies_service_init();
116
  init_malleability_times();
117
  MAM_Def_main_datatype();
118

119
120
  // Si son el primer grupo de procesos, obtienen los datos de los padres
  MPI_Comm_get_parent(&(mall->intercomm));
121
  if(mall->intercomm != MPI_COMM_NULL) { 
122
    Children_init(user_function, user_args);
123
    return MALLEABILITY_CHILDREN;
124
  }
125

126
  MAM_check_hosts();
127
  MAM_Set_initial_configuration();
iker_martin's avatar
iker_martin committed
128

129
130
131
132
133
  #if USE_MAL_BARRIERS && USE_MAL_DEBUG
    if(mall->myId == mall->root)
      printf("MaM: Using barriers to record times.\n");
  #endif

134
  #if USE_MAL_DEBUG
135
    DEBUG_FUNC("MaM has been initialized correctly as parents", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
136
137
  #endif

138
  return MALLEABILITY_NOT_CHILDREN;
139
140
}

141
142
143
144
145
/*
 * Elimina toda la memoria reservado por el modulo
 * de maleabilidad y asegura que los zombies
 * despierten si los hubiese.
 */
146
void MAM_Finalize() {	  
147
148
149
150
151
152
153
154
155
  free_malleability_data_struct(rep_s_data);
  free_malleability_data_struct(rep_a_data);
  free_malleability_data_struct(dist_s_data);
  free_malleability_data_struct(dist_a_data);

  free(rep_s_data);
  free(rep_a_data);
  free(dist_s_data);
  free(dist_a_data);
156
  if(mall->nodelist != NULL) free(mall->nodelist);
157

158
  MAM_Free_main_datatype();
159
  free_malleability_times();
160
161
  if(mall->comm != MPI_COMM_WORLD && mall->comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->comm));
  if(mall->thread_comm != MPI_COMM_WORLD && mall->thread_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->thread_comm));
162
  if(mall->intercomm != MPI_COMM_WORLD && mall->intercomm != MPI_COMM_NULL) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge
163
164
  free(mall);
  free(mall_conf);
165
  free(user_reconf);
iker_martin's avatar
iker_martin committed
166
167
168
169

  zombies_awake();
  zombies_service_free();

170
  state = MALL_UNRESERVED;
171
172
}

173
174
/* 
 * TODO Reescribir
175
176
177
178
 * Comprueba el estado de la maleabilidad. Intenta avanzar en la misma
 * si es posible. Funciona como una máquina de estados.
 * Retorna el estado de la maleabilidad concreto y modifica el argumento
 * "mam_state" a uno generico.
179
 *
180
181
 * El argumento "wait_completed" se utiliza para esperar a la finalización de
 * las tareas llevadas a cabo por parte de MAM.
182
183
 *
 */
184
int MAM_Checkpoint(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
185
  int call_checkpoint = 0;
186

187
  //TODO This could be changed to an array with the functions to call in each case
188
189
  switch(state) {
    case MALL_UNRESERVED:
190
      *mam_state = MAM_UNRESERVED;
191
192
      break;
    case MALL_NOT_STARTED:
193
194
195
196
      call_checkpoint = MAM_St_rms(mam_state);
      break;
    case MALL_RMS_COMPLETED:
      call_checkpoint = MAM_St_spawn_start();
197
      break;
198

199
    case MALL_SPAWN_PENDING: // Comprueba si el spawn ha terminado
200
    case MALL_SPAWN_SINGLE_PENDING:
201
      call_checkpoint = MAM_St_spawn_pending(wait_completed);
202
      break;
203

204
205
    case MALL_SPAWN_ADAPT_POSTPONE:
    case MALL_SPAWN_COMPLETED:
206
      call_checkpoint = MAM_St_red_start();
207
      break;
208

209
    case MALL_DIST_PENDING:
210
      call_checkpoint = MAM_St_red_pending(mam_state, wait_completed);
211
212
      break;

213
214
    case MALL_USER_PENDING:
      call_checkpoint = MAM_St_user_pending(mam_state, wait_completed, user_function, user_args);
215
      break;
216

217
218
    case MALL_USER_COMPLETED:
      call_checkpoint = MAM_St_user_completed();
219
      break;
220

221
222
    case MALL_SPAWN_ADAPT_PENDING:
      call_checkpoint = MAM_St_spawn_adapt_pending(wait_completed);
223
224
      break;

225
226
227
    case MALL_SPAWN_ADAPTED:
    case MALL_DIST_COMPLETED:
      call_checkpoint = MAM_St_completed(mam_state);
228
229
      break;
  }
230

231
232
  if(call_checkpoint) { MAM_Checkpoint(mam_state, wait_completed, user_function, user_args); }
  if(state > MALL_NOT_STARTED && state < MALL_COMPLETED) *mam_state = MAM_PENDING;
233
234
235
  return state;
}

236
237
238
/*
 * TODO
 */
239
240
241
242
243
244
245
246
void MAM_Resume_redistribution(int *mam_state) {
  state = MALL_USER_COMPLETED;
  *mam_state = MAM_PENDING;
}

/*
 * TODO
 */
247
void MAM_Commit(int *mam_state) {
248
  int zombies = 0;
249
  #if USE_MAL_DEBUG
250
    if(mall->myId == mall->root){ DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); } fflush(stdout);
251
252
  #endif

253
254
255
  // Get times before commiting
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) {
    // This communication is only needed when a root process will become a zombie
256
    malleability_times_broadcast(mall->root_collectives);
257
  }
258

259
  // Free unneded communicators
260
261
  if(mall->tmp_comm != MPI_COMM_WORLD && mall->tmp_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->tmp_comm));
  if(*(mall->user_comm) != MPI_COMM_WORLD && *(mall->user_comm) != MPI_COMM_NULL) MPI_Comm_free(mall->user_comm);
262
263
264
265
266
267
268
269

  // Zombies treatment
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) {
    MPI_Allreduce(&mall->zombie, &zombies, 1, MPI_INT, MPI_MAX, mall->comm);
    if(zombies) {
      zombies_collect_suspended(mall->comm);
    }
  }
270

271
  // Zombies KILL
272
  if(mall->zombie) {
273
    #if USE_MAL_DEBUG >= 2
274
275
      DEBUG_FUNC("Is terminating as zombie", mall->myId, mall->numP); fflush(stdout);
    #endif
276
    MAM_Finalize();
277
    MPI_Finalize();
278
279
280
    exit(0);
  }

281
282
283
284
  // Reset/Free communicators
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { malleability_comms_update(mall->intercomm); }
  if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge

iker_martin's avatar
iker_martin committed
285
286
  MPI_Comm_rank(mall->comm, &mall->myId);
  MPI_Comm_size(mall->comm, &mall->numP);
287
288
  mall->root = mall_conf->spawn_method == MALL_SPAWN_BASELINE ? mall->root : mall->root_parents;
  mall->root_parents = mall->root;
289
  state = MALL_NOT_STARTED;
290
  if(mam_state != NULL) *mam_state = MAM_COMPLETED;
291

292
  // Set new communicator
293
294
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *(mall->user_comm) = MPI_COMM_WORLD; }
  else if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MPI_Comm_dup(mall->comm, mall->user_comm); }
295
  #if USE_MAL_DEBUG
296
    if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout);
297
  #endif
298
299
300
301
302

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_end = MPI_Wtime();
303
304
}

305
/*
306
307
308
309
310
311
312
313
314
 * This function adds data to a data structure based on whether the operation is synchronous or asynchronous,
 * and whether the data is replicated or distributed. It takes the following parameters:
 * - data: a pointer to the data to be added
 * - index: a pointer to a size_t variable where the index of the added data will be stored
 * - total_qty: the amount of elements in data
 * - type: the MPI datatype of the data
 * - is_replicated: a flag indicating whether the data is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
 * Finally, it updates the index with the index of the last added data if index is not NULL.
315
 */
316
317
void MAM_Data_add(void *data, size_t *index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
  size_t total_reqs = 0, returned_index;
318

319
  if(is_constant) { //Async
320
    if(is_replicated) {
321
      total_reqs = 1;
322
      add_data(data, total_qty, type, total_reqs, rep_a_data);
323
      returned_index = rep_a_data->entries-1;
324
    } else {
325
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
326
        total_reqs = 1;
327
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
328
        total_reqs = mall->numC;
329
      } 
330
331
      
      add_data(data, total_qty, type, total_reqs, dist_a_data);
332
      returned_index = dist_a_data->entries-1;
333
    }
334
  } else { //Sync
335
336
    if(is_replicated) {
      add_data(data, total_qty, type, total_reqs, rep_s_data);
337
      returned_index = rep_s_data->entries-1;
338
339
    } else {
      add_data(data, total_qty, type, total_reqs, dist_s_data);
340
      returned_index = dist_s_data->entries-1;
341
    }
342
  }
343
344

  if(index != NULL) *index = returned_index;
345
346
}

347
/*
348
349
350
351
352
353
354
355
 * This function modifies a data entry to a data structure based on whether the operation is synchronous or asynchronous,
 * and whether the data is replicated or distributed. It takes the following parameters:
 * - data: a pointer to the data to be added
 * - index: a value indicating which entry will be modified
 * - total_qty: the amount of elements in data
 * - type: the MPI datatype of the data
 * - is_replicated: a flag indicating whether the data is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
356
 */
357
void MAM_Data_modify(void *data, size_t index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
358
359
  size_t total_reqs = 0;

360
361
  if(is_constant) {
    if(is_replicated) {
362
      total_reqs = 1;
363
364
365
      modify_data(data, index, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ??? 
    } else {    
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
366
        total_reqs = 1;
367
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
368
369
370
        total_reqs = mall->numC;
      }
      
371
      modify_data(data, index, total_qty, type, total_reqs, dist_a_data);
372
    }
373
374
375
376
377
378
  } else {
    if(is_replicated) {
      modify_data(data, index, total_qty, type, total_reqs, rep_s_data);
    } else {
      modify_data(data, index, total_qty, type, total_reqs, dist_s_data);
    }
379
380
381
  }
}

382
/*
383
384
385
386
387
 * This functions returns how many data entries are available for one of the specific data structures.
 * It takes the following parameters:
 * - is_replicated: a flag indicating whether the structure is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
 * - entries: a pointer where the amount of entries will be stored
388
 */
389
void MAM_Data_get_entries(int is_replicated, int is_constant, size_t *entries){
390
391
392
  
  if(is_constant) {
    if(is_replicated) {
393
      *entries = rep_a_data->entries;
394
    } else {
395
      *entries = dist_a_data->entries;
396
397
398
    }
  } else {
    if(is_replicated) {
399
      *entries = rep_s_data->entries;
400
    } else {
401
      *entries = dist_s_data->entries;
402
403
404
405
406
    }
  }
}

/*
407
408
409
410
411
412
413
414
 * This function returns a data entry to a data structure based on whether the operation is synchronous or asynchronous,
 * and whether the data is replicated or distributed. It takes the following parameters:
 * - index: a value indicating which entry will be modified
 * - is_replicated: a flag indicating whether the data is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
 * - data: a pointer where the data will be stored. The user must free it
 * - total_qty: the amount of elements in data for all ranks
 * - local_qty: the amount of elements in data for this rank
415
 */
416
void MAM_Data_get_pointer(void **data, size_t index, size_t *total_qty, MPI_Datatype *type, int is_replicated, int is_constant) {
417
418
419
420
  malleability_data_t *data_struct;

  if(is_constant) {
    if(is_replicated) {
421
      data_struct = rep_a_data;
422
    } else {
423
      data_struct = dist_a_data;
424
425
426
    }
  } else {
    if(is_replicated) {
427
      data_struct = rep_s_data;
428
    } else {
429
      data_struct = dist_s_data;
430
431
432
    }
  }

433
  *data = data_struct->arrays[index];
434
435
436
  *total_qty = data_struct->qty[index];
  *type = data_struct->types[index];
  //get_block_dist(qty, mall->myId, mall->numP, &dist_data); //FIXME Asegurar que numP es correcto
437
438
}

439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
/*
 * @brief Provides the structure needed to perform a user-driven data
 *        redistribution during a reconfiguration.
 *
 * The information is only available while the state of MaM is
 * MALL_USER_PENDING; in any other state the output argument is left untouched.
 *
 * Parameters:
 *   - mam_user_reconf_t *reconf_info: Output. Receives a copy of the internal
 *     reconfiguration information.
 *
 * Return Value:
 *   - MAM_OK: The reconfiguration information has been copied.
 *   - MALL_DENIED: The state of MaM is not MALL_USER_PENDING.
 */
int MAM_Get_Reconf_Info(mam_user_reconf_t *reconf_info) {
  int result = MALL_DENIED;

  if(state == MALL_USER_PENDING) {
    *reconf_info = *user_reconf;
    result = MAM_OK;
  }
  return result;
}

/*
 * @brief Returns the times used for the different steps of the last reconfiguration.
 *
 * This function is intended to be called when a reconfiguration has ended.
 * It is a thin public wrapper: the four pointers are forwarded unchanged to
 * MAM_I_retrieve_times(), which is defined outside this file section.
 *
 * Parameters:
 *  - double *sp_time:   A pointer where the spawn time will be saved.
 *  - double *sy_time:   A pointer where the synchronous data redistribution time will be saved.
 *  - double *asy_time:  A pointer where the asynchronous data redistribution time will be saved.
 *  - double *mall_time: A pointer where the malleability time will be saved.
 */
void MAM_Retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
  MAM_I_retrieve_times(sp_time, sy_time, asy_time, mall_time);
}
474
475
476
477
478
479
480
481
482
483
484
485
486

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//================DATA COMMUNICATION====================||
//======================================================||
//======================================================||

/*
 * Generic function to send every entry of a data structure to the other
 * group of processes. The communication routines are called with
 * MALLEABILITY_NOT_CHILDREN, i.e. on behalf of the group that is not the
 * spawned one.
 *
 * Parameters:
 *  - numP_children:   Amount of processes in the receiving group.
 *  - data_struct:     Structure whose entries are sent.
 *  - is_asynchronous: If non-zero the communication is only started
 *                     (async_communication_start); otherwise it is performed
 *                     in a blocking way (sync_communication).
 *
 * If the communication routine hands back a receive buffer, it replaces the
 * array stored for the entry.
 */
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
  size_t i;
  void *aux_send, *aux_recv;

  for(i=0; i < data_struct->entries; i++) {
    aux_send = data_struct->arrays[i];
    aux_recv = NULL;

    if(is_asynchronous) {
      async_communication_start(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN,  
		      mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
    } else {
      sync_communication(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall->intercomm);
    }

    if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
  }
}

/*
 * Generic function to receive every entry of a data structure from the other
 * group of processes. The communication routines are called with
 * MALLEABILITY_CHILDREN, i.e. on behalf of the spawned group.
 *
 * Parameters:
 *  - numP_parents:    Amount of processes in the sending group.
 *  - data_struct:     Structure whose entries are received.
 *  - is_asynchronous: If non-zero the communication is only started
 *                     (async_communication_start); otherwise it is performed
 *                     in a blocking way (sync_communication).
 *
 * The array stored for each entry is always replaced by the receive pointer
 * handed back by the communication routine.
 */
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
  size_t i;
  void *aux, *aux_s = NULL; // Nothing is sent from this side

  for(i=0; i < data_struct->entries; i++) {
    aux = data_struct->arrays[i];

    if(is_asynchronous) {
      async_communication_start(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->numP, numP_parents, MALLEABILITY_CHILDREN,
		      mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
    } else {
      sync_communication(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall->intercomm);
    }

    data_struct->arrays[i] = aux;
  }
}

533
534
535
536
537
538

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//====================MAM STAGES========================||
//======================================================||
//======================================================||
539
540
541
542
//======================================================||
//======================================================||
//======================================================||
//======================================================||
543

544
int MAM_St_rms(int *mam_state) {
545
546
547
548
549
  reset_malleability_times();
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_start = MPI_Wtime();
550
551
552
553
554
555
556

  *mam_state = MAM_NOT_STARTED;
  state = MALL_RMS_COMPLETED;
  MAM_Check_configuration();
  mall->wait_targets_posted = 0;

  //if(CHECK_RMS()) {return MALL_DENIED;}    
557
558
  return 1;
}
559

560
/*
 * Stage handler for MALL_RMS_COMPLETED. Starts the spawn of the new group of
 * processes and flags the processes that will become zombies: with the Merge
 * method those whose rank is not below mall->numC, with the Baseline method
 * all of them.
 *
 * Returns 1 if the resulting state is MALL_SPAWN_COMPLETED or
 * MALL_SPAWN_ADAPT_POSTPONE (the next stage can start right away), 0 otherwise.
 */
int MAM_St_spawn_start() {
  mall->num_parents = mall->numP;
  state = spawn_step();
  //FIXME This is necessary but ugly
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->myId >= mall->numC){ mall->zombie = 1; }
  else if(mall_conf->spawn_method == MALL_SPAWN_BASELINE){ mall->zombie = 1; }

  if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE){
    return 1;
  }
  return 0;
}

/*
 * Stage handler for MALL_SPAWN_PENDING / MALL_SPAWN_SINGLE_PENDING. Checks
 * whether the spawn has finished and, if so, records the spawn time measured
 * from the start of the reconfiguration.
 *
 * Returns 1 if the resulting state is MALL_SPAWN_COMPLETED or
 * MALL_SPAWN_ADAPTED, 0 if the spawn is still in progress.
 */
int MAM_St_spawn_pending(int wait_completed) {
  state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);

  // Spawn still in progress: nothing else to do in this call
  if (state != MALL_SPAWN_COMPLETED && state != MALL_SPAWN_ADAPTED) {
    return 0;
  }

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
  return 1;
}

int MAM_St_red_start() {
586
587
588
589
590
591
  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
    mall->root_collectives = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    mall->root_collectives = mall->root;
  }

592
593
594
595
596
  state = start_redistribution();
  return 1;
}

int MAM_St_red_pending(int *mam_state, int wait_completed) {
597
  if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
598
599
600
601
602
603
    state = thread_check(wait_completed);
  } else {
    state = check_redistribution(wait_completed);
  }

  if(state != MALL_DIST_PENDING) { 
604
    if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
      MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->tmp_comm); //El que pone 0 va primero
    } else {
      MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
    }
    MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
    state = MALL_USER_PENDING;
    *mam_state = MAM_USER_PENDING;
    return 1;
  }
  return 0;
}

/*
 * Stage handler for MALL_USER_PENDING. Runs the user-driven redistribution.
 *
 * Without a user function the stage is resumed at once through
 * MAM_Resume_redistribution(). With a user function, the user structure is
 * created and the function is called once, or repeatedly while
 * "wait_completed" is set and the state remains MALL_USER_PENDING.
 *
 * Returns 1 if the state has left MALL_USER_PENDING, 0 otherwise.
 */
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Starting USER redistribution", mall->myId, mall->numP); fflush(stdout);
  #endif

  if(user_function == NULL) {
    MAM_Resume_redistribution(mam_state);
  } else {
    MAM_I_create_user_struct(MALLEABILITY_NOT_CHILDREN);
    do {
      user_function(user_args);
    } while(wait_completed && state == MALL_USER_PENDING);
  }

  // The user has not finished yet; try again in a later checkpoint
  if(state == MALL_USER_PENDING) {
    return 0;
  }

  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Ended USER redistribution", mall->myId, mall->numP); fflush(stdout);
  #endif
  return 1;
}

/*
 * Stage handler for MALL_USER_COMPLETED. Stores the result of
 * end_redistribution() as the new state and always returns 1 so that
 * MAM_Checkpoint() processes the next stage immediately.
 */
int MAM_St_user_completed() {
  state = end_redistribution();
  return 1;
}

int MAM_St_spawn_adapt_pending(int wait_completed) {
645
  wait_completed = 1;
646
647
648
649
650
651
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_start = MPI_Wtime();
  unset_spawn_postpone_flag(state);
  state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);
652
653
/* TODO Comentar problema, basicamente indicar que no es posible de la forma actual
 * Ademas es solo para una operación que hemos visto como "extremadamente" rápida
654
  if(!MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, NULL)) {
655
656
657
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->comm);
    #endif
658
    mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->spawn_start;
659
660
661
    return 1;
  }
  return 0;
662
663
664
665
666
667
  */
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->spawn_start;
  return 1;
668
669
670
}

/*
 * Stage handler for MALL_SPAWN_ADAPTED / MALL_DIST_COMPLETED. Commits the
 * reconfiguration through MAM_Commit().
 *
 * Always returns 0: once committed there is no further stage to process.
 */
int MAM_St_completed(int *mam_state) {
  MAM_Commit(mam_state);
  return 0;
}


676
677
678
679
680
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================CHILDREN=========================||
//======================================================||
//======================================================||
681
682
683
684
//======================================================||
//======================================================||
//======================================================||
//======================================================||
685
686
687
688
689
690
/*
 * Inicializacion de los datos de los hijos.
 * En la misma se reciben datos de los padres: La configuracion
 * de la ejecucion a realizar; y los datos a recibir de los padres
 * ya sea de forma sincrona, asincrona o ambas.
 */
691
void Children_init(void (*user_function)(void *), void *user_args) {
692
  size_t i;
693

694
  #if USE_MAL_DEBUG
695
    DEBUG_FUNC("MaM will now initialize spawned processes", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
696
697
  #endif

iker_martin's avatar
iker_martin committed
698
  malleability_connect_children(mall->comm, &(mall->intercomm));
699
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { // For Merge Method, these processes will be added
iker_martin's avatar
iker_martin committed
700
701
    MPI_Comm_rank(mall->intercomm, &mall->myId);
    MPI_Comm_size(mall->intercomm, &mall->numP);
702
  }
703
  mall->root_collectives = mall->root_parents;
704

705
  #if USE_MAL_DEBUG
706
    DEBUG_FUNC("Spawned have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
707
708
  #endif

709
  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN);
710
  if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos
711
    #if USE_MAL_DEBUG >= 2
712
      DEBUG_FUNC("Spawned start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
713
    #endif
714
715
716
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
717

718
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
iker_martin's avatar
iker_martin committed
719
      recv_data(mall->num_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
720
      for(i=0; i<rep_a_data->entries; i++) {
721
        MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm);
722
      } 
723
    } else {
iker_martin's avatar
iker_martin committed
724
      recv_data(mall->num_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS); 
725

726
      for(i=0; i<rep_a_data->entries; i++) {
727
        MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm, &(rep_a_data->requests[i][0]));
728
      } 
729
      #if USE_MAL_DEBUG >= 2
730
        DEBUG_FUNC("Spawned started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
731
      #endif
732

733
      for(i=0; i<rep_a_data->entries; i++) {
734
        async_communication_wait(rep_a_data->requests[i], rep_a_data->request_qty[i]);
735
      }
736
      for(i=0; i<dist_a_data->entries; i++) {
737
        async_communication_wait(dist_a_data->requests[i], dist_a_data->request_qty[i]);
738
      }
739
740
741
742
743
744
      if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
        MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
        mall->wait_targets_posted = 1;
        MPI_Wait(&mall->wait_targets, MPI_STATUS_IGNORE);
      }

745
      #if USE_MAL_DEBUG >= 2
746
        DEBUG_FUNC("Spawned waited for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
747
      #endif
748
      for(i=0; i<dist_a_data->entries; i++) {
749
        async_communication_end(dist_a_data->requests[i], dist_a_data->request_qty[i], &(dist_a_data->windows[i]));
750
      }
751
      for(i=0; i<rep_a_data->entries; i++) {
752
        async_communication_end(rep_a_data->requests[i], rep_a_data->request_qty[i], &(rep_a_data->windows[i]));
753
      }
754
    }
755

756
757
758
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
759
    mall_conf->times->async_end= MPI_Wtime(); // Obtener timestamp de cuando termina comm asincrona
760
  }
761
  #if USE_MAL_DEBUG
762
    DEBUG_FUNC("Spawned have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
763
  #endif
764

765
  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
766
767
768
769
770
771
772
773
774
775
776
    MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->tmp_comm); //El que pone 0 va primero
  } else {
    MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
  }
  MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
  if(user_function != NULL) {
    state = MALL_USER_PENDING;
    MAM_I_create_user_struct(MALLEABILITY_CHILDREN);
    user_function(user_args);
  }

777
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN);
778
  if(dist_s_data->entries || rep_s_data->entries) { // Recibir datos sincronos
779
780
781
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
iker_martin's avatar
iker_martin committed
782
    recv_data(mall->num_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
783
784

    for(i=0; i<rep_s_data->entries; i++) {
785
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_collectives, mall->intercomm);
786
    } 
787
788
789
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
790
    mall_conf->times->sync_end = MPI_Wtime(); // Obtener timestamp de cuando termina comm sincrona
791
  }
792
  #if USE_MAL_DEBUG
793
    DEBUG_FUNC("Targets have completed synchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
794
  #endif
795

796
  MAM_Commit(NULL);
797

798
  #if USE_MAL_DEBUG
799
    DEBUG_FUNC("MaM has been initialized correctly for new ranks", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
800
  #endif
801
802
803
804
805
806
807
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================PARENTS==========================||
//======================================================||
//======================================================||
808
809
//======================================================||
//======================================================||
810
811
812
813
814
815

/*
 * Se encarga de realizar la creacion de los procesos hijos.
 * Si se pide en segundo plano devuelve el estado actual.
 */
int spawn_step(){
816
817
818
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
819
  mall_conf->times->spawn_start = MPI_Wtime();
820
 
iker_martin's avatar
iker_martin committed
821
  state = init_spawn(mall->thread_comm, &(mall->intercomm));
822

823
  if(!MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, NULL)) {
824
825
826
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
827
      mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
828
829
830
831
  }
  return state;
}

832

833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
/*
 * Comienza la redistribucion de los datos con el nuevo grupo de procesos.
 *
 * Primero se envia la configuracion a utilizar al nuevo grupo de procesos y a continuacion
 * se realiza el envio asincrono y/o sincrono si lo hay.
 *
 * En caso de que haya comunicacion asincrona, se comienza y se termina la funcion 
 * indicando que se ha comenzado un envio asincrono.
 *
 * Si no hay comunicacion asincrono se pasa a realizar la sincrona si la hubiese.
 *
 * Finalmente se envian datos sobre los resultados a los hijos y se desconectan ambos
 * grupos de procesos.
 */
int start_redistribution() {
848
  size_t i;
849

850
  if(mall->intercomm == MPI_COMM_NULL) {
851
852
    // Si no tiene comunicador creado, se debe a que se ha pospuesto el Spawn
    //   y se trata del spawn Merge Shrink
853
    MPI_Comm_dup(mall->comm, &(mall->intercomm));
854
  }
855

856
  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN);
857
  if(dist_a_data->entries || rep_a_data->entries) { // Enviar datos asincronos
858
859
860
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
861
    mall_conf->times->async_start = MPI_Wtime();
862
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
863
864
865
      return thread_creation();
    } else {
      send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
866
      for(i=0; i<rep_a_data->entries; i++) {
867
        MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm, &(rep_a_data->requests[i][0]));
868
      } 
869
870
871
872
873

      if(mall->zombie && MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
        MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
        mall->wait_targets_posted = 1;
      }
874
      return MALL_DIST_PENDING; 
875
876
    }
  } 
877
  return MALL_USER_PENDING;
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
}


/*
 * Comprueba si la redistribucion asincrona ha terminado. 
 * Si no ha terminado la funcion termina indicandolo, en caso contrario,
 * se continua con la comunicacion sincrona, el envio de resultados y
 * se desconectan los grupos de procesos.
 *
 * Esta funcion permite dos modos de funcionamiento al comprobar si la
 * comunicacion asincrona ha terminado.
 * Si se utiliza el modo "MAL_USE_NORMAL" o "MAL_USE_POINT", se considera 
 * terminada cuando los padres terminan de enviar.
 * Si se utiliza el modo "MAL_USE_IBARRIER", se considera terminada cuando
 * los hijos han terminado de recibir.
893
 * //FIXME Modificar para que se tenga en cuenta rep_a_data
894
 */
895
int check_redistribution(int wait_completed) {
896
  int completed, local_completed, all_completed;
897
  size_t i, req_qty;
898
  MPI_Request *req_completed;
899
900
  MPI_Win window;
  local_completed = 1;
901
  #if USE_MAL_DEBUG >= 2
902
    DEBUG_FUNC("Sources are testing for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
903
  #endif
904

905
  if(wait_completed) {
906
907
908
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL) && !mall->wait_targets_posted) {
      MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
      mall->wait_targets_posted = 1;
909
910
911
912
    }
    for(i=0; i<dist_a_data->entries; i++) {
      req_completed = dist_a_data->requests[i];
      req_qty = dist_a_data->request_qty[i];
913
      async_communication_wait(req_completed, req_qty);
914
    }
915
916
917
    for(i=0; i<rep_a_data->entries; i++) {
      req_completed = rep_a_data->requests[i];
      req_qty = rep_a_data->request_qty[i];
918
      async_communication_wait(req_completed, req_qty);
919
    }
920
921

    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) { MPI_Wait(&mall->wait_targets, MPI_STATUS_IGNORE); }
922
  } else {
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
    if(mall->wait_targets_posted) { 
      MPI_Test(&mall->wait_targets, &local_completed, MPI_STATUS_IGNORE); 
    } else {
      for(i=0; i<dist_a_data->entries; i++) {
        req_completed = dist_a_data->requests[i];
        req_qty = dist_a_data->request_qty[i];
        completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, req_completed, req_qty);
        local_completed = local_completed && completed;
      }
      for(i=0; i<rep_a_data->entries; i++) {
        req_completed = rep_a_data->requests[i];
        req_qty = rep_a_data->request_qty[i];
        completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, req_completed, req_qty);
        local_completed = local_completed && completed;
      }

      if(local_completed && MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
        MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
        mall->wait_targets_posted = 1;
        MPI_Test(&mall->wait_targets, &local_completed, MPI_STATUS_IGNORE); //TODO - Figure out if last process takes profit from calling here
      }
944
    }
945
946
947
948
949
950
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Sources will now check a global decision", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif

    MPI_Allreduce(&local_completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
    if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
951
952
  }

953
  #if USE_MAL_DEBUG >= 2
954
    DEBUG_FUNC("Sources sent asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
955
  #endif
956

957
958
959
960
  for(i=0; i<dist_a_data->entries; i++) {
    req_completed = dist_a_data->requests[i];
    req_qty = dist_a_data->request_qty[i];
    window = dist_a_data->windows[i];
961
    async_communication_end(req_completed, req_qty, &window);
962
  }
963
964
965
966
  for(i=0; i<rep_a_data->entries; i++) {
    req_completed = rep_a_data->requests[i];
    req_qty = rep_a_data->request_qty[i];
    window = rep_a_data->windows[i];
967
    async_communication_end(req_completed, req_qty, &window);
968
  }
969

970
971
972
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
973
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
974
  return MALL_USER_PENDING;
975
976
977
978
979
980
981
982
983
984
985
}

/*
 * Termina la redistribución de los datos con los hijos, comprobando
 * si se han realizado iteraciones con comunicaciones en segundo plano
 * y enviando cuantas iteraciones se han realizado a los hijos.
 *
 * Además se realizan las comunicaciones síncronas se las hay.
 * Finalmente termina enviando los datos temporales a los hijos.
 */ 
int end_redistribution() {
986
  size_t i;
987
  int local_state;
988

989
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN);
990
  if(dist_s_data->entries || rep_s_data->entries) { // Enviar datos sincronos
991
992
993
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
994
    mall_conf->times->sync_start = MPI_Wtime();
995
996
997
    send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    for(i=0; i<rep_s_data->entries; i++) {
998
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_collectives, mall->intercomm);
999
1000
    }

1001
1002
1003
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
1004
    if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->sync_end = MPI_Wtime(); // Merge method only
1005
  }
iker_martin's avatar
iker_martin committed
1006

1007
  local_state = MALL_DIST_COMPLETED;
1008
1009
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->numP > mall->numC) { // Merge Shrink
    local_state = MALL_SPAWN_ADAPT_PENDING;
1010
  }
1011

1012
  return local_state;
1013
1014
1015
1016
1017
1018
1019
1020
1021
}

// TODO Move somewhere else??
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//===============COMM PARENTS THREADS===================||
//======================================================||
//======================================================||

1022
1023

// State of the thread-based background redistribution. It is set to
// MALL_DIST_PENDING by thread_creation(), set to MALL_DIST_COMPLETED by
// thread_async_work() and read by thread_check().
int comm_state; //FIXME Use a handler
1024
1025
1026
1027
/*
 * Crea una hebra para ejecutar una comunicación en segundo plano.
 */
int thread_creation() {
1028
  comm_state = MALL_DIST_PENDING;
1029
1030
1031
1032
1033
  if(pthread_create(&(mall->async_thread), NULL, thread_async_work, NULL)) {
    printf("Error al crear el hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
1034
  return comm_state;
1035
1036
1037
1038
1039
1040
1041
1042
}

/*
 * Comprobación por parte de una hebra maestra que indica
 * si una hebra esclava ha terminado su comunicación en segundo plano.
 *
 * El estado de la comunicación es devuelto al finalizar la función. 
 */
1043
int thread_check(int wait_completed) {
1044
  int all_completed = 0;
1045

1046
1047
1048
1049
1050
1051
1052
1053
  if(wait_completed && comm_state == MALL_DIST_PENDING) {
    if(pthread_join(mall->async_thread, NULL)) {
      printf("Error al esperar al hilo\n");
      MPI_Abort(MPI_COMM_WORLD, -1);
      return -2;
    } 
  }

1054
  // Comprueba que todos los hilos han terminado la distribucion (Mismo valor en commAsync)
1055
  MPI_Allreduce(&comm_state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
1056
  if(all_completed != MALL_DIST_COMPLETED) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
1057
1058
1059
1060
1061
1062

  if(pthread_join(mall->async_thread, NULL)) {
    printf("Error al esperar al hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -2;
  } 
1063
1064
1065
1066

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
1067
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
1068
  return MALL_USER_PENDING;
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
}


/*
 * Función ejecutada por una hebra.
 * Ejecuta una comunicación síncrona con los hijos que
 * para el usuario se puede considerar como en segundo plano.
 *
 * Cuando termina la comunicación la hebra maestra puede comprobarlo
 * por el valor "commAsync".
 */
1080
void* thread_async_work() {
1081
1082
  size_t i;

1083
  send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
1084
  for(i=0; i<rep_a_data->entries; i++) {
1085
    MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm);
1086
  } 
1087
  comm_state = MALL_DIST_COMPLETED;
1088
1089
  pthread_exit(NULL);
}
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101


//==============================================================================
/*
 * Muestra por pantalla el estado actual de todos los comunicadores
 */
void print_comms_state() {
  int tester;
  char *test = malloc(MPI_MAX_OBJECT_NAME * sizeof(char));

  MPI_Comm_get_name(mall->comm, test, &tester);
  printf("P%d Comm=%d Name=%s\n", mall->myId, mall->comm, test);
1102
1103
  MPI_Comm_get_name(*(mall->user_comm), test, &tester);
  printf("P%d Comm=%d Name=%s\n", mall->myId, *(mall->user_comm), test);
1104
1105
1106
1107
1108
1109
  if(mall->intercomm != MPI_COMM_NULL) {
    MPI_Comm_get_name(mall->intercomm, test, &tester);
    printf("P%d Comm=%d Name=%s\n", mall->myId, mall->intercomm, test);
  }
  free(test);
}
1110

1111
1112
1113
/*
 * Función solo necesaria en Merge
 */
1114
1115
1116
1117
1118
1119
1120
void malleability_comms_update(MPI_Comm comm) {
  if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
  if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));

  MPI_Comm_dup(comm, &(mall->thread_comm));
  MPI_Comm_dup(comm, &(mall->comm));

1121
1122
1123
1124
1125
1126
1127
  MPI_Comm_set_name(mall->thread_comm, "MAM_THREAD");
  MPI_Comm_set_name(mall->comm, "MAM_MAIN");
}

/*
 * TODO Por hacer
 */
1128
void MAM_I_create_user_struct(int is_children_group) {
1129
1130
1131
  user_reconf->comm = mall->tmp_comm;

  if(is_children_group) {
1132
    user_reconf->rank_state = MAM_PROC_NEW_RANK;
iker_martin's avatar
iker_martin committed
1133
1134
1135
    user_reconf->numS = mall->num_parents;
    if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) user_reconf->numT = mall->numP;
    else user_reconf->numT = mall->num_parents + mall->numP;
1136
1137
1138
  } else {
    user_reconf->numS = mall->numP;
    user_reconf->numT = mall->numC;
1139
1140
    if(mall->zombie) user_reconf->rank_state = MAM_PROC_ZOMBIE;
    else user_reconf->rank_state = MAM_PROC_CONTINUE;
1141
1142
  }
}