Commit 90aa227d authored by iker_martin's avatar iker_martin
Browse files

Version sincrona funcional, asincrona aun en proceso

parent e994f807
...@@ -142,11 +142,11 @@ void print_config(configuration *user_config) { ...@@ -142,11 +142,11 @@ void print_config(configuration *user_config) {
//||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| \\ //||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| ||
//||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| || //||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| ||
//| FUNCIONES DE INTERCOMUNICACION DE ESTRUCTURA DE CONFIGURACION || //| FUNCIONES DE INTERCOMUNICACION DE ESTRUCTURA DE CONFIGURACION ||
//||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| || //||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| ||
//||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| // //||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| |/
/* /*
* Envia una estructura de configuracion al grupo de procesos al que se * Envia una estructura de configuracion al grupo de procesos al que se
......
...@@ -61,9 +61,9 @@ int main(int argc, char *argv[]) { ...@@ -61,9 +61,9 @@ int main(int argc, char *argv[]) {
//RESULTADOS //RESULTADOS
} }
free_config(config_file); // free_config(config_file); //FIXME DESCOMENTAR
free(group->sync_array); // free(group->sync_array);
free(group); // free(group);
...@@ -125,7 +125,7 @@ int work() { ...@@ -125,7 +125,7 @@ int work() {
int checkpoint(int iter) { int checkpoint(int iter) {
// Comprobar si se tiene que realizar un redimensionado // Comprobar si se tiene que realizar un redimensionado
if(config_file->iters[group->grp] < iter || group->grp!= 0) {return 0;} if(config_file->iters[group->grp] < iter) {return 0;}
int numS = config_file->procs[group->grp +1]; int numS = config_file->procs[group->grp +1];
...@@ -139,6 +139,10 @@ int checkpoint(int iter) { ...@@ -139,6 +139,10 @@ int checkpoint(int iter) {
send_config_file(config_file, rootBcast, group->children); send_config_file(config_file, rootBcast, group->children);
if(config_file->adr > 0) {
send_async(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
} else if
if(config_file->sdr > 0) { if(config_file->sdr > 0) {
send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS); send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
} }
...@@ -188,6 +192,8 @@ void Sons_init() { ...@@ -188,6 +192,8 @@ void Sons_init() {
// Desconectar intercomunicador con los hijos // Desconectar intercomunicador con los hijos
MPI_Comm_disconnect(&(group->parents)); MPI_Comm_disconnect(&(group->parents));
// MPI_Abort(MPI_COMM_WORLD, 0);
} }
......
module load /home/martini/MODULES/modulefiles/mpich3.4 module load mpich-3.4.1-noucx
mpicc -Wall Main/Main.c IOcodes/read_ini.c IOcodes/ini.c malleability/ProcessDist.c malleability/CommDist.c -pthread -lslurm mpicc -Wall Main/Main.c IOcodes/read_ini.c IOcodes/ini.c malleability/ProcessDist.c malleability/CommDist.c -pthread -lslurm
...@@ -10,6 +10,9 @@ struct Dist_data { ...@@ -10,6 +10,9 @@ struct Dist_data {
int tamBl; // Total de elementos int tamBl; // Total de elementos
int qty; // Total number of rows of the full disperse matrix int qty; // Total number of rows of the full disperse matrix
int myId;
int numP;
MPI_Comm intercomm; MPI_Comm intercomm;
}; };
...@@ -20,10 +23,13 @@ struct Counts { ...@@ -20,10 +23,13 @@ struct Counts {
}; };
void send_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_child, int idI, int idE,
int *sendcounts, int *recvcounts, int *sdispls, int *rdispls); void send_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_child, int idI, int idE, struct Counts counts);
void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts);
int *sendcounts, int *recvcounts,int *sdispls, int *rdispls);
void send_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_child, int idI, int idE, struct Counts counts, MPI_Request *comm_req);
void recv_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req);
// DIST FUNCTIONS // DIST FUNCTIONS
void get_dist(int qty, int id, int numP, struct Dist_data *dist_data); void get_dist(int qty, int id, int numP, struct Dist_data *dist_data);
...@@ -32,6 +38,15 @@ void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS); ...@@ -32,6 +38,15 @@ void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS);
void mallocCounts(struct Counts *counts, int numP); void mallocCounts(struct Counts *counts, int numP);
void freeCounts(struct Counts *counts); void freeCounts(struct Counts *counts);
void print_counts(struct Dist_data data_dist, int *xcounts, int *xdispls, int size, const char* name);
//================================================================================
//================================================================================
//========================SINCHRONOUS FUNCTIONS===================================
//================================================================================
//================================================================================
/* /*
* Realiza un envio síncrono del vector array desde este grupo de procesos al grupo * Realiza un envio síncrono del vector array desde este grupo de procesos al grupo
* enlazado por el intercomunicador intercomm. * enlazado por el intercomunicador intercomm.
...@@ -54,7 +69,7 @@ int send_sync(char *array, int qty, int myId, int numP, int root, MPI_Comm inter ...@@ -54,7 +69,7 @@ int send_sync(char *array, int qty, int myId, int numP, int root, MPI_Comm inter
getIds_intercomm(dist_data, numP_child, &idS); // Obtener rango de Id hijos a los que este proceso manda datos getIds_intercomm(dist_data, numP_child, &idS); // Obtener rango de Id hijos a los que este proceso manda datos
send_sync_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts.counts, counts.zero_arr, counts.displs, counts.zero_arr); send_sync_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts);
freeCounts(&counts); freeCounts(&counts);
free(idS); free(idS);
...@@ -78,6 +93,7 @@ void recv_sync(char **array, int qty, int myId, int numP, int root, MPI_Comm int ...@@ -78,6 +93,7 @@ void recv_sync(char **array, int qty, int myId, int numP, int root, MPI_Comm int
// Obtener distribución para este hijo // Obtener distribución para este hijo
get_dist(qty, myId, numP, &dist_data); get_dist(qty, myId, numP, &dist_data);
*array = malloc(dist_data.tamBl * sizeof(char)); *array = malloc(dist_data.tamBl * sizeof(char));
(*array)[dist_data.tamBl] = '\0';
dist_data.intercomm = intercomm; dist_data.intercomm = intercomm;
/* PREPARAR DATOS DE RECEPCION SOBRE VECTOR*/ /* PREPARAR DATOS DE RECEPCION SOBRE VECTOR*/
...@@ -85,7 +101,8 @@ void recv_sync(char **array, int qty, int myId, int numP, int root, MPI_Comm int ...@@ -85,7 +101,8 @@ void recv_sync(char **array, int qty, int myId, int numP, int root, MPI_Comm int
getIds_intercomm(dist_data, numP_parents, &idS); // Obtener el rango de Ids de padres del que este proceso recibira datos getIds_intercomm(dist_data, numP_parents, &idS); // Obtener el rango de Ids de padres del que este proceso recibira datos
recv_sync_arrays(dist_data, *array, root, idS[0], idS[1], numP_parents, counts.zero_arr, counts.counts, counts.zero_arr, counts.displs); recv_sync_arrays(dist_data, *array, root, numP_parents, idS[0], idS[1], counts);
printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array);
freeCounts(&counts); freeCounts(&counts);
free(idS); free(idS);
...@@ -101,6 +118,12 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) { ...@@ -101,6 +118,12 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) {
get_dist(qty, myId, numP, &dist_data); get_dist(qty, myId, numP, &dist_data);
*array = malloc(dist_data.tamBl * sizeof(char)); *array = malloc(dist_data.tamBl * sizeof(char));
int i;
for(i=0; i<dist_data.tamBl; i++) {
(*array)[i] = '!' + i + dist_data.ini;
}
printf("P%d Tam %d String: %s\n", myId, dist_data.tamBl, *array);
} }
...@@ -109,22 +132,24 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) { ...@@ -109,22 +132,24 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) {
* hijos. Antes de realizar la comunicacion, cada proceso padre calcula sobre que procesos * hijos. Antes de realizar la comunicacion, cada proceso padre calcula sobre que procesos
* del otro grupo se transmiten elementos. * del otro grupo se transmiten elementos.
*/ */
void send_sync_arrays(struct Dist_data dist_data, char *array, int rootBcast, int numP_child, int idI, int idE, void send_sync_arrays(struct Dist_data dist_data, char *array, int rootBcast, int numP_child, int idI, int idE, struct Counts counts) {
int *sendcounts, int *recvcounts, int *sdispls, int *rdispls) {
int i; int i;
// PREPARAR ENVIO DEL VECTOR // PREPARAR ENVIO DEL VECTOR
if(idI == 0) { if(idI == 0) {
set_counts(0, numP_child, dist_data, sendcounts); set_counts(0, numP_child, dist_data, counts.counts);
idI++; idI++;
} }
for(i=idI; i<idE; i++) { for(i=idI; i<idE; i++) {
set_counts(i, numP_child, dist_data, sendcounts); set_counts(i, numP_child, dist_data, counts.counts);
sdispls[i] = sdispls[i-1] + sendcounts[i-1]; counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
} }
//print_counts(dist_data, counts.counts, counts.displs, numP_child, "Padres");
/* COMUNICACION DE DATOS */ /* COMUNICACION DE DATOS */
// MPI_Alltoallv(array, sendcounts, sdispls, MPI_CHAR, NULL, recvcounts, rdispls, MPI_CHAR, dist_data.intercomm); MPI_Alltoallv(array, counts.counts, counts.displs, MPI_CHAR, NULL, counts.zero_arr, counts.zero_arr, MPI_CHAR, dist_data.intercomm);
} }
/* /*
...@@ -132,24 +157,155 @@ void send_sync_arrays(struct Dist_data dist_data, char *array, int rootBcast, in ...@@ -132,24 +157,155 @@ void send_sync_arrays(struct Dist_data dist_data, char *array, int rootBcast, in
* de este grupo. Antes de realizar la comunicacion cada hijo calcula sobre que procesos * de este grupo. Antes de realizar la comunicacion cada hijo calcula sobre que procesos
* del otro grupo se transmiten elementos. * del otro grupo se transmiten elementos.
*/ */
void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts) {
int *sendcounts, int *recvcounts,int *sdispls, int *rdispls) {
int i; int i;
char *aux; char *aux = malloc(1);
// Ajustar los valores de recepcion // Ajustar los valores de recepcion
if(idI == 0) { if(idI == 0) {
set_counts(0, numP_parents, dist_data, recvcounts); set_counts(0, numP_parents, dist_data, counts.counts);
idI++; idI++;
} }
for(i=idI; i<idE; i++) { for(i=idI; i<idE; i++) {
set_counts(i, numP_parents, dist_data, recvcounts); set_counts(i, numP_parents, dist_data, counts.counts);
rdispls[i] = rdispls[i-1] + recvcounts[i-1]; counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
} }
// print_counts(*dist_data, recvcounts, rdispls, numP_parents, "Recv"); //print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
/* COMUNICACION DE DATOS */ /* COMUNICACION DE DATOS */
// MPI_Alltoallv(aux, sendcounts, sdispls, MPI_CHAR, array, recvcounts, rdispls, MPI_CHAR, dist_data->intercomm); MPI_Alltoallv(aux, counts.zero_arr, counts.zero_arr, MPI_CHAR, array, counts.counts, counts.displs, MPI_CHAR, dist_data.intercomm);
free(aux);
}
//================================================================================
//================================================================================
//========================ASINCHRONOUS FUNCTIONS==================================
//================================================================================
//================================================================================
/*
* Realiza un envio asincrono del vector array desde este grupo de procesos al grupo
* enlazado por el intercomunicador intercomm.
*
* El objeto MPI_Request se devuelve con el manejador para comprobar si la comunicacion
* ha terminado.
*
* El vector array no se modifica en esta funcion.
*/
int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child, MPI_Request *comm_req) {
int rootBcast = MPI_PROC_NULL;
int *idS = NULL;
struct Counts counts;
struct Dist_data dist_data;
if(myId == root) rootBcast = MPI_ROOT;
get_dist(qty, myId, numP, &dist_data); // Distribucion de este proceso en su grupo
dist_data.intercomm = intercomm;
// Create arrays which contains info about how many elements will be send to each created process
mallocCounts(&counts, numP_child);
getIds_intercomm(dist_data, numP_child, &idS); // Obtener rango de Id hijos a los que este proceso manda datos
send_async_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts, comm_req);
freeCounts(&counts);
free(idS);
return 1;
}
/*
* Realiza una recepcion asincrona del vector array a este grupo de procesos desde el grupo
* enlazado por el intercomunicador intercomm.
*
* El vector array se reserva dentro de la funcion y se devuelve en el mismo argumento.
* Tiene que ser liberado posteriormente por el usuario.
*/
void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents) {
int *idS = NULL;
int wait_err;
struct Counts counts;
struct Dist_data dist_data;
MPI_Request comm_req;
// Obtener distribución para este hijo
get_dist(qty, myId, numP, &dist_data);
*array = malloc(dist_data.tamBl * sizeof(char));
(*array)[dist_data.tamBl] = '\0';
dist_data.intercomm = intercomm;
/* PREPARAR DATOS DE RECEPCION SOBRE VECTOR*/
mallocCounts(&counts, numP_parents);
getIds_intercomm(dist_data, numP_parents, &idS); // Obtener el rango de Ids de padres del que este proceso recibira datos
recv_async_arrays(dist_data, *array, root, numP_parents, idS[0], idS[1], counts, &comm_req);
printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array);
wait_err = MPI_Wait(&comm_req, MPI_STATUS_IGNORE);
if(wait_err != MPI_SUCCESS) {
MPI_Abort(MPI_COMM_WORLD, wait_err);
}
// TODO Indicar a los padres que los hijos han terminado??
freeCounts(&counts);
free(idS);
}
/*
* Envia a los hijos un vector que es redistribuido a los procesos
* hijos. Antes de realizar la comunicacion, cada proceso padre calcula sobre que procesos
* del otro grupo se transmiten elementos.
*/
void send_async_arrays(struct Dist_data dist_data, char *array, int rootBcast, int numP_child, int idI, int idE, struct Counts counts, MPI_Request *comm_req) {
int i;
// PREPARAR ENVIO DEL VECTOR
if(idI == 0) {
set_counts(0, numP_child, dist_data, counts.counts);
idI++;
}
for(i=idI; i<idE; i++) {
set_counts(i, numP_child, dist_data, counts.counts);
counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
}
//print_counts(dist_data, counts.counts, counts.displs, numP_child, "Padres");
/* COMUNICACION DE DATOS */
MPI_Ialltoallv(array, counts.counts, counts.displs, MPI_CHAR, NULL, counts.zero_arr, counts.zero_arr, MPI_CHAR, dist_data.intercomm, comm_req);
}
/*
* Recibe de los padres un vector que es redistribuido a los procesos
* de este grupo. Antes de realizar la comunicacion cada hijo calcula sobre que procesos
* del otro grupo se transmiten elementos.
*/
void recv_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req) {
int i;
char *aux = malloc(1);
// Ajustar los valores de recepcion
if(idI == 0) {
set_counts(0, numP_parents, dist_data, counts.counts);
idI++;
}
for(i=idI; i<idE; i++) {
set_counts(i, numP_parents, dist_data, counts.counts);
counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
}
//print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
/* COMUNICACION DE DATOS */
MPI_Ialltoallv(aux, counts.zero_arr, counts.zero_arr, MPI_CHAR, array, counts.counts, counts.displs, MPI_CHAR, dist_data.intercomm, comm_req);
free(aux);
} }
/* /*
...@@ -168,6 +324,8 @@ void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int num ...@@ -168,6 +324,8 @@ void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int num
void get_dist(int qty, int id, int numP, struct Dist_data *dist_data) { void get_dist(int qty, int id, int numP, struct Dist_data *dist_data) {
int rem; int rem;
dist_data->myId = id;
dist_data->numP = numP;
dist_data->qty = qty; dist_data->qty = qty;
dist_data->tamBl = qty / numP; dist_data->tamBl = qty / numP;
rem = qty % numP; rem = qty % numP;
...@@ -257,7 +415,7 @@ void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS) { ...@@ -257,7 +415,7 @@ void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS) {
idE = ((dist_data.fin - middle) % tamOther > 0 && idE+1 <= numP_other) ? idE+1 : idE; idE = ((dist_data.fin - middle) % tamOther > 0 && idE+1 <= numP_other) ? idE+1 : idE;
} }
idS = malloc(2 * sizeof(int)); *idS = malloc(2 * sizeof(int));
(*idS)[0] = idI; (*idS)[0] = idI;
(*idS)[1] = idE; (*idS)[1] = idE;
} }
...@@ -298,3 +456,14 @@ void freeCounts(struct Counts *counts) { ...@@ -298,3 +456,14 @@ void freeCounts(struct Counts *counts) {
free(counts->displs); free(counts->displs);
free(counts->zero_arr); free(counts->zero_arr);
} }
void print_counts(struct Dist_data data_dist, int *xcounts, int *xdispls, int size, const char* name) {
int i;
for(i=0; i < size; i++) {
if(xcounts[i] != 0) {
printf("P%d of %d | %scounts[%d]=%d disp=%d\n", data_dist.myId, data_dist.numP, name, i, xcounts[i], xdispls[i]);
}
}
}
...@@ -8,4 +8,8 @@ int send_sync(char *array, int qty, int myId, int numP, int root, MPI_Comm inter ...@@ -8,4 +8,8 @@ int send_sync(char *array, int qty, int myId, int numP, int root, MPI_Comm inter
void recv_sync(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents); void recv_sync(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents);
int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child, MPI_Request *comm_req);
void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents);
void malloc_comm_array(char **array, int qty, int myId, int numP); void malloc_comm_array(char **array, int qty, int myId, int numP);
...@@ -14,11 +14,12 @@ echo "OPENMPI" ...@@ -14,11 +14,12 @@ echo "OPENMPI"
#mpirun -mca btl_openib_allow_ib 1 -npernode 10 -np 20 ./batch5.out #mpirun -mca btl_openib_allow_ib 1 -npernode 10 -np 20 ./batch5.out
echo "MPICH" echo "MPICH"
module load /home/martini/MODULES/modulefiles/mpich3.4 module load mpich-3.4.1-noucx
#module load /home/martini/MODULES/modulefiles/mpich3.4
#export HYDRA_DEBUG=1 #export HYDRA_DEBUG=1
#-disable-hostname-propagation -disable-auto-cleanup -pmi-port -hosts n00,n01 #-disable-hostname-propagation -disable-auto-cleanup -pmi-port -hosts n00,n01
mpirun -np 2 ./a.out test.ini mpirun -ppn 1 -np 2 ./a.out test.ini
echo "Intel" echo "Intel"
......
[general] [general]
resizes=1 ; Numero de redistribuciones resizes=2 ; Numero de redistribuciones
matrix_tam=2000 ; Tamaño en bytes de la matriz de computo matrix_tam=2000 ; Tamaño en bytes de la matriz de computo
SDR=10000 ; Tamaño en bytes a redistribuir de forma sincrona SDR=20 ; Tamaño en bytes a redistribuir de forma sincrona
ADR=10000 ; Tamaño en bytes a redistribuir de forma asincrona ADR=10000 ; Tamaño en bytes a redistribuir de forma asincrona
time=0.5 ; Tiempo necesario para realizar una iteracion time=0.5 ; Tiempo necesario para realizar una iteracion
...@@ -13,7 +13,13 @@ physical_dist=node ; Tipo de redistribución física de los procesos ...@@ -13,7 +13,13 @@ physical_dist=node ; Tipo de redistribución física de los procesos
[resize1] ; Grupo de hijos 1 [resize1] ; Grupo de hijos 1
iters=5 iters=5
procs=2 procs=4
factor=0.5 factor=1.5
physical_dist=node
[resize2] ; Grupo de hijos 1
iters=5
procs=8
factor=2
physical_dist=node physical_dist=node
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment