Commit a539c1cc authored by iker_martin's avatar iker_martin
Browse files

Anadida maleabilidad asincrona y nuevo parametro para indicar como realizaran...

Anadida maleabilidad asincrona y nuevo parametro para indicar como realizaran la espera de transmision de datos asincronos los padres
parent 90aa227d
......@@ -37,6 +37,8 @@ static int handler(void* user, const char* section, const char* name,
pconfig->sdr = atoi(value);
} else if (MATCH("general", "ADR")) {
pconfig->adr = atoi(value);
} else if (MATCH("general", "AIB")) {
pconfig->aib = atoi(value);
} else if (MATCH("general", "time")) {
pconfig->general_time = atof(value);
......@@ -128,11 +130,11 @@ void free_config(configuration *user_config) {
* Imprime por salida estandar toda la informacion que contiene
* la configuracion pasada como argumento
*/
void print_config(configuration *user_config) {
void print_config(configuration *user_config, int numP) {
if(user_config != NULL) {
int i;
printf("Config loaded: resizes=%d, matrix=%d, sdr=%d, adr=%d, time=%f\n",
user_config->resizes, user_config->matrix_tam, user_config->sdr, user_config->adr, user_config->general_time);
printf("Config loaded: resizes=%d, matrix=%d, sdr=%d, adr=%d, time=%f || NUMP=%d\n",
user_config->resizes, user_config->matrix_tam, user_config->sdr, user_config->adr, user_config->general_time, numP);
for(i=0; i<user_config->resizes; i++) {
printf("Resize %d: Iters=%d, Procs=%d, Factors=%f, Phy=%d\n",
i, user_config->iters[i], user_config->procs[i], user_config->factors[i], user_config->phy_dist[i]);
......@@ -221,14 +223,14 @@ configuration *recv_config_file(int root, MPI_Comm intercomm) {
* de la estructura de configuracion con una sola comunicacion.
*/
void def_struct_config_file(configuration *config_file, MPI_Datatype *config_type) {
int i, counts = 6;
int blocklengths[6] = {1, 1, 1, 1, 1, 1};
int i, counts = 7;
int blocklengths[7] = {1, 1, 1, 1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];
// Rellenar vector types
types[0] = types[1] = types[2] = types[3] = types[4] = MPI_INT;
types[5] = MPI_FLOAT;
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = MPI_INT;
types[6] = MPI_FLOAT;
// Rellenar vector displs
MPI_Get_address(config_file, &dir);
......@@ -238,7 +240,8 @@ void def_struct_config_file(configuration *config_file, MPI_Datatype *config_typ
MPI_Get_address(&(config_file->matrix_tam), &displs[2]);
MPI_Get_address(&(config_file->sdr), &displs[3]);
MPI_Get_address(&(config_file->adr), &displs[4]);
MPI_Get_address(&(config_file->general_time), &displs[5]);
MPI_Get_address(&(config_file->aib), &displs[5]);
MPI_Get_address(&(config_file->general_time), &displs[6]);
for(i=0;i<counts;i++) displs[i] -= dir;
......
......@@ -8,6 +8,7 @@ typedef struct
int resizes;
int actual_resize;
int matrix_tam, sdr, adr;
int aib;
float general_time;
int *iters, *procs, *phy_dist;
......@@ -17,7 +18,7 @@ typedef struct
configuration *read_ini_file(char *file_name);
void free_config(configuration *user_config);
void print_config(configuration *user_config);
void print_config(configuration *user_config, int numP);
// MPI Intercomm functions
void send_config_file(configuration *config_file, int root, MPI_Comm intercomm);
......
......@@ -10,8 +10,10 @@
int work();
void Sons_init();
int checkpoint(int iter);
int checkpoint(int iter, int state, MPI_Request **comm_req);
void TC(int numS);
int start_redistribution(int numS, MPI_Request **comm_req);
int check_redistribution(int iter, MPI_Request **comm_req);
void iterate(double *matrix, int n);
void computeMatrix(double *matrix, int n);
......@@ -21,11 +23,12 @@ typedef struct {
int myId;
int numP;
int grp;
int iter_start;
MPI_Comm children, parents;
char **argv;
char *sync_array;
char *sync_array, *async_array;
} group_data;
configuration *config_file;
......@@ -42,6 +45,7 @@ int main(int argc, char *argv[]) {
group->myId = myId;
group->numP = numP;
group->grp = 0;
group->iter_start = 0;
group->argv = argv;
MPI_Comm_get_parent(&(group->parents));
......@@ -52,20 +56,22 @@ int main(int argc, char *argv[]) {
if(config_file->sdr > 0) {
malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
}
if(config_file->adr > 0) {
malloc_comm_array(&(group->async_array), config_file->adr , group->myId, group->numP);
}
}
if(myId== ROOT) print_config(config_file);
if(myId== ROOT) print_config(config_file, numP);
int res = work();
if(res) { // Ultimo set de procesos comprueba resultados
if(res) { // Ultimo set de procesos muestra resultados
//RESULTADOS
}
// free_config(config_file); //FIXME DESCOMENTAR
// free(group->sync_array);
// free(group);
free_config(config_file);
free(group->sync_array);
free(group->async_array);
free(group);
MPI_Finalize();
return 0;
......@@ -83,26 +89,25 @@ int main(int argc, char *argv[]) {
* comunicacion asincrona y realizar entonces la sincrona.
*/
int work() {
int iter, maxiter;
int iter, maxiter, state;
double *matrix;
MPI_Request *async_comm;
maxiter = config_file->iters[group->grp];
initMatrix(&matrix, config_file->matrix_tam);
for(iter=0; iter < maxiter; iter++) {
for(iter=group->iter_start; iter < maxiter; iter++) {
iterate(matrix, config_file->matrix_tam);
}
checkpoint(iter);
state = checkpoint(iter, MAL_COMM_UNINITIALIZED, &async_comm);
/*
iter = 0
while(maxiter) { //FIXME AÑADIR VALOR
iter = 0;
while(state == MAL_ASYNC_PENDING) {
iterate(matrix, config_file->matrix_tam);
iter++;
//check_async(iter);
state = checkpoint(iter, state, &async_comm);
}
*/
return 0;
}
......@@ -115,43 +120,28 @@ int work() {
*
* Si hay datos asincronos a transmitir, primero se comienza a
* transmitir estos y se termina la funcion. Se tiene que comprobar con
* la funcion "??" que se han terminado de enviar //TODO
* llamando a la función de nuevo que se han terminado de enviar //TODO
*
* Si hay ademas datos sincronos a enviar, no se envian aun.
*
* Si solo hay datos sincronos se envian tras la creacion de los procesos
* y finalmente se desconectan los dos grupos de procesos.
*/
int checkpoint(int iter) {
int checkpoint(int iter, int state, MPI_Request **comm_req) {
if(state == MAL_COMM_UNINITIALIZED) {
// Comprobar si se tiene que realizar un redimensionado
if(config_file->iters[group->grp] < iter) {return 0;}
if(config_file->iters[group->grp] > iter || config_file->resizes == group->grp + 1) {return MAL_COMM_UNINITIALIZED;}
int numS = config_file->procs[group->grp +1];
TC(numS);
state = start_redistribution(numS, comm_req);
int rootBcast = MPI_PROC_NULL;
if(group->myId == ROOT) rootBcast = MPI_ROOT;
// Enviar a los hijos que grupo de procesos son
MPI_Bcast(&(group->grp), 1, MPI_INT, rootBcast, group->children);
send_config_file(config_file, rootBcast, group->children);
if(config_file->adr > 0) {
send_async(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
} else if
if(config_file->sdr > 0) {
send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
} else if(MAL_ASYNC_PENDING) {
state = check_redistribution(iter, comm_req);
}
// Desconectar intercomunicador con los hijos
MPI_Comm_disconnect(&(group->children));
//MPI_Comm_free(&(group->children));
return 1;
return state;
}
/*
......@@ -170,6 +160,66 @@ void TC(int numS){
}
}
int start_redistribution(int numS, MPI_Request **comm_req) {
int rootBcast = MPI_PROC_NULL;
if(group->myId == ROOT) rootBcast = MPI_ROOT;
// Enviar a los hijos que grupo de procesos son
MPI_Bcast(&(group->grp), 1, MPI_INT, rootBcast, group->children);
send_config_file(config_file, rootBcast, group->children);
if(config_file->adr > 0) {
send_async(group->async_array, config_file->adr, group->myId, group->numP, ROOT, group->children, numS, comm_req, config_file->aib);
return MAL_ASYNC_PENDING;
}
if(config_file->sdr > 0) {
send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
}
// Desconectar intercomunicador con los hijos
MPI_Comm_disconnect(&(group->children));
return MAL_COMM_COMPLETED;
}
int check_redistribution(int iter, MPI_Request **comm_req) {
int completed, all_completed, test_err, iter_send;
int numS = config_file->procs[group->grp +1];
int rootBcast = MPI_PROC_NULL;
MPI_Request *req_completed;
if(group->myId == ROOT) rootBcast = MPI_ROOT;
if(config_file->aib == MAL_USE_NORMAL) {
req_completed = &(*comm_req)[0];
} else {
req_completed = &(*comm_req)[1];
}
test_err = MPI_Test(req_completed, &completed, MPI_STATUS_IGNORE);
if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
printf("P%d aborting\n", group->myId);
MPI_Abort(MPI_COMM_WORLD, test_err);
}
MPI_Allreduce(&completed, &all_completed, 1, MPI_INT, MPI_MIN, MPI_COMM_WORLD);
if(!all_completed) return MAL_ASYNC_PENDING; // Continue only if asynchronous send has ended
iter_send = iter;
MPI_Bcast(&iter_send, 1, MPI_INT, rootBcast, group->children);
if(config_file->sdr > 0) {
send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
}
if(config_file->aib == MAL_USE_IBARRIER) {
MPI_Wait(&(*comm_req)[0], MPI_STATUS_IGNORE); // Indicar como completado el envio asincrono
}
// Desconectar intercomunicador con los hijos
MPI_Comm_disconnect(&(group->children));
free(*comm_req);
return MAL_COMM_COMPLETED;
}
/*
* Inicializacion de los datos de los hijos.
* En la misma se reciben datos de los padres: La configuracion
......@@ -182,18 +232,19 @@ void Sons_init() {
MPI_Bcast(&(group->grp), 1, MPI_INT, ROOT, group->parents);
group->grp++;
config_file = recv_config_file(ROOT, group->parents);
int numP_parents = config_file->procs[group->grp -1];
if(config_file->adr > 0) { // Recibir datos asincronos
recv_async(&(group->async_array), config_file->adr, group->myId, group->numP, ROOT, group->parents, numP_parents, config_file->aib);
MPI_Bcast(&(group->iter_start), 1, MPI_INT, ROOT, group->parents);
}
if(config_file->sdr > 0) { // Recibir datos sincronos
recv_sync(&(group->sync_array), config_file->sdr, group->myId, group->numP, ROOT, group->parents, numP_parents);
}
// Desconectar intercomunicador con los hijos
MPI_Comm_disconnect(&(group->parents));
// MPI_Abort(MPI_COMM_WORLD, 0);
}
......
......@@ -2,7 +2,7 @@
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
#include "CommDist.h"
struct Dist_data {
int ini; //Primer elemento a enviar
......@@ -22,12 +22,9 @@ struct Counts {
int *zero_arr;
};
void send_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_child, int idI, int idE, struct Counts counts);
void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts);
void send_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_child, int idI, int idE, struct Counts counts, MPI_Request *comm_req);
void recv_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req);
......@@ -40,6 +37,23 @@ void freeCounts(struct Counts *counts);
void print_counts(struct Dist_data data_dist, int *xcounts, int *xdispls, int size, const char* name);
/*
* Reserva memoria para un vector de hasta "qty" elementos.
* Los "qty" elementos se disitribuyen entre los "numP" procesos
* que llaman a esta funcion.
*/
void malloc_comm_array(char **array, int qty, int myId, int numP) {
struct Dist_data dist_data;
get_dist(qty, myId, numP, &dist_data);
*array = malloc(dist_data.tamBl * sizeof(char));
int i;
for(i=0; i<dist_data.tamBl; i++) {
(*array)[i] = '!' + i + dist_data.ini;
}
// printf("P%d Tam %d String: %s\n", myId, dist_data.tamBl, *array);
}
//================================================================================
//================================================================================
......@@ -102,31 +116,12 @@ void recv_sync(char **array, int qty, int myId, int numP, int root, MPI_Comm int
getIds_intercomm(dist_data, numP_parents, &idS); // Obtener el rango de Ids de padres del que este proceso recibira datos
recv_sync_arrays(dist_data, *array, root, numP_parents, idS[0], idS[1], counts);
printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array);
//printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array);
freeCounts(&counts);
free(idS);
}
/*
* Reserva memoria para un vector de hasta "qty" elementos.
* Los "qty" elementos se disitribuyen entre los "numP" procesos
* que llaman a esta funcion.
*/
void malloc_comm_array(char **array, int qty, int myId, int numP) {
struct Dist_data dist_data;
get_dist(qty, myId, numP, &dist_data);
*array = malloc(dist_data.tamBl * sizeof(char));
int i;
for(i=0; i<dist_data.tamBl; i++) {
(*array)[i] = '!' + i + dist_data.ini;
}
printf("P%d Tam %d String: %s\n", myId, dist_data.tamBl, *array);
}
/*
* Envia a los hijos un vector que es redistribuido a los procesos
* hijos. Antes de realizar la comunicacion, cada proceso padre calcula sobre que procesos
......@@ -194,7 +189,7 @@ void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int num
*
* El vector array no se modifica en esta funcion.
*/
int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child, MPI_Request *comm_req) {
int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait) {
int rootBcast = MPI_PROC_NULL;
int *idS = NULL;
struct Counts counts;
......@@ -210,7 +205,18 @@ int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm inte
getIds_intercomm(dist_data, numP_child, &idS); // Obtener rango de Id hijos a los que este proceso manda datos
send_async_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts, comm_req);
if(parents_wait == MAL_USE_NORMAL) {
*comm_req = (MPI_Request *) malloc(sizeof(MPI_Request));
*comm_req[0] = MPI_REQUEST_NULL;
send_async_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts, &(*comm_req[0]));
} else {
*comm_req = (MPI_Request *) malloc(2 * sizeof(MPI_Request));
(*comm_req)[0] = MPI_REQUEST_NULL;
(*comm_req)[1] = MPI_REQUEST_NULL;
send_async_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts, &((*comm_req)[1]));
MPI_Ibarrier(intercomm, &((*comm_req)[0]) );
}
freeCounts(&counts);
free(idS);
......@@ -224,13 +230,16 @@ int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm inte
*
* El vector array se reserva dentro de la funcion y se devuelve en el mismo argumento.
* Tiene que ser liberado posteriormente por el usuario.
*
* El argumento "parents_wait" sirve para indicar si se usará la versión en la los padres
* espera a que terminen de enviar, o en la que esperan a que los hijos acaben de recibir.
*/
void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents) {
void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents, int parents_wait) {
int *idS = NULL;
int wait_err;
struct Counts counts;
struct Dist_data dist_data;
MPI_Request comm_req;
MPI_Request comm_req, aux;
// Obtener distribución para este hijo
get_dist(qty, myId, numP, &dist_data);
......@@ -244,16 +253,18 @@ void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm in
getIds_intercomm(dist_data, numP_parents, &idS); // Obtener el rango de Ids de padres del que este proceso recibira datos
recv_async_arrays(dist_data, *array, root, numP_parents, idS[0], idS[1], counts, &comm_req);
printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array);
wait_err = MPI_Wait(&comm_req, MPI_STATUS_IGNORE);
if(wait_err != MPI_SUCCESS) {
MPI_Abort(MPI_COMM_WORLD, wait_err);
}
// TODO Indicar a los padres que los hijos han terminado??
if(parents_wait == MAL_USE_IBARRIER) {
MPI_Ibarrier(intercomm, &aux);
MPI_Wait(&aux, MPI_STATUS_IGNORE); //Es necesario comprobar que la comunicación ha terminado para desconectar los grupos de procesos
}
//printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array);
freeCounts(&counts);
free(idS);
}
......
......@@ -3,13 +3,19 @@
#include <mpi.h>
#include <string.h>
#define MAL_COMM_COMPLETED 0
#define MAL_COMM_UNINITIALIZED 0
#define MAL_ASYNC_PENDING 1
#define MAL_USE_NORMAL 0
#define MAL_USE_IBARRIER 1
int send_sync(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child);
void recv_sync(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents);
int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child, MPI_Request *comm_req);
void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents);
int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait);
void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents, int parents_wait);
void malloc_comm_array(char **array, int qty, int myId, int numP);
......@@ -2,17 +2,6 @@
#SBATCH -N 2
#module load gcc/6.4.0
#module load openmpi/1.10.7
#module load /home/martini/ejemplos_Mod/modulefiles/mpi/openmpi_aliaga_1_10_7
echo "OPENMPI"
#module load /home/martini/MODULES/modulefiles/openmpi4.1.0
#mpirun -mca btl_openib_allow_ib 1 -npernode 10 -np 20 ./batch5.out
echo "MPICH"
module load mpich-3.4.1-noucx
#module load /home/martini/MODULES/modulefiles/mpich3.4
......@@ -21,14 +10,4 @@ module load mpich-3.4.1-noucx
mpirun -ppn 1 -np 2 ./a.out test.ini
echo "Intel"
#module load /home/martini/MODULES/modulefiles/intel64.module
#export I_MPI_OFI_PROVIDER=tcp
#export I_MPI_DEBUG=6
#export I_MPI_FABRICS=shm:ofi
#mpirun -print-all-exitcodes -np 2 --ppn 1 ./batch.out
#mpirun -genv I_MPI_FABRICS=shm:ofi -print-all-exitcodes -iface ib0 -np 16 ./batch4.out
#mpirun -genv I_MPI_FABRICS=shm:ofi -iface enp59s0f0 -np 20 ./batch4.out
echo "END RUN"
......@@ -2,7 +2,9 @@
resizes=2 ; Numero de redistribuciones
matrix_tam=2000 ; Tamaño en bytes de la matriz de computo
SDR=20 ; Tamaño en bytes a redistribuir de forma sincrona
ADR=10000 ; Tamaño en bytes a redistribuir de forma asincrona
ADR=20 ; Tamaño en bytes a redistribuir de forma asincrona
AIB=0 ; Indica si las redistribuciones asíncronas se consideran terminadas para los padres
; cuando terminan de enviar (0) o cuando terminan de recibir los valores (1)
time=0.5 ; Tiempo necesario para realizar una iteracion
[resize0] ; Grupo inicial(mpirun) - La aplicación no sigue el valor procs ni physical_dist
......@@ -14,12 +16,12 @@ physical_dist=node ; Tipo de redistribución física de los procesos
[resize1] ; Grupo de hijos 1
iters=5
procs=4
factor=1.5
factor=0.5
physical_dist=node
[resize2] ; Grupo de hijos 1
[resize2] ; Grupo de hijos 2
iters=5
procs=8
factor=2
factor=0.25
physical_dist=node
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment