Commit d75645d6 authored by iker_martin's avatar iker_martin
Browse files

Added new option to set up configuration. Modified checkpoint to allow a user...

Added a new option to set up the configuration. Modified the checkpoint to accept a user function so that users can redistribute their own data. WIP.
parent 142780d4
...@@ -32,6 +32,7 @@ int create_out_file(char *nombre, int *ptr, int newstdout); ...@@ -32,6 +32,7 @@ int create_out_file(char *nombre, int *ptr, int newstdout);
void init_originals(); void init_originals();
void init_targets(); void init_targets();
void user_redistribution(void *args);
configuration *config_file; configuration *config_file;
group_data *group; group_data *group;
...@@ -66,7 +67,7 @@ int main(int argc, char *argv[]) { ...@@ -66,7 +67,7 @@ int main(int argc, char *argv[]) {
} }
init_group_struct(argv, argc, myId, numP); init_group_struct(argv, argc, myId, numP);
im_child = MAM_Init(ROOT, comm, argv[0], nodelist, num_cpus, num_nodes); im_child = MAM_Init(ROOT, &comm, argv[0], nodelist, num_cpus, num_nodes, user_redistribution, NULL);
if(!im_child) { //TODO REFACTOR Simplificar inicio if(!im_child) { //TODO REFACTOR Simplificar inicio
init_application(); init_application();
...@@ -74,14 +75,14 @@ int main(int argc, char *argv[]) { ...@@ -74,14 +75,14 @@ int main(int argc, char *argv[]) {
MPI_Barrier(comm); MPI_Barrier(comm);
results->exec_start = MPI_Wtime(); results->exec_start = MPI_Wtime();
} else { //Init targets
init_targets();
} }
// //
// EMPIEZA LA EJECUCION------------------------------- // EMPIEZA LA EJECUCION-------------------------------
// //
do { do {
MPI_Comm_size(comm, &(group->numP));
MPI_Comm_rank(comm, &(group->myId));
if(group->grp != 0) { if(group->grp != 0) {
obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
...@@ -101,17 +102,17 @@ int main(int argc, char *argv[]) { ...@@ -101,17 +102,17 @@ int main(int argc, char *argv[]) {
res = work(); res = work();
if(res == MAM_ZOMBIE) break;
if(res==1) { // Se ha llegado al final de la aplicacion if(res==1) { // Se ha llegado al final de la aplicacion
MPI_Barrier(comm); MPI_Barrier(comm);
results->exec_time = MPI_Wtime() - results->exec_start - results->wasted_time; results->exec_time = MPI_Wtime() - results->exec_start - results->wasted_time;
print_local_results(); print_local_results();
} }
reset_results_index(results); reset_results_index(results);
group->grp = group->grp + 1; group->grp = group->grp + 1;
} while(config_file->n_groups > group->grp && config_file->groups[group->grp].sm == MALL_SPAWN_MERGE); } while(config_file->n_groups > group->grp);
// //
// TERMINA LA EJECUCION ---------------------------------------------------------- // TERMINA LA EJECUCION ----------------------------------------------------------
...@@ -148,7 +149,7 @@ int main(int argc, char *argv[]) { ...@@ -148,7 +149,7 @@ int main(int argc, char *argv[]) {
* de procesos. En caso contrario se devuelve 0. * de procesos. En caso contrario se devuelve 0.
*/ */
int work() { int work() {
int iter, maxiter, state, res, commited; int iter, maxiter, state, res;
int wait_completed = MAM_CHECK_COMPLETION; int wait_completed = MAM_CHECK_COMPLETION;
maxiter = config_file->groups[group->grp].iters; maxiter = config_file->groups[group->grp].iters;
...@@ -160,30 +161,19 @@ int work() { ...@@ -160,30 +161,19 @@ int work() {
} }
if(config_file->n_groups != group->grp + 1) if(config_file->n_groups != group->grp + 1)
MAM_Checkpoint(&state, wait_completed); MAM_Checkpoint(&state, wait_completed, user_redistribution, NULL);
iter = 0; iter = 0;
while(state == MAM_PENDING) { while(state == MAM_PENDING || state == MAM_USER_PENDING) {
if(group->grp+1 < config_file->n_groups && iter < config_file->groups[group->grp+1].iters) { if(group->grp+1 < config_file->n_groups && iter < config_file->groups[group->grp+1].iters) {
iterate(state); iterate(state);
iter++; iter++;
group->iter_start = iter; group->iter_start = iter;
} else { wait_completed = MAM_WAIT_COMPLETION; } } else { wait_completed = MAM_WAIT_COMPLETION; }
MAM_Checkpoint(&state, wait_completed); MAM_Checkpoint(&state, wait_completed, user_redistribution, NULL);
} }
// This function causes an overhead in the recorded time for last group
compute_results_iter(results, group->myId, group->numP, ROOT, config_file->n_stages, config_file->capture_method, comm);
if(config_file->n_groups == group->grp + 1) { res=1; } if(config_file->n_groups == group->grp + 1) { res=1; }
else {
MAM_Get_comm(&new_comm);
send_config_file(config_file, ROOT, new_comm);
results_comm(results, ROOT, config_file->n_resizes, new_comm);
print_local_results();
MAM_Commit(&commited, &comm);
MPI_Comm_size(comm, &(group->numP));
MPI_Comm_rank(comm, &(group->myId));
}
if(state == MAM_ZOMBIE) res=state; if(state == MAM_ZOMBIE) res=state;
return res; return res;
} }
...@@ -311,6 +301,8 @@ int print_local_results() { ...@@ -311,6 +301,8 @@ int print_local_results() {
int ptr_local, ptr_out, err; int ptr_local, ptr_out, err;
char *file_name; char *file_name;
// This function causes an overhead in the recorded time for last group
compute_results_iter(results, group->myId, group->numP, ROOT, config_file->n_stages, config_file->capture_method, comm);
if(group->myId == ROOT) { if(group->myId == ROOT) {
ptr_out = dup(1); ptr_out = dup(1);
...@@ -545,12 +537,9 @@ void init_originals() { ...@@ -545,12 +537,9 @@ void init_originals() {
} }
void init_targets() { void init_targets() {
int commited;
size_t i, entries; size_t i, entries;
void *value = NULL; void *value = NULL;
MAM_Get_comm(&new_comm);
malleability_get_data(&value, 0, 1, 0); malleability_get_data(&value, 0, 1, 0);
group->grp = *((int *)value); group->grp = *((int *)value);
group->grp = group->grp + 1; group->grp = group->grp + 1;
...@@ -559,10 +548,6 @@ void init_targets() { ...@@ -559,10 +548,6 @@ void init_targets() {
results = malloc(sizeof(results_data)); results = malloc(sizeof(results_data));
init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters); init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
results_comm(results, ROOT, config_file->n_resizes, new_comm); results_comm(results, ROOT, config_file->n_resizes, new_comm);
MAM_Commit(&commited, &comm);
MPI_Comm_size(comm, &(group->numP));
MPI_Comm_rank(comm, &(group->myId));
// TODO Refactor - Que sea una unica funcion // TODO Refactor - Que sea una unica funcion
// Obtiene las variables que van a utilizar los hijos // Obtiene las variables que van a utilizar los hijos
...@@ -598,3 +583,22 @@ void init_targets() { ...@@ -598,3 +583,22 @@ void init_targets() {
group->async_data_groups = entries; group->async_data_groups = entries;
} }
} }
/*
 * User-side redistribution callback registered with MaM through
 * MAM_Init() and MAM_Checkpoint().
 *
 * It asks MaM for the description of the reconfiguration in progress,
 * stores the communicator it provides in the global "new_comm" and then:
 *  - ranks whose rank_state is 1 initialise themselves as targets
 *    (init_targets);
 *  - the remaining ranks send the configuration file and the results
 *    through "new_comm" and print their local results.
 * Finally the reconfiguration is committed with MAM_Commit().
 *
 * args: opaque pointer required by the callback signature. Not used.
 */
void user_redistribution(void *args) {
  int commited;
  mam_user_reconf_t user_reconf;

  (void) args; // Only present to match the callback signature

  // When the request is denied MAM_Get_Reconf_Info() does not write to
  // "user_reconf"; return instead of reading an uninitialised structure.
  if(MAM_Get_Reconf_Info(&user_reconf) != 0) {
    return;
  }

  new_comm = user_reconf.comm;
  // NOTE(review): 1 presumably matches MALLEABILITY_CHILDREN -- confirm.
  if(user_reconf.rank_state == 1) { //FIXME Create a MAM_NEW_RANK constant?
    init_targets();
  } else {
    send_config_file(config_file, ROOT, new_comm);
    results_comm(results, ROOT, config_file->n_resizes, new_comm);
    print_local_results();
  }

  MAM_Commit(&commited);
}
#include "malleabilityDataStructures.h" #include "malleabilityDataStructures.h"
/*
 * Table of the configuration key names understood by
 * MAM_Set_key_configuration().
 *
 * MAM_I_convert_key() returns the index of the matching entry and that
 * index is then compared against the constants of "enum mam_key_values",
 * so the order of the entries here must match the order of that enum and
 * the table must hold MAM_KEY_COUNT entries.
 * The MAM_* macros are defined elsewhere (presumably as string literals).
 */
const char *mam_key_names[] = {
MAM_SPAWN_METHOD,
MAM_SPAWN_STRATEGIES,
MAM_PHYSICAL_DISTRIBUTION,
MAM_RED_METHOD,
MAM_RED_STRATEGIES
};
/* /*
* Crea un tipo derivado para mandar las dos estructuras principales * Crea un tipo derivado para mandar las dos estructuras principales
* de MaM. * de MaM.
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include <stdio.h> #include <stdio.h>
#include <mpi.h> #include <mpi.h>
#include <pthread.h> #include <pthread.h>
#include "malleabilityStates.h"
#define DEBUG_FUNC(debug_string, rank, numP) printf("MaM [P%d/%d]: %s -- %s:%s:%d\n", rank, numP, debug_string, __FILE__, __func__, __LINE__) #define DEBUG_FUNC(debug_string, rank, numP) printf("MaM [P%d/%d]: %s -- %s:%s:%d\n", rank, numP, debug_string, __FILE__, __func__, __LINE__)
...@@ -57,11 +58,11 @@ typedef struct { ...@@ -57,11 +58,11 @@ typedef struct {
} malleability_config_t; } malleability_config_t;
typedef struct { //FIXME numC_spawned no se esta usando typedef struct { //FIXME numC_spawned no se esta usando
int myId, numP, numC, numC_spawned, root, root_parents; int myId, numP, numC, numC_spawned, root, root_parents, zombie;
pthread_t async_thread; pthread_t async_thread;
MPI_Comm comm, thread_comm; MPI_Comm comm, thread_comm;
MPI_Comm intercomm; MPI_Comm intercomm, tmp_comm;
MPI_Comm user_comm; MPI_Comm *user_comm;
MPI_Datatype struct_type; MPI_Datatype struct_type;
char *name_exec, *nodelist; char *name_exec, *nodelist;
...@@ -72,6 +73,9 @@ typedef struct { //FIXME numC_spawned no se esta usando ...@@ -72,6 +73,9 @@ typedef struct { //FIXME numC_spawned no se esta usando
malleability_config_t *mall_conf; malleability_config_t *mall_conf;
malleability_t *mall; malleability_t *mall;
extern const char *mam_key_names[];
/*
 * Numeric identifiers of the MaM configuration keys. Each value is the
 * index of the corresponding name inside "mam_key_names", and
 * MAM_KEY_COUNT is the number of entries of that table.
 *
 * MAM_PYHSICAL_DISTRIBUTION_VALUE is a misspelling of
 * MAM_PHYSICAL_DISTRIBUTION_VALUE. It is kept only as an alias with the
 * same value so that code using the old spelling still compiles; as a
 * separate enumerator it shifted every following value by one and made
 * MAM_KEY_COUNT larger than the table of names.
 */
enum mam_key_values {
  MAM_SPAWN_METHOD_VALUE = 0,
  MAM_SPAWN_STRATEGIES_VALUE,
  MAM_PHYSICAL_DISTRIBUTION_VALUE,
  MAM_PYHSICAL_DISTRIBUTION_VALUE = MAM_PHYSICAL_DISTRIBUTION_VALUE,
  MAM_RED_METHOD_VALUE,
  MAM_RED_STRATEGIES_VALUE,
  MAM_KEY_COUNT
};
/* --- FUNCTIONS --- */ /* --- FUNCTIONS --- */
void MAM_Def_main_datatype(); void MAM_Def_main_datatype();
void MAM_Free_main_datatype(); void MAM_Free_main_datatype();
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous); void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous);
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous); void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous);
void Children_init(); void Children_init(void (*user_function)(void *), void *user_args);
int spawn_step(); int spawn_step();
int start_redistribution(); int start_redistribution();
int check_redistribution(int wait_completed); int check_redistribution(int wait_completed);
...@@ -30,6 +30,9 @@ void* thread_async_work(); ...@@ -30,6 +30,9 @@ void* thread_async_work();
void print_comms_state(); void print_comms_state();
void malleability_comms_update(MPI_Comm comm); void malleability_comms_update(MPI_Comm comm);
int MAM_I_convert_key(char *key);
void MAM_I_create_user_struct(int mam_state, int is_children_group);
int state = MALL_UNRESERVED; //FIXME Mover a otro lado int state = MALL_UNRESERVED; //FIXME Mover a otro lado
malleability_data_t *rep_s_data; malleability_data_t *rep_s_data;
...@@ -37,6 +40,8 @@ malleability_data_t *dist_s_data; ...@@ -37,6 +40,8 @@ malleability_data_t *dist_s_data;
malleability_data_t *rep_a_data; malleability_data_t *rep_a_data;
malleability_data_t *dist_a_data; malleability_data_t *dist_a_data;
mam_user_reconf_t *user_reconf;
/* /*
* Inicializa la reserva de memoria para el modulo de maleabilidad * Inicializa la reserva de memoria para el modulo de maleabilidad
* creando todas las estructuras necesarias y copias de comunicadores * creando todas las estructuras necesarias y copias de comunicadores
...@@ -47,33 +52,37 @@ malleability_data_t *dist_a_data; ...@@ -47,33 +52,37 @@ malleability_data_t *dist_a_data;
* la comunicacion los procesos hijo estan preparados para ejecutar la * la comunicacion los procesos hijo estan preparados para ejecutar la
* aplicacion. * aplicacion.
*/ */
int MAM_Init(int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes) { int MAM_Init(int root, MPI_Comm *comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes, void (*user_function)(void *), void *user_args) {
MPI_Comm dup_comm, thread_comm; MPI_Comm dup_comm, thread_comm;
#if USE_MAL_DEBUG
DEBUG_FUNC("Initializing MaM", myId, numP); fflush(stdout); MPI_Barrier(comm);
#endif
mall_conf = (malleability_config_t *) malloc(sizeof(malleability_config_t)); mall_conf = (malleability_config_t *) malloc(sizeof(malleability_config_t));
mall = (malleability_t *) malloc(sizeof(malleability_t)); mall = (malleability_t *) malloc(sizeof(malleability_t));
user_reconf = (mam_user_reconf_t *) malloc(sizeof(mam_user_reconf_t));
MPI_Comm_rank(*comm, &(mall->myId));
MPI_Comm_size(*comm, &(mall->numP));
#if USE_MAL_DEBUG
DEBUG_FUNC("Initializing MaM", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
#endif
rep_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t)); rep_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
dist_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t)); dist_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t)); rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t)); dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
MPI_Comm_dup(comm, &dup_comm); MPI_Comm_dup(*comm, &dup_comm);
MPI_Comm_dup(comm, &thread_comm); MPI_Comm_dup(*comm, &thread_comm);
MPI_Comm_set_name(dup_comm, "MPI_COMM_MAM"); MPI_Comm_set_name(dup_comm, "MAM_MAIN");
MPI_Comm_set_name(thread_comm, "MPI_COMM_MAM_THREAD"); MPI_Comm_set_name(thread_comm, "MAM_THREAD");
MPI_Comm_rank(comm, &(mall->myId));
MPI_Comm_size(comm, &(mall->numP));
mall->root = root; mall->root = root;
mall->root_parents = -1; mall->root_parents = -1;
mall->zombie = 0;
mall->comm = dup_comm; mall->comm = dup_comm;
mall->thread_comm = thread_comm; mall->thread_comm = thread_comm;
mall->user_comm = MPI_COMM_NULL; mall->user_comm = comm;
mall->tmp_comm = MPI_COMM_NULL;
mall->name_exec = name_exec; mall->name_exec = name_exec;
mall->nodelist = nodelist; mall->nodelist = nodelist;
...@@ -94,7 +103,7 @@ int MAM_Init(int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_c ...@@ -94,7 +103,7 @@ int MAM_Init(int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_c
// Si son el primer grupo de procesos, obtienen los datos de los padres // Si son el primer grupo de procesos, obtienen los datos de los padres
MPI_Comm_get_parent(&(mall->intercomm)); MPI_Comm_get_parent(&(mall->intercomm));
if(mall->intercomm != MPI_COMM_NULL ) { if(mall->intercomm != MPI_COMM_NULL ) {
Children_init(); Children_init(user_function, user_args);
return MALLEABILITY_CHILDREN; return MALLEABILITY_CHILDREN;
} }
...@@ -112,7 +121,7 @@ int MAM_Init(int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_c ...@@ -112,7 +121,7 @@ int MAM_Init(int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_c
} }
#if USE_MAL_DEBUG #if USE_MAL_DEBUG
DEBUG_FUNC("MaM has been initialized correctly as parents", myId, numP); fflush(stdout); MPI_Barrier(comm); DEBUG_FUNC("MaM has been initialized correctly as parents", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
#endif #endif
return MALLEABILITY_NOT_CHILDREN; return MALLEABILITY_NOT_CHILDREN;
...@@ -140,6 +149,7 @@ void MAM_Finalize() { ...@@ -140,6 +149,7 @@ void MAM_Finalize() {
if(mall->thread_comm != MPI_COMM_WORLD && mall->thread_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->thread_comm)); if(mall->thread_comm != MPI_COMM_WORLD && mall->thread_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->thread_comm));
free(mall); free(mall);
free(mall_conf); free(mall_conf);
free(user_reconf);
zombies_awake(); zombies_awake();
zombies_service_free(); zombies_service_free();
...@@ -149,21 +159,16 @@ void MAM_Finalize() { ...@@ -149,21 +159,16 @@ void MAM_Finalize() {
/* /*
* TODO Reescribir * TODO Reescribir
* Se realiza el redimensionado de procesos por parte de los padres. * Comprueba el estado de la maleabilidad. Intenta avanzar en la misma
* * si es posible. Funciona como una máquina de estados.
* Se crean los nuevos procesos con la distribucion fisica elegida y * Retorna el estado de la maleabilidad concreto y modifica el argumento
* a continuacion se transmite la informacion a los mismos. * "mam_state" a uno generico.
* *
* Si hay datos asincronos a transmitir, primero se comienza a * El argumento "wait_completed" se utiliza para esperar a la finalización de
* transmitir estos y se termina la funcion. Se tiene que comprobar con * las tareas llevadas a cabo por parte de MAM.
* llamando a la función de nuevo que se han terminado de enviar
* *
* Si hay ademas datos sincronos a enviar, no se envian aun.
*
* Si solo hay datos sincronos se envian tras la creacion de los procesos
* y finalmente se desconectan los dos grupos de procesos.
*/ */
int MAM_Checkpoint(int *mam_state, int wait_completed) { int MAM_Checkpoint(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args) {
int is_intercomm; int is_intercomm;
switch(state) { switch(state) {
...@@ -184,7 +189,7 @@ int MAM_Checkpoint(int *mam_state, int wait_completed) { ...@@ -184,7 +189,7 @@ int MAM_Checkpoint(int *mam_state, int wait_completed) {
state = spawn_step(); state = spawn_step();
if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE){ if (state == MALL_SPAWN_COMPLETED || state == MALL_SPAWN_ADAPT_POSTPONE){
MAM_Checkpoint(mam_state, wait_completed); MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
} }
break; break;
...@@ -197,14 +202,14 @@ int MAM_Checkpoint(int *mam_state, int wait_completed) { ...@@ -197,14 +202,14 @@ int MAM_Checkpoint(int *mam_state, int wait_completed) {
#endif #endif
mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start; mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
MAM_Checkpoint(mam_state, wait_completed); MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
} }
break; break;
case MALL_SPAWN_ADAPT_POSTPONE: case MALL_SPAWN_ADAPT_POSTPONE:
case MALL_SPAWN_COMPLETED: case MALL_SPAWN_COMPLETED:
state = start_redistribution(); state = start_redistribution();
MAM_Checkpoint(mam_state, wait_completed); MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
break; break;
case MALL_DIST_PENDING: case MALL_DIST_PENDING:
...@@ -214,7 +219,7 @@ int MAM_Checkpoint(int *mam_state, int wait_completed) { ...@@ -214,7 +219,7 @@ int MAM_Checkpoint(int *mam_state, int wait_completed) {
state = check_redistribution(wait_completed); state = check_redistribution(wait_completed);
} }
if(state != MALL_DIST_PENDING) { if(state != MALL_DIST_PENDING) {
MAM_Checkpoint(mam_state, wait_completed); MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
} }
break; break;
...@@ -232,30 +237,55 @@ int MAM_Checkpoint(int *mam_state, int wait_completed) { ...@@ -232,30 +237,55 @@ int MAM_Checkpoint(int *mam_state, int wait_completed) {
MPI_Barrier(mall->comm); MPI_Barrier(mall->comm);
#endif #endif
mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start; mall_conf->times->spawn_time = MPI_Wtime() - mall_conf->times->malleability_start;
MAM_Checkpoint(mam_state, wait_completed); MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
} }
break; break;
case MALL_SPAWN_ADAPTED: //FIXME Borrar? case MALL_SPAWN_ADAPTED: //FIXME Borrar?
state = shrink_redistribution(); state = shrink_redistribution();
if(state == MALL_ZOMBIE) *mam_state = MAM_ZOMBIE; //TODO Esta no hay que borrarla if(state == MALL_ZOMBIE) *mam_state = MAM_ZOMBIE; //TODO Esta no hay que borrarla
MAM_Checkpoint(mam_state, wait_completed); MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
break; break;
case MALL_DIST_COMPLETED: case MALL_DIST_COMPLETED:
MPI_Comm_test_inter(mall->intercomm, &is_intercomm); MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
if(is_intercomm) { if(is_intercomm) {
MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->user_comm); //El que pone 0 va primero MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->tmp_comm); //El que pone 0 va primero
} else { } else {
MPI_Comm_dup(mall->intercomm, &mall->user_comm); MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
} }
MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MAM_USER"); MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
state = MALL_COMPLETED; state = MALL_USER_PENDING;
*mam_state = MAM_COMPLETED; *mam_state = MAM_USER_PENDING;
if(mall_conf->spawn_method == MALL_SPAWN_BASELINE){ mall->zombie = 1; }
#if USE_MAL_BARRIERS #if USE_MAL_BARRIERS
MPI_Barrier(mall->comm); MPI_Barrier(mall->comm);
#endif #endif
mall_conf->times->malleability_end = MPI_Wtime(); mall_conf->times->malleability_end = MPI_Wtime();
MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
break;
case MALL_USER_PENDING:
#if USE_MAL_DEBUG
if(mall->myId == mall->root) DEBUG_FUNC("Starting USER redistribution", mall->myId, mall->numP); fflush(stdout);
#endif
if(user_function != NULL) {
MAM_I_create_user_struct(*mam_state, MALLEABILITY_NOT_CHILDREN);
user_function(user_args);
} else {
state = MALL_COMPLETED; //FIXME Deberia ser hacer sync redist
*mam_state = MAM_COMPLETED; //FIXME Deberia ser hacer sync redist
}
if(state != MALL_USER_PENDING && state != MALL_NOT_STARTED) { // TODO Quitar la segunda parte cuando USER este antes de redist sinc
MAM_Checkpoint(mam_state, wait_completed, user_function, user_args);
}
if(state == MALL_NOT_STARTED) { //FIXME Muy feo, borrar
*mam_state = MAM_COMMITED;
}
break;
case MALL_COMPLETED:
MAM_Commit(mam_state);
break; break;
} }
...@@ -263,35 +293,16 @@ int MAM_Checkpoint(int *mam_state, int wait_completed) { ...@@ -263,35 +293,16 @@ int MAM_Checkpoint(int *mam_state, int wait_completed) {
return state; return state;
} }
/*
 * Gives the caller the communicator MaM keeps for user-driven
 * redistributions (mall->user_comm).
 *
 * The communicator is only handed out while the reconfiguration state is
 * MALL_COMPLETED or MALL_ZOMBIE; in that case "*comm" is written and 0 is
 * returned. In any other state "*comm" is left untouched and MALL_DENIED
 * is returned.
 *
 * The user may free the communicator once it is no longer needed;
 * otherwise MaM frees it when the reconfiguration is committed.
 */
int MAM_Get_comm(MPI_Comm *comm) {
  if(state == MALL_COMPLETED || state == MALL_ZOMBIE) {
    *comm = mall->user_comm;
    return 0;
  }

  return MALL_DENIED;
}
/* /*
* TODO * TODO
*/ */
void MAM_Commit(int *mam_state, MPI_Comm *updated_comm) { void MAM_Commit(int *mam_state) {
if(!(state == MALL_COMPLETED || state == MALL_ZOMBIE)) { if(!(state == MALL_COMPLETED || state == MALL_ZOMBIE || state == MALL_USER_PENDING)) { //FIXME El ultimo habria que borrarlo
*mam_state = MALL_DENIED; *mam_state = MALL_DENIED;
return; return;
} }
#if USE_MAL_DEBUG #if USE_MAL_DEBUG
if(mall->myId == mall->root) DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD); if(mall->myId == mall->root){ DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); } fflush(stdout); MPI_Barrier(mall->intercomm);
#endif #endif
// Zombies treatment // Zombies treatment
...@@ -304,20 +315,18 @@ void MAM_Commit(int *mam_state, MPI_Comm *updated_comm) { ...@@ -304,20 +315,18 @@ void MAM_Commit(int *mam_state, MPI_Comm *updated_comm) {
} }
// Reset/Free unneded communicators // Reset/Free unneded communicators
if(*updated_comm != MPI_COMM_WORLD && *updated_comm != MPI_COMM_NULL) MPI_Comm_free(updated_comm); if(mall->tmp_comm != MPI_COMM_WORLD && mall->tmp_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->tmp_comm));
if(mall->user_comm != MPI_COMM_WORLD && mall->user_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->user_comm)); if(*(mall->user_comm) != MPI_COMM_WORLD && *(mall->user_comm) != MPI_COMM_NULL) MPI_Comm_free(mall->user_comm);
if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { malleability_comms_update(mall->intercomm); } if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { malleability_comms_update(mall->intercomm); }
if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge
MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error en OpenMPI + Merge
}
// Zombies KILL // Zombies KILL
if(state == MALL_ZOMBIE) { if(state == MALL_ZOMBIE || mall->zombie) {
MAM_Finalize(); #if USE_MAL_DEBUG >= 2
MPI_Finalize();
#if USE_MAL_DEBUG
DEBUG_FUNC("Is terminating as zombie", mall->myId, mall->numP); fflush(stdout); DEBUG_FUNC("Is terminating as zombie", mall->myId, mall->numP); fflush(stdout);
#endif #endif
MAM_Finalize();
MPI_Finalize();
exit(0); exit(0);
} }
...@@ -326,16 +335,23 @@ void MAM_Commit(int *mam_state, MPI_Comm *updated_comm) { ...@@ -326,16 +335,23 @@ void MAM_Commit(int *mam_state, MPI_Comm *updated_comm) {
mall->root = mall->root_parents == -1 ? mall->root : mall->root_parents; mall->root = mall->root_parents == -1 ? mall->root : mall->root_parents;
mall->root_parents = -1; mall->root_parents = -1;
state = MALL_NOT_STARTED; state = MALL_NOT_STARTED;
*mam_state = MAM_COMMITED; if(mam_state != NULL) *mam_state = MAM_COMMITED;
// Set new communicator // Set new communicator
if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *updated_comm = MPI_COMM_WORLD; } if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *(mall->user_comm) = MPI_COMM_WORLD; }
else if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MPI_Comm_dup(mall->comm, updated_comm); } else if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MPI_Comm_dup(mall->comm, mall->user_comm); }
#if USE_MAL_DEBUG #if USE_MAL_DEBUG
if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD); if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout);
#endif #endif
} }
/*
 * Copies the description of the reconfiguration in progress into
 * "*reconf_info".
 *
 * The copy is only made while the internal state is MALL_USER_PENDING,
 * in which case 0 is returned. In any other state "*reconf_info" is not
 * modified and MALL_DENIED is returned.
 */
int MAM_Get_Reconf_Info(mam_user_reconf_t *reconf_info) {
  int result = MALL_DENIED;

  if(state == MALL_USER_PENDING) {
    *reconf_info = *user_reconf;
    result = 0;
  }

  return result;
}
void MAM_Retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) { void MAM_Retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *mall_time) {
MAM_I_retrieve_times(sp_time, sy_time, asy_time, mall_time); MAM_I_retrieve_times(sp_time, sy_time, asy_time, mall_time);
} }
...@@ -355,6 +371,40 @@ void MAM_Set_configuration(int spawn_method, int spawn_strategies, int spawn_dis ...@@ -355,6 +371,40 @@ void MAM_Set_configuration(int spawn_method, int spawn_strategies, int spawn_dis
} }
} }
/*
 * Sets one MaM configuration option identified by its key name.
 *
 * key:      name of the option; it is looked up with MAM_I_convert_key().
 * required: value requested for the option. For the two "strategies"
 *           keys it is added to the strategies already configured.
 * provided: output. Receives "required" for the plain options, the
 *           resulting strategies value for the "strategies" keys, or
 *           MALL_DENIED when the key is unknown (a message is printed).
 *
 * After the option has been applied, MALL_RED_IBARRIER is added to the
 * redistribution strategies if it is missing and the redistribution
 * method is MALL_RED_RMA_LOCK or MALL_RED_RMA_LOCKALL.
 */
void MAM_Set_key_configuration(char *key, int required, int *provided) {
  int value = MAM_I_convert_key(key);

  *provided = required;
  switch(value) { //TODO Check whether "required" is valid for "key"
    case MAM_SPAWN_METHOD_VALUE:
      mall_conf->spawn_method = required;
      break;
    case MAM_SPAWN_STRATEGIES_VALUE:
      malleability_spawn_add_strat(&(mall_conf->spawn_strategies), required);
      *provided = mall_conf->spawn_strategies;
      break;
    case MAM_PHYSICAL_DISTRIBUTION_VALUE:
      mall_conf->spawn_dist = required;
      break;
    case MAM_RED_METHOD_VALUE:
      mall_conf->red_method = required;
      break;
    case MAM_RED_STRATEGIES_VALUE:
      malleability_red_add_strat(&(mall_conf->red_strategies), required);
      // "*provided" is filled in below, once the strategies are final
      break;
    case MALL_DENIED:
    default:
      printf("MAM: Key %s does not exist\n", key);
      *provided = MALL_DENIED;
      break;
  }

  if(!malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL) &&
      (mall_conf->red_method == MALL_RED_RMA_LOCK || mall_conf->red_method == MALL_RED_RMA_LOCKALL)) {
    malleability_red_add_strat(&(mall_conf->red_strategies), MALL_RED_IBARRIER);
  }

  // Report the redistribution strategies only after the adjustment above,
  // so the caller sees the value that is actually configured.
  if(value == MAM_RED_STRATEGIES_VALUE) {
    *provided = mall_conf->red_strategies;
  }
}
/* /*
* Tiene que ser llamado despues de setear la config * Tiene que ser llamado despues de setear la config
*/ */
...@@ -566,7 +616,7 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch ...@@ -566,7 +616,7 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch
* de la ejecucion a realizar; y los datos a recibir de los padres * de la ejecucion a realizar; y los datos a recibir de los padres
* ya sea de forma sincrona, asincrona o ambas. * ya sea de forma sincrona, asincrona o ambas.
*/ */
void Children_init() { void Children_init(void (*user_function)(void *), void *user_args) {
size_t i; size_t i;
int numP_parents, root_parents; int numP_parents, root_parents;
int is_intercomm; int is_intercomm;
...@@ -602,6 +652,9 @@ void Children_init() { ...@@ -602,6 +652,9 @@ void Children_init() {
} else { } else {
recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS); recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
//for(i=0; i<rep_a_data->entries; i++) {
// MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], root_parents, mall->intercomm, &(rep_a_data));
//}
#if USE_MAL_DEBUG >= 2 #if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Targets started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD); DEBUG_FUNC("Targets started asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif #endif
...@@ -658,14 +711,22 @@ void Children_init() { ...@@ -658,14 +711,22 @@ void Children_init() {
state = MALL_COMPLETED; state = MALL_COMPLETED;
if(is_intercomm) { if(is_intercomm) {
MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->user_comm); //El que pone 0 va primero MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->tmp_comm); //El que pone 0 va primero
} else { } else {
MPI_Comm_dup(mall->intercomm, &mall->user_comm); MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
}
MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
mall->numC = numP_parents;
if(user_function != NULL) {
state = MALL_USER_PENDING;
MAM_I_create_user_struct(MAM_COMPLETED, MALLEABILITY_CHILDREN);
user_function(user_args);
} else {
MAM_Commit(NULL);
} }
MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MAM_USER");
#if USE_MAL_DEBUG #if USE_MAL_DEBUG
DEBUG_FUNC("MaM has been initialized correctly as children", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD); if(mall->myId == mall->root){ DEBUG_FUNC("MaM has been initialized correctly as children", mall->myId, mall->numP); } fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif #endif
} }
...@@ -881,7 +942,7 @@ int shrink_redistribution() { ...@@ -881,7 +942,7 @@ int shrink_redistribution() {
double time_extra = MPI_Wtime(); double time_extra = MPI_Wtime();
MPI_Abort(MPI_COMM_WORLD, -20); // MPI_Abort(MPI_COMM_WORLD, -20); //
zombies_collect_suspended(mall->user_comm, mall->myId, mall->numP, mall->numC, mall->root); zombies_collect_suspended(*(mall->user_comm), mall->myId, mall->numP, mall->numC, mall->root);
if(mall->myId < mall->numC) { if(mall->myId < mall->numC) {
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); //FIXME Modificar a que se pida pro el usuario el cambio y se llama a comms_update if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); //FIXME Modificar a que se pida pro el usuario el cambio y se llama a comms_update
...@@ -990,8 +1051,8 @@ void print_comms_state() { ...@@ -990,8 +1051,8 @@ void print_comms_state() {
MPI_Comm_get_name(mall->comm, test, &tester); MPI_Comm_get_name(mall->comm, test, &tester);
printf("P%d Comm=%d Name=%s\n", mall->myId, mall->comm, test); printf("P%d Comm=%d Name=%s\n", mall->myId, mall->comm, test);
MPI_Comm_get_name(mall->user_comm, test, &tester); MPI_Comm_get_name(*(mall->user_comm), test, &tester);
printf("P%d Comm=%d Name=%s\n", mall->myId, mall->user_comm, test); printf("P%d Comm=%d Name=%s\n", mall->myId, *(mall->user_comm), test);
if(mall->intercomm != MPI_COMM_NULL) { if(mall->intercomm != MPI_COMM_NULL) {
MPI_Comm_get_name(mall->intercomm, test, &tester); MPI_Comm_get_name(mall->intercomm, test, &tester);
printf("P%d Comm=%d Name=%s\n", mall->myId, mall->intercomm, test); printf("P%d Comm=%d Name=%s\n", mall->myId, mall->intercomm, test);
...@@ -1009,7 +1070,38 @@ void malleability_comms_update(MPI_Comm comm) { ...@@ -1009,7 +1070,38 @@ void malleability_comms_update(MPI_Comm comm) {
MPI_Comm_dup(comm, &(mall->thread_comm)); MPI_Comm_dup(comm, &(mall->thread_comm));
MPI_Comm_dup(comm, &(mall->comm)); MPI_Comm_dup(comm, &(mall->comm));
MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MAM_THREAD"); MPI_Comm_set_name(mall->thread_comm, "MAM_THREAD");
MPI_Comm_set_name(mall->comm, "MPI_COMM_MAM"); MPI_Comm_set_name(mall->comm, "MAM_MAIN");
}
/*
 * Converts the name of a configuration key into its numeric value.
 *
 * key: name to look up. It is compared against the first MAM_KEY_COUNT
 *      entries of "mam_key_names", so that table must hold at least
 *      that many entries.
 *
 * Returns the index of the matching entry, or MALL_DENIED when "key" is
 * NULL or does not match any entry.
 */
int MAM_I_convert_key(char *key) {
  int i;

  // strcmp() must not receive a NULL pointer
  if(key == NULL) {
    return MALL_DENIED;
  }

  for(i = 0; i < MAM_KEY_COUNT; i++) {
    if(strcmp(key, mam_key_names[i]) == 0) { // Equal
      return i;
    }
  }
  return MALL_DENIED;
}
/*
 * Fills the "user_reconf" structure with the communicator, rank state and
 * process counts taken from "mall" and "mall_conf".
 *
 * - comm is always mall->tmp_comm.
 * - When is_children_group is zero: rank_state is mam_state, numS is
 *   mall->numP and numT is mall->numC.
 * - Otherwise: rank_state is is_children_group, numS is mall->numC and numT
 *   is mall->numC for MALL_SPAWN_BASELINE or mall->numC + mall->numP for any
 *   other spawn method.
 *
 * TODO Still to be completed.
 */
void MAM_I_create_user_struct(int mam_state, int is_children_group) {
  user_reconf->comm = mall->tmp_comm;

  if(!is_children_group) {
    user_reconf->rank_state = mam_state;
    user_reconf->numS = mall->numP;
    user_reconf->numT = mall->numC;
    return;
  }

  user_reconf->rank_state = is_children_group; //FIXME Choose a suitable name
  user_reconf->numS = mall->numC;
  user_reconf->numT = (mall_conf->spawn_method == MALL_SPAWN_BASELINE)
                        ? mall->numC
                        : mall->numC + mall->numP;
}
...@@ -9,13 +9,20 @@ ...@@ -9,13 +9,20 @@
#include <mpi.h> #include <mpi.h>
#include "malleabilityStates.h" #include "malleabilityStates.h"
int MAM_Init(int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes); typedef struct {
/* Reconfiguration data handed to the user; this is the type taken by MAM_Get_Reconf_Info.
 * NOTE(review): presumably the structure filled by MAM_I_create_user_struct -- confirm. */
int numS, numT; // Process counts; presumably number of sources (numS) and targets (numT) -- TODO confirm
int rank_state; // NOTE(review): looks like a MAM state or a children-group flag depending on the caller -- confirm
MPI_Comm comm; // Communicator made available to the user during the reconfiguration -- TODO confirm which one
} mam_user_reconf_t;
int MAM_Init(int root, MPI_Comm *comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes, void (*user_function)(void *), void *user_args);
void MAM_Finalize(); void MAM_Finalize();
int MAM_Checkpoint(int *mam_state, int wait_completed); int MAM_Checkpoint(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args);
int MAM_Get_comm(MPI_Comm *comm); void MAM_Commit(int *mam_state);
void MAM_Commit(int *mam_state, MPI_Comm *updated_comm);
int MAM_Get_Reconf_Info(mam_user_reconf_t *reconf_info);
void MAM_Set_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies); void MAM_Set_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies);
void MAM_Set_key_configuration(char *key, int required, int *provided);
void MAM_Set_target_number(int numC); // TODO TO BE DEPRECATED void MAM_Set_target_number(int numC); // TODO TO BE DEPRECATED
void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant); void malleability_add_data(void *data, size_t total_qty, MPI_Datatype type, int is_replicated, int is_constant);
......
...@@ -8,8 +8,8 @@ ...@@ -8,8 +8,8 @@
#define MALL_DENIED -1 #define MALL_DENIED -1
enum mall_inner_states{MALL_UNRESERVED, MALL_NOT_STARTED, MALL_ZOMBIE, MALL_SPAWN_PENDING, MALL_SPAWN_SINGLE_PENDING, enum mall_inner_states{MALL_UNRESERVED, MALL_NOT_STARTED, MALL_ZOMBIE, MALL_SPAWN_PENDING, MALL_SPAWN_SINGLE_PENDING,
MALL_SPAWN_SINGLE_COMPLETED, MALL_SPAWN_ADAPT_POSTPONE, MALL_SPAWN_COMPLETED, MALL_DIST_PENDING, MALL_DIST_COMPLETED, MALL_SPAWN_SINGLE_COMPLETED, MALL_SPAWN_ADAPT_POSTPONE, MALL_SPAWN_COMPLETED, MALL_DIST_PENDING, MALL_DIST_COMPLETED,
MALL_SPAWN_ADAPT_PENDING, MALL_SPAWN_ADAPTED, MALL_COMPLETED}; MALL_SPAWN_ADAPT_PENDING, MALL_SPAWN_ADAPTED, MALL_COMPLETED, MALL_USER_PENDING};
enum mam_states{MAM_UNRESERVED, MAM_NOT_STARTED, MAM_ZOMBIE, MAM_PENDING, MAM_COMPLETED, MAM_COMMITED}; enum mam_states{MAM_UNRESERVED, MAM_NOT_STARTED, MAM_ZOMBIE, MAM_PENDING, MAM_COMPLETED, MAM_USER_PENDING, MAM_COMMITED};
enum mall_spawn_methods{MALL_SPAWN_BASELINE, MALL_SPAWN_MERGE}; enum mall_spawn_methods{MALL_SPAWN_BASELINE, MALL_SPAWN_MERGE};
#define MALL_SPAWN_PTHREAD 2 #define MALL_SPAWN_PTHREAD 2
#define MALL_SPAWN_SINGLE 3 #define MALL_SPAWN_SINGLE 3
...@@ -18,6 +18,13 @@ enum mall_redistribution_methods{MALL_RED_BASELINE, MALL_RED_POINT, MALL_RED_RMA ...@@ -18,6 +18,13 @@ enum mall_redistribution_methods{MALL_RED_BASELINE, MALL_RED_POINT, MALL_RED_RMA
#define MALL_RED_THREAD 2 #define MALL_RED_THREAD 2
#define MALL_RED_IBARRIER 3 #define MALL_RED_IBARRIER 3
/* KEYS & VALUES for config
 * String keys naming each configurable option. Every macro expands to a
 * string literal identical to its own name.
 * NOTE(review): presumably these are the strings accepted as the "key"
 * argument of MAM_Set_key_configuration -- confirm against its definition.
 */
#define MAM_SPAWN_METHOD "MAM_SPAWN_METHOD"
#define MAM_SPAWN_STRATEGIES "MAM_SPAWN_STRATEGIES"
#define MAM_PHYSICAL_DISTRIBUTION "MAM_PHYSICAL_DISTRIBUTION"
#define MAM_RED_METHOD "MAM_RED_METHOD"
#define MAM_RED_STRATEGIES "MAM_RED_STRATEGIES"
#define MALLEABILITY_ROOT 0 #define MALLEABILITY_ROOT 0
#define MAL_APP_EXECUTING 0 #define MAL_APP_EXECUTING 0
...@@ -26,11 +33,6 @@ enum mall_redistribution_methods{MALL_RED_BASELINE, MALL_RED_POINT, MALL_RED_RMA ...@@ -26,11 +33,6 @@ enum mall_redistribution_methods{MALL_RED_BASELINE, MALL_RED_POINT, MALL_RED_RMA
#define MAM_CHECK_COMPLETION 0 #define MAM_CHECK_COMPLETION 0
#define MAM_WAIT_COMPLETION 1 #define MAM_WAIT_COMPLETION 1
//TODO DEPRECATE
#define MAL_INT 0
#define MAL_CHAR 1
////////////////
#define MALLEABILITY_CHILDREN 1 #define MALLEABILITY_CHILDREN 1
#define MALLEABILITY_NOT_CHILDREN 0 #define MALLEABILITY_NOT_CHILDREN 0
......
...@@ -179,6 +179,12 @@ int malleability_spawn_contains_strat(int spawn_strategies, int strategy, int *r ...@@ -179,6 +179,12 @@ int malleability_spawn_contains_strat(int spawn_strategies, int strategy, int *r
return value; return value;
} }
/*
 * Adds "strategy" to the value stored in "*spawn_strategies".
 *
 * If malleability_spawn_contains_strat reports (non-zero) that the strategy
 * is already present, the stored value is left untouched. Otherwise the
 * stored value is multiplied by "strategy".
 *
 * Always returns 1.
 */
int malleability_spawn_add_strat(int *spawn_strategies, int strategy) {
  int already_set = malleability_spawn_contains_strat(*spawn_strategies, strategy, NULL);

  if(!already_set) {
    *spawn_strategies *= strategy;
  }
  return 1;
}
//--------------PRIVATE CONFIGURATION FUNCTIONS---------------// //--------------PRIVATE CONFIGURATION FUNCTIONS---------------//
/* /*
* Agrupa en una sola estructura todos los datos de configuración necesarios * Agrupa en una sola estructura todos los datos de configuración necesarios
......
...@@ -13,5 +13,6 @@ void malleability_connect_children(int myId, int numP, int root, MPI_Comm comm, ...@@ -13,5 +13,6 @@ void malleability_connect_children(int myId, int numP, int root, MPI_Comm comm,
void unset_spawn_postpone_flag(int outside_state); void unset_spawn_postpone_flag(int outside_state);
int malleability_spawn_contains_strat(int spawn_strategies, int strategy, int *result); int malleability_spawn_contains_strat(int spawn_strategies, int strategy, int *result);
int malleability_spawn_add_strat(int *spawn_strategies, int strategy);
#endif #endif
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment