Commit e83b5922 authored by Iker Martín Álvarez's avatar Iker Martín Álvarez

New version of Proteo

parent 26305fac
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
#include "PortService.h"
#define MAM_SERVICE_CONSTANT_NAME 22 // Length of the constant part of the name
#define MAM_SERVICE_VARIABLE_NAME 4  // Length of the variable part + '\0'
#define MAM_SERVICE_NAME_SIZE (MAM_SERVICE_CONSTANT_NAME + MAM_SERVICE_VARIABLE_NAME)
// Example of a MaM service name --> "mam_service_jid0010_gr001\0"
//                                     constant part        |variable part
void init_ports(Spawn_ports *spawn_port) {
  spawn_port->opened_port = 0;
  spawn_port->port_name = NULL;
  spawn_port->service_name = NULL;
  spawn_port->remote_port = NULL;
  spawn_port->remote_service = NULL;
}
/*
 * Opens an MPI port for inter-process communication and optionally publishes it as a service.
 * Allows MaM to find other spawned groups which are not yet connected.
 *
 * Parameters:
 * spawn_port:   A structure containing the port and service names.
 * open_port:    A flag indicating whether this process should open the port (1) or only allocate a placeholder (0).
 * open_service: A flag indicating whether the service should be published.
 *               If it is not MAM_SERVICE_UNNEEDED, a service name is generated and published with the chosen number.
 *
 * Functionality:
 * - Ensures that a port is only opened if it hasn't been opened already.
 * - The process with the root rank opens the port and, if required, publishes a service name for it.
 * - If SLURM is being used, it attempts to get the SLURM job ID from the environment.
 * - Non-root ranks simply allocate 1 byte of memory for port_name to avoid it being NULL (a placeholder operation).
 *
 * Notes:
 * - SLURM is conditionally used to obtain job-specific information.
 * - Error handling is not included in this function (e.g., failed memory allocation, failed MPI calls).
 */
void open_port(Spawn_ports *spawn_port, int open_port, int open_service)
{
  int job_id = 0;

  if (spawn_port->port_name != NULL) return;

  if (open_port) {
    spawn_port->opened_port = 1;
    spawn_port->port_name = (char *)malloc(MPI_MAX_PORT_NAME * sizeof(char));
    MPI_Open_port(MPI_INFO_NULL, spawn_port->port_name);
    if (open_service != MAM_SERVICE_UNNEEDED) {
      spawn_port->service_name = (char *)malloc(MAM_SERVICE_NAME_SIZE * sizeof(char));
      #if MAM_USE_SLURM
      char *tmp = getenv("SLURM_JOB_ID");
      if(tmp != NULL) { job_id = atoi(tmp)%1000; }
      #endif
      snprintf(spawn_port->service_name, MAM_SERVICE_NAME_SIZE, "mam_service_jid%04d_gr%03d", job_id, open_service);
      MPI_Publish_name(spawn_port->service_name, MPI_INFO_NULL, spawn_port->port_name);
    }
  } else {
    spawn_port->port_name = malloc(1);
    spawn_port->port_name[0] = '\0';
  }
}
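
/*
 * Example (illustrative sketch, not part of the library): typical lifecycle of
 * a published port on the root process, assuming MPI is already initialized.
 *
 *   Spawn_ports ports;
 *   init_ports(&ports);
 *   open_port(&ports, 1, 3);  // Open the port and publish it for group id 3
 *   // ... accept or connect through ports.port_name ...
 *   close_port(&ports);       // Unpublish, close and free the port
 */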
/*
 * Function: close_port
 * --------------------
 * Closes an open local MPI port and cleans up the associated resources.
 *
 * Parameters:
 * spawn_port: A structure containing the port and service names.
 *
 * Functionality:
 * - The root process is the only one responsible for closing the MPI port and service.
 * - Frees the memory allocated for the port and service and sets the pointers to NULL.
 *
 * Notes:
 * - This function assumes that MPI resources were successfully allocated and opened in the corresponding `open_port` function.
 * - No explicit error handling is present (e.g., checking the return value of MPI functions).
 */
void close_port(Spawn_ports *spawn_port) {
  if(spawn_port->port_name != NULL) {
    if(spawn_port->service_name != NULL) {
      MPI_Unpublish_name(spawn_port->service_name, MPI_INFO_NULL, spawn_port->port_name);
      free(spawn_port->service_name);
      spawn_port->service_name = NULL;
    }
    if(spawn_port->opened_port) MPI_Close_port(spawn_port->port_name);
    free(spawn_port->port_name);
    spawn_port->port_name = NULL;
  }
}
/*
 * Function: discover_remote_port
 * ------------------------------
 * Discovers the MPI port associated with a remote service using its service name.
 * If the port cannot be found, it retries a set number of times before aborting the MPI execution.
 * This function must be called at least by the root process, which will call MPI_Comm_connect,
 * although it can safely be called by all processes.
 *
 * Parameters:
 * id_group:   An integer representing the group ID, used to identify the service.
 * spawn_port: A structure holding the remote service and port names.
 *             On the first discovery, memory is allocated and the service name is generated.
 *
 * Notes:
 * - This function assumes that the service name follows a fixed pattern (`mam_service_jid%04d_gr%03d`).
 * - If id_group is MAM_SERVICE_UNNEEDED, the process is assumed not to be the root and does not
 *   need to discover the real port.
 * - SLURM is conditionally used to retrieve the job ID from the environment.
 * - The number of retry attempts before aborting is limited to 5.
 * - No explicit error handling is present (e.g., checking the return value of MPI functions).
 */
void discover_remote_port(int id_group, Spawn_ports *spawn_port) {
  int error_tries = 0, job_id = 0;

  if(spawn_port->remote_port == NULL) {
    spawn_port->remote_port = (char*) malloc(MPI_MAX_PORT_NAME * sizeof(char));
    if(id_group == MAM_SERVICE_UNNEEDED) { spawn_port->remote_port[0] = '\0'; }
  }
  if(id_group == MAM_SERVICE_UNNEEDED) { return; }

  if(spawn_port->remote_service == NULL) { // First discovery
    spawn_port->remote_service = (char*) malloc(MAM_SERVICE_NAME_SIZE * sizeof(char));
    #if MAM_USE_SLURM
    char *tmp = getenv("SLURM_JOB_ID");
    if(tmp != NULL) { job_id = atoi(tmp)%1000; }
    #endif
    snprintf(spawn_port->remote_service, MAM_SERVICE_NAME_SIZE, "mam_service_jid%04d_gr%03d", job_id, id_group);
  } else { // For subsequent lookups, only update the variable part (group ID) of the service name.
    snprintf(spawn_port->remote_service + MAM_SERVICE_CONSTANT_NAME, MAM_SERVICE_VARIABLE_NAME, "%03d", id_group);
  }

  snprintf(spawn_port->remote_port, 5, "NULL");
  MPI_Lookup_name(spawn_port->remote_service, MPI_INFO_NULL, spawn_port->remote_port);
  while(strncmp(spawn_port->remote_port, "NULL", 4) == 0) {
    sleep(1);
    MPI_Lookup_name(spawn_port->remote_service, MPI_INFO_NULL, spawn_port->remote_port);
    if(++error_tries > 5) MPI_Abort(MPI_COMM_WORLD, -1);
  }
}
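
/*
 * Example (illustrative sketch): how a spawned group's root discovers the port
 * published by group 0 and connects to it, mirroring the usage in
 * Strategy_Multiple.c. Non-root ranks pass MAM_SERVICE_UNNEEDED.
 *
 *   if (myId == root) { discover_remote_port(0, &ports); }
 *   else              { discover_remote_port(MAM_SERVICE_UNNEEDED, &ports); }
 *   MPI_Comm_connect(ports.remote_port, MPI_INFO_NULL, root, comm, &intercomm);
 */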
void free_ports(Spawn_ports *spawn_port) {
  close_port(spawn_port);
  if(spawn_port->remote_port != NULL) {
    free(spawn_port->remote_port);
    spawn_port->remote_port = NULL;
  }
  if(spawn_port->remote_service != NULL) {
    free(spawn_port->remote_service);
    spawn_port->remote_service = NULL;
  }
}
#ifndef MAM_SPAWN_PORTSERVICE_H
#define MAM_SPAWN_PORTSERVICE_H
#include <mpi.h>
#include "Spawn_DataStructure.h"
#define MAM_SERVICE_UNNEEDED -1 // Constant to avoid opening a service if not required
void init_ports(Spawn_ports *spawn_port);
void open_port(Spawn_ports *spawn_port, int open_port, int open_service);
void close_port(Spawn_ports *spawn_port);
void discover_remote_port(int id_group, Spawn_ports *spawn_port);
void free_ports(Spawn_ports *spawn_port);
#endif
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <mpi.h>
#include "ProcessDist.h"
#include "SpawnUtils.h"
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
//--------------PRIVATE CONSTANTS------------------//
#define MAM_HOSTFILE_NAME1 "MAM_HF_ID" // Constant size name (9) -- Part of SIZE1
#define MAM_HOSTFILE_NAME2 "_S"        // Constant size name (2) -- Part of SIZE1
#define MAM_HOSTFILE_NAME3 ".tmp"      // Constant size name (4) -- Part of SIZE2
#define MAM_HOSTFILE_SIZE1 15 // 11 Chars + 4 Digits
#define MAM_HOSTFILE_SIZE2 8  // 4 Chars + 3 Digits + '\0'
#define MAM_HOSTFILE_SIZE (MAM_HOSTFILE_SIZE1 + MAM_HOSTFILE_SIZE2) // 23 = 15 Chars + 7 Digits + '\0'
#define MAM_HOSTFILE_LINE_SIZE 32
//--------------PRIVATE DECLARATIONS---------------//
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes, int *total_spawns);
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void generate_multiple_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void set_mapping_host(int qty, char *info_type, char *host, size_t index, Spawn_data *spawn_data);
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str);
int write_str_node(char **hostlist_str, size_t len_og, size_t qty, char *node_name);
int write_hostfile_node(int file, int qty, char *node_name, char **line, size_t *len_og);
//--------------------------------SLURM USAGE-------------------------------------//
#if MAM_USE_SLURM
#include <slurm/slurm.h>
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void generate_multiple_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str);
void generate_info_hostfile_slurm(char *nodelist, int *qty, size_t used_nodes, Spawn_data *spawn_data);
void fill_hostfile_slurm(char* file_name, size_t used_nodes, int *qty, hostlist_t *hostlist);
size_t fill_multiple_hostfile_slurm(char* file_name, int *qty, hostlist_t *hostlist, char **line, size_t *len_line);
#endif
//--------------------------------SLURM USAGE-------------------------------------//
//--------------PUBLIC FUNCTIONS---------------//
/*
 * Configures the creation of a new group of processes: reserves the memory
 * needed for a call to MPI_Comm_spawn, obtains a physical distribution for
 * the processes and creates a hostfile.
 */
void processes_dist(Spawn_data *spawn_data) {
  int used_nodes = 0;
  int *procs_array;

  // GET NEW DISTRIBUTION
  node_dist(*spawn_data, &procs_array, &used_nodes, &spawn_data->total_spawns);
  spawn_data->sets = (Spawn_set *) malloc(spawn_data->total_spawns * sizeof(Spawn_set));
  #if MAM_USE_SLURM
  switch(spawn_data->mapping_fill_method) {
    case MAM_PHY_TYPE_STRING:
      if(spawn_data->spawn_is_multiple || spawn_data->spawn_is_parallel) {
        generate_multiple_info_string_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
      } else {
        generate_info_string_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
      }
      break;
    case MAM_PHY_TYPE_HOSTFILE:
      generate_info_hostfile_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
      break;
  }
  #else
  if(spawn_data->spawn_is_multiple || spawn_data->spawn_is_parallel) {
    generate_multiple_info_string(mall->nodelist, procs_array, used_nodes, spawn_data);
  } else {
    generate_info_string(mall->nodelist, procs_array, used_nodes, spawn_data);
  }
  #endif

  char *aux_cmd = get_spawn_cmd();
  for(int index = 0; index < spawn_data->total_spawns; index++) {
    spawn_data->sets[index].cmd = aux_cmd;
  }
  free(procs_array);
}
void set_hostfile_name(char **file_name, int *n, int jid, int index) {
  if(*file_name == NULL) {
    *file_name = (char *) malloc(MAM_HOSTFILE_SIZE * sizeof(char));
  }
  if(*n == 0) {
    jid = jid % 1000;
    snprintf(*file_name, MAM_HOSTFILE_SIZE, "%s%04d%s%03d%s", MAM_HOSTFILE_NAME1, jid, MAM_HOSTFILE_NAME2, index, MAM_HOSTFILE_NAME3);
  } else {
    snprintf((*file_name)+MAM_HOSTFILE_SIZE1, MAM_HOSTFILE_SIZE2, "%03d%s", index, MAM_HOSTFILE_NAME3);
  }
  *n = 1;
}
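
/*
 * Example (illustrative): with jid 42 and index 3 the generated name is
 * "MAM_HF_ID0042_S003.tmp"; subsequent calls with *n == 1 only rewrite the
 * trailing "003.tmp" part.
 */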
int read_hostfile_procs(char *file_name, int *qty) {
  char *line = NULL, *ptr;
  FILE *file = NULL;

  file = fopen(file_name, "r");
  if(file == NULL) {
    perror("Could not open hostfile to read");
    MPI_Abort(MPI_COMM_WORLD, -1);
  }

  *qty = 0;
  line = (char *) malloc(MAM_HOSTFILE_LINE_SIZE * sizeof(char));
  while (fgets(line, MAM_HOSTFILE_LINE_SIZE, file) != NULL) {
    size_t len = strlen(line);
    ptr = line + len - 1;
    // Search delimiter
    while (ptr != line && *ptr != ':') { ptr--; }
    if (*ptr == ':') { *qty += atoi(ptr + 1); }
  }
  free(line);
  fclose(file);
  return 0;
}
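
/*
 * Example (illustrative): for a hostfile containing
 *   n00:2
 *   n01:4
 * the function returns with *qty == 6.
 */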
//--------------PRIVATE FUNCTIONS---------------//
//-----------------DISTRIBUTION-----------------//
/*
 * Obtains the physical distribution of the process group to create, returning
 * how many nodes will be used, how many processes each node will host and how
 * many spawn calls will be needed.
 *
 * Two physical distributions are allowed depending on the value of "spawn_dist":
 *
 * MAM_PHY_DIST_SPREAD:  Aims to balance the number of processes across all
 *                       available nodes.
 * MAM_PHY_DIST_COMPACT: Aims to fill a node's capacity before occupying
 *                       another node.
 */
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes, int *total_spawns) {
  int i, *procs;
  procs = calloc(mall->num_nodes, sizeof(int)); // Processes per node

  /* GET NEW DISTRIBUTION */
  switch(mall_conf->spawn_dist) {
    case MAM_PHY_DIST_SPREAD: // DIST NODES
      spread_dist(spawn_data, used_nodes, procs);
      break;
    case MAM_PHY_DIST_COMPACT: // DIST CPUs
      compact_dist(spawn_data, used_nodes, procs);
      break;
  }

  // Copy results to output vector qty
  *qty = calloc(*used_nodes, sizeof(int)); // Processes per node
  // if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL) ) {
  if(spawn_data.spawn_is_multiple || spawn_data.spawn_is_parallel) {
    for(i=0; i < *used_nodes; i++) {
      (*qty)[i] = procs[i];
      if(procs[i]) (*total_spawns)++;
    }
  } else {
    *total_spawns = 1;
    for(i=0; i < *used_nodes; i++) {
      (*qty)[i] = procs[i];
    }
  }
  free(procs);
}
/*
 * Distribution based on balancing the number of processes on each node so
 * that all nodes hold the same amount. Returns the total number of nodes
 * used and the number of processes to create on each one.
 *
 * Assumes the already existing processes live on the lowest-numbered nodes
 * with the same tamBl. //FIXME Should not assume tamBl.
 *
 * FIXME Take into account the placement of already created processes (already_created)
 */
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
  int i, tamBl, remainder;

  *used_nodes = mall->num_nodes;
  tamBl = spawn_data.target_qty / *used_nodes;
  i = spawn_data.already_created / tamBl;
  remainder = spawn_data.already_created % tamBl;
  if(remainder) {
    procs[i++] = tamBl - remainder;
  }
  for(; i < *used_nodes; i++) {
    procs[i] = tamBl;
  }
}
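
/*
 * Worked example (illustrative): with target_qty = 8, num_nodes = 4 and
 * already_created = 3, tamBl = 2, so i = 1 and remainder = 1. Node 1 gets
 * the missing process (2 - 1 = 1) and nodes 2 and 3 get tamBl = 2 each,
 * yielding procs = {0, 1, 2, 2}: the 5 processes still to be created.
 */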
/*
 * Distribution based on filling one node with processes before moving on
 * to the next node. Returns the total number of nodes used and the number
 * of processes to create on each one.
 *
 * Takes the already existing processes into account when mapping the
 * processes to create.
 */
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
  int i, asigCores;
  int tamBl, remainder;

  tamBl = mall->num_cpus;
  asigCores = spawn_data.already_created;
  i = *used_nodes = spawn_data.already_created / tamBl;
  remainder = spawn_data.already_created % tamBl;

  //FIXME REFACTOR What happens if nodes 1 and 2 have free slots?
  //First nodes could already have existing procs
  //Start from the first one with free slots
  if (remainder && asigCores + (tamBl - remainder) < spawn_data.target_qty) {
    procs[i] = tamBl - remainder;
    asigCores += procs[i];
    i = (i+1) % mall->num_nodes;
    (*used_nodes)++;
  }

  //Assign tamBl to each node
  while(asigCores + tamBl <= spawn_data.target_qty) {
    asigCores += tamBl;
    procs[i] += tamBl;
    i = (i+1) % mall->num_nodes;
    (*used_nodes)++;
  }

  //Last node could have fewer procs than tamBl
  if(asigCores < spawn_data.target_qty) {
    procs[i] += spawn_data.target_qty - asigCores;
    (*used_nodes)++;
  }
  if(*used_nodes > mall->num_nodes) *used_nodes = mall->num_nodes; //FIXME Shouldn't this be an error?
}
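
/*
 * Worked example (illustrative): with num_cpus = 4, already_created = 5 and
 * target_qty = 12, node 0 is already full and node 1 holds one process, so
 * node 1 first receives the 3 processes needed to fill it, node 2 then
 * receives a full tamBl = 4, and procs = {0, 3, 4} with *used_nodes = 3.
 */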
//--------------PRIVATE FUNCTIONS---------------//
//-------------------INFO SET-------------------//
/*
 * Creates an MPI_Info object with a hosts/mapping pair indicating the
 * mapping to use for the new processes.
 */
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
  char *host_str;

  fill_str_hosts(nodelist, procs_array, nodes, &host_str);
  // SET MAPPING
  set_mapping_host(spawn_data->spawn_qty, "hosts", host_str, 0, spawn_data);
  free(host_str);
}
/*
 * Creates one MPI_Info object per used node, each with a hosts/mapping
 * pair indicating the mapping to use for that subset of the new processes.
 */
void generate_multiple_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
  char *host, *aux, *token, *hostlist_str = NULL;
  size_t i=0, j=0, len=0;

  aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
  strcpy(aux, nodelist);
  token = strtok(aux, ",");
  while (token != NULL && i < nodes) {
    host = strdup(token);
    if (procs_array[i] != 0) {
      write_str_node(&hostlist_str, len, procs_array[i], host);
      set_mapping_host(procs_array[i], "hosts", hostlist_str, j, spawn_data);
      free(hostlist_str); hostlist_str = NULL;
      j++;
    }
    i++;
    free(host);
    token = strtok(NULL, ",");
  }
  free(aux);
  if(hostlist_str != NULL) { free(hostlist_str); }
}
//--------------PRIVATE FUNCTIONS---------------//
//---------------MAPPING UTILITY----------------//
//----------------------------------------------//
/*
 * Fills the next spawn entry with the physical distribution to use,
 * as a host/mapping pair plus the total number of processes.
 */
void set_mapping_host(int qty, char *info_type, char *host, size_t index, Spawn_data *spawn_data) {
  MPI_Info *info;

  spawn_data->sets[index].spawn_qty = qty;
  info = &(spawn_data->sets[index].mapping);
  MPI_Info_create(info);
  MPI_Info_set(*info, info_type, host);
}
/*
 * Creates and returns a string to be used with the "hosts" key when
 * creating processes, indicating where they have to be created.
 */
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str) {
  char *host, *aux, *token;
  size_t i=0, len=0;

  aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
  strcpy(aux, nodelist);
  token = strtok(aux, ",");
  while (token != NULL && i < used_nodes) {
    host = strdup(token);
    if (qty[i] != 0) {
      len = write_str_node(hostlist_str, len, qty[i], host);
    }
    i++;
    free(host);
    token = strtok(NULL, ",");
  }
  free(aux);
}
/*
 * Appends "qty" occurrences of "node_name" to a string.
 * Allocates the memory and reallocates it if necessary.
 */
int write_str_node(char **hostlist_str, size_t len_og, size_t qty, char *node_name) {
  int err;
  char *ocurrence;
  size_t i, len, len_node;

  len_node = strlen(node_name) + 1; // Str length + ','
  len = qty * len_node; // Number of times the node is used

  if(len_og == 0) { // Memory not allocated yet
    *hostlist_str = (char *) malloc((len+1) * sizeof(char));
  } else { // String already holds data
    *hostlist_str = (char *) realloc(*hostlist_str, (len_og + len + 1) * sizeof(char));
  }
  if(*hostlist_str == NULL) return -1; // Allocation failed

  ocurrence = (char *) malloc((len_node+1) * sizeof(char));
  if(ocurrence == NULL) return -2; // Allocation failed
  err = snprintf(ocurrence, len_node+1, ",%s", node_name);
  if(err < 0) return -3; // Could not write the auxiliary string

  i = 0;
  if(len_og == 0) { // On initialization, the first entry is a plain copy
    i++;
    strcpy(*hostlist_str, node_name);
  }
  for(; i<qty; i++){ // The rest are concatenated
    strcat(*hostlist_str, ocurrence);
  }
  free(ocurrence);
  return len+len_og;
}
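
/*
 * Example (illustrative): write_str_node(&s, 0, 3, "n0") leaves s == "n0,n0,n0"
 * and returns 9, the length to pass as len_og on the next call so that a
 * leading comma joins the next node's entries.
 */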
/*
 * Writes a new line to the hostfile referred to by the descriptor "file".
 *
 * The line holds a node name and the number of processes to host on
 * that node.
 */
int write_hostfile_node(int file, int qty, char *node_name, char **line, size_t *len_og) {
  int err;
  size_t len, len_node, len_int;

  if(*line == NULL) {
    *len_og = MAM_HOSTFILE_LINE_SIZE;
    *line = (char *) malloc(*len_og * sizeof(char));
  }
  len_node = strlen(node_name);
  err = snprintf(NULL, 0, "%d", qty);
  if(err < 0) return -1;
  len_int = err;

  len = len_node + len_int + 3;
  if(*len_og < len) {
    *len_og = len + MAM_HOSTFILE_LINE_SIZE;
    *line = (char *) realloc(*line, *len_og * sizeof(char));
  }

  err = snprintf(*line, len, "%s:%d\n", node_name, qty);
  err = write(file, *line, len-1);
  if(err < 0) {
    perror("Error writing to the host file");
    close(file);
    exit(EXIT_FAILURE);
  }
  return 0;
}
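
/*
 * Example (illustrative): write_hostfile_node(fd, 4, "node01", &line, &len)
 * appends the line "node01:4\n" to the hostfile, the format consumed by
 * read_hostfile_procs above.
 */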
//--------------------------------SLURM USAGE-------------------------------------//
#if MAM_USE_SLURM
/*
 * Creates an MPI_Info object with a hosts/mapping pair indicating the
 * mapping to use for the new processes.
 * Slurm is required to use it.
 */
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
  char *hoststring;

  // CREATE AND SET STRING HOSTS
  fill_str_hosts_slurm(nodelist, procs_array, nodes, &hoststring);
  set_mapping_host(spawn_data->spawn_qty, "hosts", hoststring, 0, spawn_data);
  free(hoststring);
}
/*
 * Creates a set of MPI_Info objects, each with a host/mapping pair
 * indicating, node by node, the mapping to use for the new processes.
 * Slurm is required to use it.
 */
void generate_multiple_info_string_slurm(char *nodelist, int *qty, size_t used_nodes, Spawn_data *spawn_data) {
  char *host, *hostlist_str;
  size_t i=0, j=0, len=0;
  hostlist_t hostlist;

  hostlist_str = NULL;
  hostlist = slurm_hostlist_create(nodelist);
  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
      write_str_node(&hostlist_str, len, qty[i], host);
      set_mapping_host(qty[i], "hosts", hostlist_str, j, spawn_data);
      free(hostlist_str); hostlist_str = NULL;
      j++;
    }
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
  if(hostlist_str != NULL) { free(hostlist_str); }
}
/*
 * Creates and returns a string to be used with the "hosts" key when
 * creating processes, indicating where they have to be created.
 */
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str) {
  char *host;
  size_t i=0, len=0;
  hostlist_t hostlist;

  hostlist = slurm_hostlist_create(nodelist);
  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
      len = write_str_node(hostlist_str, len, qty[i], host);
    }
    i++;
    free(host);
  }
  slurm_hostlist_destroy(hostlist);
}
void generate_info_hostfile_slurm(char *nodelist, int *qty, size_t used_nodes, Spawn_data *spawn_data){
  int index = 0, jid;
  size_t qty_index = 0, len_line = 0;
  char *hostfile_name, *line;
  hostlist_t hostlist;

  char *tmp = getenv("SLURM_JOB_ID");
  jid = tmp != NULL ? (atoi(tmp)%1000) : 0;
  line = NULL;
  hostlist = slurm_hostlist_create(nodelist);
  hostfile_name = (char *) malloc(MAM_HOSTFILE_SIZE * sizeof(char));
  snprintf(hostfile_name, MAM_HOSTFILE_SIZE, "%s%04d%s%03d%s", MAM_HOSTFILE_NAME1, jid, MAM_HOSTFILE_NAME2, index, MAM_HOSTFILE_NAME3);

  if(spawn_data->spawn_is_multiple || spawn_data->spawn_is_parallel) { // MULTIPLE
    for(; index<spawn_data->total_spawns; index++) {
      // This strat creates 1 hostfile per spawn; the consumed qty slots accumulate across spawns
      qty_index += fill_multiple_hostfile_slurm(hostfile_name, qty+qty_index, &hostlist, &line, &len_line);
      set_mapping_host(qty[qty_index-1], "hostfile", hostfile_name, index, spawn_data);
      snprintf(hostfile_name+MAM_HOSTFILE_SIZE1, MAM_HOSTFILE_SIZE2, "%03d%s", index+1, MAM_HOSTFILE_NAME3);
    }
    free(line);
  } else { // NOT MULTIPLE
    fill_hostfile_slurm(hostfile_name, used_nodes, qty, &hostlist);
    set_mapping_host(spawn_data->spawn_qty, "hostfile", hostfile_name, index, spawn_data);
  }
  free(hostfile_name);
  slurm_hostlist_destroy(hostlist);
}
// Function to generate the hostfile
void fill_hostfile_slurm(char* file_name, size_t used_nodes, int *qty, hostlist_t *hostlist) {
  char *host, *line;
  size_t i=0, len_line=0;

  line = NULL;
  int file = open(file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if (file < 0) {
    perror("Error opening the host file");
    exit(EXIT_FAILURE);
  }

  while ( (host = slurm_hostlist_shift(*hostlist)) && i < used_nodes) {
    if(qty[i] != 0) {
      write_hostfile_node(file, qty[i], host, &line, &len_line);
    }
    i++;
    free(host);
  }
  close(file);
  free(line);
}
size_t fill_multiple_hostfile_slurm(char* file_name, int *qty, hostlist_t *hostlist, char **line, size_t *len_line) {
  char *host;
  size_t i=0;

  int file = open(file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if (file < 0) {
    perror("Error opening the host file");
    exit(EXIT_FAILURE);
  }

  while( (host = slurm_hostlist_shift(*hostlist)) ) {
    if(qty[i] != 0) {
      write_hostfile_node(file, qty[i], host, line, len_line);
      i++;
      break;
    }
    i++;
    free(host); host = NULL;
  }
  if(host != NULL) free(host);

  close(file);
  return i;
}
#endif
//--------------------------------SLURM USAGE-------------------------------------//
#ifndef MAM_SPAWN_PROCESS_DIST_H
#define MAM_SPAWN_PROCESS_DIST_H
#include "Spawn_DataStructure.h"
void processes_dist(Spawn_data *spawn_data);
void set_hostfile_name(char **file_name, int *n, int jid, int index);
int read_hostfile_procs(char *file_name, int *qty);
#endif
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
#include "SpawnUtils.h"
/*
 * Basic function in charge of process creation.
 * Creates a set of processes according to the configuration
 * obtained in ProcessDist.c.
 * Returns in "child" the intercommunicator connected to the children.
 */
void mam_spawn(Spawn_set spawn_set, MPI_Comm comm, MPI_Comm *child) {
  int rootBcast = MPI_PROC_NULL;
  int comm_size;

  MPI_Comm_size(comm, &comm_size);
  if(mall->myId == mall->root || comm_size == 1) rootBcast = MPI_ROOT;
  int spawn_err = MPI_Comm_spawn(spawn_set.cmd, MPI_ARGV_NULL, spawn_set.spawn_qty, spawn_set.mapping, MAM_ROOT, comm, child, MPI_ERRCODES_IGNORE);
  if(spawn_err != MPI_SUCCESS) {
    printf("Error creating new set of %d procs.\n", spawn_set.spawn_qty);
    MPI_Abort(MPI_COMM_WORLD, -1);
  }
  MAM_Comm_main_structures(*child, rootBcast);
}
/*
 * Checks which command has to be invoked when performing the spawn.
 * All sets have to run the same command.
 */
char* get_spawn_cmd() {
  char *cmd_aux;

  switch(mall_conf->external_usage) {
    case MAM_USE_VALGRIND:
      cmd_aux = MAM_VALGRIND_SCRIPT;
      break;
    case MAM_USE_EXTRAE:
      cmd_aux = MAM_EXTRAE_SCRIPT;
      break;
    default:
      cmd_aux = mall->name_exec;
      break;
  }
  return cmd_aux;
}
#ifndef MAM_SPAWN_UTILS_H
#define MAM_SPAWN_UTILS_H
#include <mpi.h>
#include "Spawn_DataStructure.h"
void mam_spawn(Spawn_set spawn_set, MPI_Comm comm, MPI_Comm *child);
char* get_spawn_cmd();
#endif
#ifndef MAM_SPAWN_DATASTRUCTURE_H
#define MAM_SPAWN_DATASTRUCTURE_H
#include <mpi.h>
/* --- SPAWN STRUCTURE --- */
typedef struct {
  int spawn_qty;
  char *cmd;
  MPI_Info mapping;
} Spawn_set;

typedef struct {
  int opened_port;
  char *port_name, *service_name;
  char *remote_port, *remote_service;
} Spawn_ports;

typedef struct {
  int spawn_qty, initial_qty, target_qty;
  int already_created;
  int total_spawns;
  int spawn_is_single, spawn_is_async, spawn_is_intercomm, spawn_is_multiple, spawn_is_parallel;
  // MPI_Info mapping;
  int mapping_fill_method;
  MPI_Comm comm, returned_comm; // ONLY SET FOR SOURCE PROCESSES
  Spawn_set *sets;
} Spawn_data;
#endif
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include "Spawn_state.h"
pthread_mutex_t spawn_mutex;
pthread_cond_t spawn_cond, completion_cond;
int spawn_state;
int waiting_redistribution=0, waiting_completion=0;
void init_spawn_state() {
  pthread_mutex_init(&spawn_mutex, NULL);
  pthread_cond_init(&spawn_cond, NULL);
  pthread_cond_init(&completion_cond, NULL);
  set_spawn_state(1, 0); //FIXME First parameter is a horrible magical number
}

void free_spawn_state() {
  pthread_mutex_destroy(&spawn_mutex);
  pthread_cond_destroy(&spawn_cond);
  pthread_cond_destroy(&completion_cond);
}

int get_spawn_state(int is_async) {
  int value;
  if(is_async) {
    pthread_mutex_lock(&spawn_mutex);
    value = spawn_state;
    pthread_mutex_unlock(&spawn_mutex);
  } else {
    value = spawn_state;
  }
  return value;
}

void set_spawn_state(int value, int is_async) {
  if(is_async) {
    pthread_mutex_lock(&spawn_mutex);
    spawn_state = value;
    pthread_mutex_unlock(&spawn_mutex);
  } else {
    spawn_state = value;
  }
}

int wait_redistribution() {
  pthread_mutex_lock(&spawn_mutex);
  if(!waiting_redistribution) {
    waiting_redistribution = 1;
    pthread_cond_wait(&spawn_cond, &spawn_mutex);
  }
  waiting_redistribution = 0;
  pthread_mutex_unlock(&spawn_mutex);
  return get_spawn_state(1);
}

void wakeup_redistribution() {
  pthread_mutex_lock(&spawn_mutex);
  if(waiting_redistribution) {
    pthread_cond_signal(&spawn_cond);
  }
  waiting_redistribution = 1;
  pthread_mutex_unlock(&spawn_mutex);
}

int wait_completion() {
  pthread_mutex_lock(&spawn_mutex);
  if(!waiting_completion) {
    waiting_completion = 1;
    pthread_cond_wait(&completion_cond, &spawn_mutex);
  }
  waiting_completion = 0;
  pthread_mutex_unlock(&spawn_mutex);
  return get_spawn_state(1);
}

void wakeup_completion() {
  pthread_mutex_lock(&spawn_mutex);
  if(waiting_completion) {
    pthread_cond_signal(&completion_cond);
  }
  waiting_completion = 1;
  pthread_mutex_unlock(&spawn_mutex);
}
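
/*
 * Example (illustrative sketch): typical handshake between the thread that
 * performs an asynchronous spawn and the application thread. The waiting side
 * only blocks if its counterpart has not signalled yet, because the flags
 * record whether the wakeup happened first.
 *
 *   // Spawner thread                       // Application thread
 *   set_spawn_state(s, 1);
 *   wakeup_completion();                    state = wait_completion();
 */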
#ifndef MAM_SPAWN_STATE_H
#define MAM_SPAWN_STATE_H
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
void init_spawn_state();
void free_spawn_state();
int get_spawn_state(int is_async);
void set_spawn_state(int value, int is_async);
int wait_redistribution();
void wakeup_redistribution();
int wait_completion();
void wakeup_completion();
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
#include "PortService.h"
#include "Strategy_Multiple.h"
/*=====================DEBUG ALGORITHM=====================*/
// The following algorithm is a basic implementation, created to test
// whether the idea could work within Slurm+DMR.
// It is kept for cases where codes related to this strategy
// need to be debugged.
void multiple_strat_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child) {
  int i, rootBcast;
  int buffer[2];
  char aux;

  i = 0;
  rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
  buffer[0] = i;
  buffer[1] = spawn_data.total_spawns;
  MPI_Bcast(buffer, 2, MPI_INT, rootBcast, intercomms[i]);
  if(mall->myId == mall->root) {
    MPI_Recv(&aux, 1, MPI_CHAR, MPI_ANY_SOURCE, MAM_MPITAG_STRAT_MULTIPLE, intercomms[0], MPI_STATUS_IGNORE);
  }
  for(i=1; i<spawn_data.total_spawns; i++) {
    buffer[0] = i;
    MPI_Bcast(buffer, 2, MPI_INT, rootBcast, intercomms[i]);
    if(mall->myId == mall->root) {
      MPI_Recv(&aux, 1, MPI_CHAR, MPI_ANY_SOURCE, MAM_MPITAG_STRAT_MULTIPLE, intercomms[0], MPI_STATUS_IGNORE);
    }
  }

  // Reconnect with new children communicator
  if(mall->myId == mall->root) { discover_remote_port(0, spawn_port); }
  else { discover_remote_port(MAM_SERVICE_UNNEEDED, spawn_port); }
  MPI_Comm_connect(spawn_port->remote_port, MPI_INFO_NULL, mall->root, comm, child);

  // Free unneeded spawn communicators
  for(i=0; i<spawn_data.total_spawns; i++) { MPI_Comm_disconnect(&intercomms[i]); }

  #if MAM_DEBUG >= 4
  DEBUG_FUNC("Additional spawn action - Multiple PA completed", mall->myId, mall->numP); fflush(stdout);
  #endif
}
void multiple_strat_children(MPI_Comm *parents, Spawn_ports *spawn_port) {
  int i, group_id, total_spawns, new_root;
  int buffer[2];
  char aux;
  MPI_Comm newintracomm, intercomm, parents_comm;

  #if MAM_DEBUG >= 4
  DEBUG_FUNC("Additional spawn action - Multiple CH started", mall->myId, mall->numP); fflush(stdout);
  #endif

  new_root = 0;
  parents_comm = *parents;
  MPI_Bcast(buffer, 2, MPI_INT, mall->root_parents, parents_comm);
  group_id = buffer[0];
  total_spawns = buffer[1];
  if(mall->myId == mall->root && !group_id) { new_root = 1; }
  open_port(spawn_port, new_root, group_id);

  if(group_id) {
    if(mall->myId == mall->root) { discover_remote_port(0, spawn_port); }
    else { discover_remote_port(MAM_SERVICE_UNNEEDED, spawn_port); }
    MPI_Comm_connect(spawn_port->remote_port, MPI_INFO_NULL, mall->root, mall->comm, &intercomm);
    MPI_Intercomm_merge(intercomm, 1, &newintracomm); // Get last ranks
    MPI_Comm_disconnect(&intercomm);
    group_id++;
  } else { // Root group of targets
    group_id = 1;
    MPI_Comm_dup(mall->comm, &newintracomm);
    if(new_root) {
      MPI_Send(&aux, 1, MPI_CHAR, mall->root_parents, MAM_MPITAG_STRAT_MULTIPLE, parents_comm); // Ensures order in the created intracommunicator
    }
  }

  for(i=group_id; i<total_spawns; i++) {
    MPI_Comm_accept(spawn_port->port_name, MPI_INFO_NULL, mall->root, newintracomm, &intercomm);
    if(newintracomm != MPI_COMM_WORLD) MPI_Comm_disconnect(&newintracomm);
    MPI_Intercomm_merge(intercomm, 0, &newintracomm); // Get first ranks
    MPI_Comm_disconnect(&intercomm);
    if(new_root) {
      MPI_Send(&aux, 1, MPI_CHAR, mall->root_parents, MAM_MPITAG_STRAT_MULTIPLE, parents_comm); // Ensures order in the created intracommunicator
    }
  }

  // Connect with sources
  MPI_Comm_accept(spawn_port->port_name, MPI_INFO_NULL, mall->root, newintracomm, &intercomm);
  // Update communicator to expected one
  MAM_comms_update(newintracomm);
  MPI_Comm_rank(mall->comm, &mall->myId);
  MPI_Comm_size(mall->comm, &mall->numP);

  MPI_Comm_disconnect(&newintracomm);
  MPI_Comm_disconnect(parents);
  *parents = intercomm;
  #if MAM_DEBUG >= 4
  DEBUG_FUNC("Additional spawn action - Multiple CH completed", mall->myId, mall->numP); fflush(stdout);
  #endif
}
/* @deprecated functions -- Basic algorithm to try out whether the strategy could work
void multiple_strat_parents(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child) {
  int i, tag;
  char *port_name, aux;

  if(mall->myId == mall->root) {
    port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
    tag = MAM_MPITAG_STRAT_MULTIPLE;
    MPI_Send(&spawn_data.total_spawns, 1, MPI_INT, MAM_ROOT, tag, intercomms[0]);
    MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MPI_ANY_SOURCE, tag, intercomms[0], MPI_STATUS_IGNORE);
    for(i=1; i<spawn_data.total_spawns; i++) {
      MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MAM_ROOT, tag+i, intercomms[i]);
      MPI_Recv(&aux, 1, MPI_CHAR, MPI_ANY_SOURCE, MAM_MPITAG_STRAT_MULTIPLE, intercomms[0], MPI_STATUS_IGNORE);
    }
  } else { port_name = malloc(1); }

  MPI_Comm_connect(port_name, MPI_INFO_NULL, mall->root, comm, child);
  for(i=0; i<spawn_data.total_spawns; i++) {
    MPI_Comm_disconnect(&intercomms[i]);
  }
  free(port_name);
}
*/
/*
void multiple_strat_children(MPI_Comm *parents) {
  int i, start, total_spawns, new_root;
  int rootBcast = MPI_PROC_NULL;
  char *port_name, aux;
  MPI_Status stat;
  MPI_Comm newintracomm, intercomm, parents_comm;

  new_root = 0;
  parents_comm = *parents;
  if(mall->myId == mall->root) {
    port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
    MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, parents_comm, &stat);
    if(stat.MPI_TAG == MAM_MPITAG_STRAT_MULTIPLE) {
      MPI_Recv(&total_spawns, 1, MPI_INT, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm, MPI_STATUS_IGNORE);
      MPI_Open_port(MPI_INFO_NULL, port_name);
      MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm);
      start = 0;
      new_root = 1;
      rootBcast = MPI_ROOT;
    } else {
      MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm, &stat);
      // The "+1" is because the first iteration is done before the loop
      start = stat.MPI_TAG - MAM_MPITAG_STRAT_MULTIPLE + 1;
    }
  } else { port_name = malloc(1); }

  MPI_Bcast(&start, 1, MPI_INT, mall->root, mall->comm);
  if(start) {
    MPI_Comm_connect(port_name, MPI_INFO_NULL, mall->root, mall->comm, &intercomm);
    MPI_Bcast(&total_spawns, 1, MPI_INT, mall->root, intercomm); // FIXME Seems inefficient - Should be performed by parent root?
    MPI_Intercomm_merge(intercomm, 1, &newintracomm); // Get last ranks
    MPI_Comm_disconnect(&intercomm);
  } else {
    start = 1;
    MPI_Comm_dup(mall->comm, &newintracomm);
    MPI_Bcast(&total_spawns, 1, MPI_INT, mall->root, mall->comm); // FIXME Seems inefficient - Should be performed by parent root?
  }

  for(i=start; i<total_spawns; i++) {
    MPI_Comm_accept(port_name, MPI_INFO_NULL, mall->root, newintracomm, &intercomm);
    MPI_Bcast(&total_spawns, 1, MPI_INT, rootBcast, intercomm); // FIXME Seems inefficient - Should be performed by parent root?
    if(newintracomm != MPI_COMM_WORLD) MPI_Comm_disconnect(&newintracomm);
    MPI_Intercomm_merge(intercomm, 0, &newintracomm); // Get first ranks
    MPI_Comm_disconnect(&intercomm);
    if(new_root) {
      MPI_Send(&aux, 1, MPI_CHAR, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm); // Ensures order in the created intracommunicator
    }
  }

  // Connect with parents
  MPI_Comm_accept(port_name, MPI_INFO_NULL, mall->root, newintracomm, &intercomm);
  // Update communicator to expected one
  MAM_comms_update(newintracomm);
  MPI_Comm_rank(mall->comm, &mall->myId);
  MPI_Comm_size(mall->comm, &mall->numP);

  if(new_root) MPI_Close_port(port_name);
  free(port_name);
  MPI_Comm_disconnect(&newintracomm);
  MPI_Comm_disconnect(parents);
  *parents = intercomm;
}
*/
#ifndef MAM_SPAWN_MULTIPLE_H
#define MAM_SPAWN_MULTIPLE_H
#include <mpi.h>
#include "Spawn_DataStructure.h"
void multiple_strat_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child);
void multiple_strat_children(MPI_Comm *parents, Spawn_ports *spawn_port);
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
#include "PortService.h"
#include "Strategy_Parallel.h"
#include "ProcessDist.h"
#include "SpawnUtils.h"
#include <math.h>
void parallel_strat_parents_hypercube(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *child);
void parallel_strat_children_hypercube(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *parents);
void hypercube_spawn(int group_id, int groups, int init_nodes, int init_step, MPI_Comm **spawn_comm, int *qty_comms);
void common_synch(Spawn_data spawn_data, int qty_comms, MPI_Comm intercomm, MPI_Comm *spawn_comm);
void binary_tree_connection(int groups, int group_id, Spawn_ports *spawn_port, MPI_Comm *newintracomm);
void binary_tree_reorder(MPI_Comm *newintracomm, int group_id);
//--------PUBLIC FUNCTIONS----------//
// This abstraction allows different algorithms to be used
// depending on the circumstances of the spawn.
void parallel_strat_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *child) {
  #if MAM_DEBUG >= 4
  DEBUG_FUNC("Additional spawn action - Parallel PA started", mall->myId, mall->numP); fflush(stdout);
  #endif
  parallel_strat_parents_hypercube(spawn_data, spawn_port, child);
  #if MAM_DEBUG >= 4
  DEBUG_FUNC("Additional spawn action - Parallel PA completed", mall->myId, mall->numP); fflush(stdout);
  #endif
}

void parallel_strat_children(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *parents) {
  #if MAM_DEBUG >= 4
  DEBUG_FUNC("Additional spawn action - Parallel CH started", mall->myId, mall->numP); fflush(stdout);
  #endif
  parallel_strat_children_hypercube(spawn_data, spawn_port, parents);
  #if MAM_DEBUG >= 4
  DEBUG_FUNC("Additional spawn action - Parallel CH completed", mall->myId, mall->numP); fflush(stdout);
  #endif
}
//--------PRIVATE FUNCTIONS----------//
/*=====================HYPERCUBE++ ALGORITHM=====================*/
//The following algorithm divides the spawning task across all available ranks.
//It starts with just the sources, and then all spawned processes help with further
//spawns until all the required processes have been created.
//FIXME -- The amount of processes per spawned group must be homogeneous among groups
//  - There is an exception for the last node, which could have fewer procs
//  - Yet, the first spawned group cannot have fewer procs than the rest
void parallel_strat_parents_hypercube(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *child) {
  int opening, qty_comms;
  int groups, init_nodes, actual_step, group_id;
  MPI_Comm *spawn_comm = NULL;

  MPI_Bcast(&spawn_data.total_spawns, 1, MPI_INT, mall->root, spawn_data.comm);
  actual_step = 0;
  qty_comms = 0;
  init_nodes = mall->numP / mall->num_cpus; //FIXME does not consider heterogeneous machines
  groups = spawn_data.total_spawns + init_nodes;
  group_id = -init_nodes;

  opening = mall->myId == mall->root ? 1 : 0;
  open_port(spawn_port, opening, groups);

  hypercube_spawn(group_id, groups, init_nodes, actual_step, &spawn_comm, &qty_comms);

  common_synch(spawn_data, qty_comms, MPI_COMM_NULL, spawn_comm);
  for(int i=0; i<qty_comms; i++) { MPI_Comm_disconnect(&spawn_comm[i]); }
  if(spawn_comm != NULL) free(spawn_comm);

  MPI_Comm_accept(spawn_port->port_name, MPI_INFO_NULL, MAM_ROOT, spawn_data.comm, child);
}
/*
 * - MPI_Comm *parents: Initially, the intercommunicator with the parents
 */
void parallel_strat_children_hypercube(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *parents) {
  int group_id, opening, qty_comms;
  int actual_step;
  int groups, init_nodes;
  MPI_Comm newintracomm, *spawn_comm = NULL;

  // TODO Check whether to enter the spawn only if groups < numSources
  qty_comms = 0;
  group_id = mall->gid;
  init_nodes = spawn_data.initial_qty / mall->num_cpus;
  groups = spawn_data.spawn_qty / mall->num_cpus + init_nodes;

  opening = (mall->myId == MAM_ROOT && group_id < (groups-init_nodes)/2) ? 1 : 0;
  open_port(spawn_port, opening, group_id);

  // Spawn more processes if required
  if(groups - init_nodes > spawn_data.initial_qty) {
    actual_step = log((group_id + init_nodes) / init_nodes) / log(1 + mall->numP);
    actual_step = floor(actual_step) + 1;
    hypercube_spawn(group_id, groups, init_nodes, actual_step, &spawn_comm, &qty_comms);
  }

  common_synch(spawn_data, qty_comms, *parents, spawn_comm);
  for(int i=0; i<qty_comms; i++) { MPI_Comm_disconnect(&spawn_comm[i]); }
  MPI_Comm_disconnect(parents);

  // Connect groups and ensure expected rank order
  binary_tree_connection(groups - init_nodes, group_id, spawn_port, &newintracomm);
  binary_tree_reorder(&newintracomm, group_id);

  // Create intercomm between sources and children
  opening = (mall->myId == mall->root && !group_id) ? groups : MAM_SERVICE_UNNEEDED;
  discover_remote_port(opening, spawn_port);
  MPI_Comm_connect(spawn_port->remote_port, MPI_INFO_NULL, MAM_ROOT, newintracomm, parents);

  // New group obtained -- Adjust ranks and comms
  MAM_comms_update(newintracomm);
  MPI_Comm_rank(mall->comm, &mall->myId);
  MPI_Comm_size(mall->comm, &mall->numP);
  MPI_Comm_disconnect(&newintracomm);
}
// This function does not allow the same process to have multiple threads executing it
void hypercube_spawn(int group_id, int groups, int init_nodes, int init_step,
                     MPI_Comm **spawn_comm, int *qty_comms) {
  int i, aux_sum, actual_step;
  int next_group_id, actual_nodes;
  int jid = 0, n = 0;
  char *file_name = NULL;
  Spawn_set set;

  actual_step = init_step;
  actual_nodes = pow(1+mall->num_cpus, actual_step)*init_nodes - init_nodes;
  aux_sum = mall->num_cpus*(init_nodes + group_id) + mall->myId; // Constant sum for the next line
  next_group_id = actual_nodes + aux_sum;
  if(next_group_id < groups - init_nodes) { //FIXME qty_comms is not computed correctly for processes with the same group_id in the last steps
    int max_steps = ceil(log(groups / init_nodes) / log(1 + mall->num_cpus));
    *qty_comms = max_steps - actual_step;
    *spawn_comm = (MPI_Comm *) malloc(*qty_comms * sizeof(MPI_Comm));
  }
  //if(mall->myId == 0)printf("T1 P%d+%d step=%d next_id=%d aux_sum=%d actual_nodes=%d comms=%d\n", mall->myId, group_id, actual_step, next_group_id, aux_sum, actual_nodes, *qty_comms);

  #if MAM_USE_SLURM
  char *tmp = getenv("SLURM_JOB_ID");
  if(tmp != NULL) { jid = atoi(tmp); }
  #endif

  set.cmd = get_spawn_cmd();
  i = 0;
  while(next_group_id < groups - init_nodes) {
    set_hostfile_name(&file_name, &n, jid, next_group_id);
    //read_hostfile_procs(file_name, &set.spawn_qty);
    set.spawn_qty = mall->num_cpus;
    MPI_Info_create(&set.mapping);
    MPI_Info_set(set.mapping, "hostfile", file_name);
    mall->gid = next_group_id; // Used to pass the group id to the spawned process // Not thread safe
    mam_spawn(set, MPI_COMM_SELF, &(*spawn_comm)[i]);
    MPI_Info_free(&set.mapping);

    actual_step++; i++;
    actual_nodes = pow(1+mall->num_cpus, actual_step)*init_nodes - init_nodes;
    next_group_id = actual_nodes + aux_sum;
  }
  *qty_comms = i;
  if(file_name != NULL) free(file_name);
}
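
/*
 * Worked example (illustrative): with num_cpus = 2 and one initial node, the
 * total node count grows by a factor of (1 + num_cpus) per step: 1 -> 3 -> 9.
 * At step 0 the two source ranks spawn groups 0 and 1; at step 1 the sources
 * spawn groups 2 and 3 while the ranks of groups 0 and 1 spawn groups 4-7.
 */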
void common_synch(Spawn_data spawn_data, int qty_comms, MPI_Comm intercomm, MPI_Comm *spawn_comm) {
  int i, root, root_other;
  char aux;
  MPI_Request *requests = NULL;

  requests = (MPI_Request *) malloc(qty_comms * sizeof(MPI_Request));
  root = root_other = 0; //FIXME Magical Number

  // Upside synchronization
  for(i=0; i<qty_comms; i++) {
    MPI_Irecv(&aux, 1, MPI_CHAR, root_other, 130, spawn_comm[i], &requests[i]);
  }
  if(qty_comms) { MPI_Waitall(qty_comms, requests, MPI_STATUSES_IGNORE); }
  if(intercomm != MPI_COMM_NULL) { MPI_Barrier(mall->comm); }
  if(intercomm != MPI_COMM_NULL && mall->myId == root) { MPI_Send(&aux, 1, MPI_CHAR, root_other, 130, intercomm); }

  // Sources synchronization
  // TODO Maybe a split comm could be used to reduce the overhead of the Barrier when not all sources spawn
  if(intercomm == MPI_COMM_NULL) { MPI_Barrier(spawn_data.comm); }

  // Downside synchronization
  if(intercomm != MPI_COMM_NULL && mall->myId == root) { MPI_Recv(&aux, 1, MPI_CHAR, root_other, 130, intercomm, MPI_STATUS_IGNORE); }
  MPI_Barrier(mall->comm); // FIXME This barrier should not be required
  for(i=0; i<qty_comms; i++) {
    MPI_Isend(&aux, 1, MPI_CHAR, root_other, 130, spawn_comm[i], &requests[i]);
  }
  if(qty_comms) { MPI_Waitall(qty_comms, requests, MPI_STATUSES_IGNORE); }
  if(requests != NULL) { free(requests); }
}
void binary_tree_connection(int groups, int group_id, Spawn_ports *spawn_port, MPI_Comm *newintracomm) {
  int service_id;
  int middle, new_groups, new_group_id, new_rank;
  MPI_Comm merge_comm, aux_comm, new_intercomm;

  // FIXME -- Supposes there are no changes in each group before this point
  //        - If there are any, they should be reflected in mall->comm
  //          and a duplicate of mall->comm should be used here.
  //          As of now it is not used, for simplicity
  merge_comm = aux_comm = MPI_COMM_WORLD;
  new_intercomm = MPI_COMM_NULL;
  new_rank = mall->myId;

  while(groups > 1) {
    middle = groups / 2;
    new_groups = ceil(groups / 2.0);
    if(group_id < middle) {
      // Accept work
      MPI_Comm_accept(spawn_port->port_name, MPI_INFO_NULL, MAM_ROOT, merge_comm, &new_intercomm);
      MPI_Intercomm_merge(new_intercomm, 0, &aux_comm); // The side passing 0 goes first
      if(merge_comm != MPI_COMM_WORLD && merge_comm != MPI_COMM_NULL) MPI_Comm_disconnect(&merge_comm);
      if(new_intercomm != MPI_COMM_WORLD && new_intercomm != MPI_COMM_NULL) MPI_Comm_disconnect(&new_intercomm);
      merge_comm = aux_comm;
      MPI_Bcast(&new_groups, 1, MPI_INT, MAM_ROOT, aux_comm);
    } else if(group_id >= new_groups) {
      new_group_id = groups - group_id - 1;
      service_id = new_rank == MAM_ROOT ? new_group_id : MAM_SERVICE_UNNEEDED;
      discover_remote_port(service_id, spawn_port);

      // Connect work
      MPI_Comm_connect(spawn_port->remote_port, MPI_INFO_NULL, MAM_ROOT, merge_comm, &new_intercomm);
      MPI_Intercomm_merge(new_intercomm, 1, &aux_comm); // The side passing 0 goes first
      if(merge_comm != MPI_COMM_WORLD && merge_comm != MPI_COMM_NULL) MPI_Comm_disconnect(&merge_comm);
      if(new_intercomm != MPI_COMM_WORLD && new_intercomm != MPI_COMM_NULL) MPI_Comm_disconnect(&new_intercomm);
      merge_comm = aux_comm;

      // Get new id
      group_id = new_group_id;
      new_rank = -1;
      MPI_Bcast(&new_groups, 1, MPI_INT, MAM_ROOT, aux_comm);
    }
    groups = new_groups;
  }
  *newintracomm = merge_comm;
}
void binary_tree_reorder(MPI_Comm *newintracomm, int group_id) {
  int merge_size, *reorder, *index_reorder;
  int expected_rank;
  MPI_Group merge_group, aux_group;
  MPI_Comm aux_comm;

  index_reorder = NULL;
  reorder = NULL;

  // FIXME Expects all groups to have the same size
  expected_rank = mall->numP * group_id + mall->myId;

  MPI_Comm_group(*newintracomm, &merge_group);
  MPI_Comm_size(*newintracomm, &merge_size);
  index_reorder = (int *) malloc(merge_size * sizeof(int));
  reorder = (int *) malloc(merge_size * sizeof(int));
  MPI_Allgather(&expected_rank, 1, MPI_INT, index_reorder, 1, MPI_INT, *newintracomm);
  for(int i=0; i<merge_size; i++) {
    reorder[index_reorder[i]] = i;
  }
  MPI_Group_incl(merge_group, merge_size, reorder, &aux_group);
  MPI_Comm_create(*newintracomm, aux_group, &aux_comm);

  //int merge_rank, new_rank;
  //MPI_Comm_rank(*newintracomm, &merge_rank);
  //MPI_Comm_rank(aux_comm, &new_rank);
  //printf("Group %d -- Merge rank = %d - New rank = %d\n", group_id, merge_rank, new_rank);

  if(*newintracomm != MPI_COMM_WORLD && *newintracomm != MPI_COMM_NULL) MPI_Comm_disconnect(newintracomm);
  MPI_Group_free(&merge_group);
  MPI_Group_free(&aux_group);
  *newintracomm = aux_comm;
  free(index_reorder);
  free(reorder);
}
#ifndef MAM_SPAWN_PARALLEL_H
#define MAM_SPAWN_PARALLEL_H
#include <mpi.h>
#include "Spawn_DataStructure.h"
void parallel_strat_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *child);
void parallel_strat_children(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *parents);
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
#include "PortService.h"
#include "Spawn_state.h"
#include "Strategy_Single.h"
/*
 * Connects the parent group to the newly created children when the spawn
 * was performed by the root process alone. The root receives the children's
 * port name and then the whole parent group joins the MPI_Comm_connect call.
 */
void single_strat_parents(Spawn_data spawn_data, MPI_Comm *child) {
  char *port_name;
  MPI_Comm newintercomm;

  #if MAM_DEBUG >= 4
  DEBUG_FUNC("Additional spawn action - Single PA started", mall->myId, mall->numP); fflush(stdout);
  #endif

  if (mall->myId == mall->root) {
    port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
    MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MPI_ANY_SOURCE, MAM_MPITAG_STRAT_SINGLE, *child, MPI_STATUS_IGNORE);
    set_spawn_state(MAM_I_SPAWN_SINGLE_COMPLETED, spawn_data.spawn_is_async); // Indicate other processes to join root to end spawn procedure
    wakeup_completion();
  } else {
    port_name = malloc(1);
  }

  MPI_Comm_connect(port_name, MPI_INFO_NULL, mall->root, spawn_data.comm, &newintercomm);

  if(mall->myId == mall->root)
    MPI_Comm_disconnect(child);
  free(port_name);
  *child = newintercomm;
  #if MAM_DEBUG >= 4
  DEBUG_FUNC("Additional spawn action - Single PA completed", mall->myId, mall->numP); fflush(stdout);
  #endif
}
/*
 * Connects the group of children with the group of parents.
 * Leaves in "parents" an intercommunicator to talk with the parents.
 *
 * Only used when the creation of the processes was performed
 * by a single parent process.
 */
void single_strat_children(MPI_Comm *parents, Spawn_ports *spawn_port) {
  MPI_Comm newintercomm;
  int is_root = mall->myId == mall->root ? 1 : 0;

  #if MAM_DEBUG >= 4
  DEBUG_FUNC("Additional spawn action - Single CH started", mall->myId, mall->numP); fflush(stdout);
  #endif

  open_port(spawn_port, is_root, MAM_SERVICE_UNNEEDED);
  if(mall->myId == mall->root) {
    MPI_Send(spawn_port->port_name, MPI_MAX_PORT_NAME, MPI_CHAR, mall->root_parents, MAM_MPITAG_STRAT_SINGLE, *parents);
  }
  MPI_Comm_accept(spawn_port->port_name, MPI_INFO_NULL, mall->root, mall->comm, &newintercomm);

  MPI_Comm_disconnect(parents);
  *parents = newintercomm;
  #if MAM_DEBUG >= 4
  DEBUG_FUNC("Additional spawn action - Single CH completed", mall->myId, mall->numP); fflush(stdout);
  #endif
}
#ifndef MAM_SPAWN_SINGLE_H
#define MAM_SPAWN_SINGLE_H
#include <mpi.h>
#include "Spawn_DataStructure.h"
void single_strat_parents(Spawn_data spawn_data, MPI_Comm *child);
void single_strat_children(MPI_Comm *parents, Spawn_ports *spawn_port);
#endif
MAM_USE_SLURM ?= 0
MAM_USE_BARRIERS ?= 0
MAM_DEBUG ?= 0
CONFIG = config.txt
PROTEO_HOME := $(shell realpath -z $$(echo "$$(pwd)/..") | tr -d '\0')
# Put all auto generated stuff to this build dir.
BUILD_DIR = ./build
# BASIC RULES
.PHONY : all install sam mam clean sam_clean mam_clean clear sam_clear mam_clear
all: install
# Default configuration file
$(CONFIG) : $(BUILD_DIR)/$(CONFIG)
$(BUILD_DIR)/$(CONFIG):
	@ mkdir -p $(@D)
	@ echo "export PROTEO_HOME=$(PROTEO_HOME)" > $(BUILD_DIR)/$(CONFIG)
	@ echo "export PROTEO_BIN=$(PROTEO_HOME)/Codes/SAM/build/a.out" >> $(BUILD_DIR)/$(CONFIG)
	@ echo "export PATH=\$$PATH:\$$PROTEO_HOME/Codes/MaM" >> $(BUILD_DIR)/$(CONFIG)
	@ echo "export LD_LIBRARY_PATH=\$$LD_LIBRARY_PATH:\$$PROTEO_HOME/Codes/MaM/build" >> $(BUILD_DIR)/$(CONFIG)
	@ echo "codeDir=\"/Codes\"" >> $(BUILD_DIR)/$(CONFIG)
	@ echo "execDir=\"/Exec\"" >> $(BUILD_DIR)/$(CONFIG)

mam:
	@echo "Compiling MaM"
	$(MAKE) -C MaM MAM_USE_SLURM=$(MAM_USE_SLURM) MAM_USE_BARRIERS=$(MAM_USE_BARRIERS) MAM_DEBUG=$(MAM_DEBUG)

sam: mam
	@echo "Compiling SAM"
	$(MAKE) -C SAM MAM_DEBUG=$(MAM_DEBUG)

install: mam sam $(CONFIG)
	echo "Done"

#Clean rules
sam_clean:
	@echo "Cleaning SAM"
	$(MAKE) -C SAM clean

mam_clean:
	@echo "Cleaning MaM"
	$(MAKE) -C MaM clean

clean: sam_clean mam_clean
	-rm $(BUILD_DIR)/$(CONFIG)

#Clear rules
sam_clear:
	@echo "Clearing SAM"
	$(MAKE) -C SAM clear

mam_clear:
	@echo "Clearing MaM"
	$(MAKE) -C MaM clear

clear: sam_clear mam_clear
	-rm -rf $(BUILD_DIR)
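
# Example invocation (illustrative): build with Slurm support and debug output
#   make install MAM_USE_SLURM=1 MAM_DEBUG=1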
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "read_ini.h"
#include "ini.h"
#include "../MaM/MAM.h"
ext_functions_t *user_functions;
void get_numbers_from_string(const char *input, size_t *res_len, int **res);
/*
 * Function used to read the configuration file and store it in a
 * structure for later use.
 *
 * It first reads the "general" section and then each of the
 * "stage%d" and "resize%d" sections.
 */
static int handler(void* user, const char* section, const char* name,
                   const char* value) {
  int ret_value = 1;
  int *aux;
  size_t aux_len;
  configuration* pconfig = (configuration*)user;

  if(pconfig->actual_group >= pconfig->n_groups && pconfig->actual_stage >= pconfig->n_stages) {
    return 1; // There is no more work to perform
  }

  char *resize_name = malloc(10 * sizeof(char));
  snprintf(resize_name, 10, "resize%zu", pconfig->actual_group);
  char *stage_name = malloc(10 * sizeof(char));
  snprintf(stage_name, 10, "stage%zu", pconfig->actual_stage);

  #define MATCH(s, n) strcmp(section, s) == 0 && strcmp(name, n) == 0
  #define LAST(iter, total) iter < total
  if (MATCH("general", "Total_Resizes")) {
    pconfig->n_resizes = strtoul(value, NULL, 10);
    pconfig->n_groups = pconfig->n_resizes + 1;
    user_functions->resizes_f(pconfig);
  } else if (MATCH("general", "Total_Stages")) {
    pconfig->n_stages = strtoul(value, NULL, 10);
    user_functions->stages_f(pconfig);
  } else if (MATCH("general", "Granularity")) {
    pconfig->granularity = atoi(value);
  } else if (MATCH("general", "SDR")) { // TODO Refactor to a hand-picked name
    pconfig->sdr = strtoul(value, NULL, 10);
  } else if (MATCH("general", "ADR")) { // TODO Refactor to a hand-picked name
    pconfig->adr = strtoul(value, NULL, 10);
  } else if (MATCH("general", "Rigid")) {
    pconfig->rigid_times = atoi(value);
  } else if (MATCH("general", "Capture_Method")) {
    pconfig->capture_method = atoi(value);

  // Iter stage
  } else if (MATCH(stage_name, "Stage_Type") && LAST(pconfig->actual_stage, pconfig->n_stages)) {
    pconfig->stages[pconfig->actual_stage].pt = atoi(value);
  } else if (MATCH(stage_name, "Stage_Time_Capped") && LAST(pconfig->actual_stage, pconfig->n_stages)) {
    pconfig->stages[pconfig->actual_stage].t_capped = atoi(value);
  } else if (MATCH(stage_name, "Stage_Bytes") && LAST(pconfig->actual_stage, pconfig->n_stages)) {
    pconfig->stages[pconfig->actual_stage].bytes = atoi(value);
  } else if (MATCH(stage_name, "Stage_Identifier") && LAST(pconfig->actual_stage, pconfig->n_stages)) {
    pconfig->stages[pconfig->actual_stage].id = atoi(value);
  } else if (MATCH(stage_name, "Stage_Time") && LAST(pconfig->actual_stage, pconfig->n_stages)) {
    pconfig->stages[pconfig->actual_stage].t_stage = (float) atof(value);
    pconfig->actual_stage = pconfig->actual_stage + 1; // Last element of this stage section

  // Resize stage
  } else if (MATCH(resize_name, "Iters") && LAST(pconfig->actual_group, pconfig->n_groups)) {
    pconfig->groups[pconfig->actual_group].iters = atoi(value);
  } else if (MATCH(resize_name, "Procs") && LAST(pconfig->actual_group, pconfig->n_groups)) {
    pconfig->groups[pconfig->actual_group].procs = atoi(value);
  } else if (MATCH(resize_name, "FactorS") && LAST(pconfig->actual_group, pconfig->n_groups)) {
    pconfig->groups[pconfig->actual_group].factor = (float) atof(value);
  } else if (MATCH(resize_name, "Dist") && LAST(pconfig->actual_group, pconfig->n_groups)) {
    int aux_value = MAM_PHY_DIST_COMPACT;
    if (strcmp(value, "spread") == 0) {
      aux_value = MAM_PHY_DIST_SPREAD;
    }
    pconfig->groups[pconfig->actual_group].phy_dist = aux_value;
  } else if (MATCH(resize_name, "Redistribution_Method") && LAST(pconfig->actual_group, pconfig->n_groups)) {
    pconfig->groups[pconfig->actual_group].rm = atoi(value);
  } else if (MATCH(resize_name, "Redistribution_Strategy") && LAST(pconfig->actual_group, pconfig->n_groups)) {
    get_numbers_from_string(value, &aux_len, &aux);
    pconfig->groups[pconfig->actual_group].rs = aux;
    pconfig->groups[pconfig->actual_group].rs_len = aux_len;
  } else if (MATCH(resize_name, "Spawn_Method") && LAST(pconfig->actual_group, pconfig->n_groups)) {
    pconfig->groups[pconfig->actual_group].sm = atoi(value);
  } else if (MATCH(resize_name, "Spawn_Strategy") && LAST(pconfig->actual_group, pconfig->n_groups)) {
    get_numbers_from_string(value, &aux_len, &aux);
    pconfig->groups[pconfig->actual_group].ss = aux;
    pconfig->groups[pconfig->actual_group].ss_len = aux_len;
    pconfig->actual_group = pconfig->actual_group + 1; // Last element of this resize section

  // Unknown case
  } else {
    ret_value = 0; /* unknown section or name, error */
  }

  free(resize_name);
  free(stage_name);
  return ret_value;
}
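
/*
 * Illustrative sketch (values made up) of a configuration file this handler
 * accepts; the section and key names are the ones matched above:
 *
 *   [general]
 *   Total_Resizes=1
 *   Total_Stages=1
 *   Granularity=100
 *   [stage0]
 *   Stage_Type=0
 *   Stage_Time=0.5
 *   [resize0]
 *   Iters=10
 *   Procs=4
 *   Dist=compact
 *   ...
 */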
/**
 * @brief Extracts numbers from a comma-separated string and stores them in an array.
 *
 * This function takes a string containing a sequence of numbers separated by commas,
 * converts each number to an integer, and stores them in a dynamically allocated array.
 *
 * @param input The input string containing comma-separated numbers.
 * @param res_len Pointer to a size_t that will hold the length of the resulting array.
 *                Note: NULL can be passed if the caller does not need it.
 * @param res Pointer to an integer array where the extracted numbers will be stored.
 *            Note: The memory for this array is dynamically allocated and should be freed by the caller.
 */
void get_numbers_from_string(const char *input, size_t *res_len, int **res) {
  char *aux, *token;
  int num;
  size_t len, malloc_len;

  len = 0;
  malloc_len = 10;
  *res = (int *) malloc(malloc_len * sizeof(int));
  aux = (char *) malloc((strlen(input)+1) * sizeof(char));
  strcpy(aux, input);

  token = strtok(aux, ",");
  while (token != NULL) {
    num = atoi(token);
    if(len == malloc_len) {
      malloc_len += 10;
      *res = (int *) realloc(*res, malloc_len * sizeof(int));
    }
    (*res)[len] = num;
    len++;
    token = strtok(NULL, ",");
  }

  if(res_len != NULL) *res_len = len;
  if(len != malloc_len) {
    *res = (int *) realloc(*res, len * sizeof(int));
  }
  free(aux);
}
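
/*
 * Example (illustrative): get_numbers_from_string("1,0,3", &n, &values)
 * leaves n == 3 and values == {1, 0, 3}; the caller frees values.
 */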
/*
 * Creates and returns a configuration structure from a given
 * file name.
 *
 * The structure's memory is allocated inside this function and
 * should be freed with "free_config()".
 */
configuration *read_ini_file(char *file_name, ext_functions_t init_functions) {
  configuration *config = NULL;

  config = malloc(sizeof(configuration));
  if(config == NULL) {
    printf("Error when reserving configuration structure\n");
    return NULL;
  }
  config->capture_method = 0;
  config->rigid_times = 0;
  config->n_resizes = 0;
  config->n_groups = 1;
  config->n_stages = 1;
  config->actual_group = 0;
  config->actual_stage = 0;

  user_functions = &init_functions;

  if(ini_parse(file_name, handler, config) < 0) { // Parse configuration
    printf("Can't load '%s'\n", file_name);
    free(config);
    return NULL;
  }
  return config;
}
#ifndef READ_INI_H
#define READ_INI_H
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "../Main/Main_datatypes.h"
typedef void (*Malloc_conf)(configuration* user_config);
typedef struct {
Malloc_conf resizes_f, stages_f;
} ext_functions_t;
configuration *read_ini_file(char *file_name, ext_functions_t init_functions);
#endif