#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <unistd.h>
//#include
//#include
#include "mymkl.h"
#include "ScalarVectors.h"
#include "SparseMatrices.h"
#include <mpi.h>
#include "../malleability/MAM.h"

//#define ONLY_SYM 0
#define ROOT 0
//#define DEBUG 0
#define MAX_PROCS_SET 16

typedef struct {
  double umbral, tol;
  int iter, maxiter, n;

  double beta, rho, alpha;
  double *res, *z, *d, *vec;
  SparseMatrix subm;
  double *d_full;

  int *dist_elem, *displs_elem;
  int *dist_rows, *displs_rows;
  int *vlen;
} Compute_data;

struct Dist_data {
  int ini;
  int fin;

  int tamBl; // Number of rows
  int tot_r; // Total number of rows in the matrix

  int myId;
  int numP;
  int numP_parents;

  MPI_Comm comm, comm_children, comm_parents;
  MPI_Datatype scalars, arrays;
};

typedef struct {
  SparseMatrix other_subm;
  int *array_vptr, *array_vpos, initiated;
  double start_time, *array_vval;
  MPI_Comm comm;
  MPI_Request reqs[2];
} user_redist_t;

static const user_redist_t empty_user_data = {
  .array_vptr = NULL,
  .array_vpos = NULL,
  .array_vval = NULL,
  .initiated = 0,
  .comm = MPI_COMM_NULL
};

void dumb(Compute_data *computeData, struct Dist_data *dist_data); //FIXME Delete me
void init_app(Compute_data *computeData, struct Dist_data *dist_data, char* argv[]);
void get_mat_dist(Compute_data *computeData, struct Dist_data dist_data, SparseMatrix mat);
void get_rows_dist(Compute_data *computeData, int numP, int n);
void mat_alloc(Compute_data *computeData, SparseMatrix mat, struct Dist_data dist_data);
void computeSolution(Compute_data computeData, double **subsol, SparseMatrix mat, int myId, double **full_vec);
void pre_compute(Compute_data *computeData, struct Dist_data dist_data, double *subsol, double *full_vec);
int compute(Compute_data *computeData, struct Dist_data *dist_data, user_redist_t *user_data);
void free_computeData(Compute_data *computeData);

//===================================MALLEABILITY FUNCTIONS====================================================
void originals_set_data(struct Dist_data *dist_data, Compute_data *computeData, int num_target);
void user_func(void *args);
void targets_distribution(mam_user_reconf_t user_reconf, user_redist_t *user_data);
void targets_update(struct Dist_data *dist_data, Compute_data *computeData, user_redist_t *user_data);
void print_global_results(double start_time);
//----------------------------------------------------------------------------------------------------
void get_dist(int total_r, int id, int numP, struct Dist_data *dist_data);
void prepare_redist_counts(int *counts, int *displs, int numP_other, int offset, struct Dist_data dist_data, int *vptr);
void set_counts(int id, int numP, struct Dist_data data_dist, int offset, int *sendcounts);
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int *idS);
//----------------------------------------------------------------------------------------------------

int main (int argc, char *argv[]) {
  int init_numP = 0; // Only set by the original process group; initialised so spawned targets do not read an indeterminate value
  int req;
  Compute_data computeData;
  user_redist_t user_data;

  computeData.z = NULL; computeData.d_full = NULL; computeData.d = NULL;
  computeData.vec = NULL; computeData.res = NULL;
  computeData.dist_elem = NULL; computeData.displs_elem = NULL;
  computeData.dist_rows = NULL; computeData.displs_rows = NULL;
  computeData.subm.vptr = NULL;
  computeData.vlen = NULL;

  int num_targets = 1;
  struct Dist_data dist_data;
  if (argc >= 3) {
    num_targets = atoi(argv[2]);
  }

  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &req);
  MPI_Comm_size(MPI_COMM_WORLD, &dist_data.numP);
  MPI_Comm_rank(MPI_COMM_WORLD, &dist_data.myId);
  dist_data.comm = MPI_COMM_WORLD;
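  // MAM_Init tells each process whether it belongs to the original set of
  // processes (new_group == 0), which must load the matrix and register its
  // data with MAM, or to a spawned target group (new_group != 0), which
  // instead rebuilds its state from the data MAM redistributes.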
  user_data = empty_user_data;
  user_data.comm = dist_data.comm;

  int new_group = MAM_Init(ROOT, &dist_data.comm, argv[0], user_func, (void *) &user_data);
  if( !new_group ) { // First set of processes
    init_app(&computeData, &dist_data, argv);
    originals_set_data(&dist_data, &computeData, num_targets);

    init_numP = dist_data.numP;
    user_data.array_vptr = computeData.subm.vptr;
    user_data.array_vpos = computeData.subm.vpos;
    user_data.array_vval = computeData.subm.vval;
    MPI_Barrier(MPI_COMM_WORLD);
    user_data.start_time = MPI_Wtime();
  } else {
    targets_update(&dist_data, &computeData, &user_data);
  }

  compute(&computeData, &dist_data, &user_data);
  MPI_Barrier(dist_data.comm);

  if(dist_data.myId == ROOT) {
    print_global_results(user_data.start_time);
    printf ("End(%d) --> (%d,%20.10e)\n", computeData.n, computeData.iter, computeData.tol);
  }

  // End of CG
  MAM_Finalize();
  free_computeData(&computeData);
  if(init_numP > num_targets && dist_data.myId == 0) {
    MPI_Abort(MPI_COMM_WORLD, -100);
  }
  if(dist_data.comm != MPI_COMM_WORLD && dist_data.comm != MPI_COMM_NULL) MPI_Comm_free(&dist_data.comm);
  MPI_Finalize();
}

/*
 * Init application data before
 * starting the iterative computation
 */
void init_app(Compute_data *computeData, struct Dist_data *dist_data, char* argv[]) {
  SparseMatrix mat, sym;
  double *full_vec = NULL;
  double *subsol = NULL;

  if(dist_data->myId == ROOT) {
#ifdef ONLY_SYM
    printf ("Working with symmetric format\n");
    CreateSparseMatrixHB (argv[1], &mat, 1);
#else
    printf ("Working with general format\n");
    CreateSparseMatrixHB (argv[1], &sym, 1);
    DesymmetrizeSparseMatrices (sym, &mat);
    RemoveSparseMatrix (&sym);
#endif
    computeData->n = mat.dim1;
  }

  // Communicate the number of rows to distribute and the number of elements in the matrix
  MPI_Bcast(&computeData->n, 1, MPI_INT, ROOT, MPI_COMM_WORLD);
  // Each process calculates its own distribution
  get_dist(computeData->n, dist_data->myId, dist_data->numP, dist_data);

  if(dist_data->myId == ROOT) { // ROOT gets the row and vpos/vval distribution
    get_mat_dist(computeData, *dist_data, mat);
    TransformHeadertoLength(mat.vptr, computeData->n); // From vptr to vlen
  } else { // Non-ROOT processes get the row distribution
    get_rows_dist(computeData, dist_data->numP, computeData->n);
    CreateInts (&computeData->dist_elem, dist_data->numP*2);
    InitInts (computeData->dist_elem, dist_data->numP * 2, 0, 0);
    computeData->displs_elem = computeData->dist_elem + dist_data->numP;
  }
  // Allocate each process its submatrix and get its distribution from ROOT
  mat_alloc(computeData, mat, *dist_data);

  computeSolution(*computeData, &subsol, mat, dist_data->myId, &full_vec);
  pre_compute(computeData, *dist_data, subsol, full_vec);

  // Free initial data
  RemoveDoubles(&subsol);
  RemoveDoubles(&full_vec);
  if(dist_data->myId == ROOT) {
    RemoveSparseMatrix(&mat);
  }
}
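/*
 * Note on the two row formats used below (assuming the usual CSR convention):
 * the same structure can be stored as a row-pointer header, e.g.
 *   vptr = {0, 2, 5, 7}   (3 rows, 7 non-zeros)
 * or as per-row lengths, e.g.
 *   vlen = {2, 3, 2}
 * TransformHeadertoLength() and TransformLengthtoHeader() convert between the
 * two; ROOT scatters lengths, and each process rebuilds its local header.
 */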
/*
 * MPI Dist
 * Broadcast the vptr array so each process gets the data that corresponds to itself.
 *
 * mat.vptr must be in vlen format to work correctly
 */
void get_mat_dist(Compute_data *computeData, struct Dist_data dist_data, SparseMatrix mat) {
  int i, j;
  struct Dist_data dist_data_aux;

#ifdef DEBUG
  if(dist_data.myId == ROOT) printf("Distributing vptr\n");
#endif
  CreateInts (&computeData->dist_rows, dist_data.numP);
  CreateInts (&computeData->displs_rows, dist_data.numP);
  CreateInts (&computeData->dist_elem, dist_data.numP*2);
  computeData->displs_elem = computeData->dist_elem + dist_data.numP;

  InitInts (computeData->dist_rows, dist_data.numP, 0, 0);
  InitInts (computeData->displs_rows, dist_data.numP, 0, 0);
  InitInts (computeData->dist_elem, dist_data.numP*2, 0, 0);

  // Fill dist_rows and dist_elem so each process can make ScatterV or GatherV calls
  for(i=0; i<dist_data.numP; i++) {
    get_dist(computeData->n, i, dist_data.numP, &dist_data_aux);

    computeData->dist_rows[i] = dist_data_aux.tamBl;
    computeData->dist_elem[i] = mat.vptr[dist_data_aux.fin] - mat.vptr[dist_data_aux.ini];

    // Fill displacements
    if(i!=0) {
      computeData->displs_elem[i] = computeData->displs_elem[i-1] + computeData->dist_elem[i-1];
      computeData->displs_rows[i] = computeData->displs_rows[i-1] + computeData->dist_rows[i-1];
    }
  }

#ifdef DEBUG
  printf("Proc %d stores %d rows with %d elements\n", dist_data.myId, computeData->dist_rows[dist_data.myId], computeData->dist_elem[dist_data.myId]);
  fflush(stdout);
#endif
}

/*
 * MPI Dist
 * Get the distribution of n rows among a given number of processes
 */
void get_rows_dist(Compute_data *computeData, int numP, int n) {
  int i, j;
  struct Dist_data dist_data;

  CreateInts (&(computeData->dist_rows), numP);
  CreateInts (&(computeData->displs_rows), numP);

  InitInts (computeData->dist_rows, numP, 0, 0);
  InitInts (computeData->displs_rows, numP, 0, 0);

  // Fill dist_rows and dist_elem so each process can make ScatterV or GatherV calls
  for(i=0; i<numP; i++) {
    get_dist(n, i, numP, &dist_data);
    computeData->dist_rows[i] = dist_data.tamBl;

    // Fill displacements
    if(i!=0) {
      computeData->displs_rows[i] = computeData->displs_rows[i-1] + computeData->dist_rows[i-1];
    }
  }
}
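/*
 * The allocation/distribution step below works in three stages:
 *   1. ROOT scatters the per-row lengths (dist_rows counts) into each
 *      process's subm.vptr, which is then turned back into a header with
 *      TransformLengthtoHeader().
 *   2. The local number of non-zeros (elems = subm.vptr[tamBl]) sizes the
 *      local vpos/vval arrays.
 *   3. ROOT scatters vpos and vval with the dist_elem counts and
 *      displacements computed in get_mat_dist().
 */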
/*
 * Matrix allocation
 *
 * The matrix that each process will use is allocated and
 * its vptr array initialised.
 *
 * MPI Dist
 * Distribute the vpos and vval data among the processes.
 * Both arrays have the same distribution.
 */
void mat_alloc(Compute_data *computeData, SparseMatrix mat, struct Dist_data dist_data) {
  int i;
  int elems; // Number of elements this process has
#ifdef DEBUG
  if(dist_data.myId == ROOT) printf("Distributing vpos and vval\n");
#endif

  // dist_rows[myId] is the number of rows, n the number of columns, and dist_elem[myId] is the number of elements this process will have in the matrix
  CreateSparseMatrixVptr(&(computeData->subm), dist_data.tamBl, computeData->n, 0);
  computeData->subm.vptr[0] = 0;

  MPI_Scatterv((mat.vptr)+1, computeData->dist_rows, computeData->displs_rows, MPI_INT, (computeData->subm.vptr)+1, dist_data.tamBl, MPI_INT, ROOT, MPI_COMM_WORLD);

  CreateInts(&(computeData->vlen), dist_data.tamBl);
  for(i=0; i<dist_data.tamBl; i++) {
    computeData->vlen[i] = computeData->subm.vptr[i+1];
  }
  TransformLengthtoHeader(computeData->subm.vptr, computeData->subm.dim1); // The array is converted from vlen to vptr
  elems = computeData->subm.vptr[dist_data.tamBl];
  CreateSparseMatrixValues(&(computeData->subm), dist_data.tamBl, computeData->n, elems, 0);

  MPI_Scatterv(mat.vpos, computeData->dist_elem, computeData->displs_elem, MPI_INT,    computeData->subm.vpos, elems, MPI_INT,    ROOT, MPI_COMM_WORLD);
  MPI_Scatterv(mat.vval, computeData->dist_elem, computeData->displs_elem, MPI_DOUBLE, computeData->subm.vval, elems, MPI_DOUBLE, ROOT, MPI_COMM_WORLD);

  // Free elem arrays, as they are not going to be used again
  RemoveInts (&computeData->dist_elem);
}

/*
 * Compute solution
 */
void computeSolution(Compute_data computeData, double **subsol, SparseMatrix mat, int myId, double **full_vec) {

  CreateDoubles (subsol, computeData.dist_rows[myId]);
  InitDoubles (*subsol, computeData.dist_rows[myId], 0.0, 0.0);
  CreateDoubles(full_vec, computeData.n);
  InitDoubles (*full_vec, computeData.n, 1.0, 0.0);

  //Compute SOLUTION
#ifdef ONLY_SYM
  ProdSymSparseMatrixVector (computeData.subm, *full_vec, *subsol);   // sol += A * x
#else
  ProdSparseMatrixVector (computeData.subm, *full_vec, *subsol);      // sol += A * x
#endif

#ifdef DEBUG
  int aux, i;
  double *solD = NULL, *sol = NULL;
  if(myId == ROOT) {
    printf("Computing solution\n");
    CreateDoubles (&sol, computeData.n);
    CreateDoubles (&solD, computeData.n);
    InitDoubles (sol, computeData.n, 0.0, 0.0);
    InitDoubles (solD, computeData.n, 0.0, 0.0);

    TransformLengthtoHeader(mat.vptr, mat.dim1); // vlen to vptr (at mat_alloc it was needed as vlen)
  }

  MPI_Gatherv(*subsol, computeData.dist_rows[myId], MPI_DOUBLE, sol, computeData.dist_rows, computeData.displs_rows, MPI_DOUBLE, ROOT, MPI_COMM_WORLD);

  if(myId == ROOT) {
#ifdef ONLY_SYM
    ProdSymSparseMatrixVector (mat, *full_vec, solD);                 // solD += A * x
#else
    ProdSparseMatrixVector (mat, *full_vec, solD);                    // solD += A * x
#endif // ONLY_SYM
    aux = 1;
    printf("Checking sol array is ok\n");
    // Compare the gathered distributed solution against the sequential product
    for(i=0; i<computeData.n; i++) {
      if (sol[i] != solD[i]) aux = 0;
    }
    if (aux) printf("Sol array is correct\n");
  }
#endif // DEBUG
}
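/*
 * CG set-up computed by pre_compute() below (restating its in-line comments):
 *   x_0  = 0
 *   r_0  = b - A * x_0           (res)
 *   d_0  = r_0                   (d, gathered into d_full)
 *   beta = r_0' * r_0            (summed across processes)
 *   tol  = sqrt(beta) = ||r_0||
 */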
/*
 * Initialise the CG data and compute the initial residual.
 */
void pre_compute(Compute_data *computeData, struct Dist_data dist_data, double *subsol, double *full_vec) {
  int IONE = 1;
  double DZERO = 0.0, DMONE = -1.0;

  computeData->res = NULL;
  computeData->z = NULL;
  computeData->d = NULL;
  computeData->umbral = 1.0e-8;
  CreateDoubles(&computeData->res, dist_data.tamBl);
  CreateDoubles(&computeData->z, dist_data.tamBl);
  CreateDoubles(&computeData->d, dist_data.tamBl);
  CreateDoubles (&computeData->vec, dist_data.tamBl);
  CreateDoubles (&computeData->d_full, computeData->n);

  InitDoubles (computeData->vec, dist_data.tamBl, DZERO, DZERO); // x = 0
  InitDoubles (full_vec, computeData->n, DZERO, DZERO);          // full_x = 0

  computeData->iter = 0;

#ifdef ONLY_SYM
  ProdSymSparseMatrixVector (computeData->subm, full_vec, computeData->z);  // z += A * full_x
  // mkl_dcsrsymv ("U", &n, mat.vval, mat.vptr, mat.vpos, vec, z);          // z = A * full_x
#else
  ProdSparseMatrixVector (computeData->subm, full_vec, computeData->z);     // z += A * full_x
#endif

  rcopy (&(dist_data.tamBl), subsol, &IONE, computeData->res, &IONE);                 // res = b
  raxpy (&(dist_data.tamBl), &DMONE, computeData->z, &IONE, computeData->res, &IONE); // res -= z
  //rcopy (&(computeData.subm.dim1), computeData.res, &IONE, &(computeData.d+computeData.displs_rows[myId]), &IONE); // d_full = res

  MPI_Allgatherv(computeData->res, dist_data.tamBl, MPI_DOUBLE, computeData->d_full, computeData->dist_rows, computeData->displs_rows, MPI_DOUBLE, MPI_COMM_WORLD);
  rcopy (&(dist_data.tamBl), &(computeData->d_full[dist_data.ini]), &IONE, computeData->d, &IONE); // d = d_full[ini] to d_full[ini+tamBl]

  computeData->beta = rdot (&(dist_data.tamBl), computeData->res, &IONE, computeData->res, &IONE); // beta = res' * res
  MPI_Allreduce(MPI_IN_PLACE, &computeData->beta, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);
  computeData->tol = sqrt (computeData->beta);                                                     // tol = sqrt(beta) = norm (res)
}

/*
 * Main compute loop
 */
int compute(Compute_data *computeData, struct Dist_data *dist_data, user_redist_t *user_data) {
  int IZERO = 0, IONE = 1;
  double DONE = 1.0, DMONE = -1.0, DZERO = 0.0;
  int state = MAM_UNRESERVED;
  int ended_loop = 1;
  int cnt = 0;
  int reconfigure = 0, rec_iter = 1;

  computeData->maxiter = 1000;

  while ((computeData->iter < computeData->maxiter) && (computeData->tol > computeData->umbral)) {

    MPI_Allgatherv(computeData->d, dist_data->tamBl, MPI_DOUBLE, computeData->d_full, computeData->dist_rows, computeData->displs_rows, MPI_DOUBLE, dist_data->comm); // d_full = Gather(d)

    // COMPUTATION
#ifdef ONLY_SYM
    ProdSymSparseMatrixVector (computeData->subm, computeData->d_full, computeData->z); // z += A * d_full
#else
    ProdSparseMatrixVector (computeData->subm, computeData->d_full, computeData->z);    // z += A * d_full
#endif
    computeData->rho = rdot (&(dist_data->tamBl), computeData->d, &IONE, computeData->z, &IONE); // rho = (d * z)
    MPI_Allreduce(MPI_IN_PLACE, &computeData->rho, 1, MPI_DOUBLE, MPI_SUM, dist_data->comm);     // Reduce(rho, SUM)

    computeData->rho = computeData->beta / computeData->rho;                                        // rho = beta / aux
    raxpy (&(dist_data->tamBl), &computeData->rho, computeData->d, &IONE, computeData->vec, &IONE); // x += rho * d

    computeData->rho = -computeData->rho;
    raxpy (&(dist_data->tamBl), &computeData->rho, computeData->z, &IONE, computeData->res, &IONE); // res -= rho * z

    computeData->alpha = computeData->beta;                                                           // alpha = beta
    computeData->beta = rdot (&(dist_data->tamBl), computeData->res, &IONE, computeData->res, &IONE); // beta = res' * res
    MPI_Allreduce(MPI_IN_PLACE, &computeData->beta, 1, MPI_DOUBLE, MPI_SUM, dist_data->comm);         // Reduce(beta, SUM)

    computeData->alpha = computeData->beta / computeData->alpha;                        // alpha = beta / alpha
    rscal (&(dist_data->tamBl), &computeData->alpha, computeData->d, &IONE);            // d = alpha * d
    raxpy (&(dist_data->tamBl), &DONE, computeData->res, &IONE, computeData->d, &IONE); // d += res

    computeData->tol = sqrt (computeData->beta);                                        // tol = sqrt(beta) = norm (res)

    computeData->iter++;
    if (computeData->iter == rec_iter) reconfigure = 1;
    if (reconfigure) {
      MAM_Checkpoint(&state, MAM_CHECK_COMPLETION, user_func, (void *) user_data);
      if(state == MAM_COMPLETED) {
        reconfigure = 0;
        free_computeData(computeData);
        targets_update(dist_data, computeData, user_data);
      }
    }
  }

  if(state == MAM_PENDING) {
    MAM_Checkpoint(&state, MAM_WAIT_COMPLETION, user_func, (void *) user_data);
    free_computeData(computeData);
    targets_update(dist_data, computeData, user_data);
  }

#ifdef DEBUG
  if(dist_data->myId == ROOT) printf ("Ended loop\n");
#endif
  return ended_loop;
}
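/*
 * One iteration of the loop above, in the notation of the in-line comments:
 *   rho   = beta / (d' * A * d)
 *   x    += rho * d
 *   res  -= rho * (A * d)
 *   alpha = (res_new' * res_new) / (res_old' * res_old)
 *   d     = res + alpha * d
 *   tol   = ||res_new||
 * The two MPI_Allreduce calls combine the locally computed dot products, and
 * MPI_Allgatherv rebuilds the full direction vector d_full needed by the
 * local sparse matrix-vector product.
 */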
void dumb(Compute_data *computeData, struct Dist_data *dist_data) {
  int i;

  if(dist_data->myId == 0) printf("TamBL=");
  fflush(stdout); MPI_Barrier(dist_data->comm);
  for(i=0; i<dist_data->numP; i++) {
    if(dist_data->myId == i) {
      printf("%d, ", dist_data->tamBl);
    }
    fflush(stdout);
    sleep(1);
    MPI_Barrier(dist_data->comm);
  }
  if(dist_data->myId == 0) printf("\n");
  fflush(stdout); MPI_Barrier(dist_data->comm);

  if(dist_data->myId == 0) printf("Vlen=");
  fflush(stdout); MPI_Barrier(dist_data->comm);
  for(i=0; i<dist_data->numP; i++) {
    if(dist_data->myId == i) {
      for(int j=0; j<dist_data->tamBl; j++) {
        printf("%d, ", computeData->vlen[j]);
      }
    }
    fflush(stdout);
    sleep(1);
    MPI_Barrier(dist_data->comm);
  }
  if(dist_data->myId == 0) printf("\n");
  fflush(stdout); MPI_Barrier(dist_data->comm);

  if(dist_data->myId == 0) printf("Vptr=");
  fflush(stdout); MPI_Barrier(dist_data->comm);
  for(i=0; i<dist_data->numP; i++) {
    if(dist_data->myId == i) {
      printf("%d, ", computeData->subm.vptr[dist_data->tamBl]);
    }
    fflush(stdout);
    sleep(1);
    MPI_Barrier(dist_data->comm);
  }
  if(dist_data->myId == 0) printf("\n");
  fflush(stdout); MPI_Barrier(dist_data->comm);

  if(dist_data->myId == 0) printf("Tol=");
  fflush(stdout); MPI_Barrier(dist_data->comm);
  for(i=0; i<dist_data->numP; i++) {
    if(dist_data->myId == i) {
      printf("%lf, ", computeData->tol);
    }
    fflush(stdout);
    sleep(1);
    MPI_Barrier(dist_data->comm);
  }
  if(dist_data->myId == 0) printf("\n");
  fflush(stdout); MPI_Barrier(dist_data->comm);

  if(dist_data->myId == 0) printf("Z[last]=");
  fflush(stdout); MPI_Barrier(dist_data->comm);
  for(i=0; i<dist_data->numP; i++) {
    if(dist_data->myId == i) {
      printf("%lf, ", computeData->z[dist_data->tamBl-1]);
    }
    fflush(stdout);
    sleep(1);
    MPI_Barrier(dist_data->comm);
  }
  if(dist_data->myId == 0) printf("\n");
  fflush(stdout); MPI_Barrier(dist_data->comm);

  if(dist_data->myId == 0) printf("D[last]=");
  fflush(stdout); MPI_Barrier(dist_data->comm);
  for(i=0; i<dist_data->numP; i++) {
    if(dist_data->myId == i) {
      printf("%lf, ", computeData->d[dist_data->tamBl-1]);
    }
    fflush(stdout);
    sleep(1);
    MPI_Barrier(dist_data->comm);
  }
  if(dist_data->myId == 0) printf("\n");
  fflush(stdout); MPI_Barrier(dist_data->comm);

  if(dist_data->myId == 0) printf("res[last]=");
  fflush(stdout); MPI_Barrier(dist_data->comm);
  for(i=0; i<dist_data->numP; i++) {
    if(dist_data->myId == i) {
      printf("%lf, ", computeData->res[dist_data->tamBl-1]);
    }
    fflush(stdout);
    sleep(1);
    MPI_Barrier(dist_data->comm);
  }
  if(dist_data->myId == 0) printf("\n");
  fflush(stdout); MPI_Barrier(dist_data->comm);

  if(dist_data->myId == 0) printf("Vec[last]=");
  fflush(stdout); MPI_Barrier(dist_data->comm);
  for(i=0; i<dist_data->numP; i++) {
    if(dist_data->myId == i) {
      printf("%lf, ", computeData->vec[dist_data->tamBl-1]);
    }
    fflush(stdout);
    sleep(1);
    MPI_Barrier(dist_data->comm);
  }
  if(dist_data->myId == 0) printf("\n");
  fflush(stdout); MPI_Barrier(dist_data->comm);
}

void free_computeData(Compute_data *computeData) {
  if(computeData->res != NULL) {
    RemoveDoubles (&computeData->res);
  }
  if(computeData->z != NULL) {
    RemoveDoubles (&computeData->z);
  }
  if(computeData->d != NULL) {
    RemoveDoubles (&computeData->d);
  }
  if(computeData->vec != NULL) {
    RemoveDoubles (&computeData->vec);
  }

  if(computeData->d_full != NULL) {
    RemoveDoubles (&computeData->d_full);
  }
  if(computeData->subm.vptr != NULL) {
    RemoveSparseMatrix2 (&computeData->subm);
  }

  if(computeData->dist_rows != NULL) {
    RemoveInts (&computeData->dist_rows);
  }
  if(computeData->displs_rows != NULL) {
    RemoveInts (&computeData->displs_rows);
  }
  if(computeData->vlen != NULL) {
    RemoveInts (&computeData->vlen);
  }
}
/*
 *  _____________________________________________________________________________________
 * ||                                                                                     ||
 * ||                                                                                     ||
 * ||                              DISTRIBUTION FUNCTIONS                                 ||
 * ||                                                                                     ||
 * ||                                                                                     ||
 *  \_____________________________________________________________________________________/
 */

/*
 * The following functions are all related to the distribution of the data
 * or of the processes.
 */

/*
 * ========================================================================================
 * ========================================================================================
 * ========================PARENTS COMMUNICATION FUNCTIONS=================================
 * ========================================================================================
 * ========================================================================================
 */

/*
 * Function that declares the data to be communicated by MAM.
 */
void originals_set_data(struct Dist_data *dist_data, Compute_data *computeData, int num_target) {
  size_t index;

  MAM_Set_target_number(num_target);

  MAM_Data_add(&(computeData->n), NULL, 1, MPI_INT, MAM_DATA_REPLICATED, MAM_DATA_CONSTANT);
  MAM_Data_add(&(computeData->umbral), NULL, 1, MPI_DOUBLE, MAM_DATA_REPLICATED, MAM_DATA_CONSTANT);

  MAM_Data_add(&(computeData->iter), NULL, 1, MPI_INT, MAM_DATA_REPLICATED, MAM_DATA_VARIABLE);
  MAM_Data_add(&(computeData->tol), NULL, 1, MPI_DOUBLE, MAM_DATA_REPLICATED, MAM_DATA_VARIABLE);
  MAM_Data_add(&(computeData->beta), NULL, 1, MPI_DOUBLE, MAM_DATA_REPLICATED, MAM_DATA_VARIABLE);

  MAM_Data_add(computeData->d, NULL, computeData->n, MPI_DOUBLE, MAM_DATA_DISTRIBUTED, MAM_DATA_VARIABLE);
  MAM_Data_add(computeData->vec, NULL, computeData->n, MPI_DOUBLE, MAM_DATA_DISTRIBUTED, MAM_DATA_VARIABLE);
  MAM_Data_add(computeData->res, NULL, computeData->n, MPI_DOUBLE, MAM_DATA_DISTRIBUTED, MAM_DATA_VARIABLE);
  MAM_Data_add(computeData->z, NULL, computeData->n, MPI_DOUBLE, MAM_DATA_DISTRIBUTED, MAM_DATA_VARIABLE);

  MAM_Data_add(computeData->vlen, NULL, computeData->n, MPI_INT, MAM_DATA_DISTRIBUTED, MAM_DATA_CONSTANT);
}

/*
 * ========================================================================================
 * ========================================================================================
 * ========================CHILDREN COMMUNICATION FUNCTIONS================================
 * ========================================================================================
 * ========================================================================================
 */

/*
 * Function called by MAM as a callback.
 *
 * It performs the user-side data redistribution.
 * Since non-blocking communications are used, the first call starts
 * the communication and the following calls check whether it
 * has finished.
 */
void user_func(void *args) {
  int local_flag, flag = 0;
  mam_user_reconf_t user_reconf;

  MAM_Get_Reconf_Info(&user_reconf);
  user_redist_t *user_data = (user_redist_t *) args;
  if(!user_data->initiated) {
    MPI_Bcast(&user_data->start_time, 1, MPI_DOUBLE, 0, user_reconf.comm);

    targets_distribution(user_reconf, user_data);
    user_data->initiated = 1;

    if(user_reconf.rank_state == MAM_PROC_NEW_RANK) {
      MPI_Waitall(2, user_data->reqs, MPI_STATUSES_IGNORE);
      flag = 1;
    }
  } else {
    MPI_Testall(2, user_data->reqs, &local_flag, MPI_STATUSES_IGNORE);
    MPI_Allreduce(&local_flag, &flag, 1, MPI_INT, MPI_MIN, user_data->comm);
  }

  if(flag) MAM_Resume_redistribution(NULL);
}
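/*
 * How the pieces above and below fit together:
 *   - compute() requests a reconfiguration at iteration rec_iter and then
 *     calls MAM_Checkpoint(MAM_CHECK_COMPLETION) once per iteration.
 *   - Each checkpoint invokes user_func(): the first call posts the two
 *     MPI_Ialltoallv operations (see targets_distribution below); later calls
 *     only test them and agree on completion with an MPI_Allreduce(MIN).
 *   - Newly spawned ranks have no iteration to overlap with, so they simply
 *     wait (MPI_Waitall) and report completion immediately.
 *   - Once every process reports completion, MAM_Resume_redistribution() lets
 *     MAM finish, MAM_Checkpoint returns MAM_COMPLETED, and compute() swaps
 *     in the new distribution via targets_update().
 */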
/*
 * Function in charge of performing the user data
 * redistribution.
 *
 * It computes the number of elements to send/receive for each process
 * and then calls Ialltoallv twice.
 *
 * It also initialises the memory of the processes that are going
 * to receive data.
 */
void targets_distribution(mam_user_reconf_t user_reconf, user_redist_t *user_data) {
  int i, n, offset, elems, numP, *vlen, *rank_states;
  int *scounts, *rcounts, *sdispls, *rdispls;
  size_t total_qty;
  void *value = NULL;
  struct Dist_data dist_data;
  MPI_Datatype type;

  int aux_int;
  int *recv_vpos = &aux_int;
  double aux_double;
  double *recv_vval = &aux_double;

  MPI_Comm_size(user_reconf.comm, &numP);
  scounts = calloc(numP, sizeof(int));
  sdispls = calloc(numP, sizeof(int));
  rcounts = calloc(numP, sizeof(int));
  rdispls = calloc(numP, sizeof(int));
  offset = 0;

  rank_states = (int *) malloc(numP * sizeof(int));
  MPI_Allgather(&user_reconf.rank_state, 1, MPI_INT, rank_states, 1, MPI_INT, user_reconf.comm);

  MAM_Data_get_pointer(&value, 0, &total_qty, &type, MAM_DATA_DISTRIBUTED, MAM_DATA_CONSTANT);
  vlen = ((int *)value);
  n = (int) total_qty;

  if(user_reconf.rank_state != MAM_PROC_ZOMBIE) {
    MPI_Comm_rank(user_data->comm, &dist_data.myId);
    dist_data.numP = user_reconf.numT;

    if(user_reconf.rank_state == MAM_PROC_NEW_RANK) {
      user_data->array_vpos = &aux_int;
      user_data->array_vval = &aux_double;
      // Shift the local id past the source processes that remain in the target group
      for(i=0; i<user_reconf.numS; i++) {
        if(rank_states[i] != MAM_PROC_ZOMBIE) dist_data.myId++;
      }
    }
    get_dist(n, dist_data.myId, dist_data.numP, &dist_data);

    CreateSparseMatrixVptr(&user_data->other_subm, dist_data.tamBl, n, 0);
    user_data->other_subm.vptr[0] = 0;
    //memcpy(user_data->other_subm.vptr+1, vlen, dist_data.tamBl * sizeof(int));
    for(i=0; i<dist_data.tamBl; i++) {
      user_data->other_subm.vptr[i+1] = vlen[i];
    }
    TransformLengthtoHeader(user_data->other_subm.vptr, user_data->other_subm.dim1); // The array is converted from vlen to vptr
    elems = user_data->other_subm.vptr[dist_data.tamBl];
    CreateSparseMatrixValues(&user_data->other_subm, dist_data.tamBl, n, elems, 0);
    recv_vpos = user_data->other_subm.vpos;
    recv_vval = user_data->other_subm.vval;

    prepare_redist_counts(rcounts, rdispls, user_reconf.numS, offset, dist_data, user_data->other_subm.vptr);
  }

  if(user_reconf.rank_state != MAM_PROC_NEW_RANK) {
    MPI_Comm_rank(user_data->comm, &dist_data.myId);
    dist_data.numP = user_reconf.numS;
    get_dist(n, dist_data.myId, dist_data.numP, &dist_data);
    offset = (user_reconf.numS + user_reconf.numT) == numP ? user_reconf.numS : 0;
    prepare_redist_counts(scounts, sdispls, user_reconf.numT, offset, dist_data, user_data->array_vptr);
  }

  // DATA COMMUNICATION //
  MPI_Ialltoallv(user_data->array_vpos, scounts, sdispls, MPI_INT,    recv_vpos, rcounts, rdispls, MPI_INT,    user_reconf.comm, &user_data->reqs[0]);
  MPI_Ialltoallv(user_data->array_vval, scounts, sdispls, MPI_DOUBLE, recv_vval, rcounts, rdispls, MPI_DOUBLE, user_reconf.comm, &user_data->reqs[1]);

  free(rank_states);
  free(scounts); free(sdispls); free(rcounts); free(rdispls);
}

void targets_update(struct Dist_data *dist_data, Compute_data *computeData, user_redist_t *user_data) {
  int IONE = 1, i;
  size_t entry, total_qty;
  double start_time;
  void *value = NULL;
  MPI_Datatype type;

  MPI_Comm_size(dist_data->comm, &dist_data->numP);
  MPI_Comm_rank(dist_data->comm, &dist_data->myId);

  entry = 0;
  MAM_Data_get_pointer(&value, entry++, &total_qty, &type, MAM_DATA_REPLICATED, MAM_DATA_CONSTANT);
  computeData->n = *((int *)value);
  MAM_Data_get_pointer(&value, entry++, &total_qty, &type, MAM_DATA_REPLICATED, MAM_DATA_CONSTANT);
  computeData->umbral = *((double *)value);

  get_dist(computeData->n, dist_data->myId, dist_data->numP, dist_data);
  get_rows_dist(computeData, dist_data->numP, computeData->n);

  entry = 0;
  MAM_Data_get_pointer(&value, entry++, &total_qty, &type, MAM_DATA_REPLICATED, MAM_DATA_VARIABLE);
  computeData->iter = *((int *)value);
  MAM_Data_get_pointer(&value, entry++, &total_qty, &type, MAM_DATA_REPLICATED, MAM_DATA_VARIABLE);
  computeData->tol = *((double *)value);
  MAM_Data_get_pointer(&value, entry++, &total_qty, &type, MAM_DATA_REPLICATED, MAM_DATA_VARIABLE);
  computeData->beta = *((double *)value);

  entry = 0;
  MAM_Data_get_pointer(&value, entry++, &total_qty, &type, MAM_DATA_DISTRIBUTED, MAM_DATA_VARIABLE);
  computeData->d = ((double *)value);
  CreateDoubles(&computeData->d_full, computeData->n);
  rcopy (&(dist_data->tamBl), computeData->d, &IONE, &(computeData->d_full[dist_data->ini]), &IONE); // d_full[ini] to d_full[ini+tamBl] = d

  MAM_Data_get_pointer(&value, entry++, &total_qty, &type, MAM_DATA_DISTRIBUTED, MAM_DATA_VARIABLE);
  computeData->vec = ((double *)value);
  MAM_Data_get_pointer(&value, entry++, &total_qty, &type, MAM_DATA_DISTRIBUTED, MAM_DATA_VARIABLE);
  computeData->res = ((double *)value);
  MAM_Data_get_pointer(&value, entry++, &total_qty, &type, MAM_DATA_DISTRIBUTED, MAM_DATA_VARIABLE);
  computeData->z = ((double *)value);

  MAM_Data_get_pointer(&value, 0, &total_qty, &type, MAM_DATA_DISTRIBUTED, MAM_DATA_CONSTANT);
  computeData->vlen = ((int *)value);

  start_time = user_data->start_time;
  computeData->subm = user_data->other_subm;
  *user_data = empty_user_data;
  user_data->start_time = start_time;
  user_data->array_vptr = computeData->subm.vptr;
  user_data->array_vpos = computeData->subm.vpos;
  user_data->array_vval = computeData->subm.vval;
  user_data->comm = dist_data->comm;
}

/*
 * ========================================================================================
 * ========================================================================================
 * ================================DISTRIBUTION FUNCTIONS==================================
 * ========================================================================================
 * ========================================================================================
 */
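/*
 * Illustrative example of the block-row distribution implemented below
 * (values chosen for illustration only): with total_r = 10 and numP = 3,
 * get_dist() assigns
 *   id 0 -> rows [0,4)  (tamBl = 4, takes the remainder row)
 *   id 1 -> rows [4,7)  (tamBl = 3)
 *   id 2 -> rows [7,10) (tamBl = 3)
 * For the same case, getIds_intercomm() for the process owning rows [4,7)
 * against a group of numP_other = 5 (2 rows each) returns idS = {2, 4},
 * i.e. it exchanges data with ranks 2 and 3 of the other group, which own
 * rows [4,6) and [6,8).
 */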
/*
 * Given a process id and the total number of processes,
 * computes how many rows (tamBl) and which range of rows
 * (ini to fin, excluded) that process will work with.
 */
void get_dist(int total_r, int id, int numP, struct Dist_data *dist_data) {
  int rem;

  dist_data->tot_r = total_r;
  dist_data->tamBl = total_r / numP;
  rem = total_r % numP;

  if(id < rem) { // First subgroup
    dist_data->ini = id * dist_data->tamBl + id;
    dist_data->fin = (id+1) * dist_data->tamBl + (id+1);
  } else { // Second subgroup
    dist_data->ini = id * dist_data->tamBl + rem;
    dist_data->fin = (id+1) * dist_data->tamBl + rem;
  }

  if(dist_data->fin > total_r) {
    dist_data->fin = total_r;
  }
  if(dist_data->ini > dist_data->fin) {
    dist_data->ini = dist_data->fin;
  }

  dist_data->tamBl = dist_data->fin - dist_data->ini;
}

void prepare_redist_counts(int *counts, int *displs, int numP_other, int offset, struct Dist_data dist_data, int *vptr) {
  int idS[2], i, idS_zero;
  int last_index, first_index;

  getIds_intercomm(dist_data, numP_other, idS);
  idS[0] += offset;
  idS[1] += offset;
  idS_zero = 0;

  if(!idS[0]) {
    set_counts(0, numP_other, dist_data, offset, counts);
    idS_zero = 1;
  }
  for(i=idS[0] + idS_zero; i<idS[1]; i++) {
    set_counts(i, numP_other, dist_data, offset, counts);
  }

  // Translate the per-process row counts into element counts and displacements
  // over the local vpos/vval arrays by means of the local vptr array.
  first_index = 0;
  for(i=idS[0]; i<idS[1]; i++) {
    last_index = first_index + counts[i];
    displs[i] = vptr[first_index];
    counts[i] = vptr[last_index] - vptr[first_index];
    first_index = last_index;
  }
}

/*
 * Computes how many rows the calling process shares with process "id"
 * of the other group, i.e. the overlap of their row ranges.
 */
void set_counts(int id, int numP, struct Dist_data data_dist, int offset, int *sendcounts) {
  struct Dist_data other;
  int biggest_ini, smallest_end;

  get_dist(data_dist.tot_r, id - offset, numP, &other);

  // No overlap between the two row ranges
  if(data_dist.ini >= other.fin || data_dist.fin <= other.ini) {
    return;
  }

  // Take the largest ini of the two processes
  biggest_ini = (data_dist.ini > other.ini) ? data_dist.ini : other.ini;
  // Take the smallest fin of the two processes
  smallest_end = (data_dist.fin < other.fin) ? data_dist.fin : other.fin;

  sendcounts[id] = smallest_end - biggest_ini; // Number of elements to send to/receive from process id
}

/*
 * For a process of one group, obtains the range of ranks of the other
 * group it has to send data to or receive data from.
 *
 * Returns the first identifier and the last one (excluded) to
 * communicate with.
 */
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int *idS) {
  int idI, idE;
  int tamOther = dist_data.tot_r / numP_other;
  int remOther = dist_data.tot_r % numP_other;
  int middle = (tamOther + 1) * remOther;

  if(middle > dist_data.ini) { // First subgroup
    idI = dist_data.ini / (tamOther + 1);
  } else { // Second subgroup
    idI = ((dist_data.ini - middle) / tamOther) + remOther;
  }

  if(middle >= dist_data.fin) { // First subgroup
    idE = dist_data.fin / (tamOther + 1);
    idE = (dist_data.fin % (tamOther + 1) > 0 && idE+1 <= numP_other) ? idE+1 : idE;
  } else { // Second subgroup
    idE = ((dist_data.fin - middle) / tamOther) + remOther;
    idE = ((dist_data.fin - middle) % tamOther > 0 && idE+1 <= numP_other) ? idE+1 : idE;
  }

  idS[0] = idI;
  idS[1] = idE;
}

void print_global_results(double start_time) {
  size_t i;
  double sp_time, sy_time, asy_time, mall_time, global_time;

  MAM_Retrieve_times(&sp_time, &sy_time, &asy_time, &mall_time);
  global_time = MPI_Wtime() - start_time;
  printf("T_spawn: %lf", sp_time);
  printf("\nT_SR: %lf", sy_time);
  printf("\nT_AR: %lf", asy_time);
  printf("\nT_Malleability: %lf", mall_time);
  printf("\nT_total: %lf\n", global_time);
}

/*
  double starttime, endtime, total, res;
  MPI_Barrier(MPI_COMM_WORLD);
  starttime = MPI_Wtime();
  endtime = MPI_Wtime();
  total = endtime - starttime;
  MPI_Reduce(&total, &res, 1, MPI_DOUBLE, MPI_MAX, ROOT, MPI_COMM_WORLD);
  if(dist_data.myId == ROOT) {printf("Parent BCAST time %f\n", total); fflush(stdout);}
*/