Commit 335fdd65 authored by iker_martin's avatar iker_martin
Browse files

Spawn ports enhanced for complex strategies. Basic algorithm for multiple...

Spawn ports enhanced for complex strategies. Basic algorithm for multiple strat has beed refactored.
parent 2d27fd66
...@@ -5,19 +5,23 @@ ...@@ -5,19 +5,23 @@
#include "../MAM_DataStructures.h" #include "../MAM_DataStructures.h"
#include "Baseline.h" #include "Baseline.h"
#include "Spawn_state.h" #include "Spawn_state.h"
#include "PortService.h"
#define MAM_TAG_STRAT_SINGLE 130 #define MAM_TAG_STRAT_SINGLE 130
#define MAM_TAG_STRAT_MULTIPLE 131
#define MAM_TAG_STRAT_MULTIPLE_FIRST 131 #define MAM_TAG_STRAT_MULTIPLE_FIRST 131
#define MAM_TAG_STRAT_MULTIPLE_OTHER 132 #define MAM_TAG_STRAT_MULTIPLE_OTHER 132
//--------------PRIVATE DECLARATIONS---------------// //--------------PRIVATE DECLARATIONS---------------//
int baseline_spawn(Spawn_set spawn_set, MPI_Comm comm, MPI_Comm *child); int baseline_spawn(Spawn_set spawn_set, MPI_Comm comm, MPI_Comm *child);
void baseline_parents(Spawn_data spawn_data, MPI_Comm *child); void baseline_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *child);
void multiple_strat_parents(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child); void multiple_strat_parents(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child);
void multiple_strat_parents2(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child);
void multiple_strat_children(MPI_Comm *parents); void multiple_strat_children(MPI_Comm *parents);
void multiple_strat_children2(MPI_Comm *parents, Spawn_ports *spawn_port);
void single_strat_parents(Spawn_data spawn_data, MPI_Comm *child); void single_strat_parents(Spawn_data spawn_data, MPI_Comm *child);
void single_strat_children(MPI_Comm *parents); void single_strat_children(MPI_Comm *parents, Spawn_ports *spawn_port);
//--------------PUBLIC FUNCTIONS---------------// //--------------PUBLIC FUNCTIONS---------------//
...@@ -26,16 +30,19 @@ void single_strat_children(MPI_Comm *parents); ...@@ -26,16 +30,19 @@ void single_strat_children(MPI_Comm *parents);
* spawn_data.spawn_qty procesos. * spawn_data.spawn_qty procesos.
*/ */
int baseline(Spawn_data spawn_data, MPI_Comm *child) { //TODO Tratamiento de errores int baseline(Spawn_data spawn_data, MPI_Comm *child) { //TODO Tratamiento de errores
Spawn_ports spawn_port;
MPI_Comm intercomm; MPI_Comm intercomm;
MPI_Comm_get_parent(&intercomm); //FIXME May be a problem for third reconf or more with only expansions MPI_Comm_get_parent(&intercomm); //FIXME May be a problem for third reconf or more with only expansions
init_ports(&spawn_port);
if (intercomm == MPI_COMM_NULL) { // Parents path if (intercomm == MPI_COMM_NULL) { // Parents path
baseline_parents(spawn_data, child); baseline_parents(spawn_data, &spawn_port, child);
} else { // Children path } else { // Children path
if(spawn_data.spawn_is_multiple) { multiple_strat_children(child); } if(spawn_data.spawn_is_multiple) { multiple_strat_children2(child, &spawn_port); }
if(spawn_data.spawn_is_single) { single_strat_children(child); } if(spawn_data.spawn_is_single) { single_strat_children(child, &spawn_port); }
} }
free_ports(&spawn_port);
return MAM_I_SPAWN_COMPLETED; return MAM_I_SPAWN_COMPLETED;
} }
...@@ -46,7 +53,7 @@ int baseline(Spawn_data spawn_data, MPI_Comm *child) { //TODO Tratamiento de err ...@@ -46,7 +53,7 @@ int baseline(Spawn_data spawn_data, MPI_Comm *child) { //TODO Tratamiento de err
* creación de procesos. * creación de procesos.
* *
*/ */
void baseline_parents(Spawn_data spawn_data, MPI_Comm *child) { void baseline_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *child) {
int i; int i;
MPI_Comm comm, *intercomms; MPI_Comm comm, *intercomms;
...@@ -73,7 +80,7 @@ void baseline_parents(Spawn_data spawn_data, MPI_Comm *child) { ...@@ -73,7 +80,7 @@ void baseline_parents(Spawn_data spawn_data, MPI_Comm *child) {
#endif #endif
// TODO Improvement - Deactivate Multiple spawn before spawning if total_spawns == 1 // TODO Improvement - Deactivate Multiple spawn before spawning if total_spawns == 1
if(spawn_data.spawn_is_multiple) { multiple_strat_parents(spawn_data, comm, intercomms, child); } if(spawn_data.spawn_is_multiple) { multiple_strat_parents2(spawn_data, spawn_port, comm, intercomms, child); }
else { *child = intercomms[0]; } else { *child = intercomms[0]; }
if(spawn_data.spawn_is_single) { single_strat_parents(spawn_data, child); } if(spawn_data.spawn_is_single) { single_strat_parents(spawn_data, child); }
...@@ -125,6 +132,42 @@ void multiple_strat_parents(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *inte ...@@ -125,6 +132,42 @@ void multiple_strat_parents(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *inte
free(port_name); free(port_name);
} }
void multiple_strat_parents2(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child) {
int i, rootBcast;
int buffer[2];
char aux;
i = 0;
rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
buffer[0] = i;
buffer[1] = spawn_data.total_spawns;
MPI_Bcast(buffer, 2, MPI_INT, rootBcast, intercomms[i]);
if(mall->myId == mall->root) {
MPI_Recv(&aux, 1, MPI_CHAR, MPI_ANY_SOURCE, MAM_TAG_STRAT_MULTIPLE, intercomms[0], MPI_STATUS_IGNORE);
}
for(i=1; i<spawn_data.total_spawns; i++) {
buffer[0] = i;
MPI_Bcast(buffer, 2, MPI_INT, rootBcast, intercomms[i]);
if(mall->myId == mall->root) {
MPI_Recv(&aux, 1, MPI_CHAR, MPI_ANY_SOURCE, MAM_TAG_STRAT_MULTIPLE, intercomms[0], MPI_STATUS_IGNORE);
}
}
// Reconnect with new children communicator
if(mall->myId == mall->root) { discover_remote_port(0, spawn_port); }
else { discover_remote_port(MAM_SERVICE_UNNEEDED, spawn_port); }
MPI_Comm_connect(spawn_port->remote_port, MPI_INFO_NULL, mall->root, comm, child);
// Free unneeded spawn communicators
for(i=0; i<spawn_data.total_spawns; i++) { MPI_Comm_disconnect(&intercomms[i]); }
#if MAM_DEBUG >= 4
DEBUG_FUNC("Additional spawn action - Multiple PA completed", mall->myId, mall->numP); fflush(stdout);
#endif
}
void multiple_strat_children(MPI_Comm *parents) { void multiple_strat_children(MPI_Comm *parents) {
int i, start, total_spawns, new_root; int i, start, total_spawns, new_root;
int rootBcast = MPI_PROC_NULL; int rootBcast = MPI_PROC_NULL;
...@@ -190,6 +233,68 @@ void multiple_strat_children(MPI_Comm *parents) { ...@@ -190,6 +233,68 @@ void multiple_strat_children(MPI_Comm *parents) {
*parents = intercomm; *parents = intercomm;
} }
void multiple_strat_children2(MPI_Comm *parents, Spawn_ports *spawn_port) {
int i, group_id, total_spawns, new_root;
int buffer[2];
char aux;
MPI_Comm newintracomm, intercomm, parents_comm;
#if MAM_DEBUG >= 4
DEBUG_FUNC("Additional spawn action - Multiple CH started", mall->myId, mall->numP); fflush(stdout);
#endif
new_root = 0;
parents_comm = *parents;
MPI_Bcast(buffer, 2, MPI_INT, mall->root_parents, parents_comm);
group_id = buffer[0];
total_spawns = buffer[1];
if(mall->myId == mall->root && !group_id) { new_root = 1; }
open_port(spawn_port, new_root, group_id);
if(group_id) {
if(mall->myId == mall->root) { discover_remote_port(0, spawn_port); }
else { discover_remote_port(MAM_SERVICE_UNNEEDED, spawn_port); }
MPI_Comm_connect(spawn_port->remote_port, MPI_INFO_NULL, mall->root, mall->comm, &intercomm);
MPI_Intercomm_merge(intercomm, 1, &newintracomm); // Get last ranks
MPI_Comm_disconnect(&intercomm);
} else { // Root group of targets
group_id = 1;
MPI_Comm_dup(mall->comm, &newintracomm);
if(new_root) {
MPI_Send(&aux, 1, MPI_CHAR, mall->root_parents, MAM_TAG_STRAT_MULTIPLE, parents_comm); // Ensures order in the created intracommunicator
}
}
for(i=group_id; i<total_spawns; i++) {
MPI_Comm_accept(spawn_port->port_name, MPI_INFO_NULL, mall->root, newintracomm, &intercomm);
if(newintracomm != MPI_COMM_WORLD) MPI_Comm_disconnect(&newintracomm);
MPI_Intercomm_merge(intercomm, 0, &newintracomm); // Get first ranks
MPI_Comm_disconnect(&intercomm);
if(new_root) {
MPI_Send(&aux, 1, MPI_CHAR, mall->root_parents, MAM_TAG_STRAT_MULTIPLE, parents_comm); // Ensures order in the created intracommunicator
}
}
// Connect with sources
MPI_Comm_accept(spawn_port->port_name, MPI_INFO_NULL, mall->root, newintracomm, &intercomm);
// Update communicator to expected one
MAM_comms_update(newintracomm);
MPI_Comm_rank(mall->comm, &mall->myId);
MPI_Comm_size(mall->comm, &mall->numP);
MPI_Comm_disconnect(&newintracomm);
MPI_Comm_disconnect(parents);
*parents = intercomm;
#if MAM_DEBUG >= 4
DEBUG_FUNC("Additional spawn action - Multiple CH completed", mall->myId, mall->numP); fflush(stdout);
#endif
}
/* /*
* Si la variable "type" es 1, la creación es con la participación de todo el grupo de padres * Si la variable "type" es 1, la creación es con la participación de todo el grupo de padres
* Si el valor es diferente, la creación es solo con la participación del proceso root * Si el valor es diferente, la creación es solo con la participación del proceso root
...@@ -214,6 +319,10 @@ void single_strat_parents(Spawn_data spawn_data, MPI_Comm *child) { ...@@ -214,6 +319,10 @@ void single_strat_parents(Spawn_data spawn_data, MPI_Comm *child) {
MPI_Comm_disconnect(child); MPI_Comm_disconnect(child);
free(port_name); free(port_name);
*child = newintercomm; *child = newintercomm;
#if MAM_DEBUG >= 4
DEBUG_FUNC("Additional spawn action - Single PA completed", mall->myId, mall->numP); fflush(stdout);
#endif
} }
/* /*
...@@ -223,24 +332,16 @@ void single_strat_parents(Spawn_data spawn_data, MPI_Comm *child) { ...@@ -223,24 +332,16 @@ void single_strat_parents(Spawn_data spawn_data, MPI_Comm *child) {
* Solo se utiliza cuando la creación de los procesos ha sido * Solo se utiliza cuando la creación de los procesos ha sido
* realizada por un solo proceso padre * realizada por un solo proceso padre
*/ */
void single_strat_children(MPI_Comm *parents) { void single_strat_children(MPI_Comm *parents, Spawn_ports *spawn_port) {
char *port_name;
MPI_Comm newintercomm; MPI_Comm newintercomm;
int is_root = mall->myId == mall->root ? 1 : 0;
open_port(spawn_port, is_root, MAM_SERVICE_UNNEEDED);
if(mall->myId == mall->root) { if(mall->myId == mall->root) {
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char)); MPI_Send(spawn_port->port_name, MPI_MAX_PORT_NAME, MPI_CHAR, mall->root_parents, MAM_TAG_STRAT_SINGLE, *parents);
MPI_Open_port(MPI_INFO_NULL, port_name);
MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, mall->root_parents, MAM_TAG_STRAT_SINGLE, *parents);
} else {
port_name = malloc(1);
} }
MPI_Comm_accept(port_name, MPI_INFO_NULL, mall->root, mall->comm, &newintercomm); MPI_Comm_accept(spawn_port->port_name, MPI_INFO_NULL, mall->root, mall->comm, &newintercomm);
if(mall->myId == mall->root) {
MPI_Close_port(port_name);
}
free(port_name);
MPI_Comm_disconnect(parents); MPI_Comm_disconnect(parents);
*parents = newintercomm; *parents = newintercomm;
} }
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
#include "PortService.h"
#define MAM_SERVICE_CONSTANT_NAME 22 // Constant size name
#define MAM_SERVICE_VARIABLE_NAME 4 // Variable size name + '\0'
#define MAM_SERVICE_NAME_SIZE MAM_SERVICE_CONSTANT_NAME + MAM_SERVICE_VARIABLE_NAME
// Example of mam service name --> "mam_service_jid0010_gr001\0"
// constant part |variable part
//
void init_ports(Spawn_ports *spawn_port) {
spawn_port->opened_port = 0;
spawn_port->port_name = NULL;
spawn_port->service_name = NULL;
spawn_port->remote_port = NULL;
spawn_port->remote_service = NULL;
}
/*
* Opens an MPI port for inter-process communication and optionally publishes it as a service.
* Allows MaM to find other spawned groups which are not connected.
*
* Parameters:
* spawn_data: A structure containing information related to the port and service names.
* open_port : A flag that indicates if this process should Open (1) or only malloc(0).
* open_service: A flag that indicates if the service should be published.
* If it is not MAM_SERVICE_UNNEEDED, a service name is generated and published with the chosen number.
*
* Functionality:
* - Ensures that a port is only opened if it hasn't been opened already.
* - The process with the root rank opens the port and, if required, publishes a service name for it.
* - If SLURM is being used, it attempts to get the SLURM job ID from the environment.
* - For non-root ranks, it simply allocates 1 byte of memory for the port_name to avoid it being NULL (a placeholder operation).
*
* Notes:
* - SLURM is conditionally used to obtain job-specific information.
* - Error handling is not included in this function (e.g., failed memory allocation, failed MPI calls).
*/
void open_port(Spawn_ports *spawn_port, int open_port, int open_service)
{
int job_id = 0;
if (spawn_port->port_name != NULL)
return;
if (open_port)
{
spawn_port->opened_port = 1;
spawn_port->port_name = (char *)malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Open_port(MPI_INFO_NULL, spawn_port->port_name);
if (open_service != MAM_SERVICE_UNNEEDED)
{
spawn_port->service_name = (char *)malloc((MAM_SERVICE_NAME_SIZE) * sizeof(char));
#if MAM_USE_SLURM
char *tmp = getenv("SLURM_JOB_ID");
if(tmp != NULL) { job_id = atoi(tmp); }
#endif
snprintf(spawn_port->service_name, MAM_SERVICE_NAME_SIZE, "mam_service_jid%04d_gr%03d", job_id, open_service);
MPI_Publish_name(spawn_port->service_name, MPI_INFO_NULL, spawn_port->port_name);
}
} else {
spawn_port->port_name = malloc(1);
}
}
/*
* Function: close_port
* --------------------
* Closes an open MPI local port and cleans up associated resources.
*
* Parameters:
* spawn_data: A structure containing information related to the port and service names.
*
* Functionality:
* - The root process is the only one responsible for closing the MPI port and service.
* - Frees the memory allocated for the port and service and sets the pointer to NULL.
*
* Notes:
* - This function assumes that MPI resources were successfully allocated and opened in the corresponding `open_port` function.
* - No explicit error handling is present (e.g., checking the return value of MPI functions).
*/
void close_port(Spawn_ports *spawn_port) {
if(spawn_port->port_name != NULL) {
if(spawn_port->service_name != NULL) {
MPI_Unpublish_name(spawn_port->service_name, MPI_INFO_NULL, spawn_port->port_name);
free(spawn_port->service_name);
spawn_port->service_name = NULL;
}
if(spawn_port->opened_port) MPI_Close_port(spawn_port->port_name);
free(spawn_port->port_name);
spawn_port->port_name = NULL;
}
}
/*
* Function: discover_remote_port
* ------------------------------
* Discovers the MPI port associated with a remote service using its service name.
* If the port cannot be found, it retries a set number of times before aborting the MPI execution.
* This function must at least be called by the root process which will call MPI_Comm_connect, altough
* it could be called by all processes without any issues.
*
* Parameters:
* remote_service: A pointer to a string that will hold the remote service name.
* If this is the first time discovering the service, memory will be allocated and the name will be generated.
* id_group: An integer representing the group ID, used to identify the service.
* remote_port: A string where the discovered remote port name will be stored.
*
* Notes:
* - This function assumes that the service name follows a fixed pattern (`mam_service_jid%04d_gr%03d`).
* - If id_group is MAM_SERVICE_UNNEEDED, it is assumed the process is not the root and does not require
* to discover the real port.
* - SLURM is conditionally used to retrieve the job ID from the environment.
* - The number of retry attempts before aborting is limited to 5.
* - No explicit error handling is present (e.g., checking the return value of MPI functions).
*/
void discover_remote_port(int id_group, Spawn_ports *spawn_port) {
int error_tries = 0, job_id = 0;
if(spawn_port->remote_port == NULL) {
spawn_port->remote_port = (char*) malloc(MPI_MAX_PORT_NAME * sizeof(char));
}
if(id_group == MAM_SERVICE_UNNEEDED) { return; }
if(spawn_port->remote_service == NULL) { //First discover
spawn_port->remote_service = (char*) malloc(MAM_SERVICE_NAME_SIZE * sizeof(char));
#if MAM_USE_SLURM
char *tmp = getenv("SLURM_JOB_ID");
if(tmp != NULL) { job_id = atoi(tmp); }
#endif
snprintf(spawn_port->remote_service, MAM_SERVICE_NAME_SIZE, "mam_service_jid%04d_gr%03d", job_id, id_group);
} else { // For subsequent lookups, only update the variable part (group ID) of the service name.
snprintf(spawn_port->remote_service + MAM_SERVICE_CONSTANT_NAME, MAM_SERVICE_VARIABLE_NAME, "%03d", id_group);
}
snprintf(spawn_port->remote_port, 5, "NULL");
MPI_Lookup_name(spawn_port->remote_service, MPI_INFO_NULL, spawn_port->remote_port);
while(strncmp(spawn_port->remote_port, "NULL", 4) == 0) {
sleep(1);
MPI_Lookup_name(spawn_port->remote_service, MPI_INFO_NULL, spawn_port->remote_port);
if(++error_tries > 5) MPI_Abort(MPI_COMM_WORLD, -1);
}
}
void free_ports(Spawn_ports *spawn_port) {
close_port(spawn_port);
if(spawn_port->remote_port != NULL) {
free(spawn_port->remote_port);
spawn_port->remote_port = NULL;
}
if(spawn_port->remote_service != NULL) {
free(spawn_port->remote_service);
spawn_port->remote_service = NULL;
}
}
#ifndef MAM_SPAWN_PORTSERVICE_H
#define MAM_SPAWN_PORTSERVICE_H
#include <mpi.h>
#include "Spawn_DataStructure.h"
#define MAM_SERVICE_UNNEEDED -1 // Constant to avoid opening a service if not required
void init_ports(Spawn_ports *spawn_port);
void open_port(Spawn_ports *spawn_port, int open_port, int open_service);
void close_port(Spawn_ports *spawn_port);
void discover_remote_port(int id_group, Spawn_ports *spawn_port);
void free_ports(Spawn_ports *spawn_port);
#endif
...@@ -11,6 +11,12 @@ typedef struct { ...@@ -11,6 +11,12 @@ typedef struct {
MPI_Info mapping; MPI_Info mapping;
} Spawn_set; } Spawn_set;
typedef struct {
int opened_port;
char *port_name, *service_name;
char *remote_port, *remote_service;
} Spawn_ports;
typedef struct { typedef struct {
int spawn_qty, initial_qty, target_qty; int spawn_qty, initial_qty, target_qty;
int already_created; int already_created;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment