Commit 96b75523 authored by iker_martin's avatar iker_martin
Browse files

Improved zombies treatment so its possible to finalize without an abort if...

Improved zombies treatment so its possible to finalize without an abort if group is in only one node. Some enhacements could still be applied. MAM_Finalize returns whether an abort is required or not.
parent 63f720dd
......@@ -45,7 +45,6 @@ int main(int argc, char *argv[]) {
int numP, myId, res;
int req;
int im_child;
int abort_needed = 0;
size_t i;
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &req);
......@@ -103,6 +102,10 @@ int main(int argc, char *argv[]) {
}
}
int myId2, numP2;
MPI_Comm_size(comm, &numP2);
MPI_Comm_rank(comm, &myId2);
print_general_info(myId2, group->grp, numP2);
res = work();
if(res==1) { // Se ha llegado al final de la aplicacion
......@@ -126,13 +129,8 @@ int main(int argc, char *argv[]) {
if(comm != MPI_COMM_WORLD && comm != MPI_COMM_NULL) {
MPI_Comm_free(&comm);
}
if(group->myId == ROOT && config_file->groups[group->grp-1].sm == MALL_SPAWN_MERGE) {
abort_needed = 1;
}
free_application_data();
if(abort_needed) { MPI_Abort(MPI_COMM_WORLD, -100); }
MPI_Finalize();
return 0;
}
......@@ -291,7 +289,8 @@ void print_general_info(int myId, int grp, int numP) {
char *version = malloc(MPI_MAX_LIBRARY_VERSION_STRING * sizeof(char));
MPI_Get_processor_name(name, &len);
MPI_Get_library_version(version, &len);
printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, grp, numP, name, version);
//printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, grp, numP, name, version);
printf("P%d Nuevo GRUPO %d de %d procs en nodo %s -- PID=%d\n", myId, grp, numP, name, getpid());
free(name);
free(version);
......@@ -450,6 +449,7 @@ void obtain_op_times(int compute) {
* Libera toda la memoria asociada con la aplicacion
*/
void free_application_data() {
int abort_needed;
size_t i;
if(config_file->sdr && group->sync_array != NULL) {
......@@ -473,9 +473,10 @@ void free_application_data() {
free(group->async_array);
group->async_array = NULL;
}
MAM_Finalize();
abort_needed = MAM_Finalize();
free_zombie_process();
free(group);
if(abort_needed) { MPI_Abort(MPI_COMM_WORLD, -100); }
}
......
......@@ -25,7 +25,7 @@ int MAM_St_rms(int *mam_state);
int MAM_St_spawn_start();
int MAM_St_spawn_pending(int wait_completed);
int MAM_St_red_start();
int MAM_St_red_pending(int *mam_state, int wait_completed);
int MAM_St_red_pending(int wait_completed);
int MAM_St_user_start(int *mam_state);
int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function)(void *), void *user_args);
int MAM_St_user_completed();
......@@ -46,9 +46,6 @@ int thread_creation();
int thread_check(int wait_completed);
void* thread_async_work();
void print_comms_state();
void malleability_comms_update(MPI_Comm comm);
int MAM_I_convert_key(char *key);
void MAM_I_create_user_struct(int is_children_group);
......@@ -70,7 +67,7 @@ mam_user_reconf_t *user_reconf;
* aplicacion.
*/
int MAM_Init(int root, MPI_Comm *comm, char *name_exec, void (*user_function)(void *), void *user_args) {
MPI_Comm dup_comm, thread_comm;
MPI_Comm dup_comm, thread_comm, original_comm;
mall_conf = (malleability_config_t *) malloc(sizeof(malleability_config_t));
mall = (malleability_t *) malloc(sizeof(malleability_t));
......@@ -90,14 +87,17 @@ int MAM_Init(int root, MPI_Comm *comm, char *name_exec, void (*user_function)(vo
MPI_Comm_dup(*comm, &dup_comm);
MPI_Comm_dup(*comm, &thread_comm);
MPI_Comm_dup(*comm, &original_comm);
MPI_Comm_set_name(dup_comm, "MAM_MAIN");
MPI_Comm_set_name(thread_comm, "MAM_THREAD");
MPI_Comm_set_name(original_comm, "MAM_ORIGINAL");
mall->root = root;
mall->root_parents = root;
mall->zombie = 0;
mall->comm = dup_comm;
mall->thread_comm = thread_comm;
mall->original_comm = original_comm;
mall->user_comm = comm;
mall->tmp_comm = MPI_COMM_NULL;
......@@ -113,7 +113,7 @@ int MAM_Init(int root, MPI_Comm *comm, char *name_exec, void (*user_function)(vo
state = MALL_NOT_STARTED;
MAM_Init_configuration();
zombies_service_init();
MAM_Zombies_service_init();
init_malleability_times();
MAM_Def_main_datatype();
......@@ -124,7 +124,9 @@ int MAM_Init(int root, MPI_Comm *comm, char *name_exec, void (*user_function)(vo
return MALLEABILITY_CHILDREN;
}
//TODO Check potential improvement - If check_hosts does not use slurm, internode_group could be obtained there
MAM_check_hosts();
mall->internode_group = MAM_Is_internode_group();
MAM_Set_initial_configuration();
#if USE_MAL_BARRIERS && USE_MAL_DEBUG
......@@ -144,7 +146,8 @@ int MAM_Init(int root, MPI_Comm *comm, char *name_exec, void (*user_function)(vo
* de maleabilidad y asegura que los zombies
* despierten si los hubiese.
*/
void MAM_Finalize() {
int MAM_Finalize() {
int request_abort;
free_malleability_data_struct(rep_s_data);
free_malleability_data_struct(rep_a_data);
free_malleability_data_struct(dist_s_data);
......@@ -157,18 +160,18 @@ void MAM_Finalize() {
if(mall->nodelist != NULL) free(mall->nodelist);
MAM_Free_main_datatype();
request_abort = MAM_Zombies_service_free();
free_malleability_times();
if(mall->comm != MPI_COMM_WORLD && mall->comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->comm));
if(mall->thread_comm != MPI_COMM_WORLD && mall->thread_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->thread_comm));
if(mall->intercomm != MPI_COMM_WORLD && mall->intercomm != MPI_COMM_NULL) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge
if(mall->original_comm != MPI_COMM_WORLD && mall->original_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->original_comm));
free(mall);
free(mall_conf);
free(user_reconf);
zombies_awake();
zombies_service_free();
state = MALL_UNRESERVED;
return request_abort;
}
/*
......@@ -208,7 +211,7 @@ int MAM_Checkpoint(int *mam_state, int wait_completed, void (*user_function)(voi
break;
case MALL_DIST_PENDING:
call_checkpoint = MAM_St_red_pending(mam_state, wait_completed);
call_checkpoint = MAM_St_red_pending(wait_completed);
break;
case MALL_USER_START:
......@@ -250,14 +253,14 @@ void MAM_Resume_redistribution(int *mam_state) {
* TODO
*/
void MAM_Commit(int *mam_state) {
int zombies = 0;
int request_abort;
#if USE_MAL_DEBUG
if(mall->myId == mall->root){ DEBUG_FUNC("Trying to commit", mall->myId, mall->numP); } fflush(stdout);
#endif
// Get times before commiting
if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) {
// This communication is only needed when a root process will become a zombie
// This communication is only needed when the root process will become a zombie
malleability_times_broadcast(mall->root_collectives);
}
......@@ -265,26 +268,20 @@ void MAM_Commit(int *mam_state) {
if(mall->tmp_comm != MPI_COMM_WORLD && mall->tmp_comm != MPI_COMM_NULL) MPI_Comm_free(&(mall->tmp_comm));
if(*(mall->user_comm) != MPI_COMM_WORLD && *(mall->user_comm) != MPI_COMM_NULL) MPI_Comm_free(mall->user_comm);
// Zombies treatment
if(mall_conf->spawn_method == MALL_SPAWN_MERGE) {
MPI_Allreduce(&mall->zombie, &zombies, 1, MPI_INT, MPI_MAX, mall->comm);
if(zombies) {
zombies_collect_suspended(mall->comm);
}
}
// Zombies KILL
// Zombies Treatment
MAM_Zombies_update();
if(mall->zombie) {
#if USE_MAL_DEBUG >= 2
#if USE_MAL_DEBUG >= 1
DEBUG_FUNC("Is terminating as zombie", mall->myId, mall->numP); fflush(stdout);
#endif
MAM_Finalize();
request_abort = MAM_Finalize();
if(request_abort) { MPI_Abort(MPI_COMM_WORLD, -101); }
MPI_Finalize();
exit(0);
}
// Reset/Free communicators
if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { malleability_comms_update(mall->intercomm); }
if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MAM_comms_update(mall->intercomm); }
if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { MPI_Comm_disconnect(&(mall->intercomm)); } //FIXME Error en OpenMPI + Merge
MPI_Comm_rank(mall->comm, &mall->myId);
......@@ -295,8 +292,9 @@ void MAM_Commit(int *mam_state) {
if(mam_state != NULL) *mam_state = MAM_COMPLETED;
// Set new communicator
if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *(mall->user_comm) = MPI_COMM_WORLD; }
else if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MPI_Comm_dup(mall->comm, mall->user_comm); }
MPI_Comm_dup(mall->comm, mall->user_comm);
//if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) { *(mall->user_comm) = MPI_COMM_WORLD; }
//else if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { MPI_Comm_dup(mall->comm, mall->user_comm); }
#if USE_MAL_DEBUG
if(mall->myId == mall->root) DEBUG_FUNC("Reconfiguration has been commited", mall->myId, mall->numP); fflush(stdout);
#endif
......@@ -553,9 +551,9 @@ int MAM_St_rms(int *mam_state) {
#endif
mall_conf->times->malleability_start = MPI_Wtime();
MAM_Check_configuration();
*mam_state = MAM_NOT_STARTED;
state = MALL_RMS_COMPLETED;
MAM_Check_configuration();
mall->wait_targets_posted = 0;
//if(CHECK_RMS()) {return MALL_DENIED;}
......@@ -598,7 +596,7 @@ int MAM_St_red_start() {
return 1;
}
int MAM_St_red_pending(int *mam_state, int wait_completed) {
int MAM_St_red_pending(int wait_completed) {
if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
state = thread_check(wait_completed);
} else {
......@@ -652,7 +650,7 @@ int MAM_St_user_completed() {
}
int MAM_St_spawn_adapt_pending(int wait_completed) {
wait_completed = 1;
wait_completed = MAM_WAIT_COMPLETION;
#if USE_MAL_BARRIERS
MPI_Barrier(mall->comm);
#endif
......@@ -661,6 +659,8 @@ int MAM_St_spawn_adapt_pending(int wait_completed) {
state = check_spawn_state(&(mall->intercomm), mall->comm, wait_completed);
/* TODO Comentar problema, basicamente indicar que no es posible de la forma actual
* Ademas es solo para una operación que hemos visto como "extremadamente" rápida
* NO es posible debido a que solo se puede hacer tras enviar los datos variables
* y por tanto pierden validez dichos datos
if(!MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, NULL)) {
#if USE_MAL_BARRIERS
MPI_Barrier(mall->comm);
......@@ -705,13 +705,19 @@ void Children_init(void (*user_function)(void *), void *user_args) {
DEBUG_FUNC("MaM will now initialize spawned processes", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
malleability_connect_children(mall->comm, &(mall->intercomm));
malleability_connect_children(&(mall->intercomm));
if(mall_conf->spawn_method == MALL_SPAWN_MERGE) { // For Merge Method, these processes will be added
MPI_Comm_rank(mall->intercomm, &mall->myId);
MPI_Comm_size(mall->intercomm, &mall->numP);
}
mall->root_collectives = mall->root_parents;
if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, NULL)) {
mall->internode_group = 0;
} else {
mall->internode_group = MAM_Is_internode_group();
}
#if USE_MAL_DEBUG
DEBUG_FUNC("Spawned have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
......@@ -1100,37 +1106,6 @@ void* thread_async_work() {
//==============================================================================
/*
* Muestra por pantalla el estado actual de todos los comunicadores
*/
void print_comms_state() {
int tester;
char *test = malloc(MPI_MAX_OBJECT_NAME * sizeof(char));
MPI_Comm_get_name(mall->comm, test, &tester);
printf("P%d Comm=%d Name=%s\n", mall->myId, mall->comm, test);
MPI_Comm_get_name(*(mall->user_comm), test, &tester);
printf("P%d Comm=%d Name=%s\n", mall->myId, *(mall->user_comm), test);
if(mall->intercomm != MPI_COMM_NULL) {
MPI_Comm_get_name(mall->intercomm, test, &tester);
printf("P%d Comm=%d Name=%s\n", mall->myId, mall->intercomm, test);
}
free(test);
}
/*
* Función solo necesaria en Merge
*/
void malleability_comms_update(MPI_Comm comm) {
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
MPI_Comm_dup(comm, &(mall->thread_comm));
MPI_Comm_dup(comm, &(mall->comm));
MPI_Comm_set_name(mall->thread_comm, "MAM_THREAD");
MPI_Comm_set_name(mall->comm, "MAM_MAIN");
}
/*
* TODO Por hacer
......
......@@ -8,65 +8,103 @@
#include "malleabilityZombies.h"
#define PIDS_QTY 320
//TODO Add option to allow the usage of signal USR2 or not.
//This code asumes ROOT of each group will be the last to be zombified
//
void zombies_suspend();
void MAM_I_zombies_collect(int new_zombies);
void MAM_I_zombies_split();
void MAM_I_zombies_suspend();
int MAM_I_zombies_awake();
void zombies_handler_usr2() {}
int offset_pids, *pids = NULL;
int *pids = NULL;
int zombies_qty = 0;
void gestor_usr2() {}
void MAM_Zombies_service_init() {
zombies_qty = 0;
pids = malloc(PIDS_QTY * sizeof(int));
void zombies_collect_suspended(MPI_Comm comm) {
for(int i=0; i<PIDS_QTY; i++) {
pids[i] = 0;
}
}
int MAM_Zombies_service_free() {
int request_abort = MAM_I_zombies_awake();
free(pids);
return request_abort;
}
void MAM_Zombies_update() {
int myId, numP, new_zombies;
MPI_Comm_rank(mall->original_comm, &myId);
MPI_Comm_size(mall->original_comm, &numP);
MPI_Allreduce(&mall->zombie, &new_zombies, 1, MPI_INT, MPI_SUM, mall->original_comm);
if(new_zombies && new_zombies < numP) {
MAM_I_zombies_collect(new_zombies);
MAM_I_zombies_split();
MAM_I_zombies_suspend();
if(myId == MALLEABILITY_ROOT) zombies_qty += new_zombies;
}
}
void MAM_I_zombies_collect(int new_zombies) {
int pid = getpid();
int *pids_counts = malloc(mall->numP * sizeof(int));
int *pids_displs = malloc(mall->numP * sizeof(int));
int i, count=1;
int *pids_counts, *pids_displs;
int i, count, active;
int myId, numP;
MPI_Comm_rank(mall->original_comm, &myId);
MPI_Comm_size(mall->original_comm, &numP);
pids_counts = (int *) malloc(numP * sizeof(int));
pids_displs = (int *) malloc(numP * sizeof(int));
#if USE_MAL_DEBUG > 2
if(mall->myId == mall->root){ DEBUG_FUNC("Collecting zombies", mall->myId, mall->numP); } fflush(stdout);
if(myId == MALLEABILITY_ROOT){ DEBUG_FUNC("Collecting zombies", mall->myId, mall->numP); } fflush(stdout);
#endif
if(mall->myId < mall->numC) {
count = 0;
if(mall->myId == mall->root) {
for(i=0; i < mall->numC; i++) {
count = mall->zombie;
if(myId == MALLEABILITY_ROOT) {
active = numP - new_zombies;
for(i=0; i < active; i++) {
pids_counts[i] = 0;
}
for(i=mall->numC; i<mall->numP; i++) {
pids_displs[i-1] = -1;
for(; i< active+new_zombies; i++) {
pids_counts[i] = 1;
pids_displs[i] = (i - mall->numC) + offset_pids;
pids_displs[i] = (pids_displs[i-1] + 1) + zombies_qty;
}
offset_pids += mall->numP - mall->numC;
}
}
MPI_Gatherv(&pid, count, MPI_INT, pids, pids_counts, pids_displs, MPI_INT, mall->root, comm);
MPI_Gatherv(&pid, count, MPI_INT, pids, pids_counts, pids_displs, MPI_INT, MALLEABILITY_ROOT, mall->original_comm);
free(pids_counts);
free(pids_displs);
if(mall->myId >= mall->numC) {
zombies_suspend();
}
}
void zombies_service_init() {
offset_pids = 0;
pids = malloc(PIDS_QTY * sizeof(int));
void MAM_I_zombies_split() {
int myId, color;
MPI_Comm new_original_comm;
for(int i=0; i<PIDS_QTY; i++) {
pids[i] = 0;
}
}
MPI_Comm_rank(mall->original_comm, &myId);
color = mall->zombie ? MPI_UNDEFINED : 1;
MPI_Comm_split(mall->original_comm, color, myId, &new_original_comm);
void zombies_service_free() {
free(pids);
if(mall->original_comm != MPI_COMM_WORLD) MPI_Comm_free(&mall->original_comm);
MPI_Comm_set_name(new_original_comm, "MAM_ORIGINAL");
mall->original_comm = new_original_comm;
}
void zombies_suspend() {
void MAM_I_zombies_suspend() {
struct sigaction act;
if(!mall->zombie) return;
sigemptyset(&act.sa_mask);
act.sa_flags=0;
act.sa_handler=gestor_usr2;
act.sa_handler=zombies_handler_usr2;
sigaction(SIGUSR2, &act, NULL);
......@@ -76,8 +114,11 @@ void zombies_suspend() {
sigsuspend(&set);
}
void zombies_awake() {
for(int i=0; i < offset_pids; i++) { // Despertar a los zombies
int MAM_I_zombies_awake() {
if(mall->internode_group && zombies_qty) return 1; //Request Abort
for(int i=0; i < zombies_qty; i++) { // Despertar a los zombies
kill(pids[i], SIGUSR2);
}
zombies_qty = 0;
return 0; //Normal termination
}
......@@ -10,9 +10,8 @@
#include <signal.h>
#include "malleabilityDataStructures.h"
void zombies_collect_suspended(MPI_Comm comm);
void zombies_service_init();
void zombies_service_free();
void zombies_awake();
void MAM_Zombies_service_init();
int MAM_Zombies_service_free();
void MAM_Zombies_update();
#endif
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment