Commit 3acc36ff authored by iker_martin

WIP with some tests. Adapted check_redistribution to check all distributed...

WIP with some tests. Adapted check_redistribution to check that all distributed data has been completed, not just one set. Four bugfixes when using the Merge shrink spawn method. States.h modified for more coherence with the strategies values.
parent 0e682fe6
@@ -49,6 +49,7 @@ typedef struct { //FIXME numC_spawned is not being used
   MPI_Comm comm, thread_comm;
   MPI_Comm intercomm;
   MPI_Comm user_comm;
+  int dup_user_comm;
   char *name_exec, *nodelist;
   int num_cpus, num_nodes, nodelist_len;
@@ -84,6 +85,7 @@ int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_ex
   rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
   dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
+  mall->dup_user_comm = 0;
   MPI_Comm_dup(comm, &dup_comm);
   MPI_Comm_dup(comm, &thread_comm);
   MPI_Comm_set_name(dup_comm, "MPI_COMM_MALL");
@@ -223,6 +225,7 @@ int malleability_checkpoint() {
       if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
         mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
+        malleability_checkpoint();
       }
       break;
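The `malleability_checkpoint();` call added inside the non-pthread branch lets the reconfiguration advance within the same call once a synchronous spawn has finished, instead of waiting for the application to invoke the checkpoint again. A minimal sketch of that re-entry pattern, with an invented two-stage state machine (the names below only echo the library's states; none of this is its actual API):

```c
// Toy reconfiguration state machine: when a stage completes synchronously,
// the checkpoint re-enters itself so one call can walk through several states.
// Hypothetical example; not the library's actual state machine.
#include <stdio.h>

enum stage { SPAWN_PENDING, DIST_PENDING, COMPLETED };

struct ctx {
  enum stage state;
  int spawn_done;   // would come from the spawn method in the real code
  int dist_done;    // would come from the data redistribution
};

static int checkpoint(struct ctx *c) {
  switch (c->state) {
  case SPAWN_PENDING:
    if (c->spawn_done) {
      c->state = DIST_PENDING;
      return checkpoint(c);        // re-enter, mirroring the added call above
    }
    return SPAWN_PENDING;
  case DIST_PENDING:
    if (c->dist_done) c->state = COMPLETED;
    return c->state;
  default:
    return COMPLETED;
  }
}

int main(void) {
  struct ctx c = { SPAWN_PENDING, 1, 1 };
  // A single call reaches COMPLETED because both stages are already finished.
  printf("state after checkpoint: %d\n", checkpoint(&c));
  return 0;
}
```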
@@ -292,6 +295,12 @@ void set_children_number(int numC){
  * TODO
  */
 void get_malleability_user_comm(MPI_Comm *comm) {
+  if(mall->dup_user_comm) {
+    if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm));
+    MPI_Comm_dup(mall->comm, &(mall->user_comm));
+    MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MALL_USER");
+    mall->dup_user_comm = 0;
+  }
   *comm = mall->user_comm;
 }
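Together with the new `dup_user_comm` flag, `get_malleability_user_comm` now refreshes the user-facing communicator lazily: whoever invalidates it only marks it stale, and the duplication happens the next time the user asks for it. A self-contained sketch of this pattern, assuming a simplified context struct (only the field names shown in the diff are taken from the library):

```c
// Lazy refresh of the communicator handed to user code: the stale handle is
// only rebuilt when it is next requested.  Simplified, illustrative context.
#include <mpi.h>
#include <stdio.h>

typedef struct {
  MPI_Comm comm;       // internal communicator kept by the library
  MPI_Comm user_comm;  // communicator exposed to the application
  int dup_user_comm;   // 1 when user_comm is stale and must be re-duplicated
} mall_ctx_t;

static void get_user_comm(mall_ctx_t *mall, MPI_Comm *comm) {
  if (mall->dup_user_comm) {
    if (mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&mall->user_comm);
    MPI_Comm_dup(mall->comm, &mall->user_comm);
    MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MALL_USER");
    mall->dup_user_comm = 0;   // fresh again until the next reconfiguration
  }
  *comm = mall->user_comm;
}

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);
  mall_ctx_t mall;
  MPI_Comm_dup(MPI_COMM_WORLD, &mall.comm);
  mall.user_comm = MPI_COMM_WORLD;
  mall.dup_user_comm = 1;       // pretend a reconfiguration just invalidated it

  MPI_Comm comm;
  get_user_comm(&mall, &comm);  // the duplication happens here, on demand

  int rank;
  MPI_Comm_rank(comm, &rank);
  if (rank == 0) printf("user communicator refreshed lazily\n");

  MPI_Comm_free(&mall.user_comm);
  MPI_Comm_free(&mall.comm);
  MPI_Finalize();
  return 0;
}
```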
@@ -643,16 +652,21 @@ int start_redistribution() {
  * the children have finished receiving.
  */
 int check_redistribution() {
-  int is_intercomm, req_qty, completed, all_completed, test_err;
+  int is_intercomm, req_qty, completed, local_completed, all_completed, test_err;
+  size_t i;
   MPI_Request *req_completed;
+  local_completed = 1;
-  //FIXME Turn this into a for loop
-  req_completed = dist_a_data->requests[0]; //FIXME Magic number
-  req_qty = dist_a_data->request_qty[0]; //FIXME Magic number
-  if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) { //FIXME Strategy not fully implemented
-    test_err = MPI_Test(&(req_completed[req_qty-1]), &completed, MPI_STATUS_IGNORE);
-  } else {
-    test_err = MPI_Testall(req_qty, req_completed, &completed, MPI_STATUSES_IGNORE); //FIXME Magic number
+  //FIXME Modify so that rep_a_data is also taken into account
+  for(i=0; i<dist_a_data->entries; i++) {
+    req_completed = dist_a_data->requests[i];
+    req_qty = dist_a_data->request_qty[i];
+    if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) { //FIXME Strategy not fully implemented
+      test_err = MPI_Test(&(req_completed[req_qty-1]), &completed, MPI_STATUS_IGNORE);
+    } else {
+      test_err = MPI_Testall(req_qty, req_completed, &completed, MPI_STATUSES_IGNORE);
+    }
+    local_completed = local_completed && completed;
   }

   if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
@@ -660,7 +674,7 @@ int check_redistribution() {
     MPI_Abort(MPI_COMM_WORLD, test_err);
   }
-  MPI_Allreduce(&completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
+  MPI_Allreduce(&local_completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
   if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended
   if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) { //FIXME Strategy not fully implemented
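The rewritten `check_redistribution` now tests every set of pending requests, ANDs the per-set results into `local_completed`, and then agrees globally through `MPI_Allreduce` with `MPI_MIN`, which behaves as a logical AND over 0/1 flags. A minimal standalone sketch of that scheme (the `entries`/`requests`/`request_qty` layout is assumed from the diff; the helper name is illustrative):

```c
// Test several groups of non-blocking requests and agree globally on whether
// everything has finished; MPI_MIN over 0/1 flags acts as a logical AND.
#include <mpi.h>
#include <stddef.h>

static int all_transfers_done(MPI_Request **requests, int *request_qty,
                              size_t entries, MPI_Comm comm) {
  int completed = 0, local_completed = 1, all_completed = 0;
  for (size_t i = 0; i < entries; i++) {
    MPI_Testall(request_qty[i], requests[i], &completed, MPI_STATUSES_IGNORE);
    local_completed = local_completed && completed;  // one pending set blocks all
  }
  MPI_Allreduce(&local_completed, &all_completed, 1, MPI_INT, MPI_MIN, comm);
  return all_completed;
}

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);
  int rank, value = 42, received = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  // One "entry" with two requests: a self send/receive pair on every rank.
  MPI_Request reqs[2];
  MPI_Irecv(&received, 1, MPI_INT, rank, 0, MPI_COMM_WORLD, &reqs[0]);
  MPI_Isend(&value, 1, MPI_INT, rank, 0, MPI_COMM_WORLD, &reqs[1]);

  MPI_Request *entry_requests[1] = { reqs };
  int entry_qty[1] = { 2 };
  while (!all_transfers_done(entry_requests, entry_qty, 1, MPI_COMM_WORLD))
    ;  // the library returns MALL_DIST_PENDING here instead of spinning

  MPI_Finalize();
  return 0;
}
```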
@@ -750,21 +764,18 @@ int end_redistribution() {
 int shrink_redistribution() {
     double time_extra = MPI_Wtime();
-    //TODO REFACTOR -- Only the collect-iters call should be outside the threads
-    zombies_collect_suspended(mall->comm, mall->myId, mall->numP, mall->numC, mall->root, (void *) mall_conf->results, mall_conf->config_file->n_stages);
+    zombies_collect_suspended(mall->user_comm, mall->myId, mall->numP, mall->numC, mall->root, (void *) mall_conf->results, mall_conf->config_file->n_stages);
     if(mall->myId < mall->numC) {
       if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
       if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
-      if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm)); //TODO Isn't this dangerous?
+      mall->dup_user_comm = 1;
       MPI_Comm_dup(mall->intercomm, &(mall->thread_comm));
       MPI_Comm_dup(mall->intercomm, &(mall->comm));
-      MPI_Comm_dup(mall->intercomm, &(mall->user_comm));
       MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MALL_THREAD");
       MPI_Comm_set_name(mall->comm, "MPI_COMM_MALL");
-      MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MALL_USER");
       MPI_Comm_free(&(mall->intercomm));
......
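In `shrink_redistribution` the old code freed and re-duplicated `user_comm` in place (the removed `//TODO Isn't this dangerous?` line); the fix only marks it stale with `dup_user_comm = 1` and rebuilds the internal communicators from the merged intercommunicator. A sketch of that swap on the surviving ranks, simulating the shrunk communicator with `MPI_Comm_split` since no real spawn/merge happens here (all names beyond those in the diff are illustrative):

```c
// Communicator swap done by the surviving ranks after a merge shrink: free the
// old internal communicators, rebuild them from the new (shrunk) communicator,
// and only mark the user-visible one as stale.
#include <mpi.h>

typedef struct {
  MPI_Comm comm, thread_comm, user_comm;
  int dup_user_comm;
} mall_ctx_t;

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);
  int myId, numP, numC = 1;               // keep only rank 0 in this toy run
  MPI_Comm_rank(MPI_COMM_WORLD, &myId);
  MPI_Comm_size(MPI_COMM_WORLD, &numP);

  mall_ctx_t mall;
  MPI_Comm_dup(MPI_COMM_WORLD, &mall.comm);
  MPI_Comm_dup(MPI_COMM_WORLD, &mall.thread_comm);
  mall.user_comm = MPI_COMM_WORLD;
  mall.dup_user_comm = 0;

  // Stand-in for the merged communicator of the shrunk group.
  MPI_Comm shrunk;
  MPI_Comm_split(MPI_COMM_WORLD, myId < numC ? 0 : MPI_UNDEFINED, myId, &shrunk);

  if (myId < numC) {
    if (mall.thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&mall.thread_comm);
    if (mall.comm != MPI_COMM_WORLD) MPI_Comm_free(&mall.comm);
    mall.dup_user_comm = 1;               // user_comm is rebuilt lazily later
    MPI_Comm_dup(shrunk, &mall.comm);
    MPI_Comm_dup(shrunk, &mall.thread_comm);
    MPI_Comm_set_name(mall.comm, "MPI_COMM_MALL");
    MPI_Comm_set_name(mall.thread_comm, "MPI_COMM_MALL_THREAD");
    MPI_Comm_free(&shrunk);
  }

  MPI_Comm_free(&mall.comm);
  MPI_Comm_free(&mall.thread_comm);
  MPI_Finalize();
  return 0;
}
```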
@@ -14,8 +14,8 @@ enum mall_spawn_methods{MALL_SPAWN_BASELINE, MALL_SPAWN_MERGE};
 #define MALL_SPAWN_SINGLE 3
 enum mall_redistribution_methods{MALL_RED_BASELINE, MALL_RED_POINT, MALL_RED_RMA_LOCK, MALL_RED_RMA_LOCKALL, MALL_RED_IBARRIER};
-//#define MALL_RED_IBARRIER 2 Add as a strategy and remove as a method
-#define MALL_RED_THREAD 3
+#define MALL_RED_THREAD 2
+//#define MALL_RED_IBARRIER 3 Add as a strategy and remove as a method
 #define MALLEABILITY_ROOT 0
......
@@ -40,13 +40,12 @@ void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int
   free(pids_counts);
   free(pids_displs);
+  // FIXME This should not be here
+  // Needed to ensure iteration times are collected before suspending these processes
+  results_data *results = (results_data *) results_void;
+  compute_results_iter(results, myId, numP,root, comm);
+  compute_results_stages(results, myId, numP, n_stages, root, comm);
   if(myId >= numC) {
-    // FIXME This should not be here
-    // Needed to ensure iteration times are collected before suspending these processes
-    results_data *results = (results_data *) results_void;
-    compute_results_iter(results, myId, numP,root, comm);
-    compute_results_stages(results, myId, numP, n_stages, root, comm);
     zombies_suspend();
   }
 }
......
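Moving the `compute_results_*` calls in front of `if(myId >= numC)` means every rank of `comm` executes them, not only the ranks about to become zombies; if those calls perform collective communication towards `root` (as collecting iteration times suggests), running them on only a subset of the communicator would hang. A small sketch of that ordering rule, with a plain `MPI_Gather` standing in for the result collection (an assumption about what the calls do internally):

```c
// A collective that collects per-rank results must run on every rank of the
// communicator before any subset of ranks stops participating (here, before
// the would-be zombie ranks suspend).
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);
  int myId, numP, numC = 1, root = 0;      // ranks >= numC would become zombies
  MPI_Comm_rank(MPI_COMM_WORLD, &myId);
  MPI_Comm_size(MPI_COMM_WORLD, &numP);

  double iter_time = 0.001 * (myId + 1);   // stand-in for a measured iteration time
  double *all = NULL;
  if (myId == root) all = malloc(numP * sizeof(double));

  // Correct order: the collective first, on ALL ranks of the communicator...
  MPI_Gather(&iter_time, 1, MPI_DOUBLE, all, 1, MPI_DOUBLE, root, MPI_COMM_WORLD);

  // ...and only afterwards do the leaving ranks block.
  if (myId >= numC) {
    // zombies_suspend();  // in the library these ranks now sleep until reused
  }

  if (myId == root) {
    printf("collected %d iteration times\n", numP);
    free(all);
  }
  MPI_Finalize();
  return 0;
}
```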