Commit 65641141 authored by iker_martin's avatar iker_martin
Browse files

Readed result MPI comunication. Improvements and minor fixes around MaM.

parent fc4c012e
......@@ -5,11 +5,72 @@
#define RESULTS_EXTRA_SIZE 100
void def_results_type(results_data *results, int resizes, MPI_Datatype *results_type);
void compute_max(results_data *results, double *computed_array, int myId, int root, MPI_Comm comm);
void compute_mean(results_data *results, double *computed_array, int myId, int numP, int root, MPI_Comm comm);
void compute_median(results_data *results, double *computed_array, size_t *used_ids, int myId, int numP, int root, MPI_Comm comm);
void match_median(results_data *results, double *computed_array, size_t *used_ids, int myId, int numP, int root, MPI_Comm comm);
//======================================================||
//======================================================||
//================MPI RESULTS FUNCTIONS=================||
//======================================================||
//======================================================||
/*
* Comunica una estructura de resultados a todos los procesos del comunicador
* a traves de un tipo derivado.
*
* Si se llama con un intercommunicador, el grupo de procesos que envia los datos
* tiene que indicar en el proceso raiz el valor "MPI_ROOT" para "root" y el resto
* de ese grupo el valor "MPI_PROC_NULL". Los procesos del otro grupo tienen que
* indicar el Id del proceso raiz que ha puesto "MPI_ROOT".
*/
void results_comm(results_data *results, int root, size_t resizes, MPI_Comm intercomm) {
MPI_Datatype results_type;
// Obtener un tipo derivado para enviar todos los
// datos escalares con una sola comunicacion
def_results_type(results, resizes, &results_type);
MPI_Bcast(results, 1, results_type, root, intercomm);
//Liberar tipos derivados
MPI_Type_free(&results_type);
}
/*
* Define un tipo derivado de MPI para mandar los tiempos
* con una sola comunicacion.
*
* En concreto son tres escalares y dos vectores de tamaño "resizes"
*/
void def_results_type(results_data *results, int resizes, MPI_Datatype *results_type) {
int i, counts = 6;
int blocklengths[] = {1, 1, 1, 1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];
// Rellenar vector types
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = MPI_DOUBLE;
blocklengths[2] = blocklengths[3] = blocklengths[4] = blocklengths[5] = resizes;
// Rellenar vector displs
MPI_Get_address(results, &dir);
MPI_Get_address(&(results->exec_start), &displs[0]);
MPI_Get_address(&(results->wasted_time), &displs[1]);
MPI_Get_address(results->sync_time, &displs[2]);
MPI_Get_address(results->async_time, &displs[3]);
MPI_Get_address(results->spawn_time, &displs[4]);
MPI_Get_address(results->malleability_time, &displs[5]);
for(i=0;i<counts;i++) displs[i] -= dir;
MPI_Type_create_struct(counts, blocklengths, displs, types, results_type);
MPI_Type_commit(results_type);
}
//======================================================||
//======================================================||
//================SET RESULTS FUNCTIONS=================||
......
......@@ -22,6 +22,8 @@ typedef struct {
double wasted_time; // Time spent recalculating iter stages
} results_data;
void results_comm(results_data *results, int root, size_t resizes, MPI_Comm intercomm);
void reset_results_index(results_data *results);
void compute_results_iter(results_data *results, int myId, int numP, int root, size_t n_stages, int capture_method, MPI_Comm comm);
......
......@@ -66,7 +66,7 @@ int main(int argc, char *argv[]) {
}
init_group_struct(argv, argc, myId, numP);
im_child = init_malleability(myId, numP, ROOT, comm, argv[0], nodelist, num_cpus, num_nodes);
im_child = MAM_Init(myId, numP, ROOT, comm, argv[0], nodelist, num_cpus, num_nodes);
if(!im_child) { //TODO REFACTOR Simplificar inicio
init_application();
......@@ -87,13 +87,13 @@ int main(int argc, char *argv[]) {
if(group->grp != 0) {
obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
malleability_retrieve_times(&results->spawn_time[group->grp - 1], &results->sync_time[group->grp - 1], &results->async_time[group->grp - 1], &results->malleability_time[group->grp - 1]);
MAM_Retrieve_times(&results->spawn_time[group->grp - 1], &results->sync_time[group->grp - 1], &results->async_time[group->grp - 1], &results->malleability_time[group->grp - 1]);
}
if(config_file->n_groups != group->grp + 1) { //TODO Llevar a otra funcion
set_malleability_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss,
MAM_Set_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss,
config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
set_children_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
MAM_Set_target_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
if(group->grp != 0) {
malleability_modify_data(&(group->grp), 0, 1, MPI_INT, 1, 0);
......@@ -166,7 +166,7 @@ int work() {
}
if(config_file->n_groups != group->grp + 1)
malleability_checkpoint(&state, wait_completed);
MAM_Checkpoint(&state, wait_completed);
iter = 0;
while(state == MAM_PENDING) {
......@@ -175,7 +175,7 @@ int work() {
iter++;
group->iter_start = iter;
} else { wait_completed = MAM_WAIT_COMPLETION; }
malleability_checkpoint(&state, wait_completed);
MAM_Checkpoint(&state, wait_completed);
}
// This function causes an overhead in the recorded time for last group
......@@ -184,6 +184,7 @@ int work() {
else {
MAM_Get_comm(&new_comm, &targets_qty);
send_config_file(config_file, ROOT, new_comm);
results_comm(results, ROOT, config_file->n_resizes, new_comm);
MPI_Comm_free(&new_comm);
MAM_Commit(&commited, &new_comm);
}
......@@ -481,7 +482,7 @@ void free_application_data() {
free(group->async_array);
group->async_array = NULL;
}
free_malleability();
MAM_Finalize();
free_results_data(results, config_file->n_stages);
free(results);
......@@ -527,14 +528,13 @@ void init_originals() {
size_t i;
if(config_file->n_groups > 1) {
set_malleability_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss,
MAM_Set_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss,
config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
set_children_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
MAM_Set_target_number(config_file->groups[group->grp+1].procs);
malleability_add_data(&(group->grp), 1, MPI_INT, 1, 0);
malleability_add_data(&run_id, 1, MPI_INT, 1, 0);
malleability_add_data(&(group->iter_start), 1, MPI_INT, 1, 0);
malleability_add_data(&(results->exec_start), 1, MPI_DOUBLE, 1, 0);
if(config_file->sdr) {
for(i=0; i<group->sync_data_groups; i++) {
......@@ -551,20 +551,25 @@ void init_originals() {
void init_targets() {
int commited, targets_qty;
size_t i;
size_t i, entries;
void *value = NULL;
MAM_Get_comm(&new_comm, &targets_qty);
malleability_get_data(&value, 0, 1, 0);
group->grp = *((int *)value);
group->grp = group->grp + 1;
recv_config_file(ROOT, new_comm, &config_file);
results = malloc(sizeof(results_data));
init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
results_comm(results, ROOT, config_file->n_resizes, new_comm);
MPI_Comm_free(&new_comm);
MAM_Commit(&commited, &comm);
// TODO Refactor - Que sea una unica funcion
// Obtiene las variables que van a utilizar los hijos
void *value = NULL;
size_t entries;
malleability_get_data(&value, 0, 1, 0);
group->grp = *((int *)value);
malleability_get_data(&value, 1, 1, 0);
run_id = *((int *)value);
......@@ -595,12 +600,5 @@ void init_targets() {
}
group->async_qty[entries-1] = config_file->adr % DR_MAX_SIZE ? config_file->adr % DR_MAX_SIZE : DR_MAX_SIZE;
group->async_data_groups = entries;
}
group->grp = group->grp + 1;
results = malloc(sizeof(results_data));
init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
malleability_get_data(&value, 3, 1, 0);
results->exec_start = *((double *)value);
}
}
......@@ -8,7 +8,7 @@
//void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts);
void prepare_redistribution(int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, void **recv, struct Counts *s_counts, struct Counts *r_counts); //FIXME Choose name for is_sync
void check_requests(struct Counts s_counts, struct Counts r_counts, int red_strategies, MPI_Request **requests, size_t *request_qty);
void check_requests(struct Counts s_counts, struct Counts r_counts, int red_method, int red_strategies, MPI_Request **requests, size_t *request_qty);
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm);
void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method);
......@@ -325,7 +325,7 @@ int async_communication_start(void *send, void **recv, int qty, MPI_Datatype dat
aux_comm = comm;
}
// FIXME END REFACTOR
check_requests(s_counts, r_counts, red_strategies, requests, request_qty);
check_requests(s_counts, r_counts, red_method, red_strategies, requests, request_qty);
/* PERFORM COMMUNICATION */
switch(red_method) {
......@@ -651,12 +651,20 @@ void prepare_redistribution(int qty, MPI_Datatype datatype, int myId, int numP,
* - request_qty (IN/OUT): Quantity of requests to be used. If the value is smaller than the amount of communication
* functions to perform, it is modified to the minimum value.
*/
void check_requests(struct Counts s_counts, struct Counts r_counts, int red_strategies, MPI_Request **requests, size_t *request_qty) {
void check_requests(struct Counts s_counts, struct Counts r_counts, int red_method, int red_strategies, MPI_Request **requests, size_t *request_qty) {
size_t i, sum;
MPI_Request *aux;
sum = (size_t) s_counts.idE - s_counts.idI;
sum += (size_t) r_counts.idE - r_counts.idI;
switch(red_method) {
case MALL_RED_BASELINE:
sum = 1;
break;
case MALL_RED_POINT:
default:
sum = (size_t) s_counts.idE - s_counts.idI;
sum += (size_t) r_counts.idE - r_counts.idI;
break;
}
if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) {
sum++;
}
......
......@@ -47,7 +47,7 @@ malleability_data_t *dist_a_data;
* la comunicacion los procesos hijo estan preparados para ejecutar la
* aplicacion.
*/
int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes) {
int MAM_Init(int myId, int numP, int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes) {
MPI_Comm dup_comm, thread_comm;
#if USE_MAL_DEBUG
......@@ -123,7 +123,7 @@ int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_ex
* de maleabilidad y asegura que los zombies
* despierten si los hubiese.
*/
void free_malleability() {
void MAM_Finalize() {
free_malleability_data_struct(rep_s_data);
free_malleability_data_struct(rep_a_data);
free_malleability_data_struct(dist_s_data);
......@@ -163,7 +163,7 @@ void free_malleability() {
* Si solo hay datos sincronos se envian tras la creacion de los procesos
* y finalmente se desconectan los dos grupos de procesos.
*/
int malleability_checkpoint(int *mam_state, int wait_completed) {
int MAM_Checkpoint(int *mam_state, int wait_completed) {
int is_intercomm;
switch(state) {
......@@ -184,7 +184,7 @@ int malleability_checkpoint(int *mam_state, int wait_completed) {
state = spawn_step();
if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE){
malleability_checkpoint(mam_state, wait_completed);
MAM_Checkpoint(mam_state, wait_completed);
}
break;
......@@ -197,14 +197,14 @@ int malleability_checkpoint(int *mam_state, int wait_completed) {
#endif
mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
malleability_checkpoint(mam_state, wait_completed);
MAM_Checkpoint(mam_state, wait_completed);
}
break;
case MALL_SPAWN_ADAPT_POSTPONE:
case MALL_SPAWN_COMPLETED:
state = start_redistribution();
malleability_checkpoint(mam_state, wait_completed);
MAM_Checkpoint(mam_state, wait_completed);
break;
case MALL_DIST_PENDING:
......@@ -214,7 +214,7 @@ int malleability_checkpoint(int *mam_state, int wait_completed) {
state = check_redistribution(wait_completed);
}
if(state != MALL_DIST_PENDING) {
malleability_checkpoint(mam_state, wait_completed);
MAM_Checkpoint(mam_state, wait_completed);
}
break;
......@@ -232,14 +232,14 @@ int malleability_checkpoint(int *mam_state, int wait_completed) {
MPI_Barrier(mall->comm);
#endif
mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
malleability_checkpoint(mam_state, wait_completed);
MAM_Checkpoint(mam_state, wait_completed);
}
break;
case MALL_SPAWN_ADAPTED: //FIXME Borrar?
state = shrink_redistribution();
if(state == MALL_ZOMBIE) *mam_state = MAM_ZOMBIE; //TODO Esta no hay que borrarla
malleability_checkpoint(mam_state, wait_completed);
MAM_Checkpoint(mam_state, wait_completed);
break;
case MALL_DIST_COMPLETED:
......@@ -329,11 +329,11 @@ void MAM_Commit(int *mam_state, MPI_Comm *new_comm) {
#endif
}
void malleability_retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
malleability_I_retrieve_times(sp_time, sy_time, asy_time, mall_time);
void MAM_Retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
MAM_I_retrieve_times(sp_time, sy_time, asy_time, mall_time);
}
void set_malleability_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies) {
void MAM_Set_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies) {
if(state > MALL_NOT_STARTED) return;
mall_conf->spawn_method = spawn_method;
......@@ -349,10 +349,9 @@ void set_malleability_configuration(int spawn_method, int spawn_strategies, int
}
/*
* To be deprecated
* Tiene que ser llamado despues de setear la config
*/
void set_children_number(int numC){
void MAM_Set_target_number(int numC){
if(state > MALL_NOT_STARTED) return;
if((mall_conf->spawn_method == MALL_SPAWN_MERGE) && (numC >= mall->numP)) {
......@@ -874,7 +873,6 @@ int shrink_redistribution() {
#endif
double time_extra = MPI_Wtime();
//TODO Create Commit function. Processes can perform tasks before that. Then call again Malleability to commit the change
MPI_Abort(MPI_COMM_WORLD, -20); //
zombies_collect_suspended(mall->user_comm, mall->myId, mall->numP, mall->numC, mall->root);
......
......@@ -9,21 +9,20 @@
#include <mpi.h>
#include "malleabilityStates.h"
int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes);
void free_malleability();
void indicate_ending_malleability(int new_outside_state);
int malleability_checkpoint(int *mam_state, int wait_completed);
int MAM_Init(int myId, int numP, int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes);
void MAM_Finalize();
int MAM_Checkpoint(int *mam_state, int wait_completed);
int MAM_Get_comm(MPI_Comm *comm, int *targets_qty);
void MAM_Commit(int *mam_state, MPI_Comm *new_comm);
void set_malleability_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies);
void set_children_number(int numC); // TODO TO BE DEPRECATED
void MAM_Set_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies);
void MAM_Set_target_number(int numC); // TODO TO BE DEPRECATED
void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant);
void malleability_modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant);
void malleability_get_entries(size_t *entries, int is_replicated, int is_constant);
void malleability_get_data(void **data, size_t index, int is_replicated, int is_constant);
void malleability_retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time);
void MAM_Retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time);
#endif
......@@ -50,7 +50,7 @@ void malleability_times_broadcast(int root) {
MPI_Bcast(mall_conf->times, 1, times->times_type, root, mall->intercomm);
}
void malleability_I_retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
void MAM_I_retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
malleability_times_t *times = mall_conf->times;
*sp_time = times->spawn_time;
*sy_time = times->sync_end - times->sync_start;
......
......@@ -11,6 +11,6 @@ void free_malleability_times();
void malleability_times_broadcast(int root);
void malleability_I_retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time);
void MAM_I_retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time);
#endif
......@@ -185,8 +185,14 @@ void realloc_malleability_data_struct(malleability_data_t *data_struct, size_t q
arrays_aux[i] = NULL;
}
//TODO
//if(data_struct->qty != qty_aux && data_struct->qty != NULL) free(data_struct->qty);
// Check if old array can be freed
if(data_struct->qty != qty_aux && data_struct->qty != NULL) free(data_struct->qty);
if(data_struct->types != types_aux && data_struct->types != NULL) free(data_struct->types);
if(data_struct->request_qty != request_qty_aux && data_struct->request_qty != NULL) free(data_struct->request_qty);
if(data_struct->requests != requests_aux && data_struct->requests != NULL) free(data_struct->requests);
if(data_struct->windows != windows_aux && data_struct->windows != NULL) free(data_struct->windows);
if(data_struct->arrays != arrays_aux && data_struct->arrays != NULL) free(data_struct->arrays);
data_struct->qty = qty_aux;
data_struct->types = types_aux;
data_struct->request_qty = request_qty_aux;
......@@ -201,10 +207,6 @@ void free_malleability_data_struct(malleability_data_t *data_struct) {
max = data_struct->entries;
if(max != 0) {
for(i=0; i<max; i++) {
//free(data_struct->arrays[i]); //FIXME Valores alojados con 1 elemento no se liberan? Usar qty? Comprobar si esta alojado por el usuario?
}
if(data_struct->qty != NULL) {
free(data_struct->qty);
}
......
......@@ -2,12 +2,11 @@
#SBATCH -p P1
#SBATCH -N 1
#SBATCH --exclude=c01,c00,c02
#SBATCH --exclude=n00
partition='P1'
scriptDir="$(dirname "$0")"
source $scriptDir/build/config.txt
source build/config.txt
codeDir="/Codes"
execDir="/Exec"
cores=$(bash $dir$execDir/BashScripts/getCores.sh $partition)
......@@ -24,7 +23,7 @@ fi
echo "MPICH"
#export HYDRA_DEBUG=1
mpirun --version
numP=$(bash $dir$execDir/BashScripts/getNumPNeeded.sh $configFile 0)
initial_nodelist=$(bash $dir$execDir/BashScripts/createInitialNodelist.sh $numP $cores $nodelist)
echo $initial_nodelist
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment