#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include "ToolsMAM.h"

struct Dist_data {
  int ini;   // First row owned by the process
  int fin;   // Last row owned by the process (excluded)

  int tamBl; // Number of rows (fin - ini)

  int n;     // Total number of rows

  int myId;
  int numP;
  int numP_parents;

  MPI_Comm comm;
};

//----------------------------------------------------------------------------------------------------
void get_dist(int total_r, int id, int numP, struct Dist_data *dist_data);
void prepare_redist_counts(int *counts, int *displs, int numP_other, int offset, struct Dist_data dist_data, int *vptr);
void prepare_redist_counts_vlen(int *counts, int *displs, int numP_other, int offset, struct Dist_data dist_data);
void set_counts(int id, int numP, struct Dist_data data_dist, int offset, int *sendcounts);
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int *idS);
//----------------------------------------------------------------------------------------------------
void print_counts2(struct Dist_data data_dist, int *xcounts, int *xdispls, int size, int include_zero, const char* name);
void print_global_results(double start_time);
//----------------------------------------------------------------------------------------------------

/*
 * Performs the user's asynchronous data redistribution.
 *
 * Computes the total number of elements each process has to
 * send/receive and then calls Ialltoallv twice.
 *
 * It also allocates the memory of those processes that are
 * going to receive data.
 */
void targets_distribution(mam_user_reconf_t user_reconf, user_redist_t *user_data) {
  int i, n, offset, elems, numP, *vlen, *rank_states;
  int *scounts, *rcounts, *sdispls, *rdispls;
  size_t total_qty;
  void *value = NULL;
  struct Dist_data dist_data;
  MPI_Datatype type;

  int aux_int;
  int *recv_vpos = &aux_int;
  double aux_double;
  double *recv_vval = &aux_double;

  MPI_Comm_size(user_reconf.comm, &numP);
  scounts = (int *) calloc(numP, sizeof(int));
  sdispls = (int *) calloc(numP, sizeof(int));
  rcounts = (int *) calloc(numP, sizeof(int));
  rdispls = (int *) calloc(numP, sizeof(int));
  offset = 0;

  rank_states = (int *) malloc(numP * sizeof(int));
  MPI_Allgather(&user_reconf.rank_state, 1, MPI_INT, rank_states, 1, MPI_INT, user_reconf.comm);

  MAM_Data_get_pointer(&value, 0, &total_qty, &type, MAM_DATA_DISTRIBUTED, MAM_DATA_CONSTANT);
  vlen = ((int *)value);
  n = (int) total_qty;

  if(user_reconf.rank_state != MAM_PROC_ZOMBIE) {
    MPI_Comm_rank(user_data->comm, &dist_data.myId);
    dist_data.numP = user_reconf.numT;
    if(user_reconf.rank_state == MAM_PROC_NEW_RANK) {
      user_data->array_vpos = &aux_int;
      user_data->array_vval = &aux_double;
      // Newly spawned ranks shift their target id past the sources that continue
      for(i=0; i<user_reconf.numS; i++) {
        if(rank_states[i] == MAM_PROC_CONTINUE) {
          dist_data.myId += user_reconf.numS;
          break;
        }
      }
    }
    get_dist(n, dist_data.myId, dist_data.numP, &dist_data);

    CreateSparseMatrixVptr(&user_data->other_subm, dist_data.tamBl, n, 0);
    user_data->other_subm.vptr[0] = 0;
    for(i=0; i<dist_data.tamBl; i++) {
      user_data->other_subm.vptr[i+1] = vlen[i];
    }
    TransformLengthtoHeader(user_data->other_subm.vptr, user_data->other_subm.dim1); // The array is converted from vlen to vptr
    elems = user_data->other_subm.vptr[dist_data.tamBl];
    CreateSparseMatrixValues(&user_data->other_subm, dist_data.tamBl, n, elems, 0);
    recv_vpos = user_data->other_subm.vpos;
    recv_vval = user_data->other_subm.vval;

    prepare_redist_counts(rcounts, rdispls, user_reconf.numS, offset, dist_data, user_data->other_subm.vptr);
    // print_counts2(dist_data, rcounts, rdispls, numP, 0, "TARGETS");
  }

  if(user_reconf.rank_state != MAM_PROC_NEW_RANK) {
    MPI_Comm_rank(user_data->comm, &dist_data.myId);
    dist_data.numP = user_reconf.numS;
    get_dist(n, dist_data.myId, dist_data.numP, &dist_data);
    offset = (user_reconf.numS + user_reconf.numT) == numP ? user_reconf.numS : 0;
    prepare_redist_counts(scounts, sdispls, user_reconf.numT, offset, dist_data, user_data->array_vptr);
    // print_counts2(dist_data, scounts, sdispls, numP, 0, "SOURCES");
  }

  // DATA COMMUNICATION //
  MPI_Ialltoallv(user_data->array_vpos, scounts, sdispls, MPI_INT, recv_vpos, rcounts, rdispls, MPI_INT, user_reconf.comm, &user_data->reqs[0]);
  MPI_Ialltoallv(user_data->array_vval, scounts, sdispls, MPI_DOUBLE, recv_vval, rcounts, rdispls, MPI_DOUBLE, user_reconf.comm, &user_data->reqs[1]);

  free(rank_states);
  free(scounts); free(sdispls); free(rcounts); free(rdispls);
}
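/*
 * Usage sketch (illustrative, not part of the original code): the two
 * Ialltoallv calls above only start the redistribution, so the caller
 * must complete user_data->reqs[0] and reqs[1] before reading other_subm.
 * The helper name targets_distribution_wait_example is hypothetical.
 */
static void targets_distribution_wait_example(mam_user_reconf_t user_reconf, user_redist_t *user_data) {
  targets_distribution(user_reconf, user_data);         // Posts reqs[0] (vpos) and reqs[1] (vval)
  MPI_Waitall(2, user_data->reqs, MPI_STATUSES_IGNORE); // Complete both nonblocking exchanges
}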
/*
 * Performs the user's synchronous data redistribution.
 *
 * First the row lengths (vlen) are exchanged with Alltoallv so that the
 * targets can build their row pointers; the counts are then recomputed
 * in elements and the values (vpos/vval) are exchanged.
 */
void targets_distribution_synch(mam_user_reconf_t user_reconf, user_redist_t *user_data) {
  int i, n, offset, elems, rank, numP, *rank_states;
  int *scounts, *rcounts, *sdispls, *rdispls;
  //size_t total_qty;
  //void *value = NULL;
  struct Dist_data dist_data;
  //MPI_Datatype type;

  int aux_int;
  int *recv_vpos = &aux_int;
  double aux_double;
  double *recv_vval = &aux_double;
  user_data->recv_vlen = &aux_int;

  MPI_Comm_rank(user_reconf.comm, &rank);
  MPI_Comm_size(user_reconf.comm, &numP);
  scounts = (int *) calloc(numP, sizeof(int));
  sdispls = (int *) calloc(numP, sizeof(int));
  rcounts = (int *) calloc(numP, sizeof(int));
  rdispls = (int *) calloc(numP, sizeof(int));
  offset = 0;

  rank_states = (int *) malloc(numP * sizeof(int));
  MPI_Allgather(&user_reconf.rank_state, 1, MPI_INT, rank_states, 1, MPI_INT, user_reconf.comm);
  if(rank == 0) n = user_data->n;
  MPI_Bcast(&n, 1, MPI_INT, 0, user_reconf.comm);

  if(user_reconf.rank_state != MAM_PROC_ZOMBIE) {
    MPI_Comm_rank(user_data->comm, &dist_data.myId);
    dist_data.numP = user_reconf.numT;
    if(user_reconf.rank_state == MAM_PROC_NEW_RANK) {
      user_data->array_vlen = &aux_int;
      // Newly spawned ranks shift their target id past the sources that continue
      for(i=0; i<user_reconf.numS; i++) {
        if(rank_states[i] == MAM_PROC_CONTINUE) {
          dist_data.myId += user_reconf.numS;
          break;
        }
      }
    }
    get_dist(n, dist_data.myId, dist_data.numP, &dist_data);
    CreateInts(&user_data->recv_vlen, dist_data.tamBl);
    prepare_redist_counts_vlen(rcounts, rdispls, user_reconf.numS, offset, dist_data);
    // print_counts2(dist_data, rcounts, rdispls, numP, 0, "TARGETS");
  }

  if(user_reconf.rank_state != MAM_PROC_NEW_RANK) {
    MPI_Comm_rank(user_data->comm, &dist_data.myId);
    dist_data.numP = user_reconf.numS;
    get_dist(n, dist_data.myId, dist_data.numP, &dist_data);
    offset = (user_reconf.numS + user_reconf.numT) == numP ? user_reconf.numS : 0;
    prepare_redist_counts_vlen(scounts, sdispls, user_reconf.numT, offset, dist_data);
    // print_counts2(dist_data, scounts, sdispls, numP, 0, "SOURCES");
  }

  // DATA COMMUNICATION: row lengths //
  MPI_Alltoallv(user_data->array_vlen, scounts, sdispls, MPI_INT, user_data->recv_vlen, rcounts, rdispls, MPI_INT, user_reconf.comm);

  free(scounts); free(sdispls); free(rcounts); free(rdispls);
  scounts = (int *) calloc(numP, sizeof(int));
  sdispls = (int *) calloc(numP, sizeof(int));
  rcounts = (int *) calloc(numP, sizeof(int));
  rdispls = (int *) calloc(numP, sizeof(int));
  offset = 0;

  if(user_reconf.rank_state != MAM_PROC_ZOMBIE) {
    MPI_Comm_rank(user_data->comm, &dist_data.myId);
    dist_data.numP = user_reconf.numT;
    if(user_reconf.rank_state == MAM_PROC_NEW_RANK) {
      user_data->array_vlen = &aux_int;
      for(i=0; i<user_reconf.numS; i++) {
        if(rank_states[i] == MAM_PROC_CONTINUE) {
          dist_data.myId += user_reconf.numS;
          break;
        }
      }
    }
    get_dist(n, dist_data.myId, dist_data.numP, &dist_data);

    CreateSparseMatrixVptr(&user_data->other_subm, dist_data.tamBl, n, 0);
    user_data->other_subm.vptr[0] = 0;
    //memcpy(user_data->other_subm.vptr+1, vlen, dist_data.tamBl * sizeof(int));
    for(i=0; i<dist_data.tamBl; i++) {
      user_data->other_subm.vptr[i+1] = user_data->recv_vlen[i];
    }
    TransformLengthtoHeader(user_data->other_subm.vptr, user_data->other_subm.dim1); // The array is converted from vlen to vptr
    elems = user_data->other_subm.vptr[dist_data.tamBl];
    CreateSparseMatrixValues(&user_data->other_subm, dist_data.tamBl, n, elems, 0);
    recv_vpos = user_data->other_subm.vpos;
    recv_vval = user_data->other_subm.vval;

    prepare_redist_counts(rcounts, rdispls, user_reconf.numS, offset, dist_data, user_data->other_subm.vptr);
  }

  if(user_reconf.rank_state != MAM_PROC_NEW_RANK) {
    MPI_Comm_rank(user_data->comm, &dist_data.myId);
    dist_data.numP = user_reconf.numS;
    get_dist(n, dist_data.myId, dist_data.numP, &dist_data);
    offset = (user_reconf.numS + user_reconf.numT) == numP ? user_reconf.numS : 0;
    prepare_redist_counts(scounts, sdispls, user_reconf.numT, offset, dist_data, user_data->array_vptr);
  }

  // DATA COMMUNICATION: values //
  MPI_Alltoallv(user_data->array_vpos, scounts, sdispls, MPI_INT, recv_vpos, rcounts, rdispls, MPI_INT, user_reconf.comm);
  MPI_Alltoallv(user_data->array_vval, scounts, sdispls, MPI_DOUBLE, recv_vval, rcounts, rdispls, MPI_DOUBLE, user_reconf.comm);

  free(rank_states);
  free(scounts); free(sdispls); free(rcounts); free(rdispls);
}
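/*
 * Worked sketch (illustrative): how the received row lengths become a CSR
 * row pointer in the functions above. TransformLengthtoHeader is assumed
 * to perform an in-place prefix sum ("the array is converted from vlen to
 * vptr"), so for tamBl = 3 rows with lengths {2, 0, 3}:
 *   after the copy: vptr = {0, 2, 0, 3}
 *   after the call: vptr = {0, 2, 2, 5}  ->  vptr[tamBl] = 5 local elements
 */
static void vlen_to_vptr_example(void) {
  int vlen[3] = {2, 0, 3};
  int vptr[4];
  int i;

  vptr[0] = 0;
  for(i=0; i<3; i++) vptr[i+1] = vlen[i]; // Copy lengths after the leading zero
  TransformLengthtoHeader(vptr, 3);       // In-place prefix sum: {0, 2, 2, 5}
  printf("Local elements: %d\n", vptr[3]);
}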
/*
 * ========================================================================================
 * ========================================================================================
 * ================================DISTRIBUTION FUNCTIONS==================================
 * ========================================================================================
 * ========================================================================================
 */

/*
 * Given a process id and the total number of processes,
 * computes the range of rows the process will work with:
 * its first row (ini), its last row (fin, excluded) and
 * the resulting block size (tamBl = fin - ini).
 */
void get_dist(int total_r, int id, int numP, struct Dist_data *dist_data) {
  int rem;

  dist_data->n = total_r;
  dist_data->tamBl = total_r / numP;
  rem = total_r % numP;

  if(id < rem) { // First subgroup: gets one extra row
    dist_data->ini = id * dist_data->tamBl + id;
    dist_data->fin = (id+1) * dist_data->tamBl + (id+1);
  } else { // Second subgroup
    dist_data->ini = id * dist_data->tamBl + rem;
    dist_data->fin = (id+1) * dist_data->tamBl + rem;
  }

  if(dist_data->fin > total_r) {
    dist_data->fin = total_r;
  }
  if(dist_data->ini > dist_data->fin) {
    dist_data->ini = dist_data->fin;
  }

  dist_data->tamBl = dist_data->fin - dist_data->ini;
}
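/*
 * Worked example (illustrative, not part of the original code): for
 * total_r = 10 rows over numP = 3 processes, get_dist yields rows [0, 4)
 * for id 0, [4, 7) for id 1 and [7, 10) for id 2; the remainder row goes
 * to the first subgroup. The helper name get_dist_example is hypothetical.
 */
static void get_dist_example(void) {
  struct Dist_data d;
  int id;

  for(id = 0; id < 3; id++) {
    get_dist(10, id, 3, &d);
    printf("P%d: rows [%d, %d) tamBl=%d\n", id, d.ini, d.fin, d.tamBl);
  }
}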
/*
 * Computes how many elements have to be exchanged with each process of the
 * other group and at which displacement, for the arrays of values
 * (vpos/vval). The counts are first obtained in rows and then converted
 * into elements through the local row pointer (vptr).
 */
void prepare_redist_counts(int *counts, int *displs, int numP_other, int offset, struct Dist_data dist_data, int *vptr) {
  int idS[2], i, idS_zero;
  int last_index, first_index;

  getIds_intercomm(dist_data, numP_other, idS);
  idS[0] += offset;
  idS[1] += offset;
  idS_zero = 0;

  if(!idS[0]) {
    set_counts(0, numP_other, dist_data, offset, counts);
    idS_zero = 1;
  }
  for(i=idS[0] + idS_zero; i<idS[1]; i++) {
    set_counts(i, numP_other, dist_data, offset, counts);
    displs[i] = displs[i-1] + counts[i-1];
  }

  // Convert the row counts into element counts by walking the local vptr
  first_index = 0;
  for(i=idS[0]; i<idS[1]; i++) {
    last_index = first_index + counts[i];
    counts[i] = vptr[last_index] - vptr[first_index];
    first_index = last_index;
    if(i) displs[i] = displs[i-1] + counts[i-1];
  }
}

/*
 * Computes how many elements have to be exchanged with each process of the
 * other group and at which displacement, for the array of row lengths
 * (vlen). One integer is exchanged per row, so the row counts are used
 * directly.
 */
void prepare_redist_counts_vlen(int *counts, int *displs, int numP_other, int offset, struct Dist_data dist_data) {
  int idS[2], i, idS_zero;

  getIds_intercomm(dist_data, numP_other, idS);
  idS[0] += offset;
  idS[1] += offset;
  idS_zero = 0;

  if(!idS[0]) {
    set_counts(0, numP_other, dist_data, offset, counts);
    idS_zero = 1;
  }
  for(i=idS[0] + idS_zero; i<idS[1]; i++) {
    set_counts(i, numP_other, dist_data, offset, counts);
    displs[i] = displs[i-1] + counts[i-1];
  }
}

/*
 * Computes how many rows the local process shares with process "id" of
 * the other group, storing the result in sendcounts[id]. Counts of
 * non-overlapping processes are left untouched (zero).
 */
void set_counts(int id, int numP, struct Dist_data data_dist, int offset, int *sendcounts) {
  struct Dist_data other;
  int biggest_ini, smallest_end;

  get_dist(data_dist.n, id - offset, numP, &other);

  // Return if the row ranges of the two processes do not overlap
  if(data_dist.ini >= other.fin || data_dist.fin <= other.ini) {
    return;
  }

  // Take the largest starting row (ini) of the two processes
  biggest_ini = (data_dist.ini > other.ini) ? data_dist.ini : other.ini;
  // Take the smallest final row (fin) of the two processes
  smallest_end = (data_dist.fin < other.fin) ? data_dist.fin : other.fin;

  sendcounts[id] = smallest_end - biggest_ini; // Number of elements to send to / receive from process id
}

/*
 * Given a process of one group, obtains the range of processes
 * of the other group it has to send data to or receive data from.
 *
 * Returns the first identifier and the last one (excluded)
 * to communicate with.
 */
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int *idS) {
  int idI, idE;
  int tamOther = dist_data.n / numP_other;
  int remOther = dist_data.n % numP_other;
  // First row owned by the second subgroup of the other distribution
  int middle = (tamOther + 1) * remOther;

  if(middle > dist_data.ini) { // First subgroup
    idI = dist_data.ini / (tamOther + 1);
  } else { // Second subgroup
    idI = ((dist_data.ini - middle) / tamOther) + remOther;
  }

  if(middle >= dist_data.fin) { // First subgroup
    idE = dist_data.fin / (tamOther + 1);
    idE = (dist_data.fin % (tamOther + 1) > 0 && idE+1 <= numP_other) ? idE+1 : idE;
  } else { // Second subgroup
    idE = ((dist_data.fin - middle) / tamOther) + remOther;
    idE = ((dist_data.fin - middle) % tamOther > 0 && idE+1 <= numP_other) ? idE+1 : idE;
  }

  idS[0] = idI;
  idS[1] = idE;
}

void print_counts2(struct Dist_data data_dist, int *xcounts, int *xdispls, int size, int include_zero, const char* name) {
  int i;

  for(i=0; i < size; i++) {
    if(xcounts[i] != 0 || include_zero) {
      printf("P%d of %d | %scounts[%d]=%d disp=%d\n", data_dist.myId, data_dist.numP, name, i, xcounts[i], xdispls[i]);
    }
  }
}

void print_global_results(double start_time) {
  double sp_time, sy_time, asy_time, mall_time, global_time;

  MAM_Retrieve_times(&sp_time, &sy_time, &asy_time, &mall_time);
  global_time = MPI_Wtime() - start_time;
  printf("T_spawn: %lf", sp_time);
  printf("\nT_SR: %lf", sy_time);
  printf("\nT_AR: %lf", asy_time);
  printf("\nT_Malleability: %lf\n", mall_time);
  //printf("T_total: %lf\n", global_time);
}
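/*
 * Worked example (illustrative, not part of the original code): with
 * n = 10 rows, a source holding rows [5, 10) of a 2-process distribution
 * and an other group of 3 processes (whose blocks are [0,4), [4,7) and
 * [7,10)), getIds_intercomm returns idS = {1, 3}: the process overlaps
 * peers 1 and 2. The helper name getIds_intercomm_example is hypothetical.
 */
static void getIds_intercomm_example(void) {
  struct Dist_data d;
  int idS[2];

  get_dist(10, 1, 2, &d);      // Local rows [5, 10)
  getIds_intercomm(d, 3, idS); // Peers in a 3-process distribution
  printf("Communicate with peers [%d, %d)\n", idS[0], idS[1]);
}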