ProcessDist.c 12 KB
Newer Older
iker_martin's avatar
iker_martin committed
1
2
3
4
5
6
7
8
9
10
11
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <mpi.h>
#include "ProcessDist.h"

//--------------PRIVATE DECLARATIONS---------------//

12
13
14
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes);
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
iker_martin's avatar
iker_martin committed
15

16
17
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info);
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **host_str);
iker_martin's avatar
iker_martin committed
18
//--------------------------------SLURM USAGE-------------------------------------//
19
#if USE_MAL_SLURM
iker_martin's avatar
iker_martin committed
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <slurm/slurm.h>
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info);
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostfile_str);
//@deprecated functions
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, MPI_Info *info);
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int used_nodes);
#endif
//--------------------------------SLURM USAGE-------------------------------------//

int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_name);
//@deprecated functions
int create_hostfile(char **file_name);
int write_hostfile_node(int ptr, int qty, char *node_name);

//--------------PUBLIC FUNCTIONS---------------//

/*
 * Configura la creacion de un nuevo grupo de procesos, reservando la memoria
 * para una llamada a MPI_Comm_spawn, obteniendo una distribucion fisica
 * para los procesos y creando un fichero hostfile.
 *
 * OUT parameters -->
 * info_spawn: Objeto MPI_Info en el que se indica el mappeado
 *   a usar al crear los procesos.
 */
45
void processes_dist(Spawn_data spawn_data, MPI_Info *info_spawn) {
iker_martin's avatar
iker_martin committed
46
47
  int used_nodes=0;
  int *procs_array;
48

iker_martin's avatar
iker_martin committed
49
  // GET NEW DISTRIBUTION 
50
51
52
  node_dist(spawn_data, &procs_array, &used_nodes);
#if USE_MAL_SLURM
  switch(spawn_data.mapping_fill_method) {
iker_martin's avatar
iker_martin committed
53
    case MALL_DIST_STRING:
54
      generate_info_string_slurm(mall->nodelist, procs_array, used_nodes, info_spawn);
iker_martin's avatar
iker_martin committed
55
56
      break;
    case MALL_DIST_HOSTFILE:
57
      generate_info_hostfile_slurm(mall->nodelist, procs_array, used_nodes, info_spawn);
iker_martin's avatar
iker_martin committed
58
59
60
61
      break;
  }
  free(procs_array);
#else
62
  generate_info_string(mall->nodelist, procs_array, used_nodes, info_spawn);
iker_martin's avatar
iker_martin committed
63
64
65
66
67
68
69
70
71
72
73
#endif
}


//--------------PRIVATE FUNCTIONS---------------//
//-----------------DISTRIBUTION-----------------//
/*
 * Obtiene la distribucion fisica del grupo de procesos a crear, devolviendo
 * cuantos nodos se van a utilizar y la cantidad de procesos que alojara cada
 * nodo.
 *
74
 * Se permiten dos tipos de distribuciones fisicas segun el valor de "spawn_dist":
iker_martin's avatar
iker_martin committed
75
76
77
78
79
80
 *
 *  COMM_PHY_NODES (1): Orientada a equilibrar el numero de procesos entre
 *                      todos los nodos disponibles.
 *  COMM_PHY_CPU   (2): Orientada a completar la capacidad de un nodo antes de
 *                      ocupar otro nodo.
 */
81
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes) {
iker_martin's avatar
iker_martin committed
82
83
  int i, *procs;

84
  procs = calloc(mall->num_nodes, sizeof(int)); // Numero de procesos por nodo
iker_martin's avatar
iker_martin committed
85
86

  /* GET NEW DISTRIBUTION  */
87
  switch(mall_conf->spawn_dist) {
iker_martin's avatar
iker_martin committed
88
    case MALL_DIST_SPREAD: // DIST NODES @deprecated
89
      spread_dist(spawn_data, used_nodes, procs);
iker_martin's avatar
iker_martin committed
90
91
      break;
    case MALL_DIST_COMPACT: // DIST CPUs
92
      compact_dist(spawn_data, used_nodes, procs);
iker_martin's avatar
iker_martin committed
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
      break;
  }

  //Copy results to output vector qty
  *qty = calloc(*used_nodes, sizeof(int)); // Numero de procesos por nodo
  for(i=0; i< *used_nodes; i++) {
    (*qty)[i] = procs[i];
  }
  free(procs);
}

/*
 * Distribucion basada en equilibrar el numero de procesos en cada nodo
 * para que todos los nodos tengan el mismo numero. Devuelve el total de
 * nodos utilizados y el numero de procesos a crear en cada nodo.
 *
109
 * FIXME Tener en cuenta procesos ya creados (already_created)
iker_martin's avatar
iker_martin committed
110
 */
111
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
iker_martin's avatar
iker_martin committed
112
113
  int i, tamBl, remainder;

114
115
116
  *used_nodes = mall->num_nodes;
  tamBl = spawn_data.target_qty / mall->num_nodes;
  remainder = spawn_data.target_qty % mall->num_nodes;
iker_martin's avatar
iker_martin committed
117
118
119
  for(i=0; i<remainder; i++) {
    procs[i] = tamBl + 1; 
  }
120
  for(i=remainder; i<mall->num_nodes; i++) {
iker_martin's avatar
iker_martin committed
121
122
123
124
125
126
127
128
129
130
131
132
    procs[i] = tamBl; 
  }
}

/*
 * Distribucion basada en llenar un nodo de procesos antes de pasar al
 * siguiente nodo. Devuelve el total de nodos utilizados y el numero 
 * de procesos a crear en cada nodo.
 *
 * Tiene en cuenta los procesos ya existentes para el mappeado de 
 * los procesos a crear.
 */
133
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
iker_martin's avatar
iker_martin committed
134
135
136
  int i, asigCores;
  int tamBl, remainder;

137
138
139
140
  tamBl = mall->num_cpus / mall->num_nodes;
  asigCores = spawn_data.already_created;
  i = *used_nodes = spawn_data.already_created / tamBl;
  remainder = spawn_data.already_created % tamBl;
iker_martin's avatar
iker_martin committed
141
142
143
144
145
146
147

  //FIXME REFACTOR Que pasa si los nodos 1 y 2 tienen espacios libres
  //First nodes could already have existing procs
  //Start from the first with free spaces
  if (remainder) {
    procs[i] = tamBl - remainder;
    asigCores += procs[i];
148
    i = (i+1) % mall->num_nodes;
iker_martin's avatar
iker_martin committed
149
150
151
152
    (*used_nodes)++;
  }

  //Assign tamBl to each node
153
  while(asigCores+tamBl <= spawn_data.target_qty) {
iker_martin's avatar
iker_martin committed
154
155
    asigCores += tamBl;
    procs[i] += tamBl;
156
    i = (i+1) % mall->num_nodes;
iker_martin's avatar
iker_martin committed
157
158
159
160
    (*used_nodes)++;
  }

  //Last node could have less procs than tamBl
161
162
  if(asigCores < spawn_data.target_qty) { 
    procs[i] += spawn_data.target_qty - asigCores;
iker_martin's avatar
iker_martin committed
163
164
    (*used_nodes)++;
  }
165
  if(*used_nodes > mall->num_nodes) *used_nodes = mall->num_nodes;  //FIXME Si ocurre esto no es un error?
iker_martin's avatar
iker_martin committed
166
167
168
169
170
171
172
173
174
175
176
}


//--------------PRIVATE FUNCTIONS---------------//
//-------------------INFO SET-------------------//

/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 *
177
 *
iker_martin's avatar
iker_martin committed
178
 */
179
180
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info){
  char *host_str;
iker_martin's avatar
iker_martin committed
181

182
  fill_str_hosts(nodelist, procs_array, nodes, &host_str);
iker_martin's avatar
iker_martin committed
183
184
  // SET MAPPING
  MPI_Info_create(info);
185
186
  MPI_Info_set(*info, "hosts", mall->nodelist);
  free(host_str);
iker_martin's avatar
iker_martin committed
187
188
189
190
191
192
}

/*
 * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
 * al crear procesos e indicar donde tienen que ser creados.
 */
193
194
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **host_str) {
  char *host, *aux, *token;
iker_martin's avatar
iker_martin committed
195
  size_t i=0,len=0;
196
197
198
199
200
201
202
203

  aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
  strcpy(aux, nodelist);
  token = strtok(aux, ",");
  while (token != NULL && i < used_nodes) {
    host = strdup(token);
    if (qty[i] != 0) {
      len = write_str_node(host_str, len, qty[i], host);
iker_martin's avatar
iker_martin committed
204
205
206
    }
    i++;
    free(host);
207
    token = strtok(NULL, ",");
iker_martin's avatar
iker_martin committed
208
  }
209
  free(aux);
iker_martin's avatar
iker_martin committed
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
}

/*
 * Añade en una cadena "qty" entradas de "node_name".
 * Realiza la reserva de memoria y la realoja si es necesario.
 */
int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_name) {
  int err;
  char *ocurrence;
  size_t i, len, len_node;

  len_node = strlen(node_name) + 1; // Str length + ','
  len = qty * len_node; // Number of times the node is used

  if(len_og == 0) { // Memoria no reservada
    *hostfile_str = (char *) malloc((len+1) * sizeof(char));
  } else { // Cadena ya tiene datos
    *hostfile_str = (char *) realloc(*hostfile_str, (len_og + len + 1) * sizeof(char));
  }
  if(hostfile_str == NULL) return -1; // No ha sido posible alojar la memoria

  ocurrence = (char *) malloc((len_node+1) * sizeof(char));
  if(ocurrence == NULL) return -2; // No ha sido posible alojar la memoria
  err = snprintf(ocurrence, len_node+1, ",%s", node_name);
  if(err < 0) return -3; // No ha sido posible escribir sobre la variable auxiliar

  i=0;
  if(len_og == 0) { // Si se inicializa, la primera es una copia
    i++;
    strcpy(*hostfile_str, node_name);
  }
  for(; i<qty; i++){ // Las siguientes se conctanenan
    strcat(*hostfile_str, ocurrence);
  }

  
  free(ocurrence);
  return len+len_og;
}

250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
//--------------------------------SLURM USAGE-------------------------------------//
#if USE_MAL_SLURM
/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 * Es necesario usar Slurm para usarlo.
 */
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info){
  // CREATE AND SET STRING HOSTS
  char *hoststring;
  fill_str_hosts_slurm(nodelist, procs_array, nodes, &hoststring);
  MPI_Info_create(info);
  MPI_Info_set(*info, "hosts", hoststring);
  free(hoststring);
}


/*
 * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
 * al crear procesos e indicar donde tienen que ser creados.
 */
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostfile_str) {
  char *host;
  size_t i=0,len=0;
  hostlist_t hostlist;
  
  hostlist = slurm_hostlist_create(nodelist);
  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
      len = write_str_node(hostfile_str, len, qty[i], host);
    }
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}

iker_martin's avatar
iker_martin committed
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
//====================================================
//====================================================
//============DEPRECATED FUNCTIONS====================
//====================================================
//====================================================

/* FIXME Por revisar
 * @deprecated
 * Genera un fichero hostfile y lo anyade a un objeto
 * MPI_Info para ser utilizado.
 */
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, MPI_Info *info){
    char *hostfile;
    int ptr;

    // CREATE/UPDATE HOSTFILE 
    ptr = create_hostfile(&hostfile);
    MPI_Info_create(info);
    MPI_Info_set(*info, "hostfile", hostfile);
    free(hostfile);

    // SET NEW DISTRIBUTION 
    fill_hostfile_slurm(nodelist, ptr, procs_array, nodes);
    close(ptr);
}

/*
 * @deprecated
 * Crea un fichero que se utilizara como hostfile
 * para un nuevo grupo de procesos. 
 *
 * El nombre es devuelto en el argumento "file_name",
 * que tiene que ser un puntero vacio.
 *
 * Ademas se devuelve un descriptor de fichero para 
 * modificar el fichero.
 */
int create_hostfile(char **file_name) {
  int ptr, err;
  size_t len = 11; //FIXME Numero mágico

  *file_name = NULL;
  *file_name = malloc(len * sizeof(char));
  if(*file_name == NULL) return -1; // No ha sido posible alojar la memoria
  err = snprintf(*file_name, len, "hostfile.o");
  if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

  ptr = open(*file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if(ptr < 0) return -3; // No ha sido posible crear el fichero

  return ptr; // Devolver puntero a fichero
}

/*
 * @deprecated
 * Rellena un fichero hostfile indicado por ptr con los nombres
 * de los nodos a utilizar indicados por "job_record" y la cantidad 
 * de procesos que alojara cada nodo indicado por "qty".
 */
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int nodes) {
  int i=0;
  char *host;
  hostlist_t hostlist;
  
  hostlist = slurm_hostlist_create(nodelist);
  while ((host = slurm_hostlist_shift(hostlist)) && i < nodes) {
    write_hostfile_node(ptr, qty[i], host);
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}

/*
 * @deprecated
 * Escribe en el fichero hostfile indicado por ptr una nueva linea.
 *
 * Esta linea indica el nombre de un nodo y la cantidad de procesos a
 * alojar en ese nodo.
 */
int write_hostfile_node(int ptr, int qty, char *node_name) {
  int err;
  char *line;
  size_t len, len_node, len_int;

  len_node = strlen(node_name);
  err = snprintf(NULL, 0, "%d", qty);
  if(err < 0) return -1;
  len_int = err;

  len = len_node + len_int + 3;
  line = malloc(len * sizeof(char));
  if(line == NULL) return -2; // No ha sido posible alojar la memoria
  err = snprintf(line, len, "%s:%d\n", node_name, qty);

  if(err < 0) return -3; // No ha sido posible escribir en el fichero

  write(ptr, line, len-1);
  free(line);

  return 0;
}
#endif
//--------------------------------SLURM USAGE-------------------------------------//