Commit 8d5e3337 authored by iker_martin

CG main code: improved the main loop and data redistribution. Commdist now uses an intracommunicator for the asynchronous baseline.
parent a671d88c
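
The Commdist change is the heart of this commit: when parents and children are still joined by an intercommunicator, the asynchronous baseline now merges it into an intracommunicator before posting the nonblocking collective. A minimal sketch of that pattern using plain MPI; the function name and parameters below are illustrative, not the project's API:

#include <mpi.h>

/* Sketch: run a nonblocking all-to-all over an intracommunicator,
 * merging first if the given communicator is an intercommunicator. */
void ialltoallv_on_intracomm(double *send, int *s_counts, int *s_displs,
                             double *recv, int *r_counts, int *r_displs,
                             MPI_Comm comm, int is_children_group,
                             MPI_Request *req, MPI_Comm *aux_comm_out) {
    int is_intercomm;
    MPI_Comm aux_comm;

    MPI_Comm_test_inter(comm, &is_intercomm);
    if (is_intercomm) {
        /* Children go in the high half of the merged group, so the
         * combined rank order stays parents-then-children. */
        MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
    } else {
        aux_comm = comm;
    }

    /* Over an intercomm, MPI_Ialltoallv exchanges only with the remote
     * group; over the merged intracomm every process addresses every other. */
    MPI_Ialltoallv(send, s_counts, s_displs, MPI_DOUBLE,
                   recv, r_counts, r_displs, MPI_DOUBLE, aux_comm, req);

    /* The merged communicator must outlive the request; the caller frees
     * it after MPI_Wait when it differs from comm. */
    *aux_comm_out = aux_comm;
}
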
@@ -406,12 +406,14 @@ int compute(Compute_data *computeData, struct Dist_data *dist_data, int sm) {
   int state = MALL_NOT_STARTED;
   int ended_loop = 1;
   int cnt = 0;
-  int reconfigure = 0, rec_iter = 500;
-  computeData->maxiter = 1000;
+  int reconfigure = 0, rec_iter = 10;
+  computeData->maxiter = 110;
   while ((computeData->iter < computeData->maxiter) && (computeData->tol > computeData->umbral)) {
+    MPI_Allgatherv(computeData->d, dist_data->tamBl, MPI_DOUBLE, computeData->d_full,
+                   computeData->dist_rows, computeData->displs_rows, MPI_DOUBLE, dist_data->comm); // d_full = Gather(d)
     // COMPUTATION
 #ifdef ONLY_SYM
     ProdSymSparseMatrixVector (computeData->subm, computeData->d_full, computeData->z);            // z += A * d_full
@@ -431,8 +433,8 @@ int compute(Compute_data *computeData, struct Dist_data *dist_data, int sm) {
     rscal (&(dist_data->tamBl), &computeData->alpha, computeData->d, &IONE);                       // d = alpha * d
     raxpy (&(dist_data->tamBl), &DONE, computeData->res, &IONE, computeData->d, &IONE);            // d += res
-    MPI_Allgatherv(computeData->d, dist_data->tamBl, MPI_DOUBLE, computeData->d_full,
-                   computeData->dist_rows, computeData->displs_rows, MPI_DOUBLE, dist_data->comm); // d_full = Gather(d)
+    //MPI_Allgatherv(computeData->d, dist_data->tamBl, MPI_DOUBLE, computeData->d_full,
+    //               computeData->dist_rows, computeData->displs_rows, MPI_DOUBLE, dist_data->comm); // d_full = Gather(d)
     computeData->tol = sqrt (computeData->beta);                                                   // tol = sqrt(beta) = norm (res)
     computeData->iter++;
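
With these two hunks, the gather of the direction vector d into d_full runs at the top of each iteration instead of at the bottom of the previous one, so a reconfiguration between iterations only needs to redistribute the block-local d (see the dist_old/dist_new hunks below). For reference, the collective is the standard block-distributed MPI_Allgatherv pattern; a self-contained sketch assuming hypothetical sizes and an even split:

#include <mpi.h>
#include <stdlib.h>

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    int myId, numP;
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);
    MPI_Comm_size(MPI_COMM_WORLD, &numP);

    int n = 8 * numP;                          /* hypothetical global size   */
    int tamBl = n / numP;                      /* per-rank block, even split */
    int *counts = malloc(numP * sizeof(int));  /* dist_rows analogue         */
    int *displs = malloc(numP * sizeof(int));  /* displs_rows analogue       */
    for (int i = 0; i < numP; i++) { counts[i] = tamBl; displs[i] = i * tamBl; }

    double *d = malloc(tamBl * sizeof(double));
    double *d_full = malloc(n * sizeof(double));
    for (int i = 0; i < tamBl; i++) d[i] = myId + 0.001 * i;

    /* d_full = Gather(d): every rank receives the complete vector. */
    MPI_Allgatherv(d, tamBl, MPI_DOUBLE, d_full, counts, displs, MPI_DOUBLE, MPI_COMM_WORLD);

    free(counts); free(displs); free(d); free(d_full);
    MPI_Finalize();
    return 0;
}
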
@@ -610,7 +612,8 @@ int dist_old(struct Dist_data *dist_data, Compute_data *computeData, int num_chi
   malleability_add_data(&(computeData->beta), 1, MAL_DOUBLE, MAL_DATA_ALONE, 1, 1);
   malleability_add_data(&(computeData->umbral), 1, MAL_DOUBLE, MAL_DATA_ALONE, 1, 1);
-  malleability_add_data(computeData->d_full, computeData->n, MAL_DOUBLE, MAL_DATA_ALONE, 1, 1);
+  //malleability_add_data(computeData->d_full, computeData->n, MAL_DOUBLE, MAL_DATA_ALONE, 1, 1);
+  malleability_add_data(computeData->d, computeData->n, MAL_DOUBLE, MAL_DATA_ALONE, 0, 1);
   malleability_add_data(computeData->vec, computeData->n, MAL_DOUBLE, MAL_DATA_ALONE, 0, 1);
   malleability_add_data(computeData->res, computeData->n, MAL_DOUBLE, MAL_DATA_ALONE, 0, 1);
@@ -645,7 +648,7 @@ void dist_new(struct Dist_data *dist_data, Compute_data *computeData) {
   size_t entry, entries;
   void *value = NULL;
   is_synch = 1;
-  entry = 3;
+  entry = 0;
   malleability_get_data(&value, 0, 1, 1);
   computeData->n = *((int *)value);
@@ -658,25 +661,29 @@ void dist_new(struct Dist_data *dist_data, Compute_data *computeData) {
   malleability_get_data(&value, 4, 1, 1);
   computeData->umbral = *((double *)value);
-  malleability_get_data(&value, 5, 1, 1);
-  computeData->d_full = ((double *)value);
+  //malleability_get_data(&value, 5, 1, 1);
+  //computeData->d_full = ((double *)value);
+  malleability_get_data(&value, entry++, 0, 1);
+  computeData->d = ((double *)value);
-  malleability_get_data(&value, 0, 0, 1);
+  malleability_get_data(&value, entry++, 0, 1);
   computeData->vec = ((double *)value);
-  malleability_get_data(&value, 1, 0, 1);
+  malleability_get_data(&value, entry++, 0, 1);
   computeData->res = ((double *)value);
-  malleability_get_data(&value, 2, 0, 1);
+  malleability_get_data(&value, entry++, 0, 1);
   computeData->z = ((double *)value);
   get_dist(computeData->n, dist_data->myId, dist_data->numP, dist_data);
   get_rows_dist(computeData, dist_data->numP, computeData->n);
-  CreateDoubles(&computeData->d, dist_data->tamBl);
-  rcopy (&(dist_data->tamBl), &(computeData->d_full[dist_data->ini]), &IONE, computeData->d, &IONE); // d = d_full[ini] to d_full[ini+tamBl]
+  //CreateDoubles(&computeData->d, dist_data->tamBl);
+  //rcopy (&(dist_data->tamBl), &(computeData->d_full[dist_data->ini]), &IONE, computeData->d, &IONE); // d = d_full[ini] to d_full[ini+tamBl]
+  CreateDoubles(&computeData->d_full, computeData->n);
+  rcopy (&(dist_data->tamBl), computeData->d, &IONE, &(computeData->d_full[dist_data->ini]), &IONE); // d_full[ini] to d_full[ini+tamBl] = d
   malleability_get_entries(&entries, 0, 0); //Get if there is any asynch data to recover
   if(entries) { is_synch=0; entry=0; }
-  malleability_get_data(&value, entry, 0, is_synch);
+  malleability_get_data(&value, entry++, 0, is_synch);
   computeData->vlen = ((int *)value);
   CreateSparseMatrixVptr(&(computeData->subm), dist_data->tamBl, computeData->n, 0);
@@ -686,9 +693,9 @@ void dist_new(struct Dist_data *dist_data, Compute_data *computeData) {
   }
   TransformLengthtoHeader(computeData->subm.vptr, computeData->subm.dim1); // The array is converted from vlen to vptr
-  malleability_get_data(&value, entry+1, 0, is_synch);
+  malleability_get_data(&value, entry++, 0, is_synch);
   computeData->subm.vpos = ((int *)value);
-  malleability_get_data(&value, entry+2, 0, is_synch);
+  malleability_get_data(&value, entry++, 0, is_synch);
   computeData->subm.vval = ((double *)value);
 }
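
A side effect visible in dist_new() above: fixed retrieval indices (0, 1, 2, then entry+1 and entry+2) become a single post-incremented cursor, so adding or dropping a registered array (here d replacing d_full) shifts one starting value instead of every index. A short sketch of the cursor pattern; the meaning of the last two arguments (taken here as is_replicated and is_synch) is inferred from the surrounding calls and is an assumption:

  size_t entry = 0;   /* running cursor over the distributed entries */
  void *value = NULL;

  /* Each call consumes the next entry in registration order, matching the
   * malleability_add_data() sequence in dist_old(). */
  malleability_get_data(&value, entry++, 0, 1);  computeData->d   = (double *)value;
  malleability_get_data(&value, entry++, 0, 1);  computeData->vec = (double *)value;
  malleability_get_data(&value, entry++, 0, 1);  computeData->res = (double *)value;
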
......
@@ -253,9 +253,14 @@ int async_communication(void *send, void **recv, int qty, int mal_type, int depe
   /* PREPARE COMMUNICATION */
   MPI_Comm_test_inter(comm, &is_intercomm);
-  prepare_redistribution(qty, mal_type, myId, numP, numO, is_children_group, is_intercomm, 0, recv, &s_counts, &r_counts);
+  prepare_redistribution(qty, mal_type, myId, numP, numO, is_children_group, is_intercomm, 1, recv, &s_counts, &r_counts);
   check_requests(s_counts, r_counts, requests, request_qty);
+  if(is_intercomm) {
+    MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
+    aux_comm_used = 1;
+  } else { aux_comm = comm; }
   if(mal_type == MAL_INT) {
     datatype = MPI_INT;
   } else if(mal_type == MAL_DOUBLE) {
@@ -281,11 +286,11 @@ int async_communication(void *send, void **recv, int qty, int mal_type, int depe
   /* PERFORM COMMUNICATION */
   switch(red_method) {
     case MALL_RED_POINT:
-      async_point2point(send, *recv, mal_type, s_counts, r_counts, comm, *requests);
+      async_point2point(send, *recv, mal_type, s_counts, r_counts, aux_comm, *requests);
       break;
     case MALL_RED_BASELINE:
     default:
-      MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, comm, &((*requests)[0]));
+      MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, aux_comm, &((*requests)[0]));
       break;
   }
......
@@ -193,6 +193,7 @@ int malleability_checkpoint() {
       break;
     case MALL_NOT_STARTED:
       // Check whether a resize has to be performed
+      //MPI_Barrier(mall->comm);
       mall_conf->malleability_time = MPI_Wtime();
       state = spawn_step();
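
This hunk and the ones that follow add commented-out MPI_Barrier calls right before MPI_Wtime timestamps. Although disabled for now, the intent matches a common instrumentation pattern: synchronizing first makes timestamps comparable across ranks. A minimal sketch, with do_phase standing in for whichever step is being measured:

#include <mpi.h>

static void do_phase(void) { /* work being timed */ }

/* Time a phase so that all ranks start and stop the clock together. */
static double timed_phase(MPI_Comm comm) {
    MPI_Barrier(comm);              /* align ranks before starting the clock */
    double start = MPI_Wtime();
    do_phase();
    MPI_Barrier(comm);              /* charge stragglers to the measurement */
    return MPI_Wtime() - start;
}
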
@@ -206,6 +207,7 @@ int malleability_checkpoint() {
     case MALL_SPAWN_SINGLE_PENDING:
       state = check_spawn_state(&(mall->intercomm), mall->comm, &end_real_time);
       if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPTED) {
+        //MPI_Barrier(mall->comm);
         mall_conf->spawn_time = MPI_Wtime() - mall_conf->spawn_start;
         malleability_checkpoint();
@@ -230,11 +232,13 @@ int malleability_checkpoint() {
       break;
     case MALL_SPAWN_ADAPT_PENDING:
+      //MPI_Barrier(mall->comm);
       mall_conf->spawn_start = MPI_Wtime();
       unset_spawn_postpone_flag(state);
       state = check_spawn_state(&(mall->intercomm), mall->comm, &end_real_time);
       if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
+        //MPI_Barrier(mall->comm);
         mall_conf->spawn_time = MPI_Wtime() - mall_conf->spawn_start;
         malleability_checkpoint();
       }
@@ -246,6 +250,7 @@ int malleability_checkpoint() {
       break;
     case MALL_DIST_COMPLETED: //TODO Isn't this quite ugly?
+      //MPI_Barrier(mall->comm);
       mall_conf->malleability_end = MPI_Wtime();
       state = MALL_COMPLETED;
       break;
@@ -444,8 +449,7 @@ void send_data(int numP_children, malleability_data_t *data_struct, int is_async
       if(data_struct->dependencies[i] == 1+MAL_DATA_DEPENDENT && dep_not_send && mall_conf->spawn_method == MALL_SPAWN_MERGE) break; //FIXME DELETE dep_not_send
       aux_send = data_struct->arrays[i];
       aux_recv = NULL;
-      async_communication(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], data_struct->dependencies[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall_conf->red_strategies, mall->intercomm,
-                          &(data_struct->requests[i]), &(data_struct->request_qty[i]));
+      async_communication(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], data_struct->dependencies[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall_conf->red_strategies, mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]));
       if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
     }
   } else {
@@ -470,8 +474,7 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch
   if(is_asynchronous) {
     for(i=0; i < data_struct->entries; i++) {
       aux = data_struct->arrays[i];
-      async_communication(aux_s, &aux, data_struct->qty[i], data_struct->types[i], data_struct->dependencies[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall_conf->red_strategies, mall->intercomm,
-                          &(data_struct->requests[i]), &(data_struct->request_qty[i]));
+      async_communication(aux_s, &aux, data_struct->qty[i], data_struct->types[i], data_struct->dependencies[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall_conf->red_strategies, mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]));
       data_struct->arrays[i] = aux;
     }
   } else {
@@ -512,6 +515,7 @@ void Children_init() {
   comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
   if(dist_a_data->entries || rep_a_data->entries) { // Receive asynchronous data
+    //MPI_Barrier(mall->intercomm);
     if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
       recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
@@ -519,11 +523,13 @@ void Children_init() {
       recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
     }
+    //MPI_Barrier(mall->intercomm);
     mall_conf->async_end = MPI_Wtime(); // Timestamp when the asynchronous comm ends
   }
   comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
   if(dist_s_data->entries || rep_s_data->entries) { // Receive synchronous data
+    //MPI_Barrier(mall->intercomm);
     recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
     // TODO Create a specific function and add it for Async
@@ -542,15 +548,16 @@ void Children_init() {
       }
       MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, root_parents, mall->intercomm);
     }
+    //MPI_Barrier(mall->intercomm);
     mall_conf->sync_end = MPI_Wtime(); // Timestamp when the synchronous comm ends
   }
-  mall_conf->malleability_end = MPI_Wtime(); // Timestamp when malleability ends
   comm_results(root_parents, 1);
   if(!is_intercomm) {
     malleability_comms_update(mall->intercomm);
   }
+  //MPI_Barrier(mall->comm);
+  mall_conf->malleability_end = MPI_Wtime(); // Timestamp when malleability ends
   MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error in OpenMPI + Merge
 }
@@ -565,11 +572,13 @@ void Children_init() {
  * If requested in the background, it returns the current state.
  */
 int spawn_step(){
+  //MPI_Barrier(mall->comm);
   mall_conf->spawn_start = MPI_Wtime();
   state = init_spawn(mall->name_exec, mall->num_cpus, mall->num_nodes, mall->nodelist, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_method, mall_conf->spawn_strategies, mall->thread_comm, &(mall->intercomm));
   if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
+    //MPI_Barrier(mall->comm);
     mall_conf->spawn_time = MPI_Wtime() - mall_conf->spawn_start;
   }
   return state;
@@ -615,6 +624,7 @@ int start_redistribution() {
   comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
   if(dist_a_data->entries || rep_a_data->entries) { // Send asynchronous data
     //FIXME The replicated data (rep_a_data) is not sent
+    //MPI_Barrier(mall->intercomm);
     mall_conf->async_time = MPI_Wtime();
     if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
       return thread_creation();
@@ -658,6 +668,7 @@ int check_redistribution() {
     for(j=0; j<req_qty; j++) {
       test_err = MPI_Test(&(req_completed[j]), &completed, MPI_STATUS_IGNORE);
       local_completed = local_completed && completed;
+    }
     // test_err = MPI_Testall(req_qty, req_completed, &completed, MPI_STATUSES_IGNORE);
   }
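
The added closing brace ends the per-request polling loop before the commented MPI_Testall alternative. Both forms answer the same question, whether every pending request has completed locally, without blocking; a standalone sketch of the two options:

#include <mpi.h>

/* Returns 1 once every request in the array has completed locally.
 * MPI_Test on an already-completed (MPI_REQUEST_NULL) request simply
 * reports completion again, so repeated polling is safe. */
static int all_requests_done(MPI_Request *reqs, int req_qty) {
    int completed, local_completed = 1;
    for (int j = 0; j < req_qty; j++) {
        MPI_Test(&reqs[j], &completed, MPI_STATUS_IGNORE);
        local_completed = local_completed && completed;
    }
    /* Equivalent single call:
     * MPI_Testall(req_qty, reqs, &local_completed, MPI_STATUSES_IGNORE); */
    return local_completed;
}
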
@@ -683,6 +694,7 @@ int check_redistribution() {
   }
   MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
+  //MPI_Barrier(mall->intercomm);
   if(!is_intercomm) mall_conf->async_end = MPI_Wtime(); // Merge method only
   return end_redistribution();
 }
@@ -709,9 +721,9 @@ int end_redistribution() {
   comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
   if(dist_s_data->entries || rep_s_data->entries) { // Send synchronous data
+    //MPI_Barrier(mall->intercomm);
     mall_conf->sync_time = MPI_Wtime();
     send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
-    if(!is_intercomm) mall_conf->sync_end = MPI_Wtime(); // Merge method only
     // TODO Create a specific function and add it for Async
     // TODO Take the type into account
@@ -729,6 +741,8 @@ int end_redistribution() {
     }
     MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, rootBcast, mall->intercomm);
   }
+  //MPI_Barrier(mall->intercomm);
+  if(!is_intercomm) mall_conf->sync_end = MPI_Wtime(); // Merge method only
 }
 int compute = mall_conf->spawn_method == MALL_SPAWN_MERGE ? 1 : 0;
@@ -756,6 +770,7 @@ int end_redistribution() {
 ///=============================================
 //TODO Add comment
 int shrink_redistribution() {
+  //MPI_Barrier(mall->comm);
   double time_extra = MPI_Wtime();
   //TODO Create new state before collecting zombies. Processes can perform tasks before that. Then call again Malleability to commit the change
@@ -774,6 +789,7 @@ int shrink_redistribution() {
     MPI_Comm_free(&(mall->intercomm));
+    //MPI_Barrier(mall->comm);
     mall_conf->spawn_time += MPI_Wtime() - time_extra;
     return MALL_DIST_COMPLETED;
   } else {
@@ -867,6 +883,7 @@ int thread_check() {
     return -2;
   }
   MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
+  //MPI_Barrier(mall->intercomm);
   if(!is_intercomm) mall_conf->async_end = MPI_Wtime(); // Merge method only
   return end_redistribution();
 }
......