Main.c 20.7 KB
Newer Older
1
2
3
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
iker_martin's avatar
iker_martin committed
4
#include <fcntl.h>
5
#include <unistd.h>
iker_martin's avatar
iker_martin committed
6
#include <sys/stat.h>
7
8
#include "process_stage.h"
#include "Main_datatypes.h"
9
#include "configuration.h"
10
#include "../IOcodes/results.h"
11
#include "../malleability/distribution_methods/Distributed_CommDist.h"
12
#include "../malleability/MAM.h"
13

14
15
#define DR_MAX_SIZE 1000000000

iker_martin's avatar
iker_martin committed
16
int work();
17
double iterate(int async_comm);
18
19
double iterate_relaxed(double *time, double *times_stages);
double iterate_rigid(double *time, double *times_stages);
20

21
void init_group_struct(char *argv[], int argc, int myId, int numP);
22
void init_application();
23
void obtain_op_times();
24
void free_application_data();
25
void free_zombie_process();
26

27
void print_general_info(int myId, int grp, int numP);
28
int print_local_results();
29
int print_final_results();
iker_martin's avatar
iker_martin committed
30
int create_out_file(char *nombre, int *ptr, int newstdout);
31

32
33
34

void init_originals();
void init_targets();
35
void update_targets();
36
void user_redistribution(void *args);
37

iker_martin's avatar
iker_martin committed
38
39
configuration *config_file;
group_data *group;
40
results_data *results;
41
MPI_Comm comm, new_comm;
42
int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis
43

44
int main(int argc, char *argv[]) {
45
    int numP, myId, res;
iker_martin's avatar
iker_martin committed
46
    int req;
47
    int im_child;
48
    size_t i;
49

50
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &req);
iker_martin's avatar
iker_martin committed
51
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);
52
    MPI_Comm_size(MPI_COMM_WORLD, &numP);
53
    comm = MPI_COMM_WORLD;
54
    new_comm = MPI_COMM_NULL;
iker_martin's avatar
iker_martin committed
55

56
57
    if(req != MPI_THREAD_MULTIPLE) {
      printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE);
58
59
      fflush(stdout);
      MPI_Abort(MPI_COMM_WORLD, -50);
60
61
    }

62
    init_group_struct(argv, argc, myId, numP);
63
    im_child = MAM_Init(ROOT, &comm, argv[0], user_redistribution, NULL);
64

65
66
    //MAM_Use_valgrind(1);

67
68
    if(im_child) {
      update_targets();
69

70
    } else {
71
      init_application();
72
      init_originals();
73

74
      MPI_Barrier(comm);
75
      results->exec_start = MPI_Wtime();
76
77
    }

78
79
80
    //
    // EMPIEZA LA EJECUCION-------------------------------
    //
81
    do {
82
83
      MPI_Comm_size(comm, &(group->numP));
      MPI_Comm_rank(comm, &(group->myId));
84

85
      if(group->grp != 0) {
86
        obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
87
        MAM_Retrieve_times(&results->spawn_time[group->grp - 1], &results->sync_time[group->grp - 1], &results->async_time[group->grp - 1], &results->user_time[group->grp - 1], &results->malleability_time[group->grp - 1]);
88
      }
89

90
      if(config_file->n_groups != group->grp + 1) { //TODO Llevar a otra funcion
91
92
93
94
95
96
97
98
        MAM_Set_configuration(config_file->groups[group->grp+1].sm, MAM_STRAT_SPAWN_CLEAR, 
			config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, MAM_STRAT_RED_CLEAR);
	for(i=0; i<config_file->groups[group->grp+1].ss_len; i++) {
	  MAM_Set_key_configuration(MAM_SPAWN_STRATEGIES, config_file->groups[group->grp+1].ss[i], &req);
	}
	for(i=0; i<config_file->groups[group->grp+1].rs_len; i++) {
	  MAM_Set_key_configuration(MAM_RED_STRATEGIES, config_file->groups[group->grp+1].rs[i], &req);
	}
99
        MAM_Set_target_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
100

101
        if(group->grp != 0) {
102
103
          MAM_Data_modify(&(group->grp), 0, 1, MPI_INT, MAM_DATA_REPLICATED, MAM_DATA_CONSTANT);
          MAM_Data_modify(&(group->iter_start), 0, 1, MPI_INT, MAM_DATA_REPLICATED, MAM_DATA_VARIABLE);
104
        }
105
      }
106
    
107
      res = work();
108

109
      if(res==1) { // Se ha llegado al final de la aplicacion
110
        MPI_Barrier(comm);
111
        results->exec_time = MPI_Wtime() - results->exec_start - results->wasted_time;
112
        print_local_results();
113
      }
114
      
115

116
      reset_results_index(results);
117

118
      group->grp = group->grp + 1;
119
    } while(config_file->n_groups > group->grp);
120

121
122
    //
    // TERMINA LA EJECUCION ----------------------------------------------------------
123
    // 
124
    print_final_results(); // Pasado este punto ya no pueden escribir los procesos
125

126
    MPI_Barrier(comm);
127
128
129
    if(comm != MPI_COMM_WORLD && comm != MPI_COMM_NULL) {
      MPI_Comm_free(&comm);
    }
130
    free_application_data();
131

132
133
134
135
136
    MPI_Finalize();
    return 0;
}

/*
137
138
139
140
141
142
143
144
145
 * Función de trabajo principal.
 *
 * Incializa los datos para realizar el computo y a continuacion
 * pasa a realizar "maxiter" iteraciones de computo.
 *
 * Terminadas las iteraciones realiza el redimensionado de procesos.
 * Si el redimensionado se realiza de forma asincrona se 
 * siguen realizando iteraciones de computo hasta que termine la 
 * comunicacion asincrona y realizar entonces la sincrona.
146
147
148
149
 *
 * Si el grupo de procesos es el ultimo que va a ejecutar, se devuelve
 * el valor 1 para indicar que no se va a seguir trabajando con nuevos grupos
 * de procesos. En caso contrario se devuelve 0.
150
 */
iker_martin's avatar
iker_martin committed
151
int work() {
152
  int iter, maxiter, state, res;
153
  int wait_completed = MAM_CHECK_COMPLETION;
154

155
  maxiter = config_file->groups[group->grp].iters;
156
  state = MAM_NOT_STARTED;
157
  res = 0;
158

159
  for(iter=group->iter_start; iter < maxiter; iter++) {
160
    iterate(state);
161
  }
162

163
  if(config_file->n_groups != group->grp + 1)
164
    MAM_Checkpoint(&state, wait_completed, user_redistribution, NULL);
165

166
  iter = 0;
167
  while(state == MAM_PENDING || state == MAM_USER_PENDING) {
168
    if(group->grp+1 < config_file->n_groups && iter < config_file->groups[group->grp+1].iters) {
169
      iterate(state);
170
171
      iter++;
      group->iter_start = iter;
172
    } else { wait_completed = MAM_WAIT_COMPLETION; }
173
    MAM_Checkpoint(&state, wait_completed, user_redistribution, NULL);
174
  }
175

176
  //if(state == MAM_COMPLETED) {}
177
  if(config_file->n_groups == group->grp + 1) { res=1; }
178
  return res;
179
180
}

181
182
183
184
185
186
187
188
189
190

/////////////////////////////////////////
/////////////////////////////////////////
//COMPUTE FUNCTIONS
/////////////////////////////////////////
/////////////////////////////////////////


/*
 * Simula la ejecucción de una iteración de computo en la aplicación
191
192
 * que dura al menos un tiempo determinado por la suma de todas las
 * etapas definidas en la configuracion.
193
 */
194
double iterate(int async_comm) {
195
  double time, *times_stages_aux;
196
  size_t i;
197
198
  double aux = 0;

199
  times_stages_aux = malloc(config_file->n_stages * sizeof(double));
200

201
  if(config_file->rigid_times) {
202
    aux = iterate_rigid(&time, times_stages_aux);
203
  } else {
204
    aux = iterate_relaxed(&time, times_stages_aux);
205
206
  }

207
  // Se esta realizando una redistribucion de datos asincrona
208
  if(async_comm == MAM_PENDING || async_comm == MAM_USER_PENDING) { 
209
    // TODO Que diferencie entre tipo de partes asincronas?
210
    results->iters_async += 1;
211
212
  }

213
  // TODO Pasar el resto de este código a results.c
214
  if(results->iter_index == results->iters_size) { // Aumentar tamaño de ambos vectores de resultados
215
    realloc_results_iters(results, config_file->n_stages, results->iters_size + 100);
216
  }
217
  results->iters_time[results->iter_index] = time;
218
  for(i=0; i < config_file->n_stages; i++) {
219
    results->stage_times[i][results->iter_index] = times_stages_aux[i];
220
  }
221
  results->iter_index = results->iter_index + 1;
222
  // TODO Pasar hasta aqui
223

224
  free(times_stages_aux);
225

226
  return aux;
227
228
}

229
230
231

/*
 * Performs an iteration. The gathered times for iterations
232
 * and stages could be IMPRECISE in order to ensure the 
233
234
235
236
237
 * global execution time is precise.
 */
double iterate_relaxed(double *time, double *times_stages) {
  size_t i;
  double start_time, start_time_stage, aux=0;
238
  start_time = MPI_Wtime(); // Imprecise timings
239
240

  for(i=0; i < config_file->n_stages; i++) {
241
    start_time_stage = MPI_Wtime(); 
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
    aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
    times_stages[i] = MPI_Wtime() - start_time_stage;
  }

  *time = MPI_Wtime() - start_time; // Guardar tiempos
  return aux;
}

/*
 * Performs an iteration. The gathered times for iterations
 * and stages are ensured to be precise but the global 
 * execution time could be imprecise.
 */
double iterate_rigid(double *time, double *times_stages) {
  size_t i;
  double start_time, start_time_stage, aux=0;

  MPI_Barrier(comm);
  start_time = MPI_Wtime();

  for(i=0; i < config_file->n_stages; i++) {
    start_time_stage = MPI_Wtime();
    aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
265
    MPI_Barrier(comm);
266
267
268
    times_stages[i] = MPI_Wtime() - start_time_stage;
  }

269
  MPI_Barrier(comm);
270
271
272
273
  *time = MPI_Wtime() - start_time; // Guardar tiempos
  return aux;
}

274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
//======================================================||
//======================================================||
//=============INIT/FREE/PRINT FUNCTIONS================||
//======================================================||
//======================================================||

/*
 * Muestra datos generales sobre los procesos, su grupo,
 * en que nodo residen y la version de MPI utilizada.
 */
void print_general_info(int myId, int grp, int numP) {
  int len;
  char *name = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  char *version = malloc(MPI_MAX_LIBRARY_VERSION_STRING * sizeof(char));
  MPI_Get_processor_name(name, &len);
  MPI_Get_library_version(version, &len);
290
291
  //printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, grp, numP, name, version);
  printf("P%d Nuevo GRUPO %d de %d procs en nodo %s -- PID=%d\n", myId, grp, numP, name, getpid());
292
293
294
295
296

  free(name);
  free(version);
}

297

298
299
300
/*
 * Pide al proceso raiz imprimir los datos sobre las iteraciones realizadas por el grupo de procesos.
 */
301
302
int print_local_results() {
  int ptr_local, ptr_out, err;
303
304
  char *file_name;

305
306
  // This function causes an overhead in the recorded time for last group
  compute_results_iter(results, group->myId, group->numP, ROOT, config_file->n_stages, config_file->capture_method, comm);
307
  if(group->myId == ROOT) {
308
309
    ptr_out = dup(1);

310
311
312
313
314
315
316
317
    file_name = NULL;
    file_name = malloc(40 * sizeof(char));
    if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
    err = snprintf(file_name, 40, "R%d_G%dNP%dID%d.out", run_id, group->grp, group->numP, group->myId);
    if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero
    create_out_file(file_name, &ptr_local, 1);
  
    print_config_group(config_file, group->grp);
318
    print_iter_results(*results);
319
    print_stage_results(*results, config_file->n_stages);
320
321
    free(file_name);

322
    fflush(stdout);
323
324
    close(1);
    dup(ptr_out);
325
    close(ptr_out);
326
327
328
329
330
331
332
333
334
  }
  return 0;
}

/*
 * Si es el ultimo grupo de procesos, pide al proceso raiz mostrar los datos obtenidos de tiempo de ejecucion, creacion de procesos
 * y las comunicaciones.
 */
int print_final_results() {
335
  int ptr_global, err, ptr_out;
336
337
338
339
  char *file_name;

  if(group->myId == ROOT) {

340
    if(config_file->n_groups == group->grp) {
341
342
343
344
345
346
      file_name = NULL;
      file_name = malloc(20 * sizeof(char));
      if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
      err = snprintf(file_name, 20, "R%d_Global.out", run_id);
      if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

347
      ptr_out = dup(1);
348
      create_out_file(file_name, &ptr_global, 1);
349
350
      print_config(config_file);
      print_global_results(*results, config_file->n_resizes);
351
      fflush(stdout);
352
      free(file_name);
353
354
355

      close(1);
      dup(ptr_out);
356
357
358
359
360
361
362
363
364
    }
  }
  return 0;
}

/*
 * Inicializa la estructura group
 */
void init_group_struct(char *argv[], int argc, int myId, int numP) {
365
  group = malloc(sizeof(group_data));
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
  group->myId        = myId;
  group->numP        = numP;
  group->grp         = 0;
  group->iter_start  = 0;
  group->argc        = argc;
  group->argv        = argv;
}

/*
 * Inicializa los datos para este grupo de procesos.
 *
 * En caso de ser el primer grupo de procesos, lee el fichero de configuracion
 * e inicializa los vectores de comunicacion.
 *
 * En caso de ser otro grupo de procesos entra a la funcion "Sons_init()" donde
 * se comunican con los padres para inicializar sus datos.
 */
void init_application() {
384
385
  int i, last_index;

386
387
388
389
390
391
392
393
  if(group->argc < 2) {
    printf("Falta el fichero de configuracion. Uso:\n./programa config.ini id\nEl argumento numerico id es opcional\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
  }
  if(group->argc > 2) {
    run_id = atoi(group->argv[2]);
  }

394
  init_config(group->argv[1], &config_file);
395
  results = malloc(sizeof(results_data));
396
  init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
397
  if(config_file->sdr) {
398
399
400
401
402
403
404
405
406
407
    group->sync_data_groups = config_file->sdr % DR_MAX_SIZE ? config_file->sdr/DR_MAX_SIZE+1 : config_file->sdr/DR_MAX_SIZE;
    group->sync_qty = (int *) malloc(group->sync_data_groups * sizeof(int));
    group->sync_array = (char **) malloc(group->sync_data_groups * sizeof(char *));
    last_index = group->sync_data_groups-1; 
    for(i=0; i<last_index; i++) {
      group->sync_qty[i] = DR_MAX_SIZE;
      malloc_comm_array(&(group->sync_array[i]), group->sync_qty[i], group->myId, group->numP);
    }
    group->sync_qty[last_index] = config_file->sdr % DR_MAX_SIZE ? config_file->sdr % DR_MAX_SIZE : DR_MAX_SIZE;
    malloc_comm_array(&(group->sync_array[last_index]), group->sync_qty[last_index], group->myId, group->numP);
408
  }
409

410
  if(config_file->adr) {
411
412
413
414
415
416
417
418
419
420
    group->async_data_groups = config_file->adr % DR_MAX_SIZE ? config_file->adr/DR_MAX_SIZE+1 : config_file->adr/DR_MAX_SIZE;
    group->async_qty = (int *) malloc(group->async_data_groups * sizeof(int));
    group->async_array = (char **) malloc(group->async_data_groups * sizeof(char *));
    last_index = group->async_data_groups-1; 
    for(i=0; i<last_index; i++) {
      group->async_qty[i] = DR_MAX_SIZE;
      malloc_comm_array(&(group->async_array[i]), group->async_qty[i], group->myId, group->numP);
    }
    group->async_qty[last_index] = config_file->adr % DR_MAX_SIZE ? config_file->adr % DR_MAX_SIZE : DR_MAX_SIZE;
    malloc_comm_array(&(group->async_array[last_index]), group->async_qty[last_index], group->myId, group->numP);
421
  }
422
423

  obtain_op_times(1);
424
425
426
427
}

/*
 * Obtiene cuanto tiempo es necesario para realizar una operacion de PI
428
429
430
431
432
433
434
435
 *
 * Si compute esta a 1 se considera que se esta inicializando el entorno
 * y realizará trabajo extra.
 *
 * Si compute esta a 0 se considera un entorno inicializado y solo hay que
 * realizar algunos cambios de reserva de memoria. Si es necesario recalcular
 * algo se obtiene el total de tiempo utilizado en dichas tareas y se resta
 * al tiempo total de ejecucion.
436
 */
437
void obtain_op_times(int compute) {
438
  size_t i;
439
  double time = 0;
440
  for(i=0; i<config_file->n_stages; i++) {
441
    time+=init_stage(config_file, i, *group, comm, compute);
442
  }
443
  if(!compute) {results->wasted_time += time;}
444
445
446
447
448
449
}

/*
 * Libera toda la memoria asociada con la aplicacion
 */
void free_application_data() {
450
  int abort_needed;
451
452
453
454
455
456
457
458
459
  size_t i;

  if(config_file->sdr && group->sync_array != NULL) {
    for(i=0; i<group->sync_data_groups; i++) {
      free(group->sync_array[i]);
      group->sync_array[i] = NULL;
    }
    free(group->sync_qty);
    group->sync_qty = NULL;
460
    free(group->sync_array);
461
462
    group->sync_array = NULL;

463
  }
464
465
466
467
468
469
470
  if(config_file->adr && group->async_array != NULL) {
    for(i=0; i<group->async_data_groups; i++) {
      free(group->async_array[i]);
      group->async_array[i] = NULL;
    }
    free(group->async_qty);
    group->async_qty = NULL;
471
    free(group->async_array);
472
    group->async_array = NULL;
473
  }
474
  abort_needed = MAM_Finalize();
475
  free_zombie_process();
476
  free(group);
477
  if(abort_needed) { MPI_Abort(MPI_COMM_WORLD, -100); }
478
}
479

480
481
482
483
484

/*
 * Libera la memoria asociada a un proceso Zombie
 */
void free_zombie_process() {
485
486
  free_results_data(results, config_file->n_stages);
  free(results);
487
488
489
490
491
492
493
494
495
496
497
498
  
  size_t i;
  if(config_file->adr && group->async_array != NULL) {
    for(i=0; i<group->async_data_groups; i++) {
      free(group->async_array[i]);
      group->async_array[i] = NULL;
    }
    free(group->async_qty);
    group->async_qty = NULL;
    free(group->async_array);
    group->async_array = NULL;
  }
499

500
  free_config(config_file);
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
}


/* 
 * Función para crear un fichero con el nombre pasado como argumento.
 * Si el nombre ya existe, se escribe la informacion a continuacion.
 *
 * El proceso que llama a la función pasa a tener como salida estandar
 * dicho fichero si el valor "newstdout" es verdadero.
 *
 */
int create_out_file(char *nombre, int *ptr, int newstdout) {
  int err;

  *ptr = open(nombre, O_WRONLY | O_CREAT | O_APPEND, 0644);
  if(*ptr < 0) return -1; // No ha sido posible crear el fichero

  if(newstdout) {
    err = close(1);
    if(err < 0) return -2; // No es posible modificar la salida estandar
    err = dup(*ptr);
    if(err < 0) return -3; // No es posible modificar la salida estandar
  }

  return 0;
}
527
528
529
530
531
532
533


//======================================================||
//======================================================||
//================ INIT MALLEABILITY ===================||
//======================================================||
//======================================================||
534
//FIXME TENER EN CUENTA QUE ADR PUEDE SER 0
535
536
537
538
539

void init_originals() {
  size_t i;

  if(config_file->n_groups > 1) {
540
541
542
    MAM_Data_add(&(group->grp), NULL, 1, MPI_INT, MAM_DATA_REPLICATED, MAM_DATA_CONSTANT);
    MAM_Data_add(&run_id, NULL, 1, MPI_INT, MAM_DATA_REPLICATED, MAM_DATA_CONSTANT);
    MAM_Data_add(&(group->iter_start), NULL, 1, MPI_INT, MAM_DATA_REPLICATED, MAM_DATA_VARIABLE);
543
544
545

    if(config_file->sdr) {
      for(i=0; i<group->sync_data_groups; i++) {
546
        MAM_Data_add(group->sync_array[i], NULL, group->sync_qty[i], MPI_CHAR, MAM_DATA_DISTRIBUTED, MAM_DATA_VARIABLE);
547
548
549
550
      }
    }
    if(config_file->adr) {
      for(i=0; i<group->async_data_groups; i++) {
551
        MAM_Data_add(group->async_array[i], NULL, group->async_qty[i], MPI_CHAR, MAM_DATA_DISTRIBUTED, MAM_DATA_CONSTANT);
552
553
554
555
556
557
      }
    }
  }
}

void init_targets() {
558
  size_t i, entries, total_qty;
559
  void *value = NULL;
560
  MPI_Datatype type;
561

562
  MAM_Data_get_pointer(&value, 0, &total_qty, &type, MAM_DATA_REPLICATED, MAM_DATA_CONSTANT);
563
564
565
  group->grp = *((int *)value);
  group->grp = group->grp + 1;

566

567
  recv_config_file(ROOT, new_comm, &config_file);
568
569
570
  results = malloc(sizeof(results_data));
  init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
  results_comm(results, ROOT, config_file->n_resizes, new_comm);
571

572
  MAM_Data_get_pointer(&value, 1, &total_qty, &type, MAM_DATA_REPLICATED, MAM_DATA_CONSTANT);
573
574
  run_id = *((int *)value);
      
575
  if(config_file->adr) {
576
    MAM_Data_get_entries(MAM_DATA_DISTRIBUTED, MAM_DATA_CONSTANT, &entries);
577
578
579
    group->async_qty = (int *) malloc(entries * sizeof(int));
    group->async_array = (char **) malloc(entries * sizeof(char *));
    for(i=0; i<entries; i++) {
580
      MAM_Data_get_pointer(&value, i, &total_qty, &type, MAM_DATA_DISTRIBUTED, MAM_DATA_CONSTANT);
581
582
583
584
585
586
587
588
589
      group->async_array[i] = (char *)value;
      group->async_qty[i] = DR_MAX_SIZE;
    }
    group->async_qty[entries-1] = config_file->adr % DR_MAX_SIZE ? config_file->adr % DR_MAX_SIZE : DR_MAX_SIZE;
    group->async_data_groups = entries;
  }
}

void update_targets() { //FIXME Should not be needed after redist -- Declarar antes
590
  size_t i, entries, total_qty;
591
  void *value = NULL;
592
  MPI_Datatype type;
593

594
  MAM_Data_get_pointer(&value, 0, &total_qty, &type, MAM_DATA_REPLICATED, MAM_DATA_VARIABLE);
595
596
597
  group->iter_start = *((int *)value);

  if(config_file->sdr) {
598
    MAM_Data_get_entries(MAM_DATA_DISTRIBUTED, MAM_DATA_VARIABLE, &entries);
599
600
601
    group->sync_qty = (int *) malloc(entries * sizeof(int));
    group->sync_array = (char **) malloc(entries * sizeof(char *));
    for(i=0; i<entries; i++) {
602
      MAM_Data_get_pointer(&value, i, &total_qty, &type, MAM_DATA_DISTRIBUTED, MAM_DATA_VARIABLE);
603
604
605
606
607
608
609
      group->sync_array[i] = (char *)value;
      group->sync_qty[i] = DR_MAX_SIZE;
    }
    group->sync_qty[entries-1] = config_file->sdr % DR_MAX_SIZE ? config_file->sdr % DR_MAX_SIZE : DR_MAX_SIZE;
    group->sync_data_groups = entries;
  }
}
610
611
612
613
614
615
616

void user_redistribution(void *args) {
  int commited;
  mam_user_reconf_t user_reconf;

  MAM_Get_Reconf_Info(&user_reconf);
  new_comm = user_reconf.comm;
617
  if(user_reconf.rank_state == MAM_PROC_NEW_RANK) {
618
619
620
621
622
623
    init_targets();
  } else {
    send_config_file(config_file, ROOT, new_comm);
    results_comm(results, ROOT, config_file->n_resizes, new_comm);

    print_local_results();
624
625
626
    if(user_reconf.rank_state == MAM_PROC_ZOMBIE) {
      free_zombie_process();
    }
627
628
  }

629
  MAM_Resume_redistribution(&commited);
630
}