...@@ -3,16 +3,13 @@ ...@@ -3,16 +3,13 @@
#include <mpi.h> #include <mpi.h>
#include <string.h> #include <string.h>
#include <slurm/slurm.h> #include <slurm/slurm.h>
#include "malleabilityStates.h"
#define COMM_UNRESERVED -2 int init_slurm_comm(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId, int numP, int numC, int root, int type_dist, int type_creation, int spawn_is_single, MPI_Comm comm, MPI_Comm *child);
#define COMM_IN_PROGRESS -1 int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm comm, MPI_Comm comm_thread, double *end_real_time);
#define COMM_PHY_NODES 1 void malleability_establish_connection(int myId, int root, MPI_Comm *intercomm);
#define COMM_PHY_CPU 2
int init_slurm_comm(char **argv, int myId, int numP, int root, int type_dist, int type_creation); void proc_adapt_expand(int *numP, int numC, MPI_Comm intercomm, MPI_Comm *comm, int is_children_group);
int check_slurm_comm(int myId, int root, MPI_Comm comm, MPI_Comm *child); void proc_adapt_shrink(int numC, MPI_Comm *comm, int myId);
#include <pthread.h>
#include "malleabilityManager.h"
#include "malleabilityStates.h"
#include "malleabilityTypes.h"
#include "malleabilityZombies.h"
#include "ProcessDist.h"
#include "CommDist.h"
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous);
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous);
void Children_init();
int spawn_step();
int start_redistribution();
int check_redistribution();
int end_redistribution();
int shrink_redistribution();
int thread_creation();
int thread_check();
void* thread_async_work(void* void_arg);
typedef struct {
int spawn_type;
int spawn_dist;
int spawn_is_single;
int spawn_threaded;
int comm_type;
int comm_threaded;
int grp;
configuration *config_file;
results_data *results;
} malleability_config_t;
typedef struct { //FIXME numC_spawned no se esta usando
int myId, numP, numC, numC_spawned, root, root_parents;
pthread_t async_thread;
MPI_Comm comm, thread_comm;
MPI_Comm intercomm;
MPI_Comm user_comm;
char *name_exec, *nodelist;
int num_cpus, num_nodes;
} malleability_t;
int state = MAL_UNRESERVED; //FIXME Mover a otro lado
malleability_config_t *mall_conf;
malleability_t *mall;
malleability_data_t *rep_s_data;
malleability_data_t *dist_s_data;
malleability_data_t *rep_a_data;
malleability_data_t *dist_a_data;
* Inicializa la reserva de memoria para el modulo de maleabilidad
* creando todas las estructuras necesarias y copias de comunicadores
* para no interferir en la aplicación.
* Si es llamada por un grupo de procesos creados de forma dinámica,
* inicializan la comunicacion con sus padres. En este caso, al terminar
* la comunicacion los procesos hijo estan preparados para ejecutar la
* aplicacion.
int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes) {
MPI_Comm dup_comm, thread_comm;
mall_conf = (malleability_config_t *) malloc(sizeof(malleability_config_t));
mall = (malleability_t *) malloc(sizeof(malleability_t));
rep_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
dist_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
MPI_Comm_dup(comm, &dup_comm);
MPI_Comm_dup(comm, &thread_comm);
mall->myId = myId;
mall->numP = numP;
mall->root = root;
mall->comm = dup_comm;
mall->thread_comm = thread_comm; // TODO Refactor -- Crear solo si es necesario?
mall->user_comm = comm;
mall->name_exec = name_exec;
mall->nodelist = nodelist;
mall->num_cpus = num_cpus;
mall->num_nodes = num_nodes;
rep_s_data->entries = 0;
rep_a_data->entries = 0;
dist_s_data->entries = 0;
dist_a_data->entries = 0;
// Si son el primer grupo de procesos, obtienen los datos de los padres
if(mall->intercomm != MPI_COMM_NULL ) {
* Elimina toda la memoria reservado por el modulo
* de maleabilidad y asegura que los zombies
* despierten si los hubiese.
void free_malleability() {
//MPI_Comm_free(&(mall->comm)); // TODO Revisar si hace falta?
* Se realiza el redimensionado de procesos por parte de los padres.
* Se crean los nuevos procesos con la distribucion fisica elegida y
* a continuacion se transmite la informacion a los mismos.
* Si hay datos asincronos a transmitir, primero se comienza a
* transmitir estos y se termina la funcion. Se tiene que comprobar con
* llamando a la función de nuevo que se han terminado de enviar
* Si hay ademas datos sincronos a enviar, no se envian aun.
* Si solo hay datos sincronos se envian tras la creacion de los procesos
* y finalmente se desconectan los dos grupos de procesos.
int malleability_checkpoint() {
if(state == MAL_NOT_STARTED) {
// Comprobar si se tiene que realizar un redimensionado
//if(CHECK_RMS()) {return MAL_DENIED;}
state = spawn_step();
if (state == MAL_SPAWN_COMPLETED){
state = start_redistribution();
} else if(state == MAL_SPAWN_PENDING || state == MAL_SPAWN_SINGLE_PENDING) { // Comprueba si el spawn ha terminado y comienza la redistribucion
double end_real_time;
if(mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD && mall->numP > mall->numC) {
state = shrink_redistribution(); //TODO REFACTOR
} else {
state = check_slurm_comm(mall->myId, mall->root, mall->numP, &(mall->intercomm), mall->comm, mall->thread_comm, &end_real_time);
if (state == MAL_SPAWN_COMPLETED) {
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
if(mall_conf->spawn_type == COMM_SPAWN_PTHREAD || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
mall_conf->results->spawn_real_time[mall_conf->grp] = end_real_time - mall_conf->results->spawn_start;
//TODO Si es MERGE SHRINK, metodo diferente de redistribucion de datos
state = start_redistribution();
} else if(state == MAL_DIST_PENDING) {
if(mall_conf->comm_type == MAL_USE_THREAD) {
state = thread_check();
} else {
state = check_redistribution();
return state;
// Funciones solo necesarias por el benchmark
void set_benchmark_grp(int grp) {
mall_conf->grp = grp;
void set_benchmark_configuration(configuration *config_file) {
mall_conf->config_file = config_file;
void get_benchmark_configuration(configuration **config_file) {
*config_file = mall_conf->config_file;
void set_benchmark_results(results_data *results) {
mall_conf->results = results;
void get_benchmark_results(results_data **results) {
*results = mall_conf->results;
void set_malleability_configuration(int spawn_type, int spawn_is_single, int spawn_dist, int spawn_threaded, int comm_type, int comm_threaded) {
mall_conf->spawn_type = spawn_type;
mall_conf->spawn_is_single = spawn_is_single;
mall_conf->spawn_dist = spawn_dist;
mall_conf->spawn_threaded = spawn_threaded;
mall_conf->comm_type = comm_type;
mall_conf->comm_threaded = comm_threaded;
* To be deprecated
* Tiene que ser llamado despues de setear la config
void set_children_number(int numC){
if((mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) && (numC - mall->numP >= 0)) {
mall->numC = numC;
mall->numC_spawned = numC - mall->numP;
if(numC == mall->numP) { // Migrar
mall->numC_spawned = numC;
if(mall_conf->spawn_type == COMM_SPAWN_MERGE)
mall_conf->spawn_type = COMM_SPAWN_SERIAL;
mall_conf->spawn_type = COMM_SPAWN_PTHREAD;
} else {
mall->numC = numC;
mall->numC_spawned = numC;
void get_malleability_user_comm(MPI_Comm *comm) {
*comm = mall->user_comm;
* Anyade a la estructura concreta de datos elegida
* el nuevo set de datos "data" de un total de "total_qty" elementos.
* Los datos variables se tienen que anyadir cuando quieran ser mandados, no antes
* Mas informacion en la funcion "add_data".
void malleability_add_data(void *data, int total_qty, int type, int is_replicated, int is_constant) {
if(is_constant) {
if(is_replicated) {
add_data(data, total_qty, type, 0, rep_s_data); //FIXME Numero magico
} else {
add_data(data, total_qty, type, 0, dist_s_data); //FIXME Numero magico
} else {
if(is_replicated) {
add_data(data, total_qty, type, 0, rep_a_data); //FIXME Numero magico || Un request?
} else {
int total_reqs = 0;
if(mall_conf->comm_type == MAL_USE_NORMAL) {
total_reqs = 1;
} else if(mall_conf->comm_type == MAL_USE_IBARRIER) {
total_reqs = 2;
} else if(mall_conf->comm_type == MAL_USE_POINT) {
total_reqs = mall->numC;
add_data(data, total_qty, type, total_reqs, dist_a_data);
* Devuelve el numero de entradas para la estructura de descripcion de
* datos elegida.
void malleability_get_entries(int *entries, int is_replicated, int is_constant){
if(is_constant) {
if(is_replicated) {
*entries = rep_s_data->entries;
} else {
*entries = dist_s_data->entries;
} else {
if(is_replicated) {
*entries = rep_a_data->entries;
} else {
*entries = dist_a_data->entries;
* Devuelve el elemento de la lista "index" al usuario.
* La devolución es en el mismo orden que lo han metido los padres
* con la funcion "malleability_add_data()".
* Es tarea del usuario saber el tipo de esos datos.
* TODO Refactor a que sea automatico
void malleability_get_data(void **data, int index, int is_replicated, int is_constant) {
malleability_data_t *data_struct;
if(is_constant) {
if(is_replicated) {
data_struct = rep_s_data;
} else {
data_struct = dist_s_data;
} else {
if(is_replicated) {
data_struct = rep_a_data;
} else {
data_struct = dist_a_data;
*data = data_struct->arrays[index];
//================PRIVATE FUNCTIONS=====================||
//================DATA COMMUNICATION====================||
* Funcion generalizada para enviar datos desde los hijos.
* La asincronizidad se refiere a si el hilo padre e hijo lo hacen
* de forma bloqueante o no. El padre puede tener varios hilos.
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
int i;
char *aux;
if(is_asynchronous) {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char
send_async(aux, data_struct->qty[i], mall->myId, mall->numP, mall->root, mall->intercomm, numP_children, data_struct->requests, mall_conf->comm_type);
} else {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char
send_sync(aux, data_struct->qty[i], mall->myId, mall->numP, mall->root, mall->intercomm, numP_children);
* Funcion generalizada para recibir datos desde los hijos.
* La asincronizidad se refiere a si el hilo padre e hijo lo hacen
* de forma bloqueante o no. El padre puede tener varios hilos.
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
int i;
char *aux;
if(is_asynchronous) {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char
recv_async(&aux, data_struct->qty[i], mall->myId, mall->numP, mall->root, mall->intercomm, numP_parents, mall_conf->comm_type);
data_struct->arrays[i] = (void *) aux;
} else {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char
recv_sync(&aux, data_struct->qty[i], mall->myId, mall->numP, mall->root, mall->intercomm, numP_parents);
data_struct->arrays[i] = (void *) aux;
//================PRIVATE FUNCTIONS=====================||
* Inicializacion de los datos de los hijos.
* En la misma se reciben datos de los padres: La configuracion
* de la ejecucion a realizar; y los datos a recibir de los padres
* ya sea de forma sincrona, asincrona o ambas.
void Children_init() {
int numP_parents, root_parents, i;
int spawn_is_single;
MPI_Comm aux;
MPI_Bcast(&spawn_is_single, 1, MPI_INT, MALLEABILITY_ROOT, mall->intercomm);
if(spawn_is_single) {
malleability_establish_connection(mall->myId, MALLEABILITY_ROOT, &(mall->intercomm));
MPI_Bcast(&(mall_conf->spawn_type), 1, MPI_INT, MALLEABILITY_ROOT, mall->intercomm);
MPI_Bcast(&root_parents, 1, MPI_INT, MALLEABILITY_ROOT, mall->intercomm);
MPI_Bcast(&numP_parents, 1, MPI_INT, root_parents, mall->intercomm);
mall_conf->config_file = recv_config_file(mall->root, mall->intercomm);
mall_conf->results = (results_data *) malloc(sizeof(results_data));
init_results_data(mall_conf->results, mall_conf->config_file->resizes, RESULTS_INIT_DATA_QTY);
if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
if(mall_conf->comm_type == MAL_USE_NORMAL || mall_conf->comm_type == MAL_USE_IBARRIER || mall_conf->comm_type == MAL_USE_POINT) {
recv_data(numP_parents, dist_a_data, 1);
} else if (mall_conf->comm_type == MAL_USE_THREAD) { //TODO Modificar uso para que tenga sentido comm_threaded
recv_data(numP_parents, dist_a_data, 0);
mall_conf->results->async_end= MPI_Wtime(); // Obtener timestamp de cuando termina comm asincrona
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
if(dist_s_data->entries || rep_s_data->entries) { // Recibir datos sincronos
recv_data(numP_parents, dist_s_data, 0);
mall_conf->results->sync_end = MPI_Wtime(); // Obtener timestamp de cuando termina comm sincrona
// TODO Crear funcion especifica y anyadir para Asinc
// TODO Tener en cuenta el tipo y qty
for(i=0; i<rep_s_data->entries; i++) {
MPI_Datatype datatype;
if(rep_s_data->types[i] == MAL_INT) {
datatype = MPI_INT;
} else {
datatype = MPI_CHAR;
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, root_parents, mall->intercomm);
if(mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
proc_adapt_expand(&(mall->numP), mall->numP+numP_parents, mall->intercomm, &(mall->comm), MALLEABILITY_CHILDREN);
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
MPI_Comm_dup(mall->comm, &aux);
mall->thread_comm = aux;
MPI_Comm_dup(mall->comm, &aux);
mall->user_comm = aux;
// Guardar los resultados de esta transmision
recv_results(mall_conf->results, mall->root, mall_conf->config_file->resizes, mall->intercomm);
//================PRIVATE FUNCTIONS=====================||
* Se encarga de realizar la creacion de los procesos hijos.
* Si se pide en segundo plano devuelve el estado actual.
int spawn_step(){
mall_conf->results->spawn_start = MPI_Wtime();
if((mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) && mall->numP > mall->numC) {
state = shrink_redistribution();
return state;
state = init_slurm_comm(mall->name_exec, mall->num_cpus, mall->num_nodes, mall->nodelist, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_type, mall_conf->spawn_is_single, mall->thread_comm, &(mall->intercomm));
if(mall_conf->spawn_type == COMM_SPAWN_SERIAL || mall_conf->spawn_type == COMM_SPAWN_MERGE)
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
else if(mall_conf->spawn_type == COMM_SPAWN_PTHREAD || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
//mall_conf->results->spawn_thread_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
//mall_conf->results->spawn_start = MPI_Wtime();
return state;
* Comienza la redistribucion de los datos con el nuevo grupo de procesos.
* Primero se envia la configuracion a utilizar al nuevo grupo de procesos y a continuacion
* se realiza el envio asincrono y/o sincrono si lo hay.
* En caso de que haya comunicacion asincrona, se comienza y se termina la funcion
* indicando que se ha comenzado un envio asincrono.
* Si no hay comunicacion asincrono se pasa a realizar la sincrona si la hubiese.
* Finalmente se envian datos sobre los resultados a los hijos y se desconectan ambos
* grupos de procesos.
int start_redistribution() {
int rootBcast = MPI_PROC_NULL;
if(mall->myId == mall->root) rootBcast = MPI_ROOT;
MPI_Bcast(&(mall_conf->spawn_type), 1, MPI_INT, rootBcast, mall->intercomm);
MPI_Bcast(&(mall->root), 1, MPI_INT, rootBcast, mall->intercomm);
MPI_Bcast(&(mall->numP), 1, MPI_INT, rootBcast, mall->intercomm);
send_config_file(mall_conf->config_file, rootBcast, mall->intercomm);
if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos
mall_conf->results->async_start = MPI_Wtime();
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
if(mall_conf->comm_type == MAL_USE_THREAD) {
return thread_creation();
} else {
send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
return end_redistribution();
* @deprecated
* Comprueba si la redistribucion asincrona ha terminado.
* Si no ha terminado la funcion termina indicandolo, en caso contrario,
* se continua con la comunicacion sincrona, el envio de resultados y
* se desconectan los grupos de procesos.
* Esta funcion permite dos modos de funcionamiento al comprobar si la
* comunicacion asincrona ha terminado.
* Si se utiliza el modo "MAL_USE_NORMAL" o "MAL_USE_POINT", se considera
* terminada cuando los padres terminan de enviar.
* Si se utiliza el modo "MAL_USE_IBARRIER", se considera terminada cuando
* los hijos han terminado de recibir.
int check_redistribution() {
int completed, all_completed, test_err;
MPI_Request *req_completed;
//dist_a_data->requests[0][X] //FIXME Numero magico 0 -- Modificar para que sea un for?
if (mall_conf->comm_type == MAL_USE_POINT) {
test_err = MPI_Testall(mall->numC, dist_a_data->requests[0], &completed, MPI_STATUSES_IGNORE);
} else {
if(mall_conf->comm_type == MAL_USE_NORMAL) {
req_completed = &(dist_a_data->requests[0][0]);
} else if (mall_conf->comm_type == MAL_USE_IBARRIER) {
req_completed = &(dist_a_data->requests[0][1]);
test_err = MPI_Test(req_completed, &completed, MPI_STATUS_IGNORE);
if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
printf("P%d aborting -- Test Async\n", mall->myId);
MPI_Abort(MPI_COMM_WORLD, test_err);
MPI_Allreduce(&completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
if(!all_completed) return MAL_DIST_PENDING; // Continue only if asynchronous send has ended
if(mall_conf->comm_type == MAL_USE_IBARRIER) {
MPI_Wait(&(dist_a_data->requests[0][0]), MPI_STATUS_IGNORE); // Indicar como completado el envio asincrono
//Para la desconexión de ambos grupos de procesos es necesario indicar a MPI que esta comm
//ha terminado, aunque solo se pueda llegar a este punto cuando ha terminado
return end_redistribution();
* Termina la redistribución de los datos con los hijos, comprobando
* si se han realizado iteraciones con comunicaciones en segundo plano
* y enviando cuantas iteraciones se han realizado a los hijos.
* Además se realizan las comunicaciones síncronas se las hay.
* Finalmente termina enviando los datos temporales a los hijos.
int end_redistribution() {
int result, i, rootBcast = MPI_PROC_NULL;
MPI_Comm aux;
if(mall->myId == mall->root) rootBcast = MPI_ROOT;
if(dist_s_data->entries || rep_s_data->entries) { // Enviar datos sincronos
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
// TODO Crear funcion especifica y anyadir para Asinc
// TODO Tener en cuenta el tipo
for(i=0; i<rep_s_data->entries; i++) {
MPI_Datatype datatype;
if(rep_s_data->types[i] == MAL_INT) {
datatype = MPI_INT;
} else {
datatype = MPI_CHAR;
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, rootBcast, mall->intercomm);
if(mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
double time_adapt = MPI_Wtime();
proc_adapt_expand(&(mall->numP), mall->numC, mall->intercomm, &(mall->comm), MALLEABILITY_NOT_CHILDREN);
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
MPI_Comm_dup(mall->comm, &aux);
mall->thread_comm = aux;
MPI_Comm_dup(mall->comm, &aux);
mall->user_comm = aux;
mall_conf->results->spawn_time[mall_conf->grp] += MPI_Wtime() - time_adapt;
// result = MAL_DIST_ADAPTED;
send_results(mall_conf->results, rootBcast, mall_conf->config_file->resizes, mall->intercomm);
return result;
double time_adapt, time_adapt_end;
int state_shrink=0; //TODO Refactor
pthread_t thread_shrink;
MPI_Comm comm_shrink;
int thread_shrink_creation();
void *thread_shrink_work();
* Crea una hebra para ejecutar una comunicación en segundo plano.
int thread_shrink_creation() {
if(pthread_create(&thread_shrink, NULL, thread_shrink_work, NULL)) {
printf("Error al crear el hilo\n");
return -1;
void* thread_shrink_work() {
proc_adapt_shrink(mall->numC, &comm_shrink, mall->myId);
time_adapt_end = MPI_Wtime();
int shrink_redistribution() {
int global_state;
double time_aux;
MPI_Comm aux_comm;
if(mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
if(state_shrink == 0) {
time_adapt = MPI_Wtime();
state_shrink = 1;
MPI_Comm_dup(mall->comm, &comm_shrink);
} else if(state_shrink>0) {
MPI_Allreduce(&state_shrink, &global_state, 1, MPI_INT, MPI_MIN, mall->comm);
if(global_state < 2) return MAL_SPAWN_PENDING;
time_aux = MPI_Wtime();
if(pthread_join(thread_shrink, NULL)) {
printf("Error al esperar al hilo\n");
return -10;
MPI_Comm_dup(mall->comm, &aux_comm);
mall->comm = comm_shrink;
} else {
time_adapt = MPI_Wtime();
MPI_Comm_dup(mall->comm, &aux_comm);
proc_adapt_shrink( mall->numC, &(mall->comm), mall->myId);
//TODO REFACTOR -- Que solo la llamada de collect iters este fuera de los hilos
zombies_collect_suspended(aux_comm, mall->myId, mall->numP, mall->numC, mall->root, (void *) mall_conf->results, mall->user_comm);
if(mall->myId < mall->numC) {
MPI_Comm_dup(mall->comm, &aux_comm);
mall->thread_comm = aux_comm;
MPI_Comm_dup(mall->comm, &aux_comm);
mall->user_comm = aux_comm;
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - time_adapt;
if(mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
mall_conf->results->spawn_real_time[mall_conf->grp] = time_adapt_end - time_adapt + MPI_Wtime() - time_aux;
} else {
return MAL_ZOMBIE;
//================PRIVATE FUNCTIONS=====================||
//===============COMM PARENTS THREADS===================||
* Crea una hebra para ejecutar una comunicación en segundo plano.
int thread_creation() {
if(pthread_create(&(mall->async_thread), NULL, thread_async_work, NULL)) {
printf("Error al crear el hilo\n");
return -1;
* Comprobación por parte de una hebra maestra que indica
* si una hebra esclava ha terminado su comunicación en segundo plano.
* El estado de la comunicación es devuelto al finalizar la función.
int thread_check() {
int all_completed = 0;
// Comprueba que todos los hilos han terminado la distribucion (Mismo valor en commAsync)
MPI_Allreduce(&state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
if(all_completed != MAL_DIST_COMPLETED) return MAL_DIST_PENDING; // Continue only if asynchronous send has ended
//FIXME No se tiene en cuenta el estado MAL_APP_ENDED
if(pthread_join(mall->async_thread, NULL)) {
printf("Error al esperar al hilo\n");
return -2;
return end_redistribution();
* Función ejecutada por una hebra.
* Ejecuta una comunicación síncrona con los hijos que
* para el usuario se puede considerar como en segundo plano.
* Cuando termina la comunicación la hebra maestra puede comprobarlo
* por el valor "commAsync".
void* thread_async_work(void* void_arg) {
send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mpi.h>
#include "../IOcodes/read_ini.h"
#include "../IOcodes/results.h"
#include "malleabilityStates.h"
int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes);
void free_malleability();
void indicate_ending_malleability(int new_outside_state);
int malleability_checkpoint();
void set_benchmark_grp(int grp);
void set_malleability_configuration(int spawn_type, int spawn_is_single, int spawn_dist, int spawn_threaded, int comm_type, int comm_threaded);
void set_children_number(int numC); // TODO TO BE DEPRECATED
void get_malleability_user_comm(MPI_Comm *comm);
void malleability_add_data(void *data, int total_qty, int type, int is_replicated, int is_constant);
void malleability_get_entries(int *entries, int is_replicated, int is_constant);
void malleability_get_data(void **data, int index, int is_replicated, int is_constant);
void set_benchmark_configuration(configuration *config_file);
void get_benchmark_configuration(configuration **config_file);
void set_benchmark_results(results_data *results);
void get_benchmark_results(results_data **results);
#define MAL_DENIED -2
#define MAL_ZOMBIE -3
#define MAL_APP_ENDED 1
// TODO Refactor
#define COMM_PHY_NODES 1
#define COMM_PHY_CPU 2
//#define COMM_SPAWN_MERGE 1
#define MAL_USE_NORMAL 0
#define MAL_USE_POINT 2
#define MAL_USE_THREAD 3
#define MAL_INT 0
#define MAL_CHAR 1
#include "malleabilityTypes.h"
void init_malleability_data_struct(malleability_data_t *data_struct, int size);
void realloc_malleability_data_struct(malleability_data_t *data_struct, int qty_to_add);
void def_malleability_entries(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, MPI_Datatype *new_type);
void def_malleability_qty_type(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, MPI_Datatype *new_type);
//===================PUBLIC FUNCTIONS===================||
* Anyade en la estructura de datos a comunicar con los hijos
* un nuevo set de datos de un total "total_qty" distribuido entre
* todos los padres. La nueva serie "data" solo representa los datos
* que tiene este padre.
void add_data(void *data, int total_qty, int type, int request_qty, malleability_data_t *data_struct) {
int i;
if(data_struct->entries == 0) {
init_malleability_data_struct(data_struct, MALLEABILITY_INIT_DATA_QTY);
} else if(data_struct->entries == data_struct->max_entries) {
realloc_malleability_data_struct(data_struct, MALLEABILITY_INIT_DATA_QTY);
data_struct->qty[data_struct->entries] = total_qty;
data_struct->types[data_struct->entries] = type;
data_struct->arrays[data_struct->entries] = data;
data_struct->requests[data_struct->entries] = (MPI_Request *) malloc(request_qty * sizeof(MPI_Request));
for(i=0; i < request_qty; i++) {
data_struct->requests[data_struct->entries][i] = MPI_REQUEST_NULL;
* Comunicar desde los padres a los hijos las estructuras de datos sincronas o asincronas
* No es necesario que las estructuras esten inicializadas para un buen funcionamiento.
* En el argumento "root" todos tienen que indicar quien es el proceso raiz de los padres
* unicamente.
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group, int myId, int root, MPI_Comm intercomm) {
int i, rootBcast = MPI_PROC_NULL;
MPI_Datatype entries_type, struct_type;
if(is_children_group) {
rootBcast = root;
} else {
if(myId == root) rootBcast = MPI_ROOT;
// Mandar primero numero de entradas
def_malleability_entries(data_struct_dist, data_struct_rep, &entries_type);
MPI_Bcast(MPI_BOTTOM, 1, entries_type, rootBcast, intercomm);
if(is_children_group) {
if(data_struct_rep->entries != 0) init_malleability_data_struct(data_struct_rep, data_struct_rep->entries);
if(data_struct_dist->entries != 0) init_malleability_data_struct(data_struct_dist, data_struct_dist->entries);
def_malleability_qty_type(data_struct_dist, data_struct_rep, &struct_type);
MPI_Bcast(MPI_BOTTOM, 1, struct_type, rootBcast, intercomm); //FIXME Doy error
if(is_children_group) {
//data_struct->requests[data_struct->entries] = (MPI_Request *) malloc(request_qty * sizeof(MPI_Request)); FIXME Crear los requests?
//data_struct->requests[data_struct->entries][i] = MPI_REQUEST_NULL;
for(i=0; i < data_struct_rep->entries; i++) {
data_struct_rep->arrays[i] = (void *) malloc(data_struct_rep->qty[i] * sizeof(int)); //TODO Tener en cuenta que no siempre es int
for(i=0; i < data_struct_dist->entries; i++) {
data_struct_dist->arrays[i] = (void *) malloc(data_struct_dist->qty[i] * sizeof(int)); //TODO Tener en cuenta que no siempre es int
* Inicializa la estructura que describe una serie de datos con las mismas
* caracteristicas de localización y uso. Se inicializa para utilizar hasta
* "size" elementos.
void init_malleability_data_struct(malleability_data_t *data_struct, int size) {
data_struct->max_entries = size;
data_struct->qty = (int *) malloc(size * sizeof(int));
data_struct->types = (int *) malloc(size * sizeof(int));
data_struct->requests = (MPI_Request **) malloc(size * sizeof(MPI_Request *));
data_struct->arrays = (void **) malloc(size * sizeof(void *));
data_struct->request_ibarrier = MPI_REQUEST_NULL;
* Realoja la estructura que describe una serie de datos con las mismas
* caracteristicas de localización y uso. Se anyaden "size" entradas nuevas
* a las ya existentes.
void realloc_malleability_data_struct(malleability_data_t *data_struct, int qty_to_add) {
int *qty_aux, *types_aux, needed;
MPI_Request **requests_aux;
void **arrays_aux;
needed = data_struct->max_entries + qty_to_add;
qty_aux = (int *) realloc(data_struct->qty, needed * sizeof(int));
types_aux = (int *) realloc(data_struct->types, needed * sizeof(int));
requests_aux = (MPI_Request **) realloc(data_struct->requests, needed * sizeof(MPI_Request *));
arrays_aux = (void **) realloc(data_struct->arrays, needed * sizeof(void *));
if(qty_aux == NULL || arrays_aux == NULL || requests_aux == NULL || types_aux == NULL) {
fprintf(stderr, "Fatal error - No se ha podido realojar la memoria constante de datos a redistribuir/comunicar\n");
data_struct->qty = qty_aux;
data_struct->types = types_aux;
data_struct->requests = requests_aux;
data_struct->arrays = arrays_aux;
data_struct->max_entries = needed;
void free_malleability_data_struct(malleability_data_t *data_struct) {
int i, max;
max = data_struct->entries;
if(max != 0) {
for(i=0; i<max; i++) {
//free(data_struct->arrays[i]); //FIXME Valores alojados con 1 elemento no se liberan?
//free(data_struct->requests[i]); //TODO Plantear como crearlos
//================MPI DERIVED DATATYPES=================||
* Crea un tipo derivado para mandar el numero de entradas
* en dos estructuras de descripcion de datos.
void def_malleability_entries(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, MPI_Datatype *new_type) {
int counts = 2;
int blocklengths[counts];
MPI_Aint displs[counts];
MPI_Datatype types[counts];
blocklengths[0] = blocklengths[1] = 1;
types[0] = types[1] = MPI_INT;
// Obtener direccion base
MPI_Get_address(&(data_struct_rep->entries), &displs[0]);
MPI_Get_address(&(data_struct_dist->entries), &displs[1]);
MPI_Type_create_struct(counts, blocklengths, displs, types, new_type);
* Crea un tipo derivado para mandar las cantidades y tipo
* de datos de dos estructuras de descripcion de datos.
* El vector de "requests" no es enviado ya que solo es necesario
* en los padres.
* TODO Refactor?
void def_malleability_qty_type(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, MPI_Datatype *new_type) {
int counts = 4;
int blocklengths[counts];
MPI_Aint displs[counts];
MPI_Datatype types[counts];
types[0] = types[1] = types[2] = types[3] = MPI_INT;
blocklengths[0] = blocklengths[1] = data_struct_rep->entries;
blocklengths[2] = blocklengths[3] = data_struct_dist->entries;
MPI_Get_address((data_struct_rep->qty), &displs[0]);
MPI_Get_address((data_struct_rep->types), &displs[1]);
MPI_Get_address((data_struct_dist->qty), &displs[2]);
MPI_Get_address((data_struct_dist->types), &displs[3]);
MPI_Type_create_struct(counts, blocklengths, displs, types, new_type);
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <fcntl.h>
#include <sys/stat.h>
#include "malleabilityStates.h"
typedef struct {
int entries; // Indica numero de vectores a comunicar (replicated data)
int max_entries;
MPI_Request request_ibarrier; // Request para indicar que los padres esperan a que los hijos terminen de recibir
int *qty; // Indica numero de elementos en cada subvector de sync_array
int *types;
// Vector de vectores de request. En cada elemento superior se indican los requests a comprobar para dar por finalizada
// la comunicacion de ese dato
MPI_Request **requests;
void **arrays; // Cada subvector es una serie de datos a comunicar
} malleability_data_t;
void add_data(void *data, int total_qty, int type, int request_qty, malleability_data_t *data_struct);
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group, int myId, int root, MPI_Comm intercomm);
void free_malleability_data_struct(malleability_data_t *data_struct);
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <mpi.h>
//#include <slurm/slurm.h>
#include <signal.h>
#include "../IOcodes/results.h"
#include "malleabilityZombies.h"
#define PIDS_QTY 320
void zombies_suspend();
int offset_pids, *pids = NULL;
void gestor_usr2() {}
void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root, void *results_void, MPI_Comm user_comm) {
int pid = getpid();
int *pids_counts = malloc(numP * sizeof(int));
int *pids_displs = malloc(numP * sizeof(int));
int i, count=1;
if(myId < numC) {
count = 0;
if(myId == root) {
for(i=0; i < numC; i++) {
pids_counts[i] = 0;
for(i=numC; i<numP; i++) {
pids_counts[i] = 1;
pids_displs[i] = (i + offset_pids) - numC;
offset_pids += numP - numC;
MPI_Gatherv(&pid, count, MPI_INT, pids, pids_counts, pids_displs, MPI_INT, root, comm);
if(myId >= numC) {
// Needed to ensure iteration times are collected before suspending these processes
results_data *results = (results_data *) results_void;
compute_results_iter(results, myId, root, user_comm);
void zombies_service_init() {
offset_pids = 0;
pids = malloc(PIDS_QTY * sizeof(int));
for(int i=0; i<PIDS_QTY; i++) {
pids[i] = 0;
void zombies_service_free() {
void zombies_suspend() {
struct sigaction act;
sigaction(SIGUSR2, &act, NULL);
sigset_t set;
void zombies_awake() {
for(int i=0; i < offset_pids; i++) { // Despertar a los zombies
kill(pids[i], SIGUSR2);
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <mpi.h>
//#include <slurm/slurm.h>
#include <signal.h>
void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root, void *results_void, MPI_Comm user_comm);
void zombies_service_init();
void zombies_service_free();
void zombies_awake();
#module load gcc/6.4.0
#module load openmpi/1.10.7
#module load /home/martini/ejemplos_Mod/modulefiles/mpi/openmpi_aliaga_1_10_7
echo "OPENMPI"
#module load /home/martini/MODULES/modulefiles/openmpi4.1.0
#mpirun -mca btl_openib_allow_ib 1 -npernode 10 -np 20 ./batch5.out
echo "MPICH"
module load /home/martini/MODULES/modulefiles/mpich3.4
#export HYDRA_DEBUG=1
#-disable-hostname-propagation -disable-auto-cleanup -pmi-port -hosts n00,n01
mpirun -hostfile hostfile.o2785 -np 20 ./test.out
echo "Intel"
#module load /home/martini/MODULES/modulefiles/intel64.module
#export I_MPI_OFI_PROVIDER=tcp
#export I_MPI_DEBUG=6
#export I_MPI_FABRICS=shm:ofi
#mpirun -print-all-exitcodes -np 2 --ppn 1 ./batch.out
#mpirun -genv I_MPI_FABRICS=shm:ofi -print-all-exitcodes -iface ib0 -np 16 ./batch4.out
#mpirun -genv I_MPI_FABRICS=shm:ofi -iface enp59s0f0 -np 20 ./batch4.out
...@@ -15,5 +15,4 @@ elif [ $dist == "cpu" ]; then ...@@ -15,5 +15,4 @@ elif [ $dist == "cpu" ]; then
fi fi
$dir/Recordnodelist.o $numP $dist $dir/Recordnodelist.o $numP $dist
echo $numP echo $numP
#!/bin/bash #!/bin/bash
#SBATCH --exclude=c01,c00,c02
echo "MPICH" echo "MPICH"
module load mpich-3.4.1-noucx module load mpich-3.4.1-noucx
...@@ -9,9 +17,9 @@ module load mpich-3.4.1-noucx ...@@ -9,9 +17,9 @@ module load mpich-3.4.1-noucx
numP=$(bash $1) numP=$(bash $1)
mpirun -f hostfile.o$SLURM_JOB_ID -np $numP ./a.out $1 $2 mpirun -print-all-exitcodes -f hostfile.o$SLURM_JOB_ID $dir$codeDir/a.out $1 $2 $nodelist $nodes
#mpirun -np 2 ./a.out test.ini #mpirun -np $numP $dir$codeDir/a.out $1 $2 $nodelist $nodes
rm hostfile.o$SLURM_JOB_ID rm hostfile.o$SLURM_JOB_ID
echo "END RUN" echo "END RUN"
sed -i 's/application called MPI_Abort(MPI_COMM_WORLD, -100) - process/shrink cleaning/g' slurm-$SLURM_JOB_ID.out
[general] [general]
resizes=1 ; Numero de redistribuciones resizes=1 ; Numero de redistribuciones
matrix_tam=1000 ; Tamaño en bytes de la matriz de computo matrix_tam=1000 ; Tamaño en bytes de la matriz de computo
SDR=100 ; Tamaño en bytes a redistribuir de forma sincrona comm_tam=1000 ; Tamaño en bytes de los datos a comunicar en cada iteracion. Una sola vez
ADR=100 ; Tamaño en bytes a redistribuir de forma asincrona 1000000000 SDR=100000000 ; Tamaño en bytes a redistribuir de forma sincrona
AIB=0 ; Indica si las redistribuciones asíncronas se consideran terminadas para los padres ADR=100000000 ; Tamaño en bytes a redistribuir de forma asincrona 1000000000
; cuando terminan de enviar (0) o cuando terminan de recibir los valores (1) AIB=3 ; Indica si las redistribuciones asíncronas se consideran terminadas para los padres
; cuando terminan de enviar (0), cuando terminan de recibir los valores (1)
; o usar comunicaciones punto a punto (2), o utilizar hebras(3)
CST=0 ; Indica como realizar el spawn. (0) Para el método baseline, (1) para el método
; baseline con hilos, (2) para el método merge y (3) para el método merge
; con hilos
CSS=0 ; Indica si el spawn se realiza con todos los procesos (0) o solo participa
; el proceso raiz (1)
time=1 ; Tiempo necesario para realizar una iteracion time=1 ; Tiempo necesario para realizar una iteracion
; end [general]
[resize0] ; Grupo inicial(mpirun) [resize0] ; Grupo inicial(mpirun)
iters=10 ; Numero de iteraciones a realizar por este grupo iters=10 ; Numero de iteraciones a realizar por este grupo
procs=2 ; Cantidad de procesos en el grupo procs=2 ; Cantidad de procesos en el grupo
factor=1 ; Factor de coste factor=1 ; Factor de coste
physical_dist=cpu ; Tipo de redistribución física de los procesos physical_dist=node ; Tipo de redistribución física de los procesos
;end [resize0] ;end [resize0]
[resize1] ; Grupo de hijos 1 [resize1] ; Grupo de hijos 1
iters=20 iters=20
procs=4 procs=4
factor=0.5 factor=0.5
physical_dist=cpu physical_dist=node
;end [resize1]
...@@ -8,6 +8,15 @@ ResultsDir="Results/" ...@@ -8,6 +8,15 @@ ResultsDir="Results/"
ResultsDirName=$1 ResultsDirName=$1
maxIndex=$2 maxIndex=$2
cantidadGrupos=$3 #Contando a los padres cantidadGrupos=$3 #Contando a los padres
totalEjGrupo=$4 #Total de ejecuciones por grupo
maxTime=$5 #Maximo tiempo que se considera válido
if [ $# -lt 3 ]
echo "Faltan argumentos"
echo "Uso -> bash CheckRun NombreDirectorio IndiceMaximo Grupos"
exit -1
cd $dir$ResultsDir cd $dir$ResultsDir
if [ ! -d $ResultsDirName ] if [ ! -d $ResultsDirName ]
...@@ -25,10 +34,39 @@ qty=$(wc -l errores2.txt | cut -d ' ' -f1) ...@@ -25,10 +34,39 @@ qty=$(wc -l errores2.txt | cut -d ' ' -f1)
if [ $qty -gt 0 ] if [ $qty -gt 0 ]
then then
echo "Se han encontrado errores de ejecución graves. Abortando" echo "Se han encontrado errores de ejecución graves. Abortando"
echo "Revisar archivo errores2.txt en el directorio $ResultsDirName"
exit -2 exit -2
fi fi
rm errores2.txt rm errores2.txt
#Comprobar que el número de archivos es correcto
#Pueden estar todos los archivos pero no estar los archivos
#completos -- Esto se comprueba más tarde
qtyG=$(ls R*/R*_Global.out | wc -l)
qtyG=$(($qtyG * 2))
qtyL=$(ls R*/R*_G?N*.out | wc -l)
if [ $qtyG == $qtyL ]
echo "El numero de ficheros G($qtyG) y L($qtyL) coincide"
#Si faltan archivos, se indican cuales faltan
echo "Faltan ejecuciones Locales o globales"
for ((i=1; i<$maxIndex; i++))
qtyEx=$(grep Tex -r Run$i | wc -l)
qtyIt=$(grep Top -r Run$i | wc -l)
qtyEx=$(($qtyEx * 2))
if [ $qtyEx -ne $qtyIt ]
echo "Faltan archivos en Run$i"
exit -1
#grep -rn "2.\." R* TODO Testear que el tiempo teorico maximo es valido?
#Comprobar si hay runs con tiempo negativos #Comprobar si hay runs con tiempo negativos
#Si los hay, reejecutar e informar de cuales son #Si los hay, reejecutar e informar de cuales son
grep - */R* | grep Tex > errores.txt grep - */R* | grep Tex > errores.txt
...@@ -40,9 +78,9 @@ then ...@@ -40,9 +78,9 @@ then
while IFS="" read -r lineRun || [ -n "$lineRun" ] while IFS="" read -r lineRun || [ -n "$lineRun" ]
do do
#Obtener datos de una ejecución erronea #Obtener datos de una ejecución erronea
run=$(echo $lineRun | cut -d '/R' -f2 | cut -d '_' -f1) run=$(echo $lineRun | cut -d 'R' -f3 | cut -d '_' -f1)
if [ $run -gt $maxIndex ] if [ $run -gt $maxIndex ]
then #Indice de ejecuciones posteriores then #Indice de ejecuciones posteriores echas a mano -- FIXME Eliminar?
realRun=$(($run - $maxIndex)) realRun=$(($run - $maxIndex))
index=$run index=$run
else # Indice de las primeras ejecuciones else # Indice de las primeras ejecuciones
...@@ -60,20 +98,53 @@ then ...@@ -60,20 +98,53 @@ then
for ((i=0; i<qty; i++)) for ((i=0; i<qty; i++))
do do
fin=$(grep -n - R* | grep Tex | cut -d ':' -f2 | head -n1) fin=$(grep -n - R* | grep Tex | cut -d ':' -f2 | head -n1)
init=$(($fin - 6)) init=$(($fin - 7))
sed -i ''$init','$fin'd' R${realRun}_Global.out sed -i ''$init','$fin'd' R${realRun}_Global.out
aux=$(($fin / 7)) #Utilizado para saber de entre las ejecuciones del fichero, cual es la erronea #Se borran las lineas de los ficheros locales asociados
aux=$(($fin / 8)) #Utilizado para saber de entre las ejecuciones del fichero, cual es la erronea
fin=$(($aux * 5)) fin=$(($aux * 5))
ini=$(($fin - 4)) init=$(($fin - 4))
for ((j=0; j<cantidadGrupos; j++)); do for ((j=0; j<cantidadGrupos; j++)); do
sed -i ''$init','$fin'd' R${realRun}_G${j}* sed -i ''$init','$fin'd' R${realRun}_G${j}*
done done
done done
#2 - Reelanzar ejecucion #2 - Reelanzar ejecucion
sbatch -N 2 $dir$execDir./ config$realRun.ini $index proc_list=$(grep Procs R${realRun}_Global.out | cut -d '=' -f3 | cut -d ',' -f1)
proc_parents=$(echo $proc_list | cut -d ' ' -f1)
proc_children=$(echo $proc_list | cut -d ' ' -f2)
nodes=8 # Maximo actual
if [ $proc_parents -gt $proc_children ]
nodes=$(($proc_parents / 20))
nodes=$(($proc_children / 20))
sbatch -N $nodes $dir$execDir./ config$realRun.ini $index
cd $dir$ResultsDir$ResultsDirName cd $dir$ResultsDir$ResultsDirName
done < errores.txt done < errores.txt
exit 0
#Comprobar que todas las ejecuciones tienen todas las ejecucciones que tocan
#Solo es necesario comprobar el global.
cd $dir$ResultsDir$ResultsDirName
for ((i=1; i<$maxIndex; i++))
qtyEx=$(grep Tex -r Run$i | wc -l)
if [ $qtyEx -ne $totalEjGrupo ]
echo "Faltan en $i, $diff ejecuciones"
if [ $qty_missing -eq 0 ]
echo "Todos los archivos tienen $totalEjGrupo ejecuciones"
fi fi
Los archivos de esta carpeta son para ejecutar pruebas con todas las posibles configuraciones. Los archivos de esta carpeta son para ejecutar pruebas con todas las posibles configuraciones.
Se tienen tres ficheros en esta carpeta: Se tienen tres ficheros en esta carpeta:
-- Para ejecutar una serie de pruebas. -- Para ejecutar una serie de pruebas.
-- Script para ejecutar por slurm las pruebas. Es llamado por
-- Para ejecutar pruebas con un fichero de configuración. -- Para ejecutar pruebas con un fichero de configuración.
-- Para comprobar que las ejecuciones realizadas por son correctas, y en caso de que algunas fallen, relanzarlas. -- Para comprobar que las ejecuciones realizadas por son correctas, y en caso de que algunas fallen, relanzarlas.
-- Crea un fichero de configuración de tipo "config.ini" a partir de los argumentos pasados -- Crea un fichero de configuración de tipo "config.ini" a partir de los argumentos pasados
-- Para ejecutar una serie de pruebas dedicadas a la creacion de procesos.
-- Script para ejecutar por slurm las pruebas. Es llamado por
Para que la mayoria de estos scripts funcionen correctamente es necesario compilar el código
en la carpeta "Codes". Ejecutar dentro de esa carpeta lo siguiente: "./compila -e"
-------------------------------- --------------------------------
Para ejecutar las pruebas se utiliza el comando: Para ejecutar las pruebas se utiliza el comando:
Para ejecutar las pruebas se utiliza el comando: bash grupos-hijos tamaño-matriz tamaño-comunicacion cantidad-datos-sincronos tiempo-iteracion proceso-tiempo iteraciones-por-grupo cantidad-nodos
Este script crea subcarpetas en "Results" donde almacena los resultados y los ficheros de configuración que crea. Este script crea subcarpetas en "Results" donde almacena los resultados y los ficheros de configuración que crea.
grupos-hijos: Es la cantidad de grupos hijos de procesos a ejecutar. Por tanto, el valor 1 indicaría el grupo de procesos padres y un grupo de procesos hijos. grupos-hijos: Es la cantidad de grupos hijos de procesos a ejecutar. Por tanto, el valor 1 indicaría el grupo de procesos padres y un grupo de procesos hijos.
...@@ -16,6 +22,8 @@ grupos-hijos: Es la cantidad de grupos hijos de procesos a ejecutar. Por tanto, ...@@ -16,6 +22,8 @@ grupos-hijos: Es la cantidad de grupos hijos de procesos a ejecutar. Por tanto,
tamaño-matriz: Cantidad de filas en la matriz rectangular. Esta matriz se utiliza para realizar el computo de la aplicación. tamaño-matriz: Cantidad de filas en la matriz rectangular. Esta matriz se utiliza para realizar el computo de la aplicación.
tamaño-comunicacion: Numero de bytes a comunicar en la aplicacion cuando se realiza computo.
cantidad-datos-sincronos: Indica la cantidad de bytes que se tienen que transmitir desde un grupo de procesos a otro en una redimensión cantidad-datos-sincronos: Indica la cantidad de bytes que se tienen que transmitir desde un grupo de procesos a otro en una redimensión
tiempo-iteracion: Indica la cantidad de segundos que deben pasar para que una iteración se considere terminada. tiempo-iteracion: Indica la cantidad de segundos que deben pasar para que una iteración se considere terminada.
...@@ -28,8 +36,13 @@ proceso-tiempo: Ligado al valor "tiempo-iteracion". Indica con cuantos procesos ...@@ -28,8 +36,13 @@ proceso-tiempo: Ligado al valor "tiempo-iteracion". Indica con cuantos procesos
iteraciones-por-grupo: Cantidad de iteraciones a realizar en cada grupo para que consideren terminada su ejecución. iteraciones-por-grupo: Cantidad de iteraciones a realizar en cada grupo para que consideren terminada su ejecución.
Actualmente todos los grupos de procesos realizan la misma cantidad de iteraciones. Actualmente todos los grupos de procesos realizan la misma cantidad de iteraciones.
primera-iter: Indica si el primer grupo de procesos sigue el valor en "iteraciones-por-grupo" (0), o realiza el numero indicado en este argumento antes de realizar la redistribucion (>=1).
cantidad-nodos: Cantidad de nodos a utilizar en las ejecuciones. La cantidad de nodos también influye en la cantidad de procesos por grupo, donde nunca habrá más procesos en un grupo cantidad-nodos: Cantidad de nodos a utilizar en las ejecuciones. La cantidad de nodos también influye en la cantidad de procesos por grupo, donde nunca habrá más procesos en un grupo
que núcleos entre todos los nodos. Si se elige el valor 2 y habiendo 20 núcleos por nodo, no se realizarán pruebas con más de 40 procesos por grupo. que núcleos entre todos los nodos. Si se elige el valor 2 y habiendo 20 núcleos por nodo, no se realizarán pruebas con más de 40 procesos por grupo.
bash 1 100000 10000000 1000000000 2 2 10 1 2
-------------------------------- --------------------------------
Para ejecutar una sola prueba con un fichero de configuración se utiliza el siguiente comando: Para ejecutar una sola prueba con un fichero de configuración se utiliza el siguiente comando:
#!/bin/bash #!/bin/bash
#SBATCH --exclude=n[06-07],c01 #SBATCH --exclude=c02,c01,c00
dir="/home/martini/malleability_benchmark" dir="/home/martini/malleability_benchmark"
codeDir="/Codes" codeDir="/Codes"
ResultsDir="/Results" ResultsDir="/Results"
module load mpich-3.4.1-noucx module load mpich-3.4.1-noucx
name_dir=$1 name_dir=$1
i=$2 i=$2
procs_parents=$3 procs_parents=$3
procs_sons=$4 procs_sons=$4
percs_array=(0 25 50 75 100) #percs_array=(0 25 50 75 100)
cst_array=(0 1 2 3)
css_array=(0 1)
aux=$(($i + 1)) aux=$(($i + 1))
echo "START TEST init=$aux" echo "START TEST init=$aux"
for adr_perc in "${percs_array[@]}" for adr_perc in "${percs_array[@]}"
do do
for phy_dist in "${dist_array[@]}"
for phy_dist in cpu node
do do
for ibarrier_use in "${at_array[@]}"
for ibarrier_use in 0 #TODO Poner a 0 1
do do
i=$(($i + 1)) for cst in "${cst_array[@]}"
cd $name_dir/Run$i do
config_file="config$i.ini" for css in "${css_array[@]}"
echo "EXEC $procs_parents -- $procs_sons -- $adr_perc -- $ibarrier_use -- $phy_dist -- RUN $i" i=$(($i + 1))
cd $name_dir/Run$i
for index in 1 2 3 echo "EXEC $procs_parents -- $procs_sons -- $adr_perc -- $ibarrier_use -- $phy_dist -- $cst -- $css -- RUN $i"
numP=$(bash $dir$codeDir/ $config_file) # Crea el fichero hostfile for index in 1 2 3 4 5 6 7 8 9 10
mpirun -f hostfile.o$SLURM_JOB_ID -np $numP $dir$codeDir/bench.out $config_file $i do
rm hostfile.o$SLURM_JOB_ID numP=$(bash $dir$codeDir/ $config_file) # Crea el fichero hostfile
mpirun -f hostfile.o$SLURM_JOB_ID $dir$codeDir/./bench.out $config_file $i $nodelist $nodes
rm hostfile.o$SLURM_JOB_ID
done done
done done
done done
done done
echo "END TEST" echo "END TEST"
sed -i 's/application called MPI_Abort(MPI_COMM_WORLD, -100) - process/shrink cleaning/g' slurm-$SLURM_JOB_ID.out
import sys import sys
import glob import glob
def general(f, resizes, matrix_tam, sdr, adr, aib, time): def general(f, resizes, matrix_tam, comm_tam, sdr, adr, aib, cst, css, time):
f.write("[general]\n") f.write("[general]\n")
f.write("resizes=" + resizes +"\n") f.write("resizes=" + resizes +"\n")
f.write("matrix_tam=" + matrix_tam +"\n") f.write("matrix_tam=" + matrix_tam +"\n")
f.write("comm_tam=" + comm_tam +"\n")
f.write("SDR=" + sdr +"\n") f.write("SDR=" + sdr +"\n")
f.write("ADR=" + adr +"\n") f.write("ADR=" + adr +"\n")
f.write("AIB=" + aib +"\n") f.write("AIB=" + aib +"\n")
f.write("CST=" + cst +"\n")
f.write("CSS=" + css +"\n")
f.write("time=" + time +"\n") f.write("time=" + time +"\n")
f.write("; end [general]\n") f.write("; end [general]\n")
...@@ -20,21 +23,24 @@ def resize_section(f, resize, iters, procs, factor, physical_dist): ...@@ -20,21 +23,24 @@ def resize_section(f, resize, iters, procs, factor, physical_dist):
f.write(";end [resize" + resize +"]\n") f.write(";end [resize" + resize +"]\n")
if len(sys.argv) < 2: if len(sys.argv) < 2:
print("The config file name is missing\nUsage: python3 program nameFile args\nArgs: resizes matrix_tam SDR ADR AIB time iters0 procs0 dist0 iters1 procs1 dist1 ...") print("The config file name is missing\nUsage: python3 program nameFile args\nArgs: resizes matrix_tam SDR ADR AIB CST CSS time iters0 procs0 dist0 iters1 procs1 dist1 ...")
exit(1) exit(1)
if len(sys.argv) < 12: if len(sys.argv) < 12:
print("The are not enough arguments\nUsage: python3 program nameFile args\nArgs: resizes matrix_tam SDR ADR_perc AIB time proc_time iters0 procs0 dist0 iters1 procs1 dist1 ...") print("The are not enough arguments\nUsage: python3 program nameFile args\nArgs: resizes matrix_tam SDR ADR_perc AIB CST CSS time proc_time iters0 procs0 dist0 iters1 procs1 dist1 ...")
exit(1) exit(1)
name = sys.argv[1] name = sys.argv[1]
resizes = int(sys.argv[2]) resizes = int(sys.argv[2])
matrix_tam = sys.argv[3] matrix_tam = sys.argv[3]
sdr = int(sys.argv[4]) comm_tam = sys.argv[4]
adr_perc = float(sys.argv[5]) sdr = int(sys.argv[5])
aib = sys.argv[6] adr_perc = float(sys.argv[6])
time = sys.argv[7] aib = sys.argv[7]
proc_time = float(sys.argv[8]) # Usado para calcular el factor de cada proceso cst = sys.argv[8]
css = sys.argv[9]
time = sys.argv[10]
proc_time = float(sys.argv[11]) # Usado para calcular el factor de cada proceso
adr = (sdr * adr_perc) / 100 adr = (sdr * adr_perc) / 100
sdr = sdr - adr sdr = sdr - adr
...@@ -44,13 +50,13 @@ sdr = str(sdr) ...@@ -44,13 +50,13 @@ sdr = str(sdr)
factor = 0 factor = 0
f = open(name, "w") f = open(name, "w")
general(f, str(resizes), matrix_tam, sdr, adr, aib, time) general(f, str(resizes), matrix_tam, comm_tam, sdr, adr, aib, cst, css, time)
resizes = resizes + 1 # Internamente, los primeros procesos se muestran como un grupo resizes = resizes + 1 # Internamente, los primeros procesos se muestran como un grupo
for resize in range(resizes): for resize in range(resizes):
iters = sys.argv[9 + 3 * resize] iters = sys.argv[12 + 3 * resize]
procs = sys.argv[9 + 3 * resize + 1] procs = sys.argv[12 + 3 * resize + 1]
physical_dist = sys.argv[9 + 3 * resize + 2] physical_dist = sys.argv[12 + 3 * resize + 2]
if proc_time != 0: # Si el argumento proc_time es 0, todos los grupos tienen un factor de 1 if proc_time != 0: # Si el argumento proc_time es 0, todos los grupos tienen un factor de 1
factor = proc_time / float(procs) factor = proc_time / float(procs)
...@@ -6,82 +6,131 @@ execDir="Exec/" ...@@ -6,82 +6,131 @@ execDir="Exec/"
ResultsDir="Results/" ResultsDir="Results/"
#TODO Añadir diferenciar phy_dist de padres e hijos al ejecutar #TODO Añadir diferenciar phy_dist de padres e hijos al ejecutar
#TODO Añadir que se considere la cantidad de nucleos de un nodo y no este fijada
if [[ $# -lt 9 ]]
echo "Faltan argumentos"
echo "bash grupos tam_computo tam_comm tam_resize tiempo proc_init iters first_iter node_qty"
exit -1
groups=$1 #TODO Modificar para que admita más de dos grupos de procesos groups=$1 #TODO Modificar para que admita más de dos grupos de procesos
matrix_tam=$2 matrix_tam=$2
N_qty=$3 # Datos a redistribuir comm_tam=$3
time=$4 N_qty=$4 # Datos a redistribuir
proc_init=$5 #El tiempo por iteracion es para esta cantidad de procesos time=$5
iters=$6 proc_init=$6 #El tiempo por iteracion es para esta cantidad de procesos
node_qty=$7 iters=$7
# Si el valor es 0 el primer grupo de procesos realiza $iters iteraciones antes de redimensionar
if [[ $first_iter -eq 0 ]]
# Si el valor es diferente a 0, el primer grupo de procesos realiza $first_iter iteraciones antes de redimensionar
max_procs=$(($node_qty * 20)) max_procs=$(($node_qty * 20))
procs_array=() procs_array=(1 10)
percs_array=(0 25 50 75 100) #percs_array=(0 25 50 75 100)
i=1 percs_array=(0)
value=$((2 ** $i)) at_array=(0)
procs_array=(${procs_array[@]} $value) dist_array=(cpu)
cst_array=(0 1 2 3)
css_array=(0 1)
#Obtener cantidades de procesos posibles a ejecutar #Obtener cantidades de procesos posibles a ejecutar
while [[ $value -le $max_procs ]] i=0
#while [[ $value -lt $max_procs ]]
# i=$(($i + 1))
# value=$((20 * $i))
# procs_array=(${procs_array[@]} $value)
while [[ $value -lt $max_procs ]]
do do
i=$(($i + 1)) i=$(($i + 1))
value=$((2 ** $i)) value=$((2 ** $i))
value=$(($value * 10))
procs_array=(${procs_array[@]} $value) procs_array=(${procs_array[@]} $value)
done done
unset procs_array[-1]
#Crear carpeta de resultados #Crear carpeta de resultados
cd $dir$ResultsDir cd $dir$ResultsDir
name_res=$node_qty"N-"$(date '+%m-%d') name_res=$node_qty"N-"$(date '+%m-%d')
if [ -d $name_res ] # Si ya existe el directorio, modificar levemente el nombre y crear otro
name_res=$name_res"-"$(date '+%H:%M')
echo "Localizacion de los resultados: $dir$ResultsDir$name_res" echo "Localizacion de los resultados: $dir$ResultsDir$name_res"
mkdir $name_res mkdir $name_res
# Ejecutar pruebas # Ejecutar pruebas
i=0 i=0
j=0 j=0
for procs_parents in "${procs_array[@]}" for procs_parents in "${procs_array[@]}"
do do
node_qty1=$(($procs_parents / 20))
for procs_sons in "${procs_array[@]}" for procs_sons in "${procs_array[@]}"
do do
if [ $procs_sons -ne $procs_parents ] node_qty2=$(($procs_sons / 20))
if [ $node_qty1 -lt $node_qty2 ]
if [ $node_qty1 -eq 0 ]
then then
if [ $procs_parents -ne $procs_sons ]
for adr_perc in "${percs_array[@]}" for adr_perc in "${percs_array[@]}"
do do
for phy_dist in "${dist_array[@]}"
for phy_dist in cpu node
do do
for ibarrier_use in "${at_array[@]}"
for ibarrier_use in 0 #TODO Poner a 0 1
do do
i=$(($i + 1)) for cst in "${cst_array[@]}"
# Crear directorio para esta ejecucion for css in "${css_array[@]}"
cd $dir$ResultsDir$name_res do
mkdir Run$i i=$(($i + 1))
cd Run$i
# Crear archivo de configuracion # Crear directorio para esta ejecucion
echo "Config $procs_parents -- $procs_sons -- $adr_perc -- $ibarrier_use -- $phy_dist -- RUN $i" cd $dir$ResultsDir$name_res
array0=($iters $procs_parents $phy_dist) mkdir Run$i
array=("${array0[@]}") cd Run$i
array0=($iters $procs_sons $phy_dist)
python3 $dir$execDir/./ config$i.ini 1 $matrix_tam $N_qty $adr_perc $ibarrier_use $time $proc_init "${array[@]}"
# Crear archivo de configuracion
echo "Config $procs_parents -- $procs_sons -- $adr_perc -- $ibarrier_use -- $phy_dist -- $cst -- $css -- RUN $i"
array0=($iters_first_group $procs_parents $phy_dist)
array0=($iters $procs_sons $phy_dist)
python3 $dir$execDir/./ config$i.ini 1 $matrix_tam $comm_tam $N_qty $adr_perc $ibarrier_use $cst $css $time $proc_init "${array[@]}"
done done
done done
done done #adr_perc
aux=$(($j * 10)) #TODO Poner a 20 cuando se use ibarrier
echo $aux
sbatch -N $node_qty $dir$execDir./ $dir$ResultsDir$name_res $aux $procs_parents $procs_sons
j=$(($j + 1))
start_i=$(($j * ${#percs_array[@]} * ${#dist_array[@]} * ${#at_array[@]} * ${#cst_array[@]} * ${#css_array[@]}))
echo $aux
sbatch -N $node_qty1 $dir$execDir./ $dir$ResultsDir$name_res $start_i $procs_parents $procs_sons
j=$(($j + 1))
fi fi
done done
done done
echo "Localizacion de los resultados: $dir$ResultsDir$name_res"
echo "END TEST" echo "END TEST"
#!/bin/bash #!/bin/bash
#SBATCH --exclude=c01 #SBATCH --exclude=c02,c01,c00
dir="/home/martini/malleability_benchmark" dir="/home/martini/malleability_benchmark"
codeDir="/Codes" codeDir="/Codes"
ResultsDir="/Results" ResultsDir="/Results"
module load mpich-3.4.1-noucx module load mpich-3.4.1-noucx
...@@ -22,9 +26,11 @@ fi ...@@ -22,9 +26,11 @@ fi
for ((i=0; i<qty; i++)) for ((i=0; i<qty; i++))
do do
echo "Iter $i"
numP=$(bash $dir$codeDir/ $1) numP=$(bash $dir$codeDir/ $1)
mpirun -f hostfile.o$SLURM_JOB_ID -np $numP $dir$codeDir/bench.out $1 $2 mpirun -f hostfile.o$SLURM_JOB_ID $dir$codeDir/bench.out $1 $2 $nodelist $nodes
rm hostfile.o$SLURM_JOB_ID rm hostfile.o$SLURM_JOB_ID
done done
echo "END TEST" echo "END TEST"
sed -i 's/application called MPI_Abort(MPI_COMM_WORLD, -100) - process/shrink cleaning/g' slurm-$SLURM_JOB_ID.out
