#include #include #include #include #include "distribution_methods/block_distribution.h" #include "CommDist.h" #include "malleabilityDataStructures.h" //void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts); void prepare_redistribution(int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, void **recv, struct Counts *s_counts, struct Counts *r_counts); //FIXME Choose name for is_sync void check_requests(struct Counts s_counts, struct Counts r_counts, int red_method, int red_strategies, MPI_Request **requests, size_t *request_qty); void sync_point2point(void *send, void *recv, MPI_Datatype datatype, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm); void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method); void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win); void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win); void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests); void async_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method, MPI_Request *requests, MPI_Win *win); void async_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests); void async_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests); /* * Reserva memoria para un vector de hasta "qty" elementos. * Los "qty" elementos se disitribuyen entre los "numP" procesos * que llaman a esta funcion. */ void malloc_comm_array(char **array, int qty, int myId, int numP) { struct Dist_data dist_data; get_block_dist(qty, myId, numP, &dist_data); if( (*array = calloc(dist_data.tamBl, sizeof(char))) == NULL) { printf("Memory Error (Malloc Arrays(%d))\n", dist_data.tamBl); exit(1); } /* int i; for(i=0; i 0) { sends = (MPI_Request *) malloc(total_sends * sizeof(MPI_Request)); } for(i=init; i 0) { MPI_Waitall(total_sends, sends, MPI_STATUSES_IGNORE); } } /* * Performs synchronous MPI-RMA operations to redistribute an array in a block distribution. Is should be called after calculating * how data should be redistributed * * - send (IN): Array with the data to send. This value can be NULL for children. * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data. * If the process receives data and is NULL, the behaviour is undefined. * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the * displacements. * - tamBl (IN): How many elements are stored in the parameter "send". * - comm (IN): Communicator to use to perform the redistribution. Must be an intracommunicator as MPI-RMA requirements. * - red_method (IN): Type of data redistribution to use. In this case indicates the RMA operation(Lock or LockAll). * * FIXME: In libfabric one of these macros defines the maximum amount of BYTES that can be communicated in a SINGLE MPI_Get * A window can have more bytes than the amount shown in those macros, therefore, if you want to read more than that amount * you need to perform multiples Gets. * prov/psm3/psm3/psm_config.h:179:#define MQ_SHM_THRESH_RNDV 16000 * prov/psm3/psm3/ptl_am/am_config.h:62:#define PSMI_MQ_RV_THRESH_CMA 16000 * prov/psm3/psm3/ptl_am/am_config.h:65:#define PSMI_MQ_RV_THRESH_NO_KASSIST 16000 */ void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method) { int datasize; MPI_Win win; MPI_Type_size(datatype, &datasize); MPI_Win_create(send, (MPI_Aint)tamBl * datasize, datasize, MPI_INFO_NULL, comm, &win); #if USE_MAL_DEBUG >= 3 DEBUG_FUNC("Created Window for synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm); #endif switch(red_method) { case MALL_RED_RMA_LOCKALL: sync_rma_lockall(recv, datatype, r_counts, win); break; case MALL_RED_RMA_LOCK: sync_rma_lock(recv, datatype, r_counts, win); break; } #if USE_MAL_DEBUG >= 3 DEBUG_FUNC("Completed synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm); #endif MPI_Win_free(&win); } /* * Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock. * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data. * If the process receives data and is NULL, the behaviour is undefined. * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the * displacements. * - win (IN): Window to use to perform the redistribution. * */ void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win) { int i, target_displs, datasize; size_t offset; MPI_Type_size(datatype, &datasize); target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed for(i=r_counts.idI; i= numO)) { MPI_Ibarrier(comm, &((*requests)[*request_qty-1]) ); //FIXME Not easy to read... } } if(aux_comm_used) { MPI_Comm_free(&aux_comm); } freeCounts(&s_counts); freeCounts(&r_counts); return 0; //FIXME In this case is always false... } /* * Checks if a set of requests have been completed (1) or not (0). * * - myId (IN): Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm". * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE). * - red_strategies (IN): * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended. * - request_qty (IN): Quantity of requests in "requests". * * returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE). */ int async_communication_check(int myId, int is_children_group, int red_strategies, MPI_Comm comm, MPI_Request *requests, size_t request_qty) { int completed, req_completed, all_req_null, test_err, aux_condition; size_t i; completed = 1; all_req_null = 1; test_err = MPI_SUCCESS; if (is_children_group) return 1; //FIXME Deberia devolver un num negativo if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) { // The Ibarrier should only be posted at this point if the process // has other requests which has not confirmed as completed yet, // but are confirmed now. if (requests[request_qty-1] == MPI_REQUEST_NULL) { for(i=0; i= 3 DEBUG_FUNC("Processes Waitall completed", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD); #endif if(post_ibarrier) { MPI_Ibarrier(comm, &(requests[request_qty-1]) ); MPI_Wait(&(requests[request_qty-1]), MPI_STATUS_IGNORE); } } /* * Frees Requests/Windows associated to a particular redistribution. * Should be called for each output result of calling "async_communication_start". * * - red_method (IN): * - red_strategies (IN): * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended. * - request_qty (IN): Quantity of requests in "requests". * - win (IN): Window to free. */ void async_communication_end(int red_method, int red_strategies, MPI_Request *requests, size_t request_qty, MPI_Win *win) { //Para la desconexión de ambos grupos de procesos es necesario indicar a MPI que esta comm //ha terminado, aunque solo se pueda llegar a este punto cuando ha terminado if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) { MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE); } if(red_method == MALL_RED_RMA_LOCKALL || red_method == MALL_RED_RMA_LOCK) { MPI_Win_free(win); } } /* * Performs a series of non-blocking point2point communications to redistribute an array in a block distribution. * It should be called after calculating how data should be redistributed. * * - send (IN): Array with the data to send. This value can not be NULL for parents. * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to * receive data. If the process receives data and is NULL, the behaviour is undefined. * - s_counts (IN): Struct which describes how many elements will send this process to each children and * the displacements. * - r_counts (IN): Structure which describes how many elements will receive this process from each parent * and the displacements. * - comm (IN): Communicator to use to perform the redistribution. * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended. * */ void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests) { int i, j = 0, datasize; size_t offset; MPI_Type_size(datatype, &datasize); for(i=s_counts.idI; i numO ? numP : numO; } mallocCounts(s_counts, array_size+offset_ids); mallocCounts(r_counts, array_size+offset_ids); MPI_Type_size(datatype, &datasize); //FIXME Right now derived datatypes are not ensured to work if(is_children_group) { offset_ids = 0; prepare_comm_alltoall(myId, numP, numO, qty, offset_ids, r_counts); // Obtener distribución para este hijo get_block_dist(qty, myId, numP, &dist_data); *recv = malloc(dist_data.tamBl * datasize); //get_block_dist(qty, myId, numP, &dist_data); //print_counts(dist_data, r_counts->counts, r_counts->displs, numO+offset_ids, 0, "Targets Recv"); } else { //get_block_dist(qty, myId, numP, &dist_data); prepare_comm_alltoall(myId, numP, numO, qty, offset_ids, s_counts); if(!is_intercomm && myId < numO) { prepare_comm_alltoall(myId, numO, numP, qty, offset_ids, r_counts); // Obtener distribución para este hijo y reservar vector de recibo get_block_dist(qty, myId, numO, &dist_data); *recv = malloc(dist_data.tamBl * datasize); //print_counts(dist_data, r_counts->counts, r_counts->displs, array_size, 0, "Sources&Targets Recv"); } //print_counts(dist_data, s_counts->counts, s_counts->displs, numO+offset_ids, 0, "Sources Send"); } } /* * Ensures that the array of request of a process has an amount of elements equal to the amount of communication * functions the process will perform. In case the array is not initialized or does not have enough space it is * allocated/reallocated to the minimum amount of space needed. * * - s_counts (IN): Struct where is indicated how many elements sends this process to processes in the new group. * - r_counts (IN): Struct where is indicated how many elements receives this process from other processes in the previous group. * - requests (IN/OUT): Pointer to array of requests to be used to determine if the communication has ended. If the pointer * is null or not enough space has been reserved the pointer is allocated/reallocated. * - request_qty (IN/OUT): Quantity of requests to be used. If the value is smaller than the amount of communication * functions to perform, it is modified to the minimum value. */ void check_requests(struct Counts s_counts, struct Counts r_counts, int red_method, int red_strategies, MPI_Request **requests, size_t *request_qty) { size_t i, sum; MPI_Request *aux; switch(red_method) { case MALL_RED_BASELINE: sum = 1; break; case MALL_RED_POINT: default: sum = (size_t) s_counts.idE - s_counts.idI; sum += (size_t) r_counts.idE - r_counts.idI; break; } if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) { sum++; } if (*requests != NULL && sum <= *request_qty) return; // Expected amount of requests if (*requests == NULL) { *requests = (MPI_Request *) malloc(sum * sizeof(MPI_Request)); } else { // Array exists, but is too small aux = (MPI_Request *) realloc(*requests, sum * sizeof(MPI_Request)); *requests = aux; } if (*requests == NULL) { fprintf(stderr, "Fatal error - It was not possible to allocate/reallocate memory for the MPI_Requests before the redistribution\n"); MPI_Abort(MPI_COMM_WORLD, 1); } for(i=0; i < sum; i++) { (*requests)[i] = MPI_REQUEST_NULL; } *request_qty = sum; } /* * Función para obtener si entre las estrategias elegidas, se utiliza * la estrategia pasada como segundo argumento. * * Devuelve en "result" 1(Verdadero) si utiliza la estrategia, 0(Falso) en caso * contrario. */ int malleability_red_contains_strat(int comm_strategies, int strategy, int *result) { int value = comm_strategies % strategy ? 0 : 1; if(result != NULL) *result = value; return value; } /* * Función para anyadir una estrategia a un conjunto. * * Devuelve en "result" 1(Verdadero) si se ha anyadido, 0(Falso) en caso * contrario. */ int malleability_red_add_strat(int *comm_strategies, int strategy) { if(malleability_red_contains_strat(*comm_strategies, strategy, NULL)) return 1; *comm_strategies = *comm_strategies * strategy; return 1; }