ProcessDist.c 15.8 KB
Newer Older
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
5
#include <unistd.h>
6
#include <string.h>
7
#include <mpi.h>
8
#include "ProcessDist.h"
9
10
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
11
12
13

//--------------PRIVATE DECLARATIONS---------------//

14
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes, int *total_spawns);
15
16
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
17

18
19
20
21
22
23
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void generate_multiple_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);

void set_mapping_host(int qty, char *host, size_t index, Spawn_data *spawn_data);
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str);
int write_str_node(char **hostlist_str, size_t len_og, size_t qty, char *node_name);
24
//--------------------------------SLURM USAGE-------------------------------------//
25
#if MAM_USE_SLURM
26
#include <slurm/slurm.h>
27
28
29
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void generate_multiple_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str);
30
//@deprecated functions
31
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, Spawn_data *spawn_data); 
32
33
34
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int used_nodes);
#endif
//--------------------------------SLURM USAGE-------------------------------------//
35
36
37
38
39

//@deprecated functions
int create_hostfile(char **file_name);
int write_hostfile_node(int ptr, int qty, char *node_name);

40
//--------------PUBLIC FUNCTIONS---------------//
41
42
43
44
45
46
47

/*
 * Configura la creacion de un nuevo grupo de procesos, reservando la memoria
 * para una llamada a MPI_Comm_spawn, obteniendo una distribucion fisica
 * para los procesos y creando un fichero hostfile.
 *
 */
48
void processes_dist(Spawn_data *spawn_data) {
49
50
  int used_nodes=0;
  int *procs_array;
51

52
  // GET NEW DISTRIBUTION 
53
54
  node_dist(*spawn_data, &procs_array, &used_nodes, &spawn_data->total_spawns);
  spawn_data->sets = (Spawn_set *) malloc(spawn_data->total_spawns * sizeof(Spawn_set));
55
#if MAM_USE_SLURM
56
  switch(spawn_data->mapping_fill_method) {
57
    case MAM_PHY_TYPE_STRING:
58
59
60
61
62
63
//      if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL) ) {
      if(spawn_data->spawn_is_multiple) {
        generate_multiple_info_string_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
      } else {
        generate_info_string_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
      }
64
      break;
65
    case MAM_PHY_TYPE_HOSTFILE: // FIXME Does not consider multiple spawn strat
66
      generate_info_hostfile_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
67
68
      break;
  }
69
#else
70
71
72
73
74
75
//  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL) ) {
  if(spawn_data->spawn_is_multiple) {
    generate_multiple_info_string(mall->nodelist, procs_array, used_nodes, spawn_data);
  } else {
    generate_info_string(mall->nodelist, procs_array, used_nodes, spawn_data);
  }
76
#endif
77
  free(procs_array);
78
79
80
}


81
82
//--------------PRIVATE FUNCTIONS---------------//
//-----------------DISTRIBUTION-----------------//
83
84
/*
 * Obtiene la distribucion fisica del grupo de procesos a crear, devolviendo
85
86
 * cuantos nodos se van a utilizar, la cantidad de procesos que alojara cada
 * nodo y cuantas creaciones de procesos seran necesarias.
87
 *
88
 * Se permiten dos tipos de distribuciones fisicas segun el valor de "spawn_dist":
89
90
91
92
93
94
 *
 *  COMM_PHY_NODES (1): Orientada a equilibrar el numero de procesos entre
 *                      todos los nodos disponibles.
 *  COMM_PHY_CPU   (2): Orientada a completar la capacidad de un nodo antes de
 *                      ocupar otro nodo.
 */
95
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes, int *total_spawns) {
96
  int i, *procs;
97
  procs = calloc(mall->num_nodes, sizeof(int)); // Numero de procesos por nodo
98
99

  /* GET NEW DISTRIBUTION  */
100
  switch(mall_conf->spawn_dist) {
101
    case MAM_PHY_DIST_SPREAD: // DIST NODES @deprecated
102
      spread_dist(spawn_data, used_nodes, procs);
103
      break;
104
    case MAM_PHY_DIST_COMPACT: // DIST CPUs
105
      compact_dist(spawn_data, used_nodes, procs);
106
107
108
109
      break;
  }

  //Copy results to output vector qty
110
  *qty = calloc(*used_nodes, sizeof(int)); // Numero de procesos por nodo
111
112
113
114
115
116
117
118
119
120
121
122

//  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL) ) {
  if(spawn_data.spawn_is_multiple) {
    for(i=0; i< *used_nodes; i++) {
      (*qty)[i] = procs[i];
      if(procs[i]) (*total_spawns)++;
    }
  } else {
    *total_spawns = 1;
    for(i=0; i< *used_nodes; i++) {
      (*qty)[i] = procs[i];
    }
123
124
125
126
127
128
129
130
131
  }
  free(procs);
}

/*
 * Distribucion basada en equilibrar el numero de procesos en cada nodo
 * para que todos los nodos tengan el mismo numero. Devuelve el total de
 * nodos utilizados y el numero de procesos a crear en cada nodo.
 *
132
133
134
135
 * Asume que los procesos que ya existen estan en los nodos mas bajos
 * con el mismo tamBl. //FIXME No deberia asumir el tamBl.
 *
 * FIXME Tener en cuenta localizacion de procesos ya creados (already_created)
136
 */
137
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
138
  int i, tamBl, remainder;
139

140
  *used_nodes = mall->num_nodes;
141
  tamBl = spawn_data.target_qty / *used_nodes;
142
143
144
145
146
  i = spawn_data.already_created / tamBl;
  remainder = spawn_data.already_created % tamBl;
  if(remainder) {
    procs[i++] = tamBl - remainder;
  }
147
  for(; i<*used_nodes; i++) {
148
149
150
151
152
153
154
155
156
157
158
159
    procs[i] = tamBl; 
  }
}

/*
 * Distribucion basada en llenar un nodo de procesos antes de pasar al
 * siguiente nodo. Devuelve el total de nodos utilizados y el numero 
 * de procesos a crear en cada nodo.
 *
 * Tiene en cuenta los procesos ya existentes para el mappeado de 
 * los procesos a crear.
 */
160
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
161
162
163
  int i, asigCores;
  int tamBl, remainder;

164
  tamBl = mall->num_cpus;
165
166
167
  asigCores = spawn_data.already_created;
  i = *used_nodes = spawn_data.already_created / tamBl;
  remainder = spawn_data.already_created % tamBl;
168
169
170
171

  //FIXME REFACTOR Que pasa si los nodos 1 y 2 tienen espacios libres
  //First nodes could already have existing procs
  //Start from the first with free spaces
172
  if (remainder && asigCores + (tamBl - remainder) < spawn_data.target_qty) {
173
174
    procs[i] = tamBl - remainder;
    asigCores += procs[i];
175
    i = (i+1) % mall->num_nodes;
176
177
178
    (*used_nodes)++;
  }

179
  //Assign tamBl to each node
180
  while(asigCores+tamBl <= spawn_data.target_qty) {
181
182
    asigCores += tamBl;
    procs[i] += tamBl;
183
    i = (i+1) % mall->num_nodes;
184
185
186
187
    (*used_nodes)++;
  }

  //Last node could have less procs than tamBl
188
189
  if(asigCores < spawn_data.target_qty) { 
    procs[i] += spawn_data.target_qty - asigCores;
190
191
    (*used_nodes)++;
  }
192
  if(*used_nodes > mall->num_nodes) *used_nodes = mall->num_nodes;  //FIXME Si ocurre esto no es un error?
193
194
}

195
196
197
198
199
200
201
202
203

//--------------PRIVATE FUNCTIONS---------------//
//-------------------INFO SET-------------------//

/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 *
204
 *
205
 */
206
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
207
  char *host_str;
208

209
  fill_str_hosts(nodelist, procs_array, nodes, &host_str);
210
  // SET MAPPING
211
  set_mapping_host(spawn_data->spawn_qty, host_str, 0, spawn_data);
212
  free(host_str);
213
214
}

215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 *
 *
 */
void generate_multiple_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
  char *host, *aux, *token, *hostlist_str;
  size_t i=0,j=0,len=0;

  aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
  strcpy(aux, nodelist);
  token = strtok(aux, ",");
  while (token != NULL && i < nodes) {
    host = strdup(token);
    if (procs_array[i] != 0) {
      write_str_node(&hostlist_str, len, procs_array[i], host);
      set_mapping_host(procs_array[i], hostlist_str, j, spawn_data);
      free(hostlist_str); hostlist_str = NULL;
      j++;
    }
    i++;
    free(host);
    token = strtok(NULL, ",");
  }
  free(aux);
  if(hostlist_str != NULL) { free(hostlist_str); }
}


//--------------PRIVATE FUNCTIONS---------------//
//---------------MAPPING UTILITY----------------//
//----------------------------------------------//

/*
 * Anyade en la siguiente entrada de spawns la
 * distribucion fisica a utilizar con un par 
 * host/mapping y el total de procesos.
 */
void set_mapping_host(int qty, char *host, size_t index, Spawn_data *spawn_data) {
  MPI_Info *info;

  spawn_data->sets[index].spawn_qty = qty;
  info = &(spawn_data->sets[index].mapping);
  MPI_Info_create(info);
  MPI_Info_set(*info, "hosts", host);
}

264
265
266
267
/*
 * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
 * al crear procesos e indicar donde tienen que ser creados.
 */
268
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str) {
269
  char *host, *aux, *token;
270
  size_t i=0,len=0;
271
272
273
274

  aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
  strcpy(aux, nodelist);
  token = strtok(aux, ",");
275
276
277
  while (token != NULL && i < used_nodes) {
    host = strdup(token);
    if (qty[i] != 0) {
278
      len = write_str_node(hostlist_str, len, qty[i], host);
279
280
281
    }
    i++;
    free(host);
282
    token = strtok(NULL, ",");
283
  }
284
  free(aux);
285
286
287
288
289
290
}

/*
 * Añade en una cadena "qty" entradas de "node_name".
 * Realiza la reserva de memoria y la realoja si es necesario.
 */
291
int write_str_node(char **hostlist_str, size_t len_og, size_t qty, char *node_name) {
292
  int err;
293
  char *ocurrence;
294
  size_t i, len, len_node;
295

296
297
  len_node = strlen(node_name) + 1; // Str length + ','
  len = qty * len_node; // Number of times the node is used
298
299

  if(len_og == 0) { // Memoria no reservada
300
    *hostlist_str = (char *) malloc((len+1) * sizeof(char));
301
  } else { // Cadena ya tiene datos
302
    *hostlist_str = (char *) realloc(*hostlist_str, (len_og + len + 1) * sizeof(char));
303
  }
304
  if(hostlist_str == NULL) return -1; // No ha sido posible alojar la memoria
305
306

  ocurrence = (char *) malloc((len_node+1) * sizeof(char));
307
  if(ocurrence == NULL) return -2; // No ha sido posible alojar la memoria
308
  err = snprintf(ocurrence, len_node+1, ",%s", node_name);
309
  if(err < 0) return -3; // No ha sido posible escribir sobre la variable auxiliar
310
311
312
313

  i=0;
  if(len_og == 0) { // Si se inicializa, la primera es una copia
    i++;
314
    strcpy(*hostlist_str, node_name);
315
316
  }
  for(; i<qty; i++){ // Las siguientes se conctanenan
317
    strcat(*hostlist_str, ocurrence);
318
319
320
321
322
323
324
  }

  
  free(ocurrence);
  return len+len_og;
}

325
//--------------------------------SLURM USAGE-------------------------------------//
326
#if MAM_USE_SLURM
327
328
329
330
331
332
/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 * Es necesario usar Slurm para usarlo.
 */
333
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
334
  char *hoststring;
335
336

  // CREATE AND SET STRING HOSTS
337
  fill_str_hosts_slurm(nodelist, procs_array, nodes, &hoststring);
338
  set_mapping_host(spawn_data->spawn_qty, hoststring, 0, spawn_data);
339
340
341
  free(hoststring);
}

342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
/*
 * Crea y devuelve un conjunto de objetos MPI_Info con 
 * un par host/mapping en el que se indica el mappeado 
 * a utilizar en los nuevos procesos dividido por nodos.
 * Es necesario Slurm para usarlo.
 */
void generate_multiple_info_string_slurm(char *nodelist, int *qty, size_t used_nodes, Spawn_data *spawn_data) {
  char *host, *hostlist_str;
  size_t i=0,j=0,len=0;
  hostlist_t hostlist;
  
  hostlist_str = NULL;
  hostlist = slurm_hostlist_create(nodelist);
  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
      write_str_node(&hostlist_str, len, qty[i], host);
      set_mapping_host(qty[i], hostlist_str, j, spawn_data);
      free(hostlist_str); hostlist_str = NULL;
      j++;
    }
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
  if(hostlist_str != NULL) { free(hostlist_str); }
}

369
370
371
372
373

/*
 * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
 * al crear procesos e indicar donde tienen que ser creados.
 */
374
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str) {
375
376
377
378
379
380
381
  char *host;
  size_t i=0,len=0;
  hostlist_t hostlist;
  
  hostlist = slurm_hostlist_create(nodelist);
  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
382
      len = write_str_node(hostlist_str, len, qty[i], host);
383
384
385
386
387
388
389
    }
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}

390
391
392
393
394
395
396
397
398
399
400
//====================================================
//====================================================
//============DEPRECATED FUNCTIONS====================
//====================================================
//====================================================

/* FIXME Por revisar
 * @deprecated
 * Genera un fichero hostfile y lo anyade a un objeto
 * MPI_Info para ser utilizado.
 */
401
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, Spawn_data *spawn_data){
402
403
    char *hostfile;
    int ptr;
404
405
406
407
    MPI_Info *info;

    spawn_data->sets[0].spawn_qty = spawn_data->spawn_qty;
    info = &(spawn_data->sets[0].mapping);
408
409
410
411
412
413
414
415

    // CREATE/UPDATE HOSTFILE 
    ptr = create_hostfile(&hostfile);
    MPI_Info_create(info);
    MPI_Info_set(*info, "hostfile", hostfile);
    free(hostfile);

    // SET NEW DISTRIBUTION 
416
    fill_hostfile_slurm(nodelist, ptr, procs_array, nodes);
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
    close(ptr);
}

/*
 * @deprecated
 * Crea un fichero que se utilizara como hostfile
 * para un nuevo grupo de procesos. 
 *
 * El nombre es devuelto en el argumento "file_name",
 * que tiene que ser un puntero vacio.
 *
 * Ademas se devuelve un descriptor de fichero para 
 * modificar el fichero.
 */
int create_hostfile(char **file_name) {
432
433
  int ptr, err;
  size_t len = 11; //FIXME Numero mágico
434
435

  *file_name = NULL;
436
  *file_name = malloc(len * sizeof(char));
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
  if(*file_name == NULL) return -1; // No ha sido posible alojar la memoria
  err = snprintf(*file_name, len, "hostfile.o");
  if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

  ptr = open(*file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if(ptr < 0) return -3; // No ha sido posible crear el fichero

  return ptr; // Devolver puntero a fichero
}

/*
 * @deprecated
 * Rellena un fichero hostfile indicado por ptr con los nombres
 * de los nodos a utilizar indicados por "job_record" y la cantidad 
 * de procesos que alojara cada nodo indicado por "qty".
 */
453
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int nodes) {
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
  int i=0;
  char *host;
  hostlist_t hostlist;
  
  hostlist = slurm_hostlist_create(nodelist);
  while ((host = slurm_hostlist_shift(hostlist)) && i < nodes) {
    write_hostfile_node(ptr, qty[i], host);
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}

/*
 * @deprecated
 * Escribe en el fichero hostfile indicado por ptr una nueva linea.
 *
 * Esta linea indica el nombre de un nodo y la cantidad de procesos a
 * alojar en ese nodo.
 */
int write_hostfile_node(int ptr, int qty, char *node_name) {
475
  int err;
476
  char *line;
477
  size_t len, len_node, len_int;
478
479

  len_node = strlen(node_name);
480
481
  err = snprintf(NULL, 0, "%d", qty);
  if(err < 0) return -1;
482
  len_int = err;
483
484
485

  len = len_node + len_int + 3;
  line = malloc(len * sizeof(char));
486
  if(line == NULL) return -2; // No ha sido posible alojar la memoria
487
488
  err = snprintf(line, len, "%s:%d\n", node_name, qty);

489
  if(err < 0) return -3; // No ha sido posible escribir en el fichero
490
491
492
493
494
495

  write(ptr, line, len-1);
  free(line);

  return 0;
}
496
497
#endif
//--------------------------------SLURM USAGE-------------------------------------//