Commit ffab3976 authored by iker_martin

Added merge synchronous data redistribution. Memory leak found in configuration.c

parent 2f81e29c
......@@ -66,11 +66,17 @@ int main(int argc, char *argv[]) {
set_benchmark_configuration(config_file);
set_benchmark_results(results);
malleability_add_data(&(group->grp), 1, MAL_INT, 1, 1);
malleability_add_data(&run_id, 1, MAL_INT, 1, 1);
malleability_add_data(&(group->iter_start), 1, MAL_INT, 1, 1);
if(config_file->sdr) {
malleability_add_data(group->sync_array, config_file->sdr, MAL_CHAR, 0, 1);
}
if(config_file->adr) {
malleability_add_data(group->async_array, config_file->adr, MAL_CHAR, 0, 0);
}
MPI_Barrier(comm);
results->exec_start = MPI_Wtime();
} else { //Children init
......@@ -91,6 +97,15 @@ int main(int argc, char *argv[]) {
malleability_get_data(&value, 2, 1, 1);
group->iter_start = *((int *)value);
if(config_file->sdr) {
malleability_get_data(&value, 0, 0, 1);
group->sync_array = (char *)value;
}
if(config_file->adr) {
malleability_get_data(&value, 0, 0, 0);
group->async_array = (char *)value;
}
group->grp = group->grp + 1;
}
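
For illustration, a minimal sketch of the registration/retrieval pattern used above, assuming the malleability API shown in this commit (malleability_add_data / malleability_get_data and the MAL_INT / MAL_CHAR type constants from the repository's malleability headers); the helpers register_group_data and retrieve_group_data are hypothetical.

#include <stddef.h>

/* Prototypes as they appear in this commit; MAL_INT / MAL_CHAR are assumed to come from the repository's headers. */
void malleability_add_data(void *data, size_t total_qty, int type, int is_replicated, int is_constant);
void malleability_get_data(void **data, int index, int is_replicated, int is_constant);

/* Hypothetical helper for the parents: replicated scalars plus the two distributed arrays. */
void register_group_data(int *grp, int *run_id, int *iter_start, char *sync_array, int sdr, char *async_array, int adr) {
    malleability_add_data(grp, 1, MAL_INT, 1, 1);                     /* replicated, constant */
    malleability_add_data(run_id, 1, MAL_INT, 1, 1);
    malleability_add_data(iter_start, 1, MAL_INT, 1, 1);
    if(sdr) malleability_add_data(sync_array, sdr, MAL_CHAR, 0, 1);   /* distributed, synchronous path */
    if(adr) malleability_add_data(async_array, adr, MAL_CHAR, 0, 0);  /* distributed, asynchronous path */
}

/* Hypothetical helper for the children: retrieve entry 0 of each distributed structure. */
void retrieve_group_data(char **sync_array, int sdr, char **async_array, int adr) {
    void *value = NULL;
    if(sdr) { malleability_get_data(&value, 0, 0, 1); *sync_array = (char *) value; }
    if(adr) { malleability_get_data(&value, 0, 0, 0); *async_array = (char *) value; }
}
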
......@@ -440,13 +455,17 @@ void obtain_op_times(int compute) {
* Frees all the memory associated with the application
*/
void free_application_data() {
if(config_file->sdr) {
// FIXME ERROR for group 1 onwards (group 0 is fine)
if (group->grp==0){
if(config_file->sdr && group->sync_array != NULL) {
free(group->sync_array);
group->sync_array = NULL;
}
if(config_file->adr) {
if(config_file->adr && group->async_array != NULL) {
free(group->async_array);
group->async_array = NULL;
}
}
free_malleability();
free_results_data(results, config_file->n_stages);
......
......@@ -135,7 +135,7 @@ void free_config(configuration *user_config) {
}
}
//Free derived types
if(user_config->config_type != MPI_DATATYPE_NULL) {
if(user_config->config_type != MPI_DATATYPE_NULL) { //FIXME It is not being freed
MPI_Type_free(&(user_config->config_type));
user_config->config_type = MPI_DATATYPE_NULL;
}
......
......@@ -5,9 +5,6 @@
#include "distribution_methods/block_distribution.h"
#include "CommDist.h"
void send_sync_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts);
void recv_sync_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts);
void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req);
void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req);
......@@ -46,101 +43,65 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) {
//================================================================================
/*
* Performs a synchronous send of the vector array from this group of processes to the group
* linked by the intercommunicator intercomm.
* Performs a communication to redistribute an array in a block distribution.
* The redistribution distinguishes the parent group from the children, and the values each group
* indicates can be different.
*
* The array vector is not modified inside this function.
*/
int send_sync(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child) {
int *idS = NULL;
struct Counts counts;
struct Dist_data dist_data;
get_block_dist(qty, myId, numP, &dist_data); // Distribution of this process within its group
dist_data.intercomm = intercomm;
// Create arrays which contain info about how many elements will be sent to each created process
mallocCounts(&counts, numP_child);
getIds_intercomm(dist_data, numP_child, &idS); // Get the range of child Ids this process sends data to
send_sync_arrays(dist_data, array, numP_child, counts);
freeCounts(&counts);
free(idS);
return 1;
}
/*
* Performs a synchronous reception of the vector array into this group of processes from the group
* linked by the intercommunicator intercomm.
* - send (IN):  Array with the data to send. This value can not be NULL.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
*               If the process receives data and recv is NULL, the behaviour is undefined.
* - qty  (IN):  Sum of the elements shared by all processes that will send data.
* - myId (IN):  Rank of the MPI process in the local communicator. For the parents it is not the rank obtained from "comm".
* - numP (IN):  Size of the local group. For a children group this parameter must match the value returned by
*               "MPI_Comm_size(comm)". For the parents it is not always the size obtained from "comm".
* - numO (IN):  Number of processes in the remote group. For the parents it is the target number of processes after the
*               resize, while for the children it is the number of parents.
* - is_children_group (IN): Indicates whether this MPI rank belongs to the children group (TRUE) or to the parents (FALSE).
* - comm (IN):  Communicator used to perform the redistribution.
*
* The array vector is allocated inside the function and returned through the same argument.
* It must later be freed by the user.
* returns: An integer indicating whether the operation has been completed (TRUE) or not (FALSE). //FIXME In this case it is always true...
*/
void recv_sync(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents) {
int *idS = NULL;
struct Counts counts;
int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, MPI_Comm comm) {
int is_intercomm;
struct Counts s_counts, r_counts;
struct Dist_data dist_data;
// Get the distribution for this child
if(is_children_group) {
mallocCounts(&s_counts, numO);
prepare_comm_alltoall(myId, numP, numO, qty, &r_counts);
// Get the distribution for this child
get_block_dist(qty, myId, numP, &dist_data);
*recv = malloc(dist_data.tamBl * sizeof(char));
get_block_dist(qty, myId, numP, &dist_data);
*array = malloc(dist_data.tamBl * sizeof(char));
//(*array)[dist_data.tamBl] = '\0';
dist_data.intercomm = intercomm;
/* PREPARE THE RECEPTION DATA FOR THE VECTOR */
mallocCounts(&counts, numP_parents);
getIds_intercomm(dist_data, numP_parents, &idS); // Get the range of parent Ids this process will receive data from
recv_sync_arrays(dist_data, *array, numP_parents, counts);
//printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array);
freeCounts(&counts);
free(idS);
}
/*
* Sends to the children a vector that is redistributed among the child
* processes. Before performing the communication, each parent process computes to which processes
* of the other group the elements are transmitted.
*/
void send_sync_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts) {
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);
/* DATA COMMUNICATION */
MPI_Alltoallv(array, counts.counts, counts.displs, MPI_CHAR, NULL, counts.zero_arr, counts.zero_arr, MPI_CHAR, dist_data.intercomm);
}
/*
* Receives from the parents a vector that is redistributed among the processes
* of this group. Before performing the communication, each child computes from which processes
* of the other group the elements are transmitted.
*/
void recv_sync_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts) {
char aux;
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, &counts);
// Adjust the reception values
/*
if(idI == 0) {
set_counts(0, numP_parents, dist_data, counts.counts);
idI++;
print_counts(dist_data, r_counts.counts, r_counts.displs, numO, 1, "Children C");
} else {
prepare_comm_alltoall(myId, numP, numO, qty, &s_counts);
MPI_Comm_test_inter(comm, &is_intercomm);
if(is_intercomm) {
mallocCounts(&r_counts, numO);
} else {
if(myId < numO) {
prepare_comm_alltoall(myId, numO, numP, qty, &r_counts);
// Get the distribution for this child
get_block_dist(qty, myId, numO, &dist_data);
*recv = malloc(dist_data.tamBl * sizeof(char));
} else {
mallocCounts(&r_counts, numP);
}
get_block_dist(qty, myId, numP, &dist_data);
print_counts(dist_data, r_counts.counts, r_counts.displs, numP, 1, "Children P ");
print_counts(dist_data, s_counts.counts, s_counts.displs, numO, 1, "Parents ");
}
}
for(i=idI; i<idE; i++) {
set_counts(i, numP_parents, dist_data, counts.counts);
counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
}*/
//print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
/* DATA COMMUNICATION */
MPI_Alltoallv(&aux, counts.zero_arr, counts.zero_arr, MPI_CHAR, array, counts.counts, counts.displs, MPI_CHAR, dist_data.intercomm);
}
MPI_Alltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, comm);
freeCounts(&s_counts);
freeCounts(&r_counts);
return 1;
}
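
A hedged usage sketch of the new sync_communication entry point, following the parameter description above; the wrapper functions and buffer names are illustrative, and the 0/1 flags stand for the parent/children cases (MALLEABILITY_NOT_CHILDREN / MALLEABILITY_CHILDREN in the manager code).

#include <stdlib.h>
#include <mpi.h>
#include "CommDist.h"  /* declares sync_communication as introduced by this commit */

/* Parent side: send the local block towards numC target processes. */
void example_parent_side(char *local_block, int qty, int myId, int numP, int numC, MPI_Comm comm) {
    char *recv_buf = NULL;
    /* With an intercommunicator (baseline) recv_buf stays NULL; with an
     * intracommunicator (merge) ranks below numC also get their new block. */
    sync_communication(local_block, &recv_buf, qty, myId, numP, numC, 0, comm);
    if(recv_buf != NULL) { /* adopt the new block; free it once it is no longer needed */ }
}

/* Child side: the send buffer is unused and the new block is allocated inside. */
void example_child_side(int qty, int myId, int numP, int numP_parents, MPI_Comm comm) {
    char dummy, *my_block = NULL;
    sync_communication(&dummy, &my_block, qty, myId, numP, numP_parents, 1, comm);
    free(my_block);
}
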
//================================================================================
//================================================================================
......
......@@ -16,9 +16,7 @@
//#define MAL_USE_POINT 2
//#define MAL_USE_THREAD 3
int send_sync(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child);
void recv_sync(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents);
int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, MPI_Comm comm);
int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait);
void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int parents_wait);
......
......@@ -16,9 +16,10 @@ void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS);
void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, struct Counts *counts) {
int i, *idS;
struct Dist_data dist_data;
mallocCounts(counts, numP_other);
get_block_dist(n, myId, numP, &dist_data);
mallocCounts(counts, numP_other);
get_util_ids(dist_data, numP_other, &idS);
if(idS[0] == 0) {
......@@ -29,6 +30,7 @@ void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, struct Cou
set_interblock_counts(i, numP_other, dist_data, counts->counts);
counts->displs[i] = counts->displs[i-1] + counts->counts[i-1];
}
free(idS);
}
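
As a generic illustration of what prepare_comm_alltoall computes (this is not the repository's implementation), the per-destination counts and displacements for MPI_Alltoallv can be derived from the overlap between the local block and each remote rank's block; all names below are hypothetical.

/* Block range [begin, end) owned by 'rank' when n elements are split among numP processes. */
static void block_range(int n, int numP, int rank, int *begin, int *end) {
    int base = n / numP, rem = n % numP;
    *begin = rank * base + (rank < rem ? rank : rem);
    *end   = *begin + base + (rank < rem ? 1 : 0);
}

/* Fill counts/displs (size numP_other) describing how myId's block is split among the remote owners. */
void alltoall_block_counts(int myId, int numP, int numP_other, int n, int *counts, int *displs) {
    int my_begin, my_end, o_begin, o_end, lo, hi, i;
    block_range(n, numP, myId, &my_begin, &my_end);
    for(i = 0; i < numP_other; i++) {
        block_range(n, numP_other, i, &o_begin, &o_end);
        lo = my_begin > o_begin ? my_begin : o_begin;   /* overlap with remote block i */
        hi = my_end   < o_end   ? my_end   : o_end;
        counts[i] = hi > lo ? hi - lo : 0;
        displs[i] = i == 0 ? 0 : displs[i-1] + counts[i-1];
    }
}
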
/*
......@@ -206,12 +208,20 @@ void mallocCounts(struct Counts *counts, size_t numP) {
* dynamically.
*/
void freeCounts(struct Counts *counts) {
free(counts->counts);
free(counts->displs);
free(counts->zero_arr);
counts->counts = NULL;
counts->displs = NULL;
counts->zero_arr = NULL;
if(counts != NULL) {
if(counts->counts != NULL) {
free(counts->counts);
counts->counts = NULL;
}
if(counts->displs != NULL) {
free(counts->displs);
counts->displs = NULL;
}
if(counts->zero_arr != NULL) {
free(counts->zero_arr);
counts->zero_arr = NULL;
}
}
}
/*
......
......@@ -304,6 +304,8 @@ void get_malleability_user_comm(MPI_Comm *comm) {
* Variable data has to be added when it is about to be sent, not before.
*
* More information in the function "add_data".
*
* //FIXME If it is constant it should go through the asynchronous path, not the synchronous one
*/
void malleability_add_data(void *data, size_t total_qty, int type, int is_replicated, int is_constant) {
......@@ -339,6 +341,7 @@ void malleability_add_data(void *data, size_t total_qty, int type, int is_replic
* Variable data has to be modified when it is about to be sent, not before.
*
* More information in the function "modify_data".
* //FIXME If it is constant it should go through the asynchronous path, not the synchronous one
*/
void malleability_modify_data(void *data, size_t index, size_t total_qty, int type, int is_replicated, int is_constant) {
if(is_constant) {
......@@ -369,6 +372,7 @@ void malleability_modify_data(void *data, size_t index, size_t total_qty, int ty
/*
* Returns the number of entries of the chosen data
* description structure.
* //FIXME If it is constant it should go through the asynchronous path, not the synchronous one
*/
void malleability_get_entries(size_t *entries, int is_replicated, int is_constant){
......@@ -393,6 +397,7 @@ void malleability_get_entries(size_t *entries, int is_replicated, int is_constan
* with the function "malleability_add_data()".
* It is the user's responsibility to know the type of that data.
* TODO Refactor so that it is automatic
* //FIXME If it is constant it should go through the asynchronous path, not the synchronous one
*/
void malleability_get_data(void **data, int index, int is_replicated, int is_constant) {
malleability_data_t *data_struct;
......@@ -429,17 +434,19 @@ void malleability_get_data(void **data, int index, int is_replicated, int is_con
*/
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
size_t i;
char *aux;
char *aux_send, *aux_recv;
if(is_asynchronous) {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Check that it really is a char
send_async(aux, data_struct->qty[i], mall->myId, mall->numP, mall->intercomm, numP_children, data_struct->requests, mall_conf->comm_type);
aux_send = (char *) data_struct->arrays[i]; //TODO Check that it really is a char
send_async(aux_send, data_struct->qty[i], mall->myId, mall->numP, mall->intercomm, numP_children, data_struct->requests, mall_conf->comm_type);
}
} else {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Check that it really is a char
send_sync(aux, data_struct->qty[i], mall->myId, mall->numP, mall->intercomm, numP_children);
aux_send = (char *) data_struct->arrays[i]; //TODO Check that it really is a char
aux_recv = NULL;
sync_communication(aux_send, &aux_recv, data_struct->qty[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall->intercomm);
if(aux_recv != NULL) data_struct->arrays[i] = (void *) aux_recv;
}
}
}
......@@ -451,7 +458,7 @@ void send_data(int numP_children, malleability_data_t *data_struct, int is_async
*/
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
size_t i;
char *aux;
char *aux, aux_s;
if(is_asynchronous) {
for(i=0; i < data_struct->entries; i++) {
......@@ -462,7 +469,7 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch
} else {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Check that it really is a char
recv_sync(&aux, data_struct->qty[i], mall->myId, mall->numP, mall->intercomm, numP_parents);
sync_communication(&aux_s, &aux, data_struct->qty[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall->intercomm);
data_struct->arrays[i] = (void *) aux;
}
}
......@@ -486,8 +493,12 @@ void Children_init() {
int is_intercomm;
malleability_connect_children(mall->myId, mall->numP, mall->root, mall->comm, &numP_parents, &root_parents, &(mall->intercomm));
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
// TODO From this point on, take into account whether it is BASELINE or MERGE
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
if(!is_intercomm) { // For intracommunicators, these processes will be added
MPI_Comm_rank(mall->intercomm, &(mall->myId));
MPI_Comm_size(mall->intercomm, &(mall->numP));
}
recv_config_file(mall->root, mall->intercomm, &(mall_conf->config_file));
comm_node_data(root_parents, MALLEABILITY_CHILDREN);
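
For context on the baseline/merge distinction checked above: with the baseline method the children are reached through the intercommunicator returned by MPI_Comm_spawn, while the merge method turns it into a single intracommunicator (typically via MPI_Intercomm_merge), so MPI_Comm_test_inter can tell the two apart. A minimal, hypothetical helper:

#include <mpi.h>

/* Returns 1 when 'comm' is an intracommunicator, i.e. the merged (merge-method) case. */
int using_merge_method(MPI_Comm comm) {
    int is_intercomm;
    MPI_Comm_test_inter(comm, &is_intercomm);
    return !is_intercomm;
}
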
......@@ -627,7 +638,7 @@ int start_redistribution() {
* the children have finished receiving.
*/
int check_redistribution() {
int completed, all_completed, test_err;
int is_intercomm, completed, all_completed, test_err;
MPI_Request *req_completed;
//dist_a_data->requests[0][X] //FIXME Magic number 0 -- Change it so it is a for loop?
......@@ -657,6 +668,10 @@ int check_redistribution() {
//For the disconnection of both groups of processes, MPI must be told that this comm
//has finished, even though this point can only be reached once it has finished
}
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
if(!is_intercomm) mall_conf->results->async_end = MPI_Wtime(); // Merge method only
return end_redistribution();
}
......@@ -673,14 +688,7 @@ int end_redistribution() {
size_t i;
int is_intercomm, rootBcast, local_state;
is_intercomm = 0;
if(mall->intercomm != MPI_COMM_NULL) {
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
} else {
// If no communicator has been created, it is because the Spawn has been postponed
// and this is the Merge Shrink spawn
mall->intercomm = mall->comm;
}
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
if(is_intercomm) {
rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
} else {
......@@ -691,6 +699,7 @@ int end_redistribution() {
if(dist_s_data->entries || rep_s_data->entries) { // Send synchronous data
mall_conf->results->sync_time[mall_conf->grp] = MPI_Wtime();
send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
if(!is_intercomm) mall_conf->results->sync_end = MPI_Wtime(); // Merge method only
// TODO Create a specific function and add it for Async
// TODO Take the type into account
......@@ -726,7 +735,6 @@ int end_redistribution() {
}
}
if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) {
MPI_Comm_disconnect(&(mall->intercomm));
}
......@@ -840,7 +848,7 @@ int thread_creation() {
* The state of the communication is returned when the function finishes.
*/
int thread_check() {
int all_completed = 0;
int all_completed = 0, is_intercomm;
// Checks that all threads have finished the distribution (same value in commAsync)
MPI_Allreduce(&state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
......@@ -852,6 +860,8 @@ int thread_check() {
MPI_Abort(MPI_COMM_WORLD, -1);
return -2;
}
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
if(!is_intercomm) mall_conf->results->async_end = MPI_Wtime(); // Merge method only
return end_redistribution();
}
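
The MPI_Comm_test_inter plus timestamp pattern now appears in check_redistribution, end_redistribution and thread_check; a hypothetical helper that could factor it out (names are illustrative):

#include <mpi.h>

/* Record a timestamp only when the communicator is an intracommunicator (merge method). */
static void record_if_merge(MPI_Comm comm, double *timestamp) {
    int is_intercomm;
    MPI_Comm_test_inter(comm, &is_intercomm);
    if(!is_intercomm) *timestamp = MPI_Wtime();
}

/* e.g. record_if_merge(mall->intercomm, &(mall_conf->results->async_end)); */
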
......
......@@ -114,7 +114,7 @@ void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *d
data_struct_rep->arrays[i] = (void *) malloc(data_struct_rep->qty[i] * sizeof(int)); //TODO Take into account that it is not always an int
}
for(i=0; i < data_struct_dist->entries; i++) {
data_struct_dist->arrays[i] = (void *) malloc(data_struct_dist->qty[i] * sizeof(int)); //TODO Take into account that it is not always an int
data_struct_dist->arrays[i] = (void *) NULL;
}
}
......