Commit edb47bb9 authored by iker_martin's avatar iker_martin
Browse files

Simplified function for preparing data redistribution. New feature for...

Simplified function for preparing data redistribution. New feature for intercomms where they communicate using intracomms when redistributing data. Other minor changes.
parent 364330f8
...@@ -386,50 +386,39 @@ void async_point2point(char *send, char *recv, struct Counts s_counts, struct Co ...@@ -386,50 +386,39 @@ void async_point2point(char *send, char *recv, struct Counts s_counts, struct Co
* *
*/ */
void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts) { void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts) {
int array_size, init_struct = 1; int array_size = numO;
int offset_ids = 0;
struct Dist_data dist_data; struct Dist_data dist_data;
array_size = numO; if(is_intercomm) {
if(!is_intercomm) { //offset_ids = numP; //FIXME Modify only if active?
} else {
array_size = numP > numO ? numP : numO; array_size = numP > numO ? numP : numO;
} }
mallocCounts(s_counts, array_size+offset_ids);
mallocCounts(r_counts, array_size+offset_ids);
if(is_children_group) { if(is_children_group) {
if(is_intercomm) { offset_ids = 0;
mallocCounts(s_counts, numO); prepare_comm_alltoall(myId, numP, numO, qty, offset_ids, r_counts);
} else { // Merge method needs an array equal to the number of processes
mallocCounts(s_counts, array_size);
mallocCounts(r_counts, array_size);
init_struct = 0;
}
prepare_comm_alltoall(myId, numP, numO, qty, init_struct, r_counts);
// Obtener distribución para este hijo // Obtener distribución para este hijo
get_block_dist(qty, myId, numP, &dist_data); get_block_dist(qty, myId, numP, &dist_data);
*recv = malloc(dist_data.tamBl * sizeof(char)); *recv = malloc(dist_data.tamBl * sizeof(char));
//get_block_dist(qty, myId, numP, &dist_data); //get_block_dist(qty, myId, numP, &dist_data);
//print_counts(dist_data, r_counts->counts, r_counts->displs, numO, 0, "Children C "); //print_counts(dist_data, r_counts->counts, r_counts->displs, numO+offset_ids, 0, "Children C ");
} else { } else {
//get_block_dist(qty, myId, numP, &dist_data); //get_block_dist(qty, myId, numP, &dist_data);
if(is_intercomm) { prepare_comm_alltoall(myId, numP, numO, qty, offset_ids, s_counts);
mallocCounts(r_counts, numO); if(!is_intercomm && myId < numO) {
prepare_comm_alltoall(myId, numP, numO, qty, init_struct, s_counts); prepare_comm_alltoall(myId, numO, numP, qty, offset_ids, r_counts);
} else { // Obtener distribución para este hijo y reservar vector de recibo
mallocCounts(s_counts, array_size);
mallocCounts(r_counts, array_size);
init_struct = 0;
prepare_comm_alltoall(myId, numP, numO, qty, init_struct, s_counts);
if(myId < numO) {
prepare_comm_alltoall(myId, numO, numP, qty, init_struct, r_counts);
// Obtener distribución para este hijo
get_block_dist(qty, myId, numO, &dist_data); get_block_dist(qty, myId, numO, &dist_data);
*recv = malloc(dist_data.tamBl * sizeof(char)); *recv = malloc(dist_data.tamBl * sizeof(char));
}
//print_counts(dist_data, r_counts->counts, r_counts->displs, array_size, 0, "Children P "); //print_counts(dist_data, r_counts->counts, r_counts->displs, array_size, 0, "Children P ");
} }
//print_counts(dist_data, s_counts->counts, s_counts->displs, numO, 0, "Parents "); //print_counts(dist_data, s_counts->counts, s_counts->displs, numO+offset_ids, 0, "Parents ");
} }
} }
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
#include <mpi.h> #include <mpi.h>
#include "block_distribution.h" #include "block_distribution.h"
void set_interblock_counts(int id, int numP, struct Dist_data data_dist, int *sendcounts); void set_interblock_counts(int id, int numP, struct Dist_data data_dist, int offset_ids, int *sendcounts);
void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS); void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS);
/* /*
...@@ -13,26 +13,29 @@ void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS); ...@@ -13,26 +13,29 @@ void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS);
* *
* The struct should be freed with freeCounts * The struct should be freed with freeCounts
*/ */
void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, int init_struct, struct Counts *counts) { void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, int offset_ids, struct Counts *counts) {
int i, *idS; int i, *idS, first_id = 0;
struct Dist_data dist_data, dist_target; struct Dist_data dist_data, dist_target;
if(init_struct) mallocCounts(counts, numP_other); if(counts == NULL) {
fprintf(stderr, "Counts is NULL for rank %d/%d ", myId, numP);
MPI_Abort(MPI_COMM_WORLD, -3);
}
get_block_dist(n, myId, numP, &dist_data); get_block_dist(n, myId, numP, &dist_data);
get_util_ids(dist_data, numP_other, &idS); get_util_ids(dist_data, numP_other, &idS);
counts->idI = idS[0]; counts->idI = idS[0] + offset_ids;
counts->idE = idS[1]; counts->idE = idS[1] + offset_ids;
get_block_dist(n, idS[0], numP_other, &dist_target); // RMA Specific operation get_block_dist(n, idS[0], numP_other, &dist_target); // RMA Specific operation -- uses idS[0], not idI
counts->first_target_displs = dist_data.ini - dist_target.ini; // RMA Specific operation counts->first_target_displs = dist_data.ini - dist_target.ini; // RMA Specific operation
if(idS[0] == 0) { if(idS[0] == 0) { // Uses idS[0], not idI
set_interblock_counts(0, numP_other, dist_data, counts->counts); set_interblock_counts(counts->idI, numP_other, dist_data, offset_ids, counts->counts);
idS[0]++; first_id++;
} }
for(i=idS[0]; i<idS[1]; i++) { for(i=counts->idI + first_id; i<counts->idE; i++) {
set_interblock_counts(i, numP_other, dist_data, counts->counts); set_interblock_counts(i, numP_other, dist_data, offset_ids, counts->counts);
counts->displs[i] = counts->displs[i-1] + counts->counts[i-1]; counts->displs[i] = counts->displs[i-1] + counts->counts[i-1];
} }
free(idS); free(idS);
...@@ -116,11 +119,11 @@ void get_block_dist(int qty, int id, int numP, struct Dist_data *dist_data) { ...@@ -116,11 +119,11 @@ void get_block_dist(int qty, int id, int numP, struct Dist_data *dist_data) {
* Obtiene para el Id de un proceso dado, cuantos elementos * Obtiene para el Id de un proceso dado, cuantos elementos
* enviara o recibira desde el proceso indicado en Dist_data. * enviara o recibira desde el proceso indicado en Dist_data.
*/ */
void set_interblock_counts(int id, int numP, struct Dist_data data_dist, int *sendcounts) { void set_interblock_counts(int id, int numP, struct Dist_data data_dist, int offset_ids, int *sendcounts) {
struct Dist_data other; struct Dist_data other;
int biggest_ini, smallest_end; int biggest_ini, smallest_end;
get_block_dist(data_dist.qty, id, numP, &other); get_block_dist(data_dist.qty, id - offset_ids, numP, &other);
// Si el rango de valores no coincide, se pasa al siguiente proceso // Si el rango de valores no coincide, se pasa al siguiente proceso
if(data_dist.ini >= other.fin || data_dist.fin <= other.ini) { if(data_dist.ini >= other.fin || data_dist.fin <= other.ini) {
......
...@@ -24,7 +24,7 @@ struct Counts { ...@@ -24,7 +24,7 @@ struct Counts {
int *displs; int *displs;
}; };
void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, int init_struct, struct Counts *counts); void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, int offset_ids, struct Counts *counts);
void prepare_comm_allgatherv(int numP, int n, struct Counts *counts); void prepare_comm_allgatherv(int numP, int n, struct Counts *counts);
void get_block_dist(int qty, int id, int numP, struct Dist_data *dist_data); void get_block_dist(int qty, int id, int numP, struct Dist_data *dist_data);
......
...@@ -493,7 +493,6 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch ...@@ -493,7 +493,6 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch
//=====================CHILDREN=========================|| //=====================CHILDREN=========================||
//======================================================|| //======================================================||
//======================================================|| //======================================================||
/* /*
* Inicializacion de los datos de los hijos. * Inicializacion de los datos de los hijos.
* En la misma se reciben datos de los padres: La configuracion * En la misma se reciben datos de los padres: La configuracion
...@@ -535,7 +534,6 @@ void Children_init() { ...@@ -535,7 +534,6 @@ void Children_init() {
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm); comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
if(dist_s_data->entries || rep_s_data->entries) { // Recibir datos sincronos if(dist_s_data->entries || rep_s_data->entries) { // Recibir datos sincronos
recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS); recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
mall_conf->results->sync_end = MPI_Wtime(); // Obtener timestamp de cuando termina comm sincrona mall_conf->results->sync_end = MPI_Wtime(); // Obtener timestamp de cuando termina comm sincrona
// TODO Crear funcion especifica y anyadir para Asinc // TODO Crear funcion especifica y anyadir para Asinc
...@@ -622,7 +620,7 @@ int start_redistribution() { ...@@ -622,7 +620,7 @@ int start_redistribution() {
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm); comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
if(dist_a_data->entries || rep_a_data->entries) { // Enviar datos asincronos if(dist_a_data->entries || rep_a_data->entries) { // Enviar datos asincronos
//FIXME No se envian los datos replicados (rep_a_data) //FIXME No se envian los datos replicados (rep_a_data)
mall_conf->results->async_time[mall_conf->grp] = MPI_Wtime(); mall_conf->results->async_time[mall_conf->grp] = MPI_Wtime();
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) { if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
return thread_creation(); return thread_creation();
...@@ -685,7 +683,6 @@ int check_redistribution() { ...@@ -685,7 +683,6 @@ int check_redistribution() {
//ha terminado, aunque solo se pueda llegar a este punto cuando ha terminado //ha terminado, aunque solo se pueda llegar a este punto cuando ha terminado
} }
MPI_Comm_test_inter(mall->intercomm, &is_intercomm); MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
if(!is_intercomm) mall_conf->results->async_end = MPI_Wtime(); // Merge method only if(!is_intercomm) mall_conf->results->async_end = MPI_Wtime(); // Merge method only
return end_redistribution(); return end_redistribution();
......
...@@ -43,9 +43,7 @@ int baseline_spawn(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *child) { ...@@ -43,9 +43,7 @@ int baseline_spawn(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *child) {
if(spawn_data.myId == spawn_data.root) rootBcast = MPI_ROOT; if(spawn_data.myId == spawn_data.root) rootBcast = MPI_ROOT;
// WORK // WORK
char *cmd="./trace_worker.sh"; int spawn_err = MPI_Comm_spawn(spawn_data.cmd, MPI_ARGV_NULL, spawn_data.spawn_qty, spawn_data.mapping, spawn_data.root, comm, child, MPI_ERRCODES_IGNORE);
int spawn_err = MPI_Comm_spawn(cmd, MPI_ARGV_NULL, spawn_data.spawn_qty, spawn_data.mapping, spawn_data.root, comm, child, MPI_ERRCODES_IGNORE);
//int spawn_err = MPI_Comm_spawn(spawn_data.cmd, MPI_ARGV_NULL, spawn_data.spawn_qty, spawn_data.mapping, spawn_data.root, comm, child, MPI_ERRCODES_IGNORE);
MPI_Comm_set_name(*child, "MPI_COMM_MALL_RESIZE"); MPI_Comm_set_name(*child, "MPI_COMM_MALL_RESIZE");
// END WORK // END WORK
......
...@@ -51,6 +51,7 @@ do ...@@ -51,6 +51,7 @@ do
do do
index=$((10#${element[0]} + $j)) index=$((10#${element[0]} + $j))
index=0$index # FIXME What if there are more than 9 nodes? index=0$index # FIXME What if there are more than 9 nodes?
#FIXME What if less than $cores have to be spawned?
for ((core=0; core<$cores; core++)) # FIXME What if the user asks for a spread distribution for ((core=0; core<$cores; core++)) # FIXME What if the user asks for a spread distribution
do do
initial_nodelist="${initial_nodelist:+$initial_nodelist,}"$common_node_name$index initial_nodelist="${initial_nodelist:+$initial_nodelist,}"$common_node_name$index
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment