ProcessDist.c 17.9 KB
Newer Older
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
5
#include <unistd.h>
6
#include <string.h>
7
#include <mpi.h>
8
#include "ProcessDist.h"
9
#include "SpawnUtils.h"
10
11
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
12

13
//--------------PRIVATE CONSTANTS------------------//
14
15
16
#define MAM_HOSTFILE_NAME1 "MAM_HF_ID"  // Constant size name (9) -- Part of SIZE1
#define MAM_HOSTFILE_NAME2 "_S"  // Constant size name (2) -- Part of SIZE1
#define MAM_HOSTFILE_NAME3 ".tmp"  // Constant size name (4) -- Part of SIZE2
17
18
19
20
21
#define MAM_HOSTFILE_SIZE1 15 // 11 Chars + 4 Digits 
#define MAM_HOSTFILE_SIZE2 8 // 4 Chars + 3 Digits + \0
#define MAM_HOSTFILE_SIZE MAM_HOSTFILE_SIZE1 + MAM_HOSTFILE_SIZE2 //23 = 15 Chars + 7 Digits + \0
#define MAM_HOSTFILE_LINE_SIZE 32

22
23
//--------------PRIVATE DECLARATIONS---------------//

24
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes, int *total_spawns);
25
26
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
27

28
29
30
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void generate_multiple_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);

31
void set_mapping_host(int qty, char *info_type, char *host, size_t index, Spawn_data *spawn_data);
32
33
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str);
int write_str_node(char **hostlist_str, size_t len_og, size_t qty, char *node_name);
34
int write_hostfile_node(int file, int qty, char *node_name, char **line, size_t *len_og);
35
//--------------------------------SLURM USAGE-------------------------------------//
36
#if MAM_USE_SLURM
37
#include <slurm/slurm.h>
38
39
40
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void generate_multiple_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str);
41
42
43
44

void generate_info_hostfile_slurm(char *nodelist, int *qty, size_t used_nodes, Spawn_data *spawn_data);
void fill_hostfile_slurm(char* file_name, size_t used_nodes, int *qty, hostlist_t *hostlist);
size_t fill_multiple_hostfile_slurm(char* file_name, int *qty, hostlist_t *hostlist, char **line, size_t *len_line);
45
46
#endif
//--------------------------------SLURM USAGE-------------------------------------//
47

48
//--------------PUBLIC FUNCTIONS---------------//
49
50
51
52
53
54
55

/*
 * Configura la creacion de un nuevo grupo de procesos, reservando la memoria
 * para una llamada a MPI_Comm_spawn, obteniendo una distribucion fisica
 * para los procesos y creando un fichero hostfile.
 *
 */
56
void processes_dist(Spawn_data *spawn_data) {
57
58
  int used_nodes=0;
  int *procs_array;
59

60
  // GET NEW DISTRIBUTION 
61
62
  node_dist(*spawn_data, &procs_array, &used_nodes, &spawn_data->total_spawns);
  spawn_data->sets = (Spawn_set *) malloc(spawn_data->total_spawns * sizeof(Spawn_set));
63
#if MAM_USE_SLURM
64
  switch(spawn_data->mapping_fill_method) {
65
    case MAM_PHY_TYPE_STRING:
66
      if(spawn_data->spawn_is_multiple || spawn_data->spawn_is_parallel) {
67
68
69
70
        generate_multiple_info_string_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
      } else {
        generate_info_string_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
      }
71
      break;
72
    case MAM_PHY_TYPE_HOSTFILE:
73
      generate_info_hostfile_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
74
75
      break;
  }
76
#else
77
  if(spawn_data->spawn_is_multiple || spawn_data->spawn_is_parallel) {
78
79
80
81
    generate_multiple_info_string(mall->nodelist, procs_array, used_nodes, spawn_data);
  } else {
    generate_info_string(mall->nodelist, procs_array, used_nodes, spawn_data);
  }
82
#endif
83
84
85
86
  char *aux_cmd = get_spawn_cmd();
  for(int index = 0; index<spawn_data->total_spawns; index++) {
    spawn_data->sets[index].cmd = aux_cmd;
  }
87
  free(procs_array);
88
89
}

90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
void set_hostfile_name(char **file_name, int *n, int jid, int index) {
  if(*file_name == NULL) {
    *file_name = (char *) malloc(MAM_HOSTFILE_SIZE * sizeof(char));
  }

  if(*n == 0) {
    jid = jid % 1000;
    snprintf(*file_name, MAM_HOSTFILE_SIZE , "%s%04d%s%03d%s", MAM_HOSTFILE_NAME1, jid, MAM_HOSTFILE_NAME2, index, MAM_HOSTFILE_NAME3);
  } else {
    snprintf((*file_name)+MAM_HOSTFILE_SIZE1, MAM_HOSTFILE_SIZE2 , "%03d%s", index, MAM_HOSTFILE_NAME3);
  }
  *n=1;
}

int read_hostfile_procs(char *file_name, int *qty) {
  char *line = NULL, *ptr;
  FILE *file = NULL;

  file = fopen(file_name, "r");
  if(file == NULL) {
    perror("Could not open hostfile to read");
    MPI_Abort(MPI_COMM_WORLD, -1);
  }

  *qty = 0;
  line = (char *) malloc(MAM_HOSTFILE_LINE_SIZE * sizeof(char));
  while (fgets(line, MAM_HOSTFILE_LINE_SIZE, file) != NULL) {
    size_t len = strlen(line);
    ptr = line + len - 1;
    // Search delimiter
    while (ptr != line && *ptr != ':') { ptr--; }
    if (*ptr == ':') { *qty  += atoi(ptr + 1); }
  }
  return 0;
}

126

127
128
//--------------PRIVATE FUNCTIONS---------------//
//-----------------DISTRIBUTION-----------------//
129
130
/*
 * Obtiene la distribucion fisica del grupo de procesos a crear, devolviendo
131
132
 * cuantos nodos se van a utilizar, la cantidad de procesos que alojara cada
 * nodo y cuantas creaciones de procesos seran necesarias.
133
 *
134
 * Se permiten dos tipos de distribuciones fisicas segun el valor de "spawn_dist":
135
136
137
138
139
140
 *
 *  COMM_PHY_NODES (1): Orientada a equilibrar el numero de procesos entre
 *                      todos los nodos disponibles.
 *  COMM_PHY_CPU   (2): Orientada a completar la capacidad de un nodo antes de
 *                      ocupar otro nodo.
 */
141
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes, int *total_spawns) {
142
  int i, *procs;
143
  procs = calloc(mall->num_nodes, sizeof(int)); // Numero de procesos por nodo
144
145

  /* GET NEW DISTRIBUTION  */
146
  switch(mall_conf->spawn_dist) {
147
    case MAM_PHY_DIST_SPREAD: // DIST NODES
148
      spread_dist(spawn_data, used_nodes, procs);
149
      break;
150
    case MAM_PHY_DIST_COMPACT: // DIST CPUs
151
      compact_dist(spawn_data, used_nodes, procs);
152
153
154
155
      break;
  }

  //Copy results to output vector qty
156
  *qty = calloc(*used_nodes, sizeof(int)); // Numero de procesos por nodo
157
158

//  if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL) ) {
159
  if(spawn_data.spawn_is_multiple || spawn_data.spawn_is_parallel) {
160
161
162
163
164
165
166
167
168
    for(i=0; i< *used_nodes; i++) {
      (*qty)[i] = procs[i];
      if(procs[i]) (*total_spawns)++;
    }
  } else {
    *total_spawns = 1;
    for(i=0; i< *used_nodes; i++) {
      (*qty)[i] = procs[i];
    }
169
170
171
172
173
174
175
176
177
  }
  free(procs);
}

/*
 * Distribucion basada en equilibrar el numero de procesos en cada nodo
 * para que todos los nodos tengan el mismo numero. Devuelve el total de
 * nodos utilizados y el numero de procesos a crear en cada nodo.
 *
178
179
180
181
 * Asume que los procesos que ya existen estan en los nodos mas bajos
 * con el mismo tamBl. //FIXME No deberia asumir el tamBl.
 *
 * FIXME Tener en cuenta localizacion de procesos ya creados (already_created)
182
 */
183
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
184
  int i, tamBl, remainder;
185

186
  *used_nodes = mall->num_nodes;
187
  tamBl = spawn_data.target_qty / *used_nodes;
188
189
190
191
192
  i = spawn_data.already_created / tamBl;
  remainder = spawn_data.already_created % tamBl;
  if(remainder) {
    procs[i++] = tamBl - remainder;
  }
193
  for(; i<*used_nodes; i++) {
194
195
196
197
198
199
200
201
202
203
204
205
    procs[i] = tamBl; 
  }
}

/*
 * Distribucion basada en llenar un nodo de procesos antes de pasar al
 * siguiente nodo. Devuelve el total de nodos utilizados y el numero 
 * de procesos a crear en cada nodo.
 *
 * Tiene en cuenta los procesos ya existentes para el mappeado de 
 * los procesos a crear.
 */
206
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
207
208
209
  int i, asigCores;
  int tamBl, remainder;

210
  tamBl = mall->num_cpus;
211
212
213
  asigCores = spawn_data.already_created;
  i = *used_nodes = spawn_data.already_created / tamBl;
  remainder = spawn_data.already_created % tamBl;
214
215
216
217

  //FIXME REFACTOR Que pasa si los nodos 1 y 2 tienen espacios libres
  //First nodes could already have existing procs
  //Start from the first with free spaces
218
  if (remainder && asigCores + (tamBl - remainder) < spawn_data.target_qty) {
219
220
    procs[i] = tamBl - remainder;
    asigCores += procs[i];
221
    i = (i+1) % mall->num_nodes;
222
223
224
    (*used_nodes)++;
  }

225
  //Assign tamBl to each node
226
  while(asigCores+tamBl <= spawn_data.target_qty) {
227
228
    asigCores += tamBl;
    procs[i] += tamBl;
229
    i = (i+1) % mall->num_nodes;
230
231
232
233
    (*used_nodes)++;
  }

  //Last node could have less procs than tamBl
234
235
  if(asigCores < spawn_data.target_qty) { 
    procs[i] += spawn_data.target_qty - asigCores;
236
237
    (*used_nodes)++;
  }
238
  if(*used_nodes > mall->num_nodes) *used_nodes = mall->num_nodes;  //FIXME Si ocurre esto no es un error?
239
240
}

241
242
243
244
245
246
247
248
//--------------PRIVATE FUNCTIONS---------------//
//-------------------INFO SET-------------------//

/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 *
249
 *
250
 */
251
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
252
  char *host_str;
253

254
  fill_str_hosts(nodelist, procs_array, nodes, &host_str);
255
  // SET MAPPING
256
  set_mapping_host(spawn_data->spawn_qty, "hosts", host_str, 0, spawn_data);
257
  free(host_str);
258
259
}

260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 *
 *
 */
void generate_multiple_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
  char *host, *aux, *token, *hostlist_str;
  size_t i=0,j=0,len=0;

  aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
  strcpy(aux, nodelist);
  token = strtok(aux, ",");
  while (token != NULL && i < nodes) {
    host = strdup(token);
    if (procs_array[i] != 0) {
      write_str_node(&hostlist_str, len, procs_array[i], host);
278
      set_mapping_host(procs_array[i], "hosts", hostlist_str, j, spawn_data);
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
      free(hostlist_str); hostlist_str = NULL;
      j++;
    }
    i++;
    free(host);
    token = strtok(NULL, ",");
  }
  free(aux);
  if(hostlist_str != NULL) { free(hostlist_str); }
}


//--------------PRIVATE FUNCTIONS---------------//
//---------------MAPPING UTILITY----------------//
//----------------------------------------------//

/*
 * Anyade en la siguiente entrada de spawns la
 * distribucion fisica a utilizar con un par 
 * host/mapping y el total de procesos.
 */
300
void set_mapping_host(int qty, char *info_type, char *host, size_t index, Spawn_data *spawn_data) {
301
302
303
304
305
  MPI_Info *info;

  spawn_data->sets[index].spawn_qty = qty;
  info = &(spawn_data->sets[index].mapping);
  MPI_Info_create(info);
306
  MPI_Info_set(*info, info_type, host);
307
308
}

309
310
311
312
/*
 * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
 * al crear procesos e indicar donde tienen que ser creados.
 */
313
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str) {
314
  char *host, *aux, *token;
315
  size_t i=0,len=0;
316
317
318
319

  aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
  strcpy(aux, nodelist);
  token = strtok(aux, ",");
320
321
322
  while (token != NULL && i < used_nodes) {
    host = strdup(token);
    if (qty[i] != 0) {
323
      len = write_str_node(hostlist_str, len, qty[i], host);
324
325
326
    }
    i++;
    free(host);
327
    token = strtok(NULL, ",");
328
  }
329
  free(aux);
330
331
332
333
334
335
}

/*
 * Añade en una cadena "qty" entradas de "node_name".
 * Realiza la reserva de memoria y la realoja si es necesario.
 */
336
int write_str_node(char **hostlist_str, size_t len_og, size_t qty, char *node_name) {
337
  int err;
338
  char *ocurrence;
339
  size_t i, len, len_node;
340

341
342
  len_node = strlen(node_name) + 1; // Str length + ','
  len = qty * len_node; // Number of times the node is used
343
344

  if(len_og == 0) { // Memoria no reservada
345
    *hostlist_str = (char *) malloc((len+1) * sizeof(char));
346
  } else { // Cadena ya tiene datos
347
    *hostlist_str = (char *) realloc(*hostlist_str, (len_og + len + 1) * sizeof(char));
348
  }
349
  if(hostlist_str == NULL) return -1; // No ha sido posible alojar la memoria
350
351

  ocurrence = (char *) malloc((len_node+1) * sizeof(char));
352
  if(ocurrence == NULL) return -2; // No ha sido posible alojar la memoria
353
  err = snprintf(ocurrence, len_node+1, ",%s", node_name);
354
  if(err < 0) return -3; // No ha sido posible escribir sobre la variable auxiliar
355
356
357
358

  i=0;
  if(len_og == 0) { // Si se inicializa, la primera es una copia
    i++;
359
    strcpy(*hostlist_str, node_name);
360
361
  }
  for(; i<qty; i++){ // Las siguientes se conctanenan
362
    strcat(*hostlist_str, ocurrence);
363
364
365
366
367
368
369
  }

  
  free(ocurrence);
  return len+len_og;
}

370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
/*
 * Escribe en el fichero hostfile indicado por ptr una nueva linea.
 *
 * Esta linea indica el nombre de un nodo y la cantidad de procesos a
 * alojar en ese nodo.
 */
int write_hostfile_node(int file, int qty, char *node_name, char **line, size_t *len_og) {
  int err;
  size_t len, len_node, len_int;

  if(*line == NULL) {
    *len_og = MAM_HOSTFILE_LINE_SIZE;
    *line = (char *) malloc(*len_og * sizeof(char));
  }

  len_node = strlen(node_name);
  err = snprintf(NULL, 0, "%d", qty);
  if(err < 0) return -1;
  len_int = err;

  len = len_node + len_int + 3;
  if(*len_og < len) {
    *len_og = len+MAM_HOSTFILE_LINE_SIZE;
    *line = (char *) realloc(*line, *len_og * sizeof(char));
  }

  err = snprintf(*line, len, "%s:%d\n", node_name, qty);
  err = write(file, *line, len-1);
  if(err < 0) {
    perror("Error writing to the host file");
    close(file);
    exit(EXIT_FAILURE);
  }
  return 0;
}

406
//--------------------------------SLURM USAGE-------------------------------------//
407
#if MAM_USE_SLURM
408
409
410
411
412
413
/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 * Es necesario usar Slurm para usarlo.
 */
414
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
415
  char *hoststring;
416
417

  // CREATE AND SET STRING HOSTS
418
  fill_str_hosts_slurm(nodelist, procs_array, nodes, &hoststring);
419
  set_mapping_host(spawn_data->spawn_qty, "hosts", hoststring, 0, spawn_data);
420
421
422
  free(hoststring);
}

423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
/*
 * Crea y devuelve un conjunto de objetos MPI_Info con 
 * un par host/mapping en el que se indica el mappeado 
 * a utilizar en los nuevos procesos dividido por nodos.
 * Es necesario Slurm para usarlo.
 */
void generate_multiple_info_string_slurm(char *nodelist, int *qty, size_t used_nodes, Spawn_data *spawn_data) {
  char *host, *hostlist_str;
  size_t i=0,j=0,len=0;
  hostlist_t hostlist;
  
  hostlist_str = NULL;
  hostlist = slurm_hostlist_create(nodelist);
  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
      write_str_node(&hostlist_str, len, qty[i], host);
439
      set_mapping_host(qty[i], "hosts", hostlist_str, j, spawn_data);
440
441
442
443
444
445
446
447
448
449
      free(hostlist_str); hostlist_str = NULL;
      j++;
    }
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
  if(hostlist_str != NULL) { free(hostlist_str); }
}

450
451
452
453
454

/*
 * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
 * al crear procesos e indicar donde tienen que ser creados.
 */
455
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str) {
456
457
458
459
460
461
462
  char *host;
  size_t i=0,len=0;
  hostlist_t hostlist;
  
  hostlist = slurm_hostlist_create(nodelist);
  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
463
      len = write_str_node(hostlist_str, len, qty[i], host);
464
465
466
467
468
469
470
    }
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}

471
472
473
474
475
void generate_info_hostfile_slurm(char *nodelist, int *qty, size_t used_nodes, Spawn_data *spawn_data){
  int index = 0, jid;
  size_t qty_index = 0, len_line = 0;
  char *hostfile_name, *line;
  hostlist_t hostlist;
476

477
  char *tmp = getenv("SLURM_JOB_ID");
478
  jid = tmp != NULL ? (atoi(tmp)%1000) : 0;
479

480
481
482
483
484
  line = NULL;
  hostlist = slurm_hostlist_create(nodelist);
  hostfile_name = (char *) malloc(MAM_HOSTFILE_SIZE * sizeof(char));
  snprintf(hostfile_name, MAM_HOSTFILE_SIZE , "%s%04d%s%03d%s", MAM_HOSTFILE_NAME1, jid, MAM_HOSTFILE_NAME2, index, MAM_HOSTFILE_NAME3);

485
  if(spawn_data->spawn_is_multiple || spawn_data->spawn_is_parallel) { // MULTIPLE
486
487
488
489
490
491
492
    for(; index<spawn_data->total_spawns; index++) {
      // This strat creates 1 hostfile per spawn
      qty_index = fill_multiple_hostfile_slurm(hostfile_name, qty+qty_index, &hostlist, &line, &len_line);
      set_mapping_host(qty[qty_index-1], "hostfile", hostfile_name, index, spawn_data); 
      snprintf(hostfile_name+MAM_HOSTFILE_SIZE1, MAM_HOSTFILE_SIZE2 , "%03d%s", index+1, MAM_HOSTFILE_NAME3);
    }
    free(line);
493

494
495
496
497
  } else { // NOT MULTIPLE
    fill_hostfile_slurm(hostfile_name, used_nodes, qty, &hostlist);
    set_mapping_host(spawn_data->spawn_qty, "hostfile", hostfile_name, index, spawn_data);
  }
498

499
500
  free(hostfile_name);
  slurm_hostlist_destroy(hostlist);
501
502
}

503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
// Function to generate the configuration file
void fill_hostfile_slurm(char* file_name, size_t used_nodes, int *qty, hostlist_t *hostlist) {
  char *host, *line;
  size_t i=0, len_line=0;

  line = NULL;
  int file = open(file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if (file < 0) {
    perror("Error opening the host file");
    exit(EXIT_FAILURE);
  }

  while ( (host = slurm_hostlist_shift(*hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
      write_hostfile_node(file, qty[i], host, &line, &len_line);
    }
519
520
521
522
    i++;
    free(host);
  }

523
524
525
  close(file);
  free(line);
}
526

527
528
529
size_t fill_multiple_hostfile_slurm(char* file_name, int *qty, hostlist_t *hostlist, char **line, size_t *len_line) {
  char *host;
  size_t i=0;
530

531
532
533
534
535
  int file = open(file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if (file < 0) {
    perror("Error opening the host file");
    exit(EXIT_FAILURE);
  }
536

537
538
539
540
541
542
543
544
545
  while( (host = slurm_hostlist_shift(*hostlist)) ) {
    if(qty[i] != 0) {
      write_hostfile_node(file, qty[i], host, line, len_line);
      i++;
      break;
    }
    i++;
    free(host); host = NULL;
  }
546

547
548
549
  if(host != NULL) free(host);
  close(file);
  return i;
550
}
551
#endif
552
//--------------------------------SLURM USAGE-------------------------------------//