ProcessDist.c 13.3 KB
Newer Older
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
5
#include <unistd.h>
6
#include <string.h>
7
#include <mpi.h>
8
9
10
11
12
13
14
15
#include "ProcessDist.h"

//--------------PRIVATE DECLARATIONS---------------//

void node_dist( struct physical_dist dist, int **qty, int *used_nodes);
void spread_dist(struct physical_dist dist, int *used_nodes, int *procs);
void compact_dist(struct physical_dist dist, int *used_nodes, int *procs);

16
17
void generate_info_string(int target_qty, MPI_Info *info);
//--------------------------------SLURM USAGE-------------------------------------//
18
#if USE_MAL_SLURM
19
20
21
22
23
24
25
26
#include <slurm/slurm.h>
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info);
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostfile_str);
//@deprecated functions
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, MPI_Info *info);
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int used_nodes);
#endif
//--------------------------------SLURM USAGE-------------------------------------//
27

28
int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_name);
29
30
31
32
//@deprecated functions
int create_hostfile(char **file_name);
int write_hostfile_node(int ptr, int qty, char *node_name);

33
//--------------PUBLIC FUNCTIONS---------------//
34
35
36
37
38
39
40
41
42
43
/*
 * Pone los datos para una estructura que guarda los parametros
 * para realizar un mappeado de los procesos.
 *
 * Si la memoria no esta reservada devuelve falso y no hace nada.
 * Si puede realizar los cambios devuelve verdadero.
 *
 * IN parameters -->
 * target_qty: Numero de procesos tras la reconfiguracion
 * alreadyCreated: Numero de procesos padre a considerar
44
 *   La resta de target_qty-alreadyCreated es el numero de hijos a crear
45
46
47
 * info_type: Indica como realizar el mappeado, si indicarlo
 *   en una cadena (MALL_DIST_STRING) o en un hostfile
 *   (MALL_DIST_HOSTFILE)
48
 * spawn_dist: Indica como sera el mappeado, si intentar rellenar
49
50
51
52
 *   primero los nodos con cpus ya usados (CPUS/BEST/COMPACT) o
 *   que todos los nodos tengan el mismo numero de cpus usados
 *   (NODES/WORST/SPREAD)
 */
53
int physical_struct_create(int target_qty, int already_created, int info_type, struct physical_dist *dist) {
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

  dist->target_qty = target_qty;
  dist->already_created = already_created;
  dist->info_type = info_type;

  return 1;
}

/*
 * Configura la creacion de un nuevo grupo de procesos, reservando la memoria
 * para una llamada a MPI_Comm_spawn, obteniendo una distribucion fisica
 * para los procesos y creando un fichero hostfile.
 *
 * OUT parameters -->
 * info_spawn: Objeto MPI_Info en el que se indica el mappeado
 *   a usar al crear los procesos.
 */
void processes_dist(struct physical_dist dist, MPI_Info *info_spawn) {
72
#if USE_MAL_SLURM
73
74
75
76
77
78
  int used_nodes=0;
  int *procs_array;
  // GET NEW DISTRIBUTION 
  node_dist(dist, &procs_array, &used_nodes);
  switch(dist.info_type) {
    case MALL_DIST_STRING:
79
      generate_info_string_slurm(mall->nodelist, procs_array, used_nodes, info_spawn);
80
81
      break;
    case MALL_DIST_HOSTFILE:
82
      generate_info_hostfile_slurm(mall->nodelist, procs_array, used_nodes, info_spawn);
83
84
85
      break;
  }
  free(procs_array);
86
87
88
#else
  generate_info_string(dist.target_qty, info_spawn);
#endif
89
90
91
}


92
93
//--------------PRIVATE FUNCTIONS---------------//
//-----------------DISTRIBUTION-----------------//
94
95
96
97
98
/*
 * Obtiene la distribucion fisica del grupo de procesos a crear, devolviendo
 * cuantos nodos se van a utilizar y la cantidad de procesos que alojara cada
 * nodo.
 *
99
 * Se permiten dos tipos de distribuciones fisicas segun el valor de "spawn_dist":
100
101
102
103
104
105
106
107
108
 *
 *  COMM_PHY_NODES (1): Orientada a equilibrar el numero de procesos entre
 *                      todos los nodos disponibles.
 *  COMM_PHY_CPU   (2): Orientada a completar la capacidad de un nodo antes de
 *                      ocupar otro nodo.
 */
void node_dist(struct physical_dist dist, int **qty, int *used_nodes) {
  int i, *procs;

109
  procs = calloc(mall->num_nodes, sizeof(int)); // Numero de procesos por nodo
110
111

  /* GET NEW DISTRIBUTION  */
112
  switch(mall_conf->spawn_dist) {
113
114
115
116
117
118
119
120
121
    case MALL_DIST_SPREAD: // DIST NODES @deprecated
      spread_dist(dist, used_nodes, procs);
      break;
    case MALL_DIST_COMPACT: // DIST CPUs
      compact_dist(dist, used_nodes, procs);
      break;
  }

  //Copy results to output vector qty
122
  *qty = calloc(*used_nodes, sizeof(int)); // Numero de procesos por nodo
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
  for(i=0; i< *used_nodes; i++) {
    (*qty)[i] = procs[i];
  }
  free(procs);
}

/*
 * Distribucion basada en equilibrar el numero de procesos en cada nodo
 * para que todos los nodos tengan el mismo numero. Devuelve el total de
 * nodos utilizados y el numero de procesos a crear en cada nodo.
 *
 * TODO Tener en cuenta procesos ya creados (already_created)
 */
void spread_dist(struct physical_dist dist, int *used_nodes, int *procs) {
  int i, tamBl, remainder;

139
140
141
  *used_nodes = mall->num_nodes;
  tamBl = dist.target_qty / mall->num_nodes;
  remainder = dist.target_qty % mall->num_nodes;
142
143
144
  for(i=0; i<remainder; i++) {
    procs[i] = tamBl + 1; 
  }
145
  for(i=remainder; i<mall->num_nodes; i++) {
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
    procs[i] = tamBl; 
  }
}

/*
 * Distribucion basada en llenar un nodo de procesos antes de pasar al
 * siguiente nodo. Devuelve el total de nodos utilizados y el numero 
 * de procesos a crear en cada nodo.
 *
 * Tiene en cuenta los procesos ya existentes para el mappeado de 
 * los procesos a crear.
 */
void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) {
  int i, asigCores;
  int tamBl, remainder;

162
  tamBl = mall->num_cpus / mall->num_nodes;
163
  asigCores = dist.already_created;
164
165
166
167
168
169
170
  i = *used_nodes = dist.already_created / tamBl;
  remainder = dist.already_created % tamBl;

  //FIXME REFACTOR Que pasa si los nodos 1 y 2 tienen espacios libres
  //First nodes could already have existing procs
  //Start from the first with free spaces
  if (remainder) {
171
172
    procs[i] = tamBl - remainder;
    asigCores += procs[i];
173
    i = (i+1) % mall->num_nodes;
174
175
176
    (*used_nodes)++;
  }

177
  //Assign tamBl to each node
178
  while(asigCores+tamBl <= dist.target_qty) {
179
180
    asigCores += tamBl;
    procs[i] += tamBl;
181
    i = (i+1) % mall->num_nodes;
182
183
184
185
    (*used_nodes)++;
  }

  //Last node could have less procs than tamBl
186
187
  if(asigCores < dist.target_qty) { 
    procs[i] += dist.target_qty - asigCores;
188
189
    (*used_nodes)++;
  }
190
  if(*used_nodes > mall->num_nodes) *used_nodes = mall->num_nodes;  //FIXME Si ocurre esto no es un error?
191
192
}

193
194
195
196
197
198
199
200
201
202
203
204
205
206
207

//--------------PRIVATE FUNCTIONS---------------//
//-------------------INFO SET-------------------//

/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 *
 * Actualmente no considera que puedan haber varios nodos
 * y lleva todos al mismo. Las funciones "generate_info_string_slurm"
 * o "generate_info_hostfile_slurm" permiten utilizar varios
 * nodos, pero es necesario activar Slurm.
 */
void generate_info_string(int target_qty, MPI_Info *info){
208
  char *host_string, *host;
209
  int len, err;
210

211
212
213
  host = "localhost";
  //host = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  //MPI_Get_processor_name(host, &len);
214
  // CREATE AND SET STRING HOSTS
215
216
  err = write_str_node(&host_string, 0, target_qty, host);
  if (err<0) {printf("Error when generating mapping: %d\n", err); MPI_Abort(MPI_COMM_WORLD, err);}
217
218
219
  // SET MAPPING
  MPI_Info_create(info);
  MPI_Info_set(*info, "hosts", host_string);
220
  //free(host);
221
222
223
224
  free(host_string);
}

//--------------------------------SLURM USAGE-------------------------------------//
225
#if USE_MAL_SLURM
226
227
228
229
/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
230
 * Es necesario usar Slurm para usarlo.
231
 */
232
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info){
233
234
  // CREATE AND SET STRING HOSTS
  char *hoststring;
235
  fill_str_hosts_slurm(nodelist, procs_array, nodes, &hoststring);
236
237
238
239
240
241
242
243
244
245
  MPI_Info_create(info);
  MPI_Info_set(*info, "hosts", hoststring);
  free(hoststring);
}


/*
 * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
 * al crear procesos e indicar donde tienen que ser creados.
 */
246
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostfile_str) {
247
  char *host;
248
  size_t i=0,len=0;
249
250
251
252
253
  hostlist_t hostlist;
  
  hostlist = slurm_hostlist_create(nodelist);
  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
254
      len = write_str_node(hostfile_str, len, qty[i], host);
255
256
257
258
259
260
261
    }
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}

262
263
#endif
//--------------------------------SLURM USAGE-------------------------------------//
264
265
266
267
/*
 * Añade en una cadena "qty" entradas de "node_name".
 * Realiza la reserva de memoria y la realoja si es necesario.
 */
268
269
int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_name) {
  int err;
270
  char *ocurrence;
271
  size_t i, len, len_node;
272

273
274
  len_node = strlen(node_name) + 1; // Str length + ','
  len = qty * len_node; // Number of times the node is used
275
276

  if(len_og == 0) { // Memoria no reservada
277
    *hostfile_str = (char *) malloc((len+1) * sizeof(char));
278
  } else { // Cadena ya tiene datos
279
    *hostfile_str = (char *) realloc(*hostfile_str, (len_og + len + 1) * sizeof(char));
280
281
282
283
  }
  if(hostfile_str == NULL) return -1; // No ha sido posible alojar la memoria

  ocurrence = (char *) malloc((len_node+1) * sizeof(char));
284
  if(ocurrence == NULL) return -2; // No ha sido posible alojar la memoria
285
  err = snprintf(ocurrence, len_node+1, ",%s", node_name);
286
  if(err < 0) return -3; // No ha sido posible escribir sobre la variable auxiliar
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307

  i=0;
  if(len_og == 0) { // Si se inicializa, la primera es una copia
    i++;
    strcpy(*hostfile_str, node_name);
  }
  for(; i<qty; i++){ // Las siguientes se conctanenan
    strcat(*hostfile_str, ocurrence);
  }

  
  free(ocurrence);
  return len+len_og;
}

//====================================================
//====================================================
//============DEPRECATED FUNCTIONS====================
//====================================================
//====================================================

308
//--------------------------------SLURM USAGE-------------------------------------//
309
#if USE_MAL_SLURM
310
311
312
313
314
/* FIXME Por revisar
 * @deprecated
 * Genera un fichero hostfile y lo anyade a un objeto
 * MPI_Info para ser utilizado.
 */
315
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, MPI_Info *info){
316
317
318
319
320
321
322
323
324
325
    char *hostfile;
    int ptr;

    // CREATE/UPDATE HOSTFILE 
    ptr = create_hostfile(&hostfile);
    MPI_Info_create(info);
    MPI_Info_set(*info, "hostfile", hostfile);
    free(hostfile);

    // SET NEW DISTRIBUTION 
326
    fill_hostfile_slurm(nodelist, ptr, procs_array, nodes);
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
    close(ptr);
}

/*
 * @deprecated
 * Crea un fichero que se utilizara como hostfile
 * para un nuevo grupo de procesos. 
 *
 * El nombre es devuelto en el argumento "file_name",
 * que tiene que ser un puntero vacio.
 *
 * Ademas se devuelve un descriptor de fichero para 
 * modificar el fichero.
 */
int create_hostfile(char **file_name) {
342
343
  int ptr, err;
  size_t len = 11; //FIXME Numero mágico
344
345

  *file_name = NULL;
346
  *file_name = malloc(len * sizeof(char));
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
  if(*file_name == NULL) return -1; // No ha sido posible alojar la memoria
  err = snprintf(*file_name, len, "hostfile.o");
  if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

  ptr = open(*file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if(ptr < 0) return -3; // No ha sido posible crear el fichero

  return ptr; // Devolver puntero a fichero
}

/*
 * @deprecated
 * Rellena un fichero hostfile indicado por ptr con los nombres
 * de los nodos a utilizar indicados por "job_record" y la cantidad 
 * de procesos que alojara cada nodo indicado por "qty".
 */
363
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int nodes) {
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
  int i=0;
  char *host;
  hostlist_t hostlist;
  
  hostlist = slurm_hostlist_create(nodelist);
  while ((host = slurm_hostlist_shift(hostlist)) && i < nodes) {
    write_hostfile_node(ptr, qty[i], host);
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}

/*
 * @deprecated
 * Escribe en el fichero hostfile indicado por ptr una nueva linea.
 *
 * Esta linea indica el nombre de un nodo y la cantidad de procesos a
 * alojar en ese nodo.
 */
int write_hostfile_node(int ptr, int qty, char *node_name) {
385
  int err;
386
  char *line;
387
  size_t len, len_node, len_int;
388
389

  len_node = strlen(node_name);
390
391
  err = snprintf(NULL, 0, "%d", qty);
  if(err < 0) return -1;
392
  len_int = err;
393
394
395

  len = len_node + len_int + 3;
  line = malloc(len * sizeof(char));
396
  if(line == NULL) return -2; // No ha sido posible alojar la memoria
397
398
  err = snprintf(line, len, "%s:%d\n", node_name, qty);

399
  if(err < 0) return -3; // No ha sido posible escribir en el fichero
400
401
402
403
404
405

  write(ptr, line, len-1);
  free(line);

  return 0;
}
406
407
#endif
//--------------------------------SLURM USAGE-------------------------------------//
408
409
410
411
412
413
414
415
416
417
418
419
420
421


//TODO REFACTOR PARA CUANDO SE COMUNIQUE CON RMS
    // Get Slurm job info
    //int jobId;
    //char *tmp;
    //job_info_msg_t *j_info;
    //slurm_job_info_t last_record;
    //tmp = getenv("SLURM_JOB_ID");
    //jobId = atoi(tmp);
    //slurm_load_job(&j_info, jobId, 1);
    //last_record = j_info->job_array[j_info->record_count - 1];
    // Free JOB INFO
    //slurm_free_job_info_msg(j_info);