#include #include #include #include #include "distribution_methods/block_distribution.h" #include "CommDist.h" void prepare_redistribution(int qty, int mal_type, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, void **recv, struct Counts *s_counts, struct Counts *r_counts); void check_requests(struct Counts s_counts, struct Counts r_counts, MPI_Request **requests, size_t *request_qty); void sync_point2point(void *send, void *recv, int mal_type, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm); void async_point2point(void *send, void *recv, int mal_type, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests); void perform_manual_communication(void *send, void *recv, int mal_type, int myId, struct Counts s_counts, struct Counts r_counts); void *ind_send, *ind_recv; //FIXME Borrar void recalculate_counts(struct Counts *counts, int *array, void **recv, int mal_type); int recalculate_elems(int *array, int ini, int fin); /* * Reserva memoria para un vector de hasta "qty" elementos. * Los "qty" elementos se disitribuyen entre los "numP" procesos * que llaman a esta funcion. */ void malloc_comm_array(char **array, int qty, int myId, int numP) { struct Dist_data dist_data; get_block_dist(qty, myId, numP, &dist_data); if( (*array = calloc(dist_data.tamBl, sizeof(char))) == NULL) { printf("Memory Error (Malloc Arrays(%d))\n", dist_data.tamBl); exit(1); } /* int i; for(i=0; i 0) { sends = (MPI_Request *) malloc(total_sends * sizeof(MPI_Request)); } for(i=init; i 0) { MPI_Waitall(total_sends, sends, MPI_STATUSES_IGNORE); } } //================================================================================ //================================================================================ //========================ASYNCHRONOUS FUNCTIONS================================== //================================================================================ //================================================================================ /* * //TODO Añadir estrategia IBARRIER * Performs a communication to redistribute an array in a block distribution with non-blocking MPI functions. * In the redistribution is differenciated parent group from the children and the values each group indicates can be * different. * * - send (IN): Array with the data to send. This data can not be null for parents. * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data. * If the process receives data and is NULL, the behaviour is undefined. * - qty (IN): Sum of elements shared by all processes that will send data. * - myId (IN): Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm". * - numP (IN): Size of the local group. If it is a children group, this parameter must correspond to using * "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm". * - numO (IN): Amount of processes in the remote group. For the parents is the target quantity of processes after the * resize, while for the children is the amount of parents. * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE). * - comm (IN): Communicator to use to perform the redistribution. * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended. If the pointer * is null or not enough space has been reserved the pointer is allocated/reallocated. * - request_qty (OUT): Quantity of requests to be used. If a process sends and receives data, this value will be * modified to the expected value. * * returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE). //FIXME In this case is always false... */ int async_communication(void *send, void **recv, int qty, int mal_type, int dependency, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty) { int is_intercomm, aux_comm_used = 0; struct Counts s_counts, r_counts; MPI_Datatype datatype; MPI_Comm aux_comm = MPI_COMM_NULL; /* PREPARE COMMUNICATION */ MPI_Comm_test_inter(comm, &is_intercomm); prepare_redistribution(qty, mal_type, myId, numP, numO, is_children_group, is_intercomm, 0, recv, &s_counts, &r_counts); check_requests(s_counts, r_counts, requests, request_qty); if(mal_type == MAL_INT) { datatype = MPI_INT; } else if(mal_type == MAL_DOUBLE) { datatype = MPI_DOUBLE; } else if(mal_type == MAL_CHAR) { datatype = MPI_CHAR; } else { printf("Malleability -- Redistribution type not recognised\n"); MPI_Abort(MPI_COMM_WORLD, -1); } if(dependency == 1+MAL_DATA_DEPENDENT) { if(is_children_group) { recalculate_counts(&r_counts, (int *) ind_recv, recv, mal_type); } else { recalculate_counts(&s_counts, (int *) ind_send, recv, mal_type); if(!is_intercomm) { recalculate_counts(&r_counts, (int *) ind_recv, recv, mal_type); } } } /* PERFORM COMMUNICATION */ switch(red_method) { case MALL_RED_POINT: async_point2point(send, *recv, mal_type, s_counts, r_counts, comm, *requests); break; case MALL_RED_BASELINE: default: MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, comm, &((*requests)[0])); break; } /* POST REQUESTS CHECKS */ if(is_children_group) { MPI_Waitall(*request_qty, *requests, MPI_STATUSES_IGNORE); } if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) { //FIXME Strategy not fully implemented MPI_Ibarrier(comm, &((*requests)[*request_qty-1]) ); //FIXME Not easy to read... if(is_children_group) { MPI_Wait(&((*requests)[*request_qty-1]), MPI_STATUSES_IGNORE); //FIXME Not easy to read... } } if(aux_comm_used) { MPI_Comm_free(&aux_comm); } if(dependency == 1+MAL_DATA_INDEPENDENT) { if(is_children_group) { ind_recv = *recv; } else { ind_send = send; if(!is_intercomm) { ind_recv = *recv; } } } freeCounts(&s_counts); freeCounts(&r_counts); return 0; //FIXME In this case is always false... } /* * Performs a series of non-blocking point2point communications to redistribute an array in a block distribution. * It should be called after calculating how data should be redistributed. * * - send (IN): Array with the data to send. This value can not be NULL for parents. * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to * receive data. If the process receives data and is NULL, the behaviour is undefined. * - s_counts (IN): Struct which describes how many elements will send this process to each children and * the displacements. * - r_counts (IN): Structure which describes how many elements will receive this process from each parent * and the displacements. * - comm (IN): Communicator to use to perform the redistribution. * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended. * */ void async_point2point(void *send, void *recv, int mal_type, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests) { int i, j = 0; size_t datasize, offset; MPI_Datatype datatype; if(mal_type == MAL_INT) { datatype = MPI_INT; datasize = sizeof(int); } else if(mal_type == MAL_DOUBLE) { datatype = MPI_DOUBLE; datasize = sizeof(double); } else if(mal_type == MAL_CHAR) { datatype = MPI_CHAR; datasize = sizeof(char); } else { printf("Malleability -- Redistribution type not recognised\n"); MPI_Abort(MPI_COMM_WORLD, -1); } for(i=s_counts.idI; i numO ? numP : numO; } mallocCounts(s_counts, array_size+offset_ids); mallocCounts(r_counts, array_size+offset_ids); if(is_children_group) { offset_ids = 0; prepare_comm_alltoall(myId, numP, numO, qty, offset_ids, r_counts); // Obtener distribución para este hijo get_block_dist(qty, myId, numP, &dist_data); *recv = malloc(dist_data.tamBl * datasize); //get_block_dist(qty, myId, numP, &dist_data); //print_counts(dist_data, r_counts->counts, r_counts->displs, numO+offset_ids, 1, "Children C "); } else { //get_block_dist(qty, myId, numP, &dist_data); prepare_comm_alltoall(myId, numP, numO, qty, offset_ids, s_counts); if(!is_intercomm && myId < numO) { prepare_comm_alltoall(myId, numO, numP, qty, offset_ids, r_counts); // Obtener distribución para este hijo y reservar vector de recibo get_block_dist(qty, myId, numO, &dist_data); *recv = malloc(dist_data.tamBl * datasize); //print_counts(dist_data, r_counts->counts, r_counts->displs, array_size, 1, "Children P "); } //print_counts(dist_data, s_counts->counts, s_counts->displs, numO+offset_ids, 1, "Parents "); } } /* * Ensures that the array of request of a process has an amount of elements equal to the amount of communication * functions the process will perform. In case the array is not initialized or does not have enough space it is * allocated/reallocated to the minimum amount of space needed. * * - s_counts (IN): Struct where is indicated how many elements sends this process to processes in the new group. * - r_counts (IN): Struct where is indicated how many elements receives this process from other processes in the previous group. * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended. If the pointer * is null or not enough space has been reserved the pointer is allocated/reallocated. * - request_qty (OUT): Quantity of requests to be used. If the value is smaller than the amount of communication * functions to perform, it is modified to the minimum value. */ void check_requests(struct Counts s_counts, struct Counts r_counts, MPI_Request **requests, size_t *request_qty) { size_t i, sum; MPI_Request *aux; sum = (size_t) s_counts.idE - s_counts.idI; sum += (size_t) r_counts.idE - r_counts.idI; if (*requests != NULL && sum <= *request_qty) return; // Expected amount of requests // FIXME Si es la estrategia Ibarrier como se tiene en cuenta en el total?? if (*requests == NULL) { *requests = (MPI_Request *) malloc(sum * sizeof(MPI_Request)); } else { // Array exists, but is too small aux = (MPI_Request *) realloc(*requests, sum * sizeof(MPI_Request)); *requests = aux; } if (*requests == NULL) { fprintf(stderr, "Fatal error - It was not possible to allocate/reallocate memory for the MPI_Requests before the redistribution\n"); MPI_Abort(MPI_COMM_WORLD, 1); } for(i=0; i < sum; i++) { (*requests)[i] = MPI_REQUEST_NULL; } *request_qty = sum; } /* * Special case to perform a manual copy of data when a process has to send data to itself. Only used * when the MPI communication is not able to hand this situation. An example is when using point to point * communications and the process has to perform a Send and Recv to itself * - send (IN): Array with the data to send. This value can not be NULL. * - recv (OUT): Array where data will be written. This value can not be NULL. * - myId (IN): Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm". * - s_counts (IN): Struct where is indicated how many elements sends this process to processes in the new group. * - r_counts (IN): Struct where is indicated how many elements receives this process from other processes in the previous group. */ void perform_manual_communication(void *send, void *recv, int mal_type, int myId, struct Counts s_counts, struct Counts r_counts) { int i; if(mal_type == MAL_INT) { int *new_recv, *new_send; new_recv = (int *) recv; new_send = (int *) send; for(i=0; icounts[counts->idI]; counts->counts[counts->idI] = recalculate_elems(array, ini, fin); for(i=counts->idI+1; iidE; i++) { fin = counts->displs[i] + counts->counts[i]; ini = counts->displs[i]; counts->counts[i] = recalculate_elems(array, ini, fin); counts->displs[i] = counts->displs[i-1] + counts->counts[i-1]; } if(*recv != NULL) { int datasize, qty; if(mal_type == MAL_INT) { datasize = sizeof(int); } else if(mal_type == MAL_DOUBLE) { datasize = sizeof(double); } else if(mal_type == MAL_CHAR) { datasize = sizeof(char); } else { printf("Malleability -- Redistribution type not recognised\n"); MPI_Abort(MPI_COMM_WORLD, -1); } qty = counts->counts[counts->idE-1] + counts->displs[counts->idE-1]; free(*recv); *recv = malloc(qty * datasize); } } int recalculate_elems(int *array, int ini, int fin) { int i, sol = 0; for(i=ini; i