Commit 0a5e948b authored by iker_martin's avatar iker_martin
Browse files

WIP: Se ha anyadido la creacion de procesos a traves de uno solo. Arreglados...

WIP: Se ha anyadido la creacion de procesos a traves de uno solo. Arreglados algunos errores pequeños. Se esta anyadiendo el merge method
parent e5b80639
......@@ -120,12 +120,12 @@ void malloc_config_arrays(configuration *user_config, int resizes) {
*/
void free_config(configuration *user_config) {
if(user_config != NULL) {
//free(user_config->iters);
//free(user_config->procs);
//free(user_config->factors);
//free(user_config->phy_dist);
free(user_config->iters);
free(user_config->procs);
free(user_config->factors);
free(user_config->phy_dist);
//free(user_config);
free(user_config);
}
}
......
......@@ -216,17 +216,17 @@ void realloc_results_iters(results_data *results, int needed) {
/*
* Libera toda la memoria asociada con una estructura de resultados.
* TODO Asegurar que ha sido inicializado?
*/
void free_results_data(results_data *results) {
if(results != NULL) {
//free(results->spawn_time);
//free(results->spawn_thread_time);
//free(results->sync_time);
//if(results->async_time != NULL)
// free(results->async_time);
//free(results->iters_time);
//free(results->iters_type);
free(results->spawn_time);
free(results->spawn_thread_time);
free(results->sync_time);
free(results->async_time);
free(results->iters_time);
free(results->iters_type);
}
//free(*results); FIXME Borrar
}
......@@ -35,6 +35,7 @@ void obtain_op_times();
void free_application_data();
void print_general_info(int myId, int grp, int numP);
int print_local_results();
int print_final_results();
int create_out_file(char *nombre, int *ptr, int newstdout);
......@@ -56,6 +57,7 @@ typedef struct {
configuration *config_file;
group_data *group;
results_data *results;
MPI_Comm comm;
int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis
int main(int argc, char *argv[]) {
......@@ -66,13 +68,14 @@ int main(int argc, char *argv[]) {
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &req);
MPI_Comm_size(MPI_COMM_WORLD, &numP);
MPI_Comm_rank(MPI_COMM_WORLD, &myId);
comm = MPI_COMM_WORLD;
if(req != MPI_THREAD_MULTIPLE) {
printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE);
}
init_group_struct(argv, argc, myId, numP);
im_child = init_malleability(myId, numP, ROOT, MPI_COMM_WORLD, argv[0]);
im_child = init_malleability(myId, numP, ROOT, comm, argv[0]);
if(!im_child) {
init_application();
......@@ -81,12 +84,14 @@ int main(int argc, char *argv[]) {
set_benchmark_configuration(config_file);
set_benchmark_results(results);
MPI_Barrier(MPI_COMM_WORLD);
MPI_Barrier(comm);
results->exec_start = MPI_Wtime();
} else {
} else { //Init hijos
get_malleability_user_comm(&comm);
get_benchmark_configuration(&config_file); //No se obtiene bien el archivo
get_benchmark_results(&results); //No se obtiene bien el archivo
set_results_post_reconfig(results, group->grp, config_file->sdr, config_file->adr);
printf("HIJOS 2\n"); fflush(stdout); MPI_Barrier(comm);
if(config_file->comm_tam) {
group->compute_comm_array = malloc(config_file->comm_tam * sizeof(char));
......@@ -105,24 +110,39 @@ int main(int argc, char *argv[]) {
free(value);
group->grp = group->grp + 1;
set_benchmark_grp(group->grp);
}
if(config_file->resizes != group->grp + 1) {
int spawn_type = COMM_SPAWN_SERIAL; // TODO Pasar a CONFIG
set_malleability_configuration(spawn_type, config_file->phy_dist[group->grp+1], -1, config_file->aib, -1);
set_children_number(config_file->procs[group->grp+1]); // TODO TO BE DEPRECATED
int spawn_type = COMM_SPAWN_MERGE; // TODO Pasar a CONFIG
int spawn_is_single = COMM_SPAWN_MULTIPLE; // TODO Pasar a CONFIG
group->grp = group->grp - 1; // TODO REFACTOR???
do {
if(group->grp == 0) {
malleability_add_data(&(group->grp), 1, MAL_INT, 1, 1);
malleability_add_data(&run_id, 1, MAL_INT, 1, 1);
group->grp = group->grp + 1;
set_benchmark_grp(group->grp);
get_malleability_user_comm(&comm);
MPI_Comm_size(comm, &(group->numP));
MPI_Comm_rank(comm, &(group->myId));
printf("MAIN 2\n"); fflush(stdout); MPI_Barrier(comm);
if(config_file->resizes != group->grp + 1) {
set_malleability_configuration(spawn_type, spawn_is_single, config_file->phy_dist[group->grp+1], -1, config_file->aib, -1);
set_children_number(config_file->procs[group->grp+1]); // TODO TO BE DEPRECATED
if(group->grp == 0) {
malleability_add_data(&(group->grp), 1, MAL_INT, 1, 1);
malleability_add_data(&run_id, 1, MAL_INT, 1, 1);
}
}
}
printf("MAIN 3\n"); fflush(stdout); MPI_Barrier(comm);
res = work();
print_local_results();
} while((config_file->resizes > group->grp + 1) && (spawn_type == COMM_SPAWN_MERGE || spawn_type == COMM_SPAWN_MERGE_PTHREAD));
res = work();
if(res) { // Se he llegado al final de la aplicacion
MPI_Barrier(MPI_COMM_WORLD);
// MPI_Barrier(comm); FIXME?
results->exec_time = MPI_Wtime() - results->exec_start;
}
......@@ -156,6 +176,15 @@ int work() {
//initMatrix(&matrix, config_file->matrix_tam);
state = MAL_NOT_STARTED;
if(group->grp == 0) {
malleability_add_data(&iter, 1, MAL_INT, 1, 1);
} else {
void *value = NULL;
malleability_get_data(&value, 2, 1, 1);
group->iter_start = *((int *)value);
free(value);
}
res = 0;
for(iter=group->iter_start; iter < maxiter; iter++) {
iterate(matrix, config_file->matrix_tam, state);
......@@ -201,7 +230,7 @@ void iterate(double *matrix, int n, int async_comm) {
}
if(config_file->comm_tam) {
MPI_Bcast(group->compute_comm_array, config_file->comm_tam, MPI_CHAR, ROOT, MPI_COMM_WORLD);
MPI_Bcast(group->compute_comm_array, config_file->comm_tam, MPI_CHAR, ROOT, comm);
}
actual_time = MPI_Wtime(); // Guardar tiempos
......@@ -240,17 +269,17 @@ void print_general_info(int myId, int grp, int numP) {
free(version);
}
/*
* Pide al proceso raiz imprimir los datos sobre las iteraciones realizadas por el grupo de procesos.
*
* Si es el ultimo grupo de procesos, muestra los datos obtenidos de tiempo de ejecucion, creacion de procesos
* y las comunicaciones.
*/
int print_final_results() {
int ptr_local, ptr_global, err;
int print_local_results() {
int ptr_local, ptr_out, err;
char *file_name;
if(group->myId == ROOT) {
ptr_out = dup(1);
file_name = NULL;
file_name = malloc(40 * sizeof(char));
if(file_name == NULL) return -1; // No ha sido posible alojar la memoria
......@@ -262,6 +291,22 @@ int print_final_results() {
print_iter_results(*results, config_file->iters[group->grp] -1);
free(file_name);
close(1);
dup(ptr_out);
}
return 0;
}
/*
* Si es el ultimo grupo de procesos, pide al proceso raiz mostrar los datos obtenidos de tiempo de ejecucion, creacion de procesos
* y las comunicaciones.
*/
int print_final_results() {
int ptr_global, err;
char *file_name;
if(group->myId == ROOT) {
if(group->grp == config_file->resizes -1) {
file_name = NULL;
file_name = malloc(20 * sizeof(char));
......@@ -340,7 +385,7 @@ void obtain_op_times() {
//fflush(stdout);
config_file->Top = (MPI_Wtime() - start_time) / qty; //Tiempo de una operacion
MPI_Bcast(&(config_file->Top), 1, MPI_DOUBLE, ROOT, MPI_COMM_WORLD);
MPI_Bcast(&(config_file->Top), 1, MPI_DOUBLE, ROOT, comm);
}
/*
......@@ -392,281 +437,3 @@ int create_out_file(char *nombre, int *ptr, int newstdout) {
return 0;
}
/*
* Se realiza el redimensionado de procesos por parte de los padres.
*
* Se crean los nuevos procesos con la distribucion fisica elegida y
* a continuacion se transmite la informacion a los mismos.
*
* Si hay datos asincronos a transmitir, primero se comienza a
* transmitir estos y se termina la funcion. Se tiene que comprobar con
* llamando a la función de nuevo que se han terminado de enviar
*
* Si hay ademas datos sincronos a enviar, no se envian aun.
*
* Si solo hay datos sincronos se envian tras la creacion de los procesos
* y finalmente se desconectan los dos grupos de procesos.
*/
/*
int checkpoint(int iter, int state, MPI_Request **comm_req) {
if(state == MAL_COMM_UNINITIALIZED) {
// Comprobar si se tiene que realizar un redimensionado
//if(config_file->iters[group->grp] > iter || config_file->resizes == group->grp + 1) {return MAL_COMM_UNINITIALIZED;}
group->numS = config_file->procs[group->grp +1];
int comm_type = COMM_SPAWN_SERIAL; // TODO Pasar a CONFIG
state = TC(group->numS, comm_type);
if (state == COMM_FINISHED){
state = start_redistribution(0, group->numS, comm_req);
}
} else if(state == COMM_IN_PROGRESS) { // Comprueba si el spawn ha terminado y comienza la redistribucion
state = check_slurm_comm(group->myId, ROOT, group->numP, &(group->children));
if (state == COMM_FINISHED) {
results->spawn_time[group->grp] = MPI_Wtime() - results->spawn_start;
state = start_redistribution(iter, group->numS, comm_req);
}
} else if(state == MAL_ASYNC_PENDING) {
if(config_file->aib == MAL_USE_THREAD) {
state = thread_check(iter);
} else {
state = check_redistribution(iter, comm_req);
}
}
return state;
}
*/
/*
* Se encarga de realizar la creacion de los procesos hijos.
* Si se pide en segundo plano devuelve el estado actual.
*/
/*
int TC(int numS, int comm_type){
// Inicialización de la comunicación con SLURM
int dist = config_file->phy_dist[group->grp +1];
int comm_state;
MPI_Comm *new_comm = malloc(sizeof(MPI_Comm));
results->spawn_start = MPI_Wtime();
MPI_Comm_dup(MPI_COMM_WORLD, new_comm);
comm_state = init_slurm_comm(group->argv, group->myId, numS, ROOT, dist, comm_type, *new_comm, &(group->children));
if(comm_type == COMM_SPAWN_SERIAL)
results->spawn_time[group->grp] = MPI_Wtime() - results->spawn_start;
else if(comm_type == COMM_SPAWN_PTHREAD) {
results->spawn_thread_time[group->grp] = MPI_Wtime() - results->spawn_start;
results->spawn_start = MPI_Wtime();
}
return comm_state;
}
*/
/*
* Comienza la redistribucion de los datos con el nuevo grupo de procesos.
*
* Primero se envia la configuracion a utilizar al nuevo grupo de procesos y a continuacion
* se realiza el envio asincrono y/o sincrono si lo hay.
*
* En caso de que haya comunicacion asincrona, se comienza y se termina la funcion
* indicando que se ha comenzado un envio asincrono.
*
* Si no hay comunicacion asincrono se pasa a realizar la sincrona si la hubiese.
*
* Finalmente se envian datos sobre los resultados a los hijos y se desconectan ambos
* grupos de procesos.
*/
/*
int start_redistribution(int iter, int numS, MPI_Request **comm_req) {
int rootBcast = MPI_PROC_NULL;
if(group->myId == ROOT) rootBcast = MPI_ROOT;
// Enviar a los hijos que grupo de procesos son
MPI_Bcast(&(group->grp), 1, MPI_INT, rootBcast, group->children);
MPI_Bcast(&run_id, 1, MPI_INT, rootBcast, group->children);
send_config_file(config_file, rootBcast, group->children);
if(config_file->adr > 0) {
results->async_start = MPI_Wtime();
if(config_file->aib == MAL_USE_THREAD) {
return thread_creation();
} else {
send_async(group->async_array, config_file->adr, group->myId, group->numP, ROOT, group->children, group->numS, comm_req, config_file->aib);
return MAL_ASYNC_PENDING;
}
}
return end_redistribution(iter);
}
*/
/*
* Crea una hebra para ejecutar una comunicación en segundo plano.
*/
/*
int thread_creation() {
if(pthread_create(&async_thread, NULL, thread_async_work, NULL)) {
printf("Error al crear el hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
return MAL_ASYNC_PENDING;
}
*/
/*
* Comprobación por parte de una hebra maestra que indica
* si una hebra esclava ha terminado su comunicación en segundo plano.
*
* El estado de la comunicación es devuelto al finalizar la función.
*/
/*
int thread_check(int iter) {
int all_completed = 0;
// Comprueba que todos los hilos han terminado la distribucion (Mismo valor en commAsync)
MPI_Allreduce(&group->commAsync, &all_completed, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD);
if(all_completed != MAL_COMM_COMPLETED) return MAL_ASYNC_PENDING; // Continue only if asynchronous send has ended
if(pthread_join(async_thread, NULL)) {
printf("Error al esperar al hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -2;
}
return end_redistribution(iter);
}
*/
/*
* Función ejecutada por una hebra.
* Ejecuta una comunicación síncrona con los hijos que
* para el usuario se puede considerar como en segundo plano.
*
* Cuando termina la comunicación la hebra maestra puede comprobarlo
* por el valor "commAsync".
*/
/*
void* thread_async_work(void* void_arg) {
send_sync(group->async_array, config_file->adr, group->myId, group->numP, ROOT, group->children, group->numS);
group->commAsync = MAL_COMM_COMPLETED;
pthread_exit(NULL);
}
*/
/*
* @deprecated
* Comprueba si la redistribucion asincrona ha terminado.
* Si no ha terminado la funcion termina indicandolo, en caso contrario,
* se continua con la comunicacion sincrona, el envio de resultados y
* se desconectan los grupos de procesos.
*
* Esta funcion permite dos modos de funcionamiento al comprobar si la
* comunicacion asincrona ha terminado.
* Si se utiliza el modo "MAL_USE_NORMAL" o "MAL_USE_POINT", se considera
* terminada cuando los padres terminan de enviar.
* Si se utiliza el modo "MAL_USE_IBARRIER", se considera terminada cuando
* los hijos han terminado de recibir.
*/
/*
int check_redistribution(int iter, MPI_Request **comm_req) {
int completed, all_completed, test_err;
MPI_Request *req_completed;
if (config_file->aib == MAL_USE_POINT) {
test_err = MPI_Testall(group->numS, *comm_req, &completed, MPI_STATUSES_IGNORE);
} else {
if(config_file->aib == MAL_USE_NORMAL) {
req_completed = &(*comm_req)[0];
} else if (config_file->aib == MAL_USE_IBARRIER) {
req_completed = &(*comm_req)[1];
}
test_err = MPI_Test(req_completed, &completed, MPI_STATUS_IGNORE);
}
if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
printf("P%d aborting -- Test Async\n", group->myId);
MPI_Abort(MPI_COMM_WORLD, test_err);
}
MPI_Allreduce(&completed, &all_completed, 1, MPI_INT, MPI_MIN, MPI_COMM_WORLD);
if(!all_completed) return MAL_ASYNC_PENDING; // Continue only if asynchronous send has ended
if(config_file->aib == MAL_USE_IBARRIER) {
MPI_Wait(&(*comm_req)[0], MPI_STATUS_IGNORE); // Indicar como completado el envio asincrono
//Para la desconexión de ambos grupos de procesos es necesario indicar a MPI que esta comm
//ha terminado, aunque solo se pueda llegar a este punto cuando ha terminado
}
free(*comm_req);
return end_redistribution(iter);
}
*/
/*
* Termina la redistribución de los datos con los hijos, comprobando
* si se han realizado iteraciones con comunicaciones en segundo plano
* y enviando cuantas iteraciones se han realizado a los hijos.
*
* Además se realizan las comunicaciones síncronas se las hay.
* Finalmente termina enviando los datos temporales a los hijos.
*/
/*
int end_redistribution(int iter) {
int rootBcast = MPI_PROC_NULL;
if(group->myId == ROOT) rootBcast = MPI_ROOT;
if(config_file->sdr > 0) { // Realizar envio sincrono
results->sync_start = MPI_Wtime();
send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, group->numS);
}
MPI_Bcast(&iter, 1, MPI_INT, rootBcast, group->children);
send_results(results, rootBcast, config_file->resizes, group->children);
// Desconectar intercomunicador con los hijos
MPI_Comm_disconnect(&(group->children));
return MAL_COMM_COMPLETED;
}
*/
/*
* Inicializacion de los datos de los hijos.
* En la misma se reciben datos de los padres: La configuracion
* de la ejecucion a realizar; y los datos a recibir de los padres
* ya sea de forma sincrona, asincrona o ambas.
*/
/*
void Sons_init() {
// Enviar a los hijos que grupo de procesos son
MPI_Bcast(&(group->grp), 1, MPI_INT, ROOT, group->parents);
MPI_Bcast(&run_id, 1, MPI_INT, ROOT, group->parents);
group->grp++;
config_file = recv_config_file(ROOT, group->parents);
int numP_parents = config_file->procs[group->grp -1];
results = malloc(sizeof(results_data));
init_results_data(results, config_file->resizes - 1, config_file->iters[group->grp]);
if(config_file->adr) { // Recibir datos asincronos
if(config_file->aib == MAL_USE_NORMAL || config_file->aib == MAL_USE_IBARRIER || config_file->aib == MAL_USE_POINT) {
recv_async(&(group->async_array), config_file->adr, group->myId, group->numP, ROOT, group->parents, numP_parents, config_file->aib);
} else if (config_file->aib == MAL_USE_THREAD) {
recv_sync(&(group->async_array), config_file->adr, group->myId, group->numP, ROOT, group->parents, numP_parents);
}
results->async_end = MPI_Wtime();
}
if(config_file->sdr) { // Recibir datos sincronos
recv_sync(&(group->sync_array), config_file->sdr, group->myId, group->numP, ROOT, group->parents, numP_parents);
results->sync_end = MPI_Wtime();
}
MPI_Bcast(&(group->iter_start), 1, MPI_INT, ROOT, group->parents);
// Guardar los resultados de esta transmision
recv_results(results, ROOT, config_file->resizes, group->parents);
// Desconectar intercomunicador con los hijos
MPI_Comm_disconnect(&(group->parents));
}
*/
......@@ -18,11 +18,14 @@ struct Slurm_data {
int qty_procs;
MPI_Info info;
int type_creation;
int spawn_is_single;
};
typedef struct {
char *argv;
int numP_childs, myId, root, type_dist;
int spawn_is_single;
int spawn_method;
MPI_Comm comm;
}Creation_data;
......@@ -32,7 +35,11 @@ void* thread_work(void* creation_data_arg);
//--------------PRIVATE DECLARATIONS---------------//
void processes_dist(char *argv, int numP_childs, int type_dist);
void generic_spawn(int myId, int root, int is_single, MPI_Comm *child, MPI_Comm comm);
void single_spawn_connection(int myId, int root, MPI_Comm comm, MPI_Comm *child);
int create_processes(int myId, int root, MPI_Comm *child, MPI_Comm comm);
void node_dist(slurm_job_info_t job_record, int type, int total_procs, int **qty, int *used_nodes);
void fill_str_hostfile(slurm_job_info_t job_record, int *qty, int used_nodes, char **hostfile_str);
......@@ -61,11 +68,12 @@ void fill_hostfile(slurm_job_info_t job_record, int ptr, int *qty, int used_node
* Devuelve el estado de el procedimiento. Si no devuelve "COMM_FINISHED", es necesario llamar a
* "check_slurm_comm()".
*/
int init_slurm_comm(char *argv, int myId, int numP, int root, int type_dist, int type_creation, MPI_Comm comm, MPI_Comm *child) {
int init_slurm_comm(char *argv, int myId, int numP, int root, int type_dist, int type_creation, int spawn_is_single, MPI_Comm comm, MPI_Comm *child) {
slurm_data = malloc(sizeof(struct Slurm_data));
slurm_data->type_creation = type_creation;
slurm_data->spawn_is_single = spawn_is_single;
if(type_creation == COMM_SPAWN_SERIAL) {
if(myId == root) {
......@@ -74,7 +82,17 @@ int init_slurm_comm(char *argv, int myId, int numP, int root, int type_dist, int
slurm_data->cmd = malloc(1 * sizeof(char));
slurm_data->info = MPI_INFO_NULL;
}
create_processes(myId, root, child, comm);
// WORK
generic_spawn(myId, root, slurm_data->spawn_is_single, child, comm);
if(slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD && 0) {
int numParents;
MPI_Comm_size(comm, &numParents);
if(numParents < numP) { //Expand
//proc_adapt_expand(numParents, numP, child, comm, MALLEABILITY_NOT_CHILDREN);
}
}
// END WORK
if(myId == root && slurm_data->info != MPI_INFO_NULL) {
MPI_Info_free(&(slurm_data->info));
......@@ -86,19 +104,22 @@ int init_slurm_comm(char *argv, int myId, int numP, int root, int type_dist, int
} else if(type_creation == COMM_SPAWN_PTHREAD) {
commSlurm = MAL_SPAWN_PENDING;
Creation_data *creation_data = (Creation_data *) malloc(sizeof(Creation_data));
creation_data->argv = argv;
creation_data->numP_childs = numP;
creation_data->myId = myId;
creation_data->root = root;
creation_data->type_dist = type_dist;
creation_data->comm = comm;
if(pthread_create(&slurm_thread, NULL, thread_work, (void *)creation_data)) {
printf("Error al crear el hilo de contacto con SLURM\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
if((spawn_is_single && myId == root) || !spawn_is_single) {
Creation_data *creation_data = (Creation_data *) malloc(sizeof(Creation_data));
creation_data->argv = argv;
creation_data->numP_childs = numP;
creation_data->myId = myId;
creation_data->root = root;
creation_data->type_dist = type_dist;
creation_data->spawn_method = type_creation;
creation_data->comm = comm;
if(pthread_create(&slurm_thread, NULL, thread_work, (void *)creation_data)) {
printf("Error al crear el hilo de contacto con SLURM\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
}
}
......@@ -109,27 +130,45 @@ int init_slurm_comm(char *argv, int myId, int numP, int root, int type_dist, int
* Comprueba si una configuracion para crear un nuevo grupo de procesos esta lista,
* y en caso de que lo este, se devuelve el communicador a estos nuevos procesos.
*/
int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child) { // TODO Borrar numP si no se usa
int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm comm, MPI_Comm comm_thread) { // TODO Borrar numP si no se usa
int state=-10;
if(slurm_data->type_creation == COMM_SPAWN_PTHREAD) {
MPI_Allreduce(&commSlurm, &state, 1, MPI_INT, MPI_MIN, MPI_COMM_WORLD); // FIXME No usar MPI_COMM_WORLD
if(slurm_data->type_creation == COMM_SPAWN_PTHREAD && slurm_data->spawn_is_single == 0) {
MPI_Allreduce(&commSlurm, &state, 1, MPI_INT, MPI_MIN, comm);
if(state != MAL_SPAWN_COMPLETED) return state; // Continue only if asynchronous process creation has ended
} else {
return commSlurm;
}
if(pthread_join(slurm_thread, NULL)) {
printf("Error al esperar al hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
}
*child = *returned_comm;
if(pthread_join(slurm_thread, NULL)) {
printf("Error al esperar al hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
}
} else if (slurm_data->type_creation == COMM_SPAWN_PTHREAD && slurm_data->spawn_is_single) {
MPI_Bcast(&commSlurm, 1, MPI_INT, root, comm);
state = commSlurm;
if(state == MAL_SPAWN_PENDING) return state; // Continue only if asynchronous process creation has ended
*child = *returned_comm;
if(myId == root) {
if(pthread_join(slurm_thread, NULL)) {
printf("Error al esperar al hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
}
*child = *returned_comm;
} else {
slurm_data->cmd = malloc(1 * sizeof(char));
generic_spawn(myId, root, slurm_data->spawn_is_single, child, comm_thread);
}
} else {
return commSlurm;
}
//Free memory
if(myId == root && slurm_data->info != MPI_INFO_NULL) {
MPI_Info_free(&(slurm_data->info));
}
......@@ -139,6 +178,84 @@ int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child) { // TODO Bo
return commSlurm;
}
/*
* Conectar grupo de hijos con grupo de padres
* Devuelve un intercomunicador para hablar con los padres
*
* Solo se utiliza cuando la creación de los procesos ha sido
* realizada por un solo proceso padre
*/
void malleability_establish_connection(int myId, int root, MPI_Comm *intercomm) {
char *port_name;
MPI_Comm newintercomm;
if(myId == root) {
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Open_port(MPI_INFO_NULL, port_name);
MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, root, 130, *intercomm);
} else {
port_name = malloc(1);
}
MPI_Comm_accept(port_name, MPI_INFO_NULL, root, MPI_COMM_WORLD, &newintercomm);
if(myId == root) {
MPI_Close_port(port_name);
}
free(port_name);
MPI_Comm_free(intercomm);
*intercomm = newintercomm;
}
//--------------TODO PRIVATE SPAWN TYPE FUNCTIONS---------------//
/*
* Se encarga de que el grupo de procesos resultante se
* encuentren todos en un intra comunicador, uniendo a
* padres e hijos en un solo comunicador.
*
* Se llama antes de la redistribución de datos.
*
* TODO REFACTOR
*/
void proc_adapt_expand(int *numP, int numC, MPI_Comm intercomm, MPI_Comm *comm, int is_children_group) {
MPI_Comm new_comm = MPI_COMM_NULL;
// TODO Indicar por Bcast que es con MERGE
MPI_Intercomm_merge(intercomm, is_children_group, &new_comm); //El que pone 0 va primero
//MPI_Comm_free(intercomm); TODO Nueva redistribucion para estos casos y liberar aqui
// *intercomm = MPI_COMM_NULL;
*numP = numC;
if(*comm != MPI_COMM_WORLD && *comm != MPI_COMM_NULL) {
//MPI_Comm_free(comm); FIXME
}
*comm=new_comm;
}
/*
* Se encarga de que el grupo de procesos resultante se
* eliminen aquellos procesos que ya no son necesarios.
* Los procesos eliminados se quedaran como zombies.
*
* Se llama una vez ha terminado la redistribución de datos.
*/
void proc_adapt_shrink(int numC, MPI_Comm *comm, int myId) {
int color = MPI_UNDEFINED;
MPI_Comm new_comm = MPI_COMM_NULL;
if(myId < numC) {
color = 1;
}
MPI_Comm_split(*comm, color, myId, &new_comm);
if(*comm != MPI_COMM_WORLD && *comm != MPI_COMM_NULL)
//MPI_Comm_free(comm); FIXME
*comm=new_comm;
}
//--------------PRIVATE SPAWN TYPE FUNCTIONS---------------//
/*
......@@ -149,6 +266,8 @@ int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child) { // TODO Bo
* se avisa al hilo maestro.
*/
void* thread_work(void* creation_data_arg) {
int numP;
MPI_Comm aux_comm;
Creation_data *creation_data = (Creation_data*) creation_data_arg;
returned_comm = (MPI_Comm *) malloc(sizeof(MPI_Comm));
......@@ -158,9 +277,21 @@ void* thread_work(void* creation_data_arg) {
slurm_data->cmd = malloc(1 * sizeof(char));
slurm_data->info = MPI_INFO_NULL;
}
create_processes(creation_data->myId, creation_data->root, returned_comm, creation_data->comm);
commSlurm = MAL_SPAWN_COMPLETED;
commSlurm = MAL_SPAWN_COMPLETED; // TODO REFACTOR?
generic_spawn(creation_data->myId, creation_data->root, slurm_data->spawn_is_single, returned_comm, creation_data->comm);
//TODO Eliminar el && 0
if(slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD && 0) {
//MPI_Comm_size(creation_data->comm, &numP);
numP= 1; //FIXME BORRAR
if(numP < creation_data->numP_childs) { //Expand
//TODO Crear nueva redistribucion y descomentar esto
//proc_adapt_expand(numP, creation_data->numP_childs, returned_comm, &aux_comm, MALLEABILITY_NOT_CHILDREN);
//*returned_comm = aux_comm;
}
}
//commSlurm = MAL_SPAWN_COMPLETED;
free(creation_data);
pthread_exit(NULL);
......@@ -222,6 +353,46 @@ void processes_dist(char *argv, int numP_childs, int type) {
slurm_free_job_info_msg(j_info);
}
/*
TODO
*/
void generic_spawn(int myId, int root, int spawn_is_single, MPI_Comm *child, MPI_Comm comm) {
if(spawn_is_single) {
single_spawn_connection(myId, root, comm, child);
} else {
create_processes(myId, root, child, comm);
MPI_Bcast(&spawn_is_single, 1, MPI_INT, MPI_ROOT, *child);
}
}
/*
* Si la variable "type" es 1, la creación es con la participación de todo el grupo de padres
* Si el valor es diferente, la creación es solo con la participación del proceso root
*/
void single_spawn_connection(int myId, int root, MPI_Comm comm, MPI_Comm *child){
char *port_name;
int auxiliar_conf = COMM_SPAWN_SINGLE;
MPI_Comm newintercomm;
if (myId == root) {
create_processes(myId, root, child, MPI_COMM_SELF);
MPI_Bcast(&auxiliar_conf, 1, MPI_INT, MPI_ROOT, *child);
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, root, 130, *child, MPI_STATUS_IGNORE);
} else {
port_name = malloc(1);
}
MPI_Comm_connect(port_name, MPI_INFO_NULL, root, comm, &newintercomm);
if(myId == root)
MPI_Comm_free(child);
free(port_name);
*child = newintercomm;
}
/*
* Crea un grupo de procesos segun la configuracion indicada por la funcion
* "processes_dist()".
......@@ -236,6 +407,7 @@ int create_processes(int myId, int root, MPI_Comm *child, MPI_Comm comm) {
return spawn_err;
}
/*
* Obtiene la distribucion fisica del grupo de procesos a crear, devolviendo
* cuantos nodos se van a utilizar y la cantidad de procesos que alojara cada
......
......@@ -5,15 +5,11 @@
#include <slurm/slurm.h>
#include "malleabilityStates.h"
//#define COMM_UNRESERVED -2
//#define COMM_IN_PROGRESS -1
//#define COMM_FINISHED 0
int init_slurm_comm(char *argv, int myId, int numP, int root, int type_dist, int type_creation, int spawn_is_single, MPI_Comm comm, MPI_Comm *child);
int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm comm, MPI_Comm comm_thread);
//#define COMM_PHY_NODES 1
//#define COMM_PHY_CPU 2
void malleability_establish_connection(int myId, int root, MPI_Comm *intercomm);
//#define COMM_SPAWN_SERIAL 0
//#define COMM_SPAWN_PTHREAD 1
int init_slurm_comm(char *argv, int myId, int numP, int root, int type_dist, int type_creation, MPI_Comm comm, MPI_Comm *child);
int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child);
void proc_adapt_expand(int *numP, int numC, MPI_Comm intercomm, MPI_Comm *comm, int is_children_group);
void proc_adapt_shrink(int numC, MPI_Comm *comm, int myId);
......@@ -6,8 +6,6 @@
#include "CommDist.h"
#define MALLEABILITY_ROOT 0
#define MALLEABILITY_CHILDREN 1
#define MALLEABILITY_NOT_CHILDREN 0
#define MALLEABILITY_USE_SYNCHRONOUS 0
#define MALLEABILITY_USE_ASYNCHRONOUS 1
......@@ -28,6 +26,7 @@ void* thread_async_work(void* void_arg);
typedef struct {
int spawn_type;
int spawn_dist;
int spawn_is_single;
int spawn_threaded;
int comm_type;
int comm_threaded;
......@@ -38,10 +37,11 @@ typedef struct {
} malleability_config_t;
typedef struct {
int myId, numP, numC, root, root_parents;
int myId, numP, numC, numC_spawned, root, root_parents;
pthread_t async_thread;
MPI_Comm comm, thread_comm;
MPI_Comm intercomm;
MPI_Comm user_comm;
char *name_exec;
} malleability_t;
......@@ -76,7 +76,8 @@ int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_ex
mall->numP = numP;
mall->root = root;
mall->comm = dup_comm;
mall->comm = thread_comm; // TODO Refactor -- Crear solo si es necesario?
mall->thread_comm = thread_comm; // TODO Refactor -- Crear solo si es necesario?
mall->user_comm = comm;
mall->name_exec = name_exec;
rep_s_data->entries = 0;
......@@ -87,8 +88,10 @@ int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_ex
state = MAL_NOT_STARTED;
// Si son el primer grupo de procesos, obtienen los datos de los padres
printf("TESTHHH 1\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
MPI_Comm_get_parent(&(mall->intercomm));
if(mall->intercomm != MPI_COMM_NULL ) {
printf("TESTHHH 2\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
Children_init();
return MALLEABILITY_CHILDREN;
}
......@@ -143,10 +146,10 @@ int malleability_checkpoint() {
}
} else if(state == MAL_SPAWN_PENDING) { // Comprueba si el spawn ha terminado y comienza la redistribucion
state = check_slurm_comm(mall->myId, mall->root, mall->numP, &(mall->intercomm));
state = check_slurm_comm(mall->myId, mall->root, mall->numP, &(mall->intercomm), mall->comm, mall->thread_comm);
if (state == MAL_SPAWN_COMPLETED) {
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
state = start_redistribution();
}
......@@ -184,8 +187,9 @@ void get_benchmark_results(results_data **results) {
}
//-------------------------------------------------------------------------------------------------------------
void set_malleability_configuration(int spawn_type, int spawn_dist, int spawn_threaded, int comm_type, int comm_threaded) {
void set_malleability_configuration(int spawn_type, int spawn_is_single, int spawn_dist, int spawn_threaded, int comm_type, int comm_threaded) {
mall_conf->spawn_type = spawn_type;
mall_conf->spawn_is_single = spawn_is_single;
mall_conf->spawn_dist = spawn_dist;
mall_conf->spawn_threaded = spawn_threaded;
mall_conf->comm_type = comm_type;
......@@ -194,9 +198,31 @@ void set_malleability_configuration(int spawn_type, int spawn_dist, int spawn_th
/*
* To be deprecated
* Tiene que ser llamado despues de setear la config
*/
void set_children_number(int numC){
mall->numC = numC;
if((mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) && (numC - mall->numP >= 0)) {
mall->numC = numC;
mall->numC_spawned = numC - mall->numP;
if(numC == mall->numP) { // Migrar
mall->numC_spawned = numC;
if(mall_conf->spawn_type == COMM_SPAWN_MERGE)
mall_conf->spawn_type = COMM_SPAWN_SERIAL;
else
mall_conf->spawn_type = COMM_SPAWN_PTHREAD;
}
} else {
mall->numC = numC;
mall->numC_spawned = numC;
}
}
/*
* TODO
*/
void get_malleability_user_comm(MPI_Comm *comm) {
*comm = mall->user_comm;
}
/*
......@@ -349,11 +375,14 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch
* ya sea de forma sincrona, asincrona o ambas.
*/
void Children_init() {
/* FIXME
* iter_start -- a constante replicado || TODO Setear valor segun adr==0
*/
int numP_parents, root_parents, i;
int spawn_is_single;
MPI_Comm aux;
MPI_Bcast(&spawn_is_single, 1, MPI_INT, MALLEABILITY_ROOT, mall->intercomm);
if(spawn_is_single) {
malleability_establish_connection(mall->myId, MALLEABILITY_ROOT, &(mall->intercomm));
}
MPI_Bcast(&root_parents, 1, MPI_INT, MALLEABILITY_ROOT, mall->intercomm);
MPI_Bcast(&numP_parents, 1, MPI_INT, root_parents, mall->intercomm);
......@@ -362,7 +391,6 @@ void Children_init() {
mall_conf->results = (results_data *) malloc(sizeof(results_data));
init_results_data(mall_conf->results, mall_conf->config_file->resizes, RESULTS_INIT_DATA_QTY);
if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
......@@ -387,11 +415,22 @@ void Children_init() {
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], MPI_INT, root_parents, mall->intercomm);
}
}
// Guardar los resultados de esta transmision
recv_results(mall_conf->results, mall->root, mall_conf->config_file->resizes, mall->intercomm);
printf("HIJOS 1\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
if(mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
proc_adapt_expand(&(mall->numP), mall->numC, mall->intercomm, &(mall->comm), MALLEABILITY_CHILDREN);
//if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
MPI_Comm_dup(mall->comm, &aux);
mall->thread_comm = aux;
MPI_Comm_dup(mall->comm, &aux);
mall->user_comm = aux;
}
MPI_Comm_disconnect(&(mall->intercomm));
}
......@@ -407,17 +446,57 @@ void Children_init() {
*/
int spawn_step(){
mall_conf->results->spawn_start = MPI_Wtime();
state = init_slurm_comm(mall->name_exec, mall->myId, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_type, mall->comm, &(mall->intercomm));
state = init_slurm_comm(mall->name_exec, mall->myId, mall->numC_spawned, mall->root, mall_conf->spawn_dist, mall_conf->spawn_type, mall_conf->spawn_is_single, mall->thread_comm, &(mall->intercomm));
printf("TEST 2 un total de %d\n", mall->numC_spawned); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
if(mall_conf->spawn_type == COMM_SPAWN_SERIAL)
if(mall_conf->spawn_type == COMM_SPAWN_SERIAL || mall_conf->spawn_type == COMM_SPAWN_MERGE)
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
else if(mall_conf->spawn_type == COMM_SPAWN_PTHREAD) {
else if(mall_conf->spawn_type == COMM_SPAWN_PTHREAD || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
mall_conf->results->spawn_thread_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
mall_conf->results->spawn_start = MPI_Wtime();
}
return state;
}
/*
* TODO Si los eliminados pertenecen al mismo COMMWORLD
* eliminar del todo
* TODO Eliminar los procesos por encima de numC y modificar numP
*/
/*
void malleability_zombies(int *pids, int *offset_pids) {
// Zombies treatment
int pid = getpid();
int *pids_counts = malloc(*numP * sizeof(int));
int *pids_displs = malloc(*numP * sizeof(int));
int count=1;
if(myId < new_numP) {
count = 0;
if(myId == mall->root) {
int i;
for(i=0; i < new_numP; i++) {
pids_counts[i] = 0;
}
for(i=new_numP; i<*numP; i++) {
pids_counts[i] = 1;
pids_displs[i] = (i + *offset_pids) - new_numP;
}
*offset_pids += *numP - new_numP;
}
}
MPI_Gatherv(&pid, count, MPI_INT, pids, pids_counts, pids_displs, MPI_INT, ROOT, *comm);
if(myId == ROOT) {
int i;
for(i=0;i<*offset_pids;i++){
printf("PID[%d]=%d\n",i,pids[i]);
}
}
//free pids_counts, pids_displs
}
*/
/*
* Comienza la redistribucion de los datos con el nuevo grupo de procesos.
*
......@@ -512,7 +591,8 @@ int check_redistribution() {
* Finalmente termina enviando los datos temporales a los hijos.
*/
int end_redistribution() {
int i, rootBcast = MPI_PROC_NULL;
int result, i, rootBcast = MPI_PROC_NULL;
MPI_Comm aux;
if(mall->myId == mall->root) rootBcast = MPI_ROOT;
if(dist_s_data->entries || rep_s_data->entries) { // Recibir datos sincronos
......@@ -529,9 +609,34 @@ int end_redistribution() {
send_results(mall_conf->results, rootBcast, mall_conf->config_file->resizes, mall->intercomm);
if(mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
double time_adapt = MPI_Wtime();
if(mall->numP > mall->numC) { //Shrink
//proc_adapt_shrink( numC, MPI_Comm *comm, mall->myId);
//malleability_zombies()
if(mall_conf->spawn_type == COMM_SPAWN_SERIAL || mall_conf->spawn_type == COMM_SPAWN_MERGE)
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - time_adapt;
} else {
proc_adapt_expand(&(mall->numP), mall->numC, mall->intercomm, &(mall->comm), MALLEABILITY_NOT_CHILDREN);
// if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); FIXME
MPI_Comm_dup(mall->comm, &aux);
mall->thread_comm = aux;
MPI_Comm_dup(mall->comm, &aux);
mall->user_comm = aux;
if(mall_conf->spawn_type == COMM_SPAWN_SERIAL || mall_conf->spawn_type == COMM_SPAWN_MERGE)
mall_conf->results->spawn_time[mall_conf->grp] += MPI_Wtime() - time_adapt;
}
result = MAL_DIST_ADAPTED;
} else {
result = MAL_DIST_COMPLETED;
}
MPI_Comm_disconnect(&(mall->intercomm));
state = MAL_NOT_STARTED;
return MAL_DIST_COMPLETED;
return result;
}
// TODO MOVER A OTRO LADO??
......
......@@ -13,8 +13,9 @@ void free_malleability();
int malleability_checkpoint();
void set_benchmark_grp(int grp);
void set_malleability_configuration(int spawn_type, int spawn_dist, int spawn_threaded, int comm_type, int comm_threaded);
void set_malleability_configuration(int spawn_type, int spawn_is_single, int spawn_dist, int spawn_threaded, int comm_type, int comm_threaded);
void set_children_number(int numC); // TODO TO BE DEPRECATED
void get_malleability_user_comm(MPI_Comm *comm);
void malleability_add_data(void *data, int total_qty, int type, int is_replicated, int is_constant);
void malleability_get_entries(int *entries, int is_replicated, int is_constant);
......
......@@ -5,14 +5,21 @@
#define MAL_SPAWN_COMPLETED 2
#define MAL_DIST_PENDING 3
#define MAL_DIST_COMPLETED 4
#define MAL_DIST_ADAPTED 5
// TODO Refactor
#define COMM_PHY_NODES 1
#define COMM_PHY_CPU 2
// TODO Separar PTHREAD
#define COMM_SPAWN_SERIAL 0
#define COMM_SPAWN_PTHREAD 1
#define COMM_SPAWN_MERGE 2
#define COMM_SPAWN_MERGE_PTHREAD 3
#define COMM_SPAWN_MULTIPLE 0
#define COMM_SPAWN_SINGLE 1
#define MAL_USE_NORMAL 0
#define MAL_USE_IBARRIER 1
......@@ -20,6 +27,8 @@
#define MAL_USE_THREAD 3
#define MAL_INT 0
#define MAL_CHAR 1
#define MALLEABILITY_CHILDREN 1
#define MALLEABILITY_NOT_CHILDREN 0
......@@ -60,7 +60,7 @@ void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *d
// Mandar primero numero de entradas
def_malleability_entries(data_struct_dist, data_struct_rep, &entries_type);
MPI_Bcast(&(data_struct_rep->entries), 1, entries_type, rootBcast, intercomm);
MPI_Bcast(MPI_BOTTOM, 1, entries_type, rootBcast, intercomm);
if(is_children_group) {
if(data_struct_rep->entries != 0) init_malleability_data_struct(data_struct_rep, data_struct_rep->entries);
......@@ -164,15 +164,18 @@ void free_malleability_data_struct(malleability_data_t *data_struct) {
*/
void def_malleability_entries(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, MPI_Datatype *new_type) {
int counts = 2;
int blocklength = 1;
MPI_Aint displs, dir;
int blocklengths[counts];
MPI_Aint displs[counts];
MPI_Datatype types[counts];
blocklengths[0] = blocklengths[1] = 1;
types[0] = types[1] = MPI_INT;
// Obtener direccion base
MPI_Get_address(&(data_struct_rep->entries), &dir);
MPI_Get_address(&(data_struct_dist->entries), &displs);
displs -= dir;
MPI_Get_address(&(data_struct_rep->entries), &displs[0]);
MPI_Get_address(&(data_struct_dist->entries), &displs[1]);
MPI_Type_create_hvector(counts, blocklength, displs, MPI_INT, new_type);
MPI_Type_create_struct(counts, blocklengths, displs, types, new_type);
MPI_Type_commit(new_type);
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment