Commit 66732b2a authored by iker_martin

Adding synchronous malleability

parent b958ccfd
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include "read_ini.h"
#include "../malleability/ProcessDist.h"
#include "ini.h"
void def_struct_config_file(configuration *config_file, MPI_Datatype *config_type);
void def_struct_config_file_array(configuration *config_file, MPI_Datatype *config_type);
static int handler(void* user, const char* section, const char* name,
const char* value) {
configuration* pconfig = (configuration*)user;
@@ -37,9 +43,9 @@ static int handler(void* user, const char* section, const char* name,
char *aux = strdup(value);
if (strcmp(aux, "node") == 0) {
pconfig->phy_dist[act_resize] = 1; //FIXME MAGICAL NUM
pconfig->phy_dist[act_resize] = COMM_PHY_NODES;
} else {
pconfig->phy_dist[act_resize] = 2; //FIXME MAGICAL NUM
pconfig->phy_dist[act_resize] = COMM_PHY_CPU;
}
free(aux);
pconfig->actual_resize = pconfig->actual_resize+1; // Last element of the group
@@ -78,7 +84,7 @@ void malloc_config_arrays(configuration *user_config, int resizes) {
if(user_config != NULL) {
user_config->iters = malloc(sizeof(int) * resizes);
user_config->procs = malloc(sizeof(int) * resizes);
user_config->factors = malloc(sizeof(int) * resizes);
user_config->factors = malloc(sizeof(float) * resizes);
user_config->phy_dist = malloc(sizeof(int) * resizes);
}
}
@@ -105,3 +111,99 @@ void print_config(configuration *user_config) {
}
}
}
//
//
//
//
//
void send_config_file(configuration *config_file, int root, MPI_Comm intercomm) {
MPI_Datatype config_type, config_type_array;
def_struct_config_file(config_file, &config_type);
MPI_Bcast(config_file, 1, config_type, root, intercomm);
def_struct_config_file_array(config_file, &config_type_array);
MPI_Bcast(config_file, 1, config_type_array, root, intercomm);
MPI_Bcast(config_file->factors, config_file->resizes, MPI_FLOAT, root, intercomm);
MPI_Type_free(&config_type);
MPI_Type_free(&config_type_array);
}
configuration *recv_config_file(int root, MPI_Comm intercomm) {
MPI_Datatype config_type, config_type_array;
configuration *config_file = malloc(sizeof(configuration) * 1);
def_struct_config_file(config_file, &config_type);
MPI_Bcast(config_file, 1, config_type, root, intercomm);
malloc_config_arrays(config_file, config_file->resizes);
def_struct_config_file_array(config_file, &config_type_array);
MPI_Bcast(config_file, 1, config_type_array, root, intercomm);
MPI_Bcast(config_file->factors, config_file->resizes, MPI_FLOAT, root, intercomm);
MPI_Type_free(&config_type);
MPI_Type_free(&config_type_array);
return config_file;
}
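/*
 * Note (added for clarity, not part of the original commit): send_config_file
 * and recv_config_file must issue their collectives in the same order, since
 * each MPI_Bcast on the parents' side matches one on the children's side of
 * the intercommunicator:
 *   1. Bcast of the scalar fields (config_type)
 *   2. Bcast of the int arrays (config_type_array), after the receivers have
 *      allocated them with malloc_config_arrays()
 *   3. Bcast of the float factors array
 * Each side rebuilds the derived datatypes from its own configuration
 * instance because the displacements are absolute addresses made relative to
 * that instance.
 */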
void def_struct_config_file(configuration *config_file, MPI_Datatype *config_type) {
int i, counts = 6;
int blocklengths[6] = {1, 1, 1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];
// Fill the types array
types[0] = types[1] = types[2] = types[3] = types[4] = MPI_INT;
types[5] = MPI_FLOAT;
// Fill the displacements array
MPI_Get_address(config_file, &dir);
MPI_Get_address(&(config_file->resizes), &displs[0]);
MPI_Get_address(&(config_file->actual_resize), &displs[1]);
MPI_Get_address(&(config_file->matrix_tam), &displs[2]);
MPI_Get_address(&(config_file->sdr), &displs[3]);
MPI_Get_address(&(config_file->adr), &displs[4]);
MPI_Get_address(&(config_file->general_time), &displs[5]);
for(i=0;i<counts;i++) displs[i] -= dir;
MPI_Type_create_struct(counts, blocklengths, displs, types, config_type);
MPI_Type_commit(config_type);
}
void def_struct_config_file_array(configuration *config_file, MPI_Datatype *config_type) {
int i, counts = 3;
int blocklengths[3] = {1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype aux, types[counts];
// Fill the types array
types[0] = types[1] = types[2] = MPI_INT;
// Set blocklengths to the appropriate value
blocklengths[0] = blocklengths[1] = blocklengths[2] = config_file->resizes;
// Fill the displacements array
MPI_Get_address(config_file, &dir);
MPI_Get_address(config_file->iters, &displs[0]);
MPI_Get_address(config_file->procs, &displs[1]);
MPI_Get_address(config_file->phy_dist, &displs[2]);
for(i=0;i<counts;i++) displs[i] -= dir;
MPI_Type_create_struct(counts, blocklengths, displs, types, &aux);
MPI_Type_create_resized(aux, 0, 1*sizeof(int), config_type);
MPI_Type_commit(config_type);
}
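/*
 * Note (added for clarity, not part of the original commit): both functions
 * take absolute addresses with MPI_Get_address and subtract the base address
 * of the given configuration instance, so the resulting datatypes describe
 * that instance only and must be rebuilt by every process before use. In
 * def_struct_config_file_array the three blocks point into separately
 * malloc'd arrays (iters, procs, phy_dist); the MPI_Type_create_resized call
 * shrinks the extent to sizeof(int), which would only matter if more than
 * one element of the type were ever communicated.
 */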
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
typedef struct
{
@@ -21,3 +22,8 @@ void malloc_config_arrays(configuration *user_config, int resizes);
void free_config(configuration *user_config);
void print_config(configuration *user_config);
void send_config_file(configuration *config_file, int root, MPI_Comm intercomm);
configuration *recv_config_file(int root, MPI_Comm intercomm);
@@ -2,40 +2,71 @@
#include <stdlib.h>
#include <mpi.h>
#include "../IOcodes/read_ini.h"
#include "../malleability/ProcessDist.h"
#include "../malleability/CommDist.h"
#define ROOT 0
int work(int n);
int work();
void Sons_init();
int checkpoint(int iter);
void TC(int numS);
void iterate(double *matrix, int n);
void computeMatrix(double *matrix, int n);
void initMatrix(double **matrix, int n);
typedef struct {
int myId;
int numP;
int grp;
MPI_Comm children, parents;
char **argv;
char *sync_array;
} group_data;
configuration *config_file;
group_data *group;
int main(int argc, char *argv[]) {
int numP, myId;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &numP);
MPI_Comm_rank(MPI_COMM_WORLD, &myId);
/*
MPI_Comm_get_parent(&comm_parents);
if(comm_parents != MPI_COMM_NULL ) { // If these are child processes they must receive the distribution
if(myId == ROOT) {
printf("Nuevo set de procesos de %d\n", numP);
MPI_Comm_rank(MPI_COMM_WORLD, &myId);
group = malloc(1 * sizeof(group_data));
group->myId = myId;
group->numP = numP;
group->grp = 0;
group->argv = argv;
MPI_Comm_get_parent(&(group->parents));
if(group->parents != MPI_COMM_NULL ) { // If these are child processes they must receive the distribution
Sons_init();
} else {
config_file = read_ini_file(argv[1]);
if(config_file->sdr > 0) {
malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
printf("Vector reservado por padres\n");
fflush(stdout);
}
} else { // First set of processes initializes values
}
*/
int res = work(10000);
if(myId== ROOT) print_config(config_file);
int res = work();
if(res) { // Last set of processes checks the results
//RESULTS
}
configuration *config_file = read_ini_file("test.ini");
print_config(config_file);
free_config(config_file);
free(group->sync_array);
free(group);
@@ -46,26 +77,100 @@ int main(int argc, char *argv[]) {
/*
* Main compute loop
*/
int work(int n) {
int iter, MAXITER=5; //FIXME REMOVE MAXITER
int work() {
int iter, maxiter;
double *matrix;
initMatrix(&matrix, n);
for(iter=0; iter<MAXITER; iter++) {
iterate(matrix, n);
maxiter = config_file->iters[group->grp];
initMatrix(&matrix, config_file->matrix_tam);
for(iter=0; iter < maxiter; iter++) {
iterate(matrix, config_file->matrix_tam);
}
checkpoint(iter);
return 0;
}
int checkpoint(int iter) {
// Check whether a resize has to be performed
if(config_file->iters[group->grp] < iter || group->grp!= 0) {return 0;}
int numS = config_file->procs[group->grp +1];
TC(numS);
int rootBcast = MPI_PROC_NULL;
if(group->myId == ROOT) rootBcast = MPI_ROOT;
// Tell the children which process group they belong to
MPI_Bcast(&(group->grp), 1, MPI_INT, rootBcast, group->children);
send_config_file(config_file, rootBcast, group->children);
if(config_file->sdr > 0) {
send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
}
// Disconnect the intercommunicator with the children
MPI_Comm_disconnect(&(group->children));
//MPI_Comm_free(&(group->children));
return 1;
}
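/*
 * Note (added for clarity, not part of the original commit): checkpoint() on
 * the parents and Sons_init() on the children form a matched sequence over
 * the intercommunicator: spawn (TC), broadcast of the group number, transfer
 * of the configuration (send_config_file / recv_config_file), optional
 * synchronous redistribution of sync_array when sdr > 0, and finally
 * MPI_Comm_disconnect on both sides.
 */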
void TC(int numS){
// Initialize communication with SLURM
int dist = config_file->phy_dist[group->grp +1];
init_slurm_comm(group->argv, group->myId, numS, ROOT, dist, COMM_SPAWN_SERIAL);
// Wait until the communication and process creation
// have finished
int test = -1;
while(test != MPI_SUCCESS) {
test = check_slurm_comm(group->myId, ROOT, MPI_COMM_WORLD, &(group->children));
}
}
void Sons_init() {
// Receive from the parents which process group this is
MPI_Bcast(&(group->grp), 1, MPI_INT, ROOT, group->parents);
group->grp++;
config_file = recv_config_file(ROOT, group->parents);
int numP_parents = config_file->procs[group->grp -1];
if(config_file->sdr > 0) {
recv_sync(&(group->sync_array), config_file->sdr, group->myId, group->numP, ROOT, group->parents, numP_parents);
group->sync_array = malloc(5);
}
// Disconnect the intercommunicator with the parents
MPI_Comm_disconnect(&(group->parents));
}
/////////////////////////////////////////
/////////////////////////////////////////
//COMPUTE FUNCTIONS
/////////////////////////////////////////
/////////////////////////////////////////
/*
* Simulates the execution of one compute iteration of the application
*/
void iterate(double *matrix, int n) {
double start_time, actual_time;
int TIME=10; //FIXME REMOVE
double time = config_file->general_time * config_file->factors[group->grp];
start_time = actual_time = MPI_Wtime();
while (actual_time - start_time > TIME) {
while (actual_time - start_time < time) {
computeMatrix(matrix, n);
actual_time = MPI_Wtime();
}
......
#!/bin/bash
#SBATCH -N 1
#module load gcc/6.4.0
#module load openmpi/1.10.7
#module load /home/martini/ejemplos_Mod/modulefiles/mpi/openmpi_aliaga_1_10_7
echo "OPENMPI"
#module load /home/martini/MODULES/modulefiles/openmpi4.1.0
#mpirun -mca btl_openib_allow_ib 1 -npernode 10 -np 20 ./batch5.out
echo "MPICH"
module load /home/martini/MODULES/modulefiles/mpich3.4
#export HYDRA_DEBUG=1
#-disable-hostname-propagation -disable-auto-cleanup -pmi-port -hosts n00,n01
mpirun -np 2 ./a.out test.ini
echo "Intel"
#module load /home/martini/MODULES/modulefiles/intel64.module
#export I_MPI_OFI_PROVIDER=tcp
#export I_MPI_DEBUG=6
#export I_MPI_FABRICS=shm:ofi
#mpirun -print-all-exitcodes -np 2 --ppn 1 ./batch.out
#mpirun -genv I_MPI_FABRICS=shm:ofi -print-all-exitcodes -iface ib0 -np 16 ./batch4.out
#mpirun -genv I_MPI_FABRICS=shm:ofi -iface enp59s0f0 -np 20 ./batch4.out
[general]
resizes=1 ; Number of resizes
matrix_tam=20000 ; Size in bytes of the compute matrix
matrix_tam=2000 ; Size in bytes of the compute matrix
SDR=10000 ; Size in bytes to redistribute synchronously
ADR=10000 ; Size in bytes to redistribute asynchronously
time=30 ; Time needed to perform one iteration
time=0.5 ; Time needed to perform one iteration
[resize0] ; Initial group (mpirun)
iters=100 ; Number of iterations this group performs
procs=10 ; Number of processes in the group
[resize0] ; Initial group (mpirun) - The application does not honor the procs or physical_dist values here
iters=5 ; Number of iterations this group performs
procs=2 ; Number of processes in the group
factor=1 ; Cost factor
physical_dist=node ; Type of physical distribution of the processes
[resize1] ; Children group 1
iters=100
procs=8
factor=1.1
physical_dist=cpu
iters=5
procs=2
factor=0.5
physical_dist=node
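; Note (added for clarity, not part of the original commit): each [resizeN]
; section fills one entry of the iters/procs/factors/phy_dist arrays parsed
; by read_ini.c; physical_dist=node maps to COMM_PHY_NODES and any other
; value (e.g. cpu) to COMM_PHY_CPU.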
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
struct Dist_data {
int ini; // First element to send
int fin; // Last element to send
int tamBl; // Total number of elements
int qty; // Total number of rows of the full sparse matrix
MPI_Comm intercomm;
};
struct Counts {
int *counts;
int *displs;
int *zero_arr;
};
void send_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_child, int idI, int idE,
int *sendcounts, int *recvcounts, int *sdispls, int *rdispls);
void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE,
int *sendcounts, int *recvcounts,int *sdispls, int *rdispls);
// DIST FUNCTIONS
void get_dist(int qty, int id, int numP, struct Dist_data *dist_data);
void set_counts(int id, int numP, struct Dist_data data_dist, int *sendcounts);
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS);
void mallocCounts(struct Counts *counts, int numP);
void freeCounts(struct Counts *counts);
int send_sync(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child) {
int rootBcast = MPI_PROC_NULL;
int *idS = NULL;
struct Counts counts;
struct Dist_data dist_data;
if(myId == root) rootBcast = MPI_ROOT;
get_dist(qty, myId, numP, &dist_data); // Distribution of this process within its group
dist_data.intercomm = intercomm;
// Create arrays containing how many elements will be sent to each spawned process
mallocCounts(&counts, numP_child);
getIds_intercomm(dist_data, numP_child, &idS); // Get the range of child ids this process sends data to
printf("-1!! -- Vector de padres realizan COUNTS\n");
fflush(stdout);
MPI_Barrier(MPI_COMM_WORLD);
send_sync_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts.counts, counts.zero_arr, counts.displs, counts.zero_arr);
freeCounts(&counts);
free(idS);
return 1;
}
void recv_sync(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents) {
int *idS = NULL;
struct Counts counts;
struct Dist_data dist_data;
printf("Vector de hijos mandan datos\n");
fflush(stdout);
MPI_Barrier(MPI_COMM_WORLD);
// Get the distribution for this child
get_dist(qty, myId, numP, &dist_data);
//*array = malloc(dist_data.tamBl * sizeof(char));
*array = malloc(qty * sizeof(char));
dist_data.intercomm = intercomm;
/* PREPARE RECEPTION DATA FOR THE ARRAY */
mallocCounts(&counts, numP_parents);
getIds_intercomm(dist_data, numP_parents, &idS); // Get the range of parent ids this process will receive data from
recv_sync_arrays(dist_data, *array, root, numP_parents, idS[0], idS[1], counts.zero_arr, counts.counts, counts.zero_arr, counts.displs);
freeCounts(&counts);
free(idS);
}
void malloc_comm_array(char **array, int qty, int myId, int numP) {
struct Dist_data dist_data;
get_dist(qty, myId, numP, &dist_data);
//*array = malloc(dist_data.tamBl * sizeof(char));
*array = malloc(qty * sizeof(char));
}
/*
* Send to the children the Compute_data arrays that change on each iteration
*/
void send_sync_arrays(struct Dist_data dist_data, char *array, int rootBcast, int numP_child, int idI, int idE,
int *sendcounts, int *recvcounts, int *sdispls, int *rdispls) {
int i;
// PREPARE THE ARRAY SEND
if(idI == 0) {
set_counts(0, numP_child, dist_data, sendcounts);
idI++;
}
for(i=idI; i<idE; i++) {
set_counts(i, numP_child, dist_data, sendcounts);
sdispls[i] = sdispls[i-1] + sendcounts[i-1];
}
/* DATA COMMUNICATION */
// MPI_Alltoallv(array, sendcounts, sdispls, MPI_CHAR, NULL, recvcounts, rdispls, MPI_CHAR, dist_data.intercomm);
}
void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE,
int *sendcounts, int *recvcounts,int *sdispls, int *rdispls) {
int i;
char *aux;
printf("A -- Vector de hijos realizan COUNTS\n");
fflush(stdout);
MPI_Barrier(MPI_COMM_WORLD);
// Adjust the reception values
if(idI == 0) {
set_counts(0, numP_parents, dist_data, recvcounts);
idI++;
}
printf("B -- Vector de hijos realizan COUNTS\n");
fflush(stdout);
MPI_Barrier(MPI_COMM_WORLD);
for(i=idI; i<idE; i++) {
set_counts(i, numP_parents, dist_data, recvcounts);
rdispls[i] = rdispls[i-1] + recvcounts[i-1];
}
printf("C -- Vector de hijos realizan COUNTS\n");
fflush(stdout);
MPI_Barrier(MPI_COMM_WORLD);
// print_counts(*dist_data, recvcounts, rdispls, numP_parents, "Recv");
/* DATA COMMUNICATION */
// MPI_Alltoallv(aux, sendcounts, sdispls, MPI_CHAR, array, recvcounts, rdispls, MPI_CHAR, dist_data->intercomm);
}
/*
* ========================================================================================
* ========================================================================================
* ================================DISTRIBUTION FUNCTIONS==================================
* ========================================================================================
* ========================================================================================
*/
/*
* Obtains, for a given "id" and "numP", how many rows and
* elements per row process "id" will have, and fills
* the results into a Dist_data struct
*/
void get_dist(int qty, int id, int numP, struct Dist_data *dist_data) {
int rem;
dist_data->qty = qty;
dist_data->tamBl = qty / numP;
rem = qty % numP;
if(id < rem) { // First subgroup
dist_data->ini = id * dist_data->tamBl + id;
dist_data->fin = (id+1) * dist_data->tamBl + (id+1);
} else { // Second subgroup
dist_data->ini = id * dist_data->tamBl + rem;
dist_data->fin = (id+1) * dist_data->tamBl + rem;
}
if(dist_data->fin > qty) {
dist_data->fin = qty;
}
if(dist_data->ini > dist_data->fin) {
dist_data->ini = dist_data->fin;
}
dist_data->tamBl = dist_data->fin - dist_data->ini;
}
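/*
 * Example (illustrative, not part of the original source): with qty = 10 and
 * numP = 3, tamBl = 3 and rem = 1, so the first rank receives one extra
 * element:
 *   id 0 -> ini = 0, fin = 4,  tamBl = 4
 *   id 1 -> ini = 4, fin = 7,  tamBl = 3
 *   id 2 -> ini = 7, fin = 10, tamBl = 3
 */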
/*
* Obtains, for a given process id, how many elements will be
* sent to or received from the process described in Dist_data
*/
void set_counts(int id, int numP, struct Dist_data data_dist, int *sendcounts) {
struct Dist_data other;
int biggest_ini, smallest_end;
get_dist(data_dist.qty, id, numP, &other);
// If the value ranges do not overlap, skip to the next process
if(data_dist.ini >= other.fin || data_dist.fin <= other.ini) {
return;
}
// Take the larger ini of the two processes
if(data_dist.ini > other.ini) {
biggest_ini = data_dist.ini;
} else {
biggest_ini = other.ini;
}
// Take the smaller fin of the two processes
if(data_dist.fin < other.fin) {
smallest_end = data_dist.fin;
} else {
smallest_end = other.fin;
}
sendcounts[id] = smallest_end - biggest_ini; // Number of elements to send to / receive from process id
}
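/*
 * Example (illustrative, not part of the original source): if this process
 * owns [4, 7) (id 1 in the example above) and process id = 0 of an opposite
 * group of numP = 2 owns [0, 5), the overlap is [4, 5), so
 * sendcounts[0] = 1 element is exchanged with that process.
 */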
/*
* Obtains, for a process of one group, the range of processes of the
* other group it has to send data to or receive data from.
*
* Returns the first and the last (excluded) identifier to
* communicate with.
*/
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS) {
int idI, idE;
int tamOther = dist_data.qty / numP_other;
int remOther = dist_data.qty % numP_other;
int middle = (tamOther + 1) * remOther;
if(middle > dist_data.ini) { // First subgroup
idI = dist_data.ini / (tamOther + 1);
} else { // Second subgroup
idI = ((dist_data.ini - middle) / tamOther) + remOther;
}
if(middle >= dist_data.fin) { // First subgroup
idE = dist_data.fin / (tamOther + 1);
idE = (dist_data.fin % (tamOther + 1) > 0 && idE+1 <= numP_other) ? idE+1 : idE;
} else { // Second subgroup
idE = ((dist_data.fin - middle) / tamOther) + remOther;
idE = ((dist_data.fin - middle) % tamOther > 0 && idE+1 <= numP_other) ? idE+1 : idE;
}
//free(*idS);
*idS = malloc(2 * sizeof(int));
(*idS)[0] = idI;
(*idS)[1] = idE;
}
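/*
 * Example (illustrative, not part of the original source): for the block
 * [4, 7) of a 10-element array and an opposite group of numP_other = 2
 * processes (owning [0, 5) and [5, 10)), tamOther = 5, remOther = 0 and
 * middle = 0, giving idI = 0 and idE = 2: this process exchanges data with
 * ranks 0 and 1 of the other group.
 */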
void mallocCounts(struct Counts *counts, int numP) {
counts->counts = calloc(numP, sizeof(int));
if(counts->counts == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
counts->displs = calloc(numP, sizeof(int));
if(counts->displs == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
counts->zero_arr = calloc(numP, sizeof(int));
if(counts->zero_arr == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
}
void freeCounts(struct Counts *counts) {
free(counts->counts);
free(counts->displs);
free(counts->zero_arr);
}
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
int send_sync(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child);
void recv_sync(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents);
void malloc_comm_array(char **array, int qty, int myId, int numP);
@@ -37,7 +37,7 @@ void node_dist(slurm_job_info_t job_record, int type, int total_procs, int **qty
int create_hostfile(char *jobId, char **file_name);
int write_hostfile_node(int ptr, int qty, char *node_name);
void fill_hostfile(slurm_job_info_t job_record, int ptr, int *qty, int used_nodes, MPI_Info *info_array);
void fill_hostfile(slurm_job_info_t job_record, int ptr, int *qty, int used_nodes);
void print_Info(MPI_Info info);
@@ -139,7 +139,7 @@ void processes_dist(char *argv[], int numP_childs, int type) {
free(hostfile_name);
// SET NEW DISTRIBUTION
fill_hostfile(last_record, ptr, procs_array, used_nodes, &(slurm_data->info));
fill_hostfile(last_record, ptr, procs_array, used_nodes);
close(ptr);
// Free JOB INFO
@@ -223,7 +223,7 @@ int create_hostfile(char *jobId, char **file_name) {
return ptr; // Return the file descriptor
}
void fill_hostfile(slurm_job_info_t job_record, int ptr, int *qty, int used_nodes, MPI_Info *info_array) {
void fill_hostfile(slurm_job_info_t job_record, int ptr, int *qty, int used_nodes) {
int i=0;
char *host;
hostlist_t hostlist;
......