ProcessDist.c 16.4 KB
Newer Older
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
5
#include <unistd.h>
6
#include <string.h>
7
#include <mpi.h>
8
#include "ProcessDist.h"
9
10
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
11
12
13

//--------------PRIVATE DECLARATIONS---------------//

14
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes, int *total_spawns);
15
16
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
17
void set_spawn_cmd(size_t nodes, Spawn_data *spawn_data);
18

19
20
21
22
23
24
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void generate_multiple_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);

void set_mapping_host(int qty, char *host, size_t index, Spawn_data *spawn_data);
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str);
int write_str_node(char **hostlist_str, size_t len_og, size_t qty, char *node_name);
25
//--------------------------------SLURM USAGE-------------------------------------//
26
#if MAM_USE_SLURM
27
#include <slurm/slurm.h>
28
29
30
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void generate_multiple_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str);
31
//@deprecated functions
32
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, Spawn_data *spawn_data); 
33
34
35
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int used_nodes);
#endif
//--------------------------------SLURM USAGE-------------------------------------//
36
37
38
39
40

//@deprecated functions
int create_hostfile(char **file_name);
int write_hostfile_node(int ptr, int qty, char *node_name);

41
//--------------PUBLIC FUNCTIONS---------------//
42
43
44
45
46
47
48

/*
 * Configura la creacion de un nuevo grupo de procesos, reservando la memoria
 * para una llamada a MPI_Comm_spawn, obteniendo una distribucion fisica
 * para los procesos y creando un fichero hostfile.
 *
 */
49
void processes_dist(Spawn_data *spawn_data) {
50
51
  int used_nodes=0;
  int *procs_array;
52

53
  // GET NEW DISTRIBUTION 
54
55
  node_dist(*spawn_data, &procs_array, &used_nodes, &spawn_data->total_spawns);
  spawn_data->sets = (Spawn_set *) malloc(spawn_data->total_spawns * sizeof(Spawn_set));
56
#if MAM_USE_SLURM
57
  switch(spawn_data->mapping_fill_method) {
58
    case MAM_PHY_TYPE_STRING:
59
60
61
62
63
      if(spawn_data->spawn_is_multiple) {
        generate_multiple_info_string_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
      } else {
        generate_info_string_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
      }
64
      break;
65
    case MAM_PHY_TYPE_HOSTFILE: // FIXME Does not consider multiple spawn strat
66
      generate_info_hostfile_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
67
68
      break;
  }
69
#else
70
71
72
73
74
  if(spawn_data->spawn_is_multiple) {
    generate_multiple_info_string(mall->nodelist, procs_array, used_nodes, spawn_data);
  } else {
    generate_info_string(mall->nodelist, procs_array, used_nodes, spawn_data);
  }
75
#endif
76
  set_spawn_cmd(used_nodes, spawn_data);
77
  free(procs_array);
78
79
80
}


81
82
//--------------PRIVATE FUNCTIONS---------------//
//-----------------DISTRIBUTION-----------------//
83
84
/*
 * Obtiene la distribucion fisica del grupo de procesos a crear, devolviendo
85
86
 * cuantos nodos se van a utilizar, la cantidad de procesos que alojara cada
 * nodo y cuantas creaciones de procesos seran necesarias.
87
 *
88
 * Se permiten dos tipos de distribuciones fisicas segun el valor de "spawn_dist":
89
90
91
92
93
94
 *
 *  COMM_PHY_NODES (1): Orientada a equilibrar el numero de procesos entre
 *                      todos los nodos disponibles.
 *  COMM_PHY_CPU   (2): Orientada a completar la capacidad de un nodo antes de
 *                      ocupar otro nodo.
 */
95
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes, int *total_spawns) {
96
  int i, *procs;
97
  procs = calloc(mall->num_nodes, sizeof(int)); // Numero de procesos por nodo
98
99

  /* GET NEW DISTRIBUTION  */
100
  switch(mall_conf->spawn_dist) {
101
    case MAM_PHY_DIST_SPREAD: // DIST NODES @deprecated
102
      spread_dist(spawn_data, used_nodes, procs);
103
      break;
104
    case MAM_PHY_DIST_COMPACT: // DIST CPUs
105
      compact_dist(spawn_data, used_nodes, procs);
106
107
108
109
      break;
  }

  //Copy results to output vector qty
110
  *qty = calloc(*used_nodes, sizeof(int)); // Numero de procesos por nodo
111
112
113
114
115
116
117
118
119
120
121
122

//  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL) ) {
  if(spawn_data.spawn_is_multiple) {
    for(i=0; i< *used_nodes; i++) {
      (*qty)[i] = procs[i];
      if(procs[i]) (*total_spawns)++;
    }
  } else {
    *total_spawns = 1;
    for(i=0; i< *used_nodes; i++) {
      (*qty)[i] = procs[i];
    }
123
124
125
126
127
128
129
130
131
  }
  free(procs);
}

/*
 * Distribucion basada en equilibrar el numero de procesos en cada nodo
 * para que todos los nodos tengan el mismo numero. Devuelve el total de
 * nodos utilizados y el numero de procesos a crear en cada nodo.
 *
132
133
134
135
 * Asume que los procesos que ya existen estan en los nodos mas bajos
 * con el mismo tamBl. //FIXME No deberia asumir el tamBl.
 *
 * FIXME Tener en cuenta localizacion de procesos ya creados (already_created)
136
 */
137
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
138
  int i, tamBl, remainder;
139

140
  *used_nodes = mall->num_nodes;
141
  tamBl = spawn_data.target_qty / *used_nodes;
142
143
144
145
146
  i = spawn_data.already_created / tamBl;
  remainder = spawn_data.already_created % tamBl;
  if(remainder) {
    procs[i++] = tamBl - remainder;
  }
147
  for(; i<*used_nodes; i++) {
148
149
150
151
152
153
154
155
156
157
158
159
    procs[i] = tamBl; 
  }
}

/*
 * Distribucion basada en llenar un nodo de procesos antes de pasar al
 * siguiente nodo. Devuelve el total de nodos utilizados y el numero 
 * de procesos a crear en cada nodo.
 *
 * Tiene en cuenta los procesos ya existentes para el mappeado de 
 * los procesos a crear.
 */
160
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
161
162
163
  int i, asigCores;
  int tamBl, remainder;

164
  tamBl = mall->num_cpus;
165
166
167
  asigCores = spawn_data.already_created;
  i = *used_nodes = spawn_data.already_created / tamBl;
  remainder = spawn_data.already_created % tamBl;
168
169
170
171

  //FIXME REFACTOR Que pasa si los nodos 1 y 2 tienen espacios libres
  //First nodes could already have existing procs
  //Start from the first with free spaces
172
  if (remainder && asigCores + (tamBl - remainder) < spawn_data.target_qty) {
173
174
    procs[i] = tamBl - remainder;
    asigCores += procs[i];
175
    i = (i+1) % mall->num_nodes;
176
177
178
    (*used_nodes)++;
  }

179
  //Assign tamBl to each node
180
  while(asigCores+tamBl <= spawn_data.target_qty) {
181
182
    asigCores += tamBl;
    procs[i] += tamBl;
183
    i = (i+1) % mall->num_nodes;
184
185
186
187
    (*used_nodes)++;
  }

  //Last node could have less procs than tamBl
188
189
  if(asigCores < spawn_data.target_qty) { 
    procs[i] += spawn_data.target_qty - asigCores;
190
191
    (*used_nodes)++;
  }
192
  if(*used_nodes > mall->num_nodes) *used_nodes = mall->num_nodes;  //FIXME Si ocurre esto no es un error?
193
194
}

195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
//--------------PRIVATE FUNCTIONS---------------//
//-------------------CMD SET--------------------//

/*
 * Comprueba que comando hay que llamar al realizar
 * el spawn. Todos los sets tienen que hacer el mismo
 * comando.
 *
 */
void set_spawn_cmd(size_t nodes, Spawn_data *spawn_data) {
  size_t index = 0;
  char *cmd_aux;
  switch(mall_conf->external_usage) {
    case MAM_USE_VALGRIND:
      cmd_aux = MAM_VALGRIND_SCRIPT;
      break;
    case MAM_USE_EXTRAE: 
      cmd_aux = MAM_EXTRAE_SCRIPT;
      break;
    default:
      cmd_aux = mall->name_exec;
      break;
  }

  for(; index<nodes; index++) {
    spawn_data->sets[index].cmd = cmd_aux;
  }
}
223
224
225
226
227
228
229
230
231

//--------------PRIVATE FUNCTIONS---------------//
//-------------------INFO SET-------------------//

/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 *
232
 *
233
 */
234
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
235
  char *host_str;
236

237
  fill_str_hosts(nodelist, procs_array, nodes, &host_str);
238
  // SET MAPPING
239
  set_mapping_host(spawn_data->spawn_qty, host_str, 0, spawn_data);
240
  free(host_str);
241
242
}

243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 *
 *
 */
void generate_multiple_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
  char *host, *aux, *token, *hostlist_str;
  size_t i=0,j=0,len=0;

  aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
  strcpy(aux, nodelist);
  token = strtok(aux, ",");
  while (token != NULL && i < nodes) {
    host = strdup(token);
    if (procs_array[i] != 0) {
      write_str_node(&hostlist_str, len, procs_array[i], host);
      set_mapping_host(procs_array[i], hostlist_str, j, spawn_data);
      free(hostlist_str); hostlist_str = NULL;
      j++;
    }
    i++;
    free(host);
    token = strtok(NULL, ",");
  }
  free(aux);
  if(hostlist_str != NULL) { free(hostlist_str); }
}


//--------------PRIVATE FUNCTIONS---------------//
//---------------MAPPING UTILITY----------------//
//----------------------------------------------//

/*
 * Anyade en la siguiente entrada de spawns la
 * distribucion fisica a utilizar con un par 
 * host/mapping y el total de procesos.
 */
void set_mapping_host(int qty, char *host, size_t index, Spawn_data *spawn_data) {
  MPI_Info *info;

  spawn_data->sets[index].spawn_qty = qty;
  info = &(spawn_data->sets[index].mapping);
  MPI_Info_create(info);
  MPI_Info_set(*info, "hosts", host);
}

292
293
294
295
/*
 * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
 * al crear procesos e indicar donde tienen que ser creados.
 */
296
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str) {
297
  char *host, *aux, *token;
298
  size_t i=0,len=0;
299
300
301
302

  aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
  strcpy(aux, nodelist);
  token = strtok(aux, ",");
303
304
305
  while (token != NULL && i < used_nodes) {
    host = strdup(token);
    if (qty[i] != 0) {
306
      len = write_str_node(hostlist_str, len, qty[i], host);
307
308
309
    }
    i++;
    free(host);
310
    token = strtok(NULL, ",");
311
  }
312
  free(aux);
313
314
315
316
317
318
}

/*
 * Añade en una cadena "qty" entradas de "node_name".
 * Realiza la reserva de memoria y la realoja si es necesario.
 */
319
int write_str_node(char **hostlist_str, size_t len_og, size_t qty, char *node_name) {
320
  int err;
321
  char *ocurrence;
322
  size_t i, len, len_node;
323

324
325
  len_node = strlen(node_name) + 1; // Str length + ','
  len = qty * len_node; // Number of times the node is used
326
327

  if(len_og == 0) { // Memoria no reservada
328
    *hostlist_str = (char *) malloc((len+1) * sizeof(char));
329
  } else { // Cadena ya tiene datos
330
    *hostlist_str = (char *) realloc(*hostlist_str, (len_og + len + 1) * sizeof(char));
331
  }
332
  if(hostlist_str == NULL) return -1; // No ha sido posible alojar la memoria
333
334

  ocurrence = (char *) malloc((len_node+1) * sizeof(char));
335
  if(ocurrence == NULL) return -2; // No ha sido posible alojar la memoria
336
  err = snprintf(ocurrence, len_node+1, ",%s", node_name);
337
  if(err < 0) return -3; // No ha sido posible escribir sobre la variable auxiliar
338
339
340
341

  i=0;
  if(len_og == 0) { // Si se inicializa, la primera es una copia
    i++;
342
    strcpy(*hostlist_str, node_name);
343
344
  }
  for(; i<qty; i++){ // Las siguientes se conctanenan
345
    strcat(*hostlist_str, ocurrence);
346
347
348
349
350
351
352
  }

  
  free(ocurrence);
  return len+len_og;
}

353
//--------------------------------SLURM USAGE-------------------------------------//
354
#if MAM_USE_SLURM
355
356
357
358
359
360
/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 * Es necesario usar Slurm para usarlo.
 */
361
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
362
  char *hoststring;
363
364

  // CREATE AND SET STRING HOSTS
365
  fill_str_hosts_slurm(nodelist, procs_array, nodes, &hoststring);
366
  set_mapping_host(spawn_data->spawn_qty, hoststring, 0, spawn_data);
367
368
369
  free(hoststring);
}

370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
/*
 * Crea y devuelve un conjunto de objetos MPI_Info con 
 * un par host/mapping en el que se indica el mappeado 
 * a utilizar en los nuevos procesos dividido por nodos.
 * Es necesario Slurm para usarlo.
 */
void generate_multiple_info_string_slurm(char *nodelist, int *qty, size_t used_nodes, Spawn_data *spawn_data) {
  char *host, *hostlist_str;
  size_t i=0,j=0,len=0;
  hostlist_t hostlist;
  
  hostlist_str = NULL;
  hostlist = slurm_hostlist_create(nodelist);
  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
      write_str_node(&hostlist_str, len, qty[i], host);
      set_mapping_host(qty[i], hostlist_str, j, spawn_data);
      free(hostlist_str); hostlist_str = NULL;
      j++;
    }
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
  if(hostlist_str != NULL) { free(hostlist_str); }
}

397
398
399
400
401

/*
 * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
 * al crear procesos e indicar donde tienen que ser creados.
 */
402
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str) {
403
404
405
406
407
408
409
  char *host;
  size_t i=0,len=0;
  hostlist_t hostlist;
  
  hostlist = slurm_hostlist_create(nodelist);
  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
410
      len = write_str_node(hostlist_str, len, qty[i], host);
411
412
413
414
415
416
417
    }
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}

418
419
420
421
422
423
424
425
426
427
428
//====================================================
//====================================================
//============DEPRECATED FUNCTIONS====================
//====================================================
//====================================================

/* FIXME Por revisar
 * @deprecated
 * Genera un fichero hostfile y lo anyade a un objeto
 * MPI_Info para ser utilizado.
 */
429
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, Spawn_data *spawn_data){
430
431
    char *hostfile;
    int ptr;
432
433
434
435
    MPI_Info *info;

    spawn_data->sets[0].spawn_qty = spawn_data->spawn_qty;
    info = &(spawn_data->sets[0].mapping);
436
437
438
439
440
441
442
443

    // CREATE/UPDATE HOSTFILE 
    ptr = create_hostfile(&hostfile);
    MPI_Info_create(info);
    MPI_Info_set(*info, "hostfile", hostfile);
    free(hostfile);

    // SET NEW DISTRIBUTION 
444
    fill_hostfile_slurm(nodelist, ptr, procs_array, nodes);
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
    close(ptr);
}

/*
 * @deprecated
 * Crea un fichero que se utilizara como hostfile
 * para un nuevo grupo de procesos. 
 *
 * El nombre es devuelto en el argumento "file_name",
 * que tiene que ser un puntero vacio.
 *
 * Ademas se devuelve un descriptor de fichero para 
 * modificar el fichero.
 */
int create_hostfile(char **file_name) {
460
461
  int ptr, err;
  size_t len = 11; //FIXME Numero mágico
462
463

  *file_name = NULL;
464
  *file_name = malloc(len * sizeof(char));
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
  if(*file_name == NULL) return -1; // No ha sido posible alojar la memoria
  err = snprintf(*file_name, len, "hostfile.o");
  if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

  ptr = open(*file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if(ptr < 0) return -3; // No ha sido posible crear el fichero

  return ptr; // Devolver puntero a fichero
}

/*
 * @deprecated
 * Rellena un fichero hostfile indicado por ptr con los nombres
 * de los nodos a utilizar indicados por "job_record" y la cantidad 
 * de procesos que alojara cada nodo indicado por "qty".
 */
481
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int nodes) {
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
  int i=0;
  char *host;
  hostlist_t hostlist;
  
  hostlist = slurm_hostlist_create(nodelist);
  while ((host = slurm_hostlist_shift(hostlist)) && i < nodes) {
    write_hostfile_node(ptr, qty[i], host);
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}

/*
 * @deprecated
 * Escribe en el fichero hostfile indicado por ptr una nueva linea.
 *
 * Esta linea indica el nombre de un nodo y la cantidad de procesos a
 * alojar en ese nodo.
 */
int write_hostfile_node(int ptr, int qty, char *node_name) {
503
  int err;
504
  char *line;
505
  size_t len, len_node, len_int;
506
507

  len_node = strlen(node_name);
508
509
  err = snprintf(NULL, 0, "%d", qty);
  if(err < 0) return -1;
510
  len_int = err;
511
512
513

  len = len_node + len_int + 3;
  line = malloc(len * sizeof(char));
514
  if(line == NULL) return -2; // No ha sido posible alojar la memoria
515
516
  err = snprintf(line, len, "%s:%d\n", node_name, qty);

517
  if(err < 0) return -3; // No ha sido posible escribir en el fichero
518
519
520
521
522
523

  write(ptr, line, len-1);
  free(line);

  return 0;
}
524
525
#endif
//--------------------------------SLURM USAGE-------------------------------------//