Commit 9a0629d9 authored by iker_martin's avatar iker_martin
Browse files
parents 0473e8a1 f453a143
objects1 := ini
objects2 := results read_ini
CC := gcc
MCC := mpicc
CFLAGS := -Wall
all: $(objects1) $(objects2)
$(objects1): %: %.c %.h
$(CC) $(CFLAGS) -c -o $(BUILDDIR)/$@.o $<
$(objects2): %: %.c %.h $(objects1).h $(TOP)/malleability/distribution_methods/block_distribution.h
$(MCC) $(CFLAGS) -c -o $(BUILDDIR)/$@.o $<
......@@ -4,6 +4,7 @@
#include <mpi.h>
#include "read_ini.h"
#include "../malleability/ProcessDist.h"
#include "../malleability/distribution_methods/block_distribution.h"
#include "ini.h"
......@@ -11,7 +12,7 @@ void malloc_config_resizes(configuration *user_config, int resizes);
void init_config_stages(configuration *user_config, int stages);
void def_struct_config_file(configuration *config_file, MPI_Datatype *config_type);
void def_struct_config_file_array(configuration *config_file, MPI_Datatype *config_type);
void def_struct_iter_stage(iter_stage_t *iter_stage, MPI_Datatype *config_type);
void def_struct_iter_stage(iter_stage_t *stages, int n_stages, MPI_Datatype *config_type);
/*
* Funcion utilizada para leer el fichero de configuracion
......@@ -25,71 +26,73 @@ static int handler(void* user, const char* section, const char* name,
configuration* pconfig = (configuration*)user;
char *resize_name = malloc(10 * sizeof(char));
int act_resize = pconfig->actual_resize;
snprintf(resize_name, 10, "resize%d", act_resize);
snprintf(resize_name, 10, "resize%d", pconfig->actual_resize);
char *iter_name = malloc(10 * sizeof(char));
int act_iter = pconfig->actual_iter;
snprintf(iter_name, 10, "stage%d", act_iter);
char *stage_name = malloc(10 * sizeof(char));
snprintf(stage_name, 10, "stage%d", pconfig->actual_stage);
#define MATCH(s, n) strcmp(section, s) == 0 && strcmp(name, n) == 0
if (MATCH("general", "resizes")) {
pconfig->resizes = atoi(value) + 1;
malloc_config_resizes(pconfig, pconfig->resizes);
} else if (MATCH("general", "iter_stages")) {
pconfig->iter_stages = atoi(value);
pconfig->iter_stage = malloc(sizeof(iter_stage_t) * pconfig->iter_stages);
init_config_stages(pconfig, pconfig->iter_stages);
} else if (MATCH("general", "matrix_tam")) {
pconfig->matrix_tam = atoi(value);
} else if (MATCH("general", "comm_tam")) {
pconfig->comm_tam = atoi(value);
if (MATCH("general", "R")) {
pconfig->n_resizes = atoi(value) + 1;
malloc_config_resizes(pconfig, pconfig->n_resizes);
} else if (MATCH("general", "S")) {
pconfig->n_stages = atoi(value);
pconfig->stages = malloc(sizeof(iter_stage_t) * pconfig->n_stages);
init_config_stages(pconfig, pconfig->n_stages);
} else if (MATCH("general", "Granularity")) {
pconfig->granularity = atoi(value);
} else if (MATCH("general", "SDR")) {
pconfig->sdr = atoi(value);
} else if (MATCH("general", "ADR")) {
pconfig->adr = atoi(value);
} else if (MATCH("general", "AIB")) { //TODO Refactor cambiar nombre
pconfig->aib = atoi(value);
} else if (MATCH("general", "CST")) {
pconfig->cst = atoi(value);
} else if (MATCH("general", "CSS")) {
pconfig->css = atoi(value);
} else if (MATCH("general", "AT")) {
pconfig->at = atoi(value);
} else if (MATCH("general", "SM")) {
pconfig->sm = atoi(value);
} else if (MATCH("general", "SS")) {
pconfig->ss = atoi(value);
// Iter stage
} else if (MATCH(iter_name, "PT")) {
pconfig->iter_stage[act_iter].pt = atoi(value);
} else if (MATCH(iter_name, "bytes")) {
pconfig->iter_stage[act_iter].bytes = atoi(value);
} else if (MATCH(iter_name, "t_stage")) {
pconfig->iter_stage[act_iter].t_stage = atof(value);
pconfig->actual_iter = pconfig->actual_iter+1; // Ultimo elemento del grupo
} else if (MATCH(stage_name, "PT")) {
if(pconfig->actual_stage < pconfig->n_stages)
pconfig->stages[pconfig->actual_stage].pt = atoi(value);
} else if (MATCH(stage_name, "bytes")) {
if(pconfig->actual_stage < pconfig->n_stages)
pconfig->stages[pconfig->actual_stage].bytes = atoi(value);
} else if (MATCH(stage_name, "t_stage")) {
if(pconfig->actual_stage < pconfig->n_stages) {
pconfig->stages[pconfig->actual_stage].t_stage = atof(value);
pconfig->actual_stage = pconfig->actual_stage+1; // Ultimo elemento del grupo
}
// Resize stage
} else if (MATCH(resize_name, "iters")) {
pconfig->iters[act_resize] = atoi(value);
} else if (MATCH(resize_name, "procs")) {
pconfig->procs[act_resize] = atoi(value);
} else if (MATCH(resize_name, "factor")) {
pconfig->factors[act_resize] = atof(value);
} else if (MATCH(resize_name, "physical_dist")) {
} else if (MATCH(resize_name, "Iters")) {
if(pconfig->actual_resize < pconfig->n_resizes)
pconfig->iters[pconfig->actual_resize] = atoi(value);
} else if (MATCH(resize_name, "Procs")) {
if(pconfig->actual_resize < pconfig->n_resizes)
pconfig->procs[pconfig->actual_resize] = atoi(value);
} else if (MATCH(resize_name, "FactorS")) {
if(pconfig->actual_resize < pconfig->n_resizes)
pconfig->factors[pconfig->actual_resize] = atof(value);
} else if (MATCH(resize_name, "Dist")) {
if(pconfig->actual_resize < pconfig->n_resizes) {
char *aux = strdup(value);
if (strcmp(aux, "node") == 0) {
pconfig->phy_dist[act_resize] = COMM_PHY_NODES;
if (strcmp(aux, "spread") == 0) {
pconfig->phy_dist[pconfig->actual_resize] = COMM_PHY_SPREAD;
} else {
pconfig->phy_dist[act_resize] = COMM_PHY_CPU;
pconfig->phy_dist[pconfig->actual_resize] = COMM_PHY_COMPACT;
}
free(aux);
pconfig->actual_resize = pconfig->actual_resize+1; // Ultimo elemento del grupo
}
} else {
return 0; /* unknown section or name, error */
}
free(resize_name);
free(iter_name);
free(stage_name);
return 1;
}
......@@ -109,7 +112,7 @@ configuration *read_ini_file(char *file_name) {
return NULL;
}
config->actual_resize=0;
config->actual_iter=0;
config->actual_stage=0;
if(ini_parse(file_name, handler, config) < 0) { // Obtener configuracion
printf("Can't load '%s'\n", file_name);
......@@ -150,10 +153,12 @@ void malloc_config_resizes(configuration *user_config, int resizes) {
void init_config_stages(configuration *user_config, int stages) {
int i;
if(user_config != NULL) {
for(i=0; i<user_config->iter_stages; i++) {
user_config->iter_stage[i].array = NULL;
user_config->iter_stage[i].full_array = NULL;
user_config->iter_stage[i].double_array = NULL;
for(i=0; i<user_config->n_stages; i++) {
user_config->stages[i].array = NULL;
user_config->stages[i].full_array = NULL;
user_config->stages[i].double_array = NULL;
user_config->stages[i].counts.counts = NULL;
user_config->stages[i].real_bytes = 0;
}
}
}
......@@ -169,19 +174,27 @@ void free_config(configuration *user_config) {
free(user_config->factors);
free(user_config->phy_dist);
for(i=0; i < user_config->iter_stages; i++) {
if(user_config->iter_stage[i].array != NULL)
free(user_config->iter_stage[i].array);
user_config->iter_stage[i].array = NULL;
if(user_config->iter_stage[i].full_array != NULL)
free(user_config->iter_stage[i].full_array);
user_config->iter_stage[i].full_array = NULL;
if(user_config->iter_stage[i].double_array != NULL)
free(user_config->iter_stage[i].double_array);
user_config->iter_stage[i].double_array = NULL;
for(i=0; i < user_config->n_stages; i++) {
if(user_config->stages[i].array != NULL) {
free(user_config->stages[i].array);
user_config->stages[i].array = NULL;
}
if(user_config->stages[i].full_array != NULL) {
free(user_config->stages[i].full_array);
user_config->stages[i].full_array = NULL;
}
if(user_config->stages[i].double_array != NULL) {
free(user_config->stages[i].double_array);
user_config->stages[i].double_array = NULL;
}
if(user_config->stages[i].counts.counts != NULL) {
freeCounts(&(user_config->stages[i].counts));
}
}
free(user_config->iter_stage);
//free(user_config->stages); //FIXME ERROR de memoria relacionado con la carpeta malleability
free(user_config);
}
}
......@@ -193,14 +206,15 @@ void free_config(configuration *user_config) {
void print_config(configuration *user_config, int grp) {
if(user_config != NULL) {
int i;
printf("Config loaded: resizes=%d, stages=%d, matrix=%d, comm_tam=%d, sdr=%d, adr=%d, aib=%d, css=%d, cst=%d || grp=%d\n",
user_config->resizes, user_config->iter_stages, user_config->matrix_tam, user_config->comm_tam, user_config->sdr, user_config->adr, user_config->aib, user_config->css, user_config->cst, grp);
for(i=0; i<user_config->iter_stages; i++) {
printf("Stage %d: PT=%d, T_stage=%lf, bytes=%d\n",
i, user_config->iter_stage[i].pt, user_config->iter_stage[i].t_stage, user_config->iter_stage[i].bytes);
printf("Config loaded: resizes=%d, stages=%d, granularity=%d, sdr=%d, adr=%d, at=%d, sm=%d, ss=%d, latency=%lf, bw=%lf || grp=%d\n",
user_config->n_resizes, user_config->n_stages, user_config->granularity, user_config->sdr, user_config->adr,
user_config->at, user_config->sm, user_config->ss, user_config->latency_m, user_config->bw_m, grp);
for(i=0; i<user_config->n_stages; i++) {
printf("Stage %d: PT=%d, T_stage=%lf, bytes=%d, Intercept=%lf, Slope=%lf\n",
i, user_config->stages[i].pt, user_config->stages[i].t_stage, user_config->stages[i].real_bytes, user_config->stages[i].intercept, user_config->stages[i].slope);
}
for(i=0; i<user_config->resizes; i++) {
printf("Resize %d: Iters=%d, Procs=%d, Factors=%f, Phy=%d\n",
for(i=0; i<user_config->n_resizes; i++) {
printf("Resize %d: Iters=%d, Procs=%d, Factors=%f, Dist=%d\n",
i, user_config->iters[i], user_config->procs[i], user_config->factors[i], user_config->phy_dist[i]);
}
}
......@@ -219,15 +233,15 @@ void print_config_group(configuration *user_config, int grp) {
if(grp > 0) {
parents = user_config->procs[grp-1];
}
if(grp < user_config->resizes - 1) {
if(grp < user_config->n_resizes - 1) {
sons = user_config->procs[grp+1];
}
printf("Config: matrix=%d, comm_tam=%d, sdr=%d, adr=%d, aib=%d, css=%d, cst=%d\n",
user_config->matrix_tam, user_config->comm_tam, user_config->sdr, user_config->adr, user_config->aib, user_config->css, user_config->cst);
for(i=0; i<user_config->iter_stages; i++) {
printf("Stage %d: PT=%d, T_stage=%lf, bytes=%d\n",
i, user_config->iter_stage[i].pt, user_config->iter_stage[i].t_stage, user_config->iter_stage[i].bytes);
printf("Config: granularity=%d, sdr=%d, adr=%d, at=%d, sm=%d, ss=%d, latency=%lf, bw=%lf\n",
user_config->granularity, user_config->sdr, user_config->adr, user_config->at, user_config->sm, user_config->ss, user_config->latency_m, user_config->bw_m);
for(i=0; i<user_config->n_stages; i++) {
printf("Stage %d: PT=%d, T_stage=%lf, bytes=%d, Intercept=%lf, Slope=%lf\n",
i, user_config->stages[i].pt, user_config->stages[i].t_stage, user_config->stages[i].real_bytes, user_config->stages[i].intercept, user_config->stages[i].slope);
}
printf("Config Group: iters=%d, factor=%f, phy=%d, procs=%d, parents=%d, sons=%d\n",
user_config->iters[grp], user_config->factors[grp], user_config->phy_dist[grp], user_config->procs[grp], parents, sons);
......@@ -263,12 +277,12 @@ void send_config_file(configuration *config_file, int root, MPI_Comm intercomm)
// Obtener un tipo derivado para enviar las estructuras de fases de iteracion
// con una sola comunicacion
def_struct_iter_stage(&(config_file->iter_stage[0]), &iter_stage_type);
def_struct_iter_stage(&(config_file->stages[0]), config_file->n_stages, &iter_stage_type);
MPI_Bcast(config_file, 1, config_type, root, intercomm);
MPI_Bcast(config_file, 1, config_type_array, root, intercomm);
MPI_Bcast(config_file->factors, config_file->resizes, MPI_FLOAT, root, intercomm);
MPI_Bcast(config_file->iter_stage, config_file->iter_stages, iter_stage_type, root, intercomm);
MPI_Bcast(config_file->factors, config_file->n_resizes, MPI_FLOAT, root, intercomm);
MPI_Bcast(config_file->stages, config_file->n_stages, iter_stage_type, root, intercomm);
//Liberar tipos derivados
MPI_Type_free(&config_type);
......@@ -300,56 +314,55 @@ void recv_config_file(int root, MPI_Comm intercomm, configuration **config_file_
MPI_Bcast(config_file, 1, config_type, root, intercomm);
//Inicializado de estructuras internas
malloc_config_resizes(config_file, config_file->resizes); // Reserva de memoria de los vectores
config_file->iter_stage = malloc(sizeof(iter_stage_t) * config_file->iter_stages);
malloc_config_resizes(config_file, config_file->n_resizes); // Reserva de memoria de los vectores
config_file->stages = malloc(sizeof(iter_stage_t) * config_file->n_stages);
// Obtener un tipo derivado para enviar los tres vectores
// de enteros con una sola comunicacion
def_struct_config_file_array(config_file, &config_type_array);
def_struct_iter_stage(&(config_file->iter_stage[0]), &iter_stage_type);
def_struct_iter_stage(&(config_file->stages[0]), config_file->n_stages, &iter_stage_type);
MPI_Bcast(config_file, 1, config_type_array, root, intercomm);
MPI_Bcast(config_file->factors, config_file->resizes, MPI_FLOAT, root, intercomm);
MPI_Bcast(config_file->iter_stage, config_file->iter_stages, iter_stage_type, root, intercomm);
MPI_Bcast(config_file->factors, config_file->n_resizes, MPI_FLOAT, root, intercomm);
MPI_Bcast(config_file->stages, config_file->n_stages, iter_stage_type, root, intercomm);
//Liberar tipos derivados
MPI_Type_free(&config_type);
MPI_Type_free(&config_type_array);
MPI_Type_free(&iter_stage_type);
init_config_stages(config_file, config_file->iter_stages); // Inicializar a NULL vectores
init_config_stages(config_file, config_file->n_stages); // Inicializar a NULL vectores
*config_file_out = config_file;
}
/*
* Tipo derivado para enviar 12 elementos especificos
* Tipo derivado para enviar 11 elementos especificos
* de la estructura de configuracion con una sola comunicacion.
*/
void def_struct_config_file(configuration *config_file, MPI_Datatype *config_type) {
int i, counts = 12;
int blocklengths[12] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
int i, counts = 11;
int blocklengths[11] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];
// Rellenar vector types
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = types[6] = types[7] = types[8] = types[9] = MPI_INT;
types[10] = types[11] = MPI_DOUBLE;
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = types[6] = types[7] = types[8] = MPI_INT;
types[9] = types[10] = MPI_DOUBLE;
// Rellenar vector displs
MPI_Get_address(config_file, &dir);
MPI_Get_address(&(config_file->resizes), &displs[0]);
MPI_Get_address(&(config_file->iter_stages), &displs[1]);
MPI_Get_address(&(config_file->actual_resize), &displs[2]);
MPI_Get_address(&(config_file->matrix_tam), &displs[3]);
MPI_Get_address(&(config_file->comm_tam), &displs[4]);
MPI_Get_address(&(config_file->sdr), &displs[5]);
MPI_Get_address(&(config_file->adr), &displs[6]);
MPI_Get_address(&(config_file->aib), &displs[7]);
MPI_Get_address(&(config_file->css), &displs[8]);
MPI_Get_address(&(config_file->cst), &displs[9]);
MPI_Get_address(&(config_file->latency_m), &displs[10]);
MPI_Get_address(&(config_file->bw_m), &displs[11]);
MPI_Get_address(&(config_file->n_resizes), &displs[0]);
MPI_Get_address(&(config_file->n_stages), &displs[1]);
MPI_Get_address(&(config_file->actual_resize), &displs[2]); // TODO Refactor Es necesario enviarlo?
MPI_Get_address(&(config_file->granularity), &displs[3]);
MPI_Get_address(&(config_file->sdr), &displs[4]);
MPI_Get_address(&(config_file->adr), &displs[5]);
MPI_Get_address(&(config_file->at), &displs[6]);
MPI_Get_address(&(config_file->ss), &displs[7]);
MPI_Get_address(&(config_file->sm), &displs[8]);
MPI_Get_address(&(config_file->latency_m), &displs[9]);
MPI_Get_address(&(config_file->bw_m), &displs[10]);
for(i=0;i<counts;i++) displs[i] -= dir;
......@@ -371,7 +384,7 @@ void def_struct_config_file_array(configuration *config_file, MPI_Datatype *conf
types[0] = types[1] = types[2] = MPI_INT;
// Modificar blocklengths al valor adecuado
blocklengths[0] = blocklengths[1] = blocklengths[2] = config_file->resizes;
blocklengths[0] = blocklengths[1] = blocklengths[2] = config_file->n_resizes;
//Rellenar vector displs
MPI_Get_address(config_file, &dir);
......@@ -394,27 +407,33 @@ void def_struct_config_file_array(configuration *config_file, MPI_Datatype *conf
* Tipo derivado para enviar elementos especificos
* de la estructuras de fases de iteracion en una sola comunicacion.
*/
void def_struct_iter_stage(iter_stage_t *iter_stage, MPI_Datatype *config_type) {
void def_struct_iter_stage(iter_stage_t *stages, int n_stages, MPI_Datatype *config_type) {
int i, counts = 4;
int blocklengths[4] = {1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];
MPI_Datatype aux, types[counts];
// Rellenar vector types
types[0] = MPI_INT;
types[0] = types[3] = MPI_INT;
types[1] = MPI_FLOAT;
types[2] = types[3] = MPI_DOUBLE;
types[2] = MPI_DOUBLE;
// Rellenar vector displs
MPI_Get_address(iter_stage, &dir);
MPI_Get_address(stages, &dir);
MPI_Get_address(&(iter_stage->pt), &displs[0]);
MPI_Get_address(&(iter_stage->t_stage), &displs[1]);
MPI_Get_address(&(iter_stage->t_op), &displs[2]);
MPI_Get_address(&(iter_stage->bytes), &displs[3]);
MPI_Get_address(&(stages->pt), &displs[0]);
MPI_Get_address(&(stages->t_stage), &displs[1]);
MPI_Get_address(&(stages->t_op), &displs[2]);
MPI_Get_address(&(stages->bytes), &displs[3]);
for(i=0;i<counts;i++) displs[i] -= dir;
if (n_stages == 1) {
MPI_Type_create_struct(counts, blocklengths, displs, types, config_type);
} else { // Si hay mas de una fase(estructura), el "extent" se modifica.
MPI_Type_create_struct(counts, blocklengths, displs, types, &aux);
// Tipo derivado para enviar N elementos de la estructura
MPI_Type_create_resized(aux, 0, sizeof(iter_stage_t), config_type);
}
MPI_Type_commit(config_type);
}
#ifndef READ_INI_H
#define READ_INI_H
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include "../malleability/distribution_methods/block_distribution.h"
typedef struct
{
......@@ -10,25 +14,34 @@ typedef struct
double t_op;
int operations;
int bytes, bytes_real;
int bytes, real_bytes, my_bytes;
// Variables to represent linear regresion
// for collective calls.
double slope, intercept;
// Arrays to communicate data;
char* array, *full_array;
double* double_array;
// Arrays to indicate how many bytes are received from each rank
//int *counts, *displs;
struct Counts counts;
} iter_stage_t;
typedef struct
{
int resizes, iter_stages;
int actual_resize, actual_iter;
int matrix_tam, comm_tam, sdr, adr;
int css, cst;
int aib;
int n_resizes, n_stages;
int actual_resize, actual_stage;
int granularity, sdr, adr;
int sm, ss;
int at;
double latency_m, bw_m;
int *iters, *procs, *phy_dist;
float *factors;
iter_stage_t *iter_stage;
iter_stage_t *stages;
} configuration;
......@@ -40,3 +53,5 @@ void print_config_group(configuration *user_config, int grp);
// MPI Intercomm functions
void send_config_file(configuration *config_file, int root, MPI_Comm intercomm);
void recv_config_file(int root, MPI_Comm intercomm, configuration **config_file_out);
#endif
......@@ -60,13 +60,13 @@ void recv_results(results_data *results, int root, int resizes, MPI_Comm interco
* En concreto son tres escalares y dos vectores de tamaño "resizes"
*/
void def_results_type(results_data *results, int resizes, MPI_Datatype *results_type) {
int i, counts = 5;
int blocklengths[] = {1, 1, 1, 1, 1};
int i, counts = 6;
int blocklengths[] = {1, 1, 1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];
// Rellenar vector types
types[0] = types[1] = types[2] = types[3] = types[4] = MPI_DOUBLE;
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = MPI_DOUBLE;
blocklengths[3] = blocklengths[4] = resizes;
// Rellenar vector displs
......@@ -75,8 +75,9 @@ void def_results_type(results_data *results, int resizes, MPI_Datatype *results_
MPI_Get_address(&(results->sync_start), &displs[0]);
MPI_Get_address(&(results->async_start), &displs[1]);
MPI_Get_address(&(results->exec_start), &displs[2]);
MPI_Get_address(&(results->spawn_real_time[0]), &displs[3]);
MPI_Get_address(&(results->spawn_time[0]), &displs[4]); //TODO Revisar si se puede simplificar //FIXME Si hay mas de un spawn error?
MPI_Get_address(&(results->wasted_time), &displs[3]);
MPI_Get_address(&(results->spawn_real_time[0]), &displs[4]);
MPI_Get_address(&(results->spawn_time[0]), &displs[5]); //TODO Revisar si se puede simplificar //FIXME Si hay mas de un spawn error?
for(i=0;i<counts;i++) displs[i] -= dir;
......@@ -201,12 +202,12 @@ void print_global_results(results_data results, int resizes) {
* de los vectores de resultados.
*/
void init_results_data(results_data *results, int resizes, int iters_size) {
//*results = malloc(1 * sizeof(results_data)); FIXME Borrar
results->spawn_time = calloc(resizes, sizeof(double));
results->spawn_real_time = calloc(resizes, sizeof(double));
results->sync_time = calloc(resizes, sizeof(double));
results->async_time = calloc(resizes, sizeof(double));
results->wasted_time = 0;
results->iters_size = iters_size + 100;
results->iters_time = calloc(iters_size + 100, sizeof(double)); //FIXME Numero magico
......@@ -217,7 +218,6 @@ void init_results_data(results_data *results, int resizes, int iters_size) {
void realloc_results_iters(results_data *results, int needed) {
double *time_aux;
time_aux = (double *) realloc(results->iters_time, needed * sizeof(double));
if(time_aux == NULL) {
......@@ -230,7 +230,6 @@ void realloc_results_iters(results_data *results, int needed) {
/*
* Libera toda la memoria asociada con una estructura de resultados.
* TODO Asegurar que ha sido inicializado?
*/
void free_results_data(results_data *results) {
if(results != NULL) {
......@@ -241,5 +240,4 @@ void free_results_data(results_data *results) {
free(results->iters_time);
}
//free(*results); FIXME Borrar
}
#ifndef RESULTS_H
#define RESULTS_H
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
......@@ -14,7 +17,7 @@ typedef struct {
double sync_start, sync_end, *sync_time;
double async_start, async_end, *async_time;
double exec_start, exec_time;
//Overcharge time is time spent in malleability that is from IO modules
double wasted_time; // Time spent recalculating iter stages
} results_data;
void send_results(results_data *results, int root, int resizes, MPI_Comm intercomm);
......@@ -30,3 +33,5 @@ void print_global_results(results_data results, int resizes);
void init_results_data(results_data *results, int resizes, int iters_size);
void realloc_results_iters(results_data *results, int needed);
void free_results_data(results_data *results);
#endif
......@@ -6,6 +6,8 @@
#include <sys/stat.h>
#include "process_stage.h"
#include "Main_datatypes.h"
#include "../IOcodes/read_ini.h"
#include "../IOcodes/results.h"
#include "../malleability/CommDist.h"
#include "../malleability/malleabilityManager.h"
#include "../malleability/malleabilityStates.h"
......@@ -110,14 +112,14 @@ int main(int argc, char *argv[]) {
do {
group->grp = group->grp + 1;
obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
if(group->grp != 0) obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
set_benchmark_grp(group->grp);
get_malleability_user_comm(&comm);
MPI_Comm_size(comm, &(group->numP));
MPI_Comm_rank(comm, &(group->myId));
if(config_file->resizes != group->grp + 1) {
set_malleability_configuration(config_file->cst, config_file->css, config_file->phy_dist[group->grp+1], -1, config_file->aib, -1);
if(config_file->n_resizes != group->grp + 1) {
set_malleability_configuration(config_file->sm, config_file->ss, config_file->phy_dist[group->grp+1], -1, config_file->at, -1);
set_children_number(config_file->procs[group->grp+1]); // TODO TO BE DEPRECATED
if(group->grp == 0) {
......@@ -136,7 +138,7 @@ int main(int argc, char *argv[]) {
print_local_results();
reset_results_index(results);
} while((config_file->resizes > group->grp + 1) && (config_file->cst == COMM_SPAWN_MERGE || config_file->cst == COMM_SPAWN_MERGE_PTHREAD));
} while((config_file->n_resizes > group->grp + 1) && (config_file->sm == COMM_SPAWN_MERGE || config_file->sm == COMM_SPAWN_MERGE_PTHREAD));
//
// TERMINA LA EJECUCION ----------------------------------------------------------
......@@ -145,7 +147,7 @@ int main(int argc, char *argv[]) {
if(res==1) { // Se ha llegado al final de la aplicacion
MPI_Barrier(comm); // TODO Posible error al utilizar SHRINK
results->exec_time = MPI_Wtime() - results->exec_start;
results->exec_time = MPI_Wtime() - results->exec_start - results->wasted_time;
}
print_final_results(); // Pasado este punto ya no pueden escribir los procesos
......@@ -153,7 +155,7 @@ int main(int argc, char *argv[]) {
MPI_Comm_free(&comm);
}
if(group->myId == ROOT && (config_file->cst == COMM_SPAWN_MERGE || config_file->cst == COMM_SPAWN_MERGE_PTHREAD)) {
if(group->myId == ROOT && (config_file->sm == COMM_SPAWN_MERGE || config_file->sm == COMM_SPAWN_MERGE_PTHREAD)) {
MPI_Abort(MPI_COMM_WORLD, -100);
}
free_application_data();
......@@ -183,21 +185,20 @@ int work() {
double *matrix = NULL;
maxiter = config_file->iters[group->grp];
//initMatrix(&matrix, config_file->matrix_tam);
state = MAL_NOT_STARTED;
res = 0;
for(iter=group->iter_start; iter < maxiter; iter++) {
iterate(matrix, config_file->matrix_tam, state, iter);
iterate(matrix, config_file->granularity, state, iter);
}
if(config_file->resizes != group->grp + 1)
if(config_file->n_resizes != group->grp + 1)
state = malleability_checkpoint();
iter = 0;
while(state == MAL_DIST_PENDING || state == MAL_SPAWN_PENDING || state == MAL_SPAWN_SINGLE_PENDING) {
if(iter < config_file->iters[group->grp+1]) {
iterate(matrix, config_file->matrix_tam, state, iter);
iterate(matrix, config_file->granularity, state, iter);
iter++;
group->iter_start = iter;
}
......@@ -205,7 +206,7 @@ int work() {
}
if(config_file->resizes - 1 == group->grp) res=1;
if(config_file->n_resizes - 1 == group->grp) res=1;
if(state == MAL_ZOMBIE) res=state;
return res;
}
......@@ -229,8 +230,8 @@ double iterate(double *matrix, int n, int async_comm, int iter) {
start_time = MPI_Wtime();
for(i=0; i < config_file->iter_stages; i++) {
aux+= process_stage((void*)config_file, i, (void*)group, comm);
for(i=0; i < config_file->n_stages; i++) {
aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
}
actual_time = MPI_Wtime(); // Guardar tiempos
......@@ -311,7 +312,7 @@ int print_final_results() {
if(group->myId == ROOT) {
if(group->grp == config_file->resizes -1) {
if(group->grp == config_file->n_resizes -1) {
file_name = NULL;
file_name = malloc(20 * sizeof(char));
if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
......@@ -321,7 +322,7 @@ int print_final_results() {
ptr_out = dup(1);
create_out_file(file_name, &ptr_global, 1);
print_config(config_file, group->grp);
print_global_results(*results, config_file->resizes);
print_global_results(*results, config_file->n_resizes);
fflush(stdout);
free(file_name);
......@@ -365,7 +366,7 @@ void init_application() {
config_file = read_ini_file(group->argv[1]);
results = malloc(sizeof(results_data));
init_results_data(results, config_file->resizes, config_file->iters[group->grp]);
init_results_data(results, config_file->n_resizes, config_file->iters[group->grp]);
if(config_file->sdr) {
malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
}
......@@ -374,19 +375,33 @@ void init_application() {
}
int message_tam = 100000000;
message_tam = 10240000;
//for(int i=0; i<10; i++) {
config_file->latency_m = latency(group->myId, group->numP, comm);
config_file->bw_m = bandwidth(group->myId, group->numP, comm, config_file->latency_m, message_tam);
//if(group->myId == ROOT) printf("numP=%d Lat=%lf Bw=%lf\n", group->numP, config_file->latency_m, config_file->bw_m);
//}
obtain_op_times(1);
}
/*
* Obtiene cuanto tiempo es necesario para realizar una operacion de PI
*
* Si compute esta a 1 se considera que se esta inicializando el entorno
* y realizará trabajo extra.
*
* Si compute esta a 0 se considera un entorno inicializado y solo hay que
* realizar algunos cambios de reserva de memoria. Si es necesario recalcular
* algo se obtiene el total de tiempo utilizado en dichas tareas y se resta
* al tiempo total de ejecucion.
*/
void obtain_op_times(int compute) {
int i;
for(i=0; i<config_file->iter_stages; i++) {
init_stage((void*)config_file, i, (void*)group, comm, compute);
double time = 0;
for(i=0; i<config_file->n_stages; i++) {
time+=init_stage(config_file, i, *group, comm, compute);
}
if(!compute) results->wasted_time += time;
}
/*
......
#ifndef MAIN_DATATYPES_H
#define MAIN_DATATYPES_H
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
......@@ -19,3 +22,5 @@ typedef struct {
char **argv;
char *sync_array, *async_array;
} group_data;
#endif
objects1 := computing_func comunication_func linear_reg
objects2 := process_stage
objects3 := Main
depends := Main_datatypes
CC := gcc
MCC := mpicc
CFLAGS := -Wall
all: $(objects1) $(objects2) $(objects3)
$(objects1): %: %.c %.h $(depends).h
$(MCC) $(CFLAGS) -c -o $(BUILDDIR)/$@.o $<
$(objects2): %: %.c %.h $(objects1).h $(depends).h $(TOP)/IOcodes/read_ini.h \
$(TOP)/malleability/distribution_methods/block_distribution.h
$(MCC) $(CFLAGS) -c -o $(BUILDDIR)/$@.o $<
$(objects3): %: %.c $(objects2).h $(depends).h $(TOP)/IOcodes/read_ini.h $(TOP)/IOcodes/results.h \
$(TOP)/malleability/CommDist.h $(TOP)/malleability/malleabilityStates.h \
$(TOP)/malleability/malleabilityManager.h
$(MCC) $(CFLAGS) -c -o $(BUILDDIR)/$@.o $<
#ifndef COMPUTING_FUNC_H
#define COMPUTING_FUNC_H
double computeMatrix(double *matrix, int n);
double computePiSerial(int n);
void initMatrix(double **matrix, int n);
void freeMatrix(double **matrix);
#endif
#ifndef COMUNICATION_FUNC_H
#define COMUNICATION_FUNC_H
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
void point_to_point(int myId, int numP, int root, MPI_Comm comm, char *array, int qty);
#endif
#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include <mpi.h>
#include "Main_datatypes.h"
#include "../malleability/distribution_methods/block_distribution.h"
#include "linear_reg.h"
// Array for linear regression computation
// Cantidades 10b 100b 1Kb 5Kb 10Kb 50Kb 100Kb 500Kb 1Mb 10Mb 100Mb
double LR_bytes_array[LR_ARRAY_TAM] = {10, 100, 1000, 5000,10000, 50000,100000, 500000, 1000000, 10000000, 100000000};
// Linear regression
// Y = a +bX
// Bytes = a +b(Time)
//
// X is the independent variable, which correlates to the Communication time
// Y is the dependent variable, which correlates to the number of bytes
//
void lr_avg_plus_diff(int tam, double *array, double *avg, double *diffs);
/*
* Computes and returns the related Y value to a given linear regression
*/
void lr_calc_Y(double slope, double intercept, double x_value, int *y_result) {
*y_result = (int) ceil(intercept + slope * x_value);
}
/*
* Computes the slope and intercept for a given array of times
* so users can calculate the number of bytes for a given time.
*
*/
void lr_compute(int tam, double *bytes, double *times, double *slope, double *intercept) {
int i;
double avgX, avgY;
double *diffsX, *diffsY;
double SSxx, SSxy;
diffsX = malloc(tam * sizeof(double));
diffsY = malloc(tam * sizeof(double));
SSxx = SSxy = 0;
lr_avg_plus_diff(tam, times, &avgX, diffsX);
lr_avg_plus_diff(tam, bytes, &avgY, diffsY);
for(i=0; i<tam; i++) {
SSxx+= diffsX[i]*diffsX[i];
SSxy+= diffsX[i]*diffsY[i];
}
*slope = SSxy / SSxx;
*intercept = avgY - (*slope * avgX);
free(diffsX);
free(diffsY);
}
/*
* Computes the average of an arrray and
* the difference of each element in respect to the average.
*
* Returns the average and an the difference of each element.
*/
void lr_avg_plus_diff(int tam, double *array, double *avg, double *diffs) {
int i;
double sum = 0;
for(i=0; i<tam; i++) {
sum+= array[i];
}
*avg = sum / tam;
for(i=0; i<tam; i++) {
diffs[i]= *avg - array[i];
}
}
//======================================================||
//======================================================||
//==================TIMES COMPUTATION===================||
//======================================================||
//======================================================||
/*
* Obtains an array of times to perform a "Broadcast"
* operation depending on a predifined set of number of bytes.
*/
void lr_times_bcast(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times) {
int i, j, n;
double start_time;
char *aux = NULL;
for(i=0; i<LR_ARRAY_TAM; i++) {
n = LR_bytes_array[i];
aux = malloc(n * sizeof(char));
for(j=0; j<loop_iters; j++) {
MPI_Barrier(comm);
start_time = MPI_Wtime();
MPI_Bcast(aux, n, MPI_CHAR, root, comm);
times[i*loop_iters+j] = MPI_Wtime() - start_time;
}
free(aux);
aux = NULL;
}
}
/*
* Obtains an array of times to perform an "Allgatherv"
* operation depending on a predifined set of number of bytes.
*/
void lr_times_allgatherv(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times) {
int i, j, n;
double start_time;
char *aux = NULL, *aux_full = NULL;
struct Dist_data dist_data;
struct Counts counts;
for(i=0; i<LR_ARRAY_TAM; i++) {
n = LR_bytes_array[i];
prepare_comm_allgatherv(numP, n, &counts);
get_block_dist(n, myId, numP, &dist_data);
aux = malloc(dist_data.tamBl * sizeof(char));
aux_full = malloc(n * sizeof(char));
for(j=0; j<loop_iters; j++) {
MPI_Barrier(comm);
start_time = MPI_Wtime();
MPI_Allgatherv(aux, dist_data.tamBl, MPI_CHAR, aux_full, counts.counts, counts.displs, MPI_CHAR, comm);
times[i*loop_iters+j] = MPI_Wtime() - start_time;
}
freeCounts(&counts);
free(aux);
free(aux_full);
aux_full = NULL;
aux = NULL;
}
}
/*
* Obtains an array of times to perform an "Reduce"
* operation depending on a predifined set of number of bytes.
*/
void lr_times_reduce(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times) {
int i, j, n;
double start_time;
char *aux = NULL, *aux_full = NULL;
for(i=0; i<LR_ARRAY_TAM; i++) {
n = LR_bytes_array[i];
aux = malloc(n * sizeof(char));
aux_full = malloc(n * sizeof(char));
for(j=0; j<loop_iters; j++) {
MPI_Barrier(comm);
start_time = MPI_Wtime();
MPI_Reduce(aux, aux_full, n, MPI_CHAR, MPI_MAX, root, comm);
times[i*loop_iters+j] = MPI_Wtime() - start_time;
}
free(aux);
free(aux_full);
aux_full = NULL;
aux = NULL;
}
}
/*
* Obtains an array of times to perform an "Allreduce"
* operation depending on a predifined set of number of bytes.
*/
void lr_times_allreduce(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times) {
int i, j, n;
double start_time;
char *aux = NULL, *aux_full = NULL;
for(i=0; i<LR_ARRAY_TAM; i++) {
n = LR_bytes_array[i];
aux = malloc(n * sizeof(char));
aux_full = malloc(n * sizeof(char));
for(j=0; j<loop_iters; j++) {
MPI_Barrier(comm);
start_time = MPI_Wtime();
MPI_Allreduce(aux, aux_full, n, MPI_CHAR, MPI_MAX, comm);
times[i*loop_iters+j] = MPI_Wtime() - start_time;
}
free(aux);
free(aux_full);
aux_full = NULL;
aux = NULL;
}
}
#ifndef LINEAR_REG_H
#define LINEAR_REG_H
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
/*----------LINEAR REGRESSION TYPES--------------*/
#define LR_ARRAY_TAM 11
// Array for linear regression computation
extern double LR_bytes_array[LR_ARRAY_TAM];
void lr_calc_Y(double slope, double intercept, double x_value, int *y_result);
void lr_compute(int loop_iters, double *bytes, double *times, double *slope, double *intercept);
void lr_times_bcast(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times);
void lr_times_allgatherv(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times);
void lr_times_reduce(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times);
void lr_times_allreduce(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times);
#endif
......@@ -4,9 +4,21 @@
#include <mpi.h>
#include "computing_func.h"
#include "comunication_func.h"
#include "linear_reg.h"
#include "Main_datatypes.h"
#include "process_stage.h"
#include "../malleability/malleabilityManager.h" //FIXME Refactor
//#include "../malleability/malleabilityManager.h" //FIXME Refactor
#include "../malleability/distribution_methods/block_distribution.h"
void linear_regression_stage(iter_stage_t *stage, group_data group, MPI_Comm comm);
double init_matrix_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute);
double init_pi_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute);
void init_comm_ptop_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm);
double init_comm_bcast_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm);
double init_comm_allgatherv_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm);
double init_comm_reduce_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm);
/*
* Calcula el tiempo por operacion o total de bytes a enviar
......@@ -23,68 +35,39 @@
* la bandera.
*
* TODO Que el trabajo se divida entre los procesos.
* TODO No tiene en cuenta cambios entre maquinas heterogeneas.
*/
void init_stage(void *config_file_void, int stage, void *group_void, MPI_Comm comm, int compute) {
double result, start_time, t_stage;
double init_stage(configuration *config_file, int stage_i, group_data group, MPI_Comm comm, int compute) {
double result = 0;
int qty = 20000;
group_data group = *((group_data *) group_void);
configuration config_file = *((configuration *) config_file_void);
config_file.iter_stage[stage].operations = qty;
t_stage = config_file.iter_stage[stage].t_stage * config_file.factors[group.grp];
iter_stage_t *stage = &(config_file->stages[stage_i]);
stage->operations = qty;
if(config_file.iter_stage[stage].bytes == 0) {
config_file.iter_stage[stage].bytes = (t_stage - config_file.latency_m) * config_file.bw_m;
} else {
//config_file.iter_stage[stage].bytes = config_file.iter_stage[stage].bytes;
}
start_time = MPI_Wtime();
result = 0;
switch(config_file.iter_stage[stage].pt) {
switch(stage->pt) {
//Computo
case COMP_MATRIX:
initMatrix(&(config_file.iter_stage[stage].double_array), config_file.matrix_tam);
result = init_matrix_pt(group, config_file, stage, comm, compute);
case COMP_PI:
if(group.myId == ROOT && compute) {
result+= process_stage(config_file_void, stage, group_void, comm);
}
result = init_pi_pt(group, config_file, stage, comm, compute);
break;
//Comunicación
case COMP_POINT:
if(config_file.iter_stage[stage].array != NULL)
free(config_file.iter_stage[stage].array);
config_file.iter_stage[stage].array = malloc(sizeof(char) * config_file.iter_stage[stage].bytes);
init_comm_ptop_pt(group, config_file, stage, comm);
break;
case COMP_BCAST:
if(config_file.iter_stage[stage].array != NULL)
free(config_file.iter_stage[stage].array);
config_file.iter_stage[stage].array = malloc(sizeof(char) * config_file.iter_stage[stage].bytes);
break;
case COMP_ALLTOALL:
if(config_file.iter_stage[stage].array != NULL)
free(config_file.iter_stage[stage].array);
config_file.iter_stage[stage].array = malloc(sizeof(char) * config_file.iter_stage[stage].bytes);
if(config_file.iter_stage[stage].full_array != NULL)
free(config_file.iter_stage[stage].full_array);
config_file.iter_stage[stage].full_array = malloc(sizeof(char) * config_file.iter_stage[stage].bytes * group.numP);
result = init_comm_bcast_pt(group, config_file, stage, comm);
break;
case COMP_ALLGATHER:
result = init_comm_allgatherv_pt(group, config_file, stage, comm);
break;
case COMP_REDUCE:
if(config_file.iter_stage[stage].array != NULL)
free(config_file.iter_stage[stage].array);
config_file.iter_stage[stage].array = malloc(sizeof(char) * config_file.iter_stage[stage].bytes);
//Full array para el reduce necesita el mismo tamanyo
if(config_file.iter_stage[stage].full_array != NULL)
free(config_file.iter_stage[stage].full_array);
config_file.iter_stage[stage].full_array = malloc(sizeof(char) * config_file.iter_stage[stage].bytes);
case COMP_ALLREDUCE:
result = init_comm_reduce_pt(group, config_file, stage, comm);
break;
}
if(compute) {
config_file.iter_stage[stage].t_op = (MPI_Wtime() - start_time) / qty; //Tiempo de una operacion
MPI_Bcast(&(config_file.iter_stage[stage].t_op), 1, MPI_DOUBLE, ROOT, comm);
}
config_file.iter_stage[stage].operations = t_stage / config_file.iter_stage[stage].t_op;
return result;
}
/*
......@@ -92,37 +75,37 @@ void init_stage(void *config_file_void, int stage, void *group_void, MPI_Comm co
* de operacion a realizar y llamando a la funcion que
* realizara la operacion.
*/
double process_stage(void *config_file_void, int stage, void *group_void, MPI_Comm comm) {
double process_stage(configuration config_file, iter_stage_t stage, group_data group, MPI_Comm comm) {
int i;
double result;
group_data group = *((group_data *) group_void);
configuration config_file = *((configuration *) config_file_void);
iter_stage_t stage_data = config_file.iter_stage[stage];
switch(stage_data.pt) {
switch(stage.pt) {
//Computo
case COMP_PI:
for(i=0; i < stage_data.operations; i++) {
result += computePiSerial(config_file.matrix_tam);
for(i=0; i < stage.operations; i++) {
result += computePiSerial(config_file.granularity);
}
break;
case COMP_MATRIX:
for(i=0; i < stage_data.operations; i++) {
result += computeMatrix(stage_data.double_array, config_file.matrix_tam); //FIXME No da tiempos repetibles
for(i=0; i < stage.operations; i++) {
result += computeMatrix(stage.double_array, config_file.granularity); //FIXME No da tiempos repetibles
}
break;
//Comunicaciones
case COMP_POINT:
point_to_point(group.myId, group.numP, ROOT, comm, stage_data.array, stage_data.bytes);
point_to_point(group.myId, group.numP, ROOT, comm, stage.array, stage.real_bytes);
break;
case COMP_BCAST:
MPI_Bcast(stage_data.array, stage_data.bytes, MPI_CHAR, ROOT, comm);
MPI_Bcast(stage.array, stage.real_bytes, MPI_CHAR, ROOT, comm);
break;
case COMP_ALLTOALL:
MPI_Alltoall(stage_data.array, stage_data.bytes, MPI_CHAR, stage_data.full_array, stage_data.bytes, MPI_CHAR, comm);
case COMP_ALLGATHER:
MPI_Allgatherv(stage.array, stage.my_bytes, MPI_CHAR, stage.full_array, stage.counts.counts, stage.counts.displs, MPI_CHAR, comm);
break;
case COMP_REDUCE:
MPI_Reduce(stage_data.array, stage_data.full_array, stage_data.bytes, MPI_CHAR, MPI_MAX, ROOT, comm);
MPI_Reduce(stage.array, stage.full_array, stage.real_bytes, MPI_CHAR, MPI_MAX, ROOT, comm);
break;
case COMP_ALLREDUCE:
MPI_Allreduce(stage.array, stage.full_array, stage.real_bytes, MPI_CHAR, MPI_MAX, comm);
break;
}
return result;
......@@ -139,36 +122,30 @@ double process_stage(void *config_file_void, int stage, void *group_void, MPI_Co
// Devuelve la latencia del sistema.
double latency(int myId, int numP, MPI_Comm comm) {
int i, loop_count = 100;
double start_time, stop_time, elapsed_time, max_time;
double start_time, stop_time, time;
char aux;
aux = '0';
elapsed_time = 0;
if(myId+1 != numP || (myId+1 == numP && numP % 2 == 0)) {
for(i=0; i<loop_count; i++){
MPI_Barrier(comm);
start_time = MPI_Wtime();
if(myId % 2 == 0){
MPI_Ssend(&aux, 1, MPI_CHAR, myId+1, 99, comm);
MPI_Recv(&aux, 1, MPI_CHAR, myId+1, 99, comm, MPI_STATUS_IGNORE);
if(myId == 0) {
for(i=0; i<loop_count; i++){
MPI_Send(&aux, 0, MPI_CHAR, numP-1, 99, comm);
}
MPI_Recv(&aux, 0, MPI_CHAR, numP-1, 99, comm, MPI_STATUS_IGNORE);
} else if(myId+1 == numP) {
for(i=0; i<loop_count; i++){
MPI_Recv(&aux, 0, MPI_CHAR, 0, 99, comm, MPI_STATUS_IGNORE);
}
else if(myId % 2 == 1){
MPI_Recv(&aux, 1, MPI_CHAR, myId-1, 99, comm, MPI_STATUS_IGNORE);
MPI_Ssend(&aux, 1, MPI_CHAR, myId-1, 99, comm);
MPI_Send(&aux, 0, MPI_CHAR, 0, 99, comm);
}
MPI_Barrier(comm);
stop_time = MPI_Wtime();
elapsed_time += stop_time - start_time;
}
}
time = (stop_time - start_time) / loop_count;
if(myId %2 == 0) {
elapsed_time/=loop_count;
}
MPI_Allreduce(&elapsed_time, &max_time, 1, MPI_DOUBLE, MPI_MAX, comm);
return max_time;
MPI_Bcast(&time, 1, MPI_DOUBLE, ROOT, comm);
return time;
}
......@@ -180,36 +157,244 @@ double latency(int myId, int numP, MPI_Comm comm) {
// Devuelve el tiempo necesario para realizar las pruebas
double bandwidth(int myId, int numP, MPI_Comm comm, double latency, int n) {
int i, loop_count = 100, n_bytes;
double start_time, stop_time, elapsed_time, bw, time, max_time;
double start_time, stop_time, bw, time;
char *aux;
n_bytes = n * sizeof(char);
aux = malloc(n_bytes);
elapsed_time = 0;
time = 0;
if(myId+1 != numP || (myId+1 == numP && numP % 2 == 0)) {
for(i=0; i<loop_count; i++){
MPI_Barrier(comm);
start_time = MPI_Wtime();
if(myId %2 == 0){
MPI_Ssend(aux, n, MPI_CHAR, myId+1, 99, comm);
MPI_Recv(aux, n, MPI_CHAR, myId+1, 99, comm, MPI_STATUS_IGNORE);
if(myId == 0) {
for(i=0; i<loop_count; i++){
MPI_Send(aux, n, MPI_CHAR, numP-1, 99, comm);
}
MPI_Recv(aux, 0, MPI_CHAR, numP-1, 99, comm, MPI_STATUS_IGNORE);
} else if(myId+1 == numP) {
for(i=0; i<loop_count; i++){
MPI_Recv(aux, n, MPI_CHAR, 0, 99, comm, MPI_STATUS_IGNORE);
}
else if(myId %2 == 1){
MPI_Recv(aux, n, MPI_CHAR, myId-1, 99, comm, MPI_STATUS_IGNORE);
MPI_Ssend(aux, n, MPI_CHAR, myId-1, 99, comm);
MPI_Send(aux, 0, MPI_CHAR, 0, 99, comm);
}
MPI_Barrier(comm);
stop_time = MPI_Wtime();
elapsed_time += stop_time - start_time;
time = (stop_time - start_time) / loop_count;
bw = ((double)n_bytes) / (time - latency);
MPI_Bcast(&bw, 1, MPI_DOUBLE, ROOT, comm);
free(aux);
return bw;
}
/*
* Creates a linear regression model to predict
* the number of bytes needed to perform a collective
* communication.
*/
void linear_regression_stage(iter_stage_t *stage, group_data group, MPI_Comm comm) {
int i, j, tam, loop_iters = 100;
tam = LR_ARRAY_TAM * loop_iters;
double *bytes = malloc(tam * sizeof(double));
double *times = malloc(tam * sizeof(double));
for(i=0; i<LR_ARRAY_TAM; i++) {
for(j=0; j<loop_iters; j++) {
bytes[i*loop_iters + j] = LR_bytes_array[i];
}
}
if(myId %2 == 0) {
time = elapsed_time / loop_count - latency;
switch(stage->pt) {
//Comunicaciones
case COMP_BCAST:
lr_times_bcast(group.myId, group.numP, ROOT, comm, loop_iters, times);
break;
case COMP_ALLGATHER:
lr_times_allgatherv(group.myId, group.numP, ROOT, comm, loop_iters, times);
break;
case COMP_REDUCE:
lr_times_reduce(group.myId, group.numP, ROOT, comm, loop_iters, times);
break;
case COMP_ALLREDUCE:
lr_times_allreduce(group.myId, group.numP, ROOT, comm, loop_iters, times);
break;
default:
return;
break;
}
MPI_Allreduce(&time, &max_time, 1, MPI_DOUBLE, MPI_MAX, comm);
bw = ((double)n_bytes * 2) / max_time;
return bw;
if(group.myId == ROOT) {
MPI_Reduce(MPI_IN_PLACE, times, LR_ARRAY_TAM * loop_iters, MPI_DOUBLE, MPI_MAX, ROOT, comm);
/*
printf("PT=%d ", stage->pt);
for(i=0; i<tam; i++) {
printf("%lf, ", times[i]);
}
printf("\n");
printf("BYTES ");
for(i=0; i<tam; i++) {
printf("%lf, ", bytes[i]);
}
printf("\n");
*/
lr_compute(tam, bytes, times, &(stage->slope), &(stage->intercept));
} else {
MPI_Reduce(times, NULL, LR_ARRAY_TAM * loop_iters, MPI_DOUBLE, MPI_MAX, ROOT, comm);
}
MPI_Bcast(&(stage->slope), 1, MPI_DOUBLE, ROOT, comm);
MPI_Bcast(&(stage->intercept), 1, MPI_DOUBLE, ROOT, comm);
free(times);
free(bytes);
}
/*
* ========================================================================================
* ========================================================================================
* =================================INIT STAGE FUNCTIONS===================================
* ========================================================================================
* ========================================================================================
*/
double init_matrix_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute) {
double result, t_stage;
result = 0;
t_stage = stage->t_stage * config_file->factors[group.grp];
initMatrix(&(stage->double_array), config_file->granularity);
double start_time = MPI_Wtime();
if(group.myId == ROOT && compute) {
result+= process_stage(*config_file, *stage, group, comm);
}
if(compute) {
stage->t_op = (MPI_Wtime() - start_time) / stage->operations; //Tiempo de una operacion
MPI_Bcast(&(stage->t_op), 1, MPI_DOUBLE, ROOT, comm);
}
stage->operations = t_stage / stage->t_op;
return result;
}
double init_pi_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute) {
double result, t_stage, start_time;
result = 0;
t_stage = stage->t_stage * config_file->factors[group.grp];
start_time = MPI_Wtime();
if(group.myId == ROOT && compute) {
result+= process_stage(*config_file, *stage, group, comm);
}
if(compute) {
stage->t_op = (MPI_Wtime() - start_time) / stage->operations; //Tiempo de una operacion
MPI_Bcast(&(stage->t_op), 1, MPI_DOUBLE, ROOT, comm);
}
stage->operations = t_stage / stage->t_op;
return result;
}
void init_comm_ptop_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
struct Dist_data dist_data;
if(stage->array != NULL)
free(stage->array);
if(stage->bytes == 0) {
stage->bytes = (stage->t_stage - config_file->latency_m) * config_file->bw_m;
}
get_block_dist(stage->bytes, group.myId, group.numP, &dist_data);
stage->real_bytes = dist_data.tamBl;
stage->array = malloc(sizeof(char) * stage->real_bytes);
}
double init_comm_bcast_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
double start_time, time = 0;
stage->real_bytes = stage->bytes;
if(stage->bytes == 0) {
start_time = MPI_Wtime();
linear_regression_stage(stage, group, comm);
lr_calc_Y(stage->slope, stage->intercept, stage->t_stage, &(stage->real_bytes));
time = MPI_Wtime() - start_time;
if(group.myId == ROOT) {
MPI_Reduce(MPI_IN_PLACE, &time, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
} else {
MPI_Reduce(&time, NULL, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
}
}
if(stage->array != NULL)
free(stage->array);
stage->array = malloc(sizeof(char) * stage->real_bytes);
return time;
}
double init_comm_allgatherv_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
double start_time, time = 0;
struct Dist_data dist_data;
stage->real_bytes = stage->bytes;
if(stage->bytes == 0) {
start_time = MPI_Wtime();
linear_regression_stage(stage, group, comm);
lr_calc_Y(stage->slope, stage->intercept, stage->t_stage, &(stage->real_bytes));
time = MPI_Wtime() - start_time;
if(group.myId == ROOT) {
MPI_Reduce(MPI_IN_PLACE, &time, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
} else {
MPI_Reduce(&time, NULL, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
}
}
if(stage->counts.counts != NULL)
freeCounts(&(stage->counts));
prepare_comm_allgatherv(group.numP, stage->real_bytes, &(stage->counts));
get_block_dist(stage->real_bytes, group.myId, group.numP, &dist_data);
stage->my_bytes = dist_data.tamBl;
if(stage->array != NULL)
free(stage->array);
stage->array = malloc(sizeof(char) * stage->my_bytes);
if(stage->full_array != NULL)
free(stage->full_array);
stage->full_array = malloc(sizeof(char) * stage->real_bytes);
return time;
}
double init_comm_reduce_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
double start_time, time = 0;
stage->real_bytes = stage->bytes;
if(stage->bytes == 0) {
start_time = MPI_Wtime();
linear_regression_stage(stage, group, comm);
lr_calc_Y(stage->slope, stage->intercept, stage->t_stage, &(stage->real_bytes));
time = MPI_Wtime() - start_time;
if(group.myId == ROOT) {
MPI_Reduce(MPI_IN_PLACE, &time, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
} else {
MPI_Reduce(&time, NULL, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
}
}
if(stage->array != NULL)
free(stage->array);
stage->array = malloc(sizeof(char) * stage->real_bytes);
//Full array para el reduce necesita el mismo tamanyo
if(stage->full_array != NULL)
free(stage->full_array);
stage->full_array = malloc(sizeof(char) * stage->real_bytes);
return time;
}
#ifndef PROCESS_STAGE_H
#define PROCESS_STAGE_H
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
//#include "Main_datatypes.h"
#include "Main_datatypes.h"
//#include "../malleability/malleabilityManager.h" //FIXME Refactor
#include "../IOcodes/read_ini.h"
enum compute_methods{COMP_PI, COMP_MATRIX, COMP_POINT, COMP_BCAST, COMP_ALLTOALL, COMP_REDUCE};
enum compute_methods{COMP_PI, COMP_MATRIX, COMP_POINT, COMP_BCAST, COMP_ALLGATHER, COMP_REDUCE, COMP_ALLREDUCE};
//FIXME Refactor el void
void init_stage(void *config_file_void, int stage, void *group_void, MPI_Comm comm, int compute);
double process_stage(void *config_file_void, int stage, void* group_void, MPI_Comm comm);
double init_stage(configuration *config_file, int stage_i, group_data group, MPI_Comm comm, int compute);
double process_stage(configuration config_file, iter_stage_t stage, group_data group, MPI_Comm comm);
double latency(int myId, int numP, MPI_Comm comm);
double bandwidth(int myId, int numP, MPI_Comm comm, double latency, int n);
void linear_regression_stage(iter_stage_t *stage, group_data group, MPI_Comm comm);
#endif
export TOP := $(dir $(CURDIR)/$(word $(words $(MAKEFILE_LIST)),$(MAKEFILE_LIST)))
BUILD := build
export BUILDDIR = $(addprefix $(TOP),$(BUILD))
SUBDIRS := IOcodes Main malleability
.PHONY: subdirs $(SUBDIRS) build all
#
#
#
#
CC := gcc
MCC := mpicc
CFLAGS := -Wall
LIBFLAGS := -lm -lslurm -pthread
#
#
#
#
all: subdirs exec
exec: subdirs
$(MCC) $(CFLAGS) -o test.out $(wildcard $(BUILDDIR)/*.o) $(LIBFLAGS)
subdirs: $(SUBDIRS)
$(SUBDIRS): | $(BUILD)
$(MAKE) -C $@
# Orden de compilacion para las carpetas
# Carpeta en la que almacenar los compilados
$(BUILD):
mkdir -p $(BUILD)
clean:
-rm -rf $(BUILD)
#mpicc -Wall Main/Main.c Main/computing_func.c IOcodes/results.c IOcodes/read_ini.c IOcodes/ini.c malleability/ProcessDist.c malleability/CommDist.c -pthread -lslurm -lm
mpicc -Wall Main/Main.c Main/computing_func.c Main/comunication_func.c Main/process_stage.c IOcodes/results.c IOcodes/read_ini.c IOcodes/ini.c malleability/malleabilityManager.c malleability/malleabilityTypes.c malleability/malleabilityZombies.c malleability/ProcessDist.c malleability/CommDist.c -pthread -lslurm -lm
mpicc -Wall Main/Main.c Main/computing_func.c Main/comunication_func.c Main/linear_reg.c Main/process_stage.c IOcodes/results.c IOcodes/read_ini.c IOcodes/ini.c malleability/malleabilityManager.c malleability/malleabilityTypes.c malleability/malleabilityZombies.c malleability/ProcessDist.c malleability/CommDist.c malleability/distribution_methods/block_distribution.c -pthread -lslurm -lm
if [ $# -gt 0 ]
then
......
......@@ -2,8 +2,10 @@
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
#include "distribution_methods/block_distribution.h"
#include "CommDist.h"
/*
struct Dist_data {
int ini; //Primer elemento a enviar
int fin; //Ultimo elemento a enviar
......@@ -21,6 +23,7 @@ struct Counts {
int *displs;
int *zero_arr;
};
*/
void send_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_child, int idI, int idE, struct Counts counts);
void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts);
......@@ -35,10 +38,11 @@ void recv_async_point_arrays(struct Dist_data dist_data, char *array, int root,
void get_dist(int qty, int id, int numP, struct Dist_data *dist_data);
void set_counts(int id, int numP, struct Dist_data data_dist, int *sendcounts);
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS);
/*
void mallocCounts(struct Counts *counts, int numP);
void freeCounts(struct Counts *counts);
void print_counts(struct Dist_data data_dist, int *xcounts, int *xdispls, int size, const char* name);
*/
/*
* Reserva memoria para un vector de hasta "qty" elementos.
......@@ -507,51 +511,3 @@ void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS) {
(*idS)[0] = idI;
(*idS)[1] = idE;
}
/*
* Reserva memoria para los vectores de counts/displs de la funcion
* MPI_Alltoallv. Todos los vectores tienen un tamaño de numP, que es la
* cantidad de procesos en el otro grupo de procesos.
*
* El vector counts indica cuantos elementos se comunican desde este proceso
* al proceso "i" del otro grupo.
*
* El vector displs indica los desplazamientos necesarios para cada comunicacion
* con el proceso "i" del otro grupo.
*
* El vector zero_arr se utiliza cuando se quiere indicar un vector incializado
* a 0 en todos sus elementos. Sirve para indicar que no hay comunicacion.
*/
void mallocCounts(struct Counts *counts, int numP) {
counts->counts = calloc(numP, sizeof(int));
if(counts->counts == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
counts->displs = calloc(numP, sizeof(int));
if(counts->displs == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
counts->zero_arr = calloc(numP, sizeof(int));
if(counts->zero_arr == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
}
/*
* Libera la memoria interna de una estructura Counts.
*
* No libera la memoria de la estructura counts si se ha alojado
* de forma dinamica.
*/
void freeCounts(struct Counts *counts) {
free(counts->counts);
free(counts->displs);
free(counts->zero_arr);
}
void print_counts(struct Dist_data data_dist, int *xcounts, int *xdispls, int size, const char* name) {
int i;
for(i=0; i < size; i++) {
//if(xcounts[i] != 0) {
printf("P%d of %d | %scounts[%d]=%d disp=%d\n", data_dist.myId, data_dist.numP, name, i, xcounts[i], xdispls[i]);
//}
}
}
#ifndef COMMDIST_H
#define COMMDIST_H
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
......@@ -22,3 +25,4 @@ void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm in
void malloc_comm_array(char **array, int qty, int myId, int numP);
#endif
dir_targets := distribution_methods
objects1 := CommDist
objects2 := malleabilityTypes malleabilityZombies ProcessDist
objects3 := malleabilityManager
depends := $(addsuffix .h, malleabilityDataStructures malleabilityStates)
CC := gcc
MCC := mpicc
CFLAGS := -Wall
.PHONY: $(dir_targets) subdir
all: subdir $(objects1) $(objects2) $(objects3)
subdir: $(dir_targets)
$(dir_targets): %:
$(MAKE) -C $@
$(objects1): %: %.c %.h $(depends) $(dir_targets)/block_distribution.h
$(MCC) $(CFLAGS) -c -o $(BUILDDIR)/$@.o $<
$(objects2): %: %.c %.h $(depends) $(TOP)/IOcodes/results.h
$(MCC) $(CFLAGS) -c -o $(BUILDDIR)/$@.o $<
$(objects3): %: %.c %.h $(objects1).h $(objects2).h $(depends) \
$(TOP)/IOcodes/read_ini.h $(TOP)/IOcodes/results.h
$(MCC) $(CFLAGS) -c -o $(BUILDDIR)/$@.o $<
#$(objects1) $(objects2) $(objects3)
# CommDist.c
# malleabilityTypes.c malleabilityZombies.c ProcessDist.c
#
# malleabilityManager.c
# malleabilityDataStructures.h malleabilityStates.h
#ifndef PROCESS_DIST_H
#define PROCESS_DIST_H
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
......@@ -13,3 +16,5 @@ void malleability_establish_connection(int myId, int root, MPI_Comm *intercomm);
void proc_adapt_expand(int *numP, int numC, MPI_Comm intercomm, MPI_Comm *comm, int is_children_group);
void proc_adapt_shrink(int numC, MPI_Comm *comm, int myId);
#endif
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment