/* Baseline.c */
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
4
5
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
6
#include "Baseline.h"
7
#include "Strategy_Single.h"
8
#include "PortService.h"
9
10

//--------------PRIVATE DECLARATIONS---------------//
11
int baseline_spawn(Spawn_set spawn_set, MPI_Comm comm, MPI_Comm *child);
12
void baseline_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *child);
13
14

void multiple_strat_parents(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child);
15
void multiple_strat_parents2(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child);
16
void multiple_strat_children(MPI_Comm *parents);
17
void multiple_strat_children2(MPI_Comm *parents, Spawn_ports *spawn_port);
18
19
//void single_strat_parents(Spawn_data spawn_data, MPI_Comm *child);
//void single_strat_children(MPI_Comm *parents, Spawn_ports *spawn_port);
20

21
22
23
24
25
26
27

//--------------PUBLIC FUNCTIONS---------------//
/*
 * Metodo basico para la creacion de procesos. Crea en total
 * spawn_data.spawn_qty procesos.
 */
int baseline(Spawn_data spawn_data, MPI_Comm *child) { //TODO Tratamiento de errores
28
  Spawn_ports spawn_port;
29
  MPI_Comm intercomm;
30
  MPI_Comm_get_parent(&intercomm); //FIXME May be a problem for third reconf or more with only expansions
31
  init_ports(&spawn_port);
32

33
  if (intercomm == MPI_COMM_NULL) { // Parents path
34
    baseline_parents(spawn_data, &spawn_port, child);
35
  } else { // Children path
36
37
    if(spawn_data.spawn_is_multiple) { multiple_strat_children2(child, &spawn_port); }
    if(spawn_data.spawn_is_single) { single_strat_children(child, &spawn_port); }
38
  }
39

40
  free_ports(&spawn_port);
41
  return MAM_I_SPAWN_COMPLETED;
42
43
44
}

//--------------PRIVATE FUNCTIONS---------------//
45
46
47
48
49
50

/*
 * Funcion utilizada por los padres para realizar la
 * creación de procesos.
 *
 */
51
void baseline_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *child) {
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
  int i;
  MPI_Comm comm, *intercomms;

  if (spawn_data.spawn_is_single && mall->myId != mall->root) {
    single_strat_parents(spawn_data, child);
    return;
  }

  comm = spawn_data.spawn_is_single ? MPI_COMM_SELF : spawn_data.comm;
  MPI_Bcast(&spawn_data.total_spawns, 1, MPI_INT, mall->root, comm);
  intercomms = (MPI_Comm*) malloc(spawn_data.total_spawns * sizeof(MPI_Comm)); 
  if(mall->myId != mall->root) {
    spawn_data.sets = (Spawn_set *) malloc(spawn_data.total_spawns * sizeof(Spawn_set));
  }

67
68
69
  #if MAM_DEBUG >= 3
    DEBUG_FUNC("Starting spawning of processes", mall->myId, mall->numP); fflush(stdout);
  #endif
70
71
72
  for(i=0; i<spawn_data.total_spawns; i++) {
    baseline_spawn(spawn_data.sets[i], comm, &intercomms[i]);
  }
73
74
75
  #if MAM_DEBUG >= 3
    DEBUG_FUNC("Sources have created the new processes. Performing additional actions if required.", mall->myId, mall->numP); fflush(stdout);
  #endif
76
77

  // TODO Improvement - Deactivate Multiple spawn before spawning if total_spawns == 1
78
  if(spawn_data.spawn_is_multiple) { multiple_strat_parents2(spawn_data, spawn_port, comm, intercomms, child); }
79
80
81
82
83
84
85
86
  else { *child = intercomms[0]; } 

  if(spawn_data.spawn_is_single) { single_strat_parents(spawn_data, child); }

  free(intercomms);
  if(mall->myId != mall->root) { free(spawn_data.sets); }
}

87
/*
88
89
90
91
 * Funcion basica encargada de la creacion de procesos.
 * Crea un set de procesos segun la configuracion obtenida
 * en ProcessDist.c
 * Devuelve en "child" el intercomunicador que se conecta a los hijos.
92
 */
93
int baseline_spawn(Spawn_set spawn_set, MPI_Comm comm, MPI_Comm *child) {
94
  int rootBcast = MPI_PROC_NULL;
iker_martin's avatar
iker_martin committed
95
  if(mall->myId == mall->root) rootBcast = MPI_ROOT;
96

97
  int spawn_err = MPI_Comm_spawn(spawn_set.cmd, MPI_ARGV_NULL, spawn_set.spawn_qty, spawn_set.mapping, mall->root, comm, child, MPI_ERRCODES_IGNORE); 
98
99

  if(spawn_err != MPI_SUCCESS) {
100
    printf("Error creating new set of %d procs.\n", spawn_set.spawn_qty);
101
  }
102
  MAM_Comm_main_structures(*child, rootBcast);
103
104
105
106

  return spawn_err;
}

107
108
109
110
111
112
113

void multiple_strat_parents(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child) {
  int i, tag;
  char *port_name, aux;

  if(mall->myId == mall->root) {
    port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
114
    tag = MAM_MPITAG_STRAT_MULTIPLE;
115
    MPI_Send(&spawn_data.total_spawns, 1, MPI_INT, MAM_ROOT, tag, intercomms[0]); 
116
117
    MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MPI_ANY_SOURCE, tag, intercomms[0], MPI_STATUS_IGNORE);
    for(i=1; i<spawn_data.total_spawns; i++) {
118
      MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MAM_ROOT, tag+i, intercomms[i]);
119
      MPI_Recv(&aux, 1, MPI_CHAR, MPI_ANY_SOURCE, MAM_MPITAG_STRAT_MULTIPLE, intercomms[0], MPI_STATUS_IGNORE);
120
121
122
123
124
    }
  } else { port_name = malloc(1); }

  MPI_Comm_connect(port_name, MPI_INFO_NULL, mall->root, comm, child);
  for(i=0; i<spawn_data.total_spawns; i++) {
125
    MPI_Comm_disconnect(&intercomms[i]);
126
127
128
129
  }
  free(port_name);
}

130
131
132
133
134
135
136
137
138
139
140
141
void multiple_strat_parents2(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child) {
  int i, rootBcast;
  int buffer[2];
  char aux;

  i = 0; 
  rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;

  buffer[0] = i;
  buffer[1] = spawn_data.total_spawns; 
  MPI_Bcast(buffer, 2, MPI_INT, rootBcast, intercomms[i]);
  if(mall->myId == mall->root) { 
142
      MPI_Recv(&aux, 1, MPI_CHAR, MPI_ANY_SOURCE, MAM_MPITAG_STRAT_MULTIPLE, intercomms[0], MPI_STATUS_IGNORE);
143
144
145
146
147
148
  }

  for(i=1; i<spawn_data.total_spawns; i++) {
    buffer[0] = i;
    MPI_Bcast(buffer, 2, MPI_INT, rootBcast, intercomms[i]);
    if(mall->myId == mall->root) { 
149
      MPI_Recv(&aux, 1, MPI_CHAR, MPI_ANY_SOURCE, MAM_MPITAG_STRAT_MULTIPLE, intercomms[0], MPI_STATUS_IGNORE);
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
    }
  }

  // Reconnect with new children communicator
  if(mall->myId == mall->root) { discover_remote_port(0, spawn_port); }
  else { discover_remote_port(MAM_SERVICE_UNNEEDED, spawn_port); }
  MPI_Comm_connect(spawn_port->remote_port, MPI_INFO_NULL, mall->root, comm, child);

  // Free unneeded spawn communicators
  for(i=0; i<spawn_data.total_spawns; i++) { MPI_Comm_disconnect(&intercomms[i]); }

  #if MAM_DEBUG >= 4
    DEBUG_FUNC("Additional spawn action - Multiple PA completed", mall->myId, mall->numP); fflush(stdout);
  #endif
}

166
167
168
169
170
171
172
173
174
175
176
177
178
void multiple_strat_children(MPI_Comm *parents) {
  int i, start, total_spawns, new_root;
  int rootBcast = MPI_PROC_NULL;
  char *port_name, aux;
  MPI_Status stat;
  MPI_Comm newintracomm, intercomm, parents_comm;

  new_root = 0;
  parents_comm = *parents;

  if(mall->myId == mall->root) {
    port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
    MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, parents_comm, &stat);
179
    if(stat.MPI_TAG == MAM_MPITAG_STRAT_MULTIPLE) {
180
181
182
183
184
185
186
187
188
      MPI_Recv(&total_spawns, 1, MPI_INT, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm, MPI_STATUS_IGNORE);
      MPI_Open_port(MPI_INFO_NULL, port_name);
      MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm);
      start = 0;
      new_root = 1;
      rootBcast = MPI_ROOT;
    } else {
      MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm, &stat);
      // The "+1" is because the first iteration is done before the loop
189
      start = stat.MPI_TAG - MAM_MPITAG_STRAT_MULTIPLE + 1;
190
191
192
193
194
195
196
197
    }
  } else { port_name = malloc(1); }

  MPI_Bcast(&start, 1, MPI_INT, mall->root, mall->comm);
  if(start) {
    MPI_Comm_connect(port_name, MPI_INFO_NULL, mall->root, mall->comm, &intercomm);
    MPI_Bcast(&total_spawns, 1, MPI_INT, mall->root, intercomm); // FIXME Seems inneficient - Should be performed by parent root?
    MPI_Intercomm_merge(intercomm, 1, &newintracomm); // Get last ranks
198
    MPI_Comm_disconnect(&intercomm);
199
200
201
202
203
204
205
206
207
  } else { 
    start = 1; 
    MPI_Comm_dup(mall->comm, &newintracomm);
    MPI_Bcast(&total_spawns, 1, MPI_INT, mall->root, mall->comm); // FIXME Seems inneficient - Should be performed by parent root?
  }

  for(i=start; i<total_spawns; i++) {
    MPI_Comm_accept(port_name, MPI_INFO_NULL, mall->root, newintracomm, &intercomm);
    MPI_Bcast(&total_spawns, 1, MPI_INT, rootBcast, intercomm); // FIXME Seems inneficient - Should be performed by parent root?
208
    if(newintracomm != MPI_COMM_WORLD) MPI_Comm_disconnect(&newintracomm);
209
    MPI_Intercomm_merge(intercomm, 0, &newintracomm); // Get first ranks
210
    MPI_Comm_disconnect(&intercomm);
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225

    if(new_root) {
      MPI_Send(&aux, 1, MPI_CHAR, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm); // Ensures order in the created intracommunicator
    }
  }
  
  // Connect with parents  
  MPI_Comm_accept(port_name, MPI_INFO_NULL, mall->root, newintracomm, &intercomm);
  // Update communicator to expected one
  MAM_comms_update(newintracomm);
  MPI_Comm_rank(mall->comm, &mall->myId);
  MPI_Comm_size(mall->comm, &mall->numP);

  if(new_root) MPI_Close_port(port_name);
  free(port_name);
226
227
  MPI_Comm_disconnect(&newintracomm);
  MPI_Comm_disconnect(parents);
228
229
230
  *parents = intercomm;
}

231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
void multiple_strat_children2(MPI_Comm *parents, Spawn_ports *spawn_port) {
  int i, group_id, total_spawns, new_root;
  int buffer[2];
  char aux;
  MPI_Comm newintracomm, intercomm, parents_comm;

  #if MAM_DEBUG >= 4
    DEBUG_FUNC("Additional spawn action - Multiple CH started", mall->myId, mall->numP); fflush(stdout);
  #endif

  new_root = 0;
  parents_comm = *parents;

  MPI_Bcast(buffer, 2, MPI_INT, mall->root_parents, parents_comm);
  group_id = buffer[0];
  total_spawns = buffer[1];
  if(mall->myId == mall->root && !group_id) { new_root = 1;  }
  open_port(spawn_port, new_root, group_id);

  if(group_id) {
    if(mall->myId == mall->root) { discover_remote_port(0, spawn_port); }
    else { discover_remote_port(MAM_SERVICE_UNNEEDED, spawn_port); }

    MPI_Comm_connect(spawn_port->remote_port, MPI_INFO_NULL, mall->root, mall->comm, &intercomm);
    MPI_Intercomm_merge(intercomm, 1, &newintracomm); // Get last ranks
    MPI_Comm_disconnect(&intercomm);
  } else { // Root group of targets
    group_id = 1; 
    MPI_Comm_dup(mall->comm, &newintracomm);

    if(new_root) {
262
      MPI_Send(&aux, 1, MPI_CHAR, mall->root_parents, MAM_MPITAG_STRAT_MULTIPLE, parents_comm); // Ensures order in the created intracommunicator
263
264
265
266
267
268
269
270
271
272
    }
  }

  for(i=group_id; i<total_spawns; i++) {
    MPI_Comm_accept(spawn_port->port_name, MPI_INFO_NULL, mall->root, newintracomm, &intercomm);
    if(newintracomm != MPI_COMM_WORLD) MPI_Comm_disconnect(&newintracomm);
    MPI_Intercomm_merge(intercomm, 0, &newintracomm); // Get first ranks
    MPI_Comm_disconnect(&intercomm);

    if(new_root) {
273
      MPI_Send(&aux, 1, MPI_CHAR, mall->root_parents, MAM_MPITAG_STRAT_MULTIPLE, parents_comm); // Ensures order in the created intracommunicator
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
    }
  }
  
  // Connect with sources
  MPI_Comm_accept(spawn_port->port_name, MPI_INFO_NULL, mall->root, newintracomm, &intercomm);
  // Update communicator to expected one
  MAM_comms_update(newintracomm);
  MPI_Comm_rank(mall->comm, &mall->myId);
  MPI_Comm_size(mall->comm, &mall->numP);

  MPI_Comm_disconnect(&newintracomm);
  MPI_Comm_disconnect(parents);
  *parents = intercomm;

  #if MAM_DEBUG >= 4
    DEBUG_FUNC("Additional spawn action - Multiple CH completed", mall->myId, mall->numP); fflush(stdout);
  #endif
291
}