Commit b5d18427 authored by iker_martin

Code refactoring to remove warnings. Moving Config into two files. WIP

parent 7836c7b6
#Ignore ini files
*.ini
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include "read_ini.h"
#include "../malleability/spawn_methods/ProcessDist.h"
#include "../malleability/distribution_methods/block_distribution.h"
#include "ini.h"
#include "../malleability/spawn_methods/ProcessDist.h"
void malloc_config_resizes(configuration *user_config, int resizes);
void init_config_stages(configuration *user_config);
void def_struct_config_file(configuration *config_file, MPI_Datatype *config_type);
void def_struct_config_file_array(configuration *config_file, MPI_Datatype *config_type);
void def_struct_iter_stage(iter_stage_t *stages, int n_stages, MPI_Datatype *config_type);
ext_functions_t *user_functions;
/*
* Function used to read the configuration file
......@@ -32,36 +26,38 @@ static int handler(void* user, const char* section, const char* name,
snprintf(stage_name, 10, "stage%d", pconfig->actual_stage);
#define MATCH(s, n) strcmp(section, s) == 0 && strcmp(name, n) == 0
if (MATCH("general", "R")) {
if (MATCH("general", "Total_Resizes")) {
pconfig->n_resizes = atoi(value) + 1;
malloc_config_resizes(pconfig, pconfig->n_resizes);
} else if (MATCH("general", "S")) {
//malloc_config_resizes(pconfig); //FIXME Unknown
user_functions->resizes_f(pconfig);
} else if (MATCH("general", "Total_Stages")) {
pconfig->n_stages = atoi(value);
pconfig->stages = malloc(sizeof(iter_stage_t) * pconfig->n_stages);
init_config_stages(pconfig);
pconfig->stages = malloc(sizeof(iter_stage_t) * (size_t) pconfig->n_stages);
//init_config_stages(pconfig); //FIXME Unknown
user_functions->stages_f(pconfig);
} else if (MATCH("general", "Granularity")) {
pconfig->granularity = atoi(value);
} else if (MATCH("general", "SDR")) {
} else if (MATCH("general", "SDR")) { // TODO Refactor a nombre manual
pconfig->sdr = atoi(value);
} else if (MATCH("general", "ADR")) {
} else if (MATCH("general", "ADR")) { // TODO Refactor a nombre manual
pconfig->adr = atoi(value);
} else if (MATCH("general", "AT")) {
} else if (MATCH("general", "Asynch_Redistribution_Type")) {
pconfig->at = atoi(value);
} else if (MATCH("general", "SM")) {
} else if (MATCH("general", "Spawn_Method")) {
pconfig->sm = atoi(value);
} else if (MATCH("general", "SS")) {
} else if (MATCH("general", "Spawn_Strategy")) {
pconfig->ss = atoi(value);
// Iter stage
} else if (MATCH(stage_name, "PT")) {
} else if (MATCH(stage_name, "Stage_Type")) {
if(pconfig->actual_stage < pconfig->n_stages)
pconfig->stages[pconfig->actual_stage].pt = atoi(value);
} else if (MATCH(stage_name, "bytes")) {
} else if (MATCH(stage_name, "Stage_bytes")) {
if(pconfig->actual_stage < pconfig->n_stages)
pconfig->stages[pconfig->actual_stage].bytes = atoi(value);
} else if (MATCH(stage_name, "t_stage")) {
} else if (MATCH(stage_name, "Stage_time")) {
if(pconfig->actual_stage < pconfig->n_stages) {
pconfig->stages[pconfig->actual_stage].t_stage = atof(value);
pconfig->stages[pconfig->actual_stage].t_stage = (float) atof(value);
pconfig->actual_stage = pconfig->actual_stage+1; // Ultimo elemento del grupo
}
......@@ -74,7 +70,7 @@ static int handler(void* user, const char* section, const char* name,
pconfig->procs[pconfig->actual_resize] = atoi(value);
} else if (MATCH(resize_name, "FactorS")) {
if(pconfig->actual_resize < pconfig->n_resizes)
pconfig->factors[pconfig->actual_resize] = atof(value);
pconfig->factors[pconfig->actual_resize] =(float) atof(value);
} else if (MATCH(resize_name, "Dist")) {
if(pconfig->actual_resize < pconfig->n_resizes) {
char *aux = strdup(value);
......@@ -103,10 +99,10 @@ static int handler(void* user, const char* section, const char* name,
* The memory of the structure is allocated inside the function and should
* be released with the function "free_config()"
*/
configuration *read_ini_file(char *file_name) {
configuration *read_ini_file(char *file_name, ext_functions_t init_functions) {
configuration *config = NULL;
config = malloc(sizeof(configuration) * 1);
config = malloc(sizeof(configuration));
if(config == NULL) {
printf("Error when reserving configuration structure\n");
return NULL;
......@@ -114,324 +110,11 @@ configuration *read_ini_file(char *file_name) {
config->actual_resize=0;
config->actual_stage=0;
user_functions = &init_functions;
if(ini_parse(file_name, handler, config) < 0) { // Obtener configuracion
printf("Can't load '%s'\n", file_name);
return NULL;
}
return config;
}
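/*
 * Illustrative sketch of an .ini file matching the renamed keys handled above.
 * The key names come from the handler (Total_Resizes, Total_Stages, Granularity,
 * SDR, ADR, Asynch_Redistribution_Type, Spawn_Method, Spawn_Strategy and the
 * per-stage Stage_* keys); the concrete values and the omitted per-resize
 * sections are assumptions, not taken from the repository.
 *
 *   [general]
 *   Total_Resizes=1
 *   Total_Stages=1
 *   Granularity=100000
 *   SDR=0
 *   ADR=0
 *   Asynch_Redistribution_Type=0
 *   Spawn_Method=0
 *   Spawn_Strategy=1
 *
 *   [stage0]
 *   Stage_Type=0
 *   Stage_bytes=0
 *   Stage_time=0.5
 */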
/*
* Allocates memory for the arrays of the configuration structure.
*
* If it is called from outside this file, the memory of the structure
* has to be allocated beforehand with the following line:
* "configuration *config = malloc(sizeof(configuration));"
*
* However, it can also be obtained through the functions
* - read_ini_file
* - recv_config_file
*/
void malloc_config_resizes(configuration *user_config, int resizes) {
if(user_config != NULL) {
user_config->iters = malloc(sizeof(int) * resizes);
user_config->procs = malloc(sizeof(int) * resizes);
user_config->factors = malloc(sizeof(float) * resizes);
user_config->phy_dist = malloc(sizeof(int) * resizes);
}
}
/*
* Initialises the memory for the iteration stages.
* No memory is allocated, but the pointers are set to NULL
* so that each stage can later be freed correctly.
*
* It can be obtained through the functions
* - read_ini_file
* - recv_config_file
*/
void init_config_stages(configuration *user_config) {
int i;
if(user_config != NULL) {
for(i=0; i<user_config->n_stages; i++) {
user_config->stages[i].array = NULL;
user_config->stages[i].full_array = NULL;
user_config->stages[i].double_array = NULL;
user_config->stages[i].counts.counts = NULL;
user_config->stages[i].real_bytes = 0;
user_config->stages[i].intercept = 0;
user_config->stages[i].slope = 0;
}
}
}
/*
* Frees all the memory of a configuration structure
*/
void free_config(configuration *user_config) {
int i;
if(user_config != NULL) {
free(user_config->iters);
free(user_config->procs);
free(user_config->factors);
free(user_config->phy_dist);
for(i=0; i < user_config->n_stages; i++) {
if(user_config->stages[i].array != NULL) {
free(user_config->stages[i].array);
user_config->stages[i].array = NULL;
}
if(user_config->stages[i].full_array != NULL) {
free(user_config->stages[i].full_array);
user_config->stages[i].full_array = NULL;
}
if(user_config->stages[i].double_array != NULL) {
free(user_config->stages[i].double_array);
user_config->stages[i].double_array = NULL;
}
if(user_config->stages[i].counts.counts != NULL) {
freeCounts(&(user_config->stages[i].counts));
}
}
//free(user_config->stages); //FIXME ERROR de memoria relacionado con la carpeta malleability
free(user_config);
}
}
/*
* Prints to standard output all the information contained
* in the configuration passed as argument
*/
void print_config(configuration *user_config, int grp) {
if(user_config != NULL) {
int i;
printf("Config loaded: R=%d, S=%d, granularity=%d, SDR=%d, ADR=%d, AT=%d, SM=%d, SS=%d, latency=%2.8f, bw=%lf || grp=%d\n",
user_config->n_resizes, user_config->n_stages, user_config->granularity, user_config->sdr, user_config->adr,
user_config->at, user_config->sm, user_config->ss, user_config->latency_m, user_config->bw_m, grp);
for(i=0; i<user_config->n_stages; i++) {
printf("Stage %d: PT=%d, T_stage=%lf, bytes=%d, Intercept=%lf, Slope=%lf\n",
i, user_config->stages[i].pt, user_config->stages[i].t_stage, user_config->stages[i].real_bytes, user_config->stages[i].intercept, user_config->stages[i].slope);
}
for(i=0; i<user_config->n_resizes; i++) {
printf("Resize %d: Iters=%d, Procs=%d, Factors=%f, Dist=%d\n",
i, user_config->iters[i], user_config->procs[i], user_config->factors[i], user_config->phy_dist[i]);
}
}
}
/*
* Prints to standard output the configuration information related
* to a single group of processes.
*/
void print_config_group(configuration *user_config, int grp) {
int i;
if(user_config != NULL) {
int parents, sons;
parents = sons = 0;
if(grp > 0) {
parents = user_config->procs[grp-1];
}
if(grp < user_config->n_resizes - 1) {
sons = user_config->procs[grp+1];
}
printf("Config: granularity=%d, SDR=%d, ADR=%d, AT=%d, SM=%d, SS=%d, latency=%2.8f, bw=%lf\n",
user_config->granularity, user_config->sdr, user_config->adr, user_config->at, user_config->sm, user_config->ss, user_config->latency_m, user_config->bw_m);
for(i=0; i<user_config->n_stages; i++) {
printf("Stage %d: PT=%d, T_stage=%lf, bytes=%d, Intercept=%lf, Slope=%lf\n",
i, user_config->stages[i].pt, user_config->stages[i].t_stage, user_config->stages[i].real_bytes, user_config->stages[i].intercept, user_config->stages[i].slope);
}
printf("Config Group: iters=%d, factor=%f, phy=%d, procs=%d, parents=%d, sons=%d\n",
user_config->iters[grp], user_config->factors[grp], user_config->phy_dist[grp], user_config->procs[grp], parents, sons);
}
}
//||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| ||
//||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| ||
//| CONFIGURATION STRUCTURE INTERCOMMUNICATION FUNCTIONS          ||
//||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| ||
//||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| |/
/*
* Sends a configuration structure to the group of processes this group
* is linked to through the intercommunicator passed as argument.
*
* This function has to be called by every process of the same group,
* indicating which process is the root in charge of sending the
* configuration to the other group.
*/
void send_config_file(configuration *config_file, int root, MPI_Comm intercomm) {
MPI_Datatype config_type, config_type_array, iter_stage_type;
// Obtener un tipo derivado para enviar todos los
// datos escalares con una sola comunicacion
def_struct_config_file(config_file, &config_type);
// Obtener un tipo derivado para enviar los tres vectores
// de enteros con una sola comunicacion
def_struct_config_file_array(config_file, &config_type_array);
// Obtener un tipo derivado para enviar las estructuras de fases de iteracion
// con una sola comunicacion
def_struct_iter_stage(&(config_file->stages[0]), config_file->n_stages, &iter_stage_type);
MPI_Bcast(config_file, 1, config_type, root, intercomm);
MPI_Bcast(config_file, 1, config_type_array, root, intercomm);
MPI_Bcast(config_file->factors, config_file->n_resizes, MPI_FLOAT, root, intercomm);
MPI_Bcast(config_file->stages, config_file->n_stages, iter_stage_type, root, intercomm);
//Liberar tipos derivados
MPI_Type_free(&config_type);
MPI_Type_free(&config_type_array);
MPI_Type_free(&iter_stage_type);
}
/*
* Receives a configuration structure from another group of processes
* and returns it. The memory of the structure is allocated in this function.
*
* This function has to be called by every process of the same group,
* indicating which process of the other group is the root in charge of
* sending the configuration to this group.
*
* The memory of the returned configuration has to be released with
* the function "free_config".
*/
void recv_config_file(int root, MPI_Comm intercomm, configuration **config_file_out) {
MPI_Datatype config_type, config_type_array, iter_stage_type;
configuration *config_file = malloc(sizeof(configuration) * 1);
// Obtener un tipo derivado para recibir todos los
// datos escalares con una sola comunicacion
def_struct_config_file(config_file, &config_type);
MPI_Bcast(config_file, 1, config_type, root, intercomm);
//Inicializado de estructuras internas
malloc_config_resizes(config_file, config_file->n_resizes); // Reserva de memoria de los vectores
config_file->stages = malloc(sizeof(iter_stage_t) * config_file->n_stages);
// Obtener un tipo derivado para enviar los tres vectores
// de enteros con una sola comunicacion
def_struct_config_file_array(config_file, &config_type_array);
def_struct_iter_stage(&(config_file->stages[0]), config_file->n_stages, &iter_stage_type);
MPI_Bcast(config_file, 1, config_type_array, root, intercomm);
MPI_Bcast(config_file->factors, config_file->n_resizes, MPI_FLOAT, root, intercomm);
MPI_Bcast(config_file->stages, config_file->n_stages, iter_stage_type, root, intercomm);
//Liberar tipos derivados
MPI_Type_free(&config_type);
MPI_Type_free(&config_type_array);
MPI_Type_free(&iter_stage_type);
init_config_stages(config_file); // Inicializar a NULL vectores
*config_file_out = config_file;
}
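/*
 * Hedged usage sketch (not part of this commit): how the two sides linked by an
 * intercommunicator could pair send_config_file/recv_config_file. The function
 * name, the sender/receiver flag and the root value are assumptions made only
 * for this example.
 */
void share_config_sketch(configuration *cfg, int is_sender_group, int root, MPI_Comm intercomm) {
  if(is_sender_group) {
    send_config_file(cfg, root, intercomm); // Called by every process of the sending group
  } else {
    configuration *received = NULL;
    recv_config_file(root, intercomm, &received); // Called by every process of the receiving group
    // ... use the received configuration ...
    free_config(received); // recv_config_file allocates the structure
  }
}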
/*
* Derived datatype to send 11 specific elements
* of the configuration structure in a single communication.
*/
void def_struct_config_file(configuration *config_file, MPI_Datatype *config_type) {
int i, counts = 11;
int blocklengths[11] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];
// Rellenar vector types
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = types[6] = types[7] = types[8] = MPI_INT;
types[9] = types[10] = MPI_DOUBLE;
// Rellenar vector displs
MPI_Get_address(config_file, &dir);
MPI_Get_address(&(config_file->n_resizes), &displs[0]);
MPI_Get_address(&(config_file->n_stages), &displs[1]);
MPI_Get_address(&(config_file->actual_resize), &displs[2]); // TODO Refactor Es necesario enviarlo?
MPI_Get_address(&(config_file->granularity), &displs[3]);
MPI_Get_address(&(config_file->sdr), &displs[4]);
MPI_Get_address(&(config_file->adr), &displs[5]);
MPI_Get_address(&(config_file->at), &displs[6]);
MPI_Get_address(&(config_file->ss), &displs[7]);
MPI_Get_address(&(config_file->sm), &displs[8]);
MPI_Get_address(&(config_file->latency_m), &displs[9]);
MPI_Get_address(&(config_file->bw_m), &displs[10]);
for(i=0;i<counts;i++) displs[i] -= dir;
MPI_Type_create_struct(counts, blocklengths, displs, types, config_type);
MPI_Type_commit(config_type);
}
/*
* Derived datatype to send three integer arrays
* of the configuration structure in a single communication.
*/
void def_struct_config_file_array(configuration *config_file, MPI_Datatype *config_type) {
int i, counts = 3;
int blocklengths[3] = {1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype aux, types[counts];
// Rellenar vector types
types[0] = types[1] = types[2] = MPI_INT;
// Modificar blocklengths al valor adecuado
blocklengths[0] = blocklengths[1] = blocklengths[2] = config_file->n_resizes;
//Rellenar vector displs
MPI_Get_address(config_file, &dir);
MPI_Get_address(config_file->iters, &displs[0]);
MPI_Get_address(config_file->procs, &displs[1]);
MPI_Get_address(config_file->phy_dist, &displs[2]);
for(i=0;i<counts;i++) displs[i] -= dir;
// Tipo derivado para enviar un solo elemento de tres vectores
MPI_Type_create_struct(counts, blocklengths, displs, types, &aux);
// Tipo derivado para enviar N elementos de tres vectores(3N en total)
MPI_Type_create_resized(aux, 0, 1*sizeof(int), config_type);
MPI_Type_commit(config_type);
}
/*
* Derived datatype to send specific elements
* of the iteration stage structures in a single communication.
*/
void def_struct_iter_stage(iter_stage_t *stages, int n_stages, MPI_Datatype *config_type) {
int i, counts = 4;
int blocklengths[4] = {1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype aux, types[counts];
// Rellenar vector types
types[0] = types[3] = MPI_INT;
types[1] = MPI_FLOAT;
types[2] = MPI_DOUBLE;
// Rellenar vector displs
MPI_Get_address(stages, &dir);
MPI_Get_address(&(stages->pt), &displs[0]);
MPI_Get_address(&(stages->t_stage), &displs[1]);
MPI_Get_address(&(stages->t_op), &displs[2]);
MPI_Get_address(&(stages->bytes), &displs[3]);
for(i=0;i<counts;i++) displs[i] -= dir;
if (n_stages == 1) {
MPI_Type_create_struct(counts, blocklengths, displs, types, config_type);
} else { // Si hay mas de una fase(estructura), el "extent" se modifica.
MPI_Type_create_struct(counts, blocklengths, displs, types, &aux);
// Tipo derivado para enviar N elementos de la estructura
MPI_Type_create_resized(aux, 0, sizeof(iter_stage_t), config_type);
}
MPI_Type_commit(config_type);
}
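/*
 * Note on the extent resize above: the derived datatype only packs four fields of
 * iter_stage_t, so its default extent is smaller than sizeof(iter_stage_t) (the
 * struct also holds pointers, counts and regression fields). Resizing the extent
 * to sizeof(iter_stage_t) makes a broadcast of n_stages elements step through the
 * stages array with the correct stride instead of reading later structs at wrong
 * offsets.
 */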
......@@ -4,16 +4,13 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include "../Main/Main_datatypes.h"
configuration *read_ini_file(char *file_name);
void free_config(configuration *user_config);
void print_config(configuration *user_config, int grp);
void print_config_group(configuration *user_config, int grp);
typedef void (*Malloc_conf)(configuration* user_config);
typedef struct {
Malloc_conf resizes_f, stages_f;
} ext_functions_t;
// MPI Intercomm functions
void send_config_file(configuration *config_file, int root, MPI_Comm intercomm);
void recv_config_file(int root, MPI_Comm intercomm, configuration **config_file_out);
configuration *read_ini_file(char *file_name, ext_functions_t init_functions);
#endif
......@@ -3,6 +3,8 @@
#include <mpi.h>
#include "results.h"
#define RESULTS_EXTRA_SIZE 100
void def_results_type(results_data *results, int resizes, MPI_Datatype *results_type);
//======================================================||
......@@ -131,9 +133,9 @@ void reset_results_index(results_data *results) {
*/
void compute_results_iter(results_data *results, int myId, int root, MPI_Comm comm) {
if(myId == root)
MPI_Reduce(MPI_IN_PLACE, results->iters_time, results->iter_index, MPI_DOUBLE, MPI_MAX, root, comm);
MPI_Reduce(MPI_IN_PLACE, results->iters_time, (int) results->iter_index, MPI_DOUBLE, MPI_MAX, root, comm);
else
MPI_Reduce(results->iters_time, NULL, results->iter_index, MPI_DOUBLE, MPI_MAX, root, comm);
MPI_Reduce(results->iters_time, NULL, (int) results->iter_index, MPI_DOUBLE, MPI_MAX, root, comm);
}
......@@ -148,12 +150,12 @@ void compute_results_stages(results_data *results, int myId, int root, int stage
int i;
if(myId == root) {
for(i=0; i<stages; i++) {
MPI_Reduce(MPI_IN_PLACE, results->stage_times[i], results->iter_index, MPI_DOUBLE, MPI_MAX, root, comm);
MPI_Reduce(MPI_IN_PLACE, results->stage_times[i], (int) results->iter_index, MPI_DOUBLE, MPI_MAX, root, comm);
}
}
else {
for(i=0; i<stages; i++) {
MPI_Reduce(results->stage_times[i], NULL, results->iter_index, MPI_DOUBLE, MPI_MAX, root, comm);
MPI_Reduce(results->stage_times[i], NULL, (int) results->iter_index, MPI_DOUBLE, MPI_MAX, root, comm);
}
}
}
......@@ -170,25 +172,25 @@ void compute_results_stages(results_data *results, int myId, int root, int stage
* per iteration, and the type (Normal or during asynchronous communication).
*/
void print_iter_results(results_data results) {
int i;
size_t i;
printf("T_iter: ");
for(i=0; i< results.iter_index; i++) {
printf("%lf ", results.iters_time[i]);
}
printf("\nAsync_Iters: %d\n", results.iters_async);
printf("\nAsync_Iters: %ld\n", results.iters_async);
}
/*
* Imprime por pantalla los resultados locales de un stage.
*/
void print_stage_results(results_data results, int n_stages) {
int i, j;
void print_stage_results(results_data results, size_t n_stages) {
size_t i, j;
for(i=0; i<n_stages; i++) {
printf("T_stage %d: ", i);
for(j=0; j< results.iter_index; j++) {
for(i=0; i < n_stages; i++) {
printf("T_stage %ld: ", i);
for(j=0; j < results.iter_index; j++) {
printf("%lf ", results.stage_times[i][j]);
}
printf("\n");
......@@ -200,15 +202,15 @@ void print_stage_results(results_data results, int n_stages) {
* These are the process creation time, the asynchronous and synchronous
* communication times, and the total execution time.
*/
void print_global_results(results_data results, int resizes) {
int i;
void print_global_results(results_data results, size_t resizes) {
size_t i;
printf("T_spawn: "); // FIXME REFACTOR Cambiar nombre a T_resize_real
for(i=0; i< resizes - 1; i++) {
printf("T_spawn: ");
for(i=0; i < resizes - 1; i++) {
printf("%lf ", results.spawn_time[i]);
}
printf("\nT_spawn_real: "); // FIXME REFACTOR Cambiar nombre a T_resize
printf("\nT_spawn_real: ");
for(i=0; i< resizes - 1; i++) {
printf("%lf ", results.spawn_real_time[i]);
}
......@@ -238,8 +240,8 @@ void print_global_results(results_data results, int resizes) {
* The arguments "resizes" and "iters_size" are needed to obtain the size
* of the result arrays.
*/
void init_results_data(results_data *results, int resizes, int stages, int iters_size) {
int i;
void init_results_data(results_data *results, size_t resizes, size_t stages, size_t iters_size) {
size_t i;
results->spawn_time = calloc(resizes, sizeof(double));
results->spawn_real_time = calloc(resizes, sizeof(double));
......@@ -247,11 +249,11 @@ void init_results_data(results_data *results, int resizes, int stages, int iters
results->async_time = calloc(resizes, sizeof(double));
results->wasted_time = 0;
results->iters_size = iters_size + 100;
results->iters_time = calloc(iters_size + 100, sizeof(double)); //FIXME Numero magico
results->stage_times = malloc(stages * sizeof(double*)); //FIXME Numero magico
results->iters_size = iters_size + RESULTS_EXTRA_SIZE;
results->iters_time = calloc(results->iters_size, sizeof(double));
results->stage_times = malloc(stages * sizeof(double*));
for(i=0; i<stages; i++) {
results->stage_times[i] = calloc(iters_size + 100, sizeof(double)); //FIXME Numero magico
results->stage_times[i] = calloc(results->iters_size, sizeof(double));
}
results->iters_async = 0;
......@@ -259,7 +261,7 @@ void init_results_data(results_data *results, int resizes, int stages, int iters
}
void realloc_results_iters(results_data *results, int stages, int needed) {
void realloc_results_iters(results_data *results, int stages, size_t needed) {
int i;
double *time_aux;
time_aux = (double *) realloc(results->iters_time, needed * sizeof(double));
......
......@@ -10,7 +10,7 @@
typedef struct {
// Iters data
double *iters_time, **stage_times;
int iters_async, iter_index, iters_size;
size_t iters_async, iter_index, iters_size;
// Spawn, Thread, Sync, Async and Exec time
double spawn_start, *spawn_time, *spawn_real_time;
......@@ -30,11 +30,11 @@ void compute_results_iter(results_data *results, int myId, int root, MPI_Comm co
void compute_results_stages(results_data *results, int myId, int root, int n_stages, MPI_Comm comm);
void print_iter_results(results_data results);
void print_stage_results(results_data results, int n_stages);
void print_global_results(results_data results, int resizes);
void print_stage_results(results_data results, size_t n_stages);
void print_global_results(results_data results, size_t resizes);
void init_results_data(results_data *results, int resizes, int stages, int iters_size);
void realloc_results_iters(results_data *results, int stages, int needed);
void init_results_data(results_data *results, size_t resizes, size_t stages, size_t iters_size);
void realloc_results_iters(results_data *results, int stages, size_t needed);
void free_results_data(results_data *results, int stages);
#endif
......@@ -6,14 +6,14 @@
#include <sys/stat.h>
#include "process_stage.h"
#include "Main_datatypes.h"
#include "../IOcodes/read_ini.h"
#include "configuration.h"
#include "../IOcodes/results.h"
#include "../malleability/CommDist.h"
#include "../malleability/malleabilityManager.h"
#include "../malleability/malleabilityStates.h"
int work();
double iterate(double *matrix, int n, int async_comm, int iter);
double iterate(int async_comm);
void init_group_struct(char *argv[], int argc, int myId, int numP);
void init_application();
......@@ -161,7 +161,7 @@ int main(int argc, char *argv[]) {
if(group->myId == ROOT && config_file->sm == MALL_SPAWN_MERGE) {
MPI_Abort(MPI_COMM_WORLD, -100);
}
free_application_data();
free_application_data(); //FIXME Error al liberar memoria de SDR/ADR
MPI_Finalize();
......@@ -185,14 +185,13 @@ int main(int argc, char *argv[]) {
*/
int work() {
int iter, maxiter, state, res;
double *matrix = NULL;
maxiter = config_file->iters[group->grp];
state = MALL_NOT_STARTED;
res = 0;
for(iter=group->iter_start; iter < maxiter; iter++) {
iterate(matrix, config_file->granularity, state, iter);
iterate(state);
}
if(config_file->n_resizes != group->grp + 1)
......@@ -201,7 +200,7 @@ int work() {
iter = 0;
while(state == MALL_DIST_PENDING || state == MALL_SPAWN_PENDING || state == MALL_SPAWN_SINGLE_PENDING || state == MALL_SPAWN_ADAPT_POSTPONE) {
if(iter < config_file->iters[group->grp+1]) {
iterate(matrix, config_file->granularity, state, iter);
iterate(state);
iter++;
group->iter_start = iter;
}
......@@ -226,24 +225,26 @@ int work() {
* Simulates the execution of one compute iteration of the application,
* which lasts at least "time" seconds.
*/
double iterate(double *matrix, int n, int async_comm, int iter) {
double start_time, start_time_stage, actual_time, *times_stages;
int i, cnt_async = 0;
double iterate(int async_comm) {
double start_time, start_time_stage, actual_time, *times_stages_aux;
int i;
double aux = 0;
times_stages = malloc(config_file->n_stages * sizeof(double));
times_stages_aux = malloc((size_t) config_file->n_stages * sizeof(double));
start_time = MPI_Wtime();
for(i=0; i < config_file->n_stages; i++) {
start_time_stage = MPI_Wtime();
aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
times_stages[i] = MPI_Wtime() - start_time_stage;
times_stages_aux[i] = MPI_Wtime() - start_time_stage;
}
actual_time = MPI_Wtime(); // Guardar tiempos
// Se esta realizando una redistribucion de datos asincrona
if(async_comm == MALL_DIST_PENDING || async_comm == MALL_SPAWN_PENDING || async_comm == MALL_SPAWN_SINGLE_PENDING) {
// TODO Que diferencie entre ambas en el IO
if(async_comm == MALL_DIST_PENDING || async_comm == MALL_SPAWN_PENDING || async_comm == MALL_SPAWN_SINGLE_PENDING) { // Se esta realizando una redistribucion de datos asincrona
cnt_async=1;
results->iters_async += 1;
}
if(results->iter_index == results->iters_size) { // Aumentar tamaño de ambos vectores de resultados
......@@ -251,12 +252,11 @@ double iterate(double *matrix, int n, int async_comm, int iter) {
}
results->iters_time[results->iter_index] = actual_time - start_time;
for(i=0; i < config_file->n_stages; i++) {
results->stage_times[i][results->iter_index] = times_stages[i];
results->stage_times[i][results->iter_index] = times_stages_aux[i];
}
results->iters_async += cnt_async;
results->iter_index = results->iter_index + 1;
free(times_stages);
free(times_stages_aux);
return aux;
}
......@@ -304,7 +304,7 @@ int print_local_results() {
print_config_group(config_file, group->grp);
print_iter_results(*results);
print_stage_results(*results, config_file->n_stages);
print_stage_results(*results, (size_t) config_file->n_stages);
free(file_name);
fflush(stdout);
......@@ -334,7 +334,7 @@ int print_final_results() {
ptr_out = dup(1);
create_out_file(file_name, &ptr_global, 1);
print_config(config_file, group->grp);
print_global_results(*results, config_file->n_resizes);
print_global_results(*results, (size_t)config_file->n_resizes);
fflush(stdout);
free(file_name);
......@@ -349,7 +349,7 @@ int print_final_results() {
* Inicializa la estructura group
*/
void init_group_struct(char *argv[], int argc, int myId, int numP) {
group = malloc(1 * sizeof(group_data));
group = malloc(sizeof(group_data));
group->myId = myId;
group->numP = numP;
group->grp = 0;
......@@ -376,9 +376,10 @@ void init_application() {
run_id = atoi(group->argv[2]);
}
config_file = read_ini_file(group->argv[1]);
//config_file = read_ini_file(group->argv[1]);
init_config(group->argv[1], &config_file);
results = malloc(sizeof(results_data));
init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->iters[group->grp]);
init_results_data(results, (size_t)config_file->n_resizes, (size_t)config_file->n_stages, (size_t)config_file->iters[group->grp]);
if(config_file->sdr) {
malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
}
......@@ -387,11 +388,9 @@ void init_application() {
}
int message_tam = 100000000;
for(int i=0; i<3; i++) {
config_file->latency_m = latency(group->myId, group->numP, comm);
config_file->bw_m = bandwidth(group->myId, group->numP, comm, config_file->latency_m, message_tam);
//if(group->myId == ROOT) printf("numP=%d Lat=%lf Bw=%lf\n", group->numP, config_file->latency_m, config_file->bw_m);
}
config_file->latency_m = latency(group->myId, group->numP, comm);
config_file->bw_m = bandwidth(group->myId, group->numP, comm, config_file->latency_m, message_tam);
obtain_op_times(1);
}
......
......@@ -28,7 +28,7 @@ typedef struct {
typedef struct
{
int pt; // Procedure type
float t_stage; // Time to complete the stage
double t_stage; // Time to complete the stage
double t_op;
int operations;
......@@ -59,6 +59,7 @@ typedef struct
int *iters, *procs, *phy_dist;
float *factors;
double t_op_comms;
iter_stage_t *stages;
} configuration;
......
......@@ -40,8 +40,8 @@ double computePiSerial(int n) {
/*
* Init matrix
*/
void initMatrix(double **matrix, int n) {
int i, j;
void initMatrix(double **matrix, size_t n) {
size_t i, j;
// Init matrix
if(matrix != NULL) {
......@@ -49,7 +49,7 @@ void initMatrix(double **matrix, int n) {
if(*matrix == NULL) { MPI_Abort(MPI_COMM_WORLD, -1);}
for(i=0; i < n; i++) {
for(j=0; j < n; j++) {
(*matrix)[i*n + j] = i+j;
(*matrix)[i*n + j] =(double) i + (double) j;
}
}
}
......
......@@ -3,7 +3,7 @@
double computeMatrix(double *matrix, int n);
double computePiSerial(int n);
void initMatrix(double **matrix, int n);
void initMatrix(double **matrix, size_t n);
void freeMatrix(double **matrix);
#endif
......@@ -20,3 +20,16 @@ void point_to_point(int myId, int numP, int root, MPI_Comm comm, char *array, in
MPI_Send(array, qty, MPI_CHAR, next, 99, comm);
}
}
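/*
 * Pairs each process with the one numP/2 ranks away: ranks in the lower half send
 * "qty" chars while their partners in the upper half receive them, emulating a
 * point-to-point exchange between the two halves of the communicator.
 */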
void point_to_point_inter(int myId, int numP, MPI_Comm comm, char *array, int qty) {
int target;
target = (myId + numP/2)%numP;
if(myId < numP/2) {
MPI_Send(array, qty, MPI_CHAR, target, 99, comm);
//MPI_Recv(array, qty, MPI_CHAR, target, 99, comm, MPI_STATUS_IGNORE);
} else {
MPI_Recv(array, qty, MPI_CHAR, target, 99, comm, MPI_STATUS_IGNORE);
//MPI_Send(array, qty, MPI_CHAR, target, 99, comm);
}
}
......@@ -7,5 +7,6 @@
void point_to_point(int myId, int numP, int root, MPI_Comm comm, char *array, int qty);
void point_to_point_inter(int myId, int numP, MPI_Comm comm, char *array, int qty);
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include "../IOcodes/read_ini.h"
#include "configuration.h"
#include "../malleability/spawn_methods/ProcessDist.h"
#include "../malleability/distribution_methods/block_distribution.h"
void malloc_config_resizes(configuration *user_config);
void init_config_stages(configuration *user_config);
void def_struct_config_file(configuration *config_file, MPI_Datatype *config_type);
void def_struct_config_file_array(configuration *config_file, MPI_Datatype *config_type);
void def_struct_iter_stage(iter_stage_t *stages, size_t n_stages, MPI_Datatype *config_type);
/*
* Initialises a configuration structure.
*
* If the parameter "file_name" is not NULL,
* the configuration is obtained from
* an .ini file.
*
* If it is NULL, it is up to the user
* to choose the values to use.
*/
void init_config(char *file_name, configuration **user_config) {
if(file_name != NULL) {
ext_functions_t mallocs;
mallocs.resizes_f = malloc_config_resizes;
mallocs.stages_f = init_config_stages;
*user_config = read_ini_file(file_name, mallocs);
} else {
configuration *config = NULL;
config = malloc(sizeof(configuration));
if(config == NULL) {
perror("Error when reserving configuration structure\n");
MPI_Abort(MPI_COMM_WORLD, -3);
return;
}
*user_config=config;
}
}
/*
* Allocates memory for the arrays of the configuration structure.
*
* If it is called from outside this file, the memory of the structure
* has to be allocated beforehand with the following line:
* "configuration *config = malloc(sizeof(configuration));"
*
* However, it can also be obtained through the functions
* - read_ini_file
* - recv_config_file
*/
void malloc_config_resizes(configuration *user_config) {
if(user_config != NULL) {
size_t n_resizes = user_config->n_resizes;
user_config->iters = malloc(sizeof(int) * n_resizes);
user_config->procs = malloc(sizeof(int) * n_resizes);
user_config->factors = malloc(sizeof(float) * n_resizes);
user_config->phy_dist = malloc(sizeof(int) * n_resizes);
}
}
/*
* Initialises the memory for the iteration stages.
* No memory is allocated, but the pointers are set to NULL
* so that each stage can later be freed correctly.
*
* It can be obtained through the functions
* - read_ini_file
* - recv_config_file
*/
void init_config_stages(configuration *user_config) {
int i;
if(user_config != NULL) {
for(i=0; i<user_config->n_stages; i++) {
user_config->stages[i].array = NULL;
user_config->stages[i].full_array = NULL;
user_config->stages[i].double_array = NULL;
user_config->stages[i].counts.counts = NULL;
user_config->stages[i].real_bytes = 0;
user_config->stages[i].intercept = 0;
user_config->stages[i].slope = 0;
}
}
}
/*
* Frees all the memory of a configuration structure
*/
void free_config(configuration *user_config) {
int i;
if(user_config != NULL) {
free(user_config->iters);
free(user_config->procs);
free(user_config->factors);
free(user_config->phy_dist);
for(i=0; i < user_config->n_stages; i++) {
if(user_config->stages[i].array != NULL) {
free(user_config->stages[i].array);
user_config->stages[i].array = NULL;
}
if(user_config->stages[i].full_array != NULL) {
free(user_config->stages[i].full_array);
user_config->stages[i].full_array = NULL;
}
if(user_config->stages[i].double_array != NULL) {
free(user_config->stages[i].double_array);
user_config->stages[i].double_array = NULL;
}
if(user_config->stages[i].counts.counts != NULL) {
freeCounts(&(user_config->stages[i].counts));
}
}
//free(user_config->stages); //FIXME ERROR de memoria relacionado con la carpeta malleability
free(user_config);
}
}
/*
* Prints to standard output all the information contained
* in the configuration passed as argument
*/
void print_config(configuration *user_config, int grp) {
if(user_config != NULL) {
int i;
printf("Config loaded: R=%d, S=%d, granularity=%d, SDR=%d, ADR=%d, AT=%d, SM=%d, SS=%d, latency=%2.8f, bw=%lf || grp=%d\n",
user_config->n_resizes, user_config->n_stages, user_config->granularity, user_config->sdr, user_config->adr,
user_config->at, user_config->sm, user_config->ss, user_config->latency_m, user_config->bw_m, grp);
for(i=0; i<user_config->n_stages; i++) {
printf("Stage %d: PT=%d, T_stage=%lf, bytes=%d, Intercept=%lf, Slope=%lf\n",
i, user_config->stages[i].pt, user_config->stages[i].t_stage, user_config->stages[i].real_bytes, user_config->stages[i].intercept, user_config->stages[i].slope);
}
for(i=0; i<user_config->n_resizes; i++) {
printf("Resize %d: Iters=%d, Procs=%d, Factors=%f, Dist=%d\n",
i, user_config->iters[i], user_config->procs[i], user_config->factors[i], user_config->phy_dist[i]);
}
}
}
/*
* Prints to standard output the configuration information related
* to a single group of processes.
*/
void print_config_group(configuration *user_config, int grp) {
int i;
if(user_config != NULL) {
int parents, sons;
parents = sons = 0;
if(grp > 0) {
parents = user_config->procs[grp-1];
}
if(grp < user_config->n_resizes - 1) {
sons = user_config->procs[grp+1];
}
printf("Config: granularity=%d, SDR=%d, ADR=%d, AT=%d, SM=%d, SS=%d, latency=%2.8f, bw=%lf\n",
user_config->granularity, user_config->sdr, user_config->adr, user_config->at, user_config->sm, user_config->ss, user_config->latency_m, user_config->bw_m);
for(i=0; i<user_config->n_stages; i++) {
printf("Stage %d: PT=%d, T_stage=%lf, bytes=%d, Intercept=%lf, Slope=%lf\n",
i, user_config->stages[i].pt, user_config->stages[i].t_stage, user_config->stages[i].real_bytes, user_config->stages[i].intercept, user_config->stages[i].slope);
}
printf("Config Group: iters=%d, factor=%f, phy=%d, procs=%d, parents=%d, sons=%d\n",
user_config->iters[grp], user_config->factors[grp], user_config->phy_dist[grp], user_config->procs[grp], parents, sons);
}
}
//||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| ||
//||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| ||
//| CONFIGURATION STRUCTURE INTERCOMMUNICATION FUNCTIONS          ||
//||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| ||
//||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| |/
/*
* Sends a configuration structure to the group of processes this group
* is linked to through the intercommunicator passed as argument.
*
* This function has to be called by every process of the same group,
* indicating which process is the root in charge of sending the
* configuration to the other group.
*/
void send_config_file(configuration *config_file, int root, MPI_Comm intercomm) {
MPI_Datatype config_type, config_type_array, iter_stage_type;
// Obtener un tipo derivado para enviar todos los
// datos escalares con una sola comunicacion
def_struct_config_file(config_file, &config_type);
// Obtener un tipo derivado para enviar los tres vectores
// de enteros con una sola comunicacion
def_struct_config_file_array(config_file, &config_type_array);
// Obtener un tipo derivado para enviar las estructuras de fases de iteracion
// con una sola comunicacion
def_struct_iter_stage(&(config_file->stages[0]), (size_t) config_file->n_stages, &iter_stage_type);
MPI_Bcast(config_file, 1, config_type, root, intercomm);
MPI_Bcast(config_file, 1, config_type_array, root, intercomm);
MPI_Bcast(config_file->factors, config_file->n_resizes, MPI_FLOAT, root, intercomm);
MPI_Bcast(config_file->stages, config_file->n_stages, iter_stage_type, root, intercomm);
//Liberar tipos derivados
MPI_Type_free(&config_type);
MPI_Type_free(&config_type_array);
MPI_Type_free(&iter_stage_type);
}
/*
* Receives a configuration structure from another group of processes
* and returns it. The memory of the structure is allocated in this function.
*
* This function has to be called by every process of the same group,
* indicating which process of the other group is the root in charge of
* sending the configuration to this group.
*
* The memory of the returned configuration has to be released with
* the function "free_config".
*/
void recv_config_file(int root, MPI_Comm intercomm, configuration **config_file_out) {
MPI_Datatype config_type, config_type_array, iter_stage_type;
configuration *config_file = malloc(sizeof(configuration) * 1);
// Obtener un tipo derivado para recibir todos los
// datos escalares con una sola comunicacion
def_struct_config_file(config_file, &config_type);
MPI_Bcast(config_file, 1, config_type, root, intercomm);
//Inicializado de estructuras internas
malloc_config_resizes(config_file); // Reserva de memoria de los vectores
config_file->stages = malloc(sizeof(iter_stage_t) * (size_t) config_file->n_stages);
// Obtener un tipo derivado para enviar los tres vectores
// de enteros con una sola comunicacion
def_struct_config_file_array(config_file, &config_type_array);
def_struct_iter_stage(&(config_file->stages[0]), (size_t) config_file->n_stages, &iter_stage_type);
MPI_Bcast(config_file, 1, config_type_array, root, intercomm);
MPI_Bcast(config_file->factors, config_file->n_resizes, MPI_FLOAT, root, intercomm);
MPI_Bcast(config_file->stages, config_file->n_stages, iter_stage_type, root, intercomm);
//Liberar tipos derivados
MPI_Type_free(&config_type);
MPI_Type_free(&config_type_array);
MPI_Type_free(&iter_stage_type);
init_config_stages(config_file); // Inicializar a NULL vectores
*config_file_out = config_file;
}
/*
* Derived datatype to send 11 specific elements
* of the configuration structure in a single communication.
*/
void def_struct_config_file(configuration *config_file, MPI_Datatype *config_type) {
int i, counts = 11;
int blocklengths[11] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];
// Rellenar vector types
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = types[6] = types[7] = types[8] = MPI_INT;
types[9] = types[10] = MPI_DOUBLE;
// Rellenar vector displs
MPI_Get_address(config_file, &dir);
MPI_Get_address(&(config_file->n_resizes), &displs[0]);
MPI_Get_address(&(config_file->n_stages), &displs[1]);
MPI_Get_address(&(config_file->actual_resize), &displs[2]); // TODO Refactor Es necesario enviarlo?
MPI_Get_address(&(config_file->granularity), &displs[3]);
MPI_Get_address(&(config_file->sdr), &displs[4]);
MPI_Get_address(&(config_file->adr), &displs[5]);
MPI_Get_address(&(config_file->at), &displs[6]);
MPI_Get_address(&(config_file->ss), &displs[7]);
MPI_Get_address(&(config_file->sm), &displs[8]);
MPI_Get_address(&(config_file->latency_m), &displs[9]);
MPI_Get_address(&(config_file->bw_m), &displs[10]);
for(i=0;i<counts;i++) displs[i] -= dir;
MPI_Type_create_struct(counts, blocklengths, displs, types, config_type);
MPI_Type_commit(config_type);
}
/*
* Derived datatype to send three integer arrays
* of the configuration structure in a single communication.
*/
void def_struct_config_file_array(configuration *config_file, MPI_Datatype *config_type) {
int i, counts = 3;
int blocklengths[3] = {1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype aux, types[counts];
// Rellenar vector types
types[0] = types[1] = types[2] = MPI_INT;
// Modificar blocklengths al valor adecuado
blocklengths[0] = blocklengths[1] = blocklengths[2] = config_file->n_resizes;
//Rellenar vector displs
MPI_Get_address(config_file, &dir);
MPI_Get_address(config_file->iters, &displs[0]);
MPI_Get_address(config_file->procs, &displs[1]);
MPI_Get_address(config_file->phy_dist, &displs[2]);
for(i=0;i<counts;i++) displs[i] -= dir;
// Tipo derivado para enviar un solo elemento de tres vectores
MPI_Type_create_struct(counts, blocklengths, displs, types, &aux);
// Tipo derivado para enviar N elementos de tres vectores(3N en total)
MPI_Type_create_resized(aux, 0, 1*sizeof(int), config_type);
MPI_Type_commit(config_type);
}
/*
* Derived datatype to send specific elements
* of the iteration stage structures in a single communication.
*/
void def_struct_iter_stage(iter_stage_t *stages, size_t n_stages, MPI_Datatype *config_type) {
int i, counts = 4;
int blocklengths[4] = {1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype aux, types[counts];
// Rellenar vector types
types[0] = types[3] = MPI_INT;
types[1] = MPI_FLOAT;
types[2] = MPI_DOUBLE;
// Rellenar vector displs
MPI_Get_address(stages, &dir);
MPI_Get_address(&(stages->pt), &displs[0]);
MPI_Get_address(&(stages->t_stage), &displs[1]);
MPI_Get_address(&(stages->t_op), &displs[2]);
MPI_Get_address(&(stages->bytes), &displs[3]);
for(i=0;i<counts;i++) displs[i] -= dir;
if (n_stages == 1) {
MPI_Type_create_struct(counts, blocklengths, displs, types, config_type);
} else { // Si hay mas de una fase(estructura), el "extent" se modifica.
MPI_Type_create_struct(counts, blocklengths, displs, types, &aux);
// Tipo derivado para enviar N elementos de la estructura
MPI_Type_create_resized(aux, 0, sizeof(iter_stage_t), config_type);
}
MPI_Type_commit(config_type);
}
#ifndef CONFIGURATION_H
#define CONFIGURATION_H
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include "../Main/Main_datatypes.h"
void init_config(char *file_name, configuration **user_config);
void free_config(configuration *user_config);
void print_config(configuration *user_config, int grp);
void print_config_group(configuration *user_config, int grp);
// MPI Intercomm functions
void send_config_file(configuration *config_file, int root, MPI_Comm intercomm);
void recv_config_file(int root, MPI_Comm intercomm, configuration **config_file_out);
#endif
#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include <mpi.h>
#include "Main_datatypes.h"
#include "../malleability/distribution_methods/block_distribution.h"
#include "linear_reg.h"
// Array for linear regression computation
// Sizes      10b  100b  1Kb  5Kb  10Kb  50Kb  100Kb  500Kb  1Mb  10Mb  100Mb
double LR_bytes_array[LR_ARRAY_TAM] = {10, 100, 1000, 5000,10000, 50000,100000, 500000, 1000000, 10000000, 100000000};
// Linear regression
// Y = a + b*X
// Bytes = a + b*(Time)
//
// X is the independent variable, which correlates to the Communication time
// Y is the dependent variable, which correlates to the number of bytes
//
void lr_avg_plus_diff(int tam, double *array, double *avg, double *diffs);
/*
* Computes and returns the related Y value to a given linear regression
*/
void lr_calc_Y(double slope, double intercept, double x_value, int *y_result) {
*y_result = (int) ceil(intercept + slope * x_value);
}
/*
* Computes the slope and intercept for a given array of times
* so users can calculate the number of bytes for a given time.
*
*/
void lr_compute(int tam, double *bytes, double *times, double *slope, double *intercept) {
int i;
double avgX, avgY;
double *diffsX, *diffsY;
double SSxx, SSxy;
diffsX = malloc(tam * sizeof(double));
diffsY = malloc(tam * sizeof(double));
SSxx = SSxy = 0;
lr_avg_plus_diff(tam, times, &avgX, diffsX);
lr_avg_plus_diff(tam, bytes, &avgY, diffsY);
for(i=0; i<tam; i++) {
SSxx+= diffsX[i]*diffsX[i];
SSxy+= diffsX[i]*diffsY[i];
}
*slope = SSxy / SSxx;
*intercept = avgY - (*slope * avgX);
free(diffsX);
free(diffsY);
}
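/*
 * Minimal usage sketch (illustrative, not from the repository): fit the model
 * Bytes = intercept + slope * Time on a small synthetic sample and predict how
 * many bytes fit in 0.5 ms. The sample values and the helper name are made up.
 */
void lr_usage_example(void) {
  double times[4] = {0.0001, 0.0002, 0.0004, 0.0008}; // Seconds (X)
  double bytes[4] = {1000.0, 2000.0, 4000.0, 8000.0}; // Bytes (Y)
  double slope, intercept;
  int predicted_bytes;

  lr_compute(4, bytes, times, &slope, &intercept);
  lr_calc_Y(slope, intercept, 0.0005, &predicted_bytes); // Y value for X = 0.5 ms
  printf("Predicted bytes: %d\n", predicted_bytes);
}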
/*
* Computes the average of an array and
* the difference of each element with respect to the average.
*
* Returns the average and the difference of each element.
*/
void lr_avg_plus_diff(int tam, double *array, double *avg, double *diffs) {
int i;
double sum = 0;
for(i=0; i<tam; i++) {
sum+= array[i];
}
*avg = sum / tam;
for(i=0; i<tam; i++) {
diffs[i]= *avg - array[i];
}
}
//======================================================||
//======================================================||
//==================TIMES COMPUTATION===================||
//======================================================||
//======================================================||
/*
* Obtains an array of times to perform a "Broadcast"
* operation depending on a predefined set of numbers of bytes.
*/
void lr_times_bcast(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times) {
int i, j, n;
double start_time;
char *aux = NULL;
for(i=0; i<LR_ARRAY_TAM; i++) {
n = LR_bytes_array[i];
aux = malloc(n * sizeof(char));
for(j=0; j<loop_iters; j++) {
MPI_Barrier(comm);
start_time = MPI_Wtime();
MPI_Bcast(aux, n, MPI_CHAR, root, comm);
times[i*loop_iters+j] = MPI_Wtime() - start_time;
}
free(aux);
aux = NULL;
}
}
/*
* Obtains an array of times to perform an "Allgatherv"
* operation depending on a predefined set of numbers of bytes.
*/
void lr_times_allgatherv(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times) {
int i, j, n;
double start_time;
char *aux = NULL, *aux_full = NULL;
struct Dist_data dist_data;
struct Counts counts;
for(i=0; i<LR_ARRAY_TAM; i++) {
n = LR_bytes_array[i];
prepare_comm_allgatherv(numP, n, &counts);
get_block_dist(n, myId, numP, &dist_data);
aux = malloc(dist_data.tamBl * sizeof(char));
aux_full = malloc(n * sizeof(char));
for(j=0; j<loop_iters; j++) {
MPI_Barrier(comm);
start_time = MPI_Wtime();
MPI_Allgatherv(aux, dist_data.tamBl, MPI_CHAR, aux_full, counts.counts, counts.displs, MPI_CHAR, comm);
times[i*loop_iters+j] = MPI_Wtime() - start_time;
}
freeCounts(&counts);
free(aux);
free(aux_full);
aux_full = NULL;
aux = NULL;
}
}
/*
* Obtains an array of times to perform a "Reduce"
* operation depending on a predefined set of numbers of bytes.
*/
void lr_times_reduce(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times) {
int i, j, n;
double start_time;
char *aux = NULL, *aux_full = NULL;
for(i=0; i<LR_ARRAY_TAM; i++) {
n = LR_bytes_array[i];
aux = malloc(n * sizeof(char));
aux_full = malloc(n * sizeof(char));
for(j=0; j<loop_iters; j++) {
MPI_Barrier(comm);
start_time = MPI_Wtime();
MPI_Reduce(aux, aux_full, n, MPI_CHAR, MPI_MAX, root, comm);
times[i*loop_iters+j] = MPI_Wtime() - start_time;
}
free(aux);
free(aux_full);
aux_full = NULL;
aux = NULL;
}
}
/*
* Obtains an array of times to perform an "Allreduce"
* operation depending on a predefined set of numbers of bytes.
*/
void lr_times_allreduce(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times) {
int i, j, n;
double start_time;
char *aux = NULL, *aux_full = NULL;
for(i=0; i<LR_ARRAY_TAM; i++) {
n = LR_bytes_array[i];
aux = malloc(n * sizeof(char));
aux_full = malloc(n * sizeof(char));
for(j=0; j<loop_iters; j++) {
MPI_Barrier(comm);
start_time = MPI_Wtime();
MPI_Allreduce(aux, aux_full, n, MPI_CHAR, MPI_MAX, comm);
times[i*loop_iters+j] = MPI_Wtime() - start_time;
}
free(aux);
free(aux_full);
aux_full = NULL;
aux = NULL;
}
}
#ifndef LINEAR_REG_H
#define LINEAR_REG_H
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
/*----------LINEAR REGRESSION TYPES--------------*/
#define LR_ARRAY_TAM 11
// Array for linear regression computation
extern double LR_bytes_array[LR_ARRAY_TAM];
void lr_calc_Y(double slope, double intercept, double x_value, int *y_result);
void lr_compute(int loop_iters, double *bytes, double *times, double *slope, double *intercept);
void lr_times_bcast(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times);
void lr_times_allgatherv(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times);
void lr_times_reduce(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times);
void lr_times_allreduce(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times);
#endif
......@@ -4,13 +4,11 @@
#include <mpi.h>
#include "computing_func.h"
#include "comunication_func.h"
#include "linear_reg.h"
#include "Main_datatypes.h"
#include "process_stage.h"
#include "../malleability/distribution_methods/block_distribution.h"
void linear_regression_stage(iter_stage_t *stage, group_data group, MPI_Comm comm);
double init_emulation_comm_time(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm);
double init_matrix_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute);
double init_pi_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute);
......@@ -47,6 +45,7 @@ double init_stage(configuration *config_file, int stage_i, group_data group, MPI
//Computo
case COMP_MATRIX:
result = init_matrix_pt(group, config_file, stage, comm, compute);
break;
case COMP_PI:
result = init_pi_pt(group, config_file, stage, comm, compute);
break;
......@@ -87,7 +86,7 @@ double process_stage(configuration config_file, iter_stage_t stage, group_data g
break;
case COMP_MATRIX:
for(i=0; i < stage.operations; i++) {
result += computeMatrix(stage.double_array, config_file.granularity); //FIXME No da tiempos repetibles
result += computeMatrix(stage.double_array, config_file.granularity);
}
break;
//Comunicaciones
......@@ -95,7 +94,13 @@ double process_stage(configuration config_file, iter_stage_t stage, group_data g
point_to_point(group.myId, group.numP, ROOT, comm, stage.array, stage.real_bytes);
break;
case COMP_BCAST:
MPI_Bcast(stage.array, stage.real_bytes, MPI_CHAR, ROOT, comm);
if(stage.bytes != 0) {
MPI_Bcast(stage.array, stage.real_bytes, MPI_CHAR, ROOT, comm);
} else {
for(i=0; i < stage.operations; i++) {
point_to_point_inter(group.myId, group.numP, comm, stage.array, stage.real_bytes);
}
}
break;
case COMP_ALLGATHER:
MPI_Allgatherv(stage.array, stage.my_bytes, MPI_CHAR, stage.full_array, stage.counts.counts, stage.counts.displs, MPI_CHAR, comm);
......@@ -111,8 +116,6 @@ double process_stage(configuration config_file, iter_stage_t stage, group_data g
}
// Several latency tests are performed by
// sending a single CHAR datum to the odd-ranked processes
// from the immediately preceding even rank. After that, the odd ranks
......@@ -159,8 +162,8 @@ double bandwidth(int myId, int numP, MPI_Comm comm, double latency, int n) {
double start_time, stop_time, bw, time;
char *aux;
n_bytes = n * sizeof(char);
aux = malloc(n_bytes);
n_bytes = ((size_t)n) * sizeof(char);
aux = malloc((size_t)n_bytes);
time = 0;
......@@ -187,77 +190,6 @@ double bandwidth(int myId, int numP, MPI_Comm comm, double latency, int n) {
return bw;
}
/*
* Creates a linear regression model to predict
* the number of bytes needed to perform a collective
* communication.
*/
void linear_regression_stage(iter_stage_t *stage, group_data group, MPI_Comm comm) {
int i, j, tam, loop_iters = 100;
tam = LR_ARRAY_TAM * loop_iters;
double *bytes = malloc(tam * sizeof(double));
double *times = malloc(tam * sizeof(double));
for(i=0; i<LR_ARRAY_TAM; i++) {
for(j=0; j<loop_iters; j++) {
bytes[i*loop_iters + j] = LR_bytes_array[i];
}
}
// TODO Calcular solo si no se ha calculado para otra fase.
// Si se ha calculado antes, copiar esos valores
switch(stage->pt) {
//Comunicaciones
case COMP_BCAST:
lr_times_bcast(group.myId, group.numP, ROOT, comm, loop_iters, times);
break;
case COMP_ALLGATHER:
lr_times_allgatherv(group.myId, group.numP, ROOT, comm, loop_iters, times);
break;
case COMP_REDUCE:
lr_times_reduce(group.myId, group.numP, ROOT, comm, loop_iters, times);
break;
case COMP_ALLREDUCE:
lr_times_allreduce(group.myId, group.numP, ROOT, comm, loop_iters, times);
break;
default:
return;
break;
}
if(group.myId == ROOT) {
MPI_Reduce(MPI_IN_PLACE, times, LR_ARRAY_TAM * loop_iters, MPI_DOUBLE, MPI_MAX, ROOT, comm);
/*
printf("PT=%d ", stage->pt);
for(i=0; i<tam; i++) {
printf("%lf, ", times[i]);
}
printf("\n");
printf("BYTES ");
for(i=0; i<tam; i++) {
printf("%lf, ", bytes[i]);
}
printf("\n");
*/
//if(stage->t_stage < 0.1) {
//lr_compute(8*loop_iters, bytes, times, &(stage->slope), &(stage->intercept));
//} else {
lr_compute(tam, bytes, times, &(stage->slope), &(stage->intercept));
//}
} else {
MPI_Reduce(times, NULL, LR_ARRAY_TAM * loop_iters, MPI_DOUBLE, MPI_MAX, ROOT, comm);
}
MPI_Bcast(&(stage->slope), 1, MPI_DOUBLE, ROOT, comm);
MPI_Bcast(&(stage->intercept), 1, MPI_DOUBLE, ROOT, comm);
free(times);
free(bytes);
}
/*
* ========================================================================================
* ========================================================================================
......@@ -266,12 +198,30 @@ void linear_regression_stage(iter_stage_t *stage, group_data group, MPI_Comm com
* ========================================================================================
*/
double init_emulation_comm_time(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
double start_time, time = 0;
stage->array = malloc(sizeof(char) * (size_t)config_file->granularity);
if(config_file->t_op_comms != 0) {
stage->t_op = config_file->t_op_comms;
return time;
}
MPI_Barrier(comm);
start_time = MPI_Wtime();
process_stage(*config_file, *stage, group, comm);
MPI_Barrier(comm);
stage->t_op = ceil((MPI_Wtime() - start_time) / stage->operations); //Tiempo de una operacion
MPI_Bcast(&(stage->t_op), 1, MPI_DOUBLE, ROOT, comm);
return time;
}
double init_matrix_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute) {
double result, t_stage, start_time;
result = 0;
t_stage = stage->t_stage * config_file->factors[group.grp];
initMatrix(&(stage->double_array), config_file->granularity);
initMatrix(&(stage->double_array), (size_t) config_file->granularity);
if(compute) {
start_time = MPI_Wtime();
......@@ -281,7 +231,7 @@ double init_matrix_pt(group_data group, configuration *config_file, iter_stage_t
}
MPI_Bcast(&(stage->t_op), 1, MPI_DOUBLE, ROOT, comm);
}
stage->operations = t_stage / stage->t_op;
stage->operations = (int) ceil(t_stage / stage->t_op);
return result;
}
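/*
 * Worked example of the operations calculation above (illustrative numbers): if
 * the requested stage time is t_stage = 1.5 s and one operation of the kernel
 * takes t_op = 0.4 s, then operations = ceil(1.5 / 0.4) = 4 repetitions per stage.
 */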
......@@ -299,106 +249,81 @@ double init_pi_pt(group_data group, configuration *config_file, iter_stage_t *st
}
MPI_Bcast(&(stage->t_op), 1, MPI_DOUBLE, ROOT, comm);
}
stage->operations = t_stage / stage->t_op;
stage->operations = (int) ceil(t_stage / stage->t_op);
return result;
}
void init_comm_ptop_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
struct Dist_data dist_data;
int aux_bytes = stage->bytes;
if(stage->array != NULL)
free(stage->array);
if(stage->bytes == 0) {
stage->bytes = (stage->t_stage - config_file->latency_m) * config_file->bw_m;
if(aux_bytes == 0) {
//aux_bytes = (stage->t_stage - config_file->latency_m) * config_file->bw_m;
init_emulation_comm_time(group, config_file, stage, comm);
}
get_block_dist(stage->bytes, group.myId, group.numP, &dist_data);
stage->real_bytes = dist_data.tamBl;
stage->array = malloc(sizeof(char) * stage->real_bytes);
stage->real_bytes = aux_bytes;
stage->array = malloc(sizeof(char) * (size_t)stage->real_bytes);
}
double init_comm_bcast_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
double start_time, time = 0;
stage->real_bytes = stage->bytes;
if(stage->bytes == 0) {
start_time = MPI_Wtime();
linear_regression_stage(stage, group, comm);
lr_calc_Y(stage->slope, stage->intercept, stage->t_stage, &(stage->real_bytes));
time = MPI_Wtime() - start_time;
if(group.myId == ROOT) {
MPI_Reduce(MPI_IN_PLACE, &time, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
} else {
MPI_Reduce(&time, NULL, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
}
}
double time = 0;
if(stage->array != NULL)
free(stage->array);
stage->array = malloc(sizeof(char) * stage->real_bytes);
if(stage->bytes != 0) {
stage->real_bytes = stage->bytes;
stage->array = malloc(sizeof(char) * (size_t)stage->real_bytes);
} else { // Prepare to emulate Collective as PtoP
time = init_emulation_comm_time(group, config_file, stage, comm);
}
return time;
}
double init_comm_allgatherv_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
double start_time, time = 0;
double time=0;
struct Dist_data dist_data;
stage->real_bytes = stage->bytes;
if(stage->bytes == 0) {
start_time = MPI_Wtime();
linear_regression_stage(stage, group, comm);
lr_calc_Y(stage->slope, stage->intercept, stage->t_stage, &(stage->real_bytes));
time = MPI_Wtime() - start_time;
if(group.myId == ROOT) {
MPI_Reduce(MPI_IN_PLACE, &time, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
} else {
MPI_Reduce(&time, NULL, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
}
}
if(stage->counts.counts != NULL)
freeCounts(&(stage->counts));
prepare_comm_allgatherv(group.numP, stage->real_bytes, &(stage->counts));
get_block_dist(stage->real_bytes, group.myId, group.numP, &dist_data);
stage->my_bytes = dist_data.tamBl;
if(stage->array != NULL)
free(stage->array);
stage->array = malloc(sizeof(char) * stage->my_bytes);
if(stage->full_array != NULL)
free(stage->full_array);
stage->full_array = malloc(sizeof(char) * stage->real_bytes);
stage->real_bytes = stage->bytes;
if(stage->bytes != 0) {
prepare_comm_allgatherv(group.numP, stage->real_bytes, &(stage->counts));
get_block_dist(stage->real_bytes, group.myId, group.numP, &dist_data);
stage->my_bytes = dist_data.tamBl;
stage->array = malloc(sizeof(char) * (size_t)stage->my_bytes);
if(stage->full_array != NULL)
free(stage->full_array);
stage->full_array = malloc(sizeof(char) * (size_t)stage->real_bytes);
if(stage->counts.counts != NULL)
freeCounts(&(stage->counts));
} else {
time = init_emulation_comm_time(group, config_file, stage, comm);
}
return time;
}
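prepare_comm_allgatherv and get_block_dist split real_bytes into one block per rank (dist_data.tamBl). A minimal sketch of a classic block distribution, which is assumed to be what those helpers compute:
/* Sketch (assumption): block distribution of n bytes over numP ranks,
 * where the first n % numP ranks get one extra byte. */
static int block_size(int n, int myId, int numP) {
  int base = n / numP;
  int rest = n % numP;
  return base + (myId < rest ? 1 : 0);
}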
double init_comm_reduce_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
double start_time, time = 0;
double time = 0;
if(stage->array != NULL)
free(stage->array);
stage->real_bytes = stage->bytes;
if(stage->bytes == 0) {
start_time = MPI_Wtime();
linear_regression_stage(stage, group, comm);
lr_calc_Y(stage->slope, stage->intercept, stage->t_stage, &(stage->real_bytes));
time = MPI_Wtime() - start_time;
if(group.myId == ROOT) {
MPI_Reduce(MPI_IN_PLACE, &time, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
} else {
MPI_Reduce(&time, NULL, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
}
if(stage->bytes != 0) {
stage->array = malloc(sizeof(char) * (size_t)stage->real_bytes);
//The full array for the reduce needs the same size
if(stage->full_array != NULL)
free(stage->full_array);
stage->full_array = malloc(sizeof(char) * (size_t)stage->real_bytes);
} else {
init_emulation_comm_time(group, config_file, stage, comm);
}
if(stage->array != NULL)
free(stage->array);
stage->array = malloc(sizeof(char) * stage->real_bytes);
//The full array for the reduce needs the same size
if(stage->full_array != NULL)
free(stage->full_array);
stage->full_array = malloc(sizeof(char) * stage->real_bytes);
return time;
}
......@@ -27,7 +27,7 @@ $(BIN) : $(BUILD_DIR)/$(BIN)
# Actual target of the binary - depends on all .o files.
$(BUILD_DIR)/$(BIN) : $(OBJ)
$(MCC) $(C_FLAGS) $^ -o $@ $(LD_FLAGS)
$(MCC) $(C_FLAGS_ALL) $^ -o $@ $(LD_FLAGS)
# Include all .d files
# .d files are used to track the dependencies of each source file
......@@ -40,7 +40,7 @@ $(BUILD_DIR)/$(BIN) : $(OBJ)
# the same name as the .o file.
$(BUILD_DIR)/%.o : %.c
mkdir -p $(@D)
$(MCC) $(C_FLAGS) -MMD -c $< -o $@
$(MCC) $(C_FLAGS_ALL) -MMD -c $< -o $@
clean:
-rm $(BUILD_DIR)/$(BIN) $(OBJ) $(DEP)
......
......@@ -5,14 +5,14 @@
#include "distribution_methods/block_distribution.h"
#include "CommDist.h"
void send_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_child, int idI, int idE, struct Counts counts);
void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts);
void send_sync_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts);
void recv_sync_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts);
void send_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_child, int idI, int idE, struct Counts counts, MPI_Request *comm_req);
void recv_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req);
void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req);
void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req);
void send_async_point_arrays(struct Dist_data dist_data, char *array, int rootBcast, int numP_child, int idI, int idE, struct Counts counts, MPI_Request *comm_req);
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req);
void send_async_point_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req);
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req);
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS);
/*
......@@ -24,7 +24,7 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) {
struct Dist_data dist_data;
get_block_dist(qty, myId, numP, &dist_data);
if( (*array = malloc(dist_data.tamBl * sizeof(char))) == NULL) {
if( (*array = malloc( (size_t) dist_data.tamBl * sizeof(char))) == NULL) {
printf("Memory Error (Malloc Arrays(%d))\n", dist_data.tamBl);
exit(1);
}
......@@ -51,23 +51,20 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) {
*
* The array vector is not modified by this function.
*/
int send_sync(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child) {
int rootBcast = MPI_PROC_NULL;
int send_sync(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child) {
int *idS = NULL;
struct Counts counts;
struct Dist_data dist_data;
if(myId == root) rootBcast = MPI_ROOT;
get_block_dist(qty, myId, numP, &dist_data); // Distribution of this process within its group
dist_data.intercomm = intercomm;
// Create arrays which contain info about how many elements will be sent to each created process
mallocCounts(&counts, numP_child);
mallocCounts(&counts, (size_t)numP_child);
getIds_intercomm(dist_data, numP_child, &idS); // Get the range of child Ids this process sends data to
send_sync_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts);
send_sync_arrays(dist_data, array, numP_child, counts);
freeCounts(&counts);
free(idS);
......@@ -83,23 +80,23 @@ int send_sync(char *array, int qty, int myId, int numP, int root, MPI_Comm inter
* The array vector is allocated inside the function and returned through the same argument.
* It must be freed afterwards by the user.
*/
void recv_sync(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents) {
void recv_sync(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents) {
int *idS = NULL;
struct Counts counts;
struct Dist_data dist_data;
// Get the distribution for this child
get_block_dist(qty, myId, numP, &dist_data);
*array = malloc(dist_data.tamBl * sizeof(char));
*array = malloc( (size_t)dist_data.tamBl * sizeof(char));
//(*array)[dist_data.tamBl] = '\0';
dist_data.intercomm = intercomm;
/* PREPARE THE RECEPTION DATA FOR THE VECTOR */
mallocCounts(&counts, numP_parents);
mallocCounts(&counts, (size_t)numP_parents);
getIds_intercomm(dist_data, numP_parents, &idS); // Get the range of parent Ids this process receives data from
recv_sync_arrays(dist_data, *array, root, numP_parents, idS[0], idS[1], counts);
recv_sync_arrays(dist_data, *array, numP_parents, counts);
//printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array);
freeCounts(&counts);
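With the root parameter dropped in this commit, a typical call pair would look roughly like the sketch below (illustrative only; intercomm and the group sizes come from the malleability spawn logic elsewhere in the code):
/* Illustrative usage: parents scatter their block, children receive theirs. */
static void redistribute_example(char *send_buf, int qty, int myId, int numP,
                                 MPI_Comm intercomm, int numP_other, int is_parent) {
  if(is_parent) {
    send_sync(send_buf, qty, myId, numP, intercomm, numP_other);
  } else {
    char *recv_buf = NULL;
    recv_sync(&recv_buf, qty, myId, numP, intercomm, numP_other); // Allocates recv_buf
    free(recv_buf); // The buffer returned by recv_sync belongs to the caller
  }
}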
......@@ -111,21 +108,9 @@ void recv_sync(char **array, int qty, int myId, int numP, int root, MPI_Comm int
* children. Before performing the communication, each parent process computes to which processes
* of the other group elements are transmitted.
*/
void send_sync_arrays(struct Dist_data dist_data, char *array, int rootBcast, int numP_child, int idI, int idE, struct Counts counts) {
void send_sync_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts) {
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);
/*int i;
// PREPARAR ENVIO DEL VECTOR
if(idI == 0) {
set_counts(0, numP_child, dist_data, counts.counts);
idI++;
}
for(i=idI; i<idE; i++) {
set_counts(i, numP_child, dist_data, counts.counts);
counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
}*/
//print_counts(dist_data, counts.counts, counts.displs, numP_child, "Padres");
/* DATA COMMUNICATION */
MPI_Alltoallv(array, counts.counts, counts.displs, MPI_CHAR, NULL, counts.zero_arr, counts.zero_arr, MPI_CHAR, dist_data.intercomm);
}
......@@ -135,7 +120,7 @@ void send_sync_arrays(struct Dist_data dist_data, char *array, int rootBcast, in
* of this group. Before performing the communication, each child computes from which processes
* of the other group elements are transmitted.
*/
void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts) {
void recv_sync_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts) {
char aux;
......@@ -172,19 +157,17 @@ void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int num
*
* The array vector is not modified by this function.
*/
int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait) {
int i, rootBcast = MPI_PROC_NULL;
int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait) {
int i;
int *idS = NULL;
struct Counts counts;
struct Dist_data dist_data;
if(myId == root) rootBcast = MPI_ROOT;
get_block_dist(qty, myId, numP, &dist_data); // Distribution of this process within its group
dist_data.intercomm = intercomm;
// Create arrays which contain info about how many elements will be sent to each created process
mallocCounts(&counts, numP_child);
mallocCounts(&counts, (size_t)numP_child);
getIds_intercomm(dist_data, numP_child, &idS); // Get the range of child Ids this process sends data to
......@@ -192,20 +175,20 @@ int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm inte
if(parents_wait == MAL_USE_NORMAL) {
//*comm_req = (MPI_Request *) malloc(sizeof(MPI_Request));
*comm_req[0] = MPI_REQUEST_NULL;
send_async_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts, &(*comm_req[0]));
send_async_arrays(dist_data, array, numP_child, counts, &(*comm_req[0]));
} else if (parents_wait == MAL_USE_IBARRIER){
//*comm_req = (MPI_Request *) malloc(2 * sizeof(MPI_Request));
*comm_req[0] = MPI_REQUEST_NULL;
*comm_req[1] = MPI_REQUEST_NULL;
send_async_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts, &((*comm_req)[1]));
send_async_arrays(dist_data, array, numP_child, counts, &((*comm_req)[1]));
MPI_Ibarrier(intercomm, &((*comm_req)[0]) );
} else if (parents_wait == MAL_USE_POINT){
//*comm_req = (MPI_Request *) malloc(numP_child * sizeof(MPI_Request));
for(i=0; i<numP_child; i++){
(*comm_req)[i] = MPI_REQUEST_NULL;
}
send_async_point_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts, *comm_req);
send_async_point_arrays(dist_data, array, numP_child, counts, *comm_req);
} else if (parents_wait == MAL_USE_THREAD) { //TODO
}
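On the sending side the requests are returned still pending; a hedged sketch of how a caller might complete them depending on parents_wait (the request counts mirror the branches above; the helper itself is illustrative):
/* Sketch: MAL_USE_NORMAL keeps one request, MAL_USE_IBARRIER two
 * (Ibarrier + Ialltoallv), MAL_USE_POINT one per child process. */
static void wait_async_send(MPI_Request *reqs, int parents_wait, int numP_child) {
  if(parents_wait == MAL_USE_NORMAL) {
    MPI_Wait(&reqs[0], MPI_STATUS_IGNORE);
  } else if(parents_wait == MAL_USE_IBARRIER) {
    MPI_Waitall(2, reqs, MPI_STATUSES_IGNORE);
  } else if(parents_wait == MAL_USE_POINT) {
    MPI_Waitall(numP_child, reqs, MPI_STATUSES_IGNORE);
  }
}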
......@@ -225,7 +208,7 @@ int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm inte
* The "parents_wait" argument indicates whether to use the version in which the parents
* wait until they finish sending, or the one in which they wait until the children finish receiving.
*/
void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents, int parents_wait) {
void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int parents_wait) {
int *idS = NULL;
int wait_err, i;
struct Counts counts;
......@@ -234,27 +217,27 @@ void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm in
// Get the distribution for this child
get_block_dist(qty, myId, numP, &dist_data);
*array = malloc(dist_data.tamBl * sizeof(char));
*array = malloc( (size_t)dist_data.tamBl * sizeof(char));
dist_data.intercomm = intercomm;
/* PREPARE THE RECEPTION DATA FOR THE VECTOR */
mallocCounts(&counts, numP_parents);
mallocCounts(&counts, (size_t)numP_parents);
getIds_intercomm(dist_data, numP_parents, &idS); // Get the range of parent Ids this process receives data from
// MAL_USE_THREAD follows the synchronous path
if(parents_wait == MAL_USE_POINT) {
comm_req = (MPI_Request *) malloc(numP_parents * sizeof(MPI_Request));
comm_req = (MPI_Request *) malloc((size_t)numP_parents * sizeof(MPI_Request));
for(i=0; i<numP_parents; i++){
comm_req[i] = MPI_REQUEST_NULL;
}
recv_async_point_arrays(dist_data, *array, root, numP_parents, idS[0], idS[1], counts, comm_req);
recv_async_point_arrays(dist_data, *array, numP_parents, counts, comm_req);
wait_err = MPI_Waitall(numP_parents, comm_req, MPI_STATUSES_IGNORE);
} else if (parents_wait == MAL_USE_NORMAL || parents_wait == MAL_USE_IBARRIER) {
comm_req = (MPI_Request *) malloc(sizeof(MPI_Request));
*comm_req = MPI_REQUEST_NULL;
recv_async_arrays(dist_data, *array, root, numP_parents, idS[0], idS[1], counts, comm_req);
recv_async_arrays(dist_data, *array, numP_parents, counts, comm_req);
wait_err = MPI_Wait(comm_req, MPI_STATUS_IGNORE);
} else if (parents_wait == MAL_USE_THREAD) { //TODO
}
......@@ -281,21 +264,9 @@ void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm in
*
* The send is performed through a collective communication.
*/
void send_async_arrays(struct Dist_data dist_data, char *array, int rootBcast, int numP_child, int idI, int idE, struct Counts counts, MPI_Request *comm_req) {
void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req) {
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);
/*
// PREPARAR ENVIO DEL VECTOR
if(idI == 0) {
set_counts(0, numP_child, dist_data, counts.counts);
idI++;
}
for(i=idI; i<idE; i++) {
set_counts(i, numP_child, dist_data, counts.counts);
counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
}
*/
//print_counts(dist_data, counts.counts, counts.displs, numP_child, "Padres");
/* DATA COMMUNICATION */
MPI_Ialltoallv(array, counts.counts, counts.displs, MPI_CHAR, NULL, counts.zero_arr, counts.zero_arr, MPI_CHAR, dist_data.intercomm, comm_req);
......@@ -308,21 +279,11 @@ void send_async_arrays(struct Dist_data dist_data, char *array, int rootBcast, i
*
* The send is performed through several point-to-point communications.
*/
void send_async_point_arrays(struct Dist_data dist_data, char *array, int rootBcast, int numP_child, int idI, int idE, struct Counts counts, MPI_Request *comm_req) {
void send_async_point_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req) {
int i;
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);
// PREPARE THE VECTOR SEND
/*
if(idI == 0) {
set_counts(0, numP_child, dist_data, counts.counts);
idI++;
MPI_Isend(array, counts.counts[0], MPI_CHAR, 0, 99, dist_data.intercomm, &(comm_req[0]));
}
for(i=idI; i<idE; i++) {
set_counts(i, numP_child, dist_data, counts.counts);
counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
MPI_Isend(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i]));
}*/
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);
for(i=0; i<numP_child; i++) { //TODO This approach no longer uses idI and idE
if(counts.counts[0] != 0) {
MPI_Isend(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i]));
......@@ -338,20 +299,11 @@ void send_async_point_arrays(struct Dist_data dist_data, char *array, int rootBc
*
* The reception is performed through a collective communication.
*/
void recv_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req) {
void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req) {
char *aux = malloc(1);
// Adjust the reception values
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, &counts);
/*
if(idI == 0) {
set_counts(0, numP_parents, dist_data, counts.counts);
idI++;
}
for(i=idI; i<idE; i++) {
set_counts(i, numP_parents, dist_data, counts.counts);
counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
}*/
//print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
/* DATA COMMUNICATION */
......@@ -366,22 +318,12 @@ void recv_async_arrays(struct Dist_data dist_data, char *array, int root, int nu
*
* The reception is performed through several point-to-point communications.
*/
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req) {
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req) {
int i;
// Adjust the reception values
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, &counts);
/*
if(idI == 0) {
set_counts(0, numP_parents, dist_data, counts.counts);
idI++;
MPI_Irecv(array, counts.counts[0], MPI_CHAR, 0, 99, dist_data.intercomm, &(comm_req[0])); //FIXME BUffer recv
}
for(i=idI; i<idE; i++) {
set_counts(i, numP_parents, dist_data, counts.counts);
counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
MPI_Irecv(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i])); //FIXME BUffer recv
}*/
for(i=0; i<numP_parents; i++) { //TODO This approach no longer uses idI and idE
if(counts.counts[0] != 0) {
MPI_Irecv(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i])); //FIXME Buffer recv
......
......@@ -16,12 +16,12 @@
//#define MAL_USE_POINT 2
//#define MAL_USE_THREAD 3
int send_sync(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child);
void recv_sync(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents);
int send_sync(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child);
void recv_sync(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents);
int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait);
void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents, int parents_wait);
int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait);
void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int parents_wait);
void malloc_comm_array(char **array, int qty, int myId, int numP);
......
......@@ -18,7 +18,7 @@ void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, struct Cou
struct Dist_data dist_data;
get_block_dist(n, myId, numP, &dist_data);
mallocCounts(counts, numP_other);
mallocCounts(counts, (size_t)numP_other);
get_util_ids(dist_data, numP_other, &idS);
if(idS[0] == 0) {
......@@ -42,7 +42,7 @@ void prepare_comm_allgatherv(int numP, int n, struct Counts *counts) {
int i;
struct Dist_data dist_data;
mallocCounts(counts, numP);
mallocCounts(counts, (size_t)numP);
get_block_dist(n, 0, numP, &dist_data);
counts->counts[0] = dist_data.tamBl;
......@@ -187,7 +187,7 @@ void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS) {
* The zero_arr vector is used to indicate a vector initialized
* to 0 in all its elements. It signals that there is no communication.
*/
void mallocCounts(struct Counts *counts, int numP) {
void mallocCounts(struct Counts *counts, size_t numP) {
counts->counts = calloc(numP, sizeof(int));
if(counts->counts == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
......
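The freeCounts counterpart used throughout this commit is assumed to release the arrays allocated by mallocCounts; a minimal sketch under that assumption (the real implementation may differ):
/* Sketch (assumption): release the arrays held by a struct Counts,
 * mirroring the counts, displs and zero_arr fields used above. */
static void freeCounts_sketch(struct Counts *counts) {
  free(counts->counts);   counts->counts = NULL;
  free(counts->displs);   counts->displs = NULL;
  free(counts->zero_arr); counts->zero_arr = NULL;
}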