Commit 32a4ca93 authored by Iker Martín Álvarez
Browse files

Merge branch 'malleability-refactor' into 'dev'

Malleability focus refactor of Proteo

See merge request martini/malleability_benchmark!5
parents f1511cb4 06573694
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <mpi.h>
#include <signal.h>
#include "../IOcodes/results.h"
#include "malleabilityZombies.h"
#define PIDS_QTY 320
// Suspends the calling process until SIGUSR2 arrives (defined below).
void zombies_suspend();

// Number of zombie PIDs recorded so far, and the PID table itself
// (allocated in zombies_service_init, released in zombies_service_free).
int offset_pids, *pids = NULL;

/*
 * Empty SIGUSR2 handler: its only purpose is to interrupt sigsuspend()
 * inside zombies_suspend(). sa_handler has type void (*)(int), so the
 * handler must take the signal number; the previous empty parameter
 * list relied on an obsolescent unprototyped declaration.
 */
void gestor_usr2(int signum) { (void) signum; }
/*
 * Gathers at "root" the PIDs of the processes that will not survive the
 * reconfiguration (ranks with myId >= numC) into the global "pids" table,
 * and then suspends those processes until zombies_awake() is called.
 *
 * comm          communicator spanning all numP current processes
 * myId          rank of the caller within comm
 * numP          current number of processes in comm
 * numC          number of processes that survive the reconfiguration
 * root          rank that collects the zombie PIDs
 * results_void  results_data structure, passed as void* and forwarded to
 *               the results-collection helpers before suspension
 * n_stages      number of stages, forwarded to compute_results_stages
 *
 * NOTE(review): pids_counts/pids_displs are allocated on every rank but
 * only filled when root satisfies myId < numC (a surviving rank); MPI
 * only reads them at root, but the code assumes root < numC — TODO confirm.
 * NOTE(review): both mallocs are unchecked; on OOM the root would pass
 * NULL arrays to MPI_Gatherv.
 */
void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root, void *results_void, int n_stages) {
int pid = getpid();
int *pids_counts = malloc(numP * sizeof(int));   // receive counts, one per rank (used at root only)
int *pids_displs = malloc(numP * sizeof(int));   // displacements into the global "pids" table (root only)
int i, count=1;
if(myId < numC) {
// Surviving ranks contribute no PID to the gather
count = 0;
if(myId == root) {
for(i=0; i < numC; i++) {
pids_counts[i] = 0;
}
// Each zombie rank contributes exactly one PID, appended after the
// offset_pids entries recorded by previous reconfigurations
for(i=numC; i<numP; i++) {
pids_counts[i] = 1;
pids_displs[i] = (i + offset_pids) - numC;
}
offset_pids += numP - numC;
}
}
MPI_Gatherv(&pid, count, MPI_INT, pids, pids_counts, pids_displs, MPI_INT, root, comm);
free(pids_counts);
free(pids_displs);
if(myId >= numC) {
// FIXME This should not be here
// Needed to ensure iteration times are collected before suspending these processes
results_data *results = (results_data *) results_void;
compute_results_iter(results, myId, numP,root, comm);
compute_results_stages(results, myId, numP, n_stages, root, comm);
zombies_suspend();
}
}
/*
 * Initializes the zombie bookkeeping: resets the PID counter and
 * allocates a zero-filled table able to hold up to PIDS_QTY zombie PIDs.
 * Must be paired with zombies_service_free().
 */
void zombies_service_init() {
  offset_pids = 0;
  // calloc zero-fills the table (replacing the manual clearing loop) and
  // checks the count*size multiplication for overflow.
  pids = calloc(PIDS_QTY, sizeof(int));
  if(pids == NULL) {
    // Original code left the failure unchecked and would dereference
    // NULL later in MPI_Gatherv; fail loudly instead.
    perror("zombies_service_init: could not allocate PID table");
    exit(EXIT_FAILURE);
  }
}
void zombies_service_free() {
free(pids);
}
void zombies_suspend() {
struct sigaction act;
sigemptyset(&act.sa_mask);
act.sa_flags=0;
act.sa_handler=gestor_usr2;
sigaction(SIGUSR2, &act, NULL);
sigset_t set;
sigprocmask(SIG_SETMASK,NULL,&set);
sigsuspend(&set);
}
/*
 * Wakes every suspended zombie process by delivering SIGUSR2 to each
 * recorded PID (the counterpart of zombies_suspend()).
 */
void zombies_awake() {
  int idx = 0;
  while(idx < offset_pids) { // Wake up the zombies
    kill(pids[idx], SIGUSR2);
    idx++;
  }
}
#ifndef MALLEABILITY_ZOMBIES_H
#define MALLEABILITY_ZOMBIES_H
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <mpi.h>
#include <signal.h>
// Gathers at "root" the PIDs of ranks that do not survive the shrink
// (myId >= numC) and suspends those ranks until zombies_awake().
void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root, void *results_void, int n_stages);
// Allocates and zeroes the global zombie-PID table; call before collecting.
void zombies_service_init();
// Releases the zombie-PID table allocated by zombies_service_init().
void zombies_service_free();
// Sends SIGUSR2 to every recorded zombie PID, resuming the suspended ranks.
void zombies_awake();
#endif
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <mpi.h> #include <mpi.h>
#include "../malleabilityStates.h" #include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
#include "Baseline.h" #include "Baseline.h"
#include "Spawn_state.h" #include "Spawn_state.h"
#define MAM_TAG_STRAT_SINGLE 130
#define MAM_TAG_STRAT_MULTIPLE_FIRST 131
#define MAM_TAG_STRAT_MULTIPLE_OTHER 132
//--------------PRIVATE DECLARATIONS---------------// //--------------PRIVATE DECLARATIONS---------------//
int baseline_spawn(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *child); int baseline_spawn(Spawn_set spawn_set, MPI_Comm comm, MPI_Comm *child);
int single_strat_parents(Spawn_data spawn_data, MPI_Comm *child); void baseline_parents(Spawn_data spawn_data, MPI_Comm *child);
void single_strat_children(int myId, int root, MPI_Comm *parents);
void multiple_strat_parents(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child);
void multiple_strat_children(MPI_Comm *parents);
void single_strat_parents(Spawn_data spawn_data, MPI_Comm *child);
void single_strat_children(MPI_Comm *parents);
//--------------PUBLIC FUNCTIONS---------------// //--------------PUBLIC FUNCTIONS---------------//
...@@ -18,72 +27,193 @@ void single_strat_children(int myId, int root, MPI_Comm *parents); ...@@ -18,72 +27,193 @@ void single_strat_children(int myId, int root, MPI_Comm *parents);
*/ */
int baseline(Spawn_data spawn_data, MPI_Comm *child) { //TODO Tratamiento de errores int baseline(Spawn_data spawn_data, MPI_Comm *child) { //TODO Tratamiento de errores
MPI_Comm intercomm; MPI_Comm intercomm;
MPI_Comm_get_parent(&intercomm); MPI_Comm_get_parent(&intercomm); //FIXME May be a problem for third reconf or more with only expansions
if (intercomm == MPI_COMM_NULL) { // Parents path if (intercomm == MPI_COMM_NULL) { // Parents path
if (spawn_data.spawn_is_single) { baseline_parents(spawn_data, child);
single_strat_parents(spawn_data, child); } else { // Children path
} else { if(spawn_data.spawn_is_multiple) { multiple_strat_children(child); }
baseline_spawn(spawn_data, spawn_data.comm, child); if(spawn_data.spawn_is_single) { single_strat_children(child); }
}
} else if(spawn_data.spawn_is_single) { // Children path
single_strat_children(spawn_data.myId, spawn_data.root, child);
} }
return MALL_SPAWN_COMPLETED;
return MAM_I_SPAWN_COMPLETED;
} }
//--------------PRIVATE FUNCTIONS---------------// //--------------PRIVATE FUNCTIONS---------------//
/* /*
* Crea un grupo de procesos segun la configuracion indicada por la funcion * Funcion utilizada por los padres para realizar la
* "processes_dist()". * creación de procesos.
*
*/ */
int baseline_spawn(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *child) { void baseline_parents(Spawn_data spawn_data, MPI_Comm *child) {
int i;
MPI_Comm comm, *intercomms;
if (spawn_data.spawn_is_single && mall->myId != mall->root) {
single_strat_parents(spawn_data, child);
return;
}
comm = spawn_data.spawn_is_single ? MPI_COMM_SELF : spawn_data.comm;
MPI_Bcast(&spawn_data.total_spawns, 1, MPI_INT, mall->root, comm);
intercomms = (MPI_Comm*) malloc(spawn_data.total_spawns * sizeof(MPI_Comm));
if(mall->myId != mall->root) {
spawn_data.sets = (Spawn_set *) malloc(spawn_data.total_spawns * sizeof(Spawn_set));
}
#if MAM_DEBUG >= 3
DEBUG_FUNC("Starting spawning of processes", mall->myId, mall->numP); fflush(stdout);
#endif
for(i=0; i<spawn_data.total_spawns; i++) {
baseline_spawn(spawn_data.sets[i], comm, &intercomms[i]);
}
#if MAM_DEBUG >= 3
DEBUG_FUNC("Sources have created the new processes. Performing additional actions if required.", mall->myId, mall->numP); fflush(stdout);
#endif
// TODO Improvement - Deactivate Multiple spawn before spawning if total_spawns == 1
if(spawn_data.spawn_is_multiple) { multiple_strat_parents(spawn_data, comm, intercomms, child); }
else { *child = intercomms[0]; }
if(spawn_data.spawn_is_single) { single_strat_parents(spawn_data, child); }
free(intercomms);
if(mall->myId != mall->root) { free(spawn_data.sets); }
}
/*
* Funcion basica encargada de la creacion de procesos.
* Crea un set de procesos segun la configuracion obtenida
* en ProcessDist.c
* Devuelve en "child" el intercomunicador que se conecta a los hijos.
*/
int baseline_spawn(Spawn_set spawn_set, MPI_Comm comm, MPI_Comm *child) {
int rootBcast = MPI_PROC_NULL; int rootBcast = MPI_PROC_NULL;
if(spawn_data.myId == spawn_data.root) rootBcast = MPI_ROOT; if(mall->myId == mall->root) rootBcast = MPI_ROOT;
// WORK int spawn_err = MPI_Comm_spawn(spawn_set.cmd, MPI_ARGV_NULL, spawn_set.spawn_qty, spawn_set.mapping, mall->root, comm, child, MPI_ERRCODES_IGNORE);
int spawn_err = MPI_Comm_spawn(spawn_data.cmd, MPI_ARGV_NULL, spawn_data.spawn_qty, spawn_data.mapping, spawn_data.root, comm, child, MPI_ERRCODES_IGNORE);
MPI_Comm_set_name(*child, "MPI_COMM_MALL_RESIZE");
// END WORK
if(spawn_err != MPI_SUCCESS) { if(spawn_err != MPI_SUCCESS) {
printf("Error creating new set of %d procs.\n", spawn_data.spawn_qty); printf("Error creating new set of %d procs.\n", spawn_set.spawn_qty);
} }
MPI_Bcast(&spawn_data, 1, spawn_data.dtype, rootBcast, *child); MAM_Comm_main_structures(*child, rootBcast);
return spawn_err; return spawn_err;
} }
void multiple_strat_parents(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *intercomms, MPI_Comm *child) {
int i, tag;
char *port_name, aux;
if(mall->myId == mall->root) {
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
tag = MAM_TAG_STRAT_MULTIPLE_FIRST;
MPI_Send(&spawn_data.total_spawns, 1, MPI_INT, MAM_ROOT, tag, intercomms[0]);
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MPI_ANY_SOURCE, tag, intercomms[0], MPI_STATUS_IGNORE);
for(i=1; i<spawn_data.total_spawns; i++) {
MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MAM_ROOT, tag+i, intercomms[i]);
MPI_Recv(&aux, 1, MPI_CHAR, MPI_ANY_SOURCE, MAM_TAG_STRAT_MULTIPLE_FIRST, intercomms[0], MPI_STATUS_IGNORE);
}
} else { port_name = malloc(1); }
MPI_Comm_connect(port_name, MPI_INFO_NULL, mall->root, comm, child);
for(i=0; i<spawn_data.total_spawns; i++) {
MPI_Comm_disconnect(&intercomms[i]);
}
free(port_name);
}
void multiple_strat_children(MPI_Comm *parents) {
int i, start, total_spawns, new_root;
int rootBcast = MPI_PROC_NULL;
char *port_name, aux;
MPI_Status stat;
MPI_Comm newintracomm, intercomm, parents_comm;
new_root = 0;
parents_comm = *parents;
if(mall->myId == mall->root) {
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, parents_comm, &stat);
if(stat.MPI_TAG == MAM_TAG_STRAT_MULTIPLE_FIRST) {
MPI_Recv(&total_spawns, 1, MPI_INT, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm, MPI_STATUS_IGNORE);
MPI_Open_port(MPI_INFO_NULL, port_name);
MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm);
start = 0;
new_root = 1;
rootBcast = MPI_ROOT;
} else {
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm, &stat);
// The "+1" is because the first iteration is done before the loop
start = stat.MPI_TAG - MAM_TAG_STRAT_MULTIPLE_FIRST + 1;
}
} else { port_name = malloc(1); }
MPI_Bcast(&start, 1, MPI_INT, mall->root, mall->comm);
if(start) {
MPI_Comm_connect(port_name, MPI_INFO_NULL, mall->root, mall->comm, &intercomm);
MPI_Bcast(&total_spawns, 1, MPI_INT, mall->root, intercomm); // FIXME Seems inneficient - Should be performed by parent root?
MPI_Intercomm_merge(intercomm, 1, &newintracomm); // Get last ranks
MPI_Comm_disconnect(&intercomm);
} else {
start = 1;
MPI_Comm_dup(mall->comm, &newintracomm);
MPI_Bcast(&total_spawns, 1, MPI_INT, mall->root, mall->comm); // FIXME Seems inneficient - Should be performed by parent root?
}
for(i=start; i<total_spawns; i++) {
MPI_Comm_accept(port_name, MPI_INFO_NULL, mall->root, newintracomm, &intercomm);
MPI_Bcast(&total_spawns, 1, MPI_INT, rootBcast, intercomm); // FIXME Seems inneficient - Should be performed by parent root?
if(newintracomm != MPI_COMM_WORLD) MPI_Comm_disconnect(&newintracomm);
MPI_Intercomm_merge(intercomm, 0, &newintracomm); // Get first ranks
MPI_Comm_disconnect(&intercomm);
if(new_root) {
MPI_Send(&aux, 1, MPI_CHAR, stat.MPI_SOURCE, stat.MPI_TAG, parents_comm); // Ensures order in the created intracommunicator
}
}
// Connect with parents
MPI_Comm_accept(port_name, MPI_INFO_NULL, mall->root, newintracomm, &intercomm);
// Update communicator to expected one
MAM_comms_update(newintracomm);
MPI_Comm_rank(mall->comm, &mall->myId);
MPI_Comm_size(mall->comm, &mall->numP);
if(new_root) MPI_Close_port(port_name);
free(port_name);
MPI_Comm_disconnect(&newintracomm);
MPI_Comm_disconnect(parents);
*parents = intercomm;
}
/* /*
* Si la variable "type" es 1, la creación es con la participación de todo el grupo de padres * Si la variable "type" es 1, la creación es con la participación de todo el grupo de padres
* Si el valor es diferente, la creación es solo con la participación del proceso root * Si el valor es diferente, la creación es solo con la participación del proceso root
*/ */
int single_strat_parents(Spawn_data spawn_data, MPI_Comm *child) { void single_strat_parents(Spawn_data spawn_data, MPI_Comm *child) {
int spawn_err;
char *port_name; char *port_name;
MPI_Comm newintercomm; MPI_Comm newintercomm;
if (spawn_data.myId == spawn_data.root) { if (mall->myId == mall->root) {
spawn_err = baseline_spawn(spawn_data, MPI_COMM_SELF, child);
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char)); port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, spawn_data.root, 130, *child, MPI_STATUS_IGNORE); MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MPI_ANY_SOURCE, MAM_TAG_STRAT_SINGLE, *child, MPI_STATUS_IGNORE);
set_spawn_state(MALL_SPAWN_SINGLE_COMPLETED, spawn_data.spawn_is_async); // Indicate other processes to join root to end spawn procedure
set_spawn_state(MAM_I_SPAWN_SINGLE_COMPLETED, spawn_data.spawn_is_async); // Indicate other processes to join root to end spawn procedure
wakeup_completion();
} else { } else {
port_name = malloc(1); port_name = malloc(1);
} }
MPI_Comm_connect(port_name, MPI_INFO_NULL, spawn_data.root, spawn_data.comm, &newintercomm); MPI_Comm_connect(port_name, MPI_INFO_NULL, mall->root, spawn_data.comm, &newintercomm);
if(spawn_data.myId == spawn_data.root) if(mall->myId == mall->root)
MPI_Comm_free(child); MPI_Comm_disconnect(child);
free(port_name); free(port_name);
*child = newintercomm; *child = newintercomm;
return spawn_err;
} }
/* /*
...@@ -93,24 +223,24 @@ int single_strat_parents(Spawn_data spawn_data, MPI_Comm *child) { ...@@ -93,24 +223,24 @@ int single_strat_parents(Spawn_data spawn_data, MPI_Comm *child) {
* Solo se utiliza cuando la creación de los procesos ha sido * Solo se utiliza cuando la creación de los procesos ha sido
* realizada por un solo proceso padre * realizada por un solo proceso padre
*/ */
void single_strat_children(int myId, int root, MPI_Comm *parents) { void single_strat_children(MPI_Comm *parents) {
char *port_name; char *port_name;
MPI_Comm newintercomm; MPI_Comm newintercomm;
if(myId == root) { if(mall->myId == mall->root) {
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char)); port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Open_port(MPI_INFO_NULL, port_name); MPI_Open_port(MPI_INFO_NULL, port_name);
MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, root, 130, *parents); MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, mall->root_parents, MAM_TAG_STRAT_SINGLE, *parents);
} else { } else {
port_name = malloc(1); port_name = malloc(1);
} }
MPI_Comm_accept(port_name, MPI_INFO_NULL, root, MPI_COMM_WORLD, &newintercomm); MPI_Comm_accept(port_name, MPI_INFO_NULL, mall->root, mall->comm, &newintercomm);
if(myId == root) { if(mall->myId == mall->root) {
MPI_Close_port(port_name); MPI_Close_port(port_name);
} }
free(port_name); free(port_name);
MPI_Comm_free(parents); MPI_Comm_disconnect(parents);
*parents = newintercomm; *parents = newintercomm;
} }
#ifndef MALLEABILITY_SPAWN_BASELINE_H #ifndef MAM_SPAWN_BASELINE_H
#define MALLEABILITY_SPAWN_BASELINE_H #define MAM_SPAWN_BASELINE_H
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <mpi.h> #include <mpi.h>
#include <string.h> #include <string.h>
#include "../malleabilityDataStructures.h" #include "Spawn_DataStructure.h"
int baseline(Spawn_data spawn_data, MPI_Comm *child); int baseline(Spawn_data spawn_data, MPI_Comm *child);
#endif #endif
...@@ -5,7 +5,9 @@ ...@@ -5,7 +5,9 @@
#include <pthread.h> #include <pthread.h>
#include <mpi.h> #include <mpi.h>
#include <string.h> #include <string.h>
#include "../malleabilityStates.h" #include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
#include "../MAM_Configuration.h"
#include "ProcessDist.h" #include "ProcessDist.h"
#include "GenericSpawn.h" #include "GenericSpawn.h"
#include "Baseline.h" #include "Baseline.h"
...@@ -17,24 +19,20 @@ ...@@ -17,24 +19,20 @@
Spawn_data *spawn_data = NULL; Spawn_data *spawn_data = NULL;
pthread_t spawn_thread; pthread_t spawn_thread;
MPI_Comm *returned_comm;
double end_time; //FIXME REFACTOR
//--------------PRIVATE CONFIGURATION DECLARATIONS---------------// //--------------PRIVATE CONFIGURATION DECLARATIONS---------------//
void set_spawn_configuration(char *cmd, int num_cpus, int num_nodes, char *nodelist, int myId, int root, int initial_qty, int target_qty, int type_dist, int spawn_method, int spawn_strategies, MPI_Comm comm); void set_spawn_configuration(MPI_Comm comm);
void set_basic_spawn_dtype();
void deallocate_spawn_data(); void deallocate_spawn_data();
//--------------PRIVATE DECLARATIONS---------------// //--------------PRIVATE DECLARATIONS---------------//
void generic_spawn(MPI_Comm *child, int data_stage); void generic_spawn(MPI_Comm *child, int data_stage);
int check_single_state(MPI_Comm comm, int global_state); int check_single_state(MPI_Comm comm, MPI_Comm *child, int global_state, int wait_completed);
int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, double *real_time); int check_generic_state(MPI_Comm comm, int local_state, int wait_completed);
//--------------PRIVATE THREADS DECLARATIONS---------------// //--------------PRIVATE THREADS DECLARATIONS---------------//
int allocate_thread_spawn(); int allocate_thread_spawn(MPI_Comm *child);
void* thread_work(); void* thread_work(void *args);
//--------------PUBLIC FUNCTIONS---------------// //--------------PUBLIC FUNCTIONS---------------//
...@@ -52,27 +50,27 @@ void* thread_work(); ...@@ -52,27 +50,27 @@ void* thread_work();
* Si se pide en segundo plano, llamar a "check_spawn_state()" comprobara si la configuracion para * Si se pide en segundo plano, llamar a "check_spawn_state()" comprobara si la configuracion para
* crearlos esta lista, y si es asi, los crea. * crearlos esta lista, y si es asi, los crea.
* *
* Devuelve el estado de el procedimiento. Si no devuelve "MALL_SPAWN_COMPLETED", es necesario llamar a * Devuelve el estado de el procedimiento. Si no devuelve "MAM_I_SPAWN_COMPLETED", es necesario llamar a
* "check_spawn_state()". * "check_spawn_state()".
*/ */
int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId, int initial_qty, int target_qty, int root, int type_dist, int spawn_method, int spawn_strategies, MPI_Comm comm, MPI_Comm *child) { int init_spawn(MPI_Comm comm, MPI_Comm *child) {
int local_state; int local_state;
set_spawn_configuration(argv, num_cpus, num_nodes, nodelist, myId, root, initial_qty, target_qty, type_dist, spawn_method, spawn_strategies, comm); set_spawn_configuration(comm);
if(!spawn_data->spawn_is_async) { if(!spawn_data->spawn_is_async) {
generic_spawn(child, MALL_NOT_STARTED); generic_spawn(child, MAM_I_NOT_STARTED);
local_state = get_spawn_state(spawn_data->spawn_is_async); local_state = get_spawn_state(spawn_data->spawn_is_async);
if (local_state == MALL_SPAWN_COMPLETED) if (local_state == MAM_I_SPAWN_COMPLETED)
deallocate_spawn_data(); deallocate_spawn_data();
} else { } else {
local_state = spawn_data->spawn_is_single ? local_state = spawn_data->spawn_is_single ?
MALL_SPAWN_SINGLE_PENDING : MALL_SPAWN_PENDING; MAM_I_SPAWN_SINGLE_PENDING : MAM_I_SPAWN_PENDING;
local_state = spawn_data->spawn_method == MALL_SPAWN_MERGE && spawn_data->initial_qty > spawn_data->target_qty ? local_state = mall_conf->spawn_method == MAM_SPAWN_MERGE && spawn_data->initial_qty > spawn_data->target_qty ?
MALL_SPAWN_ADAPT_POSTPONE : local_state; MAM_I_SPAWN_ADAPT_POSTPONE : local_state;
set_spawn_state(local_state, 0); set_spawn_state(local_state, 0);
if((spawn_data->spawn_is_single && myId == root) || !spawn_data->spawn_is_single) { if((spawn_data->spawn_is_single && mall->myId == mall->root) || !spawn_data->spawn_is_single) {
allocate_thread_spawn(); allocate_thread_spawn(child);
} }
} }
...@@ -83,20 +81,20 @@ int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId ...@@ -83,20 +81,20 @@ int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId
* Comprueba si una configuracion para crear un nuevo grupo de procesos esta lista, * Comprueba si una configuracion para crear un nuevo grupo de procesos esta lista,
* y en caso de que lo este, se devuelve el communicador a estos nuevos procesos. * y en caso de que lo este, se devuelve el communicador a estos nuevos procesos.
*/ */
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, double *real_time) { int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int wait_completed) {
int local_state; int local_state;
int global_state=MALL_NOT_STARTED; int global_state=MAM_I_NOT_STARTED;
if(spawn_data->spawn_is_async) { // Async if(spawn_data->spawn_is_async) { // Async
local_state = get_spawn_state(spawn_data->spawn_is_async); local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MALL_SPAWN_SINGLE_PENDING || local_state == MALL_SPAWN_SINGLE_COMPLETED) { // Single if(local_state == MAM_I_SPAWN_SINGLE_PENDING || local_state == MAM_I_SPAWN_SINGLE_COMPLETED) { // Single
global_state = check_single_state(comm, local_state); global_state = check_single_state(comm, child, local_state, wait_completed);
} else if(local_state == MALL_SPAWN_PENDING || local_state == MALL_SPAWN_COMPLETED || local_state == MALL_SPAWN_ADAPTED) { // Baseline } else if(local_state == MAM_I_SPAWN_PENDING || local_state == MAM_I_SPAWN_COMPLETED || local_state == MAM_I_SPAWN_ADAPTED) { // Generic
global_state = check_generic_state(comm, child, local_state, real_time); global_state = check_generic_state(comm, local_state, wait_completed);
} else if(local_state == MALL_SPAWN_ADAPT_POSTPONE) { } else if(local_state == MAM_I_SPAWN_ADAPT_POSTPONE) {
global_state = local_state; global_state = local_state;
} else { } else {
...@@ -104,18 +102,18 @@ int check_spawn_state(MPI_Comm *child, MPI_Comm comm, double *real_time) { ...@@ -104,18 +102,18 @@ int check_spawn_state(MPI_Comm *child, MPI_Comm comm, double *real_time) {
MPI_Abort(MPI_COMM_WORLD, -1); MPI_Abort(MPI_COMM_WORLD, -1);
return -10; return -10;
} }
} else if(spawn_data->spawn_method == MALL_SPAWN_MERGE){ // Start Merge shrink Sync } else if(mall_conf->spawn_method == MAM_SPAWN_MERGE){ // Start Merge shrink Sync
generic_spawn(child, MALL_DIST_COMPLETED); generic_spawn(child, MAM_I_DIST_COMPLETED);
global_state = get_spawn_state(spawn_data->spawn_is_async); global_state = get_spawn_state(spawn_data->spawn_is_async);
} }
if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_SPAWN_ADAPTED) if(global_state == MAM_I_SPAWN_COMPLETED || global_state == MAM_I_SPAWN_ADAPTED)
deallocate_spawn_data(); deallocate_spawn_data();
return global_state; return global_state;
} }
/* /*
* Elimina la bandera bloqueante MALL_SPAWN_ADAPT_POSTPONE para los hilos * Elimina la bandera bloqueante MAM_I_SPAWN_ADAPT_POSTPONE para los hilos
* auxiliares. Esta bandera los bloquea para que el metodo Merge shrink no * auxiliares. Esta bandera los bloquea para que el metodo Merge shrink no
* avance hasta que se complete la redistribucion de datos. Por tanto, * avance hasta que se complete la redistribucion de datos. Por tanto,
* al modificar la bandera los hilos pueden continuar. * al modificar la bandera los hilos pueden continuar.
...@@ -125,9 +123,9 @@ int check_spawn_state(MPI_Comm *child, MPI_Comm comm, double *real_time) { ...@@ -125,9 +123,9 @@ int check_spawn_state(MPI_Comm *child, MPI_Comm comm, double *real_time) {
*/ */
void unset_spawn_postpone_flag(int outside_state) { void unset_spawn_postpone_flag(int outside_state) {
int local_state = get_spawn_state(spawn_data->spawn_is_async); int local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MALL_SPAWN_ADAPT_POSTPONE && outside_state == MALL_SPAWN_ADAPT_PENDING && spawn_data->spawn_is_async) { if(local_state == MAM_I_SPAWN_ADAPT_POSTPONE && outside_state == MAM_I_SPAWN_ADAPT_PENDING && spawn_data->spawn_is_async) {
set_spawn_state(MALL_SPAWN_PENDING, MALL_SPAWN_PTHREAD); set_spawn_state(MAM_I_SPAWN_PENDING, spawn_data->spawn_is_async);
wakeup(); wakeup_redistribution();
} }
} }
...@@ -139,73 +137,58 @@ void unset_spawn_postpone_flag(int outside_state) { ...@@ -139,73 +137,58 @@ void unset_spawn_postpone_flag(int outside_state) {
* para el paso de redistribucion de datos (Numeros de procesos y Id del Root). * para el paso de redistribucion de datos (Numeros de procesos y Id del Root).
* *
*/ */
void malleability_connect_children(int myId, int numP, int root, MPI_Comm comm, int *numP_parents, int *root_parents, MPI_Comm *parents) { void malleability_connect_children(MPI_Comm *parents) {
spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data)); spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));
spawn_data->root = root;
spawn_data->myId = myId;
spawn_data->spawn_qty = numP;
spawn_data->target_qty = numP;
spawn_data->comm = comm;
set_basic_spawn_dtype(); MAM_Comm_main_structures(*parents, MAM_ROOT); //FIXME What if root is another id different to 0? Send from spawn to root id?
MPI_Bcast(spawn_data, 1, spawn_data->dtype, MALLEABILITY_ROOT, *parents); spawn_data->initial_qty = mall->num_parents;
spawn_data->target_qty = mall->numC;
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, &(spawn_data->spawn_is_multiple));
switch(spawn_data->spawn_method) { switch(mall_conf->spawn_method) {
case MALL_SPAWN_BASELINE: case MAM_SPAWN_BASELINE:
spawn_data->spawn_qty = spawn_data->target_qty;
baseline(*spawn_data, parents); baseline(*spawn_data, parents);
if(!spawn_data->spawn_is_intercomm) {
intracomm_strategy(MAM_TARGETS, parents);
}
break; break;
case MALL_SPAWN_MERGE: case MAM_SPAWN_MERGE:
spawn_data->target_qty += spawn_data->initial_qty; spawn_data->spawn_qty = spawn_data->target_qty - spawn_data->initial_qty;
merge(*spawn_data, parents, MALL_NOT_STARTED); merge(*spawn_data, parents, MAM_I_NOT_STARTED);
break; break;
} }
*root_parents = spawn_data->root_parents;
*numP_parents = spawn_data->initial_qty;
MPI_Type_free(&(spawn_data->dtype));
free(spawn_data); free(spawn_data);
} }
/*
* Función para obtener si entre las estrategias elegidas, se utiliza
* la estrategia pasada como segundo argumento.
*
* Devuelve en "result" 1(Verdadero) si utiliza la estrategia, 0(Falso) en caso
* contrario.
*/
int malleability_spawn_contains_strat(int spawn_strategies, int strategy, int *result) {
int value = spawn_strategies % strategy ? 0 : 1;
if(result != NULL) *result = value;
return value;
}
//--------------PRIVATE CONFIGURATION FUNCTIONS---------------// //--------------PRIVATE CONFIGURATION FUNCTIONS---------------//
/* /*
* Agrupa en una sola estructura todos los datos de configuración necesarios * Agrupa en una sola estructura todos los datos de configuración necesarios
* e inicializa las estructuras necesarias. * e inicializa las estructuras necesarias.
*/ */
void set_spawn_configuration(char *cmd, int num_cpus, int num_nodes, char *nodelist, int myId, int root, int initial_qty, int target_qty, int type_dist, int spawn_method, int spawn_strategies, MPI_Comm comm) { void set_spawn_configuration(MPI_Comm comm) {
spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data)); spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));
spawn_data->myId = myId; spawn_data->total_spawns = 0;
spawn_data->root = root; spawn_data->initial_qty = mall->numP;
spawn_data->root_parents = root; spawn_data->target_qty = mall->numC;
spawn_data->initial_qty = initial_qty; MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
spawn_data->target_qty = target_qty; MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
spawn_data->spawn_method = spawn_method; MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
malleability_spawn_contains_strat(spawn_strategies, MALL_SPAWN_SINGLE, &(spawn_data->spawn_is_single)); MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, &(spawn_data->spawn_is_multiple));
malleability_spawn_contains_strat(spawn_strategies, MALL_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
spawn_data->comm = comm; spawn_data->comm = comm;
spawn_data->mapping_fill_method = MAM_PHY_TYPE_STRING;
spawn_data->sets = NULL;
set_basic_spawn_dtype(); switch(mall_conf->spawn_method) {
case MAM_SPAWN_BASELINE:
switch(spawn_data->spawn_method) {
case MALL_SPAWN_BASELINE:
spawn_data->spawn_qty = spawn_data->target_qty; spawn_data->spawn_qty = spawn_data->target_qty;
spawn_data->already_created = 0; spawn_data->already_created = 0;
break; break;
case MALL_SPAWN_MERGE: case MAM_SPAWN_MERGE:
spawn_data->spawn_qty = spawn_data->target_qty - spawn_data->initial_qty; spawn_data->spawn_qty = spawn_data->target_qty - spawn_data->initial_qty;
spawn_data->already_created = spawn_data->initial_qty; spawn_data->already_created = spawn_data->initial_qty;
break; break;
...@@ -214,47 +197,6 @@ void set_spawn_configuration(char *cmd, int num_cpus, int num_nodes, char *nodel ...@@ -214,47 +197,6 @@ void set_spawn_configuration(char *cmd, int num_cpus, int num_nodes, char *nodel
if(spawn_data->spawn_is_async) { if(spawn_data->spawn_is_async) {
init_spawn_state(); init_spawn_state();
} }
spawn_data->mapping = MPI_INFO_NULL;
if(spawn_data->myId == spawn_data->root) {
physical_struct_create(target_qty, spawn_data->already_created, num_cpus, num_nodes, nodelist, type_dist, MALL_DIST_STRING, &(spawn_data->dist));
//COPY PROGRAM NAME
spawn_data->cmd = malloc((strlen(cmd)+1) * sizeof(char));
strcpy(spawn_data->cmd, cmd);
spawn_data->cmd[strlen(cmd)]='\0';
} else {
spawn_data->cmd = malloc(1 * sizeof(char));
}
}
/*
* Crea un tipo derivado para mandar 4 enteros con informacion
* basica a los hijos. Son datos necesarios para que terminen
* la creacion de procesos.
*/
void set_basic_spawn_dtype() {
int i, counts = 4;
int blocklengths[] = {1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];
// Rellenar vector types
types[0] = types[1] = types[2] = types[3] = MPI_INT;
// Rellenar vector displs
MPI_Get_address(spawn_data, &dir);
MPI_Get_address(&(spawn_data->root_parents), &displs[0]);
MPI_Get_address(&(spawn_data->initial_qty), &displs[1]);
MPI_Get_address(&(spawn_data->spawn_is_single), &displs[2]);
MPI_Get_address(&(spawn_data->spawn_method), &displs[3]);
for(i=0;i<counts;i++) displs[i] -= dir;
MPI_Type_create_struct(counts, blocklengths, displs, types, &(spawn_data->dtype));
MPI_Type_commit(&(spawn_data->dtype));
} }
/* /*
...@@ -262,17 +204,23 @@ void set_basic_spawn_dtype() { ...@@ -262,17 +204,23 @@ void set_basic_spawn_dtype() {
* junto a la destrucion de aquellas estructuras que utiliza. * junto a la destrucion de aquellas estructuras que utiliza.
*/ */
void deallocate_spawn_data() { void deallocate_spawn_data() {
int i;
MPI_Info *info;
if(spawn_data == NULL) return; if(spawn_data == NULL) return;
if(spawn_data->cmd != NULL) { for(i=0; i<spawn_data->total_spawns; i++) {
free(spawn_data->cmd); info = &(spawn_data->sets[i].mapping);
} if(*info != MPI_INFO_NULL) {
if(spawn_data->dtype != MPI_DATATYPE_NULL) { MPI_Info_free(info);
MPI_Type_free(&(spawn_data->dtype)); *info = MPI_INFO_NULL;
}
} }
if(spawn_data->mapping != MPI_INFO_NULL) {
MPI_Info_free(&(spawn_data->mapping)); if(spawn_data->sets != NULL) {
free(spawn_data->sets);
spawn_data->sets = NULL;
} }
if(spawn_data->spawn_is_async) { if(spawn_data->spawn_is_async) {
free_spawn_state(); free_spawn_state();
} }
...@@ -293,21 +241,23 @@ void generic_spawn(MPI_Comm *child, int data_stage) { ...@@ -293,21 +241,23 @@ void generic_spawn(MPI_Comm *child, int data_stage) {
int local_state, aux_state; int local_state, aux_state;
// WORK // WORK
if(spawn_data->myId == spawn_data->root && spawn_data->spawn_qty > 0) { //SET MAPPING FOR NEW PROCESSES if(mall->myId == mall->root && spawn_data->spawn_qty > 0) { //SET MAPPING FOR NEW PROCESSES
processes_dist(spawn_data->dist, &(spawn_data->mapping)); processes_dist(spawn_data);
} }
switch(spawn_data->spawn_method) { switch(mall_conf->spawn_method) {
case MALL_SPAWN_BASELINE: case MAM_SPAWN_BASELINE:
local_state = baseline(*spawn_data, child); local_state = baseline(*spawn_data, child);
if(!spawn_data->spawn_is_intercomm) {
local_state = intracomm_strategy(MAM_SOURCES, child);
}
break; break;
case MALL_SPAWN_MERGE: case MAM_SPAWN_MERGE:
local_state = merge(*spawn_data, child, data_stage); local_state = merge(*spawn_data, child, data_stage);
break; break;
} }
// END WORK // END WORK
end_time = MPI_Wtime();
aux_state = get_spawn_state(spawn_data->spawn_is_async); aux_state = get_spawn_state(spawn_data->spawn_is_async);
if(!(aux_state == MALL_SPAWN_PENDING && local_state == MALL_SPAWN_ADAPT_POSTPONE)) { if(!(aux_state == MAM_I_SPAWN_PENDING && local_state == MAM_I_SPAWN_ADAPT_POSTPONE)) {
set_spawn_state(local_state, spawn_data->spawn_is_async); set_spawn_state(local_state, spawn_data->spawn_is_async);
} }
} }
...@@ -320,8 +270,8 @@ void generic_spawn(MPI_Comm *child, int data_stage) { ...@@ -320,8 +270,8 @@ void generic_spawn(MPI_Comm *child, int data_stage) {
* No se puede realizar un "join" sobre el hilo y el mismo libera su memoria * No se puede realizar un "join" sobre el hilo y el mismo libera su memoria
* asociado al terminar. * asociado al terminar.
*/ */
int allocate_thread_spawn() { int allocate_thread_spawn(MPI_Comm *child) {
if(pthread_create(&spawn_thread, NULL, thread_work, NULL)) { if(pthread_create(&spawn_thread, NULL, thread_work, (void *) child)) {
printf("Error al crear el hilo de SPAWN\n"); printf("Error al crear el hilo de SPAWN\n");
MPI_Abort(MPI_COMM_WORLD, -1); MPI_Abort(MPI_COMM_WORLD, -1);
return -1; return -1;
...@@ -341,19 +291,20 @@ int allocate_thread_spawn() { ...@@ -341,19 +291,20 @@ int allocate_thread_spawn() {
* Una vez esta lista la configuracion y es posible crear los procesos * Una vez esta lista la configuracion y es posible crear los procesos
* se avisa al hilo maestro. * se avisa al hilo maestro.
*/ */
void* thread_work() { void* thread_work(void *args) {
int local_state; int local_state;
returned_comm = (MPI_Comm *) malloc(sizeof(MPI_Comm)); MPI_Comm *child = (MPI_Comm *) args;
generic_spawn(returned_comm, MALL_NOT_STARTED); generic_spawn(child, MAM_I_NOT_STARTED);
local_state = get_spawn_state(MALL_SPAWN_PTHREAD); local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MALL_SPAWN_ADAPT_POSTPONE || local_state == MALL_SPAWN_PENDING) { if(local_state == MAM_I_SPAWN_ADAPT_POSTPONE || local_state == MAM_I_SPAWN_PENDING) {
// El grupo de procesos se terminara de juntar tras la redistribucion de datos // El grupo de procesos se terminara de juntar tras la redistribucion de datos
local_state = wait_wakeup(); local_state = wait_redistribution();
generic_spawn(returned_comm, MALL_DIST_COMPLETED); generic_spawn(child, MAM_I_DIST_COMPLETED);
} }
wakeup_completion();
pthread_exit(NULL); pthread_exit(NULL);
} }
...@@ -362,23 +313,26 @@ void* thread_work() { ...@@ -362,23 +313,26 @@ void* thread_work() {
* Comprueba si una creacion de procesos asincrona en el * Comprueba si una creacion de procesos asincrona en el
* paso "single" ha terminado. * paso "single" ha terminado.
* Si no ha terminado se mantiene el estado * Si no ha terminado se mantiene el estado
* "MALL_SPAWN_SINGLE_PENDING". * "MAM_I_SPAWN_SINGLE_PENDING".
* *
* Si ha terminado se crean los hilos auxiliares para * Si ha terminado se crean los hilos auxiliares para
* los procesos no root y se devuelve el estado * los procesos no root y se devuelve el estado
* "MALL_SPAWN_PENDING". * "MAM_I_SPAWN_PENDING".
*/ */
int check_single_state(MPI_Comm comm, int global_state) { int check_single_state(MPI_Comm comm, MPI_Comm *child, int global_state, int wait_completed) {
MPI_Bcast(&global_state, 1, MPI_INT, spawn_data->root, comm); while(wait_completed && mall->myId == mall->root && global_state == MAM_I_SPAWN_SINGLE_PENDING) {
global_state = wait_completion();
}
MPI_Bcast(&global_state, 1, MPI_INT, mall->root, comm);
// Non-root processes join root to finalize the spawn // Non-root processes join root to finalize the spawn
// They also must join if the application has ended its work // They also must join if the application has ended its work
if(global_state == MALL_SPAWN_SINGLE_COMPLETED) { if(global_state == MAM_I_SPAWN_SINGLE_COMPLETED) {
global_state = MALL_SPAWN_PENDING; global_state = MAM_I_SPAWN_PENDING;
set_spawn_state(global_state, MALL_SPAWN_PTHREAD); set_spawn_state(global_state, spawn_data->spawn_is_async);
if(spawn_data->myId != spawn_data->root) { if(mall->myId != mall->root) {
allocate_thread_spawn(spawn_data); allocate_thread_spawn(child);
} }
} }
return global_state; return global_state;
...@@ -388,20 +342,19 @@ int check_single_state(MPI_Comm comm, int global_state) { ...@@ -388,20 +342,19 @@ int check_single_state(MPI_Comm comm, int global_state) {
* Comprueba si una creación de procesos asincrona en el * Comprueba si una creación de procesos asincrona en el
* paso "generic" ha terminado. * paso "generic" ha terminado.
* Si no ha terminado devuelve el estado * Si no ha terminado devuelve el estado
* "MALL_SPAWN_PENDING". * "MAM_I_SPAWN_PENDING".
* *
* Si ha terminado libera la memoria asociada a spawn_data * Si ha terminado libera la memoria asociada a spawn_data
* y devuelve el estado "MALL_SPAWN_COMPLETED". * y devuelve el estado "MAM_I_SPAWN_COMPLETED".
*/ */
int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, double *real_time) { int check_generic_state(MPI_Comm comm, int local_state, int wait_completed) {
int global_state; int global_state;
while(wait_completed && local_state == MAM_I_SPAWN_PENDING) local_state = wait_completion();
MPI_Allreduce(&local_state, &global_state, 1, MPI_INT, MPI_MIN, comm); MPI_Allreduce(&local_state, &global_state, 1, MPI_INT, MPI_MIN, comm);
if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_SPAWN_ADAPTED) { if(global_state == MAM_I_SPAWN_COMPLETED || global_state == MAM_I_SPAWN_ADAPTED) {
set_spawn_state(global_state, MALL_SPAWN_PTHREAD); set_spawn_state(global_state, spawn_data->spawn_is_async);
*child = *returned_comm;
deallocate_spawn_data(spawn_data);
*real_time=end_time;
} }
return global_state; return global_state;
} }
#ifndef MALLEABILITY_GENERIC_SPAWN_H #ifndef MAM_GENERIC_SPAWN_H
#define MALLEABILITY_GENERIC_SPAWN_H #define MAM_GENERIC_SPAWN_H
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <mpi.h> #include <mpi.h>
#include "../malleabilityDataStructures.h" #include "Spawn_DataStructure.h"
int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId, int initial_qty, int target_qty, int root, int type_dist, int spawn_method, int spawn_strategies, MPI_Comm comm, MPI_Comm *child); int init_spawn(MPI_Comm comm, MPI_Comm *child);
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, double *real_time); int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int wait_completed);
void malleability_connect_children(int myId, int numP, int root, MPI_Comm comm, int *numP_parents, int *root_parents, MPI_Comm *parents); void malleability_connect_children(MPI_Comm *parents);
void unset_spawn_postpone_flag(int outside_state); void unset_spawn_postpone_flag(int outside_state);
int malleability_spawn_contains_strat(int spawn_strategies, int strategy, int *result);
#endif #endif
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <mpi.h> #include <mpi.h>
#include "../malleabilityStates.h" #include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
#include "Merge.h" #include "Merge.h"
#include "Baseline.h" #include "Baseline.h"
...@@ -16,11 +17,11 @@ int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state) { ...@@ -16,11 +17,11 @@ int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state) {
int is_children_group = 1; int is_children_group = 1;
if(spawn_data.initial_qty > spawn_data.target_qty) { //Shrink if(spawn_data.initial_qty > spawn_data.target_qty) { //Shrink
if(data_state == MALL_DIST_COMPLETED) { if(data_state == MAM_I_DIST_COMPLETED) {
merge_adapt_shrink(spawn_data.target_qty, child, spawn_data.comm, spawn_data.myId); merge_adapt_shrink(spawn_data.target_qty, child, spawn_data.comm, mall->myId);
local_state = MALL_SPAWN_ADAPTED; local_state = MAM_I_SPAWN_ADAPTED;
} else { } else {
local_state = MALL_SPAWN_ADAPT_POSTPONE; local_state = MAM_I_SPAWN_ADAPT_POSTPONE;
} }
} else { //Expand } else { //Expand
MPI_Comm_get_parent(&intercomm); MPI_Comm_get_parent(&intercomm);
...@@ -29,12 +30,17 @@ int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state) { ...@@ -29,12 +30,17 @@ int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state) {
baseline(spawn_data, child); baseline(spawn_data, child);
merge_adapt_expand(child, is_children_group); merge_adapt_expand(child, is_children_group);
local_state = MALL_SPAWN_COMPLETED; local_state = MAM_I_SPAWN_COMPLETED;
} }
return local_state; return local_state;
} }
int intracomm_strategy(int is_children_group, MPI_Comm *child) {
merge_adapt_expand(child, is_children_group);
return MAM_I_SPAWN_COMPLETED;
}
//--------------PRIVATE MERGE TYPE FUNCTIONS---------------// //--------------PRIVATE MERGE TYPE FUNCTIONS---------------//
/* /*
...@@ -51,13 +57,8 @@ void merge_adapt_expand(MPI_Comm *child, int is_children_group) { ...@@ -51,13 +57,8 @@ void merge_adapt_expand(MPI_Comm *child, int is_children_group) {
MPI_Intercomm_merge(*child, is_children_group, &new_comm); //El que pone 0 va primero MPI_Intercomm_merge(*child, is_children_group, &new_comm); //El que pone 0 va primero
MPI_Comm_free(child); //POSIBLE ERROR? MPI_Comm_disconnect(child);
*child = new_comm; *child = new_comm;
//*numP = numC; //TODO REFACTOR Llevar a otra parte -- Hacer solo si MALL_SPAWN_ADAPTED
//if(*comm != MPI_COMM_WORLD && *comm != MPI_COMM_NULL) {
// MPI_Comm_free(comm);
//}
} }
...@@ -71,6 +72,7 @@ void merge_adapt_expand(MPI_Comm *child, int is_children_group) { ...@@ -71,6 +72,7 @@ void merge_adapt_expand(MPI_Comm *child, int is_children_group) {
void merge_adapt_shrink(int numC, MPI_Comm *child, MPI_Comm comm, int myId) { void merge_adapt_shrink(int numC, MPI_Comm *child, MPI_Comm comm, int myId) {
int color = MPI_UNDEFINED; int color = MPI_UNDEFINED;
if(*child != MPI_COMM_NULL && *child != MPI_COMM_WORLD) MPI_Comm_disconnect(child);
if(myId < numC) { if(myId < numC) {
color = 1; color = 1;
} }
......
#ifndef MALLEABILITY_SPAWN_MERGE_H #ifndef MAM_SPAWN_MERGE_H
#define MALLEABILITY_SPAWN_MERGE_H #define MAM_SPAWN_MERGE_H
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h> #include <mpi.h>
#include "../malleabilityDataStructures.h" #include "Spawn_DataStructure.h"
int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state); int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state);
int intracomm_strategy(int is_children_group, MPI_Comm *child);
#endif #endif
...@@ -6,94 +6,75 @@ ...@@ -6,94 +6,75 @@
#include <string.h> #include <string.h>
#include <mpi.h> #include <mpi.h>
#include "ProcessDist.h" #include "ProcessDist.h"
#include "../MAM_Constants.h"
//#define USE_SLURM #include "../MAM_DataStructures.h"
//--------------PRIVATE DECLARATIONS---------------// //--------------PRIVATE DECLARATIONS---------------//
void node_dist( struct physical_dist dist, int **qty, int *used_nodes); void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes, int *total_spawns);
void spread_dist(struct physical_dist dist, int *used_nodes, int *procs); void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
void compact_dist(struct physical_dist dist, int *used_nodes, int *procs); void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs);
void set_spawn_cmd(size_t nodes, Spawn_data *spawn_data);
void generate_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void generate_multiple_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void generate_info_string(int target_qty, MPI_Info *info); void set_mapping_host(int qty, char *host, size_t index, Spawn_data *spawn_data);
void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str);
int write_str_node(char **hostlist_str, size_t len_og, size_t qty, char *node_name);
//--------------------------------SLURM USAGE-------------------------------------// //--------------------------------SLURM USAGE-------------------------------------//
#ifdef USE_SLURM #if MAM_USE_SLURM
#include <slurm/slurm.h> #include <slurm/slurm.h>
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info); void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostfile_str); void generate_multiple_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data);
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str);
//@deprecated functions //@deprecated functions
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, MPI_Info *info); void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, Spawn_data *spawn_data);
void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int used_nodes); void fill_hostfile_slurm(char *nodelist, int ptr, int *qty, int used_nodes);
#endif #endif
//--------------------------------SLURM USAGE-------------------------------------// //--------------------------------SLURM USAGE-------------------------------------//
int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_name);
//@deprecated functions //@deprecated functions
int create_hostfile(char **file_name); int create_hostfile(char **file_name);
int write_hostfile_node(int ptr, int qty, char *node_name); int write_hostfile_node(int ptr, int qty, char *node_name);
//--------------PUBLIC FUNCTIONS---------------// //--------------PUBLIC FUNCTIONS---------------//
/*
* Pone los datos para una estructura que guarda los parametros
* para realizar un mappeado de los procesos.
*
* Si la memoria no esta reservada devuelve falso y no hace nada.
* Si puede realizar los cambios devuelve verdadero.
*
* IN parameters -->
* target_qty: Numero de procesos tras la reconfiguracion
* alreadyCreated: Numero de procesos padre a considerar
* La resta de target_qty-alreadyCreated es el numero de hijos a crear
* num_cpus: Numero de cpus totales (En uso o no)
* num_nodes: Numero de nodos disponibles por esta aplicacion
* info_type: Indica como realizar el mappeado, si indicarlo
* en una cadena (MALL_DIST_STRING) o en un hostfile
* (MALL_DIST_HOSTFILE)
* dist_type: Indica como sera el mappeado, si intentar rellenar
* primero los nodos con cpus ya usados (CPUS/BEST/COMPACT) o
* que todos los nodos tengan el mismo numero de cpus usados
* (NODES/WORST/SPREAD)
*/
int physical_struct_create(int target_qty, int already_created, int num_cpus, int num_nodes, char *nodelist, int dist_type, int info_type, struct physical_dist *dist) {
dist->target_qty = target_qty;
dist->already_created = already_created;
dist->num_cpus = num_cpus;
dist->num_nodes = num_nodes;
dist->nodelist = nodelist;
dist->dist_type = dist_type;
dist->info_type = info_type;
return 1;
}
/* /*
* Configura la creacion de un nuevo grupo de procesos, reservando la memoria * Configura la creacion de un nuevo grupo de procesos, reservando la memoria
* para una llamada a MPI_Comm_spawn, obteniendo una distribucion fisica * para una llamada a MPI_Comm_spawn, obteniendo una distribucion fisica
* para los procesos y creando un fichero hostfile. * para los procesos y creando un fichero hostfile.
* *
* OUT parameters -->
* info_spawn: Objeto MPI_Info en el que se indica el mappeado
* a usar al crear los procesos.
*/ */
void processes_dist(struct physical_dist dist, MPI_Info *info_spawn) { void processes_dist(Spawn_data *spawn_data) {
#ifdef USE_SLURM
int used_nodes=0; int used_nodes=0;
int *procs_array; int *procs_array;
// GET NEW DISTRIBUTION // GET NEW DISTRIBUTION
node_dist(dist, &procs_array, &used_nodes); node_dist(*spawn_data, &procs_array, &used_nodes, &spawn_data->total_spawns);
switch(dist.info_type) { spawn_data->sets = (Spawn_set *) malloc(spawn_data->total_spawns * sizeof(Spawn_set));
case MALL_DIST_STRING: #if MAM_USE_SLURM
generate_info_string_slurm(dist.nodelist, procs_array, used_nodes, info_spawn); switch(spawn_data->mapping_fill_method) {
case MAM_PHY_TYPE_STRING:
if(spawn_data->spawn_is_multiple) {
generate_multiple_info_string_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
} else {
generate_info_string_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
}
break; break;
case MALL_DIST_HOSTFILE: case MAM_PHY_TYPE_HOSTFILE: // FIXME Does not consider multiple spawn strat
generate_info_hostfile_slurm(dist.nodelist, procs_array, used_nodes, info_spawn); generate_info_hostfile_slurm(mall->nodelist, procs_array, used_nodes, spawn_data);
break; break;
} }
free(procs_array);
#else #else
generate_info_string(dist.target_qty, info_spawn); if(spawn_data->spawn_is_multiple) {
generate_multiple_info_string(mall->nodelist, procs_array, used_nodes, spawn_data);
} else {
generate_info_string(mall->nodelist, procs_array, used_nodes, spawn_data);
}
#endif #endif
set_spawn_cmd(used_nodes, spawn_data);
free(procs_array);
} }
...@@ -101,35 +82,44 @@ void processes_dist(struct physical_dist dist, MPI_Info *info_spawn) { ...@@ -101,35 +82,44 @@ void processes_dist(struct physical_dist dist, MPI_Info *info_spawn) {
//-----------------DISTRIBUTION-----------------// //-----------------DISTRIBUTION-----------------//
/* /*
* Obtiene la distribucion fisica del grupo de procesos a crear, devolviendo * Obtiene la distribucion fisica del grupo de procesos a crear, devolviendo
* cuantos nodos se van a utilizar y la cantidad de procesos que alojara cada * cuantos nodos se van a utilizar, la cantidad de procesos que alojara cada
* nodo. * nodo y cuantas creaciones de procesos seran necesarias.
* *
* Se permiten dos tipos de distribuciones fisicas segun el valor de "dist_type": * Se permiten dos tipos de distribuciones fisicas segun el valor de "spawn_dist":
* *
* COMM_PHY_NODES (1): Orientada a equilibrar el numero de procesos entre * COMM_PHY_NODES (1): Orientada a equilibrar el numero de procesos entre
* todos los nodos disponibles. * todos los nodos disponibles.
* COMM_PHY_CPU (2): Orientada a completar la capacidad de un nodo antes de * COMM_PHY_CPU (2): Orientada a completar la capacidad de un nodo antes de
* ocupar otro nodo. * ocupar otro nodo.
*/ */
void node_dist(struct physical_dist dist, int **qty, int *used_nodes) { void node_dist(Spawn_data spawn_data, int **qty, int *used_nodes, int *total_spawns) {
int i, *procs; int i, *procs;
procs = calloc(mall->num_nodes, sizeof(int)); // Numero de procesos por nodo
procs = calloc(dist.num_nodes, sizeof(int)); // Numero de procesos por nodo
/* GET NEW DISTRIBUTION */ /* GET NEW DISTRIBUTION */
switch(dist.dist_type) { switch(mall_conf->spawn_dist) {
case MALL_DIST_SPREAD: // DIST NODES @deprecated case MAM_PHY_DIST_SPREAD: // DIST NODES @deprecated
spread_dist(dist, used_nodes, procs); spread_dist(spawn_data, used_nodes, procs);
break; break;
case MALL_DIST_COMPACT: // DIST CPUs case MAM_PHY_DIST_COMPACT: // DIST CPUs
compact_dist(dist, used_nodes, procs); compact_dist(spawn_data, used_nodes, procs);
break; break;
} }
//Copy results to output vector qty //Copy results to output vector qty
*qty = calloc(*used_nodes, sizeof(int)); // Numero de procesos por nodo *qty = calloc(*used_nodes, sizeof(int)); // Numero de procesos por nodo
for(i=0; i< *used_nodes; i++) {
(*qty)[i] = procs[i]; // if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL) ) {
if(spawn_data.spawn_is_multiple) {
for(i=0; i< *used_nodes; i++) {
(*qty)[i] = procs[i];
if(procs[i]) (*total_spawns)++;
}
} else {
*total_spawns = 1;
for(i=0; i< *used_nodes; i++) {
(*qty)[i] = procs[i];
}
} }
free(procs); free(procs);
} }
...@@ -139,18 +129,22 @@ void node_dist(struct physical_dist dist, int **qty, int *used_nodes) { ...@@ -139,18 +129,22 @@ void node_dist(struct physical_dist dist, int **qty, int *used_nodes) {
* para que todos los nodos tengan el mismo numero. Devuelve el total de * para que todos los nodos tengan el mismo numero. Devuelve el total de
* nodos utilizados y el numero de procesos a crear en cada nodo. * nodos utilizados y el numero de procesos a crear en cada nodo.
* *
* TODO Tener en cuenta procesos ya creados (already_created) * Asume que los procesos que ya existen estan en los nodos mas bajos
* con el mismo tamBl. //FIXME No deberia asumir el tamBl.
*
* FIXME Tener en cuenta localizacion de procesos ya creados (already_created)
*/ */
void spread_dist(struct physical_dist dist, int *used_nodes, int *procs) { void spread_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
int i, tamBl, remainder; int i, tamBl, remainder;
*used_nodes = dist.num_nodes; *used_nodes = mall->num_nodes;
tamBl = dist.target_qty / dist.num_nodes; tamBl = spawn_data.target_qty / *used_nodes;
remainder = dist.target_qty % dist.num_nodes; i = spawn_data.already_created / tamBl;
for(i=0; i<remainder; i++) { remainder = spawn_data.already_created % tamBl;
procs[i] = tamBl + 1; if(remainder) {
procs[i++] = tamBl - remainder;
} }
for(i=remainder; i<dist.num_nodes; i++) { for(; i<*used_nodes; i++) {
procs[i] = tamBl; procs[i] = tamBl;
} }
} }
...@@ -163,41 +157,69 @@ void spread_dist(struct physical_dist dist, int *used_nodes, int *procs) { ...@@ -163,41 +157,69 @@ void spread_dist(struct physical_dist dist, int *used_nodes, int *procs) {
* Tiene en cuenta los procesos ya existentes para el mappeado de * Tiene en cuenta los procesos ya existentes para el mappeado de
* los procesos a crear. * los procesos a crear.
*/ */
void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) { void compact_dist(Spawn_data spawn_data, int *used_nodes, int *procs) {
int i, asigCores; int i, asigCores;
int tamBl, remainder; int tamBl, remainder;
tamBl = dist.num_cpus / dist.num_nodes; tamBl = mall->num_cpus;
asigCores = dist.already_created; asigCores = spawn_data.already_created;
i = *used_nodes = dist.already_created / tamBl; i = *used_nodes = spawn_data.already_created / tamBl;
remainder = dist.already_created % tamBl; remainder = spawn_data.already_created % tamBl;
//FIXME REFACTOR Que pasa si los nodos 1 y 2 tienen espacios libres //FIXME REFACTOR Que pasa si los nodos 1 y 2 tienen espacios libres
//First nodes could already have existing procs //First nodes could already have existing procs
//Start from the first with free spaces //Start from the first with free spaces
if (remainder) { if (remainder && asigCores + (tamBl - remainder) < spawn_data.target_qty) {
procs[i] = tamBl - remainder; procs[i] = tamBl - remainder;
asigCores += procs[i]; asigCores += procs[i];
i = (i+1) % dist.num_nodes; i = (i+1) % mall->num_nodes;
(*used_nodes)++; (*used_nodes)++;
} }
//Assign tamBl to each node //Assign tamBl to each node
while(asigCores+tamBl <= dist.target_qty) { while(asigCores+tamBl <= spawn_data.target_qty) {
asigCores += tamBl; asigCores += tamBl;
procs[i] += tamBl; procs[i] += tamBl;
i = (i+1) % dist.num_nodes; i = (i+1) % mall->num_nodes;
(*used_nodes)++; (*used_nodes)++;
} }
//Last node could have less procs than tamBl //Last node could have less procs than tamBl
if(asigCores < dist.target_qty) { if(asigCores < spawn_data.target_qty) {
procs[i] += dist.target_qty - asigCores; procs[i] += spawn_data.target_qty - asigCores;
(*used_nodes)++; (*used_nodes)++;
} }
if(*used_nodes > dist.num_nodes) *used_nodes = dist.num_nodes; //FIXME Si ocurre esto no es un error? if(*used_nodes > mall->num_nodes) *used_nodes = mall->num_nodes; //FIXME Si ocurre esto no es un error?
} }
//--------------PRIVATE FUNCTIONS---------------//
//-------------------CMD SET--------------------//
/*
* Comprueba que comando hay que llamar al realizar
* el spawn. Todos los sets tienen que hacer el mismo
* comando.
*
*/
void set_spawn_cmd(size_t nodes, Spawn_data *spawn_data) {
size_t index = 0;
char *cmd_aux;
switch(mall_conf->external_usage) {
case MAM_USE_VALGRIND:
cmd_aux = MAM_VALGRIND_SCRIPT;
break;
case MAM_USE_EXTRAE:
cmd_aux = MAM_EXTRAE_SCRIPT;
break;
default:
cmd_aux = mall->name_exec;
break;
}
for(; index<nodes; index++) {
spawn_data->sets[index].cmd = cmd_aux;
}
}
//--------------PRIVATE FUNCTIONS---------------// //--------------PRIVATE FUNCTIONS---------------//
//-------------------INFO SET-------------------// //-------------------INFO SET-------------------//
...@@ -207,73 +229,94 @@ void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) { ...@@ -207,73 +229,94 @@ void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) {
* en el que se indica el mappeado a utilizar en los nuevos * en el que se indica el mappeado a utilizar en los nuevos
* procesos. * procesos.
* *
* Actualmente no considera que puedan haber varios nodos *
* y lleva todos al mismo. Las funciones "generate_info_string_slurm"
* o "generate_info_hostfile_slurm" permiten utilizar varios
* nodos, pero es necesario activar Slurm.
*/ */
void generate_info_string(int target_qty, MPI_Info *info){ void generate_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
char *host_string, *host; char *host_str;
int len, err;
host = "localhost"; fill_str_hosts(nodelist, procs_array, nodes, &host_str);
//host = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
//MPI_Get_processor_name(host, &len);
// CREATE AND SET STRING HOSTS
err = write_str_node(&host_string, 0, target_qty, host);
if (err<0) {printf("Error when generating mapping: %d\n", err); MPI_Abort(MPI_COMM_WORLD, err);}
// SET MAPPING // SET MAPPING
MPI_Info_create(info); set_mapping_host(spawn_data->spawn_qty, host_str, 0, spawn_data);
MPI_Info_set(*info, "hosts", host_string); free(host_str);
//free(host);
free(host_string);
} }
//--------------------------------SLURM USAGE-------------------------------------//
#ifdef USE_SLURM
/* /*
* Crea y devuelve un objeto MPI_Info con un par hosts/mapping * Crea y devuelve un objeto MPI_Info con un par hosts/mapping
* en el que se indica el mappeado a utilizar en los nuevos * en el que se indica el mappeado a utilizar en los nuevos
* procesos. * procesos.
* Es necesario usar Slurm para usarlo. *
*
*/ */
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, MPI_Info *info){ void generate_multiple_info_string(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
// CREATE AND SET STRING HOSTS char *host, *aux, *token, *hostlist_str;
char *hoststring; size_t i=0,j=0,len=0;
fill_str_hosts_slurm(nodelist, procs_array, nodes, &hoststring);
MPI_Info_create(info); aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
MPI_Info_set(*info, "hosts", hoststring); strcpy(aux, nodelist);
free(hoststring); token = strtok(aux, ",");
while (token != NULL && i < nodes) {
host = strdup(token);
if (procs_array[i] != 0) {
write_str_node(&hostlist_str, len, procs_array[i], host);
set_mapping_host(procs_array[i], hostlist_str, j, spawn_data);
free(hostlist_str); hostlist_str = NULL;
j++;
}
i++;
free(host);
token = strtok(NULL, ",");
}
free(aux);
if(hostlist_str != NULL) { free(hostlist_str); }
} }
//--------------PRIVATE FUNCTIONS---------------//
//---------------MAPPING UTILITY----------------//
//----------------------------------------------//
/*
* Anyade en la siguiente entrada de spawns la
* distribucion fisica a utilizar con un par
* host/mapping y el total de procesos.
*/
void set_mapping_host(int qty, char *host, size_t index, Spawn_data *spawn_data) {
MPI_Info *info;
spawn_data->sets[index].spawn_qty = qty;
info = &(spawn_data->sets[index].mapping);
MPI_Info_create(info);
MPI_Info_set(*info, "hosts", host);
}
/* /*
* Crea y devuelve una cadena para ser utilizada por la llave "hosts" * Crea y devuelve una cadena para ser utilizada por la llave "hosts"
* al crear procesos e indicar donde tienen que ser creados. * al crear procesos e indicar donde tienen que ser creados.
*/ */
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostfile_str) { void fill_str_hosts(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str) {
char *host; char *host, *aux, *token;
size_t i=0,len=0; size_t i=0,len=0;
hostlist_t hostlist;
aux = (char *) malloc((strlen(nodelist)+1) * sizeof(char));
hostlist = slurm_hostlist_create(nodelist); strcpy(aux, nodelist);
while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) { token = strtok(aux, ",");
if(qty[i] != 0) { while (token != NULL && i < used_nodes) {
len = write_str_node(hostfile_str, len, qty[i], host); host = strdup(token);
if (qty[i] != 0) {
len = write_str_node(hostlist_str, len, qty[i], host);
} }
i++; i++;
free(host); free(host);
token = strtok(NULL, ",");
} }
slurm_hostlist_destroy(hostlist); free(aux);
} }
#endif
//--------------------------------SLURM USAGE-------------------------------------//
/* /*
* Añade en una cadena "qty" entradas de "node_name". * Añade en una cadena "qty" entradas de "node_name".
* Realiza la reserva de memoria y la realoja si es necesario. * Realiza la reserva de memoria y la realoja si es necesario.
*/ */
int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_name) { int write_str_node(char **hostlist_str, size_t len_og, size_t qty, char *node_name) {
int err; int err;
char *ocurrence; char *ocurrence;
size_t i, len, len_node; size_t i, len, len_node;
...@@ -282,11 +325,11 @@ int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_na ...@@ -282,11 +325,11 @@ int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_na
len = qty * len_node; // Number of times the node is used len = qty * len_node; // Number of times the node is used
if(len_og == 0) { // Memoria no reservada if(len_og == 0) { // Memoria no reservada
*hostfile_str = (char *) malloc((len+1) * sizeof(char)); *hostlist_str = (char *) malloc((len+1) * sizeof(char));
} else { // Cadena ya tiene datos } else { // Cadena ya tiene datos
*hostfile_str = (char *) realloc(*hostfile_str, (len_og + len + 1) * sizeof(char)); *hostlist_str = (char *) realloc(*hostlist_str, (len_og + len + 1) * sizeof(char));
} }
if(hostfile_str == NULL) return -1; // No ha sido posible alojar la memoria if(hostlist_str == NULL) return -1; // No ha sido posible alojar la memoria
ocurrence = (char *) malloc((len_node+1) * sizeof(char)); ocurrence = (char *) malloc((len_node+1) * sizeof(char));
if(ocurrence == NULL) return -2; // No ha sido posible alojar la memoria if(ocurrence == NULL) return -2; // No ha sido posible alojar la memoria
...@@ -296,10 +339,10 @@ int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_na ...@@ -296,10 +339,10 @@ int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_na
i=0; i=0;
if(len_og == 0) { // Si se inicializa, la primera es una copia if(len_og == 0) { // Si se inicializa, la primera es una copia
i++; i++;
strcpy(*hostfile_str, node_name); strcpy(*hostlist_str, node_name);
} }
for(; i<qty; i++){ // Las siguientes se conctanenan for(; i<qty; i++){ // Las siguientes se conctanenan
strcat(*hostfile_str, ocurrence); strcat(*hostlist_str, ocurrence);
} }
...@@ -307,22 +350,89 @@ int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_na ...@@ -307,22 +350,89 @@ int write_str_node(char **hostfile_str, size_t len_og, size_t qty, char *node_na
return len+len_og; return len+len_og;
} }
//--------------------------------SLURM USAGE-------------------------------------//
#if MAM_USE_SLURM
/*
* Crea y devuelve un objeto MPI_Info con un par hosts/mapping
* en el que se indica el mappeado a utilizar en los nuevos
* procesos.
* Es necesario usar Slurm para usarlo.
*/
void generate_info_string_slurm(char *nodelist, int *procs_array, size_t nodes, Spawn_data *spawn_data){
char *hoststring;
// CREATE AND SET STRING HOSTS
fill_str_hosts_slurm(nodelist, procs_array, nodes, &hoststring);
set_mapping_host(spawn_data->spawn_qty, hoststring, 0, spawn_data);
free(hoststring);
}
/*
 * Creates one host/mapping pair per used node and registers each of
 * them in the spawn data, so the new processes can be spawned split
 * by node. Requires Slurm to be available.
 *
 * Fix: the bound check (i < used_nodes) is evaluated BEFORE shifting
 * the next host. The previous order shifted first, so when the loop
 * stopped at the node limit the freshly returned host string
 * (allocated by slurm_hostlist_shift) was leaked.
 */
void generate_multiple_info_string_slurm(char *nodelist, int *qty, size_t used_nodes, Spawn_data *spawn_data) {
  char *host, *hostlist_str;
  size_t i=0,j=0,len=0;
  hostlist_t hostlist;

  hostlist_str = NULL;
  hostlist = slurm_hostlist_create(nodelist);
  while ( i < used_nodes && (host = slurm_hostlist_shift(hostlist)) ) {
    if(qty[i] != 0) {
      // One mapping string per node with processes assigned to it
      write_str_node(&hostlist_str, len, qty[i], host);
      set_mapping_host(qty[i], hostlist_str, j, spawn_data);
      free(hostlist_str); hostlist_str = NULL;
      j++;
    }
    i++;
    free(host); // slurm_hostlist_shift returns an allocated string
  }
  slurm_hostlist_destroy(hostlist);
  free(hostlist_str); // Defensive; always NULL here, free(NULL) is a no-op
}
/*
 * Builds and returns (through hostlist_str) a string to be used by
 * the "hosts" key when spawning processes, indicating where they
 * have to be created. Requires Slurm to be available.
 *
 * Fix: the bound check (i < used_nodes) is evaluated BEFORE shifting
 * the next host. The previous order shifted first, so when the loop
 * stopped at the node limit the freshly returned host string
 * (allocated by slurm_hostlist_shift) was leaked.
 */
void fill_str_hosts_slurm(char *nodelist, int *qty, size_t used_nodes, char **hostlist_str) {
  char *host;
  size_t i=0,len=0;
  hostlist_t hostlist;

  hostlist = slurm_hostlist_create(nodelist);
  while ( i < used_nodes && (host = slurm_hostlist_shift(hostlist)) ) {
    if(qty[i] != 0) {
      // Append this node to the accumulated hosts string; len tracks
      // the current string length so write_str_node can extend it.
      len = write_str_node(hostlist_str, len, qty[i], host);
    }
    i++;
    free(host); // slurm_hostlist_shift returns an allocated string
  }
  slurm_hostlist_destroy(hostlist);
}
//==================================================== //====================================================
//==================================================== //====================================================
//============DEPRECATED FUNCTIONS==================== //============DEPRECATED FUNCTIONS====================
//==================================================== //====================================================
//==================================================== //====================================================
//--------------------------------SLURM USAGE-------------------------------------//
#ifdef USE_SLURM
/* FIXME Por revisar /* FIXME Por revisar
* @deprecated * @deprecated
* Genera un fichero hostfile y lo anyade a un objeto * Genera un fichero hostfile y lo anyade a un objeto
* MPI_Info para ser utilizado. * MPI_Info para ser utilizado.
*/ */
void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, MPI_Info *info){ void generate_info_hostfile_slurm(char *nodelist, int *procs_array, int nodes, Spawn_data *spawn_data){
char *hostfile; char *hostfile;
int ptr; int ptr;
MPI_Info *info;
spawn_data->sets[0].spawn_qty = spawn_data->spawn_qty;
info = &(spawn_data->sets[0].mapping);
// CREATE/UPDATE HOSTFILE // CREATE/UPDATE HOSTFILE
ptr = create_hostfile(&hostfile); ptr = create_hostfile(&hostfile);
...@@ -413,17 +523,3 @@ int write_hostfile_node(int ptr, int qty, char *node_name) { ...@@ -413,17 +523,3 @@ int write_hostfile_node(int ptr, int qty, char *node_name) {
} }
#endif #endif
//--------------------------------SLURM USAGE-------------------------------------// //--------------------------------SLURM USAGE-------------------------------------//
//TODO REFACTOR PARA CUANDO SE COMUNIQUE CON RMS
// Get Slurm job info
//int jobId;
//char *tmp;
//job_info_msg_t *j_info;
//slurm_job_info_t last_record;
//tmp = getenv("SLURM_JOB_ID");
//jobId = atoi(tmp);
//slurm_load_job(&j_info, jobId, 1);
//last_record = j_info->job_array[j_info->record_count - 1];
// Free JOB INFO
//slurm_free_job_info_msg(j_info);
#ifndef MALLEABILITY_SPAWN_PROCESS_DIST_H #ifndef MAM_SPAWN_PROCESS_DIST_H
#define MALLEABILITY_SPAWN_PROCESS_DIST_H #define MAM_SPAWN_PROCESS_DIST_H
#include <stdio.h> #include "Spawn_DataStructure.h"
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
#include "../malleabilityStates.h"
#include "../malleabilityDataStructures.h"
#define MALL_DIST_SPREAD 1 void processes_dist(Spawn_data *spawn_data);
#define MALL_DIST_COMPACT 2
#define MALL_DIST_STRING 1
#define MALL_DIST_HOSTFILE 2
int physical_struct_create(int target_qty, int already_created, int num_cpus, int num_nodes, char *nodelist, int dist_type, int info_type, struct physical_dist *dist);
void processes_dist(struct physical_dist dist, MPI_Info *info_spawn);
#endif #endif
#ifndef MAM_SPAWN_DATASTRUCTURE_H
#define MAM_SPAWN_DATASTRUCTURE_H
#include <mpi.h>
/* --- SPAWN STRUCTURE --- */
/* One spawn request unit: how many processes to create, the command
 * to launch, and the MPI_Info mapping (e.g. hosts key) to use. */
typedef struct {
  int spawn_qty;      // Number of processes this set spawns
  char *cmd;          // Command/binary to launch for this set
  MPI_Info mapping;   // MPI_Info with placement info (e.g. "hosts" key)
} Spawn_set;
/* Aggregate state of a whole spawn operation, possibly split across
 * several Spawn_set entries (one per node when spawning multiple). */
typedef struct {
  int spawn_qty, initial_qty, target_qty;   // Processes to spawn / before / after the reconfiguration
  int already_created;                      // Processes that already exist and must not be re-spawned
  int total_spawns;                         // Number of entries in the sets array
  int spawn_is_single, spawn_is_async, spawn_is_intercomm, spawn_is_multiple; // Strategy flags (0/1)
  // MPI_Info mapping;
  int mapping_fill_method;                  // How the mapping info is generated (string vs hostfile, presumably — TODO confirm against process_dist code)
  MPI_Comm comm, returned_comm; // ONLY SET FOR SOURCE PROCESSES
  Spawn_set *sets;                          // Array of total_spawns spawn sets
} Spawn_data;
#endif
...@@ -4,19 +4,21 @@ ...@@ -4,19 +4,21 @@
#include "Spawn_state.h" #include "Spawn_state.h"
pthread_mutex_t spawn_mutex; pthread_mutex_t spawn_mutex;
pthread_cond_t spawn_cond; pthread_cond_t spawn_cond, completion_cond;
int spawn_state; int spawn_state;
int waiting_redistribution=0; int waiting_redistribution=0, waiting_completion=0;
void init_spawn_state() { void init_spawn_state() {
pthread_mutex_init(&spawn_mutex,NULL); pthread_mutex_init(&spawn_mutex,NULL);
pthread_cond_init(&spawn_cond,NULL); pthread_cond_init(&spawn_cond,NULL);
pthread_cond_init(&completion_cond,NULL);
set_spawn_state(1,0); //FIXME First parameter is a horrible magical number set_spawn_state(1,0); //FIXME First parameter is a horrible magical number
} }
void free_spawn_state() { void free_spawn_state() {
pthread_mutex_destroy(&spawn_mutex); pthread_mutex_destroy(&spawn_mutex);
pthread_cond_destroy(&spawn_cond); pthread_cond_destroy(&spawn_cond);
pthread_cond_destroy(&completion_cond);
} }
int get_spawn_state(int is_async) { int get_spawn_state(int is_async) {
...@@ -41,7 +43,7 @@ void set_spawn_state(int value, int is_async) { ...@@ -41,7 +43,7 @@ void set_spawn_state(int value, int is_async) {
} }
} }
int wait_wakeup() { int wait_redistribution() {
pthread_mutex_lock(&spawn_mutex); pthread_mutex_lock(&spawn_mutex);
if(!waiting_redistribution) { if(!waiting_redistribution) {
waiting_redistribution=1; waiting_redistribution=1;
...@@ -52,7 +54,7 @@ int wait_wakeup() { ...@@ -52,7 +54,7 @@ int wait_wakeup() {
return get_spawn_state(1); return get_spawn_state(1);
} }
void wakeup() { void wakeup_redistribution() {
pthread_mutex_lock(&spawn_mutex); pthread_mutex_lock(&spawn_mutex);
if(waiting_redistribution) { if(waiting_redistribution) {
pthread_cond_signal(&spawn_cond); pthread_cond_signal(&spawn_cond);
...@@ -60,3 +62,23 @@ void wakeup() { ...@@ -60,3 +62,23 @@ void wakeup() {
waiting_redistribution=1; waiting_redistribution=1;
pthread_mutex_unlock(&spawn_mutex); pthread_mutex_unlock(&spawn_mutex);
} }
/*
 * Blocks the calling thread until the spawn-completion event is
 * signaled via wakeup_completion(). If the event was already signaled
 * (waiting_completion set by a prior wakeup_completion call), returns
 * immediately without waiting. Clears the flag before returning.
 * Returns the current asynchronous spawn state.
 *
 * NOTE(review): pthread_cond_wait is not re-checked in a loop, so a
 * spurious wakeup would return early — POSIX allows spurious wakeups;
 * verify the surrounding protocol tolerates this. Same pattern as
 * wait_redistribution.
 */
int wait_completion() {
  pthread_mutex_lock(&spawn_mutex);
  if(!waiting_completion) {
    waiting_completion=1;
    pthread_cond_wait(&completion_cond, &spawn_mutex);
  }
  waiting_completion=0;
  pthread_mutex_unlock(&spawn_mutex);
  return get_spawn_state(1);
}
/*
 * Signals the spawn-completion event. If a thread is currently blocked
 * in wait_completion() it is woken; otherwise the flag is left set so
 * a later wait_completion() returns immediately (the flag doubles as
 * an "event already happened" marker).
 */
void wakeup_completion() {
  pthread_mutex_lock(&spawn_mutex);
  if(waiting_completion) {
    pthread_cond_signal(&completion_cond);
  }
  waiting_completion=1;
  pthread_mutex_unlock(&spawn_mutex);
}
#ifndef MALLEABILITY_SPAWN_STATE_H #ifndef MAM_SPAWN_STATE_H
#define MALLEABILITY_SPAWN_STATE_H #define MAM_SPAWN_STATE_H
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
...@@ -11,7 +11,10 @@ void free_spawn_state(); ...@@ -11,7 +11,10 @@ void free_spawn_state();
int get_spawn_state(int is_async); int get_spawn_state(int is_async);
void set_spawn_state(int value, int is_async); void set_spawn_state(int value, int is_async);
int wait_wakeup(); int wait_redistribution();
void wakeup(); void wakeup_redistribution();
int wait_completion();
void wakeup_completion();
#endif #endif
...@@ -2,11 +2,10 @@ ...@@ -2,11 +2,10 @@
#SBATCH -p P1 #SBATCH -p P1
#SBATCH -N 1 #SBATCH -N 1
#SBATCH --exclude=c01,c00,c02
dir="/home/martini/malleability_benchmark"
partition='P1' partition='P1'
source build/config.txt
codeDir="/Codes" codeDir="/Codes"
execDir="/Exec" execDir="/Exec"
cores=$(bash $dir$execDir/BashScripts/getCores.sh $partition) cores=$(bash $dir$execDir/BashScripts/getCores.sh $partition)
...@@ -21,14 +20,13 @@ then ...@@ -21,14 +20,13 @@ then
outIndex=$2 outIndex=$2
fi fi
echo "MPICH" echo "MPICH provider=$FI_PROVIDER"
#export HYDRA_DEBUG=1 mpirun --version
numP=$(bash $dir$execDir/BashScripts/getNumPNeeded.sh $configFile 0) numP=$(bash $dir$execDir/BashScripts/getNumPNeeded.sh $configFile 0)
initial_nodelist=$(bash $dir$execDir/BashScripts/createInitialNodelist.sh $numP $cores $nodelist) initial_nodelist=$(bash $dir$execDir/BashScripts/createInitialNodelist.sh $numP $cores $nodelist)
echo $initial_nodelist echo $initial_nodelist
echo "Test PreRUN $numP $nodelist" echo "Test PreRUN $numP $nodelist"
mpirun -hosts $initial_nodelist -np $numP $dir$codeDir/build/a.out $configFile $outIndex $nodelist $nodes mpirun -hosts $initial_nodelist -np $numP $dir$codeDir/build/a.out $configFile $outIndex
echo "END RUN" echo "END RUN"
sed -i 's/application called MPI_Abort(MPI_COMM_WORLD, -100) - process/shrink cleaning/g' slurm-$SLURM_JOB_ID.out sed -i 's/application called MPI_Abort(MPI_COMM_WORLD, -100) - process/shrink cleaning/g' slurm-$SLURM_JOB_ID.out
......
...@@ -4,7 +4,8 @@ ...@@ -4,7 +4,8 @@
#SBATCH -N 1 #SBATCH -N 1
#SBATCH --exclude=c01,c00,c02 #SBATCH --exclude=c01,c00,c02
dir="/home/martini/malleability_benchmark" scriptDir="$(dirname "$0")"
source $scriptDir/build/config.txt
codeDir="/Codes/build" codeDir="/Codes/build"
resultsDir="/Results" resultsDir="/Results"
execDir="/Exec" execDir="/Exec"
......
...@@ -4,21 +4,39 @@ ...@@ -4,21 +4,39 @@
#SBATCH -N 1 #SBATCH -N 1
#SBATCH --exclude=c01,c00,c02 #SBATCH --exclude=c01,c00,c02
dir="/home/martini/malleability_benchmark" source build/config.txt
codeDir="/Codes" codeDir="/Codes"
execDir="/Exec" execDir="/Exec"
cores=$(bash $dir$execDir/BashScripts/getCores.sh $partition)
nodelist="localhost" #configFile=$1
nodes=1 #outIndex=$2
configFile=$1
outIndex=$2
echo "MPICH" #echo "MPICH"
#module load mpich-3.4.1-noucx #module load mpich-3.4.1-noucx
#export HYDRA_DEBUG=1 #export HYDRA_DEBUG=1
#mpirun --version
#numP=$(bash $dir$execDir/BashScripts/getNumPNeeded.sh $configFile 0)
#mpirun -np $numP valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --trace-children=yes --log-file=nc.vg.%p $dir$codeDir/build/a.out $configFile $outIndex
nodelist=$SLURM_JOB_NODELIST
nodes=$SLURM_JOB_NUM_NODES
configFile=$1
outIndex=0
if [ $# -ge 2 ]
then
outIndex=$2
fi
echo "MPICH provider=$FI_PROVIDER"
mpirun --version
numP=$(bash $dir$execDir/BashScripts/getNumPNeeded.sh $configFile 0) numP=$(bash $dir$execDir/BashScripts/getNumPNeeded.sh $configFile 0)
mpirun -np $numP valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --trace-children=yes --log-file=nc.vg.%p $dir$codeDir/build/a.out $configFile $outIndex $nodelist $nodes initial_nodelist=$(bash $dir$execDir/BashScripts/createInitialNodelist.sh $numP $cores $nodelist)
echo $initial_nodelist
echo "Test PreRUN $numP $nodelist"
mpirun -hosts $initial_nodelist -np $numP valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --trace-children=yes --log-file=vg.sp.%p.$SLURM_JOB_ID $dir$codeDir/build/a.out $configFile $outIndex
echo "END RUN" echo "END RUN"
sed -i 's/application called MPI_Abort(MPI_COMM_WORLD, -100) - process/shrink cleaning/g' slurm-$SLURM_JOB_ID.out sed -i 's/application called MPI_Abort(MPI_COMM_WORLD, -100) - process/shrink cleaning/g' slurm-$SLURM_JOB_ID.out
......
#!/bin/bash #!/bin/bash
dir="/home/martini/malleability_benchmark" #FIXME Obtain from another way
# Runs in a given current directory all .ini files # Runs in a given current directory all .ini files
# Parameter 1(Optional) - Amount of executions per file. Must be a positive number # Parameter 1(Optional) - Amount of executions per file. Must be a positive number
#====== Do not modify these values ======= #====== Do not modify these values =======
......
File mode changed from 100644 to 100755
#!/bin/bash #!/bin/bash
dir="/home/martini/malleability_benchmark" #FIXME Obtain from another way
# Runs in a given current directory all .ini files # Runs in a given current directory all .ini files
# Parameter 1(Optional) - Amount of executions per file. Must be a positive number # Parameter 1(Optional) - Amount of executions per file. Must be a positive number
#====== Do not modify these values ======= #====== Do not modify these values =======
......
#!/bin/bash #!/bin/bash
dir="/home/martini/malleability_benchmark/"
partition="P1" partition="P1"
# Checks if all the runs in the current working directory performed under a # Checks if all the runs in the current working directory performed under a
...@@ -18,15 +17,17 @@ partition="P1" ...@@ -18,15 +17,17 @@ partition="P1"
# Must be a positive integer. # Must be a positive integer.
#====== Do not modify the following values ======= #====== Do not modify the following values =======
codeDir="Codes/" scriptDir="$(dirname "$0")"
execDir="Exec/" source $scriptDir/../Codes/build/config.txt
ResultsDir="Results/" codeDir="/Codes/"
execDir="/Exec/"
ResultsDir="/Results/"
cores=$(bash $dir$execDir/BashScripts/getCores.sh $partition) cores=$(bash $dir$execDir/BashScripts/getCores.sh $partition)
if [ "$#" -lt "6" ] if [ "$#" -lt "6" ]
then then
echo "Not enough arguments" echo "Not enough arguments"
echo "Usage -> bash CheckRun Common_Name maxIndex total_repetitions total_groups total_stages max_iteration_time [limit_time]" echo "Usage -> bash CheckRun.sh Common_Name maxIndex total_repetitions total_stages total_groups max_iteration_time [limit_time]"
exit -1 exit -1
fi fi
...@@ -43,7 +44,7 @@ then ...@@ -43,7 +44,7 @@ then
fi fi
limit_time=0 limit_time=0
exec_lines_basic=7 exec_lines_basic=6
iter_lines_basic=3 iter_lines_basic=3
exec_total_lines=$(($exec_lines_basic+$total_stages+$total_groups)) exec_total_lines=$(($exec_lines_basic+$total_stages+$total_groups))
iter_total_lines=$(($iter_lines_basic+$total_stages*2+1)) iter_total_lines=$(($iter_lines_basic+$total_stages*2+1))
...@@ -68,7 +69,7 @@ fi ...@@ -68,7 +69,7 @@ fi
# then the scripts exits. # then the scripts exits.
#The user must figure out what to do with those runs. #The user must figure out what to do with those runs.
qtyG=$(ls R*_Global.out | wc -l) qtyG=$(ls R*_Global.out | wc -l)
qtyG=$(($qtyG * 2)) qtyG=$(($qtyG * $total_groups))
qtyL=$(ls R*_G*N*.out | wc -l) qtyL=$(ls R*_G*N*.out | wc -l)
if [ "$qtyG" == "$qtyL" ] if [ "$qtyG" == "$qtyL" ]
then then
...@@ -165,32 +166,39 @@ fi ...@@ -165,32 +166,39 @@ fi
#If any run lacks repetitions, the job is automatically launched again #If any run lacks repetitions, the job is automatically launched again
#If a run has even executed a repetition, is not launched as it could be in the waiting queue #If a run has even executed a repetition, is not launched as it could be in the waiting queue
qty_missing=0 qty_missing=0
use_extrae=0
for ((run=0; run<$maxIndex; run++)) for ((run=0; run<$maxIndex; run++))
do do
diff=0
if [ -f "R${run}_Global.out" ] if [ -f "R${run}_Global.out" ]
then then
qtyEx=$(grep T_total R"$run"_Global.out | wc -l) qtyEx=$(grep T_total R"$run"_Global.out | wc -l)
if [ "$qtyEx" -ne "$totalEjGrupo" ]; if [ "$qtyEx" -ne "$totalEjGrupo" ];
then then
#1 - Obtain config file name and repetitions to perform
diff=$(($totalEjGrupo-$qtyEx)) diff=$(($totalEjGrupo-$qtyEx))
qty_missing=$(($qty_missing+$diff))
config_file="$common_name$run.ini"
if [ $limit_time_exec -ne 0 ] #Max time per execution in seconds
then
limit_time=$(($limit_time_exec*$diff/60+1))
fi
#2 - Obtain number of nodes needed
node_qty=$(bash $dir$execDir/BashScripts/getMaxNodesNeeded.sh $config_file $dir $cores)
#3 - Launch execution
echo "Run$run lacks $diff repetitions" echo "Run$run lacks $diff repetitions"
use_extrae=0
sbatch -p $partition -N $node_qty -t $limit_time $dir$execDir./generalRun.sh $dir $cores $config_file $use_extrae $run $diff
fi fi
else else
echo "File R${run}_Global.out does not exist -- Could it be it must still be executed?" diff=$(($totalEjGrupo))
echo "Run$run results not found -- Trying to execute"
fi
if [ $diff -ne 0 ] #Execute if needed
then
qty_missing=$(($qty_missing+$diff))
if [ $limit_time_exec -ne 0 ] #Max time per execution in seconds
then
limit_time=$(($limit_time_exec*$diff/60+1))
fi
#2 - Obtain number of nodes needed
config_file="$common_name$run.ini"
node_qty=$(bash $dir$execDir/BashScripts/getMaxNodesNeeded.sh $config_file $dir $cores)
#3 - Launch execution
sbatch -p $partition -N $node_qty -t $limit_time $dir$execDir./generalRun.sh $dir $cores $config_file $use_extrae $run $diff
fi fi
done done
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment