ProcessDist.c 15.7 KB
Newer Older
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
5
#include <unistd.h>
6
#include <string.h>
7
#include <mpi.h>
8
9
10
11
#include "ProcessDist.h"

//--------------PRIVATE DECLARATIONS---------------//

12
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes, int *total_spawns);
13
14
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
15

16
17
18
19
20
21
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void generate_multiple_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);

void set_mapping_host(int qty, char *host, size_t index, Spawn_data *spawn_data);
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str);
int write_str_node(char **hostlist_str, size_t len_og, size_t qty, char *node_name);
22
//--------------------------------SLURM USAGE-------------------------------------//
23
#if USE_MAL_SLURM
24
#include <slurm/slurm.h>
25
26
27
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void generate_multiple_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str);
28
//@deprecated functions
29
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, Spawn_data *spawn_data); 
30
31
32
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int used_nodes);
#endif
//--------------------------------SLURM USAGE-------------------------------------//
33
34
35
36
37

//@deprecated functions
int create_hostfile(char **file_name);
int write_hostfile_node(int ptr, int qty, char *node_name);

38
//--------------PUBLIC FUNCTIONS---------------//
39
40
41
42
43
44
45

/*
 * Configura la creacion de un nuevo grupo de procesos, reservando la memoria
 * para una llamada a MPI_Comm_spawn, obteniendo una distribucion fisica
 * para los procesos y creando un fichero hostfile.
 *
 */
46
void processes_dist(Spawn_data *spawn_data) {
47
48
  int used_nodes=0;
  int *procs_array;
49

50
  // GET NEW DISTRIBUTION 
51
52
  node_dist(*spawn_data, &procs_array, &used_nodes, &spawn_data->total_spawns);
  spawn_data->sets = (Spawn_set *) malloc(spawn_data->total_spawns * sizeof(Spawn_set));
53
#if USE_MAL_SLURM
54
  switch(spawn_data->mapping_fill_method) {
55
    case MALL_DIST_STRING:
56
57
58
59
60
61
//      if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL) ) {
      if(spawn_data->spawn_is_multiple) {
        generate_multiple_info_string_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
      } else {
        generate_info_string_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
      }
62
      break;
63
64
    case MALL_DIST_HOSTFILE: // FIXME Does not consider multiple spawn strat
      generate_info_hostfile_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
65
66
      break;
  }
67
#else
68
69
70
71
72
73
//  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL) ) {
  if(spawn_data->spawn_is_multiple) {
    generate_multiple_info_string(mall->nodelist, procs_array, used_nodes, spawn_data);
  } else {
    generate_info_string(mall->nodelist, procs_array, used_nodes, spawn_data);
  }
74
#endif
75
  free(procs_array);
76
77
78
}


79
80
//--------------PRIVATE FUNCTIONS---------------//
//-----------------DISTRIBUTION-----------------//
81
82
/*
 * Obtiene la distribucion fisica del grupo de procesos a crear, devolviendo
83
84
 * cuantos nodos se van a utilizar, la cantidad de procesos que alojara cada
 * nodo y cuantas creaciones de procesos seran necesarias.
85
 *
86
 * Se permiten dos tipos de distribuciones fisicas segun el valor de "spawn_dist":
87
88
89
90
91
92
 *
 *  COMM_PHY_NODES (1): Orientada a equilibrar el numero de procesos entre
 *                      todos los nodos disponibles.
 *  COMM_PHY_CPU   (2): Orientada a completar la capacidad de un nodo antes de
 *                      ocupar otro nodo.
 */
93
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes, int *total_spawns) {
94
  int i, *procs;
95
  procs = calloc(mall->num_nodes, sizeof(int)); // Numero de procesos por nodo
96
97

  /* GET NEW DISTRIBUTION  */
98
  switch(mall_conf->spawn_dist) {
99
    case MALL_DIST_SPREAD: // DIST NODES @deprecated
100
      spread_dist(spawn_data, used_nodes, procs);
101
102
      break;
    case MALL_DIST_COMPACT: // DIST CPUs
103
      compact_dist(spawn_data, used_nodes, procs);
104
105
106
107
      break;
  }

  //Copy results to output vector qty
108
  *qty = calloc(*used_nodes, sizeof(int)); // Numero de procesos por nodo
109
110
111
112
113
114
115
116
117
118
119
120
121

//  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL) ) {
  if(spawn_data.spawn_is_multiple) {
    for(i=0; i< *used_nodes; i++) {
      (*qty)[i] = procs[i];
      if(procs[i]) (*total_spawns)++;
    printf("procs[%d] = %d\n", i, procs[i]);
    }
  } else {
    *total_spawns = 1;
    for(i=0; i< *used_nodes; i++) {
      (*qty)[i] = procs[i];
    }
122
123
124
125
126
127
128
129
130
  }
  free(procs);
}

/*
 * Distribucion basada en equilibrar el numero de procesos en cada nodo
 * para que todos los nodos tengan el mismo numero. Devuelve el total de
 * nodos utilizados y el numero de procesos a crear en cada nodo.
 *
131
132
133
134
 * Asume que los procesos que ya existen estan en los nodos mas bajos
 * con el mismo tamBl. //FIXME No deberia asumir el tamBl.
 *
 * FIXME Tener en cuenta localizacion de procesos ya creados (already_created)
135
 */
136
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
137
  int i, tamBl;
138

139
  *used_nodes = mall->num_nodes;
140
141
142
  tamBl = spawn_data.target_qty / *used_nodes;
  i = spawn_data.already_created ? spawn_data.already_created / tamBl : 0;
  for(; i<*used_nodes; i++) {
143
144
145
146
147
148
149
150
151
152
153
154
    procs[i] = tamBl; 
  }
}

/*
 * Distribucion basada en llenar un nodo de procesos antes de pasar al
 * siguiente nodo. Devuelve el total de nodos utilizados y el numero 
 * de procesos a crear en cada nodo.
 *
 * Tiene en cuenta los procesos ya existentes para el mappeado de 
 * los procesos a crear.
 */
155
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
156
157
158
  int i, asigCores;
  int tamBl, remainder;

159
  tamBl = mall->num_cpus;
160
161
162
  asigCores = spawn_data.already_created;
  i = *used_nodes = spawn_data.already_created / tamBl;
  remainder = spawn_data.already_created % tamBl;
163
164
165
166

  //FIXME REFACTOR Que pasa si los nodos 1 y 2 tienen espacios libres
  //First nodes could already have existing procs
  //Start from the first with free spaces
167
  if (remainder && asigCores + (tamBl - remainder) < spawn_data.target_qty) {
168
169
    procs[i] = tamBl - remainder;
    asigCores += procs[i];
170
    i = (i+1) % mall->num_nodes;
171
172
173
    (*used_nodes)++;
  }

174
  //Assign tamBl to each node
175
  while(asigCores+tamBl <= spawn_data.target_qty) {
176
177
    asigCores += tamBl;
    procs[i] += tamBl;
178
    i = (i+1) % mall->num_nodes;
179
180
181
182
    (*used_nodes)++;
  }

  //Last node could have less procs than tamBl
183
184
  if(asigCores < spawn_data.target_qty) { 
    procs[i] += spawn_data.target_qty - asigCores;
185
186
    (*used_nodes)++;
  }
187
  if(*used_nodes > mall->num_nodes) *used_nodes = mall->num_nodes;  //FIXME Si ocurre esto no es un error?
188
189
}

190
191
192
193
194
195
196
197
198

//--------------PRIVATE FUNCTIONS---------------//
//-------------------INFO SET-------------------//

/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 *
199
 *
200
 */
201
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
202
  char *host_str;
203

204
  fill_str_hosts(nodelist, procs_array, nodes, &host_str);
205
  // SET MAPPING
206
  set_mapping_host(spawn_data->spawn_qty, host_str, 0, spawn_data);
207
  free(host_str);
208
209
}

210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 *
 *
 */
void generate_multiple_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
  char *host, *aux, *token, *hostlist_str;
  size_t i=0,j=0,len=0;

  aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
  strcpy(aux, nodelist);
  token = strtok(aux, ",");
  while (token != NULL && i < nodes) {
    host = strdup(token);
    if (procs_array[i] != 0) {
      write_str_node(&hostlist_str, len, procs_array[i], host);
      set_mapping_host(procs_array[i], hostlist_str, j, spawn_data);
      free(hostlist_str); hostlist_str = NULL;
      j++;
    }
    i++;
    free(host);
    token = strtok(NULL, ",");
  }
  free(aux);
  if(hostlist_str != NULL) { free(hostlist_str); }
}


//--------------PRIVATE FUNCTIONS---------------//
//---------------MAPPING UTILITY----------------//
//----------------------------------------------//

/*
 * Anyade en la siguiente entrada de spawns la
 * distribucion fisica a utilizar con un par 
 * host/mapping y el total de procesos.
 */
void set_mapping_host(int qty, char *host, size_t index, Spawn_data *spawn_data) {
  MPI_Info *info;

  spawn_data->sets[index].spawn_qty = qty;
  info = &(spawn_data->sets[index].mapping);
  MPI_Info_create(info);
  MPI_Info_set(*info, "hosts", host);
}

259
260
261
262
/*
 * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
 * al crear procesos e indicar donde tienen que ser creados.
 */
263
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str) {
264
  char *host, *aux, *token;
265
  size_t i=0,len=0;
266
267
268
269

  aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
  strcpy(aux, nodelist);
  token = strtok(aux, ",");
270
271
272
  while (token != NULL && i < used_nodes) {
    host = strdup(token);
    if (qty[i] != 0) {
273
      len = write_str_node(hostlist_str, len, qty[i], host);
274
275
276
    }
    i++;
    free(host);
277
    token = strtok(NULL, ",");
278
  }
279
  free(aux);
280
281
282
283
284
285
}

/*
 * Añade en una cadena "qty" entradas de "node_name".
 * Realiza la reserva de memoria y la realoja si es necesario.
 */
286
int write_str_node(char **hostlist_str, size_t len_og, size_t qty, char *node_name) {
287
  int err;
288
  char *ocurrence;
289
  size_t i, len, len_node;
290

291
292
  len_node = strlen(node_name) + 1; // Str length + ','
  len = qty * len_node; // Number of times the node is used
293
294

  if(len_og == 0) { // Memoria no reservada
295
    *hostlist_str = (char *) malloc((len+1) * sizeof(char));
296
  } else { // Cadena ya tiene datos
297
    *hostlist_str = (char *) realloc(*hostlist_str, (len_og + len + 1) * sizeof(char));
298
  }
299
  if(hostlist_str == NULL) return -1; // No ha sido posible alojar la memoria
300
301

  ocurrence = (char *) malloc((len_node+1) * sizeof(char));
302
  if(ocurrence == NULL) return -2; // No ha sido posible alojar la memoria
303
  err = snprintf(ocurrence, len_node+1, ",%s", node_name);
304
  if(err < 0) return -3; // No ha sido posible escribir sobre la variable auxiliar
305
306
307
308

  i=0;
  if(len_og == 0) { // Si se inicializa, la primera es una copia
    i++;
309
    strcpy(*hostlist_str, node_name);
310
311
  }
  for(; i<qty; i++){ // Las siguientes se conctanenan
312
    strcat(*hostlist_str, ocurrence);
313
314
315
316
317
318
319
  }

  
  free(ocurrence);
  return len+len_og;
}

320
321
322
323
324
325
326
327
//--------------------------------SLURM USAGE-------------------------------------//
#if USE_MAL_SLURM
/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 * Es necesario usar Slurm para usarlo.
 */
328
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
329
  char *hoststring;
330
331

  // CREATE AND SET STRING HOSTS
332
  fill_str_hosts_slurm(nodelist, procs_array, nodes, &hoststring);
333
  set_mapping_host(spawn_data->spawn_qty, hoststring, 0, spawn_data);
334
335
336
  free(hoststring);
}

337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
/*
 * Crea y devuelve un conjunto de objetos MPI_Info con 
 * un par host/mapping en el que se indica el mappeado 
 * a utilizar en los nuevos procesos dividido por nodos.
 * Es necesario Slurm para usarlo.
 */
void generate_multiple_info_string_slurm(char *nodelist, int *qty, size_t used_nodes, Spawn_data *spawn_data) {
  char *host, *hostlist_str;
  size_t i=0,j=0,len=0;
  hostlist_t hostlist;
  
  hostlist_str = NULL;
  hostlist = slurm_hostlist_create(nodelist);
  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
      write_str_node(&hostlist_str, len, qty[i], host);
      set_mapping_host(qty[i], hostlist_str, j, spawn_data);
      free(hostlist_str); hostlist_str = NULL;
      j++;
    }
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
  if(hostlist_str != NULL) { free(hostlist_str); }
}

364
365
366
367
368

/*
 * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
 * al crear procesos e indicar donde tienen que ser creados.
 */
369
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str) {
370
371
372
373
374
375
376
  char *host;
  size_t i=0,len=0;
  hostlist_t hostlist;
  
  hostlist = slurm_hostlist_create(nodelist);
  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
377
      len = write_str_node(hostlist_str, len, qty[i], host);
378
379
380
381
382
383
384
    }
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}

385
386
387
388
389
390
391
392
393
394
395
//====================================================
//====================================================
//============DEPRECATED FUNCTIONS====================
//====================================================
//====================================================

/* FIXME Por revisar
 * @deprecated
 * Genera un fichero hostfile y lo anyade a un objeto
 * MPI_Info para ser utilizado.
 */
396
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, Spawn_data *spawn_data){
397
398
    char *hostfile;
    int ptr;
399
400
401
402
    MPI_Info *info;

    spawn_data->sets[0].spawn_qty = spawn_data->spawn_qty;
    info = &(spawn_data->sets[0].mapping);
403
404
405
406
407
408
409
410

    // CREATE/UPDATE HOSTFILE 
    ptr = create_hostfile(&hostfile);
    MPI_Info_create(info);
    MPI_Info_set(*info, "hostfile", hostfile);
    free(hostfile);

    // SET NEW DISTRIBUTION 
411
    fill_hostfile_slurm(nodelist, ptr, procs_array, nodes);
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
    close(ptr);
}

/*
 * @deprecated
 * Crea un fichero que se utilizara como hostfile
 * para un nuevo grupo de procesos. 
 *
 * El nombre es devuelto en el argumento "file_name",
 * que tiene que ser un puntero vacio.
 *
 * Ademas se devuelve un descriptor de fichero para 
 * modificar el fichero.
 */
int create_hostfile(char **file_name) {
427
428
  int ptr, err;
  size_t len = 11; //FIXME Numero mágico
429
430

  *file_name = NULL;
431
  *file_name = malloc(len * sizeof(char));
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
  if(*file_name == NULL) return -1; // No ha sido posible alojar la memoria
  err = snprintf(*file_name, len, "hostfile.o");
  if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

  ptr = open(*file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if(ptr < 0) return -3; // No ha sido posible crear el fichero

  return ptr; // Devolver puntero a fichero
}

/*
 * @deprecated
 * Rellena un fichero hostfile indicado por ptr con los nombres
 * de los nodos a utilizar indicados por "job_record" y la cantidad 
 * de procesos que alojara cada nodo indicado por "qty".
 */
448
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int nodes) {
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
  int i=0;
  char *host;
  hostlist_t hostlist;
  
  hostlist = slurm_hostlist_create(nodelist);
  while ((host = slurm_hostlist_shift(hostlist)) && i < nodes) {
    write_hostfile_node(ptr, qty[i], host);
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}

/*
 * @deprecated
 * Escribe en el fichero hostfile indicado por ptr una nueva linea.
 *
 * Esta linea indica el nombre de un nodo y la cantidad de procesos a
 * alojar en ese nodo.
 */
int write_hostfile_node(int ptr, int qty, char *node_name) {
470
  int err;
471
  char *line;
472
  size_t len, len_node, len_int;
473
474

  len_node = strlen(node_name);
475
476
  err = snprintf(NULL, 0, "%d", qty);
  if(err < 0) return -1;
477
  len_int = err;
478
479
480

  len = len_node + len_int + 3;
  line = malloc(len * sizeof(char));
481
  if(line == NULL) return -2; // No ha sido posible alojar la memoria
482
483
  err = snprintf(line, len, "%s:%d\n", node_name, qty);

484
  if(err < 0) return -3; // No ha sido posible escribir en el fichero
485
486
487
488
489
490

  write(ptr, line, len-1);
  free(line);

  return 0;
}
491
492
#endif
//--------------------------------SLURM USAGE-------------------------------------//