malleabilityManager.c 35.2 KB
Newer Older
1
#include <pthread.h>
2
#include <string.h>
3
4
#include "malleabilityManager.h"
#include "malleabilityStates.h"
5
#include "malleabilityDataStructures.h"
6
#include "malleabilityTypes.h"
iker_martin's avatar
iker_martin committed
7
#include "malleabilityZombies.h"
8
#include "malleabilityTimes.h"
9
#include "spawn_methods/GenericSpawn.h"
10
11
12
13
14
15
16
17
18
19
20
21
#include "CommDist.h"

#define MALLEABILITY_USE_SYNCHRONOUS 0
#define MALLEABILITY_USE_ASYNCHRONOUS 1


void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous);
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous);

void Children_init();
int spawn_step();
int start_redistribution();
22
int check_redistribution(int wait_completed);
23
int end_redistribution();
iker_martin's avatar
iker_martin committed
24
int shrink_redistribution();
25
26

int thread_creation();
27
int thread_check(int wait_completed);
28
void* thread_async_work();
29

30
void print_comms_state();
31
void malleability_comms_update(MPI_Comm comm);
32

33
int state = MALL_UNRESERVED; //FIXME Mover a otro lado
34
35
36
37
38
39

malleability_data_t *rep_s_data;
malleability_data_t *dist_s_data;
malleability_data_t *rep_a_data;
malleability_data_t *dist_a_data;

40
/*
 * Initializes the memory reservations of the malleability module,
 * creating every structure it needs and private duplicates of the
 * application communicators so the module does not interfere with
 * the application itself.
 *
 * If called by a group of dynamically spawned processes, it instead
 * initializes communication with their parents; when that finishes,
 * the child processes are ready to run the application.
 *
 * Returns MALLEABILITY_CHILDREN for spawned processes and
 * MALLEABILITY_NOT_CHILDREN for the original group.
 */
int MAM_Init(int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes) {
  MPI_Comm dup_comm, thread_comm;

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Initializing MaM", myId, numP); fflush(stdout); MPI_Barrier(comm);
  #endif

  // Module-wide state: configuration, runtime info and the four data stores
  // (replicated/distributed x synchronous/asynchronous).
  mall_conf = (malleability_config_t *) malloc(sizeof(malleability_config_t));
  mall = (malleability_t *) malloc(sizeof(malleability_t));

  rep_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
  dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));

  // Private duplicates so MaM traffic never mixes with application traffic.
  MPI_Comm_dup(comm, &dup_comm);
  MPI_Comm_dup(comm, &thread_comm);
  MPI_Comm_set_name(dup_comm, "MPI_COMM_MAM");
  MPI_Comm_set_name(thread_comm, "MPI_COMM_MAM_THREAD");

  MPI_Comm_rank(comm, &(mall->myId));
  MPI_Comm_size(comm, &(mall->numP));
  mall->root = root;
  mall->root_parents = -1;    // -1 means "no parent group root recorded yet"
  mall->comm = dup_comm;
  mall->thread_comm = thread_comm;
  mall->user_comm = MPI_COMM_NULL;

  mall->name_exec = name_exec;
  mall->nodelist = nodelist;
  mall->num_cpus = num_cpus;
  mall->num_nodes = num_nodes;

  rep_s_data->entries = 0;
  rep_a_data->entries = 0;
  dist_s_data->entries = 0;
  dist_a_data->entries = 0;

  state = MALL_NOT_STARTED;

  zombies_service_init();
  init_malleability_times();
  MAM_Def_main_datatype();

  // If this group was spawned dynamically, a parent intercommunicator
  // exists; perform the child-side initialization and return early.
  MPI_Comm_get_parent(&(mall->intercomm));
  if(mall->intercomm != MPI_COMM_NULL ) { 
    Children_init();
    return MALLEABILITY_CHILDREN;
  }

  #if USE_MAL_BARRIERS && USE_MAL_DEBUG
    if(mall->myId == mall->root)
      printf("MaM: Using barriers to record times.\n");
  #endif

  if(nodelist != NULL) { //TODO To be deprecated by using Slurm or else statement
    mall->nodelist_len = strlen(nodelist);
  } else { // If no nodelist is detected, get it from the actual run
    mall->nodelist = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
    MPI_Get_processor_name(mall->nodelist, &mall->nodelist_len);
    //TODO Get name of each process and create real nodelist
  }

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM has been initialized correctly as parents", myId, numP); fflush(stdout); MPI_Barrier(comm);
  #endif

  return MALLEABILITY_NOT_CHILDREN;
}

121
122
123
124
125
/*
 * Releases all memory reserved by the malleability module and makes
 * sure any suspended (zombie) processes are woken up before the
 * module's state is marked as unreserved.
 */
void MAM_Finalize() {	  
  // Free the contents of the four data stores, then the stores themselves.
  free_malleability_data_struct(rep_s_data);
  free_malleability_data_struct(rep_a_data);
  free_malleability_data_struct(dist_s_data);
  free_malleability_data_struct(dist_a_data);

  free(rep_s_data);
  free(rep_a_data);
  free(dist_s_data);
  free(dist_a_data);

  MAM_Free_main_datatype();
  free_malleability_times();

  // Only free communicators the module owns (its private duplicates).
  if(mall->comm != MPI_COMM_WORLD && mall->comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->comm));
  if(mall->thread_comm != MPI_COMM_WORLD && mall->thread_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->thread_comm));

  free(mall);
  free(mall_conf);

  // Wake suspended processes so they can terminate, then drop the service.
  zombies_awake();
  zombies_service_free();

  state = MALL_UNRESERVED;
}

150
151
/* 
 * TODO Rewrite this description
 * Drives the process reconfiguration performed by the parent group.
 *
 * The new processes are created with the chosen physical distribution
 * and then the application data is transmitted to them.
 *
 * If there is asynchronous data to transmit, its transfer is started
 * and the function returns; the caller must invoke this function again
 * to check whether the transfer has completed.
 *
 * Synchronous data, if any, is sent only after the asynchronous phase.
 * If there is only synchronous data, it is sent right after process
 * creation and then the two process groups are disconnected.
 *
 * The function recurses into itself whenever one phase finishes so that
 * the next phase starts immediately within the same call.
 * Returns the internal MALL_* state; *mam_state mirrors it with the
 * public MAM_* values.
 */
int MAM_Checkpoint(int *mam_state, int wait_completed) {
  int is_intercomm;

  switch(state) {
    case MALL_UNRESERVED:
      *mam_state = MAM_UNRESERVED;
      break;
    case MALL_NOT_STARTED:
      *mam_state = MAM_NOT_STARTED;
      reset_malleability_times();
      // Check whether a reconfiguration has to be performed
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->malleability_start = MPI_Wtime();
      //if(CHECK_RMS()) {return MALL_DENIED;}

      state = spawn_step();

      // If spawn already finished (or was postponed), chain into the next phase.
      if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE){
        MAM_Checkpoint(mam_state, wait_completed);
      }
      break;

    case MALL_SPAWN_PENDING: // Check whether spawn finished; if so, start the redistribution
    case MALL_SPAWN_SINGLE_PENDING:
      state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);
      if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPTED) {
        #if USE_MAL_BARRIERS
          MPI_Barrier(mall->comm);
        #endif
        mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;

        MAM_Checkpoint(mam_state, wait_completed);
      }
      break;

    case MALL_SPAWN_ADAPT_POSTPONE:
    case MALL_SPAWN_COMPLETED:
      state = start_redistribution();
      MAM_Checkpoint(mam_state, wait_completed);
      break;

    case MALL_DIST_PENDING:
      // Completion check depends on whether a helper thread performs the send.
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
        state = thread_check(wait_completed);
      } else {
        state = check_redistribution(wait_completed);
      }
      if(state != MALL_DIST_PENDING) { 
        MAM_Checkpoint(mam_state, wait_completed);
      }
      break;

    case MALL_SPAWN_ADAPT_PENDING:
      // A postponed spawn (shrink-adapt) is resumed here.
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->spawn_start = MPI_Wtime();
      unset_spawn_postpone_flag(state);
      state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);

      if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
        #if USE_MAL_BARRIERS
          MPI_Barrier(mall->comm);
        #endif
        mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
        MAM_Checkpoint(mam_state, wait_completed);
      }
      break;

    case MALL_SPAWN_ADAPTED: //FIXME Remove?
      state = shrink_redistribution();
      if(state == MALL_ZOMBIE) *mam_state = MAM_ZOMBIE; //TODO This one must not be removed
      MAM_Checkpoint(mam_state, wait_completed);
      break;

    case MALL_DIST_COMPLETED:
      // Build the communicator handed to the user: merge for intercomms,
      // a plain duplicate otherwise.
      MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
      if(is_intercomm) {
        MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->user_comm); // The group passing 0 is ordered first
      } else {
        MPI_Comm_dup(mall->intercomm, &mall->user_comm);
      }
      MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MAM_USER");
      state = MALL_COMPLETED;
      *mam_state = MAM_COMPLETED;
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->malleability_end = MPI_Wtime();
      break;
  }

  // Any intermediate state is reported to the user simply as "pending".
  if(state > MALL_ZOMBIE && state < MALL_COMPLETED) *mam_state = MAM_PENDING;
  return state;
}

266
267
/*
 * Returns an intracommunicator to allow users to perform their
268
269
270
 * own redistributions. The user may free this communicator
 * when is not longer needed. It will be freed by MaM when
 * commiting the reconfiguration.
271
272
273
274
275
 *
 *
 * The communicator is only returned if the state of reconfiguration
 * is completed (MALL_COMPLETED / MAM_COMPLETED). Otherwise MALL_DENIED is obtained.
 */
276
int MAM_Get_comm(MPI_Comm *comm) {
277
278
279
  if(!(state == MALL_COMPLETED || state == MALL_ZOMBIE)) {
    return MALL_DENIED;
  }
280
  *comm = mall->user_comm;
281
282
283
  return 0;
}

284
285
286
/*
 * Commits the reconfiguration: handles zombie processes, releases the
 * communicators that are no longer needed, updates the module's
 * communicators/ranks, and hands the application its new communicator
 * through *updated_comm. Only valid once the state is MALL_COMPLETED
 * or MALL_ZOMBIE; otherwise *mam_state is set to MALL_DENIED.
 * Zombie processes never return from this call (they finalize and exit).
 */
void MAM_Commit(int *mam_state, MPI_Comm *updated_comm) {
  if(!(state == MALL_COMPLETED || state == MALL_ZOMBIE)) {
    *mam_state = MALL_DENIED;
    return;
  }

  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // Zombies treatment: under Merge, a MIN-reduce over the per-process state
  // detects whether any rank became a zombie (MALL_ZOMBIE is the minimum).
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) {
    int zombies;
    MPI_Allreduce(&state, &zombies, 1, MPI_INT, MPI_MIN, mall->intercomm);
    if(zombies == MALL_ZOMBIE) {
      zombies_collect_suspended(mall->comm, mall->myId, mall->numP, mall->numC, mall->root);
    }
  }

  // Reset/Free unneeded communicators (never the predefined/null handles).
  if(*updated_comm != MPI_COMM_WORLD && *updated_comm != MPI_COMM_NULL) MPI_Comm_free(updated_comm);
  if(mall->user_comm != MPI_COMM_WORLD && mall->user_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->user_comm));
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { malleability_comms_update(mall->intercomm); }
  if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { 
    MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error in OpenMPI + Merge
  }

  // Zombies KILL: a zombie process tears everything down and terminates here.
  if(state == MALL_ZOMBIE) {
    MAM_Finalize();
    MPI_Finalize(); 
    #if USE_MAL_DEBUG
      DEBUG_FUNC("Is terminating as zombie", mall->myId, mall->numP); fflush(stdout);
    #endif
    exit(0);
  }

  // Refresh rank/size after the communicator updates above.
  MPI_Comm_rank(mall->comm, &(mall->myId));
  MPI_Comm_size(mall->comm, &(mall->numP));
  // Children adopt the parents' root; -1 marks "no parent root recorded".
  mall->root = mall->root_parents == -1 ? mall->root : mall->root_parents;
  mall->root_parents = -1;
  state = MALL_NOT_STARTED;
  *mam_state = MAM_COMMITED;

  // Set new communicator for the application.
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *updated_comm = MPI_COMM_WORLD; }
  else if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MPI_Comm_dup(mall->comm, updated_comm); }

  #if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif
}

339
340
/*
 * Retrieves the timing results of the last reconfiguration:
 * spawn time, synchronous and asynchronous redistribution times,
 * and total malleability time. Thin wrapper over MAM_I_retrieve_times.
 */
void MAM_Retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
  MAM_I_retrieve_times(sp_time, sy_time, asy_time, mall_time);
}

343
void MAM_Set_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies) {
344
345
  if(state > MALL_NOT_STARTED) return;

346
347
  mall_conf->spawn_method = spawn_method;
  mall_conf->spawn_strategies = spawn_strategies;
348
  mall_conf->spawn_dist = spawn_dist;
349
350
351
352
353
354
355
  mall_conf->red_method = red_method;
  mall_conf->red_strategies = red_strategies;

  if(!malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL) && 
	(mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL)) {
    malleability_red_add_strat(&(mall_conf->red_strategies), MALL_RED_IBARRIER);
  }
356
357
358
}

/*
359
 * Tiene que ser llamado despues de setear la config
360
 */
361
void MAM_Set_target_number(int numC){
362
363
  if(state > MALL_NOT_STARTED) return;

364
  if((mall_conf->spawn_method == MALL_SPAWN_MERGE) && (numC >= mall->numP)) {
365
366
367
368
369
    mall->numC = numC;
    mall->numC_spawned = numC - mall->numP;

    if(numC == mall->numP) { // Migrar
      mall->numC_spawned = numC;
370
      mall_conf->spawn_method = MALL_SPAWN_BASELINE;
371
372
373
374
375
376
377
    }
  } else {
    mall->numC = numC;
    mall->numC_spawned = numC;
  }
}

378
379
380
381
382
383
384
/*
 * Anyade a la estructura concreta de datos elegida
 * el nuevo set de datos "data" de un total de "total_qty" elementos.
 *
 * Los datos variables se tienen que anyadir cuando quieran ser mandados, no antes
 *
 * Mas informacion en la funcion "add_data".
385
 *
386
 */
387
void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
388
  size_t total_reqs = 0;
389
390
391

  if(is_constant) {
    if(is_replicated) {
392
      total_reqs = 1;
393
      add_data(data, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ??? 
394
    } else {
395
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
396
        total_reqs = 1;
397
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
398
        total_reqs = mall->numC;
399
      }
400
401
402
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
        total_reqs++;
      }
403
404
405
      
      add_data(data, total_qty, type, total_reqs, dist_a_data);
    }
406
407
408
409
410
411
  } else {
    if(is_replicated) {
      add_data(data, total_qty, type, total_reqs, rep_s_data);
    } else {
      add_data(data, total_qty, type, total_reqs, dist_s_data);
    }
412
413
414
  }
}

415
416
417
418
419
420
421
422
/*
 * Modifica en la estructura concreta de datos elegida en el indice "index"
 * con el set de datos "data" de un total de "total_qty" elementos.
 *
 * Los datos variables se tienen que modificar cuando quieran ser mandados, no antes
 *
 * Mas informacion en la funcion "modify_data".
 */
423
void malleability_modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
424
425
  size_t total_reqs = 0;

426
427
  if(is_constant) {
    if(is_replicated) {
428
      total_reqs = 1;
429
430
431
      modify_data(data, index, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ??? 
    } else {    
      if(mall_conf->red_method  == MALL_RED_BASELINE) {
432
        total_reqs = 1;
433
      } else if(mall_conf->red_method  == MALL_RED_POINT || mall_conf->red_method  == MALL_RED_RMA_LOCK || mall_conf->red_method  == MALL_RED_RMA_LOCKALL) {
434
435
        total_reqs = mall->numC;
      }
436
437
438
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
        total_reqs++;
      }
439
      
440
      modify_data(data, index, total_qty, type, total_reqs, dist_a_data);
441
    }
442
443
444
445
446
447
  } else {
    if(is_replicated) {
      modify_data(data, index, total_qty, type, total_reqs, rep_s_data);
    } else {
      modify_data(data, index, total_qty, type, total_reqs, dist_s_data);
    }
448
449
450
  }
}

451
452
453
454
/*
 * Devuelve el numero de entradas para la estructura de descripcion de 
 * datos elegida.
 */
455
void malleability_get_entries(size_t *entries, int is_replicated, int is_constant){
456
457
458
  
  if(is_constant) {
    if(is_replicated) {
459
      *entries = rep_a_data->entries;
460
    } else {
461
      *entries = dist_a_data->entries;
462
463
464
    }
  } else {
    if(is_replicated) {
465
      *entries = rep_s_data->entries;
466
    } else {
467
      *entries = dist_s_data->entries;
468
469
470
471
472
473
474
475
476
477
478
    }
  }
}

/*
 * Devuelve el elemento de la lista "index" al usuario.
 * La devolución es en el mismo orden que lo han metido los padres
 * con la funcion "malleability_add_data()".
 * Es tarea del usuario saber el tipo de esos datos.
 * TODO Refactor a que sea automatico
 */
479
void malleability_get_data(void **data, size_t index, int is_replicated, int is_constant) {
480
481
482
483
  malleability_data_t *data_struct;

  if(is_constant) {
    if(is_replicated) {
484
      data_struct = rep_a_data;
485
    } else {
486
      data_struct = dist_a_data;
487
488
489
    }
  } else {
    if(is_replicated) {
490
      data_struct = rep_s_data;
491
    } else {
492
      data_struct = dist_s_data;
493
494
495
    }
  }

496
  *data = data_struct->arrays[index];
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
}


//======================================================||
//================PRIVATE FUNCTIONS=====================||
//================DATA COMMUNICATION====================||
//======================================================||
//======================================================||

/*
 * Generalized function to send data from the parents to the children.
 * "Asynchronous" here refers to whether the transfer is started in a
 * non-blocking fashion; the parent side may use several threads.
 *
 * numP_children: number of processes in the child group.
 * data_struct:   the data store whose entries are sent.
 */
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
  size_t i;
  void *aux_send, *aux_recv;

  if(is_asynchronous) {
    for(i=0; i < data_struct->entries; i++) {
      aux_send = data_struct->arrays[i];
      aux_recv = NULL;
      async_communication_start(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, 
		      mall_conf->red_strategies, mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
      // The helper may hand back a replacement buffer (e.g. for merge-style
      // redistributions where this rank also receives); keep it if so.
      if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
    }
  } else {
    for(i=0; i < data_struct->entries; i++) {
      aux_send = data_struct->arrays[i];
      aux_recv = NULL;
      sync_communication(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall->intercomm);
      if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
    }
  }
}

/*
 * Generalized function to receive data from the parents.
 * "Asynchronous" here refers to whether the transfer is started in a
 * non-blocking fashion; the parent side may use several threads.
 *
 * numP_parents: number of processes in the parent group.
 * data_struct:  the data store whose entries are received into.
 */
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
  size_t i;
  void *aux, *aux_s = NULL;  // aux_s: no send buffer on the receiving side

  if(is_asynchronous) {
    for(i=0; i < data_struct->entries; i++) {
      aux = data_struct->arrays[i];
      async_communication_start(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall_conf->red_strategies, 
		      mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
      // The helper may allocate/replace the receive buffer; store it back.
      data_struct->arrays[i] = aux;
    }
  } else {
    for(i=0; i < data_struct->entries; i++) {
      aux = data_struct->arrays[i];
      sync_communication(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall->intercomm);
      data_struct->arrays[i] = aux;
    }
  }
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================CHILDREN=========================||
//======================================================||
//======================================================||
/*
 * Child-side initialization. Connects to the parent group and receives
 * from it: the execution configuration, and the registered data sets,
 * whether synchronous, asynchronous or both. Ends with the children's
 * user communicator built and the module state set to MALL_COMPLETED.
 */
void Children_init() {
  size_t i;
  int numP_parents, root_parents;
  int is_intercomm;

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM will now initialize children", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  malleability_connect_children(mall->myId, mall->numP, mall->root, mall->comm, &numP_parents, &root_parents, &(mall->intercomm));
  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
  if(!is_intercomm) { // For intracommunicators, these processes will be added
    MPI_Comm_rank(mall->intercomm, &(mall->myId));
    MPI_Comm_size(mall->intercomm, &(mall->numP));
  }

  MAM_Comm_main_structures(root_parents);

  #if USE_MAL_DEBUG
    DEBUG_FUNC("Targets have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // ---- Asynchronous data reception ----
  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
  if(dist_a_data->entries || rep_a_data->entries) { // Receive asynchronous data
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Children start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif

    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
      // Thread strategy: the parents use a thread, children receive blocking.
      recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
    } else {
      recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS); 

      #if USE_MAL_DEBUG >= 2
        DEBUG_FUNC("Targets started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
      #endif

      // With the IBARRIER strategy, children signal completion afterwards.
      int post_ibarrier = 0; 
      if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) { post_ibarrier=1; }
      for(i=0; i<dist_a_data->entries; i++) {
        async_communication_wait(mall->intercomm, dist_a_data->requests[i], dist_a_data->request_qty[i], post_ibarrier);
      }
      #if USE_MAL_DEBUG >= 2
        DEBUG_FUNC("Targets waited for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
      #endif
      for(i=0; i<dist_a_data->entries; i++) {
        async_communication_end(mall_conf->red_method, mall_conf->red_strategies, dist_a_data->requests[i], dist_a_data->request_qty[i], &(dist_a_data->windows[i]));
      }
    }

    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->async_end= MPI_Wtime(); // Timestamp: asynchronous phase done
  }
  #if USE_MAL_DEBUG
    DEBUG_FUNC("Targets have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // ---- Synchronous data reception ----
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
  if(dist_s_data->entries || rep_s_data->entries) { // Receive synchronous data
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    // TODO Create a dedicated function and add it for the async phase too
    for(i=0; i<rep_s_data->entries; i++) {
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], root_parents, mall->intercomm);
    } 
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->sync_end = MPI_Wtime(); // Timestamp: synchronous phase done
  }
  #if USE_MAL_DEBUG
    DEBUG_FUNC("Targets have completed synchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // Share the timing results of this transmission
  malleability_times_broadcast(mall->root);

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->malleability_end = MPI_Wtime(); // Timestamp: malleability done
  state = MALL_COMPLETED;

  // Build the user communicator: merge for intercomms, duplicate otherwise.
  if(is_intercomm) {
    MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->user_comm); // The group passing 0 is ordered first
  } else {
    MPI_Comm_dup(mall->intercomm, &mall->user_comm);
  }
  MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MAM_USER");

  #if USE_MAL_DEBUG
    DEBUG_FUNC("MaM has been initialized correctly as children", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif
}

//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================PARENTS==========================||
//======================================================||
//======================================================||

/*
 * Performs the creation of the child processes.
 * If the spawn runs in the background (PTHREAD strategy), the current
 * state is returned without waiting; otherwise the spawn time is
 * recorded before returning.
 */
int spawn_step(){
  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
  #endif
  mall_conf->times->spawn_start = MPI_Wtime();
 
  state = init_spawn(mall->name_exec, mall->num_cpus, mall->num_nodes, mall->nodelist, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_method, mall_conf->spawn_strategies, mall->thread_comm, &(mall->intercomm));

  // Without the PTHREAD strategy the spawn is synchronous: record its time now.
  if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
      mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
  }
  return state;
}

699

700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
/*
 * Starts the data redistribution towards the new group of processes.
 *
 * First the configuration is sent to the new group, then the
 * asynchronous and/or synchronous transfers, if any, are performed.
 *
 * If there is asynchronous communication it is started and the function
 * returns MALL_DIST_PENDING (or hands off to a helper thread).
 *
 * If there is none, the synchronous phase is performed directly via
 * end_redistribution(), which also sends the results to the children
 * and disconnects both process groups.
 */
int start_redistribution() {
  int rootBcast, is_intercomm;

  is_intercomm = 0;
  if(mall->intercomm != MPI_COMM_NULL) {
    MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
  } else { 
    // No communicator yet: the spawn was postponed
    // (Merge Shrink case), so reuse a duplicate of the current one.
    MPI_Comm_dup(mall->comm, &(mall->intercomm));
  }

  // Over an intercommunicator only the root broadcasts (MPI_ROOT/MPI_PROC_NULL).
  if(is_intercomm) {
    rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    rootBcast = mall->root;
  }

  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE || mall->numP <= mall->numC) { MAM_Comm_main_structures(rootBcast); }

  comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
  if(dist_a_data->entries || rep_a_data->entries) { // Send asynchronous data
    //FIXME Replicated data (rep_a_data) is not sent here
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->async_start = MPI_Wtime();
    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
      return thread_creation();
    } else {
      send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
      return MALL_DIST_PENDING; 
    }
  } 
  return end_redistribution();
}


/*
 * Checks whether the asynchronous redistribution has finished.
 * If it has not, the function returns MALL_DIST_PENDING; otherwise it
 * continues with the synchronous communication, the sending of results
 * and the disconnection of the process groups via end_redistribution().
 *
 * Two completion modes exist:
 * - Without IBARRIER, the transfer counts as finished when the parents
 *   have completed their sends.
 * - With IBARRIER, it counts as finished when the children have
 *   completed their receives.
 * //FIXME Modify so that rep_a_data is taken into account
 *
 * wait_completed != 0 blocks until completion instead of testing.
 */
int check_redistribution(int wait_completed) {
  int is_intercomm, completed, local_completed, all_completed, post_ibarrier;
  size_t i, req_qty;
  MPI_Request *req_completed;
  MPI_Win window;
  post_ibarrier = 0;
  local_completed = 1;
  #if USE_MAL_DEBUG >= 2
    DEBUG_FUNC("Sources are testing for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif
  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);

  if(wait_completed) {
    // Blocking mode: wait on every request. Under IBARRIER, ranks that do
    // not survive the shrink (or any rank over an intercomm) must also
    // post the completion barrier.
    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
      if( is_intercomm || mall->myId >= mall->numC) {
        post_ibarrier=1;
      }
    }
    for(i=0; i<dist_a_data->entries; i++) {
      req_completed = dist_a_data->requests[i];
      req_qty = dist_a_data->request_qty[i];
      async_communication_wait(mall->intercomm, req_completed, req_qty, post_ibarrier);
    }
  } else {
    // Non-blocking mode: test locally, then agree globally via a MIN-reduce
    // (all ranks must have completed before moving on).
    for(i=0; i<dist_a_data->entries; i++) {
      req_completed = dist_a_data->requests[i];
      req_qty = dist_a_data->request_qty[i];
      completed = async_communication_check(mall->myId, MALLEABILITY_NOT_CHILDREN, mall_conf->red_strategies, mall->intercomm, req_completed, req_qty);
      local_completed = local_completed && completed;
    }
    #if USE_MAL_DEBUG >= 2
      DEBUG_FUNC("Sources will now check a global decision", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
    #endif

    MPI_Allreduce(&local_completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
    if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
  }

  #if USE_MAL_DEBUG >= 2
    DEBUG_FUNC("Sources sent asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif

  // Release requests/windows used by the asynchronous phase.
  for(i=0; i<dist_a_data->entries; i++) {
    req_completed = dist_a_data->requests[i];
    req_qty = dist_a_data->request_qty[i];
    window = dist_a_data->windows[i];
    async_communication_end(mall_conf->red_method, mall_conf->red_strategies, req_completed, req_qty, &window);
  }

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
  if(!is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
  return end_redistribution();
}


/*
 * Finishes the data redistribution with the children: performs the
 * synchronous transfers (if any), broadcasts the replicated synchronous
 * data, and sends the timing results to the children.
 *
 * Returns MALL_DIST_COMPLETED, or MALL_SPAWN_ADAPT_PENDING when a
 * Merge Shrink still has to adapt the number of processes.
 */ 
int end_redistribution() {
  size_t i;
  int is_intercomm, rootBcast, local_state;

  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
  // Over an intercommunicator only the root broadcasts (MPI_ROOT/MPI_PROC_NULL).
  if(is_intercomm) {
    rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  } else {
    rootBcast = mall->root;
  }
  
  comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
  if(dist_s_data->entries || rep_s_data->entries) { // Send synchronous data
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    mall_conf->times->sync_start = MPI_Wtime();
    send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);

    // TODO Create a dedicated function and add it for the async phase too
    for(i=0; i<rep_s_data->entries; i++) {
      MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], rootBcast, mall->intercomm);
    } 
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->intercomm);
    #endif
    if(!is_intercomm) mall_conf->times->sync_end = MPI_Wtime(); // Merge method only
  }

  malleability_times_broadcast(rootBcast);

  local_state = MALL_DIST_COMPLETED;
  if(!is_intercomm) { // Merge Spawn
    if(mall->numP > mall->numC) { // Shrink || Merge Shrink requires extra work
      local_state = MALL_SPAWN_ADAPT_PENDING;
    }
  }

  return local_state;
}

872
873
874
875

///=============================================
///=============================================
///=============================================
876
//TODO DEPRECATED
iker_martin's avatar
iker_martin committed
877
int shrink_redistribution() {
878
879
880
    #if USE_MAL_BARRIERS
      MPI_Barrier(mall->comm);
    #endif
881
    double time_extra = MPI_Wtime();
882

883
884
    MPI_Abort(MPI_COMM_WORLD, -20); //                                                         
    zombies_collect_suspended(mall->user_comm, mall->myId, mall->numP, mall->numC, mall->root);
iker_martin's avatar
iker_martin committed
885
886
    
    if(mall->myId < mall->numC) {
887
      if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); //FIXME Modificar a que se pida pro el usuario el cambio y se llama a comms_update
888
889
890
891
892
      if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));

      MPI_Comm_dup(mall->intercomm, &(mall->thread_comm));
      MPI_Comm_dup(mall->intercomm, &(mall->comm));

893
894
895
      MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MALL_THREAD");
      MPI_Comm_set_name(mall->comm, "MPI_COMM_MALL");

896
897
      MPI_Comm_free(&(mall->intercomm));

898
899
900
901

      #if USE_MAL_BARRIERS
        MPI_Barrier(mall->comm);
      #endif
902
      mall_conf->times->spawn_time += MPI_Wtime() - time_extra;
903
      return MALL_DIST_COMPLETED;
iker_martin's avatar
iker_martin committed
904
    } else {
905
      return MALL_ZOMBIE;
iker_martin's avatar
iker_martin committed
906
907
908
    }
}

// TODO MOVER A OTRO LADO??
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//===============COMM PARENTS THREADS===================||
//======================================================||
//======================================================||

916
917

// State of the background redistribution: written by the worker thread,
// read by the master thread via thread_check().
// NOTE(review): shared between threads without atomics or locks — confirm
// whether this relies on MPI_Allreduce acting as a synchronization point.
int comm_state; //FIXME Use a proper state handler
918
919
920
921
/*
 * Crea una hebra para ejecutar una comunicación en segundo plano.
 */
int thread_creation() {
922
  comm_state = MALL_DIST_PENDING;
923
924
925
926
927
  if(pthread_create(&(mall->async_thread), NULL, thread_async_work, NULL)) {
    printf("Error al crear el hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
928
  return comm_state;
929
930
931
932
933
934
935
936
}

/*
 * Comprobación por parte de una hebra maestra que indica
 * si una hebra esclava ha terminado su comunicación en segundo plano.
 *
 * El estado de la comunicación es devuelto al finalizar la función. 
 */
937
int thread_check(int wait_completed) {
938
  int all_completed = 0, is_intercomm;
939

940
941
942
943
944
945
946
947
  if(wait_completed && comm_state == MALL_DIST_PENDING) {
    if(pthread_join(mall->async_thread, NULL)) {
      printf("Error al esperar al hilo\n");
      MPI_Abort(MPI_COMM_WORLD, -1);
      return -2;
    } 
  }

948
  // Comprueba que todos los hilos han terminado la distribucion (Mismo valor en commAsync)
949
  MPI_Allreduce(&comm_state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
950
951
  if(all_completed != MALL_DIST_COMPLETED) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended 
  //FIXME No se tiene en cuenta el estado MALL_APP_ENDED
952
953
954
955
956
957

  if(pthread_join(mall->async_thread, NULL)) {
    printf("Error al esperar al hilo\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -2;
  } 
958
  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
959
960
961
962

  #if USE_MAL_BARRIERS
    MPI_Barrier(mall->intercomm);
  #endif
963
  if(!is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
964
965
966
967
968
969
970
971
972
973
974
975
  return end_redistribution();
}


/*
 * Función ejecutada por una hebra.
 * Ejecuta una comunicación síncrona con los hijos que
 * para el usuario se puede considerar como en segundo plano.
 *
 * Cuando termina la comunicación la hebra maestra puede comprobarlo
 * por el valor "commAsync".
 */
976
void* thread_async_work() {
977
  send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
978
  comm_state = MALL_DIST_COMPLETED;
979
980
  pthread_exit(NULL);
}


//==============================================================================
/*
 * Debug helper: prints the current state (handle and name) of all the
 * module's communicators — mall->comm, mall->user_comm and, when it
 * exists, mall->intercomm.
 */
void print_comms_state() {
  int tester;
  char *test = malloc(MPI_MAX_OBJECT_NAME * sizeof(char));

  // NOTE(review): printing an MPI_Comm with %d assumes integer handles
  // (MPICH-style); with OpenMPI handles are pointers and this is
  // undefined behavior — confirm the target MPI implementation.
  MPI_Comm_get_name(mall->comm, test, &tester);
  printf("P%d Comm=%d Name=%s\n", mall->myId, mall->comm, test);
  MPI_Comm_get_name(mall->user_comm, test, &tester);
  printf("P%d Comm=%d Name=%s\n", mall->myId, mall->user_comm, test);
  if(mall->intercomm != MPI_COMM_NULL) {
    MPI_Comm_get_name(mall->intercomm, test, &tester);
    printf("P%d Comm=%d Name=%s\n", mall->myId, mall->intercomm, test);
  }
  free(test);
}
1001

1002
1003
1004
/*
 * Función solo necesaria en Merge
 */
1005
1006
1007
1008
1009
1010
1011
void malleability_comms_update(MPI_Comm comm) {
  if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
  if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));

  MPI_Comm_dup(comm, &(mall->thread_comm));
  MPI_Comm_dup(comm, &(mall->comm));

1012
1013
  MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MAM_THREAD");
  MPI_Comm_set_name(mall->comm, "MPI_COMM_MAM");
1014
}
1015