malleabilityManager.c 40.6 KB
Newer Older
1
#include <pthread.h>
2
#include <string.h>
3
4
//#include "malleabilityManager.h"
#include "MAM.h"
5
#include "malleabilityStates.h"
6
#include "malleabilityDataStructures.h"
7
#include "malleabilityTypes.h"
iker_martin's avatar
iker_martin committed
8
#include "malleabilityZombies.h"
9
#include "malleabilityTimes.h"
10
#include "malleabilityRMS.h"
11
#include "MAM_Init_Configuration.h"
12
#include "spawn_methods/GenericSpawn.h"
13
14
15
16
17
#include "CommDist.h"

#define MALLEABILITY_USE_SYNCHRONOUS 0
#define MALLEABILITY_USE_ASYNCHRONOUS 1

18
void MAM_Commit(int *mam_state);
19
20
21
22

void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous);
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous);

23

24
25
int MAM_St_rms(int *mam_state);
int MAM_St_spawn_start();
26
27
int MAM_St_spawn_pending(int wait_completed);
int MAM_St_red_start();
28
int MAM_St_red_pending(int wait_completed);
29
int MAM_St_user_start(int *mam_state);
30
31
32
33
34
35
36
37
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args);
int MAM_St_user_completed();
int MAM_St_spawn_adapt_pending(int wait_completed);
int MAM_St_spawn_adapted(int *mam_state);
int MAM_St_red_completed(int *mam_state);
int MAM_St_completed(int *mam_state);


38
void Children_init(void (*user_function)(void *), void *user_args);
39
40
int spawn_step();
int start_redistribution();
41
int check_redistribution(int wait_completed);
42
int end_redistribution();
iker_martin's avatar
iker_martin committed
43
int shrink_redistribution();
44
45

int thread_creation();
46
int thread_check(int wait_completed);
47
void* thread_async_work();
48

49
int MAM_I_convert_key(char *key);
50
void MAM_I_create_user_struct(int is_children_group);
51

52
53
54
55
56
malleability_data_t *rep_s_data;
malleability_data_t *dist_s_data;
malleability_data_t *rep_a_data;
malleability_data_t *dist_a_data;

57
58
mam_user_reconf_t *user_reconf;

59
/*
60
61
62
63
64
65
66
67
 * Inicializa la reserva de memoria para el modulo de maleabilidad
 * creando todas las estructuras necesarias y copias de comunicadores
 * para no interferir en la aplicación.
 *
 * Si es llamada por un grupo de procesos creados de forma dinámica,
 * inicializan la comunicacion con sus padres. En este caso, al terminar 
 * la comunicacion los procesos hijo estan preparados para ejecutar la
 * aplicacion.
68
 */
69
int MAM_Init(int root, MPI_Comm *comm, char *name_exec, void (*user_function)(void *), void *user_args) {
70
  MPI_Comm dup_comm, thread_comm, original_comm;
71
72
73

  mall_conf = (malleability_config_t *) malloc(sizeof(malleability_config_t));
  mall = (malleability_t *) malloc(sizeof(malleability_t));
74
75
76
77
78
79
80
81
  user_reconf = (mam_user_reconf_t *) malloc(sizeof(mam_user_reconf_t));

  MPI_Comm_rank(*comm, &(mall->myId));
  MPI_Comm_size(*comm, &(mall->numP));

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Initializing MaM", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
  #endif
82

83
84
85
86
87
  rep_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));

88
89
  MPI_Comm_dup(*comm, &dup_comm);
  MPI_Comm_dup(*comm, &thread_comm);
90
  MPI_Comm_dup(*comm, &original_comm);
91
92
  MPI_Comm_set_name(dup_comm, "MAM_MAIN");
  MPI_Comm_set_name(thread_comm, "MAM_THREAD");
93
  MPI_Comm_set_name(original_comm, "MAM_ORIGINAL");
94
95

  mall->root = root;
iker_martin's avatar
iker_martin committed
96
  mall->root_parents = root;
97
  mall->zombie = 0;
98
  mall->comm = dup_comm;
99
  mall->thread_comm = thread_comm;
100
  mall->original_comm = original_comm;
101
102
  mall->user_comm = comm; 
  mall->tmp_comm = MPI_COMM_NULL;
103

104
  mall->name_exec = name_exec;
105
  mall->nodelist = NULL;
106
  mall->nodelist_len = 0;
107
108
109
110
111
112

  rep_s_data->entries = 0;
  rep_a_data->entries = 0;
  dist_s_data->entries = 0;
  dist_a_data->entries = 0;

113
  state = MALL_NOT_STARTED;
114

115
  MAM_Init_configuration();
116
  MAM_Zombies_service_init();
117
  init_malleability_times();
118
  MAM_Def_main_datatype();
119

120
121
  // Si son el primer grupo de procesos, obtienen los datos de los padres
  MPI_Comm_get_parent(&(mall->intercomm));
122
  if(mall->intercomm != MPI_COMM_NULL) { 
123
    Children_init(user_function, user_args);
124
    return MALLEABILITY_CHILDREN;
125
  }
126

127
  //TODO Check potential improvement - If check_hosts does not use slurm, internode_group could be obtained there
128
  MAM_check_hosts();
129
  mall->internode_group = MAM_Is_internode_group();
130
  MAM_Set_initial_configuration();
iker_martin's avatar
iker_martin committed
131

132
133
134
135
136
  #if USE_MAL_BARRIERS && USE_MAL_DEBUG
    if(mall->myId == mall->root)
      printf("MaM: Using barriers to record times.\n");
  #endif

137
  #if USE_MAL_DEBUG
138
    DEBUG_FUNC("MaM has been initialized correctly as parents", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
139
140
  #endif

141
  return MALLEABILITY_NOT_CHILDREN;
142
143
}

144
145
146
147
148
/*
 * Elimina toda la memoria reservado por el modulo
 * de maleabilidad y asegura que los zombies
 * despierten si los hubiese.
 */
149
150
int MAM_Finalize() {	  
  int request_abort;
151
152
153
154
155
156
157
158
159
  free_malleability_data_struct(rep_s_data);
  free_malleability_data_struct(rep_a_data);
  free_malleability_data_struct(dist_s_data);
  free_malleability_data_struct(dist_a_data);

  free(rep_s_data);
  free(rep_a_data);
  free(dist_s_data);
  free(dist_a_data);
160
  if(mall->nodelist != NULL) free(mall->nodelist);
161

162
  MAM_Free_main_datatype();
163
  request_abort = MAM_Zombies_service_free();
164
  free_malleability_times();
165
166
  if(mall->comm != MPI_COMM_WORLD && mall->comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->comm));
  if(mall->thread_comm != MPI_COMM_WORLD && mall->thread_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->thread_comm));
167
  if(mall->intercomm != MPI_COMM_WORLD && mall->intercomm != MPI_COMM_NULL) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge
168
  if(mall->original_comm != MPI_COMM_WORLD && mall->original_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->original_comm));
169
170
  free(mall);
  free(mall_conf);
171
  free(user_reconf);
iker_martin's avatar
iker_martin committed
172

173
  state = MALL_UNRESERVED;
174
  return request_abort;
175
176
}

177
178
/* 
 * TODO Reescribir
179
180
181
182
 * Comprueba el estado de la maleabilidad. Intenta avanzar en la misma
 * si es posible. Funciona como una máquina de estados.
 * Retorna el estado de la maleabilidad concreto y modifica el argumento
 * "mam_state" a uno generico.
183
 *
184
185
 * El argumento "wait_completed" se utiliza para esperar a la finalización de
 * las tareas llevadas a cabo por parte de MAM.
186
187
 *
 */
188
int MAM_Checkpoint(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
189
  int call_checkpoint = 0;
190

191
  //TODO This could be changed to an array with the functions to call in each case
192
193
  switch(state) {
    case MALL_UNRESERVED:
194
      *mam_state = MAM_UNRESERVED;
195
196
      break;
    case MALL_NOT_STARTED:
197
198
199
200
      call_checkpoint = MAM_St_rms(mam_state);
      break;
    case MALL_RMS_COMPLETED:
      call_checkpoint = MAM_St_spawn_start();
201
      break;
202

203
    case MALL_SPAWN_PENDING: // Comprueba si el spawn ha terminado
204
    case MALL_SPAWN_SINGLE_PENDING:
205
      call_checkpoint = MAM_St_spawn_pending(wait_completed);
206
      break;
207

208
209
    case MALL_SPAWN_ADAPT_POSTPONE:
    case MALL_SPAWN_COMPLETED:
210
      call_checkpoint = MAM_St_red_start();
211
      break;
212

213
    case MALL_DIST_PENDING:
214
      call_checkpoint = MAM_St_red_pending(wait_completed);
215
216
      break;

217
218
219
220
    case MALL_USER_START:
      call_checkpoint = MAM_St_user_start(mam_state);
      break;

221
222
    case MALL_USER_PENDING:
      call_checkpoint = MAM_St_user_pending(mam_state, wait_completed, user_function, user_args);
223
      break;
224

225
226
    case MALL_USER_COMPLETED:
      call_checkpoint = MAM_St_user_completed();
227
      break;
228

229
230
    case MALL_SPAWN_ADAPT_PENDING:
      call_checkpoint = MAM_St_spawn_adapt_pending(wait_completed);
231
232
      break;

233
234
235
    case MALL_SPAWN_ADAPTED:
    case MALL_DIST_COMPLETED:
      call_checkpoint = MAM_St_completed(mam_state);
236
237
      break;
  }
238

239
240
  if(call_checkpoint) { MAM_Checkpoint(mam_state, wait_completed, user_function, user_args); }
  if(state > MALL_NOT_STARTED && state < MALL_COMPLETED) *mam_state = MAM_PENDING;
241
242
243
  return state;
}

244
245
246
/*
 * TODO
 */
247
248
void MAM_Resume_redistribution(int *mam_state) {
  state = MALL_USER_COMPLETED;
249
  if(mam_state != NULL) *mam_state = MAM_PENDING;
250
251
252
253
254
}

/*
 * TODO
 */
255
void MAM_Commit(int *mam_state) {
256
  int request_abort;
257
  #if USE_MAL_DEBUG
258
    if(mall->myId == mall->root){ DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); } fflush(stdout);
259
260
  #endif

261
262
  // Get times before commiting
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) {
263
    // This communication is only needed when the root process will become a zombie
264
    malleability_times_broadcast(mall->root_collectives);
265
  }
266

267
  // Free unneded communicators
268
269
  if(mall->tmp_comm != MPI_COMM_WORLD && mall->tmp_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->tmp_comm));
  if(*(mall->user_comm) != MPI_COMM_WORLD && *(mall->user_comm) != MPI_COMM_NULL) MPI_Comm_free(mall->user_comm);
270

271
272
  // Zombies Treatment
  MAM_Zombies_update();
273
  if(mall->zombie) {
274
    #if USE_MAL_DEBUG >= 1
275
276
      DEBUG_FUNC("Is terminating as zombie", mall->myId, mall->numP); fflush(stdout);
    #endif
277
278
    request_abort = MAM_Finalize();
    if(request_abort) { MPI_Abort(MPI_COMM_WORLD, -101); }
279
    MPI_Finalize();
280
281
282
    exit(0);
  }

283
  // Reset/Free communicators
284
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MAM_comms_update(mall->intercomm); }
285
286
  if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge

iker_martin's avatar
iker_martin committed
287
288
  MPI_Comm_rank(mall->comm, &mall->myId);
  MPI_Comm_size(mall->comm, &mall->numP);
289
290
  mall->root = mall_conf->spawn_method == MALL_SPAWN_BASELINE ? mall->root : mall->root_parents;
  mall->root_parents = mall->root;
291
  state = MALL_NOT_STARTED;
292
  if(mam_state != NULL) *mam_state = MAM_COMPLETED;
293

294
  // Set new communicator
295
296
297
  MPI_Comm_dup(mall->comm, mall->user_comm);
  //if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *(mall->user_comm) = MPI_COMM_WORLD; }
  //else if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MPI_Comm_dup(mall->comm, mall->user_comm); }
298
  #if USE_MAL_DEBUG
299
    if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout);
300
  #endif
301
302
303
304
305

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_end = MPI_Wtime();
306
307
}

308
/*
309
310
311
312
313
314
315
316
317
 * This function adds data to a data structure based on whether the operation is synchronous or asynchronous,
 * and whether the data is replicated or distributed. It takes the following parameters:
 * - data: a pointer to the data to be added
 * - index: a pointer to a size_t variable where the index of the added data will be stored
 * - total_qty: the amount of elements in data
 * - type: the MPI datatype of the data
 * - is_replicated: a flag indicating whether the data is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
 * Finally, it updates the index with the index of the last added data if index is not NULL.
318
 */
319
320
void MAM_Data_add(void *data, size_t *index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
  size_t total_reqs = 0, returned_index;
321

322
  if(is_constant) { //Async
323
    if(is_replicated) {
324
      total_reqs = 1;
325
      add_data(data, total_qty, type, total_reqs, rep_a_data);
326
      returned_index = rep_a_data->entries-1;
327
    } else {
328
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
329
        total_reqs = 1;
330
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
331
        total_reqs = mall->numC;
332
      } 
333
334
      
      add_data(data, total_qty, type, total_reqs, dist_a_data);
335
      returned_index = dist_a_data->entries-1;
336
    }
337
  } else { //Sync
338
339
    if(is_replicated) {
      add_data(data, total_qty, type, total_reqs, rep_s_data);
340
      returned_index = rep_s_data->entries-1;
341
342
    } else {
      add_data(data, total_qty, type, total_reqs, dist_s_data);
343
      returned_index = dist_s_data->entries-1;
344
    }
345
  }
346
347

  if(index != NULL) *index = returned_index;
348
349
}

350
/*
351
352
353
354
355
356
357
358
 * This function modifies a data entry to a data structure based on whether the operation is synchronous or asynchronous,
 * and whether the data is replicated or distributed. It takes the following parameters:
 * - data: a pointer to the data to be added
 * - index: a value indicating which entry will be modified
 * - total_qty: the amount of elements in data
 * - type: the MPI datatype of the data
 * - is_replicated: a flag indicating whether the data is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
359
 */
360
void MAM_Data_modify(void *data, size_t index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
361
362
  size_t total_reqs = 0;

363
364
  if(is_constant) {
    if(is_replicated) {
365
      total_reqs = 1;
366
367
368
      modify_data(data, index, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ??? 
    } else {    
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
369
        total_reqs = 1;
370
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
371
372
373
        total_reqs = mall->numC;
      }
      
374
      modify_data(data, index, total_qty, type, total_reqs, dist_a_data);
375
    }
376
377
378
379
380
381
  } else {
    if(is_replicated) {
      modify_data(data, index, total_qty, type, total_reqs, rep_s_data);
    } else {
      modify_data(data, index, total_qty, type, total_reqs, dist_s_data);
    }
382
383
384
  }
}

385
/*
386
387
388
389
390
 * This functions returns how many data entries are available for one of the specific data structures.
 * It takes the following parameters:
 * - is_replicated: a flag indicating whether the structure is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
 * - entries: a pointer where the amount of entries will be stored
391
 */
392
void MAM_Data_get_entries(int is_replicated, int is_constant, size_t *entries){
393
394
395
  
  if(is_constant) {
    if(is_replicated) {
396
      *entries = rep_a_data->entries;
397
    } else {
398
      *entries = dist_a_data->entries;
399
400
401
    }
  } else {
    if(is_replicated) {
402
      *entries = rep_s_data->entries;
403
    } else {
404
      *entries = dist_s_data->entries;
405
406
407
408
409
    }
  }
}

/*
410
411
412
413
414
415
416
417
 * This function returns a data entry to a data structure based on whether the operation is synchronous or asynchronous,
 * and whether the data is replicated or distributed. It takes the following parameters:
 * - index: a value indicating which entry will be modified
 * - is_replicated: a flag indicating whether the data is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
 * - data: a pointer where the data will be stored. The user must free it
 * - total_qty: the amount of elements in data for all ranks
 * - local_qty: the amount of elements in data for this rank
418
 */
419
void MAM_Data_get_pointer(void **data, size_t index, size_t *total_qty, MPI_Datatype *type, int is_replicated, int is_constant) {
420
421
422
423
  malleability_data_t *data_struct;

  if(is_constant) {
    if(is_replicated) {
424
      data_struct = rep_a_data;
425
    } else {
426
      data_struct = dist_a_data;
427
428
429
    }
  } else {
    if(is_replicated) {
430
      data_struct = rep_s_data;
431
    } else {
432
      data_struct = dist_s_data;
433
434
435
    }
  }

436
  *data = data_struct->arrays[index];
437
438
439
  *total_qty = data_struct->qty[index];
  *type = data_struct->types[index];
  //get_block_dist(qty, mall->myId, mall->numP, &dist_data); //FIXME Asegurar que numP es correcto
440
441
}

442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
/*
 * @brief Gives the user the structure needed to redistribute data during a reconfiguration.
 *
 * Only valid while the internal state of MaM is MALL_USER_PENDING.
 *
 * Parameters:
 *   - mam_user_reconf_t *reconf_info: Destination where a copy of the current reconfiguration information is written.
 *
 * Return Value:
 *   - MAM_OK: The information has been copied into reconf_info.
 *   - MALL_DENIED: The state of MaM is not MALL_USER_PENDING; reconf_info is left untouched.
 */
int MAM_Get_Reconf_Info(mam_user_reconf_t *reconf_info) {
  int result = MALL_DENIED;

  if(state == MALL_USER_PENDING) {
    *reconf_info = *user_reconf;
    result = MAM_OK;
  }
  return result;
}

/*
 * @brief Returns the times used for the different steps of the last reconfiguration.
 *
 * This function is intended to be called when a reconfiguration has ended.
 * It is a thin public wrapper: all four pointers are forwarded unchanged to
 * MAM_I_retrieve_times().
 *
 * Parameters:
 *  - double *sp_time:   A pointer where the spawn time will be saved.
 *  - double *sy_time:   A pointer where the synchronous data redistribution time will be saved.
 *  - double *asy_time:  A pointer where the asynchronous data redistribution time will be saved.
 *  - double *mall_time: A pointer where the malleability time will be saved.
 */
void MAM_Retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
  MAM_I_retrieve_times(sp_time, sy_time, asy_time, mall_time);
}
477
478
479
480
481
482
483
484
485
486
487
488
489

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//================DATA COMMUNICATION====================||
//======================================================||
//======================================================||

/*
 * Generic function to send every entry of a data structure through
 * mall->intercomm, acting as the MALLEABILITY_NOT_CHILDREN side.
 *
 * Parameters:
 *  - numP_children:   amount of processes in the other group.
 *  - data_struct:     entries to send.
 *  - is_asynchronous: when non-zero each transfer is only started with
 *                     async_communication_start(); otherwise
 *                     sync_communication() is used.
 *
 * If the communication routine hands back a buffer, it replaces the array
 * stored for that entry.
 */
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
  for(size_t entry = 0; entry < data_struct->entries; entry++) {
    void *send_buf = data_struct->arrays[entry];
    void *recv_buf = NULL;

    if(is_asynchronous) {
      async_communication_start(send_buf, &recv_buf, data_struct->qty[entry], data_struct->types[entry],
                      mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall->intercomm,
                      &(data_struct->requests[entry]), &(data_struct->request_qty[entry]), &(data_struct->windows[entry]));
    } else {
      sync_communication(send_buf, &recv_buf, data_struct->qty[entry], data_struct->types[entry],
                      mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall->intercomm);
    }

    if(recv_buf != NULL) {
      data_struct->arrays[entry] = recv_buf;
    }
  }
}

/*
 * Generic function to receive every entry of a data structure through
 * mall->intercomm, acting as the MALLEABILITY_CHILDREN side.
 *
 * Parameters:
 *  - numP_parents:    amount of processes in the other group.
 *  - data_struct:     entries to receive.
 *  - is_asynchronous: when non-zero each transfer is only started with
 *                     async_communication_start(); otherwise
 *                     sync_communication() is used.
 *
 * The receive pointer left by the communication routine is always stored
 * back as the array of the entry.
 */
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
  void *no_send = NULL; // This side has nothing to send

  for(size_t entry = 0; entry < data_struct->entries; entry++) {
    void *recv_buf = data_struct->arrays[entry];

    if(is_asynchronous) {
      async_communication_start(no_send, &recv_buf, data_struct->qty[entry], data_struct->types[entry],
                      mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall->intercomm,
                      &(data_struct->requests[entry]), &(data_struct->request_qty[entry]), &(data_struct->windows[entry]));
    } else {
      sync_communication(no_send, &recv_buf, data_struct->qty[entry], data_struct->types[entry],
                      mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall->intercomm);
    }

    data_struct->arrays[entry] = recv_buf;
  }
}

536
537
538
539
540
541

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//====================MAM STAGES========================||
//======================================================||
//======================================================||
542
543
544
545
//======================================================||
//======================================================||
//======================================================||
//======================================================||
546

547
int MAM_St_rms(int *mam_state) {
548
549
550
551
552
  reset_malleability_times();
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_start = MPI_Wtime();
553

554
  MAM_Check_configuration();
555
556
557
558
559
  *mam_state = MAM_NOT_STARTED;
  state = MALL_RMS_COMPLETED;
  mall->wait_targets_posted = 0;

  //if(CHECK_RMS()) {return MALL_DENIED;}    
560
561
  return 1;
}
562

563
/*
 * Starts the spawn stage and flags the processes that will become zombies.
 *
 * Returns 1 when the state left by spawn_step() allows MAM_Checkpoint() to
 * go on right away (MALL_SPAWN_COMPLETED or MALL_SPAWN_ADAPT_POSTPONE) and
 * 0 otherwise.
 */
int MAM_St_spawn_start() {
  mall->num_parents = mall->numP;
  state = spawn_step();

  //FIXME This is necessary but ugly
  // Baseline: every source becomes a zombie. Merge: only ranks >= numC do
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE
     || (mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->myId >= mall->numC)) {
    mall->zombie = 1;
  }

  return (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE);
}

/*
 * Checks whether a pending spawn has finished.
 *
 * Returns 0 while the spawn is still pending. Once the state becomes
 * MALL_SPAWN_COMPLETED or MALL_SPAWN_ADAPTED the spawn time is recorded,
 * measured from the start of the reconfiguration, and 1 is returned.
 */
int MAM_St_spawn_pending(int wait_completed) {
  state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);
  if(state != MALL_SPAWN_COMPLETED && state != MALL_SPAWN_ADAPTED) {
    return 0;
  }

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
  return 1;
}

int MAM_St_red_start() {
589
590
591
592
593
594
  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
    mall->root_collectives = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    mall->root_collectives = mall->root;
  }

595
596
597
598
  state = start_redistribution();
  return 1;
}

599
int MAM_St_red_pending(int wait_completed) {
600
  if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
601
602
603
604
605
606
    state = thread_check(wait_completed);
  } else {
    state = check_redistribution(wait_completed);
  }

  if(state != MALL_DIST_PENDING) { 
607
    state = MALL_USER_START;
608
609
610
611
612
    return 1;
  }
  return 0;
}

613
614
615
616
617
618
619
620
621
622
623
624
/*
 * Prepares the user redistribution stage: creates mall->tmp_comm, either by
 * merging the intercommunicator (intercommunicator spawn strategy) or by
 * duplicating it, and names it "MAM_USER_TMP".
 *
 * Sets the internal state to MALL_USER_PENDING and *mam_state to
 * MAM_USER_PENDING. Always returns 1.
 */
int MAM_St_user_start(int *mam_state) {
  if(!MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
    MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
  } else {
    // The group passing 0 is placed first in the merged communicator
    MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->tmp_comm);
  }
  MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");

  *mam_state = MAM_USER_PENDING;
  state = MALL_USER_PENDING;
  return 1;
}

625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
/*
 * Runs the user redistribution stage.
 *
 * Without a user function the stage is finished right away through
 * MAM_Resume_redistribution(). With one, the user structure is created and
 * the function is called once; when wait_completed is set it is called
 * again for as long as the state stays MALL_USER_PENDING.
 *
 * Returns 1 when the state is no longer MALL_USER_PENDING and 0 otherwise.
 */
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Starting USER redistribution", mall->myId, mall->numP); fflush(stdout);
  #endif

  if(user_function == NULL) {
    MAM_Resume_redistribution(mam_state);
  } else {
    MAM_I_create_user_struct(MALLEABILITY_NOT_CHILDREN);
    user_function(user_args);
    while(wait_completed && state == MALL_USER_PENDING) {
      user_function(user_args);
    }
  }

  if(state == MALL_USER_PENDING) {
    return 0;
  }
  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Ended USER redistribution", mall->myId, mall->numP); fflush(stdout);
  #endif
  return 1;
}

/*
 * Stage run once the user redistribution has finished: the internal state
 * becomes whatever end_redistribution() returns.
 * Always returns 1, so that MAM_Checkpoint() goes on with the next stage.
 */
int MAM_St_user_completed() {
  state = end_redistribution();
  return 1;
}

int MAM_St_spawn_adapt_pending(int wait_completed) {
653
  wait_completed = MAM_WAIT_COMPLETION;
654
655
656
657
658
659
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_start = MPI_Wtime();
  unset_spawn_postpone_flag(state);
  state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);
660
661
/* TODO Comentar problema, basicamente indicar que no es posible de la forma actual
 * Ademas es solo para una operación que hemos visto como "extremadamente" rápida
662
663
 * NO es posible debido a que solo se puede hacer tras enviar los datos variables 
 * y por tanto pierden validez dichos datos
664
  if(!MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, NULL)) {
665
666
667
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->comm);
    #endif
668
    mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->spawn_start;
669
670
671
    return 1;
  }
  return 0;
672
673
674
675
676
677
  */
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->spawn_start;
  return 1;
678
679
680
}

/*
 * Last stage: commits the reconfiguration through MAM_Commit().
 * Always returns 0, so MAM_Checkpoint() stops advancing.
 */
int MAM_St_completed(int *mam_state) {
  int keep_advancing = 0;

  MAM_Commit(mam_state);
  return keep_advancing;
}


686
687
688
689
690
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================CHILDREN=========================||
//======================================================||
//======================================================||
691
692
693
694
//======================================================||
//======================================================||
//======================================================||
//======================================================||
695
696
697
698
699
700
/*
 * Inicializacion de los datos de los hijos.
 * En la misma se reciben datos de los padres: La configuracion
 * de la ejecucion a realizar; y los datos a recibir de los padres
 * ya sea de forma sincrona, asincrona o ambas.
 */
701
void Children_init(void (*user_function)(void *), void *user_args) {
702
  size_t i;
703

704
  #if USE_MAL_DEBUG
705
    DEBUG_FUNC("MaM will now initialize spawned processes", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
706
707
  #endif

708
  malleability_connect_children(&(mall->intercomm));
709
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { // For Merge Method, these processes will be added
iker_martin's avatar
iker_martin committed
710
711
    MPI_Comm_rank(mall->intercomm, &mall->myId);
    MPI_Comm_size(mall->intercomm, &mall->numP);
712
  }
713
  mall->root_collectives = mall->root_parents;
714

715
716
717
718
719
720
  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL)) {
    mall->internode_group = 0;
  } else {
    mall->internode_group = MAM_Is_internode_group();
  }

721
  #if USE_MAL_DEBUG
722
    DEBUG_FUNC("Spawned have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
723
724
  #endif

725
  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN);
726
  if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos
727
    #if USE_MAL_DEBUG >= 2
728
      DEBUG_FUNC("Spawned start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
729
    #endif
730
731
732
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
733

734
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
iker_martin's avatar
iker_martin committed
735
      recv_data(mall->num_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
736
      for(i=0; i<rep_a_data->entries; i++) {
737
        MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm);
738
      } 
739
    } else {
iker_martin's avatar
iker_martin committed
740
      recv_data(mall->num_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS); 
741

742
      for(i=0; i<rep_a_data->entries; i++) {
743
        MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm, &(rep_a_data->requests[i][0]));
744
      } 
745
      #if USE_MAL_DEBUG >= 2
746
        DEBUG_FUNC("Spawned started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
747
      #endif
748

749
      for(i=0; i<rep_a_data->entries; i++) {
750
        async_communication_wait(rep_a_data->requests[i], rep_a_data->request_qty[i]);
751
      }
752
      for(i=0; i<dist_a_data->entries; i++) {
753
        async_communication_wait(dist_a_data->requests[i], dist_a_data->request_qty[i]);
754
      }
755
756
757
758
759
760
      if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
        MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
        mall->wait_targets_posted = 1;
        MPI_Wait(&mall->wait_targets, MPI_STATUS_IGNORE);
      }

761
      #if USE_MAL_DEBUG >= 2
762
        DEBUG_FUNC("Spawned waited for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
763
      #endif
764
      for(i=0; i<dist_a_data->entries; i++) {
765
        async_communication_end(dist_a_data->requests[i], dist_a_data->request_qty[i], &(dist_a_data->windows[i]));
766
      }
767
      for(i=0; i<rep_a_data->entries; i++) {
768
        async_communication_end(rep_a_data->requests[i], rep_a_data->request_qty[i], &(rep_a_data->windows[i]));
769
      }
770
    }
771

772
773
774
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
775
    mall_conf->times->async_end= MPI_Wtime(); // Obtener timestamp de cuando termina comm asincrona
776
  }
777
  #if USE_MAL_DEBUG
778
    DEBUG_FUNC("Spawned have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
779
  #endif
780

781
  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
782
783
784
785
786
787
788
789
790
791
792
    MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->tmp_comm); //El que pone 0 va primero
  } else {
    MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
  }
  MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
  if(user_function != NULL) {
    state = MALL_USER_PENDING;
    MAM_I_create_user_struct(MALLEABILITY_CHILDREN);
    user_function(user_args);
  }

793
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN);
794
  if(dist_s_data->entries || rep_s_data->entries) { // Recibir datos sincronos
795
796
797
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
iker_martin's avatar
iker_martin committed
798
    recv_data(mall->num_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
799
800

    for(i=0; i<rep_s_data->entries; i++) {
801
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_collectives, mall->intercomm);
802
    } 
803
804
805
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
806
    mall_conf->times->sync_end = MPI_Wtime(); // Obtener timestamp de cuando termina comm sincrona
807
  }
808
  #if USE_MAL_DEBUG
809
    DEBUG_FUNC("Targets have completed synchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
810
  #endif
811

812
  MAM_Commit(NULL);
813

814
  #if USE_MAL_DEBUG
815
    DEBUG_FUNC("MaM has been initialized correctly for new ranks", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
816
  #endif
817
818
819
820
821
822
823
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================PARENTS==========================||
//======================================================||
//======================================================||
824
825
//======================================================||
//======================================================||
826
827
828
829
830
831

/*
 * Se encarga de realizar la creacion de los procesos hijos.
 * Si se pide en segundo plano devuelve el estado actual.
 */
int spawn_step(){
832
833
834
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
835
  mall_conf->times->spawn_start = MPI_Wtime();
836
 
iker_martin's avatar
iker_martin committed
837
  state = init_spawn(mall->thread_comm, &(mall->intercomm));
838

839
  if(!MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, NULL)) {
840
841
842
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
843
      mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
844
845
846
847
  }
  return state;
}

848

849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
/*
 * Comienza la redistribucion de los datos con el nuevo grupo de procesos.
 *
 * Primero se envia la configuracion a utilizar al nuevo grupo de procesos y a continuacion
 * se realiza el envio asincrono y/o sincrono si lo hay.
 *
 * En caso de que haya comunicacion asincrona, se comienza y se termina la funcion 
 * indicando que se ha comenzado un envio asincrono.
 *
 * Si no hay comunicacion asincrono se pasa a realizar la sincrona si la hubiese.
 *
 * Finalmente se envian datos sobre los resultados a los hijos y se desconectan ambos
 * grupos de procesos.
 */
int start_redistribution() {
864
  size_t i;
865

866
  if(mall->intercomm == MPI_COMM_NULL) {
867
868
    // Si no tiene comunicador creado, se debe a que se ha pospuesto el Spawn
    //   y se trata del spawn Merge Shrink
869
    MPI_Comm_dup(mall->comm, &(mall->intercomm));
870
  }
871

872
  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN);
873
  if(dist_a_data->entries || rep_a_data->entries) { // Enviar datos asincronos
874
875
876
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
877
    mall_conf->times->async_start = MPI_Wtime();
878
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
879
880
881
      return thread_creation();
    } else {
      send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
882
      for(i=0; i<rep_a_data->entries; i++) {
883
        MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm, &(rep_a_data->requests[i][0]));
884
      } 
885
886
887
888
889

      if(mall->zombie && MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
        MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
        mall->wait_targets_posted = 1;
      }
890
      return MALL_DIST_PENDING; 
891
892
    }
  } 
893
  return MALL_USER_START;
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
}


/*
 * Comprueba si la redistribucion asincrona ha terminado. 
 * Si no ha terminado la funcion termina indicandolo, en caso contrario,
 * se continua con la comunicacion sincrona, el envio de resultados y
 * se desconectan los grupos de procesos.
 *
 * Esta funcion permite dos modos de funcionamiento al comprobar si la
 * comunicacion asincrona ha terminado.
 * Si se utiliza el modo "MAL_USE_NORMAL" o "MAL_USE_POINT", se considera 
 * terminada cuando los padres terminan de enviar.
 * Si se utiliza el modo "MAL_USE_IBARRIER", se considera terminada cuando
 * los hijos han terminado de recibir.
909
 * //FIXME Modificar para que se tenga en cuenta rep_a_data
910
 */
911
int check_redistribution(int wait_completed) {
912
  int completed, local_completed, all_completed;
913
  size_t i, req_qty;
914
  MPI_Request *req_completed;
915
916
  MPI_Win window;
  local_completed = 1;
917
  #if USE_MAL_DEBUG >= 2
918
    DEBUG_FUNC("Sources are testing for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
919
  #endif
920

921
  if(wait_completed) {
922
923
924
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL) && !mall->wait_targets_posted) {
      MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
      mall->wait_targets_posted = 1;
925
926
927
928
    }
    for(i=0; i<dist_a_data->entries; i++) {
      req_completed = dist_a_data->requests[i];
      req_qty = dist_a_data->request_qty[i];
929
      async_communication_wait(req_completed, req_qty);
930
    }
931
932
933
    for(i=0; i<rep_a_data->entries; i++) {
      req_completed = rep_a_data->requests[i];
      req_qty = rep_a_data->request_qty[i];
934
      async_communication_wait(req_completed, req_qty);
935
    }
936
937

    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) { MPI_Wait(&mall->wait_targets, MPI_STATUS_IGNORE); }
938
  } else {
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
    if(mall->wait_targets_posted) { 
      MPI_Test(&mall->wait_targets, &local_completed, MPI_STATUS_IGNORE); 
    } else {
      for(i=0; i<dist_a_data->entries; i++) {
        req_completed = dist_a_data->requests[i];
        req_qty = dist_a_data->request_qty[i];
        completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, req_completed, req_qty);
        local_completed = local_completed && completed;
      }
      for(i=0; i<rep_a_data->entries; i++) {
        req_completed = rep_a_data->requests[i];
        req_qty = rep_a_data->request_qty[i];
        completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, req_completed, req_qty);
        local_completed = local_completed && completed;
      }

      if(local_completed && MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
        MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
        mall->wait_targets_posted = 1;
        MPI_Test(&mall->wait_targets, &local_completed, MPI_STATUS_IGNORE); //TODO - Figure out if last process takes profit from calling here
      }
960
    }
961
962
963
964
965
966
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Sources will now check a global decision", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif

    MPI_Allreduce(&local_completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
    if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
967
968
  }

969
  #if USE_MAL_DEBUG >= 2
970
    DEBUG_FUNC("Sources sent asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
971
  #endif
972

973
974
975
976
  for(i=0; i<dist_a_data->entries; i++) {
    req_completed = dist_a_data->requests[i];
    req_qty = dist_a_data->request_qty[i];
    window = dist_a_data->windows[i];
977
    async_communication_end(req_completed, req_qty, &window);
978
  }
979
980
981
982
  for(i=0; i<rep_a_data->entries; i++) {
    req_completed = rep_a_data->requests[i];
    req_qty = rep_a_data->request_qty[i];
    window = rep_a_data->windows[i];
983
    async_communication_end(req_completed, req_qty, &window);
984
  }
985

986
987
988
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
989
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
990
  return MALL_USER_PENDING;
991
992
993
994
995
996
997
998
999
1000
1001
}

/*
 * Termina la redistribución de los datos con los hijos, comprobando
 * si se han realizado iteraciones con comunicaciones en segundo plano
 * y enviando cuantas iteraciones se han realizado a los hijos.
 *
 * Además se realizan las comunicaciones síncronas se las hay.
 * Finalmente termina enviando los datos temporales a los hijos.
 */ 
int end_redistribution() {
1002
  size_t i;
1003
  int local_state;
1004

1005
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN);
1006
  if(dist_s_data->entries || rep_s_data->entries) { // Enviar datos sincronos
1007
1008
1009
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
1010
    mall_conf->times->sync_start = MPI_Wtime();
1011
1012
1013
    send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    for(i=0; i<rep_s_data->entries; i++) {
1014
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_collectives, mall->intercomm);
1015
1016
    }

1017
1018
1019
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
1020
    if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->sync_end = MPI_Wtime(); // Merge method only
1021
  }
iker_martin's avatar
iker_martin committed
1022

1023
  local_state = MALL_DIST_COMPLETED;
1024
1025
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->numP > mall->numC) { // Merge Shrink
    local_state = MALL_SPAWN_ADAPT_PENDING;
1026
  }
1027

1028
  return local_state;
1029
1030
1031
1032
1033
1034
1035
1036
1037
}

// TODO: move this section elsewhere?
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//===============COMM PARENTS THREADS===================||
//======================================================||
//======================================================||

1038
1039

// State of the background redistribution thread: set to MALL_DIST_PENDING by
// thread_creation(), set to MALL_DIST_COMPLETED by thread_async_work() and
// read by thread_check().
int comm_state; //FIXME Use a handler
1040
1041
1042
1043
/*
 * Crea una hebra para ejecutar una comunicación en segundo plano.
 */
int thread_creation() {
1044
  comm_state = MALL_DIST_PENDING;
1045
1046
1047
1048
1049
  if(pthread_create(&(mall->async_thread), NULL, thread_async_work, NULL)) {
    printf("Error al crear el hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
1050
  return comm_state;
1051
1052
1053
1054
1055
1056
1057
1058
}

/*
 * Comprobación por parte de una hebra maestra que indica
 * si una hebra esclava ha terminado su comunicación en segundo plano.
 *
 * El estado de la comunicación es devuelto al finalizar la función. 
 */
1059
int thread_check(int wait_completed) {
1060
  int all_completed = 0;
1061

1062
1063
1064
1065
1066
1067
1068
1069
  if(wait_completed && comm_state == MALL_DIST_PENDING) {
    if(pthread_join(mall->async_thread, NULL)) {
      printf("Error al esperar al hilo\n");
      MPI_Abort(MPI_COMM_WORLD, -1);
      return -2;
    } 
  }

1070
  // Comprueba que todos los hilos han terminado la distribucion (Mismo valor en commAsync)
1071
  MPI_Allreduce(&comm_state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
1072
  if(all_completed != MALL_DIST_COMPLETED) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
1073
1074
1075
1076
1077
1078

  if(pthread_join(mall->async_thread, NULL)) {
    printf("Error al esperar al hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -2;
  } 
1079
1080
1081
1082

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
1083
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
1084
  return MALL_USER_PENDING;
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
}


/*
 * Función ejecutada por una hebra.
 * Ejecuta una comunicación síncrona con los hijos que
 * para el usuario se puede considerar como en segundo plano.
 *
 * Cuando termina la comunicación la hebra maestra puede comprobarlo
 * por el valor "commAsync".
 */
1096
void* thread_async_work() {
1097
1098
  size_t i;

1099
  send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
1100
  for(i=0; i<rep_a_data->entries; i++) {
1101
    MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm);
1102
  } 
1103
  comm_state = MALL_DIST_COMPLETED;
1104
1105
  pthread_exit(NULL);
}
1106
1107
1108


//==============================================================================
1109
1110
1111
1112

/*
 * TODO Por hacer
 */
1113
void MAM_I_create_user_struct(int is_children_group) {
1114
1115
1116
  user_reconf->comm = mall->tmp_comm;

  if(is_children_group) {
1117
    user_reconf->rank_state = MAM_PROC_NEW_RANK;
iker_martin's avatar
iker_martin committed
1118
    user_reconf->numS = mall->num_parents;
1119
    user_reconf->numT = mall->numP;
1120
1121
1122
  } else {
    user_reconf->numS = mall->numP;
    user_reconf->numT = mall->numC;
1123
1124
    if(mall->zombie) user_reconf->rank_state = MAM_PROC_ZOMBIE;
    else user_reconf->rank_state = MAM_PROC_CONTINUE;
1125
1126
  }
}