ProcessDist.c 13.5 KB
Newer Older
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
5
#include <unistd.h>
6
#include <string.h>
7
#include <mpi.h>
8
9
#include "ProcessDist.h"

10
//#define USE_SLURM
11
12
13
14
15
16
17

//--------------PRIVATE DECLARATIONS---------------//

void node_dist( struct physical_dist dist, int **qty, int *used_nodes);
void spread_dist(struct physical_dist dist, int *used_nodes, int *procs);
void compact_dist(struct physical_dist dist, int *used_nodes, int *procs);

18
19
20
21
22
23
24
25
26
27
28
void generate_info_string(int target_qty, MPI_Info *info);
//--------------------------------SLURM USAGE-------------------------------------//
#ifdef USE_SLURM
#include <slurm/slurm.h>
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info);
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostfile_str);
//@deprecated functions
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, MPI_Info *info);
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int used_nodes);
#endif
//--------------------------------SLURM USAGE-------------------------------------//
29

30
int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_name);
31
32
33
34
//@deprecated functions
int create_hostfile(char **file_name);
int write_hostfile_node(int ptr, int qty, char *node_name);

35
//--------------PUBLIC FUNCTIONS---------------//
36
37
38
39
40
41
42
43
44
45
/*
 * Pone los datos para una estructura que guarda los parametros
 * para realizar un mappeado de los procesos.
 *
 * Si la memoria no esta reservada devuelve falso y no hace nada.
 * Si puede realizar los cambios devuelve verdadero.
 *
 * IN parameters -->
 * target_qty: Numero de procesos tras la reconfiguracion
 * alreadyCreated: Numero de procesos padre a considerar
46
 *   La resta de target_qty-alreadyCreated es el numero de hijos a crear
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
 * num_cpus: Numero de cpus totales (En uso o no)
 * num_nodes: Numero de nodos disponibles por esta aplicacion
 * info_type: Indica como realizar el mappeado, si indicarlo
 *   en una cadena (MALL_DIST_STRING) o en un hostfile
 *   (MALL_DIST_HOSTFILE)
 * dist_type: Indica como sera el mappeado, si intentar rellenar
 *   primero los nodos con cpus ya usados (CPUS/BEST/COMPACT) o
 *   que todos los nodos tengan el mismo numero de cpus usados
 *   (NODES/WORST/SPREAD)
 */
int physical_struct_create(int target_qty, int already_created, int num_cpus, int num_nodes, char *nodelist, int dist_type, int info_type, struct physical_dist *dist) {

  dist->target_qty = target_qty;
  dist->already_created = already_created;
  dist->num_cpus = num_cpus;
62
63
  dist->num_nodes = num_nodes;
  dist->nodelist = nodelist;
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
  dist->dist_type = dist_type;
  dist->info_type = info_type;

  return 1;
}

/*
 * Configura la creacion de un nuevo grupo de procesos, reservando la memoria
 * para una llamada a MPI_Comm_spawn, obteniendo una distribucion fisica
 * para los procesos y creando un fichero hostfile.
 *
 * OUT parameters -->
 * info_spawn: Objeto MPI_Info en el que se indica el mappeado
 *   a usar al crear los procesos.
 */
void processes_dist(struct physical_dist dist, MPI_Info *info_spawn) {
80
#ifdef USE_SLURM
81
82
83
84
85
86
  int used_nodes=0;
  int *procs_array;
  // GET NEW DISTRIBUTION 
  node_dist(dist, &procs_array, &used_nodes);
  switch(dist.info_type) {
    case MALL_DIST_STRING:
87
      generate_info_string_slurm(dist.nodelist, procs_array, used_nodes, info_spawn);
88
89
      break;
    case MALL_DIST_HOSTFILE:
90
      generate_info_hostfile_slurm(dist.nodelist, procs_array, used_nodes, info_spawn);
91
92
93
      break;
  }
  free(procs_array);
94
95
96
#else
  generate_info_string(dist.target_qty, info_spawn);
#endif
97
98
99
}


100
101
//--------------PRIVATE FUNCTIONS---------------//
//-----------------DISTRIBUTION-----------------//
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
/*
 * Obtiene la distribucion fisica del grupo de procesos a crear, devolviendo
 * cuantos nodos se van a utilizar y la cantidad de procesos que alojara cada
 * nodo.
 *
 * Se permiten dos tipos de distribuciones fisicas segun el valor de "dist_type":
 *
 *  COMM_PHY_NODES (1): Orientada a equilibrar el numero de procesos entre
 *                      todos los nodos disponibles.
 *  COMM_PHY_CPU   (2): Orientada a completar la capacidad de un nodo antes de
 *                      ocupar otro nodo.
 */
void node_dist(struct physical_dist dist, int **qty, int *used_nodes) {
  int i, *procs;

117
  procs = calloc(dist.num_nodes, sizeof(int)); // Numero de procesos por nodo
118
119
120
121
122
123
124
125
126
127
128
129

  /* GET NEW DISTRIBUTION  */
  switch(dist.dist_type) {
    case MALL_DIST_SPREAD: // DIST NODES @deprecated
      spread_dist(dist, used_nodes, procs);
      break;
    case MALL_DIST_COMPACT: // DIST CPUs
      compact_dist(dist, used_nodes, procs);
      break;
  }

  //Copy results to output vector qty
130
  *qty = calloc(*used_nodes, sizeof(int)); // Numero de procesos por nodo
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
  for(i=0; i< *used_nodes; i++) {
    (*qty)[i] = procs[i];
  }
  free(procs);
}

/*
 * Distribucion basada en equilibrar el numero de procesos en cada nodo
 * para que todos los nodos tengan el mismo numero. Devuelve el total de
 * nodos utilizados y el numero de procesos a crear en cada nodo.
 *
 * TODO Tener en cuenta procesos ya creados (already_created)
 */
void spread_dist(struct physical_dist dist, int *used_nodes, int *procs) {
  int i, tamBl, remainder;

  *used_nodes = dist.num_nodes;
148
149
  tamBl = dist.target_qty / dist.num_nodes;
  remainder = dist.target_qty % dist.num_nodes;
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
  for(i=0; i<remainder; i++) {
    procs[i] = tamBl + 1; 
  }
  for(i=remainder; i<dist.num_nodes; i++) {
    procs[i] = tamBl; 
  }
}

/*
 * Distribucion basada en llenar un nodo de procesos antes de pasar al
 * siguiente nodo. Devuelve el total de nodos utilizados y el numero 
 * de procesos a crear en cada nodo.
 *
 * Tiene en cuenta los procesos ya existentes para el mappeado de 
 * los procesos a crear.
 */
void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) {
  int i, asigCores;
  int tamBl, remainder;

  tamBl = dist.num_cpus / dist.num_nodes;
  asigCores = 0;
  i = *used_nodes = dist.already_created / tamBl;
  remainder = dist.already_created % tamBl;

  //FIXME REFACTOR Que pasa si los nodos 1 y 2 tienen espacios libres
  //First nodes could already have existing procs
  //Start from the first with free spaces
  if (remainder) {
    procs[i] = asigCores = tamBl - remainder;
    i = (i+1) % dist.num_nodes;
    (*used_nodes)++;
  }

  //Assing tamBl to each node
185
  while(asigCores+tamBl <= dist.target_qty) {
186
187
188
189
190
191
192
    asigCores += tamBl;
    procs[i] += tamBl;
    i = (i+1) % dist.num_nodes;
    (*used_nodes)++;
  }

  //Last node could have less procs than tamBl
193
194
  if(asigCores < dist.target_qty) { 
    procs[i] += dist.target_qty - asigCores;
195
196
197
198
199
    (*used_nodes)++;
  }
  if(*used_nodes > dist.num_nodes) *used_nodes = dist.num_nodes;  //FIXME Si ocurre esto no es un error?
}

200
201
202
203
204
205
206
207
208
209
210
211
212
213
214

//--------------PRIVATE FUNCTIONS---------------//
//-------------------INFO SET-------------------//

/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 *
 * Actualmente no considera que puedan haber varios nodos
 * y lleva todos al mismo. Las funciones "generate_info_string_slurm"
 * o "generate_info_hostfile_slurm" permiten utilizar varios
 * nodos, pero es necesario activar Slurm.
 */
void generate_info_string(int target_qty, MPI_Info *info){
215
  char *host_string, *host;
216
  int len, err;
217

218
219
220
  host = "localhost";
  //host = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  //MPI_Get_processor_name(host, &len);
221
  // CREATE AND SET STRING HOSTS
222
223
  err = write_str_node(&host_string, 0, target_qty, host);
  if (err<0) {printf("Error when generating mapping: %d\n", err); MPI_Abort(MPI_COMM_WORLD, err);}
224
225
226
  // SET MAPPING
  MPI_Info_create(info);
  MPI_Info_set(*info, "hosts", host_string);
227
  //free(host);
228
229
230
231
232
  free(host_string);
}

//--------------------------------SLURM USAGE-------------------------------------//
#ifdef USE_SLURM
233
234
235
236
/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
237
 * Es necesario usar Slurm para usarlo.
238
 */
239
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info){
240
241
  // CREATE AND SET STRING HOSTS
  char *hoststring;
242
  fill_str_hosts_slurm(nodelist, procs_array, nodes, &hoststring);
243
244
245
246
247
248
249
250
251
252
  MPI_Info_create(info);
  MPI_Info_set(*info, "hosts", hoststring);
  free(hoststring);
}


/*
 * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
 * al crear procesos e indicar donde tienen que ser creados.
 */
253
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostfile_str) {
254
  char *host;
255
  size_t i=0,len=0;
256
257
258
259
260
  hostlist_t hostlist;
  
  hostlist = slurm_hostlist_create(nodelist);
  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
261
      len = write_str_node(hostfile_str, len, qty[i], host);
262
263
264
265
266
267
268
    }
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}

269
270
#endif
//--------------------------------SLURM USAGE-------------------------------------//
271
272
273
274
/*
 * Añade en una cadena "qty" entradas de "node_name".
 * Realiza la reserva de memoria y la realoja si es necesario.
 */
275
276
int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_name) {
  int err;
277
  char *ocurrence;
278
  size_t i, len, len_node;
279

280
281
  len_node = strlen(node_name) + 1; // Str length + ','
  len = qty * len_node; // Number of times the node is used
282
283

  if(len_og == 0) { // Memoria no reservada
284
    *hostfile_str = (char *) malloc((len+1) * sizeof(char));
285
  } else { // Cadena ya tiene datos
286
    *hostfile_str = (char *) realloc(*hostfile_str, (len_og + len + 1) * sizeof(char));
287
288
289
290
  }
  if(hostfile_str == NULL) return -1; // No ha sido posible alojar la memoria

  ocurrence = (char *) malloc((len_node+1) * sizeof(char));
291
  if(ocurrence == NULL) return -2; // No ha sido posible alojar la memoria
292
  err = snprintf(ocurrence, len_node+1, ",%s", node_name);
293
  if(err < 0) return -3; // No ha sido posible escribir sobre la variable auxiliar
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314

  i=0;
  if(len_og == 0) { // Si se inicializa, la primera es una copia
    i++;
    strcpy(*hostfile_str, node_name);
  }
  for(; i<qty; i++){ // Las siguientes se conctanenan
    strcat(*hostfile_str, ocurrence);
  }

  
  free(ocurrence);
  return len+len_og;
}

//====================================================
//====================================================
//============DEPRECATED FUNCTIONS====================
//====================================================
//====================================================

315
316
//--------------------------------SLURM USAGE-------------------------------------//
#ifdef USE_SLURM
317
318
319
320
321
/* FIXME Por revisar
 * @deprecated
 * Genera un fichero hostfile y lo anyade a un objeto
 * MPI_Info para ser utilizado.
 */
322
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, MPI_Info *info){
323
324
325
326
327
328
329
330
331
332
    char *hostfile;
    int ptr;

    // CREATE/UPDATE HOSTFILE 
    ptr = create_hostfile(&hostfile);
    MPI_Info_create(info);
    MPI_Info_set(*info, "hostfile", hostfile);
    free(hostfile);

    // SET NEW DISTRIBUTION 
333
    fill_hostfile_slurm(nodelist, ptr, procs_array, nodes);
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
    close(ptr);
}

/*
 * @deprecated
 * Crea un fichero que se utilizara como hostfile
 * para un nuevo grupo de procesos. 
 *
 * El nombre es devuelto en el argumento "file_name",
 * que tiene que ser un puntero vacio.
 *
 * Ademas se devuelve un descriptor de fichero para 
 * modificar el fichero.
 */
int create_hostfile(char **file_name) {
349
350
  int ptr, err;
  size_t len = 11; //FIXME Numero mágico
351
352

  *file_name = NULL;
353
  *file_name = malloc(len * sizeof(char));
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
  if(*file_name == NULL) return -1; // No ha sido posible alojar la memoria
  err = snprintf(*file_name, len, "hostfile.o");
  if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

  ptr = open(*file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if(ptr < 0) return -3; // No ha sido posible crear el fichero

  return ptr; // Devolver puntero a fichero
}

/*
 * @deprecated
 * Rellena un fichero hostfile indicado por ptr con los nombres
 * de los nodos a utilizar indicados por "job_record" y la cantidad 
 * de procesos que alojara cada nodo indicado por "qty".
 */
370
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int nodes) {
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
  int i=0;
  char *host;
  hostlist_t hostlist;
  
  hostlist = slurm_hostlist_create(nodelist);
  while ((host = slurm_hostlist_shift(hostlist)) && i < nodes) {
    write_hostfile_node(ptr, qty[i], host);
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}

/*
 * @deprecated
 * Escribe en el fichero hostfile indicado por ptr una nueva linea.
 *
 * Esta linea indica el nombre de un nodo y la cantidad de procesos a
 * alojar en ese nodo.
 */
int write_hostfile_node(int ptr, int qty, char *node_name) {
392
  int err;
393
  char *line;
394
  size_t len, len_node, len_int;
395
396

  len_node = strlen(node_name);
397
398
  err = snprintf(NULL, 0, "%d", qty);
  if(err < 0) return -1;
399
  len_int = err;
400
401
402

  len = len_node + len_int + 3;
  line = malloc(len * sizeof(char));
403
  if(line == NULL) return -2; // No ha sido posible alojar la memoria
404
405
  err = snprintf(line, len, "%s:%d\n", node_name, qty);

406
  if(err < 0) return -3; // No ha sido posible escribir en el fichero
407
408
409
410
411
412

  write(ptr, line, len-1);
  free(line);

  return 0;
}
413
414
#endif
//--------------------------------SLURM USAGE-------------------------------------//
415
416
417
418
419
420
421
422
423
424
425
426
427
428


//TODO REFACTOR PARA CUANDO SE COMUNIQUE CON RMS
    // Get Slurm job info
    //int jobId;
    //char *tmp;
    //job_info_msg_t *j_info;
    //slurm_job_info_t last_record;
    //tmp = getenv("SLURM_JOB_ID");
    //jobId = atoi(tmp);
    //slurm_load_job(&j_info, jobId, 1);
    //last_record = j_info->job_array[j_info->record_count - 1];
    // Free JOB INFO
    //slurm_free_job_info_msg(j_info);