Commit 95b08697 authored by iker_martin

Refactor for redistributing data with non-blocking functions. Modified...

Refactor for redistributing data with non-blocking functions. Renamed the data-redistribution constants to use -RED-. Fixed a potential bug with requests, as the number of requests used was not clear. WIP over CommDist and Manager to adapt non-blocking calls. Seems to be working fine right now, but more testing is required.
parent ddf8100c
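The "requests" fix mentioned above makes the number of MPI_Request slots an explicit function of the redistribution method. A minimal sketch of the sizing rule, as applied in malleability_add_data further down (numC is the number of child processes); the snippet itself is not part of the commit:

/* Sketch of the request-count rule used below. */
size_t total_reqs = 0;
if(red_method == MALL_RED_BASELINE) {
    total_reqs = 1;      /* one MPI_Ialltoallv request */
} else if(red_method == MALL_RED_IBARRIER) {
    total_reqs = 2;      /* the Ialltoallv request plus the MPI_Ibarrier request */
} else if(red_method == MALL_RED_POINT) {
    total_reqs = numC;   /* one MPI_Isend/MPI_Irecv per peer process */
}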
@@ -7,17 +7,16 @@
void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts);
void sync_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Comm comm, int comm_type);
void sync_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method);
void sync_rma_lock(char *recv, struct Counts r_counts, MPI_Win win);
void sync_rma_lockall(char *recv, struct Counts r_counts, MPI_Win win);
//////////////////////////
void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req);
void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req);
void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts *counts, MPI_Request *comm_req);
void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts *counts, MPI_Request *comm_req);
void send_async_point_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req);
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req);
void send_async_point_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts *counts, MPI_Request *comm_req);
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts *counts, MPI_Request *comm_req);
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS);
/*
* Allocates memory for a vector of up to "qty" elements.
* The "qty" elements are distributed among the "numP" processes
@@ -67,7 +66,7 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) {
*
* returns: An integer indicating if the operation has been completed (TRUE) or not (FALSE). //FIXME In this case it is always true...
*/
int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int comm_type, MPI_Comm comm) {
int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int red_method, MPI_Comm comm) {
int is_intercomm, aux_comm_used = 0;
struct Counts s_counts, r_counts;
struct Dist_data dist_data;
@@ -76,10 +75,9 @@ int sync_communication(char *send, char **recv, int qty, int myId, int numP, int
/* PREPARE COMMUNICATION */
MPI_Comm_test_inter(comm, &is_intercomm);
prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
printf("P%d/%d Comm type=%d RMA_LOCK=%d RMA_All=%d\n", myId, numP, comm_type, MALL_RED_RMA_LOCK, MALL_RED_RMA_LOCKALL);
/* PERFORM COMMUNICATION */
switch(comm_type) {
switch(red_method) {
case MALL_RED_RMA_LOCKALL:
case MALL_RED_RMA_LOCK:
@@ -92,7 +90,7 @@ int sync_communication(char *send, char **recv, int qty, int myId, int numP, int
MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
aux_comm_used = 1;
} else { aux_comm = comm; }
sync_rma(send, *recv, r_counts, dist_data.tamBl, aux_comm, comm_type);
sync_rma(send, *recv, r_counts, dist_data.tamBl, aux_comm, red_method);
break;
case MALL_RED_POINT:
@@ -123,10 +121,10 @@ int sync_communication(char *send, char **recv, int qty, int myId, int numP, int
* displacements.
* - tamBl (IN): How many elements are stored in the parameter "send".
* - comm (IN): Communicator to use to perform the redistribution. Must be an intracommunicator, as required by MPI RMA.
* - comm_type (IN): Type of data redistribution to use. In this case it indicates the RMA operation (Lock or LockAll).
* - red_method (IN): Type of data redistribution to use. In this case it indicates the RMA operation (Lock or LockAll).
*
*/
void sync_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Comm comm, int comm_type) {
void sync_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method) {
int aux_array_used;
MPI_Win win;
@@ -138,7 +136,7 @@ void sync_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Com
}
MPI_Win_create(send, (MPI_Aint)tamBl, sizeof(char), MPI_INFO_NULL, comm, &win);
switch(comm_type) {
switch(red_method) {
case MALL_RED_RMA_LOCKALL:
sync_rma_lockall(recv, r_counts, win);
break;
@@ -214,43 +212,40 @@ void sync_rma_lockall(char *recv, struct Counts r_counts, MPI_Win win) {
*
* The array vector is not modified in this function.
*/
int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait) {
int i;
int *idS = NULL;
struct Counts counts;
int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int red_method, int red_strategies) {
int i, is_intercomm;
struct Counts s_counts, r_counts;
struct Dist_data dist_data;
get_block_dist(qty, myId, numP, &dist_data); // Distribution of this process within its group
dist_data.intercomm = intercomm;
// Create arrays containing info about how many elements will be sent to each created process
mallocCounts(&counts, numP_child);
getIds_intercomm(dist_data, numP_child, &idS); // Get the range of child ranks this process sends data to
/* PREPARE COMMUNICATION */
MPI_Comm_test_inter(intercomm, &is_intercomm);
prepare_redistribution(qty, myId, numP, numP_child, MALLEABILITY_NOT_CHILDREN, is_intercomm, NULL, &s_counts, &r_counts);
// MAL_USE_THREAD follows the synchronous path
if(parents_wait == MAL_USE_NORMAL) {
if(red_method == MALL_RED_BASELINE) {
//*comm_req = (MPI_Request *) malloc(sizeof(MPI_Request));
(*comm_req)[0] = MPI_REQUEST_NULL;
send_async_arrays(dist_data, array, numP_child, counts, &(*comm_req[0]));
send_async_arrays(dist_data, array, numP_child, &s_counts, &(*comm_req[0]));
} else if (parents_wait == MAL_USE_IBARRIER){
} else if (red_method == MALL_RED_IBARRIER){ //FIXME Not a method
//*comm_req = (MPI_Request *) malloc(2 * sizeof(MPI_Request));
(*comm_req)[0] = MPI_REQUEST_NULL;
(*comm_req)[1] = MPI_REQUEST_NULL;
send_async_arrays(dist_data, array, numP_child, counts, &((*comm_req)[1]));
send_async_arrays(dist_data, array, numP_child, &s_counts, &((*comm_req)[1]));
MPI_Ibarrier(intercomm, &((*comm_req)[0]) );
} else if (parents_wait == MAL_USE_POINT){
} else if (red_method == MALL_RED_POINT){
//*comm_req = (MPI_Request *) malloc(numP_child * sizeof(MPI_Request));
for(i=0; i<numP_child; i++){
(*comm_req)[i] = MPI_REQUEST_NULL;
}
send_async_point_arrays(dist_data, array, numP_child, counts, *comm_req);
} else if (parents_wait == MAL_USE_THREAD) { //TODO
send_async_point_arrays(dist_data, array, numP_child, &s_counts, *comm_req);
}
freeCounts(&counts);
free(idS);
freeCounts(&s_counts);
freeCounts(&r_counts);
return 1;
}
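The commented-out allocations above suggest the caller now owns the request buffer and must size it according to the method. A hypothetical caller-side sketch for the point-to-point case (variable names follow the signature above):

/* Hypothetical caller: MALL_RED_POINT needs numP_child requests. */
MPI_Request *reqs = (MPI_Request *) malloc(numP_child * sizeof(MPI_Request));
send_async(array, qty, myId, numP, intercomm, numP_child, &reqs, MALL_RED_POINT, red_strategies);
MPI_Waitall(numP_child, reqs, MPI_STATUSES_IGNORE);
free(reqs);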
@@ -265,8 +260,7 @@ int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int
* The "parents_wait" argument indicates whether to use the version in which the parents
* wait until they finish sending, or the one in which they wait until the children finish receiving.
*/
void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int parents_wait) {
int *idS = NULL;
void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int red_method, int red_strategies) {
int wait_err, i;
struct Counts counts;
struct Dist_data dist_data;
@@ -278,39 +272,36 @@ void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, i
dist_data.intercomm = intercomm;
/* PREPARE RECEPTION DATA FOR THE VECTOR */
mallocCounts(&counts, numP_parents);
//mallocCounts(&counts, numP_parents);
getIds_intercomm(dist_data, numP_parents, &idS); // Get the range of parent ranks this process will receive data from
// MAL_USE_THREAD follows the synchronous path
if(parents_wait == MAL_USE_POINT) {
if(red_method == MALL_RED_POINT) {
comm_req = (MPI_Request *) malloc(numP_parents * sizeof(MPI_Request));
for(i=0; i<numP_parents; i++){
comm_req[i] = MPI_REQUEST_NULL;
}
recv_async_point_arrays(dist_data, *array, numP_parents, counts, comm_req);
recv_async_point_arrays(dist_data, *array, numP_parents, &counts, comm_req);
wait_err = MPI_Waitall(numP_parents, comm_req, MPI_STATUSES_IGNORE);
} else if (parents_wait == MAL_USE_NORMAL || parents_wait == MAL_USE_IBARRIER) {
} else if (red_method == MALL_RED_BASELINE || red_method == MALL_RED_IBARRIER) { //FIXME IBarrier is not a method
comm_req = (MPI_Request *) malloc(sizeof(MPI_Request));
*comm_req = MPI_REQUEST_NULL;
recv_async_arrays(dist_data, *array, numP_parents, counts, comm_req);
recv_async_arrays(dist_data, *array, numP_parents, &counts, comm_req);
wait_err = MPI_Wait(comm_req, MPI_STATUS_IGNORE);
} else if (parents_wait == MAL_USE_THREAD) { //TODO
}
if(wait_err != MPI_SUCCESS) {
MPI_Abort(MPI_COMM_WORLD, wait_err);
}
if(parents_wait == MAL_USE_IBARRIER) { //MAL USE IBARRIER END
if(red_method == MALL_RED_IBARRIER) { //MAL USE IBARRIER END //FIXME IBarrier is not a method
MPI_Ibarrier(intercomm, &aux);
MPI_Wait(&aux, MPI_STATUS_IGNORE); //It is necessary to check that the communication has finished before disconnecting the process groups
}
//printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array);
freeCounts(&counts);
free(idS);
free(comm_req);
}
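In outline, the IBarrier variant (flagged above as a strategy rather than a method) pairs the non-blocking data transfer with a non-blocking barrier so the parents can detect when every child has received its data:

/* Outline of the MALL_RED_IBARRIER handshake implemented above.
 * Parents (send_async):               Children (recv_async):
 *   MPI_Ialltoallv(..., &req[1]);       MPI_Ialltoallv(..., req);
 *   MPI_Ibarrier(intercomm, &req[0]);   MPI_Wait(req, ...);        // data received
 *                                       MPI_Ibarrier(intercomm, &aux);
 *   // req[0] completes once every      MPI_Wait(&aux, ...);       // notify parents
 *   // child has reached the barrier,   // both groups may now disconnect
 *   // i.e. has received its data.
 */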
@@ -321,12 +312,11 @@ void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, i
*
* The send is performed using a collective communication.
*/
void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req) {
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);
void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts *counts, MPI_Request *comm_req) {
//prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, counts);
/* DATA COMMUNICATION */
MPI_Ialltoallv(array, counts.counts, counts.displs, MPI_CHAR, NULL, counts.zero_arr, counts.zero_arr, MPI_CHAR, dist_data.intercomm, comm_req);
MPI_Ialltoallv(array, counts->counts, counts->displs, MPI_CHAR, NULL, counts->zero_arr, counts->zero_arr, MPI_CHAR, dist_data.intercomm, comm_req);
}
/*
@@ -336,14 +326,14 @@ void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child,
*
* The send is performed using several point-to-point communications.
*/
void send_async_point_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req) {
void send_async_point_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts *counts, MPI_Request *comm_req) {
int i;
// PREPARE SENDING OF THE VECTOR
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, counts);
for(i=0; i<numP_child; i++) { //TODO This proposal no longer uses IdI and IdE
if(counts.counts[0] != 0) {
MPI_Isend(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i]));
if(counts->counts[i] != 0) {
MPI_Isend(array+counts->displs[i], counts->counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i]));
}
}
//print_counts(dist_data, counts.counts, counts.displs, numP_child, "Padres");
@@ -356,15 +346,15 @@ void send_async_point_arrays(struct Dist_data dist_data, char *array, int numP_c
*
* The reception is performed using a collective communication.
*/
void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req) {
void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts *counts, MPI_Request *comm_req) {
char *aux = malloc(1);
// Adjust the reception values
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, &counts);
//print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, counts);
//print_counts(dist_data, counts->counts, counts->displs, numP_parents, 1, "Children");
/* DATA COMMUNICATION */
MPI_Ialltoallv(aux, counts.zero_arr, counts.zero_arr, MPI_CHAR, array, counts.counts, counts.displs, MPI_CHAR, dist_data.intercomm, comm_req);
MPI_Ialltoallv(aux, counts->zero_arr, counts->zero_arr, MPI_CHAR, array, counts->counts, counts->displs, MPI_CHAR, dist_data.intercomm, comm_req);
free(aux);
}
@@ -375,15 +365,15 @@ void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents
*
* The reception is performed using several point-to-point communications.
*/
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req) {
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts *counts, MPI_Request *comm_req) {
int i;
// Adjust the reception values
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, &counts);
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, counts);
for(i=0; i<numP_parents; i++) { //TODO This proposal no longer uses IdI and IdE
if(counts.counts[0] != 0) {
MPI_Irecv(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i])); //FIXME BUffer recv
if(counts->counts[i] != 0) {
MPI_Irecv(array+counts->displs[i], counts->counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i])); //FIXME Buffer recv
}
}
//print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
@@ -414,7 +404,6 @@ void recv_async_point_arrays(struct Dist_data dist_data, char *array, int numP_p
* - s_counts (OUT): Struct indicating how many elements this process sends to processes in the new group.
* - r_counts (OUT): Struct indicating how many elements this process receives from other processes in the previous group.
*
* returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE). //FIXME In this case is always true...
*/
void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts) {
struct Dist_data dist_data;
@@ -449,42 +438,15 @@ print_counts(dist_data, s_counts->counts, s_counts->displs, numO, 1, "Parents ")
}
/*
* For a process in one group, obtains the range of processes in
* another group it has to send data to or receive data from.
* Function to check whether the strategy passed as the second
* argument is among the chosen strategies.
*
* Returns the first identifier and the last one (excluded) with
* which to communicate.
* Returns 1 (true) in "result" if the strategy is used, 0 (false)
* otherwise.
*/
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS) {
int idI, idE;
int tamOther = dist_data.qty / numP_other;
int remOther = dist_data.qty % numP_other;
// Indicates the cut-off point in the external process group that
// separates the processes that have
// size tamOther + 1 from those of size tamOther
int middle = (tamOther + 1) * remOther;
// Compute idI taking into account whether it communicates with a
// process of size tamOther or tamOther+1
if(middle > dist_data.ini) { // First subgroup (tamOther+1)
idI = dist_data.ini / (tamOther + 1);
} else { // Second subgroup (tamOther)
idI = ((dist_data.ini - middle) / tamOther) + remOther;
}
// Compute idE taking into account whether it communicates with a
// process of size tamOther or tamOther+1
if(middle >= dist_data.fin) { // First subgroup (tamOther +1)
idE = dist_data.fin / (tamOther + 1);
idE = (dist_data.fin % (tamOther + 1) > 0 && idE+1 <= numP_other) ? idE+1 : idE;
} else { // Second subgroup (tamOther)
idE = ((dist_data.fin - middle) / tamOther) + remOther;
idE = ((dist_data.fin - middle) % tamOther > 0 && idE+1 <= numP_other) ? idE+1 : idE;
}
*idS = malloc(2 * sizeof(int));
(*idS)[0] = idI;
(*idS)[1] = idE;
int malleability_red_contains_strat(int comm_strategies, int strategy, int *result) {
int value = comm_strategies % strategy ? 0 : 1;
if(result != NULL) *result = value;
return value;
}
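The membership test above is divisibility (comm_strategies % strategy == 0), which only works if individual strategy codes are pairwise coprime factors combined by multiplication; that encoding is an inference from the defines further down (2 for IBarrier, 3 for thread), not something the commit states. A usage sketch under that assumption:

/* Assumed encoding: red_strategies is a product of strategy codes, e.g. 2*3. */
int uses_thread;
malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, &uses_thread);
if(uses_thread) {
    /* follow the threaded (synchronous-style) redistribution path */
}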
@@ -18,9 +18,10 @@
int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int comm_type, MPI_Comm comm);
int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait);
void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int parents_wait);
int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int red_method, int red_strategies);
void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int red_method, int red_strategies);
void malloc_comm_array(char **array, int qty, int myId, int numP);
int malleability_red_contains_strat(int comm_strategies, int strategy, int *result);
#endif
@@ -35,8 +35,8 @@ typedef struct {
int spawn_method;
int spawn_dist;
int spawn_strategies;
int comm_type;
int comm_threaded; //TODO Modify usage so that comm_threaded makes sense -- comm_type selects the MPI function, this selects whether it is threaded or not
int red_method;
int red_strategies;
int grp;
configuration *config_file;
@@ -206,7 +206,7 @@ int malleability_checkpoint() {
break;
case MALL_DIST_PENDING:
if(mall_conf->comm_type == MAL_USE_THREAD) {
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
state = thread_check();
} else {
state = check_redistribution();
@@ -261,12 +261,12 @@ void get_benchmark_results(results_data **results) {
}
//-------------------------------------------------------------------------------------------------------------
void set_malleability_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int comm_type, int comm_threaded) {
void set_malleability_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies) {
mall_conf->spawn_method = spawn_method;
mall_conf->spawn_strategies = spawn_strategies;
mall_conf->spawn_dist = spawn_dist;
mall_conf->comm_type = comm_type;
mall_conf->comm_threaded = comm_threaded;
mall_conf->red_method = red_method;
mall_conf->red_strategies = red_strategies;
}
/*
@@ -306,24 +306,23 @@ void get_malleability_user_comm(MPI_Comm *comm) {
* //FIXME If it is constant it should go through the asynchronous path, not the synchronous one
*/
void malleability_add_data(void *data, size_t total_qty, int type, int is_replicated, int is_constant) {
size_t total_reqs = 0;
if(is_constant) {
if(is_replicated) {
add_data(data, total_qty, type, 0, rep_s_data); //FIXME Magic number
add_data(data, total_qty, type, total_reqs, rep_s_data);
} else {
add_data(data, total_qty, type, 0, dist_s_data); //FIXME Magic number
add_data(data, total_qty, type, total_reqs, dist_s_data);
}
} else {
if(is_replicated) {
add_data(data, total_qty, type, 0, rep_a_data); //FIXME Magic number || One request?
add_data(data, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ???
} else {
size_t total_reqs = 0;
if(mall_conf->comm_type == MAL_USE_NORMAL) {
if(mall_conf->red_method == MALL_RED_BASELINE) {
total_reqs = 1;
} else if(mall_conf->comm_type == MAL_USE_IBARRIER) {
} else if(mall_conf->red_method == MALL_RED_IBARRIER) { //TODO This is a strategy, not a method
total_reqs = 2;
} else if(mall_conf->comm_type == MAL_USE_POINT) {
} else if(mall_conf->red_method == MALL_RED_POINT) {
total_reqs = mall->numC;
}
@@ -342,27 +341,27 @@ void malleability_add_data(void *data, size_t total_qty, int type, int is_replic
* //FIXME If it is constant it should go through the asynchronous path, not the synchronous one
*/
void malleability_modify_data(void *data, size_t index, size_t total_qty, int type, int is_replicated, int is_constant) {
size_t total_reqs = 0;
if(is_constant) {
if(is_replicated) {
modify_data(data, index, total_qty, type, 0, rep_s_data); //FIXME Magic number
modify_data(data, index, total_qty, type, total_reqs, rep_s_data);
} else {
modify_data(data, index, total_qty, type, 0, dist_s_data); //FIXME Magic number
modify_data(data, index, total_qty, type, total_reqs, dist_s_data);
}
} else {
if(is_replicated) {
modify_data(data, index, total_qty, type, 0, rep_a_data); //FIXME Magic number || One request?
} else {
size_t total_reqs = 0;
if(mall_conf->comm_type == MAL_USE_NORMAL) {
modify_data(data, index, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ???
} else {
if(mall_conf->red_method == MALL_RED_BASELINE) {
total_reqs = 1;
} else if(mall_conf->comm_type == MAL_USE_IBARRIER) {
} else if(mall_conf->red_method == MALL_RED_IBARRIER) { //TODO This is a strategy, not a method
total_reqs = 2;
} else if(mall_conf->comm_type == MAL_USE_POINT) {
} else if(mall_conf->red_method == MALL_RED_POINT) {
total_reqs = mall->numC;
}
modify_data(data, index, total_qty, type, total_reqs, dist_a_data); //FIXME Magic number
modify_data(data, index, total_qty, type, total_reqs, dist_a_data);
}
}
}
@@ -438,14 +437,14 @@ void send_data(int numP_children, malleability_data_t *data_struct, int is_async
for(i=0; i < data_struct->entries; i++) {
aux_send = (char *) data_struct->arrays[i]; //TODO Check that it really is a char
aux_recv = NULL;
send_async(aux_send, data_struct->qty[i], mall->myId, mall->numP, mall->intercomm, numP_children, data_struct->requests, mall_conf->comm_type);
send_async(aux_send, data_struct->qty[i], mall->myId, mall->numP, mall->intercomm, numP_children, data_struct->requests, mall_conf->red_method, mall_conf->red_strategies);
if(aux_recv != NULL) data_struct->arrays[i] = (void *) aux_recv;
}
} else {
for(i=0; i < data_struct->entries; i++) {
aux_send = (char *) data_struct->arrays[i]; //TODO Check that it really is a char
aux_recv = NULL;
sync_communication(aux_send, &aux_recv, data_struct->qty[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->comm_type, mall->intercomm);
sync_communication(aux_send, &aux_recv, data_struct->qty[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall->intercomm);
if(aux_recv != NULL) data_struct->arrays[i] = (void *) aux_recv;
}
}
@@ -463,13 +462,13 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch
if(is_asynchronous) {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Check that it really is a char
recv_async(&aux, data_struct->qty[i], mall->myId, mall->numP, mall->intercomm, numP_parents, mall_conf->comm_type);
recv_async(&aux, data_struct->qty[i], mall->myId, mall->numP, mall->intercomm, numP_parents, mall_conf->red_method, mall_conf->red_strategies);
data_struct->arrays[i] = (void *) aux;
}
} else {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Check that it really is a char
sync_communication(&aux_s, &aux, data_struct->qty[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->comm_type, mall->intercomm);
sync_communication(&aux_s, &aux, data_struct->qty[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall->intercomm);
data_struct->arrays[i] = (void *) aux;
}
}
@@ -501,7 +500,8 @@ void Children_init() {
recv_config_file(mall->root, mall->intercomm, &(mall_conf->config_file));
comm_node_data(root_parents, MALLEABILITY_CHILDREN);
MPI_Bcast(&(mall_conf->comm_type), 1, MPI_INT, root_parents, mall->intercomm);
MPI_Bcast(&(mall_conf->red_method), 1, MPI_INT, root_parents, mall->intercomm);
MPI_Bcast(&(mall_conf->red_strategies), 1, MPI_INT, root_parents, mall->intercomm);
mall_conf->results = (results_data *) malloc(sizeof(results_data));
init_results_data(mall_conf->results, mall_conf->config_file->n_resizes, mall_conf->config_file->n_stages, RESULTS_INIT_DATA_QTY);
@@ -509,15 +509,15 @@ void Children_init() {
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
if(dist_a_data->entries || rep_a_data->entries) { // Receive asynchronous data
if(mall_conf->comm_type == MAL_USE_NORMAL || mall_conf->comm_type == MAL_USE_IBARRIER || mall_conf->comm_type == MAL_USE_POINT) {
recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
} else if (mall_conf->comm_type == MAL_USE_THREAD) { //TODO Modify usage so that comm_threaded makes sense
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
} else {
recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
}
mall_conf->results->async_end= MPI_Wtime(); // Get the timestamp of when the asynchronous comm ends
}
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
if(dist_s_data->entries || rep_s_data->entries) { // Receive synchronous data
recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
@@ -608,16 +608,18 @@ int start_redistribution() {
send_config_file(mall_conf->config_file, rootBcast, mall->intercomm);
comm_node_data(rootBcast, MALLEABILITY_NOT_CHILDREN);
MPI_Bcast(&(mall_conf->comm_type), 1, MPI_INT, rootBcast, mall->intercomm);
MPI_Bcast(&(mall_conf->red_method), 1, MPI_INT, rootBcast, mall->intercomm);
MPI_Bcast(&(mall_conf->red_strategies), 1, MPI_INT, rootBcast, mall->intercomm);
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
if(dist_a_data->entries || rep_a_data->entries) { // Enviar datos asincronos
//FIXME Replicated data (rep_a_data) is not sent
mall_conf->results->async_time[mall_conf->grp] = MPI_Wtime();
if(mall_conf->comm_type == MAL_USE_THREAD) {
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
return thread_creation();
} else {
send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
return MALL_DIST_PENDING;
return MALL_DIST_PENDING;
}
}
return end_redistribution();
@@ -643,12 +645,13 @@ int check_redistribution() {
MPI_Request *req_completed;
//dist_a_data->requests[0][X] //FIXME Magic number 0 -- Change this into a for loop?
if (mall_conf->comm_type == MAL_USE_POINT) {
//TODO Change to a switch-case
if (mall_conf->red_method == MALL_RED_POINT) {
test_err = MPI_Testall(mall->numC, dist_a_data->requests[0], &completed, MPI_STATUSES_IGNORE);
} else {
if(mall_conf->comm_type == MAL_USE_NORMAL) {
if(mall_conf->red_method == MALL_RED_BASELINE) {
req_completed = &(dist_a_data->requests[0][0]);
} else if (mall_conf->comm_type == MAL_USE_IBARRIER) {
} else if (mall_conf->red_method == MALL_RED_IBARRIER) { //FIXME Not a method
req_completed = &(dist_a_data->requests[0][1]);
}
@@ -660,11 +663,11 @@ int check_redistribution() {
MPI_Abort(MPI_COMM_WORLD, test_err);
}
MPI_Wait(req_completed, MPI_STATUS_IGNORE); completed=1;
MPI_Allreduce(&completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended
if(mall_conf->comm_type == MAL_USE_IBARRIER) {
if(mall_conf->red_method == MALL_RED_IBARRIER) { //FIXME Not a method
MPI_Wait(&(dist_a_data->requests[0][0]), MPI_STATUS_IGNORE); // Mark the asynchronous send as completed
//To disconnect both process groups it is necessary to tell MPI that this comm
//has finished, even though this point can only be reached once it has finished
@@ -830,6 +833,7 @@ void def_nodeinfo_type(MPI_Datatype *node_type) {
//======================================================||
//======================================================||
int comm_state; //FIXME Use a handler
/*
* Creates a thread to run a communication in the background.
......
@@ -18,7 +18,7 @@ void indicate_ending_malleability(int new_outside_state);
int malleability_checkpoint();
void set_benchmark_grp(int grp);
void set_malleability_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int comm_type, int comm_threaded);
void set_malleability_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies);
void set_children_number(int numC); // TODO TO BE DEPRECATED
void get_malleability_user_comm(MPI_Comm *comm);
......
@@ -13,22 +13,19 @@ enum mall_spawn_methods{MALL_SPAWN_BASELINE, MALL_SPAWN_MERGE};
#define MALL_SPAWN_PTHREAD 2
#define MALL_SPAWN_SINGLE 3
enum mall_redistribution_methods{MALL_RED_BASELINE, MALL_RED_POINT, MALL_RED_RMA_LOCK, MALL_RED_RMA_LOCKALL};
#define MAL_RED_IBARRIER 2
#define MAL_RED_THREAD 3
enum mall_redistribution_methods{MALL_RED_BASELINE, MALL_RED_POINT, MALL_RED_RMA_LOCK, MALL_RED_RMA_LOCKALL, MALL_RED_IBARRIER};
//#define MALL_RED_IBARRIER 2 Add as a strategy and remove as a method
#define MALL_RED_THREAD 3
#define MALLEABILITY_ROOT 0
#define MAL_APP_EXECUTING 0
#define MAL_APP_ENDED 1
#define MAL_USE_NORMAL 0
#define MAL_USE_IBARRIER 1
#define MAL_USE_POINT 2
#define MAL_USE_THREAD 3
//TODO DEPRECATE
#define MAL_INT 0
#define MAL_CHAR 1
////////////////
#define MALLEABILITY_CHILDREN 1
#define MALLEABILITY_NOT_CHILDREN 0
......
@@ -32,6 +32,7 @@ void add_data(void *data, size_t total_qty, int type, size_t request_qty, mallea
data_struct->qty[data_struct->entries] = total_qty;
data_struct->types[data_struct->entries] = type;
data_struct->arrays[data_struct->entries] = data;
data_struct->request_qty[data_struct->entries] = request_qty;
if(request_qty) {
data_struct->requests[data_struct->entries] = (MPI_Request *) malloc(request_qty * sizeof(MPI_Request));
@@ -62,6 +63,7 @@ void modify_data(void *data, size_t index, size_t total_qty, int type, size_t re
data_struct->qty[index] = total_qty;
data_struct->types[index] = type;
data_struct->arrays[index] = data;
data_struct->request_qty[index] = request_qty;
if(request_qty) {
data_struct->requests[index] = (MPI_Request *) malloc(request_qty * sizeof(MPI_Request));
@@ -80,7 +82,7 @@ void modify_data(void *data, size_t index, size_t total_qty, int type, size_t re
*/
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group, int myId, int root, MPI_Comm intercomm) {
int is_intercomm, rootBcast = MPI_PROC_NULL;
size_t i;
size_t i, j;
MPI_Datatype entries_type, struct_type;
@@ -104,17 +106,19 @@ void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *d
MPI_Bcast(MPI_BOTTOM, 1, struct_type, rootBcast, intercomm);
if(is_children_group) {
/*
size_t request_qty = 1; // TODO Obtain this from the function
data_struct_rep->requests[data_struct_rep->entries] = (MPI_Request *) malloc(request_qty * sizeof(MPI_Request));
data_struct_dist->requests[data_struct_dist->entries] = (MPI_Request *) malloc(request_qty * sizeof(MPI_Request));
*/
for(i=0; i < data_struct_rep->entries; i++) {
data_struct_rep->arrays[i] = (void *) malloc(data_struct_rep->qty[i] * sizeof(int)); //TODO Keep in mind that it is not always an int
data_struct_rep->requests[i] = (MPI_Request *) malloc(data_struct_rep->request_qty[i] * sizeof(MPI_Request));
for(j=0; j < data_struct_rep->request_qty[i]; j++) {
data_struct_rep->requests[i][j] = MPI_REQUEST_NULL;
}
}
for(i=0; i < data_struct_dist->entries; i++) {
data_struct_dist->arrays[i] = (void *) NULL;
data_struct_dist->requests[i] = (MPI_Request *) malloc(data_struct_dist->request_qty[i] * sizeof(MPI_Request));
for(j=0; j < data_struct_dist->request_qty[i]; j++) {
data_struct_dist->requests[i][j] = MPI_REQUEST_NULL;
}
}
}
@@ -134,13 +138,21 @@ void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *d
* "size" elementos.
*/
void init_malleability_data_struct(malleability_data_t *data_struct, size_t size) {
size_t i;
data_struct->max_entries = size;
data_struct->qty = (size_t *) malloc(size * sizeof(size_t));
data_struct->types = (int *) malloc(size * sizeof(int));
data_struct->request_qty = (size_t *) malloc(size * sizeof(size_t));
data_struct->requests = (MPI_Request **) malloc(size * sizeof(MPI_Request *));
data_struct->arrays = (void **) malloc(size * sizeof(void *));
data_struct->request_ibarrier = MPI_REQUEST_NULL;
for(i=0; i<size; i++) { //calloc and memset do not ensure a NULL value
data_struct->requests[i] = NULL;
data_struct->arrays[i] = NULL;
}
}
/*
@@ -149,7 +161,7 @@ void init_malleability_data_struct(malleability_data_t *data_struct, size_t size
* a las ya existentes.
*/
void realloc_malleability_data_struct(malleability_data_t *data_struct, size_t qty_to_add) {
size_t needed, *qty_aux;
size_t i, needed, *qty_aux, *request_qty_aux;
int *types_aux;
MPI_Request **requests_aux;
void **arrays_aux;
@@ -157,29 +169,35 @@ void realloc_malleability_data_struct(malleability_data_t *data_struct, size_t q
needed = data_struct->max_entries + qty_to_add;
qty_aux = (size_t *) realloc(data_struct->qty, needed * sizeof(size_t));
types_aux = (int *) realloc(data_struct->types, needed * sizeof(int));
request_qty_aux = (size_t *) realloc(data_struct->request_qty, needed * sizeof(size_t));
requests_aux = (MPI_Request **) realloc(data_struct->requests, needed * sizeof(MPI_Request *));
arrays_aux = (void **) realloc(data_struct->arrays, needed * sizeof(void *));
if(qty_aux == NULL || arrays_aux == NULL || requests_aux == NULL || types_aux == NULL) {
if(qty_aux == NULL || arrays_aux == NULL || requests_aux == NULL || types_aux == NULL || request_qty_aux == NULL) {
fprintf(stderr, "Fatal error - Could not reallocate the constant memory for data to redistribute/communicate\n");
MPI_Abort(MPI_COMM_WORLD, 1);
}
for(i=data_struct->max_entries; i<needed; i++) { //calloc and memset do not ensure a NULL value
requests_aux[i] = NULL;
arrays_aux[i] = NULL;
}
data_struct->qty = qty_aux;
data_struct->types = types_aux;
data_struct->request_qty = request_qty_aux;
data_struct->requests = requests_aux;
data_struct->arrays = arrays_aux;
data_struct->max_entries = needed;
}
void free_malleability_data_struct(malleability_data_t *data_struct) {
size_t i, max;
size_t i, j, max;
max = data_struct->entries;
if(max != 0) {
for(i=0; i<max; i++) {
//free(data_struct->arrays[i]); //FIXME Values allocated with 1 element are not freed?
//free(data_struct->requests[i]); //TODO Decide how to create them
}
if(data_struct->qty != NULL) {
@@ -188,9 +206,22 @@ void free_malleability_data_struct(malleability_data_t *data_struct) {
if(data_struct->types != NULL) {
free(data_struct->types);
}
if(data_struct->requests != NULL) {
free(data_struct->requests);
if(data_struct->requests != NULL && data_struct->request_qty != NULL) {
for(i=0; i<max; i++) {
if(data_struct->requests[i] != NULL) {
for(j=0; j<data_struct->request_qty[i]; j++) {
if(data_struct->requests[i][j] != MPI_REQUEST_NULL) {
MPI_Request_free(&(data_struct->requests[i][j]));
data_struct->requests[i][j] = MPI_REQUEST_NULL;
}
}
free(data_struct->requests[i]);
}
}
free(data_struct->request_qty);
free(data_struct->requests);
}
if(data_struct->arrays != NULL) {
free(data_struct->arrays);
}
@@ -232,20 +263,22 @@ void def_malleability_entries(malleability_data_t *data_struct_rep, malleability
* TODO Refactor?
*/
void def_malleability_qty_type(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, MPI_Datatype *new_type) {
int counts = 4;
int counts = 6;
int blocklengths[counts];
MPI_Aint displs[counts];
MPI_Datatype types[counts];
types[0] = types[2] = MPI_UNSIGNED_LONG;
types[1] = types[3] = MPI_INT;
blocklengths[0] = blocklengths[1] = data_struct_rep->entries;
blocklengths[2] = blocklengths[3] = data_struct_dist->entries;
types[0] = types[1] = types[3] = types[4] = MPI_UNSIGNED_LONG;
types[2] = types[5] = MPI_INT;
blocklengths[0] = blocklengths[1] = blocklengths[2] = data_struct_rep->entries;
blocklengths[3] = blocklengths[4] = blocklengths[5] = data_struct_dist->entries;
MPI_Get_address((data_struct_rep->qty), &displs[0]);
MPI_Get_address((data_struct_rep->types), &displs[1]);
MPI_Get_address((data_struct_dist->qty), &displs[2]);
MPI_Get_address((data_struct_dist->types), &displs[3]);
MPI_Get_address((data_struct_rep->request_qty), &displs[1]);
MPI_Get_address((data_struct_rep->types), &displs[2]);
MPI_Get_address((data_struct_dist->qty), &displs[3]);
MPI_Get_address((data_struct_dist->request_qty), &displs[4]);
MPI_Get_address((data_struct_dist->types), &displs[5]);
MPI_Type_create_struct(counts, blocklengths, displs, types, new_type);
MPI_Type_commit(new_type);
......
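Since the datatype above is built from absolute addresses (MPI_Get_address on the actual buffers), callers pass MPI_BOTTOM as the buffer argument, as comm_data_info does earlier in this diff; a usage sketch:

/* Usage as in comm_data_info above: absolute-address datatype, buffer is MPI_BOTTOM. */
MPI_Datatype qty_type;
def_malleability_qty_type(data_struct_rep, data_struct_dist, &qty_type);
MPI_Bcast(MPI_BOTTOM, 1, qty_type, rootBcast, intercomm);
MPI_Type_free(&qty_type);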
@@ -19,6 +19,7 @@ typedef struct {
// Vector of request vectors. Each top-level element indicates the requests to check in order to consider
// the communication of that data item finished
size_t *request_qty;
MPI_Request **requests;
void **arrays; // Each subvector is a series of data to communicate
......