Commit 01e07630 authored by iker_martin

Added many changes to allow correct execution of merge spawn. Asynchronous merge requires additional changes.
parent 7ea5eebb
......@@ -9,6 +9,8 @@
#include <string.h>
#include "../malleability/malleabilityManager.h"
#include <unistd.h>
//#define ONLY_SYM 0
#define ROOT 0
//#define DEBUG 0
......@@ -44,6 +46,8 @@ struct Dist_data {
MPI_Datatype scalars, arrays;
};
void dumb(Compute_data *computeData, struct Dist_data *dist_data); //FIXME Delete me
void init_app(Compute_data *computeData, struct Dist_data *dist_data, char* argv[]);
void get_mat_dist(Compute_data *computeData, struct Dist_data dist_data, SparseMatrix mat);
void get_rows_dist(Compute_data *computeData, int numP, int n);
......@@ -51,7 +55,7 @@ void mat_alloc(Compute_data *computeData, SparseMatrix mat, struct Dist_data dis
void computeSolution(Compute_data computeData, double **subsol, SparseMatrix mat, int myId, double **full_vec);
void pre_compute(Compute_data *computeData, struct Dist_data dist_data, double *subsol, double *full_vec);
int compute(Compute_data *computeData, struct Dist_data *dist_data, int sm);
void free_computeData(Compute_data *computeData);
void free_computeData(Compute_data *computeData, int terminate);
//===================================MALLEABILITY FUNCTIONS====================================================
......@@ -64,6 +68,7 @@ void send_matrix(struct Dist_data dist_data, Compute_data computeData, int rootB
void dist_new(struct Dist_data *dist_data, Compute_data *computeData);
void recv_matrix(struct Dist_data *dist_data, Compute_data *computeData, int idI, int idE,
int *sendcounts, int *recvcounts,int *sdispls, int *rdispls);
void update_dist_data(struct Dist_data *dist_data);
//----------------------------------------------------------------------------------------------------
void get_dist(int total_r, int id, int numP, struct Dist_data *dist_data);
void set_counts(int id, int numP, struct Dist_data data_dist, int *sendcounts);
......@@ -101,11 +106,10 @@ int main (int argc, char *argv[]) {
MPI_Comm_rank(MPI_COMM_WORLD, &myId);
printf("Nuevo set %d/%d\n", myId, numP);
dist_data.myId = myId;
dist_data.numP = numP;
dist_data.comm = MPI_COMM_WORLD;
int new_group = init_malleability(myId, numP, ROOT, dist_data.comm, argv[0], nodelist, num_cpus, num_nodes);
update_dist_data(&dist_data);
if( !new_group ) { //First set of processes
init_app(&computeData, &dist_data, argv);
......@@ -115,17 +119,16 @@ int main (int argc, char *argv[]) {
}
if ( computeData.iter == 0 ) {
terminate = compute(&computeData, &dist_data, sm);
}
terminate = 1;
//if(computeData.iter==0)
terminate = compute(&computeData, &dist_data, sm);
if(myId == ROOT && terminate) {
printf ("End(%d) --> (%d,%20.10e)\n", computeData.n, computeData.iter, computeData.tol);
}
// End of CG
free_malleability();
free_computeData(&computeData);
free_computeData(&computeData, 1);
MPI_Finalize();
}
......@@ -239,7 +242,6 @@ void get_rows_dist(Compute_data *computeData, int numP, int n) {
// Fill dist_rows and dist_elem so each process can make ScatterV or GatherV calls
for(i=0; i<numP; i++) {
get_dist(n, i, numP, &dist_data);
computeData->dist_rows[i] = dist_data.tamBl;
// Fill displacements
......@@ -272,9 +274,9 @@ void mat_alloc(Compute_data *computeData, SparseMatrix mat, struct Dist_data dis
MPI_Scatterv((mat.vptr)+1, computeData->dist_rows, computeData->displs_rows, MPI_INT, (computeData->subm.vptr)+1, dist_data.tamBl, MPI_INT, ROOT, MPI_COMM_WORLD);
CreateInts(&(computeData->vlen), dist_data.tamBl+1);
for(i=0; i<dist_data.tamBl+1; i++) {
computeData->vlen[i] = computeData->subm.vptr[i];
CreateInts(&(computeData->vlen), dist_data.tamBl);
for(i=0; i<dist_data.tamBl; i++) {
computeData->vlen[i] = computeData->subm.vptr[i+1];
}
TransformLengthtoHeader(computeData->subm.vptr, computeData->subm.dim1); // The array is converted from vlen to vptr
elems = computeData->subm.vptr[dist_data.tamBl];
......@@ -400,9 +402,6 @@ int compute(Compute_data *computeData, struct Dist_data *dist_data, int sm) {
while ((computeData->iter < computeData->maxiter) && (computeData->tol > computeData->umbral)) {
//while (computeData->tol > computeData->umbral) {
if (computeData->iter == 3) { state = malleability_checkpoint(); }
if (dist_data->myId == 0) {printf("TEST %d\n", computeData->iter);}
if ((state == MALL_COMPLETED && sm == 0) || state == MALL_ZOMBIE) {break;}
// if(dist_data->myId == ROOT) printf ("(%d,%20.10e)\n", computeData->iter, computeData->tol);
......@@ -413,22 +412,35 @@ int compute(Compute_data *computeData, struct Dist_data *dist_data, int sm) {
ProdSparseMatrixVector (computeData->subm, computeData->d_full, computeData->z); // z += A * d_full
#endif
computeData->rho = ddot (&(dist_data->tamBl), computeData->d, &IONE, computeData->z, &IONE); // rho = (d * z)
MPI_Allreduce(MPI_IN_PLACE, &computeData->rho, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD); // Reduce(rho, SUM)
MPI_Allreduce(MPI_IN_PLACE, &computeData->rho, 1, MPI_DOUBLE, MPI_SUM, dist_data->comm); // Reduce(rho, SUM)
computeData->rho = computeData->beta / computeData->rho; // rho = beta / aux
daxpy (&(dist_data->tamBl), &computeData->rho, computeData->d, &IONE, computeData->vec, &IONE); // x += rho * d
computeData->rho = -computeData->rho;
daxpy (&(dist_data->tamBl), &computeData->rho, computeData->z, &IONE, computeData->res, &IONE); // res -= rho * z
computeData->alpha = computeData->beta; // alpha = beta
computeData->beta = ddot (&(dist_data->tamBl), computeData->res, &IONE, computeData->res, &IONE); // beta = res' * res
MPI_Allreduce(MPI_IN_PLACE, &computeData->beta, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD); // Reduce(beta, SUM)
MPI_Allreduce(MPI_IN_PLACE, &computeData->beta, 1, MPI_DOUBLE, MPI_SUM, dist_data->comm); // Reduce(beta, SUM)
computeData->alpha = computeData->beta / computeData->alpha; // alpha = beta / alpha
dscal (&(dist_data->tamBl), &computeData->alpha, computeData->d, &IONE); // d = alpha * d
daxpy (&(dist_data->tamBl), &DONE, computeData->res, &IONE, computeData->d, &IONE); // d += res
if(computeData->iter==3) dumb(computeData,dist_data);
MPI_Allgatherv(computeData->d, dist_data->tamBl, MPI_DOUBLE, computeData->d_full,
computeData->dist_rows, computeData->displs_rows, MPI_DOUBLE, MPI_COMM_WORLD); // d_full = Gather(d)
computeData->dist_rows, computeData->displs_rows, MPI_DOUBLE, dist_data->comm); // d_full = Gather(d)
computeData->tol = sqrt (computeData->beta); // tol = sqrt(beta) = norm (res)
computeData->iter++;
if (computeData->iter == 3) {
state = malleability_checkpoint();
if ((state == MALL_COMPLETED && sm == 0) || state == MALL_ZOMBIE) {break;}
else if(state == MALL_COMPLETED) {
dumb(computeData,dist_data);
free_computeData(computeData, 0);
update_dist_data(dist_data);
dist_new(dist_data, computeData);
}
}
}
#ifdef DEBUG
if(dist_data->myId == ROOT) printf ("Ended loop\n");
......@@ -436,8 +448,23 @@ int compute(Compute_data *computeData, struct Dist_data *dist_data, int sm) {
return ended_loop;
}
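// Debug helper (flagged for deletion above): staggers output by rank, prints the local block
// size and the replicated d_full vector, and ends with a barrier on the current communicator.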
void dumb(Compute_data *computeData, struct Dist_data *dist_data) {
int i;
sleep(dist_data->myId+dist_data->numP);
printf("P%d -tamBl=%d D=", dist_data->myId, dist_data->tamBl);
/*for(i=0; i<dist_data->tamBl; i++) {
printf("%lf ", computeData->d[i]);
}*/
printf("\n");
printf("D_full=");
for(i=0; i<computeData->n; i++) {
printf("%lf ", computeData->d_full[i]);
}
printf("\n");
fflush(stdout); MPI_Barrier(dist_data->comm);
}
void free_computeData(Compute_data *computeData) {
void free_computeData(Compute_data *computeData, int terminate) {
if(computeData->res != NULL) {
RemoveDoubles (&computeData->res);
}
......@@ -452,7 +479,7 @@ void free_computeData(Compute_data *computeData) {
}
if(computeData->d_full != NULL) {
if(computeData->d_full != NULL && terminate) {
RemoveDoubles (&computeData->d_full);
}
if(computeData->subm.vptr != NULL) {
......@@ -500,52 +527,23 @@ int dist_old(struct Dist_data *dist_data, Compute_data *computeData, int num_chi
set_malleability_configuration(sm, ss, phy_dist, rm, rs);
set_children_number(num_children);
malleability_add_data(&(computeData->iter), 1, MAL_INT, 1, 1);
//malleability_add_data(&(computeData->tol), 1, MAL_DOUBLE, 1, 1);
//malleability_add_data(&(computeData->beta), 1, MAL_DOUBLE, 1, 1);
//malleability_add_data(&(computeData->umbral), 1, MAL_DOUBLE, 1, 1);
/*
malleability_add_data(&(computeData->vec), computeData->n, MAL_DOUBLE, 0, 1);
malleability_add_data(&(computeData->res), computeData->n, MAL_DOUBLE, 0, 1);
malleability_add_data(&(computeData->z), computeData->n, MAL_DOUBLE, 0, 1);
malleability_add_data(&(computeData->d_full), computeData->n, MAL_DOUBLE, 1, 1);
malleability_add_data(&(computeData->vlen), computeData->n, MAL_INT, 1, 1); //FIXME The last value could be asynchronous
malleability_add_data(&(computeData->subm.vpos), computeData->n, MAL_INT, 1, 1);
malleability_add_data(&(computeData->subm.vval), computeData->n, MAL_DOUBLE, 1, 1);
*/
}
malleability_add_data(&(computeData->n), 1, MAL_INT, MAL_DATA_ALONE, 1, 1);
malleability_add_data(&(computeData->iter), 1, MAL_INT, MAL_DATA_ALONE, 1, 1);
malleability_add_data(&(computeData->tol), 1, MAL_DOUBLE, MAL_DATA_ALONE, 1, 1);
malleability_add_data(&(computeData->beta), 1, MAL_DOUBLE, MAL_DATA_ALONE, 1, 1);
malleability_add_data(&(computeData->umbral), 1, MAL_DOUBLE, MAL_DATA_ALONE, 1, 1);
/*
MPI_Bcast(computeData->d_full, computeData->n, MPI_DOUBLE, rootBcast, dist_data.comm_children);
MPI_Alltoallv(computeData->res, sendcounts, sdispls, dist_data.arrays, NULL, recvcounts, rdispls, MPI_INT, dist_data.comm_children);
*/
malleability_add_data(computeData->d_full, computeData->n, MAL_DOUBLE, MAL_DATA_ALONE, 1, 1);
void send_matrix(struct Dist_data dist_data, Compute_data computeData, int rootBcast, int numP_child, int idI, int idE,
int *sendcounts, int *recvcounts,int *sdispls, int *rdispls) {
int i;
TransformHeadertoLength(computeData.subm.vptr, computeData.subm.dim1); // From vptr to vlen
// Distribute vlen to the children
MPI_Alltoallv(computeData.subm.vptr+1, sendcounts, sdispls, MPI_INT, NULL, recvcounts, rdispls, MPI_INT, dist_data.comm_children);
TransformLengthtoHeader(computeData.subm.vptr, computeData.subm.dim1); // From vlen to vptr
// Compute how many elements will be sent to each child process
if(idI == 0 && sendcounts[0] > 0) {
sendcounts[0] = computeData.subm.vptr[sdispls[0] + sendcounts[0]] - computeData.subm.vptr[sdispls[0]];
idI++;
}
for(i=idI; i<idE; i++) {
if(sendcounts[i] > 0) {
sendcounts[i] = computeData.subm.vptr[sdispls[i] + sendcounts[i]] - computeData.subm.vptr[sdispls[i]];
}
sdispls[i] = sdispls[i-1] + sendcounts[i-1];
}
//print_counts(dist_data, sendcounts, sdispls, numP_child, "Send");
malleability_add_data(computeData->vec, computeData->n, MAL_DOUBLE, MAL_DATA_ALONE, 0, 1);
malleability_add_data(computeData->res, computeData->n, MAL_DOUBLE, MAL_DATA_ALONE, 0, 1);
malleability_add_data(computeData->z, computeData->n, MAL_DOUBLE, MAL_DATA_ALONE, 0, 1);
/* DATA COMMUNICATION */
MPI_Alltoallv(computeData.subm.vpos, sendcounts, sdispls, MPI_INT, NULL, recvcounts, rdispls, MPI_INT, dist_data.comm_children);
MPI_Alltoallv(computeData.subm.vval, sendcounts, sdispls, MPI_DOUBLE, NULL, recvcounts, rdispls, MPI_DOUBLE, dist_data.comm_children);
//FIXME The following values could be asynchronous
malleability_add_data(computeData->vlen, computeData->n, MAL_INT, 1+MAL_DATA_INDEPENDENT, 0, 1);
malleability_add_data(computeData->subm.vpos, computeData->n, MAL_INT, 1+MAL_DATA_DEPENDENT, 0, 1);
malleability_add_data(computeData->subm.vval, computeData->n, MAL_DOUBLE, 1+MAL_DATA_DEPENDENT, 0, 1);
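// Note: vlen is registered as independent before the dependent vpos/vval entries, since the
// communication layer recalculates the element counts of dependent entries from the most
// recently redistributed independent array.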
}
/*
......@@ -567,80 +565,57 @@ void send_matrix(struct Dist_data dist_data, Compute_data computeData, int rootB
*
*/
void dist_new(struct Dist_data *dist_data, Compute_data *computeData) {
int IONE = 1, i;
void *value = NULL;
malleability_get_data(&value, 0, 1, 1);
computeData->n = *((int *)value);
malleability_get_data(&value, 1, 1, 1);
computeData->iter = *((int *)value);
/* malleability_get_data(&value, 1, 1, 1);
computeData->tol = *((double *)value);
malleability_get_data(&value, 2, 1, 1);
computeData->beta = *((double *)value);
computeData->tol = *((double *)value);
malleability_get_data(&value, 3, 1, 1);
computeData->beta = *((double *)value);
malleability_get_data(&value, 4, 1, 1);
computeData->umbral = *((double *)value);
*/
/*
malleability_get_data(&value, 5, 1, 1);
computeData->d_full = ((double *)value);
malleability_get_data(&value, 0, 0, 1);
computeData->vec = ((double *)value);
malleability_get_data(&value, 1, 0, 1);
computeData->res = ((double *)value);
malleability_get_data(&value, 2, 0, 1);
computeData->z = ((double *)value);
malleability_get_data(&value, 4, 1, 1);
computeData->d_full = ((double *)value);
malleability_get_data(&value, 5, 1, 1);
computeData->subm.vptr = ((int *)value);
malleability_get_data(&value, 6, 1, 1);
computeData->subm.vpos = ((int *)value);
malleability_get_data(&value, 7, 1, 1);
computeData->subm.vval = ((double *)value);
TransformLengthtoHeader(computeData->subm.vptr, computeData->subm.dim1); // From vlen to vptr
*/
}
/*
MPI_Bcast(computeData->d_full, computeData->n, MPI_DOUBLE, ROOT, dist_data->comm_parents); // Receive the RES and D_FULL vectors
MPI_Alltoallv(aux, sendcounts, sdispls, MPI_INT, computeData->res, recvcounts, rdispls, dist_data->arrays, dist_data->comm_parents);
dcopy (&(dist_data->tamBl), &(computeData->d_full[dist_data->ini]), &IONE, computeData->d, &IONE); // Partial copy of D_FULL into D
*/
void recv_matrix(struct Dist_data *dist_data, Compute_data *computeData, int idI, int idE,
int *sendcounts, int *recvcounts,int *sdispls, int *rdispls) {
int i;
double *aux;
int *aux_int, elems;
Compute_data dist_parents;
/* PREPARE MATRIX RECEPTION DATA */
get_rows_dist(&dist_parents, dist_data->numP_parents, computeData->n);
get_dist(computeData->n, dist_data->myId, dist_data->numP, dist_data);
get_rows_dist(computeData, dist_data->numP, computeData->n);
CreateSparseMatrixVptr(&(computeData->subm), dist_data->tamBl, computeData->n, 0);
CreateDoubles(&computeData->d, dist_data->tamBl);
dcopy (&(dist_data->tamBl), &(computeData->d_full[dist_data->ini]), &IONE, computeData->d, &IONE); // d = d_full[ini] to d_full[ini+tamBl]
MPI_Alltoallv(aux_int, sendcounts, sdispls, MPI_INT, (computeData->subm.vptr)+1, recvcounts, rdispls, MPI_INT, dist_data->comm_parents);
TransformLengthtoHeader(computeData->subm.vptr, computeData->subm.dim1); // From vlen to vptr
elems = computeData->subm.vptr[dist_data->tamBl];
CreateSparseMatrixValues(&(computeData->subm), dist_data->tamBl, computeData->n, elems, 0);
// Compute how many elements will be received from each parent process
if(idI == 0 && recvcounts[0] > 0) {
recvcounts[0] = computeData->subm.vptr[rdispls[0] + recvcounts[0]] - computeData->subm.vptr[rdispls[0]];
idI++;
}
for(i=idI; i<idE; i++) {
if(recvcounts[i] > 0) {
recvcounts[i] = computeData->subm.vptr[rdispls[i] + recvcounts[i]] - computeData->subm.vptr[rdispls[i]];
}
rdispls[i] = rdispls[i-1] + recvcounts[i-1];
malleability_get_data(&value, 3, 0, 1);
computeData->vlen = ((int *)value);
CreateSparseMatrixVptr(&(computeData->subm), dist_data->tamBl, computeData->n, 0);
computeData->subm.vptr[0] = 0;
for(i=0; i<dist_data->tamBl; i++) {
computeData->subm.vptr[i+1] = computeData->vlen[i];
}
//print_counts(*dist_data, recvcounts, rdispls, numP_parents, "Recv");
/* DATA COMMUNICATION */
MPI_Alltoallv(aux_int, sendcounts, sdispls, MPI_INT, computeData->subm.vpos, recvcounts, rdispls, MPI_INT, dist_data->comm_parents);
MPI_Alltoallv(aux, sendcounts, sdispls, MPI_DOUBLE, computeData->subm.vval, recvcounts, rdispls, MPI_DOUBLE, dist_data->comm_parents);
TransformLengthtoHeader(computeData->subm.vptr, computeData->subm.dim1); // The array is converted from vlen to vptr
malleability_get_data(&value, 4, 0, 1);
computeData->subm.vpos = ((int *)value);
malleability_get_data(&value, 5, 0, 1);
computeData->subm.vval = ((double *)value);
}
free(dist_parents.dist_rows);
free(dist_parents.displs_rows);
void update_dist_data(struct Dist_data *dist_data) {
int myId, numP;
get_malleability_user_comm(&(dist_data->comm));
MPI_Comm_size(dist_data->comm, &numP);
MPI_Comm_rank(dist_data->comm, &myId);
dist_data->myId = myId;
dist_data->numP = numP;
}
/*
......
......@@ -5,18 +5,18 @@
#include "distribution_methods/block_distribution.h"
#include "CommDist.h"
void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts);
void prepare_redistribution(int qty, int mal_type, int myId, int numP, int numO, int is_children_group, int is_intercomm, void **recv, struct Counts *s_counts, struct Counts *r_counts);
void check_requests(struct Counts s_counts, struct Counts r_counts, MPI_Request **requests, size_t *request_qty);
void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm);
void sync_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method);
void sync_rma_lock(char *recv, struct Counts r_counts, MPI_Win win);
void sync_rma_lockall(char *recv, struct Counts r_counts, MPI_Win win);
void sync_point2point(void *send, void *recv, int mal_type, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm);
void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests);
void async_point2point(char *send, char *recv, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests);
void perform_manual_communication(void *send, void *recv, int mal_type, int myId, struct Counts s_counts, struct Counts r_counts);
void perform_manual_communication(char *send, char *recv, int myId, struct Counts s_counts, struct Counts r_counts);
void *ind_send, *ind_recv; //FIXME Borrar
void recalculate_counts(struct Counts *counts, int *array, void **recv, int mal_type);
int recalculate_elems(int *array, int ini, int fin);
/*
* Allocates memory for a vector of up to "qty" elements.
......@@ -67,49 +67,70 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) {
*
* returns: An integer indicating whether the operation has been completed (TRUE) or not (FALSE). //FIXME In this case it is always true...
*/
int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int red_method, MPI_Comm comm) {
int sync_communication(void *send, void **recv, int qty, int mal_type, int dependency, int myId, int numP, int numO, int is_children_group, int red_method, MPI_Comm comm) {
int is_intercomm, aux_comm_used = 0;
struct Counts s_counts, r_counts;
struct Dist_data dist_data;
MPI_Datatype datatype;
MPI_Comm aux_comm = MPI_COMM_NULL;
/* PREPARE COMMUNICATION */
MPI_Comm_test_inter(comm, &is_intercomm);
prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
prepare_redistribution(qty, mal_type, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
/*
if(is_intercomm) {
MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
aux_comm_used = 1;
}
/* PERFORM COMMUNICATION */
switch(red_method) {
*/
if(mal_type == MAL_INT) {
datatype = MPI_INT;
} else if(mal_type == MAL_DOUBLE) {
datatype = MPI_DOUBLE;
} else if(mal_type == MAL_CHAR) {
datatype = MPI_CHAR;
} else {
printf("Malleability -- Redistribution type not recognised\n");
MPI_Abort(MPI_COMM_WORLD, -1);
}
case MALL_RED_RMA_LOCKALL:
case MALL_RED_RMA_LOCK:
if(is_children_group) {
get_block_dist(qty, myId, numP, &dist_data);
} else {
get_block_dist(qty, myId, numO, &dist_data);
if(dependency == 1+MAL_DATA_DEPENDENT) {
if(is_children_group) {
recalculate_counts(&r_counts, (int *) ind_recv, recv, mal_type);
} else {
recalculate_counts(&s_counts, (int *) ind_send, recv, mal_type);
if(!is_intercomm) {
recalculate_counts(&r_counts, (int *) ind_recv, recv, mal_type);
}
if(is_intercomm) {
MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
aux_comm_used = 1;
} else { aux_comm = comm; }
sync_rma(send, *recv, r_counts, dist_data.tamBl, aux_comm, red_method);
break;
}
}
/* PERFORM COMMUNICATION */
switch(red_method) {
case MALL_RED_POINT:
sync_point2point(send, *recv, is_intercomm, myId, s_counts, r_counts, aux_comm);
sync_point2point(send, *recv, mal_type, is_intercomm, myId, s_counts, r_counts, aux_comm);
break;
case MALL_RED_BASELINE:
default:
MPI_Alltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, aux_comm);
MPI_Alltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, comm);
break;
}
if(aux_comm_used) {
MPI_Comm_free(&aux_comm);
}
}
if(dependency == 1+MAL_DATA_INDEPENDENT) {
if(is_children_group) {
ind_recv = *recv;
} else {
ind_send = send;
if(!is_intercomm) {
ind_recv = *recv;
}
}
}
freeCounts(&s_counts);
freeCounts(&r_counts);
return 1; //FIXME In this case it is always true...
......@@ -132,14 +153,26 @@ int sync_communication(char *send, char **recv, int qty, int myId, int numP, int
* - comm (IN): Communicator to use to perform the redistribution.
*
*/
void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm) {
void sync_point2point(void *send, void *recv, int mal_type, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm) {
int i, j, init, end, total_sends;
MPI_Request *sends;
MPI_Datatype datatype;
if(mal_type == MAL_INT) {
datatype = MPI_INT;
} else if(mal_type == MAL_DOUBLE) {
datatype = MPI_DOUBLE;
} else if(mal_type == MAL_CHAR) {
datatype = MPI_CHAR;
} else {
printf("Malleability -- Redistribution type not recognised\n");
MPI_Abort(MPI_COMM_WORLD, -1);
}
init = s_counts.idI;
end = s_counts.idE;
if(!is_intercomm && (s_counts.idI == myId || s_counts.idE == myId + 1)) {
perform_manual_communication(send, recv, myId, s_counts, r_counts);
perform_manual_communication(send, recv, mal_type, myId, s_counts, r_counts);
if(s_counts.idI == myId) init = s_counts.idI+1;
else end = s_counts.idE-1;
......@@ -152,7 +185,7 @@ void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct
}
for(i=init; i<end; i++) {
sends[j] = MPI_REQUEST_NULL;
MPI_Isend(send+s_counts.displs[i], s_counts.counts[i], MPI_CHAR, i, 99, comm, &(sends[j]));
MPI_Isend(send+s_counts.displs[i], s_counts.counts[i], datatype, i, 99, comm, &(sends[j]));
j++;
}
......@@ -164,7 +197,7 @@ void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct
}
for(i=init; i<end; i++) {
MPI_Recv(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, 99, comm, MPI_STATUS_IGNORE);
MPI_Recv(recv+r_counts.displs[i], r_counts.counts[i], datatype, i, 99, comm, MPI_STATUS_IGNORE);
}
if(total_sends > 0) {
......@@ -172,93 +205,6 @@ void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct
}
}
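The MAL_* to MPI datatype mapping above is repeated in sync_communication, async_communication and prepare_redistribution. A minimal sketch of a shared helper (hypothetical, not part of this commit) could factor it out:

// Hypothetical helper (not in this commit): maps the malleability type codes to MPI datatypes,
// aborting on an unknown code, mirroring the inline if/else chains used in this file.
static MPI_Datatype mal_type_to_mpi(int mal_type) {
  switch(mal_type) {
    case MAL_INT:    return MPI_INT;
    case MAL_DOUBLE: return MPI_DOUBLE;
    case MAL_CHAR:   return MPI_CHAR;
    default:
      printf("Malleability -- Redistribution type not recognised\n");
      MPI_Abort(MPI_COMM_WORLD, -1);
      return MPI_DATATYPE_NULL; // not reached
  }
}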
/*
* Performs synchronous MPI-RMA operations to redistribute an array in a block distribution. It should be called after calculating
* how data should be redistributed.
*
* - send (IN): Array with the data to send. This value can not be NULL for parents.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* If the process receives data and this pointer is NULL, the behaviour is undefined.
* - r_counts (IN): Structure describing how many elements this process will receive from each parent and the
* displacements.
* - tamBl (IN): How many elements are stored in the parameter "send".
* - comm (IN): Communicator to use to perform the redistribution. Must be an intracommunicator, as required by MPI-RMA.
* - red_method (IN): Type of data redistribution to use. In this case indicates the RMA operation(Lock or LockAll).
*
*/
void sync_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method) {
int aux_array_used;
MPI_Win win;
aux_array_used = 0;
if(send == NULL) {
tamBl = 1;
send = malloc(tamBl*sizeof(char)); //TODO Check if the value can be NULL at WIN_create
aux_array_used = 1;
}
MPI_Win_create(send, (MPI_Aint)tamBl, sizeof(char), MPI_INFO_NULL, comm, &win);
switch(red_method) {
case MALL_RED_RMA_LOCKALL:
sync_rma_lockall(recv, r_counts, win);
break;
case MALL_RED_RMA_LOCK:
sync_rma_lock(recv, r_counts, win);
break;
}
MPI_Win_free(&win);
if(aux_array_used) {
free(send);
send = NULL;
}
}
/*
* Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* If the process receives data and this pointer is NULL, the behaviour is undefined.
* - r_counts (IN): Structure describing how many elements this process will receive from each parent and the
* displacements.
* - win (IN): Window to use to perform the redistribution.
*
*/
void sync_rma_lock(char *recv, struct Counts r_counts, MPI_Win win) {
int i, target_displs;
target_displs = r_counts.first_target_displs;
for(i=r_counts.idI; i<r_counts.idE; i++) {
MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
MPI_Get(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, target_displs, r_counts.counts[i], MPI_CHAR, win);
MPI_Win_unlock(i, win);
target_displs=0;
}
}
/*
* Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lockall/Unlockall.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* If the process receives data and this pointer is NULL, the behaviour is undefined.
* - r_counts (IN): Structure describing how many elements this process will receive from each parent and the
* displacements.
* - win (IN): Window to use to perform the redistribution.
*
*/
void sync_rma_lockall(char *recv, struct Counts r_counts, MPI_Win win) {
int i, target_displs;
target_displs = r_counts.first_target_displs;
MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
for(i=r_counts.idI; i<r_counts.idE; i++) {
MPI_Get(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, target_displs, r_counts.counts[i], MPI_CHAR, win);
target_displs=0;
}
MPI_Win_unlock_all(win);
}
//================================================================================
//================================================================================
//========================ASYNCHRONOUS FUNCTIONS==================================
......@@ -289,16 +235,28 @@ void sync_rma_lockall(char *recv, struct Counts r_counts, MPI_Win win) {
*
* returns: An integer indicating whether the operation has been completed (TRUE) or not (FALSE). //FIXME In this case it is always false...
*/
int async_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty) {
int async_communication(void *send, void **recv, int qty, int mal_type, int dependency, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty) {
int is_intercomm, aux_comm_used = 0;
struct Counts s_counts, r_counts;
MPI_Datatype datatype;
MPI_Comm aux_comm = MPI_COMM_NULL;
/* PREPARE COMMUNICATION */
MPI_Comm_test_inter(comm, &is_intercomm);
prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
prepare_redistribution(qty, mal_type, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
check_requests(s_counts, r_counts, requests, request_qty);
if(mal_type == MAL_INT) {
datatype = MPI_INT;
} else if(mal_type == MAL_DOUBLE) {
datatype = MPI_DOUBLE;
} else if(mal_type == MAL_CHAR) {
datatype = MPI_CHAR;
} else {
printf("Malleability -- Redistribution type not recognised\n");
MPI_Abort(MPI_COMM_WORLD, -1);
}
/* PERFORM COMMUNICATION */
switch(red_method) {
......@@ -306,11 +264,11 @@ int async_communication(char *send, char **recv, int qty, int myId, int numP, in
case MALL_RED_RMA_LOCK:
return MALL_DENIED; //TODO Implement asynchronous versions
case MALL_RED_POINT:
async_point2point(send, *recv, s_counts, r_counts, comm, *requests);
async_point2point(send, *recv, datatype, s_counts, r_counts, comm, *requests);
break;
case MALL_RED_BASELINE:
default:
MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, comm, &((*requests)[0]));
MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, comm, &((*requests)[0]));
break;
}
......@@ -329,6 +287,18 @@ int async_communication(char *send, char **recv, int qty, int myId, int numP, in
if(aux_comm_used) {
MPI_Comm_free(&aux_comm);
}
if(dependency == 1+MAL_DATA_INDEPENDENT) {
if(is_children_group) {
ind_recv = *recv;
} else {
ind_send = send;
if(!is_intercomm) {
ind_recv = *recv;
}
}
}
freeCounts(&s_counts);
freeCounts(&r_counts);
return 0; //FIXME In this case it is always false...
......@@ -349,16 +319,16 @@ int async_communication(char *send, char **recv, int qty, int myId, int numP, in
* - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
*
*/
void async_point2point(char *send, char *recv, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests) {
void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests) {
int i, j = 0;
for(i=s_counts.idI; i<s_counts.idE; i++) {
MPI_Isend(send+s_counts.displs[i], s_counts.counts[i], MPI_CHAR, i, 99, comm, &(requests[j]));
MPI_Isend(send+s_counts.displs[i], s_counts.counts[i], datatype, i, 99, comm, &(requests[j]));
j++;
}
for(i=r_counts.idI; i<r_counts.idE; i++) {
MPI_Irecv(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, 99, comm, &(requests[j]));
MPI_Irecv(recv+r_counts.displs[i], r_counts.counts[i], datatype, i, 99, comm, &(requests[j]));
j++;
}
}
......@@ -389,13 +359,25 @@ void async_point2point(char *send, char *recv, struct Counts s_counts, struct Co
* - r_counts (OUT): Struct indicating how many elements this process receives from other processes in the previous group.
*
*/
void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts) {
void prepare_redistribution(int qty, int mal_type, int myId, int numP, int numO, int is_children_group, int is_intercomm, void **recv, struct Counts *s_counts, struct Counts *r_counts) {
int array_size = numO;
int offset_ids = 0;
size_t datasize;
struct Dist_data dist_data;
if(mal_type == MAL_INT) {
datasize = sizeof(int);
} else if(mal_type == MAL_DOUBLE) {
datasize = sizeof(double);
} else if(mal_type == MAL_CHAR) {
datasize = sizeof(char);
} else {
printf("Malleability -- Redistribution type not recognised\n");
MPI_Abort(MPI_COMM_WORLD, -1);
}
if(is_intercomm) {
offset_ids = numP; //FIXME Modify only if active?
//offset_ids = numP; //FIXME Modify only if active?
} else {
array_size = numP > numO ? numP : numO;
}
......@@ -408,9 +390,9 @@ void prepare_redistribution(int qty, int myId, int numP, int numO, int is_childr
// Get the distribution for this child
get_block_dist(qty, myId, numP, &dist_data);
*recv = malloc(dist_data.tamBl * sizeof(char));
*recv = malloc(dist_data.tamBl * datasize);
//get_block_dist(qty, myId, numP, &dist_data);
//print_counts(dist_data, r_counts->counts, r_counts->displs, numO+offset_ids, 0, "Children C ");
//print_counts(dist_data, r_counts->counts, r_counts->displs, numO+offset_ids, 1, "Children C ");
} else {
//get_block_dist(qty, myId, numP, &dist_data);
......@@ -419,10 +401,10 @@ void prepare_redistribution(int qty, int myId, int numP, int numO, int is_childr
prepare_comm_alltoall(myId, numO, numP, qty, offset_ids, r_counts);
// Get the distribution for this child and allocate the receive vector
get_block_dist(qty, myId, numO, &dist_data);
*recv = malloc(dist_data.tamBl * sizeof(char));
//print_counts(dist_data, r_counts->counts, r_counts->displs, array_size, 0, "Children P ");
*recv = malloc(dist_data.tamBl * datasize);
//print_counts(dist_data, r_counts->counts, r_counts->displs, array_size, 1, "Children P ");
}
//print_counts(dist_data, s_counts->counts, s_counts->displs, numO+offset_ids, 0, "Parents ");
//print_counts(dist_data, s_counts->counts, s_counts->displs, numO+offset_ids, 1, "Parents ");
}
}
......@@ -477,10 +459,32 @@ void check_requests(struct Counts s_counts, struct Counts r_counts, MPI_Request
* - s_counts (IN): Struct indicating how many elements this process sends to processes in the new group.
* - r_counts (IN): Struct indicating how many elements this process receives from other processes in the previous group.
*/
void perform_manual_communication(char *send, char *recv, int myId, struct Counts s_counts, struct Counts r_counts) {
void perform_manual_communication(void *send, void *recv, int mal_type, int myId, struct Counts s_counts, struct Counts r_counts) {
int i;
for(i=0; i<s_counts.counts[myId];i++) {
recv[i+r_counts.displs[myId]] = send[i+s_counts.displs[myId]];
if(mal_type == MAL_INT) {
int *new_recv, *new_send;
new_recv = (int *) recv;
new_send = (int *) send;
for(i=0; i<s_counts.counts[myId];i++) {
new_recv[i+r_counts.displs[myId]] = new_send[i+s_counts.displs[myId]];
}
} else if(mal_type == MAL_DOUBLE) {
double *new_recv, *new_send;
new_recv = (double *) recv;
new_send = (double *) send;
for(i=0; i<s_counts.counts[myId];i++) {
new_recv[i+r_counts.displs[myId]] = new_send[i+s_counts.displs[myId]];
}
} else if(mal_type == MAL_CHAR) {
char *new_recv, *new_send;
new_recv = (char *) recv;
new_send = (char *) send;
for(i=0; i<s_counts.counts[myId];i++) {
new_recv[i+r_counts.displs[myId]] = new_send[i+s_counts.displs[myId]];
}
} else {
printf("Malleability -- Redistribution type not recognised\n");
MPI_Abort(MPI_COMM_WORLD, -1);
}
}
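The three branches above differ only in the element size. A minimal sketch (not part of this commit; assumes <string.h> is available and the same MAL_* codes) of an equivalent single copy:

// Sketch: one memcpy based on element size instead of per-type loops.
size_t elem_size = (mal_type == MAL_INT) ? sizeof(int) :
                   (mal_type == MAL_DOUBLE) ? sizeof(double) : sizeof(char);
memcpy((char *) recv + (size_t) r_counts.displs[myId] * elem_size,
       (const char *) send + (size_t) s_counts.displs[myId] * elem_size,
       (size_t) s_counts.counts[myId] * elem_size);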
......@@ -496,3 +500,42 @@ int malleability_red_contains_strat(int comm_strategies, int strategy, int *resu
if(result != NULL) *result = value;
return value;
}
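/*
 * Recomputes the counts/displacements of a data entry that depends on a previously
 * redistributed length array ("array", e.g. vlen): each per-process count is replaced by the
 * sum of the row lengths it covers and the displacements are rebuilt accordingly. If a receive
 * buffer already exists, it is reallocated to match the new total number of elements.
 */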
void recalculate_counts(struct Counts *counts, int *array, void **recv, int mal_type) {
int i, ini, fin;
ini = 0;
fin = counts->counts[counts->idI];
counts->counts[counts->idI] = recalculate_elems(array, ini, fin);
for(i=counts->idI+1; i<counts->idE; i++) {
fin = counts->displs[i] + counts->counts[i];
ini = counts->displs[i];
counts->counts[i] = recalculate_elems(array, ini, fin);
counts->displs[i] = counts->displs[i-1] + counts->counts[i-1];
}
if(*recv != NULL) {
int datasize, qty;
if(mal_type == MAL_INT) {
datasize = sizeof(int);
} else if(mal_type == MAL_DOUBLE) {
datasize = sizeof(double);
} else if(mal_type == MAL_CHAR) {
datasize = sizeof(char);
} else {
printf("Malleability -- Redistribution type not recognised\n");
MPI_Abort(MPI_COMM_WORLD, -1);
}
qty = counts->counts[counts->idE-1] + counts->displs[counts->idE-1];
free(*recv);
*recv = malloc(qty * datasize);
}
}
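/*
 * Returns the sum of the values of "array" in the half-open range [ini, fin).
 */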
int recalculate_elems(int *array, int ini, int fin) {
int i, sol = 0;
for(i=ini; i<fin; i++) {
sol += array[i];
}
return sol;
}
......@@ -16,11 +16,11 @@
//#define MAL_USE_POINT 2
//#define MAL_USE_THREAD 3
int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int comm_type, MPI_Comm comm);
int async_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty);
int sync_communication(void *send, void **recv, int qty, int mal_type, int dependency, int myId, int numP, int numO, int is_children_group, int comm_type, MPI_Comm comm);
int async_communication(void *send, void **recv, int qty, int mal_type, int dependency, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty);
int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int red_method, int red_strategies);
void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int red_method, int red_strategies);
int send_async(char *array, int qty, int mal_type, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int red_method, int red_strategies);
void recv_async(char **array, int qty, int mal_type, int myId, int numP, MPI_Comm intercomm, int numP_parents, int red_method, int red_strategies);
void malloc_comm_array(char **array, int qty, int myId, int numP);
......
......@@ -298,18 +298,18 @@ void get_malleability_user_comm(MPI_Comm *comm) {
*
* //FIXME If it is constant it should go through the asynchronous path, not the synchronous one
*/
void malleability_add_data(void *data, size_t total_qty, int type, int is_replicated, int is_constant) {
void malleability_add_data(void *data, size_t total_qty, int type, int dependency, int is_replicated, int is_constant) {
size_t total_reqs = 0;
if(is_constant) {
if(is_replicated) {
add_data(data, total_qty, type, total_reqs, rep_s_data);
add_data(data, total_qty, type, dependency, total_reqs, rep_s_data);
} else {
add_data(data, total_qty, type, total_reqs, dist_s_data);
add_data(data, total_qty, type, dependency, total_reqs, dist_s_data);
}
} else {
if(is_replicated) {
add_data(data, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ???
add_data(data, total_qty, type, dependency, total_reqs, rep_a_data); //FIXME total_reqs==0 ???
} else {
if(mall_conf->red_method == MALL_RED_BASELINE) {
total_reqs = 1;
......@@ -319,7 +319,7 @@ void malleability_add_data(void *data, size_t total_qty, int type, int is_replic
total_reqs = mall->numC;
}
add_data(data, total_qty, type, total_reqs, dist_a_data);
add_data(data, total_qty, type, dependency, total_reqs, dist_a_data);
}
}
}
......@@ -333,18 +333,18 @@ void malleability_add_data(void *data, size_t total_qty, int type, int is_replic
* More information in the function "modify_data".
* //FIXME If it is constant it should go through the asynchronous path, not the synchronous one
*/
void malleability_modify_data(void *data, size_t index, size_t total_qty, int type, int is_replicated, int is_constant) {
void malleability_modify_data(void *data, size_t index, size_t total_qty, int type, int dependency, int is_replicated, int is_constant) {
size_t total_reqs = 0;
if(is_constant) {
if(is_replicated) {
modify_data(data, index, total_qty, type, total_reqs, rep_s_data);
modify_data(data, index, total_qty, type, dependency, total_reqs, rep_s_data);
} else {
modify_data(data, index, total_qty, type, total_reqs, dist_s_data);
modify_data(data, index, total_qty, type, dependency, total_reqs, dist_s_data);
}
} else {
if(is_replicated) {
modify_data(data, index, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ???
modify_data(data, index, total_qty, type, dependency, total_reqs, rep_a_data); //FIXME total_reqs==0 ???
} else {
if(mall_conf->red_method == MALL_RED_BASELINE) {
total_reqs = 1;
......@@ -354,7 +354,7 @@ void malleability_modify_data(void *data, size_t index, size_t total_qty, int ty
total_reqs = mall->numC;
}
modify_data(data, index, total_qty, type, total_reqs, dist_a_data);
modify_data(data, index, total_qty, type, dependency, total_reqs, dist_a_data);
}
}
}
......@@ -424,22 +424,22 @@ void malleability_get_data(void **data, size_t index, int is_replicated, int is_
*/
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
size_t i;
char *aux_send, *aux_recv;
void *aux_send, *aux_recv;
if(is_asynchronous) {
for(i=0; i < data_struct->entries; i++) {
aux_send = (char *) data_struct->arrays[i]; //TODO Check that it really is a char
aux_send = data_struct->arrays[i];
aux_recv = NULL;
async_communication(aux_send, &aux_recv, data_struct->qty[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall_conf->red_strategies, mall->intercomm,
async_communication(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], data_struct->dependencies[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall_conf->red_strategies, mall->intercomm,
&(data_struct->requests[i]), &(data_struct->request_qty[i]));
if(aux_recv != NULL) data_struct->arrays[i] = (void *) aux_recv;
if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
}
} else {
for(i=0; i < data_struct->entries; i++) {
aux_send = (char *) data_struct->arrays[i]; //TODO Check that it really is a char
aux_send = data_struct->arrays[i];
aux_recv = NULL;
sync_communication(aux_send, &aux_recv, data_struct->qty[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall->intercomm);
if(aux_recv != NULL) data_struct->arrays[i] = (void *) aux_recv;
sync_communication(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], data_struct->dependencies[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall->intercomm);
if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
}
}
}
......@@ -451,20 +451,20 @@ void send_data(int numP_children, malleability_data_t *data_struct, int is_async
*/
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
size_t i;
char *aux, aux_s;
void *aux, *aux_s = NULL;
if(is_asynchronous) {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Check that it really is a char
async_communication(&aux_s, &aux, data_struct->qty[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall_conf->red_strategies, mall->intercomm,
aux = data_struct->arrays[i];
async_communication(aux_s, &aux, data_struct->qty[i], data_struct->types[i], data_struct->dependencies[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall_conf->red_strategies, mall->intercomm,
&(data_struct->requests[i]), &(data_struct->request_qty[i]));
data_struct->arrays[i] = (void *) aux;
data_struct->arrays[i] = aux;
}
} else {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Check that it really is a char
sync_communication(&aux_s, &aux, data_struct->qty[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall->intercomm);
data_struct->arrays[i] = (void *) aux;
aux = data_struct->arrays[i];
sync_communication(aux_s, &aux, data_struct->qty[i], data_struct->types[i], data_struct->dependencies[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall->intercomm);
data_struct->arrays[i] = aux;
}
}
}
......@@ -519,8 +519,13 @@ void Children_init() {
MPI_Datatype datatype;
if(rep_s_data->types[i] == MAL_INT) {
datatype = MPI_INT;
} else {
} else if(rep_s_data->types[i] == MAL_DOUBLE) {
datatype = MPI_DOUBLE;
} else if(rep_s_data->types[i] == MAL_CHAR) {
datatype = MPI_CHAR;
} else {
printf("Malleability -- Redistribution recv type not recognised\n");
MPI_Abort(MPI_COMM_WORLD, -1);
}
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, root_parents, mall->intercomm);
}
......@@ -694,8 +699,13 @@ int end_redistribution() {
MPI_Datatype datatype;
if(rep_s_data->types[i] == MAL_INT) {
datatype = MPI_INT;
} else {
} else if(rep_s_data->types[i] == MAL_DOUBLE) {
datatype = MPI_DOUBLE;
} else if(rep_s_data->types[i] == MAL_CHAR) {
datatype = MPI_CHAR;
} else {
printf("Malleability -- Redistribution data array type not recognised\n");
MPI_Abort(MPI_COMM_WORLD, -1);
}
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, rootBcast, mall->intercomm);
}
......
......@@ -18,8 +18,8 @@ void set_malleability_configuration(int spawn_method, int spawn_strategies, int
void set_children_number(int numC); // TODO TO BE DEPRECATED
void get_malleability_user_comm(MPI_Comm *comm);
void malleability_add_data(void *data, size_t total_qty, int type, int is_replicated, int is_constant);
void malleability_modify_data(void *data, size_t index, size_t total_qty, int type, int is_replicated, int is_constant);
void malleability_add_data(void *data, size_t total_qty, int type, int dependency, int is_replicated, int is_constant);
void malleability_modify_data(void *data, size_t index, size_t total_qty, int type, int dependency, int is_replicated, int is_constant);
void malleability_get_entries(size_t *entries, int is_replicated, int is_constant);
void malleability_get_data(void **data, size_t index, int is_replicated, int is_constant);
#endif
......@@ -22,10 +22,13 @@ enum mall_redistribution_methods{MALL_RED_BASELINE, MALL_RED_POINT, MALL_RED_RMA
#define MAL_APP_EXECUTING 0
#define MAL_APP_ENDED 1
//TODO DEPRECATE
#define MAL_INT 0
#define MAL_CHAR 1
#define MAL_DOUBLE 2
#define MAL_DATA_ALONE -1
#define MAL_DATA_INDEPENDENT 0
#define MAL_DATA_DEPENDENT 1
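// Dependency flags for redistributed data entries: MAL_DATA_ALONE marks an entry with no
// dependency, MAL_DATA_INDEPENDENT marks an entry (e.g. vlen) whose redistributed contents are
// kept as the reference for later entries, and MAL_DATA_DEPENDENT marks entries (e.g. vpos/vval)
// whose counts are recalculated from that reference. The callers in this commit pass the last
// two offset by one (1+MAL_DATA_INDEPENDENT, 1+MAL_DATA_DEPENDENT), which is the value the
// communication layer compares against.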
////////////////
#define MALLEABILITY_CHILDREN 1
......
......@@ -20,7 +20,7 @@ void def_malleability_qty_type(malleability_data_t *data_struct_rep, malleabilit
* all the parents. The new "data" series only represents the data
* held by this parent.
*/
void add_data(void *data, size_t total_qty, int type, size_t request_qty, malleability_data_t *data_struct) {
void add_data(void *data, size_t total_qty, int type, int dependency, size_t request_qty, malleability_data_t *data_struct) {
size_t i;
if(data_struct->entries == 0) {
......@@ -31,6 +31,7 @@ void add_data(void *data, size_t total_qty, int type, size_t request_qty, mallea
data_struct->qty[data_struct->entries] = total_qty;
data_struct->types[data_struct->entries] = type;
data_struct->dependencies[data_struct->entries] = dependency;
data_struct->arrays[data_struct->entries] = data;
data_struct->request_qty[data_struct->entries] = request_qty;
......@@ -49,7 +50,7 @@ void add_data(void *data, size_t total_qty, int type, size_t request_qty, mallea
* all the parents. The new "data" series only represents the data
* held by this parent.
*/
void modify_data(void *data, size_t index, size_t total_qty, int type, size_t request_qty, malleability_data_t *data_struct) {
void modify_data(void *data, size_t index, size_t total_qty, int type, int dependency, size_t request_qty, malleability_data_t *data_struct) {
size_t i;
if(data_struct->entries < index) { // Index does not exist
......@@ -62,6 +63,7 @@ void modify_data(void *data, size_t index, size_t total_qty, int type, size_t re
data_struct->qty[index] = total_qty;
data_struct->types[index] = type;
data_struct->dependencies[index] = dependency;
data_struct->arrays[index] = data;
data_struct->request_qty[index] = request_qty;
......@@ -83,10 +85,8 @@ void modify_data(void *data, size_t index, size_t total_qty, int type, size_t re
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group, int myId, int root, MPI_Comm intercomm) {
int is_intercomm, rootBcast = MPI_PROC_NULL;
size_t i, j;
datatype;
MPI_Datatype entries_type, struct_type;
MPI_Comm_test_inter(intercomm, &is_intercomm);
if(is_intercomm && !is_children_group) {
rootBcast = myId == root ? MPI_ROOT : MPI_PROC_NULL;
......@@ -112,6 +112,8 @@ void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *d
data_struct_rep->arrays[i] = (void *) malloc(data_struct_rep->qty[i] * sizeof(int));
} else if(data_struct_rep->types[i] == MAL_DOUBLE) {
data_struct_rep->arrays[i] = (void *) malloc(data_struct_rep->qty[i] * sizeof(double));
} else if(data_struct_rep->types[i] == MAL_CHAR) {
data_struct_rep->arrays[i] = (void *) malloc(data_struct_rep->qty[i] * sizeof(char));
} else {
printf("Malleability -- Redistribution data array type not recognised\n");
MPI_Abort(MPI_COMM_WORLD, -1);
......@@ -152,6 +154,7 @@ void init_malleability_data_struct(malleability_data_t *data_struct, size_t size
data_struct->max_entries = size;
data_struct->qty = (size_t *) malloc(size * sizeof(size_t));
data_struct->types = (int *) malloc(size * sizeof(int));
data_struct->dependencies = (int *) malloc(size * sizeof(int));
data_struct->request_qty = (size_t *) malloc(size * sizeof(size_t));
data_struct->requests = (MPI_Request **) malloc(size * sizeof(MPI_Request *));
data_struct->arrays = (void **) malloc(size * sizeof(void *));
......@@ -169,18 +172,19 @@ void init_malleability_data_struct(malleability_data_t *data_struct, size_t size
*/
void realloc_malleability_data_struct(malleability_data_t *data_struct, size_t qty_to_add) {
size_t i, needed, *qty_aux, *request_qty_aux;
int *types_aux;
int *types_aux, *dependencies_aux;
MPI_Request **requests_aux;
void **arrays_aux;
needed = data_struct->max_entries + qty_to_add;
qty_aux = (size_t *) realloc(data_struct->qty, needed * sizeof(int));
types_aux = (int *) realloc(data_struct->types, needed * sizeof(int));
dependencies_aux = (int *) realloc(data_struct->dependencies, needed * sizeof(int));
request_qty_aux = (size_t *) realloc(data_struct->request_qty, needed * sizeof(int));
requests_aux = (MPI_Request **) realloc(data_struct->requests, needed * sizeof(MPI_Request *));
arrays_aux = (void **) realloc(data_struct->arrays, needed * sizeof(void *));
if(qty_aux == NULL || arrays_aux == NULL || requests_aux == NULL || types_aux == NULL || request_qty_aux == NULL) {
if(qty_aux == NULL || arrays_aux == NULL || requests_aux == NULL || types_aux == NULL || dependencies_aux == NULL || request_qty_aux == NULL) {
fprintf(stderr, "Fatal error - No se ha podido realojar la memoria constante de datos a redistribuir/comunicar\n");
MPI_Abort(MPI_COMM_WORLD, 1);
}
......@@ -192,6 +196,7 @@ void realloc_malleability_data_struct(malleability_data_t *data_struct, size_t q
data_struct->qty = qty_aux;
data_struct->types = types_aux;
data_struct->dependencies = dependencies_aux;
data_struct->request_qty = request_qty_aux;
data_struct->requests = requests_aux;
data_struct->arrays = arrays_aux;
......@@ -213,6 +218,9 @@ void free_malleability_data_struct(malleability_data_t *data_struct) {
if(data_struct->types != NULL) {
free(data_struct->types);
}
if(data_struct->dependencies != NULL) {
free(data_struct->dependencies);
}
if(data_struct->requests != NULL && data_struct->request_qty != NULL) {
for(i=0; i<max; i++) {
if(data_struct->requests[i] != NULL) {
......@@ -270,22 +278,24 @@ void def_malleability_entries(malleability_data_t *data_struct_rep, malleability
* TODO Refactor?
*/
void def_malleability_qty_type(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, MPI_Datatype *new_type) {
int counts = 6;
int counts = 8;
int blocklengths[counts];
MPI_Aint displs[counts];
MPI_Datatype types[counts];
types[0] = types[1] = types[3] = types[4] = MPI_UNSIGNED_LONG;
types[2] = types[5] = MPI_INT;
blocklengths[0] = blocklengths[1] = blocklengths[2] = data_struct_rep->entries;
blocklengths[3] = blocklengths[4] = blocklengths[5] = data_struct_dist->entries;
types[0] = types[1] = types[4] = types[5] = MPI_UNSIGNED_LONG;
types[2] = types[3] = types[6] = types[7] = MPI_INT;
blocklengths[0] = blocklengths[1] = blocklengths[2] = blocklengths[3] = data_struct_rep->entries;
blocklengths[4] = blocklengths[5] = blocklengths[6] = blocklengths[7] = data_struct_dist->entries;
MPI_Get_address((data_struct_rep->qty), &displs[0]);
MPI_Get_address((data_struct_rep->request_qty), &displs[1]);
MPI_Get_address((data_struct_rep->types), &displs[2]);
MPI_Get_address((data_struct_dist->qty), &displs[3]);
MPI_Get_address((data_struct_dist->request_qty), &displs[4]);
MPI_Get_address((data_struct_dist->types), &displs[5]);
MPI_Get_address((data_struct_rep->dependencies), &displs[3]);
MPI_Get_address((data_struct_dist->qty), &displs[4]);
MPI_Get_address((data_struct_dist->request_qty), &displs[5]);
MPI_Get_address((data_struct_dist->types), &displs[6]);
MPI_Get_address((data_struct_dist->dependencies), &displs[7]);
MPI_Type_create_struct(counts, blocklengths, displs, types, new_type);
MPI_Type_commit(new_type);
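Since the displacements are taken with MPI_Get_address on the buffers themselves, they are absolute addresses, so a communication call using the committed type would pass MPI_BOTTOM as the buffer. A hedged example (the actual call site is outside this hunk; rootBcast, intercomm and struct_type are the names visible in comm_data_info):

// Hypothetical usage with absolute displacements, assumed from comm_data_info:
MPI_Bcast(MPI_BOTTOM, 1, struct_type, rootBcast, intercomm);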
......
......@@ -15,6 +15,7 @@ typedef struct {
size_t max_entries;
size_t *qty; // Number of elements in each subvector of sync_array
int *types;
int *dependencies;
// Vector of request vectors. Each top-level element lists the requests that must be checked to consider
// the communication of that data entry finished
......@@ -24,8 +25,8 @@ typedef struct {
} malleability_data_t;
void add_data(void *data, size_t total_qty, int type, size_t request_qty, malleability_data_t *data_struct);
void modify_data(void *data, size_t index, size_t total_qty, int type, size_t request_qty, malleability_data_t *data_struct);
void add_data(void *data, size_t total_qty, int type, int dependency, size_t request_qty, malleability_data_t *data_struct);
void modify_data(void *data, size_t index, size_t total_qty, int type, int dependency, size_t request_qty, malleability_data_t *data_struct);
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group, int myId, int root, MPI_Comm intercomm);
void free_malleability_data_struct(malleability_data_t *data_struct);
......