Commit 8984a81d authored by iker_martin

Added changes from the JournalOfSupercomputing article

parent f1511cb4
@@ -7,16 +7,17 @@
 /*
  * Performs a matrix multiplication of size n
  */
-double computeMatrix(double *matrix, int n) { //FIXME Does not give repeatable timings
+double computeMatrix(double *matrix, int n) {
   int row, col;
   double aux;
   aux=0;
   for(row=0; row<n; row++) {
     for(col=0; col<n; col++) {
-      aux += ( (int)(matrix[row*n + col] + exp(sqrt(row*col))) % n);
+      aux += (int)(matrix[row*n + col] * matrix[row*n + col]);
     }
   }
   return aux;
 }
@@ -42,20 +43,25 @@ double computePiSerial(int n) {
  */
 void initMatrix(double **matrix, size_t n) {
   size_t i, j;
+  double *aux = NULL;
+  freeMatrix(matrix);
   // Init matrix
   if(matrix != NULL) {
-    *matrix = malloc(n * n * sizeof(double));
-    if(*matrix == NULL) { MPI_Abort(MPI_COMM_WORLD, -1);}
-    for(i=0; i < n; i++) {
-      for(j=0; j < n; j++) {
-        (*matrix)[i*n + j] = i+j;
-      }
+    aux = (double *) malloc(n * n * sizeof(double));
+    if(aux == NULL) { perror("Computing matrix could not be allocated"); MPI_Abort(MPI_COMM_WORLD, -1);}
+    for(i=0; i < n; i++) {
+      for(j=0; j < n; j++) {
+        aux[i*n + j] = (i+j) * 1.1;
+      }
     }
   }
+  *matrix = aux;
 }

 void freeMatrix(double **matrix) {
   // Init matrix
   if(*matrix != NULL) {
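The two hunks above make the synthetic compute stage deterministic (a plain sum of squares instead of exp/sqrt plus a modulus, which did not give repeatable timings) and build the matrix in a temporary buffer that is only published on success. A minimal, self-contained sketch of the resulting behaviour follows; it uses local demo_* copies of the functions and plain exit() instead of MPI_Abort(), so it is an illustration rather than the repository code.

#include <stdio.h>
#include <stdlib.h>

/* Local copies mirroring the new versions above: the matrix is built in a
 * temporary buffer and published only on success, and the compute kernel is
 * a deterministic sum of squares, so timings are repeatable across runs. */
static void demo_initMatrix(double **matrix, size_t n) {
  double *aux = malloc(n * n * sizeof(double));
  if(aux == NULL) { perror("matrix allocation failed"); exit(EXIT_FAILURE); }
  for(size_t i = 0; i < n; i++)
    for(size_t j = 0; j < n; j++)
      aux[i*n + j] = (i + j) * 1.1;
  *matrix = aux;
}

static double demo_computeMatrix(double *matrix, int n) {
  double aux = 0;
  for(int row = 0; row < n; row++)
    for(int col = 0; col < n; col++)
      aux += (int)(matrix[row*n + col] * matrix[row*n + col]);
  return aux;
}

int main(void) {
  double *m = NULL;
  demo_initMatrix(&m, 100);
  printf("checksum: %f\n", demo_computeMatrix(m, 100));
  free(m);
  return 0;
}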
@@ -36,7 +36,7 @@ double init_comm_reduce_pt(group_data group, configuration *config_file, iter_st
  */
 double init_stage(configuration *config_file, int stage_i, group_data group, MPI_Comm comm, int compute) {
   double result = 0;
-  int qty = 20000;
+  int qty = 5000;
   iter_stage_t *stage = &(config_file->stages[stage_i]);
   stage->operations = qty;
@@ -5,7 +5,8 @@
 #include "distribution_methods/block_distribution.h"
 #include "CommDist.h"

-void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts);
+//void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts);
+void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, char **recv, struct Counts *s_counts, struct Counts *r_counts); //FIXME Choose name for is_sync
 void check_requests(struct Counts s_counts, struct Counts r_counts, int red_strategies, MPI_Request **requests, size_t *request_qty);
 void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm);
@@ -78,7 +79,18 @@ int sync_communication(char *send, char **recv, int qty, int myId, int numP, int
   /* PREPARE COMMUNICATION */
   MPI_Comm_test_inter(comm, &is_intercomm);
-  prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
+  // prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
+  // TODO START REFACTOR - by default this always uses an intracomm
+  prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, 1, recv, &s_counts, &r_counts); //FIXME MAGICAL VALUE
+  if(is_intercomm) {
+    MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
+    aux_comm_used = 1;
+  } else {
+    aux_comm = comm;
+  }
+  // FIXME END REFACTOR

   /* PERFORM COMMUNICATION */
   switch(red_method) {
@@ -90,19 +102,15 @@ int sync_communication(char *send, char **recv, int qty, int myId, int numP, int
       } else {
         get_block_dist(qty, myId, numO, &dist_data);
       }
-      if(is_intercomm) {
-        MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
-        aux_comm_used = 1;
-      } else { aux_comm = comm; }
       sync_rma(send, *recv, r_counts, dist_data.tamBl, aux_comm, red_method);
       break;
     case MALL_RED_POINT:
-      sync_point2point(send, *recv, is_intercomm, myId, s_counts, r_counts, comm);
+      sync_point2point(send, *recv, is_intercomm, myId, s_counts, r_counts, aux_comm);
       break;
     case MALL_RED_BASELINE:
     default:
-      MPI_Alltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, comm);
+      MPI_Alltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, aux_comm);
       break;
   }
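The refactor above hoists the communicator choice out of the reduction switch: when the two process groups are still linked by an intercommunicator it is merged once into aux_comm right after prepare_redistribution, and every redistribution method (RMA, point-to-point, Alltoallv) then runs on that intracommunicator. A minimal sketch of the pattern follows; the function name and the MPI_Bcast standing in for the real redistribution calls are illustrative only.

#include <mpi.h>

/* Sketch: obtain an intracommunicator once, use it for every collective,
 * and free it only if it was created here. 'is_children_group' is passed
 * as the 'high' argument so the children are ordered after the parents. */
static void redistribute_on_intracomm(char *buf, int count, int is_children_group, MPI_Comm comm) {
  MPI_Comm aux_comm = MPI_COMM_NULL;
  int is_intercomm = 0, aux_comm_used = 0;

  MPI_Comm_test_inter(comm, &is_intercomm);
  if(is_intercomm) {
    MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
    aux_comm_used = 1;
  } else {
    aux_comm = comm;
  }

  /* Any redistribution method can now assume an intracommunicator. */
  MPI_Bcast(buf, count, MPI_CHAR, 0, aux_comm);

  if(aux_comm_used) MPI_Comm_free(&aux_comm);
}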
@@ -283,7 +291,16 @@ int async_communication_start(char *send, char **recv, int qty, int myId, int nu
   /* PREPARE COMMUNICATION */
   MPI_Comm_test_inter(comm, &is_intercomm);
-  prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
+  // TODO START REFACTOR - by default this always uses an intracomm
+  //prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
+  prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, 1, recv, &s_counts, &r_counts); // TODO MAGICAL VALUE
+  if(is_intercomm) {
+    MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
+    aux_comm_used = 1;
+  } else {
+    aux_comm = comm;
+  }
+  // FIXME END REFACTOR
   check_requests(s_counts, r_counts, red_strategies, requests, request_qty);

   /* PERFORM COMMUNICATION */
@@ -296,18 +313,14 @@ int async_communication_start(char *send, char **recv, int qty, int myId, int nu
       } else {
         get_block_dist(qty, myId, numO, &dist_data);
       }
-      if(is_intercomm) {
-        MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
-        aux_comm_used = 1;
-      } else { aux_comm = comm; }
       async_rma(send, *recv, r_counts, dist_data.tamBl, aux_comm, red_method, *requests, win);
       break;
     case MALL_RED_POINT:
-      async_point2point(send, *recv, s_counts, r_counts, comm, *requests);
+      async_point2point(send, *recv, s_counts, r_counts, aux_comm, *requests);
       break;
     case MALL_RED_BASELINE:
     default:
-      MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, comm, &((*requests)[0]));
+      MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, aux_comm, &((*requests)[0]));
       break;
   }
@@ -545,13 +558,14 @@ void async_rma_lockall(char *recv, struct Counts r_counts, MPI_Win win, MPI_Requ
  * - r_counts (OUT): Struct indicating how many elements this process receives from each process of the previous group.
  *
  */
-void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts) {
+//FIXME Ensure name for is_sync variable
+void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, char **recv, struct Counts *s_counts, struct Counts *r_counts) {
   int array_size = numO;
   int offset_ids = 0;
   struct Dist_data dist_data;

   if(is_intercomm) {
-    //offset_ids = !is_children_group ? numP : 0; //FIXME Modify only if active?
+    offset_ids = is_sync ? numP : 0; //FIXME Modify only if active?
   } else {
     array_size = numP > numO ? numP : numO;
   }
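The new is_sync flag only changes how offset_ids is chosen while parents and children are still joined by an intercommunicator. A plausible reading (an assumption, not stated in the hunk itself) is that, once the two groups are merged with the children placed after the parents, counts and displacements are indexed by merged ranks, so peer ids of the other group have to be shifted by numP. A tiny standalone illustration of that layout:

#include <stdio.h>

/* Illustration only: assumed rank layout after MPI_Intercomm_merge when the
 * children pass high = 1, so parents keep ranks [0, numP) and children get
 * ranks [numP, numP + numO). The group sizes below are made up. */
int main(void) {
  int numP = 4;            /* size of the parent group (assumed)         */
  int numO = 2;            /* size of the child group (assumed)          */
  int offset_ids = numP;   /* shift applied to child ids, as in the hunk */

  for(int child = 0; child < numO; child++)
    printf("child %d -> merged rank %d\n", child, child + offset_ids);
  printf("parents keep merged ranks 0..%d\n", numP - 1);
  return 0;
}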
@@ -182,6 +182,7 @@ int malleability_checkpoint() {
       break;
     case MALL_NOT_STARTED:
       // Check whether a resize has to be performed
+      //MPI_Barrier(mall->comm);
       mall_conf->results->malleability_time[mall_conf->grp] = MPI_Wtime();
       //if(CHECK_RMS()) {return MALL_DENIED;}
@@ -196,6 +197,7 @@ int malleability_checkpoint() {
     case MALL_SPAWN_SINGLE_PENDING:
       state = check_spawn_state(&(mall->intercomm), mall->comm, &end_real_time);
       if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPTED) {
+        //MPI_Barrier(mall->comm);
         mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
         mall_conf->results->spawn_real_time[mall_conf->grp] = end_real_time - mall_conf->results->spawn_start;
@@ -221,11 +223,13 @@ int malleability_checkpoint() {
       break;
     case MALL_SPAWN_ADAPT_PENDING:
+      //MPI_Barrier(mall->comm);
       mall_conf->results->spawn_start = MPI_Wtime();
       unset_spawn_postpone_flag(state);
       state = check_spawn_state(&(mall->intercomm), mall->comm, &end_real_time);
       if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
+        //MPI_Barrier(mall->comm);
         mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
         malleability_checkpoint();
       }
@@ -237,6 +241,7 @@ int malleability_checkpoint() {
       break;
     case MALL_DIST_COMPLETED: //TODO Isn't this rather ugly?
+      //MPI_Barrier(mall->comm);
       mall_conf->results->malleability_end = MPI_Wtime();
       state = MALL_COMPLETED;
       break;
@@ -527,6 +532,7 @@ void Children_init() {
   comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
   if(dist_a_data->entries || rep_a_data->entries) { // Receive asynchronous data
+    //MPI_Barrier(mall->intercomm);
     if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
       recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
@@ -541,13 +547,14 @@ void Children_init() {
       }
     }
+    //MPI_Barrier(mall->intercomm);
     mall_conf->results->async_end = MPI_Wtime(); // Get the timestamp of when the asynchronous communication ends
   }

   comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
   if(dist_s_data->entries || rep_s_data->entries) { // Receive synchronous data
+    //MPI_Barrier(mall->intercomm);
     recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
-    mall_conf->results->sync_end = MPI_Wtime(); // Get the timestamp of when the synchronous communication ends

     // TODO Create a specific function and also add it for Async
     // TODO Take the type and qty into account
@@ -560,8 +567,9 @@ void Children_init() {
       }
       MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, root_parents, mall->intercomm);
     }
+    //MPI_Barrier(mall->intercomm);
     mall_conf->results->sync_end = MPI_Wtime(); // Get the timestamp of when the synchronous communication ends
   }
   mall_conf->results->malleability_end = MPI_Wtime(); // Get the timestamp of when malleability ends
   // Save the results of this transmission
   comm_results(mall_conf->results, mall->root, mall_conf->config_file->n_resizes, mall->intercomm);
@@ -569,6 +577,8 @@ void Children_init() {
     malleability_comms_update(mall->intercomm);
   }
+  //MPI_Barrier(mall->comm);
+  mall_conf->results->malleability_end = MPI_Wtime(); // Get the timestamp of when malleability ends
   MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error with OpenMPI + Merge
 }
@@ -583,11 +593,13 @@ void Children_init() {
  * If it is requested in the background, it returns the current state.
  */
 int spawn_step(){
+  //MPI_Barrier(mall->comm);
   mall_conf->results->spawn_start = MPI_Wtime();

   state = init_spawn(mall->name_exec, mall->num_cpus, mall->num_nodes, mall->nodelist, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_method, mall_conf->spawn_strategies, mall->thread_comm, &(mall->intercomm));

   if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
+    //MPI_Barrier(mall->comm);
     mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
   }
   return state;
@@ -634,6 +646,7 @@ int start_redistribution() {
   comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
   if(dist_a_data->entries || rep_a_data->entries) { // Send asynchronous data
     //FIXME The replicated data (rep_a_data) is not sent
+    //MPI_Barrier(mall->intercomm);
     mall_conf->results->async_time[mall_conf->grp] = MPI_Wtime();
     if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
       return thread_creation();
@@ -685,6 +698,7 @@ int check_redistribution() {
   }
   MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
+  //MPI_Barrier(mall->intercomm);
   if(!is_intercomm) mall_conf->results->async_end = MPI_Wtime(); // Merge method only
   return end_redistribution();
 }
@@ -711,9 +725,9 @@ int end_redistribution() {
   comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
   if(dist_s_data->entries || rep_s_data->entries) { // Send synchronous data
+    //MPI_Barrier(mall->intercomm);
     mall_conf->results->sync_time[mall_conf->grp] = MPI_Wtime();
     send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
-    if(!is_intercomm) mall_conf->results->sync_end = MPI_Wtime(); // Merge method only

     // TODO Create a specific function and also add it for Async
     // TODO Take the type into account
@@ -726,6 +740,8 @@ int end_redistribution() {
       }
       MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, rootBcast, mall->intercomm);
     }
+    //MPI_Barrier(mall->intercomm);
+    if(!is_intercomm) mall_conf->results->sync_end = MPI_Wtime(); // Merge method only
   }

   comm_results(mall_conf->results, rootBcast, mall_conf->config_file->n_resizes, mall->intercomm);
@@ -752,6 +768,7 @@ int end_redistribution() {
 ///=============================================
 //TODO Add comment
 int shrink_redistribution() {
+  //MPI_Barrier(mall->comm);
   double time_extra = MPI_Wtime();

   //TODO Create a new state before collecting zombies. Processes can perform tasks before that. Then call Malleability again to commit the change
@@ -770,6 +787,7 @@ int shrink_redistribution() {
   MPI_Comm_free(&(mall->intercomm));
+  //MPI_Barrier(mall->comm);
   mall_conf->results->spawn_time[mall_conf->grp] += MPI_Wtime() - time_extra;
   if(malleability_spawn_contains_strat(mall_conf->spawn_strategies,MALL_SPAWN_PTHREAD, NULL)) {
     mall_conf->results->spawn_real_time[mall_conf->grp] += MPI_Wtime() - time_extra;
@@ -866,6 +884,7 @@ int thread_check() {
     return -2;
   }
   MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
+  //MPI_Barrier(mall->intercomm);
   if(!is_intercomm) mall_conf->results->async_end = MPI_Wtime(); // Merge method only
   return end_redistribution();
 }
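Most of the remaining hunks comment out the MPI_Barrier calls that used to precede each MPI_Wtime() sample. With the barrier, every rank stamps the moment the slowest rank reaches the phase; without it, each rank records its own local time and skew between processes is no longer hidden. A hedged sketch of the two variants (the helper name and the barrier_before_timing flag are illustrative, not repository options):

#include <mpi.h>

/* Sketch: time a phase with or without synchronising first. */
static double phase_seconds(MPI_Comm comm, int barrier_before_timing, void (*phase)(void)) {
  if(barrier_before_timing) MPI_Barrier(comm);   /* old behaviour: align all ranks    */
  double start = MPI_Wtime();

  phase();                                       /* the work being measured           */

  if(barrier_before_timing) MPI_Barrier(comm);   /* wait for the slowest rank to end  */
  return MPI_Wtime() - start;                    /* without barriers: local duration  */
}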