Commit 07aa4fe0 authored by iker_martin's avatar iker_martin
Browse files

Merge Shrink funcional. Falta realizar pruebas completas

parent 5cf7c034
...@@ -113,11 +113,9 @@ int main(int argc, char *argv[]) { ...@@ -113,11 +113,9 @@ int main(int argc, char *argv[]) {
group->grp = group->grp + 1; group->grp = group->grp + 1;
set_benchmark_grp(group->grp); set_benchmark_grp(group->grp);
get_malleability_user_comm(&comm); if(group->grp != 0) {
MPI_Comm_size(comm, &(group->numP));
MPI_Comm_rank(comm, &(group->myId));
if(group->grp != 0)
obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
}
if(config_file->n_resizes != group->grp + 1) { //TODO Llevar a otra funcion if(config_file->n_resizes != group->grp + 1) { //TODO Llevar a otra funcion
set_malleability_configuration(config_file->sm, config_file->ss, config_file->phy_dist[group->grp+1], config_file->at, -1); set_malleability_configuration(config_file->sm, config_file->ss, config_file->phy_dist[group->grp+1], config_file->at, -1);
...@@ -137,6 +135,10 @@ int main(int argc, char *argv[]) { ...@@ -137,6 +135,10 @@ int main(int argc, char *argv[]) {
res = work(); res = work();
if(res == MALL_ZOMBIE) break; if(res == MALL_ZOMBIE) break;
get_malleability_user_comm(&comm);
MPI_Comm_size(comm, &(group->numP));
MPI_Comm_rank(comm, &(group->myId));
print_local_results(); print_local_results();
reset_results_index(results); reset_results_index(results);
} while(config_file->n_resizes > group->grp + 1 && config_file->sm == MALL_SPAWN_MERGE); } while(config_file->n_resizes > group->grp + 1 && config_file->sm == MALL_SPAWN_MERGE);
......
...@@ -25,6 +25,8 @@ int thread_creation(); ...@@ -25,6 +25,8 @@ int thread_creation();
int thread_check(); int thread_check();
void* thread_async_work(); void* thread_async_work();
void print_comms_state();
typedef struct { typedef struct {
int spawn_method; int spawn_method;
int spawn_dist; int spawn_dist;
...@@ -215,6 +217,7 @@ int malleability_checkpoint() { ...@@ -215,6 +217,7 @@ int malleability_checkpoint() {
case MALL_SPAWN_ADAPTED: case MALL_SPAWN_ADAPTED:
state = shrink_redistribution(); state = shrink_redistribution();
malleability_checkpoint();
break; break;
case MALL_DIST_COMPLETED: //TODO No es esto muy feo? case MALL_DIST_COMPLETED: //TODO No es esto muy feo?
...@@ -692,9 +695,8 @@ int shrink_redistribution() { ...@@ -692,9 +695,8 @@ int shrink_redistribution() {
double time_extra = MPI_Wtime(); double time_extra = MPI_Wtime();
//TODO REFACTOR -- Que solo la llamada de collect iters este fuera de los hilos //TODO REFACTOR -- Que solo la llamada de collect iters este fuera de los hilos
zombies_collect_suspended(mall->comm, mall->myId, mall->numP, mall->numC, mall->root, (void *) mall_conf->results, mall->intercomm); zombies_collect_suspended(mall->comm, mall->myId, mall->numP, mall->numC, mall->root, (void *) mall_conf->results);
printf("HELLO THERE\n");
if(mall->myId < mall->numC) { if(mall->myId < mall->numC) {
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm)); if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
...@@ -708,13 +710,14 @@ int shrink_redistribution() { ...@@ -708,13 +710,14 @@ int shrink_redistribution() {
MPI_Comm_set_name(mall->comm, "MPI_COMM_MALL"); MPI_Comm_set_name(mall->comm, "MPI_COMM_MALL");
MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MALL_USER"); MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MALL_USER");
MPI_Comm_free(&(mall->intercomm));
mall_conf->results->spawn_time[mall_conf->grp] += MPI_Wtime() - time_extra; mall_conf->results->spawn_time[mall_conf->grp] += MPI_Wtime() - time_extra;
if(malleability_spawn_contains_strat(mall_conf->spawn_strategies,MALL_SPAWN_PTHREAD, NULL)) { if(malleability_spawn_contains_strat(mall_conf->spawn_strategies,MALL_SPAWN_PTHREAD, NULL)) {
mall_conf->results->spawn_real_time[mall_conf->grp] += MPI_Wtime() - time_extra; mall_conf->results->spawn_real_time[mall_conf->grp] += MPI_Wtime() - time_extra;
} }
return MALL_DIST_COMPLETED; return MALL_DIST_COMPLETED;
} else { } else {
printf("P%d is a zombie\n", mall->myId);
return MALL_ZOMBIE; return MALL_ZOMBIE;
} }
} }
...@@ -774,3 +777,23 @@ void* thread_async_work() { ...@@ -774,3 +777,23 @@ void* thread_async_work() {
state = MALL_DIST_COMPLETED; state = MALL_DIST_COMPLETED;
pthread_exit(NULL); pthread_exit(NULL);
} }
//==============================================================================
/*
* Muestra por pantalla el estado actual de todos los comunicadores
*/
void print_comms_state() {
int tester;
char *test = malloc(MPI_MAX_OBJECT_NAME * sizeof(char));
MPI_Comm_get_name(mall->comm, test, &tester);
printf("P%d Comm=%d Name=%s\n", mall->myId, mall->comm, test);
MPI_Comm_get_name(mall->user_comm, test, &tester);
printf("P%d Comm=%d Name=%s\n", mall->myId, mall->user_comm, test);
if(mall->intercomm != MPI_COMM_NULL) {
MPI_Comm_get_name(mall->intercomm, test, &tester);
printf("P%d Comm=%d Name=%s\n", mall->myId, mall->intercomm, test);
}
free(test);
}
...@@ -18,7 +18,7 @@ int offset_pids, *pids = NULL; ...@@ -18,7 +18,7 @@ int offset_pids, *pids = NULL;
void gestor_usr2() {} void gestor_usr2() {}
void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root, void *results_void, MPI_Comm user_comm) { void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root, void *results_void) {
int pid = getpid(); int pid = getpid();
int *pids_counts = malloc(numP * sizeof(int)); int *pids_counts = malloc(numP * sizeof(int));
int *pids_displs = malloc(numP * sizeof(int)); int *pids_displs = malloc(numP * sizeof(int));
...@@ -41,10 +41,12 @@ void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int ...@@ -41,10 +41,12 @@ void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int
free(pids_counts); free(pids_counts);
free(pids_displs); free(pids_displs);
if(myId >= numC) { // FIXME No deberia estar aqui
// Needed to ensure iteration times are collected before suspending these processes // Needed to ensure iteration times are collected before suspending these processes
results_data *results = (results_data *) results_void; results_data *results = (results_data *) results_void;
compute_results_iter(results, myId, root, user_comm); compute_results_iter(results, myId, root, comm);
if(myId >= numC) {
zombies_suspend(); zombies_suspend();
} }
} }
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
//#include <slurm/slurm.h> //#include <slurm/slurm.h>
#include <signal.h> #include <signal.h>
void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root, void *results_void, MPI_Comm user_comm); void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root, void *results_void);
void zombies_service_init(); void zombies_service_init();
void zombies_service_free(); void zombies_service_free();
void zombies_awake(); void zombies_awake();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment