Commit d65df059 authored by iker_martin's avatar iker_martin
Browse files

WIP. Añadido nuevo método de comunicación asíncrona. Ahora es posible elegir...

WIP. Añadido nuevo método de comunicación asíncrona. Ahora es posible elegir entre 4 métodos (Por defecto se usa uno con hebras). Se ha añadido a la salida de configuracion el numero de bytes de comunicación en cada iteración de computo.
parent 197adfb1
...@@ -176,9 +176,11 @@ int checkpoint(int iter, int state, MPI_Request **comm_req) { ...@@ -176,9 +176,11 @@ int checkpoint(int iter, int state, MPI_Request **comm_req) {
state = start_redistribution(group->numS, comm_req); state = start_redistribution(group->numS, comm_req);
} else if(state == MAL_ASYNC_PENDING) { } else if(state == MAL_ASYNC_PENDING) {
state = thread_check(); if(config_file->aib == MAL_USE_THREAD) {
if(state == MAL_COMM_COMPLETED) end_redistribution(iter); state = thread_check(iter);
//state = check_redistribution(iter, comm_req); } else {
state = check_redistribution(iter, comm_req);
}
} }
return state; return state;
...@@ -225,7 +227,12 @@ int start_redistribution(int numS, MPI_Request **comm_req) { ...@@ -225,7 +227,12 @@ int start_redistribution(int numS, MPI_Request **comm_req) {
if(config_file->adr > 0) { if(config_file->adr > 0) {
results->async_start = MPI_Wtime(); results->async_start = MPI_Wtime();
return thread_creation(); if(config_file->aib == MAL_USE_THREAD) {
return thread_creation();
} else {
send_async(group->async_array, config_file->adr, group->myId, group->numP, ROOT, group->children, group->numS, comm_req, config_file->aib);
return MAL_ASYNC_PENDING;
}
} }
return end_redistribution(0); return end_redistribution(0);
} }
...@@ -248,14 +255,14 @@ int thread_creation() { ...@@ -248,14 +255,14 @@ int thread_creation() {
* *
* El estado de la comunicación es devuelto al finalizar la función. * El estado de la comunicación es devuelto al finalizar la función.
*/ */
int thread_check() { int thread_check(int iter) {
if(group->commAsync == MAL_COMM_COMPLETED) { if(group->commAsync == MAL_COMM_COMPLETED) {
if(pthread_join(async_thread, NULL)) { if(pthread_join(async_thread, NULL)) {
printf("Error al esperar al hilo\n"); printf("Error al esperar al hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1); MPI_Abort(MPI_COMM_WORLD, -1);
return -2; return -2;
} }
return MAL_COMM_COMPLETED; return end_redistribution(iter);
} }
return MAL_ASYNC_PENDING; return MAL_ASYNC_PENDING;
...@@ -284,8 +291,8 @@ void* thread_async_work(void* void_arg) { ...@@ -284,8 +291,8 @@ void* thread_async_work(void* void_arg) {
* *
* Esta funcion permite dos modos de funcionamiento al comprobar si la * Esta funcion permite dos modos de funcionamiento al comprobar si la
* comunicacion asincrona ha terminado. * comunicacion asincrona ha terminado.
* Si se utiliza el modo "MAL_USE_NORMAL", se considera terminada cuando * Si se utiliza el modo "MAL_USE_NORMAL" o "MAL_USE_POINT", se considera
* los padres terminan de enviar. * terminada cuando los padres terminan de enviar.
* Si se utiliza el modo "MAL_USE_IBARRIER", se considera terminada cuando * Si se utiliza el modo "MAL_USE_IBARRIER", se considera terminada cuando
* los hijos han terminado de recibir. * los hijos han terminado de recibir.
*/ */
...@@ -293,13 +300,17 @@ int check_redistribution(int iter, MPI_Request **comm_req) { ...@@ -293,13 +300,17 @@ int check_redistribution(int iter, MPI_Request **comm_req) {
int completed, all_completed, test_err; int completed, all_completed, test_err;
MPI_Request *req_completed; MPI_Request *req_completed;
if(config_file->aib == MAL_USE_NORMAL) { if (config_file->aib == MAL_USE_POINT) {
req_completed = &(*comm_req)[0]; test_err = MPI_Testall(group->numS, *comm_req, &completed, MPI_STATUSES_IGNORE);
} else { // MAL_USE_IBARRIER } else {
req_completed = &(*comm_req)[1]; if(config_file->aib == MAL_USE_NORMAL) {
} req_completed = &(*comm_req)[0];
} else if (config_file->aib == MAL_USE_IBARRIER) {
req_completed = &(*comm_req)[1];
}
test_err = MPI_Test(req_completed, &completed, MPI_STATUS_IGNORE);
}
test_err = MPI_Test(req_completed, &completed, MPI_STATUS_IGNORE);
if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) { if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
printf("P%d aborting -- Test Async\n", group->myId); printf("P%d aborting -- Test Async\n", group->myId);
MPI_Abort(MPI_COMM_WORLD, test_err); MPI_Abort(MPI_COMM_WORLD, test_err);
...@@ -309,7 +320,7 @@ int check_redistribution(int iter, MPI_Request **comm_req) { ...@@ -309,7 +320,7 @@ int check_redistribution(int iter, MPI_Request **comm_req) {
if(!all_completed) return MAL_ASYNC_PENDING; // Continue only if asynchronous send has ended if(!all_completed) return MAL_ASYNC_PENDING; // Continue only if asynchronous send has ended
MPI_Wait(req_completed, MPI_STATUS_IGNORE); //MPI_Wait(req_completed, MPI_STATUS_IGNORE); TODO BORRAR??
if(config_file->aib == MAL_USE_IBARRIER) { if(config_file->aib == MAL_USE_IBARRIER) {
MPI_Wait(&(*comm_req)[0], MPI_STATUS_IGNORE); // Indicar como completado el envio asincrono MPI_Wait(&(*comm_req)[0], MPI_STATUS_IGNORE); // Indicar como completado el envio asincrono
//Para la desconexión de ambos grupos de procesos es necesario indicar a MPI que esta //Para la desconexión de ambos grupos de procesos es necesario indicar a MPI que esta
......
...@@ -28,6 +28,9 @@ void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int num ...@@ -28,6 +28,9 @@ void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int num
void send_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_child, int idI, int idE, struct Counts counts, MPI_Request *comm_req); void send_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_child, int idI, int idE, struct Counts counts, MPI_Request *comm_req);
void recv_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req); void recv_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req);
void send_async_point_arrays(struct Dist_data dist_data, char *array, int rootBcast, int numP_child, int idI, int idE, struct Counts counts, MPI_Request *comm_req);
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req);
// DIST FUNCTIONS // DIST FUNCTIONS
void get_dist(int qty, int id, int numP, struct Dist_data *dist_data); void get_dist(int qty, int id, int numP, struct Dist_data *dist_data);
void set_counts(int id, int numP, struct Dist_data data_dist, int *sendcounts); void set_counts(int id, int numP, struct Dist_data data_dist, int *sendcounts);
...@@ -196,7 +199,7 @@ void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int num ...@@ -196,7 +199,7 @@ void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int num
* El vector array no se modifica en esta funcion. * El vector array no se modifica en esta funcion.
*/ */
int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait) { int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait) {
int rootBcast = MPI_PROC_NULL; int i, rootBcast = MPI_PROC_NULL;
int *idS = NULL; int *idS = NULL;
struct Counts counts; struct Counts counts;
struct Dist_data dist_data; struct Dist_data dist_data;
...@@ -216,12 +219,18 @@ int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm inte ...@@ -216,12 +219,18 @@ int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm inte
*comm_req[0] = MPI_REQUEST_NULL; *comm_req[0] = MPI_REQUEST_NULL;
send_async_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts, &(*comm_req[0])); send_async_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts, &(*comm_req[0]));
} else { } else if (parents_wait == MAL_USE_IBARRIER){
*comm_req = (MPI_Request *) malloc(2 * sizeof(MPI_Request)); *comm_req = (MPI_Request *) malloc(2 * sizeof(MPI_Request));
(*comm_req)[0] = MPI_REQUEST_NULL; (*comm_req)[0] = MPI_REQUEST_NULL;
(*comm_req)[1] = MPI_REQUEST_NULL; (*comm_req)[1] = MPI_REQUEST_NULL;
send_async_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts, &((*comm_req)[1])); send_async_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts, &((*comm_req)[1]));
MPI_Ibarrier(intercomm, &((*comm_req)[0]) ); MPI_Ibarrier(intercomm, &((*comm_req)[0]) );
} else if (parents_wait == MAL_USE_POINT){
*comm_req = (MPI_Request *) malloc(numP_child * sizeof(MPI_Request));
for(i=0; i<numP_child; i++){
(*comm_req)[i] = MPI_REQUEST_NULL;
}
send_async_point_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts, *comm_req);
} }
freeCounts(&counts); freeCounts(&counts);
...@@ -242,15 +251,14 @@ int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm inte ...@@ -242,15 +251,14 @@ int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm inte
*/ */
void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents, int parents_wait) { void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents, int parents_wait) {
int *idS = NULL; int *idS = NULL;
int wait_err; int wait_err, i;
struct Counts counts; struct Counts counts;
struct Dist_data dist_data; struct Dist_data dist_data;
MPI_Request comm_req, aux; MPI_Request *comm_req, aux;
// Obtener distribución para este hijo // Obtener distribución para este hijo
get_dist(qty, myId, numP, &dist_data); get_dist(qty, myId, numP, &dist_data);
*array = malloc(dist_data.tamBl * sizeof(char)); *array = malloc(dist_data.tamBl * sizeof(char));
//(*array)[dist_data.tamBl] = '\0';
dist_data.intercomm = intercomm; dist_data.intercomm = intercomm;
/* PREPARAR DATOS DE RECEPCION SOBRE VECTOR*/ /* PREPARAR DATOS DE RECEPCION SOBRE VECTOR*/
...@@ -258,9 +266,21 @@ void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm in ...@@ -258,9 +266,21 @@ void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm in
getIds_intercomm(dist_data, numP_parents, &idS); // Obtener el rango de Ids de padres del que este proceso recibira datos getIds_intercomm(dist_data, numP_parents, &idS); // Obtener el rango de Ids de padres del que este proceso recibira datos
recv_async_arrays(dist_data, *array, root, numP_parents, idS[0], idS[1], counts, &comm_req); if(parents_wait == MAL_USE_POINT) {
comm_req = (MPI_Request *) malloc(numP_parents * sizeof(MPI_Request));
for(i=0; i<numP_parents; i++){
comm_req[i] = MPI_REQUEST_NULL;
}
recv_async_point_arrays(dist_data, *array, root, numP_parents, idS[0], idS[1], counts, comm_req);
wait_err = MPI_Waitall(numP_parents, comm_req, MPI_STATUSES_IGNORE);
} else {
comm_req = (MPI_Request *) malloc(sizeof(MPI_Request));
*comm_req = MPI_REQUEST_NULL;
recv_async_arrays(dist_data, *array, root, numP_parents, idS[0], idS[1], counts, comm_req);
wait_err = MPI_Wait(comm_req, MPI_STATUS_IGNORE);
}
wait_err = MPI_Wait(&comm_req, MPI_STATUS_IGNORE);
if(wait_err != MPI_SUCCESS) { if(wait_err != MPI_SUCCESS) {
MPI_Abort(MPI_COMM_WORLD, wait_err); MPI_Abort(MPI_COMM_WORLD, wait_err);
} }
...@@ -273,12 +293,15 @@ void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm in ...@@ -273,12 +293,15 @@ void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm in
//printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array); //printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array);
freeCounts(&counts); freeCounts(&counts);
free(idS); free(idS);
free(comm_req);
} }
/* /*
* Envia a los hijos un vector que es redistribuido a los procesos * Envia a los hijos un vector que es redistribuido a los procesos
* hijos. Antes de realizar la comunicacion, cada proceso padre calcula sobre que procesos * hijos. Antes de realizar la comunicacion, cada proceso padre calcula sobre que procesos
* del otro grupo se transmiten elementos. * del otro grupo se transmiten elementos.
*
* El envio se realiza a partir de una comunicación colectiva.
*/ */
void send_async_arrays(struct Dist_data dist_data, char *array, int rootBcast, int numP_child, int idI, int idE, struct Counts counts, MPI_Request *comm_req) { void send_async_arrays(struct Dist_data dist_data, char *array, int rootBcast, int numP_child, int idI, int idE, struct Counts counts, MPI_Request *comm_req) {
int i; int i;
...@@ -298,10 +321,35 @@ void send_async_arrays(struct Dist_data dist_data, char *array, int rootBcast, i ...@@ -298,10 +321,35 @@ void send_async_arrays(struct Dist_data dist_data, char *array, int rootBcast, i
MPI_Ialltoallv(array, counts.counts, counts.displs, MPI_CHAR, NULL, counts.zero_arr, counts.zero_arr, MPI_CHAR, dist_data.intercomm, comm_req); MPI_Ialltoallv(array, counts.counts, counts.displs, MPI_CHAR, NULL, counts.zero_arr, counts.zero_arr, MPI_CHAR, dist_data.intercomm, comm_req);
} }
/*
* Envia a los hijos un vector que es redistribuido a los procesos
* hijos. Antes de realizar la comunicacion, cada proceso padre calcula sobre que procesos
* del otro grupo se transmiten elementos.
*
* El envio se realiza a partir de varias comunicaciones punto a punto.
*/
void send_async_point_arrays(struct Dist_data dist_data, char *array, int rootBcast, int numP_child, int idI, int idE, struct Counts counts, MPI_Request *comm_req) {
int i;
// PREPARAR ENVIO DEL VECTOR
if(idI == 0) {
set_counts(0, numP_child, dist_data, counts.counts);
idI++;
MPI_Isend(array, counts.counts[0], MPI_CHAR, 0, 99, dist_data.intercomm, &(comm_req[0]));
}
for(i=idI; i<idE; i++) {
set_counts(i, numP_child, dist_data, counts.counts);
counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
MPI_Isend(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i]));
}
}
/* /*
* Recibe de los padres un vector que es redistribuido a los procesos * Recibe de los padres un vector que es redistribuido a los procesos
* de este grupo. Antes de realizar la comunicacion cada hijo calcula sobre que procesos * de este grupo. Antes de realizar la comunicacion cada hijo calcula sobre que procesos
* del otro grupo se transmiten elementos. * del otro grupo se transmiten elementos.
*
* La recepcion se realiza a partir de una comunicacion colectiva.
*/ */
void recv_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req) { void recv_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req) {
int i; int i;
...@@ -323,6 +371,30 @@ void recv_async_arrays(struct Dist_data dist_data, char *array, int root, int nu ...@@ -323,6 +371,30 @@ void recv_async_arrays(struct Dist_data dist_data, char *array, int root, int nu
free(aux); free(aux);
} }
/*
* Recibe de los padres un vector que es redistribuido a los procesos
* de este grupo. Antes de realizar la comunicacion cada hijo calcula sobre que procesos
* del otro grupo se transmiten elementos.
*
* La recepcion se realiza a partir de varias comunicaciones punto a punto.
*/
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req) {
int i;
// Ajustar los valores de recepcion
if(idI == 0) {
set_counts(0, numP_parents, dist_data, counts.counts);
idI++;
MPI_Isend(array, counts.counts[0], MPI_CHAR, 0, 99, dist_data.intercomm, &(comm_req[0]));
}
for(i=idI; i<idE; i++) {
set_counts(i, numP_parents, dist_data, counts.counts);
counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
MPI_Isend(array+counts.displs[i], counts.counts[0], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[0]));
}
//print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
}
/* /*
* ======================================================================================== * ========================================================================================
* ======================================================================================== * ========================================================================================
......
...@@ -9,6 +9,8 @@ ...@@ -9,6 +9,8 @@
#define MAL_USE_NORMAL 0 #define MAL_USE_NORMAL 0
#define MAL_USE_IBARRIER 1 #define MAL_USE_IBARRIER 1
#define MAL_USE_POINT 2
#define MAL_USE_THREAD 3
int send_sync(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child); int send_sync(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child);
void recv_sync(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents); void recv_sync(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents);
......
...@@ -71,7 +71,7 @@ do ...@@ -71,7 +71,7 @@ do
for phy_dist in cpu node for phy_dist in cpu node
do do
for ibarrier_use in 0 #TODO Poner a 0 1 for ibarrier_use in 3 #TODO Poner a 0 1 2 3?
do do
i=$(($i + 1)) i=$(($i + 1))
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment