Commit 3d79df4f authored by iker_martin

Refactor of the process creation module. It is now split across several files. Several tests are still missing.
parent 63f24c68
@@ -3,7 +3,7 @@
 #include <string.h>
 #include <mpi.h>
 #include "read_ini.h"
-#include "../malleability/ProcessDist.h"
+#include "../malleability/spawn_methods/ProcessDist.h"
 #include "../malleability/distribution_methods/block_distribution.h"
 #include "ini.h"
@@ -79,9 +79,9 @@ static int handler(void* user, const char* section, const char* name,
   if(pconfig->actual_resize < pconfig->n_resizes) {
     char *aux = strdup(value);
     if (strcmp(aux, "spread") == 0) {
-      pconfig->phy_dist[pconfig->actual_resize] = COMM_PHY_SPREAD;
+      pconfig->phy_dist[pconfig->actual_resize] = MALL_DIST_SPREAD;
     } else {
-      pconfig->phy_dist[pconfig->actual_resize] = COMM_PHY_COMPACT;
+      pconfig->phy_dist[pconfig->actual_resize] = MALL_DIST_COMPACT;
     }
     free(aux);
     pconfig->actual_resize = pconfig->actual_resize+1; // Last element of the group
......
@@ -119,7 +119,7 @@ int main(int argc, char *argv[]) {
   MPI_Comm_rank(comm, &(group->myId));
   if(config_file->n_resizes != group->grp + 1) {
-    set_malleability_configuration(config_file->sm, config_file->ss, config_file->phy_dist[group->grp+1], -1, config_file->at, -1);
+    set_malleability_configuration(config_file->sm, config_file->ss, config_file->phy_dist[group->grp+1], config_file->at, -1);
     set_children_number(config_file->procs[group->grp+1]); // TODO TO BE DEPRECATED
     if(group->grp == 0) {
@@ -134,11 +134,11 @@ int main(int argc, char *argv[]) {
     }
     res = work();
-    if(res == MAL_ZOMBIE) break;
+    if(res == MALL_ZOMBIE) break;
     print_local_results();
     reset_results_index(results);
-  } while((config_file->n_resizes > group->grp + 1) && (config_file->sm == COMM_SPAWN_MERGE || config_file->sm == COMM_SPAWN_MERGE_PTHREAD));
+  } while(config_file->n_resizes > group->grp + 1 && config_file->sm == MALL_SPAWN_MERGE);
   //
   // EXECUTION ENDS ----------------------------------------------------------
@@ -155,7 +155,7 @@ int main(int argc, char *argv[]) {
     MPI_Comm_free(&comm);
   }
-  if(group->myId == ROOT && (config_file->sm == COMM_SPAWN_MERGE || config_file->sm == COMM_SPAWN_MERGE_PTHREAD)) {
+  if(group->myId == ROOT && config_file->sm == MALL_SPAWN_MERGE) {
     MPI_Abort(MPI_COMM_WORLD, -100);
   }
   free_application_data();
@@ -185,7 +185,7 @@ int work() {
   double *matrix = NULL;
   maxiter = config_file->iters[group->grp];
-  state = MAL_NOT_STARTED;
+  state = MALL_NOT_STARTED;
   res = 0;
   for(iter=group->iter_start; iter < maxiter; iter++) {
@@ -196,7 +196,7 @@ int work() {
   state = malleability_checkpoint();
   iter = 0;
-  while(state == MAL_DIST_PENDING || state == MAL_SPAWN_PENDING || state == MAL_SPAWN_SINGLE_PENDING) {
+  while(state == MALL_DIST_PENDING || state == MALL_SPAWN_PENDING || state == MALL_SPAWN_SINGLE_PENDING) {
     if(iter < config_file->iters[group->grp+1]) {
       iterate(matrix, config_file->granularity, state, iter);
       iter++;
@@ -207,7 +207,7 @@ int work() {
   if(config_file->n_resizes - 1 == group->grp) res=1;
-  if(state == MAL_ZOMBIE) res=state;
+  if(state == MALL_ZOMBIE) res=state;
   return res;
 }
@@ -239,7 +239,7 @@ double iterate(double *matrix, int n, int async_comm, int iter) {
   actual_time = MPI_Wtime(); // Save times
   // TODO Differentiate between the two in the IO
-  if(async_comm == MAL_DIST_PENDING || async_comm == MAL_SPAWN_PENDING || async_comm == MAL_SPAWN_SINGLE_PENDING) { // An asynchronous data redistribution is in progress
+  if(async_comm == MALL_DIST_PENDING || async_comm == MALL_SPAWN_PENDING || async_comm == MALL_SPAWN_SINGLE_PENDING) { // An asynchronous data redistribution is in progress
     cnt_async=1;
   }
......
@@ -11,7 +11,7 @@ struct physical_dist {
   int num_cpus, num_nodes;
   char *nodelist;
   int target_qty, already_created;
-  int dist_type;
+  int dist_type, info_type;
 };
 typedef struct {
@@ -25,10 +25,6 @@ typedef struct {
   struct physical_dist dist; // Used to create mapping var
   MPI_Comm comm, returned_comm;
-  // To control the spawn state
-  pthread_mutex_t spawn_mutex;
-  pthread_cond_t cond_adapt_rdy;
 } Spawn_data;
 #endif
 #include <pthread.h>
 #include "malleabilityManager.h"
 #include "malleabilityStates.h"
+#include "malleabilityDataStructures.h"
 #include "malleabilityTypes.h"
 #include "malleabilityZombies.h"
-#include "ProcessDist.h"
+#include "spawn_methods/GenericSpawn.h"
 #include "CommDist.h"
-#define MALLEABILITY_ROOT 0
 #define MALLEABILITY_USE_SYNCHRONOUS 0
 #define MALLEABILITY_USE_ASYNCHRONOUS 1
@@ -26,10 +26,11 @@ int thread_check();
 void* thread_async_work();
 typedef struct {
-  int spawn_type;
+  int spawn_method;
   int spawn_dist;
-  int spawn_is_single;
-  int spawn_threaded;
+  int spawn_strategies;
+  //int spawn_is_single;
+  //int spawn_threaded;
   int comm_type;
   int comm_threaded;
@@ -49,7 +50,7 @@ typedef struct { //FIXME numC_spawned is not being used
   int num_cpus, num_nodes;
 } malleability_t;
-int state = MAL_UNRESERVED; //FIXME Move elsewhere
+int state = MALL_UNRESERVED; //FIXME Move elsewhere
 malleability_config_t *mall_conf;
 malleability_t *mall;
@@ -99,7 +100,7 @@ int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_ex
   dist_s_data->entries = 0;
   dist_a_data->entries = 0;
-  state = MAL_NOT_STARTED;
+  state = MALL_NOT_STARTED;
   // If this is the first group of processes, they obtain the data from the parents
   MPI_Comm_get_parent(&(mall->intercomm));
@@ -136,7 +137,7 @@ void free_malleability() {
   zombies_awake();
   zombies_service_free();
-  state = MAL_UNRESERVED;
+  state = MALL_UNRESERVED;
 }
 /*
@@ -156,37 +157,40 @@ void free_malleability() {
 */
 int malleability_checkpoint() {
-  if(state == MAL_UNRESERVED) return MAL_UNRESERVED;
-  if(state == MAL_NOT_STARTED) {
+  if(state == MALL_UNRESERVED) return MALL_UNRESERVED;
+  if(state == MALL_NOT_STARTED) {
     // Check whether a resize has to be performed
-    //if(CHECK_RMS()) {return MAL_DENIED;}
+    //if(CHECK_RMS()) {return MALL_DENIED;}
     state = spawn_step();
-    if (state == MAL_SPAWN_COMPLETED){
+    if (state == MALL_SPAWN_COMPLETED){
       state = start_redistribution();
     }
-  } else if(state == MAL_SPAWN_PENDING || state == MAL_SPAWN_SINGLE_PENDING) { // Check whether the spawn has finished and start the redistribution
+  } else if(state == MALL_SPAWN_PENDING || state == MALL_SPAWN_SINGLE_PENDING) { // Check whether the spawn has finished and start the redistribution
     double end_real_time;
-    if(mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD && mall->numP > mall->numC) {
+    /*if(mall_conf->spawn_type == MALL_SPAWN_MERGE && mall_conf->spawn_type == MALL_SPAWN_PTHREAD && mall->numP > mall->numC) {
       state = shrink_redistribution(); //TODO REFACTOR
     } else {
-      state = check_slurm_comm(mall->myId, mall->root, mall->numP, &(mall->intercomm), mall->comm, mall->thread_comm, &end_real_time);
-      if (state == MAL_SPAWN_COMPLETED) {
+    */
+    //state = check_slurm_comm(mall->myId, mall->root, mall->numP, &(mall->intercomm), mall->comm, mall->thread_comm, &end_real_time); //FIXMENOW
+    state = check_spawn_state(&(mall->intercomm), mall->comm, MALL_DIST_PENDING, &end_real_time); //FIXME 3rd argument depends on the distribution
+    if (state == MALL_SPAWN_COMPLETED || state == MALL_DIST_ADAPTED) { //FIXME MALL_DIST_ADAPTED has to collect the times
       mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
-      if(mall_conf->spawn_type == COMM_SPAWN_PTHREAD || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
+      if(malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
         mall_conf->results->spawn_real_time[mall_conf->grp] = end_real_time - mall_conf->results->spawn_start;
       }
-      //TODO If it is MERGE SHRINK, use a different data redistribution method
+      //TODO If it is MERGE, use a different data redistribution method
       state = start_redistribution();
     }
-  }
+  //}
-  } else if(state == MAL_DIST_PENDING) {
+  } else if(state == MALL_DIST_PENDING) {
     if(mall_conf->comm_type == MAL_USE_THREAD) {
       state = thread_check();
     } else {
@@ -220,11 +224,10 @@ void get_benchmark_results(results_data **results) {
 }
 //-------------------------------------------------------------------------------------------------------------
-void set_malleability_configuration(int spawn_type, int spawn_is_single, int spawn_dist, int spawn_threaded, int comm_type, int comm_threaded) {
-  mall_conf->spawn_type = spawn_type;
-  mall_conf->spawn_is_single = spawn_is_single;
+void set_malleability_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int comm_type, int comm_threaded) {
+  mall_conf->spawn_method = spawn_method;
+  mall_conf->spawn_strategies = spawn_strategies;
   mall_conf->spawn_dist = spawn_dist;
-  mall_conf->spawn_threaded = spawn_threaded;
   mall_conf->comm_type = comm_type;
   mall_conf->comm_threaded = comm_threaded;
 }
@@ -234,16 +237,13 @@ void set_malleability_configuration(int spawn_method, int spawn_strategies, int spa
 * Has to be called after the config has been set
 */
 void set_children_number(int numC){
-  if((mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) && (numC - mall->numP >= 0)) {
+  if((mall_conf->spawn_method == MALL_SPAWN_MERGE) && (numC >= mall->numP)) {
     mall->numC = numC;
     mall->numC_spawned = numC - mall->numP;
     if(numC == mall->numP) { // Migrate
       mall->numC_spawned = numC;
-      if(mall_conf->spawn_type == COMM_SPAWN_MERGE)
-        mall_conf->spawn_type = COMM_SPAWN_SERIAL;
-      else
-        mall_conf->spawn_type = COMM_SPAWN_PTHREAD;
+      mall_conf->spawn_method = MALL_SPAWN_BASELINE;
     }
   } else {
     mall->numC = numC;
@@ -409,17 +409,20 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch
 */
 void Children_init() {
   int numP_parents, root_parents, i;
-  int spawn_is_single;
-  MPI_Comm aux;
-  MPI_Bcast(&spawn_is_single, 1, MPI_INT, MALLEABILITY_ROOT, mall->intercomm);
+  int is_intercomm;
+  //MPI_Comm aux;
+  malleability_connect_children(mall->myId, mall->numP, mall->root, mall->comm, &numP_parents, &root_parents, &(mall->intercomm));
+  /*MPI_Bcast(&spawn_is_single, 1, MPI_INT, MALLEABILITY_ROOT, mall->intercomm);
   if(spawn_is_single) {
-    malleability_establish_connection(mall->myId, MALLEABILITY_ROOT, &(mall->intercomm));
+    malleability_establish_connection(mall->myId, MALLEABILITY_ROOT, &(mall->intercomm)); //FIXMENOW
   }
   MPI_Bcast(&(mall_conf->spawn_type), 1, MPI_INT, MALLEABILITY_ROOT, mall->intercomm);
   MPI_Bcast(&root_parents, 1, MPI_INT, MALLEABILITY_ROOT, mall->intercomm);
   MPI_Bcast(&numP_parents, 1, MPI_INT, root_parents, mall->intercomm);
+  */
+  // TODO From this point on, take into account whether it is BASELINE or MERGE
+  MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
   recv_config_file(mall->root, mall->intercomm, &(mall_conf->config_file));
@@ -457,8 +460,9 @@ void Children_init() {
     }
   }
-  if(mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
-    proc_adapt_expand(&(mall->numP), mall->numP+numP_parents, mall->intercomm, &(mall->comm), MALLEABILITY_CHILDREN);
+  /*
+  if(mall_conf->spawn_type == MALL_SPAWN_MERGE || mall_conf->spawn_type == MALL_SPAWN_MERGE_PTHREAD) {
+    proc_adapt_expand(&(mall->numP), mall->numP+numP_parents, mall->intercomm, &(mall->comm), MALLEABILITY_CHILDREN); //FIXMENOW
     if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
@@ -467,12 +471,11 @@ void Children_init() {
     MPI_Comm_dup(mall->comm, &aux);
     mall->user_comm = aux;
   }
+  */
   // Save the results of this transmission
   recv_results(mall_conf->results, mall->root, mall_conf->config_file->n_resizes, mall->intercomm);
   MPI_Comm_disconnect(&(mall->intercomm));
 }
 //======================================================||
@@ -488,16 +491,19 @@ void Children_init() {
 int spawn_step(){
   mall_conf->results->spawn_start = MPI_Wtime();
-  if((mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) && mall->numP > mall->numC) {
+  /* FIXME Keep the shrink_redistribution function
+  if((mall_conf->spawn_type == MALL_SPAWN_MERGE) && mall->numP > mall->numC) {
     state = shrink_redistribution();
     return state;
   }
+  */
-  state = init_slurm_comm(mall->name_exec, mall->num_cpus, mall->num_nodes, mall->nodelist, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_type, mall_conf->spawn_is_single, mall->thread_comm, &(mall->intercomm));
+  //state = init_slurm_comm(mall->name_exec, mall->num_cpus, mall->num_nodes, mall->nodelist, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_type, mall_conf->spawn_is_single, mall->thread_comm, &(mall->intercomm)); //FIXMENOW
+  state = init_spawn(mall->name_exec, mall->num_cpus, mall->num_nodes, mall->nodelist, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_method, mall_conf->spawn_strategies, mall->thread_comm, &(mall->intercomm));
-  if(mall_conf->spawn_type == COMM_SPAWN_SERIAL || mall_conf->spawn_type == COMM_SPAWN_MERGE)
+  if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
     mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
-  else if(mall_conf->spawn_type == COMM_SPAWN_PTHREAD || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
+  } else {
     //mall_conf->results->spawn_thread_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
     //mall_conf->results->spawn_start = MPI_Wtime();
   }
@@ -523,9 +529,11 @@ int start_redistribution() {
   int rootBcast = MPI_PROC_NULL;
   if(mall->myId == mall->root) rootBcast = MPI_ROOT;
+  /*
   MPI_Bcast(&(mall_conf->spawn_type), 1, MPI_INT, rootBcast, mall->intercomm);
   MPI_Bcast(&(mall->root), 1, MPI_INT, rootBcast, mall->intercomm);
   MPI_Bcast(&(mall->numP), 1, MPI_INT, rootBcast, mall->intercomm);
+  */
   send_config_file(mall_conf->config_file, rootBcast, mall->intercomm);
@@ -536,7 +544,7 @@ int start_redistribution() {
       return thread_creation();
     } else {
       send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
-      return MAL_DIST_PENDING;
+      return MALL_DIST_PENDING;
     }
   }
   return end_redistribution();
@@ -580,7 +588,7 @@ int check_redistribution() {
   }
   MPI_Allreduce(&completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
-  if(!all_completed) return MAL_DIST_PENDING; // Continue only if asynchronous send has ended
+  if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended
   if(mall_conf->comm_type == MAL_USE_IBARRIER) {
@@ -602,7 +610,7 @@ int check_redistribution() {
 */
 int end_redistribution() {
   int result, i, rootBcast = MPI_PROC_NULL;
-  MPI_Comm aux;
+  //MPI_Comm aux;
   if(mall->myId == mall->root) rootBcast = MPI_ROOT;
   if(dist_s_data->entries || rep_s_data->entries) { // Send synchronous data
@@ -622,10 +630,11 @@ int end_redistribution() {
     }
   }
-  if(mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
+  /*
+  if(mall_conf->spawn_type == MALL_SPAWN_MERGE) {
     double time_adapt = MPI_Wtime();
-    proc_adapt_expand(&(mall->numP), mall->numC, mall->intercomm, &(mall->comm), MALLEABILITY_NOT_CHILDREN);
+    proc_adapt_expand(&(mall->numP), mall->numC, mall->intercomm, &(mall->comm), MALLEABILITY_NOT_CHILDREN); //FIXMENOW
     if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
@@ -638,12 +647,13 @@ int end_redistribution() {
     // result = MAL_DIST_ADAPTED;
   }
+  */
   send_results(mall_conf->results, rootBcast, mall_conf->config_file->n_resizes, mall->intercomm);
-  result = MAL_DIST_COMPLETED;
+  result = MALL_DIST_COMPLETED;
   MPI_Comm_disconnect(&(mall->intercomm));
-  state = MAL_NOT_STARTED;
+  state = MALL_NOT_STARTED;
   return result;
 }
@@ -652,15 +662,15 @@ int end_redistribution() {
 ///=============================================
 ///=============================================
 double time_adapt, time_adapt_end;
+/*
 int state_shrink=0; //TODO Refactor
 pthread_t thread_shrink;
 MPI_Comm comm_shrink;
 int thread_shrink_creation();
 void *thread_shrink_work();
-/*
-* Creates a thread to run a communication in the background.
-*/
 int thread_shrink_creation() {
   if(pthread_create(&thread_shrink, NULL, thread_shrink_work, NULL)) {
     printf("Error al crear el hilo\n");
@@ -670,20 +680,22 @@ int thread_shrink_creation() {
   return MAL_SPAWN_PENDING;
 }
 void* thread_shrink_work() {
-  proc_adapt_shrink(mall->numC, &comm_shrink, mall->myId);
+  proc_adapt_shrink(mall->numC, &comm_shrink, mall->myId); //FIXMENOW
   time_adapt_end = MPI_Wtime();
   state_shrink=2;
   pthread_exit(NULL);
 }
+*/
 ///=============================================
 ///=============================================
 ///=============================================
 int shrink_redistribution() {
-  int global_state;
+  //int global_state;
   double time_aux;
   MPI_Comm aux_comm;
-  if(mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
+  /*
+  if(mall_conf->spawn_type == MALL_SPAWN_MERGE && mall_conf->spawn_type == MALL_SPAWN_PTHREAD) {
     if(state_shrink == 0) {
       time_adapt = MPI_Wtime();
       state_shrink = 1;
@@ -707,26 +719,29 @@ int shrink_redistribution() {
   } else {
     time_adapt = MPI_Wtime();
     MPI_Comm_dup(mall->comm, &aux_comm);
-    proc_adapt_shrink( mall->numC, &(mall->comm), mall->myId);
+    proc_adapt_shrink( mall->numC, &(mall->comm), mall->myId); //FIXMENOW
   }
+  */
+  MPI_Comm_dup(mall->comm, &aux_comm);
   //TODO REFACTOR -- Only the collect iters call should be outside the threads
   zombies_collect_suspended(aux_comm, mall->myId, mall->numP, mall->numC, mall->root, (void *) mall_conf->results, mall->user_comm);
   if(mall->myId < mall->numC) {
     MPI_Comm_free(&aux_comm);
-    MPI_Comm_dup(mall->comm, &aux_comm);
+    MPI_Comm_dup(mall->comm, &aux_comm); // FIXME What happens with the Thread_comm and User_comm communicators
     mall->thread_comm = aux_comm;
     MPI_Comm_dup(mall->comm, &aux_comm);
     mall->user_comm = aux_comm;
-    mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - time_adapt;
-    if(mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
-      mall_conf->results->spawn_real_time[mall_conf->grp] = time_adapt_end - time_adapt + MPI_Wtime() - time_aux;
+    mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - time_adapt; //FIXME Error
+    if(mall_conf->spawn_method == MALL_SPAWN_MERGE && malleability_spawn_contains_strat(mall_conf->spawn_strategies,MALL_SPAWN_PTHREAD, NULL)) {
+      mall_conf->results->spawn_real_time[mall_conf->grp] = time_adapt_end - time_adapt + MPI_Wtime() - time_aux; //FIXME Error
     }
-    return MAL_DIST_COMPLETED; //FIXME Refactor: set to SPAWN_COMPLETED
+    return MALL_DIST_COMPLETED; //FIXME Refactor: set to SPAWN_COMPLETED
   } else {
-    return MAL_ZOMBIE;
+    return MALL_ZOMBIE;
   }
 }
@@ -746,7 +761,7 @@ int thread_creation() {
     MPI_Abort(MPI_COMM_WORLD, -1);
     return -1;
   }
-  return MAL_DIST_PENDING;
+  return MALL_DIST_PENDING;
 }
 /*
@@ -760,8 +775,8 @@ int thread_check() {
   // Check that all threads have finished the distribution (same value in commAsync)
   MPI_Allreduce(&state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
-  if(all_completed != MAL_DIST_COMPLETED) return MAL_DIST_PENDING; // Continue only if asynchronous send has ended
-  //FIXME The MAL_APP_ENDED state is not taken into account
+  if(all_completed != MALL_DIST_COMPLETED) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended
+  //FIXME The MALL_APP_ENDED state is not taken into account
   if(pthread_join(mall->async_thread, NULL)) {
     printf("Error al esperar al hilo\n");
@@ -782,6 +797,6 @@ int thread_check() {
 */
 void* thread_async_work() {
   send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
-  state = MAL_DIST_COMPLETED;
+  state = MALL_DIST_COMPLETED;
   pthread_exit(NULL);
 }
@@ -18,7 +18,7 @@ void indicate_ending_malleability(int new_outside_state);
 int malleability_checkpoint();
 void set_benchmark_grp(int grp);
-void set_malleability_configuration(int spawn_type, int spawn_is_single, int spawn_dist, int spawn_threaded, int comm_type, int comm_threaded);
+void set_malleability_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int comm_type, int comm_threaded);
 void set_children_number(int numC); // TODO TO BE DEPRECATED
 void get_malleability_user_comm(MPI_Comm *comm);
......
@@ -5,6 +5,7 @@
 #include <stdlib.h>
 //States
+/*
 #define MAL_UNRESERVED -1
 #define MAL_DENIED -2
 #define MAL_ZOMBIE -3
@@ -17,20 +18,29 @@
 #define MAL_DIST_PENDING 6
 #define MAL_DIST_COMPLETED 7
 #define MAL_DIST_ADAPTED 8
+*/
-//enum mall_states{MALL_UNRESERVED, MALL_NOT_STARTED, MALL_ZOMBIE, MALL_SPAWN_PENDING,
-//                 MALL_SPAWN_SINGLE_PENDING, MALL_SPAWN_SINGLE_COMPLETED, MALL_SPAWN_ADAPT_POSTPONE,
-//                 MALL_SPAWN_COMPLETED, MALL_DIST_PENDING, MALL_DIST_COMPLETED, MALL_DIST_ADAPTED};
-//enum mall_spawn_methods{MALL_SPAWN_BASELINE, MALL_SPAWN_MERGE};
-//enum mall_spawn_strategies{MALL_SPAWN_SERIAL, MALL_SPAWN_PTHREAD, MALL_SPAWN_MULTIPLE, MALL_SPAWN_SINGLE};
+enum mall_states{MALL_UNRESERVED, MALL_NOT_STARTED, MALL_ZOMBIE, MALL_SPAWN_PENDING,
+                 MALL_SPAWN_SINGLE_PENDING, MALL_SPAWN_SINGLE_COMPLETED, MALL_SPAWN_ADAPT_POSTPONE,
+                 MALL_SPAWN_COMPLETED, MALL_DIST_PENDING, MALL_DIST_COMPLETED, MALL_DIST_ADAPTED};
+enum mall_spawn_methods{MALL_SPAWN_BASELINE, MALL_SPAWN_MERGE};
+//#define MALL_SPAWN_SERIAL 2
+#define MALL_SPAWN_PTHREAD 2
+//#define MALL_SPAWN_MULTIPLE 5
+#define MALL_SPAWN_SINGLE 3
+#define MALLEABILITY_ROOT 0
 #define MAL_APP_EXECUTING 0
 #define MAL_APP_ENDED 1
 // TODO Refactor
+/*
 #define COMM_PHY_SPREAD 1
 #define COMM_PHY_COMPACT 2
+*/
+/*
 // SPAWN METHODS
 #define COMM_SPAWN_SERIAL 0
 #define COMM_SPAWN_PTHREAD 1
@@ -43,7 +53,7 @@
 #define COMM_SPAWN_SINGLE 1
 //#define COMM_SPAWN_PTHREAD 2
 //#define COMM_SPAWN_SINGLE 3
+*/
 #define MAL_USE_NORMAL 0
 #define MAL_USE_IBARRIER 1
 #define MAL_USE_POINT 2
......
@@ -4,6 +4,7 @@
 #include <pthread.h>
 #include "../malleabilityStates.h"
 #include "Baseline.h"
+#include "Spawn_state.h"
 //--------------PRIVATE DECLARATIONS---------------//
 int baseline_spawn(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *child);
@@ -24,10 +25,10 @@ void baseline_establish_connection(int myId, int root, MPI_Comm *parents);
 * the children do nothing.
 */
 int baseline(Spawn_data spawn_data, MPI_Comm *child) { //TODO Error handling
-  int numRanks;
-  MPI_Comm_size(spawn_data.comm, &numRanks);
-  if (spawn_data.initial_qty == numRanks) { // Parents path
+  MPI_Comm intercomm;
+  MPI_Comm_get_parent(&intercomm);
+  if (intercomm == MPI_COMM_NULL) { // Parents path
     if(spawn_data.spawn_is_single) {
       baseline_single_spawn(spawn_data, child);
     } else {
@@ -78,11 +79,8 @@ int baseline_single_spawn(Spawn_data spawn_data, MPI_Comm *child) {
     port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
     MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, spawn_data.root, 130, *child, MPI_STATUS_IGNORE);
-    if(spawn_data.spawn_is_async) {
-      pthread_mutex_lock(&(spawn_data.spawn_mutex));
-      commState = MALL_SPAWN_SINGLE_COMPLETED; // Indicate other processes to join root to end spawn procedure
-      pthread_mutex_unlock(&(spawn_data.spawn_mutex));
-    }
+    set_spawn_state(MALL_SPAWN_SINGLE_COMPLETED, spawn_data.spawn_is_async); // Indicate other processes to join root to end spawn procedure
   } else {
     port_name = malloc(1);
   }
......
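The parents/children test in baseline() no longer compares the communicator size against initial_qty; it now relies on MPI_Comm_get_parent, which returns MPI_COMM_NULL only in processes that were not spawned. A minimal, self-contained sketch of this standard MPI idiom (the process count and the reuse of argv[0] are illustrative, not taken from the commit):

#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
    MPI_Comm parent;
    MPI_Init(&argc, &argv);
    MPI_Comm_get_parent(&parent);    // MPI_COMM_NULL unless this process was spawned
    if (parent == MPI_COMM_NULL) {   // Parents path
        MPI_Comm child;
        MPI_Comm_spawn(argv[0], MPI_ARGV_NULL, 2, MPI_INFO_NULL, 0,
                       MPI_COMM_WORLD, &child, MPI_ERRCODES_IGNORE);
        printf("parent: spawned 2 children\n");
    } else {                         // Children path
        printf("child: connected to the parents through an intercommunicator\n");
    }
    MPI_Finalize();
    return 0;
}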
@@ -10,15 +10,13 @@
 #include "GenericSpawn.h"
 #include "Baseline.h"
 #include "Merge.h"
+#include "Spawn_state.h"
 // This code is a Singleton object -- Only one instance can be used at a given time and
 // no multiple calls to perform different resizes can be performed at the same time.
-int commState = MALL_NOT_STARTED;
-Spawn_data *spawn_data;
+Spawn_data *spawn_data = NULL;
 pthread_t spawn_thread;
-//pthread_mutex_t spawn_mutex; FIXME DELETE
-//pthread_cond_t cond_adapt_rdy; FIXME DELETE
 MPI_Comm *returned_comm;
 double end_time; //FIXME REFACTOR
@@ -63,7 +61,7 @@ int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId
   set_spawn_configuration(argv, num_cpus, num_nodes, nodelist, myId, root, initial_qty, target_qty, type_dist, spawn_method, spawn_strategies, comm);
   if(!spawn_data->spawn_is_async) {
-    generic_spawn(child, NOT_STARTED);
+    generic_spawn(child, MALL_NOT_STARTED);
     local_state = get_spawn_state(spawn_data->spawn_is_async);
     if (local_state == MALL_SPAWN_COMPLETED)
       deallocate_spawn_data();
@@ -77,7 +75,7 @@ int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId
     }
   }
-  return commState;
+  return local_state;
 }
 /*
@@ -88,16 +86,16 @@ int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int data_dist_completed, d
   int local_state;
   int global_state=MALL_NOT_STARTED;
-  if(spawn_data->spawn_is_async) {
+  if(spawn_data->spawn_is_async) { // Async
     local_state = get_spawn_state(spawn_data->spawn_is_async);
-    if(local_state == MALL_SPAWN_SINGLE_PENDING || local_state == MALL_SPAWN_SINGLE_COMPLETED) {
+    if(local_state == MALL_SPAWN_SINGLE_PENDING || local_state == MALL_SPAWN_SINGLE_COMPLETED) { // Single
       global_state = check_single_state(comm, local_state);
-    } else if(local_state == MALL_SPAWN_ADAPT_POSTPONE && data_dist_completed) {
+    } else if(local_state == MALL_SPAWN_ADAPT_POSTPONE && data_dist_completed) { // Start Merge Shrink Async
       global_state = check_merge_shrink_state();
-    } else if(local_state == MALL_SPAWN_PENDING) {
+    } else if(local_state == MALL_SPAWN_PENDING || local_state == MALL_SPAWN_COMPLETED || local_state == MALL_DIST_ADAPTED) { // Baseline
       global_state = check_generic_state(comm, child, local_state, real_time);
     } else {
@@ -105,12 +103,12 @@ int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int data_dist_completed, d
       MPI_Abort(MPI_COMM_WORLD, -1);
       return -10;
     }
-  } else if(spawn_data->spawn_method == MALL_SPAWN_MERGE){
+  } else if(spawn_data->spawn_method == MALL_SPAWN_MERGE){ // Start Merge shrink Sync
     generic_spawn(child, MALL_DIST_COMPLETED);
     global_state = get_spawn_state(spawn_data->spawn_is_async);
-    if(global_state == MALL_SPAWN_COMPLETED)
-      deallocate_spawn_data();
   }
+  if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_DIST_ADAPTED)
+    deallocate_spawn_data();
   return global_state;
 }
@@ -122,7 +120,7 @@ int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int data_dist_completed, d
 * for the data redistribution step (number of processes and Id of the Root).
 *
 */
-void malleability_connect_children(int myId, int numP, int root, int *numP_parents, int *root_parents, MPI_Comm *parents) {
+void malleability_connect_children(int myId, int numP, int root, MPI_Comm comm, int *numP_parents, int *root_parents, MPI_Comm *parents) {
   spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));
   spawn_data->root = root;
   spawn_data->myId = myId;
@@ -133,21 +131,36 @@ void malleability_connect_children(int myId, int numP, int root, MPI_Comm comm, int *numP_paren
   set_basic_spawn_dtype();
   MPI_Bcast(spawn_data, 1, spawn_data->dtype, MALLEABILITY_ROOT, *parents);
   switch(spawn_data->spawn_method) {
     case MALL_SPAWN_BASELINE:
-      local_state = baseline(*spawn_data, parents);
+      baseline(*spawn_data, parents);
       break;
     case MALL_SPAWN_MERGE:
-      spawn_data->target_qty += numP_parents;
-      local_state = merge(*spawn_data, parents, NOT_STARTED);
+      spawn_data->target_qty += *numP_parents;
+      merge(*spawn_data, parents, MALL_NOT_STARTED);
       break;
   }
+  *root_parents = spawn_data->root_parents;
+  *numP_parents = spawn_data->initial_qty;
   MPI_Type_free(&(spawn_data->dtype));
   free(spawn_data);
 }
+/*
+* Function to find out whether the strategy passed as the second
+* argument is among the chosen strategies.
+*
+* Returns in "result" 1 (True) if the strategy is used, 0 (False)
+* otherwise.
+*/
+int malleability_spawn_contains_strat(int spawn_strategies, int strategy, int *result) {
+  int value = spawn_strategies % strategy ? 0 : 1;
+  if(result != NULL) *result = value;
+  return value;
+}
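The new malleability_spawn_contains_strat helper works because each spawn strategy gets a small prime code in malleabilityStates.h (MALL_SPAWN_PTHREAD = 2, MALL_SPAWN_SINGLE = 3), so a combination of strategies can be encoded as the product of their codes and membership tested by divisibility. A standalone sketch of that encoding (the contains_strat name is local to this example):

#include <stdio.h>

#define MALL_SPAWN_PTHREAD 2
#define MALL_SPAWN_SINGLE  3

/* Same divisibility test used by malleability_spawn_contains_strat */
static int contains_strat(int spawn_strategies, int strategy) {
    return spawn_strategies % strategy ? 0 : 1;
}

int main(void) {
    int strategies = MALL_SPAWN_PTHREAD * MALL_SPAWN_SINGLE;  // both strategies: 2*3 = 6
    printf("%d\n", contains_strat(strategies, MALL_SPAWN_PTHREAD));         // 1
    printf("%d\n", contains_strat(strategies, MALL_SPAWN_SINGLE));          // 1
    printf("%d\n", contains_strat(MALL_SPAWN_PTHREAD, MALL_SPAWN_SINGLE));  // 0
    return 0;
}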
 //--------------PRIVATE CONFIGURATION FUNCTIONS---------------//
 /*
 * Groups into a single structure all the configuration data needed
@@ -162,8 +175,8 @@ void set_spawn_configuration(char *cmd, int num_cpus, int num_nodes, char *nodel
   spawn_data->initial_qty = initial_qty;
   spawn_data->target_qty = target_qty;
   spawn_data->spawn_method = spawn_method;
-  spawn_data->spawn_is_single = spawn_strategies % MALL_SPAWN_SINGLE ? 0 : 1;
-  spawn_data->spawn_is_async = spawn_strategies % MALL_SPAWN_PTHREAD ? 0 : 1;
+  malleability_spawn_contains_strat(spawn_strategies, MALL_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
+  malleability_spawn_contains_strat(spawn_strategies, MALL_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
   spawn_data->comm = comm;
   set_basic_spawn_dtype();
@@ -172,15 +185,15 @@ void set_spawn_configuration(char *cmd, int num_cpus, int num_nodes, char *nodel
     case MALL_SPAWN_BASELINE:
       spawn_data->spawn_qty = spawn_data->target_qty;
       spawn_data->already_created = 0;
+      break;
     case MALL_SPAWN_MERGE:
       spawn_data->spawn_qty = spawn_data->target_qty - spawn_data->initial_qty;
       spawn_data->already_created = spawn_data->initial_qty;
+      break;
   }
   if(spawn_data->spawn_is_async) {
-    pthread_mutex_init(&(spawn_data->spawn_mutex),NULL);
-    pthread_cond_init(&(spawn_data->cond_adapt_rdy),NULL);
-    spawn_thread = pthread_self();
+    init_spawn_state();
   }
   if(spawn_data->myId == spawn_data->root) {
@@ -229,19 +242,22 @@ void set_basic_spawn_dtype() {
 * together with the destruction of the structures it uses.
 */
 void deallocate_spawn_data() {
-  free(spawn_data->cmd);
+  if(spawn_data == NULL) return;
+  if(spawn_data->cmd != NULL) {
+    free(spawn_data->cmd);
+  }
+  if(spawn_data->dtype != MPI_DATATYPE_NULL) {
   MPI_Type_free(&(spawn_data->dtype));
+  }
   if(spawn_data->mapping != MPI_INFO_NULL) {
     MPI_Info_free(&(spawn_data->mapping));
   }
   if(spawn_data->spawn_is_async) {
-    pthread_cond_destroy(&(spawn_data->cond_adapt_rdy));
-    pthread_mutex_destroy(&(spawn_data->spawn_mutex));
-    spawn_thread = pthread_self();
+    free_spawn_state();
   }
   free(spawn_data);
+  spawn_data = NULL;
 }
@@ -270,7 +286,7 @@ void generic_spawn(MPI_Comm *child, int data_stage) {
   }
   // END WORK
-  local_state = get_spawn_state(spawn_data->spawn_is_async);
+  set_spawn_state(local_state, spawn_data->spawn_is_async);
   end_time = MPI_Wtime();
 }
@@ -304,16 +320,19 @@ int allocate_thread_spawn() {
 * the master thread is notified.
 */
 void* thread_work(void* arg) {
+  int local_state, repeat = 0;
   returned_comm = (MPI_Comm *) malloc(sizeof(MPI_Comm));
-  generic_spawn(returned_comm, NOT_STARTED);
-  while(commState == MALL_SPAWN_ADAPT_POSTPONE) {
+  generic_spawn(returned_comm, MALL_NOT_STARTED);
+  local_state = get_spawn_state(MALL_SPAWN_PTHREAD);
+  if(local_state == MALL_SPAWN_ADAPT_POSTPONE) {
     // The group of processes will finish merging after the data redistribution
-    pthread_cond_wait(&(spawn_data->cond_adapt_rdy), &(spawn_data->spawn_mutex));
-    generic_spawn(returned_comm, MALL_DIST_COMPLETED);
+    repeat = 1;
+    local_state = wait_wakeup();
   }
+  if (repeat) generic_spawn(returned_comm, MALL_DIST_COMPLETED);
+  deallocate_spawn_data();
   pthread_exit(NULL);
 }
@@ -334,14 +353,10 @@ int check_single_state(MPI_Comm comm, int global_state) {
   // They also must join if the application has ended its work
   if(global_state == MALL_SPAWN_SINGLE_COMPLETED) {
     global_state = MALL_SPAWN_PENDING;
-    pthread_mutex_lock(&(spawn_data->spawn_mutex));
-    commState = global_state;
-    pthread_mutex_unlock(&(spawn_data->spawn_mutex));
+    set_spawn_state(global_state, MALL_SPAWN_PTHREAD);
-    // TODO Refactor - Should not be necessary
-    int threads_not_spawned = pthread_equal(pthread_self(), spawn_thread);
-    if(spawn_data->myId != spawn_data->root && threads_not_spawned) {
+    //int threads_not_spawned = pthread_equal(pthread_self(), spawn_thread);
+    if(spawn_data->myId != spawn_data->root) { //&& threads_not_spawned) {
       allocate_thread_spawn(spawn_data);
     }
   }
@@ -361,7 +376,8 @@ int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, double
   int global_state;
   MPI_Allreduce(&local_state, &global_state, 1, MPI_INT, MPI_MIN, comm);
-  if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_SPAWN_ADAPTED) {
+  if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_DIST_ADAPTED) {
+    set_spawn_state(global_state, MALL_SPAWN_PTHREAD);
     *child = *returned_comm;
     deallocate_spawn_data(spawn_data);
     *real_time=end_time;
@@ -376,10 +392,7 @@ int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, double
 */
 int check_merge_shrink_state() {
   // FIXME Handle as a special case to avoid an unnecessary iteration
-  int global_state = MALL_SPAWN_ADAPT_PENDING;
-  pthread_mutex_lock(&(spawn_data->spawn_mutex));
-  commState = global_state;
-  pthread_mutex_unlock(&(spawn_data->spawn_mutex));
-  pthread_cond_signal(&(spawn_data->cond_adapt_rdy));
-  return global_state
+  int global_state = MALL_SPAWN_PENDING;
+  set_spawn_state(global_state, MALL_SPAWN_PTHREAD);
+  return global_state;
 }
@@ -8,7 +8,9 @@
 int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId, int initial_qty, int target_qty, int root, int type_dist, int spawn_method, int spawn_strategies, MPI_Comm comm, MPI_Comm *child);
 int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int data_dist_completed, double *real_time);
-void malleability_connect_children(int myId, int numP, int root, int *numP_parents, int *root_parents, MPI_Comm *parents);
+void malleability_connect_children(int myId, int numP, int root, MPI_Comm comm, int *numP_parents, int *root_parents, MPI_Comm *parents);
+int malleability_spawn_contains_strat(int spawn_strategies, int strategy, int *result);
 #endif
-objects1 := Baseline
-objects2 := Merge ProcessDist
-objects3 := GenericSpawn
+objects1 := ProcessDist
+objects2 := Spawn_state
+objects3 := Baseline
+objects4 := Merge
+objects5 := GenericSpawn
 CC := gcc
 MCC := mpicc
-CFLAGS := -Wall -Wextra
+CFLAGS := -Wall
-all: $(objects1) $(objects2) $(objects3)
+all: $(objects1) $(objects2) $(objects3) $(objects4) $(objects5)
-$(objects1): %: %.c
-	$(MCC) $(CFLAGS) -c -o $@.o $<
-$(objects2): %: %.c $(objects1).o
-	echo $@
-	$(MCC) $(CFLAGS) -c -o $@.o $<
-$(objects3): %: %.c $(objects2).o
-	echo $@
-	$(MCC) $(CFLAGS) -c -o $@.o $<
-clean:
-	rm *.o
+$(objects1): %: %.c %.h
+	$(MCC) $(CFLAGS) -c -o $(BUILDDIR)/$@.o $<
+$(objects2): %: %.c %.h
+	$(CC) $(CFLAGS) -c -o $(BUILDDIR)/$@.o $<
+$(objects3): %: %.c %.h $(objects2).h
+	$(MCC) $(CFLAGS) -c -o $(BUILDDIR)/$@.o $<
+$(objects4): %: %.c %.h $(objects3).h
+	$(MCC) $(CFLAGS) -c -o $(BUILDDIR)/$@.o $<
+$(objects5): %: %.c %.h $(objects1).h $(objects2).h $(objects3).h $(objects4).h
+	$(MCC) $(CFLAGS) -c -o $(BUILDDIR)/$@.o $<
@@ -11,24 +11,24 @@ void merge_adapt_shrink(int numC, MPI_Comm *child, MPI_Comm comm, int myId);
 //--------------PUBLIC FUNCTIONS---------------//
 int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state) {
-  MPI_Comm new_comm = MPI_COMM_NULL;
+  MPI_Comm intercomm;
   int local_state;
-  int numRanks, is_children_group = 1;
+  int is_children_group = 1;
   if(spawn_data.initial_qty > spawn_data.target_qty) { //Shrink
-    if(data_state == MALL_DIST_COMPLETED)
-      merge_adapt_shrink(spawn_data.target_qty, child, spawn_data.comm, spawn_data.myId)
-      local_state = MALL_SPAWN_ADAPTED;
-    else {
+    if(data_state == MALL_DIST_COMPLETED) {
+      merge_adapt_shrink(spawn_data.target_qty, child, spawn_data.comm, spawn_data.myId);
+      local_state = MALL_DIST_ADAPTED;
+    } else {
       local_state = MALL_SPAWN_ADAPT_POSTPONE;
     }
   } else { //Expand
-    MPI_Comm_size(spawn_data.comm, &numRanks);
-    is_children_group = spawn_data.initial_qty == numRanks ? 0:1;
+    MPI_Comm_get_parent(&intercomm);
+    is_children_group = intercomm == MPI_COMM_NULL ? 0:1;
     baseline(spawn_data, child);
     merge_adapt_expand(child, is_children_group);
-    local_state = MALL_SPAWN_ADAPTED;
+    local_state = MALL_SPAWN_COMPLETED;
   }
   return local_state;
@@ -48,7 +48,7 @@ int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state) {
 void merge_adapt_expand(MPI_Comm *child, int is_children_group) {
   MPI_Comm new_comm = MPI_COMM_NULL;
-  MPI_Intercomm_merge(child, is_children_group, new_comm); //Whoever passes 0 goes first
+  MPI_Intercomm_merge(*child, is_children_group, &new_comm); //Whoever passes 0 goes first
   MPI_Comm_free(child); //POSSIBLE ERROR?
   *child = new_comm;
......
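The last hunk also fixes the MPI_Intercomm_merge call: the intercommunicator is passed by value and the merged intracommunicator is returned through a pointer, so the old call with child and new_comm unadorned could not compile. A minimal sketch of the expand path (the function name is illustrative):

#include <mpi.h>

/* Merge an intercommunicator into a single intracommunicator.
 * The group that passes 0 as 'high' gets the lower ranks. */
static MPI_Comm merge_expand(MPI_Comm intercomm, int is_children_group) {
    MPI_Comm merged = MPI_COMM_NULL;
    MPI_Intercomm_merge(intercomm, is_children_group, &merged);
    MPI_Comm_free(&intercomm);  // the intercommunicator is no longer needed
    return merged;
}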
@@ -2,8 +2,10 @@
 #include <stdlib.h>
 #include <sys/stat.h>
 #include <fcntl.h>
-#include <mpi.h>
+#include <unistd.h>
 #include <string.h>
+#include <mpi.h>
+#include <slurm/slurm.h>
 #include "ProcessDist.h"
@@ -33,7 +35,7 @@ int write_hostfile_node(int ptr, int qty, char *node_name);
 * IN parameters -->
 * target_qty: Number of processes after the reconfiguration
 * alreadyCreated: Number of parent processes to consider
-* The difference numC-alreadyCreated is the number of children to create
+* The difference target_qty-alreadyCreated is the number of children to create
 * num_cpus: Total number of cpus (in use or not)
 * num_nodes: Number of nodes available to this application
 * info_type: Indicates how to perform the mapping, whether to specify it
@@ -45,15 +47,12 @@ int write_hostfile_node(int ptr, int qty, char *node_name);
 * (NODES/WORST/SPREAD)
 */
 int physical_struct_create(int target_qty, int already_created, int num_cpus, int num_nodes, char *nodelist, int dist_type, int info_type, struct physical_dist *dist) {
-  if(*dist == NULL) {
-    return 0;
-  }
   dist->target_qty = target_qty;
   dist->already_created = already_created;
   dist->num_cpus = num_cpus;
-  dist->num_cpus = num_nodes;
-  dist->nodelist = nodelist
+  dist->num_nodes = num_nodes;
+  dist->nodelist = nodelist;
   dist->dist_type = dist_type;
   dist->info_type = info_type;
@@ -77,10 +76,10 @@ void processes_dist(struct physical_dist dist, MPI_Info *info_spawn) {
   node_dist(dist, &procs_array, &used_nodes);
   switch(dist.info_type) {
     case MALL_DIST_STRING:
-      generate_info_string(nodelist, procs_array, used_nodes, info_spawn);
+      generate_info_string(dist.nodelist, procs_array, used_nodes, info_spawn);
       break;
     case MALL_DIST_HOSTFILE:
-      generate_info_hostfile(nodelist, procs_array, used_nodes, info_spawn);
+      generate_info_hostfile(dist.nodelist, procs_array, used_nodes, info_spawn);
       break;
   }
   free(procs_array);
@@ -133,8 +132,8 @@ void spread_dist(struct physical_dist dist, int *used_nodes, int *procs) {
   int i, tamBl, remainder;
   *used_nodes = dist.num_nodes;
-  tamBl = dist.numC / dist.num_nodes;
-  remainder = dist.numC % dist.num_nodes;
+  tamBl = dist.target_qty / dist.num_nodes;
+  remainder = dist.target_qty % dist.num_nodes;
   for(i=0; i<remainder; i++) {
     procs[i] = tamBl + 1;
   }
@@ -170,7 +169,7 @@ void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) {
   }
   //Assign tamBl to each node
-  while(asigCores+tamBl <= dist.numC) {
+  while(asigCores+tamBl <= dist.target_qty) {
     asigCores += tamBl;
     procs[i] += tamBl;
     i = (i+1) % dist.num_nodes;
@@ -178,8 +177,8 @@ void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) {
   }
   //Last node could have less procs than tamBl
-  if(asigCores < dist.numC) {
-    procs[i] += dist.numC - asigCores;
+  if(asigCores < dist.target_qty) {
+    procs[i] += dist.target_qty - asigCores;
     (*used_nodes)++;
   }
   if(*used_nodes > dist.num_nodes) *used_nodes = dist.num_nodes; //FIXME If this happens, isn't it an error?
......
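spread_dist now divides dist.target_qty processes over dist.num_nodes, giving the first target_qty % num_nodes nodes one extra process; compact_dist instead fills each node up to tamBl before moving on. A standalone sketch of the spread variant (names are illustrative):

#include <stdio.h>

/* Spread qty processes over num_nodes as evenly as possible:
 * the first (qty % num_nodes) nodes receive one extra process. */
static void spread(int qty, int num_nodes, int *procs) {
    int tamBl = qty / num_nodes;
    int remainder = qty % num_nodes;
    for (int i = 0; i < num_nodes; i++)
        procs[i] = tamBl + (i < remainder ? 1 : 0);
}

int main(void) {
    int procs[4];
    spread(10, 4, procs);  // expected distribution: 3 3 2 2
    for (int i = 0; i < 4; i++) printf("%d ", procs[i]);
    printf("\n");
    return 0;
}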
@@ -3,26 +3,29 @@
 #include <pthread.h>
 pthread_mutex_t spawn_mutex;
+pthread_cond_t spawn_cond;
 int spawn_state;
 void init_spawn_state() {
   pthread_mutex_init(&spawn_mutex,NULL);
+  pthread_cond_init(&spawn_cond,NULL);
 }
 void free_spawn_state() {
   pthread_mutex_destroy(&spawn_mutex);
+  pthread_cond_destroy(&spawn_cond);
 }
 int get_spawn_state(int is_async) {
   int value;
   if(is_async) {
-    pthread_mutex_lock(&(spawn_data->spawn_mutex));
+    pthread_mutex_lock(&spawn_mutex);
     value = spawn_state;
-    pthread_mutex_unlock(&(spawn_data->spawn_mutex));
+    pthread_mutex_unlock(&spawn_mutex);
   } else {
     value = spawn_state;
   }
-  return value
+  return value;
 }
 void set_spawn_state(int value, int is_async) {
@@ -34,3 +37,12 @@ void set_spawn_state(int value, int is_async) {
     spawn_state = value;
   }
 }
+int wait_wakeup() {
+  pthread_cond_wait(&spawn_cond, &spawn_mutex);
+  return get_spawn_state(1);
+}
+void wakeup() {
+  pthread_cond_signal(&spawn_cond);
+}
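Spawn_state.c centralizes the mutex/condition pair that previously lived inside Spawn_data: set_spawn_state publishes a state under the mutex and wakeup signals the waiting spawn thread. Note that pthread_cond_wait requires the caller to hold the mutex, so wait_wakeup as committed assumes the lock is managed elsewhere. A standalone sketch of the usual lock/wait/signal pattern these helpers wrap (a hypothetical example, not the commit's code):

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond  = PTHREAD_COND_INITIALIZER;
static int state = 0;

/* Worker blocks until the main thread publishes a non-zero state. */
static void* worker(void *arg) {
    (void)arg;
    pthread_mutex_lock(&mutex);   // pthread_cond_wait must be called with the mutex held
    while (state == 0)            // loop guards against spurious wakeups
        pthread_cond_wait(&cond, &mutex);
    printf("worker: woke up with state %d\n", state);
    pthread_mutex_unlock(&mutex);
    return NULL;
}

int main(void) {
    pthread_t t;
    pthread_create(&t, NULL, worker, NULL);
    pthread_mutex_lock(&mutex);
    state = 1;                    // counterpart of set_spawn_state(value, 1)
    pthread_mutex_unlock(&mutex);
    pthread_cond_signal(&cond);   // counterpart of wakeup()
    pthread_join(t, NULL);
    return 0;
}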
@@ -7,7 +7,11 @@
 void init_spawn_state();
 void free_spawn_state();
 int get_spawn_state(int is_async);
 void set_spawn_state(int value, int is_async);
+int wait_wakeup();
+void wakeup();
 #endif
@@ -19,7 +19,7 @@ echo "MPICH"
 numP=$(bash recordMachinefile.sh $configFile)
 #mpirun -np 4 /home/martini/Instalaciones/valgrind-mpich-3.4.1-noucx/bin/valgrind --leak-check=full --show-leak-kinds=all --log-file=nc.vg.%p $dir$codeDir/a.out $configFile $outIndex $nodelist $nodes
-mpirun -np $numP $dir$codeDir/exec/a.out $configFile $outIndex $nodelist $nodes
+mpirun -np $numP $dir$codeDir/build/a.out $configFile $outIndex $nodelist $nodes
 rm hostfile.o$SLURM_JOB_ID
 echo "END RUN"
......