Commit 8097cf12 authored by iker_martin's avatar iker_martin
Browse files

Improved usage of Process physical distribution when slurm is not enabled....

Improved usage of Process physical distribution when slurm is not enabled. Minor changes to reduce amount of args.
parent 01a27046
...@@ -12,24 +12,6 @@ ...@@ -12,24 +12,6 @@
#define DEBUG_FUNC(debug_string, rank, numP) printf("MaM [P%d/%d]: %s -- %s:%s:%d\n", rank, numP, debug_string, __FILE__, __func__, __LINE__) #define DEBUG_FUNC(debug_string, rank, numP) printf("MaM [P%d/%d]: %s -- %s:%s:%d\n", rank, numP, debug_string, __FILE__, __func__, __LINE__)
/* --- PHYSICAL DIST STRUCTURE --- */
struct physical_dist {
int target_qty, already_created;
int info_type;
};
/* --- SPAWN STRUCTURE --- */
typedef struct {
int spawn_qty, initial_qty, target_qty;
int already_created;
int spawn_is_single, spawn_is_async;
MPI_Info mapping;
MPI_Datatype dtype;
struct physical_dist dist; // Used to create mapping var
MPI_Comm comm, returned_comm;
} Spawn_data;
/* --- TIME CAPTURE STRUCTURE --- */ /* --- TIME CAPTURE STRUCTURE --- */
typedef struct { typedef struct {
// Spawn, Sync and Async time // Spawn, Sync and Async time
......
...@@ -232,3 +232,16 @@ int MAM_I_slurm_getjob_hosts_info() { ...@@ -232,3 +232,16 @@ int MAM_I_slurm_getjob_hosts_info() {
return 0; return 0;
} }
#endif #endif
//TODO REFACTOR PARA CUANDO SE COMUNIQUE CON RMS
// Get Slurm job info
//int jobId;
//char *tmp;
//job_info_msg_t *j_info;
//slurm_job_info_t last_record;
//tmp = getenv("SLURM_JOB_ID");
//jobId = atoi(tmp);
//slurm_load_job(&j_info, jobId, 1);
//last_record = j_info->job_array[j_info->record_count - 1];
// Free JOB INFO
//slurm_free_job_info_msg(j_info);
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <mpi.h> #include <mpi.h>
#include <string.h> #include <string.h>
#include "../malleabilityDataStructures.h" #include "../malleabilityDataStructures.h"
#include "Spawn_DataStructure.h"
int baseline(Spawn_data spawn_data, MPI_Comm *child); int baseline(Spawn_data spawn_data, MPI_Comm *child);
#endif #endif
...@@ -177,6 +177,7 @@ void set_spawn_configuration(MPI_Comm comm) { ...@@ -177,6 +177,7 @@ void set_spawn_configuration(MPI_Comm comm) {
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single)); MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async)); MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
spawn_data->comm = comm; spawn_data->comm = comm;
spawn_data->mapping_fill_method = MALL_DIST_STRING;
switch(mall_conf->spawn_method) { switch(mall_conf->spawn_method) {
...@@ -193,11 +194,7 @@ void set_spawn_configuration(MPI_Comm comm) { ...@@ -193,11 +194,7 @@ void set_spawn_configuration(MPI_Comm comm) {
if(spawn_data->spawn_is_async) { if(spawn_data->spawn_is_async) {
init_spawn_state(); init_spawn_state();
} }
spawn_data->mapping = MPI_INFO_NULL; spawn_data->mapping = MPI_INFO_NULL;
if(mall->myId == mall->root) {
physical_struct_create(spawn_data->target_qty, spawn_data->already_created, MALL_DIST_STRING, &(spawn_data->dist));
}
} }
/* /*
...@@ -231,7 +228,7 @@ void generic_spawn(MPI_Comm *child, int data_stage) { ...@@ -231,7 +228,7 @@ void generic_spawn(MPI_Comm *child, int data_stage) {
// WORK // WORK
if(mall->myId == mall->root && spawn_data->spawn_qty > 0) { //SET MAPPING FOR NEW PROCESSES if(mall->myId == mall->root && spawn_data->spawn_qty > 0) { //SET MAPPING FOR NEW PROCESSES
processes_dist(spawn_data->dist, &(spawn_data->mapping)); processes_dist(*spawn_data, &(spawn_data->mapping));
} }
switch(mall_conf->spawn_method) { switch(mall_conf->spawn_method) {
case MALL_SPAWN_BASELINE: case MALL_SPAWN_BASELINE:
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <mpi.h> #include <mpi.h>
#include "Spawn_DataStructure.h"
int init_spawn(MPI_Comm comm, MPI_Comm *child); int init_spawn(MPI_Comm comm, MPI_Comm *child);
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int wait_completed); int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int wait_completed);
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <mpi.h> #include <mpi.h>
#include "../malleabilityDataStructures.h" #include "../malleabilityDataStructures.h"
#include "Spawn_DataStructure.h"
int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state); int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state);
......
...@@ -9,11 +9,12 @@ ...@@ -9,11 +9,12 @@
//--------------PRIVATE DECLARATIONS---------------// //--------------PRIVATE DECLARATIONS---------------//
void node_dist( struct physical_dist dist, int **qty, int *used_nodes); void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes);
void spread_dist(struct physical_dist dist, int *used_nodes, int *procs); void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
void compact_dist(struct physical_dist dist, int *used_nodes, int *procs); void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
void generate_info_string(int target_qty, MPI_Info *info); void generate_info_string(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info);
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **host_str);
//--------------------------------SLURM USAGE-------------------------------------// //--------------------------------SLURM USAGE-------------------------------------//
#if USE_MAL_SLURM #if USE_MAL_SLURM
#include <slurm/slurm.h> #include <slurm/slurm.h>
...@@ -31,33 +32,6 @@ int create_hostfile(char **file_name); ...@@ -31,33 +32,6 @@ int create_hostfile(char **file_name);
int write_hostfile_node(int ptr, int qty, char *node_name); int write_hostfile_node(int ptr, int qty, char *node_name);
//--------------PUBLIC FUNCTIONS---------------// //--------------PUBLIC FUNCTIONS---------------//
/*
* Pone los datos para una estructura que guarda los parametros
* para realizar un mappeado de los procesos.
*
* Si la memoria no esta reservada devuelve falso y no hace nada.
* Si puede realizar los cambios devuelve verdadero.
*
* IN parameters -->
* target_qty: Numero de procesos tras la reconfiguracion
* alreadyCreated: Numero de procesos padre a considerar
* La resta de target_qty-alreadyCreated es el numero de hijos a crear
* info_type: Indica como realizar el mappeado, si indicarlo
* en una cadena (MALL_DIST_STRING) o en un hostfile
* (MALL_DIST_HOSTFILE)
* spawn_dist: Indica como sera el mappeado, si intentar rellenar
* primero los nodos con cpus ya usados (CPUS/BEST/COMPACT) o
* que todos los nodos tengan el mismo numero de cpus usados
* (NODES/WORST/SPREAD)
*/
int physical_struct_create(int target_qty, int already_created, int info_type, struct physical_dist *dist) {
dist->target_qty = target_qty;
dist->already_created = already_created;
dist->info_type = info_type;
return 1;
}
/* /*
* Configura la creacion de un nuevo grupo de procesos, reservando la memoria * Configura la creacion de un nuevo grupo de procesos, reservando la memoria
...@@ -68,13 +42,14 @@ int physical_struct_create(int target_qty, int already_created, int info_type, s ...@@ -68,13 +42,14 @@ int physical_struct_create(int target_qty, int already_created, int info_type, s
* info_spawn: Objeto MPI_Info en el que se indica el mappeado * info_spawn: Objeto MPI_Info en el que se indica el mappeado
* a usar al crear los procesos. * a usar al crear los procesos.
*/ */
void processes_dist(struct physical_dist dist, MPI_Info *info_spawn) { void processes_dist(Spawn_data spawn_data, MPI_Info *info_spawn) {
#if USE_MAL_SLURM
int used_nodes=0; int used_nodes=0;
int *procs_array; int *procs_array;
// GET NEW DISTRIBUTION // GET NEW DISTRIBUTION
node_dist(dist, &procs_array, &used_nodes); node_dist(spawn_data, &procs_array, &used_nodes);
switch(dist.info_type) { #if USE_MAL_SLURM
switch(spawn_data.mapping_fill_method) {
case MALL_DIST_STRING: case MALL_DIST_STRING:
generate_info_string_slurm(mall->nodelist, procs_array, used_nodes, info_spawn); generate_info_string_slurm(mall->nodelist, procs_array, used_nodes, info_spawn);
break; break;
...@@ -84,7 +59,7 @@ void processes_dist(struct physical_dist dist, MPI_Info *info_spawn) { ...@@ -84,7 +59,7 @@ void processes_dist(struct physical_dist dist, MPI_Info *info_spawn) {
} }
free(procs_array); free(procs_array);
#else #else
generate_info_string(dist.target_qty, info_spawn); generate_info_string(mall->nodelist, procs_array, used_nodes, info_spawn);
#endif #endif
} }
...@@ -103,7 +78,7 @@ void processes_dist(struct physical_dist dist, MPI_Info *info_spawn) { ...@@ -103,7 +78,7 @@ void processes_dist(struct physical_dist dist, MPI_Info *info_spawn) {
* COMM_PHY_CPU (2): Orientada a completar la capacidad de un nodo antes de * COMM_PHY_CPU (2): Orientada a completar la capacidad de un nodo antes de
* ocupar otro nodo. * ocupar otro nodo.
*/ */
void node_dist(struct physical_dist dist, int **qty, int *used_nodes) { void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes) {
int i, *procs; int i, *procs;
procs = calloc(mall->num_nodes, sizeof(int)); // Numero de procesos por nodo procs = calloc(mall->num_nodes, sizeof(int)); // Numero de procesos por nodo
...@@ -111,10 +86,10 @@ void node_dist(struct physical_dist dist, int **qty, int *used_nodes) { ...@@ -111,10 +86,10 @@ void node_dist(struct physical_dist dist, int **qty, int *used_nodes) {
/* GET NEW DISTRIBUTION */ /* GET NEW DISTRIBUTION */
switch(mall_conf->spawn_dist) { switch(mall_conf->spawn_dist) {
case MALL_DIST_SPREAD: // DIST NODES @deprecated case MALL_DIST_SPREAD: // DIST NODES @deprecated
spread_dist(dist, used_nodes, procs); spread_dist(spawn_data, used_nodes, procs);
break; break;
case MALL_DIST_COMPACT: // DIST CPUs case MALL_DIST_COMPACT: // DIST CPUs
compact_dist(dist, used_nodes, procs); compact_dist(spawn_data, used_nodes, procs);
break; break;
} }
...@@ -131,14 +106,14 @@ void node_dist(struct physical_dist dist, int **qty, int *used_nodes) { ...@@ -131,14 +106,14 @@ void node_dist(struct physical_dist dist, int **qty, int *used_nodes) {
* para que todos los nodos tengan el mismo numero. Devuelve el total de * para que todos los nodos tengan el mismo numero. Devuelve el total de
* nodos utilizados y el numero de procesos a crear en cada nodo. * nodos utilizados y el numero de procesos a crear en cada nodo.
* *
* TODO Tener en cuenta procesos ya creados (already_created) * FIXME Tener en cuenta procesos ya creados (already_created)
*/ */
void spread_dist(struct physical_dist dist, int *used_nodes, int *procs) { void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
int i, tamBl, remainder; int i, tamBl, remainder;
*used_nodes = mall->num_nodes; *used_nodes = mall->num_nodes;
tamBl = dist.target_qty / mall->num_nodes; tamBl = spawn_data.target_qty / mall->num_nodes;
remainder = dist.target_qty % mall->num_nodes; remainder = spawn_data.target_qty % mall->num_nodes;
for(i=0; i<remainder; i++) { for(i=0; i<remainder; i++) {
procs[i] = tamBl + 1; procs[i] = tamBl + 1;
} }
...@@ -155,14 +130,14 @@ void spread_dist(struct physical_dist dist, int *used_nodes, int *procs) { ...@@ -155,14 +130,14 @@ void spread_dist(struct physical_dist dist, int *used_nodes, int *procs) {
* Tiene en cuenta los procesos ya existentes para el mappeado de * Tiene en cuenta los procesos ya existentes para el mappeado de
* los procesos a crear. * los procesos a crear.
*/ */
void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) { void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
int i, asigCores; int i, asigCores;
int tamBl, remainder; int tamBl, remainder;
tamBl = mall->num_cpus / mall->num_nodes; tamBl = mall->num_cpus / mall->num_nodes;
asigCores = dist.already_created; asigCores = spawn_data.already_created;
i = *used_nodes = dist.already_created / tamBl; i = *used_nodes = spawn_data.already_created / tamBl;
remainder = dist.already_created % tamBl; remainder = spawn_data.already_created % tamBl;
//FIXME REFACTOR Que pasa si los nodos 1 y 2 tienen espacios libres //FIXME REFACTOR Que pasa si los nodos 1 y 2 tienen espacios libres
//First nodes could already have existing procs //First nodes could already have existing procs
...@@ -175,7 +150,7 @@ void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) { ...@@ -175,7 +150,7 @@ void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) {
} }
//Assign tamBl to each node //Assign tamBl to each node
while(asigCores+tamBl <= dist.target_qty) { while(asigCores+tamBl <= spawn_data.target_qty) {
asigCores += tamBl; asigCores += tamBl;
procs[i] += tamBl; procs[i] += tamBl;
i = (i+1) % mall->num_nodes; i = (i+1) % mall->num_nodes;
...@@ -183,8 +158,8 @@ void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) { ...@@ -183,8 +158,8 @@ void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) {
} }
//Last node could have less procs than tamBl //Last node could have less procs than tamBl
if(asigCores < dist.target_qty) { if(asigCores < spawn_data.target_qty) {
procs[i] += dist.target_qty - asigCores; procs[i] += spawn_data.target_qty - asigCores;
(*used_nodes)++; (*used_nodes)++;
} }
if(*used_nodes > mall->num_nodes) *used_nodes = mall->num_nodes; //FIXME Si ocurre esto no es un error? if(*used_nodes > mall->num_nodes) *used_nodes = mall->num_nodes; //FIXME Si ocurre esto no es un error?
...@@ -199,68 +174,38 @@ void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) { ...@@ -199,68 +174,38 @@ void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) {
* en el que se indica el mappeado a utilizar en los nuevos * en el que se indica el mappeado a utilizar en los nuevos
* procesos. * procesos.
* *
* Actualmente no considera que puedan haber varios nodos *
* y lleva todos al mismo. Las funciones "generate_info_string_slurm"
* o "generate_info_hostfile_slurm" permiten utilizar varios
* nodos, pero es necesario activar Slurm.
*/ */
void generate_info_string(int target_qty, MPI_Info *info){ void generate_info_string(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info){
char *host_string, *host; char *host_str;
int len, err;
host = "localhost"; fill_str_hosts(nodelist, procs_array, nodes, &host_str);
//host = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
//MPI_Get_processor_name(host, &len);
// CREATE AND SET STRING HOSTS
err = write_str_node(&host_string, 0, target_qty, host);
if (err<0) {printf("Error when generating mapping: %d\n", err); MPI_Abort(MPI_COMM_WORLD, err);}
// SET MAPPING // SET MAPPING
MPI_Info_create(info); MPI_Info_create(info);
MPI_Info_set(*info, "hosts", host_string); MPI_Info_set(*info, "hosts", mall->nodelist);
//free(host); free(host_str);
free(host_string);
} }
//--------------------------------SLURM USAGE-------------------------------------//
#if USE_MAL_SLURM
/*
* Crea y devuelve un objeto MPI_Info con un par hosts/mapping
* en el que se indica el mappeado a utilizar en los nuevos
* procesos.
* Es necesario usar Slurm para usarlo.
*/
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info){
// CREATE AND SET STRING HOSTS
char *hoststring;
fill_str_hosts_slurm(nodelist, procs_array, nodes, &hoststring);
MPI_Info_create(info);
MPI_Info_set(*info, "hosts", hoststring);
free(hoststring);
}
/* /*
* Crea y devuelve una cadena para ser utilizada por la llave "hosts" * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
* al crear procesos e indicar donde tienen que ser creados. * al crear procesos e indicar donde tienen que ser creados.
*/ */
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostfile_str) { void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **host_str) {
char *host; char *host;
size_t i=0,len=0; size_t i=0,len=0;
hostlist_t hostlist;
hostlist = slurm_hostlist_create(nodelist); char *token = strtok(nodelist, ",");
while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) { while (token != NULL && i < used_nodes) {
if(qty[i] != 0) { host = strdup(token);
len = write_str_node(hostfile_str, len, qty[i], host); if (qty[i] != 0) {
len = write_str_node(host_str, len, qty[i], host);
} }
i++; i++;
free(host); free(host);
token = strtok(NULL, ",");
} }
slurm_hostlist_destroy(hostlist);
} }
#endif
//--------------------------------SLURM USAGE-------------------------------------//
/* /*
* Añade en una cadena "qty" entradas de "node_name". * Añade en una cadena "qty" entradas de "node_name".
* Realiza la reserva de memoria y la realoja si es necesario. * Realiza la reserva de memoria y la realoja si es necesario.
...@@ -299,14 +244,50 @@ int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_na ...@@ -299,14 +244,50 @@ int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_na
return len+len_og; return len+len_og;
} }
//--------------------------------SLURM USAGE-------------------------------------//
#if USE_MAL_SLURM
/*
* Crea y devuelve un objeto MPI_Info con un par hosts/mapping
* en el que se indica el mappeado a utilizar en los nuevos
* procesos.
* Es necesario usar Slurm para usarlo.
*/
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info){
// CREATE AND SET STRING HOSTS
char *hoststring;
fill_str_hosts_slurm(nodelist, procs_array, nodes, &hoststring);
MPI_Info_create(info);
MPI_Info_set(*info, "hosts", hoststring);
free(hoststring);
}
/*
* Crea y devuelve una cadena para ser utilizada por la llave "hosts"
* al crear procesos e indicar donde tienen que ser creados.
*/
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostfile_str) {
char *host;
size_t i=0,len=0;
hostlist_t hostlist;
hostlist = slurm_hostlist_create(nodelist);
while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
if(qty[i] != 0) {
len = write_str_node(hostfile_str, len, qty[i], host);
}
i++;
free(host);
}
slurm_hostlist_destroy(hostlist);
}
//==================================================== //====================================================
//==================================================== //====================================================
//============DEPRECATED FUNCTIONS==================== //============DEPRECATED FUNCTIONS====================
//==================================================== //====================================================
//==================================================== //====================================================
//--------------------------------SLURM USAGE-------------------------------------//
#if USE_MAL_SLURM
/* FIXME Por revisar /* FIXME Por revisar
* @deprecated * @deprecated
* Genera un fichero hostfile y lo anyade a un objeto * Genera un fichero hostfile y lo anyade a un objeto
...@@ -405,17 +386,3 @@ int write_hostfile_node(int ptr, int qty, char *node_name) { ...@@ -405,17 +386,3 @@ int write_hostfile_node(int ptr, int qty, char *node_name) {
} }
#endif #endif
//--------------------------------SLURM USAGE-------------------------------------// //--------------------------------SLURM USAGE-------------------------------------//
//TODO REFACTOR PARA CUANDO SE COMUNIQUE CON RMS
// Get Slurm job info
//int jobId;
//char *tmp;
//job_info_msg_t *j_info;
//slurm_job_info_t last_record;
//tmp = getenv("SLURM_JOB_ID");
//jobId = atoi(tmp);
//slurm_load_job(&j_info, jobId, 1);
//last_record = j_info->job_array[j_info->record_count - 1];
// Free JOB INFO
//slurm_free_job_info_msg(j_info);
...@@ -7,8 +7,8 @@ ...@@ -7,8 +7,8 @@
#include <string.h> #include <string.h>
#include "../malleabilityStates.h" #include "../malleabilityStates.h"
#include "../malleabilityDataStructures.h" #include "../malleabilityDataStructures.h"
#include "Spawn_DataStructure.h"
int physical_struct_create(int target_qty, int already_created, int info_type, struct physical_dist *dist); void processes_dist(Spawn_data spawn_data, MPI_Info *info_spawn);
void processes_dist(struct physical_dist dist, MPI_Info *info_spawn);
#endif #endif
#ifndef MAM_SPAWN_DATASTRUCTURE_H
#define MAM_SPAWN_DATASTRUCTURE_H
#include <mpi.h>
/* --- SPAWN STRUCTURE --- */
typedef struct {
int spawn_qty, initial_qty, target_qty;
int already_created;
int spawn_is_single, spawn_is_async;
MPI_Info mapping;
int mapping_fill_method;
MPI_Comm comm, returned_comm;
} Spawn_data;
#endif
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment