Commit e466e997 authored by iker_martin's avatar iker_martin
Browse files

Updated maleable CG to use the new MaM interface

parent b7bcaffe
#include "malleabilityTypes.h"
#include "malleabilityDataStructures.h"
#include "MAM_Configuration.h"
void init_malleability_data_struct(malleability_data_t *data_struct, size_t size);
......@@ -20,7 +22,7 @@ void def_malleability_qty_type(malleability_data_t *data_struct_rep, malleabilit
* todos los padres. La nueva serie "data" solo representa los datos
* que tiene este padre.
*/
void add_data(void *data, size_t total_qty, int type, int dependency, size_t request_qty, malleability_data_t *data_struct) {
void add_data(void *data, size_t total_qty, MPI_Datatype type, size_t request_qty, malleability_data_t *data_struct) {
size_t i;
if(data_struct->entries == 0) {
......@@ -31,7 +33,6 @@ void add_data(void *data, size_t total_qty, int type, int dependency, size_t req
data_struct->qty[data_struct->entries] = total_qty;
data_struct->types[data_struct->entries] = type;
data_struct->dependencies[data_struct->entries] = dependency;
data_struct->arrays[data_struct->entries] = data;
data_struct->request_qty[data_struct->entries] = request_qty;
......@@ -50,7 +51,7 @@ void add_data(void *data, size_t total_qty, int type, int dependency, size_t req
* todos los padres. La nueva serie "data" solo representa los datos
* que tiene este padre.
*/
void modify_data(void *data, size_t index, size_t total_qty, int type, int dependency, size_t request_qty, malleability_data_t *data_struct) {
void modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, size_t request_qty, malleability_data_t *data_struct) {
size_t i;
if(data_struct->entries < index) { // Index does not exist
......@@ -63,7 +64,6 @@ void modify_data(void *data, size_t index, size_t total_qty, int type, int depen
data_struct->qty[index] = total_qty;
data_struct->types[index] = type;
data_struct->dependencies[index] = dependency;
data_struct->arrays[index] = data;
data_struct->request_qty[index] = request_qty;
......@@ -82,21 +82,14 @@ void modify_data(void *data, size_t index, size_t total_qty, int type, int depen
* En el argumento "root" todos tienen que indicar quien es el proceso raiz de los padres
* unicamente.
*/
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group, int myId, int root, MPI_Comm intercomm) {
int is_intercomm, rootBcast = MPI_PROC_NULL;
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group) {
int type_size;
size_t i, j;
MPI_Datatype entries_type, struct_type;
MPI_Comm_test_inter(intercomm, &is_intercomm);
if(is_intercomm && !is_children_group) {
rootBcast = myId == root ? MPI_ROOT : MPI_PROC_NULL;
} else {
rootBcast = root;
}
// Mandar primero numero de entradas
def_malleability_entries(data_struct_dist, data_struct_rep, &entries_type);
MPI_Bcast(MPI_BOTTOM, 1, entries_type, rootBcast, intercomm);
MPI_Bcast(MPI_BOTTOM, 1, entries_type, mall->root_collectives, mall->intercomm);
if(is_children_group && ( data_struct_rep->entries != 0 || data_struct_dist->entries != 0 )) {
init_malleability_data_struct(data_struct_rep, data_struct_rep->entries);
......@@ -104,28 +97,19 @@ void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *d
}
def_malleability_qty_type(data_struct_dist, data_struct_rep, &struct_type);
MPI_Bcast(MPI_BOTTOM, 1, struct_type, rootBcast, intercomm);
MPI_Bcast(MPI_BOTTOM, 1, struct_type, mall->root_collectives, mall->intercomm);
if(is_children_group) {
for(i=0; i < data_struct_rep->entries; i++) {
if(data_struct_rep->types[i] == MAL_INT) {
data_struct_rep->arrays[i] = (void *) malloc(data_struct_rep->qty[i] * sizeof(int));
} else if(data_struct_rep->types[i] == MAL_DOUBLE) {
data_struct_rep->arrays[i] = (void *) malloc(data_struct_rep->qty[i] * sizeof(double));
} else if(data_struct_rep->types[i] == MAL_CHAR) {
data_struct_rep->arrays[i] = (void *) malloc(data_struct_rep->qty[i] * sizeof(char));
} else {
printf("Malleability -- Redistribution data array type not recognised\n");
MPI_Abort(MPI_COMM_WORLD, -1);
}
MPI_Type_size(data_struct_rep->types[i], &type_size);
data_struct_rep->arrays[i] = (void *) malloc(data_struct_rep->qty[i] * type_size);
data_struct_rep->requests[i] = (MPI_Request *) malloc(data_struct_rep->request_qty[i] * sizeof(MPI_Request));
for(j=0; j < data_struct_rep->request_qty[i]; j++) {
data_struct_rep->requests[i][j] = MPI_REQUEST_NULL;
}
}
for(i=0; i < data_struct_dist->entries; i++) {
data_struct_dist->arrays[i] = (void *) NULL;
data_struct_dist->arrays[i] = (void *) NULL; // TODO Se podria inicializar aqui?
data_struct_dist->requests[i] = (MPI_Request *) malloc(data_struct_dist->request_qty[i] * sizeof(MPI_Request));
for(j=0; j < data_struct_dist->request_qty[i]; j++) {
data_struct_dist->requests[i][j] = MPI_REQUEST_NULL;
......@@ -153,10 +137,10 @@ void init_malleability_data_struct(malleability_data_t *data_struct, size_t size
data_struct->max_entries = size;
data_struct->qty = (size_t *) malloc(size * sizeof(size_t));
data_struct->types = (int *) malloc(size * sizeof(int));
data_struct->dependencies = (int *) malloc(size * sizeof(int));
data_struct->types = (MPI_Datatype *) malloc(size * sizeof(MPI_Datatype));
data_struct->request_qty = (size_t *) malloc(size * sizeof(size_t));
data_struct->requests = (MPI_Request **) malloc(size * sizeof(MPI_Request *));
data_struct->windows = (MPI_Win *) malloc(size * sizeof(MPI_Win));
data_struct->arrays = (void **) malloc(size * sizeof(void *));
for(i=0; i<size; i++) { //calloc and memset does not ensure a NULL value
......@@ -172,33 +156,42 @@ void init_malleability_data_struct(malleability_data_t *data_struct, size_t size
*/
void realloc_malleability_data_struct(malleability_data_t *data_struct, size_t qty_to_add) {
size_t i, needed, *qty_aux, *request_qty_aux;
int *types_aux, *dependencies_aux;
MPI_Datatype *types_aux;
MPI_Win *windows_aux;
MPI_Request **requests_aux;
void **arrays_aux;
needed = data_struct->max_entries + qty_to_add;
qty_aux = (size_t *) realloc(data_struct->qty, needed * sizeof(int));
types_aux = (int *) realloc(data_struct->types, needed * sizeof(int));
dependencies_aux = (int *) realloc(data_struct->dependencies, needed * sizeof(int));
types_aux = (MPI_Datatype *) realloc(data_struct->types, needed * sizeof(MPI_Datatype));
request_qty_aux = (size_t *) realloc(data_struct->request_qty, needed * sizeof(int));
requests_aux = (MPI_Request **) realloc(data_struct->requests, needed * sizeof(MPI_Request *));
windows_aux = (MPI_Win *) realloc(data_struct->windows, needed * sizeof(MPI_Win));
arrays_aux = (void **) realloc(data_struct->arrays, needed * sizeof(void *));
if(qty_aux == NULL || arrays_aux == NULL || requests_aux == NULL || types_aux == NULL || dependencies_aux == NULL || request_qty_aux == NULL) {
if(qty_aux == NULL || arrays_aux == NULL || requests_aux == NULL || types_aux == NULL || request_qty_aux == NULL || windows_aux == NULL) {
fprintf(stderr, "Fatal error - No se ha podido realojar la memoria constante de datos a redistribuir/comunicar\n");
MPI_Abort(MPI_COMM_WORLD, 1);
}
for(i=data_struct->max_entries; i<needed; i++) { //calloc and memset does not ensure a NULL value
for(i=data_struct->max_entries; i<needed; i++) { //realloc does not ensure a NULL value
requests_aux[i] = NULL;
arrays_aux[i] = NULL;
}
// Check if old array can be freed
if(data_struct->qty != qty_aux && data_struct->qty != NULL) free(data_struct->qty);
if(data_struct->types != types_aux && data_struct->types != NULL) free(data_struct->types);
if(data_struct->request_qty != request_qty_aux && data_struct->request_qty != NULL) free(data_struct->request_qty);
if(data_struct->requests != requests_aux && data_struct->requests != NULL) free(data_struct->requests);
if(data_struct->windows != windows_aux && data_struct->windows != NULL) free(data_struct->windows);
if(data_struct->arrays != arrays_aux && data_struct->arrays != NULL) free(data_struct->arrays);
data_struct->qty = qty_aux;
data_struct->types = types_aux;
data_struct->dependencies = dependencies_aux;
data_struct->request_qty = request_qty_aux;
data_struct->requests = requests_aux;
data_struct->windows = windows_aux;
data_struct->arrays = arrays_aux;
data_struct->max_entries = needed;
}
......@@ -208,19 +201,12 @@ void free_malleability_data_struct(malleability_data_t *data_struct) {
max = data_struct->entries;
if(max != 0) {
for(i=0; i<max; i++) {
//free(data_struct->arrays[i]); //FIXME Valores alojados con 1 elemento no se liberan?
}
if(data_struct->qty != NULL) {
free(data_struct->qty);
}
if(data_struct->types != NULL) {
free(data_struct->types);
}
if(data_struct->dependencies != NULL) {
free(data_struct->dependencies);
}
if(data_struct->requests != NULL && data_struct->request_qty != NULL) {
for(i=0; i<max; i++) {
if(data_struct->requests[i] != NULL) {
......@@ -237,6 +223,10 @@ void free_malleability_data_struct(malleability_data_t *data_struct) {
free(data_struct->requests);
}
if(data_struct->windows != NULL) {
free(data_struct->windows);
}
if(data_struct->arrays != NULL) {
free(data_struct->arrays);
}
......@@ -257,10 +247,11 @@ void def_malleability_entries(malleability_data_t *data_struct_rep, malleability
int counts = 2;
int blocklengths[counts];
MPI_Aint displs[counts];
MPI_Datatype types[counts];
MPI_Datatype types[counts], type_size_t;
MPI_Type_match_size(MPI_TYPECLASS_INTEGER, sizeof(size_t), &type_size_t);
blocklengths[0] = blocklengths[1] = 1;
types[0] = types[1] = MPI_UNSIGNED_LONG;
types[0] = types[1] = type_size_t;
// Obtener direccion base
MPI_Get_address(&(data_struct_rep->entries), &displs[0]);
......@@ -278,25 +269,27 @@ void def_malleability_entries(malleability_data_t *data_struct_rep, malleability
* TODO Refactor?
*/
void def_malleability_qty_type(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, MPI_Datatype *new_type) {
int counts = 8;
int counts = 6;
int blocklengths[counts];
MPI_Aint displs[counts];
MPI_Datatype types[counts];
MPI_Datatype types[counts], type_size_t;
MPI_Type_match_size(MPI_TYPECLASS_INTEGER, sizeof(size_t), &type_size_t);
types[0] = types[1] = types[4] = types[5] = MPI_UNSIGNED_LONG;
types[2] = types[3] = types[6] = types[7] = MPI_INT;
blocklengths[0] = blocklengths[1] = blocklengths[2] = blocklengths[3] = data_struct_rep->entries;
blocklengths[4] = blocklengths[5] = blocklengths[6] = blocklengths[7] = data_struct_dist->entries;
types[0] = types[1] = types[3] = types[4] = type_size_t;
types[2] = types[5] = MPI_INT;
blocklengths[0] = blocklengths[1] = blocklengths[2] = data_struct_rep->entries;
blocklengths[3] = blocklengths[4] = blocklengths[5] = data_struct_dist->entries;
MPI_Get_address((data_struct_rep->qty), &displs[0]);
MPI_Get_address((data_struct_rep->request_qty), &displs[1]);
MPI_Get_address((data_struct_rep->types), &displs[2]);
MPI_Get_address((data_struct_rep->dependencies), &displs[3]);
MPI_Get_address((data_struct_dist->qty), &displs[4]);
MPI_Get_address((data_struct_dist->request_qty), &displs[5]);
MPI_Get_address((data_struct_dist->types), &displs[6]);
MPI_Get_address((data_struct_dist->dependencies), &displs[7]);
MPI_Get_address((data_struct_rep->types), &displs[2]); // MPI_Datatype uses typedef int to be declared
MPI_Get_address((data_struct_dist->qty), &displs[3]);
MPI_Get_address((data_struct_dist->request_qty), &displs[4]);
MPI_Get_address((data_struct_dist->types), &displs[5]); // MPI_Datatype uses typedef int to be declared
MPI_Type_create_struct(counts, blocklengths, displs, types, new_type);
MPI_Type_commit(new_type);
}
......@@ -14,20 +14,20 @@ typedef struct {
size_t entries; // Indica numero de vectores a comunicar (replicated data)
size_t max_entries;
size_t *qty; // Indica numero de elementos en cada subvector de sync_array
int *types;
int *dependencies;
MPI_Datatype *types;
// Vector de vectores de request. En cada elemento superior se indican los requests a comprobar para dar por finalizada
// la comunicacion de ese dato
size_t *request_qty;
MPI_Request **requests;
MPI_Win *windows;
void **arrays; // Cada subvector es una serie de datos a comunicar
} malleability_data_t;
void add_data(void *data, size_t total_qty, int type, int dependency, size_t request_qty, malleability_data_t *data_struct);
void modify_data(void *data, size_t index, size_t total_qty, int type, int dependency, size_t request_qty, malleability_data_t *data_struct);
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group, int myId, int root, MPI_Comm intercomm);
void add_data(void *data, size_t total_qty, MPI_Datatype type, size_t request_qty, malleability_data_t *data_struct);
void modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, size_t request_qty, malleability_data_t *data_struct);
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group);
void free_malleability_data_struct(malleability_data_t *data_struct);
#endif
......@@ -16,30 +16,34 @@ int offset_pids, *pids = NULL;
void gestor_usr2() {}
void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root) {
void zombies_collect_suspended(MPI_Comm comm) {
int pid = getpid();
int *pids_counts = malloc(numP * sizeof(int));
int *pids_displs = malloc(numP * sizeof(int));
int *pids_counts = malloc(mall->numP * sizeof(int));
int *pids_displs = malloc(mall->numP * sizeof(int));
int i, count=1;
if(myId < numC) {
#if USE_MAL_DEBUG > 2
if(mall->myId == mall->root){ DEBUG_FUNC("Collecting zombies", mall->myId, mall->numP); } fflush(stdout);
#endif
if(mall->myId < mall->numC) {
count = 0;
if(myId == root) {
for(i=0; i < numC; i++) {
if(mall->myId == mall->root) {
for(i=0; i < mall->numC; i++) {
pids_counts[i] = 0;
}
for(i=numC; i<numP; i++) {
for(i=mall->numC; i<mall->numP; i++) {
pids_counts[i] = 1;
pids_displs[i] = (i + offset_pids) - numC;
pids_displs[i] = (i - mall->numC) + offset_pids;
}
offset_pids += numP - numC;
offset_pids += mall->numP - mall->numC;
}
}
MPI_Gatherv(&pid, count, MPI_INT, pids, pids_counts, pids_displs, MPI_INT, root, comm);
MPI_Gatherv(&pid, count, MPI_INT, pids, pids_counts, pids_displs, MPI_INT, mall->root, comm);
free(pids_counts);
free(pids_displs);
if(myId >= numC) {
if(mall->myId >= mall->numC) {
zombies_suspend();
}
}
......
......@@ -8,8 +8,9 @@
#include <unistd.h>
#include <mpi.h>
#include <signal.h>
#include "malleabilityDataStructures.h"
void zombies_collect_suspended(MPI_Comm comm, int myId, int numP, int numC, int root);
void zombies_collect_suspended(MPI_Comm comm);
void zombies_service_init();
void zombies_service_free();
void zombies_awake();
......
......@@ -2,13 +2,14 @@
#include <stdlib.h>
#include <mpi.h>
#include "../malleabilityStates.h"
#include "../malleabilityDataStructures.h"
#include "Baseline.h"
#include "Spawn_state.h"
//--------------PRIVATE DECLARATIONS---------------//
int baseline_spawn(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *child);
int single_strat_parents(Spawn_data spawn_data, MPI_Comm *child);
void single_strat_children(int myId, int root, MPI_Comm *parents);
void single_strat_children(MPI_Comm *parents);
//--------------PUBLIC FUNCTIONS---------------//
......@@ -27,8 +28,9 @@ int baseline(Spawn_data spawn_data, MPI_Comm *child) { //TODO Tratamiento de err
baseline_spawn(spawn_data, spawn_data.comm, child);
}
} else if(spawn_data.spawn_is_single) { // Children path
single_strat_children(spawn_data.myId, spawn_data.root, child);
single_strat_children(child);
}
return MALL_SPAWN_COMPLETED;
}
......@@ -38,20 +40,16 @@ int baseline(Spawn_data spawn_data, MPI_Comm *child) { //TODO Tratamiento de err
* "processes_dist()".
*/
int baseline_spawn(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *child) {
int rootBcast = MPI_PROC_NULL;
if(spawn_data.myId == spawn_data.root) rootBcast = MPI_ROOT;
if(mall->myId == mall->root) rootBcast = MPI_ROOT;
// WORK
int spawn_err = MPI_Comm_spawn(spawn_data.cmd, MPI_ARGV_NULL, spawn_data.spawn_qty, spawn_data.mapping, spawn_data.root, comm, child, MPI_ERRCODES_IGNORE);
int spawn_err = MPI_Comm_spawn(mall->name_exec, MPI_ARGV_NULL, spawn_data.spawn_qty, spawn_data.mapping, mall->root, comm, child, MPI_ERRCODES_IGNORE);
MPI_Comm_set_name(*child, "MPI_COMM_MALL_RESIZE");
// END WORK
if(spawn_err != MPI_SUCCESS) {
printf("Error creating new set of %d procs.\n", spawn_data.spawn_qty);
}
MPI_Bcast(&spawn_data, 1, spawn_data.dtype, rootBcast, *child);
MAM_Comm_main_structures(rootBcast);
return spawn_err;
}
......@@ -65,21 +63,21 @@ int single_strat_parents(Spawn_data spawn_data, MPI_Comm *child) {
char *port_name;
MPI_Comm newintercomm;
if (spawn_data.myId == spawn_data.root) {
if (mall->myId == mall->root) {
spawn_err = baseline_spawn(spawn_data, MPI_COMM_SELF, child);
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, spawn_data.root, 130, *child, MPI_STATUS_IGNORE);
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MPI_ANY_SOURCE, 130, *child, MPI_STATUS_IGNORE);
set_spawn_state(MALL_SPAWN_SINGLE_COMPLETED, spawn_data.spawn_is_async); // Indicate other processes to join root to end spawn procedure
wakeup_completion();
} else {
port_name = malloc(1);
}
MPI_Comm_connect(port_name, MPI_INFO_NULL, spawn_data.root, spawn_data.comm, &newintercomm);
MPI_Comm_connect(port_name, MPI_INFO_NULL, mall->root, spawn_data.comm, &newintercomm);
if(spawn_data.myId == spawn_data.root)
if(mall->myId == mall->root)
MPI_Comm_free(child);
free(port_name);
*child = newintercomm;
......@@ -94,21 +92,21 @@ int single_strat_parents(Spawn_data spawn_data, MPI_Comm *child) {
* Solo se utiliza cuando la creación de los procesos ha sido
* realizada por un solo proceso padre
*/
void single_strat_children(int myId, int root, MPI_Comm *parents) {
void single_strat_children(MPI_Comm *parents) {
char *port_name;
MPI_Comm newintercomm;
if(myId == root) {
if(mall->myId == mall->root) {
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
MPI_Open_port(MPI_INFO_NULL, port_name);
MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, root, 130, *parents);
MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, mall->root_parents, 130, *parents);
} else {
port_name = malloc(1);
}
MPI_Comm_accept(port_name, MPI_INFO_NULL, root, MPI_COMM_WORLD, &newintercomm);
MPI_Comm_accept(port_name, MPI_INFO_NULL, mall->root, MPI_COMM_WORLD, &newintercomm);
if(myId == root) {
if(mall->myId == mall->root) {
MPI_Close_port(port_name);
}
free(port_name);
......
......@@ -6,6 +6,7 @@
#include <mpi.h>
#include <string.h>
#include "../malleabilityDataStructures.h"
#include "Spawn_DataStructure.h"
int baseline(Spawn_data spawn_data, MPI_Comm *child);
#endif
......@@ -6,6 +6,8 @@
#include <mpi.h>
#include <string.h>
#include "../malleabilityStates.h"
#include "../malleabilityDataStructures.h"
#include "../MAM_Configuration.h"
#include "ProcessDist.h"
#include "GenericSpawn.h"
#include "Baseline.h"
......@@ -17,24 +19,20 @@
Spawn_data *spawn_data = NULL;
pthread_t spawn_thread;
MPI_Comm *returned_comm;
double end_time; //FIXME REFACTOR
//--------------PRIVATE CONFIGURATION DECLARATIONS---------------//
void set_spawn_configuration(char *cmd, int num_cpus, int num_nodes, char *nodelist, int myId, int root, int initial_qty, int target_qty, int type_dist, int spawn_method, int spawn_strategies, MPI_Comm comm);
void set_basic_spawn_dtype();
void set_spawn_configuration(MPI_Comm comm);
void deallocate_spawn_data();
//--------------PRIVATE DECLARATIONS---------------//
void generic_spawn(MPI_Comm *child, int data_stage);
int check_single_state(MPI_Comm comm, int global_state);
int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, double *real_time);
int check_single_state(MPI_Comm comm, MPI_Comm *child, int global_state, int wait_completed);
int check_generic_state(MPI_Comm comm, int local_state, int wait_completed);
//--------------PRIVATE THREADS DECLARATIONS---------------//
int allocate_thread_spawn();
void* thread_work();
int allocate_thread_spawn(MPI_Comm *child);
void* thread_work(void *args);
//--------------PUBLIC FUNCTIONS---------------//
......@@ -55,9 +53,9 @@ void* thread_work();
* Devuelve el estado de el procedimiento. Si no devuelve "MALL_SPAWN_COMPLETED", es necesario llamar a
* "check_spawn_state()".
*/
int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId, int initial_qty, int target_qty, int root, int type_dist, int spawn_method, int spawn_strategies, MPI_Comm comm, MPI_Comm *child) {
int init_spawn(MPI_Comm comm, MPI_Comm *child) {
int local_state;
set_spawn_configuration(argv, num_cpus, num_nodes, nodelist, myId, root, initial_qty, target_qty, type_dist, spawn_method, spawn_strategies, comm);
set_spawn_configuration(comm);
if(!spawn_data->spawn_is_async) {
generic_spawn(child, MALL_NOT_STARTED);
......@@ -68,11 +66,11 @@ int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId
} else {
local_state = spawn_data->spawn_is_single ?
MALL_SPAWN_SINGLE_PENDING : MALL_SPAWN_PENDING;
local_state = spawn_data->spawn_method == MALL_SPAWN_MERGE && spawn_data->initial_qty > spawn_data->target_qty ?
local_state = mall_conf->spawn_method == MALL_SPAWN_MERGE && spawn_data->initial_qty > spawn_data->target_qty ?
MALL_SPAWN_ADAPT_POSTPONE : local_state;
set_spawn_state(local_state, 0);
if((spawn_data->spawn_is_single && myId == root) || !spawn_data->spawn_is_single) {
allocate_thread_spawn();
if((spawn_data->spawn_is_single && mall->myId == mall->root) || !spawn_data->spawn_is_single) {
allocate_thread_spawn(child);
}
}
......@@ -83,7 +81,7 @@ int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId
* Comprueba si una configuracion para crear un nuevo grupo de procesos esta lista,
* y en caso de que lo este, se devuelve el communicador a estos nuevos procesos.
*/
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, double *real_time) {
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int wait_completed) {
int local_state;
int global_state=MALL_NOT_STARTED;
......@@ -91,10 +89,10 @@ int check_spawn_state(MPI_Comm *child, MPI_Comm comm, double *real_time) {
local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MALL_SPAWN_SINGLE_PENDING || local_state == MALL_SPAWN_SINGLE_COMPLETED) { // Single
global_state = check_single_state(comm, local_state);
global_state = check_single_state(comm, child, local_state, wait_completed);
} else if(local_state == MALL_SPAWN_PENDING || local_state == MALL_SPAWN_COMPLETED || local_state == MALL_SPAWN_ADAPTED) { // Baseline
global_state = check_generic_state(comm, child, local_state, real_time);
} else if(local_state == MALL_SPAWN_PENDING || local_state == MALL_SPAWN_COMPLETED || local_state == MALL_SPAWN_ADAPTED) { // Generic
global_state = check_generic_state(comm, local_state, wait_completed);
} else if(local_state == MALL_SPAWN_ADAPT_POSTPONE) {
global_state = local_state;
......@@ -104,7 +102,7 @@ int check_spawn_state(MPI_Comm *child, MPI_Comm comm, double *real_time) {
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
}
} else if(spawn_data->spawn_method == MALL_SPAWN_MERGE){ // Start Merge shrink Sync
} else if(mall_conf->spawn_method == MALL_SPAWN_MERGE){ // Start Merge shrink Sync
generic_spawn(child, MALL_DIST_COMPLETED);
global_state = get_spawn_state(spawn_data->spawn_is_async);
}
......@@ -126,8 +124,8 @@ int check_spawn_state(MPI_Comm *child, MPI_Comm comm, double *real_time) {
void unset_spawn_postpone_flag(int outside_state) {
int local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MALL_SPAWN_ADAPT_POSTPONE && outside_state == MALL_SPAWN_ADAPT_PENDING && spawn_data->spawn_is_async) {
set_spawn_state(MALL_SPAWN_PENDING, MALL_SPAWN_PTHREAD);
wakeup();
set_spawn_state(MALL_SPAWN_PENDING, spawn_data->spawn_is_async);
wakeup_redistribution();
}
}
......@@ -139,68 +137,52 @@ void unset_spawn_postpone_flag(int outside_state) {
* para el paso de redistribucion de datos (Numeros de procesos y Id del Root).
*
*/
void malleability_connect_children(int myId, int numP, int root, MPI_Comm comm, int *numP_parents, int *root_parents, MPI_Comm *parents) {
void malleability_connect_children(MPI_Comm comm, MPI_Comm *parents) {
spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));
spawn_data->root = root;
spawn_data->myId = myId;
spawn_data->spawn_qty = numP;
spawn_data->target_qty = numP;
spawn_data->spawn_qty = mall->numP;
spawn_data->target_qty = mall->numP;
spawn_data->comm = comm;
set_basic_spawn_dtype();
MPI_Bcast(spawn_data, 1, spawn_data->dtype, MALLEABILITY_ROOT, *parents);
MAM_Comm_main_structures(MALLEABILITY_ROOT); //FIXME What if root is another id different to 0? Send from spawn to root id?
//MPI_Comm_remote_size(*parents, &spawn_data->initial_qty);
spawn_data->initial_qty = mall->num_parents;
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
switch(spawn_data->spawn_method) {
switch(mall_conf->spawn_method) {
case MALL_SPAWN_BASELINE:
baseline(*spawn_data, parents);
if(!spawn_data->spawn_is_intercomm) {
intracomm_strategy(MALLEABILITY_CHILDREN, parents);
}
break;
case MALL_SPAWN_MERGE:
spawn_data->target_qty += spawn_data->initial_qty;
merge(*spawn_data, parents, MALL_NOT_STARTED);
break;
}
*root_parents = spawn_data->root_parents;
*numP_parents = spawn_data->initial_qty;
MPI_Type_free(&(spawn_data->dtype));
//mall->num_parents = spawn_data->initial_qty;
free(spawn_data);
}
/*
* Función para obtener si entre las estrategias elegidas, se utiliza
* la estrategia pasada como segundo argumento.
*
* Devuelve en "result" 1(Verdadero) si utiliza la estrategia, 0(Falso) en caso
* contrario.
*/
int malleability_spawn_contains_strat(int spawn_strategies, int strategy, int *result) {
int value = spawn_strategies % strategy ? 0 : 1;
if(result != NULL) *result = value;
return value;
}
//--------------PRIVATE CONFIGURATION FUNCTIONS---------------//
/*
* Agrupa en una sola estructura todos los datos de configuración necesarios
* e inicializa las estructuras necesarias.
*/
void set_spawn_configuration(char *cmd, int num_cpus, int num_nodes, char *nodelist, int myId, int root, int initial_qty, int target_qty, int type_dist, int spawn_method, int spawn_strategies, MPI_Comm comm) {
void set_spawn_configuration(MPI_Comm comm) {
spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));
spawn_data->myId = myId;
spawn_data->root = root;
spawn_data->root_parents = root;
spawn_data->initial_qty = initial_qty;
spawn_data->target_qty = target_qty;
spawn_data->spawn_method = spawn_method;
malleability_spawn_contains_strat(spawn_strategies, MALL_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
malleability_spawn_contains_strat(spawn_strategies, MALL_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
spawn_data->initial_qty = mall->numP;
spawn_data->target_qty = mall->numC;
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
spawn_data->comm = comm;
spawn_data->mapping_fill_method = MALL_DIST_STRING;
set_basic_spawn_dtype();
switch(spawn_data->spawn_method) {
switch(mall_conf->spawn_method) {
case MALL_SPAWN_BASELINE:
spawn_data->spawn_qty = spawn_data->target_qty;
spawn_data->already_created = 0;
......@@ -214,47 +196,7 @@ void set_spawn_configuration(char *cmd, int num_cpus, int num_nodes, char *nodel
if(spawn_data->spawn_is_async) {
init_spawn_state();
}
spawn_data->mapping = MPI_INFO_NULL;
if(spawn_data->myId == spawn_data->root) {
physical_struct_create(target_qty, spawn_data->already_created, num_cpus, num_nodes, nodelist, type_dist, MALL_DIST_STRING, &(spawn_data->dist));
//COPY PROGRAM NAME
spawn_data->cmd = malloc((strlen(cmd)+1) * sizeof(char));
strcpy(spawn_data->cmd, cmd);
spawn_data->cmd[strlen(cmd)]='\0';
} else {
spawn_data->cmd = malloc(1 * sizeof(char));
}
}
/*
* Crea un tipo derivado para mandar 4 enteros con informacion
* basica a los hijos. Son datos necesarios para que terminen
* la creacion de procesos.
*/
void set_basic_spawn_dtype() {
int i, counts = 4;
int blocklengths[] = {1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];
// Rellenar vector types
types[0] = types[1] = types[2] = types[3] = MPI_INT;
// Rellenar vector displs
MPI_Get_address(spawn_data, &dir);
MPI_Get_address(&(spawn_data->root_parents), &displs[0]);
MPI_Get_address(&(spawn_data->initial_qty), &displs[1]);
MPI_Get_address(&(spawn_data->spawn_is_single), &displs[2]);
MPI_Get_address(&(spawn_data->spawn_method), &displs[3]);
for(i=0;i<counts;i++) displs[i] -= dir;
MPI_Type_create_struct(counts, blocklengths, displs, types, &(spawn_data->dtype));
MPI_Type_commit(&(spawn_data->dtype));
}
/*
......@@ -264,12 +206,6 @@ void set_basic_spawn_dtype() {
void deallocate_spawn_data() {
if(spawn_data == NULL) return;
if(spawn_data->cmd != NULL) {
free(spawn_data->cmd);
}
if(spawn_data->dtype != MPI_DATATYPE_NULL) {
MPI_Type_free(&(spawn_data->dtype));
}
if(spawn_data->mapping != MPI_INFO_NULL) {
MPI_Info_free(&(spawn_data->mapping));
}
......@@ -293,19 +229,21 @@ void generic_spawn(MPI_Comm *child, int data_stage) {
int local_state, aux_state;
// WORK
if(spawn_data->myId == spawn_data->root && spawn_data->spawn_qty > 0) { //SET MAPPING FOR NEW PROCESSES
processes_dist(spawn_data->dist, &(spawn_data->mapping));
if(mall->myId == mall->root && spawn_data->spawn_qty > 0) { //SET MAPPING FOR NEW PROCESSES
processes_dist(*spawn_data, &(spawn_data->mapping));
}
switch(spawn_data->spawn_method) {
switch(mall_conf->spawn_method) {
case MALL_SPAWN_BASELINE:
local_state = baseline(*spawn_data, child);
if(!spawn_data->spawn_is_intercomm) {
local_state = intracomm_strategy(MALLEABILITY_NOT_CHILDREN, child);
}
break;
case MALL_SPAWN_MERGE:
local_state = merge(*spawn_data, child, data_stage);
break;
}
// END WORK
end_time = MPI_Wtime();
aux_state = get_spawn_state(spawn_data->spawn_is_async);
if(!(aux_state == MALL_SPAWN_PENDING && local_state == MALL_SPAWN_ADAPT_POSTPONE)) {
set_spawn_state(local_state, spawn_data->spawn_is_async);
......@@ -320,8 +258,8 @@ void generic_spawn(MPI_Comm *child, int data_stage) {
* No se puede realizar un "join" sobre el hilo y el mismo libera su memoria
* asociado al terminar.
*/
int allocate_thread_spawn() {
if(pthread_create(&spawn_thread, NULL, thread_work, NULL)) {
int allocate_thread_spawn(MPI_Comm *child) {
if(pthread_create(&spawn_thread, NULL, thread_work, (void *) child)) {
printf("Error al crear el hilo de SPAWN\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
......@@ -341,19 +279,20 @@ int allocate_thread_spawn() {
* Una vez esta lista la configuracion y es posible crear los procesos
* se avisa al hilo maestro.
*/
void* thread_work() {
void* thread_work(void *args) {
int local_state;
returned_comm = (MPI_Comm *) malloc(sizeof(MPI_Comm));
MPI_Comm *child = (MPI_Comm *) args;
generic_spawn(returned_comm, MALL_NOT_STARTED);
generic_spawn(child, MALL_NOT_STARTED);
local_state = get_spawn_state(MALL_SPAWN_PTHREAD);
local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MALL_SPAWN_ADAPT_POSTPONE || local_state == MALL_SPAWN_PENDING) {
// El grupo de procesos se terminara de juntar tras la redistribucion de datos
local_state = wait_wakeup();
generic_spawn(returned_comm, MALL_DIST_COMPLETED);
local_state = wait_redistribution();
generic_spawn(child, MALL_DIST_COMPLETED);
}
wakeup_completion();
pthread_exit(NULL);
}
......@@ -368,17 +307,20 @@ void* thread_work() {
* los procesos no root y se devuelve el estado
* "MALL_SPAWN_PENDING".
*/
int check_single_state(MPI_Comm comm, int global_state) {
MPI_Bcast(&global_state, 1, MPI_INT, spawn_data->root, comm);
int check_single_state(MPI_Comm comm, MPI_Comm *child, int global_state, int wait_completed) {
while(wait_completed && mall->myId == mall->root && global_state == MALL_SPAWN_SINGLE_PENDING) {
global_state = wait_completion();
}
MPI_Bcast(&global_state, 1, MPI_INT, mall->root, comm);
// Non-root processes join root to finalize the spawn
// They also must join if the application has ended its work
if(global_state == MALL_SPAWN_SINGLE_COMPLETED) {
global_state = MALL_SPAWN_PENDING;
set_spawn_state(global_state, MALL_SPAWN_PTHREAD);
set_spawn_state(global_state, spawn_data->spawn_is_async);
if(spawn_data->myId != spawn_data->root) {
allocate_thread_spawn(spawn_data);
if(mall->myId != mall->root) {
allocate_thread_spawn(child);
}
}
return global_state;
......@@ -393,15 +335,14 @@ int check_single_state(MPI_Comm comm, int global_state) {
* Si ha terminado libera la memoria asociada a spawn_data
* y devuelve el estado "MALL_SPAWN_COMPLETED".
*/
int check_generic_state(MPI_Comm comm, MPI_Comm *child, int local_state, double *real_time) {
int check_generic_state(MPI_Comm comm, int local_state, int wait_completed) {
int global_state;
while(wait_completed && local_state == MALL_SPAWN_PENDING) local_state = wait_completion();
MPI_Allreduce(&local_state, &global_state, 1, MPI_INT, MPI_MIN, comm);
if(global_state == MALL_SPAWN_COMPLETED || global_state == MALL_SPAWN_ADAPTED) {
set_spawn_state(global_state, MALL_SPAWN_PTHREAD);
*child = *returned_comm;
deallocate_spawn_data(spawn_data);
*real_time=end_time;
set_spawn_state(global_state, spawn_data->spawn_is_async);
}
return global_state;
}
......@@ -4,14 +4,13 @@
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "../malleabilityDataStructures.h"
#include "Spawn_DataStructure.h"
int init_spawn(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId, int initial_qty, int target_qty, int root, int type_dist, int spawn_method, int spawn_strategies, MPI_Comm comm, MPI_Comm *child);
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, double *real_time);
void malleability_connect_children(int myId, int numP, int root, MPI_Comm comm, int *numP_parents, int *root_parents, MPI_Comm *parents);
int init_spawn(MPI_Comm comm, MPI_Comm *child);
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int wait_completed);
void malleability_connect_children(MPI_Comm comm, MPI_Comm *parents);
void unset_spawn_postpone_flag(int outside_state);
int malleability_spawn_contains_strat(int spawn_strategies, int strategy, int *result);
#endif
This diff is collapsed.
......@@ -5,7 +5,9 @@
#include <stdlib.h>
#include <mpi.h>
#include "../malleabilityDataStructures.h"
#include "Spawn_DataStructure.h"
int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state);
int intracomm_strategy(int is_children_group, MPI_Comm *child);
#endif
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment