Commit a14e5c25 authored by iker_martin's avatar iker_martin


Added new functionality for stage processing. It is now possible to run a stage until the desired stage time is reached, instead of performing a fixed number of operations.
parent edc48a8e
......@@ -49,8 +49,9 @@ static int handler(void* user, const char* section, const char* name,
// Iter stage
} else if (MATCH(stage_name, "Stage_Type") && LAST(pconfig->actual_stage, pconfig->n_stages)) {
//if(pconfig->actual_stage < pconfig->n_stages)
pconfig->stages[pconfig->actual_stage].pt = atoi(value);
} else if (MATCH(stage_name, "Stage_Time_Capped") && LAST(pconfig->actual_stage, pconfig->n_stages)) {
pconfig->stages[pconfig->actual_stage].t_capped = atoi(value);
} else if (MATCH(stage_name, "Stage_Bytes") && LAST(pconfig->actual_stage, pconfig->n_stages)) {
pconfig->stages[pconfig->actual_stage].bytes = atoi(value);
} else if (MATCH(stage_name, "Stage_Time") && LAST(pconfig->actual_stage, pconfig->n_stages)) {
......
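The handler above now also reads a Stage_Time_Capped key for the most recently declared stage. A hypothetical stage entry in the INI configuration might look like the sketch below; the section name and the numeric Stage_Type value are illustrative only, while the keys are the ones matched by the handler:

[stage0]               ; hypothetical section name for one stage
Stage_Type=3           ; procedure type (pt); the exact numbering is assumed here
Stage_Time_Capped=1    ; 1: repeat the operation until Stage_Time has elapsed, 0: use a fixed operation count
Stage_Time=2.5         ; target stage time in seconds (presumably stored in t_stage)
Stage_Bytes=0          ; 0: message size falls back to the configuration granularity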
......@@ -28,6 +28,9 @@ typedef struct {
typedef struct
{
int pt; // Procedure type
// Whether the stage completes after "operations" iterations (0)
// or after "t_stage" time has passed (1).
int t_capped;
double t_stage; // Time to complete the stage
double t_op;
......@@ -56,9 +59,7 @@ typedef struct
size_t actual_resize, actual_stage;
int rigid_times;
int granularity, sdr, adr;
double latency_m, bw_m;
double t_op_comms;
iter_stage_t *stages;
group_config_t *groups;
} configuration;
......
......@@ -88,13 +88,13 @@ void malloc_config_stages(configuration *user_config) {
size_t i;
if(user_config != NULL) {
user_config->stages = malloc(sizeof(iter_stage_t) * user_config->n_stages);
user_config->t_op_comms = 0;
for(i=0; i<user_config->n_stages; i++) {
user_config->stages[i].array = NULL;
user_config->stages[i].full_array = NULL;
user_config->stages[i].double_array = NULL;
user_config->stages[i].counts.counts = NULL;
user_config->stages[i].real_bytes = 0;
user_config->stages[i].t_capped = 0;
}
}
}
......@@ -144,8 +144,8 @@ void print_config(configuration *user_config) {
printf("Config loaded: R=%zu, S=%zu, granularity=%d, SDR=%d, ADR=%d\n",
user_config->n_resizes, user_config->n_stages, user_config->granularity, user_config->sdr, user_config->adr);
for(i=0; i<user_config->n_stages; i++) {
printf("Stage %zu: PT=%d, T_stage=%lf, bytes=%d\n",
i, user_config->stages[i].pt, user_config->stages[i].t_stage, user_config->stages[i].real_bytes);
printf("Stage %zu: PT=%d, T_stage=%lf, bytes=%d, T_capped=%d\n",
i, user_config->stages[i].pt, user_config->stages[i].t_stage, user_config->stages[i].real_bytes, user_config->stages[i].t_capped);
}
for(i=0; i<user_config->n_resizes; i++) {
printf("Group %zu: Iters=%d, Procs=%d, Factors=%f, Dist=%d, AT=%d, SM=%d, SS=%d\n",
......@@ -176,8 +176,8 @@ void print_config_group(configuration *user_config, size_t grp) {
printf("Config: granularity=%d, SDR=%d, ADR=%d\n",
user_config->granularity, user_config->sdr, user_config->adr);
for(i=0; i<user_config->n_stages; i++) {
printf("Stage %zu: PT=%d, T_stage=%lf, bytes=%d\n",
i, user_config->stages[i].pt, user_config->stages[i].t_stage, user_config->stages[i].real_bytes);
printf("Stage %zu: PT=%d, T_stage=%lf, bytes=%d, T_capped=%d\n",
i, user_config->stages[i].pt, user_config->stages[i].t_stage, user_config->stages[i].real_bytes, user_config->stages[i].t_capped);
}
printf("Group %zu: Iters=%d, Procs=%d, Factors=%f, Dist=%d, AT=%d, SM=%d, SS=%d, parents=%d, children=%d\n",
grp, user_config->groups[grp].iters, user_config->groups[grp].procs, user_config->groups[grp].factor,
......@@ -272,15 +272,14 @@ void recv_config_file(int root, MPI_Comm intercomm, configuration **config_file_
* of the configuration structure in a single communication.
*/
void def_struct_config_file(configuration *config_file, MPI_Datatype *config_type) {
int i, counts = 8;
int blocklengths[8] = {1, 1, 1, 1, 1, 1, 1, 1};
int i, counts = 6;
int blocklengths[6] = {1, 1, 1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];
// Fill in the types vector
types[0] = types[1] = MPI_UNSIGNED_LONG;
types[2] = types[3] = types[4] = types[5] = MPI_INT;
types[6] = types[7] = MPI_DOUBLE;
// Fill in the displs vector
MPI_Get_address(config_file, &dir);
......@@ -291,8 +290,6 @@ void def_struct_config_file(configuration *config_file, MPI_Datatype *config_typ
MPI_Get_address(&(config_file->sdr), &displs[3]);
MPI_Get_address(&(config_file->adr), &displs[4]);
MPI_Get_address(&(config_file->rigid_times), &displs[5]);
MPI_Get_address(&(config_file->latency_m), &displs[6]);
MPI_Get_address(&(config_file->bw_m), &displs[7]);
for(i=0;i<counts;i++) displs[i] -= dir;
......@@ -343,13 +340,13 @@ void def_struct_groups(group_config_t *groups, size_t n_resizes, MPI_Datatype *c
* of the iteration stage structures in a single communication.
*/
void def_struct_iter_stage(iter_stage_t *stages, size_t n_stages, MPI_Datatype *config_type) {
int i, counts = 4;
int blocklengths[4] = {1, 1, 1, 1};
int i, counts = 5;
int blocklengths[5] = {1, 1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype aux, types[counts];
// Fill in the types vector
types[0] = types[3] = MPI_INT;
types[0] = types[3] = types[4] = MPI_INT;
types[1] = types[2] = MPI_DOUBLE;
// Fill in the displs vector
......@@ -359,6 +356,7 @@ void def_struct_iter_stage(iter_stage_t *stages, size_t n_stages, MPI_Datatype *
MPI_Get_address(&(stages->t_stage), &displs[1]);
MPI_Get_address(&(stages->t_op), &displs[2]);
MPI_Get_address(&(stages->bytes), &displs[3]);
MPI_Get_address(&(stages->t_capped), &displs[4]);
for(i=0;i<counts;i++) displs[i] -= dir;
......
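The hunk above only shows how the displacement of the new t_capped field is collected; the rest of def_struct_iter_stage is collapsed in the diff. Assuming it follows the usual derived-datatype pattern suggested by the aux variable declared earlier, the finalization would look roughly like this sketch (not the literal committed code):

// Sketch under the assumption that the collapsed tail of def_struct_iter_stage
// follows the standard MPI derived-datatype pattern.
MPI_Type_create_struct(counts, blocklengths, displs, types, &aux);
// Resize the extent to sizeof(iter_stage_t) so an array of n_stages structs
// can be sent in a single communication, then commit and release the helper type.
MPI_Type_create_resized(aux, 0, sizeof(iter_stage_t), config_type);
MPI_Type_commit(config_type);
MPI_Type_free(&aux);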
......@@ -75,7 +75,7 @@ double init_stage(configuration *config_file, int stage_i, group_data group, MPI
*/
double process_stage(configuration config_file, iter_stage_t stage, group_data group, MPI_Comm comm) {
int i;
double result;
double result, t_start, t_total = 0;
switch(stage.pt) {
// Computation
......@@ -93,18 +93,29 @@ double process_stage(configuration config_file, iter_stage_t stage, group_data g
case COMP_POINT:
point_to_point(group.myId, group.numP, ROOT, comm, stage.array, stage.real_bytes);
break;
case COMP_BCAST:
if(stage.bytes != 0) {
if(stage.t_capped) {
while(t_total < stage.t_stage) {
t_start = MPI_Wtime();
MPI_Bcast(stage.array, stage.real_bytes, MPI_CHAR, ROOT, comm);
t_total += MPI_Wtime() - t_start;
MPI_Bcast(&t_total, 1, MPI_DOUBLE, ROOT, comm);
}
} else {
for(i=0; i < stage.operations; i++) {
point_to_point_inter(group.myId, group.numP, comm, stage.array, stage.real_bytes);
MPI_Bcast(stage.array, stage.real_bytes, MPI_CHAR, ROOT, comm);
}
}
break;
case COMP_ALLGATHER:
if(stage.bytes != 0) {
if(stage.t_capped) {
while(t_total < stage.t_stage) {
t_start = MPI_Wtime();
MPI_Allgatherv(stage.array, stage.my_bytes, MPI_CHAR, stage.full_array, stage.counts.counts, stage.counts.displs, MPI_CHAR, comm);
t_total += MPI_Wtime() - t_start;
MPI_Bcast(&t_total, 1, MPI_DOUBLE, ROOT, comm);
}
} else {
for(i=0; i < stage.operations; i++) {
MPI_Allgatherv(stage.array, stage.my_bytes, MPI_CHAR, stage.full_array, stage.counts.counts, stage.counts.displs, MPI_CHAR, comm);
......@@ -112,8 +123,13 @@ double process_stage(configuration config_file, iter_stage_t stage, group_data g
}
break;
case COMP_REDUCE:
if(stage.bytes != 0) {
if(stage.t_capped) {
while(t_total < stage.t_stage) {
t_start = MPI_Wtime();
MPI_Reduce(stage.array, stage.full_array, stage.real_bytes, MPI_CHAR, MPI_MAX, ROOT, comm);
t_total += MPI_Wtime() - t_start;
MPI_Bcast(&t_total, 1, MPI_DOUBLE, ROOT, comm);
}
} else {
for(i=0; i < stage.operations; i++) {
MPI_Reduce(stage.array, stage.full_array, stage.real_bytes, MPI_CHAR, MPI_MAX, ROOT, comm);
......@@ -121,8 +137,13 @@ double process_stage(configuration config_file, iter_stage_t stage, group_data g
}
break;
case COMP_ALLREDUCE:
if(stage.bytes != 0) {
if(stage.t_capped) {
while(t_total < stage.t_stage) {
t_start = MPI_Wtime();
MPI_Allreduce(stage.array, stage.full_array, stage.real_bytes, MPI_CHAR, MPI_MAX, comm);
t_total += MPI_Wtime() - t_start;
MPI_Bcast(&t_total, 1, MPI_DOUBLE, ROOT, comm);
}
} else {
for(i=0; i < stage.operations; i++) {
MPI_Allreduce(stage.array, stage.full_array, stage.real_bytes, MPI_CHAR, MPI_MAX, comm);
......@@ -213,15 +234,16 @@ void init_comm_ptop_pt(group_data group, configuration *config_file, iter_stage_
stage->array = malloc(stage->real_bytes * sizeof(char));
}
// TODO Compute should always be 1 if the number of processes is different
double init_comm_bcast_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute) {
double time = 0;
if(stage->array != NULL)
free(stage->array);
stage->real_bytes = stage->bytes ? stage->bytes : config_file->granularity;
stage->real_bytes = (stage->bytes && !stage->t_capped) ? stage->bytes : config_file->granularity;
stage->array = malloc(stage->real_bytes * sizeof(char));
if(compute && stage->bytes) {
if(compute && !stage->bytes && !stage->t_capped) {
time = init_emulation_comm_time(group, config_file, stage, comm);
} else {
stage->operations = 1;
......@@ -229,7 +251,7 @@ double init_comm_bcast_pt(group_data group, configuration *config_file, iter_sta
return time;
}
// TODO Compute should always be 1 if the number of processes is different
double init_comm_allgatherv_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute) {
double time=0;
struct Dist_data dist_data;
......@@ -241,7 +263,7 @@ double init_comm_allgatherv_pt(group_data group, configuration *config_file, ite
if(stage->full_array != NULL)
free(stage->full_array);
stage->real_bytes = stage->bytes ? stage->bytes : config_file->granularity;
stage->real_bytes = (stage->bytes && !stage->t_capped) ? stage->bytes : config_file->granularity;
prepare_comm_allgatherv(group.numP, stage->real_bytes, &(stage->counts));
......@@ -251,7 +273,7 @@ double init_comm_allgatherv_pt(group_data group, configuration *config_file, ite
stage->array = malloc(stage->my_bytes * sizeof(char));
stage->full_array = malloc(stage->real_bytes * sizeof(char));
if(compute && stage->bytes) {
if(compute && !stage->bytes && !stage->t_capped) {
time = init_emulation_comm_time(group, config_file, stage, comm);
} else {
stage->operations = 1;
......@@ -260,6 +282,7 @@ double init_comm_allgatherv_pt(group_data group, configuration *config_file, ite
return time;
}
// TODO Compute should always be 1 if the number of processes is different
double init_comm_reduce_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute) {
double time = 0;
if(stage->array != NULL)
......@@ -267,12 +290,12 @@ double init_comm_reduce_pt(group_data group, configuration *config_file, iter_st
if(stage->full_array != NULL)
free(stage->full_array);
stage->real_bytes = stage->bytes ? stage->bytes : config_file->granularity;
stage->real_bytes = (stage->bytes && !stage->t_capped) ? stage->bytes : config_file->granularity;
stage->array = malloc(stage->real_bytes * sizeof(char));
// The full array for the reduce needs the same size
stage->full_array = malloc(stage->real_bytes * sizeof(char));
if(compute && stage->bytes) {
if(compute && !stage->bytes && !stage->t_capped) {
time = init_emulation_comm_time(group, config_file, stage, comm);
} else {
stage->operations = 1;
......
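Each of the collectives above repeats the same time-capped pattern: time one operation with MPI_Wtime, accumulate the elapsed time in t_total, and broadcast ROOT's accumulated value so every rank leaves the loop on the same iteration. A hypothetical helper (not part of the commit) that captures the pattern:

// Hypothetical helper, not in the commit: repeat an MPI operation until ROOT has
// accumulated t_stage seconds. Broadcasting ROOT's elapsed time keeps all ranks
// leaving the loop together even if their local timings drift apart.
// iter_stage_t and ROOT are the definitions used elsewhere in this commit.
static double run_time_capped(iter_stage_t *stage, MPI_Comm comm,
                              void (*op)(iter_stage_t *stage, MPI_Comm comm)) {
  double t_start, t_total = 0;
  while(t_total < stage->t_stage) {
    t_start = MPI_Wtime();
    op(stage, comm);
    t_total += MPI_Wtime() - t_start;
    MPI_Bcast(&t_total, 1, MPI_DOUBLE, ROOT, comm);
  }
  return t_total;
}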
......@@ -11,9 +11,4 @@ enum compute_methods{COMP_PI, COMP_MATRIX, COMP_POINT, COMP_BCAST, COMP_ALLGATHE
double init_stage(configuration *config_file, int stage_i, group_data group, MPI_Comm comm, int compute);
//double stage_init_all();
double process_stage(configuration config_file, iter_stage_t stage, group_data group, MPI_Comm comm);
double latency(int myId, int numP, MPI_Comm comm);
double bandwidth(int myId, int numP, MPI_Comm comm, double latency, int n);
//FIXME Refactor the void??
void linear_regression_stage(iter_stage_t *stage, group_data group, MPI_Comm comm);
#endif