Commit ca92ad42 authored by iker_martin's avatar iker_martin
Browse files

WIP - Anyadiendo modelado de colectivas por recta de regresion lineal

parent 93704077
......@@ -152,6 +152,8 @@ void init_config_stages(configuration *user_config, int stages) {
user_config->iter_stage[i].array = NULL;
user_config->iter_stage[i].full_array = NULL;
user_config->iter_stage[i].double_array = NULL;
user_config->iter_stage[i].counts = NULL;
user_config->iter_stage[i].displs = NULL;
user_config->iter_stage[i].real_bytes = 0;
}
}
......@@ -169,6 +171,7 @@ void free_config(configuration *user_config) {
free(user_config->phy_dist);
for(i=0; i < user_config->iter_stages; i++) {
if(user_config->iter_stage[i].array != NULL) {
free(user_config->iter_stage[i].array);
user_config->iter_stage[i].array = NULL;
......@@ -181,6 +184,16 @@ void free_config(configuration *user_config) {
free(user_config->iter_stage[i].double_array);
user_config->iter_stage[i].double_array = NULL;
}
if(user_config->iter_stage[i].counts != NULL) {
free(user_config->iter_stage[i].counts);
user_config->iter_stage[i].counts = NULL;
}
if(user_config->iter_stage[i].displs != NULL) {
free(user_config->iter_stage[i].displs);
user_config->iter_stage[i].displs = NULL;
}
}
//free(user_config->iter_stage); //FIXME ERROR de memoria relacionado con la carpeta malleability
......@@ -226,8 +239,8 @@ void print_config_group(configuration *user_config, int grp) {
sons = user_config->procs[grp+1];
}
printf("Config: matrix=%d, sdr=%d, adr=%d, aib=%d, css=%d, cst=%d\n",
user_config->matrix_tam, user_config->sdr, user_config->adr, user_config->aib, user_config->css, user_config->cst);
printf("Config: matrix=%d, sdr=%d, adr=%d, aib=%d, css=%d, cst=%d, latency=%lf, bw=%lf\n",
user_config->matrix_tam, user_config->sdr, user_config->adr, user_config->aib, user_config->css, user_config->cst, user_config->latency_m, user_config->bw_m);
for(i=0; i<user_config->iter_stages; i++) {
printf("Stage %d: PT=%d, T_stage=%lf, bytes=%d\n",
i, user_config->iter_stage[i].pt, user_config->iter_stage[i].t_stage, user_config->iter_stage[i].real_bytes);
......
......@@ -11,9 +11,17 @@ typedef struct
double t_op;
int operations;
int bytes, real_bytes;
// Variables to represent linear regresion
// for collective calls.
double slope, intercept;
// Arrays to communicate data;
char* array, *full_array;
double* double_array;
// Arrays to indicate how many bytes are received from each rank
int *counts, *displs;
} iter_stage_t;
typedef struct
......
......@@ -111,7 +111,6 @@ int main(int argc, char *argv[]) {
group->grp = group->grp + 1;
obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
print_config(config_file, group->grp);
set_benchmark_grp(group->grp);
get_malleability_user_comm(&comm);
MPI_Comm_size(comm, &(group->numP));
......@@ -378,6 +377,9 @@ void init_application() {
config_file->latency_m = latency(group->myId, group->numP, comm);
config_file->bw_m = bandwidth(group->myId, group->numP, comm, config_file->latency_m, message_tam);
obtain_op_times(1);
linear_regression_stage( (void*)&(config_file->iter_stage[0]), (void*)group, comm);
printf("TEST P%d -- slope=%lf intercept=%lf\n", group->myId, config_file->iter_stage[0].slope, config_file->iter_stage[0].intercept);
}
/*
......
......@@ -19,3 +19,10 @@ typedef struct {
char **argv;
char *sync_array, *async_array;
} group_data;
/*----------LINEAR REGRESSION TYPES--------------*/
#define LR_ARRAY_TAM 7
// Array for linear regression computation
// Cantidades 10b 100b 1Kb 100Kb 1Mb 10Mb 100Mb
double LR_bytes_array[LR_ARRAY_TAM] = {10, 100, 1000, 100000, 1000000, 10000000, 100000000};
......@@ -2,4 +2,6 @@
#include <stdio.h>
#include <mpi.h>
void point_to_point(int myId, int numP, int root, MPI_Comm comm, char *array, int qty);
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
#include "Main_datatypes.h"
#include "linear_reg.h"
// Linear regression
// Y = a +bX
// Bytes = a +b(Time)
//
// X is the independent variable, which correlates to the Communication time
// Y is the dependent variable, which correlates to the number of bytes
//
void lr_avg_plus_diff(double *array, double *avg, double *diffs);
/*
* Computes the slope and intercept for a given array of times
* so users can calculate the number of bytes for a given time.
*
*/
void lr_compute(double *times, double *slope, double *intercept) {
int i;
double avgX, avgY;
double *diffsX, *diffsY;
double SSxx, SSxy;
diffsX = malloc(LR_ARRAY_TAM * sizeof(double));
diffsY = malloc(LR_ARRAY_TAM * sizeof(double));
SSxx = SSxy = 0;
lr_avg_plus_diff(times, &avgX, diffsX);
lr_avg_plus_diff(LR_bytes_array, &avgY, diffsY);
for(i=0; i<LR_ARRAY_TAM; i++) {
SSxx+= diffsX[i]*diffsX[i];
SSxy+= diffsX[i]*diffsY[i];
}
*slope = SSxy / SSxx;
*intercept = avgY - (*slope * avgX);
free(diffsX);
free(diffsY);
}
/*
* Computes the average of an arrray and
* the difference of each element in respect to the average.
*
* Returns the average and an the difference of each element.
*/
void lr_avg_plus_diff(double *array, double *avg, double *diffs) {
int i;
double sum = 0;
for(i=0; i<LR_ARRAY_TAM; i++) {
sum+= array[i];
}
*avg = sum / LR_ARRAY_TAM;
for(i=0; i<LR_ARRAY_TAM; i++) {
diffs[i]= *avg - array[i];
}
}
//======================================================||
//======================================================||
//==================TIMES COMPUTATION===================||
//======================================================||
//======================================================||
/*
* Obtains an array of times to perform a "Broadcast"
* operation depending on a predifined set of number of bytes.
*/
void lr_times_bcast(int myId, int numP, int root, MPI_Comm comm, double *times) {
int i, j, n, loop_count = 10;
double start_time, stop_time, elapsed_time;
char *aux;
elapsed_time = 0;
for(i=0; i<LR_ARRAY_TAM; i++) {
n = LR_bytes_array[i];
aux = malloc(n * sizeof(char));
MPI_Barrier(comm);
start_time = MPI_Wtime();
for(j=0; j<loop_count; j++) {
MPI_Bcast(aux, n, MPI_CHAR, root, comm);
}
MPI_Barrier(comm);
stop_time = MPI_Wtime();
elapsed_time = (stop_time - start_time) / loop_count;
MPI_Reduce(MPI_IN_PLACE, &elapsed_time, n, MPI_DOUBLE, MPI_MAX, root, comm);
if(myId == root) {
times[i] = elapsed_time;
}
free(aux);
}
}
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
void lr_compute(double *times, double *slope, double *intercept);
void lr_times_bcast(int myId, int numP, int root, MPI_Comm comm, double *times);
......@@ -4,6 +4,7 @@
#include <mpi.h>
#include "computing_func.h"
#include "comunication_func.h"
#include "linear_reg.h"
#include "Main_datatypes.h"
#include "process_stage.h"
#include "../malleability/malleabilityManager.h" //FIXME Refactor
......@@ -30,65 +31,90 @@ void get_byte_dist(int qty, int id, int numP, int *result);
*/
void init_stage(void *config_file_void, int stage, void *group_void, MPI_Comm comm, int compute) {
double result, start_time, t_stage;
int qty = 20000;
int i, aux_bytes, qty = 20000;
group_data group = *((group_data *) group_void);
configuration config_file = *((configuration *) config_file_void);
config_file.iter_stage[stage].operations = qty;
t_stage = config_file.iter_stage[stage].t_stage * config_file.factors[group.grp];
configuration *config_file = (configuration *) config_file_void;
iter_stage_t *stage_data = &(config_file->iter_stage[stage]);
stage_data->operations = qty;
t_stage = stage_data->t_stage * config_file->factors[group.grp];
if(config_file.iter_stage[stage].bytes == 0) {
config_file.iter_stage[stage].bytes = (config_file.iter_stage[stage].t_stage - config_file.latency_m) * config_file.bw_m;
if(stage_data->bytes == 0) {
stage_data->bytes = (stage_data->t_stage - config_file->latency_m) * config_file->bw_m;
}
get_byte_dist(config_file.iter_stage[stage].bytes, group.myId, group.numP, &(config_file.iter_stage[stage].real_bytes) );
get_byte_dist(stage_data->bytes, group.myId, group.numP, &(stage_data->real_bytes) );
start_time = MPI_Wtime();
result = 0;
switch(config_file.iter_stage[stage].pt) {
switch(stage_data->pt) {
//Computo
case COMP_MATRIX:
initMatrix(&(config_file.iter_stage[stage].double_array), config_file.matrix_tam);
initMatrix(&(stage_data->double_array), config_file->matrix_tam);
case COMP_PI:
if(group.myId == ROOT && compute) {
result+= process_stage(config_file_void, stage, group_void, comm);
}
break;
break;
//Comunicación
case COMP_POINT:
if(config_file.iter_stage[stage].array != NULL)
free(config_file.iter_stage[stage].array);
config_file.iter_stage[stage].array = malloc(sizeof(char) * config_file.iter_stage[stage].real_bytes);
if(stage_data->array != NULL)
free(stage_data->array);
stage_data->array = malloc(sizeof(char) * stage_data->real_bytes);
break;
case COMP_BCAST:
if(config_file.iter_stage[stage].array != NULL)
free(config_file.iter_stage[stage].array);
config_file.iter_stage[stage].real_bytes = config_file.iter_stage[stage].bytes; // Caso especial al usar Bcast
config_file.iter_stage[stage].array = malloc(sizeof(char) * config_file.iter_stage[stage].real_bytes);
break;
case COMP_ALLTOALL: //FIXME Utilizar version V
if(config_file.iter_stage[stage].array != NULL)
free(config_file.iter_stage[stage].array);
config_file.iter_stage[stage].array = malloc(sizeof(char) * config_file.iter_stage[stage].real_bytes);
if(config_file.iter_stage[stage].full_array != NULL)
free(config_file.iter_stage[stage].full_array);
config_file.iter_stage[stage].full_array = malloc(sizeof(char) * config_file.iter_stage[stage].bytes);
break;
case COMP_REDUCE: //FIXME Utilizar version V
if(config_file.iter_stage[stage].array != NULL)
free(config_file.iter_stage[stage].array);
config_file.iter_stage[stage].array = malloc(sizeof(char) * config_file.iter_stage[stage].real_bytes);
if(stage_data->array != NULL)
free(stage_data->array);
stage_data->real_bytes = stage_data->bytes; // Caso especial al usar Bcast
stage_data->array = malloc(sizeof(char) * stage_data->real_bytes);
break;
case COMP_ALLGATHER:
if(stage_data->counts != NULL)
free(stage_data->counts);
stage_data->counts = calloc(group.numP,sizeof(int));
if(stage_data->displs != NULL)
free(stage_data->displs);
stage_data->displs = calloc(group.numP,sizeof(int));
get_byte_dist(stage_data->bytes, 0, group.numP, &aux_bytes);
stage_data->counts[0] = aux_bytes;
stage_data->displs[0] = 0;
for(i=1; i<group.numP; i++){
get_byte_dist(stage_data->bytes, i, group.numP, &aux_bytes);
stage_data->counts[i] = aux_bytes;
stage_data->displs[i] = stage_data->displs[i-1] + stage_data->counts[i-1];
}
if(stage_data->array != NULL)
free(stage_data->array);
stage_data->array = malloc(sizeof(char) * stage_data->real_bytes);
if(stage_data->full_array != NULL)
free(stage_data->full_array);
stage_data->full_array = malloc(sizeof(char) * stage_data->bytes);
break;
case COMP_REDUCE:
case COMP_ALLREDUCE:
stage_data->real_bytes = stage_data->bytes;
if(stage_data->array != NULL)
free(stage_data->array);
stage_data->array = malloc(sizeof(char) * stage_data->real_bytes);
//Full array para el reduce necesita el mismo tamanyo
if(config_file.iter_stage[stage].full_array != NULL)
free(config_file.iter_stage[stage].full_array);
config_file.iter_stage[stage].full_array = malloc(sizeof(char) * config_file.iter_stage[stage].real_bytes);
if(stage_data->full_array != NULL)
free(stage_data->full_array);
stage_data->full_array = malloc(sizeof(char) * stage_data->real_bytes);
break;
}
if(compute) {
config_file.iter_stage[stage].t_op = (MPI_Wtime() - start_time) / qty; //Tiempo de una operacion
MPI_Bcast(&(config_file.iter_stage[stage].t_op), 1, MPI_DOUBLE, ROOT, comm);
stage_data->t_op = (MPI_Wtime() - start_time) / qty; //Tiempo de una operacion
MPI_Bcast(&(stage_data->t_op), 1, MPI_DOUBLE, ROOT, comm);
}
config_file.iter_stage[stage].operations = t_stage / config_file.iter_stage[stage].t_op;
stage_data->operations = t_stage / stage_data->t_op;
}
/*
......@@ -122,12 +148,15 @@ double process_stage(void *config_file_void, int stage, void *group_void, MPI_Co
case COMP_BCAST:
MPI_Bcast(stage_data.array, stage_data.real_bytes, MPI_CHAR, ROOT, comm);
break;
case COMP_ALLTOALL:
MPI_Alltoall(stage_data.array, stage_data.real_bytes, MPI_CHAR, stage_data.full_array, stage_data.real_bytes, MPI_CHAR, comm);
case COMP_ALLGATHER:
MPI_Allgatherv(stage_data.array, stage_data.real_bytes, MPI_CHAR, stage_data.full_array, stage_data.counts, stage_data.displs, MPI_CHAR, comm);
break;
case COMP_REDUCE:
MPI_Reduce(stage_data.array, stage_data.full_array, stage_data.real_bytes, MPI_CHAR, MPI_MAX, ROOT, comm);
break;
case COMP_ALLREDUCE:
MPI_Allreduce(stage_data.array, stage_data.full_array, stage_data.real_bytes, MPI_CHAR, MPI_MAX, comm);
break;
}
return result;
}
......@@ -150,26 +179,27 @@ double latency(int myId, int numP, MPI_Comm comm) {
elapsed_time = 0;
if(myId+1 != numP || (myId+1 == numP && numP % 2 == 0)) {
for(i=0; i<loop_count; i++){
MPI_Barrier(comm);
start_time = MPI_Wtime();
if(myId % 2 == 0){
MPI_Ssend(&aux, 1, MPI_CHAR, myId+1, 99, comm);
MPI_Recv(&aux, 1, MPI_CHAR, myId+1, 99, comm, MPI_STATUS_IGNORE);
MPI_Barrier(comm);
start_time = MPI_Wtime();
if(myId % 2 == 0){
for(i=0; i<loop_count; i++){
MPI_Ssend(&aux, 0, MPI_CHAR, myId+1, 99, comm);
}
else if(myId % 2 == 1){
MPI_Recv(&aux, 1, MPI_CHAR, myId-1, 99, comm, MPI_STATUS_IGNORE);
MPI_Ssend(&aux, 1, MPI_CHAR, myId-1, 99, comm);
MPI_Recv(&aux, 0, MPI_CHAR, myId+1, 99, comm, MPI_STATUS_IGNORE);
} else {
for(i=0; i<loop_count; i++){
MPI_Recv(&aux, 0, MPI_CHAR, myId-1, 99, comm, MPI_STATUS_IGNORE);
}
MPI_Barrier(comm);
stop_time = MPI_Wtime();
elapsed_time += stop_time - start_time;
MPI_Ssend(&aux, 0, MPI_CHAR, myId-1, 99, comm);
}
MPI_Barrier(comm);
stop_time = MPI_Wtime();
elapsed_time = (stop_time - start_time) / loop_count;
}
if(myId %2 == 0) {
elapsed_time/=loop_count;
if(myId %2 != 0) {
elapsed_time=0;
}
MPI_Allreduce(&elapsed_time, &max_time, 1, MPI_DOUBLE, MPI_MAX, comm);
return max_time;
......@@ -190,41 +220,82 @@ double bandwidth(int myId, int numP, MPI_Comm comm, double latency, int n) {
n_bytes = n * sizeof(char);
aux = malloc(n_bytes);
elapsed_time = 0;
time = 0;
if(myId+1 != numP || (myId+1 == numP && numP % 2 == 0)) {
for(i=0; i<loop_count; i++){
MPI_Barrier(comm);
start_time = MPI_Wtime();
if(myId %2 == 0){
MPI_Barrier(comm);
start_time = MPI_Wtime();
if(myId % 2 == 0){
for(i=0; i<loop_count; i++){
MPI_Ssend(aux, n, MPI_CHAR, myId+1, 99, comm);
MPI_Recv(aux, n, MPI_CHAR, myId+1, 99, comm, MPI_STATUS_IGNORE);
}
else if(myId %2 == 1){
MPI_Recv(aux, 0, MPI_CHAR, myId+1, 99, comm, MPI_STATUS_IGNORE);
} else {
for(i=0; i<loop_count; i++){
MPI_Recv(aux, n, MPI_CHAR, myId-1, 99, comm, MPI_STATUS_IGNORE);
MPI_Ssend(aux, n, MPI_CHAR, myId-1, 99, comm);
}
MPI_Barrier(comm);
stop_time = MPI_Wtime();
elapsed_time += stop_time - start_time;
MPI_Ssend(aux, 0, MPI_CHAR, myId-1, 99, comm);
}
MPI_Barrier(comm);
stop_time = MPI_Wtime();
elapsed_time = (stop_time - start_time) / loop_count;
}
if(myId %2 == 0) {
time = elapsed_time / loop_count - latency;
time = elapsed_time - latency;
}
MPI_Allreduce(&time, &max_time, 1, MPI_DOUBLE, MPI_MAX, comm);
bw = ((double)n_bytes * 2) / max_time;
bw = ((double)n_bytes) / max_time;
free(aux);
return bw;
}
/*
*
*/
void linear_regression_stage(void *stage_void, void *group_void, MPI_Comm comm) {
group_data group = *((group_data *) group_void);
iter_stage_t *stage = (iter_stage_t *) stage_void;
double *times = NULL;
if(group.myId == ROOT) {
times = malloc(LR_ARRAY_TAM * sizeof(double));
}
switch(stage->pt) {
//Comunicaciones
case COMP_BCAST:
lr_times_bcast(group.myId, group.numP, ROOT, comm, times);
if(group.myId == ROOT) {
lr_compute(times, &(stage->slope), &(stage->intercept));
}
MPI_Bcast(&(stage->slope), 1, MPI_DOUBLE, ROOT, comm);
MPI_Bcast(&(stage->intercept), 1, MPI_DOUBLE, ROOT, comm);
break;
case COMP_ALLGATHER:
break;
case COMP_REDUCE:
break;
case COMP_ALLREDUCE:
break;
default:
break;
}
free(times);
}
/*
* Obatains for "Id" and "numP", how many
* bytes will have process "Id" and returns
* that quantity.
*
* Processes under "rem" will have more data
* than those with ranks higher or equal to "rem".
*
* TODO Refactor: Ya existe esta funcion en malleability/CommDist.c
*/
void get_byte_dist(int qty, int id, int numP, int *result) {
......
......@@ -4,7 +4,7 @@
//#include "Main_datatypes.h"
//#include "../malleability/malleabilityManager.h" //FIXME Refactor
enum compute_methods{COMP_PI, COMP_MATRIX, COMP_POINT, COMP_BCAST, COMP_ALLTOALL, COMP_REDUCE};
enum compute_methods{COMP_PI, COMP_MATRIX, COMP_POINT, COMP_BCAST, COMP_ALLGATHER, COMP_REDUCE, COMP_ALLREDUCE};
//FIXME Refactor el void
void init_stage(void *config_file_void, int stage, void *group_void, MPI_Comm comm, int compute);
......@@ -12,3 +12,4 @@ double process_stage(void *config_file_void, int stage, void* group_void, MPI_Co
double latency(int myId, int numP, MPI_Comm comm);
double bandwidth(int myId, int numP, MPI_Comm comm, double latency, int n);
void linear_regression_stage(void *stage_void, void *group_void, MPI_Comm comm);
#mpicc -Wall Main/Main.c Main/computing_func.c IOcodes/results.c IOcodes/read_ini.c IOcodes/ini.c malleability/ProcessDist.c malleability/CommDist.c -pthread -lslurm -lm
mpicc -Wall Main/Main.c Main/computing_func.c Main/comunication_func.c Main/process_stage.c IOcodes/results.c IOcodes/read_ini.c IOcodes/ini.c malleability/malleabilityManager.c malleability/malleabilityTypes.c malleability/malleabilityZombies.c malleability/ProcessDist.c malleability/CommDist.c -pthread -lslurm -lm
mpicc -g -Wall Main/Main.c Main/computing_func.c Main/comunication_func.c Main/linear_reg.c Main/process_stage.c IOcodes/results.c IOcodes/read_ini.c IOcodes/ini.c malleability/malleabilityManager.c malleability/malleabilityTypes.c malleability/malleabilityZombies.c malleability/ProcessDist.c malleability/CommDist.c -pthread -lslurm -lm
if [ $# -gt 0 ]
then
......
......@@ -13,12 +13,13 @@ configFile=$1
outIndex=$2
echo "MPICH"
module load mpich-3.4.1-noucx
#module load mpich-3.4.1-noucx
#export HYDRA_DEBUG=1
numP=$(bash recordMachinefile.sh $configFile)
#mpirun -print-all-exitcodes -f hostfile.o$SLURM_JOB_ID $dir$codeDir/a.out $configFile $outIndex $nodelist $nodes
#mpirun -np 2 /home/martini/Instalaciones/valgrind-mpich-3.4.1-noucx/bin/valgrind --leak-check=full --show-leak-kinds=all --log-file=nc.vg.%p $dir$codeDir/a.out $configFile $outIndex $nodelist $nodes
mpirun -np $numP $dir$codeDir/a.out $configFile $outIndex $nodelist $nodes
rm hostfile.o$SLURM_JOB_ID
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment