Commit 8a9e883d authored by iker_martin

Adding iteration-stage functionality. Refactoring the IOcodes and configuration code.
parent 9f7e4ad1
......@@ -7,9 +7,11 @@
#include "ini.h"
void malloc_config_arrays(configuration *user_config, int resizes);
void malloc_config_resizes(configuration *user_config, int resizes);
void init_config_stages(configuration *user_config, int stages);
void def_struct_config_file(configuration *config_file, MPI_Datatype *config_type);
void def_struct_config_file_array(configuration *config_file, MPI_Datatype *config_type);
void def_struct_iter_stage(iter_stage_t *iter_stage, MPI_Datatype *config_type);
/*
* Function used to read the configuration file
......@@ -26,11 +28,18 @@ static int handler(void* user, const char* section, const char* name,
int act_resize = pconfig->actual_resize;
snprintf(resize_name, 10, "resize%d", act_resize);
char *iter_name = malloc(10 * sizeof(char));
int act_iter = pconfig->actual_iter;
snprintf(iter_name, 10, "stage%d", act_iter);
#define MATCH(s, n) strcmp(section, s) == 0 && strcmp(name, n) == 0
if (MATCH("general", "resizes")) {
pconfig->resizes = atoi(value) + 1;
malloc_config_arrays(pconfig, pconfig->resizes);
malloc_config_resizes(pconfig, pconfig->resizes);
} else if (MATCH("general", "iter_stages")) {
pconfig->iter_stages = atoi(value);
pconfig->iter_stage = malloc(sizeof(iter_stage_t) * pconfig->iter_stages);
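// Note: [stageN] sections must appear after iter_stages in the INI file, since the stage array is allocated here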
init_config_stages(pconfig, pconfig->iter_stages);
} else if (MATCH("general", "matrix_tam")) {
pconfig->matrix_tam = atoi(value);
} else if (MATCH("general", "comm_tam")) {
......@@ -45,10 +54,18 @@ static int handler(void* user, const char* section, const char* name,
pconfig->cst = atoi(value);
} else if (MATCH("general", "CSS")) {
pconfig->css = atoi(value);
} else if (MATCH("general", "time")) {
pconfig->general_time = atof(value);
// Resize
// Iter stage
} else if (MATCH(iter_name, "PT")) {
pconfig->iter_stage[act_iter].pt = atoi(value);
} else if (MATCH(iter_name, "bytes")) {
pconfig->iter_stage[act_iter].bytes = atoi(value);
} else if (MATCH(iter_name, "t_stage")) {
pconfig->iter_stage[act_iter].t_stage = atof(value);
pconfig->actual_iter = pconfig->actual_iter+1; // Ultimo elemento del grupo
// Resize stage
} else if (MATCH(resize_name, "iters")) {
pconfig->iters[act_resize] = atoi(value);
} else if (MATCH(resize_name, "procs")) {
......@@ -72,6 +89,7 @@ static int handler(void* user, const char* section, const char* name,
}
free(resize_name);
free(iter_name);
return 1;
}
......@@ -91,6 +109,7 @@ configuration *read_ini_file(char *file_name) {
return NULL;
}
config->actual_resize=0;
config->actual_iter=0;
if(ini_parse(file_name, handler, config) < 0) { // Obtener configuracion
printf("Can't load '%s'\n", file_name);
......@@ -110,7 +129,7 @@ configuration *read_ini_file(char *file_name) {
* - read_ini_file
* - recv_config_file
*/
void malloc_config_arrays(configuration *user_config, int resizes) {
void malloc_config_resizes(configuration *user_config, int resizes) {
if(user_config != NULL) {
user_config->iters = malloc(sizeof(int) * resizes);
user_config->procs = malloc(sizeof(int) * resizes);
......@@ -119,16 +138,50 @@ void malloc_config_arrays(configuration *user_config, int resizes) {
}
}
/*
* Initializes the iteration-stage structures.
* No memory is reserved here, but the internal pointers are
* set to NULL so that each stage can later be freed correctly.
*
* Reached through the functions
* - read_ini_file
* - recv_config_file
*/
void init_config_stages(configuration *user_config, int stages) {
int i;
if(user_config != NULL) {
for(i=0; i<user_config->iter_stages; i++) {
user_config->iter_stage[i].array = NULL;
user_config->iter_stage[i].full_array = NULL;
user_config->iter_stage[i].double_array = NULL;
}
}
}
/*
* Frees all the memory of a configuration structure
*/
void free_config(configuration *user_config) {
int i;
if(user_config != NULL) {
free(user_config->iters);
free(user_config->procs);
free(user_config->factors);
free(user_config->phy_dist);
for(i=0; i < user_config->iter_stages; i++) {
if(user_config->iter_stage[i].array != NULL)
free(user_config->iter_stage[i].array);
user_config->iter_stage[i].array = NULL;
if(user_config->iter_stage[i].full_array != NULL)
free(user_config->iter_stage[i].full_array);
user_config->iter_stage[i].full_array = NULL;
if(user_config->iter_stage[i].double_array != NULL)
free(user_config->iter_stage[i].double_array);
user_config->iter_stage[i].double_array = NULL;
}
free(user_config->iter_stage);
free(user_config);
}
}
......@@ -140,8 +193,12 @@ void free_config(configuration *user_config) {
void print_config(configuration *user_config, int grp) {
if(user_config != NULL) {
int i;
printf("Config loaded: resizes=%d, matrix=%d, comm_tam=%d, sdr=%d, adr=%d, aib=%d, css=%d, cst=%d, time=%f || grp=%d\n",
user_config->resizes, user_config->matrix_tam, user_config->comm_tam, user_config->sdr, user_config->adr, user_config->aib, user_config->css, user_config->cst, user_config->general_time, grp);
printf("Config loaded: resizes=%d, stages=%d, matrix=%d, comm_tam=%d, sdr=%d, adr=%d, aib=%d, css=%d, cst=%d || grp=%d\n",
user_config->resizes, user_config->iter_stages, user_config->matrix_tam, user_config->comm_tam, user_config->sdr, user_config->adr, user_config->aib, user_config->css, user_config->cst, grp);
for(i=0; i<user_config->iter_stages; i++) {
printf("Stage %d: PT=%d, T_stage=%lf, bytes=%d\n",
i, user_config->iter_stage[i].pt, user_config->iter_stage[i].t_stage, user_config->iter_stage[i].bytes);
}
for(i=0; i<user_config->resizes; i++) {
printf("Resize %d: Iters=%d, Procs=%d, Factors=%f, Phy=%d\n",
i, user_config->iters[i], user_config->procs[i], user_config->factors[i], user_config->phy_dist[i]);
......@@ -155,6 +212,7 @@ void print_config(configuration *user_config, int grp) {
* a single process group of its configuration.
*/
void print_config_group(configuration *user_config, int grp) {
int i;
if(user_config != NULL) {
int parents, sons;
parents = sons = 0;
......@@ -165,8 +223,12 @@ void print_config_group(configuration *user_config, int grp) {
sons = user_config->procs[grp+1];
}
printf("Config: matrix=%d, comm_tam=%d, sdr=%d, adr=%d, aib=%d, css=%d, cst=%d, time=%f\n",
user_config->matrix_tam, user_config->comm_tam, user_config->sdr, user_config->adr, user_config->aib, user_config->css, user_config->cst, user_config->general_time);
printf("Config: matrix=%d, comm_tam=%d, sdr=%d, adr=%d, aib=%d, css=%d, cst=%d\n",
user_config->matrix_tam, user_config->comm_tam, user_config->sdr, user_config->adr, user_config->aib, user_config->css, user_config->cst);
for(i=0; i<user_config->iter_stages; i++) {
printf("Stage %d: PT=%d, T_stage=%lf, bytes=%d\n",
i, user_config->iter_stage[i].pt, user_config->iter_stage[i].t_stage, user_config->iter_stage[i].bytes);
}
printf("Config Group: iters=%d, factor=%f, phy=%d, procs=%d, parents=%d, sons=%d\n",
user_config->iters[grp], user_config->factors[grp], user_config->phy_dist[grp], user_config->procs[grp], parents, sons);
}
......@@ -188,7 +250,7 @@ void print_config_group(configuration *user_config, int grp) {
*/
void send_config_file(configuration *config_file, int root, MPI_Comm intercomm) {
MPI_Datatype config_type, config_type_array;
MPI_Datatype config_type, config_type_array, iter_stage_type;
// Build a derived datatype to send all the
// scalar data in a single communication
......@@ -199,13 +261,19 @@ void send_config_file(configuration *config_file, int root, MPI_Comm intercomm)
// de enteros con una sola comunicacion
def_struct_config_file_array(config_file, &config_type_array);
// Build a derived datatype to send the iteration-stage structures
// in a single communication
def_struct_iter_stage(&(config_file->iter_stage[0]), &iter_stage_type);
MPI_Bcast(config_file, 1, config_type, root, intercomm);
MPI_Bcast(config_file, 1, config_type_array, root, intercomm);
MPI_Bcast(config_file->factors, config_file->resizes, MPI_FLOAT, root, intercomm);
MPI_Bcast(config_file->iter_stage, config_file->iter_stages, iter_stage_type, root, intercomm);
//Liberar tipos derivados
MPI_Type_free(&config_type);
MPI_Type_free(&config_type_array);
MPI_Type_free(&iter_stage_type);
}
/*
......@@ -219,9 +287,9 @@ void send_config_file(configuration *config_file, int root, MPI_Comm intercomm)
* The memory of the returned configuration has to be freed with
* the "free_config" function.
*/
configuration *recv_config_file(int root, MPI_Comm intercomm) {
void recv_config_file(int root, MPI_Comm intercomm, configuration **config_file_out) {
MPI_Datatype config_type, config_type_array;
MPI_Datatype config_type, config_type_array, iter_stage_type;
configuration *config_file = malloc(sizeof(configuration) * 1);
......@@ -231,50 +299,57 @@ configuration *recv_config_file(int root, MPI_Comm intercomm) {
def_struct_config_file(config_file, &config_type);
MPI_Bcast(config_file, 1, config_type, root, intercomm);
//Inicializado de estructuras internas
malloc_config_resizes(config_file, config_file->resizes); // Reserva de memoria de los vectores
config_file->iter_stage = malloc(sizeof(iter_stage_t) * config_file->iter_stages);
// Build a derived datatype to send the three integer
// vectors in a single communication
malloc_config_arrays(config_file, config_file->resizes); // Reserva de memoria de los vectores
def_struct_config_file_array(config_file, &config_type_array);
def_struct_iter_stage(&(config_file->iter_stage[0]), &iter_stage_type);
MPI_Bcast(config_file, 1, config_type_array, root, intercomm);
MPI_Bcast(config_file->factors, config_file->resizes, MPI_FLOAT, root, intercomm);
MPI_Bcast(config_file->iter_stage, config_file->iter_stages, iter_stage_type, root, intercomm);
//Liberar tipos derivados
MPI_Type_free(&config_type);
MPI_Type_free(&config_type_array);
MPI_Type_free(&iter_stage_type);
return config_file;
init_config_stages(config_file, config_file->iter_stages); // Inicializar a NULL vectores
*config_file_out = config_file;
}
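/* For context, a minimal sketch (an assumption, not part of this commit) of how the two
 * sides are expected to pair up over the intercommunicator:
 *   Parents:  send_config_file(config_file, root, intercomm);
 *   Children: recv_config_file(root, intercomm, &config_file);
 *             ... use config_file ...
 *             free_config(config_file);
 */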
/*
* Derived datatype to send 6 specific fields
* Derived datatype to send 12 specific fields
* of the configuration structure in a single communication.
*/
void def_struct_config_file(configuration *config_file, MPI_Datatype *config_type) {
int i, counts = 11;
int blocklengths[11] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
int i, counts = 12;
int blocklengths[12] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];
// Rellenar vector types
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = types[6] = types[7] = types[8] = MPI_INT;
types[9] = MPI_FLOAT;
types[10] = MPI_DOUBLE;
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = types[6] = types[7] = types[8] = types[9] = MPI_INT;
types[10] = types[11] = MPI_DOUBLE;
// Rellenar vector displs
MPI_Get_address(config_file, &dir);
MPI_Get_address(&(config_file->resizes), &displs[0]);
MPI_Get_address(&(config_file->actual_resize), &displs[1]);
MPI_Get_address(&(config_file->matrix_tam), &displs[2]);
MPI_Get_address(&(config_file->comm_tam), &displs[3]);
MPI_Get_address(&(config_file->sdr), &displs[4]);
MPI_Get_address(&(config_file->adr), &displs[5]);
MPI_Get_address(&(config_file->aib), &displs[6]);
MPI_Get_address(&(config_file->css), &displs[7]);
MPI_Get_address(&(config_file->cst), &displs[8]);
MPI_Get_address(&(config_file->general_time), &displs[9]);
MPI_Get_address(&(config_file->Top), &displs[10]);
MPI_Get_address(&(config_file->iter_stages), &displs[1]);
MPI_Get_address(&(config_file->actual_resize), &displs[2]);
MPI_Get_address(&(config_file->matrix_tam), &displs[3]);
MPI_Get_address(&(config_file->comm_tam), &displs[4]);
MPI_Get_address(&(config_file->sdr), &displs[5]);
MPI_Get_address(&(config_file->adr), &displs[6]);
MPI_Get_address(&(config_file->aib), &displs[7]);
MPI_Get_address(&(config_file->css), &displs[8]);
MPI_Get_address(&(config_file->cst), &displs[9]);
MPI_Get_address(&(config_file->latency_m), &displs[10]);
MPI_Get_address(&(config_file->bw_m), &displs[11]);
for(i=0;i<counts;i++) displs[i] -= dir;
......@@ -313,3 +388,33 @@ void def_struct_config_file_array(configuration *config_file, MPI_Datatype *conf
MPI_Type_create_resized(aux, 0, 1*sizeof(int), config_type);
MPI_Type_commit(config_type);
}
/*
* Derived datatype to send specific fields of the
* iteration-stage structures in a single communication.
*/
void def_struct_iter_stage(iter_stage_t *iter_stage, MPI_Datatype *config_type) {
int i, counts = 4;
int blocklengths[4] = {1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype aux, types[counts];
// Rellenar vector types
types[0] = types[3] = MPI_INT; // pt and bytes are ints
types[1] = MPI_FLOAT; // t_stage
types[2] = MPI_DOUBLE; // t_op
// Rellenar vector displs
MPI_Get_address(iter_stage, &dir);
MPI_Get_address(&(iter_stage->pt), &displs[0]);
MPI_Get_address(&(iter_stage->t_stage), &displs[1]);
MPI_Get_address(&(iter_stage->t_op), &displs[2]);
MPI_Get_address(&(iter_stage->bytes), &displs[3]);
for(i=0;i<counts;i++) displs[i] -= dir;
MPI_Type_create_struct(counts, blocklengths, displs, types, &aux);
// Resize the type so its extent matches sizeof(iter_stage_t); otherwise broadcasting
// an array with more than one stage would use wrong offsets past the first element.
MPI_Type_create_resized(aux, 0, sizeof(iter_stage_t), config_type);
MPI_Type_commit(config_type);
}
......@@ -5,19 +5,33 @@
typedef struct
{
int resizes;
int actual_resize;
int pt; // Procedure type
float t_stage; // Time to complete the stage
double t_op;
int operations;
int bytes, bytes_real;
char* array, *full_array;
double* double_array;
} iter_stage_t;
typedef struct
{
int resizes, iter_stages;
int actual_resize, actual_iter;
int matrix_tam, comm_tam, sdr, adr;
int css, cst;
int aib;
float general_time;
double Top;
double latency_m, bw_m;
int *iters, *procs, *phy_dist;
float *factors;
iter_stage_t *iter_stage;
} configuration;
configuration *read_ini_file(char *file_name);
void free_config(configuration *user_config);
void print_config(configuration *user_config, int grp);
......@@ -25,4 +39,4 @@ void print_config_group(configuration *user_config, int grp);
// MPI Intercomm functions
void send_config_file(configuration *config_file, int root, MPI_Comm intercomm);
configuration *recv_config_file(int root, MPI_Comm intercomm);
void recv_config_file(int root, MPI_Comm intercomm, configuration **config_file_out);
......@@ -144,28 +144,17 @@ void compute_results_iter(results_data *results, int myId, int root, MPI_Comm co
/*
* Prints the local results to the screen.
* These are the iteration-related results: the time
* per iteration, the type (normal or during asynchronous communication)
* and how many internal operations were performed in each iteration.
* per iteration and the type (normal or during asynchronous communication).
*/
void print_iter_results(results_data results, int last_normal_iter_index) {
int i, aux;
int i;
printf("Titer: ");
for(i=0; i< results.iter_index; i++) {
printf("%lf ", results.iters_time[i]);
}
printf("\nTtype: "); //FIXME modificar a imprimir solo la cantidad de asincronas
for(i=0; i< results.iter_index; i++) {
printf("%d ", results.iters_type[i] == 0);
}
printf("\nTop: "); //TODO modificar a imprimir solo cuantas operaciones cuestan una iteracion?
for(i=0; i< results.iter_index; i++) {
aux = results.iters_type[i] == 0 ? results.iters_type[last_normal_iter_index] : results.iters_type[i];
printf("%d ", aux);
}
printf("\n");
printf("\nTtype: %d\n", results.iters_async);
}
/*
......@@ -221,25 +210,22 @@ void init_results_data(results_data *results, int resizes, int iters_size) {
results->iters_size = iters_size + 100;
results->iters_time = calloc(iters_size + 100, sizeof(double)); //FIXME Numero magico
results->iters_type = calloc(iters_size + 100, sizeof(int));
results->iters_async = 0;
results->iter_index = 0;
}
void realloc_results_iters(results_data *results, int needed) {
double *time_aux;
int *type_aux;
time_aux = (double *) realloc(results->iters_time, needed * sizeof(double));
type_aux = (int *) realloc(results->iters_type, needed * sizeof(int));
if(time_aux == NULL || type_aux == NULL) {
if(time_aux == NULL) {
fprintf(stderr, "Fatal error - No se ha podido realojar la memoria de resultados\n");
MPI_Abort(MPI_COMM_WORLD, 1);
}
results->iters_time = time_aux;
results->iters_type = type_aux;
}
/*
......@@ -254,7 +240,6 @@ void free_results_data(results_data *results) {
free(results->async_time);
free(results->iters_time);
free(results->iters_type);
}
//free(*results); FIXME Borrar
}
......@@ -7,7 +7,7 @@
typedef struct {
// Iters data
double *iters_time;
int *iters_type, iter_index, iters_size;
int iters_async, iter_index, iters_size;
// Spawn, Thread, Sync, Async and Exec time
double spawn_start, *spawn_time, *spawn_real_time;
......
......@@ -4,15 +4,14 @@
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include "computing_func.h"
#include "process_stage.h"
#include "Main_datatypes.h"
#include "../malleability/CommDist.h"
#include "../malleability/malleabilityManager.h"
#include "../malleability/malleabilityStates.h"
#define ROOT 0
int work();
void iterate(double *matrix, int n, int async_comm, int iter);
double iterate(double *matrix, int n, int async_comm, int iter);
void init_group_struct(char *argv[], int argc, int myId, int numP);
void init_application();
......@@ -24,21 +23,6 @@ int print_local_results();
int print_final_results();
int create_out_file(char *nombre, int *ptr, int newstdout);
typedef struct {
int myId;
int numP;
int grp;
int iter_start;
int argc;
int numS; // Cantidad de procesos hijos
MPI_Comm children, parents;
char *compute_comm_array;
char **argv;
char *sync_array, *async_array;
} group_data;
configuration *config_file;
group_data *group;
results_data *results;
......@@ -90,10 +74,6 @@ int main(int argc, char *argv[]) {
get_benchmark_results(&results);
set_results_post_reconfig(results, group->grp, config_file->sdr, config_file->adr); //TODO Cambio al añadir nueva redistribucion
if(config_file->comm_tam) {
group->compute_comm_array = malloc(config_file->comm_tam * sizeof(char));
}
// TODO Refactor - Que sea una unica funcion
// Obtiene las variables que van a utilizar los hijos
void *value = NULL;
......@@ -130,6 +110,7 @@ int main(int argc, char *argv[]) {
do {
group->grp = group->grp + 1;
obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
set_benchmark_grp(group->grp);
get_malleability_user_comm(&comm);
MPI_Comm_size(comm, &(group->numP));
......@@ -162,7 +143,7 @@ int main(int argc, char *argv[]) {
//
if(res==1) { // Se he llegado al final de la aplicacion
if(res==1) { // Se ha llegado al final de la aplicacion
MPI_Barrier(comm); // TODO Posible error al utilizar SHRINK
results->exec_time = MPI_Wtime() - results->exec_start;
}
......@@ -176,6 +157,7 @@ int main(int argc, char *argv[]) {
MPI_Abort(MPI_COMM_WORLD, -100);
}
free_application_data();
MPI_Finalize();
return 0;
......@@ -221,6 +203,7 @@ int work() {
}
state = malleability_checkpoint();
}
if(config_file->resizes - 1 == group->grp) res=1;
if(state == MAL_ZOMBIE) res=state;
......@@ -239,48 +222,31 @@ int work() {
* Simulates the execution of one compute iteration of the application,
* which lasts at least "time" seconds.
*/
void iterate(double *matrix, int n, int async_comm, int iter) {
double iterate(double *matrix, int n, int async_comm, int iter) {
double start_time, actual_time;
double time = config_file->general_time * config_file->factors[group->grp];
double Top = config_file->Top;
int i, operations = 0;
int i, cnt_async = 0;
double aux = 0;
start_time = MPI_Wtime();
operations = time / Top; //FIXME Calcular una sola vez
for(i=0; i < operations; i++) {
aux += computePiSerial(n);
}
/*
if(time >= 1) {
sleep(time);
}
else {
unsigned int sleep_time = time * 1000000;
usleep(sleep_time);
}
*/
if(config_file->comm_tam) {
MPI_Bcast(group->compute_comm_array, config_file->comm_tam, MPI_CHAR, ROOT, comm);
for(i=0; i < config_file->iter_stages; i++) {
aux+= process_stage((void*)config_file, i, (void*)group, comm);
}
actual_time = MPI_Wtime(); // Guardar tiempos
// TODO Que diferencie entre ambas en el IO
if(async_comm == MAL_DIST_PENDING || async_comm == MAL_SPAWN_PENDING || async_comm == MAL_SPAWN_SINGLE_PENDING) { // Se esta realizando una redistribucion de datos asincrona
operations=0;
cnt_async=1;
}
if(results->iter_index == results->iters_size) { // Aumentar tamaño de ambos vectores de resultados
realloc_results_iters(results, results->iters_size + 100);
}
results->iters_time[results->iter_index] = actual_time - start_time;
results->iters_type[results->iter_index] = operations;
results->iters_async += cnt_async;
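// iters_async accumulates how many iterations overlapped an asynchronous reconfiguration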
results->iter_index = results->iter_index + 1;
return aux;
}
//======================================================||
......@@ -340,7 +306,7 @@ int print_local_results() {
* y las comunicaciones.
*/
int print_final_results() {
int ptr_global, err;
int ptr_global, err, ptr_out;
char *file_name;
if(group->myId == ROOT) {
......@@ -352,12 +318,15 @@ int print_final_results() {
err = snprintf(file_name, 20, "R%d_Global.out", run_id);
if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero
ptr_out = dup(1);
create_out_file(file_name, &ptr_global, 1);
print_config(config_file, group->grp);
print_global_results(*results, config_file->resizes);
fflush(stdout);
free(file_name);
close(1);
dup(ptr_out);
}
}
return 0;
......@@ -397,43 +366,33 @@ void init_application() {
config_file = read_ini_file(group->argv[1]);
results = malloc(sizeof(results_data));
init_results_data(results, config_file->resizes, config_file->iters[group->grp]);
if(config_file->comm_tam) {
group->compute_comm_array = malloc(config_file->comm_tam * sizeof(char));
}
if(config_file->sdr) {
malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
}
if(config_file->adr) {
malloc_comm_array(&(group->async_array), config_file->adr , group->myId, group->numP);
}
obtain_op_times();
int message_tam = 100000000;
config_file->latency_m = latency(group->myId, group->numP, comm);
config_file->bw_m = bandwidth(group->myId, group->numP, comm, config_file->latency_m, message_tam);
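// The measured latency and bandwidth are later used by init_stage to size the communication stages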
obtain_op_times(1);
}
/*
* Obtains how much time is needed to perform one PI operation
*/
void obtain_op_times() {
double result, start_time = MPI_Wtime();
int i, qty = 20000;
result = 0;
for(i=0; i<qty; i++) {
result += computePiSerial(config_file->matrix_tam);
void obtain_op_times(int compute) {
int i;
for(i=0; i<config_file->iter_stages; i++) {
init_stage((void*)config_file, i, (void*)group, comm, compute);
}
//printf("Creado Top con valor %lf\n", result);
//fflush(stdout);
config_file->Top = (MPI_Wtime() - start_time) / qty; //Tiempo de una operacion
MPI_Bcast(&(config_file->Top), 1, MPI_DOUBLE, ROOT, comm);
}
/*
* Frees all the memory associated with the application
*/
void free_application_data() {
if(config_file->comm_tam) {
free(group->compute_comm_array);
}
if(config_file->sdr) {
free(group->sync_array);
}
......
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
#define ROOT 0
typedef struct {
int myId;
int numP;
int grp;
int iter_start;
int argc;
int numS; // Cantidad de procesos hijos
MPI_Comm children, parents;
char *compute_comm_array, *compute_comm_recv;
char **argv;
char *sync_array, *async_array;
} group_data;
......@@ -54,3 +54,12 @@ void initMatrix(double **matrix, int n) {
}
}
}
void freeMatrix(double **matrix) {
// Free the matrix and reset the pointer
if(*matrix != NULL) {
free(*matrix);
*matrix = NULL;
}
}
double computeMatrix(double *matrix, int n);
double computePiSerial(int n);
void initMatrix(double **matrix, int n);
void freeMatrix(double **matrix);
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
#include "comunication_func.h"
/*
* Performs an ordered point-to-point communication:
* process X sends to X+1 and receives from X-1
*/
void point_to_point(int myId, int numP, int root, MPI_Comm comm, char *array, int qty) {
int prev, next;
next = (myId+1) % numP;
prev = (myId == 0 ? numP-1 : myId-1);
if(myId == root) {
MPI_Send(array, qty, MPI_CHAR, next, 99, comm);
MPI_Recv(array, qty, MPI_CHAR, prev, 99, comm, MPI_STATUS_IGNORE);
} else {
MPI_Recv(array, qty, MPI_CHAR, prev, 99, comm, MPI_STATUS_IGNORE);
MPI_Send(array, qty, MPI_CHAR, next, 99, comm);
}
}
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
void point_to_point(int myId, int numP, int root, MPI_Comm comm, char *array, int qty);
#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include <mpi.h>
#include "computing_func.h"
#include "comunication_func.h"
#include "Main_datatypes.h"
#include "process_stage.h"
#include "../malleability/malleabilityManager.h" //FIXME Refactor
/*
* Computes the time per operation, or the total number of bytes to send,
* for each iteration stage, so that the iterations can later be
* carried out correctly.
*
* It is computed only by the ROOT process, which then sends it to
* the rest of the processes.
*
* If the "compute" flag is set, the operations are performed to
* recompute the times from scratch. If it is false, only the required
* memory is reserved and the values obtained in previous calls are
* reused. All processes must pass the same value for this flag.
*
* TODO Split the work among the processes.
*/
void init_stage(void *config_file_void, int stage, void *group_void, MPI_Comm comm, int compute) {
double result, start_time, t_stage;
int qty = 20000;
group_data group = *((group_data *) group_void);
configuration config_file = *((configuration *) config_file_void);
config_file.iter_stage[stage].operations = qty;
t_stage = config_file.iter_stage[stage].t_stage * config_file.factors[group.grp];
if(config_file.iter_stage[stage].bytes == 0) {
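// Estimate the message size that fills the stage time, from the measured latency and bandwidth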
config_file.iter_stage[stage].bytes = (t_stage - config_file.latency_m) * config_file.bw_m;
}
start_time = MPI_Wtime();
result = 0;
switch(config_file.iter_stage[stage].pt) {
//Computo
case COMP_MATRIX:
initMatrix(&(config_file.iter_stage[stage].double_array), config_file.matrix_tam);
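// NOTE: no break here; the fall-through into COMP_PI appears intentional so the timing run below also executes for the matrix stage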
case COMP_PI:
if(group.myId == ROOT && compute) {
result+= process_stage(config_file_void, stage, group_void, comm);
}
break;
//Comunicación
case COMP_POINT:
if(config_file.iter_stage[stage].array != NULL)
free(config_file.iter_stage[stage].array);
config_file.iter_stage[stage].array = malloc(sizeof(char) * config_file.iter_stage[stage].bytes);
break;
case COMP_BCAST:
if(config_file.iter_stage[stage].array != NULL)
free(config_file.iter_stage[stage].array);
config_file.iter_stage[stage].array = malloc(sizeof(char) * config_file.iter_stage[stage].bytes);
break;
case COMP_ALLTOALL:
if(config_file.iter_stage[stage].array != NULL)
free(config_file.iter_stage[stage].array);
config_file.iter_stage[stage].array = malloc(sizeof(char) * config_file.iter_stage[stage].bytes);
if(config_file.iter_stage[stage].full_array != NULL)
free(config_file.iter_stage[stage].full_array);
config_file.iter_stage[stage].full_array = malloc(sizeof(char) * config_file.iter_stage[stage].bytes * group.numP);
break;
case COMP_REDUCE:
if(config_file.iter_stage[stage].array != NULL)
free(config_file.iter_stage[stage].array);
config_file.iter_stage[stage].array = malloc(sizeof(char) * config_file.iter_stage[stage].bytes);
//Full array para el reduce necesita el mismo tamanyo
if(config_file.iter_stage[stage].full_array != NULL)
free(config_file.iter_stage[stage].full_array);
config_file.iter_stage[stage].full_array = malloc(sizeof(char) * config_file.iter_stage[stage].bytes);
break;
}
if(compute) {
config_file.iter_stage[stage].t_op = (MPI_Wtime() - start_time) / qty; //Tiempo de una operacion
MPI_Bcast(&(config_file.iter_stage[stage].t_op), 1, MPI_DOUBLE, ROOT, comm);
}
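// Number of operations needed to fill the requested stage time (when compute is false, t_op keeps its value from a previous call)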
config_file.iter_stage[stage].operations = t_stage / config_file.iter_stage[stage].t_op;
}
/*
* Processes one stage of the iteration, determining the type
* of operation to perform and calling the function that
* carries out that operation.
*/
double process_stage(void *config_file_void, int stage, void *group_void, MPI_Comm comm) {
int i;
double result = 0;
group_data group = *((group_data *) group_void);
configuration config_file = *((configuration *) config_file_void);
iter_stage_t stage_data = config_file.iter_stage[stage];
switch(stage_data.pt) {
//Computo
case COMP_PI:
for(i=0; i < stage_data.operations; i++) {
result += computePiSerial(config_file.matrix_tam);
}
break;
case COMP_MATRIX:
for(i=0; i < stage_data.operations; i++) {
result += computeMatrix(stage_data.double_array, config_file.matrix_tam); //FIXME No da tiempos repetibles
}
break;
//Comunicaciones
case COMP_POINT:
point_to_point(group.myId, group.numP, ROOT, comm, stage_data.array, stage_data.bytes);
break;
case COMP_BCAST:
MPI_Bcast(stage_data.array, stage_data.bytes, MPI_CHAR, ROOT, comm);
break;
case COMP_ALLTOALL:
MPI_Alltoall(stage_data.array, stage_data.bytes, MPI_CHAR, stage_data.full_array, stage_data.bytes, MPI_CHAR, comm);
break;
case COMP_REDUCE:
MPI_Reduce(stage_data.array, stage_data.full_array, stage_data.bytes, MPI_CHAR, MPI_MAX, ROOT, comm);
break;
}
return result;
}
// Several latency tests are performed by sending a single CHAR
// value to the odd-ranked processes from the even-ranked process
// immediately before them. The odd ranks then send the value
// back to the even process.
//
// Returns the latency of the system.
double latency(int myId, int numP, MPI_Comm comm) {
int i, loop_count = 100;
double start_time, stop_time, elapsed_time, max_time;
char aux;
aux = '0';
elapsed_time = 0;
if(myId+1 != numP || (myId+1 == numP && numP % 2 == 0)) {
for(i=0; i<loop_count; i++){
MPI_Barrier(comm);
start_time = MPI_Wtime();
if(myId % 2 == 0){
MPI_Ssend(&aux, 1, MPI_CHAR, myId+1, 99, comm);
MPI_Recv(&aux, 1, MPI_CHAR, myId+1, 99, comm, MPI_STATUS_IGNORE);
}
else if(myId % 2 == 1){
MPI_Recv(&aux, 1, MPI_CHAR, myId-1, 99, comm, MPI_STATUS_IGNORE);
MPI_Ssend(&aux, 1, MPI_CHAR, myId-1, 99, comm);
}
MPI_Barrier(comm);
stop_time = MPI_Wtime();
elapsed_time += stop_time - start_time;
}
}
elapsed_time /= loop_count; // Every participating rank normalizes its time before the MAX reduction
MPI_Allreduce(&elapsed_time, &max_time, 1, MPI_DOUBLE, MPI_MAX, comm);
return max_time;
}
// Several bandwidth tests are performed by sending N values to the
// odd-ranked processes from the even-ranked process immediately
// before them. The odd ranks then send the N values back to the
// even process.
//
// Returns the measured bandwidth (bytes per second).
double bandwidth(int myId, int numP, MPI_Comm comm, double latency, int n) {
int i, loop_count = 100, n_bytes;
double start_time, stop_time, elapsed_time, bw, time, max_time;
char *aux;
n_bytes = n * sizeof(char);
aux = malloc(n_bytes);
elapsed_time = 0;
if(myId+1 != numP || (myId+1 == numP && numP % 2 == 0)) {
for(i=0; i<loop_count; i++){
MPI_Barrier(comm);
start_time = MPI_Wtime();
if(myId %2 == 0){
MPI_Ssend(aux, n, MPI_CHAR, myId+1, 99, comm);
MPI_Recv(aux, n, MPI_CHAR, myId+1, 99, comm, MPI_STATUS_IGNORE);
}
else if(myId %2 == 1){
MPI_Recv(aux, n, MPI_CHAR, myId-1, 99, comm, MPI_STATUS_IGNORE);
MPI_Ssend(aux, n, MPI_CHAR, myId-1, 99, comm);
}
MPI_Barrier(comm);
stop_time = MPI_Wtime();
elapsed_time += stop_time - start_time;
}
}
if(myId %2 == 0) {
time = elapsed_time / loop_count - latency;
} else {
time = 0; // Odd ranks do not contribute a measurement to the MAX reduction
}
MPI_Allreduce(&time, &max_time, 1, MPI_DOUBLE, MPI_MAX, comm);
bw = ((double)n_bytes * 2) / max_time;
free(aux);
return bw;
}
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
//#include "Main_datatypes.h"
//#include "../malleability/malleabilityManager.h" //FIXME Refactor
enum compute_methods{COMP_PI, COMP_MATRIX, COMP_POINT, COMP_BCAST, COMP_ALLTOALL, COMP_REDUCE};
//FIXME Refactor el void
void init_stage(void *config_file_void, int stage, void *group_void, MPI_Comm comm, int compute);
double process_stage(void *config_file_void, int stage, void* group_void, MPI_Comm comm);
double latency(int myId, int numP, MPI_Comm comm);
double bandwidth(int myId, int numP, MPI_Comm comm, double latency, int n);
module load mpich-3.4.1-noucx
#mpicc -Wall Main/Main.c Main/computing_func.c IOcodes/results.c IOcodes/read_ini.c IOcodes/ini.c malleability/ProcessDist.c malleability/CommDist.c -pthread -lslurm -lm
mpicc -Wall Main/Main.c Main/computing_func.c IOcodes/results.c IOcodes/read_ini.c IOcodes/ini.c malleability/malleabilityManager.c malleability/malleabilityTypes.c malleability/malleabilityZombies.c malleability/ProcessDist.c malleability/CommDist.c -pthread -lslurm -lm
mpicc -Wall Main/Main.c Main/computing_func.c Main/comunication_func.c Main/process_stage.c IOcodes/results.c IOcodes/read_ini.c IOcodes/ini.c malleability/malleabilityManager.c malleability/malleabilityTypes.c malleability/malleabilityZombies.c malleability/ProcessDist.c malleability/CommDist.c -pthread -lslurm -lm
if [ $# -gt 0 ]
then
......
......@@ -421,7 +421,8 @@ void Children_init() {
MPI_Bcast(&root_parents, 1, MPI_INT, MALLEABILITY_ROOT, mall->intercomm);
MPI_Bcast(&numP_parents, 1, MPI_INT, root_parents, mall->intercomm);
mall_conf->config_file = recv_config_file(mall->root, mall->intercomm);
recv_config_file(mall->root, mall->intercomm, &(mall_conf->config_file));
mall_conf->results = (results_data *) malloc(sizeof(results_data));
init_results_data(mall_conf->results, mall_conf->config_file->resizes, RESULTS_INIT_DATA_QTY);
......
......@@ -9,16 +9,17 @@ codeDir="/Codes"
nodelist=$SLURM_JOB_NODELIST
nodes=$SLURM_JOB_NUM_NODES
configFile=$1
outIndex=$2
echo "MPICH"
module load mpich-3.4.1-noucx
#export HYDRA_DEBUG=1
#-disable-hostname-propagation -disable-auto-cleanup -pmi-port -hosts n00,n01
numP=$(bash recordMachinefile.sh $1)
numP=$(bash recordMachinefile.sh $configFile)
mpirun -print-all-exitcodes -f hostfile.o$SLURM_JOB_ID $dir$codeDir/a.out $1 $2 $nodelist $nodes
#mpirun -np $numP $dir$codeDir/a.out $1 $2 $nodelist $nodes
#mpirun -print-all-exitcodes -f hostfile.o$SLURM_JOB_ID $dir$codeDir/a.out $configFile $outIndex $nodelist $nodes
mpirun -np $numP $dir$codeDir/a.out $configFile $outIndex $nodelist $nodes
rm hostfile.o$SLURM_JOB_ID
echo "END RUN"
......
[general]
resizes=1 ; Number of resizes
matrix_tam=1000 ; Size in bytes of the compute matrix
comm_tam=1000 ; Size in bytes of the data communicated in each iteration. Sent a single time
SDR=100000000 ; Size in bytes to redistribute synchronously
ADR=100000000 ; Size in bytes to redistribute asynchronously 1000000000
AIB=3 ; Indicates when asynchronous redistributions are considered finished by the parents:
; when they finish sending (0), when the values have finished being received (1),
; or whether to use point-to-point communications (2) or threads (3)
CST=0 ; Indicates how the spawn is performed: (0) for the baseline method, (1) for the baseline
; method with threads, (2) for the merge method and (3) for the merge method
; with threads
CSS=0 ; Indicates whether the spawn is performed by all processes (0) or only
; the root process participates (1)
time=1 ; Time needed to perform one iteration
resizes=1
iter_stages=1
matrix_tam=100000
comm_tam=47192
SDR=0.0
ADR=0.0
AIB=0
CST=1
CSS=0
time=0.006
; end [general]
[resize0] ; Initial group (mpirun)
iters=10 ; Number of iterations this group performs
procs=2 ; Number of processes in the group
factor=1 ; Cost factor
physical_dist=node ; Physical distribution type of the processes
;end [resize0]
[resize1] ; First group of children
iters=20
[stage0]
PT=3
bytes=0
t_stage=0.05
;end [stage0]
[stage1]
PT=3
bytes=0
t_stage=0.05
;end [stage1]
[resize0]
iters=10
procs=4
factor=1
physical_dist=cpu
;end [resize0]
[resize1]
iters=10
procs=8
factor=0.5
physical_dist=node
physical_dist=cpu
;end [resize1]