Commit 40c3cc1d authored by iker_martin's avatar iker_martin
Browse files

Added synchronous MPI-RMA redistributions. Works over Baseline and Merge...

Added synchronous MPI-RMA redistributions. Works over Baseline and Merge spawn, but has not been tested with all the strategies
parent 96107657
...@@ -7,6 +7,10 @@ ...@@ -7,6 +7,10 @@
void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts); void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts);
void sync_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Comm comm, int comm_type);
void sync_rma_lock(char *recv, struct Counts r_counts, MPI_Win win);
void sync_rma_lockall(char *recv, struct Counts r_counts, MPI_Win win);
//////////////////////////
void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req); void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req);
void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req); void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req);
...@@ -49,9 +53,9 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) { ...@@ -49,9 +53,9 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) {
* In the redistribution is differenciated parent group from the children and the values each group indicates can be * In the redistribution is differenciated parent group from the children and the values each group indicates can be
* different. * different.
* *
* - send (IN): Array with the data to send. This value can not be NULL. * - send (IN): Array with the data to send. This data can not be null for parents.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data. * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* process receives data and is NULL, the behaviour is undefined. * If the process receives data and is NULL, the behaviour is undefined.
* - qty (IN): Sum of elements shared by all processes that will send data. * - qty (IN): Sum of elements shared by all processes that will send data.
* - myId (IN): Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm". * - myId (IN): Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
* - numP (IN): Size of the local group. If it is a children group, this parameter must correspond to using * - numP (IN): Size of the local group. If it is a children group, this parameter must correspond to using
...@@ -63,22 +67,138 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) { ...@@ -63,22 +67,138 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) {
* *
* returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE). //FIXME In this case is always true... * returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE). //FIXME In this case is always true...
*/ */
int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, MPI_Comm comm) { int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int comm_type, MPI_Comm comm) {
int is_intercomm; int is_intercomm, aux_comm_used = 0;
struct Counts s_counts, r_counts; struct Counts s_counts, r_counts;
struct Dist_data dist_data;
MPI_Comm aux_comm = MPI_COMM_NULL;
/* PREPARE COMMUNICATION */ /* PREPARE COMMUNICATION */
MPI_Comm_test_inter(comm, &is_intercomm); MPI_Comm_test_inter(comm, &is_intercomm);
prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts); prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
printf("P%d/%d Comm type=%d RMA_LOCK=%d RMA_All=%d\n", myId, numP, comm_type, MALL_RED_RMA_LOCK, MALL_RED_RMA_LOCKALL);
/* PERFORM COMMUNICATION */ /* PERFORM COMMUNICATION */
MPI_Alltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, comm); switch(comm_type) {
case MALL_RED_RMA_LOCKALL:
case MALL_RED_RMA_LOCK:
if(is_children_group) {
get_block_dist(qty, myId, numP, &dist_data);
} else {
get_block_dist(qty, myId, numO, &dist_data);
}
if(is_intercomm) {
MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
aux_comm_used = 1;
} else { aux_comm = comm; }
sync_rma(send, *recv, r_counts, dist_data.tamBl, aux_comm, comm_type);
break;
case MALL_RED_POINT:
//TODO
case MALL_RED_BASELINE:
default:
MPI_Alltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, comm);
break;
}
if(aux_comm_used) {
MPI_Comm_free(&aux_comm);
}
freeCounts(&s_counts); freeCounts(&s_counts);
freeCounts(&r_counts); freeCounts(&r_counts);
return 1; //FIXME In this case is always true... return 1; //FIXME In this case is always true...
} }
/*
* Performs synchronous MPI-RMA operations to redistribute an array in a block distribution. Is should be called after calculating
* how data should be redistributed
*
* - send (IN): Array with the data to send. This value can not be NULL for parents.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* If the process receives data and is NULL, the behaviour is undefined.
* - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
* displacements.
* - tamBl (IN): How many elements are stored in the parameter "send".
* - comm (IN): Communicator to use to perform the redistribution. Must be an intracommunicator as MPI-RMA requirements.
* - comm_type (IN): Type of data redistribution to use. In this case indicates the RMA operation(Lock or LockAll).
*
*/
void sync_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Comm comm, int comm_type) {
int aux_array_used;
MPI_Win win;
aux_array_used = 0;
if(send == NULL) {
tamBl = 1;
send = malloc(tamBl*sizeof(char));
aux_array_used = 1;
}
MPI_Win_create(send, (MPI_Aint)tamBl, sizeof(char), MPI_INFO_NULL, comm, &win);
switch(comm_type) {
case MALL_RED_RMA_LOCKALL:
sync_rma_lockall(recv, r_counts, win);
break;
case MALL_RED_RMA_LOCK:
sync_rma_lock(recv, r_counts, win);
break;
}
MPI_Win_free(&win);
if(aux_array_used) {
free(send);
send = NULL;
}
}
/*
* Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* If the process receives data and is NULL, the behaviour is undefined.
* - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
* displacements.
* - win (IN): Window to use to perform the redistribution.
*
*/
void sync_rma_lock(char *recv, struct Counts r_counts, MPI_Win win) {
int i, target_displs;
target_displs = r_counts.first_target_displs;
for(i=r_counts.idI; i<r_counts.idE; i++) {
MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
MPI_Get(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, target_displs, r_counts.counts[i], MPI_CHAR, win);
MPI_Win_unlock(i, win);
target_displs=0;
}
}
/*
* Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lockall/Unlockall.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* If the process receives data and is NULL, the behaviour is undefined.
* - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
* displacements.
* - win (IN): Window to use to perform the redistribution.
*
*/
void sync_rma_lockall(char *recv, struct Counts r_counts, MPI_Win win) {
int i, target_displs;
target_displs = r_counts.first_target_displs;
MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
for(i=r_counts.idI; i<r_counts.idE; i++) {
MPI_Get(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, target_displs, r_counts.counts[i], MPI_CHAR, win);
target_displs=0;
}
MPI_Win_unlock_all(win);
}
//================================================================================ //================================================================================
//================================================================================ //================================================================================
//========================ASYNCHRONOUS FUNCTIONS================================== //========================ASYNCHRONOUS FUNCTIONS==================================
...@@ -305,10 +425,10 @@ void prepare_redistribution(int qty, int myId, int numP, int numO, int is_childr ...@@ -305,10 +425,10 @@ void prepare_redistribution(int qty, int myId, int numP, int numO, int is_childr
// Obtener distribución para este hijo // Obtener distribución para este hijo
get_block_dist(qty, myId, numP, &dist_data); get_block_dist(qty, myId, numP, &dist_data);
*recv = malloc(dist_data.tamBl * sizeof(char)); *recv = malloc(dist_data.tamBl * sizeof(char));
//get_block_dist(qty, myId, numP, &dist_data); get_block_dist(qty, myId, numP, &dist_data);
//print_counts(dist_data, r_counts->counts, r_counts->displs, numO, 1, "Children C"); print_counts(dist_data, r_counts->counts, r_counts->displs, numO, 1, "Children C");
} else { } else {
//get_block_dist(qty, myId, numP, &dist_data); get_block_dist(qty, myId, numP, &dist_data);
prepare_comm_alltoall(myId, numP, numO, qty, s_counts); prepare_comm_alltoall(myId, numP, numO, qty, s_counts);
if(is_intercomm) { if(is_intercomm) {
...@@ -322,9 +442,9 @@ void prepare_redistribution(int qty, int myId, int numP, int numO, int is_childr ...@@ -322,9 +442,9 @@ void prepare_redistribution(int qty, int myId, int numP, int numO, int is_childr
} else { } else {
mallocCounts(r_counts, numP); mallocCounts(r_counts, numP);
} }
//print_counts(dist_data, r_counts->counts, r_counts->displs, numP, 1, "Children P "); print_counts(dist_data, r_counts->counts, r_counts->displs, numP, 1, "Children P ");
} }
//print_counts(dist_data, s_counts->counts, s_counts->displs, numO, 1, "Parents "); print_counts(dist_data, s_counts->counts, s_counts->displs, numO, 1, "Parents ");
} }
} }
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
//#define MAL_USE_POINT 2 //#define MAL_USE_POINT 2
//#define MAL_USE_THREAD 3 //#define MAL_USE_THREAD 3
int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, MPI_Comm comm); int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int comm_type, MPI_Comm comm);
int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait); int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait);
void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int parents_wait); void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int parents_wait);
......
...@@ -15,13 +15,18 @@ void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS); ...@@ -15,13 +15,18 @@ void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS);
*/ */
void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, struct Counts *counts) { void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, struct Counts *counts) {
int i, *idS; int i, *idS;
struct Dist_data dist_data; struct Dist_data dist_data, dist_target;
mallocCounts(counts, numP_other); mallocCounts(counts, numP_other);
get_block_dist(n, myId, numP, &dist_data); get_block_dist(n, myId, numP, &dist_data);
get_util_ids(dist_data, numP_other, &idS); get_util_ids(dist_data, numP_other, &idS);
counts->idI = idS[0];
counts->idE = idS[0];
get_block_dist(n, idS[0], numP_other, &dist_target); // RMA Specific operation
counts->first_target_displs = dist_data.ini - dist_target.ini; // RMA Specific operation
if(idS[0] == 0) { if(idS[0] == 0) {
set_interblock_counts(0, numP_other, dist_data, counts->counts); set_interblock_counts(0, numP_other, dist_data, counts->counts);
idS[0]++; idS[0]++;
...@@ -196,8 +201,13 @@ void mallocCounts(struct Counts *counts, size_t numP) { ...@@ -196,8 +201,13 @@ void mallocCounts(struct Counts *counts, size_t numP) {
counts->displs = calloc(numP, sizeof(int)); counts->displs = calloc(numP, sizeof(int));
if(counts->displs == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);} if(counts->displs == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
counts->zero_arr = calloc(numP, sizeof(int)); counts->zero_arr = calloc(numP, sizeof(int)); // TODO Deprecate
if(counts->zero_arr == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);} if(counts->zero_arr == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);} // TODO Deprecate
counts->len = numP;
counts->idI = -1;
counts->idE = -1;
counts->first_target_displs = -1;
} }
......
...@@ -18,6 +18,8 @@ struct Dist_data { ...@@ -18,6 +18,8 @@ struct Dist_data {
}; };
struct Counts { struct Counts {
int len, idI, idE;
int first_target_displs; // RMA. Indicates displacement for first target when performing a Get.
int *counts; int *counts;
int *displs; int *displs;
int *zero_arr; int *zero_arr;
......
...@@ -445,7 +445,7 @@ void send_data(int numP_children, malleability_data_t *data_struct, int is_async ...@@ -445,7 +445,7 @@ void send_data(int numP_children, malleability_data_t *data_struct, int is_async
for(i=0; i < data_struct->entries; i++) { for(i=0; i < data_struct->entries; i++) {
aux_send = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char aux_send = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char
aux_recv = NULL; aux_recv = NULL;
sync_communication(aux_send, &aux_recv, data_struct->qty[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall->intercomm); sync_communication(aux_send, &aux_recv, data_struct->qty[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->comm_type, mall->intercomm);
if(aux_recv != NULL) data_struct->arrays[i] = (void *) aux_recv; if(aux_recv != NULL) data_struct->arrays[i] = (void *) aux_recv;
} }
} }
...@@ -469,7 +469,7 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch ...@@ -469,7 +469,7 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch
} else { } else {
for(i=0; i < data_struct->entries; i++) { for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char aux = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char
sync_communication(&aux_s, &aux, data_struct->qty[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall->intercomm); sync_communication(&aux_s, &aux, data_struct->qty[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->comm_type, mall->intercomm);
data_struct->arrays[i] = (void *) aux; data_struct->arrays[i] = (void *) aux;
} }
} }
......
...@@ -5,21 +5,6 @@ ...@@ -5,21 +5,6 @@
#include <stdlib.h> #include <stdlib.h>
//States //States
/*
#define MAL_UNRESERVED -1
#define MAL_DENIED -2
#define MAL_ZOMBIE -3
#define MAL_NOT_STARTED 0
#define MAL_SPAWN_PENDING 1
#define MAL_SPAWN_SINGLE_START 2
#define MAL_SPAWN_SINGLE_PENDING 3
#define MAL_SPAWN_ADAPT_POSTPONE 4
#define MAL_SPAWN_COMPLETED 5
#define MAL_DIST_PENDING 6
#define MAL_DIST_COMPLETED 7
#define MAL_DIST_ADAPTED 8
*/
#define MALL_DENIED -1 #define MALL_DENIED -1
enum mall_states{MALL_UNRESERVED, MALL_NOT_STARTED, MALL_ZOMBIE, MALL_SPAWN_PENDING, MALL_SPAWN_SINGLE_PENDING, enum mall_states{MALL_UNRESERVED, MALL_NOT_STARTED, MALL_ZOMBIE, MALL_SPAWN_PENDING, MALL_SPAWN_SINGLE_PENDING,
MALL_SPAWN_SINGLE_COMPLETED, MALL_SPAWN_ADAPT_POSTPONE, MALL_SPAWN_COMPLETED, MALL_DIST_PENDING, MALL_DIST_COMPLETED, MALL_SPAWN_SINGLE_COMPLETED, MALL_SPAWN_ADAPT_POSTPONE, MALL_SPAWN_COMPLETED, MALL_DIST_PENDING, MALL_DIST_COMPLETED,
...@@ -28,6 +13,10 @@ enum mall_spawn_methods{MALL_SPAWN_BASELINE, MALL_SPAWN_MERGE}; ...@@ -28,6 +13,10 @@ enum mall_spawn_methods{MALL_SPAWN_BASELINE, MALL_SPAWN_MERGE};
#define MALL_SPAWN_PTHREAD 2 #define MALL_SPAWN_PTHREAD 2
#define MALL_SPAWN_SINGLE 3 #define MALL_SPAWN_SINGLE 3
enum mall_redistribution_methods{MALL_RED_BASELINE, MALL_RED_POINT, MALL_RED_RMA_LOCK, MALL_RED_RMA_LOCKALL};
#define MAL_RED_IBARRIER 2
#define MAL_RED_THREAD 3
#define MALLEABILITY_ROOT 0 #define MALLEABILITY_ROOT 0
#define MAL_APP_EXECUTING 0 #define MAL_APP_EXECUTING 0
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment