Commit e472a657 authored by iker_martin

Added new strategy MULTIPLE_SPAWN. Allows creating a different COMM_WORLD per...

Added new strategy MULTIPLE_SPAWN. Allows creating a different COMM_WORLD per node, with the benefit of letting zombie processes finalize in more situations. Spawn now uses an array to spawn several groups instead of expecting to spawn only one group. Minor bug fixes in ProcessDist.c.
parent 96b75523
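For orientation, a minimal sketch of how an application might opt into the new strategy. This is an assumption for illustration: the diff below shows MAM_Set_key_configuration() used with the MAM_SPAWN_METHOD key and strategies queried through MAM_Contains_strat(MAM_SPAWN_STRATEGIES, ...), so the strategy key is presumed to be settable the same way.

/* Hypothetical usage sketch -- not part of this commit */
MAM_Set_key_configuration(MAM_SPAWN_METHOD, MALL_SPAWN_BASELINE, NULL);
MAM_Set_key_configuration(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL);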
@@ -34,7 +34,7 @@ mam_config_setting_t configSettings[] = {
{NULL, 1, INT_MAX, {.set_config_complex = MAM_I_set_target_number }, MAM_NUM_TARGETS_ENV}
};
unsigned int masks_spawn[] = {MAM_STRAT_CLEAR_VALUE, MAM_MASK_PTHREAD, MAM_MASK_SPAWN_SINGLE, MAM_MASK_SPAWN_INTERCOMM};
unsigned int masks_spawn[] = {MAM_STRAT_CLEAR_VALUE, MAM_MASK_PTHREAD, MAM_MASK_SPAWN_SINGLE, MAM_MASK_SPAWN_INTERCOMM, MAM_MASK_SPAWN_MULTIPLE};
unsigned int masks_red[] = {MAM_STRAT_CLEAR_VALUE, MAM_MASK_PTHREAD, MAM_MASK_RED_WAIT_SOURCES, MAM_MASK_RED_WAIT_TARGETS};
/**
@@ -189,10 +189,17 @@ void MAM_Set_initial_configuration() {
}
void MAM_Check_configuration() {
int global_internodes;
if(mall->numC == mall->numP) { // Migrate
MAM_Set_key_configuration(MAM_SPAWN_METHOD, MALL_SPAWN_BASELINE, NULL);
}
MPI_Allreduce(&mall->internode_group, &global_internodes, 1, MPI_INT, MPI_MAX, mall->comm);
if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL)
&& global_internodes) { // Remove internode MPI_COMM_WORLDs
MAM_Set_key_configuration(MAM_SPAWN_METHOD, MALL_SPAWN_BASELINE, NULL);
}
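/* Note: when the MULTIPLE strategy is requested and any current group spans
 * more than one node, the spawn method is forced back to BASELINE, presumably
 * because a MERGE spawn would keep the internode MPI_COMM_WORLD alive instead
 * of replacing it with per-node ones. */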
if(mall_conf->spawn_method == MALL_SPAWN_MERGE) {
if(MAM_I_contains_strat(mall_conf->spawn_strategies, MAM_MASK_SPAWN_INTERCOMM)) {
MAM_I_remove_strat(&mall_conf->spawn_strategies, MAM_MASK_SPAWN_INTERCOMM);
@@ -270,6 +277,9 @@ int MAM_I_set_spawn_strat(unsigned int strategy, unsigned int *strategies) {
case MAM_STRAT_SPAWN_INTERCOMM:
result = MAM_I_add_strat(strategies, MAM_MASK_SPAWN_INTERCOMM);
break;
case MAM_STRAT_SPAWN_MULTIPLE:
result = MAM_I_add_strat(strategies, MAM_MASK_SPAWN_MULTIPLE);
break;
default:
//Unknown strategy
result = MALL_DENIED;
......
@@ -12,6 +12,7 @@
#define MAM_MASK_PTHREAD 0x01
#define MAM_MASK_SPAWN_SINGLE 0x02
#define MAM_MASK_SPAWN_INTERCOMM 0x04
#define MAM_MASK_SPAWN_MULTIPLE 0x08
#define MAM_MASK_RED_WAIT_SOURCES 0x02
#define MAM_MASK_RED_WAIT_TARGETS 0x04
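/* The masks above are disjoint bits, so one unsigned int can carry several
 * strategies at once. A sketch of the helpers this diff relies on, assuming
 * the usual bit-set idiom (their bodies are not shown in this commit):
 *
 *   int MAM_I_contains_strat(unsigned int strategies, unsigned int mask) {
 *     return (strategies & mask) == mask;
 *   }
 *   int MAM_I_add_strat(unsigned int *strategies, unsigned int mask) {
 *     *strategies |= mask;
 *     return 0; // success value assumed
 *   }
 *   int MAM_I_remove_strat(unsigned int *strategies, unsigned int mask) {
 *     *strategies &= ~mask;
 *     return 0; // success value assumed
 *   }
 */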
......
@@ -7,7 +7,7 @@ int state = MALL_UNRESERVED;
* of MaM.
*/
void MAM_Def_main_datatype() {
int i, counts = 10;
int i, counts = 11;
int blocklengths[counts];
MPI_Aint displs[counts];
MPI_Datatype types[counts];
@@ -29,10 +29,11 @@ void MAM_Def_main_datatype() {
MPI_Get_address(&(mall_conf->red_strategies), &displs[4]);
MPI_Get_address(&(mall->root_parents), &displs[5]);
MPI_Get_address(&(mall->num_parents), &displs[6]); //TODO Add only when Intercomm strat active?
MPI_Get_address(&(mall->num_cpus), &displs[7]);
MPI_Get_address(&(mall->num_nodes), &displs[8]);
MPI_Get_address(&(mall->nodelist_len), &displs[9]);
MPI_Get_address(&(mall->num_parents), &displs[6]); //TODO Add only when Single strat active?
MPI_Get_address(&(mall->numC), &displs[7]); //TODO Add only when MultipleSpawn strat active?
MPI_Get_address(&(mall->num_cpus), &displs[8]);
MPI_Get_address(&(mall->num_nodes), &displs[9]);
MPI_Get_address(&(mall->nodelist_len), &displs[10]);
MPI_Type_create_struct(counts, blocklengths, displs, types, &mall->struct_type);
MPI_Type_commit(&mall->struct_type);
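/* Note: the struct type is built from absolute addresses taken with
 * MPI_Get_address() on the live mall/mall_conf fields, which is why the
 * broadcasts in MAM_Comm_main_structures() pass MPI_BOTTOM as the buffer:
 * every displacement already points at the real variable. Reduced sketch of
 * the pattern with two assumed int fields:
 *
 *   int counts = 2, blocklengths[2] = {1, 1};
 *   MPI_Aint displs[2];
 *   MPI_Datatype types[2] = {MPI_INT, MPI_INT}, struct_type;
 *   MPI_Get_address(&(mall->numC), &displs[0]);
 *   MPI_Get_address(&(mall->num_cpus), &displs[1]);
 *   MPI_Type_create_struct(counts, blocklengths, displs, types, &struct_type);
 *   MPI_Type_commit(&struct_type);
 *   MPI_Bcast(MPI_BOTTOM, 1, struct_type, root, comm);
 */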
@@ -48,13 +49,45 @@ void MAM_Free_main_datatype() {
* Communicates the necessary data of MAM's main
* structures from sources to targets.
*/
void MAM_Comm_main_structures(int rootBcast) {
void MAM_Comm_main_structures(MPI_Comm comm, int rootBcast) {
MPI_Bcast(MPI_BOTTOM, 1, mall->struct_type, rootBcast, mall->intercomm);
MPI_Bcast(MPI_BOTTOM, 1, mall->struct_type, rootBcast, comm);
if(mall->nodelist == NULL) {
mall->nodelist = calloc(mall->nodelist_len+1, sizeof(char));
mall->nodelist[mall->nodelist_len] = '\0';
}
MPI_Bcast(mall->nodelist, mall->nodelist_len, MPI_CHAR, rootBcast, mall->intercomm);
MPI_Bcast(mall->nodelist, mall->nodelist_len, MPI_CHAR, rootBcast, comm);
}
/*
* Prints to stdout the current state of all the communicators
*/
void MAM_print_comms_state() {
int tester;
char *comm_name = malloc(MPI_MAX_OBJECT_NAME * sizeof(char));
MPI_Comm_get_name(mall->comm, comm_name, &tester);
printf("P%d Comm=%d Name=%s\n", mall->myId, mall->comm, comm_name);
MPI_Comm_get_name(*(mall->user_comm), comm_name, &tester);
printf("P%d Comm=%d Name=%s\n", mall->myId, *(mall->user_comm), comm_name);
if(mall->intercomm != MPI_COMM_NULL) {
MPI_Comm_get_name(mall->intercomm, comm_name, &tester);
printf("P%d Comm=%d Name=%s\n", mall->myId, mall->intercomm, comm_name);
}
free(comm_name);
}
/*
* Function to modify MaM's main communicators
*/
void MAM_comms_update(MPI_Comm comm) {
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
MPI_Comm_dup(comm, &(mall->thread_comm));
MPI_Comm_dup(comm, &(mall->comm));
MPI_Comm_set_name(mall->thread_comm, "MAM_THREAD");
MPI_Comm_set_name(mall->comm, "MAM_MAIN");
}
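/* Design note: the incoming communicator is duplicated twice, presumably so
 * the main path (MAM_MAIN) and the asynchronous spawn thread (MAM_THREAD)
 * never share a communicator and their traffic cannot collide; the
 * MPI_COMM_WORLD guards keep the predefined communicator from being freed. */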
@@ -39,7 +39,7 @@ typedef struct {
int root, root_collectives;
int num_parents, root_parents;
pthread_t async_thread;
MPI_Comm comm, thread_comm;
MPI_Comm comm, thread_comm, original_comm;
MPI_Comm intercomm, tmp_comm;
MPI_Comm *user_comm;
MPI_Datatype struct_type;
@@ -50,6 +50,7 @@ typedef struct {
char *name_exec, *nodelist;
int num_cpus, num_nodes, nodelist_len;
int internode_group;
} malleability_t;
/* --- VARIABLES --- */
@@ -60,7 +61,9 @@ extern int state;
/* --- FUNCTIONS --- */
void MAM_Def_main_datatype();
void MAM_Free_main_datatype();
void MAM_Comm_main_structures(int rootBcast);
void MAM_Comm_main_structures(MPI_Comm comm, int rootBcast);
void MAM_print_comms_state();
void MAM_comms_update(MPI_Comm comm);
#endif
@@ -15,7 +15,7 @@ enum mam_proc_states{MAM_PROC_CONTINUE, MAM_PROC_NEW_RANK, MAM_PROC_ZOMBIE};
enum mall_spawn_methods{MALL_SPAWN_BASELINE, MALL_SPAWN_MERGE, MAM_METHODS_SPAWN_LEN};
enum mam_spawn_strategies{MAM_STRAT_SPAWN_CLEAR, MAM_STRAT_SPAWN_PTHREAD, MAM_STRAT_SPAWN_SINGLE, MAM_STRAT_SPAWN_INTERCOMM, MAM_STRATS_SPAWN_LEN};
enum mam_spawn_strategies{MAM_STRAT_SPAWN_CLEAR, MAM_STRAT_SPAWN_PTHREAD, MAM_STRAT_SPAWN_SINGLE, MAM_STRAT_SPAWN_INTERCOMM, MAM_STRAT_SPAWN_MULTIPLE, MAM_STRATS_SPAWN_LEN};
enum mam_phy_dist_methods{MALL_DIST_SPREAD = 1, MALL_DIST_COMPACT, MAM_METHODS_PHYSICAL_DISTRIBUTION_LEN}; //FIXME Rename to PHY_DIST?
enum mam_phy_info_methods{MALL_DIST_STRING = 1, MALL_DIST_HOSTFILE}; //FIXME Rename to PHY_DIST?
......
@@ -6,9 +6,17 @@
#include "Baseline.h"
#include "Spawn_state.h"
#define MAM_TAG_STRAT_SINGLE 130
#define MAM_TAG_STRAT_MULTIPLE_FIRST 131
#define MAM_TAG_STRAT_MULTIPLE_OTHER 132
//--------------PRIVATE DECLARATIONS---------------//
int baseline_spawn(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *child);
int single_strat_parents(Spawn_data spawn_data, MPI_Comm *child);
int baseline_spawn(Spawn_set spawn_set, MPI_Comm comm, MPI_Comm *child);
void baseline_parents(Spawn_data spawn_data, MPI_Comm *child);
void multiple_strat_parents(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child);
void multiple_strat_children(MPI_Comm *parents);
void single_strat_parents(Spawn_data spawn_data, MPI_Comm *child);
void single_strat_children(MPI_Comm *parents);
@@ -19,55 +27,176 @@ void single_strat_children(MPI_Comm *parents);
*/
int baseline(Spawn_data spawn_data, MPI_Comm *child) { //TODO Error handling
MPI_Comm intercomm;
MPI_Comm_get_parent(&intercomm);
MPI_Comm_get_parent(&intercomm); //FIXME May be a problem for a third or later reconfiguration with only expansions
if (intercomm == MPI_COMM_NULL) { // Parents path
if (spawn_data.spawn_is_single) {
single_strat_parents(spawn_data, child);
} else {
baseline_spawn(spawn_data, spawn_data.comm, child);
}
} else if(spawn_data.spawn_is_single) { // Children path
single_strat_children(child);
baseline_parents(spawn_data, child);
} else { // Children path
if(spawn_data.spawn_is_multiple) { multiple_strat_children(child); }
if(spawn_data.spawn_is_single) { single_strat_children(child); }
}
return MALL_SPAWN_COMPLETED;
}
//--------------PRIVATE FUNCTIONS---------------//
/*
* Function used by the parents to carry out
* process creation.
*
*/
void baseline_parents(Spawn_data spawn_data, MPI_Comm *child) {
int i;
MPI_Comm comm, *intercomms;
if (spawn_data.spawn_is_single && mall->myId != mall->root) {
single_strat_parents(spawn_data, child);
return;
}
comm = spawn_data.spawn_is_single ? MPI_COMM_SELF : spawn_data.comm;
MPI_Bcast(&spawn_data.total_spawns, 1, MPI_INT, mall->root, comm);
intercomms = (MPI_Comm*) malloc(spawn_data.total_spawns * sizeof(MPI_Comm));
if(mall->myId != mall->root) {
spawn_data.sets = (Spawn_set *) malloc(spawn_data.total_spawns * sizeof(Spawn_set));
}
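/* Note: MPI_Comm_spawn only reads its command, maxprocs and info arguments
 * at the root rank, so the sets allocated here by non-root processes are
 * never filled; they just keep the spawn loop below uniform across ranks. */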
for(i=0; i<spawn_data.total_spawns; i++) {
baseline_spawn(spawn_data.sets[i], comm, &intercomms[i]);
}
// TODO Improvement - Deactivate Multiple spawn before spawning if total_spawns == 1
if(spawn_data.spawn_is_multiple) { multiple_strat_parents(spawn_data, comm, intercomms, child); }
else { *child = intercomms[0]; }
if(spawn_data.spawn_is_single) { single_strat_parents(spawn_data, child); }
free(intercomms);
if(mall->myId != mall->root) { free(spawn_data.sets); }
}
/*
* Creates a group of processes according to the configuration indicated by
* the function "processes_dist()".
* Basic function in charge of process creation.
* Creates a set of processes according to the configuration obtained
* in ProcessDist.c
* Returns in "child" the intercommunicator that connects to the children.
*/
int baseline_spawn(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *child) {
int baseline_spawn(Spawn_set spawn_set, MPI_Comm comm, MPI_Comm *child) {
int rootBcast = MPI_PROC_NULL;
if(mall->myId == mall->root) rootBcast = MPI_ROOT;
int spawn_err = MPI_Comm_spawn(mall->name_exec, MPI_ARGV_NULL, spawn_data.spawn_qty, spawn_data.mapping, mall->root, comm, child, MPI_ERRCODES_IGNORE);
MPI_Comm_set_name(*child, "MPI_COMM_MALL_RESIZE");
int spawn_err = MPI_Comm_spawn(mall->name_exec, MPI_ARGV_NULL, spawn_set.spawn_qty, spawn_set.mapping, mall->root, comm, child, MPI_ERRCODES_IGNORE);
if(spawn_err != MPI_SUCCESS) {
printf("Error creating new set of %d procs.\n", spawn_data.spawn_qty);
printf("Error creating new set of %d procs.\n", spawn_set.spawn_qty);
}
MAM_Comm_main_structures(rootBcast);
MAM_Comm_main_structures(*child, rootBcast);
return spawn_err;
}
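/*
 * Overview: multiple-spawn strategy, parents side. The root asks the first
 * spawned group to open a port (tag MAM_TAG_STRAT_MULTIPLE_FIRST) and then
 * forwards that port to every other group with tag FIRST+i. The groups merge
 * themselves into a single intracommunicator in spawn order; the ack
 * received through intercomms[0] after each forward keeps the joins ordered.
 * Finally every parent connects to the merged group, leaving the definitive
 * parents<->children intercommunicator in *child, and the per-group
 * intercommunicators are freed.
 */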
void multiple_strat_parents(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child) {
int i, tag;
char *port_name, aux;
if(mall->myId == mall->root) {
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
tag = MAM_TAG_STRAT_MULTIPLE_FIRST;
MPI_Send(&spawn_data.total_spawns, 1, MPI_INT, MALLEABILITY_ROOT, tag, intercomms[0]);
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MPI_ANY_SOURCE, tag, intercomms[0], MPI_STATUS_IGNORE);
for(i=1; i<spawn_data.total_spawns; i++) {
MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MALLEABILITY_ROOT, tag+i, intercomms[i]);
MPI_Recv(&aux, 1, MPI_CHAR, MPI_ANY_SOURCE, MAM_TAG_STRAT_MULTIPLE_FIRST, intercomms[0], MPI_STATUS_IGNORE);
}
} else { port_name = malloc(1); }
MPI_Comm_connect(port_name, MPI_INFO_NULL, mall->root, comm, child);
for(i=0; i<spawn_data.total_spawns; i++) {
MPI_Comm_free(&intercomms[i]);
}
free(port_name);
}
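/*
 * Overview: multiple-spawn strategy, children side. Every spawned group
 * starts with its own MPI_COMM_WORLD. The root of the first group opens a
 * port and acts as the server; the roots of later groups receive that port
 * through their parent intercommunicator, connect, and merge into a growing
 * intracommunicator. The tag offset encodes each group's position, so
 * "start" is the accept iteration at which a group enters the loop. Once
 * every group has merged, the intracommunicator accepts the parents'
 * connection and replaces the per-group communicators via MAM_comms_update().
 */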
void multiple_strat_children(MPI_Comm *parents) {
int i, start, total_spawns, new_root;
int rootBcast = MPI_PROC_NULL;
char *port_name, aux;
MPI_Status stat;
MPI_Comm newintracomm, intercomm, parents_comm;
new_root = 0;
parents_comm = *parents;
if(mall->myId == mall->root) {
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, parents_comm, &stat);
if(stat.MPI_TAG == MAM_TAG_STRAT_MULTIPLE_FIRST) {
MPI_Recv(&total_spawns, 1, MPI_INT, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm, MPI_STATUS_IGNORE);
MPI_Open_port(MPI_INFO_NULL, port_name);
MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm);
start = 0;
new_root = 1;
rootBcast = MPI_ROOT;
} else {
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm, &stat);
// The "+1" is because the first iteration is done before the loop
start = stat.MPI_TAG - MAM_TAG_STRAT_MULTIPLE_FIRST + 1;
}
} else { port_name = malloc(1); }
MPI_Bcast(&start, 1, MPI_INT, mall->root, mall->comm);
if(start) {
MPI_Comm_connect(port_name, MPI_INFO_NULL, mall->root, mall->comm, &intercomm);
MPI_Bcast(&total_spawns, 1, MPI_INT, mall->root, intercomm); // FIXME Seems inefficient - Should be performed by parent root?
MPI_Intercomm_merge(intercomm, 1, &newintracomm); // Get last ranks
MPI_Comm_free(&intercomm);
} else {
start = 1;
MPI_Comm_dup(mall->comm, &newintracomm);
MPI_Bcast(&total_spawns, 1, MPI_INT, mall->root, mall->comm); // FIXME Seems inefficient - Should be performed by parent root?
}
for(i=start; i<total_spawns; i++) {
MPI_Comm_accept(port_name, MPI_INFO_NULL, mall->root, newintracomm, &intercomm);
MPI_Bcast(&total_spawns, 1, MPI_INT, rootBcast, intercomm); // FIXME Seems inefficient - Should be performed by parent root?
if(newintracomm != MPI_COMM_WORLD) MPI_Comm_free(&newintracomm);
MPI_Intercomm_merge(intercomm, 0, &newintracomm); // Get first ranks
MPI_Comm_free(&intercomm);
if(new_root) {
MPI_Send(&aux, 1, MPI_CHAR, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm); // Ensures order in the created intracommunicator
}
}
// Connect with parents
MPI_Comm_accept(port_name, MPI_INFO_NULL, mall->root, newintracomm, &intercomm);
// Update communicator to expected one
MAM_comms_update(newintracomm);
MPI_Comm_rank(mall->comm, &mall->myId);
MPI_Comm_size(mall->comm, &mall->numP);
if(new_root) MPI_Close_port(port_name);
free(port_name);
MPI_Comm_free(&newintracomm);
MPI_Comm_free(parents);
*parents = intercomm;
}
/*
* If the variable "type" is 1, creation is done with the participation of the whole parent group
* If the value is different, creation involves only the root process
*/
int single_strat_parents(Spawn_data spawn_data, MPI_Comm *child) {
int spawn_err;
void single_strat_parents(Spawn_data spawn_data, MPI_Comm *child) {
char *port_name;
MPI_Comm newintercomm;
if (mall->myId == mall->root) {
spawn_err = baseline_spawn(spawn_data, MPI_COMM_SELF, child);
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MPI_ANY_SOURCE, 130, *child, MPI_STATUS_IGNORE);
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MPI_ANY_SOURCE, MAM_TAG_STRAT_SINGLE, *child, MPI_STATUS_IGNORE);
set_spawn_state(MALL_SPAWN_SINGLE_COMPLETED, spawn_data.spawn_is_async); // Tell the other processes to join the root to finish the spawn procedure
wakeup_completion();
@@ -81,8 +210,6 @@ int single_strat_parents(Spawn_data spawn_data, MPI_Comm *child) {
MPI_Comm_free(child);
free(port_name);
*child = newintercomm;
return spawn_err;
}
/*
@@ -99,12 +226,12 @@ void single_strat_children(MPI_Comm *parents) {
if(mall->myId == mall->root) {
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Open_port(MPI_INFO_NULL, port_name);
MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, mall->root_parents, 130, *parents);
MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, mall->root_parents, MAM_TAG_STRAT_SINGLE, *parents);
} else {
port_name = malloc(1);
}
MPI_Comm_accept(port_name, MPI_INFO_NULL, mall->root, MPI_COMM_WORLD, &newintercomm);
MPI_Comm_accept(port_name, MPI_INFO_NULL, mall->root, mall->comm, &newintercomm);
if(mall->myId == mall->root) {
MPI_Close_port(port_name);
......
@@ -137,32 +137,30 @@ void unset_spawn_postpone_flag(int outside_state) {
* for the data redistribution step (number of processes and root Id).
*
*/
void malleability_connect_children(MPI_Comm comm, MPI_Comm *parents) {
void malleability_connect_children(MPI_Comm *parents) {
spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));
spawn_data->spawn_qty = mall->numP;
spawn_data->target_qty = mall->numP;
spawn_data->comm = comm;
MAM_Comm_main_structures(MALLEABILITY_ROOT); //FIXME What if the root has an id other than 0? Send it from spawn to the root id?
//MPI_Comm_remote_size(*parents, &spawn_data->initial_qty);
MAM_Comm_main_structures(*parents, MALLEABILITY_ROOT); //FIXME What if the root has an id other than 0? Send it from spawn to the root id?
spawn_data->initial_qty = mall->num_parents;
spawn_data->target_qty = mall->numC;
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, &(spawn_data->spawn_is_multiple));
switch(mall_conf->spawn_method) {
case MALL_SPAWN_BASELINE:
spawn_data->spawn_qty = spawn_data->target_qty;
baseline(*spawn_data, parents);
if(!spawn_data->spawn_is_intercomm) {
intracomm_strategy(MALLEABILITY_CHILDREN, parents);
}
break;
case MALL_SPAWN_MERGE:
spawn_data->target_qty += spawn_data->initial_qty;
spawn_data->spawn_qty = spawn_data->target_qty - spawn_data->initial_qty;
merge(*spawn_data, parents, MALL_NOT_STARTED);
break;
}
//mall->num_parents = spawn_data->initial_qty;
free(spawn_data);
}
@@ -174,13 +172,16 @@ void malleability_connect_children(MPI_Comm comm, MPI_Comm *parents) {
void set_spawn_configuration(MPI_Comm comm) {
spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));
spawn_data->total_spawns = 0;
spawn_data->initial_qty = mall->numP;
spawn_data->target_qty = mall->numC;
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, &(spawn_data->spawn_is_multiple));
spawn_data->comm = comm;
spawn_data->mapping_fill_method = MALL_DIST_STRING;
spawn_data->sets = NULL;
switch(mall_conf->spawn_method) {
case MALL_SPAWN_BASELINE:
@@ -196,7 +197,6 @@ void set_spawn_configuration(MPI_Comm comm) {
if(spawn_data->spawn_is_async) {
init_spawn_state();
}
spawn_data->mapping = MPI_INFO_NULL;
}
/*
@@ -204,11 +204,23 @@ void set_spawn_configuration(MPI_Comm comm) {
* together with the destruction of the structures it uses.
*/
void deallocate_spawn_data() {
int i;
MPI_Info *info;
if(spawn_data == NULL) return;
if(spawn_data->mapping != MPI_INFO_NULL) {
MPI_Info_free(&(spawn_data->mapping));
for(i=0; i<spawn_data->total_spawns; i++) {
info = &(spawn_data->sets[i].mapping);
if(*info != MPI_INFO_NULL) {
MPI_Info_free(info);
*info = MPI_INFO_NULL;
}
}
if(spawn_data->sets != NULL) {
free(spawn_data->sets);
spawn_data->sets = NULL;
}
if(spawn_data->spawn_is_async) {
free_spawn_state();
}
@@ -230,7 +242,7 @@ void generic_spawn(MPI_Comm *child, int data_stage) {
// WORK
if(mall->myId == mall->root && spawn_data->spawn_qty > 0) { //SET MAPPING FOR NEW PROCESSES
processes_dist(*spawn_data, &(spawn_data->mapping));
processes_dist(spawn_data);
}
switch(mall_conf->spawn_method) {
case MALL_SPAWN_BASELINE:
......
@@ -8,7 +8,7 @@
int init_spawn(MPI_Comm comm, MPI_Comm *child);
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int wait_completed);
void malleability_connect_children(MPI_Comm comm, MPI_Comm *parents);
void malleability_connect_children(MPI_Comm *parents);
void unset_spawn_postpone_flag(int outside_state);
......
@@ -9,24 +9,28 @@
//--------------PRIVATE DECLARATIONS---------------//
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes);
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes, int *total_spawns);
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info);
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **host_str);
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void generate_multiple_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void set_mapping_host(int qty, char *host, size_t index, Spawn_data *spawn_data);
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str);
int write_str_node(char **hostlist_str, size_t len_og, size_t qty, char *node_name);
//--------------------------------SLURM USAGE-------------------------------------//
#if USE_MAL_SLURM
#include <slurm/slurm.h>
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info);
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostfile_str);
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void generate_multiple_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str);
//@deprecated functions
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, MPI_Info *info);
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, Spawn_data *spawn_data);
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int used_nodes);
#endif
//--------------------------------SLURM USAGE-------------------------------------//
int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_name);
//@deprecated functions
int create_hostfile(char **file_name);
int write_hostfile_node(int ptr, int qty, char *node_name);
@@ -38,29 +42,37 @@ int write_hostfile_node(int ptr, int qty, char *node_name);
* for a call to MPI_Comm_spawn, obtaining a physical distribution
* for the processes and creating a hostfile.
*
* OUT parameters -->
* spawn_data->sets: spawn entries whose MPI_Info objects indicate
* the mapping to use when creating the processes.
*/
void processes_dist(Spawn_data spawn_data, MPI_Info *info_spawn) {
void processes_dist(Spawn_data *spawn_data) {
int used_nodes=0;
int *procs_array;
// GET NEW DISTRIBUTION
node_dist(spawn_data, &procs_array, &used_nodes);
node_dist(*spawn_data, &procs_array, &used_nodes, &spawn_data->total_spawns);
spawn_data->sets = (Spawn_set *) malloc(spawn_data->total_spawns * sizeof(Spawn_set));
#if USE_MAL_SLURM
switch(spawn_data.mapping_fill_method) {
switch(spawn_data->mapping_fill_method) {
case MALL_DIST_STRING:
generate_info_string_slurm(mall->nodelist, procs_array, used_nodes, info_spawn);
// if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL) ) {
if(spawn_data->spawn_is_multiple) {
generate_multiple_info_string_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
} else {
generate_info_string_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
}
break;
case MALL_DIST_HOSTFILE:
generate_info_hostfile_slurm(mall->nodelist, procs_array, used_nodes, info_spawn);
case MALL_DIST_HOSTFILE: // FIXME Does not consider multiple spawn strat
generate_info_hostfile_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
break;
}
free(procs_array);
#else
generate_info_string(mall->nodelist, procs_array, used_nodes, info_spawn);
// if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL) ) {
if(spawn_data->spawn_is_multiple) {
generate_multiple_info_string(mall->nodelist, procs_array, used_nodes, spawn_data);
} else {
generate_info_string(mall->nodelist, procs_array, used_nodes, spawn_data);
}
#endif
free(procs_array);
}
@@ -68,8 +80,8 @@ void processes_dist(Spawn_data spawn_data, MPI_Info *info_spawn) {
//-----------------DISTRIBUTION-----------------//
/*
* Obtains the physical distribution of the group of processes to create,
* returning how many nodes will be used and how many processes each
* node will host.
* returning how many nodes will be used, how many processes each
* node will host and how many process creations will be needed.
*
* Two kinds of physical distribution are allowed depending on the value of "spawn_dist":
*
@@ -78,9 +90,8 @@ void processes_dist(Spawn_data spawn_data, MPI_Info *info_spawn) {
* COMM_PHY_CPU (2): Aimed at filling one node's capacity before
* occupying another node.
*/
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes) {
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes, int *total_spawns) {
int i, *procs;
procs = calloc(mall->num_nodes, sizeof(int)); // Number of processes per node
/* GET NEW DISTRIBUTION */
@@ -95,8 +106,19 @@ void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes) {
//Copy results to output vector qty
*qty = calloc(*used_nodes, sizeof(int)); // Number of processes per node
for(i=0; i< *used_nodes; i++) {
(*qty)[i] = procs[i];
// if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL) ) {
if(spawn_data.spawn_is_multiple) {
for(i=0; i< *used_nodes; i++) {
(*qty)[i] = procs[i];
if(procs[i]) (*total_spawns)++;
}
} else {
*total_spawns = 1;
for(i=0; i< *used_nodes; i++) {
(*qty)[i] = procs[i];
}
}
free(procs);
}
@@ -106,18 +128,18 @@ void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes) {
* so that all nodes have the same number. Returns the total number of
* nodes used and the number of processes to create on each node.
*
* FIXME Take into account already created processes (already_created)
* Assumes the already existing processes are on the lowest nodes
* with the same tamBl. //FIXME Should not assume tamBl.
*
* FIXME Take into account the location of already created processes (already_created)
*/
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
int i, tamBl, remainder;
int i, tamBl;
*used_nodes = mall->num_nodes;
tamBl = spawn_data.target_qty / mall->num_nodes;
remainder = spawn_data.target_qty % mall->num_nodes;
for(i=0; i<remainder; i++) {
procs[i] = tamBl + 1;
}
for(i=remainder; i<mall->num_nodes; i++) {
tamBl = spawn_data.target_qty / *used_nodes;
i = spawn_data.already_created ? spawn_data.already_created / tamBl : 0;
for(; i<*used_nodes; i++) {
procs[i] = tamBl;
}
}
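/* Worked example of the new start index: with target_qty = 8 over
 * num_nodes = 4, tamBl = 2; if already_created = 4, those processes are
 * assumed to fill the first 4 / 2 = 2 nodes, so the loop starts filling
 * at procs[2]. */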
@@ -142,7 +164,7 @@ void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
//FIXME REFACTOR What happens if nodes 1 and 2 have free slots
//First nodes could already have existing procs
//Start from the first with free spaces
if (remainder) {
if (remainder && asigCores + (tamBl - remainder) < spawn_data.target_qty) {
procs[i] = tamBl - remainder;
asigCores += procs[i];
i = (i+1) % mall->num_nodes;
@@ -176,21 +198,69 @@ void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
*
*
*/
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info){
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
char *host_str;
fill_str_hosts(nodelist, procs_array, nodes, &host_str);
// SET MAPPING
MPI_Info_create(info);
MPI_Info_set(*info, "hosts", mall->nodelist);
set_mapping_host(spawn_data->spawn_qty, host_str, 0, spawn_data);
free(host_str);
}
/*
* Creates and returns a set of MPI_Info objects with
* host/mapping pairs indicating the mapping to use
* for the new processes, split by node.
*
*
*/
void generate_multiple_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
char *host, *aux, *token, *hostlist_str = NULL;
size_t i=0,j=0,len=0;
aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
strcpy(aux, nodelist);
token = strtok(aux, ",");
while (token != NULL && i < nodes) {
host = strdup(token);
if (procs_array[i] != 0) {
write_str_node(&hostlist_str, len, procs_array[i], host);
set_mapping_host(procs_array[i], hostlist_str, j, spawn_data);
free(hostlist_str); hostlist_str = NULL;
j++;
}
i++;
free(host);
token = strtok(NULL, ",");
}
free(aux);
if(hostlist_str != NULL) { free(hostlist_str); }
}
//--------------PRIVATE FUNCTIONS---------------//
//---------------MAPPING UTILITY----------------//
//----------------------------------------------//
/*
* Adds to the next spawn entry the physical
* distribution to use, as a host/mapping pair
* and the total number of processes.
*/
void set_mapping_host(int qty, char *host, size_t index, Spawn_data *spawn_data) {
MPI_Info *info;
spawn_data->sets[index].spawn_qty = qty;
info = &(spawn_data->sets[index].mapping);
MPI_Info_create(info);
MPI_Info_set(*info, "hosts", host);
}
/*
* Creates and returns a string to be used by the "hosts" key
* when creating processes, indicating where they must be created.
*/
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **host_str) {
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str) {
char *host, *aux, *token;
size_t i=0,len=0;
@@ -200,7 +270,7 @@ void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **host_str
while (token != NULL && i < used_nodes) {
host = strdup(token);
if (qty[i] != 0) {
len = write_str_node(host_str, len, qty[i], host);
len = write_str_node(hostlist_str, len, qty[i], host);
}
i++;
free(host);
@@ -213,7 +283,7 @@ void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **host_str
* Appends "qty" entries of "node_name" to a string.
* Allocates the memory, reallocating it if necessary.
*/
int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_name) {
int write_str_node(char **hostlist_str, size_t len_og, size_t qty, char *node_name) {
int err;
char *ocurrence;
size_t i, len, len_node;
@@ -222,11 +292,11 @@ int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_na
len = qty * len_node; // Number of times the node is used
if(len_og == 0) { // Memory not allocated yet
*hostfile_str = (char *) malloc((len+1) * sizeof(char));
*hostlist_str = (char *) malloc((len+1) * sizeof(char));
} else { // String already has data
*hostfile_str = (char *) realloc(*hostfile_str, (len_og + len + 1) * sizeof(char));
*hostlist_str = (char *) realloc(*hostlist_str, (len_og + len + 1) * sizeof(char));
}
if(hostfile_str == NULL) return -1; // Could not allocate memory
if(*hostlist_str == NULL) return -1; // Could not allocate memory
ocurrence = (char *) malloc((len_node+1) * sizeof(char));
if(ocurrence == NULL) return -2; // Could not allocate memory
@@ -236,10 +306,10 @@ int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_na
i=0;
if(len_og == 0) { // If initializing, the first entry is a copy
i++;
strcpy(*hostfile_str, node_name);
strcpy(*hostlist_str, node_name);
}
for(; i<qty; i++){ // The following ones are concatenated
strcat(*hostfile_str, ocurrence);
strcat(*hostlist_str, ocurrence);
}
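/* Illustrative example (assumes "ocurrence" is built as ",node_name" in the
 * elided lines above, matching the comma-separated format of the MPI "hosts"
 * key): write_str_node(&s, 0, 3, "n0") leaves s = "n0,n0,n0" and returns the
 * new string length. */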
@@ -255,21 +325,48 @@ int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_na
* processes.
* Slurm is required in order to use it.
*/
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info){
// CREATE AND SET STRING HOSTS
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
char *hoststring;
// CREATE AND SET STRING HOSTS
fill_str_hosts_slurm(nodelist, procs_array, nodes, &hoststring);
MPI_Info_create(info);
MPI_Info_set(*info, "hosts", hoststring);
set_mapping_host(spawn_data->spawn_qty, hoststring, 0, spawn_data);
free(hoststring);
}
/*
* Creates and returns a set of MPI_Info objects with
* host/mapping pairs indicating the mapping to use
* for the new processes, split by node.
* Slurm is required in order to use it.
*/
void generate_multiple_info_string_slurm(char *nodelist, int *qty, size_t used_nodes, Spawn_data *spawn_data) {
char *host, *hostlist_str;
size_t i=0,j=0,len=0;
hostlist_t hostlist;
hostlist_str = NULL;
hostlist = slurm_hostlist_create(nodelist);
while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
if(qty[i] != 0) {
write_str_node(&hostlist_str, len, qty[i], host);
set_mapping_host(qty[i], hostlist_str, j, spawn_data);
free(hostlist_str); hostlist_str = NULL;
j++;
}
i++;
free(host);
}
slurm_hostlist_destroy(hostlist);
if(hostlist_str != NULL) { free(hostlist_str); }
}
/*
* Creates and returns a string to be used by the "hosts" key
* when creating processes, indicating where they must be created.
*/
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostfile_str) {
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str) {
char *host;
size_t i=0,len=0;
hostlist_t hostlist;
@@ -277,7 +374,7 @@ void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **ho
hostlist = slurm_hostlist_create(nodelist);
while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
if(qty[i] != 0) {
len = write_str_node(hostfile_str, len, qty[i], host);
len = write_str_node(hostlist_str, len, qty[i], host);
}
i++;
free(host);
@@ -296,9 +393,13 @@ void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **ho
* Generates a hostfile and adds it to an
* MPI_Info object to be used.
*/
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, MPI_Info *info){
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, Spawn_data *spawn_data){
char *hostfile;
int ptr;
MPI_Info *info;
spawn_data->sets[0].spawn_qty = spawn_data->spawn_qty;
info = &(spawn_data->sets[0].mapping);
// CREATE/UPDATE HOSTFILE
ptr = create_hostfile(&hostfile);
......
@@ -9,6 +9,6 @@
#include "../malleabilityDataStructures.h"
#include "Spawn_DataStructure.h"
void processes_dist(Spawn_data spawn_data, MPI_Info *info_spawn);
void processes_dist(Spawn_data *spawn_data);
#endif
@@ -5,14 +5,22 @@
#include <mpi.h>
/* --- SPAWN STRUCTURE --- */
typedef struct {
int spawn_qty;
MPI_Info mapping;
} Spawn_set;
typedef struct {
int spawn_qty, initial_qty, target_qty;
int already_created;
int spawn_is_single, spawn_is_async, spawn_is_intercomm;
MPI_Info mapping;
int total_spawns;
int spawn_is_single, spawn_is_async, spawn_is_intercomm, spawn_is_multiple;
// MPI_Info mapping;
int mapping_fill_method;
MPI_Comm comm, returned_comm;
MPI_Comm comm, returned_comm; // ONLY SET FOR SOURCE PROCESSES
Spawn_set *sets;
} Spawn_data;
#endif