Commit dae36bb8 authored by iker_martin

Fixed the Linear Regression behavior. Refactored the Process_stage.c code. Added include guards to the .h files. The final execution time now accounts for the wasted time.

parent ca92ad42
diff --git a/IOcodes/read_ini.c b/IOcodes/read_ini.c
@@ -4,6 +4,7 @@
 #include <mpi.h>
 #include "read_ini.h"
 #include "../malleability/ProcessDist.h"
+#include "../malleability/distribution_methods/block_distribution.h"
 #include "ini.h"
@@ -40,7 +41,7 @@ static int handler(void* user, const char* section, const char* name,
     pconfig->iter_stages = atoi(value);
     pconfig->iter_stage = malloc(sizeof(iter_stage_t) * pconfig->iter_stages);
     init_config_stages(pconfig, pconfig->iter_stages);
-  } else if (MATCH("general", "matrix_tam")) {
+  } else if (MATCH("general", "matrix_tam")) { //TODO Refactor: rename
     pconfig->matrix_tam = atoi(value);
   } else if (MATCH("general", "SDR")) {
     pconfig->sdr = atoi(value);
@@ -55,32 +56,38 @@ static int handler(void* user, const char* section, const char* name,
   // Iter stage
   } else if (MATCH(iter_name, "PT")) {
+    if(pconfig->actual_iter < pconfig->iter_stages)
       pconfig->iter_stage[act_iter].pt = atoi(value);
   } else if (MATCH(iter_name, "bytes")) {
+    if(pconfig->actual_iter < pconfig->iter_stages)
       pconfig->iter_stage[act_iter].bytes = atoi(value);
   } else if (MATCH(iter_name, "t_stage")) {
+    if(pconfig->actual_iter < pconfig->iter_stages) {
       pconfig->iter_stage[act_iter].t_stage = atof(value);
       pconfig->actual_iter = pconfig->actual_iter+1; // Last element of the group
+    }
   // Resize stage
   } else if (MATCH(resize_name, "iters")) {
+    if(pconfig->actual_resize < pconfig->resizes)
       pconfig->iters[act_resize] = atoi(value);
   } else if (MATCH(resize_name, "procs")) {
+    if(pconfig->actual_resize < pconfig->resizes)
       pconfig->procs[act_resize] = atoi(value);
   } else if (MATCH(resize_name, "factor")) {
+    if(pconfig->actual_resize < pconfig->resizes)
       pconfig->factors[act_resize] = atof(value);
   } else if (MATCH(resize_name, "physical_dist")) {
+    if(pconfig->actual_resize < pconfig->resizes) {
       char *aux = strdup(value);
       if (strcmp(aux, "node") == 0) {
         pconfig->phy_dist[act_resize] = COMM_PHY_NODES;
       } else {
         pconfig->phy_dist[act_resize] = COMM_PHY_CPU;
       }
       free(aux);
       pconfig->actual_resize = pconfig->actual_resize+1; // Last element of the group
+    }
   } else {
     return 0; /* unknown section or name, error */
@@ -152,8 +159,7 @@ void init_config_stages(configuration *user_config, int stages) {
     user_config->iter_stage[i].array = NULL;
     user_config->iter_stage[i].full_array = NULL;
     user_config->iter_stage[i].double_array = NULL;
-    user_config->iter_stage[i].counts = NULL;
-    user_config->iter_stage[i].displs = NULL;
+    user_config->iter_stage[i].counts.counts = NULL;
     user_config->iter_stage[i].real_bytes = 0;
   }
 }
@@ -184,20 +190,13 @@ void free_config(configuration *user_config) {
       free(user_config->iter_stage[i].double_array);
       user_config->iter_stage[i].double_array = NULL;
     }
-    if(user_config->iter_stage[i].counts != NULL) {
-      free(user_config->iter_stage[i].counts);
-      user_config->iter_stage[i].counts = NULL;
-    }
-    if(user_config->iter_stage[i].displs != NULL) {
-      free(user_config->iter_stage[i].displs);
-      user_config->iter_stage[i].displs = NULL;
+    if(user_config->iter_stage[i].counts.counts != NULL) {
+      freeCounts(&(user_config->iter_stage[i].counts));
     }
   }
   //free(user_config->iter_stage); //FIXME Memory error related to the malleability folder
   free(user_config);
 }
}
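The handler above follows the usual inih callback pattern: ini_parse() invokes the handler once per key/value pair, and MATCH-style checks route each value into the configuration struct. A minimal self-contained sketch of that pattern, assuming the bundled ini.h provides ini_parse with its standard signature; demo_config, demo_handler, and "config.ini" are illustrative names, not part of this repository:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "ini.h"

#define MATCH(s, n) (strcmp(section, (s)) == 0 && strcmp(name, (n)) == 0)

typedef struct { int sdr; } demo_config;

// Called by ini_parse() once per key; returning 0 reports an error
static int demo_handler(void* user, const char* section, const char* name, const char* value) {
  demo_config* c = (demo_config*) user;
  if (MATCH("general", "SDR")) {
    c->sdr = atoi(value);
    return 1;
  }
  return 0; // unknown section or name
}

int main(void) {
  demo_config c = {0};
  if (ini_parse("config.ini", demo_handler, &c) < 0) return 1;
  printf("SDR=%d\n", c.sdr);
  return 0;
}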
diff --git a/IOcodes/read_ini.h b/IOcodes/read_ini.h
+#ifndef READ_INI_H
+#define READ_INI_H
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <mpi.h>
+#include "../malleability/distribution_methods/block_distribution.h"
 typedef struct
 {
@@ -10,7 +14,7 @@ typedef struct
   double t_op;
   int operations;
-  int bytes, real_bytes;
+  int bytes, real_bytes, my_bytes;
   // Variables to represent linear regression
   // for collective calls.
@@ -20,7 +24,8 @@ typedef struct
   char* array, *full_array;
   double* double_array;
   // Arrays to indicate how many bytes are received from each rank
-  int *counts, *displs;
+  //int *counts, *displs;
+  struct Counts counts;
 } iter_stage_t;
@@ -48,3 +53,5 @@ void print_config_group(configuration *user_config, int grp);
 // MPI Intercomm functions
 void send_config_file(configuration *config_file, int root, MPI_Comm intercomm);
 void recv_config_file(int root, MPI_Comm intercomm, configuration **config_file_out);
+#endif
diff --git a/IOcodes/results.c b/IOcodes/results.c
@@ -60,13 +60,13 @@ void recv_results(results_data *results, int root, int resizes, MPI_Comm interco
- * Specifically, three scalars and two vectors of size "resizes"
+ * Specifically, four scalars and two vectors of size "resizes"
  */
 void def_results_type(results_data *results, int resizes, MPI_Datatype *results_type) {
-  int i, counts = 5;
-  int blocklengths[] = {1, 1, 1, 1, 1};
+  int i, counts = 6;
+  int blocklengths[] = {1, 1, 1, 1, 1, 1};
   MPI_Aint displs[counts], dir;
   MPI_Datatype types[counts];

   // Fill the types array
-  types[0] = types[1] = types[2] = types[3] = types[4] = MPI_DOUBLE;
-  blocklengths[3] = blocklengths[4] = resizes;
+  types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = MPI_DOUBLE;
+  blocklengths[4] = blocklengths[5] = resizes; // the two vectors now sit at indices 4 and 5

   // Fill the displacements array
@@ -75,8 +75,9 @@ void def_results_type(results_data *results, int resizes, MPI_Datatype *results_
   MPI_Get_address(&(results->sync_start), &displs[0]);
   MPI_Get_address(&(results->async_start), &displs[1]);
   MPI_Get_address(&(results->exec_start), &displs[2]);
-  MPI_Get_address(&(results->spawn_real_time[0]), &displs[3]);
-  MPI_Get_address(&(results->spawn_time[0]), &displs[4]); //TODO Check whether this can be simplified //FIXME Error if there is more than one spawn?
+  MPI_Get_address(&(results->wasted_time), &displs[3]);
+  MPI_Get_address(&(results->spawn_real_time[0]), &displs[4]);
+  MPI_Get_address(&(results->spawn_time[0]), &displs[5]); //TODO Check whether this can be simplified //FIXME Error if there is more than one spawn?

   for(i=0;i<counts;i++) displs[i] -= dir;
@@ -201,12 +202,12 @@ void print_global_results(results_data results, int resizes) {
  * of the results vectors.
  */
 void init_results_data(results_data *results, int resizes, int iters_size) {
-  //*results = malloc(1 * sizeof(results_data)); FIXME Delete
   results->spawn_time = calloc(resizes, sizeof(double));
   results->spawn_real_time = calloc(resizes, sizeof(double));
   results->sync_time = calloc(resizes, sizeof(double));
   results->async_time = calloc(resizes, sizeof(double));
+  results->wasted_time = 0;

   results->iters_size = iters_size + 100;
   results->iters_time = calloc(iters_size + 100, sizeof(double)); //FIXME Magic number
@@ -229,7 +230,6 @@ void realloc_results_iters(results_data *results, int needed) {
 /*
  * Frees all the memory associated with a results structure.
- * TODO Ensure it has been initialized?
  */
 void free_results_data(results_data *results) {
   if(results != NULL) {
@@ -240,5 +240,4 @@ void free_results_data(results_data *results) {
     free(results->iters_time);
   }
-  //free(*results); FIXME Delete
 }
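def_results_type above uses the standard MPI derived-datatype recipe: take the absolute address of each member, subtract the base address to get relative displacements, and register the layout. A minimal sketch of that recipe on a stand-in struct (demo_t is illustrative, not part of this repository):

#include <mpi.h>

typedef struct { double a, b; double v[4]; } demo_t;

int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);

  demo_t d;
  int blocklengths[] = {1, 1, 4};
  MPI_Aint displs[3], dir;
  MPI_Datatype types[] = {MPI_DOUBLE, MPI_DOUBLE, MPI_DOUBLE}, dtype;

  MPI_Get_address(&d, &dir);           // base address of the struct
  MPI_Get_address(&d.a, &displs[0]);
  MPI_Get_address(&d.b, &displs[1]);
  MPI_Get_address(&d.v[0], &displs[2]);
  for (int i = 0; i < 3; i++) displs[i] -= dir; // make displacements relative

  MPI_Type_create_struct(3, blocklengths, displs, types, &dtype);
  MPI_Type_commit(&dtype);
  // dtype can now describe a demo_t in MPI_Send/MPI_Bcast calls
  MPI_Type_free(&dtype);
  MPI_Finalize();
  return 0;
}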
diff --git a/IOcodes/results.h b/IOcodes/results.h
+#ifndef RESULTS_H
+#define RESULTS_H
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
@@ -14,7 +17,7 @@ typedef struct {
   double sync_start, sync_end, *sync_time;
   double async_start, async_end, *async_time;
   double exec_start, exec_time;
-  //Overcharge time is time spent in malleability that is from IO modules
+  double wasted_time; // Time spent recalculating iter stages
 } results_data;
 void send_results(results_data *results, int root, int resizes, MPI_Comm intercomm);
@@ -30,3 +33,5 @@ void print_global_results(results_data results, int resizes);
 void init_results_data(results_data *results, int resizes, int iters_size);
 void realloc_results_iters(results_data *results, int needed);
 void free_results_data(results_data *results);
+#endif
diff --git a/Main/Main.c b/Main/Main.c
@@ -110,7 +110,7 @@ int main(int argc, char *argv[]) {
   do {
     group->grp = group->grp + 1;
-    obtain_op_times(0); //Obtain the new time values for the computation
+    if(group->grp != 0) obtain_op_times(0); //Obtain the new time values for the computation
     set_benchmark_grp(group->grp);
     get_malleability_user_comm(&comm);
     MPI_Comm_size(comm, &(group->numP));
@@ -145,7 +145,7 @@ int main(int argc, char *argv[]) {
   if(res==1) { // The end of the application has been reached
     MPI_Barrier(comm); // TODO Possible error when using SHRINK
-    results->exec_time = MPI_Wtime() - results->exec_start;
+    results->exec_time = MPI_Wtime() - results->exec_start - results->wasted_time;
   }
   print_final_results(); // Past this point the processes can no longer write
@@ -230,7 +230,7 @@ double iterate(double *matrix, int n, int async_comm, int iter) {
   start_time = MPI_Wtime();
   for(i=0; i < config_file->iter_stages; i++) {
-    aux+= process_stage((void*)config_file, i, (void*)group, comm);
+    aux+= process_stage(*config_file, config_file->iter_stage[i], *group, comm);
   }
   actual_time = MPI_Wtime(); // Save times
@@ -377,19 +377,26 @@ void init_application() {
   config_file->latency_m = latency(group->myId, group->numP, comm);
   config_file->bw_m = bandwidth(group->myId, group->numP, comm, config_file->latency_m, message_tam);
   obtain_op_times(1);
-  linear_regression_stage( (void*)&(config_file->iter_stage[0]), (void*)group, comm);
-  printf("TEST P%d -- slope=%lf intercept=%lf\n", group->myId, config_file->iter_stage[0].slope, config_file->iter_stage[0].intercept);
 }

 /*
  * Obtains how much time is needed to perform one PI operation
+ *
+ * If compute is 1, the environment is considered to be initializing
+ * and extra work will be performed.
+ *
+ * If compute is 0, the environment is considered initialized and only
+ * some memory allocations need to change. If anything has to be
+ * recalculated, the total time spent on those tasks is obtained and
+ * subtracted from the total execution time.
  */
 void obtain_op_times(int compute) {
   int i;
+  double time = 0;
   for(i=0; i<config_file->iter_stages; i++) {
-    init_stage((void*)config_file, i, (void*)group, comm, compute);
+    time+=init_stage(config_file, i, *group, comm, compute);
   }
+  if(!compute) results->wasted_time += time;
 }
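The bookkeeping this commit adds is simple arithmetic: every recalculation performed after a resize accumulates into wasted_time, which is subtracted once from the raw elapsed time at the end. A toy sketch with stand-in timestamps (the numbers are invented for illustration):

#include <stdio.h>

int main(void) {
  double exec_start = 0.0, now = 100.0; // stand-ins for MPI_Wtime() values
  double wasted_time = 0.0;

  wasted_time += 2.5; // e.g. an init_stage() recalculation after a resize
  wasted_time += 1.5; // another resize

  double exec_time = now - exec_start - wasted_time;
  printf("exec_time = %.1f (raw %.1f minus %.1f wasted)\n",
         exec_time, now - exec_start, wasted_time);
  return 0;
}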
diff --git a/Main/Main_datatypes.h b/Main/Main_datatypes.h
+#ifndef MAIN_DATATYPES_H
+#define MAIN_DATATYPES_H
 #include <stdlib.h>
 #include <stdio.h>
 #include <mpi.h>
@@ -20,9 +23,4 @@ typedef struct {
   char *sync_array, *async_array;
 } group_data;
+#endif
-/*----------LINEAR REGRESSION TYPES--------------*/
-#define LR_ARRAY_TAM 7
-// Array for linear regression computation
-// Quantities   10b 100b 1Kb 100Kb 1Mb 10Mb 100Mb
-double LR_bytes_array[LR_ARRAY_TAM] = {10, 100, 1000, 100000, 1000000, 10000000, 100000000};
diff --git a/Main/computing_func.h b/Main/computing_func.h
+#ifndef COMPUTING_FUNC_H
+#define COMPUTING_FUNC_H
 double computeMatrix(double *matrix, int n);
 double computePiSerial(int n);
 void initMatrix(double **matrix, int n);
 void freeMatrix(double **matrix);
+#endif
diff --git a/Main/comunication_func.h b/Main/comunication_func.h
+#ifndef COMUNICATION_FUNC_H
+#define COMUNICATION_FUNC_H
 #include <stdlib.h>
 #include <stdio.h>
 #include <mpi.h>
@@ -5,3 +8,4 @@
 void point_to_point(int myId, int numP, int root, MPI_Comm comm, char *array, int qty);
+#endif
diff --git a/Main/linear_reg.c b/Main/linear_reg.c
 #include <stdlib.h>
 #include <stdio.h>
+#include <math.h>
 #include <mpi.h>
 #include "Main_datatypes.h"
 #include "linear_reg.h"

+// Array for linear regression computation
+// Quantities   10b 100b 1Kb 5Kb 10Kb 50Kb 100Kb 500Kb 1Mb 10Mb 100Mb
+double LR_bytes_array[LR_ARRAY_TAM] = {10, 100, 1000, 5000,10000, 50000,100000, 500000, 1000000, 10000000, 100000000};

 // Linear regression
 // Y = a +bX
 // Bytes = a +b(Time)
@@ -12,27 +18,35 @@
 // Y is the dependent variable, which correlates to the number of bytes
 //
-void lr_avg_plus_diff(double *array, double *avg, double *diffs);
+void lr_avg_plus_diff(int tam, double *array, double *avg, double *diffs);

+/*
+ * Computes and returns the related Y value to a given linear regression
+ */
+void lr_calc_Y(double slope, double intercept, double x_value, int *y_result) {
+  *y_result = (int) ceil(intercept + slope * x_value);
+}

 /*
  * Computes the slope and intercept for a given array of times
  * so users can calculate the number of bytes for a given time.
  *
  */
-void lr_compute(double *times, double *slope, double *intercept) {
+void lr_compute(int tam, double *bytes, double *times, double *slope, double *intercept) {
   int i;
   double avgX, avgY;
   double *diffsX, *diffsY;
   double SSxx, SSxy;

-  diffsX = malloc(LR_ARRAY_TAM * sizeof(double));
-  diffsY = malloc(LR_ARRAY_TAM * sizeof(double));
+  diffsX = malloc(tam * sizeof(double));
+  diffsY = malloc(tam * sizeof(double));
   SSxx = SSxy = 0;

-  lr_avg_plus_diff(times, &avgX, diffsX);
-  lr_avg_plus_diff(LR_bytes_array, &avgY, diffsY);
+  lr_avg_plus_diff(tam, times, &avgX, diffsX);
+  lr_avg_plus_diff(tam, bytes, &avgY, diffsY);

-  for(i=0; i<LR_ARRAY_TAM; i++) {
+  for(i=0; i<tam; i++) {
     SSxx+= diffsX[i]*diffsX[i];
     SSxy+= diffsX[i]*diffsY[i];
   }
@@ -49,15 +63,15 @@ void lr_compute(int tam, double *bytes, double *times, double *slope, double *intercept) {
  *
  * Returns the average and the difference of each element.
  */
-void lr_avg_plus_diff(double *array, double *avg, double *diffs) {
+void lr_avg_plus_diff(int tam, double *array, double *avg, double *diffs) {
   int i;
   double sum = 0;
-  for(i=0; i<LR_ARRAY_TAM; i++) {
+  for(i=0; i<tam; i++) {
     sum+= array[i];
   }
-  *avg = sum / LR_ARRAY_TAM;
+  *avg = sum / tam;

-  for(i=0; i<LR_ARRAY_TAM; i++) {
+  for(i=0; i<tam; i++) {
     diffs[i]= *avg - array[i];
   }
 }
@@ -73,29 +87,61 @@ void lr_avg_plus_diff(int tam, double *array, double *avg, double *diffs) {
  * Obtains an array of times to perform a "Broadcast"
  * operation depending on a predefined set of number of bytes.
  */
-void lr_times_bcast(int myId, int numP, int root, MPI_Comm comm, double *times) {
-  int i, j, n, loop_count = 10;
-  double start_time, stop_time, elapsed_time;
-  char *aux;
+void lr_times_bcast(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times) {
+  int i, j, n;
+  double start_time;
+  char *aux = NULL;

-  elapsed_time = 0;
   for(i=0; i<LR_ARRAY_TAM; i++) {
     n = LR_bytes_array[i];
     aux = malloc(n * sizeof(char));
+    for(j=0; j<loop_iters; j++) {
       MPI_Barrier(comm);
       start_time = MPI_Wtime();
-    for(j=0; j<loop_count; j++) {
       MPI_Bcast(aux, n, MPI_CHAR, root, comm);
+      times[i*loop_iters+j] = MPI_Wtime() - start_time;
     }
-    MPI_Barrier(comm);
-    stop_time = MPI_Wtime();
-    elapsed_time = (stop_time - start_time) / loop_count;
-    MPI_Reduce(MPI_IN_PLACE, &elapsed_time, n, MPI_DOUBLE, MPI_MAX, root, comm);
-    if(myId == root) {
-      times[i] = elapsed_time;
-    }
     free(aux);
+    aux = NULL;
   }
 }

+/*
+ * Obtains an array of times to perform an "Allgatherv"
+ * operation depending on a predefined set of number of bytes.
+ */
+/*
+void lr_times_allgatherv(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times) {
+  int i, j, n;
+  int *counts, *displs;
+  double start_time;
+  char *aux = NULL, *aux_full = NULL;
+
+  counts = calloc(numP,sizeof(int));
+  displs = calloc(numP,sizeof(int));
+  for(i=0; i<LR_ARRAY_TAM; i++) {
+    n = LR_bytes_array[i];
+    aux = malloc(n * sizeof(char));
+    aux_full = malloc(n * sizeof(char));
+    for(j=0; j<loop_iters; j++) {
+      MPI_Barrier(comm);
+      start_time = MPI_Wtime();
+      MPI_Allgatherv(aux, stage_data.my_bytes, MPI_CHAR, aux_full, counts, displs, MPI_CHAR, comm);
+      times[i*loop_iters+j] = MPI_Wtime() - start_time;
+    }
+    free(aux);
+    free(aux_full);
+    aux_full = NULL;
+    aux = NULL;
+  }
+  free(counts);
+  free(displs);
+}
+*/
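For reference, lr_compute implements an ordinary least-squares fit (its closing lines are truncated in this diff, but the SSxx/SSxy accumulators feed the standard formulas). With x_i the measured times and y_i the byte counts:

b = \frac{SS_{xy}}{SS_{xx}}
  = \frac{\sum_i (x_i - \bar{x})(y_i - \bar{y})}{\sum_i (x_i - \bar{x})^2},
\qquad
a = \bar{y} - b\,\bar{x}

lr_calc_Y then rounds the prediction up to whole bytes, y = ceil(a + b*t). Note that lr_avg_plus_diff stores (avg - x_i) rather than (x_i - avg); the sign cancels in both SS products, so the fit is unchanged.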
diff --git a/Main/linear_reg.h b/Main/linear_reg.h
+#ifndef LINEAR_REG_H
+#define LINEAR_REG_H
 #include <stdlib.h>
 #include <stdio.h>
 #include <mpi.h>

+/*----------LINEAR REGRESSION TYPES--------------*/
+#define LR_ARRAY_TAM 11
+// Array for linear regression computation
+extern double LR_bytes_array[LR_ARRAY_TAM];

+void lr_calc_Y(double slope, double intercept, double x_value, int *y_result);
-void lr_compute(double *times, double *slope, double *intercept);
-void lr_times_bcast(int myId, int numP, int root, MPI_Comm comm, double *times);
+void lr_compute(int tam, double *bytes, double *times, double *slope, double *intercept);
+void lr_times_bcast(int myId, int numP, int root, MPI_Comm comm, int loop_iters, double *times);
+#endif
diff --git a/Main/process_stage.c b/Main/process_stage.c
@@ -7,10 +7,18 @@
 #include "linear_reg.h"
 #include "Main_datatypes.h"
 #include "process_stage.h"
-#include "../malleability/malleabilityManager.h" //FIXME Refactor
+//#include "../malleability/malleabilityManager.h" //FIXME Refactor
+#include "../malleability/distribution_methods/block_distribution.h"

-void get_byte_dist(int qty, int id, int numP, int *result);
+void linear_regression_stage(iter_stage_t *stage, group_data group, MPI_Comm comm);
+
+double init_matrix_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute);
+double init_pi_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute);
+void init_comm_ptop_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm);
+double init_comm_bcast_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm);
+double init_comm_allgatherv_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm);
+double init_comm_reduce_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm);

 /*
  * Calculates the time per operation or the total bytes to send
@@ -29,92 +37,37 @@
  * TODO Divide the work among the processes.
  * TODO Does not take into account changes across heterogeneous machines.
  */
-void init_stage(void *config_file_void, int stage, void *group_void, MPI_Comm comm, int compute) {
-  double result, start_time, t_stage;
-  int i, aux_bytes, qty = 20000;
-  group_data group = *((group_data *) group_void);
-  configuration *config_file = (configuration *) config_file_void;
-  iter_stage_t *stage_data = &(config_file->iter_stage[stage]);
-  stage_data->operations = qty;
-  t_stage = stage_data->t_stage * config_file->factors[group.grp];
-
-  if(stage_data->bytes == 0) {
-    stage_data->bytes = (stage_data->t_stage - config_file->latency_m) * config_file->bw_m;
-  }
-  get_byte_dist(stage_data->bytes, group.myId, group.numP, &(stage_data->real_bytes) );
-
-  start_time = MPI_Wtime();
-  result = 0;
-  switch(stage_data->pt) {
+double init_stage(configuration *config_file, int stage_i, group_data group, MPI_Comm comm, int compute) {
+  double result = 0;
+  int qty = 20000;
+
+  iter_stage_t *stage = &(config_file->iter_stage[stage_i]);
+  stage->operations = qty;
+
+  switch(stage->pt) {
     //Computation
     case COMP_MATRIX:
-      initMatrix(&(stage_data->double_array), config_file->matrix_tam);
+      result = init_matrix_pt(group, config_file, stage, comm, compute);
+      break;
     case COMP_PI:
-      if(group.myId == ROOT && compute) {
-        result+= process_stage(config_file_void, stage, group_void, comm);
-      }
+      result = init_pi_pt(group, config_file, stage, comm, compute);
       break;
     //Communication
     case COMP_POINT:
-      if(stage_data->array != NULL)
-        free(stage_data->array);
-      stage_data->array = malloc(sizeof(char) * stage_data->real_bytes);
+      init_comm_ptop_pt(group, config_file, stage, comm);
       break;
     case COMP_BCAST:
-      if(stage_data->array != NULL)
-        free(stage_data->array);
-      stage_data->real_bytes = stage_data->bytes; // Special case when using Bcast
-      stage_data->array = malloc(sizeof(char) * stage_data->real_bytes);
+      result = init_comm_bcast_pt(group, config_file, stage, comm);
       break;
     case COMP_ALLGATHER:
-      if(stage_data->counts != NULL)
-        free(stage_data->counts);
-      stage_data->counts = calloc(group.numP,sizeof(int));
-      if(stage_data->displs != NULL)
-        free(stage_data->displs);
-      stage_data->displs = calloc(group.numP,sizeof(int));
-      get_byte_dist(stage_data->bytes, 0, group.numP, &aux_bytes);
-      stage_data->counts[0] = aux_bytes;
-      stage_data->displs[0] = 0;
-      for(i=1; i<group.numP; i++){
-        get_byte_dist(stage_data->bytes, i, group.numP, &aux_bytes);
-        stage_data->counts[i] = aux_bytes;
-        stage_data->displs[i] = stage_data->displs[i-1] + stage_data->counts[i-1];
-      }
-      if(stage_data->array != NULL)
-        free(stage_data->array);
-      stage_data->array = malloc(sizeof(char) * stage_data->real_bytes);
-      if(stage_data->full_array != NULL)
-        free(stage_data->full_array);
-      stage_data->full_array = malloc(sizeof(char) * stage_data->bytes);
+      result = init_comm_allgatherv_pt(group, config_file, stage, comm);
       break;
     case COMP_REDUCE:
     case COMP_ALLREDUCE:
-      stage_data->real_bytes = stage_data->bytes;
-      if(stage_data->array != NULL)
-        free(stage_data->array);
-      stage_data->array = malloc(sizeof(char) * stage_data->real_bytes);
-      //The full array for the reduce needs the same size
-      if(stage_data->full_array != NULL)
-        free(stage_data->full_array);
-      stage_data->full_array = malloc(sizeof(char) * stage_data->real_bytes);
+      result = init_comm_reduce_pt(group, config_file, stage, comm);
       break;
   }
-
-  if(compute) {
-    stage_data->t_op = (MPI_Wtime() - start_time) / qty; //Time for one operation
-    MPI_Bcast(&(stage_data->t_op), 1, MPI_DOUBLE, ROOT, comm);
-  }
-  stage_data->operations = t_stage / stage_data->t_op;
+  return result;
 }
 /*
@@ -122,40 +75,37 @@ double init_stage(configuration *config_file, int stage_i, group_data group, MPI_Comm comm, int compute) {
  * of operation to perform, calling the function that
  * will carry out the operation.
  */
-double process_stage(void *config_file_void, int stage, void *group_void, MPI_Comm comm) {
+double process_stage(configuration config_file, iter_stage_t stage, group_data group, MPI_Comm comm) {
   int i;
-  double result;
-  group_data group = *((group_data *) group_void);
-  configuration config_file = *((configuration *) config_file_void);
-  iter_stage_t stage_data = config_file.iter_stage[stage];
+  double result = 0;

-  switch(stage_data.pt) {
+  switch(stage.pt) {
     //Computation
     case COMP_PI:
-      for(i=0; i < stage_data.operations; i++) {
+      for(i=0; i < stage.operations; i++) {
         result += computePiSerial(config_file.matrix_tam);
       }
       break;
     case COMP_MATRIX:
-      for(i=0; i < stage_data.operations; i++) {
-        result += computeMatrix(stage_data.double_array, config_file.matrix_tam); //FIXME Does not give repeatable times
+      for(i=0; i < stage.operations; i++) {
+        result += computeMatrix(stage.double_array, config_file.matrix_tam); //FIXME Does not give repeatable times
       }
       break;
     //Communications
    case COMP_POINT:
-      point_to_point(group.myId, group.numP, ROOT, comm, stage_data.array, stage_data.real_bytes);
+      point_to_point(group.myId, group.numP, ROOT, comm, stage.array, stage.real_bytes);
      break;
    case COMP_BCAST:
-      MPI_Bcast(stage_data.array, stage_data.real_bytes, MPI_CHAR, ROOT, comm);
+      MPI_Bcast(stage.array, stage.real_bytes, MPI_CHAR, ROOT, comm);
      break;
    case COMP_ALLGATHER:
-      MPI_Allgatherv(stage_data.array, stage_data.real_bytes, MPI_CHAR, stage_data.full_array, stage_data.counts, stage_data.displs, MPI_CHAR, comm);
+      MPI_Allgatherv(stage.array, stage.my_bytes, MPI_CHAR, stage.full_array, stage.counts.counts, stage.counts.displs, MPI_CHAR, comm);
      break;
    case COMP_REDUCE:
-      MPI_Reduce(stage_data.array, stage_data.full_array, stage_data.real_bytes, MPI_CHAR, MPI_MAX, ROOT, comm);
+      MPI_Reduce(stage.array, stage.full_array, stage.real_bytes, MPI_CHAR, MPI_MAX, ROOT, comm);
      break;
    case COMP_ALLREDUCE:
-      MPI_Allreduce(stage_data.array, stage_data.full_array, stage_data.real_bytes, MPI_CHAR, MPI_MAX, comm);
+      MPI_Allreduce(stage.array, stage.full_array, stage.real_bytes, MPI_CHAR, MPI_MAX, comm);
      break;
   }
   return result;
@@ -195,7 +145,6 @@ double latency(int myId, int numP, MPI_Comm comm) {
     MPI_Barrier(comm);
     stop_time = MPI_Wtime();
     elapsed_time = (stop_time - start_time) / loop_count;
   }
   if(myId %2 != 0) {
@@ -253,27 +202,27 @@ double bandwidth(int myId, int numP, MPI_Comm comm, double latency, int n) {
 }

 /*
- *
+ * Creates a linear regression model to predict
+ * the number of bytes needed to perform a collective
+ * communication.
  */
-void linear_regression_stage(void *stage_void, void *group_void, MPI_Comm comm) {
-  group_data group = *((group_data *) group_void);
-  iter_stage_t *stage = (iter_stage_t *) stage_void;
-
-  double *times = NULL;
-  if(group.myId == ROOT) {
-    times = malloc(LR_ARRAY_TAM * sizeof(double));
-  }
+void linear_regression_stage(iter_stage_t *stage, group_data group, MPI_Comm comm) {
+  int i, j, tam, loop_iters = 100;
+
+  tam = LR_ARRAY_TAM * loop_iters;
+  double *bytes = malloc(tam * sizeof(double));
+  double *times = malloc(tam * sizeof(double));
+
+  for(i=0; i<LR_ARRAY_TAM; i++) {
+    for(j=0; j<loop_iters; j++) {
+      bytes[i*loop_iters + j] = LR_bytes_array[i];
+    }
+  }

   switch(stage->pt) {
     //Communications
     case COMP_BCAST:
-      lr_times_bcast(group.myId, group.numP, ROOT, comm, times);
-      if(group.myId == ROOT) {
-        lr_compute(times, &(stage->slope), &(stage->intercept));
-      }
-      MPI_Bcast(&(stage->slope), 1, MPI_DOUBLE, ROOT, comm);
-      MPI_Bcast(&(stage->intercept), 1, MPI_DOUBLE, ROOT, comm);
+      lr_times_bcast(group.myId, group.numP, ROOT, comm, loop_iters, times);
       break;
     case COMP_ALLGATHER:
       break;
@@ -282,42 +231,168 @@ void linear_regression_stage(iter_stage_t *stage, group_data group, MPI_Comm comm) {
     case COMP_ALLREDUCE:
       break;
     default:
+      free(times); // release the sample buffers before the early exit
+      free(bytes);
+      return;
   }
+
+  if(group.myId == ROOT) {
+    MPI_Reduce(MPI_IN_PLACE, times, LR_ARRAY_TAM * loop_iters, MPI_DOUBLE, MPI_MAX, ROOT, comm);
+    lr_compute(tam, bytes, times, &(stage->slope), &(stage->intercept));
+  } else {
+    MPI_Reduce(times, NULL, LR_ARRAY_TAM * loop_iters, MPI_DOUBLE, MPI_MAX, ROOT, comm);
+  }
+  MPI_Bcast(&(stage->slope), 1, MPI_DOUBLE, ROOT, comm);
+  MPI_Bcast(&(stage->intercept), 1, MPI_DOUBLE, ROOT, comm);
+
   free(times);
+  free(bytes);
 }
 /*
- * Obtains, for "Id" and "numP", how many
- * bytes process "Id" will have and returns
- * that quantity.
- *
- * Processes under "rem" will have more data
- * than those with ranks higher or equal to "rem".
- *
- * TODO Refactor: this function already exists in malleability/CommDist.c
- */
-void get_byte_dist(int qty, int id, int numP, int *result) {
-  int rem, ini, fin, tamBl;
-
-  tamBl = qty / numP;
-  rem = qty % numP;
-  if(id < rem) { // First subgroup
-    ini = id * tamBl + id;
-    fin = (id+1) * tamBl + (id+1);
-  } else { // Second subgroup
-    ini = id * tamBl + rem;
-    fin = (id+1) * tamBl + rem;
-  }
-  if(fin > qty) {
-    fin = qty;
-  }
-  if(ini > fin) {
-    ini = fin;
-  }
-  *result= fin - ini;
-}
+ * ========================================================================================
+ * ========================================================================================
+ * =================================INIT STAGE FUNCTIONS===================================
+ * ========================================================================================
+ * ========================================================================================
+ */
+
+double init_matrix_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute) {
+  double result, t_stage;
+
+  result = 0;
+  t_stage = stage->t_stage * config_file->factors[group.grp];
+  initMatrix(&(stage->double_array), config_file->matrix_tam);
+
+  double start_time = MPI_Wtime();
+  if(group.myId == ROOT && compute) {
+    result+= process_stage(*config_file, *stage, group, comm);
+  }
+  if(compute) {
+    stage->t_op = (MPI_Wtime() - start_time) / stage->operations; //Time for one operation
+    MPI_Bcast(&(stage->t_op), 1, MPI_DOUBLE, ROOT, comm);
+  }
+  stage->operations = t_stage / stage->t_op;
+  return result;
+}
+
+double init_pi_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute) {
+  double result, t_stage, start_time;
+
+  result = 0;
+  t_stage = stage->t_stage * config_file->factors[group.grp];
+  start_time = MPI_Wtime();
+  if(group.myId == ROOT && compute) {
+    result+= process_stage(*config_file, *stage, group, comm);
+  }
+  if(compute) {
+    stage->t_op = (MPI_Wtime() - start_time) / stage->operations; //Time for one operation
+    MPI_Bcast(&(stage->t_op), 1, MPI_DOUBLE, ROOT, comm);
+  }
+  stage->operations = t_stage / stage->t_op;
+  return result;
+}
+
+void init_comm_ptop_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
+  struct Dist_data dist_data;
+
+  if(stage->array != NULL)
+    free(stage->array);
+  if(stage->bytes == 0) {
+    stage->bytes = (stage->t_stage - config_file->latency_m) * config_file->bw_m;
+  }
+  get_block_dist(stage->bytes, group.myId, group.numP, &dist_data);
+  stage->real_bytes = dist_data.tamBl;
+  stage->array = malloc(sizeof(char) * stage->real_bytes);
+}
+
+double init_comm_bcast_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
+  double start_time, time = 0;
+
+  stage->real_bytes = stage->bytes;
+  if(stage->bytes == 0) {
+    start_time = MPI_Wtime();
+    linear_regression_stage(stage, group, comm);
+    lr_calc_Y(stage->slope, stage->intercept, stage->t_stage, &(stage->real_bytes));
+
+    time = MPI_Wtime() - start_time;
+    if(group.myId == ROOT) {
+      MPI_Reduce(MPI_IN_PLACE, &time, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
+    } else {
+      MPI_Reduce(&time, NULL, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
+    }
+  }
+
+  if(stage->array != NULL)
+    free(stage->array);
+  stage->array = malloc(sizeof(char) * stage->real_bytes);
+  return time;
+}
+
+double init_comm_allgatherv_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
+  double start_time, time = 0;
+  struct Dist_data dist_data;
+
+  stage->real_bytes = stage->bytes;
+  if(stage->bytes == 0) {
+    start_time = MPI_Wtime();
+    linear_regression_stage(stage, group, comm);
+    lr_calc_Y(stage->slope, stage->intercept, stage->t_stage, &(stage->real_bytes));
+
+    time = MPI_Wtime() - start_time;
+    if(group.myId == ROOT) {
+      MPI_Reduce(MPI_IN_PLACE, &time, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
+    } else {
+      MPI_Reduce(&time, NULL, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
+    }
+  }
+
+  if(stage->counts.counts != NULL)
+    freeCounts(&(stage->counts));
+  prepare_comm_allgatherv(group.numP, stage->real_bytes, &(stage->counts));
+
+  get_block_dist(stage->real_bytes, group.myId, group.numP, &dist_data);
+  stage->my_bytes = dist_data.tamBl;
+
+  if(stage->array != NULL)
+    free(stage->array);
+  stage->array = malloc(sizeof(char) * stage->my_bytes);
+  if(stage->full_array != NULL)
+    free(stage->full_array);
+  stage->full_array = malloc(sizeof(char) * stage->real_bytes);
+  return time;
+}
+
+double init_comm_reduce_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
+  double start_time, time = 0;
+
+  stage->real_bytes = stage->bytes;
+  if(stage->bytes == 0) {
+    start_time = MPI_Wtime();
+    linear_regression_stage(stage, group, comm);
+    lr_calc_Y(stage->slope, stage->intercept, stage->t_stage, &(stage->real_bytes));
+
+    time = MPI_Wtime() - start_time;
+    if(group.myId == ROOT) {
+      MPI_Reduce(MPI_IN_PLACE, &time, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
+    } else {
+      MPI_Reduce(&time, NULL, 1, MPI_DOUBLE, MPI_MAX, ROOT, comm);
+    }
+  }
+
+  if(stage->array != NULL)
+    free(stage->array);
+  stage->array = malloc(sizeof(char) * stage->real_bytes);
+  //The full array for the reduce needs the same size
+  if(stage->full_array != NULL)
+    free(stage->full_array);
+  stage->full_array = malloc(sizeof(char) * stage->real_bytes);
+  return time;
+}
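The init_comm_*_pt functions above all share one timing idiom: each rank measures its regression time locally and the root keeps the maximum via an in-place reduction. A minimal self-contained sketch of that MPI_IN_PLACE idiom (the payload value is a stand-in):

#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
  int rank;
  double t;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  t = (double) rank; // stand-in for a locally measured time

  if (rank == 0) {
    // the root reduces into its own buffer
    MPI_Reduce(MPI_IN_PLACE, &t, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
    printf("max time = %f\n", t);
  } else {
    // non-root ranks only contribute their value
    MPI_Reduce(&t, NULL, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
  }
  MPI_Finalize();
  return 0;
}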
diff --git a/Main/process_stage.h b/Main/process_stage.h
+#ifndef PROCESS_STAGE_H
+#define PROCESS_STAGE_H
 #include <stdlib.h>
 #include <stdio.h>
 #include <mpi.h>
-//#include "Main_datatypes.h"
+#include "Main_datatypes.h"
 //#include "../malleability/malleabilityManager.h" //FIXME Refactor
+#include "../IOcodes/read_ini.h"
 enum compute_methods{COMP_PI, COMP_MATRIX, COMP_POINT, COMP_BCAST, COMP_ALLGATHER, COMP_REDUCE, COMP_ALLREDUCE};
 //FIXME Refactor the void
-void init_stage(void *config_file_void, int stage, void *group_void, MPI_Comm comm, int compute);
-double process_stage(void *config_file_void, int stage, void* group_void, MPI_Comm comm);
+double init_stage(configuration *config_file, int stage_i, group_data group, MPI_Comm comm, int compute);
+double process_stage(configuration config_file, iter_stage_t stage, group_data group, MPI_Comm comm);
 double latency(int myId, int numP, MPI_Comm comm);
 double bandwidth(int myId, int numP, MPI_Comm comm, double latency, int n);
-void linear_regression_stage(void *stage_void, void *group_void, MPI_Comm comm);
+void linear_regression_stage(iter_stage_t *stage, group_data group, MPI_Comm comm);
+#endif
-#mpicc -Wall Main/Main.c Main/computing_func.c IOcodes/results.c IOcodes/read_ini.c IOcodes/ini.c malleability/ProcessDist.c malleability/CommDist.c -pthread -lslurm -lm
-mpicc -g -Wall Main/Main.c Main/computing_func.c Main/comunication_func.c Main/linear_reg.c Main/process_stage.c IOcodes/results.c IOcodes/read_ini.c IOcodes/ini.c malleability/malleabilityManager.c malleability/malleabilityTypes.c malleability/malleabilityZombies.c malleability/ProcessDist.c malleability/CommDist.c -pthread -lslurm -lm
+mpicc -g -Wall Main/Main.c Main/computing_func.c Main/comunication_func.c Main/linear_reg.c Main/process_stage.c IOcodes/results.c IOcodes/read_ini.c IOcodes/ini.c malleability/malleabilityManager.c malleability/malleabilityTypes.c malleability/malleabilityZombies.c malleability/ProcessDist.c malleability/CommDist.c malleability/distribution_methods/block_distribution.c -pthread -lslurm -lm
 if [ $# -gt 0 ]
 then
diff --git a/malleability/CommDist.c b/malleability/CommDist.c
@@ -2,8 +2,10 @@
 #include <stdlib.h>
 #include <mpi.h>
 #include <string.h>
+#include "distribution_methods/block_distribution.h"
 #include "CommDist.h"

+/*
 struct Dist_data {
   int ini; //First element to send
   int fin; //Last element to send
@@ -21,6 +23,7 @@ struct Counts {
   int *displs;
   int *zero_arr;
 };
+*/

 void send_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_child, int idI, int idE, struct Counts counts);
 void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts);
@@ -35,10 +38,11 @@ void recv_async_point_arrays(struct Dist_data dist_data, char *array, int root,
 void get_dist(int qty, int id, int numP, struct Dist_data *dist_data);
 void set_counts(int id, int numP, struct Dist_data data_dist, int *sendcounts);
 void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS);
+/*
 void mallocCounts(struct Counts *counts, int numP);
 void freeCounts(struct Counts *counts);
 void print_counts(struct Dist_data data_dist, int *xcounts, int *xdispls, int size, const char* name);
+*/

 /*
  * Allocates memory for a vector of up to "qty" elements.
@@ -507,51 +511,3 @@ void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS) {
   (*idS)[0] = idI;
   (*idS)[1] = idE;
 }
-/*
- * Allocates memory for the counts/displs vectors of the
- * MPI_Alltoallv function. All vectors have a size of numP, which is the
- * number of processes in the other group of processes.
- *
- * The counts vector indicates how many elements this process
- * communicates to process "i" of the other group.
- *
- * The displs vector indicates the displacements needed for each
- * communication with process "i" of the other group.
- *
- * The zero_arr vector is used when one wants a vector initialized
- * to 0 in all its elements, to indicate that there is no communication.
- */
-void mallocCounts(struct Counts *counts, int numP) {
-  counts->counts = calloc(numP, sizeof(int));
-  if(counts->counts == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
-
-  counts->displs = calloc(numP, sizeof(int));
-  if(counts->displs == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
-
-  counts->zero_arr = calloc(numP, sizeof(int));
-  if(counts->zero_arr == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
-}
-
-/*
- * Frees the internal memory of a Counts structure.
- *
- * It does not free the counts structure itself if it was
- * allocated dynamically.
- */
-void freeCounts(struct Counts *counts) {
-  free(counts->counts);
-  free(counts->displs);
-  free(counts->zero_arr);
-}
-
-void print_counts(struct Dist_data data_dist, int *xcounts, int *xdispls, int size, const char* name) {
-  int i;
-  for(i=0; i < size; i++) {
-    //if(xcounts[i] != 0) {
-      printf("P%d of %d | %scounts[%d]=%d disp=%d\n", data_dist.myId, data_dist.numP, name, i, xcounts[i], xdispls[i]);
-    //}
-  }
-}
diff --git a/malleability/CommDist.h b/malleability/CommDist.h
+#ifndef COMMDIST_H
+#define COMMDIST_H
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
@@ -22,3 +25,4 @@ void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm in
 void malloc_comm_array(char **array, int qty, int myId, int numP);
+#endif
diff --git a/malleability/ProcessDist.h b/malleability/ProcessDist.h
+#ifndef PROCESS_DIST_H
+#define PROCESS_DIST_H
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
@@ -13,3 +16,5 @@ void malleability_establish_connection(int myId, int root, MPI_Comm *intercomm);
 void proc_adapt_expand(int *numP, int numC, MPI_Comm intercomm, MPI_Comm *comm, int is_children_group);
 void proc_adapt_shrink(int numC, MPI_Comm *comm, int myId);
+#endif
diff --git a/malleability/distribution_methods/block_distribution.c b/malleability/distribution_methods/block_distribution.c
new file
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "block_distribution.h"

void set_interblock_counts(int id, int numP, struct Dist_data data_dist, int *sendcounts);
void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS);
/*
 * Prepares a communication from "numP" processes to "numP_other" processes
 * of "n" elements and returns a Counts struct with 3 arrays to perform the
 * communications (see the worked example after this file).
 *
 * The struct should be freed with freeCounts
 */
void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, struct Counts *counts) {
  int i, *idS;
  struct Dist_data dist_data;
  get_block_dist(n, myId, numP, &dist_data);
  mallocCounts(counts, numP_other);
  get_util_ids(dist_data, numP_other, &idS);

  if(idS[0] == 0) {
    set_interblock_counts(0, numP_other, dist_data, counts->counts);
    idS[0]++;
  }
  for(i=idS[0]; i<idS[1]; i++) {
    set_interblock_counts(i, numP_other, dist_data, counts->counts);
    counts->displs[i] = counts->displs[i-1] + counts->counts[i-1];
  }
  free(idS); // get_util_ids allocates the range array; release it here
}
/*
 * Prepares a communication of "numP" processes of "n" elements and
 * returns a Counts struct with 3 arrays to perform the
 * communications.
 *
 * The struct should be freed with freeCounts
 */
void prepare_comm_allgatherv(int numP, int n, struct Counts *counts) {
int i;
struct Dist_data dist_data;
mallocCounts(counts, numP);
get_block_dist(n, 0, numP, &dist_data);
counts->counts[0] = dist_data.tamBl;
for(i=1; i<numP; i++){
get_block_dist(n, i, numP, &dist_data);
counts->counts[i] = dist_data.tamBl;
counts->displs[i] = counts->displs[i-1] + counts->counts[i-1];
}
}
/*
* ========================================================================================
* ========================================================================================
* ================================DISTRIBUTION FUNCTIONS==================================
* ========================================================================================
* ========================================================================================
*/
/*
 * Obtains, for "Id" and "numP", how many
 * elements per row process "Id" will have,
 * and fills the results in a Dist_data struct
 */
void get_block_dist(int qty, int id, int numP, struct Dist_data *dist_data) {
int rem;
dist_data->myId = id;
dist_data->numP = numP;
dist_data->qty = qty;
dist_data->tamBl = qty / numP;
rem = qty % numP;
if(id < rem) { // First subgroup
dist_data->ini = id * dist_data->tamBl + id;
dist_data->fin = (id+1) * dist_data->tamBl + (id+1);
} else { // Second subgroup
dist_data->ini = id * dist_data->tamBl + rem;
dist_data->fin = (id+1) * dist_data->tamBl + rem;
}
if(dist_data->fin > qty) {
dist_data->fin = qty;
}
if(dist_data->ini > dist_data->fin) {
dist_data->ini = dist_data->fin;
}
dist_data->tamBl = dist_data->fin - dist_data->ini;
}
/*
 * Obtains, for the given process Id, how many elements it
 * will send to or receive from the process indicated in Dist_data.
 */
void set_interblock_counts(int id, int numP, struct Dist_data data_dist, int *sendcounts) {
  struct Dist_data other;
  int biggest_ini, smallest_end;

  get_block_dist(data_dist.qty, id, numP, &other);

  // If the value ranges do not overlap, skip to the next process
  if(data_dist.ini >= other.fin || data_dist.fin <= other.ini) {
    return;
  }

  // Get the largest ini between the two processes
  if(data_dist.ini > other.ini) {
    biggest_ini = data_dist.ini;
  } else {
    biggest_ini = other.ini;
  }

  // Get the smallest fin between the two processes
  if(data_dist.fin < other.fin) {
    smallest_end = data_dist.fin;
  } else {
    smallest_end = other.fin;
  }

  sendcounts[id] = smallest_end - biggest_ini; // Number of elements to send to/receive from process Id
}
/*
 * Obtains, for a process in one group, the range of processes
 * in another group it has to send data to or receive data from.
 *
 * Returns the first identifier and the last one (excluded) with
 * which to communicate.
 */
void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS) {
  int idI, idE;
  int tamOther = dist_data.qty / numP_other;
  int remOther = dist_data.qty % numP_other;
  // Cut-off point of the external group of processes that separates
  // the processes holding tamOther + 1 elements
  // from those holding tamOther
  int middle = (tamOther + 1) * remOther;

  // Compute idI taking into account whether it communicates with a
  // process of size tamOther or tamOther+1
  if(middle > dist_data.ini) { // First subgroup (tamOther+1)
    idI = dist_data.ini / (tamOther + 1);
  } else { // Second subgroup (tamOther)
    idI = ((dist_data.ini - middle) / tamOther) + remOther;
  }

  // Compute idE taking into account whether it communicates with a
  // process of size tamOther or tamOther+1
  if(middle >= dist_data.fin) { // First subgroup (tamOther +1)
    idE = dist_data.fin / (tamOther + 1);
    idE = (dist_data.fin % (tamOther + 1) > 0 && idE+1 <= numP_other) ? idE+1 : idE;
  } else { // Second subgroup (tamOther)
    idE = ((dist_data.fin - middle) / tamOther) + remOther;
    idE = ((dist_data.fin - middle) % tamOther > 0 && idE+1 <= numP_other) ? idE+1 : idE;
  }

  *idS = malloc(2 * sizeof(int));
  (*idS)[0] = idI;
  (*idS)[1] = idE;
}
/*
* ========================================================================================
* ========================================================================================
* ==============================INIT/FREE/PRINT FUNCTIONS=================================
* ========================================================================================
* ========================================================================================
*/
/*
 * Allocates memory for the counts/displs vectors used by the
 * MPI_Alltoallv function. All vectors have a size of numP, which is the
 * number of processes in the other group of processes.
 *
 * The counts vector indicates how many elements this process
 * communicates to process "i" of the other group.
 *
 * The displs vector indicates the displacements needed for each
 * communication with process "i" of the other group.
 *
 * The zero_arr vector is used when one wants a vector initialized
 * to 0 in all its elements, to indicate that there is no communication.
 */
void mallocCounts(struct Counts *counts, int numP) {
counts->counts = calloc(numP, sizeof(int));
if(counts->counts == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
counts->displs = calloc(numP, sizeof(int));
if(counts->displs == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
counts->zero_arr = calloc(numP, sizeof(int));
if(counts->zero_arr == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
}
/*
 * Frees the internal memory of a Counts structure.
 *
 * It does not free the counts structure itself if it was
 * allocated dynamically.
 */
void freeCounts(struct Counts *counts) {
free(counts->counts);
free(counts->displs);
free(counts->zero_arr);
counts->counts = NULL;
counts->displs = NULL;
counts->zero_arr = NULL;
}
/*
 * Prints the communication information of a process.
 * If the "include_zero" flag is set to true, values of 0 in the
 * xcounts vector are shown as well.
 *
 * A string can be passed in "name" to better identify which vectors
 * the call refers to.
 */
void print_counts(struct Dist_data data_dist, int *xcounts, int *xdispls, int size, int include_zero, const char* name) {
int i;
for(i=0; i < size; i++) {
if(xcounts[i] != 0 || include_zero) {
printf("P%d of %d | %scounts[%d]=%d disp=%d\n", data_dist.myId, data_dist.numP, name, i, xcounts[i], xdispls[i]);
}
}
}
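A worked example of the functions in this file (not part of the repository; compile it together with block_distribution.c using mpicc, and note that get_util_ids is file-local, so its prototype is repeated here):

#include <stdio.h>
#include <stdlib.h>
#include "block_distribution.h"

void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS); // not in the header

int main(void) {
  struct Dist_data d;
  struct Counts c;
  int *idS;

  // Block distribution: 10 elements over 3 ranks -> [0,4) [4,7) [7,10)
  for (int id = 0; id < 3; id++) {
    get_block_dist(10, id, 3, &d);
    printf("rank %d owns [%d, %d), tamBl=%d\n", id, d.ini, d.fin, d.tamBl);
  }

  // Allgatherv preparation: counts = {4,3,3}, displs = {0,4,7}
  prepare_comm_allgatherv(3, 10, &c);
  for (int i = 0; i < 3; i++)
    printf("allgatherv counts[%d]=%d displs[%d]=%d\n", i, c.counts[i], i, c.displs[i]);
  freeCounts(&c);

  // Alltoall preparation seen from source rank 1 of 3, targeting 2 ranks:
  // its elements [4,7) overlap target rank 0 ([0,5)) by 1 and rank 1 ([5,10)) by 2,
  // so counts = {1,2} and displs = {0,1}
  prepare_comm_alltoall(1, 3, 2, 10, &c);
  for (int i = 0; i < 2; i++)
    printf("alltoall counts[%d]=%d displs[%d]=%d\n", i, c.counts[i], i, c.displs[i]);
  freeCounts(&c);

  // Peer range: rank 1 of 3 overlaps ranks [1, 2) of a 3-process peer group
  get_block_dist(10, 1, 3, &d);
  get_util_ids(d, 3, &idS);
  printf("peer ranks [%d, %d)\n", idS[0], idS[1]);
  free(idS);
  return 0;
}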
diff --git a/malleability/distribution_methods/block_distribution.h b/malleability/distribution_methods/block_distribution.h
new file
#ifndef mall_block_distribution
#define mall_block_distribution

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

struct Dist_data {
  int ini; //First element to send
  int fin; //Last element to send

  int tamBl; // Total number of elements
  int qty; // Total number of rows of the full disperse matrix

  int myId;
  int numP;

  MPI_Comm intercomm;
};

struct Counts {
  int *counts;
  int *displs;
  int *zero_arr;
};

void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, struct Counts *counts);
void prepare_comm_allgatherv(int numP, int n, struct Counts *counts);

void get_block_dist(int qty, int id, int numP, struct Dist_data *dist_data);

void mallocCounts(struct Counts *counts, int numP);
void freeCounts(struct Counts *counts);
void print_counts(struct Dist_data data_dist, int *xcounts, int *xdispls, int size, int include_zero, const char* name);

#endif
diff --git a/malleability/malleabilityDataStructures.h b/malleability/malleabilityDataStructures.h
new file
#ifndef MALLEABILITY_DATA_STRUCTURES_H
#define MALLEABILITY_DATA_STRUCTURES_H

/*
 * Shows available data structures for inner usage.
 */

#include <mpi.h>
#include <pthread.h> // needed for the mutex/condition members below

/* --- SPAWN STRUCTURES --- */
struct physical_dist {
  int num_cpus, num_nodes;
  char *nodelist;
  int target_qty, already_created;
  int dist_type;
};

typedef struct {
  int myId, root, root_parents;
  int spawn_qty, initial_qty, target_qty;
  int already_created;
  int spawn_method, spawn_is_single, spawn_is_async;
  char *cmd; //Executable name
  MPI_Info mapping;
  MPI_Datatype dtype;
  struct physical_dist dist; // Used to create the mapping var
  MPI_Comm comm, returned_comm;

  // To control the spawn state
  pthread_mutex_t spawn_mutex;
  pthread_cond_t cond_adapt_rdy;
} Spawn_data;

#endif
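spawn_mutex and cond_adapt_rdy suggest the classic mutex plus condition-variable handshake: the spawning thread signals readiness and another thread waits for it. A generic sketch of that pattern (the names and the "ready" flag are illustrative, not this repository's API):

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  c = PTHREAD_COND_INITIALIZER;
static int ready = 0;

static void* spawner(void* arg) {
  (void) arg;
  pthread_mutex_lock(&m);
  ready = 1;                 // e.g. the spawned communicator is now usable
  pthread_cond_signal(&c);
  pthread_mutex_unlock(&m);
  return NULL;
}

int main(void) {
  pthread_t t;
  pthread_create(&t, NULL, spawner, NULL);

  pthread_mutex_lock(&m);
  while (!ready)             // loop guards against spurious wakeups
    pthread_cond_wait(&c, &m);
  pthread_mutex_unlock(&m);

  pthread_join(t, NULL);
  puts("adaptation ready");
  return 0;
}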
diff --git a/malleability/malleabilityManager.h b/malleability/malleabilityManager.h
+#ifndef MALLEABILITY_MANAGER_H
+#define MALLEABILITY_MANAGER_H
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
@@ -26,3 +29,5 @@ void set_benchmark_configuration(configuration *config_file);
 void get_benchmark_configuration(configuration **config_file);
 void set_benchmark_results(results_data *results);
 void get_benchmark_results(results_data **results);
+#endif