Commit fca86a9b authored by iker_martin's avatar iker_martin
Browse files

Refactored Manager and CommDist for non-blocking communications and...

Refactored Manager and CommDist for non-blocking communications and MPI_Request usage. Two minor bug fixes. RMA asynchronous and Ibarrier strategy are the next steps to develop
parent 95b08697
...@@ -66,6 +66,7 @@ int main(int argc, char *argv[]) { ...@@ -66,6 +66,7 @@ int main(int argc, char *argv[]) {
set_benchmark_configuration(config_file); set_benchmark_configuration(config_file);
set_benchmark_results(results); set_benchmark_results(results);
set_children_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
set_malleability_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss, set_malleability_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss,
config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs); config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
......
This diff is collapsed.
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
//#define MAL_USE_THREAD 3 //#define MAL_USE_THREAD 3
int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int comm_type, MPI_Comm comm); int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int comm_type, MPI_Comm comm);
int async_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty);
int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int red_method, int red_strategies); int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int red_method, int red_strategies);
void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int red_method, int red_strategies); void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int red_method, int red_strategies);
......
...@@ -23,7 +23,7 @@ void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, struct Cou ...@@ -23,7 +23,7 @@ void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, struct Cou
get_util_ids(dist_data, numP_other, &idS); get_util_ids(dist_data, numP_other, &idS);
counts->idI = idS[0]; counts->idI = idS[0];
counts->idE = idS[0]; counts->idE = idS[1];
get_block_dist(n, idS[0], numP_other, &dist_target); // RMA Specific operation get_block_dist(n, idS[0], numP_other, &dist_target); // RMA Specific operation
counts->first_target_displs = dist_data.ini - dist_target.ini; // RMA Specific operation counts->first_target_displs = dist_data.ini - dist_target.ini; // RMA Specific operation
......
...@@ -437,7 +437,8 @@ void send_data(int numP_children, malleability_data_t *data_struct, int is_async ...@@ -437,7 +437,8 @@ void send_data(int numP_children, malleability_data_t *data_struct, int is_async
for(i=0; i < data_struct->entries; i++) { for(i=0; i < data_struct->entries; i++) {
aux_send = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char aux_send = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char
aux_recv = NULL; aux_recv = NULL;
send_async(aux_send, data_struct->qty[i], mall->myId, mall->numP, mall->intercomm, numP_children, data_struct->requests, mall_conf->red_method, mall_conf->red_strategies); async_communication(aux_send, &aux_recv, data_struct->qty[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall_conf->red_strategies, mall->intercomm,
&(data_struct->requests[i]), &(data_struct->request_qty[i]));
if(aux_recv != NULL) data_struct->arrays[i] = (void *) aux_recv; if(aux_recv != NULL) data_struct->arrays[i] = (void *) aux_recv;
} }
} else { } else {
...@@ -462,7 +463,8 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch ...@@ -462,7 +463,8 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch
if(is_asynchronous) { if(is_asynchronous) {
for(i=0; i < data_struct->entries; i++) { for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char aux = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char
recv_async(&aux, data_struct->qty[i], mall->myId, mall->numP, mall->intercomm, numP_parents, mall_conf->red_method, mall_conf->red_strategies); async_communication(&aux_s, &aux, data_struct->qty[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall_conf->red_strategies, mall->intercomm,
&(data_struct->requests[i]), &(data_struct->request_qty[i]));
data_struct->arrays[i] = (void *) aux; data_struct->arrays[i] = (void *) aux;
} }
} else { } else {
...@@ -641,21 +643,16 @@ int start_redistribution() { ...@@ -641,21 +643,16 @@ int start_redistribution() {
* los hijos han terminado de recibir. * los hijos han terminado de recibir.
*/ */
int check_redistribution() { int check_redistribution() {
int is_intercomm, completed, all_completed, test_err; int is_intercomm, req_qty, completed, all_completed, test_err;
MPI_Request *req_completed; MPI_Request *req_completed;
//dist_a_data->requests[0][X] //FIXME Numero magico 0 -- Modificar para que sea un for?
//TODO Modificar a switch-case //FIXME Modificar para que sea un for
if (mall_conf->red_method == MALL_RED_POINT) { req_completed = dist_a_data->requests[0]; //FIXME Numero magico
test_err = MPI_Testall(mall->numC, dist_a_data->requests[0], &completed, MPI_STATUSES_IGNORE); req_qty = dist_a_data->request_qty[0]; //FIXME Numero magico
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) { //FIXME Strategy not fully implemented
test_err = MPI_Test(&(req_completed[req_qty-1]), &completed, MPI_STATUS_IGNORE);
} else { } else {
if(mall_conf->red_method == MALL_RED_BASELINE) { test_err = MPI_Testall(req_qty, req_completed, &completed, MPI_STATUSES_IGNORE); //FIXME Numero magico
req_completed = &(dist_a_data->requests[0][0]);
} else if (mall_conf->red_method == MALL_RED_IBARRIER) { //FIXME No es un metodo
req_completed = &(dist_a_data->requests[0][1]);
}
test_err = MPI_Test(req_completed, &completed, MPI_STATUS_IGNORE);
} }
if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) { if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
...@@ -663,12 +660,11 @@ int check_redistribution() { ...@@ -663,12 +660,11 @@ int check_redistribution() {
MPI_Abort(MPI_COMM_WORLD, test_err); MPI_Abort(MPI_COMM_WORLD, test_err);
} }
MPI_Wait(req_completed, MPI_STATUS_IGNORE); completed=1;
MPI_Allreduce(&completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm); MPI_Allreduce(&completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended
if(mall_conf->red_method == MALL_RED_IBARRIER) { //FIXME No es un metodo if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) { //FIXME Strategy not fully implemented
MPI_Wait(&(dist_a_data->requests[0][0]), MPI_STATUS_IGNORE); // Indicar como completado el envio asincrono MPI_Waitall(req_qty, req_completed, MPI_STATUSES_IGNORE);
//Para la desconexión de ambos grupos de procesos es necesario indicar a MPI que esta comm //Para la desconexión de ambos grupos de procesos es necesario indicar a MPI que esta comm
//ha terminado, aunque solo se pueda llegar a este punto cuando ha terminado //ha terminado, aunque solo se pueda llegar a este punto cuando ha terminado
} }
......
...@@ -147,8 +147,6 @@ void init_malleability_data_struct(malleability_data_t *data_struct, size_t size ...@@ -147,8 +147,6 @@ void init_malleability_data_struct(malleability_data_t *data_struct, size_t size
data_struct->requests = (MPI_Request **) malloc(size * sizeof(MPI_Request *)); data_struct->requests = (MPI_Request **) malloc(size * sizeof(MPI_Request *));
data_struct->arrays = (void **) malloc(size * sizeof(void *)); data_struct->arrays = (void **) malloc(size * sizeof(void *));
data_struct->request_ibarrier = MPI_REQUEST_NULL;
for(i=0; i<size; i++) { //calloc and memset does not ensure a NULL value for(i=0; i<size; i++) { //calloc and memset does not ensure a NULL value
data_struct->requests[i] = NULL; data_struct->requests[i] = NULL;
data_struct->arrays[i] = NULL; data_struct->arrays[i] = NULL;
......
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
typedef struct { typedef struct {
size_t entries; // Indica numero de vectores a comunicar (replicated data) size_t entries; // Indica numero de vectores a comunicar (replicated data)
size_t max_entries; size_t max_entries;
MPI_Request request_ibarrier; // Request para indicar que los padres esperan a que los hijos terminen de recibir
size_t *qty; // Indica numero de elementos en cada subvector de sync_array size_t *qty; // Indica numero de elementos en cada subvector de sync_array
int *types; int *types;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment