Commit fc4c012e authored by iker_martin's avatar iker_martin
Browse files

New function to perform user redistribution. Config is now redistributed by...

New function to perform user redistribution. Config is now redistributed by Main. Minor improvements in MaM and simplifications in SAM.
parent 63787a2e
...@@ -29,6 +29,10 @@ int print_local_results(); ...@@ -29,6 +29,10 @@ int print_local_results();
int print_final_results(); int print_final_results();
int create_out_file(char *nombre, int *ptr, int newstdout); int create_out_file(char *nombre, int *ptr, int newstdout);
void init_originals();
void init_targets();
configuration *config_file; configuration *config_file;
group_data *group; group_data *group;
results_data *results; results_data *results;
...@@ -36,10 +40,9 @@ MPI_Comm comm, new_comm; ...@@ -36,10 +40,9 @@ MPI_Comm comm, new_comm;
int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
int numP, myId, res, commited; int numP, myId, res;
int req; int req;
int im_child; int im_child;
size_t i;
int num_cpus, num_nodes; int num_cpus, num_nodes;
char *nodelist = NULL; char *nodelist = NULL;
...@@ -67,91 +70,20 @@ int main(int argc, char *argv[]) { ...@@ -67,91 +70,20 @@ int main(int argc, char *argv[]) {
if(!im_child) { //TODO REFACTOR Simplificar inicio if(!im_child) { //TODO REFACTOR Simplificar inicio
init_application(); init_application();
init_originals();
set_benchmark_configuration(config_file);
if(config_file->n_groups > 1) {
set_malleability_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss,
config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
set_children_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
malleability_add_data(&(group->grp), 1, MPI_INT, 1, 0);
malleability_add_data(&run_id, 1, MPI_INT, 1, 0);
malleability_add_data(&(group->iter_start), 1, MPI_INT, 1, 0);
malleability_add_data(&(results->exec_start), 1, MPI_DOUBLE, 1, 0);
if(config_file->sdr) {
for(i=0; i<group->sync_data_groups; i++) {
malleability_add_data(group->sync_array[i], group->sync_qty[i], MPI_CHAR, 0, 0);
}
}
if(config_file->adr) {
for(i=0; i<group->async_data_groups; i++) {
malleability_add_data(group->async_array[i], group->async_qty[i], MPI_CHAR, 0, 1);
}
}
}
MPI_Barrier(comm); MPI_Barrier(comm);
results->exec_start = MPI_Wtime(); results->exec_start = MPI_Wtime();
} else { //Init hijos } else { //Init targets
MAM_Commit(&commited, &comm); init_targets();
get_benchmark_configuration(&config_file);
// TODO Refactor - Que sea una unica funcion
// Obtiene las variables que van a utilizar los hijos
void *value = NULL;
size_t entries;
malleability_get_data(&value, 0, 1, 0);
group->grp = *((int *)value);
malleability_get_data(&value, 1, 1, 0);
run_id = *((int *)value);
malleability_get_data(&value, 2, 1, 0);
group->iter_start = *((int *)value);
if(config_file->sdr) {
malleability_get_entries(&entries, 0, 0);
group->sync_qty = (int *) malloc(entries * sizeof(int));
group->sync_array = (char **) malloc(entries * sizeof(char *));
for(i=0; i<entries; i++) {
malleability_get_data(&value, i, 0, 0);
group->sync_array[i] = (char *)value;
group->sync_qty[i] = DR_MAX_SIZE;
}
group->sync_qty[entries-1] = config_file->sdr % DR_MAX_SIZE ? config_file->sdr % DR_MAX_SIZE : DR_MAX_SIZE;
group->sync_data_groups = entries;
}
if(config_file->adr) {
malleability_get_entries(&entries, 0, 1);
group->async_qty = (int *) malloc(entries * sizeof(int));
group->async_array = (char **) malloc(entries * sizeof(char *));
for(i=0; i<entries; i++) {
malleability_get_data(&value, i, 0, 1);
group->async_array[i] = (char *)value;
group->async_qty[i] = DR_MAX_SIZE;
}
group->async_qty[entries-1] = config_file->adr % DR_MAX_SIZE ? config_file->adr % DR_MAX_SIZE : DR_MAX_SIZE;
group->async_data_groups = entries;
}
group->grp = group->grp + 1;
results = malloc(sizeof(results_data));
init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
malleability_get_data(&value, 3, 1, 0);
results->exec_start = *((double *)value);
} }
// //
// EMPIEZA LA EJECUCION------------------------------- // EMPIEZA LA EJECUCION-------------------------------
// //
group->grp = group->grp - 1; // TODO REFACTOR???
do { do {
MPI_Comm_size(comm, &(group->numP)); MPI_Comm_size(comm, &(group->numP));
MPI_Comm_rank(comm, &(group->myId)); MPI_Comm_rank(comm, &(group->myId));
group->grp = group->grp + 1;
if(group->grp != 0) { if(group->grp != 0) {
obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
...@@ -180,9 +112,12 @@ int main(int argc, char *argv[]) { ...@@ -180,9 +112,12 @@ int main(int argc, char *argv[]) {
print_local_results(); print_local_results();
reset_results_index(results); reset_results_index(results);
if(comm != MPI_COMM_WORLD) MPI_Comm_free(&comm); if(res!=1) {
comm = new_comm; if(comm != MPI_COMM_WORLD) MPI_Comm_free(&comm);
} while(config_file->n_groups > group->grp + 1 && config_file->groups[group->grp+1].sm == MALL_SPAWN_MERGE); comm = new_comm;
}
group->grp = group->grp + 1;
} while(config_file->n_groups > group->grp && config_file->groups[group->grp].sm == MALL_SPAWN_MERGE);
// //
// TERMINA LA EJECUCION ---------------------------------------------------------- // TERMINA LA EJECUCION ----------------------------------------------------------
...@@ -219,7 +154,7 @@ int main(int argc, char *argv[]) { ...@@ -219,7 +154,7 @@ int main(int argc, char *argv[]) {
* de procesos. En caso contrario se devuelve 0. * de procesos. En caso contrario se devuelve 0.
*/ */
int work() { int work() {
int iter, maxiter, state, res, commited; int iter, maxiter, state, res, commited, targets_qty;
int wait_completed = MAM_CHECK_COMPLETION; int wait_completed = MAM_CHECK_COMPLETION;
maxiter = config_file->groups[group->grp].iters; maxiter = config_file->groups[group->grp].iters;
...@@ -243,9 +178,15 @@ int work() { ...@@ -243,9 +178,15 @@ int work() {
malleability_checkpoint(&state, wait_completed); malleability_checkpoint(&state, wait_completed);
} }
// This function causes an overhead in the recorded time for last group
if(config_file->n_groups == group->grp + 1) res=1; compute_results_iter(results, group->myId, group->numP, ROOT, config_file->n_stages, config_file->capture_method, comm);
else { MAM_Commit(&commited, &new_comm); } if(config_file->n_groups == group->grp + 1) { res=1; }
else {
MAM_Get_comm(&new_comm, &targets_qty);
send_config_file(config_file, ROOT, new_comm);
MPI_Comm_free(&new_comm);
MAM_Commit(&commited, &new_comm);
}
if(state == MAM_ZOMBIE) res=state; if(state == MAM_ZOMBIE) res=state;
return res; return res;
} }
...@@ -373,7 +314,7 @@ int print_local_results() { ...@@ -373,7 +314,7 @@ int print_local_results() {
int ptr_local, ptr_out, err; int ptr_local, ptr_out, err;
char *file_name; char *file_name;
compute_results_iter(results, group->myId, group->numP, ROOT, config_file->n_stages, config_file->capture_method, comm); //compute_results_iter(results, group->myId, group->numP, ROOT, config_file->n_stages, config_file->capture_method, comm);
if(group->myId == ROOT) { if(group->myId == ROOT) {
ptr_out = dup(1); ptr_out = dup(1);
...@@ -407,7 +348,7 @@ int print_final_results() { ...@@ -407,7 +348,7 @@ int print_final_results() {
if(group->myId == ROOT) { if(group->myId == ROOT) {
if(config_file->n_groups == group->grp+1) { if(config_file->n_groups == group->grp) {
file_name = NULL; file_name = NULL;
file_name = malloc(20 * sizeof(char)); file_name = malloc(20 * sizeof(char));
if(file_name == NULL) return -1; // No ha sido posible alojar la memoria if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
...@@ -574,3 +515,92 @@ int create_out_file(char *nombre, int *ptr, int newstdout) { ...@@ -574,3 +515,92 @@ int create_out_file(char *nombre, int *ptr, int newstdout) {
return 0; return 0;
} }
//======================================================||
//======================================================||
//================ INIT MALLEABILITY ===================||
//======================================================||
//======================================================||
void init_originals() {
size_t i;
if(config_file->n_groups > 1) {
set_malleability_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss,
config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
set_children_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
malleability_add_data(&(group->grp), 1, MPI_INT, 1, 0);
malleability_add_data(&run_id, 1, MPI_INT, 1, 0);
malleability_add_data(&(group->iter_start), 1, MPI_INT, 1, 0);
malleability_add_data(&(results->exec_start), 1, MPI_DOUBLE, 1, 0);
if(config_file->sdr) {
for(i=0; i<group->sync_data_groups; i++) {
malleability_add_data(group->sync_array[i], group->sync_qty[i], MPI_CHAR, 0, 0);
}
}
if(config_file->adr) {
for(i=0; i<group->async_data_groups; i++) {
malleability_add_data(group->async_array[i], group->async_qty[i], MPI_CHAR, 0, 1);
}
}
}
}
void init_targets() {
int commited, targets_qty;
size_t i;
MAM_Get_comm(&new_comm, &targets_qty);
recv_config_file(ROOT, new_comm, &config_file);
MPI_Comm_free(&new_comm);
MAM_Commit(&commited, &comm);
// TODO Refactor - Que sea una unica funcion
// Obtiene las variables que van a utilizar los hijos
void *value = NULL;
size_t entries;
malleability_get_data(&value, 0, 1, 0);
group->grp = *((int *)value);
malleability_get_data(&value, 1, 1, 0);
run_id = *((int *)value);
malleability_get_data(&value, 2, 1, 0);
group->iter_start = *((int *)value);
if(config_file->sdr) {
malleability_get_entries(&entries, 0, 0);
group->sync_qty = (int *) malloc(entries * sizeof(int));
group->sync_array = (char **) malloc(entries * sizeof(char *));
for(i=0; i<entries; i++) {
malleability_get_data(&value, i, 0, 0);
group->sync_array[i] = (char *)value;
group->sync_qty[i] = DR_MAX_SIZE;
}
group->sync_qty[entries-1] = config_file->sdr % DR_MAX_SIZE ? config_file->sdr % DR_MAX_SIZE : DR_MAX_SIZE;
group->sync_data_groups = entries;
}
if(config_file->adr) {
malleability_get_entries(&entries, 0, 1);
group->async_qty = (int *) malloc(entries * sizeof(int));
group->async_array = (char **) malloc(entries * sizeof(char *));
for(i=0; i<entries; i++) {
malleability_get_data(&value, i, 0, 1);
group->async_array[i] = (char *)value;
group->async_qty[i] = DR_MAX_SIZE;
}
group->async_qty[entries-1] = config_file->adr % DR_MAX_SIZE ? config_file->adr % DR_MAX_SIZE : DR_MAX_SIZE;
group->async_data_groups = entries;
}
group->grp = group->grp + 1;
results = malloc(sizeof(results_data));
init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
malleability_get_data(&value, 3, 1, 0);
results->exec_start = *((double *)value);
}
...@@ -4,11 +4,10 @@ ...@@ -4,11 +4,10 @@
/* /*
* Shows available data structures for inner ussage. * Shows available data structures for inner ussage.
*/ */
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h> #include <mpi.h>
#include <pthread.h>
//FIXME Remove both includes
#include "../Main/configuration.h"
#include "../Main/Main_datatypes.h"
#define DEBUG_FUNC(debug_string, rank, numP) printf("MaM [P%d/%d]: %s -- %s:%s:%d\n", rank, numP, debug_string, __FILE__, __func__, __LINE__) #define DEBUG_FUNC(debug_string, rank, numP) printf("MaM [P%d/%d]: %s -- %s:%s:%d\n", rank, numP, debug_string, __FILE__, __func__, __LINE__)
...@@ -55,7 +54,6 @@ typedef struct { ...@@ -55,7 +54,6 @@ typedef struct {
int red_strategies; int red_strategies;
malleability_times_t *times; malleability_times_t *times;
configuration *config_file;
} malleability_config_t; } malleability_config_t;
typedef struct { //FIXME numC_spawned no se esta usando typedef struct { //FIXME numC_spawned no se esta usando
...@@ -65,7 +63,6 @@ typedef struct { //FIXME numC_spawned no se esta usando ...@@ -65,7 +63,6 @@ typedef struct { //FIXME numC_spawned no se esta usando
MPI_Comm intercomm; MPI_Comm intercomm;
MPI_Comm user_comm; MPI_Comm user_comm;
MPI_Datatype struct_type; MPI_Datatype struct_type;
int dup_user_comm; //FIXME Borrar
char *name_exec, *nodelist; char *name_exec, *nodelist;
int num_cpus, num_nodes, nodelist_len; int num_cpus, num_nodes, nodelist_len;
......
...@@ -62,11 +62,10 @@ int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_ex ...@@ -62,11 +62,10 @@ int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_ex
rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t)); rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t)); dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
mall->dup_user_comm = 0;
MPI_Comm_dup(comm, &dup_comm); MPI_Comm_dup(comm, &dup_comm);
MPI_Comm_dup(comm, &thread_comm); MPI_Comm_dup(comm, &thread_comm);
MPI_Comm_set_name(dup_comm, "MPI_COMM_MALL"); MPI_Comm_set_name(dup_comm, "MPI_COMM_MAM");
MPI_Comm_set_name(thread_comm, "MPI_COMM_MALL_THREAD"); MPI_Comm_set_name(thread_comm, "MPI_COMM_MAM_THREAD");
mall->myId = myId; mall->myId = myId;
mall->numP = numP; mall->numP = numP;
...@@ -74,7 +73,7 @@ int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_ex ...@@ -74,7 +73,7 @@ int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_ex
mall->root_parents = -1; mall->root_parents = -1;
mall->comm = dup_comm; mall->comm = dup_comm;
mall->thread_comm = thread_comm; mall->thread_comm = thread_comm;
mall->user_comm = comm; mall->user_comm = MPI_COMM_NULL;
mall->name_exec = name_exec; mall->name_exec = name_exec;
mall->nodelist = nodelist; mall->nodelist = nodelist;
...@@ -165,6 +164,7 @@ void free_malleability() { ...@@ -165,6 +164,7 @@ void free_malleability() {
* y finalmente se desconectan los dos grupos de procesos. * y finalmente se desconectan los dos grupos de procesos.
*/ */
int malleability_checkpoint(int *mam_state, int wait_completed) { int malleability_checkpoint(int *mam_state, int wait_completed) {
int is_intercomm;
switch(state) { switch(state) {
case MALL_UNRESERVED: case MALL_UNRESERVED:
...@@ -242,13 +242,20 @@ int malleability_checkpoint(int *mam_state, int wait_completed) { ...@@ -242,13 +242,20 @@ int malleability_checkpoint(int *mam_state, int wait_completed) {
malleability_checkpoint(mam_state, wait_completed); malleability_checkpoint(mam_state, wait_completed);
break; break;
case MALL_DIST_COMPLETED: //TODO No es esto muy feo? case MALL_DIST_COMPLETED:
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
if(is_intercomm) {
MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->user_comm); //El que pone 0 va primero
} else {
MPI_Comm_dup(mall->intercomm, &mall->user_comm);
}
MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MAM_USER");
state = MALL_COMPLETED;
*mam_state = MAM_COMPLETED;
#if USE_MAL_BARRIERS #if USE_MAL_BARRIERS
MPI_Barrier(mall->comm); MPI_Barrier(mall->comm);
#endif #endif
mall_conf->times->malleability_end = MPI_Wtime(); mall_conf->times->malleability_end = MPI_Wtime();
state = MALL_COMPLETED;
*mam_state = MAM_COMPLETED;
break; break;
} }
...@@ -256,6 +263,29 @@ int malleability_checkpoint(int *mam_state, int wait_completed) { ...@@ -256,6 +263,29 @@ int malleability_checkpoint(int *mam_state, int wait_completed) {
return state; return state;
} }
/*
* Returns an intracommunicator to allow users to perform their
* own redistributions. The user must free this communicator
* when is not longer needed.
*
* This is a blocking function, must be called by all processes involved in the
* reconfiguration.
* TODO Hacer en otro sitio la creacion del comunicador y borrar en commit.
*
* The communicator is only returned if the state of reconfiguration
* is completed (MALL_COMPLETED / MAM_COMPLETED). Otherwise MALL_DENIED is obtained.
*/
int MAM_Get_comm(MPI_Comm *comm, int *targets_qty) {
if(!(state == MALL_COMPLETED || state == MALL_ZOMBIE)) {
return MALL_DENIED;
}
MPI_Comm_dup(mall->user_comm, comm);
MPI_Comm_set_name(*comm, "MPI_MAM_DUP");
*targets_qty = mall->numC;
return 0;
}
/* /*
* TODO * TODO
*/ */
...@@ -268,39 +298,37 @@ void MAM_Commit(int *mam_state, MPI_Comm *new_comm) { ...@@ -268,39 +298,37 @@ void MAM_Commit(int *mam_state, MPI_Comm *new_comm) {
#if USE_MAL_DEBUG #if USE_MAL_DEBUG
if(mall->myId == mall->root) DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD); if(mall->myId == mall->root) DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif #endif
int zombies;
MPI_Allreduce(&state, &zombies, 1, MPI_INT, MPI_MIN, mall->intercomm);
if(zombies == MALL_ZOMBIE) {
zombies_collect_suspended(mall->user_comm, mall->myId, mall->numP, mall->numC, mall->root, NULL, 0, 0);
}
// Zombies treatment
if(mall_conf->spawn_method == MALL_SPAWN_MERGE) {
int zombies;
MPI_Allreduce(&state, &zombies, 1, MPI_INT, MPI_MIN, mall->intercomm);
if(zombies == MALL_ZOMBIE) {
zombies_collect_suspended(mall->comm, mall->myId, mall->numP, mall->numC, mall->root);
}
}
// Reset/Free unneded communicators
if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm));
if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { malleability_comms_update(mall->intercomm); } if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { malleability_comms_update(mall->intercomm); }
if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) {
MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error en OpenMPI + Merge MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error en OpenMPI + Merge
} }
MPI_Comm_rank(mall->comm, &(mall->myId));
MPI_Comm_size(mall->comm, &(mall->numP));
mall->root = mall->root_parents == -1 ? mall->root : mall->root_parents; mall->root = mall->root_parents == -1 ? mall->root : mall->root_parents;
mall->root_parents = -1; mall->root_parents = -1;
state = MALL_NOT_STARTED; state = MALL_NOT_STARTED;
*mam_state = MAM_COMMITED; *mam_state = MAM_COMMITED;
*new_comm = mall->user_comm;
// Set new communicator
if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *new_comm = MPI_COMM_WORLD; }
else if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MPI_Comm_dup(mall->comm, new_comm); }
#if USE_MAL_DEBUG #if USE_MAL_DEBUG
if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD); if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif #endif
} }
// Funciones solo necesarias por el benchmark
//-------------------------------------------------------------------------------------------------------------
void set_benchmark_configuration(configuration *config_file) {
mall_conf->config_file = config_file;
}
void get_benchmark_configuration(configuration **config_file) {
*config_file = mall_conf->config_file;
}
//-------------------------------------------------------------------------------------------------------------
void malleability_retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) { void malleability_retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
malleability_I_retrieve_times(sp_time, sy_time, asy_time, mall_time); malleability_I_retrieve_times(sp_time, sy_time, asy_time, mall_time);
} }
...@@ -341,19 +369,6 @@ void set_children_number(int numC){ ...@@ -341,19 +369,6 @@ void set_children_number(int numC){
} }
} }
/*
* TODO
*/
void get_malleability_user_comm(MPI_Comm *comm) {
if(mall->dup_user_comm) {
if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm));
MPI_Comm_dup(mall->comm, &(mall->user_comm));
MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MALL_USER");
mall->dup_user_comm = 0;
}
*comm = mall->user_comm;
}
/* /*
* Anyade a la estructura concreta de datos elegida * Anyade a la estructura concreta de datos elegida
* el nuevo set de datos "data" de un total de "total_qty" elementos. * el nuevo set de datos "data" de un total de "total_qty" elementos.
...@@ -561,7 +576,6 @@ void Children_init() { ...@@ -561,7 +576,6 @@ void Children_init() {
MPI_Comm_size(mall->intercomm, &(mall->numP)); MPI_Comm_size(mall->intercomm, &(mall->numP));
} }
recv_config_file(mall->root, mall->intercomm, &(mall_conf->config_file));
MAM_Comm_main_structures(root_parents); MAM_Comm_main_structures(root_parents);
#if USE_MAL_DEBUG #if USE_MAL_DEBUG
...@@ -637,6 +651,13 @@ void Children_init() { ...@@ -637,6 +651,13 @@ void Children_init() {
mall_conf->times->malleability_end = MPI_Wtime(); // Obtener timestamp de cuando termina maleabilidad mall_conf->times->malleability_end = MPI_Wtime(); // Obtener timestamp de cuando termina maleabilidad
state = MALL_COMPLETED; state = MALL_COMPLETED;
if(is_intercomm) {
MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->user_comm); //El que pone 0 va primero
} else {
MPI_Comm_dup(mall->intercomm, &mall->user_comm);
}
MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MAM_USER");
#if USE_MAL_DEBUG #if USE_MAL_DEBUG
DEBUG_FUNC("MaM has been initialized correctly as children", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD); DEBUG_FUNC("MaM has been initialized correctly as children", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif #endif
...@@ -702,7 +723,6 @@ int start_redistribution() { ...@@ -702,7 +723,6 @@ int start_redistribution() {
rootBcast = mall->root; rootBcast = mall->root;
} }
send_config_file(mall_conf->config_file, rootBcast, mall->intercomm);
if(mall_conf->spawn_method == MALL_SPAWN_BASELINE || mall->numP <= mall->numC) { MAM_Comm_main_structures(rootBcast); } if(mall_conf->spawn_method == MALL_SPAWN_BASELINE || mall->numP <= mall->numC) { MAM_Comm_main_structures(rootBcast); }
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm); comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
...@@ -847,7 +867,7 @@ int end_redistribution() { ...@@ -847,7 +867,7 @@ int end_redistribution() {
///============================================= ///=============================================
///============================================= ///=============================================
///============================================= ///=============================================
//TODO Add comment //TODO DEPRECATED
int shrink_redistribution() { int shrink_redistribution() {
#if USE_MAL_BARRIERS #if USE_MAL_BARRIERS
MPI_Barrier(mall->comm); MPI_Barrier(mall->comm);
...@@ -855,13 +875,12 @@ int shrink_redistribution() { ...@@ -855,13 +875,12 @@ int shrink_redistribution() {
double time_extra = MPI_Wtime(); double time_extra = MPI_Wtime();
//TODO Create Commit function. Processes can perform tasks before that. Then call again Malleability to commit the change //TODO Create Commit function. Processes can perform tasks before that. Then call again Malleability to commit the change
MPI_Abort(MPI_COMM_WORLD, -20); // (void *) mall_conf->results MPI_Abort(MPI_COMM_WORLD, -20); //
zombies_collect_suspended(mall->user_comm, mall->myId, mall->numP, mall->numC, mall->root, NULL, mall_conf->config_file->n_stages, mall_conf->config_file->capture_method); zombies_collect_suspended(mall->user_comm, mall->myId, mall->numP, mall->numC, mall->root);
if(mall->myId < mall->numC) { if(mall->myId < mall->numC) {
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); //FIXME Modificar a que se pida pro el usuario el cambio y se llama a comms_update if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); //FIXME Modificar a que se pida pro el usuario el cambio y se llama a comms_update
if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm)); if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
mall->dup_user_comm = 1;
MPI_Comm_dup(mall->intercomm, &(mall->thread_comm)); MPI_Comm_dup(mall->intercomm, &(mall->thread_comm));
MPI_Comm_dup(mall->intercomm, &(mall->comm)); MPI_Comm_dup(mall->intercomm, &(mall->comm));
...@@ -981,14 +1000,11 @@ void print_comms_state() { ...@@ -981,14 +1000,11 @@ void print_comms_state() {
void malleability_comms_update(MPI_Comm comm) { void malleability_comms_update(MPI_Comm comm) {
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm)); if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm)); //TODO No es peligroso? Tendria que hacerlo el usuario
MPI_Comm_dup(comm, &(mall->thread_comm)); MPI_Comm_dup(comm, &(mall->thread_comm));
MPI_Comm_dup(comm, &(mall->comm)); MPI_Comm_dup(comm, &(mall->comm));
MPI_Comm_dup(comm, &(mall->user_comm));
MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MAM_THREAD"); MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MAM_THREAD");
MPI_Comm_set_name(mall->comm, "MPI_COMM_MAM"); MPI_Comm_set_name(mall->comm, "MPI_COMM_MAM");
MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MAM_USER");
} }
...@@ -7,27 +7,23 @@ ...@@ -7,27 +7,23 @@
#include <fcntl.h> #include <fcntl.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <mpi.h> #include <mpi.h>
#include "../Main/configuration.h"
#include "../Main/Main_datatypes.h"
#include "malleabilityStates.h" #include "malleabilityStates.h"
int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes); int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes);
void free_malleability(); void free_malleability();
void indicate_ending_malleability(int new_outside_state); void indicate_ending_malleability(int new_outside_state);
int malleability_checkpoint(int *mam_state, int wait_completed); int malleability_checkpoint(int *mam_state, int wait_completed);
int MAM_Get_comm(MPI_Comm *comm, int *targets_qty);
void MAM_Commit(int *mam_state, MPI_Comm *new_comm); void MAM_Commit(int *mam_state, MPI_Comm *new_comm);
void set_malleability_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies); void set_malleability_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies);
void set_children_number(int numC); // TODO TO BE DEPRECATED void set_children_number(int numC); // TODO TO BE DEPRECATED
void get_malleability_user_comm(MPI_Comm *comm);
void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant); void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant);
void malleability_modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant); void malleability_modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant);
void malleability_get_entries(size_t *entries, int is_replicated, int is_constant); void malleability_get_entries(size_t *entries, int is_replicated, int is_constant);
void malleability_get_data(void **data, size_t index, int is_replicated, int is_constant); void malleability_get_data(void **data, size_t index, int is_replicated, int is_constant);
void set_benchmark_configuration(configuration *config_file);
void get_benchmark_configuration(configuration **config_file);
void malleability_retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time); void malleability_retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time);
#endif #endif
...@@ -5,7 +5,6 @@ ...@@ -5,7 +5,6 @@
#include <unistd.h> #include <unistd.h>
#include <mpi.h> #include <mpi.h>
#include <signal.h> #include <signal.h>
#include "../IOcodes/results.h"
#include "malleabilityZombies.h" #include "malleabilityZombies.h"
#define PIDS_QTY 320 #define PIDS_QTY 320
...@@ -17,7 +16,7 @@ int offset_pids, *pids = NULL; ...@@ -17,7 +16,7 @@ int offset_pids, *pids = NULL;
void gestor_usr2() {} void gestor_usr2() {}
void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root, void *results_void, size_t n_stages, int capture_method) { void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root) {
int pid = getpid(); int pid = getpid();
int *pids_counts = malloc(numP * sizeof(int)); int *pids_counts = malloc(numP * sizeof(int));
int *pids_displs = malloc(numP * sizeof(int)); int *pids_displs = malloc(numP * sizeof(int));
...@@ -41,10 +40,6 @@ void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int ...@@ -41,10 +40,6 @@ void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int
free(pids_displs); free(pids_displs);
if(myId >= numC) { if(myId >= numC) {
// FIXME No deberia estar aqui
// Needed to ensure iteration times are collected before suspending these processes
results_data *results = (results_data *) results_void;
compute_results_iter(results, myId, numP, root, n_stages, capture_method, comm);
zombies_suspend(); zombies_suspend();
} }
} }
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#include <mpi.h> #include <mpi.h>
#include <signal.h> #include <signal.h>
void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root, void *results_void, size_t n_stages, int capture_method); void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root);
void zombies_service_init(); void zombies_service_init();
void zombies_service_free(); void zombies_service_free();
void zombies_awake(); void zombies_awake();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment