ProcessDist.c 17.2 KB
Newer Older
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
5
#include <unistd.h>
6
#include <string.h>
7
#include <mpi.h>
8
#include "ProcessDist.h"
9
10
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
11

12
13
14
15
16
17
18
19
20
//--------------PRIVATE CONSTANTS------------------//
#define MAM_HOSTFILE_NAME1 "MAM_HF_ID"  // Constant size name (15)
#define MAM_HOSTFILE_NAME2 "_S"  // Constant size name (2)
#define MAM_HOSTFILE_NAME3 ".tmp"  // Constant size name (4)
#define MAM_HOSTFILE_SIZE1 15 // 11 Chars + 4 Digits 
#define MAM_HOSTFILE_SIZE2 8 // 4 Chars + 3 Digits + \0
#define MAM_HOSTFILE_SIZE MAM_HOSTFILE_SIZE1 + MAM_HOSTFILE_SIZE2 //23 = 15 Chars + 7 Digits + \0
#define MAM_HOSTFILE_LINE_SIZE 32

21
22
//--------------PRIVATE DECLARATIONS---------------//

23
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes, int *total_spawns);
24
25
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
26
void set_spawn_cmd(Spawn_data *spawn_data);
27

28
29
30
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void generate_multiple_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);

31
void set_mapping_host(int qty, char *info_type, char *host, size_t index, Spawn_data *spawn_data);
32
33
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str);
int write_str_node(char **hostlist_str, size_t len_og, size_t qty, char *node_name);
34
int write_hostfile_node(int file, int qty, char *node_name, char **line, size_t *len_og);
35
//--------------------------------SLURM USAGE-------------------------------------//
36
#if MAM_USE_SLURM
37
#include <slurm/slurm.h>
38
39
40
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void generate_multiple_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str);
41
42
43
44

void generate_info_hostfile_slurm(char *nodelist, int *qty, size_t used_nodes, Spawn_data *spawn_data);
void fill_hostfile_slurm(char* file_name, size_t used_nodes, int *qty, hostlist_t *hostlist);
size_t fill_multiple_hostfile_slurm(char* file_name, int *qty, hostlist_t *hostlist, char **line, size_t *len_line);
45
46
#endif
//--------------------------------SLURM USAGE-------------------------------------//
47

48
//--------------PUBLIC FUNCTIONS---------------//
49
50
51
52
53
54
55

/*
 * Configura la creacion de un nuevo grupo de procesos, reservando la memoria
 * para una llamada a MPI_Comm_spawn, obteniendo una distribucion fisica
 * para los procesos y creando un fichero hostfile.
 *
 */
56
void processes_dist(Spawn_data *spawn_data) {
57
58
  int used_nodes=0;
  int *procs_array;
59

60
  // GET NEW DISTRIBUTION 
61
62
  node_dist(*spawn_data, &procs_array, &used_nodes, &spawn_data->total_spawns);
  spawn_data->sets = (Spawn_set *) malloc(spawn_data->total_spawns * sizeof(Spawn_set));
63
#if MAM_USE_SLURM
64
  switch(spawn_data->mapping_fill_method) {
65
    case MAM_PHY_TYPE_STRING:
66
67
68
69
70
      if(spawn_data->spawn_is_multiple) {
        generate_multiple_info_string_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
      } else {
        generate_info_string_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
      }
71
      break;
72
    case MAM_PHY_TYPE_HOSTFILE:
73
      generate_info_hostfile_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
74
75
      break;
  }
76
#else
77
78
79
80
81
  if(spawn_data->spawn_is_multiple) {
    generate_multiple_info_string(mall->nodelist, procs_array, used_nodes, spawn_data);
  } else {
    generate_info_string(mall->nodelist, procs_array, used_nodes, spawn_data);
  }
82
#endif
83
  set_spawn_cmd(spawn_data);
84
  free(procs_array);
85
86
87
}


88
89
//--------------PRIVATE FUNCTIONS---------------//
//-----------------DISTRIBUTION-----------------//
90
91
/*
 * Obtiene la distribucion fisica del grupo de procesos a crear, devolviendo
92
93
 * cuantos nodos se van a utilizar, la cantidad de procesos que alojara cada
 * nodo y cuantas creaciones de procesos seran necesarias.
94
 *
95
 * Se permiten dos tipos de distribuciones fisicas segun el valor de "spawn_dist":
96
97
98
99
100
101
 *
 *  COMM_PHY_NODES (1): Orientada a equilibrar el numero de procesos entre
 *                      todos los nodos disponibles.
 *  COMM_PHY_CPU   (2): Orientada a completar la capacidad de un nodo antes de
 *                      ocupar otro nodo.
 */
102
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes, int *total_spawns) {
103
  int i, *procs;
104
  procs = calloc(mall->num_nodes, sizeof(int)); // Numero de procesos por nodo
105
106

  /* GET NEW DISTRIBUTION  */
107
  switch(mall_conf->spawn_dist) {
108
    case MAM_PHY_DIST_SPREAD: // DIST NODES
109
      spread_dist(spawn_data, used_nodes, procs);
110
      break;
111
    case MAM_PHY_DIST_COMPACT: // DIST CPUs
112
      compact_dist(spawn_data, used_nodes, procs);
113
114
115
116
      break;
  }

  //Copy results to output vector qty
117
  *qty = calloc(*used_nodes, sizeof(int)); // Numero de procesos por nodo
118
119
120
121
122
123
124
125
126
127
128
129

//  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL) ) {
  if(spawn_data.spawn_is_multiple) {
    for(i=0; i< *used_nodes; i++) {
      (*qty)[i] = procs[i];
      if(procs[i]) (*total_spawns)++;
    }
  } else {
    *total_spawns = 1;
    for(i=0; i< *used_nodes; i++) {
      (*qty)[i] = procs[i];
    }
130
131
132
133
134
135
136
137
138
  }
  free(procs);
}

/*
 * Distribucion basada en equilibrar el numero de procesos en cada nodo
 * para que todos los nodos tengan el mismo numero. Devuelve el total de
 * nodos utilizados y el numero de procesos a crear en cada nodo.
 *
139
140
141
142
 * Asume que los procesos que ya existen estan en los nodos mas bajos
 * con el mismo tamBl. //FIXME No deberia asumir el tamBl.
 *
 * FIXME Tener en cuenta localizacion de procesos ya creados (already_created)
143
 */
144
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
145
  int i, tamBl, remainder;
146

147
  *used_nodes = mall->num_nodes;
148
  tamBl = spawn_data.target_qty / *used_nodes;
149
150
151
152
153
  i = spawn_data.already_created / tamBl;
  remainder = spawn_data.already_created % tamBl;
  if(remainder) {
    procs[i++] = tamBl - remainder;
  }
154
  for(; i<*used_nodes; i++) {
155
156
157
158
159
160
161
162
163
164
165
166
    procs[i] = tamBl; 
  }
}

/*
 * Distribucion basada en llenar un nodo de procesos antes de pasar al
 * siguiente nodo. Devuelve el total de nodos utilizados y el numero 
 * de procesos a crear en cada nodo.
 *
 * Tiene en cuenta los procesos ya existentes para el mappeado de 
 * los procesos a crear.
 */
167
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
168
169
170
  int i, asigCores;
  int tamBl, remainder;

171
  tamBl = mall->num_cpus;
172
173
174
  asigCores = spawn_data.already_created;
  i = *used_nodes = spawn_data.already_created / tamBl;
  remainder = spawn_data.already_created % tamBl;
175
176
177
178

  //FIXME REFACTOR Que pasa si los nodos 1 y 2 tienen espacios libres
  //First nodes could already have existing procs
  //Start from the first with free spaces
179
  if (remainder && asigCores + (tamBl - remainder) < spawn_data.target_qty) {
180
181
    procs[i] = tamBl - remainder;
    asigCores += procs[i];
182
    i = (i+1) % mall->num_nodes;
183
184
185
    (*used_nodes)++;
  }

186
  //Assign tamBl to each node
187
  while(asigCores+tamBl <= spawn_data.target_qty) {
188
189
    asigCores += tamBl;
    procs[i] += tamBl;
190
    i = (i+1) % mall->num_nodes;
191
192
193
194
    (*used_nodes)++;
  }

  //Last node could have less procs than tamBl
195
196
  if(asigCores < spawn_data.target_qty) { 
    procs[i] += spawn_data.target_qty - asigCores;
197
198
    (*used_nodes)++;
  }
199
  if(*used_nodes > mall->num_nodes) *used_nodes = mall->num_nodes;  //FIXME Si ocurre esto no es un error?
200
201
}

202
203
204
205
206
207
208
209
210
//--------------PRIVATE FUNCTIONS---------------//
//-------------------CMD SET--------------------//

/*
 * Comprueba que comando hay que llamar al realizar
 * el spawn. Todos los sets tienen que hacer el mismo
 * comando.
 *
 */
211
212
void set_spawn_cmd(Spawn_data *spawn_data) {
  int index = 0;
213
214
215
216
217
218
219
220
221
222
223
224
225
  char *cmd_aux;
  switch(mall_conf->external_usage) {
    case MAM_USE_VALGRIND:
      cmd_aux = MAM_VALGRIND_SCRIPT;
      break;
    case MAM_USE_EXTRAE: 
      cmd_aux = MAM_EXTRAE_SCRIPT;
      break;
    default:
      cmd_aux = mall->name_exec;
      break;
  }

226
  for(; index<spawn_data->total_spawns; index++) {
227
228
229
    spawn_data->sets[index].cmd = cmd_aux;
  }
}
230
231
232
233
234
235
236
237
238

//--------------PRIVATE FUNCTIONS---------------//
//-------------------INFO SET-------------------//

/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 *
239
 *
240
 */
241
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
242
  char *host_str;
243

244
  fill_str_hosts(nodelist, procs_array, nodes, &host_str);
245
  // SET MAPPING
246
  set_mapping_host(spawn_data->spawn_qty, "hosts", host_str, 0, spawn_data);
247
  free(host_str);
248
249
}

250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 *
 *
 */
void generate_multiple_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
  char *host, *aux, *token, *hostlist_str;
  size_t i=0,j=0,len=0;

  aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
  strcpy(aux, nodelist);
  token = strtok(aux, ",");
  while (token != NULL && i < nodes) {
    host = strdup(token);
    if (procs_array[i] != 0) {
      write_str_node(&hostlist_str, len, procs_array[i], host);
268
      set_mapping_host(procs_array[i], "hosts", hostlist_str, j, spawn_data);
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
      free(hostlist_str); hostlist_str = NULL;
      j++;
    }
    i++;
    free(host);
    token = strtok(NULL, ",");
  }
  free(aux);
  if(hostlist_str != NULL) { free(hostlist_str); }
}


//--------------PRIVATE FUNCTIONS---------------//
//---------------MAPPING UTILITY----------------//
//----------------------------------------------//

/*
 * Anyade en la siguiente entrada de spawns la
 * distribucion fisica a utilizar con un par 
 * host/mapping y el total de procesos.
 */
290
void set_mapping_host(int qty, char *info_type, char *host, size_t index, Spawn_data *spawn_data) {
291
292
293
294
295
  MPI_Info *info;

  spawn_data->sets[index].spawn_qty = qty;
  info = &(spawn_data->sets[index].mapping);
  MPI_Info_create(info);
296
  MPI_Info_set(*info, info_type, host);
297
298
}

299
300
301
302
/*
 * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
 * al crear procesos e indicar donde tienen que ser creados.
 */
303
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str) {
304
  char *host, *aux, *token;
305
  size_t i=0,len=0;
306
307
308
309

  aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
  strcpy(aux, nodelist);
  token = strtok(aux, ",");
310
311
312
  while (token != NULL && i < used_nodes) {
    host = strdup(token);
    if (qty[i] != 0) {
313
      len = write_str_node(hostlist_str, len, qty[i], host);
314
315
316
    }
    i++;
    free(host);
317
    token = strtok(NULL, ",");
318
  }
319
  free(aux);
320
321
322
323
324
325
}

/*
 * Añade en una cadena "qty" entradas de "node_name".
 * Realiza la reserva de memoria y la realoja si es necesario.
 */
326
int write_str_node(char **hostlist_str, size_t len_og, size_t qty, char *node_name) {
327
  int err;
328
  char *ocurrence;
329
  size_t i, len, len_node;
330

331
332
  len_node = strlen(node_name) + 1; // Str length + ','
  len = qty * len_node; // Number of times the node is used
333
334

  if(len_og == 0) { // Memoria no reservada
335
    *hostlist_str = (char *) malloc((len+1) * sizeof(char));
336
  } else { // Cadena ya tiene datos
337
    *hostlist_str = (char *) realloc(*hostlist_str, (len_og + len + 1) * sizeof(char));
338
  }
339
  if(hostlist_str == NULL) return -1; // No ha sido posible alojar la memoria
340
341

  ocurrence = (char *) malloc((len_node+1) * sizeof(char));
342
  if(ocurrence == NULL) return -2; // No ha sido posible alojar la memoria
343
  err = snprintf(ocurrence, len_node+1, ",%s", node_name);
344
  if(err < 0) return -3; // No ha sido posible escribir sobre la variable auxiliar
345
346
347
348

  i=0;
  if(len_og == 0) { // Si se inicializa, la primera es una copia
    i++;
349
    strcpy(*hostlist_str, node_name);
350
351
  }
  for(; i<qty; i++){ // Las siguientes se conctanenan
352
    strcat(*hostlist_str, ocurrence);
353
354
355
356
357
358
359
  }

  
  free(ocurrence);
  return len+len_og;
}

360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
/*
 * Escribe en el fichero hostfile indicado por ptr una nueva linea.
 *
 * Esta linea indica el nombre de un nodo y la cantidad de procesos a
 * alojar en ese nodo.
 */
int write_hostfile_node(int file, int qty, char *node_name, char **line, size_t *len_og) {
  int err;
  size_t len, len_node, len_int;

  if(*line == NULL) {
    *len_og = MAM_HOSTFILE_LINE_SIZE;
    *line = (char *) malloc(*len_og * sizeof(char));
  }

  len_node = strlen(node_name);
  err = snprintf(NULL, 0, "%d", qty);
  if(err < 0) return -1;
  len_int = err;

  len = len_node + len_int + 3;
  if(*len_og < len) {
    *len_og = len+MAM_HOSTFILE_LINE_SIZE;
    *line = (char *) realloc(*line, *len_og * sizeof(char));
  }

  err = snprintf(*line, len, "%s:%d\n", node_name, qty);
  err = write(file, *line, len-1);
  if(err < 0) {
    perror("Error writing to the host file");
    close(file);
    exit(EXIT_FAILURE);
  }
  return 0;
}

396
//--------------------------------SLURM USAGE-------------------------------------//
397
#if MAM_USE_SLURM
398
399
400
401
402
403
/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 * Es necesario usar Slurm para usarlo.
 */
404
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
405
  char *hoststring;
406
407

  // CREATE AND SET STRING HOSTS
408
  fill_str_hosts_slurm(nodelist, procs_array, nodes, &hoststring);
409
  set_mapping_host(spawn_data->spawn_qty, "hosts", hoststring, 0, spawn_data);
410
411
412
  free(hoststring);
}

413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
/*
 * Crea y devuelve un conjunto de objetos MPI_Info con 
 * un par host/mapping en el que se indica el mappeado 
 * a utilizar en los nuevos procesos dividido por nodos.
 * Es necesario Slurm para usarlo.
 */
void generate_multiple_info_string_slurm(char *nodelist, int *qty, size_t used_nodes, Spawn_data *spawn_data) {
  char *host, *hostlist_str;
  size_t i=0,j=0,len=0;
  hostlist_t hostlist;
  
  hostlist_str = NULL;
  hostlist = slurm_hostlist_create(nodelist);
  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
      write_str_node(&hostlist_str, len, qty[i], host);
429
      set_mapping_host(qty[i], "hosts", hostlist_str, j, spawn_data);
430
431
432
433
434
435
436
437
438
439
      free(hostlist_str); hostlist_str = NULL;
      j++;
    }
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
  if(hostlist_str != NULL) { free(hostlist_str); }
}

440
441
442
443
444

/*
 * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
 * al crear procesos e indicar donde tienen que ser creados.
 */
445
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str) {
446
447
448
449
450
451
452
  char *host;
  size_t i=0,len=0;
  hostlist_t hostlist;
  
  hostlist = slurm_hostlist_create(nodelist);
  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
453
      len = write_str_node(hostlist_str, len, qty[i], host);
454
455
456
457
458
459
460
    }
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}

461
462
463
464
465
void generate_info_hostfile_slurm(char *nodelist, int *qty, size_t used_nodes, Spawn_data *spawn_data){
  int index = 0, jid;
  size_t qty_index = 0, len_line = 0;
  char *hostfile_name, *line;
  hostlist_t hostlist;
466

467
468
  char *tmp = getenv("SLURM_JOB_ID");
  jid = tmp != NULL ? atoi(tmp) : 0;
469

470
471
472
473
474
475
476
477
478
479
480
481
482
  line = NULL;
  hostlist = slurm_hostlist_create(nodelist);
  hostfile_name = (char *) malloc(MAM_HOSTFILE_SIZE * sizeof(char));
  snprintf(hostfile_name, MAM_HOSTFILE_SIZE , "%s%04d%s%03d%s", MAM_HOSTFILE_NAME1, jid, MAM_HOSTFILE_NAME2, index, MAM_HOSTFILE_NAME3);

  if(spawn_data->spawn_is_multiple) { // MULTIPLE
    for(; index<spawn_data->total_spawns; index++) {
      // This strat creates 1 hostfile per spawn
      qty_index = fill_multiple_hostfile_slurm(hostfile_name, qty+qty_index, &hostlist, &line, &len_line);
      set_mapping_host(qty[qty_index-1], "hostfile", hostfile_name, index, spawn_data); 
      snprintf(hostfile_name+MAM_HOSTFILE_SIZE1, MAM_HOSTFILE_SIZE2 , "%03d%s", index+1, MAM_HOSTFILE_NAME3);
    }
    free(line);
483

484
485
486
487
  } else { // NOT MULTIPLE
    fill_hostfile_slurm(hostfile_name, used_nodes, qty, &hostlist);
    set_mapping_host(spawn_data->spawn_qty, "hostfile", hostfile_name, index, spawn_data);
  }
488

489
490
  free(hostfile_name);
  slurm_hostlist_destroy(hostlist);
491
492
}

493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
// Function to generate the configuration file
void fill_hostfile_slurm(char* file_name, size_t used_nodes, int *qty, hostlist_t *hostlist) {
  char *host, *line;
  size_t i=0, len_line=0;

  line = NULL;
  int file = open(file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if (file < 0) {
    perror("Error opening the host file");
    exit(EXIT_FAILURE);
  }

  while ( (host = slurm_hostlist_shift(*hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
      write_hostfile_node(file, qty[i], host, &line, &len_line);
    }
509
510
511
512
    i++;
    free(host);
  }

513
514
515
  close(file);
  free(line);
}
516

517
518
519
size_t fill_multiple_hostfile_slurm(char* file_name, int *qty, hostlist_t *hostlist, char **line, size_t *len_line) {
  char *host;
  size_t i=0;
520

521
522
523
524
525
  int file = open(file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if (file < 0) {
    perror("Error opening the host file");
    exit(EXIT_FAILURE);
  }
526

527
528
529
530
531
532
533
534
535
  while( (host = slurm_hostlist_shift(*hostlist)) ) {
    if(qty[i] != 0) {
      write_hostfile_node(file, qty[i], host, line, len_line);
      i++;
      break;
    }
    i++;
    free(host); host = NULL;
  }
536

537
538
539
  if(host != NULL) free(host);
  close(file);
  return i;
540
}
541
#endif
542
//--------------------------------SLURM USAGE-------------------------------------//