GenericSpawn.c 12.2 KB
Newer Older
1
2
3
4
5
6
7
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
#include <mpi.h>
#include <string.h>
8
9
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
10
#include "../MAM_Configuration.h"
11
12
13
14
#include "ProcessDist.h"
#include "GenericSpawn.h"
#include "Baseline.h"
#include "Merge.h"
15
#include "Spawn_state.h"
16
17
18
19

// This code is a Singleton object -- Only one instance can be used at a given time and
// no multiple calls to perform diferent resizes can be performed at the same time.

20
Spawn_data *spawn_data = NULL;
21
22
23
pthread_t spawn_thread;

//--------------PRIVATE CONFIGURATION DECLARATIONS---------------//
iker_martin's avatar
iker_martin committed
24
void set_spawn_configuration(MPI_Comm comm);
25
26
27
28
29
void deallocate_spawn_data();

//--------------PRIVATE DECLARATIONS---------------//
void generic_spawn(MPI_Comm *child, int data_stage);

30
31
int check_single_state(MPI_Comm comm, MPI_Comm *child, int global_state, int wait_completed);
int check_generic_state(MPI_Comm comm, int local_state, int wait_completed);
32
33

//--------------PRIVATE THREADS DECLARATIONS---------------//
34
35
int allocate_thread_spawn(MPI_Comm *child);
void* thread_work(void *args);
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52


//--------------PUBLIC FUNCTIONS---------------//

/*
 * Se solicita la creacion de un nuevo grupo de "numP" procesos con una distribucion
 * fisica "type_dist".
 *
 * Se puede solicitar en primer plano, encargandose por tanto el proceso que llama a esta funcion,
 * o en segundo plano, donde un hilo se encarga de configurar esta creacion.
 *
 * Si se pide en primer plano, al terminarla es posible llamar a "check_spawn_state()" para crear
 * los procesos.
 *
 * Si se pide en segundo plano, llamar a "check_spawn_state()" comprobara si la configuracion para
 * crearlos esta lista, y si es asi, los crea.
 *
53
 * Devuelve el estado de el procedimiento. Si no devuelve "MAM_I_SPAWN_COMPLETED", es necesario llamar a
54
55
 * "check_spawn_state()".
 */
iker_martin's avatar
iker_martin committed
56
int init_spawn(MPI_Comm comm, MPI_Comm *child) {
57
  int local_state;
iker_martin's avatar
iker_martin committed
58
  set_spawn_configuration(comm);
59
60

  if(!spawn_data->spawn_is_async) {
61
    generic_spawn(child, MAM_I_NOT_STARTED);
62
    local_state = get_spawn_state(spawn_data->spawn_is_async);
63
    if (local_state == MAM_I_SPAWN_COMPLETED)
64
65
66
      deallocate_spawn_data();

  } else {
67
    local_state = spawn_data->spawn_is_single ? 
68
69
70
	    MAM_I_SPAWN_SINGLE_PENDING : MAM_I_SPAWN_PENDING;
    local_state = mall_conf->spawn_method == MAM_SPAWN_MERGE && spawn_data->initial_qty > spawn_data->target_qty ?
	    MAM_I_SPAWN_ADAPT_POSTPONE : local_state;
71
    set_spawn_state(local_state, 0);
iker_martin's avatar
iker_martin committed
72
    if((spawn_data->spawn_is_single && mall->myId == mall->root) || !spawn_data->spawn_is_single) {
73
      allocate_thread_spawn(child);
74
75
76
    }
  }
    
77
  return local_state;
78
79
80
81
82
83
}

/*
 * Comprueba si una configuracion para crear un nuevo grupo de procesos esta lista,
 * y en caso de que lo este, se devuelve el communicador a estos nuevos procesos.
 */
84
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int wait_completed) { 
85
  int local_state;
86
  int global_state=MAM_I_NOT_STARTED;
87

88
  if(spawn_data->spawn_is_async) { // Async
89
90
    local_state = get_spawn_state(spawn_data->spawn_is_async);

91
    if(local_state == MAM_I_SPAWN_SINGLE_PENDING || local_state == MAM_I_SPAWN_SINGLE_COMPLETED) { // Single
92
      global_state = check_single_state(comm, child, local_state, wait_completed);
93

94
    } else if(local_state == MAM_I_SPAWN_PENDING || local_state == MAM_I_SPAWN_COMPLETED || local_state == MAM_I_SPAWN_ADAPTED) { // Generic
95
      global_state = check_generic_state(comm, local_state, wait_completed);
96

97
    } else if(local_state == MAM_I_SPAWN_ADAPT_POSTPONE) {
98
99
      global_state = local_state;
      
100
    } else {
101
      printf("Error Check spawn: Configuracion invalida State = %d\n", local_state);
102
103
104
      MPI_Abort(MPI_COMM_WORLD, -1);
      return -10;
    }
105
106
  } else if(mall_conf->spawn_method == MAM_SPAWN_MERGE){ // Start Merge shrink Sync
    generic_spawn(child, MAM_I_DIST_COMPLETED);
107
108
    global_state = get_spawn_state(spawn_data->spawn_is_async);
  }
109
  if(global_state == MAM_I_SPAWN_COMPLETED || global_state == MAM_I_SPAWN_ADAPTED)
110
    deallocate_spawn_data();
111

112
113
114
  return global_state;
}

115
/*
116
 * Elimina la bandera bloqueante MAM_I_SPAWN_ADAPT_POSTPONE para los hilos 
117
118
119
120
121
122
123
124
125
 * auxiliares. Esta bandera los bloquea para que el metodo Merge shrink no 
 * avance hasta que se complete la redistribucion de datos. Por tanto,
 * al modificar la bandera los hilos pueden continuar.
 *
 * Por seguridad se comprueba que no se realice el cambio a la bandera a 
 * no ser que se cumplan las 3 condiciones.
 */
void unset_spawn_postpone_flag(int outside_state) {
  int local_state = get_spawn_state(spawn_data->spawn_is_async);
126
127
  if(local_state == MAM_I_SPAWN_ADAPT_POSTPONE && outside_state == MAM_I_SPAWN_ADAPT_PENDING && spawn_data->spawn_is_async) { 
    set_spawn_state(MAM_I_SPAWN_PENDING, spawn_data->spawn_is_async);
128
    wakeup_redistribution();
129
130
131
  }
}

132
133
134
135
136
137
138
139
/*
 * Funcion bloqueante de los hijos para asegurar que todas las tareas del paso
 * de creacion de los hijos se terminan correctamente.
 *
 * Ademas los hijos obtienen informacion basica de los padres
 * para el paso de redistribucion de datos (Numeros de procesos y Id del Root).
 *
 */
140
void malleability_connect_children(MPI_Comm *parents) {
141
142
  spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));

143
  MAM_Comm_main_structures(*parents, MAM_ROOT); //FIXME What if root is another id different to 0? Send from spawn to root id?
144
  spawn_data->initial_qty = mall->num_parents;
145
  spawn_data->target_qty = mall->numC;
iker_martin's avatar
iker_martin committed
146
147
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
148
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
149
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, &(spawn_data->spawn_is_multiple));
150

iker_martin's avatar
iker_martin committed
151
  switch(mall_conf->spawn_method) {
152
    case MAM_SPAWN_BASELINE:
153
      spawn_data->spawn_qty = spawn_data->target_qty;
154
      baseline(*spawn_data, parents);
155
      if(!spawn_data->spawn_is_intercomm) {
156
        intracomm_strategy(MAM_TARGETS, parents);
157
      }
158
      break;
159
    case MAM_SPAWN_MERGE:
160
      spawn_data->spawn_qty = spawn_data->target_qty - spawn_data->initial_qty;
161
      merge(*spawn_data, parents, MAM_I_NOT_STARTED);
162
163
164
165
166
167
168
169
170
171
      break;
  }
  free(spawn_data);
}

//--------------PRIVATE CONFIGURATION FUNCTIONS---------------//
/*
 * Agrupa en una sola estructura todos los datos de configuración necesarios
 * e inicializa las estructuras necesarias.
 */
iker_martin's avatar
iker_martin committed
172
void set_spawn_configuration(MPI_Comm comm) {
173
174
  spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));

175
  spawn_data->total_spawns = 0;
iker_martin's avatar
iker_martin committed
176
177
178
  spawn_data->initial_qty = mall->numP;
  spawn_data->target_qty = mall->numC;
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single)); 
179
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
180
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
181
  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, &(spawn_data->spawn_is_multiple));
182
  spawn_data->comm = comm;
183
  spawn_data->mapping_fill_method = MAM_PHY_TYPE_STRING;
184
  spawn_data->sets = NULL;
185

186
  switch(mall_conf->spawn_method) {
187
    case MAM_SPAWN_BASELINE:
188
189
      spawn_data->spawn_qty = spawn_data->target_qty;
      spawn_data->already_created = 0;
190
      break;
191
    case MAM_SPAWN_MERGE:
192
193
      spawn_data->spawn_qty = spawn_data->target_qty - spawn_data->initial_qty;
      spawn_data->already_created = spawn_data->initial_qty;
194
      break;
195
196
197
  }

  if(spawn_data->spawn_is_async) {
198
    init_spawn_state();
199
200
201
202
203
204
205
206
  }
}

/*
 * Libera una estructura de datos spawn_data
 * junto a la destrucion de aquellas estructuras que utiliza.
 */
void deallocate_spawn_data() {
207
208
  int i;
  MPI_Info *info;
209
210
  if(spawn_data == NULL) return;

211
212
213
214
215
216
217
218
219
220
221
  for(i=0; i<spawn_data->total_spawns; i++) {
    info = &(spawn_data->sets[i].mapping);
    if(*info != MPI_INFO_NULL) {
      MPI_Info_free(info);
      *info = MPI_INFO_NULL;
    }
  }

  if(spawn_data->sets != NULL) {
    free(spawn_data->sets);
    spawn_data->sets = NULL;
222
  }
223

224
  if(spawn_data->spawn_is_async) {
225
    free_spawn_state();
226
227
  }
  free(spawn_data); 
228
  spawn_data = NULL;
229
230
231
232
233
234
235
236
237
238
239
240
}


//--------------PRIVATE SPAWN CREATION FUNCTIONS---------------//

/*
 * Funcion generica para la creacion de procesos. Obtiene la configuracion
 * y segun esta, elige como deberian crearse los procesos.
 *
 * Cuando termina, modifica la variable global para indicar este cambio
 */
void generic_spawn(MPI_Comm *child, int data_stage) {
241
  int local_state, aux_state;
242
243

  // WORK
iker_martin's avatar
iker_martin committed
244
  if(mall->myId == mall->root && spawn_data->spawn_qty > 0) { //SET MAPPING FOR NEW PROCESSES
245
    processes_dist(spawn_data);
246
  }
247
  switch(mall_conf->spawn_method) {
248
    case MAM_SPAWN_BASELINE:
249
      local_state = baseline(*spawn_data, child);
250
      if(!spawn_data->spawn_is_intercomm) {
251
        local_state = intracomm_strategy(MAM_SOURCES, child);
252
      }
253
      break;
254
    case MAM_SPAWN_MERGE:
255
256
257
258
      local_state = merge(*spawn_data, child, data_stage);
      break;
  }
  // END WORK
259
  aux_state = get_spawn_state(spawn_data->spawn_is_async);
260
  if(!(aux_state == MAM_I_SPAWN_PENDING && local_state == MAM_I_SPAWN_ADAPT_POSTPONE)) {
261
262
    set_spawn_state(local_state, spawn_data->spawn_is_async);
  }
263
264
265
266
267
268
269
270
271
272
}


//--------------PRIVATE THREAD FUNCTIONS---------------//

/*
 * Aloja la memoria para un hilo auxiliar dedicado a la creacion de procesos.
 * No se puede realizar un "join" sobre el hilo y el mismo libera su memoria
 * asociado al terminar.
 */
273
274
int allocate_thread_spawn(MPI_Comm *child) {
  if(pthread_create(&spawn_thread, NULL, thread_work, (void *) child)) {
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
    printf("Error al crear el hilo de SPAWN\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
  if(pthread_detach(spawn_thread)) {
    printf("Error when detaching spawning thread\n");
    MPI_Abort(MPI_COMM_WORLD, -1);
    return -1;
  }
  return 0;
}

/*
 * Funcion llamada por un hilo para que este se encarge
 * de configurar la creacion de un nuevo grupo de procesos.
 *
 * Una vez esta lista la configuracion y es posible crear los procesos
 * se avisa al hilo maestro.
 */
294
void* thread_work(void *args) {
295
  int local_state;
296
  MPI_Comm *child = (MPI_Comm *) args;
297
 
298
  generic_spawn(child, MAM_I_NOT_STARTED);
299

300
  local_state = get_spawn_state(spawn_data->spawn_is_async);
301
  if(local_state == MAM_I_SPAWN_ADAPT_POSTPONE || local_state == MAM_I_SPAWN_PENDING) {
302
    // El grupo de procesos se terminara de juntar tras la redistribucion de datos
303

304
    local_state = wait_redistribution();
305
    generic_spawn(child, MAM_I_DIST_COMPLETED);
306
  }
307
  wakeup_completion();
308
309
310
311
312
313
314
315

  pthread_exit(NULL);
}

/*
 * Comprueba si una creacion de procesos asincrona en el
 * paso "single" ha terminado. 
 * Si no ha terminado se mantiene el estado 
316
 * "MAM_I_SPAWN_SINGLE_PENDING".
317
318
319
 *
 * Si ha terminado se crean los hilos auxiliares para 
 * los procesos no root y se devuelve el estado
320
 * "MAM_I_SPAWN_PENDING".
321
 */
322
int check_single_state(MPI_Comm comm, MPI_Comm *child, int global_state, int wait_completed) {
323
  while(wait_completed && mall->myId == mall->root && global_state == MAM_I_SPAWN_SINGLE_PENDING) {
324
325
    global_state = wait_completion();
  }
iker_martin's avatar
iker_martin committed
326
  MPI_Bcast(&global_state, 1, MPI_INT, mall->root, comm);
327
328
329

  // Non-root processes join root to finalize the spawn
  // They also must join if the application has ended its work
330
331
  if(global_state == MAM_I_SPAWN_SINGLE_COMPLETED) { 
    global_state = MAM_I_SPAWN_PENDING;
332
    set_spawn_state(global_state, spawn_data->spawn_is_async);
333

iker_martin's avatar
iker_martin committed
334
    if(mall->myId != mall->root) {
335
      allocate_thread_spawn(child);
336
337
338
339
340
341
342
343
344
    }
  }
  return global_state;
}

/*
 * Comprueba si una creación de procesos asincrona en el
 * paso "generic" ha terminado.
 * Si no ha terminado devuelve el estado 
345
 * "MAM_I_SPAWN_PENDING".
346
347
 *
 * Si ha terminado libera la memoria asociada a spawn_data
348
 * y devuelve el estado "MAM_I_SPAWN_COMPLETED".
349
 */
350
int check_generic_state(MPI_Comm comm, int local_state, int wait_completed) {
351
352
  int global_state;

353
  while(wait_completed && local_state == MAM_I_SPAWN_PENDING) local_state = wait_completion();
354

355
  MPI_Allreduce(&local_state, &global_state, 1, MPI_INT, MPI_MIN, comm);
356
  if(global_state == MAM_I_SPAWN_COMPLETED || global_state == MAM_I_SPAWN_ADAPTED) {
357
    set_spawn_state(global_state, spawn_data->spawn_is_async);
358
359
360
  }
  return global_state;
}