// GenericSpawn.c
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
#include <mpi.h>
#include <string.h>
#include "../malleabilityStates.h"
#include "../malleabilityDataStructures.h"
#include "../MAM_Configuration.h"
#include "ProcessDist.h"
#include "GenericSpawn.h"
#include "Baseline.h"
#include "Merge.h"
#include "Spawn_state.h"

// This code is a Singleton object -- Only one instance can be used at a given time and
// no multiple calls to perform diferent resizes can be performed at the same time.

20
Spawn_data *spawn_data = NULL;
21
22
23
pthread_t spawn_thread;

//--------------PRIVATE CONFIGURATION DECLARATIONS---------------//
iker_martin's avatar
iker_martin committed
24
void set_spawn_configuration(MPI_Comm comm);
25
26
27
28
29
void deallocate_spawn_data();

//--------------PRIVATE DECLARATIONS---------------//
void generic_spawn(MPI_Comm *child, int data_stage);

30
31
int check_single_state(MPI_Comm comm, MPI_Comm *child, int global_state, int wait_completed);
int check_generic_state(MPI_Comm comm, int local_state, int wait_completed);
32
33

//--------------PRIVATE THREADS DECLARATIONS---------------//
34
35
int allocate_thread_spawn(MPI_Comm *child);
void* thread_work(void *args);
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55


//--------------PUBLIC FUNCTIONS---------------//

/*
 * Se solicita la creacion de un nuevo grupo de "numP" procesos con una distribucion
 * fisica "type_dist".
 *
 * Se puede solicitar en primer plano, encargandose por tanto el proceso que llama a esta funcion,
 * o en segundo plano, donde un hilo se encarga de configurar esta creacion.
 *
 * Si se pide en primer plano, al terminarla es posible llamar a "check_spawn_state()" para crear
 * los procesos.
 *
 * Si se pide en segundo plano, llamar a "check_spawn_state()" comprobara si la configuracion para
 * crearlos esta lista, y si es asi, los crea.
 *
 * Devuelve el estado de el procedimiento. Si no devuelve "MALL_SPAWN_COMPLETED", es necesario llamar a
 * "check_spawn_state()".
 */
iker_martin's avatar
iker_martin committed
56
int init_spawn(MPI_Comm comm, MPI_Comm *child) {
57
  int local_state;
iker_martin's avatar
iker_martin committed
58
  set_spawn_configuration(comm);
59
60

  if(!spawn_data->spawn_is_async) {
61
    generic_spawn(child, MALL_NOT_STARTED);
62
63
64
65
66
    local_state = get_spawn_state(spawn_data->spawn_is_async);
    if (local_state == MALL_SPAWN_COMPLETED)
      deallocate_spawn_data();

  } else {
67
68
    local_state = spawn_data->spawn_is_single ? 
	    MALL_SPAWN_SINGLE_PENDING : MALL_SPAWN_PENDING;
69
    local_state = mall_conf->spawn_method == MALL_SPAWN_MERGE && spawn_data->initial_qty > spawn_data->target_qty ?
70
	    MALL_SPAWN_ADAPT_POSTPONE : local_state;
71
    set_spawn_state(local_state, 0);
iker_martin's avatar
iker_martin committed
72
    if((spawn_data->spawn_is_single && mall->myId == mall->root) || !spawn_data->spawn_is_single) {
73
      allocate_thread_spawn(child);
74
75
76
    }
  }
    
77
  return local_state;
78
79
80
81
82
83
}

/*
 * Comprueba si una configuracion para crear un nuevo grupo de procesos esta lista,
 * y en caso de que lo este, se devuelve el communicador a estos nuevos procesos.
 */
84
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int wait_completed) { 
85
86
87
  int local_state;
  int global_state=MALL_NOT_STARTED;

88
  if(spawn_data->spawn_is_async) { // Async
89
90
    local_state = get_spawn_state(spawn_data->spawn_is_async);

91
    if(local_state == MALL_SPAWN_SINGLE_PENDING || local_state == MALL_SPAWN_SINGLE_COMPLETED) { // Single
92
      global_state = check_single_state(comm, child, local_state, wait_completed);
93

94
    } else if(local_state == MALL_SPAWN_PENDING || local_state == MALL_SPAWN_COMPLETED || local_state == MALL_SPAWN_ADAPTED) { // Generic
95
      global_state = check_generic_state(comm, local_state, wait_completed);
96

97
98
99
    } else if(local_state == MALL_SPAWN_ADAPT_POSTPONE) {
      global_state = local_state;
      
100
    } else {
101
      printf("Error Check spawn: Configuracion invalida State = %d\n", local_state);
102
103
104
      MPI_Abort(MPI_COMM_WORLD, -1);
      return -10;
    }
105
  } else if(mall_conf->spawn_method == MALL_SPAWN_MERGE){ // Start Merge shrink Sync
106
107
108
    generic_spawn(child, MALL_DIST_COMPLETED);
    global_state = get_spawn_state(spawn_data->spawn_is_async);
  }
109
  if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_SPAWN_ADAPTED)
110
    deallocate_spawn_data();
111

112
113
114
  return global_state;
}

115
116
117
118
119
120
121
122
123
124
125
126
/*
 * Elimina la bandera bloqueante MALL_SPAWN_ADAPT_POSTPONE para los hilos 
 * auxiliares. Esta bandera los bloquea para que el metodo Merge shrink no 
 * avance hasta que se complete la redistribucion de datos. Por tanto,
 * al modificar la bandera los hilos pueden continuar.
 *
 * Por seguridad se comprueba que no se realice el cambio a la bandera a 
 * no ser que se cumplan las 3 condiciones.
 */
void unset_spawn_postpone_flag(int outside_state) {
  int local_state = get_spawn_state(spawn_data->spawn_is_async);
  if(local_state == MALL_SPAWN_ADAPT_POSTPONE && outside_state == MALL_SPAWN_ADAPT_PENDING && spawn_data->spawn_is_async) { 
127
    set_spawn_state(MALL_SPAWN_PENDING, spawn_data->spawn_is_async);
128
    wakeup_redistribution();
129
130
131
  }
}

132
133
134
135
136
137
138
139
/*
 * Funcion bloqueante de los hijos para asegurar que todas las tareas del paso
 * de creacion de los hijos se terminan correctamente.
 *
 * Ademas los hijos obtienen informacion basica de los padres
 * para el paso de redistribucion de datos (Numeros de procesos y Id del Root).
 *
 */
iker_martin's avatar
iker_martin committed
140
void malleability_connect_children(MPI_Comm comm, MPI_Comm *parents) {
141
  spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));
iker_martin's avatar
iker_martin committed
142
143
  spawn_data->spawn_qty = mall->numP;
  spawn_data->target_qty = mall->numP;
144
145
  spawn_data->comm = comm;

iker_martin's avatar
iker_martin committed
146
147
148
149
  MAM_Comm_main_structures(MALLEABILITY_ROOT); //FIXME What if root is another id different to 0? Send from spawn to root id?
  MPI_Comm_remote_size(*parents, &spawn_data->initial_qty);
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
150
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
151

iker_martin's avatar
iker_martin committed
152
  switch(mall_conf->spawn_method) {
153
    case MALL_SPAWN_BASELINE:
154
      baseline(*spawn_data, parents);
155
156
157
      if(!spawn_data->spawn_is_intercomm) {
        intracomm_strategy(MALLEABILITY_CHILDREN, parents);
      }
158
159
      break;
    case MALL_SPAWN_MERGE:
160
      spawn_data->target_qty += spawn_data->initial_qty;
161
      merge(*spawn_data, parents, MALL_NOT_STARTED);
162
163
164
      break;
  }

iker_martin's avatar
iker_martin committed
165
  mall->num_parents = spawn_data->initial_qty;
166

167
168
169
170
171
172
173
174
  free(spawn_data);
}

//--------------PRIVATE CONFIGURATION FUNCTIONS---------------//
/*
 * Agrupa en una sola estructura todos los datos de configuración necesarios
 * e inicializa las estructuras necesarias.
 */
iker_martin's avatar
iker_martin committed
175
void set_spawn_configuration(MPI_Comm comm) {
176
177
  spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));

iker_martin's avatar
iker_martin committed
178
179
180
  spawn_data->initial_qty = mall->numP;
  spawn_data->target_qty = mall->numC;
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single)); 
181
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
182
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
183
  spawn_data->comm = comm;
184
  spawn_data->mapping_fill_method = MALL_DIST_STRING;
185

186
  switch(mall_conf->spawn_method) {
187
188
189
    case MALL_SPAWN_BASELINE:
      spawn_data->spawn_qty = spawn_data->target_qty;
      spawn_data->already_created = 0;
190
      break;
191
192
193
    case MALL_SPAWN_MERGE:
      spawn_data->spawn_qty = spawn_data->target_qty - spawn_data->initial_qty;
      spawn_data->already_created = spawn_data->initial_qty;
194
      break;
195
196
197
  }

  if(spawn_data->spawn_is_async) {
198
    init_spawn_state();
199
  }
200
  spawn_data->mapping = MPI_INFO_NULL;
201
202
203
204
205
206
207
}

/*
 * Libera una estructura de datos spawn_data
 * junto a la destrucion de aquellas estructuras que utiliza.
 */
void deallocate_spawn_data() {
208
209
  if(spawn_data == NULL) return;

210
211
212
213
  if(spawn_data->mapping != MPI_INFO_NULL) {
    MPI_Info_free(&(spawn_data->mapping));
  }
  if(spawn_data->spawn_is_async) {
214
    free_spawn_state();
215
216
  }
  free(spawn_data); 
217
  spawn_data = NULL;
218
219
220
221
222
223
224
225
226
227
228
229
}


//--------------PRIVATE SPAWN CREATION FUNCTIONS---------------//

/*
 * Generic process-creation function. Reads the configuration and, based on
 * it, chooses how the processes must be created (Baseline or Merge).
 *
 * child:      receives the communicator to the new processes.
 * data_stage: forwarded to merge() (MALL_NOT_STARTED on the first call,
 *             MALL_DIST_COMPLETED once the data redistribution is done).
 *
 * When it finishes, it updates the shared spawn state to record the result.
 * An unknown spawn method aborts the MPI job.
 */
void generic_spawn(MPI_Comm *child, int data_stage) {
  int local_state, aux_state;

  // WORK
  if(mall->myId == mall->root && spawn_data->spawn_qty > 0) { // SET MAPPING FOR NEW PROCESSES
    processes_dist(*spawn_data, &(spawn_data->mapping));
  }

  switch(mall_conf->spawn_method) {
    case MALL_SPAWN_BASELINE:
      local_state = baseline(*spawn_data, child);
      if(!spawn_data->spawn_is_intercomm) {
        local_state = intracomm_strategy(MALLEABILITY_NOT_CHILDREN, child);
      }
      break;
    case MALL_SPAWN_MERGE:
      local_state = merge(*spawn_data, child, data_stage);
      break;
    default:
      // Without this branch local_state would be read uninitialized below.
      printf("Error Generic spawn: Unknown spawn method %d\n", (int) mall_conf->spawn_method);
      MPI_Abort(MPI_COMM_WORLD, -1);
      return;
  }
  // END WORK

  // If the postpone flag was already cleared (state PENDING, see
  // unset_spawn_postpone_flag), do not set it back to ADAPT_POSTPONE.
  aux_state = get_spawn_state(spawn_data->spawn_is_async);
  if(!(aux_state == MALL_SPAWN_PENDING && local_state == MALL_SPAWN_ADAPT_POSTPONE)) {
    set_spawn_state(local_state, spawn_data->spawn_is_async);
  }
}


//--------------PRIVATE THREAD FUNCTIONS---------------//

/*
 * Aloja la memoria para un hilo auxiliar dedicado a la creacion de procesos.
 * No se puede realizar un "join" sobre el hilo y el mismo libera su memoria
 * asociado al terminar.
 */
262
263
int allocate_thread_spawn(MPI_Comm *child) {
  if(pthread_create(&spawn_thread, NULL, thread_work, (void *) child)) {
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
    printf("Error al crear el hilo de SPAWN\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
  if(pthread_detach(spawn_thread)) {
    printf("Error when detaching spawning thread\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
  return 0;
}

/*
 * Funcion llamada por un hilo para que este se encarge
 * de configurar la creacion de un nuevo grupo de procesos.
 *
 * Una vez esta lista la configuracion y es posible crear los procesos
 * se avisa al hilo maestro.
 */
283
void* thread_work(void *args) {
284
  int local_state;
285
  MPI_Comm *child = (MPI_Comm *) args;
286
 
287
  generic_spawn(child, MALL_NOT_STARTED);
288

289
  local_state = get_spawn_state(spawn_data->spawn_is_async);
290
  if(local_state == MALL_SPAWN_ADAPT_POSTPONE || local_state == MALL_SPAWN_PENDING) {
291
    // El grupo de procesos se terminara de juntar tras la redistribucion de datos
292

293
    local_state = wait_redistribution();
294
    generic_spawn(child, MALL_DIST_COMPLETED);
295
  }
296
  wakeup_completion();
297
298
299
300
301
302
303
304
305
306
307
308
309
310

  pthread_exit(NULL);
}

/*
 * Comprueba si una creacion de procesos asincrona en el
 * paso "single" ha terminado. 
 * Si no ha terminado se mantiene el estado 
 * "MALL_SPAWN_SINGLE_PENDING".
 *
 * Si ha terminado se crean los hilos auxiliares para 
 * los procesos no root y se devuelve el estado
 * "MALL_SPAWN_PENDING".
 */
311
int check_single_state(MPI_Comm comm, MPI_Comm *child, int global_state, int wait_completed) {
312
313
314
  while(wait_completed && mall->myId == mall->root && global_state == MALL_SPAWN_SINGLE_PENDING) {
    global_state = wait_completion();
  }
iker_martin's avatar
iker_martin committed
315
  MPI_Bcast(&global_state, 1, MPI_INT, mall->root, comm);
316
317
318
319
320

  // Non-root processes join root to finalize the spawn
  // They also must join if the application has ended its work
  if(global_state == MALL_SPAWN_SINGLE_COMPLETED) { 
    global_state = MALL_SPAWN_PENDING;
321
    set_spawn_state(global_state, spawn_data->spawn_is_async);
322

iker_martin's avatar
iker_martin committed
323
    if(mall->myId != mall->root) {
324
      allocate_thread_spawn(child);
325
326
327
328
329
330
331
332
333
334
335
336
337
338
    }
  }
  return global_state;
}

/*
 * Comprueba si una creación de procesos asincrona en el
 * paso "generic" ha terminado.
 * Si no ha terminado devuelve el estado 
 * "MALL_SPAWN_PENDING".
 *
 * Si ha terminado libera la memoria asociada a spawn_data
 * y devuelve el estado "MALL_SPAWN_COMPLETED".
 */
339
int check_generic_state(MPI_Comm comm, int local_state, int wait_completed) {
340
341
  int global_state;

342
343
  while(wait_completed && local_state == MALL_SPAWN_PENDING) local_state = wait_completion();

344
  MPI_Allreduce(&local_state, &global_state, 1, MPI_INT, MPI_MIN, comm);
345
  if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_SPAWN_ADAPTED) {
346
    set_spawn_state(global_state, spawn_data->spawn_is_async);
347
348
349
  }
  return global_state;
}