Commit 69193a34 authored by iker_martin's avatar iker_martin
Browse files

Change order of MaM execution. Deleted unneeded code and minor fixes

parent d75645d6
...@@ -23,6 +23,7 @@ void init_group_struct(char *argv[], int argc, int myId, int numP); ...@@ -23,6 +23,7 @@ void init_group_struct(char *argv[], int argc, int myId, int numP);
void init_application(); void init_application();
void obtain_op_times(); void obtain_op_times();
void free_application_data(); void free_application_data();
void free_zombie_process();
void print_general_info(int myId, int grp, int numP); void print_general_info(int myId, int grp, int numP);
int print_local_results(); int print_local_results();
...@@ -32,6 +33,7 @@ int create_out_file(char *nombre, int *ptr, int newstdout); ...@@ -32,6 +33,7 @@ int create_out_file(char *nombre, int *ptr, int newstdout);
void init_originals(); void init_originals();
void init_targets(); void init_targets();
void update_targets();
void user_redistribution(void *args); void user_redistribution(void *args);
configuration *config_file; configuration *config_file;
...@@ -44,6 +46,7 @@ int main(int argc, char *argv[]) { ...@@ -44,6 +46,7 @@ int main(int argc, char *argv[]) {
int numP, myId, res; int numP, myId, res;
int req; int req;
int im_child; int im_child;
int abort_needed = 0;
int num_cpus, num_nodes; int num_cpus, num_nodes;
char *nodelist = NULL; char *nodelist = NULL;
...@@ -69,7 +72,9 @@ int main(int argc, char *argv[]) { ...@@ -69,7 +72,9 @@ int main(int argc, char *argv[]) {
init_group_struct(argv, argc, myId, numP); init_group_struct(argv, argc, myId, numP);
im_child = MAM_Init(ROOT, &comm, argv[0], nodelist, num_cpus, num_nodes, user_redistribution, NULL); im_child = MAM_Init(ROOT, &comm, argv[0], nodelist, num_cpus, num_nodes, user_redistribution, NULL);
if(!im_child) { //TODO REFACTOR Simplificar inicio if(im_child) {
update_targets();
} else {
init_application(); init_application();
init_originals(); init_originals();
...@@ -95,8 +100,8 @@ int main(int argc, char *argv[]) { ...@@ -95,8 +100,8 @@ int main(int argc, char *argv[]) {
MAM_Set_target_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED MAM_Set_target_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
if(group->grp != 0) { if(group->grp != 0) {
malleability_modify_data(&(group->grp), 0, 1, MPI_INT, 1, 0); malleability_modify_data(&(group->grp), 0, 1, MPI_INT, 1, 1);
malleability_modify_data(&(group->iter_start), 2, 1, MPI_INT, 1, 0); malleability_modify_data(&(group->iter_start), 0, 1, MPI_INT, 1, 0);
} }
} }
...@@ -124,11 +129,12 @@ int main(int argc, char *argv[]) { ...@@ -124,11 +129,12 @@ int main(int argc, char *argv[]) {
MPI_Comm_free(&comm); MPI_Comm_free(&comm);
} }
if(group->myId == ROOT && config_file->groups[group->grp].sm == MALL_SPAWN_MERGE) { if(group->myId == ROOT && config_file->groups[group->grp-1].sm == MALL_SPAWN_MERGE) {
MPI_Abort(MPI_COMM_WORLD, -100); abort_needed = 1;
} }
free_application_data(); free_application_data();
if(abort_needed) { MPI_Abort(MPI_COMM_WORLD, -100); }
MPI_Finalize(); MPI_Finalize();
return 0; return 0;
} }
...@@ -154,8 +160,8 @@ int work() { ...@@ -154,8 +160,8 @@ int work() {
maxiter = config_file->groups[group->grp].iters; maxiter = config_file->groups[group->grp].iters;
state = MAM_NOT_STARTED; state = MAM_NOT_STARTED;
res = 0; res = 0;
for(iter=group->iter_start; iter < maxiter; iter++) { for(iter=group->iter_start; iter < maxiter; iter++) {
iterate(state); iterate(state);
} }
...@@ -174,7 +180,6 @@ int work() { ...@@ -174,7 +180,6 @@ int work() {
} }
if(config_file->n_groups == group->grp + 1) { res=1; } if(config_file->n_groups == group->grp + 1) { res=1; }
if(state == MAM_ZOMBIE) res=state;
return res; return res;
} }
...@@ -206,7 +211,7 @@ double iterate(int async_comm) { ...@@ -206,7 +211,7 @@ double iterate(int async_comm) {
// Se esta realizando una redistribucion de datos asincrona // Se esta realizando una redistribucion de datos asincrona
if(async_comm == MAM_PENDING) { if(async_comm == MAM_PENDING) {
// TODO Que diferencie entre ambas en el IO // TODO Que diferencie entre tipo de partes asincronas?
results->iters_async += 1; results->iters_async += 1;
} }
...@@ -470,7 +475,14 @@ void free_application_data() { ...@@ -470,7 +475,14 @@ void free_application_data() {
group->async_array = NULL; group->async_array = NULL;
} }
MAM_Finalize(); MAM_Finalize();
free_zombie_process();
}
/*
* Libera la memoria asociada a un proceso Zombie
*/
void free_zombie_process() {
free_results_data(results, config_file->n_stages); free_results_data(results, config_file->n_stages);
free(results); free(results);
...@@ -519,8 +531,8 @@ void init_originals() { ...@@ -519,8 +531,8 @@ void init_originals() {
config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs); config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
MAM_Set_target_number(config_file->groups[group->grp+1].procs); MAM_Set_target_number(config_file->groups[group->grp+1].procs);
malleability_add_data(&(group->grp), 1, MPI_INT, 1, 0); malleability_add_data(&(group->grp), 1, MPI_INT, 1, 1);
malleability_add_data(&run_id, 1, MPI_INT, 1, 0); malleability_add_data(&run_id, 1, MPI_INT, 1, 1);
malleability_add_data(&(group->iter_start), 1, MPI_INT, 1, 0); malleability_add_data(&(group->iter_start), 1, MPI_INT, 1, 0);
if(config_file->sdr) { if(config_file->sdr) {
...@@ -540,7 +552,7 @@ void init_targets() { ...@@ -540,7 +552,7 @@ void init_targets() {
size_t i, entries; size_t i, entries;
void *value = NULL; void *value = NULL;
malleability_get_data(&value, 0, 1, 0); malleability_get_data(&value, 0, 1, 1);
group->grp = *((int *)value); group->grp = *((int *)value);
group->grp = group->grp + 1; group->grp = group->grp + 1;
...@@ -549,13 +561,28 @@ void init_targets() { ...@@ -549,13 +561,28 @@ void init_targets() {
init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters); init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
results_comm(results, ROOT, config_file->n_resizes, new_comm); results_comm(results, ROOT, config_file->n_resizes, new_comm);
// TODO Refactor - Que sea una unica funcion malleability_get_data(&value, 1, 1, 1);
// Obtiene las variables que van a utilizar los hijos
malleability_get_data(&value, 1, 1, 0);
run_id = *((int *)value); run_id = *((int *)value);
malleability_get_data(&value, 2, 1, 0); if(config_file->adr) {
malleability_get_entries(&entries, 0, 1);
group->async_qty = (int *) malloc(entries * sizeof(int));
group->async_array = (char **) malloc(entries * sizeof(char *));
for(i=0; i<entries; i++) {
malleability_get_data(&value, i, 0, 1);
group->async_array[i] = (char *)value;
group->async_qty[i] = DR_MAX_SIZE;
}
group->async_qty[entries-1] = config_file->adr % DR_MAX_SIZE ? config_file->adr % DR_MAX_SIZE : DR_MAX_SIZE;
group->async_data_groups = entries;
}
}
void update_targets() { //FIXME Should not be needed after redist -- Declarar antes
size_t i, entries;
void *value = NULL;
malleability_get_data(&value, 0, 1, 0);
group->iter_start = *((int *)value); group->iter_start = *((int *)value);
if(config_file->sdr) { if(config_file->sdr) {
...@@ -570,35 +597,25 @@ void init_targets() { ...@@ -570,35 +597,25 @@ void init_targets() {
group->sync_qty[entries-1] = config_file->sdr % DR_MAX_SIZE ? config_file->sdr % DR_MAX_SIZE : DR_MAX_SIZE; group->sync_qty[entries-1] = config_file->sdr % DR_MAX_SIZE ? config_file->sdr % DR_MAX_SIZE : DR_MAX_SIZE;
group->sync_data_groups = entries; group->sync_data_groups = entries;
} }
if(config_file->adr) {
malleability_get_entries(&entries, 0, 1);
group->async_qty = (int *) malloc(entries * sizeof(int));
group->async_array = (char **) malloc(entries * sizeof(char *));
for(i=0; i<entries; i++) {
malleability_get_data(&value, i, 0, 1);
group->async_array[i] = (char *)value;
group->async_qty[i] = DR_MAX_SIZE;
}
group->async_qty[entries-1] = config_file->adr % DR_MAX_SIZE ? config_file->adr % DR_MAX_SIZE : DR_MAX_SIZE;
group->async_data_groups = entries;
}
} }
void user_redistribution(void *args) { void user_redistribution(void *args) {
int commited; int commited;
mam_user_reconf_t user_reconf; mam_user_reconf_t user_reconf;
MAM_Get_Reconf_Info(&user_reconf); MAM_Get_Reconf_Info(&user_reconf);
new_comm = user_reconf.comm; new_comm = user_reconf.comm;
if(user_reconf.rank_state == 1) { //FIXME Crear MAM_NEW_RANK? if(user_reconf.rank_state == MAM_PROC_NEW_RANK) {
init_targets(); init_targets();
} else { } else {
send_config_file(config_file, ROOT, new_comm); send_config_file(config_file, ROOT, new_comm);
results_comm(results, ROOT, config_file->n_resizes, new_comm); results_comm(results, ROOT, config_file->n_resizes, new_comm);
print_local_results(); print_local_results();
if(user_reconf.rank_state == MAM_PROC_ZOMBIE) {
free_zombie_process();
}
} }
MAM_Commit(&commited); MAM_Resume_redistribution(&commited);
} }
...@@ -59,6 +59,7 @@ typedef struct { ...@@ -59,6 +59,7 @@ typedef struct {
typedef struct { //FIXME numC_spawned no se esta usando typedef struct { //FIXME numC_spawned no se esta usando
int myId, numP, numC, numC_spawned, root, root_parents, zombie; int myId, numP, numC, numC_spawned, root, root_parents, zombie;
int is_intercomm;
pthread_t async_thread; pthread_t async_thread;
MPI_Comm comm, thread_comm; MPI_Comm comm, thread_comm;
MPI_Comm intercomm, tmp_comm; MPI_Comm intercomm, tmp_comm;
......
...@@ -12,10 +12,24 @@ ...@@ -12,10 +12,24 @@
#define MALLEABILITY_USE_SYNCHRONOUS 0 #define MALLEABILITY_USE_SYNCHRONOUS 0
#define MALLEABILITY_USE_ASYNCHRONOUS 1 #define MALLEABILITY_USE_ASYNCHRONOUS 1
void MAM_Commit(int *mam_state, int is_children_group);
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous); void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous);
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous); void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous);
int MAM_St_not_started(int *mam_state);
int MAM_St_spawn_pending(int wait_completed);
int MAM_St_red_start();
int MAM_St_red_pending(int *mam_state, int wait_completed);
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args);
int MAM_St_user_completed();
int MAM_St_spawn_adapt_pending(int wait_completed);
int MAM_St_spawn_adapted(int *mam_state);
int MAM_St_red_completed(int *mam_state);
int MAM_St_completed(int *mam_state);
void Children_init(void (*user_function)(void *), void *user_args); void Children_init(void (*user_function)(void *), void *user_args);
int spawn_step(); int spawn_step();
int start_redistribution(); int start_redistribution();
...@@ -31,7 +45,7 @@ void print_comms_state(); ...@@ -31,7 +45,7 @@ void print_comms_state();
void malleability_comms_update(MPI_Comm comm); void malleability_comms_update(MPI_Comm comm);
int MAM_I_convert_key(char *key); int MAM_I_convert_key(char *key);
void MAM_I_create_user_struct(int mam_state, int is_children_group); void MAM_I_create_user_struct(int is_children_group);
int state = MALL_UNRESERVED; //FIXME Mover a otro lado int state = MALL_UNRESERVED; //FIXME Mover a otro lado
...@@ -147,6 +161,7 @@ void MAM_Finalize() { ...@@ -147,6 +161,7 @@ void MAM_Finalize() {
free_malleability_times(); free_malleability_times();
if(mall->comm != MPI_COMM_WORLD && mall->comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->comm)); if(mall->comm != MPI_COMM_WORLD && mall->comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->comm));
if(mall->thread_comm != MPI_COMM_WORLD && mall->thread_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->thread_comm)); if(mall->thread_comm != MPI_COMM_WORLD && mall->thread_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->thread_comm));
if(mall->intercomm != MPI_COMM_WORLD && mall->intercomm != MPI_COMM_NULL) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge
free(mall); free(mall);
free(mall_conf); free(mall_conf);
free(user_reconf); free(user_reconf);
...@@ -169,159 +184,90 @@ void MAM_Finalize() { ...@@ -169,159 +184,90 @@ void MAM_Finalize() {
* *
*/ */
int MAM_Checkpoint(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) { int MAM_Checkpoint(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
int is_intercomm; int call_checkpoint = 0;
switch(state) { switch(state) {
case MALL_UNRESERVED: case MALL_UNRESERVED:
*mam_state = MAM_UNRESERVED; *mam_state = MAM_UNRESERVED;
break; break;
case MALL_NOT_STARTED: case MALL_NOT_STARTED:
*mam_state = MAM_NOT_STARTED; call_checkpoint = MAM_St_not_started(mam_state);
reset_malleability_times();
// Comprobar si se tiene que realizar un redimensionado
#if USE_MAL_BARRIERS
MPI_Barrier(mall->comm);
#endif
mall_conf->times->malleability_start = MPI_Wtime();
//if(CHECK_RMS()) {return MALL_DENIED;}
state = spawn_step();
if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE){
MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
}
break; break;
case MALL_SPAWN_PENDING: // Comprueba si el spawn ha terminado y comienza la redistribucion case MALL_SPAWN_PENDING: // Comprueba si el spawn ha terminado
case MALL_SPAWN_SINGLE_PENDING: case MALL_SPAWN_SINGLE_PENDING:
state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed); call_checkpoint = MAM_St_spawn_pending(wait_completed);
if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPTED) {
#if USE_MAL_BARRIERS
MPI_Barrier(mall->comm);
#endif
mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
}
break; break;
case MALL_SPAWN_ADAPT_POSTPONE: case MALL_SPAWN_ADAPT_POSTPONE:
case MALL_SPAWN_COMPLETED: case MALL_SPAWN_COMPLETED:
state = start_redistribution(); call_checkpoint = MAM_St_red_start();
MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
break; break;
case MALL_DIST_PENDING: case MALL_DIST_PENDING:
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) { call_checkpoint = MAM_St_red_pending(mam_state, wait_completed);
state = thread_check(wait_completed);
} else {
state = check_redistribution(wait_completed);
}
if(state != MALL_DIST_PENDING) {
MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
}
break; break;
case MALL_SPAWN_ADAPT_PENDING: case MALL_USER_PENDING:
call_checkpoint = MAM_St_user_pending(mam_state, wait_completed, user_function, user_args);
#if USE_MAL_BARRIERS
MPI_Barrier(mall->comm);
#endif
mall_conf->times->spawn_start = MPI_Wtime();
unset_spawn_postpone_flag(state);
state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);
if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
#if USE_MAL_BARRIERS
MPI_Barrier(mall->comm);
#endif
mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
}
break; break;
case MALL_SPAWN_ADAPTED: //FIXME Borrar? case MALL_USER_COMPLETED:
state = shrink_redistribution(); call_checkpoint = MAM_St_user_completed();
if(state == MALL_ZOMBIE) *mam_state = MAM_ZOMBIE; //TODO Esta no hay que borrarla
MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
break; break;
case MALL_DIST_COMPLETED: case MALL_SPAWN_ADAPT_PENDING:
MPI_Comm_test_inter(mall->intercomm, &is_intercomm); call_checkpoint = MAM_St_spawn_adapt_pending(wait_completed);
if(is_intercomm) {
MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->tmp_comm); //El que pone 0 va primero
} else {
MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
}
MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
state = MALL_USER_PENDING;
*mam_state = MAM_USER_PENDING;
if(mall_conf->spawn_method == MALL_SPAWN_BASELINE){ mall->zombie = 1; }
#if USE_MAL_BARRIERS
MPI_Barrier(mall->comm);
#endif
mall_conf->times->malleability_end = MPI_Wtime();
MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
break;
case MALL_USER_PENDING:
#if USE_MAL_DEBUG
if(mall->myId == mall->root) DEBUG_FUNC("Starting USER redistribution", mall->myId, mall->numP); fflush(stdout);
#endif
if(user_function != NULL) {
MAM_I_create_user_struct(*mam_state, MALLEABILITY_NOT_CHILDREN);
user_function(user_args);
} else {
state = MALL_COMPLETED; //FIXME Deberia ser hacer sync redist
*mam_state = MAM_COMPLETED; //FIXME Deberia ser hacer sync redist
}
if(state != MALL_USER_PENDING && state != MALL_NOT_STARTED) { // TODO Quitar la segunda parte cuando USER este antes de redist sinc
MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
}
if(state == MALL_NOT_STARTED) { //FIXME Muy feo, borrar
*mam_state = MAM_COMMITED;
}
break; break;
case MALL_COMPLETED: case MALL_SPAWN_ADAPTED:
MAM_Commit(mam_state); case MALL_DIST_COMPLETED:
call_checkpoint = MAM_St_completed(mam_state);
break; break;
} }
if(state > MALL_ZOMBIE && state < MALL_COMPLETED) *mam_state = MAM_PENDING; if(call_checkpoint) { MAM_Checkpoint(mam_state, wait_completed, user_function, user_args); }
if(state > MALL_NOT_STARTED && state < MALL_COMPLETED) *mam_state = MAM_PENDING;
return state; return state;
} }
/* /*
* TODO * TODO
*/ */
void MAM_Commit(int *mam_state) { void MAM_Resume_redistribution(int *mam_state) {
if(!(state == MALL_COMPLETED || state == MALL_ZOMBIE || state == MALL_USER_PENDING)) { //FIXME El ultimo habria que borrarlo state = MALL_USER_COMPLETED;
*mam_state = MALL_DENIED; *mam_state = MAM_PENDING;
return; }
}
/*
* TODO
*/
void MAM_Commit(int *mam_state, int rootBcast) {
int zombies = 0;
#if USE_MAL_DEBUG #if USE_MAL_DEBUG
if(mall->myId == mall->root){ DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); } fflush(stdout); MPI_Barrier(mall->intercomm); if(mall->myId == mall->root){ DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); } fflush(stdout);
#endif #endif
// Zombies treatment // Get times before commiting
if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) {
int zombies; // This communication is only needed when a root process will become a zombie
MPI_Allreduce(&state, &zombies, 1, MPI_INT, MPI_MIN, mall->intercomm); malleability_times_broadcast(rootBcast);
if(zombies == MALL_ZOMBIE) {
zombies_collect_suspended(mall->comm, mall->myId, mall->numP, mall->numC, mall->root);
}
} }
// Reset/Free unneded communicators // Free unneded communicators
if(mall->tmp_comm != MPI_COMM_WORLD && mall->tmp_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->tmp_comm)); if(mall->tmp_comm != MPI_COMM_WORLD && mall->tmp_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->tmp_comm));
if(*(mall->user_comm) != MPI_COMM_WORLD && *(mall->user_comm) != MPI_COMM_NULL) MPI_Comm_free(mall->user_comm); if(*(mall->user_comm) != MPI_COMM_WORLD && *(mall->user_comm) != MPI_COMM_NULL) MPI_Comm_free(mall->user_comm);
if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { malleability_comms_update(mall->intercomm); }
if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge // Zombies treatment
if(mall_conf->spawn_method == MALL_SPAWN_MERGE) {
MPI_Allreduce(&mall->zombie, &zombies, 1, MPI_INT, MPI_MAX, mall->comm);
if(zombies) {
zombies_collect_suspended(mall->comm);
}
}
// Zombies KILL // Zombies KILL
if(state == MALL_ZOMBIE || mall->zombie) { if(mall->zombie) {
#if USE_MAL_DEBUG >= 2 #if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Is terminating as zombie", mall->myId, mall->numP); fflush(stdout); DEBUG_FUNC("Is terminating as zombie", mall->myId, mall->numP); fflush(stdout);
#endif #endif
...@@ -330,12 +276,16 @@ void MAM_Commit(int *mam_state) { ...@@ -330,12 +276,16 @@ void MAM_Commit(int *mam_state) {
exit(0); exit(0);
} }
// Reset/Free communicators
if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { malleability_comms_update(mall->intercomm); }
if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge
MPI_Comm_rank(mall->comm, &(mall->myId)); MPI_Comm_rank(mall->comm, &(mall->myId));
MPI_Comm_size(mall->comm, &(mall->numP)); MPI_Comm_size(mall->comm, &(mall->numP));
mall->root = mall->root_parents == -1 ? mall->root : mall->root_parents; mall->root = mall->root_parents == -1 ? mall->root : mall->root_parents;
mall->root_parents = -1; mall->root_parents = -1;
state = MALL_NOT_STARTED; state = MALL_NOT_STARTED;
if(mam_state != NULL) *mam_state = MAM_COMMITED; if(mam_state != NULL) *mam_state = MAM_COMPLETED;
// Set new communicator // Set new communicator
if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *(mall->user_comm) = MPI_COMM_WORLD; } if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *(mall->user_comm) = MPI_COMM_WORLD; }
...@@ -343,6 +293,11 @@ void MAM_Commit(int *mam_state) { ...@@ -343,6 +293,11 @@ void MAM_Commit(int *mam_state) {
#if USE_MAL_DEBUG #if USE_MAL_DEBUG
if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout); if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout);
#endif #endif
#if USE_MAL_BARRIERS
MPI_Barrier(mall->comm);
#endif
mall_conf->times->malleability_end = MPI_Wtime();
} }
int MAM_Get_Reconf_Info(mam_user_reconf_t *reconf_info) { int MAM_Get_Reconf_Info(mam_user_reconf_t *reconf_info) {
...@@ -437,10 +392,10 @@ void MAM_Set_target_number(int numC){ ...@@ -437,10 +392,10 @@ void MAM_Set_target_number(int numC){
void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) { void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant) {
size_t total_reqs = 0; size_t total_reqs = 0;
if(is_constant) { if(is_constant) { //Async
if(is_replicated) { if(is_replicated) {
total_reqs = 1; total_reqs = 1;
add_data(data, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ??? add_data(data, total_qty, type, total_reqs, rep_a_data);
} else { } else {
if(mall_conf->red_method == MALL_RED_BASELINE) { if(mall_conf->red_method == MALL_RED_BASELINE) {
total_reqs = 1; total_reqs = 1;
...@@ -453,7 +408,7 @@ void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int ...@@ -453,7 +408,7 @@ void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int
add_data(data, total_qty, type, total_reqs, dist_a_data); add_data(data, total_qty, type, total_reqs, dist_a_data);
} }
} else { } else { //Sync
if(is_replicated) { if(is_replicated) {
add_data(data, total_qty, type, total_reqs, rep_s_data); add_data(data, total_qty, type, total_reqs, rep_s_data);
} else { } else {
...@@ -605,6 +560,131 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch ...@@ -605,6 +560,131 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch
} }
} }
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//====================MAM STAGES========================||
//======================================================||
//======================================================||
/*
 * Handler for the MALL_NOT_STARTED stage of MAM_Checkpoint.
 * Resets timing data, records the malleability start time and launches
 * the spawn step. Processes that will not belong to the target group are
 * flagged as zombies here.
 *
 * mam_state: output, set to MAM_NOT_STARTED for the caller.
 * Returns 1 when MAM_Checkpoint has to be re-entered immediately
 * (spawn finished synchronously or was postponed), 0 otherwise.
 */
int MAM_St_not_started(int *mam_state) {
  *mam_state = MAM_NOT_STARTED;
  reset_malleability_times();
  // Check whether a resize has to be performed
#if USE_MAL_BARRIERS
  MPI_Barrier(mall->comm);
#endif
  mall_conf->times->malleability_start = MPI_Wtime();
  //if(CHECK_RMS()) {return MALL_DENIED;}

  state = spawn_step();

  //FIXME This is necessary but ugly
  // Merge spawn: ranks at or beyond the new size (numC) become zombies.
  // Baseline spawn: this (source) process is always marked as zombie.
  if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->myId >= mall->numC){ mall->zombie = 1; }
  else if(mall_conf->spawn_method == MALL_SPAWN_BASELINE){ mall->zombie = 1; }

  if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE){
    return 1;
  }
  return 0;
}
/*
 * Handler for the MALL_SPAWN_PENDING / MALL_SPAWN_SINGLE_PENDING stages.
 * Polls the ongoing spawn (or blocks until it finishes when
 * wait_completed is set); on completion records the spawn time.
 *
 * Returns 1 when MAM_Checkpoint has to be re-entered, 0 while the
 * spawn is still in progress.
 */
int MAM_St_spawn_pending(int wait_completed) {
  state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);
  if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPTED) {
#if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
#endif
    mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
    return 1;
  }
  return 0;
}
/*
 * Handler for the MALL_SPAWN_COMPLETED / MALL_SPAWN_ADAPT_POSTPONE stages:
 * starts the data redistribution towards the target processes.
 * Always returns 1 so MAM_Checkpoint is re-entered with the new state.
 */
int MAM_St_red_start() {
  state = start_redistribution();
  return 1;
}
/*
 * Handler for the MALL_DIST_PENDING stage. Checks the asynchronous data
 * redistribution, using either the thread-based strategy or the
 * communication-based one. Once the redistribution has finished, a
 * temporary communicator named "MAM_USER_TMP" is built for the user
 * stage (merging the intercommunicator when one is in use) and the
 * state machine advances to MALL_USER_PENDING.
 *
 * mam_state: output, set to MAM_USER_PENDING when the redistribution ends.
 * Returns 1 when MAM_Checkpoint has to be re-entered, 0 while pending.
 */
int MAM_St_red_pending(int *mam_state, int wait_completed) {
  if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
    state = thread_check(wait_completed);
  } else {
    state = check_redistribution(wait_completed);
  }

  if(state != MALL_DIST_PENDING) {
    if(mall->is_intercomm) {
      MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->tmp_comm); // The group passing 0 is ordered first in the merged comm
    } else {
      MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
    }
    MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
    state = MALL_USER_PENDING;
    *mam_state = MAM_USER_PENDING;
    return 1;
  }
  return 0;
}
/*
 * Handler for the MALL_USER_PENDING stage. Runs the user-provided
 * redistribution callback; when wait_completed is set the callback is
 * invoked repeatedly until the user resumes the redistribution (which
 * changes `state`). When no callback was registered, the redistribution
 * is resumed directly.
 *
 * Returns 1 once the user stage has finished, 0 while still pending.
 */
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
#if USE_MAL_DEBUG
  if(mall->myId == mall->root) DEBUG_FUNC("Starting USER redistribution", mall->myId, mall->numP); fflush(stdout);
#endif
  if(user_function != NULL) {
    MAM_I_create_user_struct(MALLEABILITY_NOT_CHILDREN);
    do {
      user_function(user_args);
    } while(wait_completed && state == MALL_USER_PENDING);
  } else {
    MAM_Resume_redistribution(mam_state);
  }

  if(state != MALL_USER_PENDING) {
#if USE_MAL_DEBUG
    if(mall->myId == mall->root) DEBUG_FUNC("Ended USER redistribution", mall->myId, mall->numP); fflush(stdout);
#endif
    return 1;
  }
  return 0;
}
/*
 * Handler for the MALL_USER_COMPLETED stage: finishes the data
 * redistribution. Always returns 1 so MAM_Checkpoint is re-entered.
 */
int MAM_St_user_completed() {
  state = end_redistribution();
  return 1;
}
/*
 * Handler for the MALL_SPAWN_ADAPT_PENDING stage. Restarts the
 * postponed spawn adaptation and checks its progress. When the spawn
 * strategy is not thread-based the check is synchronous, so the spawn
 * time is recorded and the state machine is re-entered.
 *
 * Returns 1 when MAM_Checkpoint has to be re-entered, 0 while the
 * (threaded) spawn adaptation is still pending.
 */
int MAM_St_spawn_adapt_pending(int wait_completed) {
#if USE_MAL_BARRIERS
  MPI_Barrier(mall->comm);
#endif
  mall_conf->times->spawn_start = MPI_Wtime();
  unset_spawn_postpone_flag(state);
  state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);

  if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
#if USE_MAL_BARRIERS
    MPI_Barrier(mall->comm);
#endif
    mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
    return 1;
  }
  return 0;
}
/*
 * Handler for the final stages (MALL_SPAWN_ADAPTED / MALL_DIST_COMPLETED):
 * commits the reconfiguration. When an intercommunicator is in use, the
 * broadcast root must be expressed as MPI_ROOT on the root process and
 * MPI_PROC_NULL on the other local processes; otherwise the plain root
 * rank is used as-is.
 *
 * Always returns 0: the checkpoint state machine is not re-entered.
 */
int MAM_St_completed(int *mam_state) {
  int bcast_root = mall->root;
  if(mall->is_intercomm) {
    bcast_root = (mall->myId == mall->root) ? MPI_ROOT : MPI_PROC_NULL;
  }
  MAM_Commit(mam_state, bcast_root);
  return 0;
}
//======================================================|| //======================================================||
//================PRIVATE FUNCTIONS=====================|| //================PRIVATE FUNCTIONS=====================||
//=====================CHILDREN=========================|| //=====================CHILDREN=========================||
...@@ -618,27 +698,26 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch ...@@ -618,27 +698,26 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch
*/ */
void Children_init(void (*user_function)(void *), void *user_args) { void Children_init(void (*user_function)(void *), void *user_args) {
size_t i; size_t i;
int numP_parents, root_parents; int numP_parents;
int is_intercomm;
#if USE_MAL_DEBUG #if USE_MAL_DEBUG
DEBUG_FUNC("MaM will now initialize children", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD); DEBUG_FUNC("MaM will now initialize children", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif #endif
malleability_connect_children(mall->myId, mall->numP, mall->root, mall->comm, &numP_parents, &root_parents, &(mall->intercomm)); malleability_connect_children(mall->myId, mall->numP, mall->root, mall->comm, &numP_parents, &mall->root_parents, &(mall->intercomm));
MPI_Comm_test_inter(mall->intercomm, &is_intercomm); MPI_Comm_test_inter(mall->intercomm, &mall->is_intercomm);
if(!is_intercomm) { // For intracommunicators, these processes will be added if(!mall->is_intercomm) { // For intracommunicators, these processes will be added
MPI_Comm_rank(mall->intercomm, &(mall->myId)); MPI_Comm_rank(mall->intercomm, &(mall->myId));
MPI_Comm_size(mall->intercomm, &(mall->numP)); MPI_Comm_size(mall->intercomm, &(mall->numP));
} }
MAM_Comm_main_structures(root_parents); MAM_Comm_main_structures(mall->root_parents);
#if USE_MAL_DEBUG #if USE_MAL_DEBUG
DEBUG_FUNC("Targets have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD); DEBUG_FUNC("Targets have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif #endif
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm); comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, mall->root_parents, mall->intercomm);
if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos
#if USE_MAL_DEBUG >= 2 #if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Children start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD); DEBUG_FUNC("Children start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
...@@ -649,17 +728,24 @@ void Children_init(void (*user_function)(void *), void *user_args) { ...@@ -649,17 +728,24 @@ void Children_init(void (*user_function)(void *), void *user_args) {
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) { if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS); recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
for(i=0; i<rep_a_data->entries; i++) {
MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_parents, mall->intercomm);
}
} else { } else {
recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS); recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
//for(i=0; i<rep_a_data->entries; i++) { for(i=0; i<rep_a_data->entries; i++) {
// MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], root_parents, mall->intercomm, &(rep_a_data)); MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_parents, mall->intercomm, &(rep_a_data->requests[i][0]));
//} }
#if USE_MAL_DEBUG >= 2 #if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Targets started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD); DEBUG_FUNC("Targets started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif #endif
int post_ibarrier = 0; int post_ibarrier = 0;
// FIXME No permite el uso de ibarrier ahora mismo. Realmente solo hace falta un ibarrier para todos
for(i=0; i<rep_a_data->entries; i++) {
async_communication_wait(mall->intercomm, rep_a_data->requests[i], rep_a_data->request_qty[i], post_ibarrier);
}
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) { post_ibarrier=1; } if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) { post_ibarrier=1; }
for(i=0; i<dist_a_data->entries; i++) { for(i=0; i<dist_a_data->entries; i++) {
async_communication_wait(mall->intercomm, dist_a_data->requests[i], dist_a_data->request_qty[i], post_ibarrier); async_communication_wait(mall->intercomm, dist_a_data->requests[i], dist_a_data->request_qty[i], post_ibarrier);
...@@ -670,6 +756,9 @@ void Children_init(void (*user_function)(void *), void *user_args) { ...@@ -670,6 +756,9 @@ void Children_init(void (*user_function)(void *), void *user_args) {
for(i=0; i<dist_a_data->entries; i++) { for(i=0; i<dist_a_data->entries; i++) {
async_communication_end(mall_conf->red_method, mall_conf->red_strategies, dist_a_data->requests[i], dist_a_data->request_qty[i], &(dist_a_data->windows[i])); async_communication_end(mall_conf->red_method, mall_conf->red_strategies, dist_a_data->requests[i], dist_a_data->request_qty[i], &(dist_a_data->windows[i]));
} }
for(i=0; i<rep_a_data->entries; i++) {
async_communication_end(mall_conf->red_method, mall_conf->red_strategies, rep_a_data->requests[i], rep_a_data->request_qty[i], &(rep_a_data->windows[i]));
}
} }
#if USE_MAL_BARRIERS #if USE_MAL_BARRIERS
...@@ -681,16 +770,28 @@ void Children_init(void (*user_function)(void *), void *user_args) { ...@@ -681,16 +770,28 @@ void Children_init(void (*user_function)(void *), void *user_args) {
DEBUG_FUNC("Targets have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD); DEBUG_FUNC("Targets have completed asynchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif #endif
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm); if(mall->is_intercomm) {
MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->tmp_comm); //El que pone 0 va primero
} else {
MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
}
MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
mall->numC = numP_parents;
if(user_function != NULL) {
state = MALL_USER_PENDING;
MAM_I_create_user_struct(MALLEABILITY_CHILDREN);
user_function(user_args);
}
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, mall->root_parents, mall->intercomm);
if(dist_s_data->entries || rep_s_data->entries) { // Recibir datos sincronos if(dist_s_data->entries || rep_s_data->entries) { // Recibir datos sincronos
#if USE_MAL_BARRIERS #if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm); MPI_Barrier(mall->intercomm);
#endif #endif
recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS); recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
// TODO Crear funcion especifica y anyadir para Asinc
for(i=0; i<rep_s_data->entries; i++) { for(i=0; i<rep_s_data->entries; i++) {
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], root_parents, mall->intercomm); MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_parents, mall->intercomm);
} }
#if USE_MAL_BARRIERS #if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm); MPI_Barrier(mall->intercomm);
...@@ -701,32 +802,10 @@ void Children_init(void (*user_function)(void *), void *user_args) { ...@@ -701,32 +802,10 @@ void Children_init(void (*user_function)(void *), void *user_args) {
DEBUG_FUNC("Targets have completed synchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD); DEBUG_FUNC("Targets have completed synchronous data redistribution step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif #endif
// Guardar los resultados de esta transmision MAM_Commit(NULL, mall->root_parents);
malleability_times_broadcast(mall->root);
#if USE_MAL_BARRIERS
MPI_Barrier(mall->comm);
#endif
mall_conf->times->malleability_end = MPI_Wtime(); // Obtener timestamp de cuando termina maleabilidad
state = MALL_COMPLETED;
if(is_intercomm) {
MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->tmp_comm); //El que pone 0 va primero
} else {
MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
}
MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
mall->numC = numP_parents;
if(user_function != NULL) {
state = MALL_USER_PENDING;
MAM_I_create_user_struct(MAM_COMPLETED, MALLEABILITY_CHILDREN);
user_function(user_args);
} else {
MAM_Commit(NULL);
}
#if USE_MAL_DEBUG #if USE_MAL_DEBUG
if(mall->myId == mall->root){ DEBUG_FUNC("MaM has been initialized correctly as children", mall->myId, mall->numP); } fflush(stdout); MPI_Barrier(MPI_COMM_WORLD); DEBUG_FUNC("MaM has been initialized correctly for new ranks", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif #endif
} }
...@@ -773,18 +852,19 @@ int spawn_step(){ ...@@ -773,18 +852,19 @@ int spawn_step(){
* grupos de procesos. * grupos de procesos.
*/ */
int start_redistribution() { int start_redistribution() {
int rootBcast, is_intercomm; int rootBcast;
size_t i;
is_intercomm = 0; mall->is_intercomm = 0;
if(mall->intercomm != MPI_COMM_NULL) { if(mall->intercomm != MPI_COMM_NULL) {
MPI_Comm_test_inter(mall->intercomm, &is_intercomm); MPI_Comm_test_inter(mall->intercomm, &mall->is_intercomm);
} else { } else {
// Si no tiene comunicador creado, se debe a que se ha pospuesto el Spawn // Si no tiene comunicador creado, se debe a que se ha pospuesto el Spawn
// y se trata del spawn Merge Shrink // y se trata del spawn Merge Shrink
MPI_Comm_dup(mall->comm, &(mall->intercomm)); MPI_Comm_dup(mall->comm, &(mall->intercomm));
} }
if(is_intercomm) { if(mall->is_intercomm) {
rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL; rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
} else { } else {
rootBcast = mall->root; rootBcast = mall->root;
...@@ -794,7 +874,6 @@ int start_redistribution() { ...@@ -794,7 +874,6 @@ int start_redistribution() {
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm); comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
if(dist_a_data->entries || rep_a_data->entries) { // Enviar datos asincronos if(dist_a_data->entries || rep_a_data->entries) { // Enviar datos asincronos
//FIXME No se envian los datos replicados (rep_a_data)
#if USE_MAL_BARRIERS #if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm); MPI_Barrier(mall->intercomm);
#endif #endif
...@@ -803,10 +882,13 @@ int start_redistribution() { ...@@ -803,10 +882,13 @@ int start_redistribution() {
return thread_creation(); return thread_creation();
} else { } else {
send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS); send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
for(i=0; i<rep_a_data->entries; i++) { //FIXME Ibarrier does not work with rep_a_data
MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], rootBcast, mall->intercomm, &(rep_a_data->requests[i][0]));
}
return MALL_DIST_PENDING; return MALL_DIST_PENDING;
} }
} }
return end_redistribution(); return MALL_USER_PENDING;
} }
...@@ -825,7 +907,7 @@ int start_redistribution() { ...@@ -825,7 +907,7 @@ int start_redistribution() {
* //FIXME Modificar para que se tenga en cuenta rep_a_data * //FIXME Modificar para que se tenga en cuenta rep_a_data
*/ */
int check_redistribution(int wait_completed) { int check_redistribution(int wait_completed) {
int is_intercomm, completed, local_completed, all_completed, post_ibarrier; int completed, local_completed, all_completed, post_ibarrier;
size_t i, req_qty; size_t i, req_qty;
MPI_Request *req_completed; MPI_Request *req_completed;
MPI_Win window; MPI_Win window;
...@@ -834,11 +916,10 @@ int check_redistribution(int wait_completed) { ...@@ -834,11 +916,10 @@ int check_redistribution(int wait_completed) {
#if USE_MAL_DEBUG >= 2 #if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Sources are testing for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD); DEBUG_FUNC("Sources are testing for all asynchronous redistributions", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif #endif
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
if(wait_completed) { if(wait_completed) {
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) { if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
if( is_intercomm || mall->myId >= mall->numC) { if( mall->is_intercomm || mall->myId >= mall->numC) {
post_ibarrier=1; post_ibarrier=1;
} }
} }
...@@ -847,6 +928,11 @@ int check_redistribution(int wait_completed) { ...@@ -847,6 +928,11 @@ int check_redistribution(int wait_completed) {
req_qty = dist_a_data->request_qty[i]; req_qty = dist_a_data->request_qty[i];
async_communication_wait(mall->intercomm, req_completed, req_qty, post_ibarrier); async_communication_wait(mall->intercomm, req_completed, req_qty, post_ibarrier);
} }
for(i=0; i<rep_a_data->entries; i++) {
req_completed = rep_a_data->requests[i];
req_qty = rep_a_data->request_qty[i];
async_communication_wait(mall->intercomm, req_completed, req_qty, 0); //FIXME Ibarrier does not work with rep_a_data
}
} else { } else {
for(i=0; i<dist_a_data->entries; i++) { for(i=0; i<dist_a_data->entries; i++) {
req_completed = dist_a_data->requests[i]; req_completed = dist_a_data->requests[i];
...@@ -854,6 +940,12 @@ int check_redistribution(int wait_completed) { ...@@ -854,6 +940,12 @@ int check_redistribution(int wait_completed) {
completed = async_communication_check(mall->myId, MALLEABILITY_NOT_CHILDREN, mall_conf->red_strategies, mall->intercomm, req_completed, req_qty); completed = async_communication_check(mall->myId, MALLEABILITY_NOT_CHILDREN, mall_conf->red_strategies, mall->intercomm, req_completed, req_qty);
local_completed = local_completed && completed; local_completed = local_completed && completed;
} }
for(i=0; i<rep_a_data->entries; i++) { //FIXME Ibarrier does not work with rep_a_data
req_completed = rep_a_data->requests[i];
req_qty = rep_a_data->request_qty[i];
completed = async_communication_check(mall->myId, MALLEABILITY_NOT_CHILDREN, mall_conf->red_strategies, mall->intercomm, req_completed, req_qty);
local_completed = local_completed && completed;
}
#if USE_MAL_DEBUG >= 2 #if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Sources will now check a global decision", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD); DEBUG_FUNC("Sources will now check a global decision", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif #endif
...@@ -872,12 +964,18 @@ int check_redistribution(int wait_completed) { ...@@ -872,12 +964,18 @@ int check_redistribution(int wait_completed) {
window = dist_a_data->windows[i]; window = dist_a_data->windows[i];
async_communication_end(mall_conf->red_method, mall_conf->red_strategies, req_completed, req_qty, &window); async_communication_end(mall_conf->red_method, mall_conf->red_strategies, req_completed, req_qty, &window);
} }
for(i=0; i<rep_a_data->entries; i++) {
req_completed = rep_a_data->requests[i];
req_qty = rep_a_data->request_qty[i];
window = rep_a_data->windows[i];
async_communication_end(mall_conf->red_method, mall_conf->red_strategies, req_completed, req_qty, &window);
}
#if USE_MAL_BARRIERS #if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm); MPI_Barrier(mall->intercomm);
#endif #endif
if(!is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only if(!mall->is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
return end_redistribution(); return MALL_USER_PENDING;
} }
...@@ -891,10 +989,9 @@ int check_redistribution(int wait_completed) { ...@@ -891,10 +989,9 @@ int check_redistribution(int wait_completed) {
*/ */
int end_redistribution() { int end_redistribution() {
size_t i; size_t i;
int is_intercomm, rootBcast, local_state; int rootBcast, local_state;
MPI_Comm_test_inter(mall->intercomm, &is_intercomm); if(mall->is_intercomm) {
if(is_intercomm) {
rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL; rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
} else { } else {
rootBcast = mall->root; rootBcast = mall->root;
...@@ -908,20 +1005,17 @@ int end_redistribution() { ...@@ -908,20 +1005,17 @@ int end_redistribution() {
mall_conf->times->sync_start = MPI_Wtime(); mall_conf->times->sync_start = MPI_Wtime();
send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS); send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
// TODO Crear funcion especifica y anyadir para Asinc
for(i=0; i<rep_s_data->entries; i++) { for(i=0; i<rep_s_data->entries; i++) {
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], rootBcast, mall->intercomm); MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], rootBcast, mall->intercomm);
} }
#if USE_MAL_BARRIERS #if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm); MPI_Barrier(mall->intercomm);
#endif #endif
if(!is_intercomm) mall_conf->times->sync_end = MPI_Wtime(); // Merge method only if(!mall->is_intercomm) mall_conf->times->sync_end = MPI_Wtime(); // Merge method only
} }
malleability_times_broadcast(rootBcast);
local_state = MALL_DIST_COMPLETED; local_state = MALL_DIST_COMPLETED;
if(!is_intercomm) { // Merge Spawn if(!mall->is_intercomm) { // Merge Spawn
if(mall->numP > mall->numC) { // Shrink || Merge Shrink requiere de mas tareas if(mall->numP > mall->numC) { // Shrink || Merge Shrink requiere de mas tareas
local_state = MALL_SPAWN_ADAPT_PENDING; local_state = MALL_SPAWN_ADAPT_PENDING;
} }
...@@ -930,43 +1024,6 @@ int end_redistribution() { ...@@ -930,43 +1024,6 @@ int end_redistribution() {
return local_state; return local_state;
} }
///=============================================
///=============================================
///=============================================
//TODO DEPRECATED
int shrink_redistribution() {
#if USE_MAL_BARRIERS
MPI_Barrier(mall->comm);
#endif
double time_extra = MPI_Wtime();
MPI_Abort(MPI_COMM_WORLD, -20); //
zombies_collect_suspended(*(mall->user_comm), mall->myId, mall->numP, mall->numC, mall->root);
if(mall->myId < mall->numC) {
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); //FIXME Modificar a que se pida pro el usuario el cambio y se llama a comms_update
if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
MPI_Comm_dup(mall->intercomm, &(mall->thread_comm));
MPI_Comm_dup(mall->intercomm, &(mall->comm));
MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MALL_THREAD");
MPI_Comm_set_name(mall->comm, "MPI_COMM_MALL");
MPI_Comm_free(&(mall->intercomm));
#if USE_MAL_BARRIERS
MPI_Barrier(mall->comm);
#endif
mall_conf->times->spawn_time += MPI_Wtime() - time_extra;
return MALL_DIST_COMPLETED;
} else {
return MALL_ZOMBIE;
}
}
// TODO MOVER A OTRO LADO?? // TODO MOVER A OTRO LADO??
//======================================================|| //======================================================||
//================PRIVATE FUNCTIONS=====================|| //================PRIVATE FUNCTIONS=====================||
...@@ -996,7 +1053,7 @@ int thread_creation() { ...@@ -996,7 +1053,7 @@ int thread_creation() {
* El estado de la comunicación es devuelto al finalizar la función. * El estado de la comunicación es devuelto al finalizar la función.
*/ */
int thread_check(int wait_completed) { int thread_check(int wait_completed) {
int all_completed = 0, is_intercomm; int all_completed = 0;
if(wait_completed && comm_state == MALL_DIST_PENDING) { if(wait_completed && comm_state == MALL_DIST_PENDING) {
if(pthread_join(mall->async_thread, NULL)) { if(pthread_join(mall->async_thread, NULL)) {
...@@ -1016,12 +1073,11 @@ int thread_check(int wait_completed) { ...@@ -1016,12 +1073,11 @@ int thread_check(int wait_completed) {
MPI_Abort(MPI_COMM_WORLD, -1); MPI_Abort(MPI_COMM_WORLD, -1);
return -2; return -2;
} }
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
#if USE_MAL_BARRIERS #if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm); MPI_Barrier(mall->intercomm);
#endif #endif
if(!is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only if(!mall->is_intercomm) mall_conf->times->async_end = MPI_Wtime(); // Merge method only
return end_redistribution(); return end_redistribution();
} }
...@@ -1035,7 +1091,19 @@ int thread_check(int wait_completed) { ...@@ -1035,7 +1091,19 @@ int thread_check(int wait_completed) {
* por el valor "commAsync". * por el valor "commAsync".
*/ */
void* thread_async_work() { void* thread_async_work() {
int rootBcast;
size_t i;
if(mall->is_intercomm) {
rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
} else {
rootBcast = mall->root;
}
send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS); send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
for(i=0; i<rep_a_data->entries; i++) {
MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], rootBcast, mall->intercomm);
}
comm_state = MALL_DIST_COMPLETED; comm_state = MALL_DIST_COMPLETED;
pthread_exit(NULL); pthread_exit(NULL);
} }
...@@ -1091,17 +1159,18 @@ int MAM_I_convert_key(char *key) { ...@@ -1091,17 +1159,18 @@ int MAM_I_convert_key(char *key) {
/* /*
* TODO Por hacer * TODO Por hacer
*/ */
void MAM_I_create_user_struct(int mam_state, int is_children_group) { void MAM_I_create_user_struct(int is_children_group) {
user_reconf->comm = mall->tmp_comm; user_reconf->comm = mall->tmp_comm;
user_reconf->rank_state = mam_state;
if(is_children_group) { if(is_children_group) {
user_reconf->rank_state = is_children_group; //FIXME Elegir nombre adecuado user_reconf->rank_state = MAM_PROC_NEW_RANK;
user_reconf->numS = mall->numC; user_reconf->numS = mall->numC;
if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) user_reconf->numT = mall->numC; if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) user_reconf->numT = mall->numC;
else user_reconf->numT = mall->numC + mall->numP; else user_reconf->numT = mall->numC + mall->numP;
} else { } else {
user_reconf->numS = mall->numP; user_reconf->numS = mall->numP;
user_reconf->numT = mall->numC; user_reconf->numT = mall->numC;
if(mall->zombie) user_reconf->rank_state = MAM_PROC_ZOMBIE;
else user_reconf->rank_state = MAM_PROC_CONTINUE;
} }
} }
...@@ -18,7 +18,8 @@ typedef struct { ...@@ -18,7 +18,8 @@ typedef struct {
int MAM_Init(int root, MPI_Comm *comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes, void (*user_function)(void *), void *user_args); int MAM_Init(int root, MPI_Comm *comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes, void (*user_function)(void *), void *user_args);
void MAM_Finalize(); void MAM_Finalize();
int MAM_Checkpoint(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args); int MAM_Checkpoint(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args);
void MAM_Commit(int *mam_state); void MAM_Resume_redistribution(int *mam_state);
int MAM_Get_Reconf_Info(mam_user_reconf_t *reconf_info); int MAM_Get_Reconf_Info(mam_user_reconf_t *reconf_info);
void MAM_Set_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies); void MAM_Set_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies);
......
...@@ -6,10 +6,11 @@ ...@@ -6,10 +6,11 @@
//States //States
#define MALL_DENIED -1 #define MALL_DENIED -1
enum mall_inner_states{MALL_UNRESERVED, MALL_NOT_STARTED, MALL_ZOMBIE, MALL_SPAWN_PENDING, MALL_SPAWN_SINGLE_PENDING, enum mall_inner_states{MALL_UNRESERVED, MALL_NOT_STARTED, MALL_SPAWN_PENDING, MALL_SPAWN_SINGLE_PENDING,
MALL_SPAWN_SINGLE_COMPLETED, MALL_SPAWN_ADAPT_POSTPONE, MALL_SPAWN_COMPLETED, MALL_DIST_PENDING, MALL_DIST_COMPLETED, MALL_SPAWN_SINGLE_COMPLETED, MALL_SPAWN_ADAPT_POSTPONE, MALL_SPAWN_COMPLETED, MALL_DIST_PENDING, MALL_DIST_COMPLETED,
MALL_SPAWN_ADAPT_PENDING, MALL_SPAWN_ADAPTED, MALL_COMPLETED, MALL_USER_PENDING}; MALL_SPAWN_ADAPT_PENDING, MALL_USER_PENDING, MALL_USER_COMPLETED, MALL_SPAWN_ADAPTED, MALL_COMPLETED};
enum mam_states{MAM_UNRESERVED, MAM_NOT_STARTED, MAM_ZOMBIE, MAM_PENDING, MAM_COMPLETED, MAM_USER_PENDING, MAM_COMMITED}; enum mam_states{MAM_UNRESERVED, MAM_NOT_STARTED, MAM_PENDING, MAM_USER_PENDING, MAM_COMPLETED};
enum mam_proc_states{MAM_PROC_CONTINUE, MAM_PROC_NEW_RANK, MAM_PROC_ZOMBIE};
enum mall_spawn_methods{MALL_SPAWN_BASELINE, MALL_SPAWN_MERGE}; enum mall_spawn_methods{MALL_SPAWN_BASELINE, MALL_SPAWN_MERGE};
#define MALL_SPAWN_PTHREAD 2 #define MALL_SPAWN_PTHREAD 2
#define MALL_SPAWN_SINGLE 3 #define MALL_SPAWN_SINGLE 3
......
...@@ -31,7 +31,7 @@ void reset_malleability_times() { ...@@ -31,7 +31,7 @@ void reset_malleability_times() {
void free_malleability_times() { void free_malleability_times() {
#if USE_MAL_DEBUG #if USE_MAL_DEBUG
DEBUG_FUNC("Freeing recording structure", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(mall->comm); DEBUG_FUNC("Freeing recording structure", mall->myId, mall->numP); fflush(stdout);
#endif #endif
if(mall_conf->times != NULL) { if(mall_conf->times != NULL) {
if(mall_conf->times->times_type != MPI_DATATYPE_NULL) { if(mall_conf->times->times_type != MPI_DATATYPE_NULL) {
...@@ -41,7 +41,7 @@ void free_malleability_times() { ...@@ -41,7 +41,7 @@ void free_malleability_times() {
free(mall_conf->times); free(mall_conf->times);
} }
#if USE_MAL_DEBUG #if USE_MAL_DEBUG
DEBUG_FUNC("Freed recording structure", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(mall->comm); DEBUG_FUNC("Freed recording structure", mall->myId, mall->numP); fflush(stdout);
#endif #endif
} }
......
...@@ -16,30 +16,34 @@ int offset_pids, *pids = NULL; ...@@ -16,30 +16,34 @@ int offset_pids, *pids = NULL;
void gestor_usr2() {} void gestor_usr2() {}
void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root) { void zombies_collect_suspended(MPI_Comm comm) {
int pid = getpid(); int pid = getpid();
int *pids_counts = malloc(numP * sizeof(int)); int *pids_counts = malloc(mall->numP * sizeof(int));
int *pids_displs = malloc(numP * sizeof(int)); int *pids_displs = malloc(mall->numP * sizeof(int));
int i, count=1; int i, count=1;
if(myId < numC) { #if USE_MAL_DEBUG > 2
if(mall->myId == mall->root){ DEBUG_FUNC("Collecting zombies", mall->myId, mall->numP); } fflush(stdout);
#endif
if(mall->myId < mall->numC) {
count = 0; count = 0;
if(myId == root) { if(mall->myId == mall->root) {
for(i=0; i < numC; i++) { for(i=0; i < mall->numC; i++) {
pids_counts[i] = 0; pids_counts[i] = 0;
} }
for(i=numC; i<numP; i++) { for(i=mall->numC; i<mall->numP; i++) {
pids_counts[i] = 1; pids_counts[i] = 1;
pids_displs[i] = (i + offset_pids) - numC; pids_displs[i] = (i - mall->numC) + offset_pids;
} }
offset_pids += numP - numC; offset_pids += mall->numP - mall->numC;
} }
} }
MPI_Gatherv(&pid, count, MPI_INT, pids, pids_counts, pids_displs, MPI_INT, root, comm); MPI_Gatherv(&pid, count, MPI_INT, pids, pids_counts, pids_displs, MPI_INT, mall->root, comm);
free(pids_counts); free(pids_counts);
free(pids_displs); free(pids_displs);
if(myId >= numC) { if(mall->myId >= mall->numC) {
zombies_suspend(); zombies_suspend();
} }
} }
......
...@@ -8,8 +8,9 @@ ...@@ -8,8 +8,9 @@
#include <unistd.h> #include <unistd.h>
#include <mpi.h> #include <mpi.h>
#include <signal.h> #include <signal.h>
#include "malleabilityDataStructures.h"
void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root); void zombies_collect_suspended(MPI_Comm comm);
void zombies_service_init(); void zombies_service_init();
void zombies_service_free(); void zombies_service_free();
void zombies_awake(); void zombies_awake();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment