Commit 594957a2 authored by iker_martin

Code still must be tested. Added new communication stages IP2P+Wait. Added 3 different ways of capturing times + new configuration file variable (Capture_Method). Some refactors in results.c and configuration.c.
parent ec138d81
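
For orientation, the sketch below is not part of the repository: the buffer size, operation count and helper name ip2p_plus_wait are illustrative. It shows what a paired IP2P + Wait stage amounts to at the MPI level: the IP2P stage posts nonblocking exchanges between partner ranks, and the Wait stage, linked to it through Stage_Identifier, later completes the same request array, so any stages scheduled in between overlap with the communication. The new Capture_Method key selects how per-iteration times are aggregated (0 = maximum, 1 = mean, 2 = median, following enum capture_methods in results.h).

#include <stdlib.h>
#include <mpi.h>

#define OPS 4      /* illustrative operation count; the benchmark derives it from Stage_Time / t_op */
#define BYTES 1024 /* illustrative message size; Stage_Bytes or granularity in the benchmark */

/* Assumes an even number of ranks, as in point_to_point_asynch_inter below. */
static void ip2p_plus_wait(int myId, int numP, MPI_Comm comm) {
    char *s_buf = calloc(BYTES, 1), *r_buf = calloc(BYTES, 1);
    MPI_Request reqs[2 * OPS];              /* two requests (send + recv) per operation */
    int target = (myId + numP / 2) % numP;  /* same partner rule as in the diff */

    for (int i = 0; i < OPS; i++) {         /* "IP2P" stage: post the nonblocking exchanges */
        MPI_Isend(s_buf, BYTES, MPI_CHAR, target, 99, comm, &reqs[2 * i]);
        MPI_Irecv(r_buf, BYTES, MPI_CHAR, target, 99, comm, &reqs[2 * i + 1]);
    }
    /* ...stages configured between IP2P and Wait would run here and overlap with the transfers... */
    MPI_Waitall(2 * OPS, reqs, MPI_STATUSES_IGNORE);  /* "Wait" stage completes the shared requests */

    free(s_buf);
    free(r_buf);
}

int main(int argc, char *argv[]) {
    int myId, numP;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);
    MPI_Comm_size(MPI_COMM_WORLD, &numP);
    ip2p_plus_wait(myId, numP, MPI_COMM_WORLD);
    MPI_Finalize();
    return 0;
}

Because the request array is shared between the two paired stages, free_config_stage in the diff frees it only once per Stage_Identifier.
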
......@@ -47,6 +47,8 @@ static int handler(void* user, const char* section, const char* name,
pconfig->adr = strtoul(value, NULL, 10);
} else if (MATCH("general", "Rigid")) {
pconfig->rigid_times = atoi(value);
} else if (MATCH("general", "Capture_Method")) {
pconfig->capture_method = atoi(value);
// Iter stage
} else if (MATCH(stage_name, "Stage_Type") && LAST(pconfig->actual_stage, pconfig->n_stages)) {
......@@ -55,6 +57,8 @@ static int handler(void* user, const char* section, const char* name,
pconfig->stages[pconfig->actual_stage].t_capped = atoi(value);
} else if (MATCH(stage_name, "Stage_Bytes") && LAST(pconfig->actual_stage, pconfig->n_stages)) {
pconfig->stages[pconfig->actual_stage].bytes = atoi(value);
} else if (MATCH(stage_name, "Stage_Identifier") && LAST(pconfig->actual_stage, pconfig->n_stages)) {
pconfig->stages[pconfig->actual_stage].id = atoi(value);
} else if (MATCH(stage_name, "Stage_Time") && LAST(pconfig->actual_stage, pconfig->n_stages)) {
pconfig->stages[pconfig->actual_stage].t_stage = (float) atof(value);
pconfig->actual_stage = pconfig->actual_stage+1; // Last element of the group
......@@ -107,6 +111,8 @@ configuration *read_ini_file(char *file_name, ext_functions_t init_functions) {
printf("Error when reserving configuration structure\n");
return NULL;
}
config->capture_method = 0;
config->rigid_times = 0;
config->n_resizes = 0;
config->n_groups = 1;
config->n_stages = 1;
......
......@@ -7,6 +7,11 @@
void def_results_type(results_data *results, int resizes, MPI_Datatype *results_type);
void compute_max(results_data *results, double *computed_array, int myId, int root, MPI_Comm comm);
void compute_mean(results_data *results, double *computed_array, int myId, int numP, int root, MPI_Comm comm);
void compute_median(results_data *results, double *computed_array, size_t *used_ids, int myId, int numP, int root, MPI_Comm comm);
void match_median(results_data *results, double *computed_array, size_t *used_ids, int myId, int numP, int root, MPI_Comm comm);
//======================================================||
//======================================================||
//================MPI RESULTS FUNCTIONS=================||
......@@ -105,13 +110,6 @@ void reset_results_index(results_data *results) {
results->iters_async = 0;
}
//=============================================================== FIXME DELETE?
int compare(const void *_a, const void *_b) {
double *a, *b;
a = (double *) _a;
b = (double *) _b;
return (*a - *b);
}
/*
* For each iteration, obtains the maximum time among all the processes
* that took part.
......@@ -119,64 +117,116 @@ int compare(const void *_a, const void *_b) {
* The maximum must be obtained because it is the one that represents the real time
* that was spent.
*/
void compute_results_iter(results_data *results, int myId, int numP, int root, size_t stages, int capture_method, MPI_Comm comm) {
size_t i, *used_ids;
switch(capture_method) {
case RESULTS_MAX:
compute_max(results, results->iters_time, myId, root, comm);
for(i=0; i<stages; i++) {
compute_max(results, results->stage_times[i], myId, root, comm);
}
break;
case RESULTS_MEAN:
compute_mean(results, results->iters_time, myId, numP, root, comm);
for(i=0; i<stages; i++) {
compute_mean(results, results->stage_times[i], myId, numP, root, comm);
}
break;
case RESULTS_MEDIAN:
used_ids = malloc(results->iter_index * sizeof(size_t));
compute_median(results, results->iters_time, used_ids, myId, numP, root, comm);
for(i=0; i<stages; i++) {
//compute_median(results, results->stage_times[i], myId, numP, root, comm);
match_median(results, results->stage_times[i], used_ids, myId, numP, root, comm);
}
free(used_ids);
break;
}
}
void compute_max(results_data *results, double *computed_array, int myId, int root, MPI_Comm comm) {
if(myId == root) {
MPI_Reduce(MPI_IN_PLACE, computed_array, results->iter_index, MPI_DOUBLE, MPI_MAX, root, comm);
} else {
MPI_Reduce(computed_array, NULL, results->iter_index, MPI_DOUBLE, MPI_MAX, root, comm);
}
}
void compute_mean(results_data *results, double *computed_array, int myId, int numP, int root, MPI_Comm comm) {
if(myId == root) {
MPI_Reduce(MPI_IN_PLACE, computed_array, results->iter_index, MPI_DOUBLE, MPI_SUM, root, comm);
for(size_t i=0; i<results->iter_index; i++) {
computed_array[i] = computed_array[i] / numP;
}
} else {
MPI_Reduce(computed_array, NULL, results->iter_index, MPI_DOUBLE, MPI_SUM, root, comm);
}
}
struct TimeWithIndex {
double time;
size_t index;
};
int compare(const void *a, const void *b) {
double diff = ((struct TimeWithIndex *)a)->time - ((struct TimeWithIndex *)b)->time;
return (diff > 0) - (diff < 0); // Return the sign so fractional differences are not truncated to 0
}
/*
* Computes the median of a times vector replicated across "numP" processes.
* The median is computed for each element of the final vector, which is returned in place.
*
* In addition, the vector "used_ids" records which process provided the median of each element.
*/
void compute_median(results_data *results, double *computed_array, size_t *used_ids, int myId, int numP, int root, MPI_Comm comm) {
double *aux_all_iters, median;
struct TimeWithIndex *aux_id_iters;
if(myId == root) {
aux_all_iters = malloc(numP * results->iter_index * sizeof(double));
aux_id_iters = malloc(numP * sizeof(struct TimeWithIndex));
}
MPI_Gather(computed_array, results->iter_index, MPI_DOUBLE, aux_all_iters, results->iter_index, MPI_DOUBLE, root, comm);
if(myId == root) {
for(size_t i=0; i<results->iter_index; i++) {
for(int j=0; j<numP; j++) {
aux_id_iters[j].time = aux_all_iters[i+(results->iter_index*j)];
aux_id_iters[j].index = (size_t) j;
}
// Get Median
qsort(aux_id_iters, numP, sizeof(struct TimeWithIndex), &compare);
median = aux_id_iters[numP/2].time;
if (numP % 2 == 0) median = (aux_id_iters[numP/2 - 1].time + aux_id_iters[numP/2].time) / 2;
computed_array[i] = median;
used_ids[i] = aux_id_iters[numP/2].index; //FIXME What should be the index when numP is even?
}
free(aux_all_iters);
free(aux_id_iters);
}
}
/*
* Obtains the medians of a times vector replicated across "numP" processes.
* The median of each element is taken from the process recorded in "used_ids",
* which stores which process holds the median.
*
* The computed medians are returned in the vector itself.
*/
void match_median(results_data *results, double *computed_array, size_t *used_ids, int myId, int numP, int root, MPI_Comm comm) {
double *aux_all_iters;
size_t matched_id;
if(myId == root) {
aux_all_iters = malloc(numP * results->iter_index * sizeof(double));
}
MPI_Gather(computed_array, results->iter_index, MPI_DOUBLE, aux_all_iters, results->iter_index, MPI_DOUBLE, root, comm);
if(myId == root) {
for(size_t i=0; i<results->iter_index; i++) {
matched_id = used_ids[i];
computed_array[i] = aux_all_iters[i+(results->iter_index*matched_id)];
}
free(aux_all_iters);
}
}
//======================================================||
......
......@@ -7,6 +7,7 @@
#define RESULTS_INIT_DATA_QTY 100
enum capture_methods{RESULTS_MAX, RESULTS_MEAN, RESULTS_MEDIAN};
typedef struct {
// Iters data
double *iters_time, **stage_times;
......@@ -25,9 +26,7 @@ void comm_results(results_data *results, int root, size_t resizes, MPI_Comm inte
void set_results_post_reconfig(results_data *results, int grp, int sdr, int adr);
void reset_results_index(results_data *results);
void compute_results_iter(results_data *results, int myId, int numP, int root, size_t n_stages, int capture_method, MPI_Comm comm);
void print_iter_results(results_data results);
void print_stage_results(results_data results, size_t n_stages);
......
......@@ -318,9 +318,9 @@ double iterate_rigid(double *time, double *times_stages) {
start_time = MPI_Wtime();
for(i=0; i < config_file->n_stages; i++) {
MPI_Barrier(comm);
start_time_stage = MPI_Wtime();
aux+= process_stage(*config_file, config_file->stages[i], *group, comm);
MPI_Barrier(comm);
times_stages[i] = MPI_Wtime() - start_time_stage;
}
......@@ -359,8 +359,7 @@ int print_local_results() {
int ptr_local, ptr_out, err;
char *file_name;
compute_results_iter(results, group->myId, group->numP, ROOT, config_file->n_stages, config_file->capture_method, comm);
if(group->myId == ROOT) {
ptr_out = dup(1);
......
......@@ -28,7 +28,8 @@ typedef struct {
typedef struct
{
int pt; // Procedure type to execute
int id; // Stage identifier
// Whether the stage completes after "operations" iterations (0)
// or after "t_stage" time has passed (1).
int t_capped;
......@@ -41,6 +42,9 @@ typedef struct
// Arrays to communicate data;
char* array, *full_array;
double* double_array;
int req_count;
MPI_Request *reqs;
// Arrays to indicate how many bytes are received from each rank
struct Counts counts;
......@@ -57,7 +61,7 @@ typedef struct
{
size_t n_groups, n_resizes, n_stages; // n_groups==n_resizes+1
size_t actual_group, actual_stage;
int rigid_times, capture_method;
int granularity;
size_t sdr, adr;
......
......@@ -21,15 +21,20 @@ void point_to_point(int myId, int numP, int root, MPI_Comm comm, char *array, in
}
}
void point_to_point_inter(int myId, int numP, MPI_Comm comm, char *array, char *r_array, int qty) {
int target;
target = (myId + numP/2)%numP;
MPI_Sendrecv(array, qty, MPI_CHAR, target, 99, r_array, qty, MPI_CHAR, target, 99, comm, MPI_STATUS_IGNORE);
}
void point_to_point_asynch_inter(int myId, int numP, MPI_Comm comm, char *array, char *r_array, int qty, MPI_Request *reqs) {
int target;
target = (myId + numP/2)%numP;
if(myId < numP/2) {
MPI_Isend(array, qty, MPI_CHAR, target, 99, comm, &(reqs[0]));
MPI_Irecv(r_array, qty, MPI_CHAR, target, 99, comm, &(reqs[1]));
} else {
MPI_Irecv(r_array, qty, MPI_CHAR, target, 99, comm, &(reqs[0]));
MPI_Isend(array, qty, MPI_CHAR, target, 99, comm, &(reqs[1]));
}
}
......@@ -7,6 +7,8 @@
void point_to_point(int myId, int numP, int root, MPI_Comm comm, char *array, int qty);
void point_to_point_inter(int myId, int numP, MPI_Comm comm, char *array, char *r_array, int qty);
void point_to_point_asynch_inter(int myId, int numP, MPI_Comm comm, char *array, char *r_array, int qty, MPI_Request *reqs);
#endif
......@@ -10,6 +10,8 @@
void malloc_config_resizes(configuration *user_config);
void malloc_config_stages(configuration *user_config);
void free_config_stage(iter_stage_t *stage, int *freed_ids, size_t *found_ids);
void def_struct_config_file(configuration *config_file);
void def_struct_groups(configuration *config_file);
void def_struct_iter_stage(configuration *config_file);
......@@ -96,12 +98,14 @@ void malloc_config_stages(configuration *user_config) {
user_config->stages[i].array = NULL;
user_config->stages[i].full_array = NULL;
user_config->stages[i].double_array = NULL;
user_config->stages[i].reqs = NULL;
user_config->stages[i].counts.counts = NULL;
user_config->stages[i].bytes = 0;
user_config->stages[i].my_bytes = 0;
user_config->stages[i].real_bytes = 0;
user_config->stages[i].operations = 0;
user_config->stages[i].pt = 0;
user_config->stages[i].id = -1;
user_config->stages[i].t_op = 0;
user_config->stages[i].t_stage = 0;
user_config->stages[i].t_capped = 0;
......@@ -115,25 +119,13 @@ void malloc_config_stages(configuration *user_config) {
* Frees all the memory of a configuration structure
*/
void free_config(configuration *user_config) {
size_t i, found_ids;
int *freed_ids;
found_ids = 0;
if(user_config != NULL) {
freed_ids = (int *) malloc(user_config->n_stages * sizeof(int));
for(i=0; i < user_config->n_stages; i++) {
free_config_stage(&(user_config->stages[i]), freed_ids, &found_ids);
}
//Free derived types
MPI_Type_free(&(user_config->config_type));
......@@ -148,7 +140,56 @@ void free_config(configuration *user_config) {
free(user_config->groups);
free(user_config->stages);
free(user_config);
free(freed_ids);
}
}
/*
* Frees all the memory of a stage
*/
void free_config_stage(iter_stage_t *stage, int *freed_ids, size_t *found_ids) {
size_t i;
int free_reqs;
free_reqs = 1;
if(stage->id > -1) {
for(i=0; i<*found_ids; i++) {
if(stage->id == freed_ids[i]) {
free_reqs = 0;
break;
}
}
if(free_reqs) {
freed_ids[*found_ids] = stage->id;
*found_ids=*found_ids + 1;
}
}
if(stage->array != NULL) {
free(stage->array);
stage->array = NULL;
}
if(stage->full_array != NULL) {
free(stage->full_array);
stage->full_array = NULL;
}
if(stage->double_array != NULL) {
free(stage->double_array);
stage->double_array = NULL;
}
if(stage->reqs != NULL && free_reqs) {
for(i=0; i<stage->req_count; i++) {
if(stage->reqs[i] != MPI_REQUEST_NULL) {
MPI_Request_free(&(stage->reqs[i]));
stage->reqs[i] = MPI_REQUEST_NULL;
}
}
free(stage->reqs);
stage->reqs = NULL;
}
if(stage->counts.counts != NULL) {
freeCounts(&(stage->counts));
}
}
......@@ -257,18 +298,18 @@ void recv_config_file(int root, MPI_Comm intercomm, configuration **config_file_
/*
* Derived datatype to send 7 specific fields
* of the configuration structure in a single communication.
*/
void def_struct_config_file(configuration *config_file) {
int i, counts = 7;
int blocklengths[7] = {1, 1, 1, 1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];
// Fill in the types vector
types[0] = types[1] = types[2] = types[3] = MPI_UNSIGNED_LONG;
types[4] = types[5] = types[6] = MPI_INT;
// Fill in the displs vector
MPI_Get_address(config_file, &dir);
......@@ -279,6 +320,7 @@ void def_struct_config_file(configuration *config_file) {
MPI_Get_address(&(config_file->adr), &displs[3]);
MPI_Get_address(&(config_file->granularity), &displs[4]);
MPI_Get_address(&(config_file->rigid_times), &displs[5]);
MPI_Get_address(&(config_file->capture_method), &displs[6]);
for(i=0;i<counts;i++) displs[i] -= dir;
......@@ -333,24 +375,25 @@ void def_struct_groups(configuration *config_file) {
* of the iteration-stage structures in a single communication.
*/
void def_struct_iter_stage(configuration *config_file) {
int i, counts = 6;
int blocklengths[6] = {1, 1, 1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype aux, types[counts];
iter_stage_t *stages = config_file->stages;
// Fill in the types vector
types[0] = types[1] = types[2] = types[3] = MPI_INT;
types[4] = types[5] = MPI_DOUBLE;
// Fill in the displs vector
MPI_Get_address(stages, &dir);
MPI_Get_address(&(stages->pt), &displs[0]);
MPI_Get_address(&(stages->id), &displs[1]);
MPI_Get_address(&(stages->bytes), &displs[2]);
MPI_Get_address(&(stages->t_capped), &displs[3]);
MPI_Get_address(&(stages->t_stage), &displs[4]);
MPI_Get_address(&(stages->t_op), &displs[5]);
for(i=0;i<counts;i++) displs[i] -= dir;
......
......@@ -9,13 +9,17 @@
#include "../malleability/distribution_methods/block_distribution.h"
double init_emulation_comm_time(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm);
double init_emulation_icomm_time(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm);
double init_matrix_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute);
double init_pi_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute);
double init_comm_ptop_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute);
double init_comm_iptop_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute);
double init_comm_bcast_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute);
double init_comm_allgatherv_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute);
double init_comm_reduce_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute);
double init_comm_wait_pt(configuration *config_file, iter_stage_t *stage);
/*
* Computes the time per operation or the total number of bytes to send
......@@ -52,7 +56,10 @@ double init_stage(configuration *config_file, int stage_i, group_data group, MPI
//Communication
case COMP_POINT:
result = init_comm_ptop_pt(group, config_file, stage, comm, compute);
break;
case COMP_IPOINT:
result = init_comm_iptop_pt(group, config_file, stage, comm, compute);
break;
case COMP_BCAST:
result = init_comm_bcast_pt(group, config_file, stage, comm, compute);
......@@ -64,6 +71,9 @@ double init_stage(configuration *config_file, int stage_i, group_data group, MPI
case COMP_ALLREDUCE:
result = init_comm_reduce_pt(group, config_file, stage, comm, compute);
break;
case COMP_WAIT:
result = init_comm_wait_pt(config_file, stage);
break;
}
return result;
}
......@@ -93,7 +103,22 @@ double process_stage(configuration config_file, iter_stage_t stage, group_data g
break;
//Communications
case COMP_POINT:
if(stage.t_capped) {
while(t_total < stage.t_stage) {
point_to_point_inter(group.myId, group.numP, comm, stage.array, stage.full_array, stage.real_bytes);
t_total = MPI_Wtime() - t_start;
MPI_Bcast(&t_total, 1, MPI_DOUBLE, ROOT, comm);
}
} else {
for(i=0; i < stage.operations; i++) {
point_to_point_inter(group.myId, group.numP, comm, stage.array, stage.full_array, stage.real_bytes);
}
}
break;
case COMP_IPOINT:
for(i=0; i < stage.operations; i++) {
point_to_point_asynch_inter(group.myId, group.numP, comm, stage.array, stage.full_array, stage.real_bytes, &(stage.reqs[i*2])); //FIXME Magical number
}
break;
case COMP_BCAST:
......@@ -148,6 +173,33 @@ double process_stage(configuration config_file, iter_stage_t stage, group_data g
}
}
break;
case COMP_WAIT:
if(stage.t_capped) { //FIXME Right now, COMP_WAIT with t_capped only works for P2P comms
int remaining;
i = 0;
// Wait until t_stage time has passed
while(t_total < stage.t_stage) {
MPI_Waitall(2, &(stage.reqs[i*2]), MPI_STATUSES_IGNORE); //FIXME Magical number
t_total = MPI_Wtime() - t_start;
i++;
MPI_Bcast(&t_total, 1, MPI_DOUBLE, ROOT, comm);
}
remaining = stage.operations - i;
// If there are operations remaining, cancel and complete them
if (remaining) {
for(; i < stage.operations; i++) {
MPI_Cancel(&(stage.reqs[i*2])); //FIXME Magical number
MPI_Cancel(&(stage.reqs[i*2+1])); //FIXME Magical number
}
MPI_Waitall(remaining*2, &(stage.reqs[(stage.operations-remaining)*2]), MPI_STATUSES_IGNORE); //FIXME Magical number
}
} else {
MPI_Waitall(stage.req_count, stage.reqs, MPI_STATUSES_IGNORE);
}
break;
}
return result;
}
......@@ -176,6 +228,32 @@ double init_emulation_comm_time(group_data group, configuration *config_file, it
return time;
}
double init_emulation_icomm_time(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
double start_time, end_time, time = 0;
double t_stage;
iter_stage_t wait_stage;
wait_stage.pt = COMP_WAIT;
wait_stage.id = stage->id;
wait_stage.operations = stage->operations;
wait_stage.req_count = stage->req_count;
wait_stage.reqs = stage->reqs;
MPI_Barrier(comm);
start_time = MPI_Wtime();
process_stage(*config_file, *stage, group, comm);
process_stage(*config_file, wait_stage, group, comm);
MPI_Barrier(comm);
end_time = MPI_Wtime();
stage->t_op = (end_time - start_time) / stage->operations; //Time of a single operation
t_stage = stage->t_stage * config_file->groups[group.grp].factor;
stage->operations = ceil(t_stage / stage->t_op);
MPI_Bcast(&(stage->operations), 1, MPI_INT, ROOT, comm);
return time;
}
double init_matrix_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute) {
double result, t_stage, start_time;
......@@ -218,20 +296,57 @@ double init_pi_pt(group_data group, configuration *config_file, iter_stage_t *st
return result;
}
double init_comm_ptop_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute) {
double time = 0;
if(stage->array != NULL)
free(stage->array);
if(stage->full_array != NULL)
free(stage->full_array);
stage->real_bytes = (stage->bytes && !stage->t_capped) ? stage->bytes : config_file->granularity;
stage->array = malloc(stage->real_bytes * sizeof(char));
stage->full_array = malloc(stage->real_bytes * sizeof(char));
if(compute && !stage->bytes && !stage->t_capped) {
time = init_emulation_comm_time(group, config_file, stage, comm);
} else {
stage->operations = 1;
}
return time;
}
double init_comm_iptop_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute) {
int i;
double time = 0;
if(stage->array != NULL)
free(stage->array);
if(stage->full_array != NULL)
free(stage->full_array);
if(stage->reqs != NULL) //FIXME May be erroneous if requests are still active...
free(stage->reqs);
stage->real_bytes = (stage->bytes && !stage->t_capped) ? stage->bytes : config_file->granularity;
stage->array = malloc(stage->real_bytes * sizeof(char));
stage->full_array = malloc(stage->real_bytes * sizeof(char));
if(compute && !stage->bytes) { // t_capped is not considered in this case
stage->req_count = 2 * stage->operations; //FIXME Magical number
stage->reqs = (MPI_Request *) malloc(stage->req_count * sizeof(MPI_Request));
time = init_emulation_icomm_time(group, config_file, stage, comm);
free(stage->reqs);
} else {
stage->operations = 1;
}
stage->req_count = 2 * stage->operations; //FIXME Magical number
stage->reqs = (MPI_Request *) malloc(stage->req_count * sizeof(MPI_Request));
for(i=0; i < stage->req_count; i++) {
stage->reqs[i] = MPI_REQUEST_NULL;
}
return time;
}
// TODO Compute should always be 1 if the number of processes is different
double init_comm_bcast_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute) {
double time = 0;
......@@ -301,3 +416,29 @@ double init_comm_reduce_pt(group_data group, configuration *config_file, iter_st
return time;
}
double init_comm_wait_pt(configuration *config_file, iter_stage_t *stage) {
size_t i;
double time = 0;
iter_stage_t aux_stage;
if(stage->id < 0) {
printf("Error when initializing wait stage. Id is negative\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
for(i=0; i<config_file->n_stages; i++) {
aux_stage = config_file->stages[i];
if(aux_stage.id == stage->id) { break; }
}
if(i == config_file->n_stages) {
printf("Error when initializing wait stage. Not found a corresponding id\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
stage->req_count = aux_stage.req_count;
stage->reqs = aux_stage.reqs;
return time;
}
......@@ -6,7 +6,7 @@
#include <mpi.h>
#include "Main_datatypes.h"
enum compute_methods{COMP_PI, COMP_MATRIX, COMP_POINT, COMP_IPOINT, COMP_WAIT, COMP_BCAST, COMP_ALLGATHER, COMP_REDUCE, COMP_ALLREDUCE};
double init_stage(configuration *config_file, int stage_i, group_data group, MPI_Comm comm, int compute);
//double stage_init_all();
......
......@@ -772,7 +772,7 @@ int shrink_redistribution() {
double time_extra = MPI_Wtime();
//TODO Create new state before collecting zombies. Processes can perform tasks before that. Then call Malleability again to commit the change
zombies_collect_suspended(mall->user_comm, mall->myId, mall->numP, mall->numC, mall->root, (void *) mall_conf->results, mall_conf->config_file->n_stages, mall_conf->config_file->capture_method);
if(mall->myId < mall->numC) {
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); //FIXME Change this so the user requests the change and comms_update is called
......
......@@ -17,7 +17,7 @@ int offset_pids, *pids = NULL;
void gestor_usr2() {}
void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root, void *results_void, size_t n_stages, int capture_method) {
int pid = getpid();
int *pids_counts = malloc(numP * sizeof(int));
int *pids_displs = malloc(numP * sizeof(int));
......@@ -44,8 +44,7 @@ void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int
// FIXME This should not be here
// Needed to ensure iteration times are collected before suspending these processes
results_data *results = (results_data *) results_void;
compute_results_iter(results, myId, numP, root, n_stages, capture_method, comm);
zombies_suspend();
}
}
......
......@@ -9,7 +9,7 @@
#include <mpi.h>
#include <signal.h>
void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root, void *results_void, size_t n_stages, int capture_method);
void zombies_service_init();
void zombies_service_free();
void zombies_awake();
......