Commit 7836c7b6 authored by iker_martin's avatar iker_martin
Browse files
parents 352fcd52 07aa4fe0
......@@ -10,15 +10,13 @@
#include "GenericSpawn.h"
#include "Baseline.h"
#include "Merge.h"
#include "Spawn_state.h"
// This code is a Singleton object -- Only one instance can be used at a given time and
// no multiple calls to perform diferent resizes can be performed at the same time.
int commState = MALL_NOT_STARTED;
Spawn_data *spawn_data;
Spawn_data *spawn_data = NULL;
pthread_t spawn_thread;
//pthread_mutex_t spawn_mutex; FIXME BORRAR
//pthread_cond_t cond_adapt_rdy; FIXME BORRAR
MPI_Comm *returned_comm;
double end_time; //FIXME REFACTOR
......@@ -33,7 +31,6 @@ void generic_spawn(MPI_Comm *child, int data_stage);
int check_single_state(MPI_Comm comm, int global_state);
int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, double *real_time);
int check_merge_shrink_state();
//--------------PRIVATE THREADS DECLARATIONS---------------//
int allocate_thread_spawn();
......@@ -63,57 +60,79 @@ int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId
set_spawn_configuration(argv, num_cpus, num_nodes, nodelist, myId, root, initial_qty, target_qty, type_dist, spawn_method, spawn_strategies, comm);
if(!spawn_data->spawn_is_async) {
generic_spawn(child, NOT_STARTED);
generic_spawn(child, MALL_NOT_STARTED);
local_state = get_spawn_state(spawn_data->spawn_is_async);
if (local_state == MALL_SPAWN_COMPLETED)
deallocate_spawn_data();
} else {
local_state = spawn_data->spawn_is_single ? MALL_SPAWN_SINGLE_PENDING : MALL_SPAWN_PENDING;
local_state = spawn_data->spawn_is_single ?
MALL_SPAWN_SINGLE_PENDING : MALL_SPAWN_PENDING;
local_state = spawn_data->spawn_method == MALL_SPAWN_MERGE && spawn_data->initial_qty > spawn_data->target_qty ?
MALL_SPAWN_ADAPT_POSTPONE : local_state;
set_spawn_state(local_state, 0);
if((spawn_data->spawn_is_single && myId == root) || !spawn_data->spawn_is_single) {
allocate_thread_spawn();
}
}
return commState;
return local_state;
}
/*
* Comprueba si una configuracion para crear un nuevo grupo de procesos esta lista,
* y en caso de que lo este, se devuelve el communicador a estos nuevos procesos.
*/
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int data_dist_completed, double *real_time) {
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, double *real_time) {
int local_state;
int global_state=MALL_NOT_STARTED;
if(spawn_data->spawn_is_async) {
if(spawn_data->spawn_is_async) { // Async
local_state = get_spawn_state(spawn_data->spawn_is_async);
//printf("Test 3.5 local=%d\n",local_state);
if(local_state == MALL_SPAWN_SINGLE_PENDING || local_state == MALL_SPAWN_SINGLE_COMPLETED) {
if(local_state == MALL_SPAWN_SINGLE_PENDING || local_state == MALL_SPAWN_SINGLE_COMPLETED) { // Single
global_state = check_single_state(comm, local_state);
} else if(local_state == MALL_SPAWN_ADAPT_POSTPONE && data_dist_completed) {
global_state = check_merge_shrink_state();
} else if(local_state == MALL_SPAWN_PENDING) {
} else if(local_state == MALL_SPAWN_PENDING || local_state == MALL_SPAWN_COMPLETED || local_state == MALL_SPAWN_ADAPTED) { // Baseline
global_state = check_generic_state(comm, child, local_state, real_time);
} else if(local_state == MALL_SPAWN_ADAPT_POSTPONE) {
global_state = local_state;
} else {
printf("Error Check spawn: Configuracion invalida\n");
printf("Error Check spawn: Configuracion invalida State = %d\n", local_state);
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
}
} else if(spawn_data->spawn_method == MALL_SPAWN_MERGE){
} else if(spawn_data->spawn_method == MALL_SPAWN_MERGE){ // Start Merge shrink Sync
generic_spawn(child, MALL_DIST_COMPLETED);
global_state = get_spawn_state(spawn_data->spawn_is_async);
if(global_state == MALL_SPAWN_COMPLETED)
deallocate_spawn_data();
}
if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_SPAWN_ADAPTED)
deallocate_spawn_data();
return global_state;
}
/*
* Elimina la bandera bloqueante MALL_SPAWN_ADAPT_POSTPONE para los hilos
* auxiliares. Esta bandera los bloquea para que el metodo Merge shrink no
* avance hasta que se complete la redistribucion de datos. Por tanto,
* al modificar la bandera los hilos pueden continuar.
*
* Por seguridad se comprueba que no se realice el cambio a la bandera a
* no ser que se cumplan las 3 condiciones.
*/
void unset_spawn_postpone_flag(int outside_state) {
int local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MALL_SPAWN_ADAPT_POSTPONE && outside_state == MALL_SPAWN_ADAPT_PENDING && spawn_data->spawn_is_async) {
set_spawn_state(MALL_SPAWN_PENDING, MALL_SPAWN_PTHREAD);
wakeup();
}
}
/*
* Funcion bloqueante de los hijos para asegurar que todas las tareas del paso
* de creacion de los hijos se terminan correctamente.
......@@ -122,7 +141,7 @@ int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int data_dist_completed, d
* para el paso de redistribucion de datos (Numeros de procesos y Id del Root).
*
*/
void malleability_connect_children(int myId, int numP, int root, int *numP_parents, int *root_parents, MPI_Comm *parents) {
void malleability_connect_children(int myId, int numP, int root, MPI_Comm comm, int *numP_parents, int *root_parents, MPI_Comm *parents) {
spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));
spawn_data->root = root;
spawn_data->myId = myId;
......@@ -133,21 +152,36 @@ void malleability_connect_children(int myId, int numP, int root, int *numP_paren
set_basic_spawn_dtype();
MPI_Bcast(spawn_data, 1, spawn_data->dtype, MALLEABILITY_ROOT, *parents);
switch(spawn_data->spawn_method) {
case MALL_SPAWN_BASELINE:
local_state = baseline(*spawn_data, parents);
baseline(*spawn_data, parents);
break;
case MALL_SPAWN_MERGE:
spawn_data->target_qty += numP_parents;
local_state = merge(*spawn_data, parents, NOT_STARTED);
spawn_data->target_qty += spawn_data->initial_qty;
merge(*spawn_data, parents, MALL_NOT_STARTED);
break;
}
*root_parents = spawn_data->root_parents;
*numP_parents = spawn_data->initial_qty;
MPI_Type_free(&(spawn_data->dtype));
free(spawn_data);
}
/*
* Función para obtener si entre las estrategias elegidas, se utiliza
* la estrategia pasada como segundo argumento.
*
* Devuelve en "result" 1(Verdadero) si utiliza la estrategia, 0(Falso) en caso
* contrario.
*/
int malleability_spawn_contains_strat(int spawn_strategies, int strategy, int *result) {
int value = spawn_strategies % strategy ? 0 : 1;
if(result != NULL) *result = value;
return value;
}
//--------------PRIVATE CONFIGURATION FUNCTIONS---------------//
/*
* Agrupa en una sola estructura todos los datos de configuración necesarios
......@@ -162,8 +196,8 @@ void set_spawn_configuration(char *cmd, int num_cpus, int num_nodes, char *nodel
spawn_data->initial_qty = initial_qty;
spawn_data->target_qty = target_qty;
spawn_data->spawn_method = spawn_method;
spawn_data->spawn_is_single = spawn_strategies % MALL_SPAWN_SINGLE ? 0 : 1;
spawn_data->spawn_is_async = spawn_strategies % MALL_SPAWN_PTHREAD ? 0 : 1;
malleability_spawn_contains_strat(spawn_strategies, MALL_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
malleability_spawn_contains_strat(spawn_strategies, MALL_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
spawn_data->comm = comm;
set_basic_spawn_dtype();
......@@ -172,15 +206,15 @@ void set_spawn_configuration(char *cmd, int num_cpus, int num_nodes, char *nodel
case MALL_SPAWN_BASELINE:
spawn_data->spawn_qty = spawn_data->target_qty;
spawn_data->already_created = 0;
break;
case MALL_SPAWN_MERGE:
spawn_data->spawn_qty = spawn_data->target_qty - spawn_data->initial_qty;
spawn_data->already_created = spawn_data->initial_qty;
break;
}
if(spawn_data->spawn_is_async) {
pthread_mutex_init(&(spawn_data->spawn_mutex),NULL);
pthread_cond_init(&(spawn_data->cond_adapt_rdy),NULL);
spawn_thread = pthread_self();
init_spawn_state();
}
if(spawn_data->myId == spawn_data->root) {
......@@ -229,19 +263,22 @@ void set_basic_spawn_dtype() {
* junto a la destrucion de aquellas estructuras que utiliza.
*/
void deallocate_spawn_data() {
free(spawn_data->cmd);
if(spawn_data == NULL) return;
if(spawn_data->cmd != NULL) {
free(spawn_data->cmd);
}
if(spawn_data->dtype != MPI_DATATYPE_NULL) {
MPI_Type_free(&(spawn_data->dtype));
}
if(spawn_data->mapping != MPI_INFO_NULL) {
MPI_Info_free(&(spawn_data->mapping));
}
if(spawn_data->spawn_is_async) {
pthread_cond_destroy(&(spawn_data->cond_adapt_rdy));
pthread_mutex_destroy(&(spawn_data->spawn_mutex));
spawn_thread = pthread_self();
free_spawn_state();
}
free(spawn_data);
spawn_data = NULL;
}
......@@ -269,9 +306,8 @@ void generic_spawn(MPI_Comm *child, int data_stage) {
break;
}
// END WORK
local_state = get_spawn_state(spawn_data->spawn_is_async);
end_time = MPI_Wtime();
set_spawn_state(local_state, spawn_data->spawn_is_async);
}
......@@ -304,16 +340,19 @@ int allocate_thread_spawn() {
* se avisa al hilo maestro.
*/
void* thread_work(void* arg) {
int local_state;
returned_comm = (MPI_Comm *) malloc(sizeof(MPI_Comm));
generic_spawn(returned_comm, NOT_STARTED);
while(commState == MALL_SPAWN_ADAPT_POSTPONE) {
generic_spawn(returned_comm, MALL_NOT_STARTED);
local_state = get_spawn_state(MALL_SPAWN_PTHREAD);
if(local_state == MALL_SPAWN_ADAPT_POSTPONE) {
// El grupo de procesos se terminara de juntar tras la redistribucion de datos
pthread_cond_wait(&(spawn_data->cond_adapt_rdy), &(spawn_data->spawn_mutex));
local_state = wait_wakeup();
generic_spawn(returned_comm, MALL_DIST_COMPLETED);
}
deallocate_spawn_data();
pthread_exit(NULL);
}
......@@ -334,14 +373,9 @@ int check_single_state(MPI_Comm comm, int global_state) {
// They also must join if the application has ended its work
if(global_state == MALL_SPAWN_SINGLE_COMPLETED) {
global_state = MALL_SPAWN_PENDING;
pthread_mutex_lock(&(spawn_data->spawn_mutex));
commState = global_state;
pthread_mutex_unlock(&(spawn_data->spawn_mutex));
// TODO Refactor - No debería ser necesario
int threads_not_spawned = pthread_equal(pthread_self(), spawn_thread);
set_spawn_state(global_state, MALL_SPAWN_PTHREAD);
if(spawn_data->myId != spawn_data->root && threads_not_spawned) {
if(spawn_data->myId != spawn_data->root) {
allocate_thread_spawn(spawn_data);
}
}
......@@ -362,24 +396,10 @@ int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, double
MPI_Allreduce(&local_state, &global_state, 1, MPI_INT, MPI_MIN, comm);
if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_SPAWN_ADAPTED) {
set_spawn_state(global_state, MALL_SPAWN_PTHREAD);
*child = *returned_comm;
deallocate_spawn_data(spawn_data);
*real_time=end_time;
}
return global_state;
}
/*
* Permite a una reduccion merge asincrona
* de procesos que estaba a la espera de que la
* distribucion de los datos se completase continue.
*/
int check_merge_shrink_state() {
// FIXME Pasar como caso especial para evitar iteracion no necesaria
int global_state = MALL_SPAWN_ADAPT_PENDING;
pthread_mutex_lock(&(spawn_data->spawn_mutex));
commState = global_state;
pthread_mutex_unlock(&(spawn_data->spawn_mutex));
pthread_cond_signal(&(spawn_data->cond_adapt_rdy));
return global_state
}
......@@ -7,8 +7,11 @@
#include "../malleabilityDataStructures.h"
int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId, int initial_qty, int target_qty, int root, int type_dist, int spawn_method, int spawn_strategies, MPI_Comm comm, MPI_Comm *child);
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int data_dist_completed, double *real_time);
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, double *real_time);
void malleability_connect_children(int myId, int numP, int root, MPI_Comm comm, int *numP_parents, int *root_parents, MPI_Comm *parents);
void malleability_connect_children(int myId, int numP, int root, int *numP_parents, int *root_parents, MPI_Comm *parents);
void unset_spawn_postpone_flag(int outside_state);
int malleability_spawn_contains_strat(int spawn_strategies, int strategy, int *result);
#endif
objects1 := Baseline
objects2 := Merge ProcessDist
objects3 := GenericSpawn
CC := gcc
MCC := mpicc
CFLAGS := -Wall -Wextra
all: $(objects1) $(objects2) $(objects3)
$(objects1): %: %.c
$(MCC) $(CFLAGS) -c -o $@.o $<
$(objects2): %: %.c $(objects1).o
echo $@
$(MCC) $(CFLAGS) -c -o $@.o $<
$(objects3): %: %.c $(objects2).o
echo $@
$(MCC) $(CFLAGS) -c -o $@.o $<
clean:
rm *.o
......@@ -11,24 +11,25 @@ void merge_adapt_shrink(int numC, MPI_Comm *child, MPI_Comm comm, int myId);
//--------------PUBLIC FUNCTIONS---------------//
int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state) {
MPI_Comm new_comm = MPI_COMM_NULL;
MPI_Comm intercomm;
int local_state;
int numRanks, is_children_group = 1;
int is_children_group = 1;
if(spawn_data.initial_qty > spawn_data.target_qty) { //Shrink
if(data_state == MALL_DIST_COMPLETED)
merge_adapt_shrink(spawn_data.target_qty, child, spawn_data.comm, spawn_data.myId)
if(data_state == MALL_DIST_COMPLETED) {
merge_adapt_shrink(spawn_data.target_qty, child, spawn_data.comm, spawn_data.myId);
local_state = MALL_SPAWN_ADAPTED;
else {
} else {
local_state = MALL_SPAWN_ADAPT_POSTPONE;
}
} else { //Expand
MPI_Comm_size(spawn_data.comm, &numRanks);
is_children_group = spawn_data.initial_qty == numRanks ? 0:1;
MPI_Comm_get_parent(&intercomm);
is_children_group = intercomm == MPI_COMM_NULL ? 0:1;
baseline(spawn_data, child);
merge_adapt_expand(child, is_children_group);
local_state = MALL_SPAWN_ADAPTED;
local_state = MALL_SPAWN_COMPLETED;
}
return local_state;
......@@ -48,7 +49,7 @@ int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state) {
void merge_adapt_expand(MPI_Comm *child, int is_children_group) {
MPI_Comm new_comm = MPI_COMM_NULL;
MPI_Intercomm_merge(child, is_children_group, new_comm); //El que pone 0 va primero
MPI_Intercomm_merge(*child, is_children_group, &new_comm); //El que pone 0 va primero
MPI_Comm_free(child); //POSIBLE ERROR?
*child = new_comm;
......@@ -74,8 +75,4 @@ void merge_adapt_shrink(int numC, MPI_Comm *child, MPI_Comm comm, int myId) {
color = 1;
}
MPI_Comm_split(comm, color, myId, child);
//TODO REFACTOR Llevar a otra parte -- Hacer solo si MALL_SPAWN_ADAPTED
//if(*comm != MPI_COMM_WORLD && *comm != MPI_COMM_NULL)
// MPI_Comm_free(comm); //POSIBLE ERROR?
}
......@@ -2,8 +2,10 @@
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <mpi.h>
#include <unistd.h>
#include <string.h>
#include <mpi.h>
#include <slurm/slurm.h>
#include "ProcessDist.h"
......@@ -33,7 +35,7 @@ int write_hostfile_node(int ptr, int qty, char *node_name);
* IN parameters -->
* target_qty: Numero de procesos tras la reconfiguracion
* alreadyCreated: Numero de procesos padre a considerar
* La resta de numC-alreadyCreated es el numero de hijos a crear
* La resta de target_qty-alreadyCreated es el numero de hijos a crear
* num_cpus: Numero de cpus totales (En uso o no)
* num_nodes: Numero de nodos disponibles por esta aplicacion
* info_type: Indica como realizar el mappeado, si indicarlo
......@@ -45,15 +47,12 @@ int write_hostfile_node(int ptr, int qty, char *node_name);
* (NODES/WORST/SPREAD)
*/
int physical_struct_create(int target_qty, int already_created, int num_cpus, int num_nodes, char *nodelist, int dist_type, int info_type, struct physical_dist *dist) {
if(*dist == NULL) {
return 0;
}
dist->target_qty = target_qty;
dist->already_created = already_created;
dist->num_cpus = num_cpus;
dist->num_cpus = num_nodes;
dist->nodelist = nodelist
dist->num_nodes = num_nodes;
dist->nodelist = nodelist;
dist->dist_type = dist_type;
dist->info_type = info_type;
......@@ -77,10 +76,10 @@ void processes_dist(struct physical_dist dist, MPI_Info *info_spawn) {
node_dist(dist, &procs_array, &used_nodes);
switch(dist.info_type) {
case MALL_DIST_STRING:
generate_info_string(nodelist, procs_array, used_nodes, info_spawn);
generate_info_string(dist.nodelist, procs_array, used_nodes, info_spawn);
break;
case MALL_DIST_HOSTFILE:
generate_info_hostfile(nodelist, procs_array, used_nodes, info_spawn);
generate_info_hostfile(dist.nodelist, procs_array, used_nodes, info_spawn);
break;
}
free(procs_array);
......@@ -133,8 +132,8 @@ void spread_dist(struct physical_dist dist, int *used_nodes, int *procs) {
int i, tamBl, remainder;
*used_nodes = dist.num_nodes;
tamBl = dist.numC / dist.num_nodes;
remainder = dist.numC % dist.num_nodes;
tamBl = dist.target_qty / dist.num_nodes;
remainder = dist.target_qty % dist.num_nodes;
for(i=0; i<remainder; i++) {
procs[i] = tamBl + 1;
}
......@@ -170,7 +169,7 @@ void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) {
}
//Assing tamBl to each node
while(asigCores+tamBl <= dist.numC) {
while(asigCores+tamBl <= dist.target_qty) {
asigCores += tamBl;
procs[i] += tamBl;
i = (i+1) % dist.num_nodes;
......@@ -178,8 +177,8 @@ void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) {
}
//Last node could have less procs than tamBl
if(asigCores < dist.numC) {
procs[i] += dist.numC - asigCores;
if(asigCores < dist.target_qty) {
procs[i] += dist.target_qty - asigCores;
(*used_nodes)++;
}
if(*used_nodes > dist.num_nodes) *used_nodes = dist.num_nodes; //FIXME Si ocurre esto no es un error?
......
......@@ -3,26 +3,29 @@
#include <pthread.h>
pthread_mutex_t spawn_mutex;
pthread_cond_t spawn_cond;
int spawn_state;
void init_spawn_state() {
pthread_mutex_init(&spawn_mutex,NULL);
pthread_cond_init(&spawn_cond,NULL);
}
void free_spawn_state() {
pthread_mutex_destroy(&spawn_mutex);
pthread_cond_destroy(&spawn_cond);
}
int get_spawn_state(int is_async) {
int value;
if(is_async) {
pthread_mutex_lock(&(spawn_data->spawn_mutex));
pthread_mutex_lock(&spawn_mutex);
value = spawn_state;
pthread_mutex_unlock(&(spawn_data->spawn_mutex));
pthread_mutex_unlock(&spawn_mutex);
} else {
value = spawn_state;
}
return value
return value;
}
void set_spawn_state(int value, int is_async) {
......@@ -34,3 +37,16 @@ void set_spawn_state(int value, int is_async) {
spawn_state = value;
}
}
int wait_wakeup() {
pthread_mutex_lock(&spawn_mutex);
pthread_cond_wait(&spawn_cond, &spawn_mutex);
pthread_mutex_unlock(&spawn_mutex);
return get_spawn_state(1);
}
void wakeup() {
pthread_mutex_lock(&spawn_mutex);
pthread_cond_signal(&spawn_cond);
pthread_mutex_unlock(&spawn_mutex);
}
......@@ -7,7 +7,11 @@
void init_spawn_state();
void free_spawn_state();
int get_spawn_state(int is_async);
void set_spawn_state(int value, int is_async);
int wait_wakeup();
void wakeup();
#endif
......@@ -19,7 +19,7 @@ echo "MPICH"
numP=$(bash recordMachinefile.sh $configFile)
#mpirun -np 4 /home/martini/Instalaciones/valgrind-mpich-3.4.1-noucx/bin/valgrind --leak-check=full --show-leak-kinds=all --log-file=nc.vg.%p $dir$codeDir/a.out $configFile $outIndex $nodelist $nodes
mpirun -np $numP $dir$codeDir/exec/a.out $configFile $outIndex $nodelist $nodes
mpirun -np $numP $dir$codeDir/build/a.out $configFile $outIndex $nodelist $nodes
rm hostfile.o$SLURM_JOB_ID
echo "END RUN"
......
[general]
R=0
R=1
S=4
Granularity=100000
SDR=0.0
ADR=0.0
AT=0
SM=0
SS=0
SM=1
SS=2
; end [general]
[stage0]
PT=0
......@@ -29,14 +29,14 @@ bytes=33176880
t_stage=0.040449
;end [stage3]
[resize0]
Iters=1000
Procs=160
Iters=1
Procs=8
FactorS=1
Dist=compact
;end [resize0]
[resize1]
Iters=10
Procs=8
FactorS=0.5
Iters=30
Procs=2
FactorS=0.1
Dist=compact
;end [resize1]
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment