Commit cd5d81f7 authored by iker_martin

Fixed memory error in expansion. Config now takes into account how processes are created. The Merge method no longer has write problems - still need to ensure a correct name in the files
parent af1175a5
@@ -39,8 +39,12 @@ static int handler(void* user, const char* section, const char* name,
pconfig->sdr = atoi(value);
} else if (MATCH("general", "ADR")) {
pconfig->adr = atoi(value);
} else if (MATCH("general", "AIB")) {
} else if (MATCH("general", "AIB")) { //TODO Refactor cambiar nombre
pconfig->aib = atoi(value);
} else if (MATCH("general", "CST")) {
pconfig->cst = atoi(value);
} else if (MATCH("general", "CSS")) {
pconfig->css = atoi(value);
} else if (MATCH("general", "time")) {
pconfig->general_time = atof(value);
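For reference, the handler above uses the usual section/name/value callback pattern, so the two new fields are read from two extra entries of the [general] section of the input file. A minimal sketch of what that part of the file could look like, assuming the plain key=value ini format this handler implies; the numeric values are illustrative only (judging from the commented-out defaults later in the diff, cst selects the COMM_SPAWN_* creation method and css whether the spawn is single or multiple):
[general]
CST=1
CSS=0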
@@ -136,8 +140,8 @@ void free_config(configuration *user_config) {
void print_config(configuration *user_config, int grp) {
if(user_config != NULL) {
int i;
printf("Config loaded: resizes=%d, matrix=%d, comm_tam=%d, sdr=%d, adr=%d, aib=%d, time=%f || grp=%d\n",
user_config->resizes, user_config->matrix_tam, user_config->comm_tam, user_config->sdr, user_config->adr, user_config->aib, user_config->general_time, grp);
printf("Config loaded: resizes=%d, matrix=%d, comm_tam=%d, sdr=%d, adr=%d, aib=%d, css=%d, cst=%d, time=%f || grp=%d\n",
user_config->resizes, user_config->matrix_tam, user_config->comm_tam, user_config->sdr, user_config->adr, user_config->aib, user_config->css, user_config->cst, user_config->general_time, grp);
for(i=0; i<user_config->resizes; i++) {
printf("Resize %d: Iters=%d, Procs=%d, Factors=%f, Phy=%d\n",
i, user_config->iters[i], user_config->procs[i], user_config->factors[i], user_config->phy_dist[i]);
@@ -161,8 +165,8 @@ void print_config_group(configuration *user_config, int grp) {
sons = user_config->procs[grp+1];
}
printf("Config: matrix=%d, comm_tam=%d, sdr=%d, adr=%d, aib=%d time=%f\n",
user_config->matrix_tam, user_config->comm_tam, user_config->sdr, user_config->adr, user_config->aib, user_config->general_time);
printf("Config: matrix=%d, comm_tam=%d, sdr=%d, adr=%d, aib=%d, css=%d, cst=%d, time=%f\n",
user_config->matrix_tam, user_config->comm_tam, user_config->sdr, user_config->adr, user_config->aib, user_config->css, user_config->cst, user_config->general_time);
printf("Config Group: iters=%d, factor=%f, phy=%d, procs=%d, parents=%d, sons=%d\n",
user_config->iters[grp], user_config->factors[grp], user_config->phy_dist[grp], user_config->procs[grp], parents, sons);
}
@@ -247,15 +251,15 @@ configuration *recv_config_file(int root, MPI_Comm intercomm) {
* de la estructura de configuracion con una sola comunicacion.
*/
void def_struct_config_file(configuration *config_file, MPI_Datatype *config_type) {
int i, counts = 9;
int blocklengths[9] = {1, 1, 1, 1, 1, 1, 1, 1, 1};
int i, counts = 11;
int blocklengths[11] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];
// Rellenar vector types
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = types[6] = MPI_INT;
types[7] = MPI_FLOAT;
types[8] = MPI_DOUBLE;
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = types[6] = types[7] = types[8] = MPI_INT;
types[9] = MPI_FLOAT;
types[10] = MPI_DOUBLE;
// Rellenar vector displs
MPI_Get_address(config_file, &dir);
@@ -267,8 +271,8 @@ void def_struct_config_file(configuration *config_file, MPI_Datatype *config_typ
MPI_Get_address(&(config_file->sdr), &displs[4]);
MPI_Get_address(&(config_file->adr), &displs[5]);
MPI_Get_address(&(config_file->aib), &displs[6]);
MPI_Get_address(&(config_file->general_time), &displs[7]);
MPI_Get_address(&(config_file->Top), &displs[8]);
MPI_Get_address(&(config_file->css), &displs[7]);
MPI_Get_address(&(config_file->cst), &displs[8]);
MPI_Get_address(&(config_file->general_time), &displs[9]);
MPI_Get_address(&(config_file->Top), &displs[10]);
for(i=0;i<counts;i++) displs[i] -= dir;
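The hunk ends before the datatype is actually created, but the single-communication idea described in the comment above corresponds to the standard MPI pattern sketched below, using the names from this function; this is only a sketch, not the project's exact code:
MPI_Type_create_struct(counts, blocklengths, displs, types, config_type);
MPI_Type_commit(config_type);
/* e.g. inside send_config_file()/recv_config_file(): the whole struct travels in one message */
MPI_Bcast(config_file, 1, *config_type, root, intercomm);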
@@ -8,6 +8,7 @@ typedef struct
int resizes;
int actual_resize;
int matrix_tam, comm_tam, sdr, adr;
int css, cst;
int aib;
float general_time;
double Top;
@@ -107,6 +107,19 @@ void set_results_post_reconfig(results_data *results, int grp, int sdr, int adr)
}
}
/*
* Pone el indice del siguiente elemento a escribir a 0 para los vectores
* que tengan que ver con las iteraciones.
* Por tanto, todos los anteriores valores de esos vectores pasan a ser invalidos
* si se intentan acceder desde un código externo.
*
* Solo es necesario llamar a esta funcion cuando se ha realizado una
* expansion con el metodo MERGE
*/
void reset_results_index(results_data *results) {
results->iter_index = 0;
}
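A short sketch of why this reset is needed; the iters_time field is hypothetical (the real vector names are not visible in this hunk), but it illustrates the append-at-iter_index convention described in the comment: after a MERGE expansion the surviving processes rewind the index and overwrite the entries recorded before the resize.
results->iters_time[results->iter_index] = MPI_Wtime() - start; /* per-iteration append (hypothetical field) */
results->iter_index++;
/* ... after the expansion with MERGE, as in the main loop further below ... */
reset_results_index(results);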
//======================================================||
//======================================================||
@@ -21,6 +21,7 @@ void send_results(results_data *results, int root, int resizes, MPI_Comm interco
void recv_results(results_data *results, int root, int resizes, MPI_Comm intercomm);
void set_results_post_reconfig(results_data *results, int grp, int sdr, int adr);
void reset_results_index(results_data *results);
void print_iter_results(results_data results, int last_normal_iter_index);
void print_global_results(results_data results, int resizes);
@@ -15,18 +15,6 @@
#define ROOT 0
int work();
/*void Sons_init();
int checkpoint(int iter, int state, MPI_Request **comm_req);
int TC(int numS, int comm_type);
int start_redistribution(int iter, int numS, MPI_Request **comm_req);
int check_redistribution(int iter, MPI_Request **comm_req);
int end_redistribution(int iter);
int thread_creation();
int thread_check(int iter);
void* thread_async_work(void* void_arg);
*/
void iterate(double *matrix, int n, int async_comm);
void init_group_struct(char *argv[], int argc, int myId, int numP);
@@ -90,17 +78,13 @@ int main(int argc, char *argv[]) {
get_malleability_user_comm(&comm);
get_benchmark_configuration(&config_file); //No se obtiene bien el archivo
get_benchmark_results(&results); //No se obtiene bien el archivo
set_results_post_reconfig(results, group->grp, config_file->sdr, config_file->adr);
set_results_post_reconfig(results, group->grp, config_file->sdr, config_file->adr); //TODO Cambio al añadir nueva redistribucion
if(config_file->comm_tam) {
group->compute_comm_array = malloc(config_file->comm_tam * sizeof(char));
}
int entries;
void *value = NULL;
malleability_get_entries(&entries, 1, 1);
malleability_get_data(&value, 0, 1, 1);
group->grp = *((int *)value);
free(value);
@@ -108,53 +92,56 @@ int main(int argc, char *argv[]) {
run_id = *((int *)value);
free(value);
malleability_get_data(&value, 2, 1, 1);
group->iter_start = *((int *)value);
free(value);
group->grp = group->grp + 1;
}
int spawn_type = COMM_SPAWN_MERGE; // TODO Pasar a CONFIG
int spawn_is_single = COMM_SPAWN_MULTIPLE; // TODO Pasar a CONFIG
//config_file->cst = COMM_SPAWN_MERGE; // TODO Pasar a CONFIG
//config_file->css = COMM_SPAWN_MULTIPLE; // TODO Pasar a CONFIG
group->grp = group->grp - 1; // TODO REFACTOR???
printf("TEST 3\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
do {
group->grp = group->grp + 1;
set_benchmark_grp(group->grp);
get_malleability_user_comm(&comm);
printf("TEST 4\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
if(comm == MPI_COMM_NULL) {
printf("Mi comunicador es nulo?\n");
}
MPI_Comm_size(comm, &(group->numP));
MPI_Comm_rank(comm, &(group->myId));
printf("TEST 5\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
//printf("MAIN 2\n"); fflush(stdout); MPI_Barrier(comm);
if(config_file->resizes != group->grp + 1) {
set_malleability_configuration(spawn_type, spawn_is_single, config_file->phy_dist[group->grp+1], -1, config_file->aib, -1);
set_malleability_configuration(config_file->cst, config_file->css, config_file->phy_dist[group->grp+1], -1, config_file->aib, -1);
set_children_number(config_file->procs[group->grp+1]); // TODO TO BE DEPRECATED
if(group->grp == 0) {
malleability_add_data(&(group->grp), 1, MAL_INT, 1, 1);
malleability_add_data(&run_id, 1, MAL_INT, 1, 1);
malleability_add_data(&(group->iter_start), 1, MAL_INT, 1, 1);
}
}
printf("TEST 7\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
res = work();
printf("TEST 8\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
print_local_results();
} while((config_file->resizes > group->grp + 1) && (spawn_type == COMM_SPAWN_MERGE || spawn_type == COMM_SPAWN_MERGE_PTHREAD));
reset_results_index(results);
} while((config_file->resizes > group->grp + 1) && (config_file->cst == COMM_SPAWN_MERGE || config_file->cst == COMM_SPAWN_MERGE_PTHREAD));
if(res) { // Se he llegado al final de la aplicacion
// MPI_Barrier(comm); FIXME?
MPI_Barrier(comm); // TODO Posible error al utilizar SHRINK
results->exec_time = MPI_Wtime() - results->exec_start;
}
print_final_results(); // Pasado este punto ya no pueden escribir los procesos
if(comm != MPI_COMM_WORLD && comm != MPI_COMM_NULL) {
MPI_Comm_free(&comm);
}
MPI_Finalize();
// free_application_data();
free_application_data();
return 0;
}
@@ -182,19 +169,11 @@ int work() {
//initMatrix(&matrix, config_file->matrix_tam);
state = MAL_NOT_STARTED;
if(group->grp == 0) {
malleability_add_data(&iter, 1, MAL_INT, 1, 1);
} else {
void *value = NULL;
malleability_get_data(&value, 2, 1, 1);
group->iter_start = *((int *)value);
free(value);
}
res = 0;
for(iter=group->iter_start; iter < maxiter; iter++) {
iterate(matrix, config_file->matrix_tam, state);
}
if(config_file->iters[group->grp] == iter && config_file->resizes != group->grp + 1)
state = malleability_checkpoint();
@@ -204,6 +183,7 @@ int work() {
iter++;
state = malleability_checkpoint();
}
group->iter_start = iter;
if(config_file->resizes - 1 == group->grp) res=1;
return res;
@@ -15,7 +15,7 @@ MPI_Comm *returned_comm;
struct Slurm_data {
char *cmd; // Executable name
int qty_procs;
int qty_procs, result_procs;
MPI_Info info;
int type_creation;
int spawn_is_single;
@@ -68,12 +68,20 @@ void fill_hostfile(slurm_job_info_t job_record, int ptr, int *qty, int used_node
* Devuelve el estado de el procedimiento. Si no devuelve "COMM_FINISHED", es necesario llamar a
* "check_slurm_comm()".
*/
int init_slurm_comm(char *argv, int myId, int numP, int root, int type_dist, int type_creation, int spawn_is_single, MPI_Comm comm, MPI_Comm *child) {
int init_slurm_comm(char *argv, int myId, int numP, int numC, int root, int type_dist, int type_creation, int spawn_is_single, MPI_Comm comm, MPI_Comm *child) {
int spawn_qty;
slurm_data = malloc(sizeof(struct Slurm_data));
slurm_data->type_creation = type_creation;
slurm_data->spawn_is_single = spawn_is_single;
slurm_data->result_procs = numC;
spawn_qty = numC;
if(type_creation == COMM_SPAWN_MERGE || type_creation == COMM_SPAWN_MERGE_PTHREAD) {
if (numP < slurm_data->result_procs) {
spawn_qty = slurm_data->result_procs - numP;
}
}
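/* Worked example (illustrative numbers): with numP = 2 processes already running and a MERGE
   resize to result_procs = 5, only spawn_qty = 5 - 2 = 3 new processes are spawned and later
   merged with the existing ones; the non-merge methods keep spawn_qty = numC and create the
   whole target group from scratch. */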
if(type_creation == COMM_SPAWN_SERIAL || slurm_data->type_creation == COMM_SPAWN_MERGE) {
if(myId == root) {
@@ -105,14 +113,13 @@ int init_slurm_comm(char *argv, int myId, int numP, int root, int type_dist, int
} else if(type_creation == COMM_SPAWN_PTHREAD || slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD) {
commSlurm = MAL_SPAWN_PENDING;
if((spawn_is_single && myId == root) || !spawn_is_single) {
if((spawn_is_single && myId == root) || !spawn_is_single || (slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD && numP > slurm_data->result_procs)) {
Creation_data *creation_data = (Creation_data *) malloc(sizeof(Creation_data));
creation_data->argv = argv;
creation_data->numP_childs = numP;
creation_data->numP_childs = spawn_qty;
creation_data->myId = myId;
creation_data->root = root;
creation_data->type_dist = type_dist;
creation_data->spawn_method = type_creation;
creation_data->comm = comm;
if(pthread_create(&slurm_thread, NULL, thread_work, (void *)creation_data)) {
@@ -126,15 +133,16 @@ int init_slurm_comm(char *argv, int myId, int numP, int root, int type_dist, int
return commSlurm;
}
/*
* Comprueba si una configuracion para crear un nuevo grupo de procesos esta lista,
* y en caso de que lo este, se devuelve el communicador a estos nuevos procesos.
*/
int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm comm, MPI_Comm comm_thread) { // TODO Borrar numP si no se usa
int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm comm, MPI_Comm comm_thread) {
int state=-10;
if(slurm_data->type_creation == COMM_SPAWN_PTHREAD && slurm_data->spawn_is_single == 0) {
if(slurm_data->type_creation == COMM_SPAWN_PTHREAD || slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD) {
if(!slurm_data->spawn_is_single || (slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD && numP > slurm_data->result_procs)) {
MPI_Allreduce(&commSlurm, &state, 1, MPI_INT, MPI_MIN, comm);
if(state != MAL_SPAWN_COMPLETED) return state; // Continue only if asynchronous process creation has ended
@@ -144,10 +152,10 @@ int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm com
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
}
*child = *returned_comm;
} else if (slurm_data->type_creation == COMM_SPAWN_PTHREAD && slurm_data->spawn_is_single) {
} else if (slurm_data->spawn_is_single) {
MPI_Bcast(&commSlurm, 1, MPI_INT, root, comm);
state = commSlurm;
if(state == MAL_SPAWN_PENDING) return state; // Continue only if asynchronous process creation has ended
@@ -161,9 +169,15 @@ int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm com
*child = *returned_comm;
} else {
slurm_data->cmd = malloc(1 * sizeof(char));
slurm_data->info = MPI_INFO_NULL;
generic_spawn(myId, root, slurm_data->spawn_is_single, child, comm_thread);
}
} else {
printf("Error Check spawn: Configuracion invalida\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
}
} else {
return commSlurm;
}
@@ -178,7 +192,6 @@ int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm com
return commSlurm;
}
/*
* Conectar grupo de hijos con grupo de padres
* Devuelve un intercomunicador para hablar con los padres
@@ -222,7 +235,6 @@ void malleability_establish_connection(int myId, int root, MPI_Comm *intercomm)
void proc_adapt_expand(int *numP, int numC, MPI_Comm intercomm, MPI_Comm *comm, int is_children_group) {
MPI_Comm new_comm = MPI_COMM_NULL;
// TODO Indicar por Bcast que es con MERGE
MPI_Intercomm_merge(intercomm, is_children_group, &new_comm); //El que pone 0 va primero
//MPI_Comm_free(intercomm); TODO Nueva redistribucion para estos casos y liberar aqui
// *intercomm = MPI_COMM_NULL;
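One note on the merge above: MPI_Intercomm_merge orders the resulting intracommunicator by its high argument, so the group that passes 0 comes first (hence the "El que pone 0 va primero" comment). Assuming MALLEABILITY_NOT_CHILDREN expands to 0 and MALLEABILITY_CHILDREN to 1, the two sides of the call look roughly like this:
proc_adapt_expand(&numP, numC, intercomm, &comm, MALLEABILITY_NOT_CHILDREN); /* parents: high = 0, keep the low ranks */
proc_adapt_expand(&numP, numC, intercomm, &comm, MALLEABILITY_CHILDREN); /* children: high = 1, ranked after the parents */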
@@ -290,7 +302,6 @@ void* thread_work(void* creation_data_arg) {
//*returned_comm = aux_comm;
}
}
//commSlurm = MAL_SPAWN_COMPLETED;
free(creation_data);
pthread_exit(NULL);
@@ -363,7 +374,9 @@ void generic_spawn(int myId, int root, int spawn_is_single, MPI_Comm *child, MPI
if(myId == root) rootBcast = MPI_ROOT;
create_processes(myId, root, child, comm);
MPI_Bcast(&spawn_is_single, 1, MPI_INT, rootBcast, *child);
if(*child == MPI_COMM_NULL) {printf("P%d tiene un error --\n", myId); fflush(stdout);} else {printf("P%d guay\n", myId);}
}
commSlurm = MAL_SPAWN_COMPLETED;
}
@@ -5,7 +5,7 @@
#include <slurm/slurm.h>
#include "malleabilityStates.h"
int init_slurm_comm(char *argv, int myId, int numP, int root, int type_dist, int type_creation, int spawn_is_single, MPI_Comm comm, MPI_Comm *child);
int init_slurm_comm(char *argv, int myId, int numP, int numC, int root, int type_dist, int type_creation, int spawn_is_single, MPI_Comm comm, MPI_Comm *child);
int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm comm, MPI_Comm comm_thread);
void malleability_establish_connection(int myId, int root, MPI_Comm *intercomm);
@@ -145,9 +145,10 @@ int malleability_checkpoint() {
} else if(state == MAL_SPAWN_PENDING) { // Comprueba si el spawn ha terminado y comienza la redistribucion
state = check_slurm_comm(mall->myId, mall->root, mall->numP, &(mall->intercomm), mall->comm, mall->thread_comm);
//TODO Si es MERGE SHRINK, metodo diferente de redistribucion de datos
if (state == MAL_SPAWN_COMPLETED) {
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
printf("TEST PADRES\n");
state = start_redistribution();
}
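For context, with the pthread-based methods the spawn is not waited on; it is polled through successive malleability_checkpoint() calls. A conceptual sketch of the states involved, with the names used in this diff (the real control flow is interleaved with the compute iterations):
/* spawn_step()              -> init_slurm_comm(...) returns MAL_SPAWN_PENDING for the *_PTHREAD methods
   malleability_checkpoint() -> check_slurm_comm(...) is polled until it reports MAL_SPAWN_COMPLETED
   malleability_checkpoint() -> start_redistribution() once the new communicator is available */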
@@ -418,9 +419,7 @@ void Children_init() {
// Guardar los resultados de esta transmision
recv_results(mall_conf->results, mall->root, mall_conf->config_file->resizes, mall->intercomm);
printf("HIJOS 1 %d\n", mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
if(mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
proc_adapt_expand(&(mall->numP), mall->numP+numP_parents, mall->intercomm, &(mall->comm), MALLEABILITY_CHILDREN); //TODO Que valor se pasa?
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
@@ -429,9 +428,7 @@ void Children_init() {
mall->thread_comm = aux;
MPI_Comm_dup(mall->comm, &aux);
mall->user_comm = aux;
}
printf("HIJOS 2\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
MPI_Comm_disconnect(&(mall->intercomm));
}
@@ -448,7 +445,7 @@
*/
int spawn_step(){
mall_conf->results->spawn_start = MPI_Wtime();
state = init_slurm_comm(mall->name_exec, mall->myId, mall->numC_spawned, mall->root, mall_conf->spawn_dist, mall_conf->spawn_type, mall_conf->spawn_is_single, mall->thread_comm, &(mall->intercomm));
state = init_slurm_comm(mall->name_exec, mall->myId, mall->numP, mall->numC_spawned, mall->root, mall_conf->spawn_dist, mall_conf->spawn_type, mall_conf->spawn_is_single, mall->thread_comm, &(mall->intercomm));
if(mall_conf->spawn_type == COMM_SPAWN_SERIAL || mall_conf->spawn_type == COMM_SPAWN_MERGE)
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
@@ -516,10 +513,14 @@ int start_redistribution() {
int rootBcast = MPI_PROC_NULL;
if(mall->myId == mall->root) rootBcast = MPI_ROOT;
printf("TEST EXPAND PADRES 1\n");
if(mall->intercomm == MPI_COMM_NULL) {printf("P%d tiene un error\n", mall->myId);}
fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
MPI_Bcast(&(mall_conf->spawn_type), 1, MPI_INT, rootBcast, mall->intercomm);
MPI_Bcast(&(mall->root), 1, MPI_INT, rootBcast, mall->intercomm);
MPI_Bcast(&(mall->numP), 1, MPI_INT, rootBcast, mall->intercomm);
send_config_file(mall_conf->config_file, rootBcast, mall->intercomm);
printf("TEST EXPAND PADRES 2\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos
mall_conf->results->async_start = MPI_Wtime();
@@ -611,8 +612,6 @@ int end_redistribution() {
send_results(mall_conf->results, rootBcast, mall_conf->config_file->resizes, mall->intercomm);
printf("PADRES 7\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
if(mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
double time_adapt = MPI_Wtime();
if(mall->numP > mall->numC) { //Shrink
@@ -622,8 +621,6 @@
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - time_adapt;
} else {
printf("PADRES 8\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
proc_adapt_expand(&(mall->numP), mall->numC, mall->intercomm, &(mall->comm), MALLEABILITY_NOT_CHILDREN);
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
@@ -636,11 +633,9 @@
mall_conf->results->spawn_time[mall_conf->grp] += MPI_Wtime() - time_adapt;
}
result = MAL_DIST_ADAPTED;
} else {
result = MAL_DIST_COMPLETED;
// result = MAL_DIST_ADAPTED;
}
printf("PADRES 11\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
result = MAL_DIST_COMPLETED;
MPI_Comm_disconnect(&(mall->intercomm));
state = MAL_NOT_STARTED;