Commit 436bcd12 authored by iker_martin's avatar iker_martin
Browse files

Se permite la creacion de procesos en segundo plano. Falta anyadirlo al fichero de config

parent 7daba8bb
...@@ -16,7 +16,7 @@ int work(); ...@@ -16,7 +16,7 @@ int work();
void Sons_init(); void Sons_init();
int checkpoint(int iter, int state, MPI_Request **comm_req); int checkpoint(int iter, int state, MPI_Request **comm_req);
void TC(int numS); int TC(int numS, int comm_type);
int start_redistribution(int numS, MPI_Request **comm_req); int start_redistribution(int numS, MPI_Request **comm_req);
int check_redistribution(int iter, MPI_Request **comm_req); int check_redistribution(int iter, MPI_Request **comm_req);
int end_redistribution(int iter); int end_redistribution(int iter);
...@@ -135,7 +135,7 @@ int work() { ...@@ -135,7 +135,7 @@ int work() {
state = checkpoint(iter, state, &async_comm); state = checkpoint(iter, state, &async_comm);
iter = 0; iter = 0;
while(state == MAL_ASYNC_PENDING) { while(state == MAL_ASYNC_PENDING || state == COMM_IN_PROGRESS) {
iterate(matrix, config_file->matrix_tam, state); iterate(matrix, config_file->matrix_tam, state);
iter++; iter++;
state = checkpoint(iter, state, &async_comm); state = checkpoint(iter, state, &async_comm);
...@@ -167,13 +167,22 @@ int checkpoint(int iter, int state, MPI_Request **comm_req) { ...@@ -167,13 +167,22 @@ int checkpoint(int iter, int state, MPI_Request **comm_req) {
if(config_file->iters[group->grp] > iter || config_file->resizes == group->grp + 1) {return MAL_COMM_UNINITIALIZED;} if(config_file->iters[group->grp] > iter || config_file->resizes == group->grp + 1) {return MAL_COMM_UNINITIALIZED;}
group->numS = config_file->procs[group->grp +1]; group->numS = config_file->procs[group->grp +1];
int comm_type = COMM_SPAWN_PTHREAD; // TODO Pasar a CONFIG
results->spawn_start = MPI_Wtime();
if(group->myId == ROOT) { printf("Malleability\n");} if(group->myId == ROOT) { printf("Malleability\n");}
TC(group->numS); state = TC(group->numS, comm_type);
results->spawn_time[group->grp] = MPI_Wtime() - results->spawn_start;
if (state == COMM_FINISHED){
state = start_redistribution(group->numS, comm_req);
}
} else if(state == COMM_IN_PROGRESS) { // Comprueba si el spawn ha terminado y comienza la redistribucion
state = check_slurm_comm(group->myId, ROOT, &(group->children));
if (state == COMM_FINISHED) {
results->spawn_time[group->grp] = MPI_Wtime() - results->spawn_start;
state = start_redistribution(group->numS, comm_req); state = start_redistribution(group->numS, comm_req);
}
} else if(state == MAL_ASYNC_PENDING) { } else if(state == MAL_ASYNC_PENDING) {
if(config_file->aib == MAL_USE_THREAD) { if(config_file->aib == MAL_USE_THREAD) {
...@@ -181,7 +190,6 @@ int checkpoint(int iter, int state, MPI_Request **comm_req) { ...@@ -181,7 +190,6 @@ int checkpoint(int iter, int state, MPI_Request **comm_req) {
} else { } else {
state = check_redistribution(iter, comm_req); state = check_redistribution(iter, comm_req);
} }
} }
return state; return state;
...@@ -189,18 +197,18 @@ int checkpoint(int iter, int state, MPI_Request **comm_req) { ...@@ -189,18 +197,18 @@ int checkpoint(int iter, int state, MPI_Request **comm_req) {
/* /*
* Se encarga de realizar la creacion de los procesos hijos. * Se encarga de realizar la creacion de los procesos hijos.
* Si se pide en segundo plano devuelve el estado actual.
*/ */
void TC(int numS){ int TC(int numS, int comm_type){
// Inicialización de la comunicación con SLURM // Inicialización de la comunicación con SLURM
int dist = config_file->phy_dist[group->grp +1]; int dist = config_file->phy_dist[group->grp +1];
init_slurm_comm(group->argv, group->myId, numS, ROOT, dist, COMM_SPAWN_SERIAL); int comm_state;
// Esperar a que la comunicación y creación de procesos results->spawn_start = MPI_Wtime();
// haya finalizado comm_state = init_slurm_comm(group->argv, group->myId, numS, ROOT, dist, comm_type, MPI_COMM_WORLD, &(group->children));
int test = -1; if(comm_type == COMM_SPAWN_SERIAL)
while(test != MPI_SUCCESS) { results->spawn_time[group->grp] = MPI_Wtime() - results->spawn_start;
test = check_slurm_comm(group->myId, ROOT, MPI_COMM_WORLD, &(group->children)); return comm_state;
}
} }
/* /*
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
int commSlurm = COMM_UNRESERVED; int commSlurm = COMM_UNRESERVED;
struct Slurm_data *slurm_data; struct Slurm_data *slurm_data;
pthread_t slurm_thread; pthread_t slurm_thread;
MPI_Comm *returned_comm;
struct Slurm_data { struct Slurm_data {
char *cmd; // Executable name char *cmd; // Executable name
...@@ -23,9 +24,11 @@ struct Slurm_data { ...@@ -23,9 +24,11 @@ struct Slurm_data {
struct Creation_data { struct Creation_data {
char **argv; char **argv;
int numP_childs, type_dist; int numP_childs, myId, root, type_dist;
MPI_Comm comm;
}; };
//--------------PRIVATE SPAWN TYPE DECLARATIONS---------------// //--------------PRIVATE SPAWN TYPE DECLARATIONS---------------//
void* thread_work(void* creation_data_arg); void* thread_work(void* creation_data_arg);
...@@ -60,16 +63,22 @@ void print_Info(MPI_Info info); ...@@ -60,16 +63,22 @@ void print_Info(MPI_Info info);
* *
* Si se pide en segundo plano, llamar a "check_slurm_comm()" comprobara si la configuracion para * Si se pide en segundo plano, llamar a "check_slurm_comm()" comprobara si la configuracion para
* crearlos esta lista, y si es asi, los crea. * crearlos esta lista, y si es asi, los crea.
*
* Devuelve el estado de el procedimiento. Si no devuelve "COMM_FINISHED", es necesario llamar a
* "check_slurm_comm()".
*/ */
int init_slurm_comm(char **argv, int myId, int numP, int root, int type_dist, int type_creation) { int init_slurm_comm(char **argv, int myId, int numP, int root, int type_dist, int type_creation, MPI_Comm comm, MPI_Comm *child) {
slurm_data = malloc(sizeof(struct Slurm_data)); slurm_data = malloc(sizeof(struct Slurm_data));
if(myId == root) {
slurm_data->type_creation = type_creation; slurm_data->type_creation = type_creation;
if(type_creation == COMM_SPAWN_SERIAL) { if(type_creation == COMM_SPAWN_SERIAL) {
if(myId == root) {
processes_dist(argv, numP, type_dist); processes_dist(argv, numP, type_dist);
}
create_processes(myId, root, child, comm);
free(slurm_data);
commSlurm = COMM_FINISHED; commSlurm = COMM_FINISHED;
} else if(type_creation == COMM_SPAWN_PTHREAD) { } else if(type_creation == COMM_SPAWN_PTHREAD) {
...@@ -78,43 +87,48 @@ int init_slurm_comm(char **argv, int myId, int numP, int root, int type_dist, in ...@@ -78,43 +87,48 @@ int init_slurm_comm(char **argv, int myId, int numP, int root, int type_dist, in
struct Creation_data *creation_data = malloc(sizeof(struct Creation_Data*)); struct Creation_data *creation_data = malloc(sizeof(struct Creation_Data*));
creation_data->argv = argv; creation_data->argv = argv;
creation_data->numP_childs = numP; creation_data->numP_childs = numP;
creation_data->myId = myId;
creation_data->root = root;
creation_data->type_dist = type_dist; creation_data->type_dist = type_dist;
creation_data->comm = comm;
if(pthread_create(&slurm_thread, NULL, thread_work, creation_data)) { if(pthread_create(&slurm_thread, NULL, thread_work, creation_data)) {
printf("Error al crear el hilo de contacto con SLURM\n"); printf("Error al crear el hilo de contacto con SLURM\n");
MPI_Abort(MPI_COMM_WORLD, -1); MPI_Abort(MPI_COMM_WORLD, -1);
return -1; return -1;
} }
}
} }
return 0; return commSlurm;
} }
/* /*
* Comprueba si una configuracion para crear un nuevo grupo de procesos esta lista, * Comprueba si una configuracion para crear un nuevo grupo de procesos esta lista,
* y en caso de que lo este, se crea un nuevo grupo de procesos con esa configuracion. * y en caso de que lo este, se devuelve el communicador a estos nuevos procesos.
*/ */
int check_slurm_comm(int myId, int root, MPI_Comm comm, MPI_Comm *child) { int check_slurm_comm(int myId, int root, MPI_Comm *child) {
int spawn_err = COMM_IN_PROGRESS; int state;
if(slurm_data->type_creation == COMM_SPAWN_PTHREAD) {
MPI_Allreduce(&commSlurm, &state, 1, MPI_INT, MPI_MIN, MPI_COMM_WORLD);
if(state != COMM_FINISHED) return state; // Continue only if asynchronous process creation has ended
} else {
return commSlurm;
}
if(myId == root && commSlurm == COMM_FINISHED && slurm_data->type_creation == COMM_SPAWN_PTHREAD) {
if(pthread_join(slurm_thread, NULL)) { if(pthread_join(slurm_thread, NULL)) {
printf("Error al esperar al hilo\n"); printf("Error al esperar al hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1); MPI_Abort(MPI_COMM_WORLD, -1);
return -2; return -10;
}
} }
MPI_Bcast(&commSlurm, 1, MPI_INT, root, comm); commSlurm = COMM_FINISHED;
*child = *returned_comm;
if(commSlurm == COMM_FINISHED) {
spawn_err = create_processes(myId, root, child, comm);
free(slurm_data); free(slurm_data);
}
return spawn_err; return commSlurm;
} }
//--------------PRIVATE SPAWN TYPE FUNCTIONS---------------// //--------------PRIVATE SPAWN TYPE FUNCTIONS---------------//
...@@ -128,11 +142,17 @@ int check_slurm_comm(int myId, int root, MPI_Comm comm, MPI_Comm *child) { ...@@ -128,11 +142,17 @@ int check_slurm_comm(int myId, int root, MPI_Comm comm, MPI_Comm *child) {
*/ */
void* thread_work(void* creation_data_arg) { void* thread_work(void* creation_data_arg) {
struct Creation_data *creation_data = (struct Creation_data*) creation_data_arg; struct Creation_data *creation_data = (struct Creation_data*) creation_data_arg;
returned_comm = (MPI_Comm *) malloc(sizeof(MPI_Comm));
if(creation_data->myId == creation_data->root) {
//if(creation_data->myId == creation_data->root) { printf("WORKD SPAWN 1\n");} fflush(stdout);
processes_dist(creation_data->argv, creation_data->numP_childs, creation_data->type_dist); processes_dist(creation_data->argv, creation_data->numP_childs, creation_data->type_dist);
}
create_processes(creation_data->myId, creation_data->root, returned_comm, creation_data->comm);
commSlurm = COMM_FINISHED; commSlurm = COMM_FINISHED;
free(creation_data); //free(creation_data); //FIXME No se libera bien
pthread_exit(NULL); pthread_exit(NULL);
} }
...@@ -195,6 +215,7 @@ void processes_dist(char *argv[], int numP_childs, int type) { ...@@ -195,6 +215,7 @@ void processes_dist(char *argv[], int numP_childs, int type) {
* "processes_dist()". * "processes_dist()".
*/ */
int create_processes(int myId, int root, MPI_Comm *child, MPI_Comm comm) { int create_processes(int myId, int root, MPI_Comm *child, MPI_Comm comm) {
//if(myId == root) { printf("WORKD SPAWN 2.1 cmd=%s pr=%d\n", slurm_data->cmd, slurm_data->qty_procs);} fflush(stdout);
int spawn_err = MPI_Comm_spawn(slurm_data->cmd, MPI_ARGV_NULL, slurm_data->qty_procs, slurm_data->info, root, comm, child, MPI_ERRCODES_IGNORE); int spawn_err = MPI_Comm_spawn(slurm_data->cmd, MPI_ARGV_NULL, slurm_data->qty_procs, slurm_data->info, root, comm, child, MPI_ERRCODES_IGNORE);
if(spawn_err != MPI_SUCCESS) { if(spawn_err != MPI_SUCCESS) {
......
...@@ -14,5 +14,5 @@ ...@@ -14,5 +14,5 @@
#define COMM_SPAWN_SERIAL 0 #define COMM_SPAWN_SERIAL 0
#define COMM_SPAWN_PTHREAD 1 #define COMM_SPAWN_PTHREAD 1
int init_slurm_comm(char **argv, int myId, int numP, int root, int type_dist, int type_creation); int init_slurm_comm(char **argv, int myId, int numP, int root, int type_dist, int type_creation, MPI_Comm comm, MPI_Comm *child);
int check_slurm_comm(int myId, int root, MPI_Comm comm, MPI_Comm *child); int check_slurm_comm(int myId, int root, MPI_Comm *child);
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment