Commit fc5bd315 authored by iker_martin's avatar iker_martin
Browse files

Added code to choose malleability methods at startup. Fixed minor error in P2P redistribution.

parent 01e07630
......@@ -96,8 +96,12 @@ int main (int argc, char *argv[]) {
struct Dist_data dist_data;
if (argc >= 5) {
num_children = atoi(argv[2]);
nodelist = argv[3];
num_nodes = atoi(argv[4]);
sm = atoi(argv[3]);
ss = atoi(argv[4]);
rm = atoi(argv[5]);
rs = atoi(argv[6]);
nodelist = argv[7];
num_nodes = atoi(argv[8]);
num_cpus = num_nodes * num_cpus;
}
......@@ -397,9 +401,12 @@ int compute(Compute_data *computeData, struct Dist_data *dist_data, int sm) {
int state = MALL_NOT_STARTED;
int ended_loop = 1;
int cnt = 0;
int reconfigure = 0;
computeData->maxiter = 1000;
dumb(computeData,dist_data);
while ((computeData->iter < computeData->maxiter) && (computeData->tol > computeData->umbral)) {
//while (computeData->tol > computeData->umbral) {
......@@ -423,17 +430,19 @@ int compute(Compute_data *computeData, struct Dist_data *dist_data, int sm) {
computeData->alpha = computeData->beta / computeData->alpha; // alpha = beta / alpha
dscal (&(dist_data->tamBl), &computeData->alpha, computeData->d, &IONE); // d = alpha * d
daxpy (&(dist_data->tamBl), &DONE, computeData->res, &IONE, computeData->d, &IONE); // d += res
if(computeData->iter==3) dumb(computeData,dist_data);
// if(computeData->iter==3) dumb(computeData,dist_data);
MPI_Allgatherv(computeData->d, dist_data->tamBl, MPI_DOUBLE, computeData->d_full,
computeData->dist_rows, computeData->displs_rows, MPI_DOUBLE, dist_data->comm); // d_full = Gather(d)
computeData->tol = sqrt (computeData->beta); // tol = sqrt(beta) = norm (res)
computeData->iter++;
if (computeData->iter == 3) {
if (computeData->iter == 3) reconfigure = 1;
if (reconfigure) {
state = malleability_checkpoint();
if ((state == MALL_COMPLETED && sm == 0) || state == MALL_ZOMBIE) {break;}
else if(state == MALL_COMPLETED) {
dumb(computeData,dist_data);
reconfigure = 0;
//dumb(computeData,dist_data);
free_computeData(computeData, 0);
update_dist_data(dist_data);
dist_new(dist_data, computeData);
......@@ -451,14 +460,14 @@ int compute(Compute_data *computeData, struct Dist_data *dist_data, int sm) {
void dumb(Compute_data *computeData, struct Dist_data *dist_data) {
int i;
sleep(dist_data->myId+dist_data->numP);
printf("P%d -tamBl=%d D=", dist_data->myId, dist_data->tamBl);
printf("P%d -tamBl=%d", dist_data->myId, dist_data->tamBl);
/*for(i=0; i<dist_data->tamBl; i++) {
printf("%lf ", computeData->d[i]);
}*/
printf("\n");
printf("D_full=");
for(i=0; i<computeData->n; i++) {
printf("%lf ", computeData->d_full[i]);
printf("Vlen=");
for(i=0; i<dist_data->tamBl; i++) {
printf("%d ", computeData->vlen[i]);
}
printf("\n");
fflush(stdout); MPI_Barrier(dist_data->comm);
......@@ -524,6 +533,8 @@ void free_computeData(Compute_data *computeData, int terminate) {
*/
int dist_old(struct Dist_data *dist_data, Compute_data *computeData, int num_children, int sm, int ss, int rm, int rs) {
int phy_dist = 2;
int send_synch = rs == 1 ? 1 : 0;
set_malleability_configuration(sm, ss, phy_dist, rm, rs);
set_children_number(num_children);
......@@ -540,10 +551,10 @@ int dist_old(struct Dist_data *dist_data, Compute_data *computeData, int num_chi
malleability_add_data(computeData->z, computeData->n, MAL_DOUBLE, MAL_DATA_ALONE, 0, 1);
//FIXME SIguientes valores pueden ser asincronos
malleability_add_data(computeData->vlen, computeData->n, MAL_INT, 1+MAL_DATA_INDEPENDENT, 0, 1);
malleability_add_data(computeData->vlen, computeData->n, MAL_INT, 1+MAL_DATA_INDEPENDENT, 0, send_synch);
malleability_add_data(computeData->subm.vpos, computeData->n, MAL_INT, 1+MAL_DATA_DEPENDENT, 0, 1);
malleability_add_data(computeData->subm.vval, computeData->n, MAL_DOUBLE, 1+MAL_DATA_DEPENDENT, 0, 1);
malleability_add_data(computeData->subm.vpos, computeData->n, MAL_INT, 1+MAL_DATA_DEPENDENT, 0, send_synch);
malleability_add_data(computeData->subm.vval, computeData->n, MAL_DOUBLE, 1+MAL_DATA_DEPENDENT, 0, send_synch);
}
/*
......@@ -565,8 +576,11 @@ int dist_old(struct Dist_data *dist_data, Compute_data *computeData, int num_chi
*
*/
void dist_new(struct Dist_data *dist_data, Compute_data *computeData) {
int IONE = 1, i;
int IONE = 1, i, is_synch;
size_t entry, entries;
void *value = NULL;
is_synch = 1;
entry = 3;
malleability_get_data(&value, 0, 1, 1);
computeData->n = *((int *)value);
......@@ -594,7 +608,10 @@ void dist_new(struct Dist_data *dist_data, Compute_data *computeData) {
CreateDoubles(&computeData->d, dist_data->tamBl);
dcopy (&(dist_data->tamBl), &(computeData->d_full[dist_data->ini]), &IONE, computeData->d, &IONE); // d = d_full[ini] to d_full[ini+tamBl]
malleability_get_data(&value, 3, 0, 1);
malleability_get_entries(&entries, 0, 0); //Get if there is any asynch data to recover
if(entries) is_synch=0;
malleability_get_data(&value, entry, 0, is_synch);
computeData->vlen = ((int *)value);
CreateSparseMatrixVptr(&(computeData->subm), dist_data->tamBl, computeData->n, 0);
computeData->subm.vptr[0] = 0;
......@@ -603,10 +620,11 @@ void dist_new(struct Dist_data *dist_data, Compute_data *computeData) {
}
TransformLengthtoHeader(computeData->subm.vptr, computeData->subm.dim1); // The array is converted from vlen to vptr
malleability_get_data(&value, 4, 0, 1);
malleability_get_data(&value, entry+1, 0, is_synch);
computeData->subm.vpos = ((int *)value);
malleability_get_data(&value, 5, 0, 1);
malleability_get_data(&value, entry+2, 0, is_synch);
computeData->subm.vval = ((double *)value);
}
void update_dist_data(struct Dist_data *dist_data) {
......
......@@ -97,8 +97,12 @@ int sync_communication(void *send, void **recv, int qty, int mal_type, int depen
if(dependency == 1+MAL_DATA_DEPENDENT) {
if(is_children_group) {
recalculate_counts(&r_counts, (int *) ind_recv, recv, mal_type);
//get_block_dist(qty, myId, numP, &dist_data);
//print_counts(dist_data, r_counts.counts, r_counts.displs, numO, 0, "Children C ");
} else {
recalculate_counts(&s_counts, (int *) ind_send, recv, mal_type);
//get_block_dist(qty, myId, numP, &dist_data);
//print_counts(dist_data, s_counts.counts, s_counts.displs, numO, 0, "Parents ");
if(!is_intercomm) {
recalculate_counts(&r_counts, (int *) ind_recv, recv, mal_type);
}
......@@ -108,7 +112,7 @@ int sync_communication(void *send, void **recv, int qty, int mal_type, int depen
/* PERFORM COMMUNICATION */
switch(red_method) {
case MALL_RED_POINT:
sync_point2point(send, *recv, mal_type, is_intercomm, myId, s_counts, r_counts, aux_comm);
sync_point2point(send, *recv, mal_type, is_intercomm, myId, s_counts, r_counts, comm);
break;
case MALL_RED_BASELINE:
default:
......@@ -155,15 +159,19 @@ int sync_communication(void *send, void **recv, int qty, int mal_type, int depen
*/
void sync_point2point(void *send, void *recv, int mal_type, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm) {
int i, j, init, end, total_sends;
size_t datasize, offset;
MPI_Request *sends;
MPI_Datatype datatype;
if(mal_type == MAL_INT) {
datatype = MPI_INT;
datasize = sizeof(int);
} else if(mal_type == MAL_DOUBLE) {
datatype = MPI_DOUBLE;
datasize = sizeof(double);
} else if(mal_type == MAL_CHAR) {
datatype = MPI_CHAR;
datasize = sizeof(char);
} else {
printf("Malleability -- Redistribution type not recognised\n");
MPI_Abort(MPI_COMM_WORLD, -1);
......@@ -185,7 +193,8 @@ void sync_point2point(void *send, void *recv, int mal_type, int is_intercomm, in
}
for(i=init; i<end; i++) {
sends[j] = MPI_REQUEST_NULL;
MPI_Isend(send+s_counts.displs[i], s_counts.counts[i], datatype, i, 99, comm, &(sends[j]));
offset = s_counts.displs[i] * datasize;
MPI_Isend(send+offset, s_counts.counts[i], datatype, i, 99, comm, &(sends[j]));
j++;
}
......@@ -197,7 +206,8 @@ void sync_point2point(void *send, void *recv, int mal_type, int is_intercomm, in
}
for(i=init; i<end; i++) {
MPI_Recv(recv+r_counts.displs[i], r_counts.counts[i], datatype, i, 99, comm, MPI_STATUS_IGNORE);
offset = r_counts.displs[i] * datasize;
MPI_Recv(recv+offset, r_counts.counts[i], datatype, i, 99, comm, MPI_STATUS_IGNORE);
}
if(total_sends > 0) {
......@@ -257,12 +267,19 @@ int async_communication(void *send, void **recv, int qty, int mal_type, int depe
MPI_Abort(MPI_COMM_WORLD, -1);
}
if(dependency == 1+MAL_DATA_DEPENDENT) {
if(is_children_group) {
recalculate_counts(&r_counts, (int *) ind_recv, recv, mal_type);
} else {
recalculate_counts(&s_counts, (int *) ind_send, recv, mal_type);
if(!is_intercomm) {
recalculate_counts(&r_counts, (int *) ind_recv, recv, mal_type);
}
}
}
/* PERFORM COMMUNICATION */
switch(red_method) {
case MALL_RED_RMA_LOCKALL:
case MALL_RED_RMA_LOCK:
return MALL_DENIED; //TODO Realizar versiones asíncronas
case MALL_RED_POINT:
async_point2point(send, *recv, datatype, s_counts, r_counts, comm, *requests);
break;
......
......@@ -55,6 +55,7 @@ typedef struct { //FIXME numC_spawned no se esta usando
} malleability_t;
int state = MALL_UNRESERVED; //FIXME Mover a otro lado
int dep_not_send = 1; //FIXME BORRAR
malleability_config_t *mall_conf;
malleability_t *mall;
......@@ -427,7 +428,9 @@ void send_data(int numP_children, malleability_data_t *data_struct, int is_async
void *aux_send, *aux_recv;
if(is_asynchronous) {
for(i=0; i < data_struct->entries; i++) {
i= dep_not_send ? 0 : data_struct->entries - 2; //FIXME BORRAR
for(; i < data_struct->entries; i++) {
if(data_struct->dependencies[i] == 1+MAL_DATA_DEPENDENT && dep_not_send) break; //FIXME BORRAR dep_not_send
aux_send = data_struct->arrays[i];
aux_recv = NULL;
async_communication(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], data_struct->dependencies[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall_conf->red_strategies, mall->intercomm,
......@@ -661,6 +664,11 @@ int check_redistribution() {
//Para la desconexión de ambos grupos de procesos es necesario indicar a MPI que esta comm
//ha terminado, aunque solo se pueda llegar a este punto cuando ha terminado
}
if(dep_not_send && mall_conf->spawn_method == MALL_SPAWN_MERGE) { //FIXME BORRAR // A MITAD -- Algunas partes son útiles
dep_not_send = 0;
send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
return MALL_DIST_PENDING;
}
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
//if(!is_intercomm) mall_conf->results->async_end = MPI_Wtime(); // Merge method only
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment