Commit 60dedf0d authored by iker_martin

WIP. Renaming constants to start with MAM

parent 2ca85de0
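The renames applied throughout the diff below:

MALLEABILITY_ROOT → MAM_ROOT
MALLEABILITY_NOT_CHILDREN → MAM_SOURCES (value 0, the original processes)
MALLEABILITY_CHILDREN → MAM_TARGETS (value 1, the spawned processes)
MALLEABILITY_USE_SYNCHRONOUS → MAM_USE_SYNCHRONOUS
MALLEABILITY_USE_ASYNCHRONOUS → MAM_USE_ASYNCHRONOUS
MALLEABILITY_INIT_DATA_QTY → MAM_TYPES_INIT_DATA_QTY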
@@ -26,7 +26,7 @@ enum mam_key_values{MAM_SPAWN_METHOD=0, MAM_SPAWN_STRATEGIES, MAM_PHYSICAL_DISTR
#define MAM_RED_STRATS_ENV "MAM_RED_STRATS"
#define MAM_NUM_TARGETS_ENV "MAM_NUM_TARGETS"
-#define MALLEABILITY_ROOT 0
+#define MAM_ROOT 0
#define MAL_APP_EXECUTING 0
#define MAL_APP_ENDED 1
@@ -34,8 +34,8 @@ enum mam_key_values{MAM_SPAWN_METHOD=0, MAM_SPAWN_STRATEGIES, MAM_PHYSICAL_DISTR
#define MAM_CHECK_COMPLETION 0
#define MAM_WAIT_COMPLETION 1
-#define MALLEABILITY_CHILDREN 1
-#define MALLEABILITY_NOT_CHILDREN 0
+#define MAM_SOURCES 0
+#define MAM_TARGETS 1
#define MAM_DATA_DISTRIBUTED 0
#define MAM_DATA_REPLICATED 1
@@ -11,8 +11,8 @@
#include "spawn_methods/GenericSpawn.h"
#include "distribution_methods/Distributed_CommDist.h"
-#define MALLEABILITY_USE_SYNCHRONOUS 0
-#define MALLEABILITY_USE_ASYNCHRONOUS 1
+#define MAM_USE_SYNCHRONOUS 0
+#define MAM_USE_ASYNCHRONOUS 1
void MAM_Commit(int *mam_state);
@@ -120,7 +120,7 @@ int MAM_Init(int root, MPI_Comm *comm, char *name_exec, void (*user_function)(vo
MPI_Comm_get_parent(&(mall->intercomm));
if(mall->intercomm != MPI_COMM_NULL) {
Children_init(user_function, user_args);
-return MALLEABILITY_CHILDREN;
+return MAM_TARGETS;
}
//TODO Check potential improvement - If check_hosts does not use slurm, internode_group could be obtained there
@@ -137,7 +137,7 @@ int MAM_Init(int root, MPI_Comm *comm, char *name_exec, void (*user_function)(vo
DEBUG_FUNC("MaM has been initialized correctly as parents", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(*comm);
#endif
-return MALLEABILITY_NOT_CHILDREN;
+return MAM_SOURCES;
}
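The two return statements above define MAM_Init's contract after the rename: spawned (target) processes return MAM_TARGETS, while the original (source) processes return MAM_SOURCES. A minimal caller sketch; the header name MAM.h and the helper my_redistribution are illustrative assumptions, not taken from this repository:

#include <mpi.h>
#include "MAM.h" /* assumed public header name */

static void my_redistribution(void *user_args) { (void) user_args; /* app-specific redistribution */ }

int main(int argc, char *argv[]) {
    MPI_Comm comm;
    MPI_Init(&argc, &argv);
    comm = MPI_COMM_WORLD;
    /* Branch on which side of the reconfiguration this process is on */
    if(MAM_Init(MAM_ROOT, &comm, argv[0], my_redistribution, NULL) == MAM_TARGETS) {
        /* Spawned process: Children_init() already ran the redistribution */
    } else {
        /* MAM_SOURCES: original process continues its normal path */
    }
    MPI_Finalize();
    return 0;
}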
/*
@@ -494,7 +494,7 @@ void send_data(int numP_children, malleability_data_t *data_struct, int is_async
for(i=0; i < data_struct->entries; i++) {
aux_send = data_struct->arrays[i];
aux_recv = NULL;
-async_communication_start(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN,
+async_communication_start(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->numP, numP_children, MAM_SOURCES,
mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
}
@@ -502,7 +502,7 @@ void send_data(int numP_children, malleability_data_t *data_struct, int is_async
for(i=0; i < data_struct->entries; i++) {
aux_send = data_struct->arrays[i];
aux_recv = NULL;
-sync_communication(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall->intercomm);
+sync_communication(aux_send, &aux_recv, data_struct->qty[i], data_struct->types[i], mall->numP, numP_children, MAM_SOURCES, mall->intercomm);
if(aux_recv != NULL) data_struct->arrays[i] = aux_recv;
}
}
@@ -520,14 +520,14 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch
if(is_asynchronous) {
for(i=0; i < data_struct->entries; i++) {
aux = data_struct->arrays[i];
-async_communication_start(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->numP, numP_parents, MALLEABILITY_CHILDREN,
+async_communication_start(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->numP, numP_parents, MAM_TARGETS,
mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
data_struct->arrays[i] = aux;
}
} else {
for(i=0; i < data_struct->entries; i++) {
aux = data_struct->arrays[i];
-sync_communication(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall->intercomm);
+sync_communication(aux_s, &aux, data_struct->qty[i], data_struct->types[i], mall->numP, numP_parents, MAM_TARGETS, mall->intercomm);
data_struct->arrays[i] = aux;
}
}
@@ -616,7 +616,7 @@ int MAM_St_user_start(int *mam_state) {
#endif
mall_conf->times->user_start = MPI_Wtime(); // Timestamp for when the user redistribution starts
if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
-MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_NOT_CHILDREN, &mall->tmp_comm); //Whoever passes 0 goes first
+MPI_Intercomm_merge(mall->intercomm, MAM_SOURCES, &mall->tmp_comm); //Whoever passes 0 goes first
} else {
MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
}
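Both merge sites in this commit rely on the "high" argument of MPI_Intercomm_merge: the group passing 0 is ranked first in the merged intracommunicator, so sources pass MAM_SOURCES (0) here and targets pass MAM_TARGETS (1) in Children_init below. A standalone illustration of that ordering rule in plain MPI, independent of MaM:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
    MPI_Comm parent, inter, merged;
    int rank;
    MPI_Init(&argc, &argv);
    MPI_Comm_get_parent(&parent);
    if(parent == MPI_COMM_NULL) { /* "source" side: spawn one extra process */
        MPI_Comm_spawn(argv[0], MPI_ARGV_NULL, 1, MPI_INFO_NULL, 0,
                       MPI_COMM_WORLD, &inter, MPI_ERRCODES_IGNORE);
        MPI_Intercomm_merge(inter, 0, &merged);   /* high = 0: ranked first */
    } else {                      /* "target" side, reached through parent */
        MPI_Intercomm_merge(parent, 1, &merged);  /* high = 1: ranked after */
    }
    MPI_Comm_rank(merged, &rank);
    printf("rank %d in merged communicator\n", rank);
    MPI_Comm_free(&merged);
    MPI_Finalize();
    return 0;
}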
@@ -631,7 +631,7 @@ int MAM_St_user_pending(int *mam_state, int wait_completed, void (*user_function
if(mall->myId == mall->root) DEBUG_FUNC("Starting USER redistribution", mall->myId, mall->numP); fflush(stdout);
#endif
if(user_function != NULL) {
-MAM_I_create_user_struct(MALLEABILITY_NOT_CHILDREN);
+MAM_I_create_user_struct(MAM_SOURCES);
do {
user_function(user_args);
} while(wait_completed && state == MALL_USER_PENDING);
@@ -730,7 +730,7 @@ void Children_init(void (*user_function)(void *), void *user_args) {
DEBUG_FUNC("Spawned have completed spawn step", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
-comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN);
+comm_data_info(rep_a_data, dist_a_data, MAM_TARGETS);
if(dist_a_data->entries || rep_a_data->entries) { // Receive asynchronous data
#if USE_MAL_DEBUG >= 2
DEBUG_FUNC("Spawned start asynchronous redistribution", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
@@ -740,12 +740,12 @@ void Children_init(void (*user_function)(void *), void *user_args) {
#endif
if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
-recv_data(mall->num_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
+recv_data(mall->num_parents, dist_a_data, MAM_USE_SYNCHRONOUS);
for(i=0; i<rep_a_data->entries; i++) {
MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm);
}
} else {
-recv_data(mall->num_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
+recv_data(mall->num_parents, dist_a_data, MAM_USE_ASYNCHRONOUS);
for(i=0; i<rep_a_data->entries; i++) {
MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm, &(rep_a_data->requests[i][0]));
@@ -790,14 +790,14 @@ void Children_init(void (*user_function)(void *), void *user_args) {
MPI_Barrier(mall->intercomm);
#endif
if(MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL)) {
-MPI_Intercomm_merge(mall->intercomm, MALLEABILITY_CHILDREN, &mall->tmp_comm); //Whoever passes 0 goes first
+MPI_Intercomm_merge(mall->intercomm, MAM_TARGETS, &mall->tmp_comm); //Whoever passes 0 goes first
} else {
MPI_Comm_dup(mall->intercomm, &mall->tmp_comm);
}
MPI_Comm_set_name(mall->tmp_comm, "MAM_USER_TMP");
if(user_function != NULL) {
state = MALL_USER_PENDING;
-MAM_I_create_user_struct(MALLEABILITY_CHILDREN);
+MAM_I_create_user_struct(MAM_TARGETS);
user_function(user_args);
}
#if USE_MAL_BARRIERS
@@ -805,12 +805,12 @@ void Children_init(void (*user_function)(void *), void *user_args) {
#endif
mall_conf->times->user_end = MPI_Wtime(); // Timestamp for when the user redistribution ends
-comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN);
+comm_data_info(rep_s_data, dist_s_data, MAM_TARGETS);
if(dist_s_data->entries || rep_s_data->entries) { // Receive synchronous data
#if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm);
#endif
-recv_data(mall->num_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
+recv_data(mall->num_parents, dist_s_data, MAM_USE_SYNCHRONOUS);
for(i=0; i<rep_s_data->entries; i++) {
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_collectives, mall->intercomm);
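Taken together with the MAM_St_user_pending hunk earlier, these Children_init changes show the asymmetry of the user stage: sources build their user struct with MAM_SOURCES and may loop on user_function until the MALL_USER_PENDING state clears, whereas targets build it with MAM_TARGETS and invoke user_function exactly once before receiving the synchronous data.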
@@ -884,7 +884,7 @@ int start_redistribution() {
MPI_Comm_dup(mall->comm, &(mall->intercomm));
}
-comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN);
+comm_data_info(rep_a_data, dist_a_data, MAM_SOURCES);
if(dist_a_data->entries || rep_a_data->entries) { // Send asynchronous data
#if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm);
@@ -893,7 +893,7 @@
if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_PTHREAD, NULL)) {
return thread_creation();
} else {
-send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
+send_data(mall->numC, dist_a_data, MAM_USE_ASYNCHRONOUS);
for(i=0; i<rep_a_data->entries; i++) {
MPI_Ibcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm, &(rep_a_data->requests[i][0]));
}
@@ -957,13 +957,13 @@ int check_redistribution(int wait_completed) {
for(i=0; i<dist_a_data->entries; i++) {
req_completed = dist_a_data->requests[i];
req_qty = dist_a_data->request_qty[i];
-completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, req_completed, req_qty);
+completed = async_communication_check(MAM_SOURCES, req_completed, req_qty);
local_completed = local_completed && completed;
}
for(i=0; i<rep_a_data->entries; i++) {
req_completed = rep_a_data->requests[i];
req_qty = rep_a_data->request_qty[i];
-completed = async_communication_check(MALLEABILITY_NOT_CHILDREN, req_completed, req_qty);
+completed = async_communication_check(MAM_SOURCES, req_completed, req_qty);
local_completed = local_completed && completed;
}
@@ -1017,13 +1017,13 @@ int end_redistribution() {
size_t i;
int local_state;
-comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_NOT_CHILDREN);
+comm_data_info(rep_s_data, dist_s_data, MAM_SOURCES);
if(dist_s_data->entries || rep_s_data->entries) { // Send synchronous data
#if USE_MAL_BARRIERS
MPI_Barrier(mall->intercomm);
#endif
mall_conf->times->sync_start = MPI_Wtime();
-send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
+send_data(mall->numC, dist_s_data, MAM_USE_SYNCHRONOUS);
for(i=0; i<rep_s_data->entries; i++) {
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], rep_s_data->types[i], mall->root_collectives, mall->intercomm);
@@ -1111,7 +1111,7 @@ int thread_check(int wait_completed) {
void* thread_async_work() {
size_t i;
-send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
+send_data(mall->numC, dist_a_data, MAM_USE_SYNCHRONOUS);
for(i=0; i<rep_a_data->entries; i++) {
MPI_Bcast(rep_a_data->arrays[i], rep_a_data->qty[i], rep_a_data->types[i], mall->root_collectives, mall->intercomm);
}
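A detail worth keeping in mind across the last few hunks: when the MAM_STRAT_RED_PTHREAD strategy is active, start_redistribution() delegates to thread_creation(), and thread_async_work() then performs the transfer through the blocking path (MAM_USE_SYNCHRONOUS with MPI_Bcast), so the asynchrony comes from the helper thread itself. Without that strategy, the nonblocking path (MAM_USE_ASYNCHRONOUS with MPI_Ibcast) is used and later polled through async_communication_check().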
@@ -85,13 +85,13 @@ int MAM_Is_internode_group() {
MPI_Allreduce(&name_len, &max_name_len, 1, MPI_INT, MPI_MAX, mall->original_comm);
my_host[max_name_len] = '\0';
max_name_len++; // Len does not consider terminating character
-if(myId == MALLEABILITY_ROOT) {
+if(myId == MAM_ROOT) {
all_hosts = (char *) malloc(numP * max_name_len * sizeof(char));
}
//FIXME Should be a Gatherv as each host could have uninitialised chars between name_len and max_name_len
-MPI_Gather(my_host, max_name_len, MPI_CHAR, all_hosts, max_name_len, MPI_CHAR, MALLEABILITY_ROOT, mall->original_comm);
+MPI_Gather(my_host, max_name_len, MPI_CHAR, all_hosts, max_name_len, MPI_CHAR, MAM_ROOT, mall->original_comm);
-if(myId == MALLEABILITY_ROOT) {
+if(myId == MAM_ROOT) {
for (i = 1; i < numP; i++) {
tested_host = all_hosts + (i * max_name_len);
if (strcmp(my_host, tested_host) != 0) {
@@ -101,7 +101,7 @@ int MAM_Is_internode_group() {
}
free(all_hosts);
}
-MPI_Bcast(&unique_count, 1, MPI_INT, MALLEABILITY_ROOT, mall->original_comm);
+MPI_Bcast(&unique_count, 1, MPI_INT, MAM_ROOT, mall->original_comm);
free(my_host);
return unique_count;
}
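The FIXME above remains open: the fixed-width MPI_Gather can transfer uninitialised bytes between each rank's name_len and max_name_len. A minimal sketch of the MPI_Gatherv variant it suggests, gathering exact per-rank lengths first; the function name is hypothetical and this mirrors, rather than replaces, the MaM code:

#include <mpi.h>
#include <stdlib.h>
#include <string.h>
#include "MAM_Constants.h" /* for MAM_ROOT */

static int count_unique_hosts(MPI_Comm comm) {
    int myId, numP, name_len, i, unique_count = 1;
    char my_host[MPI_MAX_PROCESSOR_NAME + 1];
    int *counts = NULL, *displs = NULL;
    char *all_hosts = NULL;

    MPI_Comm_rank(comm, &myId);
    MPI_Comm_size(comm, &numP);
    MPI_Get_processor_name(my_host, &name_len);
    my_host[name_len] = '\0';
    name_len++; // Len does not consider terminating character

    if(myId == MAM_ROOT) {
        counts = (int *) malloc(numP * sizeof(int));
        displs = (int *) malloc(numP * sizeof(int));
    }
    /* Gather the exact lengths first... */
    MPI_Gather(&name_len, 1, MPI_INT, counts, 1, MPI_INT, MAM_ROOT, comm);
    if(myId == MAM_ROOT) {
        displs[0] = 0;
        for(i = 1; i < numP; i++) displs[i] = displs[i-1] + counts[i-1];
        all_hosts = (char *) malloc(displs[numP-1] + counts[numP-1]);
    }
    /* ...then only the initialised bytes of every name */
    MPI_Gatherv(my_host, name_len, MPI_CHAR, all_hosts, counts, displs,
                MPI_CHAR, MAM_ROOT, comm);
    if(myId == MAM_ROOT) {
        /* Count ranks whose host differs from rank 0's, as the loop above does */
        for(i = 1; i < numP; i++)
            if(strcmp(all_hosts, all_hosts + displs[i]) != 0) unique_count++;
        free(counts); free(displs); free(all_hosts);
    }
    MPI_Bcast(&unique_count, 1, MPI_INT, MAM_ROOT, comm);
    return unique_count;
}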
@@ -26,9 +26,9 @@ void add_data(void *data, size_t total_qty, MPI_Datatype type, size_t request_qt
size_t i;
if(data_struct->entries == 0) {
-init_malleability_data_struct(data_struct, MALLEABILITY_INIT_DATA_QTY);
+init_malleability_data_struct(data_struct, MAM_TYPES_INIT_DATA_QTY);
} else if(data_struct->entries == data_struct->max_entries) {
-realloc_malleability_data_struct(data_struct, MALLEABILITY_INIT_DATA_QTY);
+realloc_malleability_data_struct(data_struct, MAM_TYPES_INIT_DATA_QTY);
}
data_struct->qty[data_struct->entries] = total_qty;
@@ -8,7 +8,7 @@
#include <sys/stat.h>
#include "MAM_Constants.h"
-#define MALLEABILITY_INIT_DATA_QTY 100
+#define MAM_TYPES_INIT_DATA_QTY 100
typedef struct {
size_t entries; // Indicates the number of arrays to communicate (replicated data)
@@ -50,7 +50,7 @@ void MAM_Zombies_update() {
MAM_I_zombies_collect(new_zombies);
MAM_I_zombies_split();
MAM_I_zombies_suspend();
-if(myId == MALLEABILITY_ROOT) zombies_qty += new_zombies;
+if(myId == MAM_ROOT) zombies_qty += new_zombies;
}
}
@@ -66,11 +66,11 @@ void MAM_I_zombies_collect(int new_zombies) {
pids_displs = (int *) malloc(numP * sizeof(int));
#if USE_MAL_DEBUG > 2
-if(myId == MALLEABILITY_ROOT){ DEBUG_FUNC("Collecting zombies", mall->myId, mall->numP); } fflush(stdout);
+if(myId == MAM_ROOT){ DEBUG_FUNC("Collecting zombies", mall->myId, mall->numP); } fflush(stdout);
#endif
count = mall->zombie;
-if(myId == MALLEABILITY_ROOT) {
+if(myId == MAM_ROOT) {
active = numP - new_zombies;
for(i=0; i < active; i++) {
pids_counts[i] = 0;
@@ -81,7 +81,7 @@
pids_displs[i] = (pids_displs[i-1] + 1) + zombies_qty;
}
}
-MPI_Gatherv(&pid, count, MPI_INT, pids, pids_counts, pids_displs, MPI_INT, MALLEABILITY_ROOT, mall->original_comm);
+MPI_Gatherv(&pid, count, MPI_INT, pids, pids_counts, pids_displs, MPI_INT, MAM_ROOT, mall->original_comm);
free(pids_counts);
free(pids_displs);
}
@@ -106,10 +106,10 @@ void multiple_strat_parents(Spawn_data spawn_data, MPI_Comm comm, MPI_Comm *inte
if(mall->myId == mall->root) {
port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char));
tag = MAM_TAG_STRAT_MULTIPLE_FIRST;
-MPI_Send(&spawn_data.total_spawns, 1, MPI_INT, MALLEABILITY_ROOT, tag, intercomms[0]);
+MPI_Send(&spawn_data.total_spawns, 1, MPI_INT, MAM_ROOT, tag, intercomms[0]);
MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MPI_ANY_SOURCE, tag, intercomms[0], MPI_STATUS_IGNORE);
for(i=1; i<spawn_data.total_spawns; i++) {
-MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MALLEABILITY_ROOT, tag+i, intercomms[i]);
+MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, MAM_ROOT, tag+i, intercomms[i]);
MPI_Recv(&aux, 1, MPI_CHAR, MPI_ANY_SOURCE, MAM_TAG_STRAT_MULTIPLE_FIRST, intercomms[0], MPI_STATUS_IGNORE);
}
} else { port_name = malloc(1); }
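This hunk is the sources' half of the multiple-spawn handshake: the root announces total_spawns to the first spawned group, receives a port name back, then relays that port to the root of every remaining group, waiting for a one-byte confirmation on the first group's intercommunicator after each relay. The renamed MAM_ROOT destination in both MPI_Send calls makes explicit that only rank 0 of each spawned group takes part in this exchange.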
@@ -140,7 +140,7 @@ void unset_spawn_postpone_flag(int outside_state) {
void malleability_connect_children(MPI_Comm *parents) {
spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));
-MAM_Comm_main_structures(*parents, MALLEABILITY_ROOT); //FIXME What if root is an id other than 0? Send from spawn to root id?
+MAM_Comm_main_structures(*parents, MAM_ROOT); //FIXME What if root is an id other than 0? Send from spawn to root id?
spawn_data->initial_qty = mall->num_parents;
spawn_data->target_qty = mall->numC;
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
@@ -153,7 +153,7 @@
spawn_data->spawn_qty = spawn_data->target_qty;
baseline(*spawn_data, parents);
if(!spawn_data->spawn_is_intercomm) {
-intracomm_strategy(MALLEABILITY_CHILDREN, parents);
+intracomm_strategy(MAM_TARGETS, parents);
}
break;
case MALL_SPAWN_MERGE:
@@ -248,7 +248,7 @@ void generic_spawn(MPI_Comm *child, int data_stage) {
case MALL_SPAWN_BASELINE:
local_state = baseline(*spawn_data, child);
if(!spawn_data->spawn_is_intercomm) {
-local_state = intracomm_strategy(MALLEABILITY_NOT_CHILDREN, child);
+local_state = intracomm_strategy(MAM_SOURCES, child);
}
break;
case MALL_SPAWN_MERGE: