/* malleabilityManager.c - MaM malleability (dynamic reconfiguration) manager */
#include <pthread.h>
#include <string.h>

#include "malleabilityManager.h"
#include "malleabilityStates.h"
#include "malleabilityDataStructures.h"
#include "malleabilityTypes.h"
#include "malleabilityZombies.h"
#include "malleabilityTimes.h"
#include "spawn_methods/GenericSpawn.h"
#include "CommDist.h"

// Flags passed to send_data/recv_data selecting the redistribution mode.
#define MALLEABILITY_USE_SYNCHRONOUS 0
#define MALLEABILITY_USE_ASYNCHRONOUS 1

// Data redistribution between the source (parents) and target (children) groups.
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous);
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous);

// Initialization performed by dynamically spawned (children) processes.
void Children_init();
// Reconfiguration steps executed by the parents.
int spawn_step();
int start_redistribution();
int check_redistribution(int wait_completed);
int end_redistribution();
int shrink_redistribution();

// Thread-based (MALL_RED_THREAD) asynchronous redistribution helpers.
int thread_creation();
int thread_check(int wait_completed);
void* thread_async_work();

void print_comms_state();
void malleability_comms_update(MPI_Comm comm);

// Current state of the reconfiguration state machine (MALL_* constants).
int state = MALL_UNRESERVED; //FIXME Move elsewhere

// Registered user data sets: replicated/distributed x synchronous/asynchronous.
malleability_data_t *rep_s_data;
malleability_data_t *dist_s_data;
malleability_data_t *rep_a_data;
malleability_data_t *dist_a_data;

40
/*
 * Initializes the memory reservations for the malleability module,
 * creating all the required structures and duplicating communicators
 * so the application is not interfered with.
 *
 * If called by a group of dynamically spawned processes, they initialize
 * the communication with their parents. In that case, once that
 * communication ends the child processes are ready to run the application.
 *
 * Returns MALLEABILITY_CHILDREN for spawned processes and
 * MALLEABILITY_NOT_CHILDREN for the original group.
 */
int MAM_Init(int myId, int numP, int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes) {
  MPI_Comm dup_comm, thread_comm;

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Initializing MaM", myId, numP); fflush(stdout); MPI_Barrier(comm);
  #endif

  // Module-wide configuration and state structures.
  mall_conf = (malleability_config_t *) malloc(sizeof(malleability_config_t));
  mall = (malleability_t *) malloc(sizeof(malleability_t));

  // Containers for the four kinds of user data (replicated/distributed x sync/async).
  rep_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));

  // Private duplicates so MaM traffic never collides with application traffic.
  MPI_Comm_dup(comm, &dup_comm);
  MPI_Comm_dup(comm, &thread_comm);
  MPI_Comm_set_name(dup_comm, "MPI_COMM_MAM");
  MPI_Comm_set_name(thread_comm, "MPI_COMM_MAM_THREAD");

  mall->myId = myId;
  mall->numP = numP;
  mall->root = root;
  mall->root_parents = -1;
  mall->comm = dup_comm;
  mall->thread_comm = thread_comm;
  mall->user_comm = MPI_COMM_NULL;

  mall->name_exec = name_exec;
  mall->nodelist = nodelist;
  mall->num_cpus = num_cpus;
  mall->num_nodes = num_nodes;

  rep_s_data->entries = 0;
  rep_a_data->entries = 0;
  dist_s_data->entries = 0;
  dist_a_data->entries = 0;

  state = MALL_NOT_STARTED;

  zombies_service_init();
  init_malleability_times();
  MAM_Def_main_datatype();

  // If this process was spawned dynamically, receive the configuration from the parents.
  MPI_Comm_get_parent(&(mall->intercomm));
  if(mall->intercomm != MPI_COMM_NULL ) { 
    Children_init();
    return MALLEABILITY_CHILDREN;
  }

  #if USE_MAL_BARRIERS && USE_MAL_DEBUG
    if(mall->myId == mall->root)
      printf("MaM: Using barriers to record times.\n");
  #endif

  if(nodelist != NULL) { //TODO To be deprecated by using Slurm or else statement
    mall->nodelist_len = strlen(nodelist);
  } else { // If no nodelist is detected, get it from the actual run
    // NOTE(review): this buffer is malloc'd here but MAM_Finalize does not
    // free mall->nodelist - possible leak; confirm ownership of nodelist.
    mall->nodelist = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
    MPI_Get_processor_name(mall->nodelist, &mall->nodelist_len);
    //TODO Get name of each process and create real nodelist
  }

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM has been initialized correctly as parents", myId, numP); fflush(stdout); MPI_Barrier(comm);
  #endif

  return MALLEABILITY_NOT_CHILDREN;
}

121
122
123
124
125
/*
 * Releases all memory reserved by the malleability module and makes
 * sure any suspended (zombie) processes are awakened before leaving.
 * After this call the module is back in the MALL_UNRESERVED state.
 */
void MAM_Finalize() {	  
  // Free the contents of the four data containers, then the containers themselves.
  free_malleability_data_struct(rep_s_data);
  free_malleability_data_struct(rep_a_data);
  free_malleability_data_struct(dist_s_data);
  free_malleability_data_struct(dist_a_data);

  free(rep_s_data);
  free(rep_a_data);
  free(dist_s_data);
  free(dist_a_data);

  MAM_Free_main_datatype();
  free_malleability_times();
  // Only free communicators MaM created (never MPI_COMM_WORLD).
  if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
  if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
  free(mall);
  free(mall_conf);

  // Wake any suspended processes so they can terminate cleanly.
  zombies_awake();
  zombies_service_free();

  state = MALL_UNRESERVED;
}

150
151
/* 
 * TODO Rewrite
 * Performs one step of the process reconfiguration, driven by the parents.
 *
 * The new processes are created with the chosen physical distribution and
 * then the registered data are transmitted to them.
 *
 * If there are asynchronous data to transmit, their transfer is started
 * and the function returns; the caller must invoke this function again
 * to check whether the transfer has finished.
 *
 * If there are also synchronous data to send, they are not sent yet.
 *
 * If there are only synchronous data, they are sent right after the
 * process creation and finally both process groups are disconnected.
 *
 * Returns the new internal state; *mam_state receives the user-facing state.
 */
int MAM_Checkpoint(int *mam_state, int wait_completed) {
  int is_intercomm;

  switch(state) {
    case MALL_UNRESERVED:
      *mam_state = MAM_UNRESERVED;
      break;
    case MALL_NOT_STARTED:
      *mam_state = MAM_NOT_STARTED;
      reset_malleability_times();
      // Check whether a reconfiguration has to be performed
      
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->malleability_start = MPI_Wtime();
      //if(CHECK_RMS()) {return MALL_DENIED;}

      state = spawn_step();

      // Recurse immediately when the spawn already finished (or was postponed).
      if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE){
        MAM_Checkpoint(mam_state, wait_completed);
      }
      break;

    case MALL_SPAWN_PENDING: // Check whether the spawn has finished and start the redistribution
    case MALL_SPAWN_SINGLE_PENDING:
      state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);
      if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPTED) {
        #if USE_MAL_BARRIERS
  	  MPI_Barrier(mall->comm);
	#endif
        mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;

        MAM_Checkpoint(mam_state, wait_completed);
      }
      break;

    case MALL_SPAWN_ADAPT_POSTPONE:
    case MALL_SPAWN_COMPLETED:
      state = start_redistribution();
      MAM_Checkpoint(mam_state, wait_completed);
      break;

    case MALL_DIST_PENDING:
      // Thread-based redistribution is polled differently from MPI-request-based.
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
        state = thread_check(wait_completed);
      } else {
        state = check_redistribution(wait_completed);
      }
      if(state != MALL_DIST_PENDING) { 
        MAM_Checkpoint(mam_state, wait_completed);
      }
      break;

    case MALL_SPAWN_ADAPT_PENDING:
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->spawn_start = MPI_Wtime();
      unset_spawn_postpone_flag(state);
      state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);

      if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
        #if USE_MAL_BARRIERS
          MPI_Barrier(mall->comm);
	#endif
        mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
	MAM_Checkpoint(mam_state, wait_completed);
      }
      break;

    case MALL_SPAWN_ADAPTED: //FIXME Remove?
      state = shrink_redistribution();
      if(state == MALL_ZOMBIE) *mam_state = MAM_ZOMBIE; //TODO This one must not be removed
      MAM_Checkpoint(mam_state, wait_completed);
      break;

    case MALL_DIST_COMPLETED:
      // Build the communicator handed to the user (merge for inter-, dup for intra-).
      MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
      if(is_intercomm) {
        MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->user_comm); // Group passing 0 is ordered first
      } else {
        MPI_Comm_dup(mall->intercomm, &mall->user_comm);
      }
      MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MAM_USER");
      state = MALL_COMPLETED;
      *mam_state = MAM_COMPLETED;
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->malleability_end = MPI_Wtime();
      break;
  }

  // Any intermediate state is reported to the user as "pending".
  if(state > MALL_ZOMBIE && state < MALL_COMPLETED) *mam_state = MAM_PENDING;
  return state;
}

266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
/*
 * Returns an intracommunicator to allow users to perform their
 * own redistributions. The user must free this communicator
 * when it is no longer needed.
 *
 * This is a blocking function and must be called by all processes
 * involved in the reconfiguration.
 * TODO Create the communicator elsewhere and delete it in commit.
 *
 * The communicator is only returned when the reconfiguration state is
 * completed (MALL_COMPLETED / MAM_COMPLETED); otherwise MALL_DENIED is returned.
 */
int MAM_Get_comm(MPI_Comm *comm, int *targets_qty) {
  // Deny the request unless the reconfiguration has finished (or this is a zombie).
  if(state != MALL_COMPLETED && state != MALL_ZOMBIE) return MALL_DENIED;

  MPI_Comm_dup(mall->user_comm, comm);
  MPI_Comm_set_name(*comm, "MPI_MAM_DUP");
  *targets_qty = mall->numC;

  return 0;
}

289
290
291
292
293
294
295
296
297
298
299
300
301
/*
 * TODO
 * Commits the reconfiguration: handles zombie processes, frees/updates
 * the internal communicators, refreshes rank/size/root bookkeeping and
 * hands the user the communicator to continue with (*new_comm).
 * Only valid once the state is MALL_COMPLETED or MALL_ZOMBIE;
 * otherwise *mam_state is set to MALL_DENIED and nothing changes.
 */
void MAM_Commit(int *mam_state, MPI_Comm *new_comm) {
  if(!(state == MALL_COMPLETED || state == MALL_ZOMBIE)) {
    *mam_state = MALL_DENIED;
    return;
  }

  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // Zombies treatment: with the merge method, shrinking leaves suspended ranks behind.
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) {
    int zombies;
    // MPI_MIN over states detects whether any rank became a zombie.
    MPI_Allreduce(&state, &zombies, 1, MPI_INT, MPI_MIN, mall->intercomm);
    if(zombies == MALL_ZOMBIE) {
      zombies_collect_suspended(mall->comm, mall->myId, mall->numP, mall->numC, mall->root);
    }
  }
  // Reset/Free unneeded communicators
  if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm));
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { malleability_comms_update(mall->intercomm); }
  if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { 
    MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error in OpenMPI + Merge
  }

  // Refresh rank/size after the communicator update.
  MPI_Comm_rank(mall->comm, &(mall->myId));
  MPI_Comm_size(mall->comm, &(mall->numP));
  // Children adopt the parents' root; -1 means this group was not spawned.
  mall->root = mall->root_parents == -1 ? mall->root : mall->root_parents;
  mall->root_parents = -1;
  state = MALL_NOT_STARTED;
  *mam_state = MAM_COMMITED;

  // Set new communicator
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *new_comm = MPI_COMM_WORLD; }
  else if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MPI_Comm_dup(mall->comm, new_comm); }

  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif
}

332
333
/*
 * Retrieves the timings recorded during the last reconfiguration:
 * spawn, synchronous redistribution, asynchronous redistribution and
 * total malleability time. Delegates to the internal times module.
 */
void MAM_Retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
  MAM_I_retrieve_times(sp_time, sy_time, asy_time, mall_time);
}

336
void MAM_Set_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies) {
337
338
  if(state > MALL_NOT_STARTED) return;

339
340
  mall_conf->spawn_method = spawn_method;
  mall_conf->spawn_strategies = spawn_strategies;
341
  mall_conf->spawn_dist = spawn_dist;
342
343
344
345
346
347
348
  mall_conf->red_method = red_method;
  mall_conf->red_strategies = red_strategies;

  if(!malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL) && 
	(mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL)) {
    malleability_red_add_strat(&(mall_conf->red_strategies), MALL_RED_IBARRIER);
  }
349
350
351
}

/*
352
 * Tiene que ser llamado despues de setear la config
353
 */
354
void MAM_Set_target_number(int numC){
355
356
  if(state > MALL_NOT_STARTED) return;

357
  if((mall_conf->spawn_method == MALL_SPAWN_MERGE) && (numC >= mall->numP)) {
358
359
360
361
362
    mall->numC = numC;
    mall->numC_spawned = numC - mall->numP;

    if(numC == mall->numP) { // Migrar
      mall->numC_spawned = numC;
363
      mall_conf->spawn_method = MALL_SPAWN_BASELINE;
364
365
366
367
368
369
370
    }
  } else {
    mall->numC = numC;
    mall->numC_spawned = numC;
  }
}

371
372
373
374
375
376
377
/*
 * Anyade a la estructura concreta de datos elegida
 * el nuevo set de datos "data" de un total de "total_qty" elementos.
 *
 * Los datos variables se tienen que anyadir cuando quieran ser mandados, no antes
 *
 * Mas informacion en la funcion "add_data".
378
 *
379
 */
380
void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
381
  size_t total_reqs = 0;
382
383
384

  if(is_constant) {
    if(is_replicated) {
385
      total_reqs = 1;
386
      add_data(data, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ??? 
387
    } else {
388
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
389
        total_reqs = 1;
390
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
391
        total_reqs = mall->numC;
392
      }
393
394
395
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
        total_reqs++;
      }
396
397
398
      
      add_data(data, total_qty, type, total_reqs, dist_a_data);
    }
399
400
401
402
403
404
  } else {
    if(is_replicated) {
      add_data(data, total_qty, type, total_reqs, rep_s_data);
    } else {
      add_data(data, total_qty, type, total_reqs, dist_s_data);
    }
405
406
407
  }
}

408
409
410
411
412
413
414
415
/*
 * Modifica en la estructura concreta de datos elegida en el indice "index"
 * con el set de datos "data" de un total de "total_qty" elementos.
 *
 * Los datos variables se tienen que modificar cuando quieran ser mandados, no antes
 *
 * Mas informacion en la funcion "modify_data".
 */
416
void malleability_modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
417
418
  size_t total_reqs = 0;

419
420
  if(is_constant) {
    if(is_replicated) {
421
      total_reqs = 1;
422
423
424
      modify_data(data, index, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ??? 
    } else {    
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
425
        total_reqs = 1;
426
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
427
428
        total_reqs = mall->numC;
      }
429
430
431
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
        total_reqs++;
      }
432
      
433
      modify_data(data, index, total_qty, type, total_reqs, dist_a_data);
434
    }
435
436
437
438
439
440
  } else {
    if(is_replicated) {
      modify_data(data, index, total_qty, type, total_reqs, rep_s_data);
    } else {
      modify_data(data, index, total_qty, type, total_reqs, dist_s_data);
    }
441
442
443
  }
}

444
445
446
447
/*
 * Devuelve el numero de entradas para la estructura de descripcion de 
 * datos elegida.
 */
448
void malleability_get_entries(size_t *entries, int is_replicated, int is_constant){
449
450
451
  
  if(is_constant) {
    if(is_replicated) {
452
      *entries = rep_a_data->entries;
453
    } else {
454
      *entries = dist_a_data->entries;
455
456
457
    }
  } else {
    if(is_replicated) {
458
      *entries = rep_s_data->entries;
459
    } else {
460
      *entries = dist_s_data->entries;
461
462
463
464
465
466
467
468
469
470
471
    }
  }
}

/*
 * Devuelve el elemento de la lista "index" al usuario.
 * La devolución es en el mismo orden que lo han metido los padres
 * con la funcion "malleability_add_data()".
 * Es tarea del usuario saber el tipo de esos datos.
 * TODO Refactor a que sea automatico
 */
472
void malleability_get_data(void **data, size_t index, int is_replicated, int is_constant) {
473
474
475
476
  malleability_data_t *data_struct;

  if(is_constant) {
    if(is_replicated) {
477
      data_struct = rep_a_data;
478
    } else {
479
      data_struct = dist_a_data;
480
481
482
    }
  } else {
    if(is_replicated) {
483
      data_struct = rep_s_data;
484
    } else {
485
      data_struct = dist_s_data;
486
487
488
    }
  }

489
  *data = data_struct->arrays[index];
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
}


//======================================================||
//================PRIVATE FUNCTIONS=====================||
//================DATA COMMUNICATION====================||
//======================================================||
//======================================================||

/*
 * Generalized function to send data to the children.
 * Asynchronicity refers to whether the parent and child do it in a
 * blocking way or not. The parent may have several threads.
 */
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
  size_t i;
  void *aux_send, *aux_recv;

  if(is_asynchronous) {
    for(i=0; i < data_struct->entries; i++) {
      aux_send = data_struct->arrays[i];
      aux_recv = NULL;
      async_communication_start(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, 
		      mall_conf->red_strategies, mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
      // The communication may hand back a replacement buffer; adopt it if so.
      if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
    }
  } else {
    for(i=0; i < data_struct->entries; i++) {
      aux_send = data_struct->arrays[i];
      aux_recv = NULL;
      sync_communication(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall->intercomm);
      // The communication may hand back a replacement buffer; adopt it if so.
      if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
    }
  }
}

/*
 * Generalized function to receive data from the parents.
 * Asynchronicity refers to whether the parent and child do it in a
 * blocking way or not. The parent may have several threads.
 */
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
  size_t i;
  void *aux, *aux_s = NULL;  // aux_s: no send buffer on the receiving side

  if(is_asynchronous) {
    for(i=0; i < data_struct->entries; i++) {
      aux = data_struct->arrays[i];
      async_communication_start(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall_conf->red_strategies, 
		      mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
      data_struct->arrays[i] = aux;
    }
  } else {
    for(i=0; i < data_struct->entries; i++) {
      aux = data_struct->arrays[i];
      sync_communication(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall->intercomm);
      data_struct->arrays[i] = aux;
    }
  }
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================CHILDREN=========================||
//======================================================||
//======================================================||
/*
 * Initialization of the children's data.
 * Here they receive data from the parents: the configuration of the
 * execution to perform, and the data to receive from the parents,
 * whether synchronously, asynchronously or both.
 */
void Children_init() {
  size_t i;
  int numP_parents, root_parents;
  int is_intercomm;

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM will now initialize children", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // Establish the connection with the parents and learn their size/root.
  malleability_connect_children(mall->myId, mall->numP, mall->root, mall->comm, &numP_parents, &root_parents, &(mall->intercomm));
  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
  if(!is_intercomm) { // For intracommunicators, these processes will be added
    MPI_Comm_rank(mall->intercomm, &(mall->myId));
    MPI_Comm_size(mall->intercomm, &(mall->numP));
  }

  // Receive MaM's main configuration structures from the parents' root.
  MAM_Comm_main_structures(root_parents);

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Targets have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
  if(dist_a_data->entries || rep_a_data->entries) { // Receive asynchronous data
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Children start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif

    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
      // Thread strategy: the parents send from a thread, children receive blocking.
      recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
    } else {
      recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS); 

      #if USE_MAL_DEBUG >= 2
        DEBUG_FUNC("Targets started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
      #endif

      int post_ibarrier = 0; 
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) { post_ibarrier=1; }
      for(i=0; i<dist_a_data->entries; i++) {
        async_communication_wait(mall->intercomm, dist_a_data->requests[i], dist_a_data->request_qty[i], post_ibarrier);
      }
      #if USE_MAL_DEBUG >= 2
        DEBUG_FUNC("Targets waited for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
      #endif
      for(i=0; i<dist_a_data->entries; i++) {
        async_communication_end(mall_conf->red_method, mall_conf->red_strategies, dist_a_data->requests[i], dist_a_data->request_qty[i], &(dist_a_data->windows[i]));
      }
    }

    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->async_end= MPI_Wtime(); // Timestamp when the asynchronous communication ends
  }
  #if USE_MAL_DEBUG
    DEBUG_FUNC("Targets have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
  if(dist_s_data->entries || rep_s_data->entries) { // Receive synchronous data
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    // TODO Create a specific function and add it for Async too
    for(i=0; i<rep_s_data->entries; i++) {
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], root_parents, mall->intercomm);
    } 
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->sync_end = MPI_Wtime(); // Timestamp when the synchronous communication ends
  }
  #if USE_MAL_DEBUG
    DEBUG_FUNC("Targets have completed synchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // Share the results of this transmission
  malleability_times_broadcast(mall->root);

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_end = MPI_Wtime(); // Timestamp when malleability ends
  state = MALL_COMPLETED;

  // Build the communicator handed to the user (merge for inter-, dup for intra-).
  if(is_intercomm) {
    MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->user_comm); // Group passing 0 is ordered first
  } else {
    MPI_Comm_dup(mall->intercomm, &mall->user_comm);
  }
  MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MAM_USER");

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM has been initialized correctly as children", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================PARENTS==========================||
//======================================================||
//======================================================||

/*
 * Performs the creation of the child processes.
 * If requested to run in the background, returns the current state.
 */
int spawn_step(){
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_start = MPI_Wtime();
 
  state = init_spawn(mall->name_exec, mall->num_cpus, mall->num_nodes, mall->nodelist, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_method, mall_conf->spawn_strategies, mall->thread_comm, &(mall->intercomm));

  // Without the pthread strategy the spawn is blocking: record its time now.
  if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
  }
  return state;
}

692

693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
/*
 * Starts the data redistribution with the new group of processes.
 *
 * First the configuration to use is sent to the new group of processes
 * and then the asynchronous and/or synchronous sending, if any, is performed.
 *
 * If there is asynchronous communication, it is started and the function
 * returns indicating that an asynchronous send has begun.
 *
 * If there is no asynchronous communication, the synchronous one is
 * performed, if any.
 *
 * Finally, result data is sent to the children and both process groups
 * are disconnected.
 */
int start_redistribution() {
  int rootBcast, is_intercomm;

  is_intercomm = 0;
  if(mall->intercomm != MPI_COMM_NULL) {
    MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
  } else { 
    // If there is no communicator created, the spawn was postponed
    //   and this is the Merge Shrink case
    MPI_Comm_dup(mall->comm, &(mall->intercomm));
  }

  // For intercommunicators, only the root broadcasts (MPI_ROOT/MPI_PROC_NULL).
  if(is_intercomm) {
    rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    rootBcast = mall->root;
  }

  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE || mall->numP <= mall->numC) { MAM_Comm_main_structures(rootBcast); }

  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
  if(dist_a_data->entries || rep_a_data->entries) { // Send asynchronous data
    //FIXME Replicated data (rep_a_data) is not sent
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->async_start = MPI_Wtime();
    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
      // Thread strategy: a helper thread performs the send in the background.
      return thread_creation();
    } else {
      send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
      return MALL_DIST_PENDING; 
    }
  } 
  return end_redistribution();
}


/*
 * Checks whether the asynchronous redistribution has finished.
 * If it has not finished, the function returns saying so; otherwise
 * the synchronous communication continues, results are sent and the
 * process groups are disconnected.
 *
 * This function supports two modes of operation when checking whether
 * the asynchronous communication has finished.
 * With "MAL_USE_NORMAL" or "MAL_USE_POINT" it is considered finished
 * when the parents have finished sending.
 * With "MAL_USE_IBARRIER" it is considered finished when the children
 * have finished receiving.
 * //FIXME Modify so rep_a_data is taken into account
 */
int check_redistribution(int wait_completed) {
  int is_intercomm, completed, local_completed, all_completed, post_ibarrier;
  size_t i, req_qty;
  MPI_Request *req_completed;
  MPI_Win window;
  post_ibarrier = 0;
  local_completed = 1;
  #if USE_MAL_DEBUG >= 2
    DEBUG_FUNC("Sources are testing for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif
  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);

  if(wait_completed) {
    // Blocking mode: wait on every pending request.
    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
      // Ranks that are not also targets must post the ibarrier themselves.
      if( is_intercomm || mall->myId >= mall->numC) {
        post_ibarrier=1;
      }
    }
    for(i=0; i<dist_a_data->entries; i++) {
      req_completed = dist_a_data->requests[i];
      req_qty = dist_a_data->request_qty[i];
      async_communication_wait(mall->intercomm, req_completed, req_qty, post_ibarrier);
    }
  } else {
    // Polling mode: test all requests and agree globally on completion.
    for(i=0; i<dist_a_data->entries; i++) {
      req_completed = dist_a_data->requests[i];
      req_qty = dist_a_data->request_qty[i];
      completed = async_communication_check(mall->myId, MALLEABILITY_NOT_CHILDREN, mall_conf->red_strategies, mall->intercomm, req_completed, req_qty);
      local_completed = local_completed && completed;
    }
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Sources will now check a global decision", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif

    MPI_Allreduce(&local_completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
    if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
  }

  #if USE_MAL_DEBUG >= 2
    DEBUG_FUNC("Sources sent asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // Release requests/windows of every finished transfer.
  for(i=0; i<dist_a_data->entries; i++) {
    req_completed = dist_a_data->requests[i];
    req_qty = dist_a_data->request_qty[i];
    window = dist_a_data->windows[i];
    async_communication_end(mall_conf->red_method, mall_conf->red_strategies, req_completed, req_qty, &window);
  }

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
  if(!is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
  return end_redistribution();
}


/*
 * Finishes the data redistribution with the children, checking
 * whether iterations with background communications were performed
 * and sending the children how many iterations were carried out.
 *
 * The synchronous communications, if any, are also performed here.
 * It finally ends by sending the timing data to the children.
 */ 
int end_redistribution() {
  size_t i;
  int is_intercomm, rootBcast, local_state;

  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
  // For intercommunicators, only the root broadcasts (MPI_ROOT/MPI_PROC_NULL).
  if(is_intercomm) {
    rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    rootBcast = mall->root;
  }
  
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
  if(dist_s_data->entries || rep_s_data->entries) { // Send synchronous data
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->sync_start = MPI_Wtime();
    send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    // TODO Create a specific function and add it for Async too
    for(i=0; i<rep_s_data->entries; i++) {
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], rootBcast, mall->intercomm);
    } 
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    if(!is_intercomm) mall_conf->times->sync_end = MPI_Wtime(); // Merge method only
  }

  malleability_times_broadcast(rootBcast);

  local_state = MALL_DIST_COMPLETED;
  if(!is_intercomm) { // Merge Spawn
    if(mall->numP > mall->numC) { // Shrink || Merge Shrink requires extra work
      local_state = MALL_SPAWN_ADAPT_PENDING;
    }
  }

  return local_state;
}

865
866
867
868

///=============================================
///=============================================
///=============================================
869
//TODO DEPRECATED
iker_martin's avatar
iker_martin committed
870
int shrink_redistribution() {
871
872
873
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->comm);
    #endif
874
    double time_extra = MPI_Wtime();
875

876
877
    MPI_Abort(MPI_COMM_WORLD, -20); //                                                         
    zombies_collect_suspended(mall->user_comm, mall->myId, mall->numP, mall->numC, mall->root);
iker_martin's avatar
iker_martin committed
878
879
    
    if(mall->myId < mall->numC) {
880
      if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); //FIXME Modificar a que se pida pro el usuario el cambio y se llama a comms_update
881
882
883
884
885
      if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));

      MPI_Comm_dup(mall->intercomm, &(mall->thread_comm));
      MPI_Comm_dup(mall->intercomm, &(mall->comm));

886
887
888
      MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MALL_THREAD");
      MPI_Comm_set_name(mall->comm, "MPI_COMM_MALL");

889
890
      MPI_Comm_free(&(mall->intercomm));

891
892
893
894

      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
895
      mall_conf->times->spawn_time += MPI_Wtime() - time_extra;
896
      return MALL_DIST_COMPLETED;
iker_martin's avatar
iker_martin committed
897
    } else {
898
      return MALL_ZOMBIE;
iker_martin's avatar
iker_martin committed
899
900
901
    }
}

902
903
904
905
906
907
908
// TODO MOVER A OTRO LADO??
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//===============COMM PARENTS THREADS===================||
//======================================================||
//======================================================||

909
910

int comm_state; //FIXME Usar un handler
911
912
913
914
/*
 * Crea una hebra para ejecutar una comunicación en segundo plano.
 */
int thread_creation() {
915
  comm_state = MALL_DIST_PENDING;
916
917
918
919
920
  if(pthread_create(&(mall->async_thread), NULL, thread_async_work, NULL)) {
    printf("Error al crear el hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
921
  return comm_state;
922
923
924
925
926
927
928
929
}

/*
 * Comprobación por parte de una hebra maestra que indica
 * si una hebra esclava ha terminado su comunicación en segundo plano.
 *
 * El estado de la comunicación es devuelto al finalizar la función. 
 */
930
int thread_check(int wait_completed) {
931
  int all_completed = 0, is_intercomm;
932

933
934
935
936
937
938
939
940
  if(wait_completed && comm_state == MALL_DIST_PENDING) {
    if(pthread_join(mall->async_thread, NULL)) {
      printf("Error al esperar al hilo\n");
      MPI_Abort(MPI_COMM_WORLD, -1);
      return -2;
    } 
  }

941
  // Comprueba que todos los hilos han terminado la distribucion (Mismo valor en commAsync)
942
  MPI_Allreduce(&comm_state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
943
944
  if(all_completed != MALL_DIST_COMPLETED) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
  //FIXME No se tiene en cuenta el estado MALL_APP_ENDED
945
946
947
948
949
950

  if(pthread_join(mall->async_thread, NULL)) {
    printf("Error al esperar al hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -2;
  } 
951
  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
952
953
954
955

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
956
  if(!is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
957
958
959
960
961
962
963
964
965
966
967
968
  return end_redistribution();
}


/*
 * Función ejecutada por una hebra.
 * Ejecuta una comunicación síncrona con los hijos que
 * para el usuario se puede considerar como en segundo plano.
 *
 * Cuando termina la comunicación la hebra maestra puede comprobarlo
 * por el valor "commAsync".
 */
969
void* thread_async_work() {
970
  send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
971
  comm_state = MALL_DIST_COMPLETED;
972
973
  pthread_exit(NULL);
}
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993


//==============================================================================
/*
 * Debug helper: prints the current state (handle and name) of all the
 * malleability communicators of this process.
 */
void print_comms_state() {
  int name_len;
  char *name_buf = malloc(MPI_MAX_OBJECT_NAME * sizeof(char));
  if(name_buf == NULL) { // BUGFIX: the result of malloc was dereferenced unchecked
    printf("P%d print_comms_state: malloc failed\n", mall->myId);
    return;
  }

  // NOTE(review): printing an MPI_Comm with %d is nonportable (it is a
  // pointer type in Open MPI) — kept for debug-output compatibility.
  MPI_Comm_get_name(mall->comm, name_buf, &name_len);
  printf("P%d Comm=%d Name=%s\n", mall->myId, mall->comm, name_buf);
  MPI_Comm_get_name(mall->user_comm, name_buf, &name_len);
  printf("P%d Comm=%d Name=%s\n", mall->myId, mall->user_comm, name_buf);
  if(mall->intercomm != MPI_COMM_NULL) {
    MPI_Comm_get_name(mall->intercomm, name_buf, &name_len);
    printf("P%d Comm=%d Name=%s\n", mall->myId, mall->intercomm, name_buf);
  }
  free(name_buf);
}
994

995
996
997
/*
 * Función solo necesaria en Merge
 */
998
999
1000
1001
1002
1003
1004
void malleability_comms_update(MPI_Comm comm) {
  if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
  if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));

  MPI_Comm_dup(comm, &(mall->thread_comm));
  MPI_Comm_dup(comm, &(mall->comm));

1005
1006
  MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MAM_THREAD");
  MPI_Comm_set_name(mall->comm, "MPI_COMM_MAM");
1007
}
1008