ProcessDist.c 13.5 KB
Newer Older
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
5
#include <unistd.h>
6
#include <string.h>
7
#include <mpi.h>
8
9
#include "ProcessDist.h"

10
//#define USE_SLURM
11
12
13
14
15
16
17

//--------------PRIVATE DECLARATIONS---------------//

void node_dist( struct physical_dist dist, int **qty, int *used_nodes);
void spread_dist(struct physical_dist dist, int *used_nodes, int *procs);
void compact_dist(struct physical_dist dist, int *used_nodes, int *procs);

18
19
20
21
22
23
24
25
26
27
28
void generate_info_string(int target_qty, MPI_Info *info);
//--------------------------------SLURM USAGE-------------------------------------//
#ifdef USE_SLURM
#include <slurm/slurm.h>
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info);
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostfile_str);
//@deprecated functions
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, MPI_Info *info);
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int used_nodes);
#endif
//--------------------------------SLURM USAGE-------------------------------------//
29

30
int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_name);
31
32
33
34
//@deprecated functions
int create_hostfile(char **file_name);
int write_hostfile_node(int ptr, int qty, char *node_name);

35
//--------------PUBLIC FUNCTIONS---------------//
36
37
38
39
40
41
42
43
44
45
/*
 * Pone los datos para una estructura que guarda los parametros
 * para realizar un mappeado de los procesos.
 *
 * Si la memoria no esta reservada devuelve falso y no hace nada.
 * Si puede realizar los cambios devuelve verdadero.
 *
 * IN parameters -->
 * target_qty: Numero de procesos tras la reconfiguracion
 * alreadyCreated: Numero de procesos padre a considerar
46
 *   La resta de target_qty-alreadyCreated es el numero de hijos a crear
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
 * num_cpus: Numero de cpus totales (En uso o no)
 * num_nodes: Numero de nodos disponibles por esta aplicacion
 * info_type: Indica como realizar el mappeado, si indicarlo
 *   en una cadena (MALL_DIST_STRING) o en un hostfile
 *   (MALL_DIST_HOSTFILE)
 * dist_type: Indica como sera el mappeado, si intentar rellenar
 *   primero los nodos con cpus ya usados (CPUS/BEST/COMPACT) o
 *   que todos los nodos tengan el mismo numero de cpus usados
 *   (NODES/WORST/SPREAD)
 */
int physical_struct_create(int target_qty, int already_created, int num_cpus, int num_nodes, char *nodelist, int dist_type, int info_type, struct physical_dist *dist) {

  dist->target_qty = target_qty;
  dist->already_created = already_created;
  dist->num_cpus = num_cpus;
62
63
  dist->num_nodes = num_nodes;
  dist->nodelist = nodelist;
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
  dist->dist_type = dist_type;
  dist->info_type = info_type;

  return 1;
}

/*
 * Configura la creacion de un nuevo grupo de procesos, reservando la memoria
 * para una llamada a MPI_Comm_spawn, obteniendo una distribucion fisica
 * para los procesos y creando un fichero hostfile.
 *
 * OUT parameters -->
 * info_spawn: Objeto MPI_Info en el que se indica el mappeado
 *   a usar al crear los procesos.
 */
void processes_dist(struct physical_dist dist, MPI_Info *info_spawn) {
80
#ifdef USE_SLURM
81
82
83
84
85
86
  int used_nodes=0;
  int *procs_array;
  // GET NEW DISTRIBUTION 
  node_dist(dist, &procs_array, &used_nodes);
  switch(dist.info_type) {
    case MALL_DIST_STRING:
87
      generate_info_string_slurm(dist.nodelist, procs_array, used_nodes, info_spawn);
88
89
      break;
    case MALL_DIST_HOSTFILE:
90
      generate_info_hostfile_slurm(dist.nodelist, procs_array, used_nodes, info_spawn);
91
92
93
      break;
  }
  free(procs_array);
94
95
96
#else
  generate_info_string(dist.target_qty, info_spawn);
#endif
97
98
99
}


100
101
//--------------PRIVATE FUNCTIONS---------------//
//-----------------DISTRIBUTION-----------------//
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
/*
 * Obtiene la distribucion fisica del grupo de procesos a crear, devolviendo
 * cuantos nodos se van a utilizar y la cantidad de procesos que alojara cada
 * nodo.
 *
 * Se permiten dos tipos de distribuciones fisicas segun el valor de "dist_type":
 *
 *  COMM_PHY_NODES (1): Orientada a equilibrar el numero de procesos entre
 *                      todos los nodos disponibles.
 *  COMM_PHY_CPU   (2): Orientada a completar la capacidad de un nodo antes de
 *                      ocupar otro nodo.
 */
void node_dist(struct physical_dist dist, int **qty, int *used_nodes) {
  int i, *procs;

117
  procs = calloc(dist.num_nodes, sizeof(int)); // Numero de procesos por nodo
118
119
120
121
122
123
124
125
126
127
128
129

  /* GET NEW DISTRIBUTION  */
  switch(dist.dist_type) {
    case MALL_DIST_SPREAD: // DIST NODES @deprecated
      spread_dist(dist, used_nodes, procs);
      break;
    case MALL_DIST_COMPACT: // DIST CPUs
      compact_dist(dist, used_nodes, procs);
      break;
  }

  //Copy results to output vector qty
130
  *qty = calloc(*used_nodes, sizeof(int)); // Numero de procesos por nodo
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
  for(i=0; i< *used_nodes; i++) {
    (*qty)[i] = procs[i];
  }
  free(procs);
}

/*
 * Distribucion basada en equilibrar el numero de procesos en cada nodo
 * para que todos los nodos tengan el mismo numero. Devuelve el total de
 * nodos utilizados y el numero de procesos a crear en cada nodo.
 *
 * TODO Tener en cuenta procesos ya creados (already_created)
 */
void spread_dist(struct physical_dist dist, int *used_nodes, int *procs) {
  int i, tamBl, remainder;

  *used_nodes = dist.num_nodes;
148
149
  tamBl = dist.target_qty / dist.num_nodes;
  remainder = dist.target_qty % dist.num_nodes;
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
  for(i=0; i<remainder; i++) {
    procs[i] = tamBl + 1; 
  }
  for(i=remainder; i<dist.num_nodes; i++) {
    procs[i] = tamBl; 
  }
}

/*
 * Distribucion basada en llenar un nodo de procesos antes de pasar al
 * siguiente nodo. Devuelve el total de nodos utilizados y el numero 
 * de procesos a crear en cada nodo.
 *
 * Tiene en cuenta los procesos ya existentes para el mappeado de 
 * los procesos a crear.
 */
void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) {
  int i, asigCores;
  int tamBl, remainder;

  tamBl = dist.num_cpus / dist.num_nodes;
171
  asigCores = dist.already_created;
172
173
174
175
176
177
178
179
180
181
182
183
  i = *used_nodes = dist.already_created / tamBl;
  remainder = dist.already_created % tamBl;

  //FIXME REFACTOR Que pasa si los nodos 1 y 2 tienen espacios libres
  //First nodes could already have existing procs
  //Start from the first with free spaces
  if (remainder) {
    procs[i] = asigCores = tamBl - remainder;
    i = (i+1) % dist.num_nodes;
    (*used_nodes)++;
  }

184
  //Assign tamBl to each node
185
  while(asigCores+tamBl <= dist.target_qty) {
186
187
188
189
190
191
192
    asigCores += tamBl;
    procs[i] += tamBl;
    i = (i+1) % dist.num_nodes;
    (*used_nodes)++;
  }

  //Last node could have less procs than tamBl
193
194
  if(asigCores < dist.target_qty) { 
    procs[i] += dist.target_qty - asigCores;
195
196
197
198
199
    (*used_nodes)++;
  }
  if(*used_nodes > dist.num_nodes) *used_nodes = dist.num_nodes;  //FIXME Si ocurre esto no es un error?
}

200
201
202
203
204
205
206
207
208
209
210
211
212
213
214

//--------------PRIVATE FUNCTIONS---------------//
//-------------------INFO SET-------------------//

/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
 *
 * Actualmente no considera que puedan haber varios nodos
 * y lleva todos al mismo. Las funciones "generate_info_string_slurm"
 * o "generate_info_hostfile_slurm" permiten utilizar varios
 * nodos, pero es necesario activar Slurm.
 */
void generate_info_string(int target_qty, MPI_Info *info){
215
  char *host_string, *host;
216
  int len, err;
217

218
219
220
  host = "localhost";
  //host = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  //MPI_Get_processor_name(host, &len);
221
  // CREATE AND SET STRING HOSTS
222
223
  err = write_str_node(&host_string, 0, target_qty, host);
  if (err<0) {printf("Error when generating mapping: %d\n", err); MPI_Abort(MPI_COMM_WORLD, err);}
224
225
226
  // SET MAPPING
  MPI_Info_create(info);
  MPI_Info_set(*info, "hosts", host_string);
227
  //free(host);
228
229
230
231
232
  free(host_string);
}

//--------------------------------SLURM USAGE-------------------------------------//
#ifdef USE_SLURM
233
234
235
236
/*
 * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
 * en el que se indica el mappeado a utilizar en los nuevos
 * procesos.
237
 * Es necesario usar Slurm para usarlo.
238
 */
239
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info){
240
241
  // CREATE AND SET STRING HOSTS
  char *hoststring;
242
  fill_str_hosts_slurm(nodelist, procs_array, nodes, &hoststring);
243
244
245
246
247
248
249
250
251
252
  MPI_Info_create(info);
  MPI_Info_set(*info, "hosts", hoststring);
  free(hoststring);
}


/*
 * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
 * al crear procesos e indicar donde tienen que ser creados.
 */
253
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostfile_str) {
254
  char *host;
255
  size_t i=0,len=0;
256
257
258
259
260
  hostlist_t hostlist;
  
  hostlist = slurm_hostlist_create(nodelist);
  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
261
      len = write_str_node(hostfile_str, len, qty[i], host);
262
263
264
265
266
267
268
    }
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}

269
270
#endif
//--------------------------------SLURM USAGE-------------------------------------//
271
272
273
274
/*
 * Añade en una cadena "qty" entradas de "node_name".
 * Realiza la reserva de memoria y la realoja si es necesario.
 */
275
276
int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_name) {
  int err;
277
  char *ocurrence;
278
  size_t i, len, len_node;
279

280
281
  len_node = strlen(node_name) + 1; // Str length + ','
  len = qty * len_node; // Number of times the node is used
282
283

  if(len_og == 0) { // Memoria no reservada
284
    *hostfile_str = (char *) malloc((len+1) * sizeof(char));
285
  } else { // Cadena ya tiene datos
286
    *hostfile_str = (char *) realloc(*hostfile_str, (len_og + len + 1) * sizeof(char));
287
288
289
290
  }
  if(hostfile_str == NULL) return -1; // No ha sido posible alojar la memoria

  ocurrence = (char *) malloc((len_node+1) * sizeof(char));
291
  if(ocurrence == NULL) return -2; // No ha sido posible alojar la memoria
292
  err = snprintf(ocurrence, len_node+1, ",%s", node_name);
293
  if(err < 0) return -3; // No ha sido posible escribir sobre la variable auxiliar
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314

  i=0;
  if(len_og == 0) { // Si se inicializa, la primera es una copia
    i++;
    strcpy(*hostfile_str, node_name);
  }
  for(; i<qty; i++){ // Las siguientes se conctanenan
    strcat(*hostfile_str, ocurrence);
  }

  
  free(ocurrence);
  return len+len_og;
}

//====================================================
//====================================================
//============DEPRECATED FUNCTIONS====================
//====================================================
//====================================================

315
316
//--------------------------------SLURM USAGE-------------------------------------//
#ifdef USE_SLURM
317
318
319
320
321
/* FIXME Por revisar
 * @deprecated
 * Genera un fichero hostfile y lo anyade a un objeto
 * MPI_Info para ser utilizado.
 */
322
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, MPI_Info *info){
323
324
325
326
327
328
329
330
331
332
    char *hostfile;
    int ptr;

    // CREATE/UPDATE HOSTFILE 
    ptr = create_hostfile(&hostfile);
    MPI_Info_create(info);
    MPI_Info_set(*info, "hostfile", hostfile);
    free(hostfile);

    // SET NEW DISTRIBUTION 
333
    fill_hostfile_slurm(nodelist, ptr, procs_array, nodes);
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
    close(ptr);
}

/*
 * @deprecated
 * Crea un fichero que se utilizara como hostfile
 * para un nuevo grupo de procesos. 
 *
 * El nombre es devuelto en el argumento "file_name",
 * que tiene que ser un puntero vacio.
 *
 * Ademas se devuelve un descriptor de fichero para 
 * modificar el fichero.
 */
int create_hostfile(char **file_name) {
349
350
  int ptr, err;
  size_t len = 11; //FIXME Numero mágico
351
352

  *file_name = NULL;
353
  *file_name = malloc(len * sizeof(char));
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
  if(*file_name == NULL) return -1; // No ha sido posible alojar la memoria
  err = snprintf(*file_name, len, "hostfile.o");
  if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero

  ptr = open(*file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if(ptr < 0) return -3; // No ha sido posible crear el fichero

  return ptr; // Devolver puntero a fichero
}

/*
 * @deprecated
 * Rellena un fichero hostfile indicado por ptr con los nombres
 * de los nodos a utilizar indicados por "job_record" y la cantidad 
 * de procesos que alojara cada nodo indicado por "qty".
 */
370
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int nodes) {
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
  int i=0;
  char *host;
  hostlist_t hostlist;
  
  hostlist = slurm_hostlist_create(nodelist);
  while ((host = slurm_hostlist_shift(hostlist)) && i < nodes) {
    write_hostfile_node(ptr, qty[i], host);
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}

/*
 * @deprecated
 * Escribe en el fichero hostfile indicado por ptr una nueva linea.
 *
 * Esta linea indica el nombre de un nodo y la cantidad de procesos a
 * alojar en ese nodo.
 */
int write_hostfile_node(int ptr, int qty, char *node_name) {
392
  int err;
393
  char *line;
394
  size_t len, len_node, len_int;
395
396

  len_node = strlen(node_name);
397
398
  err = snprintf(NULL, 0, "%d", qty);
  if(err < 0) return -1;
399
  len_int = err;
400
401
402

  len = len_node + len_int + 3;
  line = malloc(len * sizeof(char));
403
  if(line == NULL) return -2; // No ha sido posible alojar la memoria
404
405
  err = snprintf(line, len, "%s:%d\n", node_name, qty);

406
  if(err < 0) return -3; // No ha sido posible escribir en el fichero
407
408
409
410
411
412

  write(ptr, line, len-1);
  free(line);

  return 0;
}
413
414
#endif
//--------------------------------SLURM USAGE-------------------------------------//
415
416
417
418
419
420
421
422
423
424
425
426
427
428


//TODO REFACTOR PARA CUANDO SE COMUNIQUE CON RMS
    // Get Slurm job info
    //int jobId;
    //char *tmp;
    //job_info_msg_t *j_info;
    //slurm_job_info_t last_record;
    //tmp = getenv("SLURM_JOB_ID");
    //jobId = atoi(tmp);
    //slurm_load_job(&j_info, jobId, 1);
    //last_record = j_info->job_array[j_info->record_count - 1];
    // Free JOB INFO
    //slurm_free_job_info_msg(j_info);