Commit 24f3bfb7 authored by iker_martin

WIP 2/3: Added the Parallel spawn strategy. The spawn call has been moved from Baseline.c to SpawnUtils.c.

parent 62af4ef7
@@ -34,7 +34,7 @@ mam_config_setting_t configSettings[] = {
{NULL, 1, INT_MAX, {.set_config_complex = MAM_I_set_target_number }, MAM_NUM_TARGETS_ENV}
};
unsigned int masks_spawn[] = {MAM_STRAT_CLEAR_VALUE, MAM_MASK_PTHREAD, MAM_MASK_SPAWN_SINGLE, MAM_MASK_SPAWN_INTERCOMM, MAM_MASK_SPAWN_MULTIPLE};
unsigned int masks_spawn[] = {MAM_STRAT_CLEAR_VALUE, MAM_MASK_PTHREAD, MAM_MASK_SPAWN_SINGLE, MAM_MASK_SPAWN_INTERCOMM, MAM_MASK_SPAWN_MULTIPLE, MAM_MASK_SPAWN_PARALLEL};
unsigned int masks_red[] = {MAM_STRAT_CLEAR_VALUE, MAM_MASK_PTHREAD, MAM_MASK_RED_WAIT_SOURCES, MAM_MASK_RED_WAIT_TARGETS};
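/* Editor's sketch (not part of the commit): each strategy occupies one bit of
 * an unsigned int, so every mask above must be a distinct power of two.
 * Assuming the MAM_I_* helpers behave as plain bit operations, they would
 * look roughly like this (sketch_* names are hypothetical): */
static inline int  sketch_contains_strat(unsigned int strategies, unsigned int mask) { return (strategies & mask) == mask; }
static inline void sketch_add_strat(unsigned int *strategies, unsigned int mask)     { *strategies |= mask; }
static inline void sketch_remove_strat(unsigned int *strategies, unsigned int mask)  { *strategies &= ~mask; }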
/**
@@ -92,7 +92,7 @@ void MAM_Set_key_configuration(int key, int required, int *provided) {
if(required < 0 || state > MAM_I_NOT_STARTED) return;
mam_config_setting_t *config = NULL;
for (i = 0; i < MAM_KEY_COUNT; i++) {
for (i = 0; i < MAM_KEY_COUNT; i++) { //FIXME A for loop is not needed -- check if key < MAM_KEY_COUNT and then use key directly as the index
if (key == i) {
config = &configSettings[i];
break;
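/* Editor's sketch of what the FIXME above suggests: the keys coincide with the
 * indices 0..MAM_KEY_COUNT-1, so the loop could become a bounds check plus a
 * direct lookup (hypothetical rewrite, not part of this commit):
 *
 *   if(key < 0 || key >= MAM_KEY_COUNT) return;
 *   config = &configSettings[key];
 */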
@@ -228,7 +228,8 @@ void MAM_Check_configuration() {
}
MPI_Allreduce(&mall->internode_group, &global_internodes, 1, MPI_INT, MPI_MAX, mall->comm);
if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL)
if((MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL)
|| MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PARALLEL, NULL) )
&& global_internodes) { // Remove internode MPI_COMM_WORLDs
MAM_Set_key_configuration(MAM_SPAWN_METHOD, MAM_SPAWN_BASELINE, NULL);
}
@@ -237,6 +238,7 @@ void MAM_Check_configuration() {
if(MAM_I_contains_strat(mall_conf->spawn_strategies, MAM_MASK_SPAWN_INTERCOMM)) {
MAM_I_remove_strat(&mall_conf->spawn_strategies, MAM_MASK_SPAWN_INTERCOMM);
}
// FIXME Removing this should not be required for that case...
if(mall->numP > mall->numC && MAM_I_contains_strat(mall_conf->spawn_strategies, MAM_MASK_SPAWN_SINGLE)) {
MAM_I_remove_strat(&mall_conf->spawn_strategies, MAM_MASK_SPAWN_SINGLE);
}
@@ -301,6 +303,7 @@ int MAM_I_set_method(unsigned int new_method, unsigned int *method) {
return *method;
}
//TODO A couple of arrays or a dict could be used to obtain the mask without a switch
int MAM_I_set_spawn_strat(unsigned int strategy, unsigned int *strategies) {
int result = 0;
int strat_removed = 0;
@@ -315,12 +318,25 @@ int MAM_I_set_spawn_strat(unsigned int strategy, unsigned int *strategies) {
break;
case MAM_STRAT_SPAWN_SINGLE:
result = MAM_I_add_strat(strategies, MAM_MASK_SPAWN_SINGLE);
if(result == MAM_STRATS_ADDED) {
strat_removed += MAM_I_remove_strat(strategies, MAM_MASK_SPAWN_PARALLEL);
}
break;
case MAM_STRAT_SPAWN_INTERCOMM:
result = MAM_I_add_strat(strategies, MAM_MASK_SPAWN_INTERCOMM);
break;
case MAM_STRAT_SPAWN_MULTIPLE:
result = MAM_I_add_strat(strategies, MAM_MASK_SPAWN_MULTIPLE);
if(result == MAM_STRATS_ADDED) {
strat_removed += MAM_I_remove_strat(strategies, MAM_MASK_SPAWN_PARALLEL);
}
break;
case MAM_STRAT_SPAWN_PARALLEL:
result = MAM_I_add_strat(strategies, MAM_MASK_SPAWN_PARALLEL);
if(result == MAM_STRATS_ADDED) {
strat_removed += MAM_I_remove_strat(strategies, MAM_MASK_SPAWN_MULTIPLE);
strat_removed += MAM_I_remove_strat(strategies, MAM_MASK_SPAWN_SINGLE);
}
break;
default:
//Unknown strategy
...
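/* Editor's sketch of the TODO above (hypothetical, untested): the strategy
 * enum already indexes masks_spawn[], so the switch could collapse into a
 * table lookup plus a per-strategy table of mutually exclusive masks,
 * assuming MAM_I_remove_strat accepts a combined mask: */
static const unsigned int exclusive_masks[MAM_STRATS_SPAWN_LEN] = {
    [MAM_STRAT_SPAWN_SINGLE]   = MAM_MASK_SPAWN_PARALLEL,
    [MAM_STRAT_SPAWN_MULTIPLE] = MAM_MASK_SPAWN_PARALLEL,
    [MAM_STRAT_SPAWN_PARALLEL] = MAM_MASK_SPAWN_MULTIPLE | MAM_MASK_SPAWN_SINGLE,
};
result = MAM_I_add_strat(strategies, masks_spawn[strategy]);
if(result == MAM_STRATS_ADDED && exclusive_masks[strategy]) {
    strat_removed += MAM_I_remove_strat(strategies, exclusive_masks[strategy]);
}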
@@ -13,6 +13,7 @@
#define MAM_MASK_SPAWN_SINGLE 0x02
#define MAM_MASK_SPAWN_INTERCOMM 0x04
#define MAM_MASK_SPAWN_MULTIPLE 0x08
#define MAM_MASK_SPAWN_PARALLEL 0x10
#define MAM_MASK_RED_WAIT_SOURCES 0x02
#define MAM_MASK_RED_WAIT_TARGETS 0x04
...
@@ -9,7 +9,7 @@ enum mam_states{MAM_UNRESERVED, MAM_NOT_STARTED, MAM_PENDING, MAM_USER_PENDING,
enum mam_proc_states{MAM_PROC_CONTINUE, MAM_PROC_NEW_RANK, MAM_PROC_ZOMBIE};
enum mam_spawn_methods{MAM_SPAWN_BASELINE, MAM_SPAWN_MERGE, MAM_METHODS_SPAWN_LEN};
enum mam_spawn_strategies{MAM_STRAT_SPAWN_CLEAR, MAM_STRAT_SPAWN_PTHREAD, MAM_STRAT_SPAWN_SINGLE, MAM_STRAT_SPAWN_INTERCOMM, MAM_STRAT_SPAWN_MULTIPLE, MAM_STRATS_SPAWN_LEN};
enum mam_spawn_strategies{MAM_STRAT_SPAWN_CLEAR, MAM_STRAT_SPAWN_PTHREAD, MAM_STRAT_SPAWN_SINGLE, MAM_STRAT_SPAWN_INTERCOMM, MAM_STRAT_SPAWN_MULTIPLE, MAM_STRAT_SPAWN_PARALLEL, MAM_STRATS_SPAWN_LEN};
enum mam_phy_dist_methods{MAM_PHY_DIST_SPREAD = 1, MAM_PHY_DIST_COMPACT, MAM_METHODS_PHYSICAL_DISTRIBUTION_LEN};
enum mam_phy_info_methods{MAM_PHY_TYPE_STRING = 1, MAM_PHY_TYPE_HOSTFILE};
...
@@ -9,7 +9,7 @@ int state = MAM_I_UNRESERVED;
 * of MaM.
*/
void MAM_Def_main_datatype() {
int i, counts = 11;
int i, counts = 12;
int blocklengths[counts];
MPI_Aint displs[counts];
MPI_Datatype types[counts];
@@ -33,9 +33,10 @@ void MAM_Def_main_datatype() {
MPI_Get_address(&(mall->root_parents), &displs[5]);
MPI_Get_address(&(mall->num_parents), &displs[6]); //TODO Add only when Single strat active?
MPI_Get_address(&(mall->numC), &displs[7]); //TODO Add only when MultipleSpawn strat active?
MPI_Get_address(&(mall->num_cpus), &displs[8]);
MPI_Get_address(&(mall->num_nodes), &displs[9]);
MPI_Get_address(&(mall->nodelist_len), &displs[10]);
MPI_Get_address(&(mall->gid), &displs[8]); //TODO Add only when ParallelSpawn strat active?
MPI_Get_address(&(mall->num_cpus), &displs[9]);
MPI_Get_address(&(mall->num_nodes), &displs[10]);
MPI_Get_address(&(mall->nodelist_len), &displs[11]);
MPI_Type_create_struct(counts, blocklengths, displs, types, &mall->struct_type);
MPI_Type_commit(&mall->struct_type);
...
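/* Editor's note (assumption, not shown in this commit): since the
 * displacements are absolute addresses from MPI_Get_address(), the committed
 * struct_type would normally be transferred relative to MPI_BOTTOM, e.g.
 *
 *   MPI_Bcast(MPI_BOTTOM, 1, mall->struct_type, rootBcast, intercomm);
 *
 * which is presumably how MAM_Comm_main_structures() ships mall->gid and the
 * other registered fields to the spawned processes. */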
@@ -52,7 +52,7 @@ typedef struct {
typedef struct {
int myId, numP, numC, zombie;
int root, root_collectives;
int num_parents, root_parents;
int num_parents, root_parents, gid;
pthread_t async_thread;
MPI_Comm comm, thread_comm, original_comm;
MPI_Comm intercomm, tmp_comm;
...
@@ -701,7 +701,8 @@ void Children_init(void (*user_function)(void *), void *user_args) {
}
mall->root_collectives = mall->root_parents;
if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL)) {
if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL)
|| MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PARALLEL, NULL)) {
mall->internode_group = 0;
} else {
mall->internode_group = MAM_Is_internode_group();
...
@@ -4,13 +4,15 @@
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
#include "Baseline.h"
#include "SpawnUtils.h"
#include "Strategy_Single.h"
#include "Strategy_Multiple.h"
#include "Strategy_Parallel.h"
#include "PortService.h"
//--------------PRIVATE DECLARATIONS---------------//
int baseline_spawn(Spawn_set spawn_set, MPI_Comm comm, MPI_Comm *child);
void baseline_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *child);
void baseline_children(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *parents);
//--------------PUBLIC FUNCTIONS---------------//
/*
@@ -26,8 +28,7 @@ int baseline(Spawn_data spawn_data, MPI_Comm *child) { //TODO Error handling
if (intercomm == MPI_COMM_NULL) { // Parents path
baseline_parents(spawn_data, &spawn_port, child);
} else { // Children path
if(spawn_data.spawn_is_multiple) { multiple_strat_children(child, &spawn_port); }
if(spawn_data.spawn_is_single) { single_strat_children(child, &spawn_port); }
baseline_children(spawn_data, &spawn_port, child);
}
free_ports(&spawn_port);
@@ -45,6 +46,17 @@ void baseline_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *
int i;
MPI_Comm comm, *intercomms;
#if MAM_DEBUG >= 3
DEBUG_FUNC("Starting spawning of processes", mall->myId, mall->numP); fflush(stdout);
#endif
if (spawn_data.spawn_is_parallel) {
// This spawn is quite different from the rest,
// as it takes care of everything related to spawning.
parallel_strat_parents(spawn_data, spawn_port, child);
return;
}
if (spawn_data.spawn_is_single && mall->myId != mall->root) {
single_strat_parents(spawn_data, child);
return;
@@ -57,11 +69,8 @@ void baseline_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *
spawn_data.sets = (Spawn_set *) malloc(spawn_data.total_spawns * sizeof(Spawn_set));
}
#if MAM_DEBUG >= 3
DEBUG_FUNC("Starting spawning of processes", mall->myId, mall->numP); fflush(stdout);
#endif
for(i=0; i<spawn_data.total_spawns; i++) {
baseline_spawn(spawn_data.sets[i], comm, &intercomms[i]);
mam_spawn(spawn_data.sets[i], comm, &intercomms[i]);
}
#if MAM_DEBUG >= 3
DEBUG_FUNC("Sources have created the new processes. Performing additional actions if required.", mall->myId, mall->numP); fflush(stdout);
@@ -77,22 +86,15 @@ void baseline_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *
if(mall->myId != mall->root) { free(spawn_data.sets); }
}
/*
 * Basic function in charge of process creation.
 * Creates a set of processes according to the configuration
 * obtained in ProcessDist.c.
 * Returns in "child" the intercommunicator connected to the children.
*/
int baseline_spawn(Spawn_set spawn_set, MPI_Comm comm, MPI_Comm *child) {
int rootBcast = MPI_PROC_NULL;
if(mall->myId == mall->root) rootBcast = MPI_ROOT;
int spawn_err = MPI_Comm_spawn(spawn_set.cmd, MPI_ARGV_NULL, spawn_set.spawn_qty, spawn_set.mapping, mall->root, comm, child, MPI_ERRCODES_IGNORE);
if(spawn_err != MPI_SUCCESS) {
printf("Error creating new set of %d procs.\n", spawn_set.spawn_qty);
void baseline_children(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *parents) {
if(spawn_data.spawn_is_parallel) {
// This spawn is quite different from the rest,
// as it takes care of everything related to spawning.
parallel_strat_children(spawn_data, spawn_port, parents);
return;
}
MAM_Comm_main_structures(*child, rootBcast);
return spawn_err;
if(spawn_data.spawn_is_multiple) { multiple_strat_children(parents, spawn_port); }
if(spawn_data.spawn_is_single) { single_strat_children(parents, spawn_port); }
}
\ No newline at end of file
@@ -139,7 +139,6 @@ void unset_spawn_postpone_flag(int outside_state) {
*/
void malleability_connect_children(MPI_Comm *parents) {
spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));
MAM_Comm_main_structures(*parents, MAM_ROOT); //FIXME What if root is another id different to 0? Send from spawn to root id?
spawn_data->initial_qty = mall->num_parents;
spawn_data->target_qty = mall->numC;
@@ -147,6 +146,7 @@ void malleability_connect_children(MPI_Comm *parents) {
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, &(spawn_data->spawn_is_multiple));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PARALLEL, &(spawn_data->spawn_is_parallel));
switch(mall_conf->spawn_method) {
case MAM_SPAWN_BASELINE:
@@ -179,8 +179,9 @@ void set_spawn_configuration(MPI_Comm comm) {
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, &(spawn_data->spawn_is_multiple));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PARALLEL, &(spawn_data->spawn_is_parallel));
spawn_data->comm = comm;
spawn_data->mapping_fill_method = MAM_PHY_TYPE_STRING;
spawn_data->mapping_fill_method = MAM_PHY_TYPE_HOSTFILE;
spawn_data->sets = NULL;
switch(mall_conf->spawn_method) {
...
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
#include "SpawnUtils.h"
/*
 * Basic function in charge of process creation.
 * Creates a set of processes according to the configuration
 * obtained in ProcessDist.c.
 * Returns in "child" the intercommunicator connected to the children.
*/
void mam_spawn(Spawn_set spawn_set, MPI_Comm comm, MPI_Comm *child) {
int rootBcast = MPI_PROC_NULL;
int comm_size;
MPI_Comm_size(comm, &comm_size);
if(mall->myId == mall->root || comm_size == 1) rootBcast = MPI_ROOT;
int spawn_err = MPI_Comm_spawn(spawn_set.cmd, MPI_ARGV_NULL, spawn_set.spawn_qty, spawn_set.mapping, MAM_ROOT, comm, child, MPI_ERRCODES_IGNORE);
if(spawn_err != MPI_SUCCESS) {
printf("Error creating new set of %d procs.\n", spawn_set.spawn_qty);
MPI_Abort(MPI_COMM_WORLD, -1);
}
MAM_Comm_main_structures(*child, rootBcast);
}
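/* Editor's note: rootBcast follows MPI's intercommunicator root convention.
 * In a collective over an intercomm, exactly one process of the sending group
 * passes MPI_ROOT, its peers pass MPI_PROC_NULL, and the receiving group
 * passes the rank of the remote root. A minimal sketch of the pattern
 * (hypothetical buffer, outside this commit):
 *
 *   int rootBcast = (mall->myId == mall->root) ? MPI_ROOT : MPI_PROC_NULL;
 *   MPI_Bcast(&value, 1, MPI_INT, rootBcast, intercomm);  // sending group
 *   MPI_Bcast(&value, 1, MPI_INT, root_rank, intercomm);  // receiving group
 */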
/*
 * Checks which command has to be called when performing
 * the spawn. All sets have to use the same command.
*/
char* get_spawn_cmd() {
char *cmd_aux;
switch(mall_conf->external_usage) {
case MAM_USE_VALGRIND:
cmd_aux = MAM_VALGRIND_SCRIPT;
break;
case MAM_USE_EXTRAE:
cmd_aux = MAM_EXTRAE_SCRIPT;
break;
default:
cmd_aux = mall->name_exec;
break;
}
return cmd_aux;
}
\ No newline at end of file
#ifndef MAM_SPAWN_UTILS_H
#define MAM_SPAWN_UTILS_H
#include <mpi.h>
#include "Spawn_DataStructure.h"
void mam_spawn(Spawn_set spawn_set, MPI_Comm comm, MPI_Comm *child);
char* get_spawn_cmd();
#endif
@@ -21,7 +21,7 @@ typedef struct {
int spawn_qty, initial_qty, target_qty;
int already_created;
int total_spawns;
int spawn_is_single, spawn_is_async, spawn_is_intercomm, spawn_is_multiple;
int spawn_is_single, spawn_is_async, spawn_is_intercomm, spawn_is_multiple, spawn_is_parallel;
// MPI_Info mapping;
int mapping_fill_method;
...
@@ -8,6 +8,11 @@
#include "Strategy_Multiple.h"
/*=====================DEBUG ALGORITHM=====================*/
//The following algorithm is a basic implementation; it was created
//to test whether the idea could work within Slurm+DMR.
//I'm keeping it for cases when I want to debug code related to
//this strategy.
void multiple_strat_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child) {
int i, rootBcast;
int buffer[2];
...
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
#include "PortService.h"
#include "Strategy_Parallel.h"
#include "ProcessDist.h"
#include "SpawnUtils.h"
#include <math.h>
void parallel_strat_parents_hypercube(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *child);
void parallel_strat_children_hypercube(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *parents);
void hypercube_spawn(int group_id, int groups, int init_nodes, int init_step, MPI_Comm **spawn_comm, int *qty_comms);
void common_synch(Spawn_data spawn_data, int qty_comms, MPI_Comm intercomm, MPI_Comm *spawn_comm);
void binary_tree_connection(int groups, int group_id, Spawn_ports *spawn_port, MPI_Comm *newintracomm);
void binary_tree_reorder(MPI_Comm *newintracomm, int group_id);
//--------PUBLIC FUNCTIONS----------//
//The algorithm is abstracted here so that different algorithms can be used
//depending on the circumstances of the spawn.
void parallel_strat_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *child) {
#if MAM_DEBUG >= 4
DEBUG_FUNC("Additional spawn action - Parallel PA started", mall->myId, mall->numP); fflush(stdout);
#endif
parallel_strat_parents_hypercube(spawn_data, spawn_port, child);
#if MAM_DEBUG >= 4
DEBUG_FUNC("Additional spawn action - Parallel PA completed", mall->myId, mall->numP); fflush(stdout);
#endif
}
void parallel_strat_children(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *parents) {
#if MAM_DEBUG >= 4
DEBUG_FUNC("Additional spawn action - Parallel CH started", mall->myId, mall->numP); fflush(stdout);
#endif
parallel_strat_children_hypercube(spawn_data, spawn_port, parents);
#if MAM_DEBUG >= 4
DEBUG_FUNC("Additional spawn action - Parallel CH completed", mall->myId, mall->numP); fflush(stdout);
#endif
}
//--------PRIVATE FUNCTIONS----------//
/*=====================HYPERCUBE++ ALGORITHM=====================*/
//The following algorithm divides the spawning task across all available ranks.
//It starts with just the sources, and then all spawned processes help with further
//spawns until all the required processes have been created.
//FIXME -- The amount of processes per spawned group must be homogeneous among groups
// - There is an exception for the last node, which could have fewer procs
// - Yet, the first spawned group cannot have fewer procs than the rest
void parallel_strat_parents_hypercube(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *child) {
int opening, qty_comms;
int groups, init_nodes, actual_step, group_id;
MPI_Comm *spawn_comm = NULL;
MPI_Bcast(&spawn_data.total_spawns, 1, MPI_INT, mall->root, spawn_data.comm);
actual_step = 0;
qty_comms = 0;
init_nodes = mall->numP / mall->num_cpus; //FIXME does not consider heterogeneous machines
groups = spawn_data.total_spawns + init_nodes;
//group_id = -((mall->myId / mall->num_cpus) + 1);
group_id = -init_nodes; //FIXME (((mall->numP-1) / mall->num_cpus) + 1); <-- Prev line
opening = mall->myId == mall->root ? 1 : 0;
open_port(spawn_port, opening, groups);
hypercube_spawn(group_id, groups, init_nodes, actual_step, &spawn_comm, &qty_comms);
common_synch(spawn_data, qty_comms, MPI_COMM_NULL, spawn_comm);
for(int i=0; i<qty_comms; i++) { MPI_Comm_disconnect(&spawn_comm[i]); }
if(spawn_comm != NULL) free(spawn_comm);
MPI_Comm_accept(spawn_port->port_name, MPI_INFO_NULL, MAM_ROOT, spawn_data.comm, child);
}
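/* Editor's sketch (not in the commit): worked example of the growth pattern.
 * With num_cpus procs per node, every process of every existing group spawns
 * one new group per step, so the group count multiplies by (1 + num_cpus)
 * each step: with init_nodes = 1 and num_cpus = 4 that is 1 -> 5 -> 25 -> 125
 * groups, matching the max_steps formula used in hypercube_spawn below. */
static int sketch_steps_needed(int groups, int init_nodes, int num_cpus) {
    /* e.g. groups = 25, init_nodes = 1, num_cpus = 4 -> ceil(log(25)/log(5)) = 2 */
    return (int) ceil(log((double) groups / init_nodes) / log(1.0 + num_cpus));
}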
/*
 - MPI_Comm *parents: Initially the intercommunicator connected to the parents
*/
void parallel_strat_children_hypercube(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *parents) {
int group_id, opening, qty_comms;
int actual_step;
int groups, init_nodes;
MPI_Comm newintracomm, *spawn_comm = NULL;
// TODO Check whether the spawn should be entered only if groups < numSources
qty_comms = 0;
group_id = mall->gid;
init_nodes = spawn_data.initial_qty / mall->num_cpus;
groups = spawn_data.spawn_qty / mall->num_cpus + init_nodes;
opening = (mall->myId == MAM_ROOT && group_id < groups/2) ? 1 : 0;
open_port(spawn_port, opening, group_id);
// Spawn more processes if required
if((groups-init_nodes) > spawn_data.initial_qty) {
actual_step = log((group_id + init_nodes) / init_nodes) / log(1 + mall->numP);
actual_step = floor(actual_step) + 1;
hypercube_spawn(group_id, groups, init_nodes, actual_step, &spawn_comm, &qty_comms);
}
common_synch(spawn_data, qty_comms, *parents, spawn_comm);
for(int i=0; i<qty_comms; i++) { MPI_Comm_disconnect(&spawn_comm[i]); }
MPI_Comm_disconnect(parents);
// Connect groups and ensure expected rank order
binary_tree_connection(groups-init_nodes, group_id, spawn_port, &newintracomm);
binary_tree_reorder(&newintracomm, group_id);
// Create intercomm between sources and children
opening = (mall->myId == mall->root && !group_id) ? groups : MAM_SERVICE_UNNEEDED;
discover_remote_port(opening, spawn_port);
MPI_Comm_connect(spawn_port->remote_port, MPI_INFO_NULL, MAM_ROOT, newintracomm, parents);
// New group obtained -- Adjust ranks and comms
MAM_comms_update(newintracomm);
MPI_Comm_rank(mall->comm, &mall->myId);
MPI_Comm_size(mall->comm, &mall->numP);
MPI_Comm_disconnect(&newintracomm);
}
// This function must not be executed concurrently by multiple threads of the same process
void hypercube_spawn(int group_id, int groups, int init_nodes, int init_step,
MPI_Comm **spawn_comm, int *qty_comms) {
int i, next_group_id, aux_sum, actual_step, actual_nodes;
int jid=0, n=0;
char *file_name = NULL;
Spawn_set set;
actual_step = init_step;
actual_nodes = pow(1+mall->num_cpus, actual_step)*init_nodes - init_nodes;
aux_sum = mall->num_cpus*(init_nodes + group_id) + mall->myId; //Constant sum for next line
next_group_id = actual_nodes + aux_sum;
if(next_group_id < groups - init_nodes) { //FIXME qty_comms is not computed correctly for processes sharing a group_id in the last steps
int max_steps = ceil(log(groups / init_nodes) / log(1 + mall->num_cpus));
*qty_comms = max_steps - actual_step;
*spawn_comm = (MPI_Comm *) malloc(*qty_comms * sizeof(MPI_Comm));
}
//if(mall->myId == 0)printf("T1 P%d+%d step=%d next_id=%d aux_sum=%d actual_nodes=%d comms=%d\n", mall->myId, group_id, actual_step, next_group_id, aux_sum, actual_nodes, *qty_comms);
#if MAM_USE_SLURM
char *tmp = getenv("SLURM_JOB_ID");
if(tmp != NULL) { jid = atoi(tmp); }
#endif
set.cmd = get_spawn_cmd();
i = 0;
while(next_group_id < groups - init_nodes) {
set_hostfile_name(&file_name, &n, jid, next_group_id);
//read_hostfile_procs(file_name, &set.spawn_qty);
set.spawn_qty = mall->num_cpus;
MPI_Info_create(&set.mapping);
MPI_Info_set(set.mapping, "hostfile", file_name);
mall->gid = next_group_id; // Used to pass the group id to the spawned process // Not thread safe
mam_spawn(set, MPI_COMM_SELF, &(*spawn_comm)[i]);
MPI_Info_free(&set.mapping);
actual_step++; i++;
actual_nodes = pow(1+mall->num_cpus, actual_step)*init_nodes - init_nodes;
next_group_id = actual_nodes + aux_sum;
}
*qty_comms = i;
if(file_name != NULL) free(file_name);
}
void common_synch(Spawn_data spawn_data, int qty_comms, MPI_Comm intercomm, MPI_Comm *spawn_comm) {
int i, root, root_other;
char aux;
MPI_Request *requests = NULL;
requests = (MPI_Request *) malloc(qty_comms * sizeof(MPI_Request));
root = root_other = 0; //FIXME Magic number
// Upside synchronization
for(i=0; i<qty_comms; i++) {
MPI_Irecv(&aux, 1, MPI_CHAR, root_other, 130, spawn_comm[i], &requests[i]);
}
MPI_Waitall(qty_comms, requests, MPI_STATUSES_IGNORE);
if(intercomm != MPI_COMM_NULL && mall->myId == root) { MPI_Send(&aux, 1, MPI_CHAR, root_other, 130, intercomm); }
// Sources synchronization
// TODO A split comm could be used to reduce the overhead of the Barrier when not all sources spawn
if(intercomm == MPI_COMM_NULL) { MPI_Barrier(spawn_data.comm); }
// Downside synchronization
if(intercomm != MPI_COMM_NULL && mall->myId == root) { MPI_Recv(&aux, 1, MPI_CHAR, root_other, 130, intercomm, MPI_STATUS_IGNORE); }
for(i=0; i<qty_comms; i++) {
MPI_Isend(&aux, 1, MPI_CHAR, root_other, 130, spawn_comm[i], &requests[i]);
}
MPI_Waitall(qty_comms, requests, MPI_STATUSES_IGNORE);
if(requests != NULL) { free(requests); }
}
void binary_tree_connection(int groups, int group_id, Spawn_ports *spawn_port, MPI_Comm *newintracomm) {
int service_id;
int middle, new_groups, new_group_id, new_rank;
MPI_Comm merge_comm, aux_comm, new_intercomm;
// FIXME -- Assumes there are no changes in each group before this point
// - If there are any, they should be reflected in mall->comm
// and a duplicate of mall->comm should be used here.
// For now this is not done, for simplicity.
merge_comm = aux_comm = MPI_COMM_WORLD;
new_intercomm = MPI_COMM_NULL;
new_rank = mall->myId;
while(groups > 1) {
middle = groups / 2;
new_groups = ceil(groups / 2.0);
if(group_id < middle) {
//Accept work
MPI_Comm_accept(spawn_port->port_name, MPI_INFO_NULL, MAM_ROOT, merge_comm, &new_intercomm);
MPI_Intercomm_merge(new_intercomm, 0, &aux_comm); //The side passing 0 goes first
if(merge_comm != MPI_COMM_WORLD && merge_comm != MPI_COMM_NULL) MPI_Comm_disconnect(&merge_comm);
if(new_intercomm != MPI_COMM_WORLD && new_intercomm != MPI_COMM_NULL) MPI_Comm_disconnect(&new_intercomm);
merge_comm = aux_comm;
MPI_Bcast(&new_groups, 1, MPI_INT, MAM_ROOT, aux_comm);
} else if(group_id >= new_groups) {
new_group_id = groups - group_id - 1;
service_id = new_rank == MAM_ROOT ? new_group_id : MAM_SERVICE_UNNEEDED;
discover_remote_port(service_id, spawn_port);
// Connect work
MPI_Comm_connect(spawn_port->remote_port, MPI_INFO_NULL, MAM_ROOT, merge_comm, &new_intercomm);
MPI_Intercomm_merge(new_intercomm, 1, &aux_comm); //The side passing 0 goes first
if(merge_comm != MPI_COMM_WORLD && merge_comm != MPI_COMM_NULL) MPI_Comm_disconnect(&merge_comm);
if(new_intercomm != MPI_COMM_WORLD && new_intercomm != MPI_COMM_NULL) MPI_Comm_disconnect(&new_intercomm);
merge_comm = aux_comm;
// Get new id
group_id = new_group_id;
new_rank = -1;
MPI_Bcast(&new_groups, 1, MPI_INT, MAM_ROOT, aux_comm);
}
groups = new_groups;
}
*newintracomm = merge_comm;
}
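/* Editor's sketch (not in the commit): an MPI-free dry run of the pairing
 * schedule above. Each round the lower half accepts, the upper half connects
 * to its mirror group (groups - id - 1) and adopts that mirror's id, and with
 * an odd count the middle group idles; rounds repeat until one group is left. */
static void sketch_pairing_trace(int groups) {
    while (groups > 1) {
        int middle = groups / 2;
        int new_groups = (groups + 1) / 2;            /* ceil(groups / 2.0) */
        for (int id = 0; id < groups; id++) {
            if (id < middle)           printf("group %d accepts group %d\n", id, groups - id - 1);
            else if (id >= new_groups) printf("group %d connects to group %d\n", id, groups - id - 1);
            else                       printf("group %d idles this round\n", id);
        }
        groups = new_groups;
    }
}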
void binary_tree_reorder(MPI_Comm *newintracomm, int group_id) {
int merge_size, *reorder, *index_reorder;
int expected_rank;
MPI_Group merge_group, aux_group;
MPI_Comm aux_comm;
// FIXME Expects all groups having the same size
expected_rank = mall->numP * group_id + mall->myId;
MPI_Comm_group(*newintracomm, &merge_group);
MPI_Comm_size(*newintracomm, &merge_size);
index_reorder = (int *) malloc(merge_size * sizeof(int));
reorder = (int *) malloc(merge_size * sizeof(int));
MPI_Allgather(&expected_rank, 1, MPI_INT, index_reorder, 1, MPI_INT, *newintracomm);
for(int i=0; i<merge_size; i++) {
reorder[index_reorder[i]] = i;
}
MPI_Group_incl(merge_group, merge_size, reorder, &aux_group);
MPI_Comm_create(*newintracomm, aux_group, &aux_comm);
//int merge_rank, new_rank;
//MPI_Comm_rank(*newintracomm, &merge_rank);
//MPI_Comm_rank(aux_comm, &new_rank);
//printf("Grupo %d -- Merge rank = %d - New rank = %d\n", group_id, merge_rank, new_rank);
if(*newintracomm != MPI_COMM_WORLD && *newintracomm != MPI_COMM_NULL) MPI_Comm_disconnect(newintracomm);
*newintracomm = aux_comm;
free(index_reorder);
free(reorder);
}
\ No newline at end of file
#ifndef MAM_SPAWN_PARALLEL_H
#define MAM_SPAWN_PARALLEL_H
#include <mpi.h>
#include "Spawn_DataStructure.h"
void parallel_strat_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *child);
void parallel_strat_children(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *parents);
#endif