Commit db1ffc32 authored by iker_martin

Added Intercomm strategy for the Baseline spawn method. Refactored several functions to reduce code duplication. Fixed some bugs.

parent 8097cf12
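Usage sketch (illustrative, not part of this commit): with the Baseline spawn method, the new MAM_STRAT_SPAWN_INTERCOMM strategy keeps the spawned group on a raw intercommunicator instead of merging it. How the strategy bit is enabled publicly is assumed here; the diff only shows the internal setter MAM_I_set_spawn_strat and the query below.

    int provided, uses_intercomm;
    MAM_Set_key_configuration(MAM_SPAWN_METHOD, MALL_SPAWN_BASELINE, &provided);
    /* ...enable MAM_STRAT_SPAWN_INTERCOMM through the spawn-strategies setter... */
    MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &uses_intercomm);
    /* uses_intercomm != 0 -> communicators stay intercommunicators end-to-end */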
......@@ -517,10 +517,6 @@ void init_originals() {
size_t i;
if(config_file->n_groups > 1) {
MAM_Set_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss,
config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
MAM_Set_target_number(config_file->groups[group->grp+1].procs);
malleability_add_data(&(group->grp), 1, MPI_INT, 1, 1);
malleability_add_data(&run_id, 1, MPI_INT, 1, 1);
malleability_add_data(&(group->iter_start), 1, MPI_INT, 1, 0);
......
......@@ -7,11 +7,10 @@
#include "MAM_Configuration.h"
#include "malleabilityDataStructures.h"
//void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts);
void prepare_redistribution(int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, void **recv, struct Counts *s_counts, struct Counts *r_counts); //FIXME Choose name for is_sync
void prepare_redistribution(int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, void **recv, struct Counts *s_counts, struct Counts *r_counts);
void check_requests(struct Counts s_counts, struct Counts r_counts, MPI_Request **requests, size_t *request_qty);
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm);
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm);
void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm);
void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win);
void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win);
......@@ -60,7 +59,6 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) {
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* If the process receives data and recv is NULL, the behaviour is undefined.
* - qty (IN): Sum of elements shared by all processes that will send data.
* - myId (IN): Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
* - numP (IN): Size of the local group. If this is a children group, this parameter must correspond to
* "MPI_Comm_size(comm)". For the parents it is not always the size obtained from "comm".
* - numO (IN): Amount of processes in the remote group. For the parents it is the target quantity of processes after the
......@@ -68,56 +66,37 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) {
* - is_children_group (IN): Indicates whether this MPI rank is a child (TRUE) or a parent (FALSE).
* - comm (IN): Communicator to use to perform the redistribution.
*
* returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE). //FIXME In this case is always true...
*/
int sync_communication(void *send, void **recv, int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, MPI_Comm comm) {
int is_intercomm, aux_comm_used = 0;
void sync_communication(void *send, void **recv, int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, MPI_Comm comm) {
struct Counts s_counts, r_counts;
struct Dist_data dist_data;
MPI_Comm aux_comm = MPI_COMM_NULL;
/* PREPARE COMMUNICATION */
MPI_Comm_test_inter(comm, &is_intercomm);
// prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts); //FIXME Needs the datatype?
// TODO START REFACTOR By default this always uses an intracomm
prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, 1, recv, &s_counts, &r_counts); //FIXME MAGICAL VALUE
if(is_intercomm) {
MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
aux_comm_used = 1;
} else {
aux_comm = comm;
}
// FIXME END REFACTOR
prepare_redistribution(qty, datatype, numP, numO, is_children_group, recv, &s_counts, &r_counts);
/* PERFORM COMMUNICATION */
switch(mall_conf->red_method) {
case MALL_RED_RMA_LOCKALL:
case MALL_RED_RMA_LOCK:
if(is_children_group) {
dist_data.tamBl = 0;
} else {
get_block_dist(qty, myId, numO, &dist_data);
get_block_dist(qty, mall->myId, numO, &dist_data);
}
sync_rma(send, *recv, datatype, r_counts, dist_data.tamBl, aux_comm);
sync_rma(send, *recv, datatype, r_counts, dist_data.tamBl, comm);
break;
case MALL_RED_POINT:
sync_point2point(send, *recv, datatype, is_intercomm, myId, s_counts, r_counts, aux_comm);
sync_point2point(send, *recv, datatype, s_counts, r_counts, comm);
break;
case MALL_RED_BASELINE:
default:
MPI_Alltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, aux_comm);
MPI_Alltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, comm);
break;
}
if(aux_comm_used) {
MPI_Comm_free(&aux_comm);
}
freeCounts(&s_counts);
freeCounts(&r_counts);
return 1; //FIXME In this case is always true...
}
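/* Illustrative usage sketch (not part of this commit): with the new signature
 * callers no longer pass myId or probe is_intercomm; rank information comes
 * from the global "mall" state, mirroring the calls in send_data()/recv_data().
 * The names example_redistribute, data and qty are hypothetical. */
static void example_redistribute(int **data, int qty) {
  void *recv = NULL;
  sync_communication(*data, &recv, qty, MPI_INT, mall->numP, mall->numC,
                     MALLEABILITY_NOT_CHILDREN, mall->intercomm);
  if(recv != NULL) *data = (int *) recv; // Merge sources that remain targets get a fresh buffer
}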
/*
......@@ -127,9 +106,6 @@ int sync_communication(void *send, void **recv, int qty, MPI_Datatype datatype,
* - send (IN): Array with the data to send. This value can not be NULL for parents.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to
* receive data. If the process receives data and is NULL, the behaviour is undefined.
* - is_intercomm (IN): Indicates wether the communicator is an intercommunicator (TRUE) or an
* intracommunicator (FALSE).
* - myId (IN): Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
* - s_counts (IN): Struct which describes how many elements this process will send to each child and
* the displacements.
* - r_counts (IN): Struct which describes how many elements this process will receive from each parent
......@@ -137,7 +113,7 @@ int sync_communication(void *send, void **recv, int qty, MPI_Datatype datatype,
* - comm (IN): Communicator to use to perform the redistribution.
*
*/
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm) {
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm) {
int i, j, init, end, total_sends, datasize;
size_t offset, offset2;
MPI_Request *sends;
......@@ -145,12 +121,12 @@ void sync_point2point(void *send, void *recv, MPI_Datatype datatype, int is_inte
MPI_Type_size(datatype, &datasize);
init = s_counts.idI;
end = s_counts.idE;
if(!is_intercomm && (s_counts.idI == myId || s_counts.idE == myId + 1)) {
offset = s_counts.displs[myId] + datasize;
offset2 = r_counts.displs[myId] + datasize;
memcpy(send+offset, recv+offset2, s_counts.counts[myId]);
if(mall_conf->spawn_method == MALL_SPAWN_MERGE && (s_counts.idI == mall->myId || s_counts.idE == mall->myId + 1)) {
offset = s_counts.displs[mall->myId] * datasize;
offset2 = r_counts.displs[mall->myId] * datasize;
memcpy(recv+offset2, send+offset, s_counts.counts[mall->myId] * datasize); // Self-copy of this rank's own block; displs/counts are in elements, so scale by datasize
if(s_counts.idI == myId) init = s_counts.idI+1;
if(s_counts.idI == mall->myId) init = s_counts.idI+1;
else end = s_counts.idE-1;
}
......@@ -168,9 +144,9 @@ void sync_point2point(void *send, void *recv, MPI_Datatype datatype, int is_inte
init = r_counts.idI;
end = r_counts.idE;
if(!is_intercomm) {
if(r_counts.idI == myId) init = r_counts.idI+1;
else if(r_counts.idE == myId + 1) end = r_counts.idE-1;
if(mall_conf->spawn_method == MALL_SPAWN_MERGE) {
if(r_counts.idI == mall->myId) init = r_counts.idI+1;
else if(r_counts.idE == mall->myId + 1) end = r_counts.idE-1;
}
for(i=init; i<end; i++) {
......@@ -292,7 +268,6 @@ void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts,
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* If the process receives data and recv is NULL, the behaviour is undefined.
* - qty (IN): Sum of elements shared by all processes that will send data.
* - myId (IN): Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
* - numP (IN): Size of the local group. If this is a children group, this parameter must correspond to
* "MPI_Comm_size(comm)". For the parents it is not always the size obtained from "comm".
* - numO (IN): Amount of processes in the remote group. For the parents it is the target quantity of processes after the
......@@ -304,26 +279,13 @@ void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts,
* - request_qty (OUT): Quantity of requests to be used. If a process sends and receives data, this value will be
* modified to the expected value.
*
* returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE). //FIXME In this case is always false...
*/
int async_communication_start(void *send, void **recv, int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win) {
int is_intercomm, aux_comm_used = 0;
void async_communication_start(void *send, void **recv, int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win) {
struct Counts s_counts, r_counts;
struct Dist_data dist_data;
MPI_Comm aux_comm = MPI_COMM_NULL;
/* PREPARE COMMUNICATION */
MPI_Comm_test_inter(comm, &is_intercomm);
// TODO START REFACTOR By default this always uses an intracomm
//prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, 1, recv, &s_counts, &r_counts); // TODO MAGICAL VALUE
if(is_intercomm) {
MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
aux_comm_used = 1;
} else {
aux_comm = comm;
}
// FIXME END REFACTOR
prepare_redistribution(qty, datatype, numP, numO, is_children_group, recv, &s_counts, &r_counts);
check_requests(s_counts, r_counts, requests, request_qty);
/* PERFORM COMMUNICATION */
......@@ -334,46 +296,40 @@ int async_communication_start(void *send, void **recv, int qty, MPI_Datatype dat
if(is_children_group) {
dist_data.tamBl = 0;
} else {
get_block_dist(qty, myId, numO, &dist_data);
get_block_dist(qty, mall->myId, numO, &dist_data);
}
async_rma(send, *recv, datatype, r_counts, dist_data.tamBl, aux_comm, *requests, win);
async_rma(send, *recv, datatype, r_counts, dist_data.tamBl, comm, *requests, win);
break;
case MALL_RED_POINT:
async_point2point(send, *recv, datatype, s_counts, r_counts, aux_comm, *requests);
async_point2point(send, *recv, datatype, s_counts, r_counts, comm, *requests);
break;
case MALL_RED_BASELINE:
default:
MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, aux_comm, &((*requests)[0]));
MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, comm, &((*requests)[0]));
break;
}
/* POST REQUESTS CHECKS */
if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
if(!is_children_group && (is_intercomm || myId >= numO)) {
if(!is_children_group && (mall_conf->spawn_method == MALL_SPAWN_BASELINE || mall->myId >= numO)) { // TODO Simplify to "if rank is source only" or "if rank will be zombie"
MPI_Ibarrier(comm, &((*requests)[*request_qty-1]) ); //FIXME Not easy to read...
}
}
if(aux_comm_used) {
MPI_Comm_free(&aux_comm);
}
freeCounts(&s_counts);
freeCounts(&r_counts);
return 0; //FIXME In this case is always false...
}
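/* Illustrative lifecycle sketch (not part of this commit): the asynchronous
 * path is split across start/check/end, all reading rank data from "mall".
 * The identifiers example_async, send_buf, recv_buf and qty are hypothetical. */
static void example_async(void *send_buf, int qty) {
  void *recv_buf = NULL;
  MPI_Request *reqs = NULL;
  size_t req_qty = 0;
  MPI_Win win;
  async_communication_start(send_buf, &recv_buf, qty, MPI_INT, mall->numP, mall->numC,
                            MALLEABILITY_NOT_CHILDREN, mall->intercomm, &reqs, &req_qty, &win);
  while(!async_communication_check(MALLEABILITY_NOT_CHILDREN, mall->intercomm, reqs, req_qty)) {
    ; // Overlap application work here
  }
  async_communication_end(reqs, req_qty, &win);
}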
/*
* Checks if a set of requests has been completed (1) or not (0).
*
* - myId (IN): Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
* - is_children_group (IN): Indicates whether this MPI rank is a child (TRUE) or a parent (FALSE).
* - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
* - request_qty (IN): Quantity of requests in "requests".
*
* returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE).
*/
int async_communication_check(int myId, int is_children_group, MPI_Comm comm, MPI_Request *requests, size_t request_qty) {
int async_communication_check(int is_children_group, MPI_Comm comm, MPI_Request *requests, size_t request_qty) {
int completed, req_completed, all_req_null, test_err, aux_condition;
size_t i;
completed = 1;
......@@ -407,7 +363,7 @@ int async_communication_check(int myId, int is_children_group, MPI_Comm comm, MP
}
if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
printf("P%d aborting -- Test Async\n", myId);
printf("P%d aborting -- Test Async\n", mall->myId);
MPI_Abort(MPI_COMM_WORLD, test_err);
}
......@@ -581,56 +537,64 @@ void async_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts
* how many elements it sends to/receives from other processes for the new group.
*
* - qty (IN): Sum of elements shared by all processes that will send data.
* - myId (IN): Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
* - numP (IN): Size of the local group. If this is a children group, this parameter must correspond to
* "MPI_Comm_size(comm)". For the parents it is not always the size obtained from "comm".
* - numO (IN): Amount of processes in the remote group. For the parents it is the target quantity of processes after the
* resize, while for the children it is the amount of parents.
* - is_children_group (IN): Indicates whether this MPI rank is a child (TRUE) or a parent (FALSE).
* - is_intercomm (IN): Indicates wether the used communicator is a intercomunicator(TRUE) or intracommunicator(FALSE).
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* If the process receives data and recv is NULL, the behaviour is undefined.
* - s_counts (OUT): Struct indicating how many elements this process sends to each process in the new group.
* - r_counts (OUT): Struct indicating how many elements this process receives from each process in the previous group.
*
*/
void prepare_redistribution(int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, void **recv, struct Counts *s_counts, struct Counts *r_counts) {
void prepare_redistribution(int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, void **recv, struct Counts *s_counts, struct Counts *r_counts) {
int array_size = numO;
int offset_ids = 0;
int datasize;
struct Dist_data dist_data;
if(is_intercomm) {
offset_ids = is_sync ? numP : 0; //FIXME Modify only if active?
if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) {
offset_ids = MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL) ?
0 : numP;
} else {
array_size = numP > numO ? numP : numO;
}
mallocCounts(s_counts, array_size+offset_ids);
mallocCounts(r_counts, array_size+offset_ids);
MPI_Type_size(datatype, &datasize); //FIXME Right now derived datatypes are not ensured to work
if(is_children_group) {
offset_ids = 0;
prepare_comm_alltoall(myId, numP, numO, qty, offset_ids, r_counts);
prepare_comm_alltoall(mall->myId, numP, numO, qty, offset_ids, r_counts);
// Get the distribution for this child
get_block_dist(qty, myId, numP, &dist_data);
get_block_dist(qty, mall->myId, numP, &dist_data);
*recv = malloc(dist_data.tamBl * datasize);
//get_block_dist(qty, myId, numP, &dist_data);
//print_counts(dist_data, r_counts->counts, r_counts->displs, numO+offset_ids, 0, "Targets Recv");
#if USE_MAL_DEBUG >= 4
get_block_dist(qty, mall->myId, numP, &dist_data);
print_counts(dist_data, r_counts->counts, r_counts->displs, numO+offset_ids, 0, "Targets Recv");
#endif
} else {
//get_block_dist(qty, myId, numP, &dist_data);
prepare_comm_alltoall(myId, numP, numO, qty, offset_ids, s_counts);
if(!is_intercomm && myId < numO) {
prepare_comm_alltoall(myId, numO, numP, qty, offset_ids, r_counts);
// Get the distribution for this child and allocate the receive buffer
get_block_dist(qty, myId, numO, &dist_data);
*recv = malloc(dist_data.tamBl * datasize);
//print_counts(dist_data, r_counts->counts, r_counts->displs, array_size, 0, "Sources&Targets Recv");
#if USE_MAL_DEBUG >= 4
get_block_dist(qty, mall->myId, numP, &dist_data);
#endif
prepare_comm_alltoall(mall->myId, numP, numO, qty, offset_ids, s_counts);
if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->myId < numO) {
prepare_comm_alltoall(mall->myId, numO, numP, qty, offset_ids, r_counts);
// Get the distribution for this child and allocate the receive buffer
get_block_dist(qty, mall->myId, numO, &dist_data);
*recv = malloc(dist_data.tamBl * datasize);
#if USE_MAL_DEBUG >= 4
print_counts(dist_data, r_counts->counts, r_counts->displs, array_size, 0, "Sources&Targets Recv");
#endif
}
//print_counts(dist_data, s_counts->counts, s_counts->displs, numO+offset_ids, 0, "Sources Send");
#if USE_MAL_DEBUG >= 4
print_counts(dist_data, s_counts->counts, s_counts->displs, numO+offset_ids, 0, "Sources Send");
#endif
}
}
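/* Worked example (illustrative, not part of this commit): with Baseline and
 * no Intercomm strategy the groups share one merged intracommunicator, so
 * target ids are shifted past the numP source ranks (offset_ids = numP).
 * For numP = 2 sources and numO = 4 targets:
 *   array_size = 4, offset_ids = 2 -> mallocCounts reserves 6 slots and a
 *   source addresses merged ranks 2..5.
 * With MAM_STRAT_SPAWN_INTERCOMM the groups keep separate rank spaces, so
 * offset_ids = 0 and a source simply addresses remote ranks 0..3. */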
......
......@@ -7,10 +7,10 @@
#include <string.h>
#include "malleabilityStates.h"
int sync_communication(void *send, void **recv, int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, MPI_Comm comm);
void sync_communication(void *send, void **recv, int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, MPI_Comm comm);
int async_communication_start(void *send, void **recv, int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win);
int async_communication_check(int myId, int is_children_group, MPI_Comm comm, MPI_Request *requests, size_t request_qty);
void async_communication_start(void *send, void **recv, int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win);
int async_communication_check(int is_children_group, MPI_Comm comm, MPI_Request *requests, size_t request_qty);
void async_communication_wait(MPI_Comm comm, MPI_Request *requests, size_t request_qty, int post_ibarrier);
void async_communication_end(MPI_Request *requests, size_t request_qty, MPI_Win *win);
......
......@@ -20,7 +20,7 @@ int MAM_I_set_target_number(unsigned int new_numC);
int MAM_I_configuration_get_info();
int MAM_I_contains_strat(unsigned int comm_strategies, unsigned int strategy, int *result);
int MAM_I_contains_strat(unsigned int comm_strategies, unsigned int strategy);
int MAM_I_add_strat(unsigned int *comm_strategies, unsigned int strategy);
int MAM_I_remove_strat(unsigned int *comm_strategies, unsigned int strategy);
......@@ -34,6 +34,8 @@ mam_config_setting_t configSettings[] = {
{NULL, 1, INT_MAX, {.set_config_complex = MAM_I_set_target_number }, MAM_NUM_TARGETS_ENV}
};
unsigned int masks_spawn[] = {MAM_STRAT_CLEAR_VALUE, MAM_MASK_PTHREAD, MAM_MASK_SPAWN_SINGLE, MAM_MASK_SPAWN_INTERCOMM};
unsigned int masks_red[] = {MAM_STRAT_CLEAR_VALUE, MAM_MASK_PTHREAD, MAM_MASK_RED_WAIT_SOURCES, MAM_MASK_RED_WAIT_TARGETS};
/**
* @brief Set configuration parameters for MAM.
......@@ -107,10 +109,14 @@ void MAM_Set_key_configuration(int key, int required, int *provided) {
} else {*provided = *(config->value); }
} else { printf("MAM: Key %d does not exist\n", key); }
//FIXME If this changes, it is not reported...
if(!MAM_I_contains_strat(mall_conf->red_strategies, MAM_STRAT_RED_WAIT_TARGETS, &aux) &&
(mall_conf->red_method == MALL_RED_RMA_LOCK || mall_conf->red_method == MALL_RED_RMA_LOCKALL)) {
MAM_I_set_red_strat(MAM_STRAT_RED_WAIT_TARGETS, &mall_conf->red_strategies);
//TODO -- Move this to a MAM_Config_init function to ensure it is correct.
if(mall_conf->red_method == MALL_RED_RMA_LOCK || mall_conf->red_method == MALL_RED_RMA_LOCKALL) {
if(MAM_I_contains_strat(mall_conf->spawn_strategies, MAM_STRAT_SPAWN_INTERCOMM)) {
MAM_I_remove_strat(&mall_conf->spawn_strategies, MAM_MASK_SPAWN_INTERCOMM);
}
if(MAM_I_contains_strat(mall_conf->red_strategies, MAM_STRAT_RED_WAIT_SOURCES)) {
MAM_I_set_red_strat(MAM_STRAT_RED_WAIT_TARGETS, &mall_conf->red_strategies);
}
}
}
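/* Illustrative usage (not part of this commit): selecting an RMA
 * redistribution method now also strips the Intercomm spawn strategy, since
 * one-sided windows need the merged intracommunicator. The key name
 * MAM_RED_METHOD is assumed; this diff only shows MAM_SPAWN_METHOD. */
int provided;
MAM_Set_key_configuration(MAM_RED_METHOD, MALL_RED_RMA_LOCKALL, &provided);
// Afterwards MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, ...)
// reports 0, and an active MAM_STRAT_RED_WAIT_SOURCES is swapped for WAIT_TARGETS.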
......@@ -118,26 +124,29 @@ void MAM_Set_key_configuration(int key, int required, int *provided) {
* Returns whether a strategy is applied or not
*/
int MAM_Contains_strat(int key, unsigned int strategy, int *result) {
int strategies, aux;
int strategies, aux = MAM_OK;
unsigned int len = 0, mask;
switch(key) {
case MAM_SPAWN_STRATEGIES:
if(strategy < MAM_STRATS_SPAWN_LEN) {
strategies = mall_conf->spawn_strategies;
}
strategies = mall_conf->spawn_strategies;
mask = masks_spawn[strategy];
len = MAM_STRATS_SPAWN_LEN;
break;
case MAM_RED_STRATEGIES:
if(strategy < MAM_STRATS_RED_LEN) {
strategies = mall_conf->red_strategies;
}
strategies = mall_conf->red_strategies;
mask = masks_red[strategy];
len = MAM_STRATS_RED_LEN;
break;
default:
aux = MALL_DENIED;
break;
}
if(aux != MALL_DENIED) {
MAM_I_contains_strat(strategies, strategy, &aux);
if(aux == MAM_OK && strategy < len) {
aux = MAM_I_contains_strat(strategies, mask);
} else {
aux = 0;
}
if(result != NULL) *result = aux;
......@@ -197,7 +206,7 @@ void MAM_Check_configuration() {
//======================================================||
int MAM_I_configuration_get_info() {
int MAM_I_configuration_get_info() { //FIXME Rename
size_t i;
int set_value;
char *tmp = NULL;
......@@ -246,6 +255,9 @@ int MAM_I_set_spawn_strat(unsigned int strategy, unsigned int *strategies) {
case MAM_STRAT_SPAWN_SINGLE:
result = MAM_I_add_strat(strategies, MAM_MASK_SPAWN_SINGLE);
break;
case MAM_STRAT_SPAWN_INTERCOMM:
result = MAM_I_add_strat(strategies, MAM_MASK_SPAWN_INTERCOMM);
break;
default:
//Unknown strategy
result = MALL_DENIED;
......@@ -267,7 +279,7 @@ int MAM_I_set_red_strat(unsigned int strategy, unsigned int *strategies) {
*strategies = MAM_STRAT_CLEAR_VALUE;
result = MAM_STRATS_MODIFIED;
break;
case MAM_STRAT_RED_PTHREAD:
case MAM_STRAT_RED_PTHREAD: //TODO - IMPROVEMENT - This could be done with a single operation instead of 3.
result = MAM_I_add_strat(strategies, MAM_MASK_PTHREAD);
if(result == MAM_STRATS_ADDED) {
strat_removed += MAM_I_remove_strat(strategies, MAM_MASK_RED_WAIT_SOURCES);
......@@ -305,28 +317,28 @@ int MAM_I_set_target_number(unsigned int new_numC) {
if(state > MALL_NOT_STARTED || new_numC == 0) return MALL_DENIED;
mall->numC = (int) new_numC;
if(mall->numC == mall->numP) { // Migrar
if(mall->numC == mall->numP) { // Migrate //FIXME Move this elsewhere
MAM_Set_key_configuration(MAM_SPAWN_METHOD, MALL_SPAWN_BASELINE, &provided);
}
return new_numC;
}
/*
* Returns nonzero if the strategy is applied, 0 otherwise
*/
int MAM_I_contains_strat(unsigned int comm_strategies, unsigned int strategy, int *result) {
*result = comm_strategies & strategy;
return *result;
int MAM_I_contains_strat(unsigned int comm_strategies, unsigned int strategy) {
return comm_strategies & strategy;
}
int MAM_I_add_strat(unsigned int *comm_strategies, unsigned int strategy) {
if(MAM_I_contains_strat(*comm_strategies, strategy, NULL)) return MAM_OK;
if(MAM_I_contains_strat(*comm_strategies, strategy)) return MAM_OK;
*comm_strategies |= strategy;
return MAM_STRATS_ADDED;
}
int MAM_I_remove_strat(unsigned int *comm_strategies, unsigned int strategy) {
if(!MAM_I_contains_strat(*comm_strategies, strategy, NULL)) return MAM_OK;
if(!MAM_I_contains_strat(*comm_strategies, strategy)) return MAM_OK;
*comm_strategies &= ~strategy;
return MAM_STRATS_MODIFIED;
}
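/* Illustrative trace (not part of this commit) of the simplified helpers:
 * strategies are bits of an unsigned int, so contains/add/remove are plain
 * mask operations on the MAM_MASK_* values defined in the header.
 * Assumes MAM_STRAT_CLEAR_VALUE clears to 0. */
unsigned int strats = 0;
MAM_I_add_strat(&strats, MAM_MASK_PTHREAD);              // strats == 0x01, returns MAM_STRATS_ADDED
MAM_I_add_strat(&strats, MAM_MASK_RED_WAIT_TARGETS);     // strats == 0x05
MAM_I_remove_strat(&strats, MAM_MASK_PTHREAD);           // strats == 0x04, returns MAM_STRATS_MODIFIED
MAM_I_contains_strat(strats, MAM_MASK_RED_WAIT_TARGETS); // nonzero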
......@@ -11,6 +11,7 @@
#define MAM_MASK_PTHREAD 0x01
#define MAM_MASK_SPAWN_SINGLE 0x02
#define MAM_MASK_SPAWN_INTERCOMM 0x04
#define MAM_MASK_RED_WAIT_SOURCES 0x02
#define MAM_MASK_RED_WAIT_TARGETS 0x04
......
......@@ -45,6 +45,7 @@ void MAM_Free_main_datatype() {
void MAM_Comm_main_structures(int rootBcast) {
MPI_Bcast(MPI_BOTTOM, 1, mall->struct_type, rootBcast, mall->intercomm);
if(mall->nodelist == NULL) {
mall->nodelist = malloc((mall->nodelist_len+1) * sizeof(char));
mall->nodelist[mall->nodelist_len] = '\0';
......
......@@ -35,9 +35,9 @@ typedef struct {
} malleability_config_t;
typedef struct {
int myId, numP, numC, root, zombie;
int myId, numP, numC, zombie;
int root, root_collectives;
int num_parents, root_parents;
int is_intercomm;
pthread_t async_thread;
MPI_Comm comm, thread_comm;
MPI_Comm intercomm, tmp_comm;
......
......@@ -15,7 +15,7 @@
#define MALLEABILITY_USE_SYNCHRONOUS 0
#define MALLEABILITY_USE_ASYNCHRONOUS 1
void MAM_Commit(int *mam_state, int is_children_group);
void MAM_Commit(int *mam_state);
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous);
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous);
......@@ -182,6 +182,7 @@ void MAM_Finalize() {
int MAM_Checkpoint(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
int call_checkpoint = 0;
//TODO This could be changed to an array with the functions to call in each case
switch(state) {
case MALL_UNRESERVED:
*mam_state = MAM_UNRESERVED;
......@@ -241,7 +242,7 @@ void MAM_Resume_redistribution(int *mam_state) {
/*
* TODO
*/
void MAM_Commit(int *mam_state, int rootBcast) {
void MAM_Commit(int *mam_state) {
int zombies = 0;
#if USE_MAL_DEBUG
if(mall->myId == mall->root){ DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); } fflush(stdout);
......@@ -250,7 +251,7 @@ void MAM_Commit(int *mam_state, int rootBcast) {
// Get times before committing
if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) {
// This communication is only needed when a root process will become a zombie
malleability_times_broadcast(rootBcast);
malleability_times_broadcast(mall->root_collectives);
}
// Free unneeded communicators
......@@ -281,7 +282,8 @@ void MAM_Commit(int *mam_state, int rootBcast) {
MPI_Comm_rank(mall->comm, &mall->myId);
MPI_Comm_size(mall->comm, &mall->numP);
mall->root = mall->root_parents;
mall->root = mall_conf->spawn_method == MALL_SPAWN_BASELINE ? mall->root : mall->root_parents;
mall->root_parents = mall->root;
state = MALL_NOT_STARTED;
if(mam_state != NULL) *mam_state = MAM_COMPLETED;
......@@ -450,7 +452,7 @@ void send_data(int numP_children, malleability_data_t *data_struct, int is_async
for(i=0; i < data_struct->entries; i++) {
aux_send = data_struct->arrays[i];
aux_recv = NULL;
async_communication_start(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN,
async_communication_start(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN,
mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
}
......@@ -458,7 +460,7 @@ void send_data(int numP_children, malleability_data_t *data_struct, int is_async
for(i=0; i < data_struct->entries; i++) {
aux_send = data_struct->arrays[i];
aux_recv = NULL;
sync_communication(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall->intercomm);
sync_communication(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall->intercomm);
if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
}
}
......@@ -476,14 +478,14 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch
if(is_asynchronous) {
for(i=0; i < data_struct->entries; i++) {
aux = data_struct->arrays[i];
async_communication_start(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN,
async_communication_start(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->numP, numP_parents, MALLEABILITY_CHILDREN,
mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
data_struct->arrays[i] = aux;
}
} else {
for(i=0; i < data_struct->entries; i++) {
aux = data_struct->arrays[i];
sync_communication(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall->intercomm);
sync_communication(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall->intercomm);
data_struct->arrays[i] = aux;
}
}
......@@ -495,6 +497,10 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch
//====================MAM STAGES========================||
//======================================================||
//======================================================||
int MAM_St_rms(int *mam_state) {
*mam_state = MAM_NOT_STARTED;
......@@ -523,6 +529,9 @@ int MAM_St_spawn_start() {
}
int MAM_St_spawn_pending(int wait_completed) {
state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);
if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPTED) {
#if USE_MAL_BARRIERS
......@@ -535,6 +544,12 @@ int MAM_St_spawn_pending(int wait_completed) {
}
int MAM_St_red_start() {
if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
mall->root_collectives = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
} else {
mall->root_collectives = mall->root;
}
state = start_redistribution();
return 1;
}
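/* Background note (illustrative): on an intercommunicator, collectives rooted
 * in the source group need MPI_ROOT on the root source, MPI_PROC_NULL on the
 * remaining sources, and the source root's rank on every target. Storing the
 * choice once in mall->root_collectives removes the per-call is_intercomm
 * branches that this commit deletes, e.g.: */
// MPI_Bcast(buf, qty, MPI_INT, mall->root_collectives, mall->intercomm);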
......@@ -547,7 +562,7 @@ int MAM_St_red_pending(int *mam_state, int wait_completed) {
}
if(state != MALL_DIST_PENDING) {
if(mall->is_intercomm) {
if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->tmp_comm); //The group passing 0 goes first
} else {
MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
......@@ -606,14 +621,7 @@ int MAM_St_spawn_adapt_pending(int wait_completed) {
}
int MAM_St_completed(int *mam_state) {
int rootBcast;
if(mall->is_intercomm) {
rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
} else {
rootBcast = mall->root;
}
MAM_Commit(mam_state, rootBcast);
MAM_Commit(mam_state);
return 0;
}
......@@ -623,6 +631,10 @@ int MAM_St_completed(int *mam_state) {
//=====================CHILDREN=========================||
//======================================================||
//======================================================||
/*
* Initialization of the children's data.
* During it, data is received from the parents: the configuration
......@@ -637,17 +649,17 @@ void Children_init(void (*user_function)(void *), void *user_args) {
#endif
malleability_connect_children(mall->comm, &(mall->intercomm));
MPI_Comm_test_inter(mall->intercomm, &mall->is_intercomm);
if(!mall->is_intercomm) { // For intracommunicators, these processes will be added
if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { // For Merge Method, these processes will be added
MPI_Comm_rank(mall->intercomm, &mall->myId);
MPI_Comm_size(mall->intercomm, &mall->numP);
}
mall->root_collectives = mall->root_parents;
#if USE_MAL_DEBUG
DEBUG_FUNC("Targets have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, mall->root_parents, mall->intercomm);
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN);
if(dist_a_data->entries || rep_a_data->entries) { // Receive asynchronous data
#if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Children start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
......@@ -659,13 +671,13 @@ void Children_init(void (*user_function)(void *), void *user_args) {
if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
recv_data(mall->num_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
for(i=0; i<rep_a_data->entries; i++) {
MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_parents, mall->intercomm);
MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm);
}
} else {
recv_data(mall->num_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
for(i=0; i<rep_a_data->entries; i++) {
MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_parents, mall->intercomm, &(rep_a_data->requests[i][0]));
MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm, &(rep_a_data->requests[i][0]));
}
#if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Targets started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
......@@ -700,7 +712,7 @@ void Children_init(void (*user_function)(void *), void *user_args) {
DEBUG_FUNC("Targets have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
if(mall->is_intercomm) {
if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->tmp_comm); //The group passing 0 goes first
} else {
MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
......@@ -712,7 +724,7 @@ void Children_init(void (*user_function)(void *), void *user_args) {
user_function(user_args);
}
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, mall->root_parents, mall->intercomm);
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN);
if(dist_s_data->entries || rep_s_data->entries) { // Receive synchronous data
#if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm);
......@@ -720,7 +732,7 @@ void Children_init(void (*user_function)(void *), void *user_args) {
recv_data(mall->num_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
for(i=0; i<rep_s_data->entries; i++) {
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_parents, mall->intercomm);
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_collectives, mall->intercomm);
}
#if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm);
......@@ -731,7 +743,7 @@ void Children_init(void (*user_function)(void *), void *user_args) {
DEBUG_FUNC("Targets have completed synchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
MAM_Commit(NULL, mall->root_parents);
MAM_Commit(NULL);
#if USE_MAL_DEBUG
DEBUG_FUNC("MaM has been initialized correctly for new ranks", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
......@@ -743,6 +755,8 @@ void Children_init(void (*user_function)(void *), void *user_args) {
//=====================PARENTS==========================||
//======================================================||
//======================================================||
/*
* Handles the creation of the child processes.
......@@ -781,25 +795,15 @@ int spawn_step(){
* groups of processes.
*/
int start_redistribution() {
int rootBcast;
size_t i;
mall->is_intercomm = 0;
if(mall->intercomm != MPI_COMM_NULL) {
MPI_Comm_test_inter(mall->intercomm, &mall->is_intercomm);
} else {
if(mall->intercomm == MPI_COMM_NULL) {
// If no communicator has been created, it is because the Spawn was postponed
// and this is a Merge Shrink spawn
MPI_Comm_dup(mall->comm, &(mall->intercomm));
}
if(mall->is_intercomm) {
rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
} else {
rootBcast = mall->root;
}
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN);
if(dist_a_data->entries || rep_a_data->entries) { // Send asynchronous data
#if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm);
......@@ -810,7 +814,7 @@ int start_redistribution() {
} else {
send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
for(i=0; i<rep_a_data->entries; i++) { //FIXME Ibarrier does not work with rep_a_data
MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], rootBcast, mall->intercomm, &(rep_a_data->requests[i][0]));
MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm, &(rep_a_data->requests[i][0]));
}
return MALL_DIST_PENDING;
}
......@@ -846,7 +850,7 @@ int check_redistribution(int wait_completed) {
if(wait_completed) {
if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
if( mall->is_intercomm || mall->myId >= mall->numC) {
if(mall_conf->spawn_method == MALL_SPAWN_BASELINE || mall->myId >= mall->numC) {
post_ibarrier=1;
}
}
......@@ -864,13 +868,13 @@ int check_redistribution(int wait_completed) {
for(i=0; i<dist_a_data->entries; i++) {
req_completed = dist_a_data->requests[i];
req_qty = dist_a_data->request_qty[i];
completed = async_communication_check(mall->myId, MALLEABILITY_NOT_CHILDREN, mall->intercomm, req_completed, req_qty);
completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, mall->intercomm, req_completed, req_qty);
local_completed = local_completed && completed;
}
for(i=0; i<rep_a_data->entries; i++) { //FIXME Ibarrier does not work with rep_a_data
req_completed = rep_a_data->requests[i];
req_qty = rep_a_data->request_qty[i];
completed = async_communication_check(mall->myId, MALLEABILITY_NOT_CHILDREN, mall->intercomm, req_completed, req_qty);
completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, mall->intercomm, req_completed, req_qty);
local_completed = local_completed && completed;
}
#if USE_MAL_DEBUG >= 2
......@@ -901,7 +905,7 @@ int check_redistribution(int wait_completed) {
#if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm);
#endif
if(!mall->is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
return MALL_USER_PENDING;
}
......@@ -916,15 +920,9 @@ int check_redistribution(int wait_completed) {
*/
int end_redistribution() {
size_t i;
int rootBcast, local_state;
int local_state;
if(mall->is_intercomm) {
rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
} else {
rootBcast = mall->root;
}
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN);
if(dist_s_data->entries || rep_s_data->entries) { // Send synchronous data
#if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm);
......@@ -933,19 +931,17 @@ int end_redistribution() {
send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
for(i=0; i<rep_s_data->entries; i++) {
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], rootBcast, mall->intercomm);
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_collectives, mall->intercomm);
}
#if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm);
#endif
if(!mall->is_intercomm) mall_conf->times->sync_end = MPI_Wtime(); // Merge method only
if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->sync_end = MPI_Wtime(); // Merge method only
}
local_state = MALL_DIST_COMPLETED;
if(!mall->is_intercomm) { // Merge Spawn
if(mall->numP > mall->numC) { // Shrink || Merge Shrink requires extra steps
local_state = MALL_SPAWN_ADAPT_PENDING;
}
if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->numP > mall->numC) { // Merge Shrink
local_state = MALL_SPAWN_ADAPT_PENDING;
}
return local_state;
......@@ -1004,7 +1000,7 @@ int thread_check(int wait_completed) {
#if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm);
#endif
if(!mall->is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
if(mall_conf->spawn_method == MALL_SPAWN_MERGE) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
return end_redistribution();
}
......@@ -1018,18 +1014,11 @@ int thread_check(int wait_completed) {
* by the value "commAsync".
*/
void* thread_async_work() {
int rootBcast;
size_t i;
if(mall->is_intercomm) {
rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
} else {
rootBcast = mall->root;
}
send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
for(i=0; i<rep_a_data->entries; i++) {
MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], rootBcast, mall->intercomm);
MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm);
}
comm_state = MALL_DIST_COMPLETED;
pthread_exit(NULL);
......
......@@ -15,7 +15,7 @@ enum mam_proc_states{MAM_PROC_CONTINUE, MAM_PROC_NEW_RANK, MAM_PROC_ZOMBIE};
enum mall_spawn_methods{MALL_SPAWN_BASELINE, MALL_SPAWN_MERGE, MAM_METHODS_SPAWN_LEN};
enum mam_spawn_strategies{MAM_STRAT_SPAWN_CLEAR, MAM_STRAT_SPAWN_PTHREAD, MAM_STRAT_SPAWN_SINGLE, MAM_STRATS_SPAWN_LEN};
enum mam_spawn_strategies{MAM_STRAT_SPAWN_CLEAR, MAM_STRAT_SPAWN_PTHREAD, MAM_STRAT_SPAWN_SINGLE, MAM_STRAT_SPAWN_INTERCOMM, MAM_STRATS_SPAWN_LEN};
enum mam_phy_dist_methods{MALL_DIST_SPREAD = 1, MALL_DIST_COMPACT, MAM_METHODS_PHYSICAL_DISTRIBUTION_LEN}; //FIXME Rename to PHY_DIST?
enum mam_phy_info_methods{MALL_DIST_STRING = 1, MALL_DIST_HOSTFILE}; //FIXME Rename to PHY_DIST?
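/* Consistency sketch (illustrative, assumes C11 is available): the strategy
 * enums double as indexes into the masks_spawn[]/masks_red[] arrays in
 * MAM_Configuration.c, so the new entry must keep enum order and array order
 * in sync. */
_Static_assert(MAM_STRAT_SPAWN_INTERCOMM == 3, "masks_spawn[] order must match the enum");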
......
#include "malleabilityTypes.h"
#include "malleabilityDataStructures.h"
#include "MAM_Configuration.h"
void init_malleability_data_struct(malleability_data_t *data_struct, size_t size);
......@@ -80,22 +82,14 @@ void modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type,
* In the "root" argument, every process must indicate only who the root
* process of the parents is.
*/
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group, int myId, int root, MPI_Comm intercomm) {
int is_intercomm, rootBcast = MPI_PROC_NULL, type_size;
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group) {
int type_size;
size_t i, j;
MPI_Datatype entries_type, struct_type;
MPI_Comm_test_inter(intercomm, &is_intercomm);
if(is_intercomm && !is_children_group) {
rootBcast = myId == root ? MPI_ROOT : MPI_PROC_NULL;
} else {
rootBcast = root;
}
// Send the number of entries first
def_malleability_entries(data_struct_dist, data_struct_rep, &entries_type);
MPI_Bcast(MPI_BOTTOM, 1, entries_type, rootBcast, intercomm);
MPI_Bcast(MPI_BOTTOM, 1, entries_type, mall->root_collectives, mall->intercomm);
if(is_children_group && ( data_struct_rep->entries != 0 || data_struct_dist->entries != 0 )) {
init_malleability_data_struct(data_struct_rep, data_struct_rep->entries);
......@@ -103,7 +97,7 @@ void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *d
}
def_malleability_qty_type(data_struct_dist, data_struct_rep, &struct_type);
MPI_Bcast(MPI_BOTTOM, 1, struct_type, rootBcast, intercomm);
MPI_Bcast(MPI_BOTTOM, 1, struct_type, mall->root_collectives, mall->intercomm);
if(is_children_group) {
for(i=0; i < data_struct_rep->entries; i++) {
......
......@@ -27,7 +27,7 @@ typedef struct {
void add_data(void *data, size_t total_qty, MPI_Datatype type, size_t request_qty, malleability_data_t *data_struct);
void modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, size_t request_qty, malleability_data_t *data_struct);
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group, int myId, int root, MPI_Comm intercomm);
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group);
void free_malleability_data_struct(malleability_data_t *data_struct);
#endif
......@@ -30,6 +30,7 @@ int baseline(Spawn_data spawn_data, MPI_Comm *child) { //TODO Error handling
} else if(spawn_data.spawn_is_single) { // Children path
single_strat_children(child);
}
return MALL_SPAWN_COMPLETED;
}
......@@ -48,7 +49,6 @@ int baseline_spawn(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *child) {
if(spawn_err != MPI_SUCCESS) {
printf("Error creating new set of %d procs.\n", spawn_data.spawn_qty);
}
MAM_Comm_main_structures(rootBcast);
return spawn_err;
......
......@@ -19,7 +19,6 @@
Spawn_data *spawn_data = NULL;
pthread_t spawn_thread;
MPI_Comm *returned_comm;
//--------------PRIVATE CONFIGURATION DECLARATIONS---------------//
void set_spawn_configuration(MPI_Comm comm);
......@@ -28,12 +27,12 @@ void deallocate_spawn_data();
//--------------PRIVATE DECLARATIONS---------------//
void generic_spawn(MPI_Comm *child, int data_stage);
int check_single_state(MPI_Comm comm, int global_state, int wait_completed);
int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, int wait_completed);
int check_single_state(MPI_Comm comm, MPI_Comm *child, int global_state, int wait_completed);
int check_generic_state(MPI_Comm comm, int local_state, int wait_completed);
//--------------PRIVATE THREADS DECLARATIONS---------------//
int allocate_thread_spawn();
void* thread_work();
int allocate_thread_spawn(MPI_Comm *child);
void* thread_work(void *args);
//--------------PUBLIC FUNCTIONS---------------//
......@@ -71,7 +70,7 @@ int init_spawn(MPI_Comm comm, MPI_Comm *child) {
MALL_SPAWN_ADAPT_POSTPONE : local_state;
set_spawn_state(local_state, 0);
if((spawn_data->spawn_is_single && mall->myId == mall->root) || !spawn_data->spawn_is_single) {
allocate_thread_spawn();
allocate_thread_spawn(child);
}
}
......@@ -90,10 +89,10 @@ int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int wait_completed) {
local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MALL_SPAWN_SINGLE_PENDING || local_state == MALL_SPAWN_SINGLE_COMPLETED) { // Single
global_state = check_single_state(comm, local_state, wait_completed);
global_state = check_single_state(comm, child, local_state, wait_completed);
} else if(local_state == MALL_SPAWN_PENDING || local_state == MALL_SPAWN_COMPLETED || local_state == MALL_SPAWN_ADAPTED) { // Generic
global_state = check_generic_state(comm, child, local_state, wait_completed);
global_state = check_generic_state(comm, local_state, wait_completed);
} else if(local_state == MALL_SPAWN_ADAPT_POSTPONE) {
global_state = local_state;
......@@ -148,10 +147,14 @@ void malleability_connect_children(MPI_Comm comm, MPI_Comm *parents) {
MPI_Comm_remote_size(*parents, &spawn_data->initial_qty);
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
switch(mall_conf->spawn_method) {
case MALL_SPAWN_BASELINE:
baseline(*spawn_data, parents);
if(!spawn_data->spawn_is_intercomm) {
intracomm_strategy(MALLEABILITY_CHILDREN, parents);
}
break;
case MALL_SPAWN_MERGE:
spawn_data->target_qty += spawn_data->initial_qty;
......@@ -176,10 +179,10 @@ void set_spawn_configuration(MPI_Comm comm) {
spawn_data->target_qty = mall->numC;
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
spawn_data->comm = comm;
spawn_data->mapping_fill_method = MALL_DIST_STRING;
switch(mall_conf->spawn_method) {
case MALL_SPAWN_BASELINE:
spawn_data->spawn_qty = spawn_data->target_qty;
......@@ -233,6 +236,9 @@ void generic_spawn(MPI_Comm *child, int data_stage) {
switch(mall_conf->spawn_method) {
case MALL_SPAWN_BASELINE:
local_state = baseline(*spawn_data, child);
if(!spawn_data->spawn_is_intercomm) {
local_state = intracomm_strategy(MALLEABILITY_NOT_CHILDREN, child);
}
break;
case MALL_SPAWN_MERGE:
local_state = merge(*spawn_data, child, data_stage);
......@@ -253,8 +259,8 @@ void generic_spawn(MPI_Comm *child, int data_stage) {
* A "join" cannot be performed on the thread; it frees its own associated
* memory when it terminates.
*/
int allocate_thread_spawn() {
if(pthread_create(&spawn_thread, NULL, thread_work, NULL)) {
int allocate_thread_spawn(MPI_Comm *child) {
if(pthread_create(&spawn_thread, NULL, thread_work, (void *) child)) {
printf("Error al crear el hilo de SPAWN\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
......@@ -274,18 +280,18 @@ int allocate_thread_spawn() {
* Once the configuration is ready and it is possible to create the processes,
* the master thread is notified.
*/
void* thread_work() {
void* thread_work(void *args) {
int local_state;
returned_comm = (MPI_Comm *) malloc(sizeof(MPI_Comm));
MPI_Comm *child = (MPI_Comm *) args;
generic_spawn(returned_comm, MALL_NOT_STARTED);
generic_spawn(child, MALL_NOT_STARTED);
local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MALL_SPAWN_ADAPT_POSTPONE || local_state == MALL_SPAWN_PENDING) {
// The process group will finish merging after the data redistribution
local_state = wait_redistribution();
generic_spawn(returned_comm, MALL_DIST_COMPLETED);
generic_spawn(child, MALL_DIST_COMPLETED);
}
wakeup_completion();
......@@ -302,7 +308,7 @@ void* thread_work() {
* the non-root processes, and the state
* "MALL_SPAWN_PENDING" is returned.
*/
int check_single_state(MPI_Comm comm, int global_state, int wait_completed) {
int check_single_state(MPI_Comm comm, MPI_Comm *child, int global_state, int wait_completed) {
while(wait_completed && mall->myId == mall->root && global_state == MALL_SPAWN_SINGLE_PENDING) {
global_state = wait_completion();
}
......@@ -315,7 +321,7 @@ int check_single_state(MPI_Comm comm, int global_state, int wait_completed) {
set_spawn_state(global_state, spawn_data->spawn_is_async);
if(mall->myId != mall->root) {
allocate_thread_spawn(spawn_data);
allocate_thread_spawn(child);
}
}
return global_state;
......@@ -330,7 +336,7 @@ int check_single_state(MPI_Comm comm, int global_state, int wait_completed) {
* If it has finished, the memory associated with spawn_data is freed
* and the state "MALL_SPAWN_COMPLETED" is returned.
*/
int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, int wait_completed) {
int check_generic_state(MPI_Comm comm, int local_state, int wait_completed) {
int global_state;
while(wait_completed && local_state == MALL_SPAWN_PENDING) local_state = wait_completion();
......@@ -338,8 +344,6 @@ int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, int wai
MPI_Allreduce(&local_state, &global_state, 1, MPI_INT, MPI_MIN, comm);
if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_SPAWN_ADAPTED) {
set_spawn_state(global_state, spawn_data->spawn_is_async);
*child = *returned_comm;
deallocate_spawn_data();
}
return global_state;
}
......@@ -36,6 +36,11 @@ int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state) {
return local_state;
}
int intracomm_strategy(int is_children_group, MPI_Comm *child) {
merge_adapt_expand(child, is_children_group);
return MALL_SPAWN_COMPLETED;
}
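/* Illustrative check (not part of this commit): after a Baseline spawn the
 * communicator flavour now depends on the strategy, which callers can verify
 * with MPI_Comm_test_inter: */
// int is_inter;
// MPI_Comm_test_inter(*child, &is_inter);
// is_inter != 0 only when MAM_STRAT_SPAWN_INTERCOMM was requested;
// otherwise intracomm_strategy() has already merged the groups.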
//--------------PRIVATE MERGE TYPE FUNCTIONS---------------//
/*
......@@ -52,7 +57,7 @@ void merge_adapt_expand(MPI_Comm *child, int is_children_group) {
MPI_Intercomm_merge(*child, is_children_group, &new_comm); //The group passing 0 goes first
MPI_Comm_free(child); //POSIBLE ERROR?
MPI_Comm_free(child);
*child = new_comm;
}
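/* Ordering note (illustrative): MPI_Intercomm_merge places the group that
 * passes high == 0 first in the merged rank order, which is why parents call
 * it with MALLEABILITY_NOT_CHILDREN and children with MALLEABILITY_CHILDREN
 * throughout this commit, e.g.:
 * MPI_Intercomm_merge(intercomm, is_children_group, &merged); */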
......
......@@ -8,5 +8,6 @@
#include "Spawn_DataStructure.h"
int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state);
int intracomm_strategy(int is_children_group, MPI_Comm *child);
#endif
......@@ -8,7 +8,7 @@
typedef struct {
int spawn_qty, initial_qty, target_qty;
int already_created;
int spawn_is_single, spawn_is_async;
int spawn_is_single, spawn_is_async, spawn_is_intercomm;
MPI_Info mapping;
int mapping_fill_method;
......