Commit 5eb9cddd authored by Iker Martín's avatar Iker Martín
Browse files

Version 2 de aplicación

parent 2663ec23
......@@ -3,16 +3,13 @@
#include <mpi.h>
#include <string.h>
#include <slurm/slurm.h>
#include "malleabilityStates.h"
#define COMM_UNRESERVED -2
#define COMM_IN_PROGRESS -1
#define COMM_FINISHED 0
int init_slurm_comm(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId, int numP, int numC, int root, int type_dist, int type_creation, int spawn_is_single, MPI_Comm comm, MPI_Comm *child);
int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm comm, MPI_Comm comm_thread, double *end_real_time);
#define COMM_PHY_NODES 1
#define COMM_PHY_CPU 2
void malleability_establish_connection(int myId, int root, MPI_Comm *intercomm);
#define COMM_SPAWN_SERIAL 0
#define COMM_SPAWN_PTHREAD 1
int init_slurm_comm(char **argv, int myId, int numP, int root, int type_dist, int type_creation);
int check_slurm_comm(int myId, int root, MPI_Comm comm, MPI_Comm *child);
void proc_adapt_expand(int *numP, int numC, MPI_Comm intercomm, MPI_Comm *comm, int is_children_group);
void proc_adapt_shrink(int numC, MPI_Comm *comm, int myId);
#include <pthread.h>
#include "malleabilityManager.h"
#include "malleabilityStates.h"
#include "malleabilityTypes.h"
#include "malleabilityZombies.h"
#include "ProcessDist.h"
#include "CommDist.h"
#define MALLEABILITY_ROOT 0
#define MALLEABILITY_USE_SYNCHRONOUS 0
#define MALLEABILITY_USE_ASYNCHRONOUS 1
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous);
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous);
void Children_init();
int spawn_step();
int start_redistribution();
int check_redistribution();
int end_redistribution();
int shrink_redistribution();
int thread_creation();
int thread_check();
void* thread_async_work(void* void_arg);
typedef struct {
int spawn_type;
int spawn_dist;
int spawn_is_single;
int spawn_threaded;
int comm_type;
int comm_threaded;
int grp;
configuration *config_file;
results_data *results;
} malleability_config_t;
typedef struct { //FIXME numC_spawned no se esta usando
int myId, numP, numC, numC_spawned, root, root_parents;
pthread_t async_thread;
MPI_Comm comm, thread_comm;
MPI_Comm intercomm;
MPI_Comm user_comm;
char *name_exec, *nodelist;
int num_cpus, num_nodes;
} malleability_t;
int state = MAL_UNRESERVED; //FIXME Mover a otro lado
malleability_config_t *mall_conf;
malleability_t *mall;
malleability_data_t *rep_s_data;
malleability_data_t *dist_s_data;
malleability_data_t *rep_a_data;
malleability_data_t *dist_a_data;
/*
* Inicializa la reserva de memoria para el modulo de maleabilidad
* creando todas las estructuras necesarias y copias de comunicadores
* para no interferir en la aplicación.
*
* Si es llamada por un grupo de procesos creados de forma dinámica,
* inicializan la comunicacion con sus padres. En este caso, al terminar
* la comunicacion los procesos hijo estan preparados para ejecutar la
* aplicacion.
*/
int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes) {
MPI_Comm dup_comm, thread_comm;
mall_conf = (malleability_config_t *) malloc(sizeof(malleability_config_t));
mall = (malleability_t *) malloc(sizeof(malleability_t));
rep_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
dist_s_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
MPI_Comm_dup(comm, &dup_comm);
MPI_Comm_dup(comm, &thread_comm);
mall->myId = myId;
mall->numP = numP;
mall->root = root;
mall->comm = dup_comm;
mall->thread_comm = thread_comm; // TODO Refactor -- Crear solo si es necesario?
mall->user_comm = comm;
mall->name_exec = name_exec;
mall->nodelist = nodelist;
mall->num_cpus = num_cpus;
mall->num_nodes = num_nodes;
rep_s_data->entries = 0;
rep_a_data->entries = 0;
dist_s_data->entries = 0;
dist_a_data->entries = 0;
state = MAL_NOT_STARTED;
// Si son el primer grupo de procesos, obtienen los datos de los padres
MPI_Comm_get_parent(&(mall->intercomm));
if(mall->intercomm != MPI_COMM_NULL ) {
Children_init();
return MALLEABILITY_CHILDREN;
}
zombies_service_init();
return MALLEABILITY_NOT_CHILDREN;
}
/*
* Elimina toda la memoria reservado por el modulo
* de maleabilidad y asegura que los zombies
* despierten si los hubiese.
*/
void free_malleability() {
free_malleability_data_struct(rep_s_data);
free_malleability_data_struct(rep_a_data);
free_malleability_data_struct(dist_s_data);
free_malleability_data_struct(dist_a_data);
free(rep_s_data);
free(rep_a_data);
free(dist_s_data);
free(dist_a_data);
//MPI_Comm_free(&(mall->comm)); // TODO Revisar si hace falta?
//MPI_Comm_free(&(mall->thread_comm));
free(mall);
free(mall_conf);
zombies_awake();
zombies_service_free();
state = MAL_UNRESERVED;
}
/*
* Se realiza el redimensionado de procesos por parte de los padres.
*
* Se crean los nuevos procesos con la distribucion fisica elegida y
* a continuacion se transmite la informacion a los mismos.
*
* Si hay datos asincronos a transmitir, primero se comienza a
* transmitir estos y se termina la funcion. Se tiene que comprobar con
* llamando a la función de nuevo que se han terminado de enviar
*
* Si hay ademas datos sincronos a enviar, no se envian aun.
*
* Si solo hay datos sincronos se envian tras la creacion de los procesos
* y finalmente se desconectan los dos grupos de procesos.
*/
int malleability_checkpoint() {
if(state == MAL_UNRESERVED) return MAL_UNRESERVED;
if(state == MAL_NOT_STARTED) {
// Comprobar si se tiene que realizar un redimensionado
//if(CHECK_RMS()) {return MAL_DENIED;}
state = spawn_step();
if (state == MAL_SPAWN_COMPLETED){
state = start_redistribution();
}
} else if(state == MAL_SPAWN_PENDING || state == MAL_SPAWN_SINGLE_PENDING) { // Comprueba si el spawn ha terminado y comienza la redistribucion
double end_real_time;
if(mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD && mall->numP > mall->numC) {
state = shrink_redistribution(); //TODO REFACTOR
} else {
state = check_slurm_comm(mall->myId, mall->root, mall->numP, &(mall->intercomm), mall->comm, mall->thread_comm, &end_real_time);
if (state == MAL_SPAWN_COMPLETED) {
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
if(mall_conf->spawn_type == COMM_SPAWN_PTHREAD || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
mall_conf->results->spawn_real_time[mall_conf->grp] = end_real_time - mall_conf->results->spawn_start;
}
//TODO Si es MERGE SHRINK, metodo diferente de redistribucion de datos
state = start_redistribution();
}
}
} else if(state == MAL_DIST_PENDING) {
if(mall_conf->comm_type == MAL_USE_THREAD) {
state = thread_check();
} else {
state = check_redistribution();
}
}
return state;
}
// Funciones solo necesarias por el benchmark
//-------------------------------------------------------------------------------------------------------------
void set_benchmark_grp(int grp) {
mall_conf->grp = grp;
}
void set_benchmark_configuration(configuration *config_file) {
mall_conf->config_file = config_file;
}
void get_benchmark_configuration(configuration **config_file) {
*config_file = mall_conf->config_file;
}
void set_benchmark_results(results_data *results) {
mall_conf->results = results;
}
void get_benchmark_results(results_data **results) {
*results = mall_conf->results;
}
//-------------------------------------------------------------------------------------------------------------
void set_malleability_configuration(int spawn_type, int spawn_is_single, int spawn_dist, int spawn_threaded, int comm_type, int comm_threaded) {
mall_conf->spawn_type = spawn_type;
mall_conf->spawn_is_single = spawn_is_single;
mall_conf->spawn_dist = spawn_dist;
mall_conf->spawn_threaded = spawn_threaded;
mall_conf->comm_type = comm_type;
mall_conf->comm_threaded = comm_threaded;
}
/*
* To be deprecated
* Tiene que ser llamado despues de setear la config
*/
void set_children_number(int numC){
if((mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) && (numC - mall->numP >= 0)) {
mall->numC = numC;
mall->numC_spawned = numC - mall->numP;
if(numC == mall->numP) { // Migrar
mall->numC_spawned = numC;
if(mall_conf->spawn_type == COMM_SPAWN_MERGE)
mall_conf->spawn_type = COMM_SPAWN_SERIAL;
else
mall_conf->spawn_type = COMM_SPAWN_PTHREAD;
}
} else {
mall->numC = numC;
mall->numC_spawned = numC;
}
}
/*
* TODO
*/
void get_malleability_user_comm(MPI_Comm *comm) {
*comm = mall->user_comm;
}
/*
* Anyade a la estructura concreta de datos elegida
* el nuevo set de datos "data" de un total de "total_qty" elementos.
*
* Los datos variables se tienen que anyadir cuando quieran ser mandados, no antes
*
* Mas informacion en la funcion "add_data".
*/
void malleability_add_data(void *data, int total_qty, int type, int is_replicated, int is_constant) {
if(is_constant) {
if(is_replicated) {
add_data(data, total_qty, type, 0, rep_s_data); //FIXME Numero magico
} else {
add_data(data, total_qty, type, 0, dist_s_data); //FIXME Numero magico
}
} else {
if(is_replicated) {
add_data(data, total_qty, type, 0, rep_a_data); //FIXME Numero magico || Un request?
} else {
int total_reqs = 0;
if(mall_conf->comm_type == MAL_USE_NORMAL) {
total_reqs = 1;
} else if(mall_conf->comm_type == MAL_USE_IBARRIER) {
total_reqs = 2;
} else if(mall_conf->comm_type == MAL_USE_POINT) {
total_reqs = mall->numC;
}
add_data(data, total_qty, type, total_reqs, dist_a_data);
}
}
}
/*
* Devuelve el numero de entradas para la estructura de descripcion de
* datos elegida.
*/
void malleability_get_entries(int *entries, int is_replicated, int is_constant){
if(is_constant) {
if(is_replicated) {
*entries = rep_s_data->entries;
} else {
*entries = dist_s_data->entries;
}
} else {
if(is_replicated) {
*entries = rep_a_data->entries;
} else {
*entries = dist_a_data->entries;
}
}
}
/*
* Devuelve el elemento de la lista "index" al usuario.
* La devolución es en el mismo orden que lo han metido los padres
* con la funcion "malleability_add_data()".
* Es tarea del usuario saber el tipo de esos datos.
* TODO Refactor a que sea automatico
*/
void malleability_get_data(void **data, int index, int is_replicated, int is_constant) {
malleability_data_t *data_struct;
if(is_constant) {
if(is_replicated) {
data_struct = rep_s_data;
} else {
data_struct = dist_s_data;
}
} else {
if(is_replicated) {
data_struct = rep_a_data;
} else {
data_struct = dist_a_data;
}
}
*data = data_struct->arrays[index];
}
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//================DATA COMMUNICATION====================||
//======================================================||
//======================================================||
/*
* Funcion generalizada para enviar datos desde los hijos.
* La asincronizidad se refiere a si el hilo padre e hijo lo hacen
* de forma bloqueante o no. El padre puede tener varios hilos.
*/
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
int i;
char *aux;
if(is_asynchronous) {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char
send_async(aux, data_struct->qty[i], mall->myId, mall->numP, mall->root, mall->intercomm, numP_children, data_struct->requests, mall_conf->comm_type);
}
} else {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char
send_sync(aux, data_struct->qty[i], mall->myId, mall->numP, mall->root, mall->intercomm, numP_children);
}
}
}
/*
* Funcion generalizada para recibir datos desde los hijos.
* La asincronizidad se refiere a si el hilo padre e hijo lo hacen
* de forma bloqueante o no. El padre puede tener varios hilos.
*/
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
int i;
char *aux;
if(is_asynchronous) {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char
recv_async(&aux, data_struct->qty[i], mall->myId, mall->numP, mall->root, mall->intercomm, numP_parents, mall_conf->comm_type);
data_struct->arrays[i] = (void *) aux;
}
} else {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char
recv_sync(&aux, data_struct->qty[i], mall->myId, mall->numP, mall->root, mall->intercomm, numP_parents);
data_struct->arrays[i] = (void *) aux;
}
}
}
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================CHILDREN=========================||
//======================================================||
//======================================================||
/*
* Inicializacion de los datos de los hijos.
* En la misma se reciben datos de los padres: La configuracion
* de la ejecucion a realizar; y los datos a recibir de los padres
* ya sea de forma sincrona, asincrona o ambas.
*/
void Children_init() {
int numP_parents, root_parents, i;
int spawn_is_single;
MPI_Comm aux;
MPI_Bcast(&spawn_is_single, 1, MPI_INT, MALLEABILITY_ROOT, mall->intercomm);
if(spawn_is_single) {
malleability_establish_connection(mall->myId, MALLEABILITY_ROOT, &(mall->intercomm));
}
MPI_Bcast(&(mall_conf->spawn_type), 1, MPI_INT, MALLEABILITY_ROOT, mall->intercomm);
MPI_Bcast(&root_parents, 1, MPI_INT, MALLEABILITY_ROOT, mall->intercomm);
MPI_Bcast(&numP_parents, 1, MPI_INT, root_parents, mall->intercomm);
mall_conf->config_file = recv_config_file(mall->root, mall->intercomm);
mall_conf->results = (results_data *) malloc(sizeof(results_data));
init_results_data(mall_conf->results, mall_conf->config_file->resizes, RESULTS_INIT_DATA_QTY);
if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
if(mall_conf->comm_type == MAL_USE_NORMAL || mall_conf->comm_type == MAL_USE_IBARRIER || mall_conf->comm_type == MAL_USE_POINT) {
recv_data(numP_parents, dist_a_data, 1);
} else if (mall_conf->comm_type == MAL_USE_THREAD) { //TODO Modificar uso para que tenga sentido comm_threaded
recv_data(numP_parents, dist_a_data, 0);
}
mall_conf->results->async_end= MPI_Wtime(); // Obtener timestamp de cuando termina comm asincrona
}
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
if(dist_s_data->entries || rep_s_data->entries) { // Recibir datos sincronos
recv_data(numP_parents, dist_s_data, 0);
mall_conf->results->sync_end = MPI_Wtime(); // Obtener timestamp de cuando termina comm sincrona
// TODO Crear funcion especifica y anyadir para Asinc
// TODO Tener en cuenta el tipo y qty
for(i=0; i<rep_s_data->entries; i++) {
MPI_Datatype datatype;
if(rep_s_data->types[i] == MAL_INT) {
datatype = MPI_INT;
} else {
datatype = MPI_CHAR;
}
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, root_parents, mall->intercomm);
}
}
if(mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
proc_adapt_expand(&(mall->numP), mall->numP+numP_parents, mall->intercomm, &(mall->comm), MALLEABILITY_CHILDREN);
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
MPI_Comm_dup(mall->comm, &aux);
mall->thread_comm = aux;
MPI_Comm_dup(mall->comm, &aux);
mall->user_comm = aux;
}
// Guardar los resultados de esta transmision
recv_results(mall_conf->results, mall->root, mall_conf->config_file->resizes, mall->intercomm);
MPI_Comm_disconnect(&(mall->intercomm));
}
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//=====================PARENTS==========================||
//======================================================||
//======================================================||
/*
* Se encarga de realizar la creacion de los procesos hijos.
* Si se pide en segundo plano devuelve el estado actual.
*/
int spawn_step(){
mall_conf->results->spawn_start = MPI_Wtime();
if((mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) && mall->numP > mall->numC) {
state = shrink_redistribution();
return state;
}
state = init_slurm_comm(mall->name_exec, mall->num_cpus, mall->num_nodes, mall->nodelist, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_type, mall_conf->spawn_is_single, mall->thread_comm, &(mall->intercomm));
if(mall_conf->spawn_type == COMM_SPAWN_SERIAL || mall_conf->spawn_type == COMM_SPAWN_MERGE)
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
else if(mall_conf->spawn_type == COMM_SPAWN_PTHREAD || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
//mall_conf->results->spawn_thread_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
//mall_conf->results->spawn_start = MPI_Wtime();
}
return state;
}
/*
* Comienza la redistribucion de los datos con el nuevo grupo de procesos.
*
* Primero se envia la configuracion a utilizar al nuevo grupo de procesos y a continuacion
* se realiza el envio asincrono y/o sincrono si lo hay.
*
* En caso de que haya comunicacion asincrona, se comienza y se termina la funcion
* indicando que se ha comenzado un envio asincrono.
*
* Si no hay comunicacion asincrono se pasa a realizar la sincrona si la hubiese.
*
* Finalmente se envian datos sobre los resultados a los hijos y se desconectan ambos
* grupos de procesos.
*/
int start_redistribution() {
int rootBcast = MPI_PROC_NULL;
if(mall->myId == mall->root) rootBcast = MPI_ROOT;
MPI_Bcast(&(mall_conf->spawn_type), 1, MPI_INT, rootBcast, mall->intercomm);
MPI_Bcast(&(mall->root), 1, MPI_INT, rootBcast, mall->intercomm);
MPI_Bcast(&(mall->numP), 1, MPI_INT, rootBcast, mall->intercomm);
send_config_file(mall_conf->config_file, rootBcast, mall->intercomm);
if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos
mall_conf->results->async_start = MPI_Wtime();
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
if(mall_conf->comm_type == MAL_USE_THREAD) {
return thread_creation();
} else {
send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
return MAL_DIST_PENDING;
}
}
return end_redistribution();
}
/*
* @deprecated
* Comprueba si la redistribucion asincrona ha terminado.
* Si no ha terminado la funcion termina indicandolo, en caso contrario,
* se continua con la comunicacion sincrona, el envio de resultados y
* se desconectan los grupos de procesos.
*
* Esta funcion permite dos modos de funcionamiento al comprobar si la
* comunicacion asincrona ha terminado.
* Si se utiliza el modo "MAL_USE_NORMAL" o "MAL_USE_POINT", se considera
* terminada cuando los padres terminan de enviar.
* Si se utiliza el modo "MAL_USE_IBARRIER", se considera terminada cuando
* los hijos han terminado de recibir.
*/
int check_redistribution() {
int completed, all_completed, test_err;
MPI_Request *req_completed;
//dist_a_data->requests[0][X] //FIXME Numero magico 0 -- Modificar para que sea un for?
if (mall_conf->comm_type == MAL_USE_POINT) {
test_err = MPI_Testall(mall->numC, dist_a_data->requests[0], &completed, MPI_STATUSES_IGNORE);
} else {
if(mall_conf->comm_type == MAL_USE_NORMAL) {
req_completed = &(dist_a_data->requests[0][0]);
} else if (mall_conf->comm_type == MAL_USE_IBARRIER) {
req_completed = &(dist_a_data->requests[0][1]);
}
test_err = MPI_Test(req_completed, &completed, MPI_STATUS_IGNORE);
}
if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
printf("P%d aborting -- Test Async\n", mall->myId);
MPI_Abort(MPI_COMM_WORLD, test_err);
}
MPI_Allreduce(&completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
if(!all_completed) return MAL_DIST_PENDING; // Continue only if asynchronous send has ended
if(mall_conf->comm_type == MAL_USE_IBARRIER) {
MPI_Wait(&(dist_a_data->requests[0][0]), MPI_STATUS_IGNORE); // Indicar como completado el envio asincrono
//Para la desconexión de ambos grupos de procesos es necesario indicar a MPI que esta comm
//ha terminado, aunque solo se pueda llegar a este punto cuando ha terminado
}
return end_redistribution();
}
/*
* Termina la redistribución de los datos con los hijos, comprobando
* si se han realizado iteraciones con comunicaciones en segundo plano
* y enviando cuantas iteraciones se han realizado a los hijos.
*
* Además se realizan las comunicaciones síncronas se las hay.
* Finalmente termina enviando los datos temporales a los hijos.
*/
int end_redistribution() {
int result, i, rootBcast = MPI_PROC_NULL;
MPI_Comm aux;
if(mall->myId == mall->root) rootBcast = MPI_ROOT;
if(dist_s_data->entries || rep_s_data->entries) { // Enviar datos sincronos
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
// TODO Crear funcion especifica y anyadir para Asinc
// TODO Tener en cuenta el tipo
for(i=0; i<rep_s_data->entries; i++) {
MPI_Datatype datatype;
if(rep_s_data->types[i] == MAL_INT) {
datatype = MPI_INT;
} else {
datatype = MPI_CHAR;
}
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, rootBcast, mall->intercomm);
}
}
if(mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
double time_adapt = MPI_Wtime();
proc_adapt_expand(&(mall->numP), mall->numC, mall->intercomm, &(mall->comm), MALLEABILITY_NOT_CHILDREN);
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
MPI_Comm_dup(mall->comm, &aux);
mall->thread_comm = aux;
MPI_Comm_dup(mall->comm, &aux);
mall->user_comm = aux;
mall_conf->results->spawn_time[mall_conf->grp] += MPI_Wtime() - time_adapt;
// result = MAL_DIST_ADAPTED;
}
send_results(mall_conf->results, rootBcast, mall_conf->config_file->resizes, mall->intercomm);
result = MAL_DIST_COMPLETED;
MPI_Comm_disconnect(&(mall->intercomm));
state = MAL_NOT_STARTED;
return result;
}
///=============================================
///=============================================
///=============================================
double time_adapt, time_adapt_end;
int state_shrink=0; //TODO Refactor
pthread_t thread_shrink;
MPI_Comm comm_shrink;
int thread_shrink_creation();
void *thread_shrink_work();
/*
* Crea una hebra para ejecutar una comunicación en segundo plano.
*/
int thread_shrink_creation() {
if(pthread_create(&thread_shrink, NULL, thread_shrink_work, NULL)) {
printf("Error al crear el hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
return MAL_SPAWN_PENDING;
}
void* thread_shrink_work() {
proc_adapt_shrink(mall->numC, &comm_shrink, mall->myId);
time_adapt_end = MPI_Wtime();
state_shrink=2;
pthread_exit(NULL);
}
///=============================================
///=============================================
///=============================================
int shrink_redistribution() {
int global_state;
double time_aux;
MPI_Comm aux_comm;
if(mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
if(state_shrink == 0) {
time_adapt = MPI_Wtime();
state_shrink = 1;
MPI_Comm_dup(mall->comm, &comm_shrink);
thread_shrink_creation();
return MAL_SPAWN_PENDING;
} else if(state_shrink>0) {
MPI_Allreduce(&state_shrink, &global_state, 1, MPI_INT, MPI_MIN, mall->comm);
if(global_state < 2) return MAL_SPAWN_PENDING;
time_aux = MPI_Wtime();
if(pthread_join(thread_shrink, NULL)) {
printf("Error al esperar al hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
}
MPI_Comm_dup(mall->comm, &aux_comm);
mall->comm = comm_shrink;
}
} else {
time_adapt = MPI_Wtime();
MPI_Comm_dup(mall->comm, &aux_comm);
proc_adapt_shrink( mall->numC, &(mall->comm), mall->myId);
}
//TODO REFACTOR -- Que solo la llamada de collect iters este fuera de los hilos
zombies_collect_suspended(aux_comm, mall->myId, mall->numP, mall->numC, mall->root, (void *) mall_conf->results, mall->user_comm);
if(mall->myId < mall->numC) {
MPI_Comm_free(&aux_comm);
MPI_Comm_dup(mall->comm, &aux_comm);
mall->thread_comm = aux_comm;
MPI_Comm_dup(mall->comm, &aux_comm);
mall->user_comm = aux_comm;
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - time_adapt;
if(mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
mall_conf->results->spawn_real_time[mall_conf->grp] = time_adapt_end - time_adapt + MPI_Wtime() - time_aux;
}
return MAL_DIST_COMPLETED; //FIXME Refactor Poner a SPAWN_COMPLETED
} else {
return MAL_ZOMBIE;
}
}
// TODO MOVER A OTRO LADO??
//======================================================||
//================PRIVATE FUNCTIONS=====================||
//===============COMM PARENTS THREADS===================||
//======================================================||
//======================================================||
/*
* Crea una hebra para ejecutar una comunicación en segundo plano.
*/
int thread_creation() {
if(pthread_create(&(mall->async_thread), NULL, thread_async_work, NULL)) {
printf("Error al crear el hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
return MAL_DIST_PENDING;
}
/*
* Comprobación por parte de una hebra maestra que indica
* si una hebra esclava ha terminado su comunicación en segundo plano.
*
* El estado de la comunicación es devuelto al finalizar la función.
*/
int thread_check() {
int all_completed = 0;
// Comprueba que todos los hilos han terminado la distribucion (Mismo valor en commAsync)
MPI_Allreduce(&state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
if(all_completed != MAL_DIST_COMPLETED) return MAL_DIST_PENDING; // Continue only if asynchronous send has ended
//FIXME No se tiene en cuenta el estado MAL_APP_ENDED
if(pthread_join(mall->async_thread, NULL)) {
printf("Error al esperar al hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -2;
}
return end_redistribution();
}
/*
* Función ejecutada por una hebra.
* Ejecuta una comunicación síncrona con los hijos que
* para el usuario se puede considerar como en segundo plano.
*
* Cuando termina la comunicación la hebra maestra puede comprobarlo
* por el valor "commAsync".
*/
void* thread_async_work(void* void_arg) {
send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
state = MAL_DIST_COMPLETED;
pthread_exit(NULL);
}
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mpi.h>
#include "../IOcodes/read_ini.h"
#include "../IOcodes/results.h"
#include "malleabilityStates.h"
int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_exec, char *nodelist, int num_cpus, int num_nodes);
void free_malleability();
void indicate_ending_malleability(int new_outside_state);
int malleability_checkpoint();
void set_benchmark_grp(int grp);
void set_malleability_configuration(int spawn_type, int spawn_is_single, int spawn_dist, int spawn_threaded, int comm_type, int comm_threaded);
void set_children_number(int numC); // TODO TO BE DEPRECATED
void get_malleability_user_comm(MPI_Comm *comm);
void malleability_add_data(void *data, int total_qty, int type, int is_replicated, int is_constant);
void malleability_get_entries(int *entries, int is_replicated, int is_constant);
void malleability_get_data(void **data, int index, int is_replicated, int is_constant);
void set_benchmark_configuration(configuration *config_file);
void get_benchmark_configuration(configuration **config_file);
void set_benchmark_results(results_data *results);
void get_benchmark_results(results_data **results);
//States
#define MAL_UNRESERVED -1
#define MAL_DENIED -2
#define MAL_ZOMBIE -3
#define MAL_NOT_STARTED 0
#define MAL_SPAWN_PENDING 1
#define MAL_SPAWN_SINGLE_START 2
#define MAL_SPAWN_SINGLE_PENDING 3
#define MAL_SPAWN_COMPLETED 4
#define MAL_DIST_PENDING 5
#define MAL_DIST_COMPLETED 6
#define MAL_DIST_ADAPTED 7
#define MAL_APP_EXECUTING 0
#define MAL_APP_ENDED 1
// TODO Refactor
#define COMM_PHY_NODES 1
#define COMM_PHY_CPU 2
// SPAWN METHODS
#define COMM_SPAWN_SERIAL 0
#define COMM_SPAWN_PTHREAD 1
#define COMM_SPAWN_MERGE 2
#define COMM_SPAWN_MERGE_PTHREAD 3
//#define COMM_SPAWN_BASELINE 0
//#define COMM_SPAWN_MERGE 1
//SPAWN STRATEGIES
#define COMM_SPAWN_MULTIPLE 0
#define COMM_SPAWN_SINGLE 1
//#define COMM_SPAWN_SERIAL 0
//#define COMM_SPAWN_PTHREAD 1
#define MAL_USE_NORMAL 0
#define MAL_USE_IBARRIER 1
#define MAL_USE_POINT 2
#define MAL_USE_THREAD 3
#define MAL_INT 0
#define MAL_CHAR 1
#define MALLEABILITY_CHILDREN 1
#define MALLEABILITY_NOT_CHILDREN 0
#include "malleabilityTypes.h"
void init_malleability_data_struct(malleability_data_t *data_struct, int size);
void realloc_malleability_data_struct(malleability_data_t *data_struct, int qty_to_add);
void def_malleability_entries(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, MPI_Datatype *new_type);
void def_malleability_qty_type(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, MPI_Datatype *new_type);
//======================================================||
//======================================================||
//===================PUBLIC FUNCTIONS===================||
//======================================================||
//======================================================||
/*
* Anyade en la estructura de datos a comunicar con los hijos
* un nuevo set de datos de un total "total_qty" distribuido entre
* todos los padres. La nueva serie "data" solo representa los datos
* que tiene este padre.
*/
void add_data(void *data, int total_qty, int type, int request_qty, malleability_data_t *data_struct) {
int i;
if(data_struct->entries == 0) {
init_malleability_data_struct(data_struct, MALLEABILITY_INIT_DATA_QTY);
} else if(data_struct->entries == data_struct->max_entries) {
realloc_malleability_data_struct(data_struct, MALLEABILITY_INIT_DATA_QTY);
}
data_struct->qty[data_struct->entries] = total_qty;
data_struct->types[data_struct->entries] = type;
data_struct->arrays[data_struct->entries] = data;
data_struct->requests[data_struct->entries] = (MPI_Request *) malloc(request_qty * sizeof(MPI_Request));
for(i=0; i < request_qty; i++) {
data_struct->requests[data_struct->entries][i] = MPI_REQUEST_NULL;
}
data_struct->entries+=1;
}
/*
* Comunicar desde los padres a los hijos las estructuras de datos sincronas o asincronas
* No es necesario que las estructuras esten inicializadas para un buen funcionamiento.
*
* En el argumento "root" todos tienen que indicar quien es el proceso raiz de los padres
* unicamente.
*/
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group, int myId, int root, MPI_Comm intercomm) {
int i, rootBcast = MPI_PROC_NULL;
MPI_Datatype entries_type, struct_type;
if(is_children_group) {
rootBcast = root;
} else {
if(myId == root) rootBcast = MPI_ROOT;
}
// Mandar primero numero de entradas
def_malleability_entries(data_struct_dist, data_struct_rep, &entries_type);
MPI_Bcast(MPI_BOTTOM, 1, entries_type, rootBcast, intercomm);
if(is_children_group) {
if(data_struct_rep->entries != 0) init_malleability_data_struct(data_struct_rep, data_struct_rep->entries);
if(data_struct_dist->entries != 0) init_malleability_data_struct(data_struct_dist, data_struct_dist->entries);
}
def_malleability_qty_type(data_struct_dist, data_struct_rep, &struct_type);
MPI_Bcast(MPI_BOTTOM, 1, struct_type, rootBcast, intercomm); //FIXME Doy error
if(is_children_group) {
//data_struct->requests[data_struct->entries] = (MPI_Request *) malloc(request_qty * sizeof(MPI_Request)); FIXME Crear los requests?
//data_struct->requests[data_struct->entries][i] = MPI_REQUEST_NULL;
for(i=0; i < data_struct_rep->entries; i++) {
data_struct_rep->arrays[i] = (void *) malloc(data_struct_rep->qty[i] * sizeof(int)); //TODO Tener en cuenta que no siempre es int
}
for(i=0; i < data_struct_dist->entries; i++) {
data_struct_dist->arrays[i] = (void *) malloc(data_struct_dist->qty[i] * sizeof(int)); //TODO Tener en cuenta que no siempre es int
}
}
MPI_Type_free(&entries_type);
MPI_Type_free(&struct_type);
}
//======================================================||
//======================================================||
//=========INIT/REALLOC/FREE RESULTS FUNCTIONS==========||
//======================================================||
//======================================================||
/*
* Inicializa la estructura que describe una serie de datos con las mismas
* caracteristicas de localización y uso. Se inicializa para utilizar hasta
* "size" elementos.
*/
void init_malleability_data_struct(malleability_data_t *data_struct, int size) {
data_struct->max_entries = size;
data_struct->qty = (int *) malloc(size * sizeof(int));
data_struct->types = (int *) malloc(size * sizeof(int));
data_struct->requests = (MPI_Request **) malloc(size * sizeof(MPI_Request *));
data_struct->arrays = (void **) malloc(size * sizeof(void *));
data_struct->request_ibarrier = MPI_REQUEST_NULL;
}
/*
* Realoja la estructura que describe una serie de datos con las mismas
* caracteristicas de localización y uso. Se anyaden "size" entradas nuevas
* a las ya existentes.
*/
void realloc_malleability_data_struct(malleability_data_t *data_struct, int qty_to_add) {
int *qty_aux, *types_aux, needed;
MPI_Request **requests_aux;
void **arrays_aux;
needed = data_struct->max_entries + qty_to_add;
qty_aux = (int *) realloc(data_struct->qty, needed * sizeof(int));
types_aux = (int *) realloc(data_struct->types, needed * sizeof(int));
requests_aux = (MPI_Request **) realloc(data_struct->requests, needed * sizeof(MPI_Request *));
arrays_aux = (void **) realloc(data_struct->arrays, needed * sizeof(void *));
if(qty_aux == NULL || arrays_aux == NULL || requests_aux == NULL || types_aux == NULL) {
fprintf(stderr, "Fatal error - No se ha podido realojar la memoria constante de datos a redistribuir/comunicar\n");
MPI_Abort(MPI_COMM_WORLD, 1);
}
data_struct->qty = qty_aux;
data_struct->types = types_aux;
data_struct->requests = requests_aux;
data_struct->arrays = arrays_aux;
data_struct->max_entries = needed;
}
void free_malleability_data_struct(malleability_data_t *data_struct) {
int i, max;
max = data_struct->entries;
if(max != 0) {
for(i=0; i<max; i++) {
//free(data_struct->arrays[i]); //FIXME Valores alojados con 1 elemento no se liberan?
//free(data_struct->requests[i]); //TODO Plantear como crearlos
}
free(data_struct->qty);
free(data_struct->types);
free(data_struct->requests);
free(data_struct->arrays);
}
}
//======================================================||
//======================================================||
//================MPI DERIVED DATATYPES=================||
//======================================================||
//======================================================||
/*
* Crea un tipo derivado para mandar el numero de entradas
* en dos estructuras de descripcion de datos.
*/
void def_malleability_entries(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, MPI_Datatype *new_type) {
int counts = 2;
int blocklengths[counts];
MPI_Aint displs[counts];
MPI_Datatype types[counts];
blocklengths[0] = blocklengths[1] = 1;
types[0] = types[1] = MPI_INT;
// Obtener direccion base
MPI_Get_address(&(data_struct_rep->entries), &displs[0]);
MPI_Get_address(&(data_struct_dist->entries), &displs[1]);
MPI_Type_create_struct(counts, blocklengths, displs, types, new_type);
MPI_Type_commit(new_type);
}
/*
* Crea un tipo derivado para mandar las cantidades y tipo
* de datos de dos estructuras de descripcion de datos.
* El vector de "requests" no es enviado ya que solo es necesario
* en los padres.
* TODO Refactor?
*/
void def_malleability_qty_type(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, MPI_Datatype *new_type) {
int counts = 4;
int blocklengths[counts];
MPI_Aint displs[counts];
MPI_Datatype types[counts];
types[0] = types[1] = types[2] = types[3] = MPI_INT;
blocklengths[0] = blocklengths[1] = data_struct_rep->entries;
blocklengths[2] = blocklengths[3] = data_struct_dist->entries;
MPI_Get_address((data_struct_rep->qty), &displs[0]);
MPI_Get_address((data_struct_rep->types), &displs[1]);
MPI_Get_address((data_struct_dist->qty), &displs[2]);
MPI_Get_address((data_struct_dist->types), &displs[3]);
MPI_Type_create_struct(counts, blocklengths, displs, types, new_type);
MPI_Type_commit(new_type);
}
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <fcntl.h>
#include <sys/stat.h>
#include "malleabilityStates.h"
#define MALLEABILITY_INIT_DATA_QTY 100
typedef struct {
int entries; // Indica numero de vectores a comunicar (replicated data)
int max_entries;
MPI_Request request_ibarrier; // Request para indicar que los padres esperan a que los hijos terminen de recibir
int *qty; // Indica numero de elementos en cada subvector de sync_array
int *types;
// Vector de vectores de request. En cada elemento superior se indican los requests a comprobar para dar por finalizada
// la comunicacion de ese dato
MPI_Request **requests;
void **arrays; // Cada subvector es una serie de datos a comunicar
} malleability_data_t;
void add_data(void *data, int total_qty, int type, int request_qty, malleability_data_t *data_struct);
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group, int myId, int root, MPI_Comm intercomm);
void free_malleability_data_struct(malleability_data_t *data_struct);
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <mpi.h>
//#include <slurm/slurm.h>
#include <signal.h>
#include "../IOcodes/results.h"
#include "malleabilityZombies.h"
#define PIDS_QTY 320
void zombies_suspend();
int offset_pids, *pids = NULL;
void gestor_usr2() {}
void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root, void *results_void, MPI_Comm user_comm) {
int pid = getpid();
int *pids_counts = malloc(numP * sizeof(int));
int *pids_displs = malloc(numP * sizeof(int));
int i, count=1;
if(myId < numC) {
count = 0;
if(myId == root) {
for(i=0; i < numC; i++) {
pids_counts[i] = 0;
}
for(i=numC; i<numP; i++) {
pids_counts[i] = 1;
pids_displs[i] = (i + offset_pids) - numC;
}
offset_pids += numP - numC;
}
}
MPI_Gatherv(&pid, count, MPI_INT, pids, pids_counts, pids_displs, MPI_INT, root, comm);
free(pids_counts);
free(pids_displs);
if(myId >= numC) {
// Needed to ensure iteration times are collected before suspending these processes
results_data *results = (results_data *) results_void;
compute_results_iter(results, myId, root, user_comm);
zombies_suspend();
}
}
void zombies_service_init() {
offset_pids = 0;
pids = malloc(PIDS_QTY * sizeof(int));
for(int i=0; i<PIDS_QTY; i++) {
pids[i] = 0;
}
}
void zombies_service_free() {
free(pids);
}
void zombies_suspend() {
struct sigaction act;
sigemptyset(&act.sa_mask);
act.sa_flags=0;
act.sa_handler=gestor_usr2;
sigaction(SIGUSR2, &act, NULL);
sigset_t set;
sigprocmask(SIG_SETMASK,NULL,&set);
sigsuspend(&set);
}
void zombies_awake() {
for(int i=0; i < offset_pids; i++) { // Despertar a los zombies
kill(pids[i], SIGUSR2);
}
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <mpi.h>
//#include <slurm/slurm.h>
#include <signal.h>
void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root, void *results_void, MPI_Comm user_comm);
void zombies_service_init();
void zombies_service_free();
void zombies_awake();
#!/bin/bash
#SBATCH -N 2
#module load gcc/6.4.0
#module load openmpi/1.10.7
#module load /home/martini/ejemplos_Mod/modulefiles/mpi/openmpi_aliaga_1_10_7
echo "OPENMPI"
#module load /home/martini/MODULES/modulefiles/openmpi4.1.0
#mpirun -mca btl_openib_allow_ib 1 -npernode 10 -np 20 ./batch5.out
echo "MPICH"
module load /home/martini/MODULES/modulefiles/mpich3.4
#export HYDRA_DEBUG=1
#-disable-hostname-propagation -disable-auto-cleanup -pmi-port -hosts n00,n01
mpirun -hostfile hostfile.o2785 -np 20 ./test.out
echo "Intel"
#module load /home/martini/MODULES/modulefiles/intel64.module
#export I_MPI_OFI_PROVIDER=tcp
#export I_MPI_DEBUG=6
#export I_MPI_FABRICS=shm:ofi
#mpirun -print-all-exitcodes -np 2 --ppn 1 ./batch.out
#mpirun -genv I_MPI_FABRICS=shm:ofi -print-all-exitcodes -iface ib0 -np 16 ./batch4.out
#mpirun -genv I_MPI_FABRICS=shm:ofi -iface enp59s0f0 -np 20 ./batch4.out
......@@ -15,5 +15,4 @@ elif [ $dist == "cpu" ]; then
fi
$dir/Recordnodelist.o $numP $dist
echo $numP
#!/bin/bash
#SBATCH -p P1
#SBATCH -N 1
#SBATCH --exclude=c01,c00,c02
dir="/home/martini/malleability_benchmark"
codeDir="/Codes"
nodelist=$SLURM_JOB_NODELIST
nodes=$SLURM_JOB_NUM_NODES
echo "MPICH"
module load mpich-3.4.1-noucx
......@@ -9,9 +17,9 @@ module load mpich-3.4.1-noucx
numP=$(bash recordMachinefile.sh $1)
mpirun -f hostfile.o$SLURM_JOB_ID -np $numP ./a.out $1 $2
#mpirun -np 2 ./a.out test.ini
mpirun -print-all-exitcodes -f hostfile.o$SLURM_JOB_ID $dir$codeDir/a.out $1 $2 $nodelist $nodes
#mpirun -np $numP $dir$codeDir/a.out $1 $2 $nodelist $nodes
rm hostfile.o$SLURM_JOB_ID
echo "END RUN"
sed -i 's/application called MPI_Abort(MPI_COMM_WORLD, -100) - process/shrink cleaning/g' slurm-$SLURM_JOB_ID.out
[general]
resizes=1 ; Numero de redistribuciones
matrix_tam=1000 ; Tamaño en bytes de la matriz de computo
SDR=100 ; Tamaño en bytes a redistribuir de forma sincrona
ADR=100 ; Tamaño en bytes a redistribuir de forma asincrona 1000000000
AIB=0 ; Indica si las redistribuciones asíncronas se consideran terminadas para los padres
; cuando terminan de enviar (0) o cuando terminan de recibir los valores (1)
comm_tam=1000 ; Tamaño en bytes de los datos a comunicar en cada iteracion. Una sola vez
SDR=100000000 ; Tamaño en bytes a redistribuir de forma sincrona
ADR=100000000 ; Tamaño en bytes a redistribuir de forma asincrona 1000000000
AIB=3 ; Indica si las redistribuciones asíncronas se consideran terminadas para los padres
; cuando terminan de enviar (0), cuando terminan de recibir los valores (1)
; o usar comunicaciones punto a punto (2), o utilizar hebras(3)
CST=0 ; Indica como realizar el spawn. (0) Para el método baseline, (1) para el método
; baseline con hilos, (2) para el método merge y (3) para el método merge
; con hilos
CSS=0 ; Indica si el spawn se realiza con todos los procesos (0) o solo participa
; el proceso raiz (1)
time=1 ; Tiempo necesario para realizar una iteracion
; end [general]
[resize0] ; Grupo inicial(mpirun)
iters=10 ; Numero de iteraciones a realizar por este grupo
procs=2 ; Cantidad de procesos en el grupo
factor=1 ; Factor de coste
physical_dist=cpu ; Tipo de redistribución física de los procesos
physical_dist=node ; Tipo de redistribución física de los procesos
;end [resize0]
[resize1] ; Grupo de hijos 1
iters=20
procs=4
factor=0.5
physical_dist=cpu
physical_dist=node
;end [resize1]
......@@ -8,6 +8,15 @@ ResultsDir="Results/"
ResultsDirName=$1
maxIndex=$2
cantidadGrupos=$3 #Contando a los padres
totalEjGrupo=$4 #Total de ejecuciones por grupo
maxTime=$5 #Maximo tiempo que se considera válido
if [ $# -lt 3 ]
then
echo "Faltan argumentos"
echo "Uso -> bash CheckRun NombreDirectorio IndiceMaximo Grupos"
exit -1
fi
cd $dir$ResultsDir
if [ ! -d $ResultsDirName ]
......@@ -25,10 +34,39 @@ qty=$(wc -l errores2.txt | cut -d ' ' -f1)
if [ $qty -gt 0 ]
then
echo "Se han encontrado errores de ejecución graves. Abortando"
echo "Revisar archivo errores2.txt en el directorio $ResultsDirName"
exit -2
fi
rm errores2.txt
#Comprobar que el número de archivos es correcto
#Pueden estar todos los archivos pero no estar los archivos
#completos -- Esto se comprueba más tarde
qtyG=$(ls R*/R*_Global.out | wc -l)
qtyG=$(($qtyG * 2))
qtyL=$(ls R*/R*_G?N*.out | wc -l)
if [ $qtyG == $qtyL ]
then
echo "El numero de ficheros G($qtyG) y L($qtyL) coincide"
else
#Si faltan archivos, se indican cuales faltan
echo "Faltan ejecuciones Locales o globales"
for ((i=1; i<$maxIndex; i++))
do
qtyEx=$(grep Tex -r Run$i | wc -l)
qtyIt=$(grep Top -r Run$i | wc -l)
qtyEx=$(($qtyEx * 2))
if [ $qtyEx -ne $qtyIt ]
then
diff=$(($totalEjGrupo-$qtyEx))
echo "Faltan archivos en Run$i"
fi
done
exit -1
fi
#grep -rn "2.\." R* TODO Testear que el tiempo teorico maximo es valido?
#Comprobar si hay runs con tiempo negativos
#Si los hay, reejecutar e informar de cuales son
grep - */R* | grep Tex > errores.txt
......@@ -40,9 +78,9 @@ then
while IFS="" read -r lineRun || [ -n "$lineRun" ]
do
#Obtener datos de una ejecución erronea
run=$(echo $lineRun | cut -d '/R' -f2 | cut -d '_' -f1)
run=$(echo $lineRun | cut -d 'R' -f3 | cut -d '_' -f1)
if [ $run -gt $maxIndex ]
then #Indice de ejecuciones posteriores
then #Indice de ejecuciones posteriores echas a mano -- FIXME Eliminar?
realRun=$(($run - $maxIndex))
index=$run
else # Indice de las primeras ejecuciones
......@@ -60,20 +98,53 @@ then
for ((i=0; i<qty; i++))
do
fin=$(grep -n - R* | grep Tex | cut -d ':' -f2 | head -n1)
init=$(($fin - 6))
init=$(($fin - 7))
sed -i ''$init','$fin'd' R${realRun}_Global.out
aux=$(($fin / 7)) #Utilizado para saber de entre las ejecuciones del fichero, cual es la erronea
#Se borran las lineas de los ficheros locales asociados
aux=$(($fin / 8)) #Utilizado para saber de entre las ejecuciones del fichero, cual es la erronea
fin=$(($aux * 5))
ini=$(($fin - 4))
init=$(($fin - 4))
for ((j=0; j<cantidadGrupos; j++)); do
sed -i ''$init','$fin'd' R${realRun}_G${j}*
done
done
#2 - Reelanzar ejecucion
sbatch -N 2 $dir$execDir./singleRun.sh config$realRun.ini $index
proc_list=$(grep Procs R${realRun}_Global.out | cut -d '=' -f3 | cut -d ',' -f1)
proc_parents=$(echo $proc_list | cut -d ' ' -f1)
proc_children=$(echo $proc_list | cut -d ' ' -f2)
nodes=8 # Maximo actual
if [ $proc_parents -gt $proc_children ]
then
nodes=$(($proc_parents / 20))
else
nodes=$(($proc_children / 20))
fi
sbatch -N $nodes $dir$execDir./singleRun.sh config$realRun.ini $index
cd $dir$ResultsDir$ResultsDirName
done < errores.txt
exit 0
fi
#Comprobar que todas las ejecuciones tienen todas las ejecucciones que tocan
#Solo es necesario comprobar el global.
qty_missing=0
cd $dir$ResultsDir$ResultsDirName
for ((i=1; i<$maxIndex; i++))
do
qtyEx=$(grep Tex -r Run$i | wc -l)
if [ $qtyEx -ne $totalEjGrupo ]
then
diff=$(($totalEjGrupo-$qtyEx))
qty_missing=$(($qty_missing+1))
echo "Faltan en $i, $diff ejecuciones"
fi
done
if [ $qty_missing -eq 0 ]
then
echo "Todos los archivos tienen $totalEjGrupo ejecuciones"
fi
Los archivos de esta carpeta son para ejecutar pruebas con todas las posibles configuraciones.
Se tienen tres ficheros en esta carpeta:
-- run.sh: Para ejecutar una serie de pruebas.
-- arrayRun.sh: Script para ejecutar por slurm las pruebas. Es llamado por run.sh.
-- singleRun.sh: Para ejecutar pruebas con un fichero de configuración.
-- CheckRun.sh: Para comprobar que las ejecuciones realizadas por run.sh son correctas, y en caso de que algunas fallen, relanzarlas.
-- create_ini.py: Crea un fichero de configuración de tipo "config.ini" a partir de los argumentos pasados
-- runSpawn.sh: Para ejecutar una serie de pruebas dedicadas a la creacion de procesos.
-- arraySpawnRun.sh: Script para ejecutar por slurm las pruebas. Es llamado por runSpawn.sh
Para que la mayoria de estos scripts funcionen correctamente es necesario compilar el código
en la carpeta "Codes". Ejecutar dentro de esa carpeta lo siguiente: "./compila -e"
--------------------------------
Para ejecutar las pruebas se utiliza el comando:
Para ejecutar las pruebas se utiliza el comando:
bash run.sh grupos-hijos tamaño-matriz tamaño-comunicacion cantidad-datos-sincronos tiempo-iteracion proceso-tiempo iteraciones-por-grupo cantidad-nodos
Este script crea subcarpetas en "Results" donde almacena los resultados y los ficheros de configuración que crea.
grupos-hijos: Es la cantidad de grupos hijos de procesos a ejecutar. Por tanto, el valor 1 indicaría el grupo de procesos padres y un grupo de procesos hijos.
......@@ -16,6 +22,8 @@ grupos-hijos: Es la cantidad de grupos hijos de procesos a ejecutar. Por tanto,
tamaño-matriz: Cantidad de filas en la matriz rectangular. Esta matriz se utiliza para realizar el computo de la aplicación.
tamaño-comunicacion: Numero de bytes a comunicar en la aplicacion cuando se realiza computo.
cantidad-datos-sincronos: Indica la cantidad de bytes que se tienen que transmitir desde un grupo de procesos a otro en una redimensión
tiempo-iteracion: Indica la cantidad de segundos que deben pasar para que una iteración se considere terminada.
......@@ -28,8 +36,13 @@ proceso-tiempo: Ligado al valor "tiempo-iteracion". Indica con cuantos procesos
iteraciones-por-grupo: Cantidad de iteraciones a realizar en cada grupo para que consideren terminada su ejecución.
Actualmente todos los grupos de procesos realizan la misma cantidad de iteraciones.
primera-iter: Indica si el primer grupo de procesos sigue el valor en "iteraciones-por-grupo" (0), o realiza el numero indicado en este argumento antes de realizar la redistribucion (>=1).
cantidad-nodos: Cantidad de nodos a utilizar en las ejecuciones. La cantidad de nodos también influye en la cantidad de procesos por grupo, donde nunca habrá más procesos en un grupo
que núcleos entre todos los nodos. Si se elige el valor 2 y habiendo 20 núcleos por nodo, no se realizarán pruebas con más de 40 procesos por grupo.
EJEMPLO:
bash run.sh 1 100000 10000000 1000000000 2 2 10 1 2
--------------------------------
Para ejecutar una sola prueba con un fichero de configuración se utiliza el siguiente comando:
......
#!/bin/bash
#SBATCH --exclude=n[06-07],c01
#SBATCH --exclude=c02,c01,c00
#SBATCH -p P1
dir="/home/martini/malleability_benchmark"
codeDir="/Codes"
ResultsDir="/Results"
nodelist=$SLURM_JOB_NODELIST
nodes=$SLURM_JOB_NUM_NODES
module load mpich-3.4.1-noucx
name_dir=$1
i=$2
procs_parents=$3
procs_sons=$4
percs_array=(0 25 50 75 100)
#percs_array=(0 25 50 75 100)
percs_array=(0)
at_array=(0)
dist_array=(cpu)
cst_array=(0 1 2 3)
css_array=(0 1)
aux=$(($i + 1))
echo "START TEST init=$aux"
for adr_perc in "${percs_array[@]}"
do
for phy_dist in cpu node
for phy_dist in "${dist_array[@]}"
do
for ibarrier_use in 0 #TODO Poner a 0 1
for ibarrier_use in "${at_array[@]}"
do
i=$(($i + 1))
cd $name_dir/Run$i
config_file="config$i.ini"
for cst in "${cst_array[@]}"
do
for css in "${css_array[@]}"
do
echo "EXEC $procs_parents -- $procs_sons -- $adr_perc -- $ibarrier_use -- $phy_dist -- RUN $i"
i=$(($i + 1))
cd $name_dir/Run$i
config_file="config$i.ini"
for index in 1 2 3
do
numP=$(bash $dir$codeDir/recordMachinefile.sh $config_file) # Crea el fichero hostfile
mpirun -f hostfile.o$SLURM_JOB_ID -np $numP $dir$codeDir/bench.out $config_file $i
rm hostfile.o$SLURM_JOB_ID
echo "EXEC $procs_parents -- $procs_sons -- $adr_perc -- $ibarrier_use -- $phy_dist -- $cst -- $css -- RUN $i"
for index in 1 2 3 4 5 6 7 8 9 10
do
numP=$(bash $dir$codeDir/recordMachinefile.sh $config_file) # Crea el fichero hostfile
mpirun -f hostfile.o$SLURM_JOB_ID $dir$codeDir/./bench.out $config_file $i $nodelist $nodes
rm hostfile.o$SLURM_JOB_ID
done
done
done
done
done
done
echo "END TEST"
sed -i 's/application called MPI_Abort(MPI_COMM_WORLD, -100) - process/shrink cleaning/g' slurm-$SLURM_JOB_ID.out
import sys
import glob
def general(f, resizes, matrix_tam, sdr, adr, aib, time):
def general(f, resizes, matrix_tam, comm_tam, sdr, adr, aib, cst, css, time):
f.write("[general]\n")
f.write("resizes=" + resizes +"\n")
f.write("matrix_tam=" + matrix_tam +"\n")
f.write("comm_tam=" + comm_tam +"\n")
f.write("SDR=" + sdr +"\n")
f.write("ADR=" + adr +"\n")
f.write("AIB=" + aib +"\n")
f.write("CST=" + cst +"\n")
f.write("CSS=" + css +"\n")
f.write("time=" + time +"\n")
f.write("; end [general]\n")
......@@ -20,21 +23,24 @@ def resize_section(f, resize, iters, procs, factor, physical_dist):
f.write(";end [resize" + resize +"]\n")
if len(sys.argv) < 2:
print("The config file name is missing\nUsage: python3 program nameFile args\nArgs: resizes matrix_tam SDR ADR AIB time iters0 procs0 dist0 iters1 procs1 dist1 ...")
print("The config file name is missing\nUsage: python3 program nameFile args\nArgs: resizes matrix_tam SDR ADR AIB CST CSS time iters0 procs0 dist0 iters1 procs1 dist1 ...")
exit(1)
if len(sys.argv) < 12:
print("The are not enough arguments\nUsage: python3 program nameFile args\nArgs: resizes matrix_tam SDR ADR_perc AIB time proc_time iters0 procs0 dist0 iters1 procs1 dist1 ...")
print("The are not enough arguments\nUsage: python3 program nameFile args\nArgs: resizes matrix_tam SDR ADR_perc AIB CST CSS time proc_time iters0 procs0 dist0 iters1 procs1 dist1 ...")
exit(1)
name = sys.argv[1]
resizes = int(sys.argv[2])
matrix_tam = sys.argv[3]
sdr = int(sys.argv[4])
adr_perc = float(sys.argv[5])
aib = sys.argv[6]
time = sys.argv[7]
proc_time = float(sys.argv[8]) # Usado para calcular el factor de cada proceso
comm_tam = sys.argv[4]
sdr = int(sys.argv[5])
adr_perc = float(sys.argv[6])
aib = sys.argv[7]
cst = sys.argv[8]
css = sys.argv[9]
time = sys.argv[10]
proc_time = float(sys.argv[11]) # Usado para calcular el factor de cada proceso
adr = (sdr * adr_perc) / 100
sdr = sdr - adr
......@@ -44,13 +50,13 @@ sdr = str(sdr)
factor = 0
f = open(name, "w")
general(f, str(resizes), matrix_tam, sdr, adr, aib, time)
general(f, str(resizes), matrix_tam, comm_tam, sdr, adr, aib, cst, css, time)
resizes = resizes + 1 # Internamente, los primeros procesos se muestran como un grupo
for resize in range(resizes):
iters = sys.argv[9 + 3 * resize]
procs = sys.argv[9 + 3 * resize + 1]
physical_dist = sys.argv[9 + 3 * resize + 2]
iters = sys.argv[12 + 3 * resize]
procs = sys.argv[12 + 3 * resize + 1]
physical_dist = sys.argv[12 + 3 * resize + 2]
if proc_time != 0: # Si el argumento proc_time es 0, todos los grupos tienen un factor de 1
factor = proc_time / float(procs)
......
......@@ -6,82 +6,131 @@ execDir="Exec/"
ResultsDir="Results/"
#TODO Añadir diferenciar phy_dist de padres e hijos al ejecutar
#TODO Añadir que se considere la cantidad de nucleos de un nodo y no este fijada
if [[ $# -lt 9 ]]
then
echo "Faltan argumentos"
echo "bash run.sh grupos tam_computo tam_comm tam_resize tiempo proc_init iters first_iter node_qty"
exit -1
fi
echo "START TEST"
groups=$1 #TODO Modificar para que admita más de dos grupos de procesos
matrix_tam=$2
N_qty=$3 # Datos a redistribuir
time=$4
proc_init=$5 #El tiempo por iteracion es para esta cantidad de procesos
iters=$6
node_qty=$7
comm_tam=$3
N_qty=$4 # Datos a redistribuir
time=$5
proc_init=$6 #El tiempo por iteracion es para esta cantidad de procesos
iters=$7
first_iter=$8
node_qty=$9
# Si el valor es 0 el primer grupo de procesos realiza $iters iteraciones antes de redimensionar
if [[ $first_iter -eq 0 ]]
then
iters_first_group=$iters
# Si el valor es diferente a 0, el primer grupo de procesos realiza $first_iter iteraciones antes de redimensionar
else
iters_first_group=$first_iter
fi
max_procs=$(($node_qty * 20))
procs_array=()
percs_array=(0 25 50 75 100)
i=1
value=$((2 ** $i))
procs_array=(${procs_array[@]} $value)
procs_array=(1 10)
#percs_array=(0 25 50 75 100)
percs_array=(0)
at_array=(0)
dist_array=(cpu)
cst_array=(0 1 2 3)
css_array=(0 1)
#Obtener cantidades de procesos posibles a ejecutar
while [[ $value -le $max_procs ]]
i=0
#while [[ $value -lt $max_procs ]]
#do
# i=$(($i + 1))
# value=$((20 * $i))
# procs_array=(${procs_array[@]} $value)
#done
i=0
while [[ $value -lt $max_procs ]]
do
i=$(($i + 1))
value=$((2 ** $i))
value=$(($value * 10))
procs_array=(${procs_array[@]} $value)
done
unset procs_array[-1]
#Crear carpeta de resultados
cd $dir$ResultsDir
name_res=$node_qty"N-"$(date '+%m-%d')
if [ -d $name_res ] # Si ya existe el directorio, modificar levemente el nombre y crear otro
then
name_res=$name_res"-"$(date '+%H:%M')
fi
echo "Localizacion de los resultados: $dir$ResultsDir$name_res"
mkdir $name_res
# Ejecutar pruebas
i=0
j=0
for procs_parents in "${procs_array[@]}"
do
node_qty1=$(($procs_parents / 20))
for procs_sons in "${procs_array[@]}"
do
if [ $procs_sons -ne $procs_parents ]
node_qty2=$(($procs_sons / 20))
if [ $node_qty1 -lt $node_qty2 ]
then
node_qty1=$node_qty2
fi
if [ $node_qty1 -eq 0 ]
then
node_qty1=1
fi
if [ $procs_parents -ne $procs_sons ]
then
for adr_perc in "${percs_array[@]}"
do
for phy_dist in cpu node
for phy_dist in "${dist_array[@]}"
do
for ibarrier_use in 0 #TODO Poner a 0 1
for ibarrier_use in "${at_array[@]}"
do
i=$(($i + 1))
# Crear directorio para esta ejecucion
cd $dir$ResultsDir$name_res
mkdir Run$i
cd Run$i
for cst in "${cst_array[@]}"
do
for css in "${css_array[@]}"
do
i=$(($i + 1))
# Crear archivo de configuracion
echo "Config $procs_parents -- $procs_sons -- $adr_perc -- $ibarrier_use -- $phy_dist -- RUN $i"
array0=($iters $procs_parents $phy_dist)
array=("${array0[@]}")
array0=($iters $procs_sons $phy_dist)
array+=("${array0[@]}")
python3 $dir$execDir/./create_ini.py config$i.ini 1 $matrix_tam $N_qty $adr_perc $ibarrier_use $time $proc_init "${array[@]}"
# Crear directorio para esta ejecucion
cd $dir$ResultsDir$name_res
mkdir Run$i
cd Run$i
# Crear archivo de configuracion
echo "Config $procs_parents -- $procs_sons -- $adr_perc -- $ibarrier_use -- $phy_dist -- $cst -- $css -- RUN $i"
array0=($iters_first_group $procs_parents $phy_dist)
array=("${array0[@]}")
array0=($iters $procs_sons $phy_dist)
array+=("${array0[@]}")
python3 $dir$execDir/./create_ini.py config$i.ini 1 $matrix_tam $comm_tam $N_qty $adr_perc $ibarrier_use $cst $css $time $proc_init "${array[@]}"
done
done
done
done
done
aux=$(($j * 10)) #TODO Poner a 20 cuando se use ibarrier
# LANZAR SCRIPT
echo $aux
sbatch -N $node_qty $dir$execDir./arrayRun.sh $dir$ResultsDir$name_res $aux $procs_parents $procs_sons
j=$(($j + 1))
done #adr_perc
start_i=$(($j * ${#percs_array[@]} * ${#dist_array[@]} * ${#at_array[@]} * ${#cst_array[@]} * ${#css_array[@]}))
# LANZAR SCRIPT
echo $aux
sbatch -N $node_qty1 $dir$execDir./arrayRun.sh $dir$ResultsDir$name_res $start_i $procs_parents $procs_sons
j=$(($j + 1))
fi
done
done
echo "Localizacion de los resultados: $dir$ResultsDir$name_res"
echo "END TEST"
#!/bin/bash
#SBATCH --exclude=c01
#SBATCH --exclude=c02,c01,c00
#SBATCH -p P1
dir="/home/martini/malleability_benchmark"
codeDir="/Codes"
ResultsDir="/Results"
nodelist=$SLURM_JOB_NODELIST
nodes=$SLURM_JOB_NUM_NODES
module load mpich-3.4.1-noucx
echo "START TEST"
......@@ -22,9 +26,11 @@ fi
for ((i=0; i<qty; i++))
do
echo "Iter $i"
numP=$(bash $dir$codeDir/recordMachinefile.sh $1)
mpirun -f hostfile.o$SLURM_JOB_ID -np $numP $dir$codeDir/bench.out $1 $2
mpirun -f hostfile.o$SLURM_JOB_ID $dir$codeDir/bench.out $1 $2 $nodelist $nodes
rm hostfile.o$SLURM_JOB_ID
done
echo "END TEST"
sed -i 's/application called MPI_Abort(MPI_COMM_WORLD, -100) - process/shrink cleaning/g' slurm-$SLURM_JOB_ID.out
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment