Commit e0864c44 authored by iker_martin's avatar iker_martin
Browse files

Hotfix -- Ya funciona el método single con threads

parent 4771566f
......@@ -75,7 +75,7 @@ void def_results_type(results_data *results, int resizes, MPI_Datatype *results_
MPI_Get_address(&(results->sync_start), &displs[0]);
MPI_Get_address(&(results->async_start), &displs[1]);
MPI_Get_address(&(results->exec_start), &displs[2]);
MPI_Get_address(&(results->spawn_thread_time[0]), &displs[3]);
MPI_Get_address(&(results->spawn_real_time[0]), &displs[3]);
MPI_Get_address(&(results->spawn_time[0]), &displs[4]); //TODO Revisar si se puede simplificar //FIXME Si hay mas de un spawn error?
for(i=0;i<counts;i++) displs[i] -= dir;
......@@ -162,14 +162,14 @@ void print_iter_results(results_data results, int last_normal_iter_index) {
void print_global_results(results_data results, int resizes) {
int i;
printf("Tspawn: ");
printf("Tspawn: "); // FIXME REFACTOR Cambiar nombre a T_resize_real
for(i=0; i< resizes - 1; i++) {
printf("%lf ", results.spawn_time[i]);
}
printf("\nTthread: ");
printf("\nTspawn_real: "); // FIXME REFACTOR Cambiar nombre a T_resize
for(i=0; i< resizes - 1; i++) {
printf("%lf ", results.spawn_thread_time[i]);
printf("%lf ", results.spawn_real_time[i]);
}
printf("\nTsync: ");
......@@ -201,7 +201,7 @@ void init_results_data(results_data *results, int resizes, int iters_size) {
//*results = malloc(1 * sizeof(results_data)); FIXME Borrar
results->spawn_time = calloc(resizes, sizeof(double));
results->spawn_thread_time = calloc(resizes, sizeof(double));
results->spawn_real_time = calloc(resizes, sizeof(double));
results->sync_time = calloc(resizes, sizeof(double));
results->async_time = calloc(resizes, sizeof(double));
......@@ -234,7 +234,7 @@ void realloc_results_iters(results_data *results, int needed) {
void free_results_data(results_data *results) {
if(results != NULL) {
free(results->spawn_time);
free(results->spawn_thread_time);
free(results->spawn_real_time);
free(results->sync_time);
free(results->async_time);
......
......@@ -10,7 +10,7 @@ typedef struct {
int *iters_type, iter_index, iters_size;
// Spawn, Thread, Sync, Async and Exec time
double spawn_start, *spawn_time, *spawn_thread_time;
double spawn_start, *spawn_time, *spawn_real_time;
double sync_start, sync_end, *sync_time;
double async_start, async_end, *async_time;
double exec_start, exec_time;
......
......@@ -12,7 +12,7 @@
#define ROOT 0
int work();
void iterate(double *matrix, int n, int async_comm);
void iterate(double *matrix, int n, int async_comm, int iter);
void init_group_struct(char *argv[], int argc, int myId, int numP);
void init_application();
......@@ -171,7 +171,7 @@ int work() {
res = 0;
for(iter=group->iter_start; iter < maxiter; iter++) {
iterate(matrix, config_file->matrix_tam, state);
iterate(matrix, config_file->matrix_tam, state, iter);
}
if(config_file->resizes != group->grp + 1)
......@@ -180,10 +180,9 @@ int work() {
iter = 0;
while(state == MAL_DIST_PENDING || state == MAL_SPAWN_PENDING || state == MAL_SPAWN_SINGLE_PENDING) {
if(iter < config_file->iters[group->grp+1]) {
iterate(matrix, config_file->matrix_tam, state);
iterate(matrix, config_file->matrix_tam, state, iter);
iter++;
group->iter_start = iter;
if(iter == config_file->iters[group->grp+1]) indicate_ending_malleability(MAL_APP_ENDED);
}
state = malleability_checkpoint();
}
......@@ -205,7 +204,7 @@ int work() {
* Simula la ejecución de una iteración de cómputo en la aplicación
* que dura al menos un tiempo de "time" segundos.
*/
void iterate(double *matrix, int n, int async_comm) {
void iterate(double *matrix, int n, int async_comm, int iter) {
double start_time, actual_time;
double time = config_file->general_time * config_file->factors[group->grp];
double Top = config_file->Top;
......@@ -219,7 +218,7 @@ void iterate(double *matrix, int n, int async_comm) {
for(i=0; i < operations; i++) {
aux += computePiSerial(n);
}
if(config_file->comm_tam) {
MPI_Bcast(group->compute_comm_array, config_file->comm_tam, MPI_CHAR, ROOT, comm);
}
......
......@@ -10,9 +10,12 @@
int commState = MAL_NOT_STARTED;
struct Slurm_data *slurm_data;
pthread_t slurm_thread;
pthread_t spawn_thread;
pthread_mutex_t spawn_mutex;
MPI_Comm *returned_comm;
double end_time; //FIXME REFACTOR
struct Slurm_data {
char *cmd; // Executable name
int qty_procs, result_procs;
......@@ -33,7 +36,6 @@ typedef struct {
//--------------PRIVATE SPAWN TYPE DECLARATIONS---------------//
void* thread_work(void* creation_data_arg);
void* thread_join_single(void* creation_data_arg);
//--------------PRIVATE DECLARATIONS---------------//
void processes_dist(char *argv, int numP_childs, int already_created, int type_dist);
......@@ -74,7 +76,7 @@ int init_slurm_comm(char *argv, int myId, int numP, int numC, int root, int type
int spawn_qty, already_created = 0;
slurm_data = malloc(sizeof(struct Slurm_data));
slurm_thread = pthread_self();
spawn_thread = pthread_self();
slurm_data->type_creation = type_creation;
slurm_data->spawn_is_single = spawn_is_single;
slurm_data->result_procs = numC;
......@@ -85,6 +87,7 @@ int init_slurm_comm(char *argv, int myId, int numP, int numC, int root, int type
already_created = numP;
}
}
pthread_mutex_init(&spawn_mutex,NULL);
if(type_creation == COMM_SPAWN_SERIAL || slurm_data->type_creation == COMM_SPAWN_MERGE) {
......@@ -102,6 +105,7 @@ int init_slurm_comm(char *argv, int myId, int numP, int numC, int root, int type
if(myId == root && slurm_data->info != MPI_INFO_NULL) {
MPI_Info_free(&(slurm_data->info));
}
pthread_mutex_destroy(&spawn_mutex);
free(slurm_data->cmd);
free(slurm_data);
......@@ -118,7 +122,7 @@ int init_slurm_comm(char *argv, int myId, int numP, int numC, int root, int type
creation_data->type_dist = type_dist;
creation_data->comm = comm;
if(pthread_create(&slurm_thread, NULL, thread_work, (void *)creation_data)) {
if(pthread_create(&spawn_thread, NULL, thread_work, (void *)creation_data)) {
printf("Error al crear el hilo de contacto con SLURM\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
......@@ -134,15 +138,28 @@ int init_slurm_comm(char *argv, int myId, int numP, int numC, int root, int type
* Comprueba si una configuración para crear un nuevo grupo de procesos está lista,
* y en caso de que lo esté, se devuelve el comunicador a estos nuevos procesos.
*/
int check_slurm_comm(int myId, int root, int numP, int outside_state, MPI_Comm *child, MPI_Comm comm, MPI_Comm comm_thread) {
int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm comm, MPI_Comm comm_thread, double *real_time) {
if(slurm_data->type_creation == COMM_SPAWN_PTHREAD || slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD) {
if(!slurm_data->spawn_is_single || (slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD && numP > slurm_data->result_procs)) {
if (slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD && numP > slurm_data->result_procs) { //TODO REFACTOR
printf("Error Check spawn: Configuracion invalida\nSe intenta usar el método Spawn junto a un Shrink merge\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
}
if(!slurm_data->spawn_is_single || commState == MAL_SPAWN_SINGLE_PENDING || commState == MAL_SPAWN_COMPLETED) {
int state=-10;
//printf("[%d][3] Test min\n", myId); fflush(stdout);
//pthread_mutex_lock(&spawn_mutex);
MPI_Allreduce(&commState, &state, 1, MPI_INT, MPI_MIN, comm);
if(state != MAL_SPAWN_COMPLETED && outside_state != MAL_APP_ENDED) return state; // Continue only if asynchronous process creation has ended
//pthread_mutex_unlock(&spawn_mutex);
if(state != MAL_SPAWN_COMPLETED) return state; // Continue only if asynchronous process creation has ended
if(pthread_join(slurm_thread, NULL)) {
//printf("[%d][5] Test Passed-----------\n", myId); fflush(stdout);
if(pthread_join(spawn_thread, NULL)) {
printf("Error al esperar al hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
......@@ -150,15 +167,18 @@ int check_slurm_comm(int myId, int root, int numP, int outside_state, MPI_Comm *
*child = *returned_comm;
} else if (slurm_data->spawn_is_single) {
//pthread_mutex_lock(&spawn_mutex);
MPI_Bcast(&commState, 1, MPI_INT, root, comm);
int app_terminated = (outside_state == MAL_APP_ENDED);
int terminate_spawn = (app_terminated && pthread_equal(pthread_self(), slurm_thread));
//pthread_mutex_unlock(&spawn_mutex);
int threads_not_spawned = pthread_equal(pthread_self(), spawn_thread);
// Non-root processes join root to finalize the spawn
// They also must join if the application has ended its work
if(commState == MAL_SPAWN_SINGLE_START || terminate_spawn) {
if(commState == MAL_SPAWN_SINGLE_START) {
commState = MAL_SPAWN_SINGLE_PENDING;
if(myId != root) {
if(myId != root && threads_not_spawned) {
Creation_data *creation_data = (Creation_data *) malloc(sizeof(Creation_data));
creation_data->argv = NULL;
creation_data->numP_childs = -1;
......@@ -168,7 +188,7 @@ int check_slurm_comm(int myId, int root, int numP, int outside_state, MPI_Comm *
creation_data->type_dist = -1;
creation_data->comm = comm_thread;
if(pthread_create(&slurm_thread, NULL, thread_join_single, (void *)creation_data)) {
if(pthread_create(&spawn_thread, NULL, thread_work, (void *)creation_data)) {
printf("Error al crear el hilo de apoyo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
......@@ -176,22 +196,17 @@ int check_slurm_comm(int myId, int root, int numP, int outside_state, MPI_Comm *
}
}
printf("TEST commState %d\n",myId); fflush(stdout);
MPI_Barrier(comm);
if(myId == root) {printf("TEST preCHECK--------------------------------------------\n"); fflush(stdout);}
// Continue only if asynchronous process creation has ended or application does not have more work
if(commState != MAL_SPAWN_COMPLETED) return commState;
printf("TEST commState 2 ------------------------------------- Id%d\n",myId); fflush(stdout);
//printf("[%d][4] Test Passed-----------\n", myId); fflush(stdout);
//Asegurar que los hilos han terminado
if(pthread_join(slurm_thread, NULL)) {
if(pthread_join(spawn_thread, NULL)) {
printf("Error al esperar al hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
}
*child = *returned_comm;
MPI_Barrier(comm);
if(myId == root) {printf("TEST afterJoin %d\n", myId); fflush(stdout);}
} else {
printf("Error Check spawn: Configuracion invalida\n");
......@@ -208,7 +223,10 @@ int check_slurm_comm(int myId, int root, int numP, int outside_state, MPI_Comm *
}
free(slurm_data->cmd);
free(slurm_data);
slurm_thread = pthread_self();
pthread_mutex_destroy(&spawn_mutex);
spawn_thread = pthread_self();
*real_time=end_time;
return commState;
}
......@@ -262,26 +280,6 @@ void* thread_work(void* creation_data_arg) {
slurm_data->cmd = malloc(1 * sizeof(char));
slurm_data->info = MPI_INFO_NULL;
}
generic_spawn(creation_data->myId, creation_data->root, slurm_data->spawn_is_single, returned_comm, creation_data->comm);
free(creation_data);
pthread_exit(NULL);
}
/*
* Función llamada por un hilo para que este finalice junto al proceso raíz
* la creación de procesos single.
*
* Una vez se ha realizado la unión con los hijos, se avisa al hilo maestro.
*/
void* thread_join_single(void* creation_data_arg) {
Creation_data *creation_data = (Creation_data*) creation_data_arg;
returned_comm = (MPI_Comm *) malloc(sizeof(MPI_Comm));
slurm_data->cmd = malloc(1 * sizeof(char));
slurm_data->info = MPI_INFO_NULL;
generic_spawn(creation_data->myId, creation_data->root, slurm_data->spawn_is_single, returned_comm, creation_data->comm);
free(creation_data);
......@@ -306,7 +304,10 @@ void generic_spawn(int myId, int root, int spawn_is_single, MPI_Comm *child, MPI
create_processes(myId, root, child, comm);
MPI_Bcast(&spawn_is_single, 1, MPI_INT, rootBcast, *child);
}
commState = MAL_SPAWN_COMPLETED;
pthread_mutex_lock(&spawn_mutex);
commState = MAL_SPAWN_COMPLETED;
end_time = MPI_Wtime();
pthread_mutex_unlock(&spawn_mutex);
}
/*
......@@ -341,12 +342,10 @@ void single_spawn_connection(int myId, int root, MPI_Comm comm, MPI_Comm *child)
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, root, 130, *child, MPI_STATUS_IGNORE);
commState = MAL_SPAWN_SINGLE_START; // Indicate other processes to join root to end spawn procedure
} else {
port_name = malloc(10);
port_name = malloc(1);
}
//printf("TEST Connect %d\n", myId); fflush(stdout);
MPI_Comm_connect(port_name, MPI_INFO_NULL, root, comm, &newintercomm);
if(myId == root) {printf("TEST ConnectED %d\n", myId); fflush(stdout);}
if(myId == root)
MPI_Comm_free(child);
......
......@@ -6,7 +6,7 @@
#include "malleabilityStates.h"
int init_slurm_comm(char *argv, int myId, int numP, int numC, int root, int type_dist, int type_creation, int spawn_is_single, MPI_Comm comm, MPI_Comm *child);
int check_slurm_comm(int myId, int root, int numP, int outside_state, MPI_Comm *child, MPI_Comm comm, MPI_Comm comm_thread);
int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm comm, MPI_Comm comm_thread, double *end_real_time);
void malleability_establish_connection(int myId, int root, MPI_Comm *intercomm);
......
......@@ -49,7 +49,6 @@ typedef struct { //FIXME numC_spawned no se esta usando
} malleability_t;
int state = MAL_UNRESERVED; //FIXME Mover a otro lado
int outside_state = MAL_APP_EXECUTING;
malleability_config_t *mall_conf;
malleability_t *mall;
......@@ -135,15 +134,6 @@ void free_malleability() {
state = MAL_UNRESERVED;
}
/*
* FIXME Deprecated -- Remove if it turns out not to be needed
* -- Meant to handle asynchronous cases where the application has finished
* but malleability is still in progress.
*
* Stores new_outside_state in the file-level variable outside_state.
*/
void indicate_ending_malleability(int new_outside_state) {
outside_state = new_outside_state;
}
/*
* Se realiza el redimensionado de procesos por parte de los padres.
*
......@@ -174,9 +164,14 @@ int malleability_checkpoint() {
}
} else if(state == MAL_SPAWN_PENDING || state == MAL_SPAWN_SINGLE_PENDING) { // Comprueba si el spawn ha terminado y comienza la redistribucion
state = check_slurm_comm(mall->myId, mall->root, mall->numP, outside_state, &(mall->intercomm), mall->comm, mall->thread_comm);
double end_real_time;
state = check_slurm_comm(mall->myId, mall->root, mall->numP, &(mall->intercomm), mall->comm, mall->thread_comm, &end_real_time);
if (state == MAL_SPAWN_COMPLETED) {
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
if(mall_conf->spawn_type == COMM_SPAWN_PTHREAD || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
mall_conf->results->spawn_real_time[mall_conf->grp] = end_real_time - mall_conf->results->spawn_start;
}
//TODO Si es MERGE SHRINK, metodo diferente de redistribucion de datos
state = start_redistribution();
}
......@@ -486,8 +481,8 @@ int spawn_step(){
if(mall_conf->spawn_type == COMM_SPAWN_SERIAL || mall_conf->spawn_type == COMM_SPAWN_MERGE)
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
else if(mall_conf->spawn_type == COMM_SPAWN_PTHREAD || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
mall_conf->results->spawn_thread_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
mall_conf->results->spawn_start = MPI_Wtime();
//mall_conf->results->spawn_thread_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
//mall_conf->results->spawn_start = MPI_Wtime();
}
return state;
}
......
#!/bin/bash
#SBATCH -N 1
#SBATCH --exclude=c01,c00
#SBATCH --exclude=c01,c00,c02
dir="/home/martini/malleability_benchmark"
codeDir="/Codes"
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment