Commit fe41880f authored by iker_martin

Fixed an error in shm communications between dynamic processes by adding a prctl call. Added synchronous user communication.
parent 4015ba23
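For context on the fix described above: on Linux, cross-memory-attach based shared-memory transports (process_vm_readv/writev) need ptrace permission between the communicating processes, and with the Yama ptrace_scope restriction enabled that permission only exists towards a process's own descendants. Dynamically spawned MPI processes are not descendants of their peers, which is presumably why the shm path failed. The sketch below shows the prctl call added in this commit in isolation; it is illustrative only (Linux-specific, error handling added here for clarity):

/* Sketch only: the commit's prctl call shown standalone. */
#include <stdio.h>
#include <sys/prctl.h>   /* PR_SET_PTRACER, PR_SET_PTRACER_ANY (Linux-specific) */

int main(void) {
    /* Allow any process to ptrace-attach to this one, which also enables
     * cross-memory attach (process_vm_readv/writev) when the Yama LSM
     * restricts attachment to direct ancestors. Dynamically spawned MPI
     * processes are not descendants of their peers, so without this the
     * shared-memory transport can fail. */
    if(prctl(PR_SET_PTRACER, PR_SET_PTRACER_ANY, 0, 0, 0) == -1) {
        perror("prctl(PR_SET_PTRACER)");
        return 1;
    }
    return 0;
}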
......@@ -2,9 +2,9 @@
#SBATCH -N 1
#SBATCH -p P1
#SBATCH -t 01:00:00
#SBATCH -t 10:00:00
module load /home/martini/MODULES/modulefiles/mpich-4.0.3-ofi
nodelist=$SLURM_JOB_NODELIST
nodes=$SLURM_JOB_NUM_NODES
......
......@@ -13,8 +13,6 @@ export dirCG
#matrix="bcsstk01.rsa"
matrix="bcsstk17.rsa"
module load /home/martini/MODULES/modulefiles/mpich-4.0.3-ofi
numP=$1
numC=$2
qty=1
......
......@@ -13,4 +13,4 @@ numC=$2
echo "Test"
#mpirun -hosts $initial_nodelist -np $numP valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --trace-children=yes --log-file=nc.vg.%p ./build/a.out bcsstk01.rsa $numC
mpirun -np $numP valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --trace-children=yes --log-file=nc.vg.%p $dirCG/build/a.out ../bcsstk01.rsa $numC
mpirun -np $numP valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --trace-children=yes --log-file=nc.vg.%p $dirCG/build/a.out ../bcsstk01.rsa $numC
......@@ -9,6 +9,7 @@
#include <mpi.h>
#include <string.h>
#include "../malleability/MAM.h"
#include <sys/prctl.h>
#include <unistd.h>
......@@ -49,7 +50,8 @@ struct Dist_data {
typedef struct {
SparseMatrix other_subm;
int *array_vptr, *array_vpos, initiated;
int *recv_vlen;
int *array_vptr, *array_vlen, *array_vpos, n, initiated;
double start_time, *array_vval;
MPI_Comm comm;
MPI_Request reqs[2];
......@@ -57,13 +59,16 @@ typedef struct {
static const user_redist_t empty_user_data = {
.array_vptr = NULL,
.array_vlen = NULL,
.recv_vlen = NULL,
.array_vpos = NULL,
.array_vval = NULL,
.n = 0,
.initiated = 0,
.comm = MPI_COMM_NULL
};
void dumb(Compute_data *computeData, struct Dist_data *dist_data); //FIXME Delete me
void dump(Compute_data *computeData, struct Dist_data *dist_data); //FIXME Delete me
void init_app(Compute_data *computeData, struct Dist_data *dist_data, char* argv[]);
void get_mat_dist(Compute_data *computeData, struct Dist_data dist_data, SparseMatrix mat);
......@@ -78,12 +83,15 @@ void free_computeData(Compute_data *computeData);
void originals_set_data(struct Dist_data *dist_data, Compute_data *computeData, int num_target);
void user_func(void *args);
void print_counts2(struct Dist_data data_dist, int *xcounts, int *xdispls, int size, int include_zero, const char* name);
void targets_distribution(mam_user_reconf_t user_reconf, user_redist_t *user_data);
void targets_distribution_synch(mam_user_reconf_t user_reconf, user_redist_t *user_data);
void targets_update(struct Dist_data *dist_data, Compute_data *computeData, user_redist_t *user_data);
void print_global_results(double start_time);
//----------------------------------------------------------------------------------------------------
void get_dist(int total_r, int id, int numP, struct Dist_data *dist_data);
void prepare_redist_counts(int *counts, int *displs, int numP_other, int offset, struct Dist_data dist_data, int *vptr);
void prepare_redist_counts_vlen(int *counts, int *displs, int numP_other, int offset, struct Dist_data dist_data);
void set_counts(int id, int numP, struct Dist_data data_dist, int offset, int *sendcounts);
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int *idS);
//----------------------------------------------------------------------------------------------------
......@@ -114,6 +122,7 @@ int main (int argc, char *argv[]) {
user_data = empty_user_data;
user_data.comm = dist_data.comm;
prctl(PR_SET_PTRACER, PR_SET_PTRACER_ANY, 0, 0, 0);
int new_group = MAM_Init(ROOT, &dist_data.comm, argv[0], user_func, (void *) &user_data);
if( !new_group ) { //First set of processes
......@@ -121,7 +130,9 @@ int main (int argc, char *argv[]) {
originals_set_data(&dist_data, &computeData, num_targets);
init_numP = dist_data.numP;
user_data.n = computeData.n;
user_data.array_vptr = computeData.subm.vptr;
user_data.array_vlen = computeData.vlen;
user_data.array_vpos = computeData.subm.vpos;
user_data.array_vval = computeData.subm.vval;
MPI_Barrier(MPI_COMM_WORLD);
......@@ -412,8 +423,7 @@ int compute(Compute_data *computeData, struct Dist_data *dist_data, user_redist_
double DONE = 1.0, DMONE = -1.0, DZERO = 0.0;
int state = MAM_UNRESERVED;
int ended_loop = 1;
int cnt = 0;
int reconfigure = 0, rec_iter = 1;
int reconfigure = 0, rec_iter = 500;
computeData->maxiter = 1000;
......@@ -443,9 +453,8 @@ int compute(Compute_data *computeData, struct Dist_data *dist_data, user_redist_
computeData->tol = sqrt (computeData->beta); // tol = sqrt(beta) = norm (res)
computeData->iter++;
if (computeData->iter == rec_iter) reconfigure = 1;
if (computeData->iter == rec_iter) { reconfigure = 1;}
if (reconfigure) {
MAM_Checkpoint(&state, MAM_CHECK_COMPLETION, user_func, (void *) user_data);
if(state == MAM_COMPLETED) {
reconfigure = 0;
......@@ -454,7 +463,7 @@ int compute(Compute_data *computeData, struct Dist_data *dist_data, user_redist_
}
}
}
if(state == MAM_PENDING) {
......@@ -468,7 +477,7 @@ int compute(Compute_data *computeData, struct Dist_data *dist_data, user_redist_
return ended_loop;
}
void dumb(Compute_data *computeData, struct Dist_data *dist_data) {
void dump(Compute_data *computeData, struct Dist_data *dist_data) {
int i;
if(dist_data->myId == 0) printf("TamBL=");
......@@ -572,7 +581,6 @@ void dumb(Compute_data *computeData, struct Dist_data *dist_data) {
}
if(dist_data->myId == 0) printf("\n");
fflush(stdout); MPI_Barrier(dist_data->comm);
if(dist_data->myId == 0) printf("Vec[last]=");
fflush(stdout); MPI_Barrier(dist_data->comm);
for(i=0; i<dist_data->numP; i++) {
......@@ -605,6 +613,7 @@ void free_computeData(Compute_data *computeData) {
if(computeData->d_full != NULL) {
RemoveDoubles (&computeData->d_full);
}
if(computeData->subm.vptr != NULL) {
RemoveSparseMatrix2 (&computeData->subm);
}
......@@ -615,6 +624,7 @@ void free_computeData(Compute_data *computeData) {
if(computeData->displs_rows != NULL) {
RemoveInts (&computeData->displs_rows);
}
if(computeData->vlen != NULL) {
RemoveInts (&computeData->vlen);
}
......@@ -653,6 +663,8 @@ void originals_set_data(struct Dist_data *dist_data, Compute_data *computeData,
MAM_Data_add(&(computeData->n), NULL, 1, MPI_INT, MAM_DATA_REPLICATED, MAM_DATA_CONSTANT);
MAM_Data_add(&(computeData->umbral), NULL, 1, MPI_DOUBLE, MAM_DATA_REPLICATED, MAM_DATA_CONSTANT);
//MAM_Data_add(&(computeData->n), NULL, 1, MPI_INT, MAM_DATA_REPLICATED, MAM_DATA_VARIABLE);
//MAM_Data_add(&(computeData->umbral), NULL, 1, MPI_DOUBLE, MAM_DATA_REPLICATED, MAM_DATA_VARIABLE);
MAM_Data_add(&(computeData->iter), NULL, 1, MPI_INT, MAM_DATA_REPLICATED, MAM_DATA_VARIABLE);
MAM_Data_add(&(computeData->tol), NULL, 1, MPI_DOUBLE, MAM_DATA_REPLICATED, MAM_DATA_VARIABLE);
MAM_Data_add(&(computeData->beta), NULL, 1, MPI_DOUBLE, MAM_DATA_REPLICATED, MAM_DATA_VARIABLE);
......@@ -690,6 +702,10 @@ void user_func(void *args) {
user_redist_t *user_data = (user_redist_t *) args;
if(!user_data->initiated) {
MPI_Bcast(&user_data->start_time, 1, MPI_DOUBLE, 0, user_reconf.comm);
//targets_distribution_synch(user_reconf, user_data);
//flag = 1;
targets_distribution(user_reconf, user_data);
user_data->initiated = 1;
......@@ -706,10 +722,9 @@ void user_func(void *args) {
if(flag) MAM_Resume_redistribution(NULL);
}
/*
 * Function in charge of performing the user's data
 * redistribution.
 * redistribution asynchronously.
 *
 * It computes the total number of elements to send/receive per process
 * and then calls the Ialltoallv function twice.
......@@ -772,6 +787,7 @@ void targets_distribution(mam_user_reconf_t user_reconf, user_redist_t *user_dat
recv_vval = user_data->other_subm.vval;
prepare_redist_counts(rcounts, rdispls, user_reconf.numS, offset, dist_data, user_data->other_subm.vptr);
// print_counts2(dist_data, rcounts, rdispls, numP, 0, "TARGETS");
}
if(user_reconf.rank_state != MAM_PROC_NEW_RANK) {
......@@ -781,6 +797,7 @@ void targets_distribution(mam_user_reconf_t user_reconf, user_redist_t *user_dat
offset = (user_reconf.numS + user_reconf.numT) == numP ?
user_reconf.numS : 0;
prepare_redist_counts(scounts, sdispls, user_reconf.numT, offset, dist_data, user_data->array_vptr);
// print_counts2(dist_data, scounts, sdispls, numP, 0, "SOURCES");
}
// DATA COMMUNICATION //
......@@ -791,6 +808,126 @@ void targets_distribution(mam_user_reconf_t user_reconf, user_redist_t *user_dat
free(scounts); free(sdispls); free(rcounts); free(rdispls);
}
/*
 * Function in charge of performing the user's data
 * redistribution synchronously.
 *
 * It computes the total number of elements to send/receive per process
 * and then calls MPI_Alltoallv to exchange them.
 *
 * It also allocates the memory for those processes that are going
 * to receive data.
 */
void targets_distribution_synch(mam_user_reconf_t user_reconf, user_redist_t *user_data) {
int i, n, offset, elems, rank, numP, *vlen, *rank_states;
int *scounts, *rcounts, *sdispls, *rdispls;
size_t total_qty;
void *value = NULL;
struct Dist_data dist_data;
MPI_Datatype type;
int aux_int;
int *recv_vpos = &aux_int;
double aux_double;
double *recv_vval = &aux_double;
user_data->recv_vlen = &aux_int;
MPI_Comm_rank(user_reconf.comm, &rank);
MPI_Comm_size(user_reconf.comm, &numP);
scounts = calloc(numP, sizeof(int));
sdispls = calloc(numP, sizeof(int));
rcounts = calloc(numP, sizeof(int));
rdispls = calloc(numP, sizeof(int));
offset = 0;
rank_states = (int *) malloc(numP * sizeof(int));
MPI_Allgather(&user_reconf.rank_state, 1, MPI_INT, rank_states, 1, MPI_INT, user_reconf.comm);
if(rank == 0) n = user_data->n;
MPI_Bcast(&n, 1, MPI_INT, 0, user_reconf.comm);
if(user_reconf.rank_state != MAM_PROC_ZOMBIE) {
MPI_Comm_rank(user_data->comm, &dist_data.myId);
dist_data.numP = user_reconf.numT;
if(user_reconf.rank_state == MAM_PROC_NEW_RANK) {
user_data->array_vlen = &aux_int;
for(i=0; i<user_reconf.numS; i++) {
if(rank_states[i] == MAM_PROC_CONTINUE) {
dist_data.myId += user_reconf.numS;
break;
}
}
}
get_dist(n, dist_data.myId, dist_data.numP, &dist_data);
CreateInts(&user_data->recv_vlen, dist_data.tamBl);
prepare_redist_counts_vlen(rcounts, rdispls, user_reconf.numS, offset, dist_data);
// print_counts2(dist_data, rcounts, rdispls, numP, 0, "TARGETS");
}
if(user_reconf.rank_state != MAM_PROC_NEW_RANK) {
MPI_Comm_rank(user_data->comm, &dist_data.myId);
dist_data.numP = user_reconf.numS;
get_dist(n, dist_data.myId, dist_data.numP, &dist_data);
offset = (user_reconf.numS + user_reconf.numT) == numP ?
user_reconf.numS : 0;
prepare_redist_counts_vlen(scounts, sdispls, user_reconf.numT, offset, dist_data);
// print_counts2(dist_data, scounts, sdispls, numP, 0, "SOURCES");
}
// DATA COMMUNICATION //
MPI_Alltoallv(user_data->array_vlen, scounts, sdispls, MPI_INT, user_data->recv_vlen, rcounts, rdispls, MPI_INT, user_reconf.comm);
free(scounts); free(sdispls); free(rcounts); free(rdispls);
scounts = calloc(numP, sizeof(int));
sdispls = calloc(numP, sizeof(int));
rcounts = calloc(numP, sizeof(int));
rdispls = calloc(numP, sizeof(int));
offset = 0;
if(user_reconf.rank_state != MAM_PROC_ZOMBIE) {
MPI_Comm_rank(user_data->comm, &dist_data.myId);
dist_data.numP = user_reconf.numT;
if(user_reconf.rank_state == MAM_PROC_NEW_RANK) {
user_data->array_vlen = &aux_int;
for(i=0; i<user_reconf.numS; i++) {
if(rank_states[i] == MAM_PROC_CONTINUE) {
dist_data.myId += user_reconf.numS;
break;
}
}
}
get_dist(n, dist_data.myId, dist_data.numP, &dist_data);
CreateSparseMatrixVptr(&user_data->other_subm, dist_data.tamBl, n, 0);
user_data->other_subm.vptr[0] = 0;
//memcpy(user_data->other_subm.vptr+1, vlen, dist_data.tamBl * sizeof(int));
for(i=0; i<dist_data.tamBl; i++) {
user_data->other_subm.vptr[i+1] = user_data->recv_vlen[i];
}
TransformLengthtoHeader(user_data->other_subm.vptr, user_data->other_subm.dim1); // The array is converted from vlen to vptr
elems = user_data->other_subm.vptr[dist_data.tamBl];
CreateSparseMatrixValues(&user_data->other_subm, dist_data.tamBl, n, elems, 0);
recv_vpos = user_data->other_subm.vpos;
recv_vval = user_data->other_subm.vval;
prepare_redist_counts(rcounts, rdispls, user_reconf.numS, offset, dist_data, user_data->other_subm.vptr);
}
if(user_reconf.rank_state != MAM_PROC_NEW_RANK) {
MPI_Comm_rank(user_data->comm, &dist_data.myId);
dist_data.numP = user_reconf.numS;
get_dist(n, dist_data.myId, dist_data.numP, &dist_data);
offset = (user_reconf.numS + user_reconf.numT) == numP ?
user_reconf.numS : 0;
prepare_redist_counts(scounts, sdispls, user_reconf.numT, offset, dist_data, user_data->array_vptr);
}
// DATA COMMUNICATION //
MPI_Alltoallv(user_data->array_vpos, scounts, sdispls, MPI_INT, recv_vpos, rcounts, rdispls, MPI_INT, user_reconf.comm);
MPI_Alltoallv(user_data->array_vval, scounts, sdispls, MPI_DOUBLE, recv_vval, rcounts, rdispls, MPI_DOUBLE, user_reconf.comm);
free(rank_states);
free(scounts); free(sdispls); free(rcounts); free(rdispls);
}
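Stripped of the MAM bookkeeping, targets_distribution_synch above follows the usual two-phase synchronous pattern: exchange the per-row lengths with a first MPI_Alltoallv so the receive buffers can be sized, then exchange the actual positions and values. The following is a minimal, self-contained sketch of that pattern only; the counts and values are illustrative and do not correspond to the application's matrices:

/* Sketch only: two-phase blocking redistribution with MPI_Alltoallv. */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char *argv[]) {
    int i, rank, numP, total, stotal;
    int *send_len, *recv_len, *scounts, *sdispls, *rcounts, *rdispls;
    double *send_val, *recv_val;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &numP);

    send_len = (int *) malloc(numP * sizeof(int));
    recv_len = (int *) malloc(numP * sizeof(int));
    scounts  = (int *) malloc(numP * sizeof(int));
    sdispls  = (int *) malloc(numP * sizeof(int));
    rcounts  = (int *) malloc(numP * sizeof(int));
    rdispls  = (int *) malloc(numP * sizeof(int));

    /* Phase 1: exchange one length per peer so receivers can size buffers. */
    for(i=0; i<numP; i++) {
        send_len[i] = rank + 1;   /* illustrative "row length" owned by this rank */
        scounts[i] = rcounts[i] = 1;
        sdispls[i] = rdispls[i] = i;
    }
    MPI_Alltoallv(send_len, scounts, sdispls, MPI_INT,
                  recv_len, rcounts, rdispls, MPI_INT, MPI_COMM_WORLD);

    /* Phase 2: sizes are now known, so allocate and exchange the payload. */
    total = 0;
    for(i=0; i<numP; i++) { rcounts[i] = recv_len[i]; rdispls[i] = total; total += recv_len[i]; }
    stotal = 0;
    for(i=0; i<numP; i++) { scounts[i] = rank + 1; sdispls[i] = stotal; stotal += rank + 1; }
    send_val = (double *) malloc(stotal * sizeof(double));
    recv_val = (double *) malloc(total  * sizeof(double));
    for(i=0; i<stotal; i++) send_val[i] = (double) rank;
    MPI_Alltoallv(send_val, scounts, sdispls, MPI_DOUBLE,
                  recv_val, rcounts, rdispls, MPI_DOUBLE, MPI_COMM_WORLD);

    if(rank == 0) printf("P0 received %d doubles in total\n", total);

    free(send_len); free(recv_len); free(scounts); free(sdispls);
    free(rcounts); free(rdispls); free(send_val); free(recv_val);
    MPI_Finalize();
    return 0;
}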
void targets_update(struct Dist_data *dist_data, Compute_data *computeData, user_redist_t *user_data) {
int IONE = 1, i;
size_t entry, total_qty;
......@@ -803,8 +940,10 @@ void targets_update(struct Dist_data *dist_data, Compute_data *computeData, user
entry = 0;
MAM_Data_get_pointer(&value, entry++, &total_qty, &type, MAM_DATA_REPLICATED, MAM_DATA_CONSTANT);
//MAM_Data_get_pointer(&value, entry++, &total_qty, &type, MAM_DATA_REPLICATED, MAM_DATA_VARIABLE);
computeData->n = *((int *)value);
MAM_Data_get_pointer(&value, entry++, &total_qty, &type, MAM_DATA_REPLICATED, MAM_DATA_CONSTANT);
//MAM_Data_get_pointer(&value, entry++, &total_qty, &type, MAM_DATA_REPLICATED, MAM_DATA_VARIABLE);
computeData->umbral = *((double *)value);
get_dist(computeData->n, dist_data->myId, dist_data->numP, dist_data);
......@@ -834,12 +973,14 @@ void targets_update(struct Dist_data *dist_data, Compute_data *computeData, user
MAM_Data_get_pointer(&value, 0, &total_qty, &type, MAM_DATA_DISTRIBUTED, MAM_DATA_CONSTANT);
computeData->vlen = ((int *)value);
//computeData->vlen = user_data->recv_vlen;
start_time = user_data->start_time;
computeData->subm = user_data->other_subm;
*user_data = empty_user_data;
user_data->start_time = start_time;
user_data->array_vptr = computeData->subm.vptr;
user_data->array_vlen = computeData->vlen;
user_data->array_vpos = computeData->subm.vpos;
user_data->array_vval = computeData->subm.vval;
user_data->comm = dist_data->comm;
......@@ -915,6 +1056,25 @@ void prepare_redist_counts(int *counts, int *displs, int numP_other, int offset,
}
}
void prepare_redist_counts_vlen(int *counts, int *displs, int numP_other, int offset, struct Dist_data dist_data) {
int idS[2], i, idS_zero;
int last_index, first_index;
getIds_intercomm(dist_data, numP_other, idS);
idS[0] += offset;
idS[1] += offset;
idS_zero = 0;
if(!idS[0]) {
set_counts(0, numP_other, dist_data, offset, counts);
idS_zero = 1;
}
for(i=idS[0] + idS_zero; i<idS[1]; i++) {
set_counts(i, numP_other, dist_data, offset, counts);
displs[i] = displs[i-1] + counts[i-1];
}
}
/*
 * For a given process Id, obtains how many elements the
 * process myId is going to send/receive.
......@@ -969,6 +1129,16 @@ void getIds_intercomm(struct Dist_data dist_data, int numP_other, int *idS) {
idS[1] = idE;
}
void print_counts2(struct Dist_data data_dist, int *xcounts, int *xdispls, int size, int include_zero, const char* name) {
int i;
for(i=0; i < size; i++) {
if(xcounts[i] != 0 || include_zero) {
printf("P%d of %d | %scounts[%d]=%d disp=%d\n", data_dist.myId, data_dist.numP, name, i, xcounts[i], xdispls[i]);
}
}
}
void print_global_results(double start_time) {
size_t i;
double sp_time, sy_time, asy_time, mall_time, global_time;
......
CC = gcc
MCC = mpicc
#C_FLAGS_ALL = -Wall -Wextra -Wshadow -Wfatal-errors -Wconversion -Wpedantic
C_FLAGS =
C_FLAGS =
LD_FLAGS = -lm -pthread
DEF =
......