ProcessDist.c 15.7 KB
Newer Older
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
5
#include <unistd.h>
6
#include <string.h>
7
#include <mpi.h>
8
9
10
11
#include "ProcessDist.h"

//--------------PRIVATE DECLARATIONS---------------//

12
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes, int *total_spawns);
13
14
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
15

16
17
18
19
20
21
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void generate_multiple_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);

void set_mapping_host(int qty, char *host, size_t index, Spawn_data *spawn_data);
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str);
int write_str_node(char **hostlist_str, size_t len_og, size_t qty, char *node_name);
22
//--------------------------------SLURM USAGE-------------------------------------//
23
#if USE_MAL_SLURM
24
#include <slurm/slurm.h>
25
26
27
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void generate_multiple_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str);
28
//@deprecated functions
29
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, Spawn_data *spawn_data); 
30
31
32
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int used_nodes);
#endif
//--------------------------------SLURM USAGE-------------------------------------//
33
34
35
36
37

//@deprecated functions
int create_hostfile(char **file_name);
int write_hostfile_node(int ptr, int qty, char *node_name);

38
//--------------PUBLIC FUNCTIONS---------------//
39
40
41
42
43
44
45

/*
 * Configura la creacion de un nuevo grupo de procesos, reservando la memoria
 * para una llamada a MPI_Comm_spawn, obteniendo una distribucion fisica
 * para los procesos y creando un fichero hostfile.
 *
 */
46
void processes_dist(Spawn_data *spawn_data) {
47
48
  int used_nodes=0;
  int *procs_array;
49

50
  // GET NEW DISTRIBUTION 
51
52
  node_dist(*spawn_data, &procs_array, &used_nodes, &spawn_data->total_spawns);
  spawn_data->sets = (Spawn_set *) malloc(spawn_data->total_spawns * sizeof(Spawn_set));
53
#if USE_MAL_SLURM
54
  switch(spawn_data->mapping_fill_method) {
55
    case MALL_DIST_STRING:
56
57
58
59
60
61
//      if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL) ) {
      if(spawn_data->spawn_is_multiple) {
        generate_multiple_info_string_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
      } else {
        generate_info_string_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
      }
62
      break;
63
64
    case MALL_DIST_HOSTFILE: // FIXME Does not consider multiple spawn strat
      generate_info_hostfile_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
65
66
      break;
  }
67
#else
68
69
70
71
72
73
//  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL) ) {
  if(spawn_data->spawn_is_multiple) {
    generate_multiple_info_string(mall->nodelist, procs_array, used_nodes, spawn_data);
  } else {
    generate_info_string(mall->nodelist, procs_array, used_nodes, spawn_data);
  }
74
#endif
75
  free(procs_array);
76
77
78
}


79
80
//--------------PRIVATE FUNCTIONS---------------//
//-----------------DISTRIBUTION-----------------//
81
82
/*
 * Obtiene la distribucion fisica del grupo de procesos a crear, devolviendo
83
84
 * cuantos nodos se van a utilizar, la cantidad de procesos que alojara cada
 * nodo y cuantas creaciones de procesos seran necesarias.
85
 *
86
 * Se permiten dos tipos de distribuciones fisicas segun el valor de "spawn_dist":
87
88
89
90
91
92
 *
 *  COMM_PHY_NODES (1): Orientada a equilibrar el numero de procesos entre
 *                      todos los nodos disponibles.
 *  COMM_PHY_CPU   (2): Orientada a completar la capacidad de un nodo antes de
 *                      ocupar otro nodo.
 */
93
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes, int *total_spawns) {
94
  int i, *procs;
95
  procs = calloc(mall->num_nodes, sizeof(int)); // Numero de procesos por nodo
96
97

  /* GET NEW DISTRIBUTION  */
98
  switch(mall_conf->spawn_dist) {
99
    case MALL_DIST_SPREAD: // DIST NODES @deprecated
100
      spread_dist(spawn_data, used_nodes, procs);
101
102
      break;
    case MALL_DIST_COMPACT: // DIST CPUs
103
      compact_dist(spawn_data, used_nodes, procs);
104
105
106
107
      break;
  }

  //Copy results to output vector qty
108
  *qty = calloc(*used_nodes, sizeof(int)); // Numero de procesos por nodo
109
110
111
112
113
114
115
116
117
118
119
120

//  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL) ) {
  if(spawn_data.spawn_is_multiple) {
    for(i=0; i< *used_nodes; i++) {
      (*qty)[i] = procs[i];
      if(procs[i]) (*total_spawns)++;
    }
  } else {
    *total_spawns = 1;
    for(i=0; i< *used_nodes; i++) {
      (*qty)[i] = procs[i];
    }
121
122
123
124
125
126
127
128
129
  }
  free(procs);
}

/*
 * Distribucion basada en equilibrar el numero de procesos en cada nodo
 * para que todos los nodos tengan el mismo numero. Devuelve el total de
 * nodos utilizados y el numero de procesos a crear en cada nodo.
 *
130
131
132
133
 * Asume que los procesos que ya existen estan en los nodos mas bajos
 * con el mismo tamBl. //FIXME No deberia asumir el tamBl.
 *
 * FIXME Tener en cuenta localizacion de procesos ya creados (already_created)
134
 */
135
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
136
  int i, tamBl, remainder;
137

138
  *used_nodes = mall->num_nodes;
139
  tamBl = spawn_data.target_qty / *used_nodes;
140
141
142
143
144
  i = spawn_data.already_created / tamBl;
  remainder = spawn_data.already_created % tamBl;
  if(remainder) {
    procs[i++] = tamBl - remainder;
  }
145
  for(; i<*used_nodes; i++) {
146
147
148
149
150
151
152
153
154
155
156
157
    procs[i] = tamBl; 
  }
}

/*
 * Distribucion basada en llenar un nodo de procesos antes de pasar al
 * siguiente nodo. Devuelve el total de nodos utilizados y el numero 
 * de procesos a crear en cada nodo.
 *
 * Tiene en cuenta los procesos ya existentes para el mappeado de 
 * los procesos a crear.
 */
158
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
159
160
161
  int i, asigCores;
  int tamBl, remainder;

162
  tamBl = mall->num_cpus;
163
164
165
  asigCores = spawn_data.already_created;
  i = *used_nodes = spawn_data.already_created / tamBl;
  remainder = spawn_data.already_created % tamBl;
166
167
168
169

  //FIXME REFACTOR Que pasa si los nodos 1 y 2 tienen espacios libres
  //First nodes could already have existing procs
  //Start from the first with free spaces
170
  if (remainder && asigCores + (tamBl - remainder) < spawn_data.target_qty) {
171
172
    procs[i] = tamBl - remainder;
    asigCores += procs[i];
173
    i = (i+1) % mall->num_nodes;
174
175
176
    (*used_nodes)++;
  }

177
  //Assign tamBl to each node
178
  while(asigCores+tamBl <= spawn_data.target_qty) {
179
180
    asigCores += tamBl;
    procs[i] += tamBl;
181
    i = (i+1) % mall->num_nodes;
182
183
184
185
    (*used_nodes)++;
  }

  //Last node could have less procs than tamBl
186
187
  if(asigCores < spawn_data.target_qty) { 
    procs[i] += spawn_data.target_qty - asigCores;
188
189
    (*used_nodes)++;
  }
190
  if(*used_nodes > mall->num_nodes) *used_nodes = mall->num_nodes;  //FIXME Si ocurre esto no es un error?
191
192
}

193
194
195
196
197
198
199
200
201

//--------------PRIVATE FUNCTIONS---------------//
//-------------------INFO SET-------------------//

/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 *
202
 *
203
 */
204
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
205
  char *host_str;
206

207
  fill_str_hosts(nodelist, procs_array, nodes, &host_str);
208
  // SET MAPPING
209
  set_mapping_host(spawn_data->spawn_qty, host_str, 0, spawn_data);
210
  free(host_str);
211
212
}

213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 *
 *
 */
void generate_multiple_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
  char *host, *aux, *token, *hostlist_str;
  size_t i=0,j=0,len=0;

  aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
  strcpy(aux, nodelist);
  token = strtok(aux, ",");
  while (token != NULL && i < nodes) {
    host = strdup(token);
    if (procs_array[i] != 0) {
      write_str_node(&hostlist_str, len, procs_array[i], host);
      set_mapping_host(procs_array[i], hostlist_str, j, spawn_data);
      free(hostlist_str); hostlist_str = NULL;
      j++;
    }
    i++;
    free(host);
    token = strtok(NULL, ",");
  }
  free(aux);
  if(hostlist_str != NULL) { free(hostlist_str); }
}


//--------------PRIVATE FUNCTIONS---------------//
//---------------MAPPING UTILITY----------------//
//----------------------------------------------//

/*
 * Anyade en la siguiente entrada de spawns la
 * distribucion fisica a utilizar con un par 
 * host/mapping y el total de procesos.
 */
void set_mapping_host(int qty, char *host, size_t index, Spawn_data *spawn_data) {
  MPI_Info *info;

  spawn_data->sets[index].spawn_qty = qty;
  info = &(spawn_data->sets[index].mapping);
  MPI_Info_create(info);
  MPI_Info_set(*info, "hosts", host);
}

262
263
264
265
/*
 * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
 * al crear procesos e indicar donde tienen que ser creados.
 */
266
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str) {
267
  char *host, *aux, *token;
268
  size_t i=0,len=0;
269
270
271
272

  aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
  strcpy(aux, nodelist);
  token = strtok(aux, ",");
273
274
275
  while (token != NULL && i < used_nodes) {
    host = strdup(token);
    if (qty[i] != 0) {
276
      len = write_str_node(hostlist_str, len, qty[i], host);
277
278
279
    }
    i++;
    free(host);
280
    token = strtok(NULL, ",");
281
  }
282
  free(aux);
283
284
285
286
287
288
}

/*
 * Añade en una cadena "qty" entradas de "node_name".
 * Realiza la reserva de memoria y la realoja si es necesario.
 */
289
int write_str_node(char **hostlist_str, size_t len_og, size_t qty, char *node_name) {
290
  int err;
291
  char *ocurrence;
292
  size_t i, len, len_node;
293

294
295
  len_node = strlen(node_name) + 1; // Str length + ','
  len = qty * len_node; // Number of times the node is used
296
297

  if(len_og == 0) { // Memoria no reservada
298
    *hostlist_str = (char *) malloc((len+1) * sizeof(char));
299
  } else { // Cadena ya tiene datos
300
    *hostlist_str = (char *) realloc(*hostlist_str, (len_og + len + 1) * sizeof(char));
301
  }
302
  if(hostlist_str == NULL) return -1; // No ha sido posible alojar la memoria
303
304

  ocurrence = (char *) malloc((len_node+1) * sizeof(char));
305
  if(ocurrence == NULL) return -2; // No ha sido posible alojar la memoria
306
  err = snprintf(ocurrence, len_node+1, ",%s", node_name);
307
  if(err < 0) return -3; // No ha sido posible escribir sobre la variable auxiliar
308
309
310
311

  i=0;
  if(len_og == 0) { // Si se inicializa, la primera es una copia
    i++;
312
    strcpy(*hostlist_str, node_name);
313
314
  }
  for(; i<qty; i++){ // Las siguientes se conctanenan
315
    strcat(*hostlist_str, ocurrence);
316
317
318
319
320
321
322
  }

  
  free(ocurrence);
  return len+len_og;
}

323
324
325
326
327
328
329
330
//--------------------------------SLURM USAGE-------------------------------------//
#if USE_MAL_SLURM
/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 * Es necesario usar Slurm para usarlo.
 */
331
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
332
  char *hoststring;
333
334

  // CREATE AND SET STRING HOSTS
335
  fill_str_hosts_slurm(nodelist, procs_array, nodes, &hoststring);
336
  set_mapping_host(spawn_data->spawn_qty, hoststring, 0, spawn_data);
337
338
339
  free(hoststring);
}

340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
/*
 * Crea y devuelve un conjunto de objetos MPI_Info con 
 * un par host/mapping en el que se indica el mappeado 
 * a utilizar en los nuevos procesos dividido por nodos.
 * Es necesario Slurm para usarlo.
 */
void generate_multiple_info_string_slurm(char *nodelist, int *qty, size_t used_nodes, Spawn_data *spawn_data) {
  char *host, *hostlist_str;
  size_t i=0,j=0,len=0;
  hostlist_t hostlist;
  
  hostlist_str = NULL;
  hostlist = slurm_hostlist_create(nodelist);
  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
      write_str_node(&hostlist_str, len, qty[i], host);
      set_mapping_host(qty[i], hostlist_str, j, spawn_data);
      free(hostlist_str); hostlist_str = NULL;
      j++;
    }
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
  if(hostlist_str != NULL) { free(hostlist_str); }
}

367
368
369
370
371

/*
 * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
 * al crear procesos e indicar donde tienen que ser creados.
 */
372
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str) {
373
374
375
376
377
378
379
  char *host;
  size_t i=0,len=0;
  hostlist_t hostlist;
  
  hostlist = slurm_hostlist_create(nodelist);
  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
380
      len = write_str_node(hostlist_str, len, qty[i], host);
381
382
383
384
385
386
387
    }
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}

388
389
390
391
392
393
394
395
396
397
398
//====================================================
//====================================================
//============DEPRECATED FUNCTIONS====================
//====================================================
//====================================================

/* FIXME Por revisar
 * @deprecated
 * Genera un fichero hostfile y lo anyade a un objeto
 * MPI_Info para ser utilizado.
 */
399
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, Spawn_data *spawn_data){
400
401
    char *hostfile;
    int ptr;
402
403
404
405
    MPI_Info *info;

    spawn_data->sets[0].spawn_qty = spawn_data->spawn_qty;
    info = &(spawn_data->sets[0].mapping);
406
407
408
409
410
411
412
413

    // CREATE/UPDATE HOSTFILE 
    ptr = create_hostfile(&hostfile);
    MPI_Info_create(info);
    MPI_Info_set(*info, "hostfile", hostfile);
    free(hostfile);

    // SET NEW DISTRIBUTION 
414
    fill_hostfile_slurm(nodelist, ptr, procs_array, nodes);
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
    close(ptr);
}

/*
 * @deprecated
 * Crea un fichero que se utilizara como hostfile
 * para un nuevo grupo de procesos. 
 *
 * El nombre es devuelto en el argumento "file_name",
 * que tiene que ser un puntero vacio.
 *
 * Ademas se devuelve un descriptor de fichero para 
 * modificar el fichero.
 */
int create_hostfile(char **file_name) {
430
431
  int ptr, err;
  size_t len = 11; //FIXME Numero mágico
432
433

  *file_name = NULL;
434
  *file_name = malloc(len * sizeof(char));
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
  if(*file_name == NULL) return -1; // No ha sido posible alojar la memoria
  err = snprintf(*file_name, len, "hostfile.o");
  if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

  ptr = open(*file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if(ptr < 0) return -3; // No ha sido posible crear el fichero

  return ptr; // Devolver puntero a fichero
}

/*
 * @deprecated
 * Rellena un fichero hostfile indicado por ptr con los nombres
 * de los nodos a utilizar indicados por "job_record" y la cantidad 
 * de procesos que alojara cada nodo indicado por "qty".
 */
451
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int nodes) {
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
  int i=0;
  char *host;
  hostlist_t hostlist;
  
  hostlist = slurm_hostlist_create(nodelist);
  while ((host = slurm_hostlist_shift(hostlist)) && i < nodes) {
    write_hostfile_node(ptr, qty[i], host);
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}

/*
 * @deprecated
 * Escribe en el fichero hostfile indicado por ptr una nueva linea.
 *
 * Esta linea indica el nombre de un nodo y la cantidad de procesos a
 * alojar en ese nodo.
 */
int write_hostfile_node(int ptr, int qty, char *node_name) {
473
  int err;
474
  char *line;
475
  size_t len, len_node, len_int;
476
477

  len_node = strlen(node_name);
478
479
  err = snprintf(NULL, 0, "%d", qty);
  if(err < 0) return -1;
480
  len_int = err;
481
482
483

  len = len_node + len_int + 3;
  line = malloc(len * sizeof(char));
484
  if(line == NULL) return -2; // No ha sido posible alojar la memoria
485
486
  err = snprintf(line, len, "%s:%d\n", node_name, qty);

487
  if(err < 0) return -3; // No ha sido posible escribir en el fichero
488
489
490
491
492
493

  write(ptr, line, len-1);
  free(line);

  return 0;
}
494
495
#endif
//--------------------------------SLURM USAGE-------------------------------------//