Commit 96107657 authored by iker_martin

Added functionality for asynchronous redistributions with threads only. Support for non-blocking calls is not yet fully implemented for the Merge method.
parent d0c9fe98
@@ -5,6 +5,8 @@
#include "distribution_methods/block_distribution.h"
#include "CommDist.h"
void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts);
void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req);
void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req);
@@ -64,43 +66,17 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) {
int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, MPI_Comm comm) {
  int is_intercomm;
  struct Counts s_counts, r_counts;
  struct Dist_data dist_data;

  if(is_children_group) {
    mallocCounts(&s_counts, numO);
    prepare_comm_alltoall(myId, numP, numO, qty, &r_counts);
    // Get the block distribution for this child
    get_block_dist(qty, myId, numP, &dist_data);
    *recv = malloc(dist_data.tamBl * sizeof(char));
    //get_block_dist(qty, myId, numP, &dist_data);
    //print_counts(dist_data, r_counts.counts, r_counts.displs, numO, 1, "Children C");
  } else {
    prepare_comm_alltoall(myId, numP, numO, qty, &s_counts);
    /* PREPARE COMMUNICATION */
    MPI_Comm_test_inter(comm, &is_intercomm);
    prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
    MPI_Comm_test_inter(comm, &is_intercomm);
    if(is_intercomm) {
      mallocCounts(&r_counts, numO);
    } else {
      if(myId < numO) {
        prepare_comm_alltoall(myId, numO, numP, qty, &r_counts);
        // Get the block distribution for this child
        get_block_dist(qty, myId, numO, &dist_data);
        *recv = malloc(dist_data.tamBl * sizeof(char));
      } else {
        mallocCounts(&r_counts, numP);
      }
      //get_block_dist(qty, myId, numP, &dist_data);
      //print_counts(dist_data, r_counts.counts, r_counts.displs, numP, 1, "Children P ");
      //print_counts(dist_data, s_counts.counts, s_counts.displs, numO, 1, "Parents ");
    }
  }

  /* DATA COMMUNICATION */
  /* PERFORM COMMUNICATION */
  MPI_Alltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, comm);

  freeCounts(&s_counts);
  freeCounts(&r_counts);
  return 1;
  return 1; //FIXME In this case is always true...
}
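For context, a minimal usage sketch from the parents' side (the wrapper name and buffer handling are illustrative; 0 is passed as is_children_group for the parents, and malloc_comm_array is assumed to allocate and fill the block owned by this rank):

void example_parent_redistribution(MPI_Comm comm, int myId, int numP, int numO, int qty) {
  char *send = NULL, *recv = NULL;
  malloc_comm_array(&send, qty, myId, numP);             // local block of the qty elements
  // Redistribute towards the numO processes of the new group; recv is only
  // allocated by sync_communication for ranks that actually receive data
  sync_communication(send, &recv, qty, myId, numP, numO, 0, comm);
  free(send);
  if(recv != NULL) free(recv);
}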
//================================================================================
@@ -301,6 +277,59 @@ void recv_async_point_arrays(struct Dist_data dist_data, char *array, int numP_p
* ========================================================================================
*/
/*
 * Performs a communication to redistribute an array in a block distribution. For each process it
 * calculates how many elements it sends to / receives from the processes of the other group.
 *
 * - qty (IN):  Sum of the elements shared by all the processes that will send data.
 * - myId (IN): Rank of this MPI process in its local communicator. For the parents it is not
 *              necessarily the rank obtained from "comm".
 * - numP (IN): Size of the local group. If this is a children group, the parameter must match
 *              "MPI_Comm_size(comm)". For the parents it is not always the size obtained from "comm".
 * - numO (IN): Number of processes in the remote group. For the parents it is the target number of
 *              processes after the resize, while for the children it is the number of parents.
 * - is_children_group (IN): Indicates whether this MPI rank belongs to the children group (TRUE) or
 *              to the parents group (FALSE).
 * - is_intercomm (IN): Indicates whether the communicator used is an intercommunicator (TRUE) or an
 *              intracommunicator (FALSE).
 * - recv (OUT): Array where the received data will be written. A NULL value is allowed if the process
 *              is not going to receive data; if the process does receive data and this pointer is NULL,
 *              the behaviour is undefined.
 * - s_counts (OUT): Struct indicating how many elements this process sends to each process of the new group.
 * - r_counts (OUT): Struct indicating how many elements this process receives from each process of the previous group.
 *
 * returns: An integer indicating whether the operation has completed (TRUE) or not (FALSE). //FIXME In this case it is always true...
 */
void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts) {
  struct Dist_data dist_data;

  if(is_children_group) {
    mallocCounts(s_counts, numO);
    prepare_comm_alltoall(myId, numP, numO, qty, r_counts);
    // Get the block distribution for this child
    get_block_dist(qty, myId, numP, &dist_data);
    *recv = malloc(dist_data.tamBl * sizeof(char));
    //get_block_dist(qty, myId, numP, &dist_data);
    //print_counts(dist_data, r_counts->counts, r_counts->displs, numO, 1, "Children C");
  } else {
    //get_block_dist(qty, myId, numP, &dist_data);
    prepare_comm_alltoall(myId, numP, numO, qty, s_counts);
    if(is_intercomm) {
      mallocCounts(r_counts, numO);
    } else {
      if(myId < numO) {
        prepare_comm_alltoall(myId, numO, numP, qty, r_counts);
        // Get the block distribution for this child
        get_block_dist(qty, myId, numO, &dist_data);
        *recv = malloc(dist_data.tamBl * sizeof(char));
      } else {
        mallocCounts(r_counts, numP);
      }
      //print_counts(dist_data, r_counts->counts, r_counts->displs, numP, 1, "Children P ");
    }
    //print_counts(dist_data, s_counts->counts, s_counts->displs, numO, 1, "Parents ");
  }
}
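As a reference for what prepare_comm_alltoall has to produce, here is a self-contained sketch of the block-overlap computation behind such counts and displacements (this shows the general technique only, not the library's implementation, and does not assume the field names of struct Counts):

// Block range [begin, end) owned by rank 'id' out of 'n' processes for 'qty' elements
void block_range(int qty, int id, int n, int *begin, int *end) {
  int base = qty / n, rem = qty % n;
  *begin = id * base + (id < rem ? id : rem);
  *end   = *begin + base + (id < rem ? 1 : 0);
}

// Fill send counts/displacements for rank 'myId' of a group of numP processes
// towards a remote group of numO processes, both using a block distribution
void fill_send_counts(int qty, int myId, int numP, int numO, int *counts, int *displs) {
  int my_b, my_e, o_b, o_e, i;
  block_range(qty, myId, numP, &my_b, &my_e);
  for(i = 0; i < numO; i++) {
    block_range(qty, i, numO, &o_b, &o_e);
    int lo = my_b > o_b ? my_b : o_b;
    int hi = my_e < o_e ? my_e : o_e;
    counts[i] = hi > lo ? hi - lo : 0;    // overlap between my block and remote rank i's block
    displs[i] = hi > lo ? lo - my_b : 0;  // offset of that overlap inside my local buffer
  }
}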
/*
 * For a process of one group, obtains to or from which ranks of the
 * other group it has to send or receive data.
......
@@ -35,10 +35,8 @@ typedef struct {
int spawn_method;
int spawn_dist;
int spawn_strategies;
//int spawn_is_single;
//int spawn_threaded;
int comm_type;
int comm_threaded;
int comm_threaded; //TODO Rework its usage so that comm_threaded makes sense -- comm_type selects the MPI function, this flag selects whether a thread is used or not
int grp;
configuration *config_file;
@@ -213,7 +211,7 @@ int malleability_checkpoint() {
} else {
state = check_redistribution();
}
if(state != MALL_DIST_PENDING) {
if(state != MALL_DIST_PENDING) {
malleability_checkpoint();
}
break;
@@ -439,7 +437,9 @@ void send_data(int numP_children, malleability_data_t *data_struct, int is_async
if(is_asynchronous) {
for(i=0; i < data_struct->entries; i++) {
aux_send = (char *) data_struct->arrays[i]; //TODO Check that it really is a char
aux_recv = NULL;
send_async(aux_send, data_struct->qty[i], mall->myId, mall->numP, mall->intercomm, numP_children, data_struct->requests, mall_conf->comm_type);
if(aux_recv != NULL) data_struct->arrays[i] = (void *) aux_recv;
}
} else {
for(i=0; i < data_struct->entries; i++) {
@@ -493,7 +493,6 @@ void Children_init() {
int is_intercomm;
malleability_connect_children(mall->myId, mall->numP, mall->root, mall->comm, &numP_parents, &root_parents, &(mall->intercomm));
// TODO From this point on, take into account whether it is BASELINE or MERGE
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
if(!is_intercomm) { // For intracommunicators, these processes will be added
MPI_Comm_rank(mall->intercomm, &(mall->myId));
@@ -502,6 +501,7 @@ void Children_init() {
recv_config_file(mall->root, mall->intercomm, &(mall_conf->config_file));
comm_node_data(root_parents, MALLEABILITY_CHILDREN);
MPI_Bcast(&(mall_conf->comm_type), 1, MPI_INT, root_parents, mall->intercomm);
mall_conf->results = (results_data *) malloc(sizeof(results_data));
init_results_data(mall_conf->results, mall_conf->config_file->n_resizes, mall_conf->config_file->n_stages, RESULTS_INIT_DATA_QTY);
@@ -510,17 +510,17 @@ void Children_init() {
if(dist_a_data->entries || rep_a_data->entries) { // Receive asynchronous data
if(mall_conf->comm_type == MAL_USE_NORMAL || mall_conf->comm_type == MAL_USE_IBARRIER || mall_conf->comm_type == MAL_USE_POINT) {
recv_data(numP_parents, dist_a_data, 1);
recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
} else if (mall_conf->comm_type == MAL_USE_THREAD) { //TODO Rework usage so that comm_threaded makes sense
recv_data(numP_parents, dist_a_data, 0);
recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
}
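// Note: with MAL_USE_THREAD the parents push the data from a background thread using blocking sends (see thread_async_work below), so the children post plain blocking receives here.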
mall_conf->results->async_end = MPI_Wtime(); // Timestamp of when the asynchronous communication ends
}
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
if(dist_s_data->entries || rep_s_data->entries) { // Receive synchronous data
recv_data(numP_parents, dist_s_data, 0);
recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
mall_conf->results->sync_end = MPI_Wtime(); // Timestamp of when the synchronous communication ends
@@ -608,6 +608,7 @@ int start_redistribution() {
send_config_file(mall_conf->config_file, rootBcast, mall->intercomm);
comm_node_data(rootBcast, MALLEABILITY_NOT_CHILDREN);
MPI_Bcast(&(mall_conf->comm_type), 1, MPI_INT, rootBcast, mall->intercomm);
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
if(dist_a_data->entries || rep_a_data->entries) { // Send asynchronous data
@@ -829,16 +830,18 @@ void def_nodeinfo_type(MPI_Datatype *node_type) {
//======================================================||
//======================================================||
int comm_state; //FIXME Use a handler
/*
 * Creates a thread to run a communication in the background.
 */
int thread_creation() {
comm_state = MALL_DIST_PENDING;
if(pthread_create(&(mall->async_thread), NULL, thread_async_work, NULL)) {
printf("Error al crear el hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
return MALL_DIST_PENDING;
return comm_state;
}
/*
@@ -851,7 +854,7 @@ int thread_check() {
int all_completed = 0, is_intercomm;
// Check that all threads have finished the distribution (same value in commAsync)
MPI_Allreduce(&state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
MPI_Allreduce(&comm_state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
if(all_completed != MALL_DIST_COMPLETED) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended
//FIXME The MALL_APP_ENDED state is not taken into account
@@ -876,7 +879,7 @@ int thread_check() {
*/
void* thread_async_work() {
send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
state = MALL_DIST_COMPLETED;
comm_state = MALL_DIST_COMPLETED;
pthread_exit(NULL);
}
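A sketch of how the thread-based path is meant to be driven (do_iteration is a hypothetical placeholder for application work; thread_creation, thread_check and the MALL_DIST_* states are the ones above, and MPI must have been initialised with MPI_THREAD_MULTIPLE so the background thread can issue MPI calls while the main thread enters the MPI_Allreduce of thread_check):

// Hypothetical driver loop overlapping computation with the background redistribution
int state = thread_creation();   // spawns thread_async_work() and returns MALL_DIST_PENDING
while(state == MALL_DIST_PENDING) {
  do_iteration();                // application work overlapped with the communication
  state = thread_check();        // collective check; stays PENDING until every rank has finished
}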
......
@@ -33,32 +33,11 @@ enum mall_spawn_methods{MALL_SPAWN_BASELINE, MALL_SPAWN_MERGE};
#define MAL_APP_EXECUTING 0
#define MAL_APP_ENDED 1
// TODO Refactor
/*
#define COMM_PHY_SPREAD 1
#define COMM_PHY_COMPACT 2
*/
/*
// SPAWN METHODS
#define COMM_SPAWN_SERIAL 0
#define COMM_SPAWN_PTHREAD 1
#define COMM_SPAWN_MERGE 2
#define COMM_SPAWN_MERGE_PTHREAD 3
//#define COMM_SPAWN_BASELINE 0
//#define COMM_SPAWN_MERGE 1
//SPAWN STRATEGIES
#define COMM_SPAWN_MULTIPLE 0
#define COMM_SPAWN_SINGLE 1
//#define COMM_SPAWN_PTHREAD 2
//#define COMM_SPAWN_SINGLE 3
*/
#define MAL_USE_NORMAL 0
#define MAL_USE_IBARRIER 1
#define MAL_USE_POINT 2
#define MAL_USE_THREAD 3
#define MAL_INT 0
#define MAL_CHAR 1
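To illustrate how these values are meant to be consumed on the sending side, a sketch of the dispatch (this mirrors, but is not, the code in malleability_manager; send_data, thread_creation, mall->numC and dist_a_data are the names used above):

// Hypothetical selection of the asynchronous redistribution path from comm_type
switch(mall_conf->comm_type) {
  case MAL_USE_THREAD:      // blocking sends issued from a background thread
    state = thread_creation();
    break;
  case MAL_USE_NORMAL:
  case MAL_USE_IBARRIER:
  case MAL_USE_POINT:       // non-blocking / point-to-point MPI calls, checked later
    send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
    state = MALL_DIST_PENDING;
    break;
}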
......