Commit bb81b914 authored by iker_martin's avatar iker_martin
Browse files

Hotfix en el spawn single con hilos

parent 45bf2b43
......@@ -179,13 +179,12 @@ int work() {
state = malleability_checkpoint();
iter = 0;
while(state == MAL_DIST_PENDING || state == MAL_SPAWN_PENDING) {
while(state == MAL_DIST_PENDING || state == MAL_SPAWN_PENDING || state == MAL_SPAWN_SINGLE_PENDING) {
if(iter < config_file->iters[group->grp+1]) {
iterate(matrix, config_file->matrix_tam, state);
iter++;
group->iter_start = iter;
}
state = malleability_checkpoint();
}
......@@ -227,7 +226,7 @@ void iterate(double *matrix, int n, int async_comm) {
actual_time = MPI_Wtime(); // Guardar tiempos
// TODO Que diferencie entre ambas en el IO
if(async_comm == MAL_DIST_PENDING || async_comm == MAL_SPAWN_PENDING) { // Se esta realizando una redistribucion de datos asincrona
if(async_comm == MAL_DIST_PENDING || async_comm == MAL_SPAWN_PENDING || async_comm == MAL_SPAWN_SINGLE_PENDING) { // Se esta realizando una redistribucion de datos asincrona
operations=0;
}
......
......@@ -33,6 +33,7 @@ typedef struct {
//--------------PRIVATE SPAWN TYPE DECLARATIONS---------------//
void* thread_work(void* creation_data_arg);
void* thread_join_single(void* creation_data_arg);
//--------------PRIVATE DECLARATIONS---------------//
void processes_dist(char *argv, int numP_childs, int already_created, int type_dist);
......@@ -151,22 +152,35 @@ int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm com
} else if (slurm_data->spawn_is_single) {
MPI_Bcast(&commSlurm, 1, MPI_INT, root, comm);
state = commSlurm;
if(state != MAL_SPAWN_SINGLE_PENDING) return state; // Continue only if asynchronous process creation has ended
if(myId == root) {
if(pthread_join(slurm_thread, NULL)) {
printf("Error al esperar al hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
}
*child = *returned_comm;
} else {
slurm_data->cmd = malloc(1 * sizeof(char));
slurm_data->info = MPI_INFO_NULL;
generic_spawn(myId, root, slurm_data->spawn_is_single, child, comm_thread);
if(commSlurm == MAL_SPAWN_SINGLE_START) { // Non-root processes join root to finalize the spawn
commSlurm = MAL_SPAWN_SINGLE_PENDING;
if(myId != root) {
Creation_data *creation_data = (Creation_data *) malloc(sizeof(Creation_data));
creation_data->argv = NULL;
creation_data->numP_childs = -1;
creation_data->already_created = -1;
creation_data->myId = myId;
creation_data->root = root;
creation_data->type_dist = -1;
creation_data->comm = comm_thread;
if(pthread_create(&slurm_thread, NULL, thread_join_single, (void *)creation_data)) {
printf("Error al crear el hilo de apoyo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
}
}
if(commSlurm != MAL_SPAWN_COMPLETED) return commSlurm; // Continue only if asynchronous process creation has ended
if(pthread_join(slurm_thread, NULL)) {
printf("Error al esperar al hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
}
*child = *returned_comm;
} else {
printf("Error Check spawn: Configuracion invalida\n");
MPI_Abort(MPI_COMM_WORLD, -1);
......@@ -288,6 +302,23 @@ void* thread_work(void* creation_data_arg) {
pthread_exit(NULL);
}
/*
* Funcion llamada por un hilo para que este finalize junto al proceso raiz
* la creacion de procesos single.
*
* Una vez se ha realizado la union con los hijos, se avisa al hilo maestro.
*/
void* thread_join_single(void* creation_data_arg) {
Creation_data *creation_data = (Creation_data*) creation_data_arg;
returned_comm = (MPI_Comm *) malloc(sizeof(MPI_Comm));
generic_spawn(creation_data->myId, creation_data->root, slurm_data->spawn_is_single, returned_comm, creation_data->comm);
free(creation_data);
pthread_exit(NULL);
}
//--------------PRIVATE SPAWN CREATION FUNCTIONS---------------//
/*
......@@ -375,7 +406,7 @@ void single_spawn_connection(int myId, int root, MPI_Comm comm, MPI_Comm *child)
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, root, 130, *child, MPI_STATUS_IGNORE);
commSlurm = MAL_SPAWN_SINGLE_PENDING;
commSlurm = MAL_SPAWN_SINGLE_START; // Indicate other processes to join root to end spawn procedure
} else {
port_name = malloc(1);
}
......
......@@ -151,7 +151,7 @@ int malleability_checkpoint() {
state = start_redistribution();
}
} else if(state == MAL_SPAWN_PENDING) { // Comprueba si el spawn ha terminado y comienza la redistribucion
} else if(state == MAL_SPAWN_PENDING || state == MAL_SPAWN_SINGLE_PENDING) { // Comprueba si el spawn ha terminado y comienza la redistribucion
state = check_slurm_comm(mall->myId, mall->root, mall->numP, &(mall->intercomm), mall->comm, mall->thread_comm);
//TODO Si es MERGE SHRINK, metodo diferente de redistribucion de datos
if (state == MAL_SPAWN_COMPLETED) {
......
......@@ -3,11 +3,12 @@
#define MAL_ZOMBIE -3
#define MAL_NOT_STARTED 0
#define MAL_SPAWN_PENDING 1
#define MAL_SPAWN_SINGLE_PENDING 2
#define MAL_SPAWN_COMPLETED 3
#define MAL_DIST_PENDING 4
#define MAL_DIST_COMPLETED 5
#define MAL_DIST_ADAPTED 6
#define MAL_SPAWN_SINGLE_START 2
#define MAL_SPAWN_SINGLE_PENDING 3
#define MAL_SPAWN_COMPLETED 4
#define MAL_DIST_PENDING 5
#define MAL_DIST_COMPLETED 6
#define MAL_DIST_ADAPTED 7
// TODO Refactor
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment