Commit 30685e95 authored by iker_martin's avatar iker_martin
Browse files

Modificada salida de resultados, ahora tambien se indican los tiempos para...

Modificada salida de resultados, ahora tambien se indican los tiempos para cada etapa de una iteracion como locales
parent 9a0629d9
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <pthread.h>
#include "../malleabilityStates.h"
#include "Baseline.h"
//--------------PRIVATE DECLARATIONS---------------//
int baseline_spawn(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *child);
int baseline_single_spawn(Spawn_data spawn_data, MPI_Comm *child);
void baseline_establish_connection(int myId, int root, MPI_Comm *parents);
//--------------PUBLIC FUNCTIONS---------------//
/*
* Metodo basico para la creacion de procesos. Crea en total
* spawn_data.spawn_qty procesos.
*
* Tiene incorporada la estrategia Single para permitir que
* un solo proceso padre cree a los hijos.
*
* Si la funcion es llamada por los hijos se comprobara si
* se esta utilizando la estrategia Single para terminar
* la creacion de procesos. En caso contrario no realizan
* nada los hijos.
*/
int baseline(Spawn_data spawn_data, MPI_Comm *child) { //TODO Tratamiento de errores
int numRanks;
MPI_Comm_size(spawn_data.comm, &numRanks);
if (spawn_data.initial_qty == numRanks) { // Parents path
if(spawn_data.spawn_is_single) {
baseline_single_spawn(spawn_data, child);
} else {
baseline_spawn(spawn_data, spawn_data.comm, child);
}
} else if(spawn_data.spawn_is_single) { // Children path
baseline_establish_connection(spawn_data.myId, spawn_data.root, child);
}
return MALL_SPAWN_COMPLETED;
}
//--------------PRIVATE FUNCTIONS---------------//
/*
* Crea un grupo de procesos segun la configuracion indicada por la funcion
* "processes_dist()".
*/
int baseline_spawn(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *child) {
int rootBcast = MPI_PROC_NULL;
if(spawn_data.myId == spawn_data.root) rootBcast = MPI_ROOT;
// WORK
int spawn_err = MPI_Comm_spawn(spawn_data.cmd, MPI_ARGV_NULL, spawn_data.spawn_qty, spawn_data.mapping, spawn_data.root, comm, child, MPI_ERRCODES_IGNORE);
if(spawn_err != MPI_SUCCESS) {
printf("Error creating new set of %d procs.\n", spawn_data.spawn_qty);
}
// END WORK
MPI_Bcast(&spawn_data, 1, spawn_data.dtype, rootBcast, *child);
return spawn_err;
}
/*
* Si la variable "type" es 1, la creación es con la participación de todo el grupo de padres
* Si el valor es diferente, la creación es solo con la participación del proceso root
*/
int baseline_single_spawn(Spawn_data spawn_data, MPI_Comm *child) {
int spawn_err;
char *port_name;
MPI_Comm newintercomm;
if (spawn_data.myId == spawn_data.root) {
spawn_err = baseline_spawn(spawn_data, MPI_COMM_SELF, child);
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, spawn_data.root, 130, *child, MPI_STATUS_IGNORE);
if(spawn_data.spawn_is_async) {
pthread_mutex_lock(&(spawn_data.spawn_mutex));
commState = MALL_SPAWN_SINGLE_COMPLETED; // Indicate other processes to join root to end spawn procedure
pthread_mutex_unlock(&(spawn_data.spawn_mutex));
}
} else {
port_name = malloc(1);
}
MPI_Comm_connect(port_name, MPI_INFO_NULL, spawn_data.root, spawn_data.comm, &newintercomm);
if(spawn_data.myId == spawn_data.root)
MPI_Comm_free(child);
free(port_name);
*child = newintercomm;
return spawn_err;
}
/*
* Conectar grupo de hijos con grupo de padres
* Devuelve un intercomunicador para hablar con los padres
*
* Solo se utiliza cuando la creación de los procesos ha sido
* realizada por un solo proceso padre
*/
void baseline_establish_connection(int myId, int root, MPI_Comm *parents) {
char *port_name;
MPI_Comm newintercomm;
if(myId == root) {
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Open_port(MPI_INFO_NULL, port_name);
MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, root, 130, *parents);
} else {
port_name = malloc(1);
}
MPI_Comm_accept(port_name, MPI_INFO_NULL, root, MPI_COMM_WORLD, &newintercomm);
if(myId == root) {
MPI_Close_port(port_name);
}
free(port_name);
MPI_Comm_free(parents);
*parents = newintercomm;
}
#ifndef MALLEABILITY_SPAWN_BASELINE_H
#define MALLEABILITY_SPAWN_BASELINE_H
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "../malleabilityDataStructures.h"
int baseline(Spawn_data spawn_data, MPI_Comm *child);
#endif
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
#include <mpi.h>
#include <string.h>
#include "../malleabilityStates.h"
#include "ProcessDist.h"
#include "GenericSpawn.h"
#include "Baseline.h"
#include "Merge.h"
// This code is a Singleton object -- Only one instance can be used at a given time and
// no multiple calls to perform diferent resizes can be performed at the same time.
int commState = MALL_NOT_STARTED;
Spawn_data *spawn_data;
pthread_t spawn_thread;
//pthread_mutex_t spawn_mutex; FIXME BORRAR
//pthread_cond_t cond_adapt_rdy; FIXME BORRAR
MPI_Comm *returned_comm;
double end_time; //FIXME REFACTOR
//--------------PRIVATE CONFIGURATION DECLARATIONS---------------//
void set_spawn_configuration(char *cmd, int num_cpus, int num_nodes, char *nodelist, int myId, int root, int initial_qty, int target_qty, int type_dist, int spawn_method, int spawn_strategies, MPI_Comm comm);
void set_basic_spawn_dtype();
void deallocate_spawn_data();
//--------------PRIVATE DECLARATIONS---------------//
void generic_spawn(MPI_Comm *child, int data_stage);
int check_single_state(MPI_Comm comm, int global_state);
int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, double *real_time);
int check_merge_shrink_state();
//--------------PRIVATE THREADS DECLARATIONS---------------//
int allocate_thread_spawn();
void* thread_work(void* arg);
//--------------PUBLIC FUNCTIONS---------------//
/*
* Se solicita la creacion de un nuevo grupo de "numP" procesos con una distribucion
* fisica "type_dist".
*
* Se puede solicitar en primer plano, encargandose por tanto el proceso que llama a esta funcion,
* o en segundo plano, donde un hilo se encarga de configurar esta creacion.
*
* Si se pide en primer plano, al terminarla es posible llamar a "check_spawn_state()" para crear
* los procesos.
*
* Si se pide en segundo plano, llamar a "check_spawn_state()" comprobara si la configuracion para
* crearlos esta lista, y si es asi, los crea.
*
* Devuelve el estado de el procedimiento. Si no devuelve "MALL_SPAWN_COMPLETED", es necesario llamar a
* "check_spawn_state()".
*/
int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId, int initial_qty, int target_qty, int root, int type_dist, int spawn_method, int spawn_strategies, MPI_Comm comm, MPI_Comm *child) {
int local_state;
set_spawn_configuration(argv, num_cpus, num_nodes, nodelist, myId, root, initial_qty, target_qty, type_dist, spawn_method, spawn_strategies, comm);
if(!spawn_data->spawn_is_async) {
generic_spawn(child, NOT_STARTED);
local_state = get_spawn_state(spawn_data->spawn_is_async);
if (local_state == MALL_SPAWN_COMPLETED)
deallocate_spawn_data();
} else {
local_state = spawn_data->spawn_is_single ? MALL_SPAWN_SINGLE_PENDING : MALL_SPAWN_PENDING;
set_spawn_state(local_state, 0);
if((spawn_data->spawn_is_single && myId == root) || !spawn_data->spawn_is_single) {
allocate_thread_spawn();
}
}
return commState;
}
/*
* Comprueba si una configuracion para crear un nuevo grupo de procesos esta lista,
* y en caso de que lo este, se devuelve el communicador a estos nuevos procesos.
*/
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int data_dist_completed, double *real_time) {
int local_state;
int global_state=MALL_NOT_STARTED;
if(spawn_data->spawn_is_async) {
local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MALL_SPAWN_SINGLE_PENDING || local_state == MALL_SPAWN_SINGLE_COMPLETED) {
global_state = check_single_state(comm, local_state);
} else if(local_state == MALL_SPAWN_ADAPT_POSTPONE && data_dist_completed) {
global_state = check_merge_shrink_state();
} else if(local_state == MALL_SPAWN_PENDING) {
global_state = check_generic_state(comm, child, local_state, real_time);
} else {
printf("Error Check spawn: Configuracion invalida\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
}
} else if(spawn_data->spawn_method == MALL_SPAWN_MERGE){
generic_spawn(child, MALL_DIST_COMPLETED);
global_state = get_spawn_state(spawn_data->spawn_is_async);
if(global_state == MALL_SPAWN_COMPLETED)
deallocate_spawn_data();
}
return global_state;
}
/*
* Funcion bloqueante de los hijos para asegurar que todas las tareas del paso
* de creacion de los hijos se terminan correctamente.
*
* Ademas los hijos obtienen informacion basica de los padres
* para el paso de redistribucion de datos (Numeros de procesos y Id del Root).
*
*/
void malleability_connect_children(int myId, int numP, int root, int *numP_parents, int *root_parents, MPI_Comm *parents) {
spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));
spawn_data->root = root;
spawn_data->myId = myId;
spawn_data->spawn_qty = numP;
spawn_data->target_qty = numP;
spawn_data->comm = comm;
set_basic_spawn_dtype();
MPI_Bcast(spawn_data, 1, spawn_data->dtype, MALLEABILITY_ROOT, *parents);
switch(spawn_data->spawn_method) {
case MALL_SPAWN_BASELINE:
local_state = baseline(*spawn_data, parents);
break;
case MALL_SPAWN_MERGE:
spawn_data->target_qty += numP_parents;
local_state = merge(*spawn_data, parents, NOT_STARTED);
break;
}
MPI_Type_free(&(spawn_data->dtype));
free(spawn_data);
}
//--------------PRIVATE CONFIGURATION FUNCTIONS---------------//
/*
* Agrupa en una sola estructura todos los datos de configuración necesarios
* e inicializa las estructuras necesarias.
*/
void set_spawn_configuration(char *cmd, int num_cpus, int num_nodes, char *nodelist, int myId, int root, int initial_qty, int target_qty, int type_dist, int spawn_method, int spawn_strategies, MPI_Comm comm) {
spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));
spawn_data->myId = myId;
spawn_data->root = root;
spawn_data->root_parents = root;
spawn_data->initial_qty = initial_qty;
spawn_data->target_qty = target_qty;
spawn_data->spawn_method = spawn_method;
spawn_data->spawn_is_single = spawn_strategies % MALL_SPAWN_SINGLE ? 0 : 1;
spawn_data->spawn_is_async = spawn_strategies % MALL_SPAWN_PTHREAD ? 0 : 1;
spawn_data->comm = comm;
set_basic_spawn_dtype();
switch(spawn_data->spawn_method) {
case MALL_SPAWN_BASELINE:
spawn_data->spawn_qty = spawn_data->target_qty;
spawn_data->already_created = 0;
case MALL_SPAWN_MERGE:
spawn_data->spawn_qty = spawn_data->target_qty - spawn_data->initial_qty;
spawn_data->already_created = spawn_data->initial_qty;
}
if(spawn_data->spawn_is_async) {
pthread_mutex_init(&(spawn_data->spawn_mutex),NULL);
pthread_cond_init(&(spawn_data->cond_adapt_rdy),NULL);
spawn_thread = pthread_self();
}
if(spawn_data->myId == spawn_data->root) {
physical_struct_create(target_qty, spawn_data->already_created, num_cpus, num_nodes, nodelist, type_dist, MALL_DIST_STRING, &(spawn_data->dist));
//COPY PROGRAM NAME
spawn_data->cmd = malloc(strlen(cmd) * sizeof(char));
strcpy(spawn_data->cmd, cmd);
} else {
spawn_data->cmd = malloc(1 * sizeof(char));
spawn_data->mapping = MPI_INFO_NULL; //It is only needed for the root process
}
}
/*
* Crea un tipo derivado para mandar 4 enteros con informacion
* basica a los hijos. Son datos necesarios para que terminen
* la creacion de procesos.
*/
void set_basic_spawn_dtype() {
int i, counts = 4;
int blocklengths[] = {1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];
// Rellenar vector types
types[0] = types[1] = types[2] = types[3] = MPI_INT;
// Rellenar vector displs
MPI_Get_address(spawn_data, &dir);
MPI_Get_address(&(spawn_data->root_parents), &displs[0]);
MPI_Get_address(&(spawn_data->initial_qty), &displs[1]);
MPI_Get_address(&(spawn_data->spawn_is_single), &displs[2]);
MPI_Get_address(&(spawn_data->spawn_method), &displs[3]);
for(i=0;i<counts;i++) displs[i] -= dir;
MPI_Type_create_struct(counts, blocklengths, displs, types, &(spawn_data->dtype));
MPI_Type_commit(&(spawn_data->dtype));
}
/*
* Libera una estructura de datos spawn_data
* junto a la destrucion de aquellas estructuras que utiliza.
*/
void deallocate_spawn_data() {
free(spawn_data->cmd);
MPI_Type_free(&(spawn_data->dtype));
if(spawn_data->mapping != MPI_INFO_NULL) {
MPI_Info_free(&(spawn_data->mapping));
}
if(spawn_data->spawn_is_async) {
pthread_cond_destroy(&(spawn_data->cond_adapt_rdy));
pthread_mutex_destroy(&(spawn_data->spawn_mutex));
spawn_thread = pthread_self();
}
free(spawn_data);
}
//--------------PRIVATE SPAWN CREATION FUNCTIONS---------------//
/*
* Funcion generica para la creacion de procesos. Obtiene la configuracion
* y segun esta, elige como deberian crearse los procesos.
*
* Cuando termina, modifica la variable global para indicar este cambio
*/
void generic_spawn(MPI_Comm *child, int data_stage) {
int local_state;
// WORK
if(spawn_data->myId == spawn_data->root) { //SET MAPPING
processes_dist(spawn_data->dist, &(spawn_data->mapping));
}
switch(spawn_data->spawn_method) {
case MALL_SPAWN_BASELINE:
local_state = baseline(*spawn_data, child);
break;
case MALL_SPAWN_MERGE:
local_state = merge(*spawn_data, child, data_stage);
break;
}
// END WORK
local_state = get_spawn_state(spawn_data->spawn_is_async);
end_time = MPI_Wtime();
}
//--------------PRIVATE THREAD FUNCTIONS---------------//
/*
* Aloja la memoria para un hilo auxiliar dedicado a la creacion de procesos.
* No se puede realizar un "join" sobre el hilo y el mismo libera su memoria
* asociado al terminar.
*/
int allocate_thread_spawn() {
if(pthread_create(&spawn_thread, NULL, thread_work, NULL)) {
printf("Error al crear el hilo de SPAWN\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
if(pthread_detach(spawn_thread)) {
printf("Error when detaching spawning thread\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
return 0;
}
/*
* Funcion llamada por un hilo para que este se encarge
* de configurar la creacion de un nuevo grupo de procesos.
*
* Una vez esta lista la configuracion y es posible crear los procesos
* se avisa al hilo maestro.
*/
void* thread_work(void* arg) {
returned_comm = (MPI_Comm *) malloc(sizeof(MPI_Comm));
generic_spawn(returned_comm, NOT_STARTED);
while(commState == MALL_SPAWN_ADAPT_POSTPONE) {
// El grupo de procesos se terminara de juntar tras la redistribucion de datos
pthread_cond_wait(&(spawn_data->cond_adapt_rdy), &(spawn_data->spawn_mutex));
generic_spawn(returned_comm, MALL_DIST_COMPLETED);
}
deallocate_spawn_data();
pthread_exit(NULL);
}
/*
* Comprueba si una creacion de procesos asincrona en el
* paso "single" ha terminado.
* Si no ha terminado se mantiene el estado
* "MALL_SPAWN_SINGLE_PENDING".
*
* Si ha terminado se crean los hilos auxiliares para
* los procesos no root y se devuelve el estado
* "MALL_SPAWN_PENDING".
*/
int check_single_state(MPI_Comm comm, int global_state) {
MPI_Bcast(&global_state, 1, MPI_INT, spawn_data->root, comm);
// Non-root processes join root to finalize the spawn
// They also must join if the application has ended its work
if(global_state == MALL_SPAWN_SINGLE_COMPLETED) {
global_state = MALL_SPAWN_PENDING;
pthread_mutex_lock(&(spawn_data->spawn_mutex));
commState = global_state;
pthread_mutex_unlock(&(spawn_data->spawn_mutex));
// TODO Refactor - No debería ser necesario
int threads_not_spawned = pthread_equal(pthread_self(), spawn_thread);
if(spawn_data->myId != spawn_data->root && threads_not_spawned) {
allocate_thread_spawn(spawn_data);
}
}
return global_state;
}
/*
* Comprueba si una creación de procesos asincrona en el
* paso "generic" ha terminado.
* Si no ha terminado devuelve el estado
* "MALL_SPAWN_PENDING".
*
* Si ha terminado libera la memoria asociada a spawn_data
* y devuelve el estado "MALL_SPAWN_COMPLETED".
*/
int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, double *real_time) {
int global_state;
MPI_Allreduce(&local_state, &global_state, 1, MPI_INT, MPI_MIN, comm);
if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_SPAWN_ADAPTED) {
*child = *returned_comm;
deallocate_spawn_data(spawn_data);
*real_time=end_time;
}
return global_state;
}
/*
* Permite a una reduccion merge asincrona
* de procesos que estaba a la espera de que la
* distribucion de los datos se completase continue.
*/
int check_merge_shrink_state() {
// FIXME Pasar como caso especial para evitar iteracion no necesaria
int global_state = MALL_SPAWN_ADAPT_PENDING;
pthread_mutex_lock(&(spawn_data->spawn_mutex));
commState = global_state;
pthread_mutex_unlock(&(spawn_data->spawn_mutex));
pthread_cond_signal(&(spawn_data->cond_adapt_rdy));
return global_state
}
#ifndef MALLEABILITY_GENERIC_SPAWN_H
#define MALLEABILITY_GENERIC_SPAWN_H
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "../malleabilityDataStructures.h"
int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId, int initial_qty, int target_qty, int root, int type_dist, int spawn_method, int spawn_strategies, MPI_Comm comm, MPI_Comm *child);
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int data_dist_completed, double *real_time);
void malleability_connect_children(int myId, int numP, int root, int *numP_parents, int *root_parents, MPI_Comm *parents);
#endif
objects1 := Baseline
objects2 := Merge ProcessDist
objects3 := GenericSpawn
CC := gcc
MCC := mpicc
CFLAGS := -Wall -Wextra
all: $(objects1) $(objects2) $(objects3)
$(objects1): %: %.c
$(MCC) $(CFLAGS) -c -o $@.o $<
$(objects2): %: %.c $(objects1).o
echo $@
$(MCC) $(CFLAGS) -c -o $@.o $<
$(objects3): %: %.c $(objects2).o
echo $@
$(MCC) $(CFLAGS) -c -o $@.o $<
clean:
rm *.o
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "../malleabilityStates.h"
#include "Merge.h"
#include "Baseline.h"
//--------------PRIVATE DECLARATIONS---------------//
void merge_adapt_expand(MPI_Comm *child, int is_children_group);
void merge_adapt_shrink(int numC, MPI_Comm *child, MPI_Comm comm, int myId);
//--------------PUBLIC FUNCTIONS---------------//
int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state) {
MPI_Comm new_comm = MPI_COMM_NULL;
int local_state;
int numRanks, is_children_group = 1;
if(spawn_data.initial_qty > spawn_data.target_qty) { //Shrink
if(data_state == MALL_DIST_COMPLETED)
merge_adapt_shrink(spawn_data.target_qty, child, spawn_data.comm, spawn_data.myId)
local_state = MALL_SPAWN_ADAPTED;
else {
local_state = MALL_SPAWN_ADAPT_POSTPONE;
}
} else { //Expand
MPI_Comm_size(spawn_data.comm, &numRanks);
is_children_group = spawn_data.initial_qty == numRanks ? 0:1;
baseline(spawn_data, child);
merge_adapt_expand(child, is_children_group);
local_state = MALL_SPAWN_ADAPTED;
}
return local_state;
}
//--------------PRIVATE MERGE TYPE FUNCTIONS---------------//
/*
* Se encarga de que el grupo de procesos resultante se
* encuentren todos en un intra comunicador, uniendo a
* padres e hijos en un solo comunicador.
*
* Se llama antes de la redistribución de datos.
*
* TODO REFACTOR
*/
void merge_adapt_expand(MPI_Comm *child, int is_children_group) {
MPI_Comm new_comm = MPI_COMM_NULL;
MPI_Intercomm_merge(child, is_children_group, new_comm); //El que pone 0 va primero
MPI_Comm_free(child); //POSIBLE ERROR?
*child = new_comm;
//*numP = numC; //TODO REFACTOR Llevar a otra parte -- Hacer solo si MALL_SPAWN_ADAPTED
//if(*comm != MPI_COMM_WORLD && *comm != MPI_COMM_NULL) {
// MPI_Comm_free(comm);
//}
}
/*
* Se encarga de que el grupo de procesos resultante se
* eliminen aquellos procesos que ya no son necesarios.
* Los procesos eliminados se quedaran como zombies.
*
* Se llama una vez ha terminado la redistribución de datos.
*/
void merge_adapt_shrink(int numC, MPI_Comm *child, MPI_Comm comm, int myId) {
int color = MPI_UNDEFINED;
if(myId < numC) {
color = 1;
}
MPI_Comm_split(comm, color, myId, child);
//TODO REFACTOR Llevar a otra parte -- Hacer solo si MALL_SPAWN_ADAPTED
//if(*comm != MPI_COMM_WORLD && *comm != MPI_COMM_NULL)
// MPI_Comm_free(comm); //POSIBLE ERROR?
}
#ifndef MALLEABILITY_SPAWN_MERGE_H
#define MALLEABILITY_SPAWN_MERGE_H
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "../malleabilityDataStructures.h"
int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state);
#endif
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <mpi.h>
#include <string.h>
#include "ProcessDist.h"
//--------------PRIVATE DECLARATIONS---------------//
void node_dist( struct physical_dist dist, int **qty, int *used_nodes);
void spread_dist(struct physical_dist dist, int *used_nodes, int *procs);
void compact_dist(struct physical_dist dist, int *used_nodes, int *procs);
void generate_info_string(char *nodelist, int *procs_array, int nodes, MPI_Info *info);
void fill_str_hostfile(char *nodelist, int *qty, int used_nodes, char **hostfile_str);
int write_str_node(char **hostfile_str, int len_og, int qty, char *node_name);
//@deprecated functions
void generate_info_hostfile(char *nodelist, int *procs_array, int nodes, MPI_Info *info);
int create_hostfile(char **file_name);
void fill_hostfile(char *nodelist, int ptr, int *qty, int used_nodes);
int write_hostfile_node(int ptr, int qty, char *node_name);
/*
* Pone los datos para una estructura que guarda los parametros
* para realizar un mappeado de los procesos.
*
* Si la memoria no esta reservada devuelve falso y no hace nada.
* Si puede realizar los cambios devuelve verdadero.
*
* IN parameters -->
* target_qty: Numero de procesos tras la reconfiguracion
* alreadyCreated: Numero de procesos padre a considerar
* La resta de numC-alreadyCreated es el numero de hijos a crear
* num_cpus: Numero de cpus totales (En uso o no)
* num_nodes: Numero de nodos disponibles por esta aplicacion
* info_type: Indica como realizar el mappeado, si indicarlo
* en una cadena (MALL_DIST_STRING) o en un hostfile
* (MALL_DIST_HOSTFILE)
* dist_type: Indica como sera el mappeado, si intentar rellenar
* primero los nodos con cpus ya usados (CPUS/BEST/COMPACT) o
* que todos los nodos tengan el mismo numero de cpus usados
* (NODES/WORST/SPREAD)
*/
int physical_struct_create(int target_qty, int already_created, int num_cpus, int num_nodes, char *nodelist, int dist_type, int info_type, struct physical_dist *dist) {
if(*dist == NULL) {
return 0;
}
dist->target_qty = target_qty;
dist->already_created = already_created;
dist->num_cpus = num_cpus;
dist->num_cpus = num_nodes;
dist->nodelist = nodelist
dist->dist_type = dist_type;
dist->info_type = info_type;
return 1;
}
/*
* Configura la creacion de un nuevo grupo de procesos, reservando la memoria
* para una llamada a MPI_Comm_spawn, obteniendo una distribucion fisica
* para los procesos y creando un fichero hostfile.
*
* OUT parameters -->
* info_spawn: Objeto MPI_Info en el que se indica el mappeado
* a usar al crear los procesos.
*/
void processes_dist(struct physical_dist dist, MPI_Info *info_spawn) {
int used_nodes=0;
int *procs_array;
// GET NEW DISTRIBUTION
node_dist(dist, &procs_array, &used_nodes);
switch(dist.info_type) {
case MALL_DIST_STRING:
generate_info_string(nodelist, procs_array, used_nodes, info_spawn);
break;
case MALL_DIST_HOSTFILE:
generate_info_hostfile(nodelist, procs_array, used_nodes, info_spawn);
break;
}
free(procs_array);
}
/*
* Obtiene la distribucion fisica del grupo de procesos a crear, devolviendo
* cuantos nodos se van a utilizar y la cantidad de procesos que alojara cada
* nodo.
*
* Se permiten dos tipos de distribuciones fisicas segun el valor de "dist_type":
*
* COMM_PHY_NODES (1): Orientada a equilibrar el numero de procesos entre
* todos los nodos disponibles.
* COMM_PHY_CPU (2): Orientada a completar la capacidad de un nodo antes de
* ocupar otro nodo.
*/
void node_dist(struct physical_dist dist, int **qty, int *used_nodes) {
int i, *procs;
procs = calloc(dist.num_nodes, sizeof(int)); // Numero de procesos por nodo
/* GET NEW DISTRIBUTION */
switch(dist.dist_type) {
case MALL_DIST_SPREAD: // DIST NODES @deprecated
spread_dist(dist, used_nodes, procs);
break;
case MALL_DIST_COMPACT: // DIST CPUs
compact_dist(dist, used_nodes, procs);
break;
}
//Copy results to output vector qty
*qty = calloc(*used_nodes, sizeof(int)); // Numero de procesos por nodo
for(i=0; i< *used_nodes; i++) {
(*qty)[i] = procs[i];
}
free(procs);
}
/*
* Distribucion basada en equilibrar el numero de procesos en cada nodo
* para que todos los nodos tengan el mismo numero. Devuelve el total de
* nodos utilizados y el numero de procesos a crear en cada nodo.
*
* TODO Tener en cuenta procesos ya creados (already_created)
*/
void spread_dist(struct physical_dist dist, int *used_nodes, int *procs) {
int i, tamBl, remainder;
*used_nodes = dist.num_nodes;
tamBl = dist.numC / dist.num_nodes;
remainder = dist.numC % dist.num_nodes;
for(i=0; i<remainder; i++) {
procs[i] = tamBl + 1;
}
for(i=remainder; i<dist.num_nodes; i++) {
procs[i] = tamBl;
}
}
/*
* Distribucion basada en llenar un nodo de procesos antes de pasar al
* siguiente nodo. Devuelve el total de nodos utilizados y el numero
* de procesos a crear en cada nodo.
*
* Tiene en cuenta los procesos ya existentes para el mappeado de
* los procesos a crear.
*/
void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) {
int i, asigCores;
int tamBl, remainder;
tamBl = dist.num_cpus / dist.num_nodes;
asigCores = 0;
i = *used_nodes = dist.already_created / tamBl;
remainder = dist.already_created % tamBl;
//FIXME REFACTOR Que pasa si los nodos 1 y 2 tienen espacios libres
//First nodes could already have existing procs
//Start from the first with free spaces
if (remainder) {
procs[i] = asigCores = tamBl - remainder;
i = (i+1) % dist.num_nodes;
(*used_nodes)++;
}
//Assing tamBl to each node
while(asigCores+tamBl <= dist.numC) {
asigCores += tamBl;
procs[i] += tamBl;
i = (i+1) % dist.num_nodes;
(*used_nodes)++;
}
//Last node could have less procs than tamBl
if(asigCores < dist.numC) {
procs[i] += dist.numC - asigCores;
(*used_nodes)++;
}
if(*used_nodes > dist.num_nodes) *used_nodes = dist.num_nodes; //FIXME Si ocurre esto no es un error?
}
/*
* Crea y devuelve un objeto MPI_Info con un par hosts/mapping
* en el que se indica el mappeado a utilizar en los nuevos
* procesos.
*/
void generate_info_string(char *nodelist, int *procs_array, int nodes, MPI_Info *info){
// CREATE AND SET STRING HOSTS
char *hoststring;
fill_str_hostfile(nodelist, procs_array, nodes, &hoststring);
MPI_Info_create(info);
MPI_Info_set(*info, "hosts", hoststring);
free(hoststring);
}
/*
* Crea y devuelve una cadena para ser utilizada por la llave "hosts"
* al crear procesos e indicar donde tienen que ser creados.
*/
void fill_str_hostfile(char *nodelist, int *qty, int used_nodes, char **hostfile_str) {
int i=0, len=0;
char *host;
hostlist_t hostlist;
hostlist = slurm_hostlist_create(nodelist);
while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
if(qty[i] != 0) {
len = write_str_node(hostfile_str, len, qty[i], host);
}
i++;
free(host);
}
slurm_hostlist_destroy(hostlist);
}
/*
* Añade en una cadena "qty" entradas de "node_name".
* Realiza la reserva de memoria y la realoja si es necesario.
*/
int write_str_node(char **hostfile_str, int len_og, int qty, char *node_name) {
int err, len_node, len, i;
char *ocurrence;
len_node = strlen(node_name);
len = qty * (len_node + 1);
if(len_og == 0) { // Memoria no reservada
*hostfile_str = (char *) malloc(len * sizeof(char) - (1 * sizeof(char)));
} else { // Cadena ya tiene datos
*hostfile_str = (char *) realloc(*hostfile_str, (len_og + len) * sizeof(char) - (1 * sizeof(char)));
}
if(hostfile_str == NULL) return -1; // No ha sido posible alojar la memoria
ocurrence = (char *) malloc((len_node+1) * sizeof(char));
if(ocurrence == NULL) return -1; // No ha sido posible alojar la memoria
err = sprintf(ocurrence, ",%s", node_name);
if(err < 0) return -2; // No ha sido posible escribir sobre la variable auxiliar
i=0;
if(len_og == 0) { // Si se inicializa, la primera es una copia
i++;
strcpy(*hostfile_str, node_name);
}
for(; i<qty; i++){ // Las siguientes se conctanenan
strcat(*hostfile_str, ocurrence);
}
free(ocurrence);
return len+len_og;
}
//====================================================
//====================================================
//============DEPRECATED FUNCTIONS====================
//====================================================
//====================================================
/* FIXME Por revisar
* @deprecated
* Genera un fichero hostfile y lo anyade a un objeto
* MPI_Info para ser utilizado.
*/
void generate_info_hostfile(char *nodelist, int *procs_array, int nodes, MPI_Info *info){
char *hostfile;
int ptr;
// CREATE/UPDATE HOSTFILE
ptr = create_hostfile(&hostfile);
MPI_Info_create(info);
MPI_Info_set(*info, "hostfile", hostfile);
free(hostfile);
// SET NEW DISTRIBUTION
fill_hostfile(nodelist, ptr, procs_array, nodes);
close(ptr);
}
/*
* @deprecated
* Crea un fichero que se utilizara como hostfile
* para un nuevo grupo de procesos.
*
* El nombre es devuelto en el argumento "file_name",
* que tiene que ser un puntero vacio.
*
* Ademas se devuelve un descriptor de fichero para
* modificar el fichero.
*/
int create_hostfile(char **file_name) {
int ptr, err, len = 11;
*file_name = NULL;
*file_name = malloc( len * sizeof(char));
if(*file_name == NULL) return -1; // No ha sido posible alojar la memoria
err = snprintf(*file_name, len, "hostfile.o");
if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero
ptr = open(*file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
if(ptr < 0) return -3; // No ha sido posible crear el fichero
return ptr; // Devolver puntero a fichero
}
/*
* @deprecated
* Rellena un fichero hostfile indicado por ptr con los nombres
* de los nodos a utilizar indicados por "job_record" y la cantidad
* de procesos que alojara cada nodo indicado por "qty".
*/
void fill_hostfile(char *nodelist, int ptr, int *qty, int nodes) {
int i=0;
char *host;
hostlist_t hostlist;
hostlist = slurm_hostlist_create(nodelist);
while ((host = slurm_hostlist_shift(hostlist)) && i < nodes) {
write_hostfile_node(ptr, qty[i], host);
i++;
free(host);
}
slurm_hostlist_destroy(hostlist);
}
/*
* @deprecated
* Escribe en el fichero hostfile indicado por ptr una nueva linea.
*
* Esta linea indica el nombre de un nodo y la cantidad de procesos a
* alojar en ese nodo.
*/
int write_hostfile_node(int ptr, int qty, char *node_name) {
int err, len_node, len_int, len;
char *line;
len_node = strlen(node_name);
len_int = snprintf(NULL, 0, "%d", qty);
len = len_node + len_int + 3;
line = malloc(len * sizeof(char));
if(line == NULL) return -1; // No ha sido posible alojar la memoria
err = snprintf(line, len, "%s:%d\n", node_name, qty);
if(err < 0) return -2; // No ha sido posible escribir en el fichero
write(ptr, line, len-1);
free(line);
return 0;
}
//TODO REFACTOR PARA CUANDO SE COMUNIQUE CON RMS
// Get Slurm job info
//int jobId;
//char *tmp;
//job_info_msg_t *j_info;
//slurm_job_info_t last_record;
//tmp = getenv("SLURM_JOB_ID");
//jobId = atoi(tmp);
//slurm_load_job(&j_info, jobId, 1);
//last_record = j_info->job_array[j_info->record_count - 1];
// Free JOB INFO
//slurm_free_job_info_msg(j_info);
#ifndef MALLEABILITY_SPAWN_PROCESS_DIST_H
#define MALLEABILITY_SPAWN_PROCESS_DIST_H
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
#include "../malleabilityStates.h"
#include "../malleabilityDataStructures.h"
#define MALL_DIST_SPREAD 1
#define MALL_DIST_COMPACT 2
#define MALL_DIST_STRING 1
#define MALL_DIST_HOSTFILE 2
int physical_struct_create(int target_qty, int already_created, int num_cpus, int num_nodes, char *nodelist, int dist_type, int info_type, struct physical_dist *dist);
void processes_dist(struct physical_dist dist, MPI_Info *info_spawn);
#endif
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
pthread_mutex_t spawn_mutex;
int spawn_state;
void init_spawn_state() {
pthread_mutex_init(&spawn_mutex,NULL);
}
void free_spawn_state() {
pthread_mutex_destroy(&spawn_mutex);
}
int get_spawn_state(int is_async) {
int value;
if(is_async) {
pthread_mutex_lock(&(spawn_data->spawn_mutex));
value = spawn_state;
pthread_mutex_unlock(&(spawn_data->spawn_mutex));
} else {
value = spawn_state;
}
return value
}
void set_spawn_state(int value, int is_async) {
if(is_async) {
pthread_mutex_lock(&spawn_mutex);
spawn_state = value;
pthread_mutex_unlock(&spawn_mutex);
} else {
spawn_state = value;
}
}
#ifndef MALLEABILITY_SPAWN_STATE_H
#define MALLEABILITY_SPAWN_STATE_H
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
void init_spawn_state();
void free_spawn_state();
int get_spawn_state(int is_async);
void set_spawn_state(int value, int is_async);
#endif
...@@ -19,7 +19,7 @@ echo "MPICH" ...@@ -19,7 +19,7 @@ echo "MPICH"
numP=$(bash recordMachinefile.sh $configFile) numP=$(bash recordMachinefile.sh $configFile)
#mpirun -np 4 /home/martini/Instalaciones/valgrind-mpich-3.4.1-noucx/bin/valgrind --leak-check=full --show-leak-kinds=all --log-file=nc.vg.%p $dir$codeDir/a.out $configFile $outIndex $nodelist $nodes #mpirun -np 4 /home/martini/Instalaciones/valgrind-mpich-3.4.1-noucx/bin/valgrind --leak-check=full --show-leak-kinds=all --log-file=nc.vg.%p $dir$codeDir/a.out $configFile $outIndex $nodelist $nodes
mpirun -np $numP $dir$codeDir/a.out $configFile $outIndex $nodelist $nodes mpirun -np $numP $dir$codeDir/exec/a.out $configFile $outIndex $nodelist $nodes
rm hostfile.o$SLURM_JOB_ID rm hostfile.o$SLURM_JOB_ID
echo "END RUN" echo "END RUN"
......
[general] [general]
R=0 R=0
S=1 S=4
Granularity=100000 Granularity=100000
SDR=0.0 SDR=0.0
ADR=0.0 ADR=0.0
...@@ -9,28 +9,28 @@ SM=0 ...@@ -9,28 +9,28 @@ SM=0
SS=0 SS=0
; end [general] ; end [general]
[stage0] [stage0]
PT=3 PT=0
bytes=0 bytes=0
t_stage=0.1 t_stage=0.01235
;end [stage0] ;end [stage0]
[stage1] [stage1]
PT=4 PT=6
bytes=0 bytes=8
t_stage=0.1 t_stage=0.1
;end [stage1] ;end [stage1]
[stage2] [stage2]
PT=5 PT=6
bytes=0 bytes=8
t_stage=0.1 t_stage=0.1
;end [stage2] ;end [stage2]
[stage3] [stage3]
PT=6 PT=4
bytes=0 bytes=33176880
t_stage=0.1 t_stage=0.040449
;end [stage3] ;end [stage3]
[resize0] [resize0]
Iters=10 Iters=1000
Procs=40 Procs=160
FactorS=1 FactorS=1
Dist=compact Dist=compact
;end [resize0] ;end [resize0]
......
...@@ -28,7 +28,7 @@ for ((i=0; i<qty; i++)) ...@@ -28,7 +28,7 @@ for ((i=0; i<qty; i++))
do do
echo "Iter $i" echo "Iter $i"
numP=$(bash $dir$codeDir/recordMachinefile.sh $1) numP=$(bash $dir$codeDir/recordMachinefile.sh $1)
mpirun -f hostfile.o$SLURM_JOB_ID $dir$codeDir/bench.out $1 $2 $nodelist $nodes mpirun -f hostfile.o$SLURM_JOB_ID $dir$codeDir/exec/a.out $1 $2 $nodelist $nodes
rm hostfile.o$SLURM_JOB_ID rm hostfile.o$SLURM_JOB_ID
done done
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment