Commit f6c0587b authored by iker_martin's avatar iker_martin
Browse files

Added new option for checkpointing to wait until the reconfigutation is...

Added new option for checkpointing to wait until the reconfigutation is performed. Also, now it returns simplified states.
parent ab984234
......@@ -175,7 +175,7 @@ int main(int argc, char *argv[]) {
}
res = work();
if(res == MALL_ZOMBIE) break;
if(res == MAM_ZOMBIE) break;
if(res==1) { // Se ha llegado al final de la aplicacion
MPI_Barrier(comm);
results->exec_time = MPI_Wtime() - results->exec_start - results->wasted_time;
......@@ -221,9 +221,10 @@ int main(int argc, char *argv[]) {
*/
int work() {
int iter, maxiter, state, res;
int wait_completed = MAM_CHECK_COMPLETION;
maxiter = config_file->groups[group->grp].iters;
state = MALL_NOT_STARTED;
state = MAM_NOT_STARTED;
res = 0;
for(iter=group->iter_start; iter < maxiter; iter++) {
......@@ -231,21 +232,21 @@ int work() {
}
if(config_file->n_groups != group->grp + 1)
state = malleability_checkpoint();
malleability_checkpoint(&state, wait_completed);
iter = 0;
while(state == MALL_DIST_PENDING || state == MALL_SPAWN_PENDING || state == MALL_SPAWN_SINGLE_PENDING || state == MALL_SPAWN_ADAPT_POSTPONE || state == MALL_SPAWN_ADAPT_PENDING) {
while(state == MAM_PENDING) {
if(group->grp+1 < config_file->n_groups && iter < config_file->groups[group->grp+1].iters) {
iterate(state);
iter++;
group->iter_start = iter;
}
state = malleability_checkpoint();
} else { wait_completed = MAM_WAIT_COMPLETION; }
malleability_checkpoint(&state, wait_completed);
}
if(config_file->n_groups == group->grp + 1) res=1;
if(state == MALL_ZOMBIE) res=state;
if(state == MAM_ZOMBIE) res=state;
return res;
}
......@@ -276,7 +277,7 @@ double iterate(int async_comm) {
}
// Se esta realizando una redistribucion de datos asincrona
if(async_comm == MALL_DIST_PENDING || async_comm == MALL_SPAWN_PENDING || async_comm == MALL_SPAWN_SINGLE_PENDING) {
if(async_comm == MAM_PENDING) {
// TODO Que diferencie entre ambas en el IO
results->iters_async += 1;
}
......
......@@ -6,6 +6,7 @@
#include <mpi.h>
#include "Main_datatypes.h"
// 0 1 2 3 4 5 6 7 8
enum compute_methods{COMP_PI, COMP_MATRIX, COMP_POINT, COMP_IPOINT, COMP_WAIT, COMP_BCAST, COMP_ALLGATHER, COMP_REDUCE, COMP_ALLREDUCE};
double init_stage(configuration *config_file, int stage_i, group_data group, MPI_Comm comm, int compute);
......
......@@ -193,11 +193,20 @@ void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct
* - comm (IN): Communicator to use to perform the redistribution. Must be an intracommunicator as MPI-RMA requirements.
* - red_method (IN): Type of data redistribution to use. In this case indicates the RMA operation(Lock or LockAll).
*
* FIXME: In libfabric one of these macros defines the maximum amount of BYTES that can be communicated in a SINGLE MPI_Get
* A window can have more bytes than the amount shown in those macros, therefore, if you want to read more than that amount
* you need to perform multiples Gets.
* prov/psm3/psm3/psm_config.h:179:#define MQ_SHM_THRESH_RNDV 16000
* prov/psm3/psm3/ptl_am/am_config.h:62:#define PSMI_MQ_RV_THRESH_CMA 16000
* prov/psm3/psm3/ptl_am/am_config.h:65:#define PSMI_MQ_RV_THRESH_NO_KASSIST 16000
*/
void sync_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method) {
MPI_Win win;
MPI_Win_create(send, (MPI_Aint)tamBl, sizeof(char), MPI_INFO_NULL, comm, &win);
MPI_Win_create(send, (MPI_Aint)tamBl * sizeof(char), sizeof(char), MPI_INFO_NULL, comm, &win);
#if USE_MAL_DEBUG >= 3
DEBUG_FUNC("Created Window for synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm);
#endif
switch(red_method) {
case MALL_RED_RMA_LOCKALL:
sync_rma_lockall(recv, r_counts, win);
......@@ -206,7 +215,9 @@ void sync_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Com
sync_rma_lock(recv, r_counts, win);
break;
}
#if USE_MAL_DEBUG >= 3
DEBUG_FUNC("Completed synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm);
#endif
MPI_Win_free(&win);
}
......@@ -359,7 +370,7 @@ int async_communication_check(int myId, int is_children_group, int red_strategie
all_req_null = 1;
test_err = MPI_SUCCESS;
if (is_children_group) return 1;
if (is_children_group) return 1; //FIXME Deberia devolver un num negativo
if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) {
......@@ -398,19 +409,19 @@ int async_communication_check(int myId, int is_children_group, int red_strategie
* Waits until the completion of a set of requests. If the Ibarrier strategy
* is being used, the corresponding ibarrier is posted.
*
* - red_strategies (IN):
* - comm (IN): Communicator to use to confirm finalizations of redistribution
* - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
* - request_qty (IN): Quantity of requests in "requests".
* - post_ibarrier (IN): Whether an Ibarrier should be posted by this process or not.
*/
void async_communication_wait(int red_strategies, MPI_Comm comm, MPI_Request *requests, size_t request_qty) {
void async_communication_wait(MPI_Comm comm, MPI_Request *requests, size_t request_qty, int post_ibarrier) {
MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE);
#if USE_MAL_DEBUG >= 3
DEBUG_FUNC("Targets Waitall completed", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
DEBUG_FUNC("Processes Waitall completed", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) {
if(post_ibarrier) {
MPI_Ibarrier(comm, &(requests[request_qty-1]) );
MPI_Wait(&(requests[request_qty-1]), MPI_STATUS_IGNORE); //TODO Is it really needed? It will be ensured later
MPI_Wait(&(requests[request_qty-1]), MPI_STATUS_IGNORE);
}
}
......@@ -480,7 +491,7 @@ void async_point2point(char *send, char *recv, struct Counts s_counts, struct Co
*/
void async_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method, MPI_Request *requests, MPI_Win *win) {
MPI_Win_create(send, (MPI_Aint)tamBl, sizeof(char), MPI_INFO_NULL, comm, win);
MPI_Win_create(send, (MPI_Aint)tamBl * sizeof(char), sizeof(char), MPI_INFO_NULL, comm, win);
switch(red_method) {
case MALL_RED_RMA_LOCKALL:
async_rma_lockall(recv, r_counts, *win, requests);
......
......@@ -21,7 +21,7 @@ int sync_communication(char *send, char **recv, int qty, int myId, int numP, int
int async_communication_start(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win);
int async_communication_check(int myId, int is_children_group, int red_strategies, MPI_Comm comm, MPI_Request *requests, size_t request_qty);
void async_communication_wait(int red_strategies, MPI_Comm comm, MPI_Request *requests, size_t request_qty);
void async_communication_wait(MPI_Comm comm, MPI_Request *requests, size_t request_qty, int post_ibarrier);
void async_communication_end(int red_method, int red_strategies, MPI_Request *requests, size_t request_qty, MPI_Win *win);
//int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int red_method, int red_strategies);
//void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int red_method, int red_strategies);
......
......@@ -19,7 +19,7 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch
void Children_init();
int spawn_step();
int start_redistribution();
int check_redistribution();
int check_redistribution(int wait_completed);
int end_redistribution();
int shrink_redistribution();
......@@ -27,7 +27,7 @@ void comm_node_data(int rootBcast, int is_child_group);
void def_nodeinfo_type(MPI_Datatype *node_type);
int thread_creation();
int thread_check();
int thread_check(int wait_completed);
void* thread_async_work();
void print_comms_state();
......@@ -164,13 +164,14 @@ void free_malleability() {
* Si solo hay datos sincronos se envian tras la creacion de los procesos
* y finalmente se desconectan los dos grupos de procesos.
*/
int malleability_checkpoint() {
double end_real_time;
int malleability_checkpoint(int *mam_state, int wait_completed) {
switch(state) {
case MALL_UNRESERVED:
*mam_state = MAM_UNRESERVED;
break;
case MALL_NOT_STARTED:
*mam_state = MAM_NOT_STARTED;
reset_malleability_times();
// Comprobar si se tiene que realizar un redimensionado
......@@ -183,37 +184,37 @@ int malleability_checkpoint() {
state = spawn_step();
if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE){
malleability_checkpoint();
malleability_checkpoint(mam_state, wait_completed);
}
break;
case MALL_SPAWN_PENDING: // Comprueba si el spawn ha terminado y comienza la redistribucion
case MALL_SPAWN_SINGLE_PENDING:
state = check_spawn_state(&(mall->intercomm), mall->comm, &end_real_time);
state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);
if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPTED) {
#if USE_MAL_BARRIERS
MPI_Barrier(mall->comm);
#endif
mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
malleability_checkpoint();
malleability_checkpoint(mam_state, wait_completed);
}
break;
case MALL_SPAWN_ADAPT_POSTPONE:
case MALL_SPAWN_COMPLETED:
state = start_redistribution();
malleability_checkpoint();
malleability_checkpoint(mam_state, wait_completed);
break;
case MALL_DIST_PENDING:
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
state = thread_check();
state = thread_check(wait_completed);
} else {
state = check_redistribution();
state = check_redistribution(wait_completed);
}
if(state != MALL_DIST_PENDING) {
malleability_checkpoint();
malleability_checkpoint(mam_state, wait_completed);
}
break;
......@@ -224,20 +225,21 @@ int malleability_checkpoint() {
#endif
mall_conf->times->spawn_start = MPI_Wtime();
unset_spawn_postpone_flag(state);
state = check_spawn_state(&(mall->intercomm), mall->comm, &end_real_time);
state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);
if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
#if USE_MAL_BARRIERS
MPI_Barrier(mall->comm);
#endif
mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
malleability_checkpoint();
malleability_checkpoint(mam_state, wait_completed);
}
break;
case MALL_SPAWN_ADAPTED:
state = shrink_redistribution();
malleability_checkpoint();
if(state == MALL_ZOMBIE) *mam_state = MAM_ZOMBIE;
malleability_checkpoint(mam_state, wait_completed);
break;
case MALL_DIST_COMPLETED: //TODO No es esto muy feo?
......@@ -246,29 +248,39 @@ int malleability_checkpoint() {
#endif
mall_conf->times->malleability_end = MPI_Wtime();
state = MALL_COMPLETED;
*mam_state = MAM_COMPLETED;
break;
}
if(state > MALL_ZOMBIE && state < MALL_COMPLETED) *mam_state = MAM_PENDING;
return state;
}
void MAM_Commit(int *mam_state) {
//Hacer borrado de comunicadores no necesarios
//Update de comunicadores
//Reiniciar algunas estructuras ¿Cuales?
//Llamar a funcion de zombies
//Devolver el estado de mam
}
// Funciones solo necesarias por el benchmark
//-------------------------------------------------------------------------------------------------------------
void set_benchmark_grp(int grp) {
mall_conf->grp = grp;
}
void set_benchmark_configuration(configuration *config_file) {
mall_conf->config_file = config_file;
}
void get_benchmark_configuration(configuration **config_file) {
*config_file = mall_conf->config_file;
}
//-------------------------------------------------------------------------------------------------------------
void malleability_retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
malleability_I_retrieve_times(sp_time, sy_time, asy_time, mall_time);
}
//-------------------------------------------------------------------------------------------------------------
void set_malleability_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies) {
mall_conf->spawn_method = spawn_method;
......@@ -530,7 +542,7 @@ void Children_init() {
MPI_Bcast(&(mall_conf->red_strategies), 1, MPI_INT, root_parents, mall->intercomm);
#if USE_MAL_DEBUG
DEBUG_FUNC("Children have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
DEBUG_FUNC("Targets have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
......@@ -548,13 +560,16 @@ void Children_init() {
recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
#if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Children started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
DEBUG_FUNC("Targets started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
int post_ibarrier = 0;
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) { post_ibarrier=1; }
for(i=0; i<dist_a_data->entries; i++) {
async_communication_wait(mall_conf->red_strategies, mall->intercomm, dist_a_data->requests[i], dist_a_data->request_qty[i]);
async_communication_wait(mall->intercomm, dist_a_data->requests[i], dist_a_data->request_qty[i], post_ibarrier);
}
#if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Children waited for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
DEBUG_FUNC("Targets waited for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
for(i=0; i<dist_a_data->entries; i++) {
async_communication_end(mall_conf->red_method, mall_conf->red_strategies, dist_a_data->requests[i], dist_a_data->request_qty[i], &(dist_a_data->windows[i]));
......@@ -567,7 +582,7 @@ void Children_init() {
mall_conf->times->async_end= MPI_Wtime(); // Obtener timestamp de cuando termina comm asincrona
}
#if USE_MAL_DEBUG
DEBUG_FUNC("Children have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
DEBUG_FUNC("Targets have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
......@@ -594,7 +609,7 @@ void Children_init() {
mall_conf->times->sync_end = MPI_Wtime(); // Obtener timestamp de cuando termina comm sincrona
}
#if USE_MAL_DEBUG
DEBUG_FUNC("Children have completed synchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
DEBUG_FUNC("Targets have completed synchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
// Guardar los resultados de esta transmision
......@@ -711,16 +726,30 @@ int start_redistribution() {
* los hijos han terminado de recibir.
* //FIXME Modificar para que se tenga en cuenta rep_a_data
*/
int check_redistribution() {
int is_intercomm, completed, local_completed, all_completed;
int check_redistribution(int wait_completed) {
int is_intercomm, completed, local_completed, all_completed, post_ibarrier;
size_t i, req_qty;
MPI_Request *req_completed;
MPI_Win window;
post_ibarrier = 0;
local_completed = 1;
#if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Originals are checking for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
DEBUG_FUNC("Sources are testing for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
if(wait_completed) {
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
if( is_intercomm || mall->myId >= mall->numC) {
post_ibarrier=1;
}
}
for(i=0; i<dist_a_data->entries; i++) {
req_completed = dist_a_data->requests[i];
req_qty = dist_a_data->request_qty[i];
async_communication_wait(mall->intercomm, req_completed, req_qty, post_ibarrier);
}
} else {
for(i=0; i<dist_a_data->entries; i++) {
req_completed = dist_a_data->requests[i];
req_qty = dist_a_data->request_qty[i];
......@@ -728,13 +757,15 @@ int check_redistribution() {
local_completed = local_completed && completed;
}
#if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Originals will now check a global decision", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
DEBUG_FUNC("Sources will now check a global decision", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
MPI_Allreduce(&local_completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended
}
#if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Originals sent asyncrhonous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
DEBUG_FUNC("Sources sent asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
for(i=0; i<dist_a_data->entries; i++) {
......@@ -744,7 +775,6 @@ int check_redistribution() {
async_communication_end(mall_conf->red_method, mall_conf->red_strategies, req_completed, req_qty, &window);
}
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
#if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm);
#endif
......@@ -926,9 +956,17 @@ int thread_creation() {
*
* El estado de la comunicación es devuelto al finalizar la función.
*/
int thread_check() {
int thread_check(int wait_completed) {
int all_completed = 0, is_intercomm;
if(wait_completed && comm_state == MALL_DIST_PENDING) {
if(pthread_join(mall->async_thread, NULL)) {
printf("Error al esperar al hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -2;
}
}
// Comprueba que todos los hilos han terminado la distribucion (Mismo valor en commAsync)
MPI_Allreduce(&comm_state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
if(all_completed != MALL_DIST_COMPLETED) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended
......
......@@ -14,7 +14,8 @@
int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes);
void free_malleability();
void indicate_ending_malleability(int new_outside_state);
int malleability_checkpoint();
int malleability_checkpoint(int *mam_state, int wait_completed);
void MAM_Commit(int *mam_state);
void set_benchmark_grp(int grp);
void set_malleability_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies);
......
......@@ -6,9 +6,10 @@
//States
#define MALL_DENIED -1
enum mall_states{MALL_UNRESERVED, MALL_NOT_STARTED, MALL_ZOMBIE, MALL_SPAWN_PENDING, MALL_SPAWN_SINGLE_PENDING,
enum mall_inner_states{MALL_UNRESERVED, MALL_NOT_STARTED, MALL_ZOMBIE, MALL_SPAWN_PENDING, MALL_SPAWN_SINGLE_PENDING,
MALL_SPAWN_SINGLE_COMPLETED, MALL_SPAWN_ADAPT_POSTPONE, MALL_SPAWN_COMPLETED, MALL_DIST_PENDING, MALL_DIST_COMPLETED,
MALL_SPAWN_ADAPT_PENDING, MALL_SPAWN_ADAPTED, MALL_COMPLETED};
enum mam_states{MAM_UNRESERVED, MAM_NOT_STARTED, MAM_ZOMBIE, MAM_PENDING, MAM_COMPLETED};
enum mall_spawn_methods{MALL_SPAWN_BASELINE, MALL_SPAWN_MERGE};
#define MALL_SPAWN_PTHREAD 2
#define MALL_SPAWN_SINGLE 3
......@@ -22,6 +23,9 @@ enum mall_redistribution_methods{MALL_RED_BASELINE, MALL_RED_POINT, MALL_RED_RMA
#define MAL_APP_EXECUTING 0
#define MAL_APP_ENDED 1
#define MAM_CHECK_COMPLETION 0
#define MAM_WAIT_COMPLETION 1
//TODO DEPRECATE
#define MAL_INT 0
#define MAL_CHAR 1
......
......@@ -71,7 +71,7 @@ int single_strat_parents(Spawn_data spawn_data, MPI_Comm *child) {
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, spawn_data.root, 130, *child, MPI_STATUS_IGNORE);
set_spawn_state(MALL_SPAWN_SINGLE_COMPLETED, spawn_data.spawn_is_async); // Indicate other processes to join root to end spawn procedure
wakeup_completion();
} else {
port_name = malloc(1);
}
......
......@@ -6,6 +6,7 @@
#include <mpi.h>
#include <string.h>
#include "../malleabilityStates.h"
#include "../malleabilityDataStructures.h"
#include "ProcessDist.h"
#include "GenericSpawn.h"
#include "Baseline.h"
......@@ -19,8 +20,6 @@ Spawn_data *spawn_data = NULL;
pthread_t spawn_thread;
MPI_Comm *returned_comm;
double end_time; //FIXME REFACTOR
//--------------PRIVATE CONFIGURATION DECLARATIONS---------------//
void set_spawn_configuration(char *cmd, int num_cpus, int num_nodes, char *nodelist, int myId, int root, int initial_qty, int target_qty, int type_dist, int spawn_method, int spawn_strategies, MPI_Comm comm);
void set_basic_spawn_dtype();
......@@ -29,8 +28,8 @@ void deallocate_spawn_data();
//--------------PRIVATE DECLARATIONS---------------//
void generic_spawn(MPI_Comm *child, int data_stage);
int check_single_state(MPI_Comm comm, int global_state);
int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, double *real_time);
int check_single_state(MPI_Comm comm, int global_state, int wait_completed);
int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, int wait_completed);
//--------------PRIVATE THREADS DECLARATIONS---------------//
int allocate_thread_spawn();
......@@ -83,7 +82,7 @@ int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId
* Comprueba si una configuracion para crear un nuevo grupo de procesos esta lista,
* y en caso de que lo este, se devuelve el communicador a estos nuevos procesos.
*/
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, double *real_time) {
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int wait_completed) {
int local_state;
int global_state=MALL_NOT_STARTED;
......@@ -91,10 +90,10 @@ int check_spawn_state(MPI_Comm *child, MPI_Comm comm, double *real_time) {
local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MALL_SPAWN_SINGLE_PENDING || local_state == MALL_SPAWN_SINGLE_COMPLETED) { // Single
global_state = check_single_state(comm, local_state);
global_state = check_single_state(comm, local_state, wait_completed);
} else if(local_state == MALL_SPAWN_PENDING || local_state == MALL_SPAWN_COMPLETED || local_state == MALL_SPAWN_ADAPTED) { // Baseline
global_state = check_generic_state(comm, child, local_state, real_time);
} else if(local_state == MALL_SPAWN_PENDING || local_state == MALL_SPAWN_COMPLETED || local_state == MALL_SPAWN_ADAPTED) { // Generic
global_state = check_generic_state(comm, child, local_state, wait_completed);
} else if(local_state == MALL_SPAWN_ADAPT_POSTPONE) {
global_state = local_state;
......@@ -127,7 +126,7 @@ void unset_spawn_postpone_flag(int outside_state) {
int local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MALL_SPAWN_ADAPT_POSTPONE && outside_state == MALL_SPAWN_ADAPT_PENDING && spawn_data->spawn_is_async) {
set_spawn_state(MALL_SPAWN_PENDING, MALL_SPAWN_PTHREAD);
wakeup();
wakeup_redistribution();
}
}
......@@ -247,7 +246,7 @@ void set_basic_spawn_dtype() {
MPI_Get_address(spawn_data, &dir);
MPI_Get_address(&(spawn_data->root_parents), &displs[0]);
MPI_Get_address(&(spawn_data->initial_qty), &displs[1]);
MPI_Get_address(&(spawn_data->initial_qty), &displs[1]); //FIXME Obtener por la funcion ya existente
MPI_Get_address(&(spawn_data->spawn_is_single), &displs[2]);
MPI_Get_address(&(spawn_data->spawn_method), &displs[3]);
......@@ -305,7 +304,6 @@ void generic_spawn(MPI_Comm *child, int data_stage) {
break;
}
// END WORK
end_time = MPI_Wtime();
aux_state = get_spawn_state(spawn_data->spawn_is_async);
if(!(aux_state == MALL_SPAWN_PENDING && local_state == MALL_SPAWN_ADAPT_POSTPONE)) {
set_spawn_state(local_state, spawn_data->spawn_is_async);
......@@ -351,9 +349,10 @@ void* thread_work() {
if(local_state == MALL_SPAWN_ADAPT_POSTPONE || local_state == MALL_SPAWN_PENDING) {
// El grupo de procesos se terminara de juntar tras la redistribucion de datos
local_state = wait_wakeup();
local_state = wait_redistribution();
generic_spawn(returned_comm, MALL_DIST_COMPLETED);
}
wakeup_completion();
pthread_exit(NULL);
}
......@@ -368,7 +367,10 @@ void* thread_work() {
* los procesos no root y se devuelve el estado
* "MALL_SPAWN_PENDING".
*/
int check_single_state(MPI_Comm comm, int global_state) {
int check_single_state(MPI_Comm comm, int global_state, int wait_completed) {
while(wait_completed && mall->myId == mall->root && global_state == MALL_SPAWN_SINGLE_PENDING) {
global_state = wait_completion();
}
MPI_Bcast(&global_state, 1, MPI_INT, spawn_data->root, comm);
// Non-root processes join root to finalize the spawn
......@@ -393,15 +395,16 @@ int check_single_state(MPI_Comm comm, int global_state) {
* Si ha terminado libera la memoria asociada a spawn_data
* y devuelve el estado "MALL_SPAWN_COMPLETED".
*/
int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, double *real_time) {
int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, int wait_completed) {
int global_state;
while(wait_completed && local_state == MALL_SPAWN_PENDING) local_state = wait_completion();
MPI_Allreduce(&local_state, &global_state, 1, MPI_INT, MPI_MIN, comm);
if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_SPAWN_ADAPTED) {
set_spawn_state(global_state, MALL_SPAWN_PTHREAD);
*child = *returned_comm;
deallocate_spawn_data(spawn_data);
*real_time=end_time;
deallocate_spawn_data();
}
return global_state;
}
......@@ -7,7 +7,7 @@
#include "../malleabilityDataStructures.h"
int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId, int initial_qty, int target_qty, int root, int type_dist, int spawn_method, int spawn_strategies, MPI_Comm comm, MPI_Comm *child);
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, double *real_time);
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int wait_completed);
void malleability_connect_children(int myId, int numP, int root, MPI_Comm comm, int *numP_parents, int *root_parents, MPI_Comm *parents);
......
......@@ -4,19 +4,21 @@
#include "Spawn_state.h"
pthread_mutex_t spawn_mutex;
pthread_cond_t spawn_cond;
pthread_cond_t spawn_cond, completion_cond;
int spawn_state;
int waiting_redistribution=0;
int waiting_redistribution=0, waiting_completion=0;
void init_spawn_state() {
pthread_mutex_init(&spawn_mutex,NULL);
pthread_cond_init(&spawn_cond,NULL);
pthread_cond_init(&completion_cond,NULL);
set_spawn_state(1,0); //FIXME First parameter is a horrible magical number
}
void free_spawn_state() {
pthread_mutex_destroy(&spawn_mutex);
pthread_cond_destroy(&spawn_cond);
pthread_cond_destroy(&completion_cond);
}
int get_spawn_state(int is_async) {
......@@ -41,7 +43,7 @@ void set_spawn_state(int value, int is_async) {
}
}
int wait_wakeup() {
int wait_redistribution() {
pthread_mutex_lock(&spawn_mutex);
if(!waiting_redistribution) {
waiting_redistribution=1;
......@@ -52,7 +54,7 @@ int wait_wakeup() {
return get_spawn_state(1);
}
void wakeup() {
void wakeup_redistribution() {
pthread_mutex_lock(&spawn_mutex);
if(waiting_redistribution) {
pthread_cond_signal(&spawn_cond);
......@@ -60,3 +62,23 @@ void wakeup() {
waiting_redistribution=1;
pthread_mutex_unlock(&spawn_mutex);
}
int wait_completion() {
pthread_mutex_lock(&spawn_mutex);
if(!waiting_completion) {
waiting_completion=1;
pthread_cond_wait(&completion_cond, &spawn_mutex);
}
waiting_completion=0;
pthread_mutex_unlock(&spawn_mutex);
return get_spawn_state(1);
}
void wakeup_completion() {
pthread_mutex_lock(&spawn_mutex);
if(waiting_completion) {
pthread_cond_signal(&completion_cond);
}
waiting_completion=1;
pthread_mutex_unlock(&spawn_mutex);
}
......@@ -11,7 +11,10 @@ void free_spawn_state();
int get_spawn_state(int is_async);
void set_spawn_state(int value, int is_async);
int wait_wakeup();
void wakeup();
int wait_redistribution();
void wakeup_redistribution();
int wait_completion();
void wakeup_completion();
#endif
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment