Commit 45bf2b43 authored by iker_martin

Refactor results handling and hotfixes for timing collection

parent b6569593
......@@ -240,6 +240,6 @@ void free_results_data(results_data *results) {
free(results->iters_time);
free(results->iters_type);
}
//free(*results); FIXME Remove
}
//free(*results); FIXME Remove
}
......@@ -14,9 +14,9 @@ typedef struct {
double sync_start, sync_end, *sync_time;
double async_start, async_end, *async_time;
double exec_start, exec_time;
//Overcharge time is time spent in malleability that comes from the IO modules
} results_data;
void send_results(results_data *results, int root, int resizes, MPI_Comm intercomm);
void recv_results(results_data *results, int root, int resizes, MPI_Comm intercomm);
......
......@@ -75,9 +75,10 @@ int main(int argc, char *argv[]) {
MPI_Barrier(comm);
results->exec_start = MPI_Wtime();
} else { //Init children
get_malleability_user_comm(&comm);
get_benchmark_configuration(&config_file); //The file is not retrieved correctly
get_benchmark_results(&results); //The file is not retrieved correctly
get_benchmark_configuration(&config_file);
get_benchmark_results(&results);
set_results_post_reconfig(results, group->grp, config_file->sdr, config_file->adr); //TODO Change when adding a new redistribution
if(config_file->comm_tam) {
......@@ -131,15 +132,16 @@ int main(int argc, char *argv[]) {
MPI_Barrier(comm); // TODO Possible error when using SHRINK
results->exec_time = MPI_Wtime() - results->exec_start;
}
print_final_results(); // Past this point the processes can no longer write
if(comm != MPI_COMM_WORLD && comm != MPI_COMM_NULL) {
MPI_Comm_free(&comm);
}
free_application_data();
if(group->myId == ROOT) MPI_Abort(MPI_COMM_WORLD, -100);
if(group->myId == ROOT && (config_file->cst == COMM_SPAWN_MERGE || config_file->cst == COMM_SPAWN_MERGE_PTHREAD)) {
MPI_Abort(MPI_COMM_WORLD, -100);
}
free_application_data();
MPI_Finalize();
return 0;
......@@ -173,16 +175,19 @@ int work() {
iterate(matrix, config_file->matrix_tam, state);
}
if(config_file->iters[group->grp] == iter && config_file->resizes != group->grp + 1)
if(config_file->resizes != group->grp + 1)
state = malleability_checkpoint();
iter = 0;
while(state == MAL_DIST_PENDING || state == MAL_SPAWN_PENDING) {
iterate(matrix, config_file->matrix_tam, state);
iter++;
if(iter < config_file->iters[group->grp+1]) {
iterate(matrix, config_file->matrix_tam, state);
iter++;
group->iter_start = iter;
}
state = malleability_checkpoint();
}
group->iter_start = iter;
if(config_file->resizes - 1 == group->grp) res=1;
if(state == MAL_ZOMBIE) res=state;
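The reworked loop in work() overlaps extra iterations with the pending reconfiguration. Below is a self-contained toy simulation of that logic, using the new MAL_* values defined further down; the stub checkpoint, the poll count, and the iteration bound are illustrative only, not the benchmark's real behaviour.

```c
#include <stdio.h>

#define MAL_SPAWN_PENDING   1
#define MAL_SPAWN_COMPLETED 3

static int pending_polls = 5;                    /* pretend the spawn needs 5 polls */
static int malleability_checkpoint_stub(void) {
    return (--pending_polls > 0) ? MAL_SPAWN_PENDING : MAL_SPAWN_COMPLETED;
}

int main(void) {
    int iters_next = 3;                          /* plays the role of iters[grp+1]  */
    int iter = 0, iter_start = 0;
    int state = malleability_checkpoint_stub();  /* first checkpoint request        */

    while (state == MAL_SPAWN_PENDING) {
        if (iter < iters_next) {                 /* overlap at most iters_next work */
            iter++;                              /* stands in for iterate(...)      */
            iter_start = iter;                   /* next group resumes from here    */
        }
        state = malleability_checkpoint_stub();  /* poll the asynchronous spawn     */
    }
    printf("overlapped %d iterations, resume at iteration %d\n", iter, iter_start);
    return 0;
}
```

With these toy numbers the loop performs the three allowed extra iterations and then keeps polling until the spawn completes, which is the behaviour the hunk introduces.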
......@@ -208,13 +213,14 @@ void iterate(double *matrix, int n, int async_comm) {
int i, operations = 0;
double aux = 0;
start_time = actual_time = MPI_Wtime();
start_time = MPI_Wtime();
operations = time / Top; //FIXME Compute only once
for(i=0; i < operations; i++) {
aux += computePiSerial(n);
}
if(config_file->comm_tam) {
MPI_Bcast(group->compute_comm_array, config_file->comm_tam, MPI_CHAR, ROOT, comm);
}
......@@ -277,6 +283,7 @@ int print_local_results() {
print_iter_results(*results, config_file->iters[group->grp] -1);
free(file_name);
fflush(stdout);
close(1);
dup(ptr_out);
}
......@@ -303,6 +310,7 @@ int print_final_results() {
create_out_file(file_name, &ptr_global, 1);
print_config(config_file, group->grp);
print_global_results(*results, config_file->resizes);
fflush(stdout);
free(file_name);
}
......@@ -396,7 +404,6 @@ void free_application_data() {
free(results);
}
free(group);
}
......
......@@ -23,7 +23,8 @@ struct Slurm_data {
typedef struct {
char *argv;
int numP_childs, myId, root, type_dist;
int numP_childs, myId, root, already_created;
int type_dist;
int spawn_is_single;
int spawn_method;
MPI_Comm comm;
......@@ -34,13 +35,13 @@ typedef struct {
void* thread_work(void* creation_data_arg);
//--------------PRIVATE DECLARATIONS---------------//
void processes_dist(char *argv, int numP_childs, int type_dist);
void processes_dist(char *argv, int numP_childs, int already_created, int type_dist);
void generic_spawn(int myId, int root, int is_single, MPI_Comm *child, MPI_Comm comm);
void single_spawn_connection(int myId, int root, MPI_Comm comm, MPI_Comm *child);
int create_processes(int myId, int root, MPI_Comm *child, MPI_Comm comm);
void node_dist(slurm_job_info_t job_record, int type, int total_procs, int **qty, int *used_nodes);
void node_dist(slurm_job_info_t job_record, int type, int total_procs, int already_created, int **qty, int *used_nodes);
void fill_str_hostfile(slurm_job_info_t job_record, int *qty, int used_nodes, char **hostfile_str);
int write_str_node(char **hostfile_str, int len_og, int qty, char *node_name);
......@@ -69,7 +70,7 @@ void fill_hostfile(slurm_job_info_t job_record, int ptr, int *qty, int used_node
* "check_slurm_comm()".
*/
int init_slurm_comm(char *argv, int myId, int numP, int numC, int root, int type_dist, int type_creation, int spawn_is_single, MPI_Comm comm, MPI_Comm *child) {
int spawn_qty;
int spawn_qty, already_created = 0;
slurm_data = malloc(sizeof(struct Slurm_data));
slurm_data->type_creation = type_creation;
......@@ -79,13 +80,14 @@ int init_slurm_comm(char *argv, int myId, int numP, int numC, int root, int type
if(type_creation == COMM_SPAWN_MERGE || type_creation == COMM_SPAWN_MERGE_PTHREAD) {
if (numP < slurm_data->result_procs) {
spawn_qty = slurm_data->result_procs - numP;
already_created = numP;
}
}
if(type_creation == COMM_SPAWN_SERIAL || slurm_data->type_creation == COMM_SPAWN_MERGE) {
if(myId == root) {
processes_dist(argv, numP, type_dist);
processes_dist(argv, spawn_qty, already_created, type_dist);
} else {
slurm_data->cmd = malloc(1 * sizeof(char));
slurm_data->info = MPI_INFO_NULL;
......@@ -93,13 +95,6 @@ int init_slurm_comm(char *argv, int myId, int numP, int numC, int root, int type
// WORK
generic_spawn(myId, root, slurm_data->spawn_is_single, child, comm);
if(slurm_data->type_creation == COMM_SPAWN_MERGE) {
int numParents;
MPI_Comm_size(comm, &numParents);
if(numParents < numP) { //Expand
//proc_adapt_expand(numParents, numP, child, comm, MALLEABILITY_NOT_CHILDREN);
}
}
// END WORK
if(myId == root && slurm_data->info != MPI_INFO_NULL) {
......@@ -108,8 +103,6 @@ int init_slurm_comm(char *argv, int myId, int numP, int numC, int root, int type
free(slurm_data->cmd);
free(slurm_data);
commSlurm = MAL_SPAWN_COMPLETED;
} else if(type_creation == COMM_SPAWN_PTHREAD || slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD) {
commSlurm = MAL_SPAWN_PENDING;
......@@ -117,6 +110,7 @@ int init_slurm_comm(char *argv, int myId, int numP, int numC, int root, int type
Creation_data *creation_data = (Creation_data *) malloc(sizeof(Creation_data));
creation_data->argv = argv;
creation_data->numP_childs = spawn_qty;
creation_data->already_created = already_created;
creation_data->myId = myId;
creation_data->root = root;
creation_data->type_dist = type_dist;
......@@ -158,7 +152,7 @@ int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm com
MPI_Bcast(&commSlurm, 1, MPI_INT, root, comm);
state = commSlurm;
if(state == MAL_SPAWN_PENDING) return state; // Continue only if asynchronous process creation has ended
if(state != MAL_SPAWN_SINGLE_PENDING) return state; // Continue only if asynchronous process creation has ended
if(myId == root) {
if(pthread_join(slurm_thread, NULL)) {
......@@ -278,30 +272,18 @@ void proc_adapt_shrink(int numC, MPI_Comm *comm, int myId) {
* the master thread is notified.
*/
void* thread_work(void* creation_data_arg) {
int numP;
Creation_data *creation_data = (Creation_data*) creation_data_arg;
returned_comm = (MPI_Comm *) malloc(sizeof(MPI_Comm));
if(creation_data->myId == creation_data->root) {
processes_dist(creation_data->argv, creation_data->numP_childs, creation_data->type_dist);
processes_dist(creation_data->argv, creation_data->numP_childs, creation_data->already_created, creation_data->type_dist);
} else {
slurm_data->cmd = malloc(1 * sizeof(char));
slurm_data->info = MPI_INFO_NULL;
}
commSlurm = MAL_SPAWN_COMPLETED; // TODO REFACTOR?
generic_spawn(creation_data->myId, creation_data->root, slurm_data->spawn_is_single, returned_comm, creation_data->comm);
if(slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD) {
//MPI_Comm_size(creation_data->comm, &numP);
numP= 1; //FIXME REMOVE
if(numP < creation_data->numP_childs) { //Expand
//TODO Create the new redistribution and uncomment this
//proc_adapt_expand(numP, creation_data->numP_childs, returned_comm, &aux_comm, MALLEABILITY_NOT_CHILDREN);
//*returned_comm = aux_comm;
}
}
free(creation_data);
pthread_exit(NULL);
}
......@@ -313,7 +295,7 @@ void* thread_work(void* creation_data_arg) {
* for a call to MPI_Comm_spawn, obtaining a physical distribution
* for the processes and creating a hostfile.
*/
void processes_dist(char *argv, int numP_childs, int type) {
void processes_dist(char *argv, int numP_childs, int already_created, int type) {
int jobId;
char *tmp;
job_info_msg_t *j_info;
......@@ -334,7 +316,7 @@ void processes_dist(char *argv, int numP_childs, int type) {
strcpy(slurm_data->cmd, argv);
// GET NEW DISTRIBUTION
node_dist(last_record, type, numP_childs, &procs_array, &used_nodes);
node_dist(last_record, type, numP_childs, already_created, &procs_array, &used_nodes);
slurm_data->qty_procs = numP_childs;
/*
......@@ -393,6 +375,7 @@ void single_spawn_connection(int myId, int root, MPI_Comm comm, MPI_Comm *child)
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, root, 130, *child, MPI_STATUS_IGNORE);
commSlurm = MAL_SPAWN_SINGLE_PENDING;
} else {
port_name = malloc(1);
}
......@@ -432,7 +415,7 @@ int create_processes(int myId, int root, MPI_Comm *child, MPI_Comm comm) {
* COMM_PHY_CPU (2): Aims to fill a node's capacity before
* occupying another node.
*/
void node_dist(slurm_job_info_t job_record, int type, int total_procs, int **qty, int *used_nodes) {
void node_dist(slurm_job_info_t job_record, int type, int total_procs, int already_created, int **qty, int *used_nodes) {
int i, asigCores;
int tamBl, remainder;
int *procs;
......@@ -453,8 +436,8 @@ void node_dist(slurm_job_info_t job_record, int type, int total_procs, int **qty
} else if (type == 2) { // DIST CPUs
tamBl = job_record.num_cpus / job_record.num_nodes;
asigCores = 0;
i = 0;
*used_nodes = 0;
i = already_created / tamBl;
*used_nodes = already_created / tamBl;
while(asigCores+tamBl <= total_procs) {
asigCores += tamBl;
......@@ -462,6 +445,7 @@ void node_dist(slurm_job_info_t job_record, int type, int total_procs, int **qty
i = (i+1) % job_record.num_nodes;
(*used_nodes)++;
}
if(asigCores < total_procs) {
procs[i] += total_procs - asigCores;
(*used_nodes)++;
......@@ -474,6 +458,7 @@ void node_dist(slurm_job_info_t job_record, int type, int total_procs, int **qty
(*qty)[i] = procs[i];
}
free(procs);
}
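To make the effect of the new already_created parameter concrete, here is a minimal standalone sketch of the CPU-oriented branch (type == 2) of node_dist(). The job sizes are invented for illustration, plain ints replace the slurm_job_info_t fields, and the procs[i] += tamBl line hidden in the collapsed region is assumed.

```c
#include <stdio.h>
#include <stdlib.h>

int main(void) {
    int num_cpus = 80, num_nodes = 4;            /* hypothetical job: 4 nodes x 20 cores */
    int already_created = 20, total_procs = 40;  /* 20 parents exist, spawn 40 children  */

    int tamBl = num_cpus / num_nodes;            /* cores per node                       */
    int *procs = calloc(num_nodes, sizeof(int));
    int i = already_created / tamBl;             /* skip nodes already filled by parents */
    int used_nodes = already_created / tamBl;
    int asigCores = 0;

    while (asigCores + tamBl <= total_procs) {   /* fill whole nodes first               */
        asigCores += tamBl;
        procs[i] += tamBl;                       /* assumed line from the collapsed hunk */
        i = (i + 1) % num_nodes;
        used_nodes++;
    }
    if (asigCores < total_procs) {               /* remainder goes to the next node      */
        procs[i] += total_procs - asigCores;
        used_nodes++;
    }

    for (i = 0; i < num_nodes; i++)
        printf("node %d -> %d procs\n", i, procs[i]);
    printf("used_nodes = %d\n", used_nodes);
    free(procs);
    return 0;
}
```

With these numbers the 40 children land on nodes 1 and 2, leaving node 0 to the 20 already-created parents, which the old version (always starting at i = 0) could not do.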
/*
......@@ -487,7 +472,9 @@ void fill_str_hostfile(slurm_job_info_t job_record, int *qty, int used_nodes, ch
hostlist = slurm_hostlist_create(job_record.nodes);
while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) {
len = write_str_node(hostfile_str, len, qty[i], host);
if(qty[i] != 0) {
len = write_str_node(hostfile_str, len, qty[i], host);
}
i++;
free(host);
}
......
......@@ -423,9 +423,6 @@ void Children_init() {
}
}
// Store the results of this transmission
recv_results(mall_conf->results, mall->root, mall_conf->config_file->resizes, mall->intercomm);
if(mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
proc_adapt_expand(&(mall->numP), mall->numP+numP_parents, mall->intercomm, &(mall->comm), MALLEABILITY_CHILDREN); //TODO Que valor se pasa?
......@@ -437,7 +434,11 @@ void Children_init() {
mall->user_comm = aux;
}
// Store the results of this transmission
recv_results(mall_conf->results, mall->root, mall_conf->config_file->resizes, mall->intercomm);
MPI_Comm_disconnect(&(mall->intercomm));
}
//======================================================||
......@@ -457,7 +458,7 @@ int spawn_step(){
state = shrink_redistribution();
return state;
}
state = init_slurm_comm(mall->name_exec, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_type, mall_conf->spawn_is_single, mall->thread_comm, &(mall->intercomm));
if(mall_conf->spawn_type == COMM_SPAWN_SERIAL || mall_conf->spawn_type == COMM_SPAWN_MERGE)
......@@ -491,6 +492,7 @@ int start_redistribution() {
MPI_Bcast(&(mall_conf->spawn_type), 1, MPI_INT, rootBcast, mall->intercomm);
MPI_Bcast(&(mall->root), 1, MPI_INT, rootBcast, mall->intercomm);
MPI_Bcast(&(mall->numP), 1, MPI_INT, rootBcast, mall->intercomm);
send_config_file(mall_conf->config_file, rootBcast, mall->intercomm);
if(dist_a_data->entries || rep_a_data->entries) { // Receive asynchronous data
......@@ -569,8 +571,7 @@ int end_redistribution() {
MPI_Comm aux;
if(mall->myId == mall->root) rootBcast = MPI_ROOT;
if(dist_s_data->entries || rep_s_data->entries) { // Receive synchronous data
mall_conf->results->sync_start = MPI_Wtime();
if(dist_s_data->entries || rep_s_data->entries) { // Send synchronous data
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
......@@ -580,8 +581,6 @@ int end_redistribution() {
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], MPI_INT, rootBcast, mall->intercomm);
}
}
send_results(mall_conf->results, rootBcast, mall_conf->config_file->resizes, mall->intercomm);
if(mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
double time_adapt = MPI_Wtime();
......@@ -599,6 +598,8 @@ int end_redistribution() {
// result = MAL_DIST_ADAPTED;
}
send_results(mall_conf->results, rootBcast, mall_conf->config_file->resizes, mall->intercomm);
result = MAL_DIST_COMPLETED;
MPI_Comm_disconnect(&(mall->intercomm));
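Since the point of this commit is when the timing results cross the intercommunicator, here is a hedged sketch of the new ordering on both sides. Only the control flow is shown; the helper names parent_side/child_side are invented and the surrounding logic is simplified, while send_results/recv_results match the declarations shown earlier.

```c
#include <mpi.h>
#include "results.h"   /* send_results / recv_results declared above */

/* Parent side (end_redistribution): results are now sent AFTER the
 * optional merge/adapt block, immediately before the disconnect. */
void parent_side(results_data *results, int rootBcast, int resizes, MPI_Comm *intercomm) {
    /* ... synchronous data sent, proc_adapt handled ... */
    send_results(results, rootBcast, resizes, *intercomm);
    MPI_Comm_disconnect(intercomm);
}

/* Child side (Children_init): results are now received AFTER the
 * expand and communicator swap, matching the parent's new order. */
void child_side(results_data *results, int root, int resizes, MPI_Comm *intercomm) {
    /* ... proc_adapt_expand() and user_comm swap done ... */
    recv_results(results, root, resizes, *intercomm);
    MPI_Comm_disconnect(intercomm);
}
```

Keeping both calls as the last exchange before MPI_Comm_disconnect is what prevents the send and receive sides from getting out of step.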
......
......@@ -3,10 +3,11 @@
#define MAL_ZOMBIE -3
#define MAL_NOT_STARTED 0
#define MAL_SPAWN_PENDING 1
#define MAL_SPAWN_COMPLETED 2
#define MAL_DIST_PENDING 3
#define MAL_DIST_COMPLETED 4
#define MAL_DIST_ADAPTED 5
#define MAL_SPAWN_SINGLE_PENDING 2
#define MAL_SPAWN_COMPLETED 3
#define MAL_DIST_PENDING 4
#define MAL_DIST_COMPLETED 5
#define MAL_DIST_ADAPTED 6
// TODO Refactor
......
......@@ -8,6 +8,8 @@ ResultsDir="Results/"
ResultsDirName=$1
maxIndex=$2
cantidadGrupos=$3 #Including the parents
totalEjGrupo=$4 #Total executions per group
maxTime=$5 #Maximum time considered valid
if [ $# -lt 3 ]
then
......@@ -43,11 +45,25 @@ qtyL=$(ls R*/R*_G?N*.out | wc -l)
if [ $qtyG == $qtyL ]
then
echo "El numero de ficheros G($qtyG) y L($qtyL) coincide"
else # TODO Expandir indicando cuales
else
#Si faltan archivos, se indican cuales faltan
echo "Faltan ejecuciones Locales o globales"
for ((i=1; i<$maxIndex; i++))
do
qtyEx=$(grep Tex -r Run$i | wc -l)
qtyIt=$(grep Top -r Run$i | wc -l)
qtyEx=$(($qtyEx * 2))
if [ $qtyEx -ne $qtyIt ]
then
diff=$(($totalEjGrupo-$qtyEx))
echo "Faltan archivos en Run$i"
fi
done
exit -1
fi
#grep -rn "2.\." R* TODO Test whether the theoretical maximum time is valid?
#Check whether there are runs with negative times
#If there are, re-run them and report which ones they are
grep - */R* | grep Tex > errores.txt
......@@ -61,7 +77,7 @@ then
#Get data from an erroneous execution
run=$(echo $lineRun | cut -d 'R' -f3 | cut -d '_' -f1)
if [ $run -gt $maxIndex ]
then #Index of later executions
then #Index of later executions done by hand -- FIXME Remove?
realRun=$(($run - $maxIndex))
index=$run
else # Index of the first executions
......@@ -91,9 +107,39 @@ then
done
#2 - Relaunch execution
sbatch -N 2 $dir$execDir./singleRun.sh config$realRun.ini $index
proc_list=$(grep Procs R${realRun}_Global.out | cut -d '=' -f3 | cut -d ',' -f1)
proc_parents=$(echo $proc_list | cut -d ' ' -f1)
proc_children=$(echo $proc_list | cut -d ' ' -f2)
nodes=8 # Current maximum
if [ $proc_parents -gt $proc_children ]
then
nodes=$(($proc_parents / 20))
else
nodes=$(($proc_children / 20))
fi
sbatch -N $nodes $dir$execDir./singleRun.sh config$realRun.ini $index
cd $dir$ResultsDir$ResultsDirName
done < errores.txt
exit 0
fi
#Check that every run has all the executions it should
#Only the global file needs to be checked.
qty_missing=0
for ((i=1; i<$maxIndex; i++))
do
qtyEx=$(grep Tex -r Run$i | wc -l)
if [ $qtyEx -ne $totalEjGrupo ]
then
diff=$(($totalEjGrupo-$qtyEx))
qty_missing=$(($qty_missing+1))
echo "Faltan en $i, $diff ejecuciones"
fi
done
if [ $qty_missing -eq 0 ]
then
echo "Todos los archivos tienen $totalEjGrupo ejecuciones"
fi
#!/bin/bash
#SBATCH --exclude=c01,c00
#SBATCH --exclude=c02,c01,c00
dir="/home/martini/malleability_benchmark"
codeDir="/Codes"
......
......@@ -45,11 +45,20 @@ cst_array=(0 1 2 3)
css_array=(0 1)
#Get the possible process counts to execute
i=0
#while [[ $value -lt $max_procs ]]
#do
# i=$(($i + 1))
# value=$((20 * $i))
# procs_array=(${procs_array[@]} $value)
#done
i=0
while [[ $value -lt $max_procs ]]
do
i=$(($i + 1))
value=$((20 * $i))
value=$((2 ** $i))
value=$(($value * 10))
procs_array=(${procs_array[@]} $value)
done
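The rewritten loop makes the candidate process counts grow geometrically rather than linearly. A small C sketch of the sequence it now produces (max_procs = 160 is an assumed bound, purely for illustration):

```c
#include <stdio.h>

int main(void) {
    int max_procs = 160;                 /* assumed bound, for illustration only   */
    int value = 0;

    for (int i = 1; value < max_procs; i++) {
        value = (1 << i) * 10;           /* new rule: 2^i * 10 -> 20, 40, 80, 160  */
        printf("%d ", value);            /* old rule was 20 * i -> 20, 40, 60, ... */
    }
    printf("\n");
    return 0;
}
```

With this bound the sweep covers 20, 40, 80, 160 instead of every multiple of 20, so fewer configurations are run while the largest sizes are still reached.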
......
#!/bin/bash
#SBATCH --exclude=c01,c00
#SBATCH --exclude=c02,c01,c00
dir="/home/martini/malleability_benchmark"
codeDir="/Codes"
......