/* Baseline.c */
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
4
5
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
6
#include "Baseline.h"
7
#include "Spawn_state.h"
8
#include "PortService.h"
9

/* Point-to-point message tags used by the additional spawn strategies. */

/* "single" strategy: targets' root sends the opened port name to the sources' root. */
#define MAM_TAG_STRAT_SINGLE 130

/* "multiple" strategy (multiple_strat_parents2 / multiple_strat_children2):
 * tag of the one-byte acknowledgement sent by the first group's root. */
#define MAM_TAG_STRAT_MULTIPLE 131

/* "multiple" strategy (multiple_strat_parents / multiple_strat_children):
 * the first group is addressed with MAM_TAG_STRAT_MULTIPLE_FIRST and group
 * i > 0 receives the port name with tag MAM_TAG_STRAT_MULTIPLE_FIRST + i.
 * NOTE: same numeric value as MAM_TAG_STRAT_MULTIPLE. */
#define MAM_TAG_STRAT_MULTIPLE_FIRST 131
/* Not referenced by the code in this file; equals MAM_TAG_STRAT_MULTIPLE_FIRST + 1. */
#define MAM_TAG_STRAT_MULTIPLE_OTHER 132

15
//--------------PRIVATE DECLARATIONS---------------//
16
int baseline_spawn(Spawn_set spawn_set, MPI_Comm comm, MPI_Comm *child);
17
void baseline_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *child);
18
19

void multiple_strat_parents(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child);
20
void multiple_strat_parents2(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child);
21
void multiple_strat_children(MPI_Comm *parents);
22
void multiple_strat_children2(MPI_Comm *parents, Spawn_ports *spawn_port);
23
void single_strat_parents(Spawn_data spawn_data, MPI_Comm *child);
24
void single_strat_children(MPI_Comm *parents, Spawn_ports *spawn_port);
25

26
27
28
29
30
31
32

//--------------PUBLIC FUNCTIONS---------------//
/*
 * Metodo basico para la creacion de procesos. Crea en total
 * spawn_data.spawn_qty procesos.
 */
int baseline(Spawn_data spawn_data, MPI_Comm *child) { //TODO Tratamiento de errores
33
  Spawn_ports spawn_port;
34
  MPI_Comm intercomm;
35
  MPI_Comm_get_parent(&intercomm); //FIXME May be a problem for third reconf or more with only expansions
36
  init_ports(&spawn_port);
37

38
  if (intercomm == MPI_COMM_NULL) { // Parents path
39
    baseline_parents(spawn_data, &spawn_port, child);
40
  } else { // Children path
41
42
    if(spawn_data.spawn_is_multiple) { multiple_strat_children2(child, &spawn_port); }
    if(spawn_data.spawn_is_single) { single_strat_children(child, &spawn_port); }
43
  }
44

45
  free_ports(&spawn_port);
46
  return MAM_I_SPAWN_COMPLETED;
47
48
49
}

//--------------PRIVATE FUNCTIONS---------------//
50
51
52
53
54
55

/*
 * Funcion utilizada por los padres para realizar la
 * creación de procesos.
 *
 */
56
void baseline_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *child) {
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
  int i;
  MPI_Comm comm, *intercomms;

  if (spawn_data.spawn_is_single && mall->myId != mall->root) {
    single_strat_parents(spawn_data, child);
    return;
  }

  comm = spawn_data.spawn_is_single ? MPI_COMM_SELF : spawn_data.comm;
  MPI_Bcast(&spawn_data.total_spawns, 1, MPI_INT, mall->root, comm);
  intercomms = (MPI_Comm*) malloc(spawn_data.total_spawns * sizeof(MPI_Comm)); 
  if(mall->myId != mall->root) {
    spawn_data.sets = (Spawn_set *) malloc(spawn_data.total_spawns * sizeof(Spawn_set));
  }

72
73
74
  #if MAM_DEBUG >= 3
    DEBUG_FUNC("Starting spawning of processes", mall->myId, mall->numP); fflush(stdout);
  #endif
75
76
77
  for(i=0; i<spawn_data.total_spawns; i++) {
    baseline_spawn(spawn_data.sets[i], comm, &intercomms[i]);
  }
78
79
80
  #if MAM_DEBUG >= 3
    DEBUG_FUNC("Sources have created the new processes. Performing additional actions if required.", mall->myId, mall->numP); fflush(stdout);
  #endif
81
82

  // TODO Improvement - Deactivate Multiple spawn before spawning if total_spawns == 1
83
  if(spawn_data.spawn_is_multiple) { multiple_strat_parents2(spawn_data, spawn_port, comm, intercomms, child); }
84
85
86
87
88
89
90
91
  else { *child = intercomms[0]; } 

  if(spawn_data.spawn_is_single) { single_strat_parents(spawn_data, child); }

  free(intercomms);
  if(mall->myId != mall->root) { free(spawn_data.sets); }
}

92
/*
93
94
95
96
 * Funcion basica encargada de la creacion de procesos.
 * Crea un set de procesos segun la configuracion obtenida
 * en ProcessDist.c
 * Devuelve en "child" el intercomunicador que se conecta a los hijos.
97
 */
98
int baseline_spawn(Spawn_set spawn_set, MPI_Comm comm, MPI_Comm *child) {
99
  int rootBcast = MPI_PROC_NULL;
iker_martin's avatar
iker_martin committed
100
  if(mall->myId == mall->root) rootBcast = MPI_ROOT;
101

102
  int spawn_err = MPI_Comm_spawn(spawn_set.cmd, MPI_ARGV_NULL, spawn_set.spawn_qty, spawn_set.mapping, mall->root, comm, child, MPI_ERRCODES_IGNORE); 
103
104

  if(spawn_err != MPI_SUCCESS) {
105
    printf("Error creating new set of %d procs.\n", spawn_set.spawn_qty);
106
  }
107
  MAM_Comm_main_structures(*child, rootBcast);
108
109
110
111

  return spawn_err;
}

112
113
114
115
116
117
118
119

/*
 * Source-side half of the tag-based "multiple" strategy, paired with
 * multiple_strat_children(). Not called by the code in this file.
 *
 * The source root:
 *  1. sends total_spawns to MAM_ROOT of group 0 (intercomms[0]) with tag
 *     MAM_TAG_STRAT_MULTIPLE_FIRST and receives back a port name;
 *  2. forwards that port name to group i (i >= 1) with tag
 *     MAM_TAG_STRAT_MULTIPLE_FIRST + i, then waits for a one-byte
 *     acknowledgement from group 0 before addressing the next group.
 * Afterwards all sources in comm connect to the port (result in *child) and
 * disconnect the per-spawn intercommunicators.
 */
void multiple_strat_parents(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child) {
  int i, tag;
  /* Only significant at mall->root for MPI_Comm_connect; a stack buffer
   * avoids a heap allocation that could fail unnoticed. */
  char port_name[MPI_MAX_PORT_NAME] = "";
  char aux;

  if(mall->myId == mall->root) {
    tag = MAM_TAG_STRAT_MULTIPLE_FIRST;
    MPI_Send(&spawn_data.total_spawns, 1, MPI_INT, MAM_ROOT, tag, intercomms[0]);
    MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MPI_ANY_SOURCE, tag, intercomms[0], MPI_STATUS_IGNORE);
    for(i=1; i<spawn_data.total_spawns; i++) {
      MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MAM_ROOT, tag+i, intercomms[i]);
      /* Group 0 confirms it has merged group i before the next one is addressed. */
      MPI_Recv(&aux, 1, MPI_CHAR, MPI_ANY_SOURCE, MAM_TAG_STRAT_MULTIPLE_FIRST, intercomms[0], MPI_STATUS_IGNORE);
    }
  }

  MPI_Comm_connect(port_name, MPI_INFO_NULL, mall->root, comm, child);
  for(i=0; i<spawn_data.total_spawns; i++) {
    MPI_Comm_disconnect(&intercomms[i]);
  }
}
/*
 * Source-side half of the "multiple" strategy, paired with
 * multiple_strat_children2().
 *
 * Group i receives the pair {i, total_spawns} through a broadcast over
 * intercomms[i]. After each broadcast the source root waits for a one-byte
 * acknowledgement (tag MAM_TAG_STRAT_MULTIPLE) from group 0 over
 * intercomms[0] before informing the next group. Finally the sources connect
 * to the port found by discover_remote_port() (service 0 at the root),
 * leaving the result in *child, and disconnect every spawn intercommunicator.
 */
void multiple_strat_parents2(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child) {
  const int is_root = (mall->myId == mall->root);
  const int bcast_root = is_root ? MPI_ROOT : MPI_PROC_NULL;
  int group = 0;
  int msg[2];
  char ack;

  msg[1] = spawn_data.total_spawns;
  do {
    msg[0] = group;
    MPI_Bcast(msg, 2, MPI_INT, bcast_root, intercomms[group]);
    if (is_root) {
      /* Group 0's root signals that this group has been handled. */
      MPI_Recv(&ack, 1, MPI_CHAR, MPI_ANY_SOURCE, MAM_TAG_STRAT_MULTIPLE, intercomms[0], MPI_STATUS_IGNORE);
    }
    group++;
  } while (group < spawn_data.total_spawns);

  /* Reconnect with the new children communicator. */
  discover_remote_port(is_root ? 0 : MAM_SERVICE_UNNEEDED, spawn_port);
  MPI_Comm_connect(spawn_port->remote_port, MPI_INFO_NULL, mall->root, comm, child);

  /* The per-spawn intercommunicators are no longer needed. */
  for (group = 0; group < spawn_data.total_spawns; group++) {
    MPI_Comm_disconnect(&intercomms[group]);
  }

  #if MAM_DEBUG >= 4
    DEBUG_FUNC("Additional spawn action - Multiple PA completed", mall->myId, mall->numP); fflush(stdout);
  #endif
}
/*
 * Target-side half of the tag-based "multiple" strategy, paired with
 * multiple_strat_parents(). Not called by the code in this file.
 *
 * The root of each spawned group probes the message from the sources:
 *  - tag MAM_TAG_STRAT_MULTIPLE_FIRST carries total_spawns: this is the first
 *    group; its root opens a port, sends the name back and becomes the
 *    accepting root (new_root) for the following groups;
 *  - any other tag carries the port name; the group index is recovered from
 *    the tag and the group connects to the groups already merged.
 * The merged intracommunicator then accepts the remaining groups one by one
 * (the first group's root acknowledging each to the sources) and finally
 * accepts the sources' connection. mall->comm/myId/numP are refreshed through
 * MAM_comms_update() and *parents is replaced by the new intercommunicator.
 */
void multiple_strat_children(MPI_Comm *parents) {
  int i, start, total_spawns, new_root;
  int rootBcast = MPI_PROC_NULL;
  /* Only meaningful at mall->root; a stack buffer needs no allocation check. */
  char port_name[MPI_MAX_PORT_NAME] = "";
  /* Payload of the ordering acknowledgement: never inspected, but sent defined. */
  char aux = 0;
  MPI_Status stat;
  MPI_Comm newintracomm, intercomm, parents_comm;

  new_root = 0;
  parents_comm = *parents;

  if(mall->myId == mall->root) {
    MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, parents_comm, &stat);
    if(stat.MPI_TAG == MAM_TAG_STRAT_MULTIPLE_FIRST) {
      MPI_Recv(&total_spawns, 1, MPI_INT, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm, MPI_STATUS_IGNORE);
      MPI_Open_port(MPI_INFO_NULL, port_name);
      MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm);
      start = 0;
      new_root = 1;
      rootBcast = MPI_ROOT;
    } else {
      MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm, &stat);
      // The "+1" is because the first iteration is done before the loop
      start = stat.MPI_TAG - MAM_TAG_STRAT_MULTIPLE_FIRST + 1;
    }
  }

  MPI_Bcast(&start, 1, MPI_INT, mall->root, mall->comm);
  if(start) {
    MPI_Comm_connect(port_name, MPI_INFO_NULL, mall->root, mall->comm, &intercomm);
    MPI_Bcast(&total_spawns, 1, MPI_INT, mall->root, intercomm); // FIXME Seems inneficient - Should be performed by parent root?
    MPI_Intercomm_merge(intercomm, 1, &newintracomm); // Get last ranks
    MPI_Comm_disconnect(&intercomm);
  } else {
    start = 1;
    MPI_Comm_dup(mall->comm, &newintracomm);
    MPI_Bcast(&total_spawns, 1, MPI_INT, mall->root, mall->comm); // FIXME Seems inneficient - Should be performed by parent root?
  }

  for(i=start; i<total_spawns; i++) {
    MPI_Comm_accept(port_name, MPI_INFO_NULL, mall->root, newintracomm, &intercomm);
    MPI_Bcast(&total_spawns, 1, MPI_INT, rootBcast, intercomm); // FIXME Seems inneficient - Should be performed by parent root?
    if(newintracomm != MPI_COMM_WORLD) MPI_Comm_disconnect(&newintracomm);
    MPI_Intercomm_merge(intercomm, 0, &newintracomm); // Get first ranks
    MPI_Comm_disconnect(&intercomm);

    if(new_root) {
      MPI_Send(&aux, 1, MPI_CHAR, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm); // Ensures order in the created intracommunicator
    }
  }

  // Connect with parents
  MPI_Comm_accept(port_name, MPI_INFO_NULL, mall->root, newintracomm, &intercomm);
  // Update communicator to expected one
  MAM_comms_update(newintracomm);
  MPI_Comm_rank(mall->comm, &mall->myId);
  MPI_Comm_size(mall->comm, &mall->numP);

  if(new_root) MPI_Close_port(port_name);

  MPI_Comm_disconnect(&newintracomm);
  MPI_Comm_disconnect(parents);
  *parents = intercomm;
}
/*
 * Target-side half of the "multiple" strategy, paired with
 * multiple_strat_parents2().
 *
 * Every spawned group receives {group index, total_spawns} from the sources.
 * Group 0 duplicates mall->comm and its root acknowledges to the sources.
 * Every later group g connects to the port located by
 * discover_remote_port(0, ...) (presumably the one opened by group 0's root)
 * and is merged after the groups already joined. The merged intracommunicator
 * accepts the groups that come after it, group 0's root acknowledging each
 * one, and finally accepts the sources' connection. mall->comm/myId/numP are
 * refreshed from the merged communicator and *parents is replaced by the new
 * intercommunicator.
 */
void multiple_strat_children2(MPI_Comm *parents, Spawn_ports *spawn_port) {
  int i, group_id, total_spawns, new_root;
  int buffer[2];
  /* Payload of the ordering acknowledgement: never inspected, but sent defined. */
  char aux = 0;
  MPI_Comm newintracomm, intercomm, parents_comm;

  #if MAM_DEBUG >= 4
    DEBUG_FUNC("Additional spawn action - Multiple CH started", mall->myId, mall->numP); fflush(stdout);
  #endif

  new_root = 0;
  parents_comm = *parents;

  /* buffer[0]: index of this group, buffer[1]: total number of groups. */
  MPI_Bcast(buffer, 2, MPI_INT, mall->root_parents, parents_comm);
  group_id = buffer[0];
  total_spawns = buffer[1];
  if(mall->myId == mall->root && !group_id) { new_root = 1; }
  open_port(spawn_port, new_root, group_id);

  if(group_id) {
    if(mall->myId == mall->root) { discover_remote_port(0, spawn_port); }
    else { discover_remote_port(MAM_SERVICE_UNNEEDED, spawn_port); }

    MPI_Comm_connect(spawn_port->remote_port, MPI_INFO_NULL, mall->root, mall->comm, &intercomm);
    MPI_Intercomm_merge(intercomm, 1, &newintracomm); // Get last ranks
    MPI_Comm_disconnect(&intercomm);
  } else { // Root group of targets
    MPI_Comm_dup(mall->comm, &newintracomm);

    if(new_root) {
      MPI_Send(&aux, 1, MPI_CHAR, mall->root_parents, MAM_TAG_STRAT_MULTIPLE, parents_comm); // Ensures order in the created intracommunicator
    }
  }

  /*
   * Group g became part of newintracomm during the accept that groups
   * 0..g-1 performed for it, so it only takes part in the accepts for groups
   * g+1 .. total_spawns-1. Starting at g would make every group g >= 1 enter
   * one accept more than the other members of newintracomm.
   */
  for(i = group_id + 1; i < total_spawns; i++) {
    MPI_Comm_accept(spawn_port->port_name, MPI_INFO_NULL, mall->root, newintracomm, &intercomm);
    if(newintracomm != MPI_COMM_WORLD) MPI_Comm_disconnect(&newintracomm);
    MPI_Intercomm_merge(intercomm, 0, &newintracomm); // Get first ranks
    MPI_Comm_disconnect(&intercomm);

    if(new_root) {
      MPI_Send(&aux, 1, MPI_CHAR, mall->root_parents, MAM_TAG_STRAT_MULTIPLE, parents_comm); // Ensures order in the created intracommunicator
    }
  }

  // Connect with sources
  MPI_Comm_accept(spawn_port->port_name, MPI_INFO_NULL, mall->root, newintracomm, &intercomm);
  // Update communicator to expected one
  MAM_comms_update(newintracomm);
  MPI_Comm_rank(mall->comm, &mall->myId);
  MPI_Comm_size(mall->comm, &mall->numP);

  MPI_Comm_disconnect(&newintracomm);
  MPI_Comm_disconnect(parents);
  *parents = intercomm;

  #if MAM_DEBUG >= 4
    DEBUG_FUNC("Additional spawn action - Multiple CH completed", mall->myId, mall->numP); fflush(stdout);
  #endif
}
/* 
 * Si la variable "type" es 1, la creación es con la participación de todo el grupo de padres
 * Si el valor es diferente, la creación es solo con la participación del proceso root
 */
302
void single_strat_parents(Spawn_data spawn_data, MPI_Comm *child) {
303
304
305
  char *port_name;
  MPI_Comm newintercomm;

iker_martin's avatar
iker_martin committed
306
  if (mall->myId == mall->root) {
307
    port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
308
    MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MPI_ANY_SOURCE, MAM_TAG_STRAT_SINGLE, *child, MPI_STATUS_IGNORE);
309

310
    set_spawn_state(MAM_I_SPAWN_SINGLE_COMPLETED, spawn_data.spawn_is_async); // Indicate other processes to join root to end spawn procedure
311
    wakeup_completion();
312
313
314
315
  } else {
    port_name = malloc(1);
  }

iker_martin's avatar
iker_martin committed
316
  MPI_Comm_connect(port_name, MPI_INFO_NULL, mall->root, spawn_data.comm, &newintercomm);
317

iker_martin's avatar
iker_martin committed
318
  if(mall->myId == mall->root)
319
    MPI_Comm_disconnect(child);
320
321
  free(port_name);
  *child = newintercomm;
322
323
324
325

  #if MAM_DEBUG >= 4
    DEBUG_FUNC("Additional spawn action - Single PA completed", mall->myId, mall->numP); fflush(stdout);
  #endif
326
327
328
329
330
331
332
333
334
}

/*
 * Conectar grupo de hijos con grupo de padres
 * Devuelve un intercomunicador para hablar con los padres
 *
 * Solo se utiliza cuando la creación de los procesos ha sido
 * realizada por un solo proceso padre
 */
335
void single_strat_children(MPI_Comm *parents, Spawn_ports *spawn_port) {
336
  MPI_Comm newintercomm;
337
  int is_root = mall->myId == mall->root ? 1 : 0;
338

339
  open_port(spawn_port, is_root, MAM_SERVICE_UNNEEDED);
iker_martin's avatar
iker_martin committed
340
  if(mall->myId == mall->root) {
341
    MPI_Send(spawn_port->port_name, MPI_MAX_PORT_NAME, MPI_CHAR, mall->root_parents, MAM_TAG_STRAT_SINGLE, *parents);
342
343
  }

344
  MPI_Comm_accept(spawn_port->port_name, MPI_INFO_NULL, mall->root, mall->comm, &newintercomm);
345
  MPI_Comm_disconnect(parents);
346
347
  *parents = newintercomm;
}