GenericSpawn.c 11.9 KB
Newer Older
1
2
3
4
5
6
7
8
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
#include <mpi.h>
#include <string.h>
#include "../malleabilityStates.h"
9
#include "../malleabilityDataStructures.h"
10
#include "../MAM_Configuration.h"
11
12
13
14
#include "ProcessDist.h"
#include "GenericSpawn.h"
#include "Baseline.h"
#include "Merge.h"
15
#include "Spawn_state.h"
16
17
18
19

// This code is a Singleton object -- only one instance can be used at a given time, and
// multiple calls to perform different resizes cannot be in progress at the same time.

20
// Configuration of the spawn currently being handled by this file.
// Allocated by set_spawn_configuration() and reset to NULL by deallocate_spawn_data().
Spawn_data *spawn_data = NULL;
21
22
23
// Handle written by pthread_create() in allocate_thread_spawn(); the thread is detached right after.
pthread_t spawn_thread;

//--------------PRIVATE CONFIGURATION DECLARATIONS---------------//
iker_martin's avatar
iker_martin committed
24
// Allocates spawn_data and fills it from the global configuration for communicator "comm".
void set_spawn_configuration(MPI_Comm comm);
25
26
27
28
29
// Frees spawn_data together with the structures it uses; does nothing if spawn_data is NULL.
void deallocate_spawn_data();

//--------------PRIVATE DECLARATIONS---------------//
// Runs the configured spawn method for the given data stage and records the resulting state.
void generic_spawn(MPI_Comm *child, int data_stage);

30
31
// Checks the "single" step of an asynchronous spawn; returns the state shared by the root.
int check_single_state(MPI_Comm comm, MPI_Comm *child, int global_state, int wait_completed);
// Checks the "generic" step of an asynchronous spawn; returns the state reduced over "comm".
int check_generic_state(MPI_Comm comm, int local_state, int wait_completed);
32
33

//--------------PRIVATE THREADS DECLARATIONS---------------//
34
35
// Creates and detaches the helper thread that runs thread_work(); returns 0 on success, -1 on error.
int allocate_thread_spawn(MPI_Comm *child);
// Start routine of the helper thread; "args" is the MPI_Comm* that receives the children communicator.
void* thread_work(void *args);
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55


//--------------PUBLIC FUNCTIONS---------------//

/*
 * Se solicita la creacion de un nuevo grupo de "numP" procesos con una distribucion
 * fisica "type_dist".
 *
 * Se puede solicitar en primer plano, encargandose por tanto el proceso que llama a esta funcion,
 * o en segundo plano, donde un hilo se encarga de configurar esta creacion.
 *
 * Si se pide en primer plano, al terminarla es posible llamar a "check_spawn_state()" para crear
 * los procesos.
 *
 * Si se pide en segundo plano, llamar a "check_spawn_state()" comprobara si la configuracion para
 * crearlos esta lista, y si es asi, los crea.
 *
 * Devuelve el estado de el procedimiento. Si no devuelve "MALL_SPAWN_COMPLETED", es necesario llamar a
 * "check_spawn_state()".
 */
iker_martin's avatar
iker_martin committed
56
int init_spawn(MPI_Comm comm, MPI_Comm *child) {
57
  int local_state;
iker_martin's avatar
iker_martin committed
58
  set_spawn_configuration(comm);
59
60

  if(!spawn_data->spawn_is_async) {
61
    generic_spawn(child, MALL_NOT_STARTED);
62
63
64
65
66
    local_state = get_spawn_state(spawn_data->spawn_is_async);
    if (local_state == MALL_SPAWN_COMPLETED)
      deallocate_spawn_data();

  } else {
67
68
    local_state = spawn_data->spawn_is_single ? 
	    MALL_SPAWN_SINGLE_PENDING : MALL_SPAWN_PENDING;
69
    local_state = mall_conf->spawn_method == MALL_SPAWN_MERGE && spawn_data->initial_qty > spawn_data->target_qty ?
70
	    MALL_SPAWN_ADAPT_POSTPONE : local_state;
71
    set_spawn_state(local_state, 0);
iker_martin's avatar
iker_martin committed
72
    if((spawn_data->spawn_is_single && mall->myId == mall->root) || !spawn_data->spawn_is_single) {
73
      allocate_thread_spawn(child);
74
75
76
    }
  }
    
77
  return local_state;
78
79
80
81
82
83
}

/*
 * Comprueba si una configuracion para crear un nuevo grupo de procesos esta lista,
 * y en caso de que lo este, se devuelve el communicador a estos nuevos procesos.
 */
84
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int wait_completed) { 
85
86
87
  int local_state;
  int global_state=MALL_NOT_STARTED;

88
  if(spawn_data->spawn_is_async) { // Async
89
90
    local_state = get_spawn_state(spawn_data->spawn_is_async);

91
    if(local_state == MALL_SPAWN_SINGLE_PENDING || local_state == MALL_SPAWN_SINGLE_COMPLETED) { // Single
92
      global_state = check_single_state(comm, child, local_state, wait_completed);
93

94
    } else if(local_state == MALL_SPAWN_PENDING || local_state == MALL_SPAWN_COMPLETED || local_state == MALL_SPAWN_ADAPTED) { // Generic
95
      global_state = check_generic_state(comm, local_state, wait_completed);
96

97
98
99
    } else if(local_state == MALL_SPAWN_ADAPT_POSTPONE) {
      global_state = local_state;
      
100
    } else {
101
      printf("Error Check spawn: Configuracion invalida State = %d\n", local_state);
102
103
104
      MPI_Abort(MPI_COMM_WORLD, -1);
      return -10;
    }
105
  } else if(mall_conf->spawn_method == MALL_SPAWN_MERGE){ // Start Merge shrink Sync
106
107
108
    generic_spawn(child, MALL_DIST_COMPLETED);
    global_state = get_spawn_state(spawn_data->spawn_is_async);
  }
109
  if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_SPAWN_ADAPTED)
110
    deallocate_spawn_data();
111

112
113
114
  return global_state;
}

115
116
117
118
119
120
121
122
123
124
125
126
/*
 * Elimina la bandera bloqueante MALL_SPAWN_ADAPT_POSTPONE para los hilos 
 * auxiliares. Esta bandera los bloquea para que el metodo Merge shrink no 
 * avance hasta que se complete la redistribucion de datos. Por tanto,
 * al modificar la bandera los hilos pueden continuar.
 *
 * Por seguridad se comprueba que no se realice el cambio a la bandera a 
 * no ser que se cumplan las 3 condiciones.
 */
void unset_spawn_postpone_flag(int outside_state) {
  int local_state = get_spawn_state(spawn_data->spawn_is_async);
  if(local_state == MALL_SPAWN_ADAPT_POSTPONE && outside_state == MALL_SPAWN_ADAPT_PENDING && spawn_data->spawn_is_async) { 
127
    set_spawn_state(MALL_SPAWN_PENDING, spawn_data->spawn_is_async);
128
    wakeup_redistribution();
129
130
131
  }
}

132
133
134
135
136
137
138
139
/*
 * Funcion bloqueante de los hijos para asegurar que todas las tareas del paso
 * de creacion de los hijos se terminan correctamente.
 *
 * Ademas los hijos obtienen informacion basica de los padres
 * para el paso de redistribucion de datos (Numeros de procesos y Id del Root).
 *
 */
iker_martin's avatar
iker_martin committed
140
void malleability_connect_children(MPI_Comm comm, MPI_Comm *parents) {
141
  spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));
iker_martin's avatar
iker_martin committed
142
143
  spawn_data->spawn_qty = mall->numP;
  spawn_data->target_qty = mall->numP;
144
145
  spawn_data->comm = comm;

iker_martin's avatar
iker_martin committed
146
  MAM_Comm_main_structures(MALLEABILITY_ROOT); //FIXME What if root is another id different to 0? Send from spawn to root id?
147
148
  //MPI_Comm_remote_size(*parents, &spawn_data->initial_qty);
  spawn_data->initial_qty = mall->num_parents;
iker_martin's avatar
iker_martin committed
149
150
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
151
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
152

iker_martin's avatar
iker_martin committed
153
  switch(mall_conf->spawn_method) {
154
    case MALL_SPAWN_BASELINE:
155
      baseline(*spawn_data, parents);
156
157
158
      if(!spawn_data->spawn_is_intercomm) {
        intracomm_strategy(MALLEABILITY_CHILDREN, parents);
      }
159
160
      break;
    case MALL_SPAWN_MERGE:
161
      spawn_data->target_qty += spawn_data->initial_qty;
162
      merge(*spawn_data, parents, MALL_NOT_STARTED);
163
164
      break;
  }
165
  //mall->num_parents = spawn_data->initial_qty;
166
167
168
169
170
171
172
173
  free(spawn_data);
}

//--------------PRIVATE CONFIGURATION FUNCTIONS---------------//
/*
 * Agrupa en una sola estructura todos los datos de configuración necesarios
 * e inicializa las estructuras necesarias.
 */
iker_martin's avatar
iker_martin committed
174
void set_spawn_configuration(MPI_Comm comm) {
175
176
  spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));

iker_martin's avatar
iker_martin committed
177
178
179
  spawn_data->initial_qty = mall->numP;
  spawn_data->target_qty = mall->numC;
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single)); 
180
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
181
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
182
  spawn_data->comm = comm;
183
  spawn_data->mapping_fill_method = MALL_DIST_STRING;
184

185
  switch(mall_conf->spawn_method) {
186
187
188
    case MALL_SPAWN_BASELINE:
      spawn_data->spawn_qty = spawn_data->target_qty;
      spawn_data->already_created = 0;
189
      break;
190
191
192
    case MALL_SPAWN_MERGE:
      spawn_data->spawn_qty = spawn_data->target_qty - spawn_data->initial_qty;
      spawn_data->already_created = spawn_data->initial_qty;
193
      break;
194
195
196
  }

  if(spawn_data->spawn_is_async) {
197
    init_spawn_state();
198
  }
199
  spawn_data->mapping = MPI_INFO_NULL;
200
201
202
203
204
205
206
}

/*
 * Libera una estructura de datos spawn_data
 * junto a la destrucion de aquellas estructuras que utiliza.
 */
void deallocate_spawn_data() {
207
208
  if(spawn_data == NULL) return;

209
210
211
212
  if(spawn_data->mapping != MPI_INFO_NULL) {
    MPI_Info_free(&(spawn_data->mapping));
  }
  if(spawn_data->spawn_is_async) {
213
    free_spawn_state();
214
215
  }
  free(spawn_data); 
216
  spawn_data = NULL;
217
218
219
220
221
222
223
224
225
226
227
228
}


//--------------PRIVATE SPAWN CREATION FUNCTIONS---------------//

/*
 * Generic function for process creation. It reads the configuration in
 * spawn_data / mall_conf and, according to it, chooses how the processes
 * are created.
 *
 * - The root builds the mapping for the new processes when there is at
 *   least one process to spawn.
 * - Baseline: runs "baseline()" and, unless the intercomm strategy is
 *   enabled, "intracomm_strategy()"; the state of the last call is kept.
 * - Merge: runs "merge()" for the given "data_stage".
 * - Any other method is reported and the application is aborted.
 *
 * When it finishes, the stored spawn state is updated with the result.
 *
 * child      - communicator handed to the selected spawn method.
 * data_stage - stage forwarded to "merge()" (e.g. MALL_NOT_STARTED or
 *              MALL_DIST_COMPLETED).
 */
void generic_spawn(MPI_Comm *child, int data_stage) {
  int local_state, aux_state;

  // WORK
  if(mall->myId == mall->root && spawn_data->spawn_qty > 0) { // Set mapping for new processes
    processes_dist(*spawn_data, &(spawn_data->mapping));
  }
  switch(mall_conf->spawn_method) {
    case MALL_SPAWN_BASELINE:
      local_state = baseline(*spawn_data, child);
      if(!spawn_data->spawn_is_intercomm) {
        local_state = intracomm_strategy(MALLEABILITY_NOT_CHILDREN, child);
      }
      break;
    case MALL_SPAWN_MERGE:
      local_state = merge(*spawn_data, child, data_stage);
      break;
    default:
      // Without this branch local_state would be read uninitialized below.
      printf("Error generic spawn: unknown spawn method = %d\n", (int) mall_conf->spawn_method);
      MPI_Abort(MPI_COMM_WORLD, -1);
      return;
  }
  // END WORK

  // Never replace a stored MALL_SPAWN_PENDING with MALL_SPAWN_ADAPT_POSTPONE;
  // presumably the postpone flag was already lifted by
  // unset_spawn_postpone_flag() in the meantime -- TODO confirm.
  aux_state = get_spawn_state(spawn_data->spawn_is_async);
  if(!(aux_state == MALL_SPAWN_PENDING && local_state == MALL_SPAWN_ADAPT_POSTPONE)) {
    set_spawn_state(local_state, spawn_data->spawn_is_async);
  }
}


//--------------PRIVATE THREAD FUNCTIONS---------------//

/*
 * Aloja la memoria para un hilo auxiliar dedicado a la creacion de procesos.
 * No se puede realizar un "join" sobre el hilo y el mismo libera su memoria
 * asociado al terminar.
 */
261
262
int allocate_thread_spawn(MPI_Comm *child) {
  if(pthread_create(&spawn_thread, NULL, thread_work, (void *) child)) {
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
    printf("Error al crear el hilo de SPAWN\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
  if(pthread_detach(spawn_thread)) {
    printf("Error when detaching spawning thread\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
  return 0;
}

/*
 * Funcion llamada por un hilo para que este se encarge
 * de configurar la creacion de un nuevo grupo de procesos.
 *
 * Una vez esta lista la configuracion y es posible crear los procesos
 * se avisa al hilo maestro.
 */
282
void* thread_work(void *args) {
283
  int local_state;
284
  MPI_Comm *child = (MPI_Comm *) args;
285
 
286
  generic_spawn(child, MALL_NOT_STARTED);
287

288
  local_state = get_spawn_state(spawn_data->spawn_is_async);
289
  if(local_state == MALL_SPAWN_ADAPT_POSTPONE || local_state == MALL_SPAWN_PENDING) {
290
    // El grupo de procesos se terminara de juntar tras la redistribucion de datos
291

292
    local_state = wait_redistribution();
293
    generic_spawn(child, MALL_DIST_COMPLETED);
294
  }
295
  wakeup_completion();
296
297
298
299
300
301
302
303
304
305
306
307
308
309

  pthread_exit(NULL);
}

/*
 * Comprueba si una creacion de procesos asincrona en el
 * paso "single" ha terminado. 
 * Si no ha terminado se mantiene el estado 
 * "MALL_SPAWN_SINGLE_PENDING".
 *
 * Si ha terminado se crean los hilos auxiliares para 
 * los procesos no root y se devuelve el estado
 * "MALL_SPAWN_PENDING".
 */
310
int check_single_state(MPI_Comm comm, MPI_Comm *child, int global_state, int wait_completed) {
311
312
313
  while(wait_completed && mall->myId == mall->root && global_state == MALL_SPAWN_SINGLE_PENDING) {
    global_state = wait_completion();
  }
iker_martin's avatar
iker_martin committed
314
  MPI_Bcast(&global_state, 1, MPI_INT, mall->root, comm);
315
316
317
318
319

  // Non-root processes join root to finalize the spawn
  // They also must join if the application has ended its work
  if(global_state == MALL_SPAWN_SINGLE_COMPLETED) { 
    global_state = MALL_SPAWN_PENDING;
320
    set_spawn_state(global_state, spawn_data->spawn_is_async);
321

iker_martin's avatar
iker_martin committed
322
    if(mall->myId != mall->root) {
323
      allocate_thread_spawn(child);
324
325
326
327
328
329
330
331
332
333
334
335
336
337
    }
  }
  return global_state;
}

/*
 * Comprueba si una creación de procesos asincrona en el
 * paso "generic" ha terminado.
 * Si no ha terminado devuelve el estado 
 * "MALL_SPAWN_PENDING".
 *
 * Si ha terminado libera la memoria asociada a spawn_data
 * y devuelve el estado "MALL_SPAWN_COMPLETED".
 */
338
int check_generic_state(MPI_Comm comm, int local_state, int wait_completed) {
339
340
  int global_state;

341
342
  while(wait_completed && local_state == MALL_SPAWN_PENDING) local_state = wait_completion();

343
  MPI_Allreduce(&local_state, &global_state, 1, MPI_INT, MPI_MIN, comm);
344
  if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_SPAWN_ADAPTED) {
345
    set_spawn_state(global_state, spawn_data->spawn_is_async);
346
347
348
  }
  return global_state;
}