Commit 3e176fda authored by iker_martin's avatar iker_martin
Browse files

Anadida funcion para realizar comunicaciones en segundo plano por parte de hebras

parent d79aefaf
......@@ -3,6 +3,7 @@
#include <mpi.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <pthread.h>
#include "../IOcodes/read_ini.h"
#include "../IOcodes/results.h"
#include "../malleability/ProcessDist.h"
......@@ -17,6 +18,11 @@ int checkpoint(int iter, int state, MPI_Request **comm_req);
void TC(int numS);
int start_redistribution(int numS, MPI_Request **comm_req);
int check_redistribution(int iter, MPI_Request **comm_req);
int end_redistribution(int iter);
int thread_creation();
int thread_check();
void* thread_async_work(void* void_arg);
void iterate(double *matrix, int n, int async_comm);
void computeMatrix(double *matrix, int n);
......@@ -38,16 +44,26 @@ typedef struct {
int iter_start;
int argc;
int numS; // Cantidad de procesos hijos
int commAsync;
MPI_Comm children, parents;
char **argv;
char *sync_array, *async_array;
} group_data;
typedef struct {
int myId, numP, numS, adr;
MPI_Comm children;
char *sync_array;
} thread_data;
configuration *config_file;
group_data *group;
results_data *results;
int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis
pthread_t async_thread; // TODO Cambiar de sitio?
int main(int argc, char *argv[]) {
int numP, myId, res;
int req;
......@@ -57,7 +73,13 @@ int main(int argc, char *argv[]) {
MPI_Comm_rank(MPI_COMM_WORLD, &myId);
init_group_struct(argv, argc, myId, numP);
MPI_Comm_get_parent(&(group->parents));
if(group->parents == MPI_COMM_NULL ) { // Si son el primer grupo de procesos, recogen la configuracion inicial
init_application();
} else { // Si son procesos hijos deben comunicarse con las padres
Sons_init();
}
if(group->grp == 0) {
MPI_Barrier(MPI_COMM_WORLD);
......@@ -70,6 +92,7 @@ int main(int argc, char *argv[]) {
MPI_Barrier(MPI_COMM_WORLD);
results->exec_time = MPI_Wtime() - results->exec_start;
}
print_final_results();
free_application_data();
......@@ -139,16 +162,18 @@ int checkpoint(int iter, int state, MPI_Request **comm_req) {
// Comprobar si se tiene que realizar un redimensionado
if(config_file->iters[group->grp] > iter || config_file->resizes == group->grp + 1) {return MAL_COMM_UNINITIALIZED;}
int numS = config_file->procs[group->grp +1];
group->numS = config_file->procs[group->grp +1];
results->spawn_start = MPI_Wtime();
TC(numS);
TC(group->numS);
results->spawn_time[group->grp] = MPI_Wtime() - results->spawn_start;
state = start_redistribution(numS, comm_req);
state = start_redistribution(group->numS, comm_req);
} else if(state == MAL_ASYNC_PENDING) {
state = check_redistribution(iter, comm_req);
state = thread_check();
if(state == MAL_COMM_COMPLETED) end_redistribution(iter);
//state = check_redistribution(iter, comm_req);
}
return state;
......@@ -195,20 +220,57 @@ int start_redistribution(int numS, MPI_Request **comm_req) {
if(config_file->adr > 0) {
results->async_start = MPI_Wtime();
send_async(group->async_array, config_file->adr, group->myId, group->numP, ROOT, group->children, numS, comm_req, config_file->aib);
return MAL_ASYNC_PENDING;
//send_async(group->async_array, config_file->adr, group->myId, group->numP, ROOT, group->children, numS, comm_req, config_file->aib);
return thread_creation();
//return MAL_ASYNC_PENDING;
}
if(config_file->sdr > 0) {
results->sync_start = MPI_Wtime();
send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
return end_redistribution(0);
}
/*
* Crea una hebra para ejecutar una comunicación en segundo plano.
*/
int thread_creation() {
if(pthread_create(&async_thread, NULL, thread_async_work, NULL)) {
printf("Error al crear el hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
return MAL_ASYNC_PENDING;
}
/*
* Comprobación por parte de una hebra maestra que indica
* si una hebra esclava ha terminado su comunicación en segundo plano.
*
* El estado de la comunicación es devuelto al finalizar la función.
*/
int thread_check() {
if(group->commAsync == MAL_COMM_COMPLETED) {
if(pthread_join(async_thread, NULL)) {
printf("Error al esperar al hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -2;
}
return MAL_COMM_COMPLETED;
}
send_results(results, rootBcast, config_file->resizes, group->children);
// Desconectar intercomunicador con los hijos
MPI_Comm_disconnect(&(group->children));
return MAL_ASYNC_PENDING;
}
return MAL_COMM_COMPLETED;
/*
* Función ejecutada por una hebra.
* Ejecuta una comunicación síncrona con los hijos que
* para el usuario se puede considerar como en segundo plano.
*
* Cuando termina la comunicación la hebra maestra puede comprobarlo
* por el valor "commAsync".
*/
void* thread_async_work(void* void_arg) {
send_sync(group->async_array, config_file->adr, group->myId, group->numP, ROOT, group->children, group->numS);
group->commAsync = MAL_COMM_COMPLETED;
pthread_exit(NULL);
}
/*
......@@ -225,11 +287,8 @@ int start_redistribution(int numS, MPI_Request **comm_req) {
* los hijos han terminado de recibir.
*/
int check_redistribution(int iter, MPI_Request **comm_req) {
int completed, all_completed, test_err, iter_send;
int numS = config_file->procs[group->grp +1];
int rootBcast = MPI_PROC_NULL;
int completed, all_completed, test_err;
MPI_Request *req_completed;
if(group->myId == ROOT) rootBcast = MPI_ROOT;
if(config_file->aib == MAL_USE_NORMAL) {
req_completed = &(*comm_req)[0];
......@@ -254,18 +313,34 @@ int check_redistribution(int iter, MPI_Request **comm_req) {
//Para la desconexión de ambos grupos de procesos es necesario indicar a MPI que esta
//ha terminado, aunque solo se pueda llegar a este punto cuando ha terminado
}
free(*comm_req);
return end_redistribution(iter);
}
iter_send = iter;
MPI_Bcast(&iter_send, 1, MPI_INT, rootBcast, group->children);
/*
* Termina la redistribución de los datos con los hijos, comprobando
* si se han realizado iteraciones con comunicaciones en segundo plano
* y enviando cuantas iteraciones se han realizado a los hijos.
*
* Además se realizan las comunicaciones síncronas se las hay.
* Finalmente termina enviando los datos temporales a los hijos.
*/
int end_redistribution(int iter) {
int rootBcast = MPI_PROC_NULL;
if(group->myId == ROOT) rootBcast = MPI_ROOT;
if(iter > 0) { // Mandar a los hijos iteracion en la que comenzar
MPI_Bcast(&iter, 1, MPI_INT, rootBcast, group->children);
}
if(config_file->sdr > 0) { // Realizar envio sincrono
results->sync_start = MPI_Wtime();
send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, group->numS);
}
send_results(results, rootBcast, config_file->resizes, group->children);
send_results(results, rootBcast, config_file->resizes, group->children);
// Desconectar intercomunicador con los hijos
MPI_Comm_disconnect(&(group->children));
free(*comm_req);
return MAL_COMM_COMPLETED;
}
......@@ -287,7 +362,7 @@ void Sons_init() {
init_results_data(&results, config_file->resizes - 1, config_file->iters[group->grp]);
if(config_file->adr > 0) { // Recibir datos asincronos
recv_async(&(group->async_array), config_file->adr, group->myId, group->numP, ROOT, group->parents, numP_parents, config_file->aib);
recv_sync(&(group->async_array), config_file->adr, group->myId, group->numP, ROOT, group->parents, numP_parents);
results->async_time[group->grp] = MPI_Wtime();
MPI_Bcast(&(group->iter_start), 1, MPI_INT, ROOT, group->parents);
}
......@@ -489,6 +564,7 @@ void init_group_struct(char *argv[], int argc, int myId, int numP) {
group->numP = numP;
group->grp = 0;
group->iter_start = 0;
group->commAsync = MAL_COMM_UNINITIALIZED;
group->argc = argc;
group->argv = argv;
}
......@@ -503,12 +579,6 @@ void init_group_struct(char *argv[], int argc, int myId, int numP) {
* se comunican con los padres para inicializar sus datos.
*/
void init_application() {
MPI_Comm_get_parent(&(group->parents));
if(group->parents != MPI_COMM_NULL ) { // Si son procesos hijos deben comunicarse con las padres
Sons_init();
} else { // Si son el primer grupo de procesos, recogen la configuracion inicial
if(group->argc < 2) {
printf("Falta el fichero de configuracion. Uso:\n./programa config.ini id\nEl argumento numerico id es opcional\n");
exit(0);
......@@ -530,15 +600,14 @@ void init_application() {
double result, start_time = MPI_Wtime();
int i;
result = 0;
for(i=0; i<10000; i++) {
for(i=0; i<20000; i++) {
result += computePiSerial(config_file->matrix_tam);
}
printf("Creado Top con valor %lf\n", result);
fflush(stdout);
config_file->Top = (MPI_Wtime() - start_time) / 10000; //Tiempo de una iteracion
config_file->Top = (MPI_Wtime() - start_time) / 20000; //Tiempo de una iteracion
MPI_Bcast(&(config_file->Top), 1, MPI_DOUBLE, ROOT, MPI_COMM_WORLD);
}
}
/*
......
......@@ -113,7 +113,7 @@ void recv_sync(char **array, int qty, int myId, int numP, int root, MPI_Comm int
// Obtener distribución para este hijo
get_dist(qty, myId, numP, &dist_data);
*array = malloc(dist_data.tamBl * sizeof(char));
(*array)[dist_data.tamBl] = '\0';
//(*array)[dist_data.tamBl] = '\0';
dist_data.intercomm = intercomm;
/* PREPARAR DATOS DE RECEPCION SOBRE VECTOR*/
......
......@@ -4,7 +4,7 @@
#include <string.h>
#define MAL_COMM_COMPLETED 0
#define MAL_COMM_UNINITIALIZED 0
#define MAL_COMM_UNINITIALIZED 2
#define MAL_ASYNC_PENDING 1
#define MAL_USE_NORMAL 0
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment