Commit 635dfb14 authored by iker_martin's avatar iker_martin
Browse files

Arreglado error en Shrink merge

parent cd5d81f7
......@@ -99,9 +99,6 @@ int main(int argc, char *argv[]) {
group->grp = group->grp + 1;
}
//config_file->cst = COMM_SPAWN_MERGE; // TODO Pasar a CONFIG
//config_file->css = COMM_SPAWN_MULTIPLE; // TODO Pasar a CONFIG
group->grp = group->grp - 1; // TODO REFACTOR???
do {
......@@ -123,13 +120,14 @@ int main(int argc, char *argv[]) {
}
res = work();
if(res == MAL_ZOMBIE) break;
print_local_results();
reset_results_index(results);
} while((config_file->resizes > group->grp + 1) && (config_file->cst == COMM_SPAWN_MERGE || config_file->cst == COMM_SPAWN_MERGE_PTHREAD));
if(res) { // Se he llegado al final de la aplicacion
if(res==1) { // Se he llegado al final de la aplicacion
MPI_Barrier(comm); // TODO Posible error al utilizar SHRINK
results->exec_time = MPI_Wtime() - results->exec_start;
}
......@@ -139,9 +137,10 @@ int main(int argc, char *argv[]) {
if(comm != MPI_COMM_WORLD && comm != MPI_COMM_NULL) {
MPI_Comm_free(&comm);
}
free_application_data();
if(group->myId == ROOT) MPI_Abort(MPI_COMM_WORLD, 0);
MPI_Finalize();
free_application_data();
return 0;
}
......@@ -186,6 +185,7 @@ int work() {
group->iter_start = iter;
if(config_file->resizes - 1 == group->grp) res=1;
if(state == MAL_ZOMBIE) res=state;
return res;
}
......
module load mpich-3.4.1-noucx
#mpicc -Wall Main/Main.c Main/computing_func.c IOcodes/results.c IOcodes/read_ini.c IOcodes/ini.c malleability/ProcessDist.c malleability/CommDist.c -pthread -lslurm -lm
mpicc -Wall Main/Main.c Main/computing_func.c IOcodes/results.c IOcodes/read_ini.c IOcodes/ini.c malleability/malleabilityManager.c malleability/malleabilityTypes.c malleability/ProcessDist.c malleability/CommDist.c -pthread -lslurm -lm
mpicc -Wall Main/Main.c Main/computing_func.c IOcodes/results.c IOcodes/read_ini.c IOcodes/ini.c malleability/malleabilityManager.c malleability/malleabilityTypes.c malleability/malleabilityZombies.c malleability/ProcessDist.c malleability/CommDist.c -pthread -lslurm -lm
if [ $# -gt 0 ]
then
......
......@@ -279,7 +279,6 @@ void proc_adapt_shrink(int numC, MPI_Comm *comm, int myId) {
*/
void* thread_work(void* creation_data_arg) {
int numP;
MPI_Comm aux_comm;
Creation_data *creation_data = (Creation_data*) creation_data_arg;
returned_comm = (MPI_Comm *) malloc(sizeof(MPI_Comm));
......@@ -374,7 +373,6 @@ void generic_spawn(int myId, int root, int spawn_is_single, MPI_Comm *child, MPI
if(myId == root) rootBcast = MPI_ROOT;
create_processes(myId, root, child, comm);
MPI_Bcast(&spawn_is_single, 1, MPI_INT, rootBcast, *child);
if(*child == MPI_COMM_NULL) {printf("P%d tiene un error --\n", myId); fflush(stdout);} else {printf("P%d guay\n", myId);}
}
commSlurm = MAL_SPAWN_COMPLETED;
}
......
......@@ -2,6 +2,7 @@
#include "malleabilityManager.h"
#include "malleabilityStates.h"
#include "malleabilityTypes.h"
#include "malleabilityZombies.h"
#include "ProcessDist.h"
#include "CommDist.h"
......@@ -18,6 +19,7 @@ int spawn_step();
int start_redistribution();
int check_redistribution();
int end_redistribution();
int shrink_redistribution();
int thread_creation();
int thread_check();
......@@ -36,7 +38,7 @@ typedef struct {
results_data *results;
} malleability_config_t;
typedef struct {
typedef struct { //FIXME numC_spawned no se esta usando
int myId, numP, numC, numC_spawned, root, root_parents;
pthread_t async_thread;
MPI_Comm comm, thread_comm;
......@@ -93,6 +95,8 @@ int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_ex
Children_init();
return MALLEABILITY_CHILDREN;
}
zombies_service_init();
return MALLEABILITY_NOT_CHILDREN;
}
......@@ -111,6 +115,10 @@ void free_malleability() {
//MPI_Comm_free(&(mall->thread_comm));
free(mall);
free(mall_conf);
zombies_awake();
zombies_service_free();
state = MAL_UNRESERVED;
}
......@@ -136,7 +144,7 @@ int malleability_checkpoint() {
if(state == MAL_NOT_STARTED) {
// Comprobar si se tiene que realizar un redimensionado
//if(CHECK_RMS()) {return MAL_DENIED;}
state = spawn_step();
if (state == MAL_SPAWN_COMPLETED){
......@@ -148,7 +156,6 @@ int malleability_checkpoint() {
//TODO Si es MERGE SHRINK, metodo diferente de redistribucion de datos
if (state == MAL_SPAWN_COMPLETED) {
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
printf("TEST PADRES\n");
state = start_redistribution();
}
......@@ -427,7 +434,7 @@ void Children_init() {
MPI_Comm_dup(mall->comm, &aux);
mall->thread_comm = aux;
MPI_Comm_dup(mall->comm, &aux);
mall->user_comm = aux;
mall->user_comm = aux;
}
MPI_Comm_disconnect(&(mall->intercomm));
......@@ -445,7 +452,13 @@ void Children_init() {
*/
int spawn_step(){
mall_conf->results->spawn_start = MPI_Wtime();
state = init_slurm_comm(mall->name_exec, mall->myId, mall->numP, mall->numC_spawned, mall->root, mall_conf->spawn_dist, mall_conf->spawn_type, mall_conf->spawn_is_single, mall->thread_comm, &(mall->intercomm));
if((mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) && mall->numP > mall->numC) {
state = shrink_redistribution();
return state;
}
state = init_slurm_comm(mall->name_exec, mall->myId, mall->numP, mall->numC, mall->root, mall_conf->spawn_dist, mall_conf->spawn_type, mall_conf->spawn_is_single, mall->thread_comm, &(mall->intercomm));
if(mall_conf->spawn_type == COMM_SPAWN_SERIAL || mall_conf->spawn_type == COMM_SPAWN_MERGE)
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
......@@ -456,44 +469,6 @@ int spawn_step(){
return state;
}
/*
* TODO Si los eliminados pertenecen al mismo COMMWORLD
* eliminar del todo
* TODO Eliminar los procesos por encima de numC y modificar numP
*/
/*
void malleability_zombies(int *pids, int *offset_pids) {
// Zombies treatment
int pid = getpid();
int *pids_counts = malloc(*numP * sizeof(int));
int *pids_displs = malloc(*numP * sizeof(int));
int count=1;
if(myId < new_numP) {
count = 0;
if(myId == mall->root) {
int i;
for(i=0; i < new_numP; i++) {
pids_counts[i] = 0;
}
for(i=new_numP; i<*numP; i++) {
pids_counts[i] = 1;
pids_displs[i] = (i + *offset_pids) - new_numP;
}
*offset_pids += *numP - new_numP;
}
}
MPI_Gatherv(&pid, count, MPI_INT, pids, pids_counts, pids_displs, MPI_INT, ROOT, *comm);
if(myId == ROOT) {
int i;
for(i=0;i<*offset_pids;i++){
printf("PID[%d]=%d\n",i,pids[i]);
}
}
//free pids_counts, pids_displs
}
*/
/*
* Comienza la redistribucion de los datos con el nuevo grupo de procesos.
......@@ -513,14 +488,10 @@ int start_redistribution() {
int rootBcast = MPI_PROC_NULL;
if(mall->myId == mall->root) rootBcast = MPI_ROOT;
printf("TEST EXPAND PADRES 1\n");
if(mall->intercomm == MPI_COMM_NULL) {printf("P%d tiene un error\n", mall->myId);}
fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
MPI_Bcast(&(mall_conf->spawn_type), 1, MPI_INT, rootBcast, mall->intercomm);
MPI_Bcast(&(mall->root), 1, MPI_INT, rootBcast, mall->intercomm);
MPI_Bcast(&(mall->numP), 1, MPI_INT, rootBcast, mall->intercomm);
send_config_file(mall_conf->config_file, rootBcast, mall->intercomm);
printf("TEST EXPAND PADRES 2\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos
mall_conf->results->async_start = MPI_Wtime();
......@@ -614,25 +585,18 @@ int end_redistribution() {
if(mall_conf->spawn_type == COMM_SPAWN_MERGE || mall_conf->spawn_type == COMM_SPAWN_MERGE_PTHREAD) {
double time_adapt = MPI_Wtime();
if(mall->numP > mall->numC) { //Shrink
//proc_adapt_shrink( numC, MPI_Comm *comm, mall->myId);
//malleability_zombies()
if(mall_conf->spawn_type == COMM_SPAWN_SERIAL || mall_conf->spawn_type == COMM_SPAWN_MERGE)
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - time_adapt;
} else {
proc_adapt_expand(&(mall->numP), mall->numC, mall->intercomm, &(mall->comm), MALLEABILITY_NOT_CHILDREN);
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
MPI_Comm_dup(mall->comm, &aux);
mall->thread_comm = aux;
MPI_Comm_dup(mall->comm, &aux);
mall->user_comm = aux;
if(mall_conf->spawn_type == COMM_SPAWN_SERIAL || mall_conf->spawn_type == COMM_SPAWN_MERGE)
mall_conf->results->spawn_time[mall_conf->grp] += MPI_Wtime() - time_adapt;
proc_adapt_expand(&(mall->numP), mall->numC, mall->intercomm, &(mall->comm), MALLEABILITY_NOT_CHILDREN);
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
MPI_Comm_dup(mall->comm, &aux);
mall->thread_comm = aux;
MPI_Comm_dup(mall->comm, &aux);
mall->user_comm = aux;
mall_conf->results->spawn_time[mall_conf->grp] += MPI_Wtime() - time_adapt;
}
// result = MAL_DIST_ADAPTED;
}
result = MAL_DIST_COMPLETED;
......@@ -642,6 +606,28 @@ int end_redistribution() {
return result;
}
int shrink_redistribution() {
double time_adapt = MPI_Wtime();
MPI_Comm aux_comm;
MPI_Comm_dup(mall->comm, &aux_comm);
proc_adapt_shrink( mall->numC, &(mall->comm), mall->myId);
zombies_collect_suspended(aux_comm, mall->myId, mall->numP, mall->numC, mall->root);
MPI_Comm_free(&aux_comm);
if(mall->myId < mall->numC) {
MPI_Comm_dup(mall->comm, &aux_comm);
mall->thread_comm = aux_comm;
MPI_Comm_dup(mall->comm, &aux_comm);
mall->user_comm = aux_comm;
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - time_adapt;
return MAL_DIST_COMPLETED; //FIXME Refactor Poner a SPAWN_COMPLETED
} else {
return MAL_ZOMBIE;
}
}
// TODO MOVER A OTRO LADO??
//======================================================||
//================PRIVATE FUNCTIONS=====================||
......
#define MAL_UNRESERVED -1
#define MAL_DENIED -2
#define MAL_ZOMBIE -3
#define MAL_NOT_STARTED 0
#define MAL_SPAWN_PENDING 1
#define MAL_SPAWN_COMPLETED 2
......
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <mpi.h>
//#include <slurm/slurm.h>
#include <signal.h>
#include "malleabilityZombies.h"
#define PIDS_QTY 320
void zombies_suspend();
int offset_pids, *pids = NULL;
void gestor_usr2() {}
void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root) {
int pid = getpid();
int *pids_counts = malloc(numP * sizeof(int));
int *pids_displs = malloc(numP * sizeof(int));
int i, count=1;
if(myId < numC) {
count = 0;
if(myId == root) {
for(i=0; i < numC; i++) {
pids_counts[i] = 0;
}
for(i=numC; i<numP; i++) {
pids_counts[i] = 1;
pids_displs[i] = (i + offset_pids) - numC;
}
offset_pids += numP - numC;
}
}
MPI_Gatherv(&pid, count, MPI_INT, pids, pids_counts, pids_displs, MPI_INT, root, comm);
free(pids_counts);
free(pids_displs);
if(myId >= numC) {
zombies_suspend();
}
}
void zombies_service_init() {
offset_pids = 0;
pids = malloc(PIDS_QTY * sizeof(int));
for(int i=0; i<PIDS_QTY; i++) {
pids[i] = 0;
}
}
void zombies_service_free() {
free(pids);
}
void zombies_suspend() {
struct sigaction act;
sigemptyset(&act.sa_mask);
act.sa_flags=0;
act.sa_handler=gestor_usr2;
sigaction(SIGUSR2, &act, NULL);
sigset_t set;
sigprocmask(SIG_SETMASK,NULL,&set);
sigsuspend(&set);
}
void zombies_awake() {
for(int i=0; i < offset_pids; i++) { // Despertar a los zombies
kill(pids[i], SIGUSR2);
}
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <mpi.h>
//#include <slurm/slurm.h>
#include <signal.h>
void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root);
void zombies_service_init();
void zombies_service_free();
void zombies_awake();
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment