Commit 926feb73 authored by iker_martin

Added a new function to create processes. Still need to check which of the two functions is faster.

parent 3e176fda
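The commit message leaves open which of the two process-creation functions is faster. A minimal way to measure either path, assuming the comparison is done around processes_dist() (the only creation entry point visible in this diff; myId and ROOT are taken from the surrounding code), is to bracket the call with MPI_Wtime():

  // Sketch: time one process-creation path; run once per variant and compare.
  double start, end;
  start = MPI_Wtime();
  processes_dist(argv, numP_childs, type);   // hostfile-based or host-string-based variant
  end = MPI_Wtime();
  if(myId == ROOT) printf("Process creation took %f s\n", end - start);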
@@ -220,10 +220,7 @@ int start_redistribution(int numS, MPI_Request **comm_req) {
   if(config_file->adr > 0) {
     results->async_start = MPI_Wtime();
-    //send_async(group->async_array, config_file->adr, group->myId, group->numP, ROOT, group->children, numS, comm_req, config_file->aib);
     return thread_creation();
-    //return MAL_ASYNC_PENDING;
   }
   return end_redistribution(0);
 }
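start_redistribution now delegates the asynchronous send to thread_creation() instead of calling send_async() directly. The body of thread_creation() is not part of this diff; a plausible minimal shape, assuming it launches thread_async_work() (declared in the next hunk) on a POSIX thread, would be:

  #include <pthread.h>

  // Sketch only: thread_creation() is not shown in this commit.
  static pthread_t async_thread;            // hypothetical thread handle

  int thread_creation() {
    if(pthread_create(&async_thread, NULL, thread_async_work, NULL) != 0) {
      return -1;                            // thread could not be created
    }
    return MAL_ASYNC_PENDING;               // caller keeps iterating while the thread sends data
  }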
@@ -274,6 +271,7 @@ void* thread_async_work(void* void_arg) {
 }
 /*
+ * @deprecated
  * Checks whether the asynchronous redistribution has finished.
  * If it has not finished, the function returns indicating so; otherwise,
  * it continues with the synchronous communication, the sending of results and
@@ -294,7 +292,7 @@ int check_redistribution(int iter, MPI_Request **comm_req) {
     req_completed = &(*comm_req)[0];
   } else { // MAL_USE_IBARRIER
     req_completed = &(*comm_req)[1];
   }
   test_err = MPI_Test(req_completed, &completed, MPI_STATUS_IGNORE);
   if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
@@ -303,11 +301,10 @@ int check_redistribution(int iter, MPI_Request **comm_req) {
   }
   MPI_Allreduce(&completed, &all_completed, 1, MPI_INT, MPI_MIN, MPI_COMM_WORLD);
   if(!all_completed) return MAL_ASYNC_PENDING; // Continue only if asynchronous send has ended
-  //MPI_Wait(req_completed, MPI_STATUS_IGNORE);
+  MPI_Wait(req_completed, MPI_STATUS_IGNORE);
   if(config_file->aib == MAL_USE_IBARRIER) {
     MPI_Wait(&(*comm_req)[0], MPI_STATUS_IGNORE); // Mark the asynchronous send as completed
     //To disconnect both groups of processes, MPI must be told that this
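The check above combines a local MPI_Test with an MPI_Allreduce over MPI_MIN so that no rank moves on until every rank has completed its request. A self-contained sketch of that pattern, independent of this code's comm_req layout:

  #include <mpi.h>

  // Returns 1 once every rank's request has completed, 0 otherwise.
  // Generic sketch of the MPI_Test + MPI_Allreduce(MPI_MIN) pattern used above.
  int all_ranks_completed(MPI_Request *request) {
    int local_done = 0, all_done = 0;
    MPI_Test(request, &local_done, MPI_STATUS_IGNORE);       // cheap, non-blocking local probe
    MPI_Allreduce(&local_done, &all_done, 1, MPI_INT,
                  MPI_MIN, MPI_COMM_WORLD);                   // 1 only if everyone finished
    return all_done;
  }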
@@ -422,25 +419,6 @@ void iterate(double *matrix, int n, int async_comm) {
     results->iters_type[results->iter_index] = operations;
     results->iter_index = results->iter_index + 1;
   }
-  /*
-  if(async_comm == MAL_ASYNC_PENDING) { // An asynchronous data redistribution is in progress
-    operations = results->iters_type[config_file->iters[group->grp] - 1];
-    for (i=0; i<operations; i++) {
-      //computeMatrix(matrix, n);
-      computePi(n);
-    }
-    actual_time = MPI_Wtime(); // Record times
-    operations = 0;
-  } else { // No data redistribution in progress
-    while (actual_time - start_time < time) {
-      //computeMatrix(matrix, n);
-      computePi(n);
-      operations++;
-      actual_time = MPI_Wtime(); // Record times
-    }
-  }
-  */
 /*
  * Performs a multiplication of matrices of size n
......
@@ -28,6 +28,11 @@ void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int num
 void send_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_child, int idI, int idE, struct Counts counts, MPI_Request *comm_req);
 void recv_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req);
+//TESTS
+void send_async_arrays2(struct Dist_data dist_data, char *array, int rootBcast, int numP_child, int idI, int idE, struct Counts counts, MPI_Request **comm_req);
+void recv_async_arrays2(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request **comm_req);
+//
 // DIST FUNCTIONS
 void get_dist(int qty, int id, int numP, struct Dist_data *dist_data);
 void set_counts(int id, int numP, struct Dist_data data_dist, int *sendcounts);
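The only visible difference between the new send_async_arrays2/recv_async_arrays2 prototypes and the originals is that comm_req is now MPI_Request ** rather than MPI_Request *, which suggests the new variants hand back an array of requests (one per point-to-point transfer) instead of reusing a single request. Their bodies are not shown in this commit; a sketch under that assumption, using plain count/displacement arrays rather than guessing the fields of struct Counts, and with an illustrative tag:

  #include <stdlib.h>
  #include <mpi.h>

  // Assumption-based sketch: one MPI_Isend (and one request) per destination rank,
  // so the caller gets an array it can pass to MPI_Waitall/MPI_Testall.
  void send_async_arrays2_sketch(char *array, int *sendcounts, int *displs,
                                 int idI, int idE, MPI_Comm intercomm,
                                 MPI_Request **comm_req) {
    int i;
    *comm_req = (MPI_Request *) malloc((idE - idI) * sizeof(MPI_Request));
    for(i = idI; i < idE; i++) {             // idI..idE: ranks this process sends to
      MPI_Isend(array + displs[i], sendcounts[i], MPI_CHAR, i, 99,
                intercomm, &((*comm_req)[i - idI]));
    }
  }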
@@ -281,7 +286,6 @@ void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm in
  * elements are transmitted to/from processes of the other group.
  */
 void send_async_arrays(struct Dist_data dist_data, char *array, int rootBcast, int numP_child, int idI, int idE, struct Counts counts, MPI_Request *comm_req) {
   int i;
   // PREPARE THE VECTOR SEND
@@ -305,7 +309,6 @@ void send_async_arrays(struct Dist_data dist_data, char *array, int rootBcast, i
  * elements are transmitted to/from processes of the other group.
  */
 void recv_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req) {
   int i;
   char *aux = malloc(1);
......
@@ -39,6 +39,11 @@ int create_hostfile(char *jobId, char **file_name);
 int write_hostfile_node(int ptr, int qty, char *node_name);
 void fill_hostfile(slurm_job_info_t job_record, int ptr, int *qty, int used_nodes);
+//TESTS
+void fill_str_hostfile(slurm_job_info_t job_record, int *qty, int used_nodes, char **hostfile_str);
+int write_str_node(char **hostfile_str, int len_og, int qty, char *node_name);
+//
 void print_Info(MPI_Info info);
 //--------------PUBLIC FUNCTIONS---------------//
@@ -146,7 +151,7 @@ void processes_dist(char *argv[], int numP_childs, int type) {
   int used_nodes=0;
   int *procs_array;
-  char *hostfile_name;
+  char *hostfile;
   // Get Slurm job info
   tmp = getenv("SLURM_JOB_ID");
@@ -163,14 +168,23 @@ void processes_dist(char *argv[], int numP_childs, int type) {
   slurm_data->qty_procs = numP_childs;
   // CREATE/UPDATE HOSTFILE
-  ptr = create_hostfile(tmp, &hostfile_name);
+  ptr = create_hostfile(tmp, &hostfile);
   MPI_Info_create(&(slurm_data->info));
-  MPI_Info_set(slurm_data->info, "hostfile", hostfile_name);
-  free(hostfile_name);
+  MPI_Info_set(slurm_data->info, "hostfile", hostfile);
+  free(hostfile);
   // SET NEW DISTRIBUTION
   fill_hostfile(last_record, ptr, procs_array, used_nodes);
   close(ptr);
+  // TEST
+  /*
+  fill_str_hostfile(last_record, procs_array, used_nodes, &hostfile);
+  MPI_Info_create(&(slurm_data->info));
+  MPI_Info_set(slurm_data->info, "hosts", hostfile);
+  */
   // Free JOB INFO
   slurm_free_job_info_msg(j_info);
@@ -322,3 +336,51 @@ int write_hostfile_node(int ptr, int qty, char *node_name) {
   return 0;
 }
+void fill_str_hostfile(slurm_job_info_t job_record, int *qty, int used_nodes, char **hostfile_str) {
+  int i=0, len=0;
+  char *host;
+  hostlist_t hostlist;
+  hostlist = slurm_hostlist_create(job_record.nodes);
+  while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
+    len = write_str_node(hostfile_str, len, qty[i], host);
+    i++;
+    free(host);
+  }
+  slurm_hostlist_destroy(hostlist);
+}
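fill_str_hostfile builds a single comma-separated host string in memory, which matches the commented-out TEST block earlier in processes_dist that passes it through the "hosts" MPI_Info key instead of writing a hostfile to disk. A minimal usage sketch (the spawn command, its arguments, and the surrounding variables are taken from or modeled on the code in this diff, not guaranteed to match the final implementation):

  // Sketch: pass the in-memory host list to MPI_Comm_spawn via the "hosts" key.
  char *hosts = NULL;
  MPI_Info info;
  MPI_Comm children;

  fill_str_hostfile(last_record, procs_array, used_nodes, &hosts);  // e.g. "n00,n00,n01,"
  MPI_Info_create(&info);
  MPI_Info_set(info, "hosts", hosts);
  MPI_Comm_spawn("./a.out", MPI_ARGV_NULL, numP_childs, info,
                 ROOT, MPI_COMM_WORLD, &children, MPI_ERRCODES_IGNORE);
  free(hosts);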
+int write_str_node(char **hostfile_str, int len_og, int qty, char *node_name) {
+  int err, len_node, len, i;
+  char *ocurrence;
+  len_node = strlen(node_name);
+  len = qty * (len_node + 1);
+  if(len_og == 0) { // Memory not yet allocated
+    *hostfile_str = (char *) malloc((len + 1) * sizeof(char));
+  } else { // The string already holds data
+    *hostfile_str = (char *) realloc(*hostfile_str, (len_og + len + 1) * sizeof(char));
+  }
+  if(*hostfile_str == NULL) return -1; // Could not allocate memory
+  ocurrence = (char *) malloc((len_node + 2) * sizeof(char));
+  if(ocurrence == NULL) return -1; // Could not allocate memory
+  err = sprintf(ocurrence, "%s,", node_name);
+  if(err < 0) return -2; // Could not write to the auxiliary buffer
+  i=0;
+  if(len_og == 0) { // First node: copy to initialize the string
+    i++;
+    strcpy(*hostfile_str, ocurrence);
+  }
+  for(; i<qty; i++){ // Remaining occurrences are appended
+    strcat(*hostfile_str, ocurrence);
+  }
+  free(ocurrence);
+  return len_og + len;
+}
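write_str_node appends qty comma-terminated copies of node_name and returns the accumulated string length, so each call can pass the previous return value as len_og. A short worked example (node names and quantities are illustrative):

  char *hosts = NULL;
  int len = 0;
  len = write_str_node(&hosts, len, 2, "n00");  // hosts = "n00,n00,",     len = 8
  len = write_str_node(&hosts, len, 1, "n01");  // hosts = "n00,n00,n01,", len = 12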
...@@ -7,9 +7,9 @@ module load mpich-3.4.1-noucx ...@@ -7,9 +7,9 @@ module load mpich-3.4.1-noucx
#export HYDRA_DEBUG=1 #export HYDRA_DEBUG=1
#-disable-hostname-propagation -disable-auto-cleanup -pmi-port -hosts n00,n01 #-disable-hostname-propagation -disable-auto-cleanup -pmi-port -hosts n00,n01
numP=$(bash recordMachinefile.sh test.ini) numP=$(bash recordMachinefile.sh $1)
mpirun -f hostfile.o$SLURM_JOB_ID -np $numP ./a.out test.ini mpirun -f hostfile.o$SLURM_JOB_ID -np $numP ./a.out $1 $2
#mpirun -np 2 ./a.out test.ini #mpirun -np 2 ./a.out test.ini
rm hostfile.o$SLURM_JOB_ID rm hostfile.o$SLURM_JOB_ID
......
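Usage note: with this change the batch script no longer hardcodes test.ini; the configuration file is taken from the first script argument and a second argument is forwarded to ./a.out (its meaning is not shown in this diff), e.g. sbatch <script> test.ini <second_arg>.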