Commit 4771566f authored by iker_martin's avatar iker_martin
Browse files

Hotfix - Error en la comprobacion de hilos single -- Se ha anyadido comentarios

parent bb81b914
......@@ -5,10 +5,7 @@
#include <unistd.h>
#include <sys/stat.h>
#include "computing_func.h"
//#include "../IOcodes/read_ini.h"
//#include "../IOcodes/results.h"
//#include "../malleability/ProcessDist.h"
#include "../malleability/CommDist.h" //TODO Refactor para que no haga falta
#include "../malleability/CommDist.h"
#include "../malleability/malleabilityManager.h"
#include "../malleability/malleabilityStates.h"
......@@ -65,7 +62,7 @@ int main(int argc, char *argv[]) {
init_group_struct(argv, argc, myId, numP);
im_child = init_malleability(myId, numP, ROOT, comm, argv[0]);
if(!im_child) {
if(!im_child) { //TODO REFACTOR Simplificar inicio
init_application();
set_benchmark_grp(group->grp);
......@@ -85,6 +82,8 @@ int main(int argc, char *argv[]) {
group->compute_comm_array = malloc(config_file->comm_tam * sizeof(char));
}
// TODO Refactor - Que sea una unica funcion
// Obtiene las variables que van a utilizar los hijos
void *value = NULL;
malleability_get_data(&value, 0, 1, 1);
group->grp = *((int *)value);
......@@ -184,6 +183,7 @@ int work() {
iterate(matrix, config_file->matrix_tam, state);
iter++;
group->iter_start = iter;
if(iter == config_file->iters[group->grp+1]) indicate_ending_malleability(MAL_APP_ENDED);
}
state = malleability_checkpoint();
}
......
......@@ -8,7 +8,7 @@
#include <slurm/slurm.h>
#include "ProcessDist.h"
int commSlurm = MAL_NOT_STARTED;
int commState = MAL_NOT_STARTED;
struct Slurm_data *slurm_data;
pthread_t slurm_thread;
MPI_Comm *returned_comm;
......@@ -74,6 +74,7 @@ int init_slurm_comm(char *argv, int myId, int numP, int numC, int root, int type
int spawn_qty, already_created = 0;
slurm_data = malloc(sizeof(struct Slurm_data));
slurm_thread = pthread_self();
slurm_data->type_creation = type_creation;
slurm_data->spawn_is_single = spawn_is_single;
slurm_data->result_procs = numC;
......@@ -105,7 +106,7 @@ int init_slurm_comm(char *argv, int myId, int numP, int numC, int root, int type
free(slurm_data);
} else if(type_creation == COMM_SPAWN_PTHREAD || slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD) {
commSlurm = MAL_SPAWN_PENDING;
commState = MAL_SPAWN_PENDING;
if((spawn_is_single && myId == root) || !spawn_is_single || (slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD && numP > slurm_data->result_procs)) {
Creation_data *creation_data = (Creation_data *) malloc(sizeof(Creation_data));
......@@ -125,7 +126,7 @@ int init_slurm_comm(char *argv, int myId, int numP, int numC, int root, int type
}
}
return commSlurm;
return commState;
}
......@@ -133,14 +134,13 @@ int init_slurm_comm(char *argv, int myId, int numP, int numC, int root, int type
* Comprueba si una configuracion para crear un nuevo grupo de procesos esta lista,
* y en caso de que lo este, se devuelve el communicador a estos nuevos procesos.
*/
int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm comm, MPI_Comm comm_thread) {
int state=-10;
int check_slurm_comm(int myId, int root, int numP, int outside_state, MPI_Comm *child, MPI_Comm comm, MPI_Comm comm_thread) {
if(slurm_data->type_creation == COMM_SPAWN_PTHREAD || slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD) {
if(!slurm_data->spawn_is_single || (slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD && numP > slurm_data->result_procs)) {
MPI_Allreduce(&commSlurm, &state, 1, MPI_INT, MPI_MIN, comm);
if(state != MAL_SPAWN_COMPLETED) return state; // Continue only if asynchronous process creation has ended
int state=-10;
MPI_Allreduce(&commState, &state, 1, MPI_INT, MPI_MIN, comm);
if(state != MAL_SPAWN_COMPLETED && outside_state != MAL_APP_ENDED) return state; // Continue only if asynchronous process creation has ended
if(pthread_join(slurm_thread, NULL)) {
printf("Error al esperar al hilo\n");
......@@ -150,10 +150,14 @@ int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm com
*child = *returned_comm;
} else if (slurm_data->spawn_is_single) {
MPI_Bcast(&commSlurm, 1, MPI_INT, root, comm);
if(commSlurm == MAL_SPAWN_SINGLE_START) { // Non-root processes join root to finalize the spawn
commSlurm = MAL_SPAWN_SINGLE_PENDING;
MPI_Bcast(&commState, 1, MPI_INT, root, comm);
int app_terminated = (outside_state == MAL_APP_ENDED);
int terminate_spawn = (app_terminated && pthread_equal(pthread_self(), slurm_thread));
// Non-root processes join root to finalize the spawn
// They also must join if the application has ended its work
if(commState == MAL_SPAWN_SINGLE_START || terminate_spawn) {
commState = MAL_SPAWN_SINGLE_PENDING;
if(myId != root) {
Creation_data *creation_data = (Creation_data *) malloc(sizeof(Creation_data));
creation_data->argv = NULL;
......@@ -172,14 +176,22 @@ int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm com
}
}
if(commSlurm != MAL_SPAWN_COMPLETED) return commSlurm; // Continue only if asynchronous process creation has ended
printf("TEST commState %d\n",myId); fflush(stdout);
MPI_Barrier(comm);
if(myId == root) {printf("TEST preCHECK--------------------------------------------\n"); fflush(stdout);}
// Continue only if asynchronous process creation has ended or application does not have more work
if(commState != MAL_SPAWN_COMPLETED) return commState;
printf("TEST commState 2 ------------------------------------- Id%d\n",myId); fflush(stdout);
if(pthread_join(slurm_thread, NULL)) {
//Asegurar que los hilos han terminado
if(pthread_join(slurm_thread, NULL)) {
printf("Error al esperar al hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
}
*child = *returned_comm;
MPI_Barrier(comm);
if(myId == root) {printf("TEST afterJoin %d\n", myId); fflush(stdout);}
} else {
printf("Error Check spawn: Configuracion invalida\n");
......@@ -187,7 +199,7 @@ int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm com
return -10;
}
} else {
return commSlurm;
return commState;
}
//Free memory
......@@ -196,8 +208,9 @@ int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm com
}
free(slurm_data->cmd);
free(slurm_data);
slurm_thread = pthread_self();
return commSlurm;
return commState;
}
/*
......@@ -229,7 +242,119 @@ void malleability_establish_connection(int myId, int root, MPI_Comm *intercomm)
*intercomm = newintercomm;
}
//--------------TODO PRIVATE SPAWN TYPE FUNCTIONS---------------//
//--------------PRIVATE THREAD FUNCTIONS---------------//
/*
* Funcion llamada por un hilo para que este se encarge
* de configurar la creacion de un nuevo grupo de procesos.
*
* Una vez esta lista la configuracion y es posible crear los procesos
* se avisa al hilo maestro.
*/
void* thread_work(void* creation_data_arg) {
Creation_data *creation_data = (Creation_data*) creation_data_arg;
returned_comm = (MPI_Comm *) malloc(sizeof(MPI_Comm));
if(creation_data->myId == creation_data->root) {
processes_dist(creation_data->argv, creation_data->numP_childs, creation_data->already_created, creation_data->type_dist);
} else {
slurm_data->cmd = malloc(1 * sizeof(char));
slurm_data->info = MPI_INFO_NULL;
}
generic_spawn(creation_data->myId, creation_data->root, slurm_data->spawn_is_single, returned_comm, creation_data->comm);
free(creation_data);
pthread_exit(NULL);
}
/*
* Funcion llamada por un hilo para que este finalize junto al proceso raiz
* la creacion de procesos single.
*
* Una vez se ha realizado la union con los hijos, se avisa al hilo maestro.
*/
void* thread_join_single(void* creation_data_arg) {
Creation_data *creation_data = (Creation_data*) creation_data_arg;
returned_comm = (MPI_Comm *) malloc(sizeof(MPI_Comm));
slurm_data->cmd = malloc(1 * sizeof(char));
slurm_data->info = MPI_INFO_NULL;
generic_spawn(creation_data->myId, creation_data->root, slurm_data->spawn_is_single, returned_comm, creation_data->comm);
free(creation_data);
pthread_exit(NULL);
}
//--------------PRIVATE SPAWN CREATION FUNCTIONS---------------//
/*
* Funcion generica para la creacion de procesos. Obtiene la configuracion
* y segun esta, elige como deberian crearse los procesos.
*
* Cuando termina, modifica la variable global para indicar este cambio
*/
void generic_spawn(int myId, int root, int spawn_is_single, MPI_Comm *child, MPI_Comm comm) {
if(spawn_is_single) {
single_spawn_connection(myId, root, comm, child);
} else {
int rootBcast = MPI_PROC_NULL;
if(myId == root) rootBcast = MPI_ROOT;
create_processes(myId, root, child, comm);
MPI_Bcast(&spawn_is_single, 1, MPI_INT, rootBcast, *child);
}
commState = MAL_SPAWN_COMPLETED;
}
/*
* Crea un grupo de procesos segun la configuracion indicada por la funcion
* "processes_dist()".
*/
int create_processes(int myId, int root, MPI_Comm *child, MPI_Comm comm) {
int spawn_err = MPI_Comm_spawn(slurm_data->cmd, MPI_ARGV_NULL, slurm_data->qty_procs, slurm_data->info, root, comm, child, MPI_ERRCODES_IGNORE);
if(spawn_err != MPI_SUCCESS) {
printf("Error creating new set of %d procs.\n", slurm_data->qty_procs);
}
return spawn_err;
}
/*
* Si la variable "type" es 1, la creación es con la participación de todo el grupo de padres
* Si el valor es diferente, la creación es solo con la participación del proceso root
*/
void single_spawn_connection(int myId, int root, MPI_Comm comm, MPI_Comm *child){
char *port_name;
int auxiliar_conf = COMM_SPAWN_SINGLE;
MPI_Comm newintercomm;
if (myId == root) {
create_processes(myId, root, child, MPI_COMM_SELF);
MPI_Bcast(&auxiliar_conf, 1, MPI_INT, MPI_ROOT, *child);
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, root, 130, *child, MPI_STATUS_IGNORE);
commState = MAL_SPAWN_SINGLE_START; // Indicate other processes to join root to end spawn procedure
} else {
port_name = malloc(10);
}
//printf("TEST Connect %d\n", myId); fflush(stdout);
MPI_Comm_connect(port_name, MPI_INFO_NULL, root, comm, &newintercomm);
if(myId == root) {printf("TEST ConnectED %d\n", myId); fflush(stdout);}
if(myId == root)
MPI_Comm_free(child);
free(port_name);
*child = newintercomm;
}
//--------------PRIVATE MERGE TYPE FUNCTIONS---------------//
/*
* Se encarga de que el grupo de procesos resultante se
......@@ -276,51 +401,6 @@ void proc_adapt_shrink(int numC, MPI_Comm *comm, int myId) {
*comm=new_comm;
}
//--------------PRIVATE SPAWN TYPE FUNCTIONS---------------//
/*
* Funcion llamada por un hilo para que este se encarge
* de configurar la creacion de un nuevo grupo de procesos.
*
* Una vez esta lista la configuracion y es posible crear los procesos
* se avisa al hilo maestro.
*/
void* thread_work(void* creation_data_arg) {
Creation_data *creation_data = (Creation_data*) creation_data_arg;
returned_comm = (MPI_Comm *) malloc(sizeof(MPI_Comm));
if(creation_data->myId == creation_data->root) {
processes_dist(creation_data->argv, creation_data->numP_childs, creation_data->already_created, creation_data->type_dist);
} else {
slurm_data->cmd = malloc(1 * sizeof(char));
slurm_data->info = MPI_INFO_NULL;
}
generic_spawn(creation_data->myId, creation_data->root, slurm_data->spawn_is_single, returned_comm, creation_data->comm);
free(creation_data);
pthread_exit(NULL);
}
/*
* Funcion llamada por un hilo para que este finalize junto al proceso raiz
* la creacion de procesos single.
*
* Una vez se ha realizado la union con los hijos, se avisa al hilo maestro.
*/
void* thread_join_single(void* creation_data_arg) {
Creation_data *creation_data = (Creation_data*) creation_data_arg;
returned_comm = (MPI_Comm *) malloc(sizeof(MPI_Comm));
generic_spawn(creation_data->myId, creation_data->root, slurm_data->spawn_is_single, returned_comm, creation_data->comm);
free(creation_data);
pthread_exit(NULL);
}
//--------------PRIVATE SPAWN CREATION FUNCTIONS---------------//
/*
* Configura la creacion de un nuevo grupo de procesos, reservando la memoria
* para una llamada a MPI_Comm_spawn, obteniendo una distribucion fisica
......@@ -375,65 +455,6 @@ void processes_dist(char *argv, int numP_childs, int already_created, int type)
slurm_free_job_info_msg(j_info);
}
/*
TODO
*/
void generic_spawn(int myId, int root, int spawn_is_single, MPI_Comm *child, MPI_Comm comm) {
if(spawn_is_single) {
single_spawn_connection(myId, root, comm, child);
} else {
int rootBcast = MPI_PROC_NULL;
if(myId == root) rootBcast = MPI_ROOT;
create_processes(myId, root, child, comm);
MPI_Bcast(&spawn_is_single, 1, MPI_INT, rootBcast, *child);
}
commSlurm = MAL_SPAWN_COMPLETED;
}
/*
* Si la variable "type" es 1, la creación es con la participación de todo el grupo de padres
* Si el valor es diferente, la creación es solo con la participación del proceso root
*/
void single_spawn_connection(int myId, int root, MPI_Comm comm, MPI_Comm *child){
char *port_name;
int auxiliar_conf = COMM_SPAWN_SINGLE;
MPI_Comm newintercomm;
if (myId == root) {
create_processes(myId, root, child, MPI_COMM_SELF);
MPI_Bcast(&auxiliar_conf, 1, MPI_INT, MPI_ROOT, *child);
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, root, 130, *child, MPI_STATUS_IGNORE);
commSlurm = MAL_SPAWN_SINGLE_START; // Indicate other processes to join root to end spawn procedure
} else {
port_name = malloc(1);
}
MPI_Comm_connect(port_name, MPI_INFO_NULL, root, comm, &newintercomm);
if(myId == root)
MPI_Comm_free(child);
free(port_name);
*child = newintercomm;
}
/*
* Crea un grupo de procesos segun la configuracion indicada por la funcion
* "processes_dist()".
*/
int create_processes(int myId, int root, MPI_Comm *child, MPI_Comm comm) {
int spawn_err = MPI_Comm_spawn(slurm_data->cmd, MPI_ARGV_NULL, slurm_data->qty_procs, slurm_data->info, root, comm, child, MPI_ERRCODES_IGNORE);
if(spawn_err != MPI_SUCCESS) {
printf("Error creating new set of %d procs.\n", slurm_data->qty_procs);
}
return spawn_err;
}
/*
* Obtiene la distribucion fisica del grupo de procesos a crear, devolviendo
* cuantos nodos se van a utilizar y la cantidad de procesos que alojara cada
......
......@@ -6,7 +6,7 @@
#include "malleabilityStates.h"
int init_slurm_comm(char *argv, int myId, int numP, int numC, int root, int type_dist, int type_creation, int spawn_is_single, MPI_Comm comm, MPI_Comm *child);
int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm comm, MPI_Comm comm_thread);
int check_slurm_comm(int myId, int root, int numP, int outside_state, MPI_Comm *child, MPI_Comm comm, MPI_Comm comm_thread);
void malleability_establish_connection(int myId, int root, MPI_Comm *intercomm);
......
......@@ -49,6 +49,7 @@ typedef struct { //FIXME numC_spawned no se esta usando
} malleability_t;
int state = MAL_UNRESERVED; //FIXME Mover a otro lado
int outside_state = MAL_APP_EXECUTING;
malleability_config_t *mall_conf;
malleability_t *mall;
......@@ -59,7 +60,14 @@ malleability_data_t *rep_a_data;
malleability_data_t *dist_a_data;
/*
* TODO HACER
* Inicializa la reserva de memoria para el modulo de maleabilidad
* creando todas las estructuras necesarias y copias de comunicadores
* para no interferir en la aplicación.
*
* Si es llamada por un grupo de procesos creados de forma dinámica,
* inicializan la comunicacion con sus padres. En este caso, al terminar
* la comunicacion los procesos hijo estan preparados para ejecutar la
* aplicacion.
*/
int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_exec) {
MPI_Comm dup_comm, thread_comm;
......@@ -100,6 +108,11 @@ int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_ex
return MALLEABILITY_NOT_CHILDREN;
}
/*
* Elimina toda la memoria reservado por el modulo
* de maleabilidad y asegura que los zombies
* despierten si los hubiese.
*/
void free_malleability() {
free_malleability_data_struct(rep_s_data);
free_malleability_data_struct(rep_a_data);
......@@ -122,6 +135,15 @@ void free_malleability() {
state = MAL_UNRESERVED;
}
/*
* FIXME Deprecated -- Borrar si al final no es necesario
* -- Es para evitar casos asincronos donde la aplicacion ha terminado
* pero la maleabilidad sigue en curso
*/
void indicate_ending_malleability(int new_outside_state) {
outside_state = new_outside_state;
}
/*
* Se realiza el redimensionado de procesos por parte de los padres.
*
......@@ -152,10 +174,10 @@ int malleability_checkpoint() {
}
} else if(state == MAL_SPAWN_PENDING || state == MAL_SPAWN_SINGLE_PENDING) { // Comprueba si el spawn ha terminado y comienza la redistribucion
state = check_slurm_comm(mall->myId, mall->root, mall->numP, &(mall->intercomm), mall->comm, mall->thread_comm);
//TODO Si es MERGE SHRINK, metodo diferente de redistribucion de datos
state = check_slurm_comm(mall->myId, mall->root, mall->numP, outside_state, &(mall->intercomm), mall->comm, mall->thread_comm);
if (state == MAL_SPAWN_COMPLETED) {
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
//TODO Si es MERGE SHRINK, metodo diferente de redistribucion de datos
state = start_redistribution();
}
......@@ -424,7 +446,7 @@ void Children_init() {
}
if(mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
proc_adapt_expand(&(mall->numP), mall->numP+numP_parents, mall->intercomm, &(mall->comm), MALLEABILITY_CHILDREN); //TODO Que valor se pasa?
proc_adapt_expand(&(mall->numP), mall->numP+numP_parents, mall->intercomm, &(mall->comm), MALLEABILITY_CHILDREN);
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
......@@ -544,6 +566,7 @@ int check_redistribution() {
printf("P%d aborting -- Test Async\n", mall->myId);
MPI_Abort(MPI_COMM_WORLD, test_err);
}
//FIXME No se tiene en cuenta el estado MAL_APP_ENDED
MPI_Allreduce(&completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
if(!all_completed) return MAL_DIST_PENDING; // Continue only if asynchronous send has ended
......@@ -660,6 +683,7 @@ int thread_check() {
// Comprueba que todos los hilos han terminado la distribucion (Mismo valor en commAsync)
MPI_Allreduce(&state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
if(all_completed != MAL_DIST_COMPLETED) return MAL_DIST_PENDING; // Continue only if asynchronous send has ended
//FIXME No se tiene en cuenta el estado MAL_APP_ENDED
if(pthread_join(mall->async_thread, NULL)) {
printf("Error al esperar al hilo\n");
......
......@@ -10,6 +10,7 @@
int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_exec);
void free_malleability();
void indicate_ending_malleability(int new_outside_state);
int malleability_checkpoint();
void set_benchmark_grp(int grp);
......
//States
#define MAL_UNRESERVED -1
#define MAL_DENIED -2
#define MAL_ZOMBIE -3
......@@ -11,6 +12,9 @@
#define MAL_DIST_ADAPTED 7
#define MAL_APP_EXECUTING 0
#define MAL_APP_ENDED 1
// TODO Refactor
#define COMM_PHY_NODES 1
#define COMM_PHY_CPU 2
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment