Commit 63787a2e authored by iker_martin's avatar iker_martin
Browse files

Added new features (Commit, allow semi-automatic datatypes). Deleted unneeded...

Added new features (Commit, allow semi-automatic datatypes). Deleted unneeded code and minor bug fixes.
parent f6c0587b
......@@ -32,11 +32,11 @@ int create_out_file(char *nombre, int *ptr, int newstdout);
configuration *config_file;
group_data *group;
results_data *results;
MPI_Comm comm;
MPI_Comm comm, new_comm;
int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis
int main(int argc, char *argv[]) {
int numP, myId, res;
int numP, myId, res, commited;
int req;
int im_child;
size_t i;
......@@ -54,6 +54,7 @@ int main(int argc, char *argv[]) {
MPI_Comm_rank(MPI_COMM_WORLD, &myId);
MPI_Comm_size(MPI_COMM_WORLD, &numP);
comm = MPI_COMM_WORLD;
new_comm = MPI_COMM_NULL;
if(req != MPI_THREAD_MULTIPLE) {
printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE);
......@@ -67,7 +68,6 @@ int main(int argc, char *argv[]) {
if(!im_child) { //TODO REFACTOR Simplificar inicio
init_application();
set_benchmark_grp(group->grp);
set_benchmark_configuration(config_file);
if(config_file->n_groups > 1) {
......@@ -75,19 +75,19 @@ int main(int argc, char *argv[]) {
config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
set_children_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
malleability_add_data(&(group->grp), 1, MAL_INT, 1, 1);
malleability_add_data(&run_id, 1, MAL_INT, 1, 1);
malleability_add_data(&(group->iter_start), 1, MAL_INT, 1, 1);
//malleability_add_data(&(results->exec_start), 1, MAL_DOUBLE, 1, 1);
malleability_add_data(&(group->grp), 1, MPI_INT, 1, 0);
malleability_add_data(&run_id, 1, MPI_INT, 1, 0);
malleability_add_data(&(group->iter_start), 1, MPI_INT, 1, 0);
malleability_add_data(&(results->exec_start), 1, MPI_DOUBLE, 1, 0);
if(config_file->sdr) {
for(i=0; i<group->sync_data_groups; i++) {
malleability_add_data(group->sync_array[i], group->sync_qty[i], MAL_CHAR, 0, 1);
malleability_add_data(group->sync_array[i], group->sync_qty[i], MPI_CHAR, 0, 0);
}
}
if(config_file->adr) {
for(i=0; i<group->async_data_groups; i++) {
malleability_add_data(group->async_array[i], group->async_qty[i], MAL_CHAR, 0, 0);
malleability_add_data(group->async_array[i], group->async_qty[i], MPI_CHAR, 0, 1);
}
}
}
......@@ -95,32 +95,28 @@ int main(int argc, char *argv[]) {
MPI_Barrier(comm);
results->exec_start = MPI_Wtime();
} else { //Init hijos
get_malleability_user_comm(&comm);
MAM_Commit(&commited, &comm);
get_benchmark_configuration(&config_file);
// TODO Refactor - Que sea una unica funcion
// Obtiene las variables que van a utilizar los hijos
void *value = NULL;
size_t entries;
malleability_get_data(&value, 0, 1, 1);
malleability_get_data(&value, 0, 1, 0);
group->grp = *((int *)value);
malleability_get_data(&value, 1, 1, 1);
malleability_get_data(&value, 1, 1, 0);
run_id = *((int *)value);
malleability_get_data(&value, 2, 1, 1);
malleability_get_data(&value, 2, 1, 0);
group->iter_start = *((int *)value);
//malleability_get_data(&value, 3, 1, 1);
//results->exec_start = *((double *)value);
if(config_file->sdr) {
malleability_get_entries(&entries, 0, 1);
malleability_get_entries(&entries, 0, 0);
group->sync_qty = (int *) malloc(entries * sizeof(int));
group->sync_array = (char **) malloc(entries * sizeof(char *));
for(i=0; i<entries; i++) {
malleability_get_data(&value, i, 0, 1);
malleability_get_data(&value, i, 0, 0);
group->sync_array[i] = (char *)value;
group->sync_qty[i] = DR_MAX_SIZE;
}
......@@ -128,11 +124,11 @@ int main(int argc, char *argv[]) {
group->sync_data_groups = entries;
}
if(config_file->adr) {
malleability_get_entries(&entries, 0, 0);
malleability_get_entries(&entries, 0, 1);
group->async_qty = (int *) malloc(entries * sizeof(int));
group->async_array = (char **) malloc(entries * sizeof(char *));
for(i=0; i<entries; i++) {
malleability_get_data(&value, i, 0, 0);
malleability_get_data(&value, i, 0, 1);
group->async_array[i] = (char *)value;
group->async_qty[i] = DR_MAX_SIZE;
}
......@@ -144,6 +140,8 @@ int main(int argc, char *argv[]) {
results = malloc(sizeof(results_data));
init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
malleability_get_data(&value, 3, 1, 0);
results->exec_start = *((double *)value);
}
//
......@@ -151,12 +149,9 @@ int main(int argc, char *argv[]) {
//
group->grp = group->grp - 1; // TODO REFACTOR???
do {
get_malleability_user_comm(&comm);
MPI_Comm_size(comm, &(group->numP));
MPI_Comm_rank(comm, &(group->myId));
group->grp = group->grp + 1;
set_benchmark_grp(group->grp);
if(group->grp != 0) {
obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
......@@ -169,12 +164,13 @@ int main(int argc, char *argv[]) {
set_children_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
if(group->grp != 0) {
malleability_modify_data(&(group->grp), 0, 1, MAL_INT, 1, 1);
malleability_modify_data(&(group->iter_start), 2, 1, MAL_INT, 1, 1);
malleability_modify_data(&(group->grp), 0, 1, MPI_INT, 1, 0);
malleability_modify_data(&(group->iter_start), 2, 1, MPI_INT, 1, 0);
}
}
res = work();
if(res == MAM_ZOMBIE) break;
if(res==1) { // Se ha llegado al final de la aplicacion
MPI_Barrier(comm);
......@@ -183,6 +179,9 @@ int main(int argc, char *argv[]) {
print_local_results();
reset_results_index(results);
if(comm != MPI_COMM_WORLD) MPI_Comm_free(&comm);
comm = new_comm;
} while(config_file->n_groups > group->grp + 1 && config_file->groups[group->grp+1].sm == MALL_SPAWN_MERGE);
//
......@@ -220,7 +219,7 @@ int main(int argc, char *argv[]) {
* de procesos. En caso contrario se devuelve 0.
*/
int work() {
int iter, maxiter, state, res;
int iter, maxiter, state, res, commited;
int wait_completed = MAM_CHECK_COMPLETION;
maxiter = config_file->groups[group->grp].iters;
......@@ -246,6 +245,7 @@ int work() {
if(config_file->n_groups == group->grp + 1) res=1;
else { MAM_Commit(&commited, &new_comm); }
if(state == MAM_ZOMBIE) res=state;
return res;
}
......
......@@ -7,21 +7,18 @@
#include "malleabilityDataStructures.h"
//void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts);
void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, char **recv, struct Counts *s_counts, struct Counts *r_counts); //FIXME Choose name for is_sync
void prepare_redistribution(int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, void **recv, struct Counts *s_counts, struct Counts *r_counts); //FIXME Choose name for is_sync
void check_requests(struct Counts s_counts, struct Counts r_counts, int red_strategies, MPI_Request **requests, size_t *request_qty);
void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm);
void sync_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method);
void sync_rma_lock(char *recv, struct Counts r_counts, MPI_Win win);
void sync_rma_lockall(char *recv, struct Counts r_counts, MPI_Win win);
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm);
void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method);
void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win);
void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win);
void async_point2point(char *send, char *recv, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests);
void async_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method, MPI_Request *requests, MPI_Win *win);
void async_rma_lock(char *recv, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
void async_rma_lockall(char *recv, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
void perform_manual_communication(char *send, char *recv, int myId, struct Counts s_counts, struct Counts r_counts);
void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests);
void async_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method, MPI_Request *requests, MPI_Win *win);
void async_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
void async_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
/*
* Reserva memoria para un vector de hasta "qty" elementos.
......@@ -72,7 +69,7 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) {
*
* returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE). //FIXME In this case is always true...
*/
int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int red_method, MPI_Comm comm) {
int sync_communication(void *send, void **recv, int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int red_method, MPI_Comm comm) {
int is_intercomm, aux_comm_used = 0;
struct Counts s_counts, r_counts;
struct Dist_data dist_data;
......@@ -80,9 +77,9 @@ int sync_communication(char *send, char **recv, int qty, int myId, int numP, int
/* PREPARE COMMUNICATION */
MPI_Comm_test_inter(comm, &is_intercomm);
// prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
// prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts); //FIXME Needs the datatype?
// TODO START REFACTOR POR DEFECTO USA SIEMPRE INTRACOMM
prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, 1, recv, &s_counts, &r_counts); //FIXME MAGICAL VALUE
prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, 1, recv, &s_counts, &r_counts); //FIXME MAGICAL VALUE
if(is_intercomm) {
MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
......@@ -103,15 +100,15 @@ int sync_communication(char *send, char **recv, int qty, int myId, int numP, int
} else {
get_block_dist(qty, myId, numO, &dist_data);
}
sync_rma(send, *recv, r_counts, dist_data.tamBl, aux_comm, red_method);
sync_rma(send, *recv, datatype, r_counts, dist_data.tamBl, aux_comm, red_method);
break;
case MALL_RED_POINT:
sync_point2point(send, *recv, is_intercomm, myId, s_counts, r_counts, aux_comm);
sync_point2point(send, *recv, datatype, is_intercomm, myId, s_counts, r_counts, aux_comm);
break;
case MALL_RED_BASELINE:
default:
MPI_Alltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, aux_comm);
MPI_Alltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, aux_comm);
break;
}
......@@ -140,15 +137,19 @@ int sync_communication(char *send, char **recv, int qty, int myId, int numP, int
* - comm (IN): Communicator to use to perform the redistribution.
*
*/
void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm) {
int i, j, init, end, total_sends;
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm) {
int i, j, init, end, total_sends, datasize;
size_t offset, offset2;
MPI_Request *sends;
MPI_Type_size(datatype, &datasize);
init = s_counts.idI;
end = s_counts.idE;
if(!is_intercomm && (s_counts.idI == myId || s_counts.idE == myId + 1)) {
perform_manual_communication(send, recv, myId, s_counts, r_counts);
offset = s_counts.displs[myId] + datasize;
offset2 = r_counts.displs[myId] + datasize;
memcpy(send+offset, recv+offset2, s_counts.counts[myId]);
if(s_counts.idI == myId) init = s_counts.idI+1;
else end = s_counts.idE-1;
}
......@@ -160,7 +161,8 @@ void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct
}
for(i=init; i<end; i++) {
sends[j] = MPI_REQUEST_NULL;
MPI_Isend(send+s_counts.displs[i], s_counts.counts[i], MPI_CHAR, i, 99, comm, &(sends[j]));
offset = s_counts.displs[i] * datasize;
MPI_Isend(send+offset, s_counts.counts[i], datatype, i, 99, comm, &(sends[j]));
j++;
}
......@@ -172,7 +174,8 @@ void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct
}
for(i=init; i<end; i++) {
MPI_Recv(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, 99, comm, MPI_STATUS_IGNORE);
offset = r_counts.displs[i] * datasize;
MPI_Recv(recv+offset, r_counts.counts[i], datatype, i, 99, comm, MPI_STATUS_IGNORE);
}
if(total_sends > 0) {
......@@ -200,19 +203,21 @@ void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct
* prov/psm3/psm3/ptl_am/am_config.h:62:#define PSMI_MQ_RV_THRESH_CMA 16000
* prov/psm3/psm3/ptl_am/am_config.h:65:#define PSMI_MQ_RV_THRESH_NO_KASSIST 16000
*/
void sync_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method) {
void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method) {
int datasize;
MPI_Win win;
MPI_Win_create(send, (MPI_Aint)tamBl * sizeof(char), sizeof(char), MPI_INFO_NULL, comm, &win);
MPI_Type_size(datatype, &datasize);
MPI_Win_create(send, (MPI_Aint)tamBl * datasize, datasize, MPI_INFO_NULL, comm, &win);
#if USE_MAL_DEBUG >= 3
DEBUG_FUNC("Created Window for synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm);
#endif
switch(red_method) {
case MALL_RED_RMA_LOCKALL:
sync_rma_lockall(recv, r_counts, win);
sync_rma_lockall(recv, datatype, r_counts, win);
break;
case MALL_RED_RMA_LOCK:
sync_rma_lock(recv, r_counts, win);
sync_rma_lock(recv, datatype, r_counts, win);
break;
}
#if USE_MAL_DEBUG >= 3
......@@ -232,19 +237,22 @@ void sync_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Com
* - win (IN): Window to use to perform the redistribution.
*
*/
void sync_rma_lock(char *recv, struct Counts r_counts, MPI_Win win) {
int i, target_displs;
void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win) {
int i, target_displs, datasize;
size_t offset;
MPI_Type_size(datatype, &datasize);
target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
target_displs = r_counts.first_target_displs;
for(i=r_counts.idI; i<r_counts.idE; i++) {
offset = r_counts.displs[i] * datasize;
MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
MPI_Get(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, target_displs, r_counts.counts[i], MPI_CHAR, win);
MPI_Get(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win);
MPI_Win_unlock(i, win);
target_displs=0;
}
}
/*
* Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lockall/Unlockall.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
......@@ -254,13 +262,17 @@ void sync_rma_lock(char *recv, struct Counts r_counts, MPI_Win win) {
* - win (IN): Window to use to perform the redistribution.
*
*/
void sync_rma_lockall(char *recv, struct Counts r_counts, MPI_Win win) {
int i, target_displs;
void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win) {
int i, target_displs, datasize;
size_t offset;
MPI_Type_size(datatype, &datasize);
target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
target_displs = r_counts.first_target_displs;
MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
for(i=r_counts.idI; i<r_counts.idE; i++) {
MPI_Get(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, target_displs, r_counts.counts[i], MPI_CHAR, win);
offset = r_counts.displs[i] * datasize;
MPI_Get(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win);
target_displs=0;
}
MPI_Win_unlock_all(win);
......@@ -295,7 +307,7 @@ void sync_rma_lockall(char *recv, struct Counts r_counts, MPI_Win win) {
*
* returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE). //FIXME In this case is always false...
*/
int async_communication_start(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win) {
int async_communication_start(void *send, void **recv, int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win) {
int is_intercomm, aux_comm_used = 0;
struct Counts s_counts, r_counts;
struct Dist_data dist_data;
......@@ -304,8 +316,8 @@ int async_communication_start(char *send, char **recv, int qty, int myId, int nu
/* PREPARE COMMUNICATION */
MPI_Comm_test_inter(comm, &is_intercomm);
// TODO START REFACTOR POR DEFECTO USA SIEMPRE INTRACOMM
//prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, 1, recv, &s_counts, &r_counts); // TODO MAGICAL VALUE
//prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, 1, recv, &s_counts, &r_counts); // TODO MAGICAL VALUE
if(is_intercomm) {
MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
aux_comm_used = 1;
......@@ -325,14 +337,14 @@ int async_communication_start(char *send, char **recv, int qty, int myId, int nu
} else {
get_block_dist(qty, myId, numO, &dist_data);
}
async_rma(send, *recv, r_counts, dist_data.tamBl, aux_comm, red_method, *requests, win);
async_rma(send, *recv, datatype, r_counts, dist_data.tamBl, aux_comm, red_method, *requests, win);
break;
case MALL_RED_POINT:
async_point2point(send, *recv, s_counts, r_counts, aux_comm, *requests);
async_point2point(send, *recv, datatype, s_counts, r_counts, aux_comm, *requests);
break;
case MALL_RED_BASELINE:
default:
MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, aux_comm, &((*requests)[0]));
MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, aux_comm, &((*requests)[0]));
break;
}
......@@ -459,16 +471,20 @@ void async_communication_end(int red_method, int red_strategies, MPI_Request *re
* - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
*
*/
void async_point2point(char *send, char *recv, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests) {
int i, j = 0;
void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests) {
int i, j = 0, datasize;
size_t offset;
MPI_Type_size(datatype, &datasize);
for(i=s_counts.idI; i<s_counts.idE; i++) {
MPI_Isend(send+s_counts.displs[i], s_counts.counts[i], MPI_CHAR, i, 99, comm, &(requests[j]));
offset = s_counts.displs[i] * datasize;
MPI_Isend(send+offset, s_counts.counts[i], datatype, i, 99, comm, &(requests[j]));
j++;
}
for(i=r_counts.idI; i<r_counts.idE; i++) {
MPI_Irecv(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, 99, comm, &(requests[j]));
offset = r_counts.displs[i] * datasize;
MPI_Irecv(recv+offset, r_counts.counts[i], datatype, i, 99, comm, &(requests[j]));
j++;
}
}
......@@ -489,15 +505,17 @@ void async_point2point(char *send, char *recv, struct Counts s_counts, struct Co
* - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
*
*/
void async_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method, MPI_Request *requests, MPI_Win *win) {
void async_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method, MPI_Request *requests, MPI_Win *win) {
int datasize;
MPI_Win_create(send, (MPI_Aint)tamBl * sizeof(char), sizeof(char), MPI_INFO_NULL, comm, win);
MPI_Type_size(datatype, &datasize);
MPI_Win_create(send, (MPI_Aint)tamBl * datasize, datasize, MPI_INFO_NULL, comm, win);
switch(red_method) {
case MALL_RED_RMA_LOCKALL:
async_rma_lockall(recv, r_counts, *win, requests);
async_rma_lockall(recv, datatype, r_counts, *win, requests);
break;
case MALL_RED_RMA_LOCK:
async_rma_lock(recv, r_counts, *win, requests);
async_rma_lock(recv, datatype, r_counts, *win, requests);
break;
}
}
......@@ -512,13 +530,17 @@ void async_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Co
* - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
*
*/
void async_rma_lock(char *recv, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
int i, target_displs, j = 0;
void async_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
int i, target_displs, j = 0, datasize;
size_t offset;
MPI_Type_size(datatype, &datasize);
target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
target_displs = r_counts.first_target_displs;
for(i=r_counts.idI; i<r_counts.idE; i++) {
offset = r_counts.displs[i] * datasize;
MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
MPI_Rget(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, target_displs, r_counts.counts[i], MPI_CHAR, win, &(requests[j]));
MPI_Rget(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win, &(requests[j]));
MPI_Win_unlock(i, win);
target_displs=0;
j++;
......@@ -534,13 +556,17 @@ void async_rma_lock(char *recv, struct Counts r_counts, MPI_Win win, MPI_Request
* - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
*
*/
void async_rma_lockall(char *recv, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
int i, target_displs, j = 0;
void async_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
int i, target_displs, j = 0, datasize;
size_t offset;
MPI_Type_size(datatype, &datasize);
target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
target_displs = r_counts.first_target_displs;
MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
for(i=r_counts.idI; i<r_counts.idE; i++) {
MPI_Rget(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, target_displs, r_counts.counts[i], MPI_CHAR, win, &(requests[j]));
offset = r_counts.displs[i] * datasize;
MPI_Rget(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win, &(requests[j]));
target_displs=0;
j++;
}
......@@ -574,9 +600,10 @@ void async_rma_lockall(char *recv, struct Counts r_counts, MPI_Win win, MPI_Requ
*
*/
//FIXME Ensure name for is_sync variable
void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, char **recv, struct Counts *s_counts, struct Counts *r_counts) {
void prepare_redistribution(int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, void **recv, struct Counts *s_counts, struct Counts *r_counts) {
int array_size = numO;
int offset_ids = 0;
int datasize;
struct Dist_data dist_data;
if(is_intercomm) {
......@@ -586,6 +613,7 @@ void prepare_redistribution(int qty, int myId, int numP, int numO, int is_childr
}
mallocCounts(s_counts, array_size+offset_ids);
mallocCounts(r_counts, array_size+offset_ids);
MPI_Type_size(datatype, &datasize); //FIXME Right now derived datatypes are not ensured to work
if(is_children_group) {
offset_ids = 0;
......@@ -593,9 +621,9 @@ void prepare_redistribution(int qty, int myId, int numP, int numO, int is_childr
// Obtener distribución para este hijo
get_block_dist(qty, myId, numP, &dist_data);
*recv = malloc(dist_data.tamBl * sizeof(char));
*recv = malloc(dist_data.tamBl * datasize);
//get_block_dist(qty, myId, numP, &dist_data);
//print_counts(dist_data, r_counts->counts, r_counts->displs, numO+offset_ids, 0, "Children C ");
//print_counts(dist_data, r_counts->counts, r_counts->displs, numO+offset_ids, 0, "Targets Recv");
} else {
//get_block_dist(qty, myId, numP, &dist_data);
......@@ -604,10 +632,10 @@ void prepare_redistribution(int qty, int myId, int numP, int numO, int is_childr
prepare_comm_alltoall(myId, numO, numP, qty, offset_ids, r_counts);
// Obtener distribución para este hijo y reservar vector de recibo
get_block_dist(qty, myId, numO, &dist_data);
*recv = malloc(dist_data.tamBl * sizeof(char));
//print_counts(dist_data, r_counts->counts, r_counts->displs, array_size, 0, "Children P ");
*recv = malloc(dist_data.tamBl * datasize);
//print_counts(dist_data, r_counts->counts, r_counts->displs, array_size, 0, "Sources&Targets Recv");
}
//print_counts(dist_data, s_counts->counts, s_counts->displs, numO+offset_ids, 0, "Parents ");
//print_counts(dist_data, s_counts->counts, s_counts->displs, numO+offset_ids, 0, "Sources Send");
}
}
......@@ -653,24 +681,6 @@ void check_requests(struct Counts s_counts, struct Counts r_counts, int red_stra
*request_qty = sum;
}
/*
 * Special case: performs a manual copy of data when a process has to send
 * data to itself. Only used when the MPI communication cannot handle this
 * situation, e.g. point-to-point redistribution where a process would need
 * a Send and a Recv to itself.
 * - send (IN): Array with the data to send. Must not be NULL.
 * - recv (OUT): Array where data will be written. Must not be NULL.
 * - myId (IN): Rank of the MPI process in the local communicator. For the
 *   parents this is not the rank obtained from "comm".
 * - s_counts (IN): Struct indicating how many elements this process sends
 *   to processes in the new group.
 * - r_counts (IN): Struct indicating how many elements this process receives
 *   from processes in the previous group.
 */
void perform_manual_communication(char *send, char *recv, int myId, struct Counts s_counts, struct Counts r_counts) {
  int elem;
  // Source and destination starting positions for the self-copy
  char *src = send + s_counts.displs[myId];
  char *dst = recv + r_counts.displs[myId];

  for(elem = 0; elem < s_counts.counts[myId]; elem++) {
    dst[elem] = src[elem];
  }
}
/*
* Función para obtener si entre las estrategias elegidas, se utiliza
* la estrategia pasada como segundo argumento.
......
......@@ -6,20 +6,12 @@
#include <mpi.h>
#include <string.h>
#include "malleabilityStates.h"
#include "malleabilityDataStructures.h"
//#define MAL_COMM_COMPLETED 0
//#define MAL_COMM_UNINITIALIZED 2
//#define MAL_ASYNC_PENDING 1
//#define MAL_USE_NORMAL 0
//#define MAL_USE_IBARRIER 1
//#define MAL_USE_POINT 2
//#define MAL_USE_THREAD 3
int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int comm_type, MPI_Comm comm);
int sync_communication(void *send, void **recv, int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int comm_type, MPI_Comm comm);
//int async_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty);
int async_communication_start(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win);
int async_communication_start(void *send, void **recv, int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win);
int async_communication_check(int myId, int is_children_group, int red_strategies, MPI_Comm comm, MPI_Request *requests, size_t request_qty);
void async_communication_wait(MPI_Comm comm, MPI_Request *requests, size_t request_qty, int post_ibarrier);
void async_communication_end(int red_method, int red_strategies, MPI_Request *requests, size_t request_qty, MPI_Win *win);
......
#include "malleabilityDataStructures.h"
/*
 * Creates a derived MPI datatype used to send the two main MaM
 * structures (mall_conf and mall) in a single message.
 *
 * The displacements are absolute addresses obtained with
 * MPI_Get_address, so the resulting type must be used with MPI_BOTTOM
 * as the buffer argument (see MAM_Comm_main_structures). The committed
 * type is stored in mall->struct_type and must later be released with
 * MAM_Free_main_datatype.
 */
void MAM_Def_main_datatype() {
  int i, counts = 8;
  int blocklengths[counts];
  MPI_Aint displs[counts];
  MPI_Datatype types[counts];

  // Every transferred field is a single int
  for(i=0; i<counts; i++) {
    blocklengths[i] = 1;
    types[i] = MPI_INT;
  }

  // Obtain the absolute address of each field; the index order here
  // defines the wire layout, so senders and receivers must build the
  // type over identically laid-out structures.
  MPI_Get_address(&(mall_conf->spawn_method), &displs[0]);
  MPI_Get_address(&(mall_conf->spawn_strategies), &displs[1]);
  MPI_Get_address(&(mall_conf->spawn_dist), &displs[2]);
  MPI_Get_address(&(mall_conf->red_method), &displs[3]);
  MPI_Get_address(&(mall_conf->red_strategies), &displs[4]);
  MPI_Get_address(&(mall->num_cpus), &displs[5]);
  MPI_Get_address(&(mall->num_nodes), &displs[6]);
  // nodelist_len is sent so receivers can size the nodelist buffer
  MPI_Get_address(&(mall->nodelist_len), &displs[7]);

  MPI_Type_create_struct(counts, blocklengths, displs, types, &mall->struct_type);
  MPI_Type_commit(&mall->struct_type);
}
/*
 * Releases the derived MPI datatype created by MAM_Def_main_datatype.
 * Does nothing if the type has not been created or was already freed.
 */
void MAM_Free_main_datatype() {
  if(mall->struct_type == MPI_DATATYPE_NULL) return;
  MPI_Type_free(&mall->struct_type);
}
/*
 * Communicates the required data of the main MaM structures
 * from sources to targets.
 * - rootBcast (IN): Root rank for the broadcasts over mall->intercomm.
 *
 * The first broadcast uses MPI_BOTTOM because mall->struct_type was
 * built from absolute addresses (see MAM_Def_main_datatype); it also
 * transfers mall->nodelist_len, which sizes the second broadcast.
 */
void MAM_Comm_main_structures(int rootBcast) {

  MPI_Bcast(MPI_BOTTOM, 1, mall->struct_type, rootBcast, mall->intercomm);

  // Receivers allocate the nodelist lazily once its length is known,
  // adding room for the terminator set here (the broadcast below only
  // fills nodelist_len chars).
  // NOTE(review): malloc result is unchecked — assumes allocation succeeds.
  if(mall->nodelist == NULL) {
    mall->nodelist = malloc((mall->nodelist_len+1) * sizeof(char));
    mall->nodelist[mall->nodelist_len] = '\0';
  }
  MPI_Bcast(mall->nodelist, mall->nodelist_len, MPI_CHAR, rootBcast, mall->intercomm);
}
......@@ -54,7 +54,6 @@ typedef struct {
int red_method;
int red_strategies;
int grp;
malleability_times_t *times;
configuration *config_file;
} malleability_config_t;
......@@ -65,7 +64,8 @@ typedef struct { //FIXME numC_spawned no se esta usando
MPI_Comm comm, thread_comm;
MPI_Comm intercomm;
MPI_Comm user_comm;
int dup_user_comm;
MPI_Datatype struct_type;
int dup_user_comm; //FIXME Borrar
char *name_exec, *nodelist;
int num_cpus, num_nodes, nodelist_len;
......@@ -75,4 +75,10 @@ typedef struct { //FIXME numC_spawned no se esta usando
malleability_config_t *mall_conf;
malleability_t *mall;
/* --- FUNCTIONS --- */
void MAM_Def_main_datatype();
void MAM_Free_main_datatype();
void MAM_Comm_main_structures(int rootBcast);
#endif
......@@ -23,9 +23,6 @@ int check_redistribution(int wait_completed);
int end_redistribution();
int shrink_redistribution();
void comm_node_data(int rootBcast, int is_child_group);
void def_nodeinfo_type(MPI_Datatype *node_type);
int thread_creation();
int thread_check(int wait_completed);
void* thread_async_work();
......@@ -74,6 +71,7 @@ int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_ex
mall->myId = myId;
mall->numP = numP;
mall->root = root;
mall->root_parents = -1;
mall->comm = dup_comm;
mall->thread_comm = thread_comm;
mall->user_comm = comm;
......@@ -92,6 +90,7 @@ int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_ex
zombies_service_init();
init_malleability_times();
MAM_Def_main_datatype();
// Si son el primer grupo de procesos, obtienen los datos de los padres
MPI_Comm_get_parent(&(mall->intercomm));
......@@ -136,6 +135,7 @@ void free_malleability() {
free(dist_s_data);
free(dist_a_data);
MAM_Free_main_datatype();
free_malleability_times();
if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
......@@ -236,9 +236,9 @@ int malleability_checkpoint(int *mam_state, int wait_completed) {
}
break;
case MALL_SPAWN_ADAPTED:
case MALL_SPAWN_ADAPTED: //FIXME Borrar?
state = shrink_redistribution();
if(state == MALL_ZOMBIE) *mam_state = MAM_ZOMBIE;
if(state == MALL_ZOMBIE) *mam_state = MAM_ZOMBIE; //TODO Esta no hay que borrarla
malleability_checkpoint(mam_state, wait_completed);
break;
......@@ -256,19 +256,42 @@ int malleability_checkpoint(int *mam_state, int wait_completed) {
return state;
}
void MAM_Commit(int *mam_state) {
//Hacer borrado de comunicadores no necesarios
//Update de comunicadores
//Reiniciar algunas estructuras ¿Cuales?
//Llamar a funcion de zombies
//Devolver el estado de mam
/*
 * Commits the current reconfiguration, making the new process group the
 * active one.
 *
 * Only valid once the internal state is MALL_COMPLETED or MALL_ZOMBIE;
 * otherwise *mam_state is set to MALL_DENIED and nothing happens.
 * On success: zombie processes are collected, communicators are updated
 * (Merge spawn only), the intercommunicator is disconnected, internal
 * state is reset to MALL_NOT_STARTED, *mam_state becomes MAM_COMMITED
 * and *new_comm receives the communicator the user must continue with.
 *
 * Collective over the groups involved (contains MPI_Allreduce /
 * MPI_Comm_disconnect); every process must call it.
 */
void MAM_Commit(int *mam_state, MPI_Comm *new_comm) {
if(!(state == MALL_COMPLETED || state == MALL_ZOMBIE)) {
*mam_state = MALL_DENIED;
return;
}
#if USE_MAL_DEBUG
if(mall->myId == mall->root) DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
// MPI_MIN detects whether ANY rank is a zombie (MALL_ZOMBIE is the
// smallest of the two admissible states at this point).
int zombies;
MPI_Allreduce(&state, &zombies, 1, MPI_INT, MPI_MIN, mall->intercomm);
if(zombies == MALL_ZOMBIE) {
zombies_collect_suspended(mall->user_comm, mall->myId, mall->numP, mall->numC, mall->root, NULL, 0, 0);
}
// Merge spawn reuses the intercomm as the new intracomm for all groups.
if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { malleability_comms_update(mall->intercomm); }
if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) {
MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error in OpenMPI + Merge
}
// Children inherit the parents' root; -1 means "no parent root recorded".
mall->root = mall->root_parents == -1 ? mall->root : mall->root_parents;
mall->root_parents = -1;
state = MALL_NOT_STARTED;
*mam_state = MAM_COMMITED;
*new_comm = mall->user_comm;
#if USE_MAL_DEBUG
if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
}
// Functions only needed by the benchmark
//-------------------------------------------------------------------------------------------------------------
// Records the current group number so the benchmark can tag its results.
void set_benchmark_grp(int grp) {
mall_conf->grp = grp;
}
// Stores the benchmark's configuration so MaM can forward it to spawned
// groups. The pointer is kept, not copied — the caller retains ownership.
void set_benchmark_configuration(configuration *config_file) {
mall_conf->config_file = config_file;
}
......@@ -283,6 +306,8 @@ void malleability_retrieve_times(double *sp_time, double *sy_time, double *asy_t
}
void set_malleability_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies) {
if(state > MALL_NOT_STARTED) return;
mall_conf->spawn_method = spawn_method;
mall_conf->spawn_strategies = spawn_strategies;
mall_conf->spawn_dist = spawn_dist;
......@@ -300,6 +325,8 @@ void set_malleability_configuration(int spawn_method, int spawn_strategies, int
* Tiene que ser llamado despues de setear la config
*/
void set_children_number(int numC){
if(state > MALL_NOT_STARTED) return;
if((mall_conf->spawn_method == MALL_SPAWN_MERGE) && (numC >= mall->numP)) {
mall->numC = numC;
mall->numC_spawned = numC - mall->numP;
......@@ -335,19 +362,13 @@ void get_malleability_user_comm(MPI_Comm *comm) {
*
* Mas informacion en la funcion "add_data".
*
* //FIXME Si es constante se debería ir a asincrono, no sincrono
*/
void malleability_add_data(void *data, size_t total_qty, int type, int is_replicated, int is_constant) {
void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
size_t total_reqs = 0;
if(is_constant) {
if(is_replicated) {
add_data(data, total_qty, type, total_reqs, rep_s_data);
} else {
add_data(data, total_qty, type, total_reqs, dist_s_data);
}
} else {
if(is_replicated) {
total_reqs = 1;
add_data(data, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ???
} else {
if(mall_conf->red_method == MALL_RED_BASELINE) {
......@@ -361,6 +382,12 @@ void malleability_add_data(void *data, size_t total_qty, int type, int is_replic
add_data(data, total_qty, type, total_reqs, dist_a_data);
}
} else {
if(is_replicated) {
add_data(data, total_qty, type, total_reqs, rep_s_data);
} else {
add_data(data, total_qty, type, total_reqs, dist_s_data);
}
}
}
......@@ -371,19 +398,13 @@ void malleability_add_data(void *data, size_t total_qty, int type, int is_replic
* Los datos variables se tienen que modificar cuando quieran ser mandados, no antes
*
* Mas informacion en la funcion "modify_data".
* //FIXME Si es constante se debería ir a asincrono, no sincrono
*/
void malleability_modify_data(void *data, size_t index, size_t total_qty, int type, int is_replicated, int is_constant) {
void malleability_modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
size_t total_reqs = 0;
if(is_constant) {
if(is_replicated) {
modify_data(data, index, total_qty, type, total_reqs, rep_s_data);
} else {
modify_data(data, index, total_qty, type, total_reqs, dist_s_data);
}
} else {
if(is_replicated) {
total_reqs = 1;
modify_data(data, index, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ???
} else {
if(mall_conf->red_method == MALL_RED_BASELINE) {
......@@ -397,27 +418,32 @@ void malleability_modify_data(void *data, size_t index, size_t total_qty, int ty
modify_data(data, index, total_qty, type, total_reqs, dist_a_data);
}
} else {
if(is_replicated) {
modify_data(data, index, total_qty, type, total_reqs, rep_s_data);
} else {
modify_data(data, index, total_qty, type, total_reqs, dist_s_data);
}
}
}
/*
 * Returns, via *entries, the number of entries registered in the chosen
 * data-description structure.
 *
 * is_replicated selects replicated vs distributed data;
 * is_constant selects the asynchronous (constant) vs synchronous
 * (variable) structures.
 *
 * Fix: the source contained both the pre- and post-commit assignment
 * lines (diff residue); only the post-commit mapping — constant data is
 * redistributed asynchronously — is kept.
 */
void malleability_get_entries(size_t *entries, int is_replicated, int is_constant){
  if(is_constant) {
    if(is_replicated) {
      *entries = rep_a_data->entries;
    } else {
      *entries = dist_a_data->entries;
    }
  } else {
    if(is_replicated) {
      *entries = rep_s_data->entries;
    } else {
      *entries = dist_s_data->entries;
    }
  }
}
......@@ -428,22 +454,21 @@ void malleability_get_entries(size_t *entries, int is_replicated, int is_constan
* con la funcion "malleability_add_data()".
* Es tarea del usuario saber el tipo de esos datos.
* TODO Refactor a que sea automatico
* //FIXME Si es constante se debería ir a asincrono, no sincrono
*/
void malleability_get_data(void **data, size_t index, int is_replicated, int is_constant) {
malleability_data_t *data_struct;
if(is_constant) {
if(is_replicated) {
data_struct = rep_s_data;
data_struct = rep_a_data;
} else {
data_struct = dist_s_data;
data_struct = dist_a_data;
}
} else {
if(is_replicated) {
data_struct = rep_a_data;
data_struct = rep_s_data;
} else {
data_struct = dist_a_data;
data_struct = dist_s_data;
}
}
......@@ -464,22 +489,22 @@ void malleability_get_data(void **data, size_t index, int is_replicated, int is_
*/
/*
 * Sends the distributed data of data_struct to the child processes,
 * either asynchronously (non-blocking, requests/windows recorded in the
 * structure) or synchronously.
 *
 * numP_children: number of processes in the target group.
 * If the communication routine hands back a replacement buffer through
 * aux_recv, it becomes the new array for that entry.
 *
 * Fix: the source interleaved the pre-commit (char* casts, untyped call
 * signatures) and post-commit lines (diff residue); only the post-commit
 * version, which forwards data_struct->types[i], is kept.
 */
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
  size_t i;
  void *aux_send, *aux_recv;

  if(is_asynchronous) {
    for(i=0; i < data_struct->entries; i++) {
      aux_send = data_struct->arrays[i];
      aux_recv = NULL;
      async_communication_start(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method,
          mall_conf->red_strategies, mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
      if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
    }
  } else {
    for(i=0; i < data_struct->entries; i++) {
      aux_send = data_struct->arrays[i];
      aux_recv = NULL;
      sync_communication(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall->intercomm);
      if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
    }
  }
}
......@@ -491,20 +516,20 @@ void send_data(int numP_children, malleability_data_t *data_struct, int is_async
*/
/*
 * Receives the distributed data of data_struct from the parent
 * processes, either asynchronously or synchronously.
 *
 * numP_parents: number of processes in the source group.
 * aux_s is a dummy send buffer (children only receive); the possibly
 * reallocated receive buffer is written back into the structure.
 *
 * Fix: the source interleaved the pre-commit (char* casts, &aux_s,
 * untyped call signatures) and post-commit lines (diff residue); only
 * the post-commit version, which forwards data_struct->types[i], is
 * kept.
 */
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
  size_t i;
  void *aux, *aux_s = NULL;

  if(is_asynchronous) {
    for(i=0; i < data_struct->entries; i++) {
      aux = data_struct->arrays[i];
      async_communication_start(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall_conf->red_strategies,
          mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
      data_struct->arrays[i] = aux;
    }
  } else {
    for(i=0; i < data_struct->entries; i++) {
      aux = data_struct->arrays[i];
      sync_communication(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall->intercomm);
      data_struct->arrays[i] = aux;
    }
  }
}
......@@ -537,9 +562,7 @@ void Children_init() {
}
recv_config_file(mall->root, mall->intercomm, &(mall_conf->config_file));
comm_node_data(root_parents, MALLEABILITY_CHILDREN);
MPI_Bcast(&(mall_conf->red_method), 1, MPI_INT, root_parents, mall->intercomm);
MPI_Bcast(&(mall_conf->red_strategies), 1, MPI_INT, root_parents, mall->intercomm);
MAM_Comm_main_structures(root_parents);
#if USE_MAL_DEBUG
DEBUG_FUNC("Targets have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
......@@ -593,15 +616,8 @@ void Children_init() {
recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
// TODO Crear funcion especifica y anyadir para Asinc
// TODO Tener en cuenta el tipo y qty
for(i=0; i<rep_s_data->entries; i++) {
MPI_Datatype datatype;
if(rep_s_data->types[i] == MAL_INT) {
datatype = MPI_INT;
} else {
datatype = MPI_CHAR;
}
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, root_parents, mall->intercomm);
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], root_parents, mall->intercomm);
}
#if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm);
......@@ -614,15 +630,12 @@ void Children_init() {
// Guardar los resultados de esta transmision
malleability_times_broadcast(mall->root);
if(!is_intercomm) {
malleability_comms_update(mall->intercomm);
}
#if USE_MAL_BARRIERS
MPI_Barrier(mall->comm);
#endif
mall_conf->times->malleability_end = MPI_Wtime(); // Obtener timestamp de cuando termina maleabilidad
MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error en OpenMPI + Merge
state = MALL_COMPLETED;
#if USE_MAL_DEBUG
DEBUG_FUNC("MaM has been initialized correctly as children", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
......@@ -690,9 +703,7 @@ int start_redistribution() {
}
send_config_file(mall_conf->config_file, rootBcast, mall->intercomm);
comm_node_data(rootBcast, MALLEABILITY_NOT_CHILDREN);
MPI_Bcast(&(mall_conf->red_method), 1, MPI_INT, rootBcast, mall->intercomm);
MPI_Bcast(&(mall_conf->red_strategies), 1, MPI_INT, rootBcast, mall->intercomm);
if(mall_conf->spawn_method == MALL_SPAWN_BASELINE || mall->numP <= mall->numC) { MAM_Comm_main_structures(rootBcast); }
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
if(dist_a_data->entries || rep_a_data->entries) { // Enviar datos asincronos
......@@ -811,15 +822,8 @@ int end_redistribution() {
send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
// TODO Crear funcion especifica y anyadir para Asinc
// TODO Tener en cuenta el tipo
for(i=0; i<rep_s_data->entries; i++) {
MPI_Datatype datatype;
if(rep_s_data->types[i] == MAL_INT) {
datatype = MPI_INT;
} else {
datatype = MPI_CHAR;
}
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, rootBcast, mall->intercomm);
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], rootBcast, mall->intercomm);
}
#if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm);
......@@ -831,17 +835,11 @@ int end_redistribution() {
local_state = MALL_DIST_COMPLETED;
if(!is_intercomm) { // Merge Spawn
if(mall->numP < mall->numC) { // Expand
malleability_comms_update(mall->intercomm);
} else { // Shrink || Merge Shrink requiere de mas tareas
if(mall->numP > mall->numC) { // Shrink || Merge Shrink requiere de mas tareas
local_state = MALL_SPAWN_ADAPT_PENDING;
}
}
if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) {
MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error en OpenMPI + Merge
}
return local_state;
}
......@@ -884,50 +882,6 @@ int shrink_redistribution() {
}
}
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=================COMM NODE INFO ======================||
//======================================================||
//======================================================||
// Broadcasts the node-information fields of the global `mall` structure
// (num_cpus, num_nodes, nodelist_len — see def_nodeinfo_type) and then the
// nodelist string itself from the root to the child group over the
// intercommunicator. Children allocate the nodelist after learning its
// length from the first broadcast.
// NOTE(review): superseded by MAM_Comm_main_structures in this commit.
void comm_node_data(int rootBcast, int is_child_group) {
MPI_Datatype node_type;

def_nodeinfo_type(&node_type);
MPI_Bcast(mall, 1, node_type, rootBcast, mall->intercomm);

if(is_child_group) {
mall->nodelist = malloc((mall->nodelist_len+1) * sizeof(char));
mall->nodelist[mall->nodelist_len] = '\0'; // keep it NUL-terminated
}
MPI_Bcast(mall->nodelist, mall->nodelist_len, MPI_CHAR, rootBcast, mall->intercomm);

MPI_Type_free(&node_type);
}
// Builds and commits a derived datatype describing three int fields of the
// global `mall` structure (num_cpus, num_nodes, nodelist_len), with
// displacements relative to `mall` so the struct pointer itself can be the
// broadcast buffer. Caller owns the committed type and must MPI_Type_free it.
// NOTE(review): superseded by MAM_Def_main_datatype in this commit.
void def_nodeinfo_type(MPI_Datatype *node_type) {
int i, counts = 3;
int blocklengths[3] = {1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];

// All three transmitted fields are plain ints
types[0] = types[1] = types[2] = MPI_INT;

// Compute displacements of each field relative to the struct base address
MPI_Get_address(mall, &dir);
MPI_Get_address(&(mall->num_cpus), &displs[0]);
MPI_Get_address(&(mall->num_nodes), &displs[1]);
MPI_Get_address(&(mall->nodelist_len), &displs[2]);

for(i=0;i<counts;i++) displs[i] -= dir;

MPI_Type_create_struct(counts, blocklengths, displs, types, node_type);
MPI_Type_commit(node_type);
}
// TODO MOVER A OTRO LADO??
//======================================================||
//================PRIVATE FUNCTIONS=====================||
......@@ -1021,17 +975,20 @@ void print_comms_state() {
free(test);
}
/*
 * Function only needed for Merge spawn.
 *
 * Replaces MaM's internal communicators (thread_comm, comm, user_comm)
 * with duplicates of `comm`, freeing the previous ones first (except
 * MPI_COMM_WORLD, which must never be freed), and renames them for
 * debugging tools.
 *
 * Fix: the source contained both the pre-commit (MPI_COMM_MALL*) and
 * post-commit (MPI_COMM_MAM*) MPI_Comm_set_name lines plus a duplicated
 * free line (diff residue); only the post-commit version is kept.
 */
void malleability_comms_update(MPI_Comm comm) {
  if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
  if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
  if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm)); //TODO Is this not dangerous? The user should be the one doing it

  MPI_Comm_dup(comm, &(mall->thread_comm));
  MPI_Comm_dup(comm, &(mall->comm));
  MPI_Comm_dup(comm, &(mall->user_comm));

  MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MAM_THREAD");
  MPI_Comm_set_name(mall->comm, "MPI_COMM_MAM");
  MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MAM_USER");
}
......@@ -15,15 +15,14 @@ int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_ex
void free_malleability();
void indicate_ending_malleability(int new_outside_state);
int malleability_checkpoint(int *mam_state, int wait_completed);
void MAM_Commit(int *mam_state);
void set_benchmark_grp(int grp);
void MAM_Commit(int *mam_state, MPI_Comm *new_comm);
void set_malleability_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies);
void set_children_number(int numC); // TODO TO BE DEPRECATED
void get_malleability_user_comm(MPI_Comm *comm);
void malleability_add_data(void *data, size_t total_qty, int type, int is_replicated, int is_constant);
void malleability_modify_data(void *data, size_t index, size_t total_qty, int type, int is_replicated, int is_constant);
void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant);
void malleability_modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant);
void malleability_get_entries(size_t *entries, int is_replicated, int is_constant);
void malleability_get_data(void **data, size_t index, int is_replicated, int is_constant);
......
......@@ -9,7 +9,7 @@
enum mall_inner_states{MALL_UNRESERVED, MALL_NOT_STARTED, MALL_ZOMBIE, MALL_SPAWN_PENDING, MALL_SPAWN_SINGLE_PENDING,
MALL_SPAWN_SINGLE_COMPLETED, MALL_SPAWN_ADAPT_POSTPONE, MALL_SPAWN_COMPLETED, MALL_DIST_PENDING, MALL_DIST_COMPLETED,
MALL_SPAWN_ADAPT_PENDING, MALL_SPAWN_ADAPTED, MALL_COMPLETED};
enum mam_states{MAM_UNRESERVED, MAM_NOT_STARTED, MAM_ZOMBIE, MAM_PENDING, MAM_COMPLETED};
enum mam_states{MAM_UNRESERVED, MAM_NOT_STARTED, MAM_ZOMBIE, MAM_PENDING, MAM_COMPLETED, MAM_COMMITED};
enum mall_spawn_methods{MALL_SPAWN_BASELINE, MALL_SPAWN_MERGE};
#define MALL_SPAWN_PTHREAD 2
#define MALL_SPAWN_SINGLE 3
......
......@@ -20,7 +20,7 @@ void def_malleability_qty_type(malleability_data_t *data_struct_rep, malleabilit
* todos los padres. La nueva serie "data" solo representa los datos
* que tiene este padre.
*/
void add_data(void *data, size_t total_qty, int type, size_t request_qty, malleability_data_t *data_struct) {
void add_data(void *data, size_t total_qty, MPI_Datatype type, size_t request_qty, malleability_data_t *data_struct) {
size_t i;
if(data_struct->entries == 0) {
......@@ -49,7 +49,7 @@ void add_data(void *data, size_t total_qty, int type, size_t request_qty, mallea
* todos los padres. La nueva serie "data" solo representa los datos
* que tiene este padre.
*/
void modify_data(void *data, size_t index, size_t total_qty, int type, size_t request_qty, malleability_data_t *data_struct) {
void modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, size_t request_qty, malleability_data_t *data_struct) {
size_t i;
if(data_struct->entries < index) { // Index does not exist
......@@ -81,7 +81,7 @@ void modify_data(void *data, size_t index, size_t total_qty, int type, size_t re
* unicamente.
*/
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group, int myId, int root, MPI_Comm intercomm) {
int is_intercomm, rootBcast = MPI_PROC_NULL;
int is_intercomm, rootBcast = MPI_PROC_NULL, type_size;
size_t i, j;
MPI_Datatype entries_type, struct_type;
......@@ -97,7 +97,7 @@ void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *d
def_malleability_entries(data_struct_dist, data_struct_rep, &entries_type);
MPI_Bcast(MPI_BOTTOM, 1, entries_type, rootBcast, intercomm);
if(is_children_group && ( data_struct_rep->entries != 0 || data_struct_dist->entries != 0 )) { //FIXME Que pasa si ambos valores son 0?
if(is_children_group && ( data_struct_rep->entries != 0 || data_struct_dist->entries != 0 )) {
init_malleability_data_struct(data_struct_rep, data_struct_rep->entries);
init_malleability_data_struct(data_struct_dist, data_struct_dist->entries);
}
......@@ -107,14 +107,15 @@ void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *d
if(is_children_group) {
for(i=0; i < data_struct_rep->entries; i++) {
data_struct_rep->arrays[i] = (void *) malloc(data_struct_rep->qty[i] * sizeof(int)); //TODO Tener en cuenta que no siempre es int
MPI_Type_size(data_struct_rep->types[i], &type_size);
data_struct_rep->arrays[i] = (void *) malloc(data_struct_rep->qty[i] * type_size);
data_struct_rep->requests[i] = (MPI_Request *) malloc(data_struct_rep->request_qty[i] * sizeof(MPI_Request));
for(j=0; j < data_struct_rep->request_qty[i]; j++) {
data_struct_rep->requests[i][j] = MPI_REQUEST_NULL;
}
}
for(i=0; i < data_struct_dist->entries; i++) {
data_struct_dist->arrays[i] = (void *) NULL;
data_struct_dist->arrays[i] = (void *) NULL; // TODO Se podria inicializar aqui?
data_struct_dist->requests[i] = (MPI_Request *) malloc(data_struct_dist->request_qty[i] * sizeof(MPI_Request));
for(j=0; j < data_struct_dist->request_qty[i]; j++) {
data_struct_dist->requests[i][j] = MPI_REQUEST_NULL;
......@@ -142,7 +143,7 @@ void init_malleability_data_struct(malleability_data_t *data_struct, size_t size
data_struct->max_entries = size;
data_struct->qty = (size_t *) malloc(size * sizeof(size_t));
data_struct->types = (int *) malloc(size * sizeof(int));
data_struct->types = (MPI_Datatype *) malloc(size * sizeof(MPI_Datatype));
data_struct->request_qty = (size_t *) malloc(size * sizeof(size_t));
data_struct->requests = (MPI_Request **) malloc(size * sizeof(MPI_Request *));
data_struct->windows = (MPI_Win *) malloc(size * sizeof(MPI_Win));
......@@ -161,14 +162,14 @@ void init_malleability_data_struct(malleability_data_t *data_struct, size_t size
*/
void realloc_malleability_data_struct(malleability_data_t *data_struct, size_t qty_to_add) {
size_t i, needed, *qty_aux, *request_qty_aux;
int *types_aux;
MPI_Datatype *types_aux;
MPI_Win *windows_aux;
MPI_Request **requests_aux;
void **arrays_aux;
needed = data_struct->max_entries + qty_to_add;
qty_aux = (size_t *) realloc(data_struct->qty, needed * sizeof(int));
types_aux = (int *) realloc(data_struct->types, needed * sizeof(int));
types_aux = (MPI_Datatype *) realloc(data_struct->types, needed * sizeof(MPI_Datatype));
request_qty_aux = (size_t *) realloc(data_struct->request_qty, needed * sizeof(int));
requests_aux = (MPI_Request **) realloc(data_struct->requests, needed * sizeof(MPI_Request *));
windows_aux = (MPI_Win *) realloc(data_struct->windows, needed * sizeof(MPI_Win));
......@@ -184,6 +185,8 @@ void realloc_malleability_data_struct(malleability_data_t *data_struct, size_t q
arrays_aux[i] = NULL;
}
//TODO
//if(data_struct->qty != qty_aux && data_struct->qty != NULL) free(data_struct->qty);
data_struct->qty = qty_aux;
data_struct->types = types_aux;
data_struct->request_qty = request_qty_aux;
......@@ -199,7 +202,7 @@ void free_malleability_data_struct(malleability_data_t *data_struct) {
max = data_struct->entries;
if(max != 0) {
for(i=0; i<max; i++) {
//free(data_struct->arrays[i]); //FIXME Valores alojados con 1 elemento no se liberan?
//free(data_struct->arrays[i]); //FIXME Valores alojados con 1 elemento no se liberan? Usar qty? Comprobar si esta alojado por el usuario?
}
if(data_struct->qty != NULL) {
......@@ -281,11 +284,14 @@ void def_malleability_qty_type(malleability_data_t *data_struct_rep, malleabilit
MPI_Get_address((data_struct_rep->qty), &displs[0]);
MPI_Get_address((data_struct_rep->request_qty), &displs[1]);
MPI_Get_address((data_struct_rep->types), &displs[2]);
MPI_Get_address((data_struct_rep->types), &displs[2]); // MPI_Datatype uses typedef int to be declared
MPI_Get_address((data_struct_dist->qty), &displs[3]);
MPI_Get_address((data_struct_dist->request_qty), &displs[4]);
MPI_Get_address((data_struct_dist->types), &displs[5]);
MPI_Get_address((data_struct_dist->types), &displs[5]); // MPI_Datatype uses typedef int to be declared
MPI_Type_create_struct(counts, blocklengths, displs, types, new_type);
MPI_Type_commit(new_type);
}
......@@ -14,7 +14,7 @@ typedef struct {
size_t entries; // Indica numero de vectores a comunicar (replicated data)
size_t max_entries;
size_t *qty; // Indica numero de elementos en cada subvector de sync_array
int *types;
MPI_Datatype *types;
// Vector de vectores de request. En cada elemento superior se indican los requests a comprobar para dar por finalizada
// la comunicacion de ese dato
......@@ -25,8 +25,8 @@ typedef struct {
} malleability_data_t;
void add_data(void *data, size_t total_qty, int type, size_t request_qty, malleability_data_t *data_struct);
void modify_data(void *data, size_t index, size_t total_qty, int type, size_t request_qty, malleability_data_t *data_struct);
void add_data(void *data, size_t total_qty, MPI_Datatype type, size_t request_qty, malleability_data_t *data_struct);
void modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, size_t request_qty, malleability_data_t *data_struct);
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group, int myId, int root, MPI_Comm intercomm);
void free_malleability_data_struct(malleability_data_t *data_struct);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment