malleabilityManager.c 41.3 KB
Newer Older
1
#include <pthread.h>
2
#include <string.h>
3
4
//#include "malleabilityManager.h"
#include "MAM.h"
5
#include "malleabilityStates.h"
6
#include "malleabilityDataStructures.h"
7
#include "malleabilityTypes.h"
iker_martin's avatar
iker_martin committed
8
#include "malleabilityZombies.h"
9
#include "malleabilityTimes.h"
10
#include "malleabilityRMS.h"
11
#include "MAM_Init_Configuration.h"
12
#include "spawn_methods/GenericSpawn.h"
13
14
15
16
17
#include "CommDist.h"

#define MALLEABILITY_USE_SYNCHRONOUS 0
#define MALLEABILITY_USE_ASYNCHRONOUS 1

18
void MAM_Commit(int *mam_state);
19
20
21
22

void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous);
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous);

23

24
25
int MAM_St_rms(int *mam_state);
int MAM_St_spawn_start();
26
27
int MAM_St_spawn_pending(int wait_completed);
int MAM_St_red_start();
28
int MAM_St_red_pending(int wait_completed);
29
int MAM_St_user_start(int *mam_state);
30
31
32
33
34
35
36
37
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args);
int MAM_St_user_completed();
int MAM_St_spawn_adapt_pending(int wait_completed);
int MAM_St_spawn_adapted(int *mam_state);
int MAM_St_red_completed(int *mam_state);
int MAM_St_completed(int *mam_state);


38
void Children_init(void (*user_function)(void *), void *user_args);
39
40
int spawn_step();
int start_redistribution();
41
int check_redistribution(int wait_completed);
42
int end_redistribution();
iker_martin's avatar
iker_martin committed
43
int shrink_redistribution();
44
45

int thread_creation();
46
int thread_check(int wait_completed);
47
void* thread_async_work();
48

49
int MAM_I_convert_key(char *key);
50
void MAM_I_create_user_struct(int is_children_group);
51

52
53
54
55
56
malleability_data_t *rep_s_data;
malleability_data_t *dist_s_data;
malleability_data_t *rep_a_data;
malleability_data_t *dist_a_data;

57
58
mam_user_reconf_t *user_reconf;

59
/*
60
61
62
63
64
65
66
67
 * Inicializa la reserva de memoria para el modulo de maleabilidad
 * creando todas las estructuras necesarias y copias de comunicadores
 * para no interferir en la aplicación.
 *
 * Si es llamada por un grupo de procesos creados de forma dinámica,
 * inicializan la comunicacion con sus padres. En este caso, al terminar 
 * la comunicacion los procesos hijo estan preparados para ejecutar la
 * aplicacion.
68
 */
69
int MAM_Init(int root, MPI_Comm *comm, char *name_exec, void (*user_function)(void *), void *user_args) {
70
  MPI_Comm dup_comm, thread_comm, original_comm;
71
72
73

  mall_conf = (malleability_config_t *) malloc(sizeof(malleability_config_t));
  mall = (malleability_t *) malloc(sizeof(malleability_t));
74
75
76
77
78
79
80
81
  user_reconf = (mam_user_reconf_t *) malloc(sizeof(mam_user_reconf_t));

  MPI_Comm_rank(*comm, &(mall->myId));
  MPI_Comm_size(*comm, &(mall->numP));

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Initializing MaM", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
  #endif
82

83
84
85
86
87
  rep_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));

88
89
  MPI_Comm_dup(*comm, &dup_comm);
  MPI_Comm_dup(*comm, &thread_comm);
90
  MPI_Comm_dup(*comm, &original_comm);
91
92
  MPI_Comm_set_name(dup_comm, "MAM_MAIN");
  MPI_Comm_set_name(thread_comm, "MAM_THREAD");
93
  MPI_Comm_set_name(original_comm, "MAM_ORIGINAL");
94
95

  mall->root = root;
iker_martin's avatar
iker_martin committed
96
  mall->root_parents = root;
97
  mall->zombie = 0;
98
  mall->comm = dup_comm;
99
  mall->thread_comm = thread_comm;
100
  mall->original_comm = original_comm;
101
102
  mall->user_comm = comm; 
  mall->tmp_comm = MPI_COMM_NULL;
103

104
  mall->name_exec = name_exec;
105
  mall->nodelist = NULL;
106
  mall->nodelist_len = 0;
107
108
109
110
111
112

  rep_s_data->entries = 0;
  rep_a_data->entries = 0;
  dist_s_data->entries = 0;
  dist_a_data->entries = 0;

113
  state = MALL_NOT_STARTED;
114

115
  MAM_Init_configuration();
116
  MAM_Zombies_service_init();
117
  init_malleability_times();
118
  MAM_Def_main_datatype();
119

120
121
  // Si son el primer grupo de procesos, obtienen los datos de los padres
  MPI_Comm_get_parent(&(mall->intercomm));
122
  if(mall->intercomm != MPI_COMM_NULL) { 
123
    Children_init(user_function, user_args);
124
    return MALLEABILITY_CHILDREN;
125
  }
126

127
  //TODO Check potential improvement - If check_hosts does not use slurm, internode_group could be obtained there
128
  MAM_check_hosts();
129
  mall->internode_group = MAM_Is_internode_group();
130
  MAM_Set_initial_configuration();
iker_martin's avatar
iker_martin committed
131

132
133
134
135
136
  #if USE_MAL_BARRIERS && USE_MAL_DEBUG
    if(mall->myId == mall->root)
      printf("MaM: Using barriers to record times.\n");
  #endif

137
  #if USE_MAL_DEBUG
138
    DEBUG_FUNC("MaM has been initialized correctly as parents", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
139
140
  #endif

141
  return MALLEABILITY_NOT_CHILDREN;
142
143
}

144
145
146
147
148
/*
 * Elimina toda la memoria reservado por el modulo
 * de maleabilidad y asegura que los zombies
 * despierten si los hubiese.
 */
149
150
int MAM_Finalize() {	  
  int request_abort;
151
152
153
154
155
156
157
158
159
  free_malleability_data_struct(rep_s_data);
  free_malleability_data_struct(rep_a_data);
  free_malleability_data_struct(dist_s_data);
  free_malleability_data_struct(dist_a_data);

  free(rep_s_data);
  free(rep_a_data);
  free(dist_s_data);
  free(dist_a_data);
160
  if(mall->nodelist != NULL) free(mall->nodelist);
161

162
  MAM_Free_main_datatype();
163
  request_abort = MAM_Zombies_service_free();
164
  free_malleability_times();
165
166
  if(mall->comm != MPI_COMM_WORLD && mall->comm != MPI_COMM_NULL) MPI_Comm_disconnect(&(mall->comm));
  if(mall->thread_comm != MPI_COMM_WORLD && mall->thread_comm != MPI_COMM_NULL) MPI_Comm_disconnect(&(mall->thread_comm));
167
  if(mall->intercomm != MPI_COMM_WORLD && mall->intercomm != MPI_COMM_NULL) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge
168
  if(mall->original_comm != MPI_COMM_WORLD && mall->original_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->original_comm));
169
170
  free(mall);
  free(mall_conf);
171
  free(user_reconf);
iker_martin's avatar
iker_martin committed
172

173
  state = MALL_UNRESERVED;
174
  return request_abort;
175
176
}

177
178
/* 
 * TODO Reescribir
179
180
181
182
 * Comprueba el estado de la maleabilidad. Intenta avanzar en la misma
 * si es posible. Funciona como una máquina de estados.
 * Retorna el estado de la maleabilidad concreto y modifica el argumento
 * "mam_state" a uno generico.
183
 *
184
185
 * El argumento "wait_completed" se utiliza para esperar a la finalización de
 * las tareas llevadas a cabo por parte de MAM.
186
187
 *
 */
188
int MAM_Checkpoint(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
189
  int call_checkpoint = 0;
190

191
  //TODO This could be changed to an array with the functions to call in each case
192
193
  switch(state) {
    case MALL_UNRESERVED:
194
      *mam_state = MAM_UNRESERVED;
195
196
      break;
    case MALL_NOT_STARTED:
197
198
199
200
      call_checkpoint = MAM_St_rms(mam_state);
      break;
    case MALL_RMS_COMPLETED:
      call_checkpoint = MAM_St_spawn_start();
201
      break;
202

203
    case MALL_SPAWN_PENDING: // Comprueba si el spawn ha terminado
204
    case MALL_SPAWN_SINGLE_PENDING:
205
      call_checkpoint = MAM_St_spawn_pending(wait_completed);
206
      break;
207

208
209
    case MALL_SPAWN_ADAPT_POSTPONE:
    case MALL_SPAWN_COMPLETED:
210
      call_checkpoint = MAM_St_red_start();
211
      break;
212

213
    case MALL_DIST_PENDING:
214
      call_checkpoint = MAM_St_red_pending(wait_completed);
215
216
      break;

217
218
219
220
    case MALL_USER_START:
      call_checkpoint = MAM_St_user_start(mam_state);
      break;

221
222
    case MALL_USER_PENDING:
      call_checkpoint = MAM_St_user_pending(mam_state, wait_completed, user_function, user_args);
223
      break;
224

225
226
    case MALL_USER_COMPLETED:
      call_checkpoint = MAM_St_user_completed();
227
      break;
228

229
230
    case MALL_SPAWN_ADAPT_PENDING:
      call_checkpoint = MAM_St_spawn_adapt_pending(wait_completed);
231
232
      break;

233
234
235
    case MALL_SPAWN_ADAPTED:
    case MALL_DIST_COMPLETED:
      call_checkpoint = MAM_St_completed(mam_state);
236
237
      break;
  }
238

239
240
  if(call_checkpoint) { MAM_Checkpoint(mam_state, wait_completed, user_function, user_args); }
  if(state > MALL_NOT_STARTED && state < MALL_COMPLETED) *mam_state = MAM_PENDING;
241
242
243
  return state;
}

244
245
246
/*
 * TODO
 */
247
248
void MAM_Resume_redistribution(int *mam_state) {
  state = MALL_USER_COMPLETED;
249
  if(mam_state != NULL) *mam_state = MAM_PENDING;
250
251
252
253
254
}

/*
 * TODO
 */
255
void MAM_Commit(int *mam_state) {
256
  int request_abort;
257
  #if USE_MAL_DEBUG
258
    if(mall->myId == mall->root){ DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); } fflush(stdout);
259
260
  #endif

261
262
  // Get times before commiting
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) {
263
    // This communication is only needed when the root process will become a zombie
264
    malleability_times_broadcast(mall->root_collectives);
265
  }
266

267
  // Free unneded communicators
268
269
  if(mall->tmp_comm != MPI_COMM_WORLD && mall->tmp_comm != MPI_COMM_NULL) MPI_Comm_disconnect(&(mall->tmp_comm));
  if(*(mall->user_comm) != MPI_COMM_WORLD && *(mall->user_comm) != MPI_COMM_NULL) MPI_Comm_disconnect(mall->user_comm);
270

271
272
  // Zombies Treatment
  MAM_Zombies_update();
273
  if(mall->zombie) {
274
    #if USE_MAL_DEBUG >= 1
275
276
      DEBUG_FUNC("Is terminating as zombie", mall->myId, mall->numP); fflush(stdout);
    #endif
277
278
    request_abort = MAM_Finalize();
    if(request_abort) { MPI_Abort(MPI_COMM_WORLD, -101); }
279
    MPI_Finalize();
280
281
282
    exit(0);
  }

283
  // Reset/Free communicators
284
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MAM_comms_update(mall->intercomm); }
285
286
  if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge

iker_martin's avatar
iker_martin committed
287
288
  MPI_Comm_rank(mall->comm, &mall->myId);
  MPI_Comm_size(mall->comm, &mall->numP);
289
290
  mall->root = mall_conf->spawn_method == MALL_SPAWN_BASELINE ? mall->root : mall->root_parents;
  mall->root_parents = mall->root;
291
  state = MALL_NOT_STARTED;
292
  if(mam_state != NULL) *mam_state = MAM_COMPLETED;
293

294
  // Set new communicator
295
296
297
  MPI_Comm_dup(mall->comm, mall->user_comm);
  //if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *(mall->user_comm) = MPI_COMM_WORLD; }
  //else if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MPI_Comm_dup(mall->comm, mall->user_comm); }
298
  #if USE_MAL_DEBUG
299
    if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout);
300
  #endif
301
302
303
304
305

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_end = MPI_Wtime();
306
307
}

308
/*
309
310
311
312
313
314
315
316
317
 * This function adds data to a data structure based on whether the operation is synchronous or asynchronous,
 * and whether the data is replicated or distributed. It takes the following parameters:
 * - data: a pointer to the data to be added
 * - index: a pointer to a size_t variable where the index of the added data will be stored
 * - total_qty: the amount of elements in data
 * - type: the MPI datatype of the data
 * - is_replicated: a flag indicating whether the data is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
 * Finally, it updates the index with the index of the last added data if index is not NULL.
318
 */
319
320
void MAM_Data_add(void *data, size_t *index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
  size_t total_reqs = 0, returned_index;
321

322
  if(is_constant) { //Async
323
    if(is_replicated) {
324
      total_reqs = 1;
325
      add_data(data, total_qty, type, total_reqs, rep_a_data);
326
      returned_index = rep_a_data->entries-1;
327
    } else {
328
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
329
        total_reqs = 1;
330
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
331
        total_reqs = mall->numC;
332
      } 
333
334
      
      add_data(data, total_qty, type, total_reqs, dist_a_data);
335
      returned_index = dist_a_data->entries-1;
336
    }
337
  } else { //Sync
338
339
    if(is_replicated) {
      add_data(data, total_qty, type, total_reqs, rep_s_data);
340
      returned_index = rep_s_data->entries-1;
341
342
    } else {
      add_data(data, total_qty, type, total_reqs, dist_s_data);
343
      returned_index = dist_s_data->entries-1;
344
    }
345
  }
346
347

  if(index != NULL) *index = returned_index;
348
349
}

350
/*
351
352
353
354
355
356
357
358
 * This function modifies a data entry to a data structure based on whether the operation is synchronous or asynchronous,
 * and whether the data is replicated or distributed. It takes the following parameters:
 * - data: a pointer to the data to be added
 * - index: a value indicating which entry will be modified
 * - total_qty: the amount of elements in data
 * - type: the MPI datatype of the data
 * - is_replicated: a flag indicating whether the data is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
359
 */
360
void MAM_Data_modify(void *data, size_t index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
361
362
  size_t total_reqs = 0;

363
364
  if(is_constant) {
    if(is_replicated) {
365
      total_reqs = 1;
366
367
368
      modify_data(data, index, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ??? 
    } else {    
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
369
        total_reqs = 1;
370
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
371
372
373
        total_reqs = mall->numC;
      }
      
374
      modify_data(data, index, total_qty, type, total_reqs, dist_a_data);
375
    }
376
377
378
379
380
381
  } else {
    if(is_replicated) {
      modify_data(data, index, total_qty, type, total_reqs, rep_s_data);
    } else {
      modify_data(data, index, total_qty, type, total_reqs, dist_s_data);
    }
382
383
384
  }
}

385
/*
386
387
388
389
390
 * This functions returns how many data entries are available for one of the specific data structures.
 * It takes the following parameters:
 * - is_replicated: a flag indicating whether the structure is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
 * - entries: a pointer where the amount of entries will be stored
391
 */
392
void MAM_Data_get_entries(int is_replicated, int is_constant, size_t *entries){
393
394
395
  
  if(is_constant) {
    if(is_replicated) {
396
      *entries = rep_a_data->entries;
397
    } else {
398
      *entries = dist_a_data->entries;
399
400
401
    }
  } else {
    if(is_replicated) {
402
      *entries = rep_s_data->entries;
403
    } else {
404
      *entries = dist_s_data->entries;
405
406
407
408
409
    }
  }
}

/*
410
411
412
413
414
415
416
417
 * This function returns a data entry to a data structure based on whether the operation is synchronous or asynchronous,
 * and whether the data is replicated or distributed. It takes the following parameters:
 * - index: a value indicating which entry will be modified
 * - is_replicated: a flag indicating whether the data is replicated (MAM_DATA_REPLICATED) or not (MAM_DATA_DISTRIBUTED)
 * - is_constant: a flag indicating whether the operation is asynchronous (MAM_DATA_CONSTANT) or synchronous (MAM_DATA_VARIABLE)
 * - data: a pointer where the data will be stored. The user must free it
 * - total_qty: the amount of elements in data for all ranks
 * - local_qty: the amount of elements in data for this rank
418
 */
419
void MAM_Data_get_pointer(void **data, size_t index, size_t *total_qty, MPI_Datatype *type, int is_replicated, int is_constant) {
420
421
422
423
  malleability_data_t *data_struct;

  if(is_constant) {
    if(is_replicated) {
424
      data_struct = rep_a_data;
425
    } else {
426
      data_struct = dist_a_data;
427
428
429
    }
  } else {
    if(is_replicated) {
430
      data_struct = rep_s_data;
431
    } else {
432
      data_struct = dist_s_data;
433
434
435
    }
  }

436
  *data = data_struct->arrays[index];
437
438
439
  *total_qty = data_struct->qty[index];
  *type = data_struct->types[index];
  //get_block_dist(qty, mall->myId, mall->numP, &dist_data); //FIXME Asegurar que numP es correcto
440
441
}

442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
/*
 * @brief Returns a structure to perform data redistribution during a reconfiguration.
 *
 * This function is intended to be called when the state of MaM is MALL_USER_PENDING only. 
 * It is designed to provide the necessary information for the user to perform data redistribution.
 *
 * Parameters:
 *   - mam_user_reconf_t *reconf_info: A pointer to a mam_user_reconf_t structure where the function will store the required information for data redistribution.
 *
 * Return Value:
 *   - MAM_OK: If the function successfully retrieves the reconfiguration information.
 *   - MALL_DENIED: If the function is called when the state of the MaM is not MALL_USER_PENDING.
 */
int MAM_Get_Reconf_Info(mam_user_reconf_t *reconf_info) {
  if(state != MALL_USER_PENDING) return MALL_DENIED;

  *reconf_info = *user_reconf;
  return MAM_OK;
}

/*
 * @brief Returns the times spent in the different steps of the last
 *        reconfiguration.
 *
 * Intended to be called once a reconfiguration has ended. The values are
 * obtained from MAM_I_retrieve_times.
 *
 * Parameters:
 *  - double *sp_time:   where the spawn time is stored.
 *  - double *sy_time:   where the synchronous data redistribution time is stored.
 *  - double *asy_time:  where the asynchronous data redistribution time is stored.
 *  - double *user_time: where the user data redistribution time is stored.
 *  - double *mall_time: where the total malleability time is stored.
 */
void MAM_Retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *user_time, double *mall_time) {
  MAM_I_retrieve_times(sp_time, sy_time, asy_time,
                       user_time, mall_time);
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//================DATA COMMUNICATION====================||
//======================================================||
//======================================================||

/*
 * Parent side of the data redistribution: sends every entry of data_struct
 * over mall->intercomm to a group of numP_children processes.
 *
 * is_asynchronous selects between async_communication_start (non-blocking
 * start, tracked through the entry's requests/request_qty/windows) and
 * sync_communication. Whenever the communication call returns a non-NULL
 * receive buffer, that buffer replaces the stored array of the entry.
 */
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
  size_t entry;

  for(entry = 0; entry < data_struct->entries; entry++) {
    void *src = data_struct->arrays[entry];
    void *dst = NULL;

    if(is_asynchronous) {
      async_communication_start(src, &dst, data_struct->qty[entry], data_struct->types[entry], mall->numP, numP_children,
                                MALLEABILITY_NOT_CHILDREN, mall->intercomm, &(data_struct->requests[entry]),
                                &(data_struct->request_qty[entry]), &(data_struct->windows[entry]));
    } else {
      sync_communication(src, &dst, data_struct->qty[entry], data_struct->types[entry], mall->numP, numP_children,
                         MALLEABILITY_NOT_CHILDREN, mall->intercomm);
    }

    if(dst != NULL) {
      data_struct->arrays[entry] = dst;
    }
  }
}

/*
 * Children side of the data redistribution: receives every entry of
 * data_struct over mall->intercomm from a group of numP_parents processes.
 *
 * No send buffer is passed (NULL). is_asynchronous selects between
 * async_communication_start (non-blocking start, tracked through the
 * entry's requests/request_qty/windows) and sync_communication. The receive
 * buffer pointer produced by the call is always stored back as the entry's
 * array.
 */
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
  size_t entry;
  void *no_send = NULL;

  for(entry = 0; entry < data_struct->entries; entry++) {
    void *dst = data_struct->arrays[entry];

    if(is_asynchronous) {
      async_communication_start(no_send, &dst, data_struct->qty[entry], data_struct->types[entry], mall->numP, numP_parents,
                                MALLEABILITY_CHILDREN, mall->intercomm, &(data_struct->requests[entry]),
                                &(data_struct->request_qty[entry]), &(data_struct->windows[entry]));
    } else {
      sync_communication(no_send, &dst, data_struct->qty[entry], data_struct->types[entry], mall->numP, numP_parents,
                         MALLEABILITY_CHILDREN, mall->intercomm);
    }

    data_struct->arrays[entry] = dst;
  }
}

537
538
539
540
541
542

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//====================MAM STAGES========================||
//======================================================||
//======================================================||
543
544
545
546
//======================================================||
//======================================================||
//======================================================||
//======================================================||
547

548
int MAM_St_rms(int *mam_state) {
549
550
551
552
553
  reset_malleability_times();
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_start = MPI_Wtime();
554

555
  MAM_Check_configuration();
556
557
558
559
560
  *mam_state = MAM_NOT_STARTED;
  state = MALL_RMS_COMPLETED;
  mall->wait_targets_posted = 0;

  //if(CHECK_RMS()) {return MALL_DENIED;}    
561
562
  return 1;
}
563

564
int MAM_St_spawn_start() {
565
  mall->num_parents = mall->numP;
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
  state = spawn_step();
  //FIXME Esto es necesario pero feo
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->myId >= mall->numC){ mall->zombie = 1; }
  else if(mall_conf->spawn_method == MALL_SPAWN_BASELINE){ mall->zombie = 1; }

  if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE){
    return 1;
  }
  return 0;
}

/*
 * Stages MALL_SPAWN_PENDING / MALL_SPAWN_SINGLE_PENDING: checks the spawn.
 * Once it reports MALL_SPAWN_COMPLETED or MALL_SPAWN_ADAPTED, the spawn time
 * is recorded and 1 is returned; otherwise returns 0.
 */
int MAM_St_spawn_pending(int wait_completed) {
  state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);

  if(state != MALL_SPAWN_COMPLETED && state != MALL_SPAWN_ADAPTED) {
    return 0;
  }

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
  return 1;
}

int MAM_St_red_start() {
590
591
592
593
594
595
  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
    mall->root_collectives = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    mall->root_collectives = mall->root;
  }

596
597
598
599
  state = start_redistribution();
  return 1;
}

600
int MAM_St_red_pending(int wait_completed) {
601
  if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
602
603
604
605
606
607
    state = thread_check(wait_completed);
  } else {
    state = check_redistribution(wait_completed);
  }

  if(state != MALL_DIST_PENDING) { 
608
    state = MALL_USER_START;
609
610
611
612
613
    return 1;
  }
  return 0;
}

614
int MAM_St_user_start(int *mam_state) {
615
616
617
618
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
  mall_conf->times->user_start = MPI_Wtime(); // Obtener timestamp de cuando termina user redist
619
620
621
622
623
624
625
626
627
628
629
  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
    MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->tmp_comm); //El que pone 0 va primero
  } else {
    MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
  }
  MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
  state = MALL_USER_PENDING;
  *mam_state = MAM_USER_PENDING;
  return 1;
}

630
631
632
633
634
635
636
637
638
639
640
641
642
643
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Starting USER redistribution", mall->myId, mall->numP); fflush(stdout);
  #endif
  if(user_function != NULL) {
    MAM_I_create_user_struct(MALLEABILITY_NOT_CHILDREN);
    do {
      user_function(user_args);
    } while(wait_completed && state == MALL_USER_PENDING);
  } else {
    MAM_Resume_redistribution(mam_state);
  }

  if(state != MALL_USER_PENDING) {
644
645
646
647
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->user_end = MPI_Wtime(); // Obtener timestamp de cuando termina user redist
648
649
650
651
652
653
654
655
656
657
658
659
660
661
    #if USE_MAL_DEBUG
      if(mall->myId == mall->root) DEBUG_FUNC("Ended USER redistribution", mall->myId, mall->numP); fflush(stdout);
    #endif
    return 1;
  }
  return 0;
}

/*
 * Stage MALL_USER_COMPLETED: finishes the redistribution.
 * The next state is whatever end_redistribution() reports. Always returns 1.
 */
int MAM_St_user_completed() {
  int next_state = end_redistribution();

  state = next_state;
  return 1;
}

int MAM_St_spawn_adapt_pending(int wait_completed) {
662
  wait_completed = MAM_WAIT_COMPLETION;
663
664
665
666
667
668
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_start = MPI_Wtime();
  unset_spawn_postpone_flag(state);
  state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);
669
670
/* TODO Comentar problema, basicamente indicar que no es posible de la forma actual
 * Ademas es solo para una operación que hemos visto como "extremadamente" rápida
671
672
 * NO es posible debido a que solo se puede hacer tras enviar los datos variables 
 * y por tanto pierden validez dichos datos
673
  if(!MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, NULL)) {
674
675
676
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->comm);
    #endif
677
    mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->spawn_start;
678
679
680
    return 1;
  }
  return 0;
681
682
683
684
685
686
  */
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->spawn_start;
  return 1;
687
688
689
}

/*
 * Stages MALL_SPAWN_ADAPTED / MALL_DIST_COMPLETED: commits the
 * reconfiguration via MAM_Commit. Returns 0 so the state machine stops
 * stepping.
 */
int MAM_St_completed(int *mam_state) {
  const int stop_stepping = 0;

  MAM_Commit(mam_state);
  return stop_stepping;
}


695
696
697
698
699
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================CHILDREN=========================||
//======================================================||
//======================================================||
700
701
702
703
//======================================================||
//======================================================||
//======================================================||
//======================================================||
704
705
706
707
708
709
/*
 * Inicializacion de los datos de los hijos.
 * En la misma se reciben datos de los padres: La configuracion
 * de la ejecucion a realizar; y los datos a recibir de los padres
 * ya sea de forma sincrona, asincrona o ambas.
 */
710
void Children_init(void (*user_function)(void *), void *user_args) {
711
  size_t i;
712

713
  #if USE_MAL_DEBUG
714
    DEBUG_FUNC("MaM will now initialize spawned processes", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
715
716
  #endif

717
  malleability_connect_children(&(mall->intercomm));
718
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { // For Merge Method, these processes will be added
iker_martin's avatar
iker_martin committed
719
720
    MPI_Comm_rank(mall->intercomm, &mall->myId);
    MPI_Comm_size(mall->intercomm, &mall->numP);
721
  }
722
  mall->root_collectives = mall->root_parents;
723

724
725
726
727
728
729
  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL)) {
    mall->internode_group = 0;
  } else {
    mall->internode_group = MAM_Is_internode_group();
  }

730
  #if USE_MAL_DEBUG
731
    DEBUG_FUNC("Spawned have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
732
733
  #endif

734
  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN);
735
  if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos
736
    #if USE_MAL_DEBUG >= 2
737
      DEBUG_FUNC("Spawned start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
738
    #endif
739
740
741
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
742

743
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
iker_martin's avatar
iker_martin committed
744
      recv_data(mall->num_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
745
      for(i=0; i<rep_a_data->entries; i++) {
746
        MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm);
747
      } 
748
    } else {
iker_martin's avatar
iker_martin committed
749
      recv_data(mall->num_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS); 
750

751
      for(i=0; i<rep_a_data->entries; i++) {
752
        MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm, &(rep_a_data->requests[i][0]));
753
      } 
754
      #if USE_MAL_DEBUG >= 2
755
        DEBUG_FUNC("Spawned started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
756
      #endif
757

758
      for(i=0; i<rep_a_data->entries; i++) {
759
        async_communication_wait(rep_a_data->requests[i], rep_a_data->request_qty[i]);
760
      }
761
      for(i=0; i<dist_a_data->entries; i++) {
762
        async_communication_wait(dist_a_data->requests[i], dist_a_data->request_qty[i]);
763
      }
764
765
766
767
768
769
      if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
        MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
        mall->wait_targets_posted = 1;
        MPI_Wait(&mall->wait_targets, MPI_STATUS_IGNORE);
      }

770
      #if USE_MAL_DEBUG >= 2
771
        DEBUG_FUNC("Spawned waited for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
772
      #endif
773
      for(i=0; i<dist_a_data->entries; i++) {
774
        async_communication_end(dist_a_data->requests[i], dist_a_data->request_qty[i], &(dist_a_data->windows[i]));
775
      }
776
      for(i=0; i<rep_a_data->entries; i++) {
777
        async_communication_end(rep_a_data->requests[i], rep_a_data->request_qty[i], &(rep_a_data->windows[i]));
778
      }
779
    }
780

781
782
783
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
784
    mall_conf->times->async_end= MPI_Wtime(); // Obtener timestamp de cuando termina comm asincrona
785
  }
786
  #if USE_MAL_DEBUG
787
    DEBUG_FUNC("Spawned have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
788
  #endif
789

790
791
792
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
793
  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
794
795
796
797
798
799
800
801
802
803
    MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->tmp_comm); //El que pone 0 va primero
  } else {
    MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
  }
  MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
  if(user_function != NULL) {
    state = MALL_USER_PENDING;
    MAM_I_create_user_struct(MALLEABILITY_CHILDREN);
    user_function(user_args);
  }
804
805
806
807
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
  mall_conf->times->user_end = MPI_Wtime(); // Obtener timestamp de cuando termina user redist
808

809
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN);
810
  if(dist_s_data->entries || rep_s_data->entries) { // Recibir datos sincronos
811
812
813
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
iker_martin's avatar
iker_martin committed
814
    recv_data(mall->num_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
815
816

    for(i=0; i<rep_s_data->entries; i++) {
817
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_collectives, mall->intercomm);
818
    } 
819
820
821
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
822
    mall_conf->times->sync_end = MPI_Wtime(); // Obtener timestamp de cuando termina comm sincrona
823
  }
824
  #if USE_MAL_DEBUG
825
    DEBUG_FUNC("Targets have completed synchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
826
  #endif
827

828
  MAM_Commit(NULL);
829

830
  #if USE_MAL_DEBUG
831
    DEBUG_FUNC("MaM has been initialized correctly for new ranks", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
832
  #endif
833
834
835
836
837
838
839
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================PARENTS==========================||
//======================================================||
//======================================================||
840
841
//======================================================||
//======================================================||
842
843
844
845
846
847

/*
 * Se encarga de realizar la creacion de los procesos hijos.
 * Si se pide en segundo plano devuelve el estado actual.
 */
int spawn_step(){
848
849
850
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
851
  mall_conf->times->spawn_start = MPI_Wtime();
852
 
iker_martin's avatar
iker_martin committed
853
  state = init_spawn(mall->thread_comm, &(mall->intercomm));
854

855
  if(!MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, NULL)) {
856
857
858
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
859
      mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
860
861
862
863
  }
  return state;
}

864

865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
/*
 * Comienza la redistribucion de los datos con el nuevo grupo de procesos.
 *
 * Primero se envia la configuracion a utilizar al nuevo grupo de procesos y a continuacion
 * se realiza el envio asincrono y/o sincrono si lo hay.
 *
 * En caso de que haya comunicacion asincrona, se comienza y se termina la funcion 
 * indicando que se ha comenzado un envio asincrono.
 *
 * Si no hay comunicacion asincrono se pasa a realizar la sincrona si la hubiese.
 *
 * Finalmente se envian datos sobre los resultados a los hijos y se desconectan ambos
 * grupos de procesos.
 */
int start_redistribution() {
880
  size_t i;
881

882
  if(mall->intercomm == MPI_COMM_NULL) {
883
884
    // Si no tiene comunicador creado, se debe a que se ha pospuesto el Spawn
    //   y se trata del spawn Merge Shrink
885
    MPI_Comm_dup(mall->comm, &(mall->intercomm));
886
  }
887

888
  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN);
889
  if(dist_a_data->entries || rep_a_data->entries) { // Enviar datos asincronos
890
891
892
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
893
    mall_conf->times->async_start = MPI_Wtime();
894
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
895
896
897
      return thread_creation();
    } else {
      send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
898
      for(i=0; i<rep_a_data->entries; i++) {
899
        MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm, &(rep_a_data->requests[i][0]));
900
      } 
901
902
903
904
905

      if(mall->zombie && MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
        MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
        mall->wait_targets_posted = 1;
      }
906
      return MALL_DIST_PENDING; 
907
908
    }
  } 
909
  return MALL_USER_START;
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
}


/*
 * Comprueba si la redistribucion asincrona ha terminado. 
 * Si no ha terminado la funcion termina indicandolo, en caso contrario,
 * se continua con la comunicacion sincrona, el envio de resultados y
 * se desconectan los grupos de procesos.
 *
 * Esta funcion permite dos modos de funcionamiento al comprobar si la
 * comunicacion asincrona ha terminado.
 * Si se utiliza el modo "MAL_USE_NORMAL" o "MAL_USE_POINT", se considera 
 * terminada cuando los padres terminan de enviar.
 * Si se utiliza el modo "MAL_USE_IBARRIER", se considera terminada cuando
 * los hijos han terminado de recibir.
925
 * //FIXME Modificar para que se tenga en cuenta rep_a_data
926
 */
927
int check_redistribution(int wait_completed) {
928
  int completed, local_completed, all_completed;
929
  size_t i, req_qty;
930
  MPI_Request *req_completed;
931
932
  MPI_Win window;
  local_completed = 1;
933
  #if USE_MAL_DEBUG >= 2
934
    DEBUG_FUNC("Sources are testing for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
935
  #endif
936

937
  if(wait_completed) {
938
939
940
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL) && !mall->wait_targets_posted) {
      MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
      mall->wait_targets_posted = 1;
941
942
943
944
    }
    for(i=0; i<dist_a_data->entries; i++) {
      req_completed = dist_a_data->requests[i];
      req_qty = dist_a_data->request_qty[i];
945
      async_communication_wait(req_completed, req_qty);
946
    }
947
948
949
    for(i=0; i<rep_a_data->entries; i++) {
      req_completed = rep_a_data->requests[i];
      req_qty = rep_a_data->request_qty[i];
950
      async_communication_wait(req_completed, req_qty);
951
    }
952
953

    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) { MPI_Wait(&mall->wait_targets, MPI_STATUS_IGNORE); }
954
  } else {
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
    if(mall->wait_targets_posted) { 
      MPI_Test(&mall->wait_targets, &local_completed, MPI_STATUS_IGNORE); 
    } else {
      for(i=0; i<dist_a_data->entries; i++) {
        req_completed = dist_a_data->requests[i];
        req_qty = dist_a_data->request_qty[i];
        completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, req_completed, req_qty);
        local_completed = local_completed && completed;
      }
      for(i=0; i<rep_a_data->entries; i++) {
        req_completed = rep_a_data->requests[i];
        req_qty = rep_a_data->request_qty[i];
        completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, req_completed, req_qty);
        local_completed = local_completed && completed;
      }

      if(local_completed && MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
        MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
        mall->wait_targets_posted = 1;
        MPI_Test(&mall->wait_targets, &local_completed, MPI_STATUS_IGNORE); //TODO - Figure out if last process takes profit from calling here
      }
976
    }
977
978
979
980
981
982
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Sources will now check a global decision", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif

    MPI_Allreduce(&local_completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
    if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
983
984
  }

985
  #if USE_MAL_DEBUG >= 2
986
    DEBUG_FUNC("Sources sent asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
987
  #endif
988

989
990
991
992
  for(i=0; i<dist_a_data->entries; i++) {
    req_completed = dist_a_data->requests[i];
    req_qty = dist_a_data->request_qty[i];
    window = dist_a_data->windows[i];
993
    async_communication_end(req_completed, req_qty, &window);
994
  }
995
996
997
998
  for(i=0; i<rep_a_data->entries; i++) {
    req_completed = rep_a_data->requests[i];
    req_qty = rep_a_data->request_qty[i];
    window = rep_a_data->windows[i];
999
    async_communication_end(req_completed, req_qty, &window);
1000
  }
1001

1002
1003
1004
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
1005
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
1006
  return MALL_USER_START;
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
}

/*
 * Termina la redistribución de los datos con los hijos, comprobando
 * si se han realizado iteraciones con comunicaciones en segundo plano
 * y enviando cuantas iteraciones se han realizado a los hijos.
 *
 * Además se realizan las comunicaciones síncronas se las hay.
 * Finalmente termina enviando los datos temporales a los hijos.
 */ 
int end_redistribution() {
1018
  size_t i;
1019
  int local_state;
1020

1021
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN);
1022
  if(dist_s_data->entries || rep_s_data->entries) { // Enviar datos sincronos
1023
1024
1025
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
1026
    mall_conf->times->sync_start = MPI_Wtime();
1027
1028
1029
    send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    for(i=0; i<rep_s_data->entries; i++) {
1030
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_collectives, mall->intercomm);
1031
1032
    }

1033
1034
1035
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
1036
    if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->sync_end = MPI_Wtime(); // Merge method only
1037
  }
iker_martin's avatar
iker_martin committed
1038

1039
  local_state = MALL_DIST_COMPLETED;
1040
1041
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->numP > mall->numC) { // Merge Shrink
    local_state = MALL_SPAWN_ADAPT_PENDING;
1042
  }
1043

1044
  return local_state;
1045
1046
1047
1048
1049
1050
1051
1052
1053
}

// TODO: should these threaded-communication helpers be moved to another file?
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//===============COMM PARENTS THREADS===================||
//======================================================||
//======================================================||

1054
1055

// State of the threaded (MAM_STRAT_RED_PTHREAD) asynchronous redistribution:
// set to MALL_DIST_PENDING by thread_creation() and to MALL_DIST_COMPLETED by
// thread_async_work() once the helper thread has sent everything.
// FIXME: use a handler instead of a global variable.
// NOTE(review): written by the helper thread and read by the main thread
// without synchronisation; consider an atomic or mutex-protected flag.
int comm_state;
1056
1057
1058
1059
/*
 * Crea una hebra para ejecutar una comunicación en segundo plano.
 */
int thread_creation() {
1060
  comm_state = MALL_DIST_PENDING;
1061
1062
1063
1064
1065
  if(pthread_create(&(mall->async_thread), NULL, thread_async_work, NULL)) {
    printf("Error al crear el hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
1066
  return comm_state;
1067
1068
1069
1070
1071
1072
1073
1074
}

/*
 * Comprobación por parte de una hebra maestra que indica
 * si una hebra esclava ha terminado su comunicación en segundo plano.
 *
 * El estado de la comunicación es devuelto al finalizar la función. 
 */
1075
int thread_check(int wait_completed) {
1076
  int all_completed = 0;
1077

1078
1079
1080
1081
1082
1083
1084
1085
  if(wait_completed && comm_state == MALL_DIST_PENDING) {
    if(pthread_join(mall->async_thread, NULL)) {
      printf("Error al esperar al hilo\n");
      MPI_Abort(MPI_COMM_WORLD, -1);
      return -2;
    } 
  }

1086
  // Comprueba que todos los hilos han terminado la distribucion (Mismo valor en commAsync)
1087
  MPI_Allreduce(&comm_state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
1088
  if(all_completed != MALL_DIST_COMPLETED) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
1089
1090
1091
1092
1093
1094

  if(pthread_join(mall->async_thread, NULL)) {
    printf("Error al esperar al hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -2;
  } 
1095
1096
1097
1098

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
1099
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
1100
  return MALL_USER_START;
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
}


/*
 * Función ejecutada por una hebra.
 * Ejecuta una comunicación síncrona con los hijos que
 * para el usuario se puede considerar como en segundo plano.
 *
 * Cuando termina la comunicación la hebra maestra puede comprobarlo
 * por el valor "commAsync".
 */
1112
void* thread_async_work() {
1113
1114
  size_t i;

1115
  send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
1116
  for(i=0; i<rep_a_data->entries; i++) {
1117
    MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm);
1118
  } 
1119
  comm_state = MALL_DIST_COMPLETED;
1120
1121
  pthread_exit(NULL);
}
1122
1123
1124


//==============================================================================
1125
1126
1127
1128

/*
 * TODO Por hacer
 */
1129
void MAM_I_create_user_struct(int is_children_group) {
1130
1131
1132
  user_reconf->comm = mall->tmp_comm;

  if(is_children_group) {
1133
    user_reconf->rank_state = MAM_PROC_NEW_RANK;
iker_martin's avatar
iker_martin committed
1134
    user_reconf->numS = mall->num_parents;
1135
    user_reconf->numT = mall->numP;
1136
1137
1138
  } else {
    user_reconf->numS = mall->numP;
    user_reconf->numT = mall->numC;
1139
1140
    if(mall->zombie) user_reconf->rank_state = MAM_PROC_ZOMBIE;
    else user_reconf->rank_state = MAM_PROC_CONTINUE;
1141
1142
  }
}