Commit 39e1435c authored by iker_martin's avatar iker_martin
Browse files

Now redistributions always use Intracomms. Intercomms can be used still....

Now redistributions always use Intracomms. Intercomms can be used still. Commented barriers in manager.
parent d8139b51
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
#include "distribution_methods/block_distribution.h" #include "distribution_methods/block_distribution.h"
#include "CommDist.h" #include "CommDist.h"
void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts); void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, char **recv, struct Counts *s_counts, struct Counts *r_counts);
void check_requests(struct Counts s_counts, struct Counts r_counts, MPI_Request **requests, size_t *request_qty); void check_requests(struct Counts s_counts, struct Counts r_counts, MPI_Request **requests, size_t *request_qty);
void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm); void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm);
...@@ -75,8 +75,14 @@ int sync_communication(char *send, char **recv, int qty, int myId, int numP, int ...@@ -75,8 +75,14 @@ int sync_communication(char *send, char **recv, int qty, int myId, int numP, int
/* PREPARE COMMUNICATION */ /* PREPARE COMMUNICATION */
MPI_Comm_test_inter(comm, &is_intercomm); MPI_Comm_test_inter(comm, &is_intercomm);
prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts); prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, 1, recv, &s_counts, &r_counts);
if(is_intercomm) {
MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
aux_comm_used = 1;
} else {
aux_comm = comm;
}
/* PERFORM COMMUNICATION */ /* PERFORM COMMUNICATION */
switch(red_method) { switch(red_method) {
...@@ -95,11 +101,11 @@ int sync_communication(char *send, char **recv, int qty, int myId, int numP, int ...@@ -95,11 +101,11 @@ int sync_communication(char *send, char **recv, int qty, int myId, int numP, int
break; break;
case MALL_RED_POINT: case MALL_RED_POINT:
sync_point2point(send, *recv, is_intercomm, myId, s_counts, r_counts, comm); sync_point2point(send, *recv, is_intercomm, myId, s_counts, r_counts, aux_comm);
break; break;
case MALL_RED_BASELINE: case MALL_RED_BASELINE:
default: default:
MPI_Alltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, comm); MPI_Alltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, aux_comm);
break; break;
} }
...@@ -292,9 +298,16 @@ int async_communication(char *send, char **recv, int qty, int myId, int numP, in ...@@ -292,9 +298,16 @@ int async_communication(char *send, char **recv, int qty, int myId, int numP, in
/* PREPARE COMMUNICATION */ /* PREPARE COMMUNICATION */
MPI_Comm_test_inter(comm, &is_intercomm); MPI_Comm_test_inter(comm, &is_intercomm);
prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts); prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, 1, recv, &s_counts, &r_counts);
check_requests(s_counts, r_counts, requests, request_qty); check_requests(s_counts, r_counts, requests, request_qty);
if(is_intercomm) {
MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
aux_comm_used = 1;
} else {
aux_comm = comm;
}
/* PERFORM COMMUNICATION */ /* PERFORM COMMUNICATION */
switch(red_method) { switch(red_method) {
...@@ -302,11 +315,11 @@ int async_communication(char *send, char **recv, int qty, int myId, int numP, in ...@@ -302,11 +315,11 @@ int async_communication(char *send, char **recv, int qty, int myId, int numP, in
case MALL_RED_RMA_LOCK: case MALL_RED_RMA_LOCK:
return MALL_DENIED; //TODO Realizar versiones asíncronas return MALL_DENIED; //TODO Realizar versiones asíncronas
case MALL_RED_POINT: case MALL_RED_POINT:
async_point2point(send, *recv, s_counts, r_counts, comm, *requests); async_point2point(send, *recv, s_counts, r_counts, aux_comm, *requests);
break; break;
case MALL_RED_BASELINE: case MALL_RED_BASELINE:
default: default:
MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, comm, &((*requests)[0])); MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, aux_comm, &((*requests)[0]));
break; break;
} }
...@@ -324,7 +337,7 @@ int async_communication(char *send, char **recv, int qty, int myId, int numP, in ...@@ -324,7 +337,7 @@ int async_communication(char *send, char **recv, int qty, int myId, int numP, in
if(aux_comm_used) { if(aux_comm_used) {
MPI_Comm_free(&aux_comm); MPI_Comm_free(&aux_comm);
} }
freeCounts(&s_counts); freeCounts(&s_counts);
freeCounts(&r_counts); freeCounts(&r_counts);
return 0; //FIXME In this case is always false... return 0; //FIXME In this case is always false...
...@@ -385,13 +398,13 @@ void async_point2point(char *send, char *recv, struct Counts s_counts, struct Co ...@@ -385,13 +398,13 @@ void async_point2point(char *send, char *recv, struct Counts s_counts, struct Co
* - r_counts (OUT): Struct where is indicated how many elements receives this process from other processes in the previous group. * - r_counts (OUT): Struct where is indicated how many elements receives this process from other processes in the previous group.
* *
*/ */
void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts) { void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, char **recv, struct Counts *s_counts, struct Counts *r_counts) {
int array_size = numO; int array_size = numO;
int offset_ids = 0; int offset_ids = 0;
struct Dist_data dist_data; struct Dist_data dist_data;
if(is_intercomm) { if(is_intercomm) {
//offset_ids = numP; //FIXME Modify only if active? offset_ids = is_sync ? numP : 0; //FIXME Modify only if active?
} else { } else {
array_size = numP > numO ? numP : numO; array_size = numP > numO ? numP : numO;
} }
......
...@@ -182,6 +182,7 @@ int malleability_checkpoint() { ...@@ -182,6 +182,7 @@ int malleability_checkpoint() {
break; break;
case MALL_NOT_STARTED: case MALL_NOT_STARTED:
// Comprobar si se tiene que realizar un redimensionado // Comprobar si se tiene que realizar un redimensionado
//MPI_Barrier(mall->comm);
mall_conf->results->malleability_time[mall_conf->grp] = MPI_Wtime(); mall_conf->results->malleability_time[mall_conf->grp] = MPI_Wtime();
//if(CHECK_RMS()) {return MALL_DENIED;} //if(CHECK_RMS()) {return MALL_DENIED;}
...@@ -196,6 +197,7 @@ int malleability_checkpoint() { ...@@ -196,6 +197,7 @@ int malleability_checkpoint() {
case MALL_SPAWN_SINGLE_PENDING: case MALL_SPAWN_SINGLE_PENDING:
state = check_spawn_state(&(mall->intercomm), mall->comm, &end_real_time); state = check_spawn_state(&(mall->intercomm), mall->comm, &end_real_time);
if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPTED) { if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPTED) {
//MPI_Barrier(mall->comm);
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start; mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
mall_conf->results->spawn_real_time[mall_conf->grp] = end_real_time - mall_conf->results->spawn_start; mall_conf->results->spawn_real_time[mall_conf->grp] = end_real_time - mall_conf->results->spawn_start;
...@@ -221,11 +223,13 @@ int malleability_checkpoint() { ...@@ -221,11 +223,13 @@ int malleability_checkpoint() {
break; break;
case MALL_SPAWN_ADAPT_PENDING: case MALL_SPAWN_ADAPT_PENDING:
//MPI_Barrier(mall->comm);
mall_conf->results->spawn_start = MPI_Wtime(); mall_conf->results->spawn_start = MPI_Wtime();
unset_spawn_postpone_flag(state); unset_spawn_postpone_flag(state);
state = check_spawn_state(&(mall->intercomm), mall->comm, &end_real_time); state = check_spawn_state(&(mall->intercomm), mall->comm, &end_real_time);
if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) { if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
//MPI_Barrier(mall->comm);
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start; mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
malleability_checkpoint(); malleability_checkpoint();
} }
...@@ -237,6 +241,7 @@ int malleability_checkpoint() { ...@@ -237,6 +241,7 @@ int malleability_checkpoint() {
break; break;
case MALL_DIST_COMPLETED: //TODO No es esto muy feo? case MALL_DIST_COMPLETED: //TODO No es esto muy feo?
//MPI_Barrier(mall->comm);
mall_conf->results->malleability_end = MPI_Wtime(); mall_conf->results->malleability_end = MPI_Wtime();
state = MALL_COMPLETED; state = MALL_COMPLETED;
break; break;
...@@ -521,6 +526,7 @@ void Children_init() { ...@@ -521,6 +526,7 @@ void Children_init() {
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm); comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos
//MPI_Barrier(mall->intercomm);
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) { if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS); recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
...@@ -528,13 +534,14 @@ void Children_init() { ...@@ -528,13 +534,14 @@ void Children_init() {
recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS); recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
} }
//MPI_Barrier(mall->intercomm);
mall_conf->results->async_end= MPI_Wtime(); // Obtener timestamp de cuando termina comm asincrona mall_conf->results->async_end= MPI_Wtime(); // Obtener timestamp de cuando termina comm asincrona
} }
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm); comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
if(dist_s_data->entries || rep_s_data->entries) { // Recibir datos sincronos if(dist_s_data->entries || rep_s_data->entries) { // Recibir datos sincronos
//MPI_Barrier(mall->intercomm);
recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS); recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
mall_conf->results->sync_end = MPI_Wtime(); // Obtener timestamp de cuando termina comm sincrona
// TODO Crear funcion especifica y anyadir para Asinc // TODO Crear funcion especifica y anyadir para Asinc
// TODO Tener en cuenta el tipo y qty // TODO Tener en cuenta el tipo y qty
...@@ -547,14 +554,17 @@ void Children_init() { ...@@ -547,14 +554,17 @@ void Children_init() {
} }
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, root_parents, mall->intercomm); MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, root_parents, mall->intercomm);
} }
//MPI_Barrier(mall->intercomm);
mall_conf->results->sync_end = MPI_Wtime(); // Obtener timestamp de cuando termina comm sincrona
} }
mall_conf->results->malleability_end = MPI_Wtime(); // Obtener timestamp de cuando termina maleabilidad
// Guardar los resultados de esta transmision // Guardar los resultados de esta transmision
comm_results(mall_conf->results, mall->root, mall_conf->config_file->n_resizes, mall->intercomm); comm_results(mall_conf->results, mall->root, mall_conf->config_file->n_resizes, mall->intercomm);
if(!is_intercomm) { if(!is_intercomm) {
malleability_comms_update(mall->intercomm); malleability_comms_update(mall->intercomm);
} }
//MPI_Barrier(mall->comm);
mall_conf->results->malleability_end = MPI_Wtime(); // Obtener timestamp de cuando termina maleabilidad
MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error en OpenMPI + Merge MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error en OpenMPI + Merge
} }
...@@ -570,11 +580,13 @@ void Children_init() { ...@@ -570,11 +580,13 @@ void Children_init() {
* Si se pide en segundo plano devuelve el estado actual. * Si se pide en segundo plano devuelve el estado actual.
*/ */
int spawn_step(){ int spawn_step(){
//MPI_Barrier(mall->comm);
mall_conf->results->spawn_start = MPI_Wtime(); mall_conf->results->spawn_start = MPI_Wtime();
state = init_spawn(mall->name_exec, mall->num_cpus, mall->num_nodes, mall->nodelist, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_method, mall_conf->spawn_strategies, mall->thread_comm, &(mall->intercomm)); state = init_spawn(mall->name_exec, mall->num_cpus, mall->num_nodes, mall->nodelist, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_method, mall_conf->spawn_strategies, mall->thread_comm, &(mall->intercomm));
if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) { if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
//MPI_Barrier(mall->comm);
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start; mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
} }
return state; return state;
...@@ -621,6 +633,7 @@ int start_redistribution() { ...@@ -621,6 +633,7 @@ int start_redistribution() {
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm); comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
if(dist_a_data->entries || rep_a_data->entries) { // Enviar datos asincronos if(dist_a_data->entries || rep_a_data->entries) { // Enviar datos asincronos
//FIXME No se envian los datos replicados (rep_a_data) //FIXME No se envian los datos replicados (rep_a_data)
//MPI_Barrier(mall->intercomm);
mall_conf->results->async_time[mall_conf->grp] = MPI_Wtime(); mall_conf->results->async_time[mall_conf->grp] = MPI_Wtime();
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) { if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
return thread_creation(); return thread_creation();
...@@ -684,6 +697,7 @@ int check_redistribution() { ...@@ -684,6 +697,7 @@ int check_redistribution() {
} }
MPI_Comm_test_inter(mall->intercomm, &is_intercomm); MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
//MPI_Barrier(mall->intercomm);
if(!is_intercomm) mall_conf->results->async_end = MPI_Wtime(); // Merge method only if(!is_intercomm) mall_conf->results->async_end = MPI_Wtime(); // Merge method only
return end_redistribution(); return end_redistribution();
} }
...@@ -710,9 +724,9 @@ int end_redistribution() { ...@@ -710,9 +724,9 @@ int end_redistribution() {
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm); comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
if(dist_s_data->entries || rep_s_data->entries) { // Enviar datos sincronos if(dist_s_data->entries || rep_s_data->entries) { // Enviar datos sincronos
//MPI_Barrier(mall->intercomm);
mall_conf->results->sync_time[mall_conf->grp] = MPI_Wtime(); mall_conf->results->sync_time[mall_conf->grp] = MPI_Wtime();
send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS); send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
if(!is_intercomm) mall_conf->results->sync_end = MPI_Wtime(); // Merge method only
// TODO Crear funcion especifica y anyadir para Asinc // TODO Crear funcion especifica y anyadir para Asinc
// TODO Tener en cuenta el tipo // TODO Tener en cuenta el tipo
...@@ -725,6 +739,8 @@ int end_redistribution() { ...@@ -725,6 +739,8 @@ int end_redistribution() {
} }
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, rootBcast, mall->intercomm); MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, rootBcast, mall->intercomm);
} }
//MPI_Barrier(mall->intercomm);
if(!is_intercomm) mall_conf->results->sync_end = MPI_Wtime(); // Merge method only
} }
comm_results(mall_conf->results, rootBcast, mall_conf->config_file->n_resizes, mall->intercomm); comm_results(mall_conf->results, rootBcast, mall_conf->config_file->n_resizes, mall->intercomm);
...@@ -751,6 +767,7 @@ int end_redistribution() { ...@@ -751,6 +767,7 @@ int end_redistribution() {
///============================================= ///=============================================
//TODO Add comment //TODO Add comment
int shrink_redistribution() { int shrink_redistribution() {
//MPI_Barrier(mall->comm);
double time_extra = MPI_Wtime(); double time_extra = MPI_Wtime();
//TODO Create new state before collecting zombies. Processes can perform tasks before that. Then call again Malleability to commit the change //TODO Create new state before collecting zombies. Processes can perform tasks before that. Then call again Malleability to commit the change
...@@ -769,6 +786,7 @@ int shrink_redistribution() { ...@@ -769,6 +786,7 @@ int shrink_redistribution() {
MPI_Comm_free(&(mall->intercomm)); MPI_Comm_free(&(mall->intercomm));
//MPI_Barrier(mall->comm);
mall_conf->results->spawn_time[mall_conf->grp] += MPI_Wtime() - time_extra; mall_conf->results->spawn_time[mall_conf->grp] += MPI_Wtime() - time_extra;
if(malleability_spawn_contains_strat(mall_conf->spawn_strategies,MALL_SPAWN_PTHREAD, NULL)) { if(malleability_spawn_contains_strat(mall_conf->spawn_strategies,MALL_SPAWN_PTHREAD, NULL)) {
mall_conf->results->spawn_real_time[mall_conf->grp] += MPI_Wtime() - time_extra; mall_conf->results->spawn_real_time[mall_conf->grp] += MPI_Wtime() - time_extra;
...@@ -865,6 +883,7 @@ int thread_check() { ...@@ -865,6 +883,7 @@ int thread_check() {
return -2; return -2;
} }
MPI_Comm_test_inter(mall->intercomm, &is_intercomm); MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
//MPI_Barrier(mall->intercomm);
if(!is_intercomm) mall_conf->results->async_end = MPI_Wtime(); // Merge method only if(!is_intercomm) mall_conf->results->async_end = MPI_Wtime(); // Merge method only
return end_redistribution(); return end_redistribution();
} }
......
...@@ -45,6 +45,7 @@ int baseline_spawn(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *child) { ...@@ -45,6 +45,7 @@ int baseline_spawn(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *child) {
// WORK // WORK
int spawn_err = MPI_Comm_spawn(spawn_data.cmd, MPI_ARGV_NULL, spawn_data.spawn_qty, spawn_data.mapping, spawn_data.root, comm, child, MPI_ERRCODES_IGNORE); int spawn_err = MPI_Comm_spawn(spawn_data.cmd, MPI_ARGV_NULL, spawn_data.spawn_qty, spawn_data.mapping, spawn_data.root, comm, child, MPI_ERRCODES_IGNORE);
MPI_Comm_set_name(*child, "MPI_COMM_MALL_RESIZE"); MPI_Comm_set_name(*child, "MPI_COMM_MALL_RESIZE");
// END WORK // END WORK
if(spawn_err != MPI_SUCCESS) { if(spawn_err != MPI_SUCCESS) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment