Commit 4bcc805a authored by iker_martin's avatar iker_martin
Browse files

Added MaM interface

parent 1cd76849
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <mpi.h>
#include <signal.h>
#include "malleabilityZombies.h"
#define PIDS_QTY 320
void zombies_suspend();
int offset_pids, *pids = NULL;
void gestor_usr2() {}
void zombies_collect_suspended(MPI_Comm comm) {
int pid = getpid();
int *pids_counts = malloc(mall->numP * sizeof(int));
int *pids_displs = malloc(mall->numP * sizeof(int));
int i, count=1;
#if USE_MAL_DEBUG > 2
if(mall->myId == mall->root){ DEBUG_FUNC("Collecting zombies", mall->myId, mall->numP); } fflush(stdout);
#endif
if(mall->myId < mall->numC) {
count = 0;
if(mall->myId == mall->root) {
for(i=0; i < mall->numC; i++) {
pids_counts[i] = 0;
}
for(i=mall->numC; i<mall->numP; i++) {
pids_counts[i] = 1;
pids_displs[i] = (i - mall->numC) + offset_pids;
}
offset_pids += mall->numP - mall->numC;
}
}
MPI_Gatherv(&pid, count, MPI_INT, pids, pids_counts, pids_displs, MPI_INT, mall->root, comm);
free(pids_counts);
free(pids_displs);
if(mall->myId >= mall->numC) {
zombies_suspend();
}
}
void zombies_service_init() {
offset_pids = 0;
pids = malloc(PIDS_QTY * sizeof(int));
for(int i=0; i<PIDS_QTY; i++) {
pids[i] = 0;
}
}
void zombies_service_free() {
free(pids);
}
void zombies_suspend() {
struct sigaction act;
sigemptyset(&act.sa_mask);
act.sa_flags=0;
act.sa_handler=gestor_usr2;
sigaction(SIGUSR2, &act, NULL);
sigset_t set;
sigprocmask(SIG_SETMASK,NULL,&set);
sigsuspend(&set);
}
void zombies_awake() {
for(int i=0; i < offset_pids; i++) { // Despertar a los zombies
kill(pids[i], SIGUSR2);
}
}
#ifndef MALLEABILITY_ZOMBIES_H
#define MALLEABILITY_ZOMBIES_H
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <mpi.h>
#include <signal.h>
#include "malleabilityDataStructures.h"
void zombies_collect_suspended(MPI_Comm comm);
void zombies_service_init();
void zombies_service_free();
void zombies_awake();
#endif
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "../malleabilityStates.h"
#include "../malleabilityDataStructures.h"
#include "Baseline.h"
#include "Spawn_state.h"
//--------------PRIVATE DECLARATIONS---------------//
int baseline_spawn(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *child);
int single_strat_parents(Spawn_data spawn_data, MPI_Comm *child);
void single_strat_children(MPI_Comm *parents);
//--------------PUBLIC FUNCTIONS---------------//
/*
* Metodo basico para la creacion de procesos. Crea en total
* spawn_data.spawn_qty procesos.
*/
int baseline(Spawn_data spawn_data, MPI_Comm *child) { //TODO Tratamiento de errores
MPI_Comm intercomm;
MPI_Comm_get_parent(&intercomm);
if (intercomm == MPI_COMM_NULL) { // Parents path
if (spawn_data.spawn_is_single) {
single_strat_parents(spawn_data, child);
} else {
baseline_spawn(spawn_data, spawn_data.comm, child);
}
} else if(spawn_data.spawn_is_single) { // Children path
single_strat_children(child);
}
return MALL_SPAWN_COMPLETED;
}
//--------------PRIVATE FUNCTIONS---------------//
/*
* Crea un grupo de procesos segun la configuracion indicada por la funcion
* "processes_dist()".
*/
int baseline_spawn(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *child) {
int rootBcast = MPI_PROC_NULL;
if(mall->myId == mall->root) rootBcast = MPI_ROOT;
int spawn_err = MPI_Comm_spawn(mall->name_exec, MPI_ARGV_NULL, spawn_data.spawn_qty, spawn_data.mapping, mall->root, comm, child, MPI_ERRCODES_IGNORE);
MPI_Comm_set_name(*child, "MPI_COMM_MALL_RESIZE");
if(spawn_err != MPI_SUCCESS) {
printf("Error creating new set of %d procs.\n", spawn_data.spawn_qty);
}
MAM_Comm_main_structures(rootBcast);
return spawn_err;
}
/*
* Si la variable "type" es 1, la creación es con la participación de todo el grupo de padres
* Si el valor es diferente, la creación es solo con la participación del proceso root
*/
int single_strat_parents(Spawn_data spawn_data, MPI_Comm *child) {
int spawn_err;
char *port_name;
MPI_Comm newintercomm;
if (mall->myId == mall->root) {
spawn_err = baseline_spawn(spawn_data, MPI_COMM_SELF, child);
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MPI_ANY_SOURCE, 130, *child, MPI_STATUS_IGNORE);
set_spawn_state(MALL_SPAWN_SINGLE_COMPLETED, spawn_data.spawn_is_async); // Indicate other processes to join root to end spawn procedure
wakeup_completion();
} else {
port_name = malloc(1);
}
MPI_Comm_connect(port_name, MPI_INFO_NULL, mall->root, spawn_data.comm, &newintercomm);
if(mall->myId == mall->root)
MPI_Comm_free(child);
free(port_name);
*child = newintercomm;
return spawn_err;
}
/*
* Conectar grupo de hijos con grupo de padres
* Devuelve un intercomunicador para hablar con los padres
*
* Solo se utiliza cuando la creación de los procesos ha sido
* realizada por un solo proceso padre
*/
void single_strat_children(MPI_Comm *parents) {
char *port_name;
MPI_Comm newintercomm;
if(mall->myId == mall->root) {
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Open_port(MPI_INFO_NULL, port_name);
MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, mall->root_parents, 130, *parents);
} else {
port_name = malloc(1);
}
MPI_Comm_accept(port_name, MPI_INFO_NULL, mall->root, MPI_COMM_WORLD, &newintercomm);
if(mall->myId == mall->root) {
MPI_Close_port(port_name);
}
free(port_name);
MPI_Comm_free(parents);
*parents = newintercomm;
}
#ifndef MALLEABILITY_SPAWN_BASELINE_H
#define MALLEABILITY_SPAWN_BASELINE_H
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
#include "../malleabilityDataStructures.h"
#include "Spawn_DataStructure.h"
int baseline(Spawn_data spawn_data, MPI_Comm *child);
#endif
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
#include <mpi.h>
#include <string.h>
#include "../malleabilityStates.h"
#include "../malleabilityDataStructures.h"
#include "../MAM_Configuration.h"
#include "ProcessDist.h"
#include "GenericSpawn.h"
#include "Baseline.h"
#include "Merge.h"
#include "Spawn_state.h"
// This code is a Singleton object -- Only one instance can be used at a given time and
// no multiple calls to perform diferent resizes can be performed at the same time.
Spawn_data *spawn_data = NULL;
pthread_t spawn_thread;
//--------------PRIVATE CONFIGURATION DECLARATIONS---------------//
void set_spawn_configuration(MPI_Comm comm);
void deallocate_spawn_data();
//--------------PRIVATE DECLARATIONS---------------//
void generic_spawn(MPI_Comm *child, int data_stage);
int check_single_state(MPI_Comm comm, MPI_Comm *child, int global_state, int wait_completed);
int check_generic_state(MPI_Comm comm, int local_state, int wait_completed);
//--------------PRIVATE THREADS DECLARATIONS---------------//
int allocate_thread_spawn(MPI_Comm *child);
void* thread_work(void *args);
//--------------PUBLIC FUNCTIONS---------------//
/*
* Se solicita la creacion de un nuevo grupo de "numP" procesos con una distribucion
* fisica "type_dist".
*
* Se puede solicitar en primer plano, encargandose por tanto el proceso que llama a esta funcion,
* o en segundo plano, donde un hilo se encarga de configurar esta creacion.
*
* Si se pide en primer plano, al terminarla es posible llamar a "check_spawn_state()" para crear
* los procesos.
*
* Si se pide en segundo plano, llamar a "check_spawn_state()" comprobara si la configuracion para
* crearlos esta lista, y si es asi, los crea.
*
* Devuelve el estado de el procedimiento. Si no devuelve "MALL_SPAWN_COMPLETED", es necesario llamar a
* "check_spawn_state()".
*/
int init_spawn(MPI_Comm comm, MPI_Comm *child) {
int local_state;
set_spawn_configuration(comm);
if(!spawn_data->spawn_is_async) {
generic_spawn(child, MALL_NOT_STARTED);
local_state = get_spawn_state(spawn_data->spawn_is_async);
if (local_state == MALL_SPAWN_COMPLETED)
deallocate_spawn_data();
} else {
local_state = spawn_data->spawn_is_single ?
MALL_SPAWN_SINGLE_PENDING : MALL_SPAWN_PENDING;
local_state = mall_conf->spawn_method == MALL_SPAWN_MERGE && spawn_data->initial_qty > spawn_data->target_qty ?
MALL_SPAWN_ADAPT_POSTPONE : local_state;
set_spawn_state(local_state, 0);
if((spawn_data->spawn_is_single && mall->myId == mall->root) || !spawn_data->spawn_is_single) {
allocate_thread_spawn(child);
}
}
return local_state;
}
/*
* Comprueba si una configuracion para crear un nuevo grupo de procesos esta lista,
* y en caso de que lo este, se devuelve el communicador a estos nuevos procesos.
*/
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int wait_completed) {
int local_state;
int global_state=MALL_NOT_STARTED;
if(spawn_data->spawn_is_async) { // Async
local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MALL_SPAWN_SINGLE_PENDING || local_state == MALL_SPAWN_SINGLE_COMPLETED) { // Single
global_state = check_single_state(comm, child, local_state, wait_completed);
} else if(local_state == MALL_SPAWN_PENDING || local_state == MALL_SPAWN_COMPLETED || local_state == MALL_SPAWN_ADAPTED) { // Generic
global_state = check_generic_state(comm, local_state, wait_completed);
} else if(local_state == MALL_SPAWN_ADAPT_POSTPONE) {
global_state = local_state;
} else {
printf("Error Check spawn: Configuracion invalida State = %d\n", local_state);
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
}
} else if(mall_conf->spawn_method == MALL_SPAWN_MERGE){ // Start Merge shrink Sync
generic_spawn(child, MALL_DIST_COMPLETED);
global_state = get_spawn_state(spawn_data->spawn_is_async);
}
if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_SPAWN_ADAPTED)
deallocate_spawn_data();
return global_state;
}
/*
* Elimina la bandera bloqueante MALL_SPAWN_ADAPT_POSTPONE para los hilos
* auxiliares. Esta bandera los bloquea para que el metodo Merge shrink no
* avance hasta que se complete la redistribucion de datos. Por tanto,
* al modificar la bandera los hilos pueden continuar.
*
* Por seguridad se comprueba que no se realice el cambio a la bandera a
* no ser que se cumplan las 3 condiciones.
*/
void unset_spawn_postpone_flag(int outside_state) {
int local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MALL_SPAWN_ADAPT_POSTPONE && outside_state == MALL_SPAWN_ADAPT_PENDING && spawn_data->spawn_is_async) {
set_spawn_state(MALL_SPAWN_PENDING, spawn_data->spawn_is_async);
wakeup_redistribution();
}
}
/*
* Funcion bloqueante de los hijos para asegurar que todas las tareas del paso
* de creacion de los hijos se terminan correctamente.
*
* Ademas los hijos obtienen informacion basica de los padres
* para el paso de redistribucion de datos (Numeros de procesos y Id del Root).
*
*/
void malleability_connect_children(MPI_Comm comm, MPI_Comm *parents) {
spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));
spawn_data->spawn_qty = mall->numP;
spawn_data->target_qty = mall->numP;
spawn_data->comm = comm;
MAM_Comm_main_structures(MALLEABILITY_ROOT); //FIXME What if root is another id different to 0? Send from spawn to root id?
//MPI_Comm_remote_size(*parents, &spawn_data->initial_qty);
spawn_data->initial_qty = mall->num_parents;
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
switch(mall_conf->spawn_method) {
case MALL_SPAWN_BASELINE:
baseline(*spawn_data, parents);
if(!spawn_data->spawn_is_intercomm) {
intracomm_strategy(MALLEABILITY_CHILDREN, parents);
}
break;
case MALL_SPAWN_MERGE:
spawn_data->target_qty += spawn_data->initial_qty;
merge(*spawn_data, parents, MALL_NOT_STARTED);
break;
}
//mall->num_parents = spawn_data->initial_qty;
free(spawn_data);
}
//--------------PRIVATE CONFIGURATION FUNCTIONS---------------//
/*
* Agrupa en una sola estructura todos los datos de configuración necesarios
* e inicializa las estructuras necesarias.
*/
void set_spawn_configuration(MPI_Comm comm) {
spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));
spawn_data->initial_qty = mall->numP;
spawn_data->target_qty = mall->numC;
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
spawn_data->comm = comm;
spawn_data->mapping_fill_method = MALL_DIST_STRING;
switch(mall_conf->spawn_method) {
case MALL_SPAWN_BASELINE:
spawn_data->spawn_qty = spawn_data->target_qty;
spawn_data->already_created = 0;
break;
case MALL_SPAWN_MERGE:
spawn_data->spawn_qty = spawn_data->target_qty - spawn_data->initial_qty;
spawn_data->already_created = spawn_data->initial_qty;
break;
}
if(spawn_data->spawn_is_async) {
init_spawn_state();
}
spawn_data->mapping = MPI_INFO_NULL;
}
/*
* Libera una estructura de datos spawn_data
* junto a la destrucion de aquellas estructuras que utiliza.
*/
void deallocate_spawn_data() {
if(spawn_data == NULL) return;
if(spawn_data->mapping != MPI_INFO_NULL) {
MPI_Info_free(&(spawn_data->mapping));
}
if(spawn_data->spawn_is_async) {
free_spawn_state();
}
free(spawn_data);
spawn_data = NULL;
}
//--------------PRIVATE SPAWN CREATION FUNCTIONS---------------//
/*
* Funcion generica para la creacion de procesos. Obtiene la configuracion
* y segun esta, elige como deberian crearse los procesos.
*
* Cuando termina, modifica la variable global para indicar este cambio
*/
void generic_spawn(MPI_Comm *child, int data_stage) {
int local_state, aux_state;
// WORK
if(mall->myId == mall->root && spawn_data->spawn_qty > 0) { //SET MAPPING FOR NEW PROCESSES
processes_dist(*spawn_data, &(spawn_data->mapping));
}
switch(mall_conf->spawn_method) {
case MALL_SPAWN_BASELINE:
local_state = baseline(*spawn_data, child);
if(!spawn_data->spawn_is_intercomm) {
local_state = intracomm_strategy(MALLEABILITY_NOT_CHILDREN, child);
}
break;
case MALL_SPAWN_MERGE:
local_state = merge(*spawn_data, child, data_stage);
break;
}
// END WORK
aux_state = get_spawn_state(spawn_data->spawn_is_async);
if(!(aux_state == MALL_SPAWN_PENDING && local_state == MALL_SPAWN_ADAPT_POSTPONE)) {
set_spawn_state(local_state, spawn_data->spawn_is_async);
}
}
//--------------PRIVATE THREAD FUNCTIONS---------------//
/*
* Aloja la memoria para un hilo auxiliar dedicado a la creacion de procesos.
* No se puede realizar un "join" sobre el hilo y el mismo libera su memoria
* asociado al terminar.
*/
int allocate_thread_spawn(MPI_Comm *child) {
if(pthread_create(&spawn_thread, NULL, thread_work, (void *) child)) {
printf("Error al crear el hilo de SPAWN\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
if(pthread_detach(spawn_thread)) {
printf("Error when detaching spawning thread\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
return 0;
}
/*
* Funcion llamada por un hilo para que este se encarge
* de configurar la creacion de un nuevo grupo de procesos.
*
* Una vez esta lista la configuracion y es posible crear los procesos
* se avisa al hilo maestro.
*/
void* thread_work(void *args) {
int local_state;
MPI_Comm *child = (MPI_Comm *) args;
generic_spawn(child, MALL_NOT_STARTED);
local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MALL_SPAWN_ADAPT_POSTPONE || local_state == MALL_SPAWN_PENDING) {
// El grupo de procesos se terminara de juntar tras la redistribucion de datos
local_state = wait_redistribution();
generic_spawn(child, MALL_DIST_COMPLETED);
}
wakeup_completion();
pthread_exit(NULL);
}
/*
* Comprueba si una creacion de procesos asincrona en el
* paso "single" ha terminado.
* Si no ha terminado se mantiene el estado
* "MALL_SPAWN_SINGLE_PENDING".
*
* Si ha terminado se crean los hilos auxiliares para
* los procesos no root y se devuelve el estado
* "MALL_SPAWN_PENDING".
*/
int check_single_state(MPI_Comm comm, MPI_Comm *child, int global_state, int wait_completed) {
while(wait_completed && mall->myId == mall->root && global_state == MALL_SPAWN_SINGLE_PENDING) {
global_state = wait_completion();
}
MPI_Bcast(&global_state, 1, MPI_INT, mall->root, comm);
// Non-root processes join root to finalize the spawn
// They also must join if the application has ended its work
if(global_state == MALL_SPAWN_SINGLE_COMPLETED) {
global_state = MALL_SPAWN_PENDING;
set_spawn_state(global_state, spawn_data->spawn_is_async);
if(mall->myId != mall->root) {
allocate_thread_spawn(child);
}
}
return global_state;
}
/*
* Comprueba si una creación de procesos asincrona en el
* paso "generic" ha terminado.
* Si no ha terminado devuelve el estado
* "MALL_SPAWN_PENDING".
*
* Si ha terminado libera la memoria asociada a spawn_data
* y devuelve el estado "MALL_SPAWN_COMPLETED".
*/
int check_generic_state(MPI_Comm comm, int local_state, int wait_completed) {
int global_state;
while(wait_completed && local_state == MALL_SPAWN_PENDING) local_state = wait_completion();
MPI_Allreduce(&local_state, &global_state, 1, MPI_INT, MPI_MIN, comm);
if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_SPAWN_ADAPTED) {
set_spawn_state(global_state, spawn_data->spawn_is_async);
}
return global_state;
}
#ifndef MALLEABILITY_GENERIC_SPAWN_H
#define MALLEABILITY_GENERIC_SPAWN_H
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "Spawn_DataStructure.h"
int init_spawn(MPI_Comm comm, MPI_Comm *child);
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int wait_completed);
void malleability_connect_children(MPI_Comm comm, MPI_Comm *parents);
void unset_spawn_postpone_flag(int outside_state);
#endif
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "../malleabilityStates.h"
#include "../malleabilityDataStructures.h"
#include "Merge.h"
#include "Baseline.h"
//--------------PRIVATE DECLARATIONS---------------//
void merge_adapt_expand(MPI_Comm *child, int is_children_group);
void merge_adapt_shrink(int numC, MPI_Comm *child, MPI_Comm comm, int myId);
//--------------PUBLIC FUNCTIONS---------------//
int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state) {
MPI_Comm intercomm;
int local_state;
int is_children_group = 1;
if(spawn_data.initial_qty > spawn_data.target_qty) { //Shrink
if(data_state == MALL_DIST_COMPLETED) {
merge_adapt_shrink(spawn_data.target_qty, child, spawn_data.comm, mall->myId);
local_state = MALL_SPAWN_ADAPTED;
} else {
local_state = MALL_SPAWN_ADAPT_POSTPONE;
}
} else { //Expand
MPI_Comm_get_parent(&intercomm);
is_children_group = intercomm == MPI_COMM_NULL ? 0:1;
baseline(spawn_data, child);
merge_adapt_expand(child, is_children_group);
local_state = MALL_SPAWN_COMPLETED;
}
return local_state;
}
int intracomm_strategy(int is_children_group, MPI_Comm *child) {
merge_adapt_expand(child, is_children_group);
return MALL_SPAWN_COMPLETED;
}
//--------------PRIVATE MERGE TYPE FUNCTIONS---------------//
/*
* Se encarga de que el grupo de procesos resultante se
* encuentren todos en un intra comunicador, uniendo a
* padres e hijos en un solo comunicador.
*
* Se llama antes de la redistribución de datos.
*
* TODO REFACTOR
*/
void merge_adapt_expand(MPI_Comm *child, int is_children_group) {
MPI_Comm new_comm = MPI_COMM_NULL;
MPI_Intercomm_merge(*child, is_children_group, &new_comm); //El que pone 0 va primero
MPI_Comm_free(child);
*child = new_comm;
}
/*
* Se encarga de que el grupo de procesos resultante se
* eliminen aquellos procesos que ya no son necesarios.
* Los procesos eliminados se quedaran como zombies.
*
* Se llama una vez ha terminado la redistribución de datos.
*/
void merge_adapt_shrink(int numC, MPI_Comm *child, MPI_Comm comm, int myId) {
int color = MPI_UNDEFINED;
if(myId < numC) {
color = 1;
}
MPI_Comm_split(comm, color, myId, child);
}
#ifndef MALLEABILITY_SPAWN_MERGE_H
#define MALLEABILITY_SPAWN_MERGE_H
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "../malleabilityDataStructures.h"
#include "Spawn_DataStructure.h"
int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state);
int intracomm_strategy(int is_children_group, MPI_Comm *child);
#endif
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <mpi.h>
#include "ProcessDist.h"
//--------------PRIVATE DECLARATIONS---------------//
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes);
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info);
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **host_str);
//--------------------------------SLURM USAGE-------------------------------------//
#if USE_MAL_SLURM
#include <slurm/slurm.h>
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info);
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostfile_str);
//@deprecated functions
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, MPI_Info *info);
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int used_nodes);
#endif
//--------------------------------SLURM USAGE-------------------------------------//
int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_name);
//@deprecated functions
int create_hostfile(char **file_name);
int write_hostfile_node(int ptr, int qty, char *node_name);
//--------------PUBLIC FUNCTIONS---------------//
/*
* Configura la creacion de un nuevo grupo de procesos, reservando la memoria
* para una llamada a MPI_Comm_spawn, obteniendo una distribucion fisica
* para los procesos y creando un fichero hostfile.
*
* OUT parameters -->
* info_spawn: Objeto MPI_Info en el que se indica el mappeado
* a usar al crear los procesos.
*/
void processes_dist(Spawn_data spawn_data, MPI_Info *info_spawn) {
int used_nodes=0;
int *procs_array;
// GET NEW DISTRIBUTION
node_dist(spawn_data, &procs_array, &used_nodes);
#if USE_MAL_SLURM
switch(spawn_data.mapping_fill_method) {
case MALL_DIST_STRING:
generate_info_string_slurm(mall->nodelist, procs_array, used_nodes, info_spawn);
break;
case MALL_DIST_HOSTFILE:
generate_info_hostfile_slurm(mall->nodelist, procs_array, used_nodes, info_spawn);
break;
}
free(procs_array);
#else
generate_info_string(mall->nodelist, procs_array, used_nodes, info_spawn);
#endif
}
//--------------PRIVATE FUNCTIONS---------------//
//-----------------DISTRIBUTION-----------------//
/*
* Obtiene la distribucion fisica del grupo de procesos a crear, devolviendo
* cuantos nodos se van a utilizar y la cantidad de procesos que alojara cada
* nodo.
*
* Se permiten dos tipos de distribuciones fisicas segun el valor de "spawn_dist":
*
* COMM_PHY_NODES (1): Orientada a equilibrar el numero de procesos entre
* todos los nodos disponibles.
* COMM_PHY_CPU (2): Orientada a completar la capacidad de un nodo antes de
* ocupar otro nodo.
*/
void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes) {
int i, *procs;
procs = calloc(mall->num_nodes, sizeof(int)); // Numero de procesos por nodo
/* GET NEW DISTRIBUTION */
switch(mall_conf->spawn_dist) {
case MALL_DIST_SPREAD: // DIST NODES @deprecated
spread_dist(spawn_data, used_nodes, procs);
break;
case MALL_DIST_COMPACT: // DIST CPUs
compact_dist(spawn_data, used_nodes, procs);
break;
}
//Copy results to output vector qty
*qty = calloc(*used_nodes, sizeof(int)); // Numero de procesos por nodo
for(i=0; i< *used_nodes; i++) {
(*qty)[i] = procs[i];
}
free(procs);
}
/*
* Distribucion basada en equilibrar el numero de procesos en cada nodo
* para que todos los nodos tengan el mismo numero. Devuelve el total de
* nodos utilizados y el numero de procesos a crear en cada nodo.
*
* FIXME Tener en cuenta procesos ya creados (already_created)
*/
void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
int i, tamBl, remainder;
*used_nodes = mall->num_nodes;
tamBl = spawn_data.target_qty / mall->num_nodes;
remainder = spawn_data.target_qty % mall->num_nodes;
for(i=0; i<remainder; i++) {
procs[i] = tamBl + 1;
}
for(i=remainder; i<mall->num_nodes; i++) {
procs[i] = tamBl;
}
}
/*
* Distribucion basada en llenar un nodo de procesos antes de pasar al
* siguiente nodo. Devuelve el total de nodos utilizados y el numero
* de procesos a crear en cada nodo.
*
* Tiene en cuenta los procesos ya existentes para el mappeado de
* los procesos a crear.
*/
void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
int i, asigCores;
int tamBl, remainder;
tamBl = mall->num_cpus;
asigCores = spawn_data.already_created;
i = *used_nodes = spawn_data.already_created / tamBl;
remainder = spawn_data.already_created % tamBl;
//FIXME REFACTOR Que pasa si los nodos 1 y 2 tienen espacios libres
//First nodes could already have existing procs
//Start from the first with free spaces
if (remainder) {
procs[i] = tamBl - remainder;
asigCores += procs[i];
i = (i+1) % mall->num_nodes;
(*used_nodes)++;
}
//Assign tamBl to each node
while(asigCores+tamBl <= spawn_data.target_qty) {
asigCores += tamBl;
procs[i] += tamBl;
i = (i+1) % mall->num_nodes;
(*used_nodes)++;
}
//Last node could have less procs than tamBl
if(asigCores < spawn_data.target_qty) {
procs[i] += spawn_data.target_qty - asigCores;
(*used_nodes)++;
}
if(*used_nodes > mall->num_nodes) *used_nodes = mall->num_nodes; //FIXME Si ocurre esto no es un error?
}
//--------------PRIVATE FUNCTIONS---------------//
//-------------------INFO SET-------------------//
/*
* Crea y devuelve un objeto MPI_Info con un par hosts/mapping
* en el que se indica el mappeado a utilizar en los nuevos
* procesos.
*
*
*/
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info){
char *host_str;
fill_str_hosts(nodelist, procs_array, nodes, &host_str);
// SET MAPPING
MPI_Info_create(info);
MPI_Info_set(*info, "hosts", mall->nodelist);
free(host_str);
}
/*
* Crea y devuelve una cadena para ser utilizada por la llave "hosts"
* al crear procesos e indicar donde tienen que ser creados.
*/
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **host_str) {
char *host, *aux, *token;
size_t i=0,len=0;
aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
strcpy(aux, nodelist);
token = strtok(aux, ",");
while (token != NULL && i < used_nodes) {
host = strdup(token);
if (qty[i] != 0) {
len = write_str_node(host_str, len, qty[i], host);
}
i++;
free(host);
token = strtok(NULL, ",");
}
free(aux);
}
/*
* Añade en una cadena "qty" entradas de "node_name".
* Realiza la reserva de memoria y la realoja si es necesario.
*/
int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_name) {
int err;
char *ocurrence;
size_t i, len, len_node;
len_node = strlen(node_name) + 1; // Str length + ','
len = qty * len_node; // Number of times the node is used
if(len_og == 0) { // Memoria no reservada
*hostfile_str = (char *) malloc((len+1) * sizeof(char));
} else { // Cadena ya tiene datos
*hostfile_str = (char *) realloc(*hostfile_str, (len_og + len + 1) * sizeof(char));
}
if(hostfile_str == NULL) return -1; // No ha sido posible alojar la memoria
ocurrence = (char *) malloc((len_node+1) * sizeof(char));
if(ocurrence == NULL) return -2; // No ha sido posible alojar la memoria
err = snprintf(ocurrence, len_node+1, ",%s", node_name);
if(err < 0) return -3; // No ha sido posible escribir sobre la variable auxiliar
i=0;
if(len_og == 0) { // Si se inicializa, la primera es una copia
i++;
strcpy(*hostfile_str, node_name);
}
for(; i<qty; i++){ // Las siguientes se conctanenan
strcat(*hostfile_str, ocurrence);
}
free(ocurrence);
return len+len_og;
}
//--------------------------------SLURM USAGE-------------------------------------//
#if USE_MAL_SLURM
/*
* Crea y devuelve un objeto MPI_Info con un par hosts/mapping
* en el que se indica el mappeado a utilizar en los nuevos
* procesos.
* Es necesario usar Slurm para usarlo.
*/
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info){
// CREATE AND SET STRING HOSTS
char *hoststring;
fill_str_hosts_slurm(nodelist, procs_array, nodes, &hoststring);
MPI_Info_create(info);
MPI_Info_set(*info, "hosts", hoststring);
free(hoststring);
}
/*
* Crea y devuelve una cadena para ser utilizada por la llave "hosts"
* al crear procesos e indicar donde tienen que ser creados.
*/
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostfile_str) {
char *host;
size_t i=0,len=0;
hostlist_t hostlist;
hostlist = slurm_hostlist_create(nodelist);
while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
if(qty[i] != 0) {
len = write_str_node(hostfile_str, len, qty[i], host);
}
i++;
free(host);
}
slurm_hostlist_destroy(hostlist);
}
//====================================================
//====================================================
//============DEPRECATED FUNCTIONS====================
//====================================================
//====================================================
/* FIXME Por revisar
* @deprecated
* Genera un fichero hostfile y lo anyade a un objeto
* MPI_Info para ser utilizado.
*/
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, MPI_Info *info){
char *hostfile;
int ptr;
// CREATE/UPDATE HOSTFILE
ptr = create_hostfile(&hostfile);
MPI_Info_create(info);
MPI_Info_set(*info, "hostfile", hostfile);
free(hostfile);
// SET NEW DISTRIBUTION
fill_hostfile_slurm(nodelist, ptr, procs_array, nodes);
close(ptr);
}
/*
* @deprecated
* Crea un fichero que se utilizara como hostfile
* para un nuevo grupo de procesos.
*
* El nombre es devuelto en el argumento "file_name",
* que tiene que ser un puntero vacio.
*
* Ademas se devuelve un descriptor de fichero para
* modificar el fichero.
*/
int create_hostfile(char **file_name) {
int ptr, err;
size_t len = 11; //FIXME Numero mágico
*file_name = NULL;
*file_name = malloc(len * sizeof(char));
if(*file_name == NULL) return -1; // No ha sido posible alojar la memoria
err = snprintf(*file_name, len, "hostfile.o");
if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero
ptr = open(*file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
if(ptr < 0) return -3; // No ha sido posible crear el fichero
return ptr; // Devolver puntero a fichero
}
/*
* @deprecated
* Rellena un fichero hostfile indicado por ptr con los nombres
* de los nodos a utilizar indicados por "job_record" y la cantidad
* de procesos que alojara cada nodo indicado por "qty".
*/
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int nodes) {
int i=0;
char *host;
hostlist_t hostlist;
hostlist = slurm_hostlist_create(nodelist);
while ((host = slurm_hostlist_shift(hostlist)) && i < nodes) {
write_hostfile_node(ptr, qty[i], host);
i++;
free(host);
}
slurm_hostlist_destroy(hostlist);
}
/*
* @deprecated
* Escribe en el fichero hostfile indicado por ptr una nueva linea.
*
* Esta linea indica el nombre de un nodo y la cantidad de procesos a
* alojar en ese nodo.
*/
int write_hostfile_node(int ptr, int qty, char *node_name) {
int err;
char *line;
size_t len, len_node, len_int;
len_node = strlen(node_name);
err = snprintf(NULL, 0, "%d", qty);
if(err < 0) return -1;
len_int = err;
len = len_node + len_int + 3;
line = malloc(len * sizeof(char));
if(line == NULL) return -2; // No ha sido posible alojar la memoria
err = snprintf(line, len, "%s:%d\n", node_name, qty);
if(err < 0) return -3; // No ha sido posible escribir en el fichero
write(ptr, line, len-1);
free(line);
return 0;
}
#endif
//--------------------------------SLURM USAGE-------------------------------------//
#ifndef MALLEABILITY_SPAWN_PROCESS_DIST_H
#define MALLEABILITY_SPAWN_PROCESS_DIST_H
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
#include "../malleabilityStates.h"
#include "../malleabilityDataStructures.h"
#include "Spawn_DataStructure.h"
void processes_dist(Spawn_data spawn_data, MPI_Info *info_spawn);
#endif
#ifndef MAM_SPAWN_DATASTRUCTURE_H
#define MAM_SPAWN_DATASTRUCTURE_H
#include <mpi.h>
/* --- SPAWN STRUCTURE --- */
typedef struct {
int spawn_qty, initial_qty, target_qty;
int already_created;
int spawn_is_single, spawn_is_async, spawn_is_intercomm;
MPI_Info mapping;
int mapping_fill_method;
MPI_Comm comm, returned_comm;
} Spawn_data;
#endif
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include "Spawn_state.h"
pthread_mutex_t spawn_mutex;
pthread_cond_t spawn_cond, completion_cond;
int spawn_state;
int waiting_redistribution=0, waiting_completion=0;
void init_spawn_state() {
pthread_mutex_init(&spawn_mutex,NULL);
pthread_cond_init(&spawn_cond,NULL);
pthread_cond_init(&completion_cond,NULL);
set_spawn_state(1,0); //FIXME First parameter is a horrible magical number
}
void free_spawn_state() {
pthread_mutex_destroy(&spawn_mutex);
pthread_cond_destroy(&spawn_cond);
pthread_cond_destroy(&completion_cond);
}
int get_spawn_state(int is_async) {
int value;
if(is_async) {
pthread_mutex_lock(&spawn_mutex);
value = spawn_state;
pthread_mutex_unlock(&spawn_mutex);
} else {
value = spawn_state;
}
return value;
}
void set_spawn_state(int value, int is_async) {
if(is_async) {
pthread_mutex_lock(&spawn_mutex);
spawn_state = value;
pthread_mutex_unlock(&spawn_mutex);
} else {
spawn_state = value;
}
}
int wait_redistribution() {
pthread_mutex_lock(&spawn_mutex);
if(!waiting_redistribution) {
waiting_redistribution=1;
pthread_cond_wait(&spawn_cond, &spawn_mutex);
}
waiting_redistribution=0;
pthread_mutex_unlock(&spawn_mutex);
return get_spawn_state(1);
}
void wakeup_redistribution() {
pthread_mutex_lock(&spawn_mutex);
if(waiting_redistribution) {
pthread_cond_signal(&spawn_cond);
}
waiting_redistribution=1;
pthread_mutex_unlock(&spawn_mutex);
}
int wait_completion() {
pthread_mutex_lock(&spawn_mutex);
if(!waiting_completion) {
waiting_completion=1;
pthread_cond_wait(&completion_cond, &spawn_mutex);
}
waiting_completion=0;
pthread_mutex_unlock(&spawn_mutex);
return get_spawn_state(1);
}
void wakeup_completion() {
pthread_mutex_lock(&spawn_mutex);
if(waiting_completion) {
pthread_cond_signal(&completion_cond);
}
waiting_completion=1;
pthread_mutex_unlock(&spawn_mutex);
}
#ifndef MALLEABILITY_SPAWN_STATE_H
#define MALLEABILITY_SPAWN_STATE_H
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
void init_spawn_state();
void free_spawn_state();
int get_spawn_state(int is_async);
void set_spawn_state(int value, int is_async);
int wait_redistribution();
void wakeup_redistribution();
int wait_completion();
void wakeup_completion();
#endif
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment