Commit 7cbd92e2 authored by iker_martin

Refactor of the spawn methods almost complete. Tests with the asynchronous Merge shrink are still pending.

parent 67e9186e
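
For orientation, the refactored malleability_checkpoint() below is driven from the application loop in Main/Main.c: the caller keeps doing useful iterations while the returned state is still one of the pending values. The following minimal sketch is not part of the commit; the enum values mirror the states listed later in this diff, the signatures of malleability_checkpoint() and get_malleability_user_comm() are assumed from their use in Main.c, and do_iteration() and run_resize_phase() are hypothetical stand-ins.

#include <mpi.h>

/* States copied from the enum shown later in this diff (malleability states header). */
enum mall_states {MALL_UNRESERVED, MALL_NOT_STARTED, MALL_ZOMBIE, MALL_SPAWN_PENDING, MALL_SPAWN_SINGLE_PENDING,
                  MALL_SPAWN_SINGLE_COMPLETED, MALL_SPAWN_ADAPT_POSTPONE, MALL_SPAWN_COMPLETED, MALL_DIST_PENDING,
                  MALL_DIST_COMPLETED, MALL_SPAWN_ADAPT_PENDING, MALL_SPAWN_ADAPTED, MALL_COMPLETED};

extern int malleability_checkpoint(void);                 /* provided by malleabilityManager.c        */
extern void get_malleability_user_comm(MPI_Comm *comm);   /* signature assumed from its use in Main.c */

static void do_iteration(void) { /* hypothetical stand-in for the benchmark's iterate() */ }

static void run_resize_phase(void) {
  MPI_Comm comm;
  int state = malleability_checkpoint();                  /* starts the spawn step                 */
  while(state == MALL_DIST_PENDING || state == MALL_SPAWN_PENDING
     || state == MALL_SPAWN_SINGLE_PENDING || state == MALL_SPAWN_ADAPT_POSTPONE) {
    do_iteration();                                       /* overlap useful work with the resize   */
    state = malleability_checkpoint();                    /* poll again on the next iteration      */
  }
  get_malleability_user_comm(&comm);                      /* communicator of the resized group     */
}
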
......@@ -265,7 +265,6 @@ void print_config_group(configuration *user_config, int grp) {
* configuration to the other group.
*/
void send_config_file(configuration *config_file, int root, MPI_Comm intercomm) {
MPI_Datatype config_type, config_type_array, iter_stage_type;
// Obtain a derived datatype to send all the
......@@ -304,10 +303,7 @@ void send_config_file(configuration *config_file, int root, MPI_Comm intercomm)
* the "free_config" function.
*/
void recv_config_file(int root, MPI_Comm intercomm, configuration **config_file_out) {
MPI_Datatype config_type, config_type_array, iter_stage_type;
configuration *config_file = malloc(sizeof(configuration) * 1);
// Obtain a derived datatype to receive all the
......
......@@ -112,13 +112,14 @@ int main(int argc, char *argv[]) {
do {
group->grp = group->grp + 1;
if(group->grp != 0) obtain_op_times(0); //Obtain the new timing values for the computation
set_benchmark_grp(group->grp);
get_malleability_user_comm(&comm);
MPI_Comm_size(comm, &(group->numP));
MPI_Comm_rank(comm, &(group->myId));
if(group->grp != 0)
obtain_op_times(0); //Obtain the new timing values for the computation
if(config_file->n_resizes != group->grp + 1) {
if(config_file->n_resizes != group->grp + 1) { //TODO Move this to a separate function
set_malleability_configuration(config_file->sm, config_file->ss, config_file->phy_dist[group->grp+1], config_file->at, -1);
set_children_number(config_file->procs[group->grp+1]); // TODO TO BE DEPRECATED
......@@ -186,7 +187,7 @@ int work() {
maxiter = config_file->iters[group->grp];
state = MALL_NOT_STARTED;
res = 0;
for(iter=group->iter_start; iter < maxiter; iter++) {
iterate(matrix, config_file->granularity, state, iter);
......@@ -196,7 +197,7 @@ int work() {
state = malleability_checkpoint();
iter = 0;
while(state == MALL_DIST_PENDING || state == MALL_SPAWN_PENDING || state == MALL_SPAWN_SINGLE_PENDING) {
while(state == MALL_DIST_PENDING || state == MALL_SPAWN_PENDING || state == MALL_SPAWN_SINGLE_PENDING || state == MALL_SPAWN_ADAPT_POSTPONE) {
if(iter < config_file->iters[group->grp+1]) {
iterate(matrix, config_file->granularity, state, iter);
iter++;
......@@ -409,7 +410,7 @@ void obtain_op_times(int compute) {
for(i=0; i<config_file->n_stages; i++) {
time+=init_stage(config_file, i, *group, comm, compute);
}
if(!compute) results->wasted_time += time;
if(!compute) {results->wasted_time += time;}
}
/*
......
......@@ -267,19 +267,18 @@ void linear_regression_stage(iter_stage_t *stage, group_data group, MPI_Comm com
*/
double init_matrix_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute) {
double result, t_stage;
double result, t_stage, start_time;
result = 0;
t_stage = stage->t_stage * config_file->factors[group.grp];
initMatrix(&(stage->double_array), config_file->granularity);
double start_time = MPI_Wtime();
if(group.myId == ROOT && compute) {
result+= process_stage(*config_file, *stage, group, comm);
}
if(compute) {
stage->t_op = (MPI_Wtime() - start_time) / stage->operations; //Time per operation
start_time = MPI_Wtime();
if(group.myId == ROOT) {
result+= process_stage(*config_file, *stage, group, comm);
stage->t_op = (MPI_Wtime() - start_time) / stage->operations; //Time per operation
}
MPI_Bcast(&(stage->t_op), 1, MPI_DOUBLE, ROOT, comm);
}
stage->operations = t_stage / stage->t_op;
......@@ -292,13 +291,12 @@ double init_pi_pt(group_data group, configuration *config_file, iter_stage_t *st
result = 0;
t_stage = stage->t_stage * config_file->factors[group.grp];
start_time = MPI_Wtime();
if(group.myId == ROOT && compute) {
result+= process_stage(*config_file, *stage, group, comm);
}
if(compute) {
stage->t_op = (MPI_Wtime() - start_time) / stage->operations; //Time per operation
start_time = MPI_Wtime();
if(group.myId == ROOT) {
result+= process_stage(*config_file, *stage, group, comm);
stage->t_op = (MPI_Wtime() - start_time) / stage->operations; //Time per operation
}
MPI_Bcast(&(stage->t_op), 1, MPI_DOUBLE, ROOT, comm);
}
stage->operations = t_stage / stage->t_op;
......@@ -360,13 +358,13 @@ double init_comm_allgatherv_pt(group_data group, configuration *config_file, ite
MPI_Reduce(&time, NULL, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
}
}
if(stage->counts.counts != NULL)
freeCounts(&(stage->counts));
prepare_comm_allgatherv(group.numP, stage->real_bytes, &(stage->counts));
get_block_dist(stage->real_bytes, group.myId, group.numP, &dist_data);
stage->my_bytes = dist_data.tamBl;
if(stage->array != NULL)
free(stage->array);
stage->array = malloc(sizeof(char) * stage->my_bytes);
......
......@@ -8,11 +8,12 @@
enum compute_methods{COMP_PI, COMP_MATRIX, COMP_POINT, COMP_BCAST, COMP_ALLGATHER, COMP_REDUCE, COMP_ALLREDUCE};
//FIXME Refactor the void
double init_stage(configuration *config_file, int stage_i, group_data group, MPI_Comm comm, int compute);
//double stage_init_all();
double process_stage(configuration config_file, iter_stage_t stage, group_data group, MPI_Comm comm);
double latency(int myId, int numP, MPI_Comm comm);
double bandwidth(int myId, int numP, MPI_Comm comm, double latency, int n);
//FIXME Refactor the void??
void linear_regression_stage(iter_stage_t *stage, group_data group, MPI_Comm comm);
#endif
mpicc -Wall Main/Main.c Main/computing_func.c Main/comunication_func.c Main/linear_reg.c Main/process_stage.c IOcodes/results.c IOcodes/read_ini.c IOcodes/ini.c malleability/malleabilityManager.c malleability/malleabilityTypes.c malleability/malleabilityZombies.c malleability/ProcessDist.c malleability/CommDist.c malleability/distribution_methods/block_distribution.c -pthread -lslurm -lm
if [ $# -gt 0 ]
then
if [ $1 = "-e" ]
then
echo "Creado ejecutable para ejecuciones"
cp a.out bench.out
fi
fi
......@@ -140,7 +140,8 @@ void free_malleability() {
state = MALL_UNRESERVED;
}
/*
/*
* TODO Rewrite
* The resizing of the processes is carried out by the parents.
*
* The new processes are created with the chosen physical distribution and
......@@ -156,48 +157,68 @@ void free_malleability() {
* and finally the two groups of processes are disconnected.
*/
int malleability_checkpoint() {
if(state == MALL_UNRESERVED) return MALL_UNRESERVED;
if(state == MALL_NOT_STARTED) {
// Check whether a resize has to be performed
//if(CHECK_RMS()) {return MALL_DENIED;}
state = spawn_step();
double end_real_time;
//printf("P%d -- Estado %d\n", mall->myId, state);
switch(state) {
case MALL_UNRESERVED:
break;
case MALL_NOT_STARTED:
// Check whether a resize has to be performed
//if(CHECK_RMS()) {return MALL_DENIED;}
state = spawn_step();
if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE){
malleability_checkpoint();
}
break;
if (state == MALL_SPAWN_COMPLETED){
state = start_redistribution();
}
case MALL_SPAWN_PENDING: // Check whether the spawn has finished and start the redistribution
case MALL_SPAWN_SINGLE_PENDING:
state = check_spawn_state(&(mall->intercomm), mall->comm, &end_real_time);
if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPTED) {
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
mall_conf->results->spawn_real_time[mall_conf->grp] = end_real_time - mall_conf->results->spawn_start;
} else if(state == MALL_SPAWN_PENDING || state == MALL_SPAWN_SINGLE_PENDING) { // Check whether the spawn has finished and start the redistribution
double end_real_time;
malleability_checkpoint();
}
break;
/*if(mall_conf->spawn_type == MALL_SPAWN_MERGE && mall_conf->spawn_type == MALL_SPAWN_PTHREAD && mall->numP > mall->numC) {
state = shrink_redistribution(); //TODO REFACTOR
case MALL_SPAWN_ADAPT_POSTPONE:
case MALL_SPAWN_COMPLETED:
state = start_redistribution();
malleability_checkpoint();
break;
} else {
*/
//state = check_slurm_comm(mall->myId, mall->root, mall->numP, &(mall->intercomm), mall->comm, mall->thread_comm, &end_real_time); //FIXMENOW
state = check_spawn_state(&(mall->intercomm), mall->comm, MALL_DIST_PENDING, &end_real_time); //FIXME 3rd argument depends on the distribution
if (state == MALL_SPAWN_COMPLETED || state == MALL_DIST_ADAPTED) { //FIXME MALL_DIST_ADAPTED has to collect the times
case MALL_DIST_PENDING:
if(mall_conf->comm_type == MAL_USE_THREAD) {
state = thread_check();
} else {
state = check_redistribution();
}
if(state != MALL_DIST_PENDING) {
malleability_checkpoint();
}
break;
case MALL_SPAWN_ADAPT_PENDING:
mall_conf->results->spawn_start = MPI_Wtime();
unset_spawn_postpone_flag(state);
state = check_spawn_state(&(mall->intercomm), mall->comm, &end_real_time);
printf("TEST END state=%d\n", state);
if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
if(malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
mall_conf->results->spawn_real_time[mall_conf->grp] = end_real_time - mall_conf->results->spawn_start;
}
//TODO If MERGE, use a different data redistribution method
state = start_redistribution();
}
//}
break;
} else if(state == MALL_DIST_PENDING) {
if(mall_conf->comm_type == MAL_USE_THREAD) {
state = thread_check();
} else {
state = check_redistribution();
}
}
case MALL_SPAWN_ADAPTED:
shrink_redistribution();
break;
case MALL_DIST_COMPLETED: //TODO Isn't this rather ugly?
state = MALL_COMPLETED;
break;
}
return state;
}
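
Read as a state machine, each call to malleability_checkpoint() advances at most one transition and recurses when a step completes immediately. The following helper is purely illustrative and not part of the commit: it summarises the successor of each state on success, as implied by the switch above, assuming the mall_states enum from this commit is in scope (next_state_on_success is a hypothetical name).

/* Illustrative only: successor of each state on success, as implied by the
 * refactored malleability_checkpoint() above. Assumes the mall_states enum
 * from this commit is in scope. */
static int next_state_on_success(int state) {
  switch(state) {
    case MALL_NOT_STARTED:          return MALL_SPAWN_PENDING;    /* spawn_step() launched                    */
    case MALL_SPAWN_PENDING:
    case MALL_SPAWN_SINGLE_PENDING: return MALL_SPAWN_COMPLETED;  /* check_spawn_state() reports completion   */
    case MALL_SPAWN_ADAPT_POSTPONE:                               /* Merge shrink: spawn deferred until later */
    case MALL_SPAWN_COMPLETED:      return MALL_DIST_PENDING;     /* start_redistribution()                   */
    case MALL_DIST_PENDING:         return MALL_DIST_COMPLETED;   /* or MALL_SPAWN_ADAPT_PENDING for a shrink */
    case MALL_SPAWN_ADAPT_PENDING:  return MALL_SPAWN_ADAPTED;    /* postponed shrink spawn resumed           */
    case MALL_SPAWN_ADAPTED:        return MALL_DIST_COMPLETED;   /* shrink_redistribution()                  */
    case MALL_DIST_COMPLETED:       return MALL_COMPLETED;
    default:                        return state;                 /* MALL_UNRESERVED, MALL_ZOMBIE stay as-is  */
  }
}
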
......@@ -410,19 +431,10 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch
void Children_init() {
int numP_parents, root_parents, i;
int is_intercomm;
//MPI_Comm aux;
malleability_connect_children(mall->myId, mall->numP, mall->root, mall->comm, &numP_parents, &root_parents, &(mall->intercomm));
/*MPI_Bcast(&spawn_is_single, 1, MPI_INT, MALLEABILITY_ROOT, mall->intercomm);
if(spawn_is_single) {
malleability_establish_connection(mall->myId, MALLEABILITY_ROOT, &(mall->intercomm)); //FIXMENOW
}
MPI_Bcast(&(mall_conf->spawn_type), 1, MPI_INT, MALLEABILITY_ROOT, mall->intercomm);
MPI_Bcast(&root_parents, 1, MPI_INT, MALLEABILITY_ROOT, mall->intercomm);
MPI_Bcast(&numP_parents, 1, MPI_INT, root_parents, mall->intercomm);
*/
// TODO From this point on, take into account whether it is BASELINE or MERGE
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
// TODO From this point on, take into account whether it is BASELINE or MERGE
recv_config_file(mall->root, mall->intercomm, &(mall_conf->config_file));
......@@ -460,21 +472,18 @@ void Children_init() {
}
}
/*
if(mall_conf->spawn_type == MALL_SPAWN_MERGE || mall_conf->spawn_type == MALL_SPAWN_MERGE_PTHREAD) {
proc_adapt_expand(&(mall->numP), mall->numP+numP_parents, mall->intercomm, &(mall->comm), MALLEABILITY_CHILDREN); //FIXMENOW
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
MPI_Comm_dup(mall->comm, &aux);
mall->thread_comm = aux;
MPI_Comm_dup(mall->comm, &aux);
mall->user_comm = aux;
}
*/
// Save the results of this transmission
recv_results(mall_conf->results, mall->root, mall_conf->config_file->n_resizes, mall->intercomm);
if(!is_intercomm) {
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm)); //TODO Isn't this dangerous?
MPI_Comm_dup(mall->intercomm, &(mall->thread_comm));
MPI_Comm_dup(mall->intercomm, &(mall->comm));
MPI_Comm_dup(mall->intercomm, &(mall->user_comm));
}
MPI_Comm_disconnect(&(mall->intercomm));
}
......@@ -490,22 +499,11 @@ void Children_init() {
*/
int spawn_step(){
mall_conf->results->spawn_start = MPI_Wtime();
/* FIXME Keep the shrink_redistribution function
if((mall_conf->spawn_type == MALL_SPAWN_MERGE) && mall->numP > mall->numC) {
state = shrink_redistribution();
return state;
}
*/
//state = init_slurm_comm(mall->name_exec, mall->num_cpus, mall->num_nodes, mall->nodelist, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_type, mall_conf->spawn_is_single, mall->thread_comm, &(mall->intercomm)); //FIXMENOW
state = init_spawn(mall->name_exec, mall->num_cpus, mall->num_nodes, mall->nodelist, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_method, mall_conf->spawn_strategies, mall->thread_comm, &(mall->intercomm));
if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
} else {
//mall_conf->results->spawn_thread_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
//mall_conf->results->spawn_start = MPI_Wtime();
}
return state;
}
......@@ -526,18 +524,27 @@ int spawn_step(){
* groups of processes.
*/
int start_redistribution() {
int rootBcast = MPI_PROC_NULL;
if(mall->myId == mall->root) rootBcast = MPI_ROOT;
int rootBcast, is_intercomm;
is_intercomm = 0;
if(mall->intercomm != MPI_COMM_NULL) {
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
} else {
// If there is no communicator yet, it is because the Spawn has been postponed
// and this is the Merge Shrink spawn
mall->intercomm = mall->comm;
if(mall->comm == MPI_COMM_NULL) { printf("COMM nulo?\n");}
}
/*
MPI_Bcast(&(mall_conf->spawn_type), 1, MPI_INT, rootBcast, mall->intercomm);
MPI_Bcast(&(mall->root), 1, MPI_INT, rootBcast, mall->intercomm);
MPI_Bcast(&(mall->numP), 1, MPI_INT, rootBcast, mall->intercomm);
*/
if(is_intercomm) {
rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
} else {
rootBcast = mall->root;
}
send_config_file(mall_conf->config_file, rootBcast, mall->intercomm);
if(dist_a_data->entries || rep_a_data->entries) { // Receive asynchronous data
if(dist_a_data->entries || rep_a_data->entries) { // Send asynchronous data
mall_conf->results->async_start = MPI_Wtime();
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
if(mall_conf->comm_type == MAL_USE_THREAD) {
......@@ -609,9 +616,22 @@ int check_redistribution() {
* It finally finishes by sending the temporary data to the children.
*/
int end_redistribution() {
int result, i, rootBcast = MPI_PROC_NULL;
//MPI_Comm aux;
if(mall->myId == mall->root) rootBcast = MPI_ROOT;
int i, is_intercomm, rootBcast, local_state;
is_intercomm = 0;
if(mall->intercomm != MPI_COMM_NULL) {
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
} else {
// If there is no communicator yet, it is because the Spawn has been postponed
// and this is the Merge Shrink spawn
mall->intercomm = mall->comm;
}
if(is_intercomm) {
rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
} else {
rootBcast = mall->root;
}
if(dist_s_data->entries || rep_s_data->entries) { // Send synchronous data
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
......@@ -629,117 +649,65 @@ int end_redistribution() {
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, rootBcast, mall->intercomm);
}
}
/*
if(mall_conf->spawn_type == MALL_SPAWN_MERGE) {
double time_adapt = MPI_Wtime();
proc_adapt_expand(&(mall->numP), mall->numC, mall->intercomm, &(mall->comm), MALLEABILITY_NOT_CHILDREN); //FIXMENOW
send_results(mall_conf->results, rootBcast, mall_conf->config_file->n_resizes, mall->intercomm);
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
MPI_Comm_dup(mall->comm, &aux);
mall->thread_comm = aux;
MPI_Comm_dup(mall->comm, &aux);
mall->user_comm = aux;
mall_conf->results->spawn_time[mall_conf->grp] += MPI_Wtime() - time_adapt;
// result = MAL_DIST_ADAPTED;
}
*/
local_state = MALL_DIST_COMPLETED;
if(!is_intercomm) { // Merge Spawn
if(mall->numP < mall->numC) { // Expand
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm)); //TODO Isn't this dangerous?
send_results(mall_conf->results, rootBcast, mall_conf->config_file->n_resizes, mall->intercomm);
result = MALL_DIST_COMPLETED;
MPI_Comm_dup(mall->intercomm, &(mall->thread_comm));
MPI_Comm_dup(mall->intercomm, &(mall->comm));
MPI_Comm_dup(mall->intercomm, &(mall->user_comm));
} else { // Shrink || Merge Shrink requires more work
local_state = MALL_SPAWN_ADAPT_PENDING;
}
}
printf("TEST 1 P%d Comm=%d intercomm=%d\n", mall->myId, mall->comm, mall->intercomm);
MPI_Barrier(mall->comm); //FIXMENOW For some reason this fails on Comm
if(mall->intercomm != MPI_COMM_NULL) {
if(mall->intercomm == MPI_COMM_WORLD) {
printf("TEST 2 P%d Comm=%d intercomm=%d ES WORLD\n", mall->myId, mall->comm, mall->intercomm);
} //FIXMENOW Intercomm gets deleted, but it is neither COMM WORLD nor COMM NULL
MPI_Comm_disconnect(&(mall->intercomm));
printf("TEST 3 P%d Borra intercomm = %d\n", mall->myId, mall->intercomm);
}
MPI_Comm_disconnect(&(mall->intercomm));
state = MALL_NOT_STARTED;
return result;
printf("TEST 4 P%d Comm=%d intercomm=%d\n", mall->myId, mall->comm, mall->intercomm);
MPI_Barrier(mall->comm); //FIXMENOW For some reason this fails on Comm
return local_state;
}
///=============================================
///=============================================
///=============================================
double time_adapt, time_adapt_end;
/*
int state_shrink=0; //TODO Refactor
pthread_t thread_shrink;
MPI_Comm comm_shrink;
int thread_shrink_creation();
void *thread_shrink_work();
int thread_shrink_creation() {
if(pthread_create(&thread_shrink, NULL, thread_shrink_work, NULL)) {
printf("Error al crear el hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
return MAL_SPAWN_PENDING;
}
void* thread_shrink_work() {
proc_adapt_shrink(mall->numC, &comm_shrink, mall->myId); //FIXMENOW
time_adapt_end = MPI_Wtime();
state_shrink=2;
pthread_exit(NULL);
}
*/
///=============================================
///=============================================
///=============================================
int shrink_redistribution() {
//int global_state;
double time_aux;
MPI_Comm aux_comm;
/*
if(mall_conf->spawn_type == MALL_SPAWN_MERGE && mall_conf->spawn_type == MALL_SPAWN_PTHREAD) {
if(state_shrink == 0) {
time_adapt = MPI_Wtime();
state_shrink = 1;
MPI_Comm_dup(mall->comm, &comm_shrink);
thread_shrink_creation();
return MAL_SPAWN_PENDING;
} else if(state_shrink>0) {
MPI_Allreduce(&state_shrink, &global_state, 1, MPI_INT, MPI_MIN, mall->comm);
if(global_state < 2) return MAL_SPAWN_PENDING;
time_aux = MPI_Wtime();
if(pthread_join(thread_shrink, NULL)) {
printf("Error al esperar al hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
}
MPI_Comm_dup(mall->comm, &aux_comm);
mall->comm = comm_shrink;
}
} else {
time_adapt = MPI_Wtime();
MPI_Comm_dup(mall->comm, &aux_comm);
proc_adapt_shrink( mall->numC, &(mall->comm), mall->myId); //FIXMENOW
}
*/
MPI_Comm_dup(mall->comm, &aux_comm);
double time_extra = MPI_Wtime();
//TODO REFACTOR -- Only the collect-iters call should be outside the threads
zombies_collect_suspended(aux_comm, mall->myId, mall->numP, mall->numC, mall->root, (void *) mall_conf->results, mall->user_comm);
zombies_collect_suspended(mall->comm, mall->myId, mall->numP, mall->numC, mall->root, (void *) mall_conf->results, mall->intercomm);
if(mall->myId < mall->numC) {
MPI_Comm_free(&aux_comm);
MPI_Comm_dup(mall->comm, &aux_comm); // FIXME What happens to the Thread_comm and User_comm communicators
mall->thread_comm = aux_comm;
MPI_Comm_dup(mall->comm, &aux_comm);
mall->user_comm = aux_comm;
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - time_adapt; //FIXME Error
if(mall_conf->spawn_method == MALL_SPAWN_MERGE && malleability_spawn_contains_strat(mall_conf->spawn_strategies,MALL_SPAWN_PTHREAD, NULL)) {
mall_conf->results->spawn_real_time[mall_conf->grp] = time_adapt_end - time_adapt + MPI_Wtime() - time_aux; //FIXME Error
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm)); //TODO No es peligroso?
MPI_Comm_dup(mall->intercomm, &(mall->thread_comm));
MPI_Comm_dup(mall->intercomm, &(mall->comm));
MPI_Comm_dup(mall->intercomm, &(mall->user_comm));
mall_conf->results->spawn_time[mall_conf->grp] += MPI_Wtime() - time_extra;
if(malleability_spawn_contains_strat(mall_conf->spawn_strategies,MALL_SPAWN_PTHREAD, NULL)) {
mall_conf->results->spawn_real_time[mall_conf->grp] += MPI_Wtime() - time_extra;
}
return MALL_DIST_COMPLETED; //FIXME Refactor Set to SPAWN_COMPLETED
return MALL_DIST_COMPLETED;
} else {
return MALL_ZOMBIE;
}
......
......@@ -20,13 +20,11 @@
#define MAL_DIST_ADAPTED 8
*/
enum mall_states{MALL_UNRESERVED, MALL_NOT_STARTED, MALL_ZOMBIE, MALL_SPAWN_PENDING,
MALL_SPAWN_SINGLE_PENDING, MALL_SPAWN_SINGLE_COMPLETED, MALL_SPAWN_ADAPT_POSTPONE,
MALL_SPAWN_COMPLETED, MALL_DIST_PENDING, MALL_DIST_COMPLETED, MALL_DIST_ADAPTED};
enum mall_states{MALL_UNRESERVED, MALL_NOT_STARTED, MALL_ZOMBIE, MALL_SPAWN_PENDING, MALL_SPAWN_SINGLE_PENDING,
MALL_SPAWN_SINGLE_COMPLETED, MALL_SPAWN_ADAPT_POSTPONE, MALL_SPAWN_COMPLETED, MALL_DIST_PENDING, MALL_DIST_COMPLETED,
MALL_SPAWN_ADAPT_PENDING, MALL_SPAWN_ADAPTED, MALL_COMPLETED};
enum mall_spawn_methods{MALL_SPAWN_BASELINE, MALL_SPAWN_MERGE};
//#define MALL_SPAWN_SERIAL 2
#define MALL_SPAWN_PTHREAD 2
//#define MALL_SPAWN_MULTIPLE 5
#define MALL_SPAWN_SINGLE 3
#define MALLEABILITY_ROOT 0
......
......@@ -49,13 +49,15 @@ void add_data(void *data, int total_qty, int type, int request_qty, malleability
* only.
*/
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group, int myId, int root, MPI_Comm intercomm) {
int i, rootBcast = MPI_PROC_NULL;
int i, is_intercomm, rootBcast = MPI_PROC_NULL;
MPI_Datatype entries_type, struct_type;
if(is_children_group) {
rootBcast = root;
MPI_Comm_test_inter(intercomm, &is_intercomm);
if(is_intercomm && !is_children_group) {
rootBcast = myId == root ? MPI_ROOT : MPI_PROC_NULL;
} else {
if(myId == root) rootBcast = MPI_ROOT;
rootBcast = root;
}
// Send the number of entries first
......
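
The same root-selection pattern now appears in start_redistribution(), end_redistribution() and comm_data_info(): when the groups communicate over an intercommunicator, the sending side passes MPI_ROOT on its root and MPI_PROC_NULL elsewhere, while after a Merge spawn (plain intracommunicator) every process simply passes the root rank. A self-contained sketch of that pattern follows; select_bcast_root is a hypothetical helper name, not part of the commit.

#include <mpi.h>

/* Hypothetical helper illustrating the rootBcast selection used in this commit.
 * On an intercommunicator the broadcasting group passes MPI_ROOT on its root
 * process and MPI_PROC_NULL on the rest; on an intracommunicator (Merge spawn)
 * every process passes the root rank directly. */
static int select_bcast_root(MPI_Comm comm, int myId, int root, int is_sender_group) {
  int is_intercomm = 0;
  if(comm != MPI_COMM_NULL) MPI_Comm_test_inter(comm, &is_intercomm);
  if(is_intercomm && is_sender_group) {
    return myId == root ? MPI_ROOT : MPI_PROC_NULL;
  }
  return root;
}
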
......@@ -31,7 +31,6 @@ void generic_spawn(MPI_Comm *child, int data_stage);
int check_single_state(MPI_Comm comm, int global_state);
int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, double *real_time);
int check_merge_shrink_state();
//--------------PRIVATE THREADS DECLARATIONS---------------//
int allocate_thread_spawn();
......@@ -68,7 +67,10 @@ int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId
} else {
local_state = spawn_data->spawn_is_single ? MALL_SPAWN_SINGLE_PENDING : MALL_SPAWN_PENDING;
local_state = spawn_data->spawn_is_single ?
MALL_SPAWN_SINGLE_PENDING : MALL_SPAWN_PENDING;
local_state = spawn_data->spawn_method == MALL_SPAWN_MERGE && spawn_data->initial_qty > spawn_data->target_qty ?
MALL_SPAWN_ADAPT_POSTPONE : local_state;
set_spawn_state(local_state, 0);
if((spawn_data->spawn_is_single && myId == root) || !spawn_data->spawn_is_single) {
allocate_thread_spawn();
......@@ -82,24 +84,25 @@ int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId
* Checks whether a configuration to create a new group of processes is ready,
* and if so, returns the communicator to these new processes.
*/
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int data_dist_completed, double *real_time) {
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, double *real_time) {
int local_state;
int global_state=MALL_NOT_STARTED;
if(spawn_data->spawn_is_async) { // Async
local_state = get_spawn_state(spawn_data->spawn_is_async);
printf("Test 3.5 local=%d\n",local_state);
if(local_state == MALL_SPAWN_SINGLE_PENDING || local_state == MALL_SPAWN_SINGLE_COMPLETED) { // Single
global_state = check_single_state(comm, local_state);
} else if(local_state == MALL_SPAWN_ADAPT_POSTPONE && data_dist_completed) { // Start Merge Shrink Async
global_state = check_merge_shrink_state();
} else if(local_state == MALL_SPAWN_PENDING || local_state == MALL_SPAWN_COMPLETED || local_state == MALL_DIST_ADAPTED) { // Baseline
} else if(local_state == MALL_SPAWN_PENDING || local_state == MALL_SPAWN_COMPLETED) { // Baseline
global_state = check_generic_state(comm, child, local_state, real_time);
} else {
printf("Error Check spawn: Configuracion invalida\n");
} else if(local_state == MALL_SPAWN_ADAPT_POSTPONE) {
global_state = local_state;
} else { //FIXMENOW Error with Merge Shrink + Pthreads -- Seems to be related to the state update in ALL the processes
printf("Error Check spawn: Configuracion invalida State = %d\n", local_state);
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
}
......@@ -107,11 +110,28 @@ int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int data_dist_completed, d
generic_spawn(child, MALL_DIST_COMPLETED);
global_state = get_spawn_state(spawn_data->spawn_is_async);
}
if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_DIST_ADAPTED)
if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_SPAWN_ADAPTED)
deallocate_spawn_data();
return global_state;
}
/*
* Clears the blocking flag MALL_SPAWN_ADAPT_POSTPONE for the auxiliary
* threads. This flag blocks them so that the Merge shrink method does not
* advance until the data redistribution has completed; therefore, once the
* flag is modified the threads can continue.
*
* As a safety measure, the flag is only changed when all three conditions
* are met.
*/
void unset_spawn_postpone_flag(int outside_state) {
int local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MALL_SPAWN_ADAPT_POSTPONE && outside_state == MALL_SPAWN_ADAPT_PENDING && spawn_data->spawn_is_async) {
set_spawn_state(MALL_SPAWN_PENDING, MALL_SPAWN_PTHREAD);
}
}
/*
* Blocking function for the children, ensuring that every task of the
* children-creation step finishes correctly.
......@@ -136,7 +156,7 @@ void malleability_connect_children(int myId, int numP, int root, MPI_Comm comm,
baseline(*spawn_data, parents);
break;
case MALL_SPAWN_MERGE:
spawn_data->target_qty += *numP_parents;
spawn_data->target_qty += spawn_data->initial_qty;
merge(*spawn_data, parents, MALL_NOT_STARTED);
break;
}
......@@ -285,9 +305,8 @@ void generic_spawn(MPI_Comm *child, int data_stage) {
break;
}
// END WORK
set_spawn_state(local_state, spawn_data->spawn_is_async);
end_time = MPI_Wtime();
set_spawn_state(local_state, spawn_data->spawn_is_async);
}
......@@ -330,6 +349,7 @@ void* thread_work(void* arg) {
// The group of processes finishes merging after the data redistribution
repeat = 1;
local_state = wait_wakeup();
printf("Hilos despiertan\n");
}
if (repeat) generic_spawn(returned_comm, MALL_DIST_COMPLETED);
......@@ -355,8 +375,7 @@ int check_single_state(MPI_Comm comm, int global_state) {
global_state = MALL_SPAWN_PENDING;
set_spawn_state(global_state, MALL_SPAWN_PTHREAD);
//int threads_not_spawned = pthread_equal(pthread_self(), spawn_thread);
if(spawn_data->myId != spawn_data->root) { //&& threads_not_spawned) {
if(spawn_data->myId != spawn_data->root) {
allocate_thread_spawn(spawn_data);
}
}
......@@ -376,7 +395,7 @@ int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, double
int global_state;
MPI_Allreduce(&local_state, &global_state, 1, MPI_INT, MPI_MIN, comm);
if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_DIST_ADAPTED) {
if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_SPAWN_ADAPTED) {
set_spawn_state(global_state, MALL_SPAWN_PTHREAD);
*child = *returned_comm;
deallocate_spawn_data(spawn_data);
......@@ -384,15 +403,3 @@ int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, double
}
return global_state;
}
/*
* Allows an asynchronous merge shrink of processes that was waiting
* for the data distribution to complete to continue.
*/
int check_merge_shrink_state() {
// FIXME Handle as a special case to avoid an unnecessary iteration
int global_state = MALL_SPAWN_PENDING;
set_spawn_state(global_state, MALL_SPAWN_PTHREAD);
return global_state;
}
......@@ -7,10 +7,11 @@
#include "../malleabilityDataStructures.h"
int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId, int initial_qty, int target_qty, int root, int type_dist, int spawn_method, int spawn_strategies, MPI_Comm comm, MPI_Comm *child);
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int data_dist_completed, double *real_time);
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, double *real_time);
void malleability_connect_children(int myId, int numP, int root, MPI_Comm comm, int *numP_parents, int *root_parents, MPI_Comm *parents);
void unset_spawn_postpone_flag(int outside_state);
int malleability_spawn_contains_strat(int spawn_strategies, int strategy, int *result);
#endif
......@@ -18,7 +18,7 @@ int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state) {
if(spawn_data.initial_qty > spawn_data.target_qty) { //Shrink
if(data_state == MALL_DIST_COMPLETED) {
merge_adapt_shrink(spawn_data.target_qty, child, spawn_data.comm, spawn_data.myId);
local_state = MALL_DIST_ADAPTED;
local_state = MALL_SPAWN_ADAPTED;
} else {
local_state = MALL_SPAWN_ADAPT_POSTPONE;
}
......@@ -26,6 +26,7 @@ int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state) {
MPI_Comm_get_parent(&intercomm);
is_children_group = intercomm == MPI_COMM_NULL ? 0:1;
baseline(spawn_data, child);
merge_adapt_expand(child, is_children_group);
local_state = MALL_SPAWN_COMPLETED;
......