Commit cf716c90 authored by iker_martin's avatar iker_martin
Browse files

Improved strat wait_targets so it only performs a single Ibarrier for all...

Improved strat wait_targets so it only performs a single Ibarrier for all asynch redistribution. Added function to check config is valid before starting reconfiguration
parent c706c00b
......@@ -309,13 +309,6 @@ void async_communication_start(void *send, void **recv, int qty, MPI_Datatype da
break;
}
/* POST REQUESTS CHECKS */
if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
if(!is_children_group && (mall_conf->spawn_method == MALL_SPAWN_BASELINE || mall->myId >= numO)) { // TODO Simplify to "if rank is source only" or "if rank will be zombie"
MPI_Ibarrier(comm, &((*requests)[*request_qty-1]) ); //FIXME Not easy to read...
}
}
freeCounts(&s_counts);
freeCounts(&r_counts);
}
......@@ -329,38 +322,19 @@ void async_communication_start(void *send, void **recv, int qty, MPI_Datatype da
*
* returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE).
*/
int async_communication_check(int is_children_group, MPI_Comm comm, MPI_Request *requests, size_t request_qty) {
int completed, req_completed, all_req_null, test_err, aux_condition;
int async_communication_check(int is_children_group, MPI_Request *requests, size_t request_qty) {
int completed, req_completed, test_err;
size_t i;
completed = 1;
all_req_null = 1;
test_err = MPI_SUCCESS;
if (is_children_group) return 1; //FIXME Deberia devolver un num negativo
if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
// The Ibarrier should only be posted at this point if the process
// has other requests which has not confirmed as completed yet,
// but are confirmed now.
if (requests[request_qty-1] == MPI_REQUEST_NULL) {
for(i=0; i<request_qty; i++) {
aux_condition = requests[i] == MPI_REQUEST_NULL;
all_req_null = all_req_null && aux_condition;
test_err = MPI_Test(&(requests[i]), &req_completed, MPI_STATUS_IGNORE);
completed = completed && req_completed;
}
if(completed && !all_req_null) { MPI_Ibarrier(comm, &(requests[request_qty-1])); }
}
test_err = MPI_Test(&(requests[request_qty-1]), &completed, MPI_STATUS_IGNORE);
} else {
for(i=0; i<request_qty; i++) {
test_err = MPI_Test(&(requests[i]), &req_completed, MPI_STATUS_IGNORE);
completed = completed && req_completed;
}
// test_err = MPI_Testall(request_qty, requests, &completed, MPI_STATUSES_IGNORE); //FIXME Some kind of bug with Mpich.
for(i=0; i<request_qty; i++) {
test_err = MPI_Test(&(requests[i]), &req_completed, MPI_STATUS_IGNORE);
completed = completed && req_completed;
}
//test_err = MPI_Testall(request_qty, requests, &completed, MPI_STATUSES_IGNORE); //FIXME Some kind of bug with Mpich.
if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
printf("P%d aborting -- Test Async\n", mall->myId);
......@@ -378,17 +352,12 @@ int async_communication_check(int is_children_group, MPI_Comm comm, MPI_Request
* - comm (IN): Communicator to use to confirm finalizations of redistribution
* - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
* - request_qty (IN): Quantity of requests in "requests".
* - post_ibarrier (IN): Whether an Ibarrier should be posted by this process or not.
*/
void async_communication_wait(MPI_Comm comm, MPI_Request *requests, size_t request_qty, int post_ibarrier) {
void async_communication_wait(MPI_Request *requests, size_t request_qty) {
MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE);
#if USE_MAL_DEBUG >= 3
DEBUG_FUNC("Processes Waitall completed", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
if(post_ibarrier) {
MPI_Ibarrier(comm, &(requests[request_qty-1]) );
MPI_Wait(&(requests[request_qty-1]), MPI_STATUS_IGNORE);
}
}
/*
......@@ -624,9 +593,6 @@ void check_requests(struct Counts s_counts, struct Counts r_counts, MPI_Request
sum += (size_t) r_counts.idE - r_counts.idI;
break;
}
if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
sum++;
}
if (*requests != NULL && sum <= *request_qty) return; // Expected amount of requests
......
......@@ -10,8 +10,8 @@
void sync_communication(void *send, void **recv, int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, MPI_Comm comm);
void async_communication_start(void *send, void **recv, int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win);
int async_communication_check(int is_children_group, MPI_Comm comm, MPI_Request *requests, size_t request_qty);
void async_communication_wait(MPI_Comm comm, MPI_Request *requests, size_t request_qty, int post_ibarrier);
int async_communication_check(int is_children_group, MPI_Request *requests, size_t request_qty);
void async_communication_wait(MPI_Request *requests, size_t request_qty);
void async_communication_end(MPI_Request *requests, size_t request_qty, MPI_Win *win);
......
......@@ -18,7 +18,7 @@ int MAM_I_set_spawn_strat(unsigned int strategy, unsigned int *strategies);
int MAM_I_set_red_strat(unsigned int strategy, unsigned int *strategies);
int MAM_I_set_target_number(unsigned int new_numC);
int MAM_I_configuration_get_info();
int MAM_I_configuration_get_defaults();
int MAM_I_contains_strat(unsigned int comm_strategies, unsigned int strategy);
int MAM_I_add_strat(unsigned int *comm_strategies, unsigned int strategy);
......@@ -108,16 +108,6 @@ void MAM_Set_key_configuration(int key, int required, int *provided) {
}
} else {*provided = *(config->value); }
} else { printf("MAM: Key %d does not exist\n", key); }
//TODO -- Llevar esto a una funcion de MAM_Config_init para asegurar que es correcto.
if(mall_conf->red_method == MALL_RED_RMA_LOCK || mall_conf->red_method == MALL_RED_RMA_LOCKALL) {
if(MAM_I_contains_strat(mall_conf->spawn_strategies, MAM_STRAT_SPAWN_INTERCOMM)) {
MAM_I_remove_strat(&mall_conf->spawn_strategies, MAM_MASK_SPAWN_INTERCOMM);
}
if(MAM_I_contains_strat(mall_conf->red_strategies, MAM_STRAT_RED_WAIT_SOURCES)) {
MAM_I_set_red_strat(MAM_STRAT_RED_WAIT_TARGETS, &mall_conf->red_strategies);
}
}
}
/*
......@@ -180,10 +170,10 @@ void MAM_Init_configuration() {
configSettings[MAM_RED_STRATEGIES].value = &mall_conf->red_strategies;
}
void MAM_Check_configuration() {
void MAM_Set_initial_configuration() {
int not_filled = 1;
not_filled = MAM_I_configuration_get_info();
not_filled = MAM_I_configuration_get_defaults();
if(not_filled) {
if(mall->myId == mall->root) printf("MAM WARNING: Starting configuration not set\n");
fflush(stdout);
......@@ -198,15 +188,28 @@ void MAM_Check_configuration() {
#endif
}
void MAM_Check_configuration() {
if(mall_conf->red_method == MALL_RED_RMA_LOCK || mall_conf->red_method == MALL_RED_RMA_LOCKALL) {
if(MAM_I_contains_strat(mall_conf->spawn_strategies, MAM_STRAT_SPAWN_INTERCOMM)) {
MAM_I_remove_strat(&mall_conf->spawn_strategies, MAM_MASK_SPAWN_INTERCOMM);
}
if(MAM_I_contains_strat(mall_conf->red_strategies, MAM_STRAT_RED_WAIT_SOURCES)) {
MAM_I_set_red_strat(MAM_STRAT_RED_WAIT_TARGETS, &mall_conf->red_strategies);
}
}
if(mall->numC == mall->numP) { // Migrate
MAM_Set_key_configuration(MAM_SPAWN_METHOD, MALL_SPAWN_BASELINE, NULL);
}
}
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//================?????????????????=====================||
//======================================================||
//======================================================||
int MAM_I_configuration_get_info() { //FIXME Cambiar nombre
int MAM_I_configuration_get_defaults() {
size_t i;
int set_value;
char *tmp = NULL;
......@@ -313,13 +316,9 @@ int MAM_I_set_red_strat(unsigned int strategy, unsigned int *strategies) {
}
int MAM_I_set_target_number(unsigned int new_numC) {
int provided;
if(state > MALL_NOT_STARTED || new_numC == 0) return MALL_DENIED;
mall->numC = (int) new_numC;
if(mall->numC == mall->numP) { // Migrar //FIXME Cambiar de sitio
MAM_Set_key_configuration(MAM_SPAWN_METHOD, MALL_SPAWN_BASELINE, &provided);
}
return new_numC;
}
......
......@@ -5,6 +5,7 @@
#include "malleabilityStates.h"
void MAM_Init_configuration();
void MAM_Set_initial_configuration();
void MAM_Check_configuration();
#endif
......@@ -43,6 +43,10 @@ typedef struct {
MPI_Comm intercomm, tmp_comm;
MPI_Comm *user_comm;
MPI_Datatype struct_type;
// Specific vars for Wait_targets strat
int wait_targets_posted;
MPI_Request wait_targets;
char *name_exec, *nodelist;
int num_cpus, num_nodes, nodelist_len;
......
......@@ -123,7 +123,7 @@ int MAM_Init(int root, MPI_Comm *comm, char *name_exec, void (*user_function)(vo
}
MAM_check_hosts();
MAM_Check_configuration();
MAM_Set_initial_configuration();
#if USE_MAL_BARRIERS && USE_MAL_DEBUG
if(mall->myId == mall->root)
......@@ -334,9 +334,6 @@ void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int
} else if(mall_conf->red_method == MALL_RED_POINT || mall_conf->red_method == MALL_RED_RMA_LOCK || mall_conf->red_method == MALL_RED_RMA_LOCKALL) {
total_reqs = mall->numC;
}
if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
total_reqs++;
}
add_data(data, total_qty, type, total_reqs, dist_a_data);
}
......@@ -370,9 +367,6 @@ void malleability_modify_data(void *data, size_t index, size_t total_qty, MPI_Da
} else if(mall_conf->red_method == MALL_RED_POINT || mall_conf->red_method == MALL_RED_RMA_LOCK || mall_conf->red_method == MALL_RED_RMA_LOCKALL) {
total_reqs = mall->numC;
}
if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
total_reqs++;
}
modify_data(data, index, total_qty, type, total_reqs, dist_a_data);
}
......@@ -504,16 +498,18 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch
//======================================================||
int MAM_St_rms(int *mam_state) {
*mam_state = MAM_NOT_STARTED;
state = MALL_RMS_COMPLETED;
reset_malleability_times();
// Comprobar si se tiene que realizar un redimensionado
#if USE_MAL_BARRIERS
MPI_Barrier(mall->comm);
#endif
mall_conf->times->malleability_start = MPI_Wtime();
//if(CHECK_RMS()) {return MALL_DENIED;}
*mam_state = MAM_NOT_STARTED;
state = MALL_RMS_COMPLETED;
MAM_Check_configuration();
mall->wait_targets_posted = 0;
//if(CHECK_RMS()) {return MALL_DENIED;}
return 1;
}
......@@ -644,7 +640,7 @@ void Children_init(void (*user_function)(void *), void *user_args) {
size_t i;
#if USE_MAL_DEBUG
DEBUG_FUNC("MaM will now initialize children", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
DEBUG_FUNC("MaM will now initialize spawned processes", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
malleability_connect_children(mall->comm, &(mall->intercomm));
......@@ -655,13 +651,13 @@ void Children_init(void (*user_function)(void *), void *user_args) {
mall->root_collectives = mall->root_parents;
#if USE_MAL_DEBUG
DEBUG_FUNC("Targets have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
DEBUG_FUNC("Spawned have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN);
if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos
#if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Children start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
DEBUG_FUNC("Spawned start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
#if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm);
......@@ -679,20 +675,23 @@ void Children_init(void (*user_function)(void *), void *user_args) {
MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm, &(rep_a_data->requests[i][0]));
}
#if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Targets started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
DEBUG_FUNC("Spawned started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
int post_ibarrier = 0;
// FIXME No permite el uso de ibarrier ahora mismo. Realmente solo hace falta un ibarrier para todos
for(i=0; i<rep_a_data->entries; i++) {
async_communication_wait(mall->intercomm, rep_a_data->requests[i], rep_a_data->request_qty[i], post_ibarrier);
async_communication_wait(rep_a_data->requests[i], rep_a_data->request_qty[i]);
}
if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) { post_ibarrier=1; }
for(i=0; i<dist_a_data->entries; i++) {
async_communication_wait(mall->intercomm, dist_a_data->requests[i], dist_a_data->request_qty[i], post_ibarrier);
async_communication_wait(dist_a_data->requests[i], dist_a_data->request_qty[i]);
}
if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
mall->wait_targets_posted = 1;
MPI_Wait(&mall->wait_targets, MPI_STATUS_IGNORE);
}
#if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Targets waited for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
DEBUG_FUNC("Spawned waited for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
for(i=0; i<dist_a_data->entries; i++) {
async_communication_end(dist_a_data->requests[i], dist_a_data->request_qty[i], &(dist_a_data->windows[i]));
......@@ -708,7 +707,7 @@ void Children_init(void (*user_function)(void *), void *user_args) {
mall_conf->times->async_end= MPI_Wtime(); // Obtener timestamp de cuando termina comm asincrona
}
#if USE_MAL_DEBUG
DEBUG_FUNC("Targets have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
DEBUG_FUNC("Spawned have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
......@@ -812,9 +811,14 @@ int start_redistribution() {
return thread_creation();
} else {
send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
for(i=0; i<rep_a_data->entries; i++) { //FIXME Ibarrier does not work with rep_a_data
for(i=0; i<rep_a_data->entries; i++) {
MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm, &(rep_a_data->requests[i][0]));
}
if(mall->zombie && MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
mall->wait_targets_posted = 1;
}
return MALL_DIST_PENDING;
}
}
......@@ -837,44 +841,54 @@ int start_redistribution() {
* //FIXME Modificar para que se tenga en cuenta rep_a_data
*/
int check_redistribution(int wait_completed) {
int completed, local_completed, all_completed, post_ibarrier;
int completed, local_completed, all_completed;
size_t i, req_qty;
MPI_Request *req_completed;
MPI_Win window;
post_ibarrier = 0;
local_completed = 1;
#if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Sources are testing for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
if(wait_completed) {
if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
if(mall_conf->spawn_method == MALL_SPAWN_BASELINE || mall->myId >= mall->numC) {
post_ibarrier=1;
}
if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL) && !mall->wait_targets_posted) {
MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
mall->wait_targets_posted = 1;
}
for(i=0; i<dist_a_data->entries; i++) {
req_completed = dist_a_data->requests[i];
req_qty = dist_a_data->request_qty[i];
async_communication_wait(mall->intercomm, req_completed, req_qty, post_ibarrier);
async_communication_wait(req_completed, req_qty);
}
for(i=0; i<rep_a_data->entries; i++) {
req_completed = rep_a_data->requests[i];
req_qty = rep_a_data->request_qty[i];
async_communication_wait(mall->intercomm, req_completed, req_qty, 0); //FIXME Ibarrier does not work with rep_a_data
async_communication_wait(req_completed, req_qty);
}
if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) { MPI_Wait(&mall->wait_targets, MPI_STATUS_IGNORE); }
} else {
for(i=0; i<dist_a_data->entries; i++) {
req_completed = dist_a_data->requests[i];
req_qty = dist_a_data->request_qty[i];
completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, mall->intercomm, req_completed, req_qty);
local_completed = local_completed && completed;
}
for(i=0; i<rep_a_data->entries; i++) { //FIXME Ibarrier does not work with rep_a_data
req_completed = rep_a_data->requests[i];
req_qty = rep_a_data->request_qty[i];
completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, mall->intercomm, req_completed, req_qty);
local_completed = local_completed && completed;
if(mall->wait_targets_posted) {
MPI_Test(&mall->wait_targets, &local_completed, MPI_STATUS_IGNORE);
} else {
for(i=0; i<dist_a_data->entries; i++) {
req_completed = dist_a_data->requests[i];
req_qty = dist_a_data->request_qty[i];
completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, req_completed, req_qty);
local_completed = local_completed && completed;
}
for(i=0; i<rep_a_data->entries; i++) {
req_completed = rep_a_data->requests[i];
req_qty = rep_a_data->request_qty[i];
completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, req_completed, req_qty);
local_completed = local_completed && completed;
}
if(local_completed && MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
MPI_Ibarrier(mall->intercomm, &mall->wait_targets);
mall->wait_targets_posted = 1;
MPI_Test(&mall->wait_targets, &local_completed, MPI_STATUS_IGNORE); //TODO - Figure out if last process takes profit from calling here
}
}
#if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Sources will now check a global decision", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment