Commit 2639ab13 authored by Iker Martín Álvarez

Merge branch 'dev' into 'master'

New version of Proteo

See merge request martini/malleability_benchmark!6
parents 26305fac e83b5922
#define _GNU_SOURCE
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sched.h>
#include <mpi.h>
#include "MAM_RMS.h"
#include "MAM_DataStructures.h"
#if MAM_USE_SLURM
#include <slurm/slurm.h>
int MAM_I_slurm_getenv_hosts_info();
int MAM_I_slurm_getjob_hosts_info();
#endif
int MAM_I_get_hosts_info();
int GetCPUCount();
void MAM_check_hosts() {
int not_filled = 1;
#if MAM_USE_SLURM
not_filled = MAM_I_slurm_getjob_hosts_info();
if(not_filled) {
#if MAM_DEBUG >= 2
DEBUG_FUNC("WARNING - RMS info retriever failed with slurm functions. Trying with ENV variables", mall->myId, mall->numP);
#endif
if(mall->nodelist != NULL) {
free(mall->nodelist);
mall->nodelist = NULL;
}
not_filled = MAM_I_slurm_getenv_hosts_info();
}
#endif
if(not_filled) {
if(mall->nodelist != NULL) {
free(mall->nodelist);
mall->nodelist = NULL;
}
not_filled = MAM_I_get_hosts_info();
}
if(not_filled) {
if(mall->myId == mall->root) printf("MAM FATAL ERROR: It has not been possible to obtain the nodelist\n");
fflush(stdout);
MPI_Abort(mall->comm, -50);
}
#if MAM_DEBUG >= 2
if(mall->myId == mall->root) {
DEBUG_FUNC("Obtained Nodelist", mall->myId, mall->numP);
printf("NODELIST: %s\nNODE_COUNT: %d NUM_CPUS_PER_NODE: %d\n", mall->nodelist, mall->num_nodes, mall->num_cpus);
fflush(stdout);
}
#endif
}
/*
 * @brief Determine whether a group of processes spans more than one node.
 *
 * This function checks the physical distribution of all ranks in the
 * original communicator passed to MaM. If all of them reside in the
 * same host, false is returned. True is returned otherwise.
 *
 * @return Integer indicating if more than one node is used by the
 * original communicator (>0) or only one (0).
 */
int MAM_Is_internode_group() {
int i, name_len, max_name_len, unique_count;
int myId, numP;
char *my_host, *all_hosts, *tested_host;
MPI_Comm_rank(mall->original_comm, &myId);
MPI_Comm_size(mall->original_comm, &numP);
unique_count = 0; //First node is not counted
if(numP == 1) return unique_count;
all_hosts = NULL;
my_host = (char *) malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
MPI_Get_processor_name(my_host, &name_len);
MPI_Allreduce(&name_len, &max_name_len, 1, MPI_INT, MPI_MAX, mall->original_comm);
my_host[max_name_len] = '\0';
max_name_len++; // Len does not consider terminating character
if(myId == MAM_ROOT) {
all_hosts = (char *) malloc(numP * max_name_len * sizeof(char));
}
//FIXME Should be a Gatherv as each host could have uninitialised chars between name_len and max_name_len
MPI_Gather(my_host, max_name_len, MPI_CHAR, all_hosts, max_name_len, MPI_CHAR, MAM_ROOT, mall->original_comm);
if(myId == MAM_ROOT) {
for (i = 1; i < numP; i++) {
tested_host = all_hosts + (i * max_name_len);
if (strcmp(my_host, tested_host) != 0) {
unique_count++;
break;
}
}
free(all_hosts);
}
MPI_Bcast(&unique_count, 1, MPI_INT, MAM_ROOT, mall->original_comm);
free(my_host);
return unique_count;
}
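/*
 * Illustrative sketch (not part of the library) of how a caller might branch
 * on the result. It assumes mall->original_comm has already been initialized
 * by the surrounding setup.
 */
#if 0
void example_internode_check(void) {
  if(MAM_Is_internode_group()) {
    // More than one host involved: node-aware strategies apply.
  } else {
    // All ranks share a single host: intranode communication only.
  }
}
#endif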
/*
 * TODO
 * FIXME Does not consider heterogeneous machines for num_cpus
 * FIXME Always returns 0... -- Perform error checking?
 */
int MAM_I_get_hosts_info() {
int i, j, name_len, max_name_len, unique_count, *unique_hosts;
char *my_host, *all_hosts, *confirmed_host, *tested_host;
all_hosts = NULL;
my_host = (char *) malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
MPI_Get_processor_name(my_host, &name_len);
MPI_Allreduce(&name_len, &max_name_len, 1, MPI_INT, MPI_MAX, mall->comm);
my_host[max_name_len] = '\0';
max_name_len++; // Len does not consider terminating character
if(mall->myId == mall->root) {
all_hosts = (char *) malloc(mall->numP * max_name_len * sizeof(char));
unique_hosts = (int *) malloc(mall->numP * sizeof(int));
unique_hosts[0] = 0; //First host will always be unique
unique_count = 1;
}
//FIXME Should be a Gatherv as each host could have uninitialised chars between name_len and max_name_len
MPI_Gather(my_host, max_name_len, MPI_CHAR, all_hosts, max_name_len, MPI_CHAR, mall->root, mall->comm);
if(mall->myId == mall->root) {
for (i = 1; i < mall->numP; i++) {
for (j = 0; j < unique_count; j++) {
tested_host = all_hosts + (i * max_name_len);
confirmed_host = all_hosts + (unique_hosts[j] * max_name_len);
if (strcmp(tested_host, confirmed_host) != 0) {
unique_hosts[unique_count] = i;
unique_count++;
break;
}
}
}
mall->num_nodes = unique_count;
mall->num_cpus = GetCPUCount();
mall->nodelist_len = unique_count*max_name_len;
mall->nodelist = (char *) malloc(mall->nodelist_len * sizeof(char));
strcpy(mall->nodelist, ""); //FIXME Strcat can be very inefficient...
for (i = 0; i < unique_count; i++) {
confirmed_host = all_hosts + (unique_hosts[i] * max_name_len);
strcat(mall->nodelist, confirmed_host);
if (i < unique_count - 1) {
strcat(mall->nodelist, ",");
}
}
free(all_hosts);
free(unique_hosts);
}
free(my_host);
return 0;
}
/*
* @brief Get the total number of CPUs available to the process.
*
 * This function uses sched_getaffinity to obtain the CPU affinity of the current process
 * and counts the CPUs in the affinity set, stopping at the first CPU missing from the mask,
 * so it effectively measures the contiguous prefix of the mask. The loop is bounded by the
 * maximum number of CPUs available on the system.
*
* @return The total number of CPUs available to the process.
*
* Code obtained from: https://stackoverflow.com/questions/4586405/how-to-get-the-number-of-cpus-in-linux-using-c
* The code has been slightly modified.
*/
int GetCPUCount() {
cpu_set_t cs;
CPU_ZERO(&cs);
sched_getaffinity(0, sizeof(cs), &cs);
int count = 0;
int max_cpus = sysconf(_SC_NPROCESSORS_ONLN);
for (int i = 0; i < max_cpus; i++) {
if (CPU_ISSET(i, &cs)) {
count++;
} else {
break;
}
}
return count;
}
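/*
 * A minimal alternative sketch, assuming glibc's CPU_COUNT macro is available:
 * CPU_COUNT tallies every CPU present in the mask, whereas the loop above stops
 * at the first gap, so the two only agree for contiguous masks such as 0-3.
 */
#if 0
int GetCPUCount_alternative() {
  cpu_set_t cs;
  CPU_ZERO(&cs);
  sched_getaffinity(0, sizeof(cs), &cs);
  return CPU_COUNT(&cs); // Counts all CPUs in the mask, contiguous or not
}
#endif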
#if MAM_USE_SLURM
/*
* TODO
*/
int MAM_I_slurm_getenv_hosts_info() {
char *tmp = NULL, *tmp_copy, *token;
int cpus, count;
//int i, *cpus_counts, *nodes_counts, *aux;
tmp = getenv("SLURM_JOB_NUM_NODES");
if(tmp == NULL) return 1;
mall->num_nodes = atoi(tmp);
tmp = NULL;
tmp = getenv("SLURM_JOB_NODELIST");
if(tmp == NULL) return 1;
mall->nodelist_len = strlen(tmp)+1;
mall->nodelist = (char *) malloc(mall->nodelist_len * sizeof(char));
strcpy(mall->nodelist, tmp);
tmp = NULL;
//EXAMPLE - SLURM_JOB_CPUS_PER_NODE='72(x2),36'
//It indicates two nodes have 72 CPUs each and the third has 36 CPUs
tmp = getenv("SLURM_JOB_CPUS_PER_NODE");
if(tmp == NULL) return 1;
tmp_copy = (char *) malloc((strlen(tmp)+1) * sizeof(char));
strcpy(tmp_copy, tmp);
token = strtok(tmp_copy, ",");
//TODO When MaM considers heterogeneous allocations, these will be needed instead of num_cpus.
//cpus_counts = (int *) malloc(mall->num_nodes * sizeof(int));
//nodes_counts = (int *) malloc(mall->num_nodes * sizeof(int));
//i = 0;
mall->num_cpus = 0;
while (token != NULL) {
// If the current token refers to a single node, the second portion
// does not appear and sscanf does not modify "count"
// First portion --> "%d"
// Second portion -> "(x%d)"
count = 1;
if (sscanf(token, "%d(x%d)", &cpus, &count) >= 1) {
mall->num_cpus = cpus; // num_cpus stores the number of CPUs (cores) per node
//cpus_per_node[i] = cpus;
//nodes_count[i] = count;
//i++;
}
token = strtok(NULL, ",");
}
/*
if(i < mall->num_nodes) {
aux = (int *) realloc(cpus_per_node, i * sizeof(int));
if(cpus_per_node != aux && cpus_per_node != NULL) free(cpus_per_node);
cpus_per_node = aux;
aux = (int *) realloc(nodes_counts, i * sizeof(int));
if(nodes_count != aux && nodes_count != NULL) free(nodes_count);
nodes_count = aux;
}
*/
free(tmp_copy);
return 0;
}
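/*
 * Standalone sketch of the parsing performed above, not library code: for
 * SLURM_JOB_CPUS_PER_NODE="72(x2),36", the token "72(x2)" yields cpus=72 and
 * count=2, while the token "36" yields cpus=36 and leaves count at its
 * default of 1.
 */
#if 0
#include <stdio.h>
#include <string.h>
int main() {
  char env[] = "72(x2),36";
  char *token = strtok(env, ",");
  while(token != NULL) {
    int cpus, count = 1;
    if(sscanf(token, "%d(x%d)", &cpus, &count) >= 1) {
      printf("%d node(s) with %d CPUs each\n", count, cpus);
    }
    token = strtok(NULL, ",");
  }
  return 0; // Prints "2 node(s) with 72 CPUs each" and "1 node(s) with 36 CPUs each"
}
#endif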
/*
* TODO
 * FIXME Does not consider heterogeneous machines
*/
int MAM_I_slurm_getjob_hosts_info() {
int jobId, err;
char *tmp = NULL;
job_info_msg_t *j_info;
slurm_job_info_t last_record;
tmp = getenv("SLURM_JOB_ID");
if(tmp == NULL) return 1;
jobId = atoi(tmp);
err = slurm_load_job(&j_info, jobId, 1); // FIXME Valgrind Not freed
if(err) return err;
last_record = j_info->job_array[j_info->record_count - 1];
mall->num_nodes = last_record.num_nodes;
mall->num_cpus = last_record.num_cpus / last_record.num_nodes;
mall->nodelist_len = strlen(last_record.nodes)+1;
mall->nodelist = (char *) malloc(mall->nodelist_len * sizeof(char));
strcpy(mall->nodelist, last_record.nodes);
slurm_free_job_info_msg(j_info);
return 0;
}
#endif
//TODO Refactor for when communicating with the RMS
// Get Slurm job info
//int jobId;
//char *tmp;
//job_info_msg_t *j_info;
//slurm_job_info_t last_record;
//tmp = getenv("SLURM_JOB_ID");
//jobId = atoi(tmp);
//slurm_load_job(&j_info, jobId, 1);
//last_record = j_info->job_array[j_info->record_count - 1];
// Free JOB INFO
//slurm_free_job_info_msg(j_info);
#ifndef MAM_RMS_H
#define MAM_RMS_H
void MAM_check_hosts();
int MAM_Is_internode_group();
#endif
#include "MAM_Times.h"
#include "MAM_DataStructures.h"
void def_malleability_times(MPI_Datatype *new_type);
void init_malleability_times() {
#if MAM_DEBUG >= 4
DEBUG_FUNC("Initializing recording structure", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(mall->comm);
#endif
mall_conf->times = (malleability_times_t *) malloc(sizeof(malleability_times_t));
if(mall_conf->times == NULL) {
perror("Error creating the internal times structure for malleability\n");
MPI_Abort(MPI_COMM_WORLD, -5);
}
reset_malleability_times();
def_malleability_times(&mall_conf->times->times_type);
#if MAM_DEBUG >= 4
DEBUG_FUNC("Initialized recording structure", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(mall->comm);
#endif
}
void reset_malleability_times() {
malleability_times_t *times = mall_conf->times;
times->spawn_start = 0; times->sync_start = 0; times->async_start = 0; times->user_start = 0; times->malleability_start = 0;
times->sync_end = 0; times->async_end = 0; times->user_end = 0; times->malleability_end = 0;
times->spawn_time = 0;
}
void free_malleability_times() {
#if MAM_DEBUG >= 4
DEBUG_FUNC("Freeing recording structure", mall->myId, mall->numP); fflush(stdout);
#endif
if(mall_conf->times != NULL) {
if(mall_conf->times->times_type != MPI_DATATYPE_NULL) {
MPI_Type_free(&mall_conf->times->times_type);
mall_conf->times->times_type = MPI_DATATYPE_NULL;
}
free(mall_conf->times);
}
#if MAM_DEBUG >= 4
DEBUG_FUNC("Freed recording structure", mall->myId, mall->numP); fflush(stdout);
#endif
}
/*
 * @brief Returns the times used for the different steps of the last reconfiguration.
 *
 * This function is intended to be called when a reconfiguration has ended.
 * It is designed to provide the necessary information for the user to perform data redistribution.
 *
 * Null values can be passed if there is no interest in retrieving particular times.
 *
 * Parameters:
 * - double *sp_time: A pointer where the spawn time will be saved.
 * - double *sy_time: A pointer where the synchronous data redistribution time will be saved.
 * - double *asy_time: A pointer where the asynchronous data redistribution time will be saved.
 * - double *user_time: A pointer where the user data redistribution time will be saved.
 * - double *mall_time: A pointer where the malleability time will be saved.
 */
void MAM_Retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *user_time, double *mall_time) {
malleability_times_t *times = mall_conf->times;
if(sp_time != NULL) *sp_time = times->spawn_time;
if(sy_time != NULL) *sy_time = times->sync_end - times->sync_start;
if(asy_time != NULL) *asy_time = times->async_end - times->async_start;
if(user_time != NULL) *user_time = times->user_end - times->user_start;
if(mall_time != NULL) *mall_time = times->malleability_end - times->malleability_start;
}
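/*
 * Illustrative sketch, assuming a reconfiguration has just finished: only the
 * spawn and total malleability times are requested, passing NULL for the rest.
 */
#if 0
void example_report_times(void) {
  double spawn_t, mall_t;
  MAM_Retrieve_times(&spawn_t, NULL, NULL, NULL, &mall_t);
  printf("Spawn: %lf s -- Malleability total: %lf s\n", spawn_t, mall_t);
}
#endif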
void malleability_times_broadcast(int root) {
MPI_Bcast(mall_conf->times, 1, mall_conf->times->times_type, root, mall->intercomm);
}
void def_malleability_times(MPI_Datatype *new_type) {
int i, counts = 5;
int blocklengths[counts];
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];
blocklengths[0] = blocklengths[1] = blocklengths[2] = blocklengths[3] = blocklengths[4] = 1;
types[0] = types[1] = types[2] = types[3] = types[4] = MPI_DOUBLE;
// The vector is passed through the address of "mall_conf"
// Fill in the displs vector
MPI_Get_address(mall_conf->times, &dir);
// Get the base address
MPI_Get_address(&(mall_conf->times->spawn_time), &displs[0]);
MPI_Get_address(&(mall_conf->times->sync_start), &displs[1]);
MPI_Get_address(&(mall_conf->times->async_start), &displs[2]);
MPI_Get_address(&(mall_conf->times->user_start), &displs[3]);
MPI_Get_address(&(mall_conf->times->malleability_start), &displs[4]);
for(i=0;i<counts;i++) displs[i] -= dir;
MPI_Type_create_struct(counts, blocklengths, displs, types, new_type);
MPI_Type_commit(new_type);
}
#ifndef MAM_TIMES_H
#define MAM_TIMES_H
#include <mpi.h>
void init_malleability_times();
void reset_malleability_times();
void free_malleability_times();
void malleability_times_broadcast(int root);
void MAM_I_retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *user_time, double *mall_time);
#endif
#ifndef MAM_TIMES_RETRIEVE_H
#define MAM_TIMES_RETRIEVE_H
void MAM_Retrieve_times(double *sp_time, double *sy_time, double *asy_time, double *user_time, double *mall_time);
#endif
#include "malleabilityTypes.h" #include "MAM_Types.h"
#include "MAM_DataStructures.h"
#include "MAM_Configuration.h"
void init_malleability_data_struct(malleability_data_t *data_struct, int size); void init_malleability_data_struct(malleability_data_t *data_struct, size_t size);
void realloc_malleability_data_struct(malleability_data_t *data_struct, int qty_to_add); void realloc_malleability_data_struct(malleability_data_t *data_struct, size_t qty_to_add);
void def_malleability_entries(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, MPI_Datatype *new_type); void def_malleability_entries(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, MPI_Datatype *new_type);
void def_malleability_qty_type(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, MPI_Datatype *new_type); void def_malleability_qty_type(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, MPI_Datatype *new_type);
...@@ -20,26 +22,58 @@ void def_malleability_qty_type(malleability_data_t *data_struct_rep, malleabilit ...@@ -20,26 +22,58 @@ void def_malleability_qty_type(malleability_data_t *data_struct_rep, malleabilit
* todos los padres. La nueva serie "data" solo representa los datos * todos los padres. La nueva serie "data" solo representa los datos
* que tiene este padre. * que tiene este padre.
*/ */
void add_data(void *data, int total_qty, int type, int request_qty, malleability_data_t *data_struct) { void add_data(void *data, size_t total_qty, MPI_Datatype type, size_t request_qty, malleability_data_t *data_struct) {
int i; size_t i;
if(data_struct->entries == 0) { if(data_struct->entries == 0) {
init_malleability_data_struct(data_struct, MALLEABILITY_INIT_DATA_QTY); init_malleability_data_struct(data_struct, MAM_TYPES_INIT_DATA_QTY);
} else if(data_struct->entries == data_struct->max_entries) { } else if(data_struct->entries == data_struct->max_entries) {
realloc_malleability_data_struct(data_struct, MALLEABILITY_INIT_DATA_QTY); realloc_malleability_data_struct(data_struct, MAM_TYPES_INIT_DATA_QTY);
} }
data_struct->qty[data_struct->entries] = total_qty; data_struct->qty[data_struct->entries] = total_qty;
data_struct->types[data_struct->entries] = type; data_struct->types[data_struct->entries] = type;
data_struct->arrays[data_struct->entries] = data; data_struct->arrays[data_struct->entries] = data;
data_struct->request_qty[data_struct->entries] = request_qty;
data_struct->requests[data_struct->entries] = (MPI_Request *) malloc(request_qty * sizeof(MPI_Request)); if(request_qty) {
for(i=0; i < request_qty; i++) { data_struct->requests[data_struct->entries] = (MPI_Request *) malloc(request_qty * sizeof(MPI_Request));
data_struct->requests[data_struct->entries][i] = MPI_REQUEST_NULL; for(i=0; i < request_qty; i++) {
data_struct->requests[data_struct->entries][i] = MPI_REQUEST_NULL;
}
} }
data_struct->entries+=1; data_struct->entries+=1;
} }
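/*
 * Illustrative sketch of a hypothetical caller: registering a replicated array
 * of 100 doubles with one request slot for a later asynchronous
 * redistribution. "rep_data" stands in for a zero-initialized structure owned
 * by the caller.
 */
#if 0
void example_add_data(malleability_data_t *rep_data) {
  double *array = (double *) malloc(100 * sizeof(double));
  add_data(array, 100, MPI_DOUBLE, 1, rep_data);
}
#endif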
/*
 * Modifies, in the data structure to be communicated to the children,
 * a data set with a total of "total_qty" elements distributed among
 * all of the parents. The new "data" series only represents the data
 * held by this parent.
 */
void modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, size_t request_qty, malleability_data_t *data_struct) {
  size_t i;

  if(index >= data_struct->entries) { // Index does not exist
    return;
  }

  if(data_struct->requests[index] != NULL) {
    //free(data_struct->requests[index]); TODO Error when trying to free
    data_struct->requests[index] = NULL;
  }

  data_struct->qty[index] = total_qty;
  data_struct->types[index] = type;
  data_struct->arrays[index] = data;
  data_struct->request_qty[index] = request_qty;

  if(request_qty) {
    data_struct->requests[index] = (MPI_Request *) malloc(request_qty * sizeof(MPI_Request));
    for(i=0; i < request_qty; i++) {
      data_struct->requests[index][i] = MPI_REQUEST_NULL;
    }
  }
}
/*
 * Communicates the synchronous or asynchronous data structures from the parents to the children
@@ -48,37 +82,42 @@ void add_data(void *data, int total_qty, int type, int request_qty, malleability
 * In the "root" argument, every process must indicate which one is the root process of
 * the parents only.
 */
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group) {
  int type_size;
  size_t i, j;
  MPI_Datatype entries_type, struct_type;

  // Send the number of entries first
  def_malleability_entries(data_struct_dist, data_struct_rep, &entries_type);
  MPI_Bcast(MPI_BOTTOM, 1, entries_type, mall->root_collectives, mall->intercomm);
  if(is_children_group) {
    if(data_struct_rep->entries != 0) { init_malleability_data_struct(data_struct_rep, data_struct_rep->entries); }
    if(data_struct_dist->entries != 0) { init_malleability_data_struct(data_struct_dist, data_struct_dist->entries); } //FIXME Valgrind not freed
  }

  def_malleability_qty_type(data_struct_dist, data_struct_rep, &struct_type);
  MPI_Bcast(MPI_BOTTOM, 1, struct_type, mall->root_collectives, mall->intercomm);

  if(is_children_group) {
    for(i=0; i < data_struct_rep->entries; i++) {
      MPI_Type_size(data_struct_rep->types[i], &type_size);
      data_struct_rep->arrays[i] = (void *) malloc(data_struct_rep->qty[i] * (size_t) type_size); //FIXME This memory is not freed -- How should it be done?
      if(data_struct_rep->request_qty[i]) {
        data_struct_rep->requests[i] = (MPI_Request *) malloc(data_struct_rep->request_qty[i] * sizeof(MPI_Request));
        for(j=0; j < data_struct_rep->request_qty[i]; j++) {
          data_struct_rep->requests[i][j] = MPI_REQUEST_NULL;
        }
      }
    }
    for(i=0; i < data_struct_dist->entries; i++) {
      data_struct_dist->arrays[i] = (void *) NULL; // TODO Could it be initialized here?
      if(data_struct_dist->request_qty[i]) {
        data_struct_dist->requests[i] = (MPI_Request *) malloc(data_struct_dist->request_qty[i] * sizeof(MPI_Request));
        for(j=0; j < data_struct_dist->request_qty[i]; j++) {
          data_struct_dist->requests[i][j] = MPI_REQUEST_NULL;
        }
      }
    }
  }
@@ -97,14 +136,22 @@ void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *d
 * location and usage characteristics. It is initialized to use up to
 * "size" elements.
 */
void init_malleability_data_struct(malleability_data_t *data_struct, size_t size) {
  size_t i;
  data_struct->max_entries = size;
  data_struct->qty = (size_t *) malloc(size * sizeof(size_t));
  data_struct->types = (MPI_Datatype *) malloc(size * sizeof(MPI_Datatype));
  data_struct->request_qty = (size_t *) malloc(size * sizeof(size_t));
  data_struct->requests = (MPI_Request **) malloc(size * sizeof(MPI_Request *));
  data_struct->windows = (MPI_Win *) malloc(size * sizeof(MPI_Win));
  data_struct->arrays = (void **) malloc(size * sizeof(void *));
  for(i=0; i<size; i++) { // calloc and memset do not guarantee a NULL pointer value
    data_struct->requests[i] = NULL;
    data_struct->windows[i] = MPI_WIN_NULL;
    data_struct->arrays[i] = NULL;
  }
}
/*
@@ -112,43 +159,83 @@ void init_malleability_data_struct(malleability_data_t *data_struct, int size) {
 * location and usage characteristics. "size" new entries are added
 * to the existing ones.
 */
void realloc_malleability_data_struct(malleability_data_t *data_struct, size_t qty_to_add) {
  size_t i, needed, *qty_aux, *request_qty_aux;
  MPI_Datatype *types_aux;
  MPI_Win *windows_aux;
  MPI_Request **requests_aux;
  void **arrays_aux;

  needed = data_struct->max_entries + qty_to_add;
  qty_aux = (size_t *) realloc(data_struct->qty, needed * sizeof(size_t));
  types_aux = (MPI_Datatype *) realloc(data_struct->types, needed * sizeof(MPI_Datatype));
  request_qty_aux = (size_t *) realloc(data_struct->request_qty, needed * sizeof(size_t));
  requests_aux = (MPI_Request **) realloc(data_struct->requests, needed * sizeof(MPI_Request *));
  windows_aux = (MPI_Win *) realloc(data_struct->windows, needed * sizeof(MPI_Win));
  arrays_aux = (void **) realloc(data_struct->arrays, needed * sizeof(void *));

  if(qty_aux == NULL || arrays_aux == NULL || requests_aux == NULL || types_aux == NULL || request_qty_aux == NULL || windows_aux == NULL) {
    fprintf(stderr, "Fatal error - could not reallocate the persistent memory for the data to redistribute/communicate\n");
    MPI_Abort(MPI_COMM_WORLD, 1);
  }

  for(i=data_struct->max_entries; i<needed; i++) { // realloc does not ensure a NULL value
    requests_aux[i] = NULL;
    windows_aux[i] = MPI_WIN_NULL;
    arrays_aux[i] = NULL;
  }

  // A successful realloc already releases or reuses the old blocks, so they must not be freed again
  data_struct->qty = qty_aux;
  data_struct->types = types_aux;
  data_struct->request_qty = request_qty_aux;
  data_struct->requests = requests_aux;
  data_struct->windows = windows_aux;
  data_struct->arrays = arrays_aux;
  data_struct->max_entries = needed;
}
void free_malleability_data_struct(malleability_data_t *data_struct) {
  size_t i, j, max;
  max = data_struct->entries;
  if(max != 0) {
    if(data_struct->qty != NULL) {
      free(data_struct->qty);
    }
    if(data_struct->types != NULL) {
      free(data_struct->types);
    }
    if(data_struct->requests != NULL && data_struct->request_qty != NULL) {
      for(i=0; i<max; i++) {
        if(data_struct->requests[i] != NULL) {
          for(j=0; j<data_struct->request_qty[i]; j++) {
            if(data_struct->requests[i][j] != MPI_REQUEST_NULL) {
              MPI_Request_free(&(data_struct->requests[i][j]));
              data_struct->requests[i][j] = MPI_REQUEST_NULL;
            }
          }
          free(data_struct->requests[i]);
        }
      }
      free(data_struct->request_qty);
      free(data_struct->requests);
    }
    if(data_struct->windows != NULL) {
      free(data_struct->windows);
    }
    if(data_struct->arrays != NULL) {
      free(data_struct->arrays);
    }
  }
}
@@ -166,10 +253,11 @@ void def_malleability_entries(malleability_data_t *data_struct_rep, malleability
  int counts = 2;
  int blocklengths[counts];
  MPI_Aint displs[counts];
  MPI_Datatype types[counts], type_size_t;
  MPI_Type_match_size(MPI_TYPECLASS_INTEGER, sizeof(size_t), &type_size_t);

  blocklengths[0] = blocklengths[1] = 1;
  types[0] = types[1] = type_size_t;

  // Get the base address
  MPI_Get_address(&(data_struct_rep->entries), &displs[0]);
@@ -187,20 +275,27 @@ void def_malleability_entries(malleability_data_t *data_struct_rep, malleability
 * TODO Refactor?
 */
void def_malleability_qty_type(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, MPI_Datatype *new_type) {
  int counts = 6;
  int blocklengths[counts];
  MPI_Aint displs[counts];
  MPI_Datatype types[counts], type_size_t;
  MPI_Type_match_size(MPI_TYPECLASS_INTEGER, sizeof(size_t), &type_size_t);

  types[0] = types[1] = types[3] = types[4] = type_size_t;
  types[2] = types[5] = MPI_INT;
  blocklengths[0] = blocklengths[1] = blocklengths[2] = data_struct_rep->entries;
  blocklengths[3] = blocklengths[4] = blocklengths[5] = data_struct_dist->entries;

  MPI_Get_address((data_struct_rep->qty), &displs[0]);
  MPI_Get_address((data_struct_rep->request_qty), &displs[1]);
  MPI_Get_address((data_struct_rep->types), &displs[2]); // MPI_Datatype is declared as a typedef of int
  MPI_Get_address((data_struct_dist->qty), &displs[3]);
  MPI_Get_address((data_struct_dist->request_qty), &displs[4]);
  MPI_Get_address((data_struct_dist->types), &displs[5]); // MPI_Datatype is declared as a typedef of int

  MPI_Type_create_struct(counts, blocklengths, displs, types, new_type);
  MPI_Type_commit(new_type);
}
#ifndef MAM_TYPES_H
#define MAM_TYPES_H

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <fcntl.h>
#include <sys/stat.h>
#include "MAM_Constants.h"

#define MAM_TYPES_INIT_DATA_QTY 100

typedef struct {
  size_t entries; // Number of arrays to communicate (replicated data)
  size_t max_entries;
  size_t *qty; // Number of elements in each subarray of sync_array
  MPI_Datatype *types;

  // Array of request arrays. Each top-level element lists the requests to check
  // in order to consider the communication of that data item finished
  size_t *request_qty;
  MPI_Request **requests;
  MPI_Win *windows;
  void **arrays; // Each subarray is a series of data to communicate
} malleability_data_t;

void add_data(void *data, size_t total_qty, MPI_Datatype type, size_t request_qty, malleability_data_t *data_struct);
void modify_data(void *data, size_t index, size_t total_qty, MPI_Datatype type, size_t request_qty, malleability_data_t *data_struct);
void comm_data_info(malleability_data_t *data_struct_rep, malleability_data_t *data_struct_dist, int is_children_group);
void free_malleability_data_struct(malleability_data_t *data_struct);
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <mpi.h>
#include <signal.h>
#include "MAM_Zombies.h"
#include "MAM_DataStructures.h"
#define PIDS_QTY 320
//TODO Add option to allow the usage of signal USR2 or not.
//This code assumes the ROOT of each group will be the last to be zombified
//
void MAM_I_zombies_collect(int new_zombies);
void MAM_I_zombies_split();
void MAM_I_zombies_suspend();
int MAM_I_zombies_awake();
void zombies_handler_usr2() {}
int *pids = NULL;
int zombies_qty = 0;
void MAM_Zombies_service_init() {
zombies_qty = 0;
pids = malloc(PIDS_QTY * sizeof(int));
for(int i=0; i<PIDS_QTY; i++) {
pids[i] = 0;
}
}
int MAM_Zombies_service_free() {
int request_abort = MAM_I_zombies_awake();
free(pids);
return request_abort;
}
void MAM_Zombies_update() {
int myId, numP, new_zombies;
MPI_Comm_rank(mall->original_comm, &myId);
MPI_Comm_size(mall->original_comm, &numP);
MPI_Allreduce(&mall->zombie, &new_zombies, 1, MPI_INT, MPI_SUM, mall->original_comm);
if(new_zombies && new_zombies < numP) {
MAM_I_zombies_collect(new_zombies);
MAM_I_zombies_split();
MAM_I_zombies_suspend();
if(myId == MAM_ROOT) zombies_qty += new_zombies;
}
}
void MAM_I_zombies_collect(int new_zombies) {
int pid = getpid();
int *pids_counts, *pids_displs;
int i, count, active;
int myId, numP;
MPI_Comm_rank(mall->original_comm, &myId);
MPI_Comm_size(mall->original_comm, &numP);
pids_counts = (int *) malloc(numP * sizeof(int));
pids_displs = (int *) malloc(numP * sizeof(int));
#if MAM_DEBUG > 2
if(myId == MAM_ROOT){ DEBUG_FUNC("Collecting zombies", mall->myId, mall->numP); } fflush(stdout);
#endif
count = mall->zombie;
if(myId == MAM_ROOT) {
active = numP - new_zombies;
for(i=0; i < active; i++) {
pids_counts[i] = 0;
}
pids_displs[i-1] = zombies_qty - 1; // New zombies are appended right after the zombies_qty pids already stored
for(; i< active+new_zombies; i++) {
pids_counts[i] = 1;
pids_displs[i] = pids_displs[i-1] + 1;
}
}
MPI_Gatherv(&pid, count, MPI_INT, pids, pids_counts, pids_displs, MPI_INT, MAM_ROOT, mall->original_comm);
free(pids_counts);
free(pids_displs);
}
void MAM_I_zombies_split() {
int myId, color;
MPI_Comm new_original_comm;
MPI_Comm_rank(mall->original_comm, &myId);
color = mall->zombie ? MPI_UNDEFINED : 1;
MPI_Comm_split(mall->original_comm, color, myId, &new_original_comm);
if(mall->original_comm != MPI_COMM_WORLD) MPI_Comm_free(&mall->original_comm);
if(new_original_comm != MPI_COMM_NULL) MPI_Comm_set_name(new_original_comm, "MAM_ORIGINAL");
mall->original_comm = new_original_comm;
}
void MAM_I_zombies_suspend() {
struct sigaction act;
if(!mall->zombie) return;
sigemptyset(&act.sa_mask);
act.sa_flags=0;
act.sa_handler=zombies_handler_usr2;
sigaction(SIGUSR2, &act, NULL);
sigset_t set;
sigprocmask(SIG_SETMASK,NULL,&set);
sigsuspend(&set);
}
int MAM_I_zombies_awake() {
if(mall->internode_group && zombies_qty) return 1; //Request Abort
for(int i=0; i < zombies_qty; i++) { // Wake up the zombies
kill(pids[i], SIGUSR2);
}
zombies_qty = 0;
return 0; //Normal termination
}
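/*
 * Minimal standalone sketch of the suspend/awake pattern used above, not
 * library code: the child blocks in sigsuspend() until the parent delivers
 * SIGUSR2, mirroring how zombified ranks wait for MAM_I_zombies_awake().
 */
#if 0
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
static void example_handler(int sig) { (void) sig; }
int main() {
  pid_t pid = fork();
  if(pid == 0) { // Child: install the handler and suspend until SIGUSR2 arrives
    struct sigaction act;
    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;
    act.sa_handler = example_handler;
    sigaction(SIGUSR2, &act, NULL);
    sigset_t set;
    sigprocmask(SIG_SETMASK, NULL, &set);
    sigsuspend(&set);
    _exit(0);
  }
  sleep(1); // Parent: give the child time to block, then wake it
  kill(pid, SIGUSR2);
  waitpid(pid, NULL, 0);
  return 0;
}
#endif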
#ifndef MAM_ZOMBIES_H
#define MAM_ZOMBIES_H
void MAM_Zombies_service_init();
int MAM_Zombies_service_free();
void MAM_Zombies_update();
#endif
CC = gcc
MCC = mpicc
#C_FLAGS_ALL = -Wconversion -Wpedantic
C_FLAGS = -Wall -Wextra -Wshadow -Wfatal-errors
LD_FLAGS = -lm -pthread
MAM_USE_SLURM ?= 0
MAM_USE_BARRIERS ?= 0
MAM_DEBUG ?= 0
DEF = -DMAM_USE_SLURM=$(MAM_USE_SLURM) -DMAM_USE_BARRIERS=$(MAM_USE_BARRIERS) -DMAM_DEBUG=$(MAM_DEBUG)
ifeq ($(MAM_USE_SLURM),1)
LD_FLAGS += -lslurm
endif
ifeq ($(shell test $(MAM_DEBUG) -gt 0; echo $$?),0)
C_FLAGS += -g
endif
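# Example invocation (illustrative; assumes libslurm is installed system-wide):
#   make MAM_USE_SLURM=1 MAM_DEBUG=2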
# Final library
LIB = libmam.so
# Put all auto generated stuff to this build dir.
BUILD_DIR = ./build
# List of all directories where source files are located
SRCDIRS = . spawn_methods distribution_methods
# List of all .c source files.
C_FILES = $(foreach dire, $(SRCDIRS), $(wildcard $(dire)/*.c))
# All .o files go to build dir.
OBJ = $(C_FILES:%.c=$(BUILD_DIR)/%.o)
# Gcc will create these .d files containing dependencies.
DEP = $(OBJ:%.o=%.d)
# BASIC RULES
.PHONY : clean clear install
all: install
clean:
-rm $(BUILD_DIR)/$(LIB) $(OBJ) $(DEP)
clear:
-rm -rf $(BUILD_DIR)
install: $(LIB)
echo "Done"
# SPECIFIC RULES
# Default target named after the binary.
$(LIB) : $(BUILD_DIR)/$(LIB)
# Actual target of the binary - depends on all .o files.
$(BUILD_DIR)/$(LIB) : $(OBJ)
$(MCC) $(C_FLAGS) $^ -shared -o $@ $(LD_FLAGS)
# Include all .d files
# .d files are used for knowing the dependencies of each source file
-include $(DEP)
# Build target for every single object file.
# The potential dependency on header files is covered
# by calling `-include $(DEP)`.
# The -MMD flag additionally creates a .d file with
# the same name as the .o file.
$(BUILD_DIR)/%.o : %.c
@mkdir -p $(@D)
$(MCC) $(C_FLAGS) $(DEF) -fpic -MMD -c $< -o $@
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
#include "block_distribution.h"
#include "Distributed_CommDist.h"
#include "../MAM_Constants.h"
#include "../MAM_Configuration.h"
#include "../MAM_DataStructures.h"
void prepare_redistribution(int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, void **recv, struct Counts *s_counts, struct Counts *r_counts);
void check_requests(struct Counts s_counts, struct Counts r_counts, MPI_Request **requests, size_t *request_qty);
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm);
void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm);
void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win);
void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win);
void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests);
void async_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, MPI_Request *requests, MPI_Win *win);
void async_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
void async_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
/*
 * Allocates memory for an array of up to "qty" elements.
 * The "qty" elements are distributed among the "numP" processes
 * that call this function.
 */
void malloc_comm_array(char **array, int qty, int myId, int numP) {
struct Dist_data dist_data;
get_block_dist(qty, myId, numP, &dist_data);
if( (*array = calloc(dist_data.tamBl, sizeof(char))) == NULL) {
printf("Memory Error (Malloc Arrays(%d))\n", dist_data.tamBl);
exit(1);
}
/*
int i;
for(i=0; i<dist_data.tamBl; i++) {
(*array)[i] = '!' + i + dist_data.ini;
}
printf("P%d Tam %d String: %s\n", myId, dist_data.tamBl, *array);
*/
}
//================================================================================
//================================================================================
//========================SYNCHRONOUS FUNCTIONS===================================
//================================================================================
//================================================================================
/*
 * Performs a communication to redistribute an array in a block distribution.
 * The redistribution differentiates the parent group from the children, and the values each group
 * indicates can be different.
 *
 * - send (IN): Array with the data to send. This data can not be null for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *           If the process receives data and it is NULL, the behaviour is undefined.
 * - qty (IN): Sum of elements shared by all processes that will send data.
 * - numP (IN): Size of the local group. If it is a children group, this parameter must correspond to using
 *           "MPI_Comm_size(comm)". For the parents it is not always the size obtained from "comm".
 * - numO (IN): Amount of processes in the remote group. For the parents it is the target quantity of processes after the
 *           resize, while for the children it is the amount of parents.
 * - is_children_group (IN): Indicates whether this MPI rank is a child (TRUE) or a parent (FALSE).
 * - comm (IN): Communicator to use to perform the redistribution.
 *
 */
void sync_communication(void *send, void **recv, int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, MPI_Comm comm) {
struct Counts s_counts, r_counts;
struct Dist_data dist_data;
/* PREPARE COMMUNICATION */
prepare_redistribution(qty, datatype, numP, numO, is_children_group, recv, &s_counts, &r_counts);
/* PERFORM COMMUNICATION */
switch (mall_conf->red_method) {
case MAM_RED_RMA_LOCKALL:
case MAM_RED_RMA_LOCK:
if (is_children_group) {
dist_data.tamBl = 0;
} else {
get_block_dist(qty, mall->myId, numO, &dist_data);
}
sync_rma(send, *recv, datatype, r_counts, dist_data.tamBl, comm);
break;
case MAM_RED_POINT:
sync_point2point(send, *recv, datatype, s_counts, r_counts, comm);
break;
case MAM_RED_BASELINE:
default:
MPI_Alltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, comm);
break;
}
freeCounts(&s_counts);
freeCounts(&r_counts);
}
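/*
 * Illustrative parent-side call with hypothetical values: 1000 elements held
 * by numP=4 parents are redistributed towards numO=2 target processes;
 * is_children_group=0 marks this rank as a parent. "recv" starts as NULL so
 * that the preparation step can decide whether this rank also receives.
 */
#if 0
void example_sync_redistribution(double *send, MPI_Comm comm) {
  void *recv = NULL;
  sync_communication(send, &recv, 1000, MPI_DOUBLE, 4, 2, 0, comm);
}
#endif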
/*
* Performs a series of blocking point2point communications to redistribute an array in a block distribution.
* It should be called after calculating how data should be redistributed.
*
* - send (IN): Array with the data to send. This value can not be NULL for parents.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to
 * receive data. If the process receives data and it is NULL, the behaviour is undefined.
 * - s_counts (IN): Struct which describes how many elements this process will send to each child and
 *           the displacements.
 * - r_counts (IN): Struct which describes how many elements this process will receive from each parent
 *           and the displacements.
* - comm (IN): Communicator to use to perform the redistribution.
*
*/
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm) {
int i, j, init, end, total_sends, datasize;
size_t offset, offset2;
MPI_Request *sends;
MPI_Type_size(datatype, &datasize);
init = s_counts.idI;
end = s_counts.idE;
if(mall_conf->spawn_method == MAM_SPAWN_MERGE && (s_counts.idI == mall->myId || s_counts.idE == mall->myId + 1)) {
offset = s_counts.displs[mall->myId] * datasize;
offset2 = r_counts.displs[mall->myId] * datasize;
memcpy(recv+offset2, send+offset, s_counts.counts[mall->myId] * datasize); // Byte count: elements times datatype size
if(s_counts.idI == mall->myId) init = s_counts.idI+1;
else end = s_counts.idE-1;
}
total_sends = end - init;
j = 0;
if(total_sends > 0) {
sends = (MPI_Request *) malloc(total_sends * sizeof(MPI_Request));
}
for(i=init; i<end; i++) {
sends[j] = MPI_REQUEST_NULL;
offset = s_counts.displs[i] * datasize;
MPI_Isend(send+offset, s_counts.counts[i], datatype, i, 99, comm, &(sends[j]));
j++;
}
init = r_counts.idI;
end = r_counts.idE;
if(mall_conf->spawn_method == MAM_SPAWN_MERGE) {
if(r_counts.idI == mall->myId) init = r_counts.idI+1;
else if(r_counts.idE == mall->myId + 1) end = r_counts.idE-1;
}
for(i=init; i<end; i++) {
offset = r_counts.displs[i] * datasize;
MPI_Recv(recv+offset, r_counts.counts[i], datatype, i, 99, comm, MPI_STATUS_IGNORE);
}
if(total_sends > 0) {
MPI_Waitall(total_sends, sends, MPI_STATUSES_IGNORE);
free(sends);
}
}
/*
 * Performs synchronous MPI-RMA operations to redistribute an array in a block distribution. It should be called after
 * calculating how data should be redistributed.
*
* - send (IN): Array with the data to send. This value can be NULL for children.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 * If the process receives data and it is NULL, the behaviour is undefined.
* - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
* displacements.
* - tamBl (IN): How many elements are stored in the parameter "send".
* - comm (IN): Communicator to use to perform the redistribution. Must be an intracommunicator as MPI-RMA requirements.
*
* FIXME: In libfabric one of these macros defines the maximum amount of BYTES that can be communicated in a SINGLE MPI_Get
* A window can have more bytes than the amount shown in those macros, therefore, if you want to read more than that amount
* you need to perform multiples Gets.
* prov/psm3/psm3/psm_config.h:179:#define MQ_SHM_THRESH_RNDV 16000
* prov/psm3/psm3/ptl_am/am_config.h:62:#define PSMI_MQ_RV_THRESH_CMA 16000
* prov/psm3/psm3/ptl_am/am_config.h:65:#define PSMI_MQ_RV_THRESH_NO_KASSIST 16000
*/
void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm) {
int datasize;
MPI_Win win;
MPI_Type_size(datatype, &datasize);
MPI_Win_create(send, (MPI_Aint)tamBl * datasize, datasize, MPI_INFO_NULL, comm, &win);
#if MAM_DEBUG >= 3
DEBUG_FUNC("Created Window for synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm);
#endif
switch(mall_conf->red_method) {
case MAM_RED_RMA_LOCKALL:
sync_rma_lockall(recv, datatype, r_counts, win);
break;
case MAM_RED_RMA_LOCK:
sync_rma_lock(recv, datatype, r_counts, win);
break;
}
#if MAM_DEBUG >= 3
DEBUG_FUNC("Completed synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm);
#endif
MPI_Win_free(&win);
}
/*
* Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 * If the process receives data and it is NULL, the behaviour is undefined.
* - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
* displacements.
* - win (IN): Window to use to perform the redistribution.
*
*/
void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win) {
int i, target_displs, datasize;
size_t offset;
MPI_Type_size(datatype, &datasize);
target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
for(i=r_counts.idI; i<r_counts.idE; i++) {
offset = r_counts.displs[i] * datasize;
MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
MPI_Get(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win);
MPI_Win_unlock(i, win);
target_displs=0;
}
}
/*
* Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lockall/Unlockall.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 * If the process receives data and it is NULL, the behaviour is undefined.
* - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
* displacements.
* - win (IN): Window to use to perform the redistribution.
*
*/
void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win) {
int i, target_displs, datasize;
size_t offset;
MPI_Type_size(datatype, &datasize);
target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
for(i=r_counts.idI; i<r_counts.idE; i++) {
offset = r_counts.displs[i] * datasize;
MPI_Get(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win);
target_displs=0;
}
MPI_Win_unlock_all(win);
}
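/*
 * A possible workaround for the transport limits noted in the FIXME above
 * (a sketch, not used by the library): split one large MPI_Get into
 * fixed-size chunks so that no single Get exceeds the threshold. Must be
 * called inside an open access epoch; MAX_GET_BYTES is a hypothetical cap.
 */
#if 0
#define MAX_GET_BYTES 16000
static void chunked_get(void *recv, int count, MPI_Datatype datatype, int target, int target_displs, MPI_Win win) {
  int datasize, done, n, chunk_elems;
  MPI_Type_size(datatype, &datasize);
  chunk_elems = MAX_GET_BYTES / datasize; // Elements allowed per single MPI_Get
  for(done = 0; done < count; done += chunk_elems) {
    n = (count - done < chunk_elems) ? (count - done) : chunk_elems;
    MPI_Get((char *) recv + (size_t) done * datasize, n, datatype,
            target, target_displs + done, n, datatype, win);
  }
}
#endif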
//================================================================================
//================================================================================
//========================ASYNCHRONOUS FUNCTIONS==================================
//================================================================================
//================================================================================
/*
* Performs a communication to redistribute an array in a block distribution with non-blocking MPI functions.
 * The redistribution differentiates the parent group from the children, and the values each group indicates can be
 * different.
*
* - send (IN): Array with the data to send. This data can not be null for parents.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 * If the process receives data and it is NULL, the behaviour is undefined.
* - qty (IN): Sum of elements shared by all processes that will send data.
 * - numP (IN): Size of the local group. If it is a children group, this parameter must correspond to using
 *           "MPI_Comm_size(comm)". For the parents it is not always the size obtained from "comm".
 * - numO (IN): Amount of processes in the remote group. For the parents it is the target quantity of processes after the
 *           resize, while for the children it is the amount of parents.
 * - is_children_group (IN): Indicates whether this MPI rank is a child (TRUE) or a parent (FALSE).
* - comm (IN): Communicator to use to perform the redistribution.
* - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended. If the pointer
* is null or not enough space has been reserved the pointer is allocated/reallocated.
* - request_qty (OUT): Quantity of requests to be used. If a process sends and receives data, this value will be
* modified to the expected value.
*
*/
void async_communication_start(void *send, void **recv, int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win) {
struct Counts s_counts, r_counts;
struct Dist_data dist_data;
/* PREPARE COMMUNICATION */
prepare_redistribution(qty, datatype, numP, numO, is_children_group, recv, &s_counts, &r_counts);
check_requests(s_counts, r_counts, requests, request_qty); //FIXME Error related to second reconf if Merge Shrink + P2P -->Invalid requests
/* PERFORM COMMUNICATION */
switch(mall_conf->red_method) {
case MAM_RED_RMA_LOCKALL:
case MAM_RED_RMA_LOCK:
if(is_children_group) {
dist_data.tamBl = 0;
} else {
get_block_dist(qty, mall->myId, numO, &dist_data);
}
async_rma(send, *recv, datatype, r_counts, dist_data.tamBl, comm, *requests, win);
break;
case MAM_RED_POINT:
async_point2point(send, *recv, datatype, s_counts, r_counts, comm, *requests);
break;
case MAM_RED_BASELINE:
default:
MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, comm, &((*requests)[0]));
break;
}
freeCounts(&s_counts);
freeCounts(&r_counts);
}
/*
* Checks if a set of requests have been completed (1) or not (0).
*
 * - is_children_group (IN): Indicates whether this MPI rank is a child (TRUE) or a parent (FALSE).
* - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
* - request_qty (IN): Quantity of requests in "requests".
*
 * returns: An integer indicating if the operation has been completed (TRUE) or not (FALSE).
*/
int async_communication_check(int is_children_group, MPI_Request *requests, size_t request_qty) {
int completed, req_completed, test_err;
size_t i;
completed = 1;
test_err = MPI_SUCCESS;
if (is_children_group) return 1; //FIXME Should return a negative number
for(i=0; i<request_qty; i++) {
test_err = MPI_Test(&(requests[i]), &req_completed, MPI_STATUS_IGNORE);
completed = completed && req_completed;
}
//test_err = MPI_Testall(request_qty, requests, &completed, MPI_STATUSES_IGNORE); //FIXME Some kind of bug with Mpich.
if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
printf("P%d aborting -- Test Async\n", mall->myId);
MPI_Abort(MPI_COMM_WORLD, test_err);
}
return completed;
}
/*
* Waits until the completion of a set of requests. If the Ibarrier strategy
* is being used, the corresponding ibarrier is posted.
*
* - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
* - request_qty (IN): Quantity of requests in "requests".
*/
void async_communication_wait(MPI_Request *requests, size_t request_qty) {
MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE);
#if MAM_DEBUG >= 3
DEBUG_FUNC("Processes Waitall completed", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
#endif
}
/*
* Frees Requests/Windows associated to a particular redistribution.
* Should be called for each output result of calling "async_communication_start".
*
* - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
* - request_qty (IN): Quantity of requests in "requests".
* - win (IN): Window to free.
*/
void async_communication_end(MPI_Request *requests, size_t request_qty, MPI_Win *win) {
//For the disconnection of both process groups it is necessary to tell MPI that this communication
//has finished, even though this point can only be reached once it has already finished
if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) { MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE); }
if((mall_conf->red_method == MAM_RED_RMA_LOCKALL || mall_conf->red_method == MAM_RED_RMA_LOCK)
&& *win != MPI_WIN_NULL) { MPI_Win_free(win); }
}
/*
* Performs a series of non-blocking point2point communications to redistribute an array in a block distribution.
* It should be called after calculating how data should be redistributed.
*
* - send (IN): Array with the data to send. This value can not be NULL for parents.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to
 * receive data. If the process receives data and it is NULL, the behaviour is undefined.
 * - s_counts (IN): Struct which describes how many elements this process will send to each child and
 *           the displacements.
 * - r_counts (IN): Struct which describes how many elements this process will receive from each parent
 *           and the displacements.
* - comm (IN): Communicator to use to perform the redistribution.
* - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
*
*/
void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests) {
int i, j = 0, datasize;
size_t offset;
MPI_Type_size(datatype, &datasize);
for(i=s_counts.idI; i<s_counts.idE; i++) {
offset = s_counts.displs[i] * datasize;
MPI_Isend(send+offset, s_counts.counts[i], datatype, i, 99, comm, &(requests[j]));
j++;
}
for(i=r_counts.idI; i<r_counts.idE; i++) {
offset = r_counts.displs[i] * datasize;
MPI_Irecv(recv+offset, r_counts.counts[i], datatype, i, 99, comm, &(requests[j]));
j++;
}
}
/*
 * Performs asynchronous MPI-RMA operations to redistribute an array in a block distribution. It should be called after
 * calculating how data should be redistributed.
*
* - send (IN): Array with the data to send. This value can be NULL for children.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 * If the process receives data and it is NULL, the behaviour is undefined.
* - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
* displacements.
* - tamBl (IN): How many elements are stored in the parameter "send".
* - comm (IN): Communicator to use to perform the redistribution. Must be an intracommunicator as MPI-RMA requirements.
* - window (OUT): Pointer to a window object used for the RMA operations.
* - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
*
*/
void async_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, MPI_Request *requests, MPI_Win *win) {
int datasize;
MPI_Type_size(datatype, &datasize);
MPI_Win_create(send, (MPI_Aint)tamBl * datasize, datasize, MPI_INFO_NULL, comm, win);
switch(mall_conf->red_method) {
case MAM_RED_RMA_LOCKALL:
async_rma_lockall(recv, datatype, r_counts, *win, requests);
break;
case MAM_RED_RMA_LOCK:
async_rma_lock(recv, datatype, r_counts, *win, requests);
break;
}
}
/*
* Performs an asynchronous and passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 * If the process receives data and it is NULL, the behaviour is undefined.
* - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
* displacements.
* - win (IN): Window to use to perform the redistribution.
* - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
*
*/
void async_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
int i, target_displs, j = 0, datasize;
size_t offset;
MPI_Type_size(datatype, &datasize);
target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
for(i=r_counts.idI; i<r_counts.idE; i++) {
offset = r_counts.displs[i] * datasize;
MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
MPI_Rget(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win, &(requests[j]));
MPI_Win_unlock(i, win);
target_displs=0;
j++;
}
}
/*
 * Performs an asynchronous and passive MPI-RMA data redistribution for a single array using Lockall/Unlockall passive epochs.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 * If the process receives data and recv is NULL, the behaviour is undefined.
 * - r_counts (IN): Struct describing how many elements this process receives from each parent, along with the
 * displacements.
 * - win (IN): Window used to perform the redistribution.
 * - requests (OUT): Pointer to an array of requests used to determine whether the communication has ended.
 *
 */
void async_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
int i, target_displs, j = 0, datasize;
size_t offset;
MPI_Type_size(datatype, &datasize);
target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
for(i=r_counts.idI; i<r_counts.idE; i++) {
offset = r_counts.displs[i] * datasize;
MPI_Rget(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win, &(requests[j]));
target_displs=0;
j++;
}
MPI_Win_unlock_all(win);
}
/*
* ========================================================================================
* ========================================================================================
* ================================DISTRIBUTION FUNCTIONS==================================
* ========================================================================================
* ========================================================================================
*/
/*
 * Prepares a communication to redistribute an array in a block distribution. For each process it calculates
 * how many elements it sends to or receives from every other process of the new group.
 *
 * - qty (IN): Sum of the elements shared by all processes that will send data.
 * - numP (IN): Size of the local group. If it is a children group, this parameter must match the result of
 * "MPI_Comm_size(comm)". For the parents it is not always the size obtained from "comm".
 * - numO (IN): Number of processes in the remote group. For the parents it is the target number of processes after the
 * resize, while for the children it is the number of parents.
 * - is_children_group (IN): Indicates whether this MPI rank is a child (TRUE) or a parent (FALSE).
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 * If the process receives data and recv is NULL, the behaviour is undefined.
 * - s_counts (OUT): Struct indicating how many elements this process sends to each process of the new group.
 * - r_counts (OUT): Struct indicating how many elements this process receives from each process of the previous group.
 *
 */
void prepare_redistribution(int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, void **recv, struct Counts *s_counts, struct Counts *r_counts) {
int array_size = numO;
int offset_ids = 0;
int datasize;
size_t total_bytes;
struct Dist_data dist_data;
if(mall_conf->spawn_method == MAM_SPAWN_BASELINE) {
offset_ids = MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL) ?
0 : numP;
} else { // Merge method
array_size = numP > numO ? numP : numO;
}
mallocCounts(s_counts, array_size+offset_ids);
mallocCounts(r_counts, array_size+offset_ids);
MPI_Type_size(datatype, &datasize); //FIXME Right now derived datatypes are not ensured to work
if(is_children_group) {
offset_ids = 0;
prepare_comm_alltoall(mall->myId, numP, numO, qty, offset_ids, r_counts);
// Obtain the distribution for this child
get_block_dist(qty, mall->myId, numP, &dist_data);
total_bytes = ((size_t) dist_data.tamBl) * ((size_t) datasize);
*recv = malloc(total_bytes);
#if MAM_DEBUG >= 4
get_block_dist(qty, mall->myId, numP, &dist_data);
print_counts(dist_data, r_counts->counts, r_counts->displs, numO+offset_ids, 0, "Targets Recv");
#endif
} else {
#if MAM_DEBUG >= 4
get_block_dist(qty, mall->myId, numP, &dist_data);
#endif
prepare_comm_alltoall(mall->myId, numP, numO, qty, offset_ids, s_counts);
if(mall_conf->spawn_method == MAM_SPAWN_MERGE && mall->myId < numO) {
prepare_comm_alltoall(mall->myId, numO, numP, qty, offset_ids, r_counts);
// Obtain the distribution for this child and allocate the receive buffer
get_block_dist(qty, mall->myId, numO, &dist_data);
total_bytes = ((size_t) dist_data.tamBl) * ((size_t) datasize);
*recv = malloc(total_bytes);
#if MAM_DEBUG >= 4
print_counts(dist_data, r_counts->counts, r_counts->displs, array_size, 0, "Sources&Targets Recv");
#endif
}
#if MAM_DEBUG >= 4
print_counts(dist_data, s_counts->counts, s_counts->displs, numO+offset_ids, 0, "Sources Send");
#endif
}
}
/*
 * Ensures that the requests array of a process has as many elements as the number of communication
 * calls the process will perform. If the array is not initialized or does not have enough space, it is
 * allocated/reallocated to the minimum amount of space needed.
 *
 * - s_counts (IN): Struct indicating how many elements this process sends to each process of the new group.
 * - r_counts (IN): Struct indicating how many elements this process receives from each process of the previous group.
 * - requests (IN/OUT): Pointer to an array of requests used to determine whether the communication has ended. If the pointer
 * is NULL or not enough space has been reserved, the array is allocated/reallocated.
 * - request_qty (IN/OUT): Number of requests to be used. If the value is smaller than the number of communication
 * calls to perform, it is updated to the minimum required value.
 */
void check_requests(struct Counts s_counts, struct Counts r_counts, MPI_Request **requests, size_t *request_qty) {
size_t i, sum;
MPI_Request *aux;
switch(mall_conf->red_method) {
case MAM_RED_BASELINE:
sum = 1;
break;
case MAM_RED_POINT:
default:
sum = (size_t) s_counts.idE - s_counts.idI;
sum += (size_t) r_counts.idE - r_counts.idI;
break;
}
if (*requests != NULL && sum <= *request_qty) return; // Expected amount of requests
if (*requests == NULL) {
*requests = (MPI_Request *) malloc(sum * sizeof(MPI_Request));
} else { // Array exists, but is too small
aux = (MPI_Request *) realloc(*requests, sum * sizeof(MPI_Request));
*requests = aux;
}
if (*requests == NULL) {
fprintf(stderr, "Fatal error - It was not possible to allocate/reallocate memory for the MPI_Requests before the redistribution\n");
MPI_Abort(MPI_COMM_WORLD, 1);
}
for(i=0; i < sum; i++) {
(*requests)[i] = MPI_REQUEST_NULL;
}
*request_qty = sum;
}
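/*
 * Minimal sketch of the expected call pattern (names are illustrative):
 *
 *   MPI_Request *requests = NULL;
 *   size_t request_qty = 0;
 *   check_requests(s_counts, r_counts, &requests, &request_qty);
 *   // "requests" now holds request_qty entries, all set to MPI_REQUEST_NULL
 */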
#ifndef MAM_DISTRIBUTED_COMMDIST_H
#define MAM_DISTRIBUTED_COMMDIST_H
#include <mpi.h>
void sync_communication(void *send, void **recv, int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, MPI_Comm comm);
void async_communication_start(void *send, void **recv, int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win);
int async_communication_check(int is_children_group, MPI_Request *requests, size_t request_qty);
void async_communication_wait(MPI_Request *requests, size_t request_qty);
void async_communication_end(MPI_Request *requests, size_t request_qty, MPI_Win *win);
void malloc_comm_array(char **array, int qty, int myId, int numP);
#endif
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "block_distribution.h"
void set_interblock_counts(int id, int numP, struct Dist_data data_dist, int offset_ids, int *sendcounts);
void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS);
/*
 * Prepares a communication from "numP" processes to "numP_other" processes
 * of "n" elements and returns a Counts struct with the arrays needed to
 * perform the communications.
 *
 * The struct should be freed with freeCounts
 */
void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, int offset_ids, struct Counts *counts) {
int i, *idS, first_id = 0;
struct Dist_data dist_data, dist_target;
if(counts == NULL) {
fprintf(stderr, "Counts is NULL for rank %d/%d ", myId, numP);
MPI_Abort(MPI_COMM_WORLD, -3);
}
get_block_dist(n, myId, numP, &dist_data);
get_util_ids(dist_data, numP_other, &idS);
counts->idI = idS[0] + offset_ids;
counts->idE = idS[1] + offset_ids;
get_block_dist(n, idS[0], numP_other, &dist_target); // RMA Specific operation -- uses idS[0], not idI
counts->first_target_displs = dist_data.ini - dist_target.ini; // RMA Specific operation
if(idS[0] == 0) { // Uses idS[0], not idI
set_interblock_counts(counts->idI, numP_other, dist_data, offset_ids, counts->counts);
first_id++;
}
for(i=counts->idI + first_id; i<counts->idE; i++) {
set_interblock_counts(i, numP_other, dist_data, offset_ids, counts->counts);
counts->displs[i] = counts->displs[i-1] + counts->counts[i-1];
}
free(idS);
for(i=0; i<numP_other; i++) {
if(counts->counts[i] < 0) {
fprintf(stderr, "Counts value [i=%d/%d] is negative for rank %d/%d ", i, numP_other, myId, numP);
MPI_Abort(MPI_COMM_WORLD, -3);
}
if(counts->displs[i] < 0) {
fprintf(stderr, "Displs value [i=%d/%d] is negative for rank %d/%d ", i, numP_other, myId, numP);
MPI_Abort(MPI_COMM_WORLD, -3);
}
}
}
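/*
 * Worked example (illustrative, assuming n=10, myId=0, numP=2,
 * numP_other=3 and offset_ids=0): rank 0 owns elements [0,5) while the
 * remote blocks are [0,4), [4,7) and [7,10). The call yields idI=0, idE=2,
 * counts={4,1,0} and displs={0,4,0}.
 */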
/*
 * Prepares a communication between "numP" processes of "n" elements and
 * returns a Counts struct with the arrays needed to perform the
 * communications.
 *
 * The struct should be freed with freeCounts
 */
void prepare_comm_allgatherv(int numP, int n, struct Counts *counts) {
int i;
struct Dist_data dist_data;
mallocCounts(counts, numP);
get_block_dist(n, 0, numP, &dist_data);
counts->counts[0] = dist_data.tamBl;
for(i=1; i<numP; i++){
get_block_dist(n, i, numP, &dist_data);
counts->counts[i] = dist_data.tamBl;
counts->displs[i] = counts->displs[i-1] + counts->counts[i-1];
}
}
/*
* ========================================================================================
* ========================================================================================
* ================================DISTRIBUTION FUNCTIONS==================================
* ========================================================================================
* ========================================================================================
*/
/*
 * Obtains, for the given "id" and "numP", how many
 * elements process "id" will hold
 * and fills the results in a Dist_data struct
 */
void get_block_dist(int qty, int id, int numP, struct Dist_data *dist_data) {
int rem;
dist_data->myId = id;
dist_data->numP = numP;
dist_data->qty = qty;
dist_data->tamBl = qty / numP;
rem = qty % numP;
if(id < rem) { // First subgroup
dist_data->ini = id * dist_data->tamBl + id;
dist_data->fin = (id+1) * dist_data->tamBl + (id+1);
} else { // Second subgroup
dist_data->ini = id * dist_data->tamBl + rem;
dist_data->fin = (id+1) * dist_data->tamBl + rem;
}
if(dist_data->fin > qty) { dist_data->fin = qty; }
if(dist_data->ini > dist_data->fin) { dist_data->ini = dist_data->fin; }
dist_data->tamBl = dist_data->fin - dist_data->ini;
}
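/*
 * Worked example (illustrative): with qty=10 and numP=3, rem=1, so the
 * ranks receive the ranges [0,4), [4,7) and [7,10), with tamBl equal to
 * 4, 3 and 3 respectively.
 */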
/*
 * Obtains, for a given process id, how many elements it will
 * send to or receive from the process described in Dist_data.
 */
void set_interblock_counts(int id, int numP, struct Dist_data data_dist, int offset_ids, int *sendcounts) {
struct Dist_data other;
int biggest_ini, smallest_end;
get_block_dist(data_dist.qty, id - offset_ids, numP, &other);
  // If the value ranges do not overlap, move on to the next process
if(data_dist.ini >= other.fin || data_dist.fin <= other.ini) {
return;
}
  // Largest ini of the two processes
  biggest_ini = (data_dist.ini > other.ini) ? data_dist.ini : other.ini;
  // Smallest fin of the two processes
  smallest_end = (data_dist.fin < other.fin) ? data_dist.fin : other.fin;
  sendcounts[id] = smallest_end - biggest_ini; // Number of elements to send to/receive from process id
}
/*
 * Obtains, for a process of one group, the range of processes of the
 * other group it has to send data to or receive data from.
 *
 * Returns the first id and the last one (excluded) to
 * communicate with.
 */
void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS) {
int idI, idE;
int tamOther = dist_data.qty / numP_other;
int remOther = dist_data.qty % numP_other;
  // Cut-off point of the remote group of processes that
  // separates the processes holding
  // tamOther + 1 elements from those holding tamOther
int middle = (tamOther + 1) * remOther;
  // Compute idI taking into account whether the peer
  // process has size tamOther or tamOther+1
if(middle > dist_data.ini) { // First subgroup (tamOther+1)
idI = dist_data.ini / (tamOther + 1);
} else { // Second subgroup (tamOther)
idI = ((dist_data.ini - middle) / tamOther) + remOther;
}
  // Compute idE taking into account whether the peer
  // process has size tamOther or tamOther+1
if(middle >= dist_data.fin) { // First subgroup (tamOther +1)
idE = dist_data.fin / (tamOther + 1);
idE = (dist_data.fin % (tamOther + 1) > 0 && idE+1 <= numP_other) ? idE+1 : idE;
} else { // Second subgroup (tamOther)
idE = ((dist_data.fin - middle) / tamOther) + remOther;
idE = ((dist_data.fin - middle) % tamOther > 0 && idE+1 <= numP_other) ? idE+1 : idE;
}
*idS = malloc(2 * sizeof(int));
(*idS)[0] = idI;
(*idS)[1] = idE;
}
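/*
 * Worked example (illustrative): with dist_data covering [0,5) of qty=10
 * and numP_other=3, tamOther=3, remOther=1 and middle=4, which gives
 * idI=0 and idE=2: this process communicates with remote ranks 0 and 1.
 */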
/*
* ========================================================================================
* ========================================================================================
* ==============================INIT/FREE/PRINT FUNCTIONS=================================
* ========================================================================================
* ========================================================================================
*/
/*
 * Allocates memory for the counts/displs arrays used by the
 * MPI_Alltoallv function. All arrays have a size of numP, which is the
 * number of processes in the other group.
 *
 * The counts array indicates how many elements are communicated from this
 * process to process "i" of the other group.
 *
 * The displs array indicates the displacements needed for each communication
 * with process "i" of the other group.
 *
 */
void mallocCounts(struct Counts *counts, size_t numP) {
counts->counts = calloc(numP, sizeof(int));
if(counts->counts == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
counts->displs = calloc(numP, sizeof(int));
if(counts->displs == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
counts->len = numP;
counts->idI = -1;
counts->idE = -1;
counts->first_target_displs = -1;
}
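/*
 * Typical lifetime sketch (illustrative): the struct is zero-initialized
 * here and released with freeCounts() once the redistribution is done:
 *
 *   struct Counts s_counts;
 *   mallocCounts(&s_counts, (size_t) numP_other);
 *   // ... fill and use s_counts.counts / s_counts.displs ...
 *   freeCounts(&s_counts);
 */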
/*
 * Frees the internal memory of a Counts structure.
 *
 * It does not free the Counts structure itself if it was
 * allocated dynamically.
 */
void freeCounts(struct Counts *counts) {
if(counts == NULL) {
return;
}
if(counts->counts != NULL) {
free(counts->counts);
counts->counts = NULL;
}
if(counts->displs != NULL) {
free(counts->displs);
counts->displs = NULL;
}
}
/*
 * Prints the communication information of a process.
 * If the "include_zero" flag is set to true, the zero values of the
 * xcounts array are printed as well.
 *
 * A string can be passed in "name" to better identify which arrays
 * the call refers to.
 */
void print_counts(struct Dist_data data_dist, int *xcounts, int *xdispls, int size, int include_zero, const char* name) {
int i;
for(i=0; i < size; i++) {
if(xcounts[i] != 0 || include_zero) {
printf("P%d of %d | %scounts[%d]=%d disp=%d\n", data_dist.myId, data_dist.numP, name, i, xcounts[i], xdispls[i]);
}
}
}
#ifndef MAM_BLOCK_DISTRIBUTION_H
#define MAM_BLOCK_DISTRIBUTION_H
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
struct Dist_data {
  int ini; // First element to send
  int fin; // Last element to send (excluded)
  int tamBl; // Total number of local elements
  int qty; // Total number of rows of the full sparse matrix
int myId;
int numP;
MPI_Comm intercomm;
};
struct Counts {
int len, idI, idE;
int first_target_displs; // RMA. Indicates displacement for first target when performing a Get.
int *counts;
int *displs;
};
void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, int offset_ids, struct Counts *counts);
void prepare_comm_allgatherv(int numP, int n, struct Counts *counts);
void get_block_dist(int qty, int id, int numP, struct Dist_data *dist_data);
void mallocCounts(struct Counts *counts, size_t numP);
void freeCounts(struct Counts *counts);
void print_counts(struct Dist_data data_dist, int *xcounts, int *xdispls, int size, int include_zero, const char* name);
#endif
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
#include "Baseline.h"
#include "SpawnUtils.h"
#include "Strategy_Single.h"
#include "Strategy_Multiple.h"
#include "Strategy_Parallel.h"
#include "PortService.h"
//--------------PRIVATE DECLARATIONS---------------//
void baseline_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *child);
void baseline_children(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *parents);
//--------------PUBLIC FUNCTIONS---------------//
/*
 * Basic method for process creation. It creates a total of
 * spawn_data.spawn_qty processes.
 */
int baseline(Spawn_data spawn_data, MPI_Comm *child) { //TODO Error handling
Spawn_ports spawn_port;
MPI_Comm intercomm;
MPI_Comm_get_parent(&intercomm); //FIXME May be a problem for third reconf or more with only expansions
init_ports(&spawn_port);
if (intercomm == MPI_COMM_NULL) { // Parents path
baseline_parents(spawn_data, &spawn_port, child);
} else { // Children path
baseline_children(spawn_data, &spawn_port, child);
}
free_ports(&spawn_port);
return MAM_I_SPAWN_COMPLETED;
}
//--------------PRIVATE FUNCTIONS---------------//
/*
 * Function used by the parents to perform the
 * creation of processes.
 *
 */
void baseline_parents(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *child) {
int i;
MPI_Comm comm, *intercomms;
#if MAM_DEBUG >= 3
DEBUG_FUNC("Starting spawning of processes", mall->myId, mall->numP); fflush(stdout);
#endif
if (spawn_data.spawn_is_parallel) {
  // This spawn is quite different from the rest, as
  // it takes care of everything related to spawning.
parallel_strat_parents(spawn_data, spawn_port, child);
return;
}
if (spawn_data.spawn_is_single && mall->myId != mall->root) {
single_strat_parents(spawn_data, child);
return;
}
comm = spawn_data.spawn_is_single ? MPI_COMM_SELF : spawn_data.comm;
MPI_Bcast(&spawn_data.total_spawns, 1, MPI_INT, mall->root, comm);
intercomms = (MPI_Comm*) malloc(spawn_data.total_spawns * sizeof(MPI_Comm));
if(mall->myId != mall->root) {
spawn_data.sets = (Spawn_set *) malloc(spawn_data.total_spawns * sizeof(Spawn_set));
}
for(i=0; i<spawn_data.total_spawns; i++) {
mam_spawn(spawn_data.sets[i], comm, &intercomms[i]);
}
#if MAM_DEBUG >= 3
DEBUG_FUNC("Sources have created the new processes. Performing additional actions if required.", mall->myId, mall->numP); fflush(stdout);
#endif
// TODO Improvement - Deactivate Multiple spawn before spawning if total_spawns == 1
if(spawn_data.spawn_is_multiple) { multiple_strat_parents(spawn_data, spawn_port, comm, intercomms, child); }
else { *child = intercomms[0]; }
if(spawn_data.spawn_is_single) { single_strat_parents(spawn_data, child); }
free(intercomms);
if(mall->myId != mall->root) { free(spawn_data.sets); }
}
void baseline_children(Spawn_data spawn_data, Spawn_ports *spawn_port, MPI_Comm *parents) {
if(spawn_data.spawn_is_parallel) {
  // This spawn is quite different from the rest, as
  // it takes care of everything related to spawning.
parallel_strat_children(spawn_data, spawn_port, parents);
return;
}
if(spawn_data.spawn_is_multiple) { multiple_strat_children(parents, spawn_port); }
if(spawn_data.spawn_is_single) { single_strat_children(parents, spawn_port); }
}
#ifndef MAM_SPAWN_BASELINE_H
#define MAM_SPAWN_BASELINE_H
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
#include "Spawn_DataStructure.h"
int baseline(Spawn_data spawn_data, MPI_Comm *child);
#endif
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
#include <mpi.h>
#include <string.h>
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
#include "../MAM_Configuration.h"
#include "ProcessDist.h"
#include "GenericSpawn.h"
#include "Baseline.h"
#include "Merge.h"
#include "Spawn_state.h"
// This code is a singleton -- only one instance can be used at a given time, and
// multiple calls to perform different resizes cannot be performed at the same time.
Spawn_data *spawn_data = NULL;
pthread_t spawn_thread;
//--------------PRIVATE CONFIGURATION DECLARATIONS---------------//
void set_spawn_configuration(MPI_Comm comm);
void deallocate_spawn_data();
//--------------PRIVATE DECLARATIONS---------------//
void generic_spawn(MPI_Comm *child, int data_stage);
int check_single_state(MPI_Comm comm, MPI_Comm *child, int global_state, int wait_completed);
int check_generic_state(MPI_Comm comm, int local_state, int wait_completed);
//--------------PRIVATE THREADS DECLARATIONS---------------//
int allocate_thread_spawn(MPI_Comm *child);
void* thread_work(void *args);
//--------------PUBLIC FUNCTIONS---------------//
/*
 * Requests the creation of a new group of "numP" processes with a physical
 * distribution "type_dist".
 *
 * It can be requested in the foreground, in which case the process calling this function
 * takes care of it, or in the background, where a thread takes care of configuring the creation.
 *
 * If requested in the foreground, once it finishes it is possible to call "check_spawn_state()"
 * to create the processes.
 *
 * If requested in the background, calling "check_spawn_state()" checks whether the configuration
 * to create them is ready and, if so, creates them.
 *
 * Returns the state of the procedure. If it does not return "MAM_I_SPAWN_COMPLETED", it is
 * necessary to call "check_spawn_state()".
 */
int init_spawn(MPI_Comm comm, MPI_Comm *child) {
int local_state;
set_spawn_configuration(comm);
if(!spawn_data->spawn_is_async) {
generic_spawn(child, MAM_I_NOT_STARTED);
local_state = get_spawn_state(spawn_data->spawn_is_async);
if (local_state == MAM_I_SPAWN_COMPLETED)
deallocate_spawn_data();
} else {
local_state = spawn_data->spawn_is_single ?
MAM_I_SPAWN_SINGLE_PENDING : MAM_I_SPAWN_PENDING;
local_state = mall_conf->spawn_method == MAM_SPAWN_MERGE && spawn_data->initial_qty > spawn_data->target_qty ?
MAM_I_SPAWN_ADAPT_POSTPONE : local_state;
set_spawn_state(local_state, 0);
if((spawn_data->spawn_is_single && mall->myId == mall->root) || !spawn_data->spawn_is_single) {
allocate_thread_spawn(child);
}
}
return local_state;
}
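/*
 * Sketch of the expected call pattern (illustrative; assumes the
 * asynchronous strategy and ignores the MAM_I_SPAWN_ADAPT_POSTPONE path):
 *
 *   MPI_Comm child = MPI_COMM_NULL;
 *   int state = init_spawn(comm, &child);
 *   while(state != MAM_I_SPAWN_COMPLETED && state != MAM_I_SPAWN_ADAPTED) {
 *     state = check_spawn_state(&child, comm, 0);
 *   }
 */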
/*
 * Checks whether a configuration to create a new group of processes is ready
 * and, if it is, returns the communicator to these new processes.
 */
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int wait_completed) {
int local_state;
int global_state=MAM_I_NOT_STARTED;
if(spawn_data->spawn_is_async) { // Async
local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MAM_I_SPAWN_SINGLE_PENDING || local_state == MAM_I_SPAWN_SINGLE_COMPLETED) { // Single
global_state = check_single_state(comm, child, local_state, wait_completed);
} else if(local_state == MAM_I_SPAWN_PENDING || local_state == MAM_I_SPAWN_COMPLETED || local_state == MAM_I_SPAWN_ADAPTED) { // Generic
global_state = check_generic_state(comm, local_state, wait_completed);
} else if(local_state == MAM_I_SPAWN_ADAPT_POSTPONE) {
global_state = local_state;
} else {
printf("Error Check spawn: Configuracion invalida State = %d\n", local_state);
MPI_Abort(MPI_COMM_WORLD, -1);
return -10;
}
} else if(mall_conf->spawn_method == MAM_SPAWN_MERGE){ // Start Merge shrink Sync
generic_spawn(child, MAM_I_DIST_COMPLETED);
global_state = get_spawn_state(spawn_data->spawn_is_async);
}
if(global_state == MAM_I_SPAWN_COMPLETED || global_state == MAM_I_SPAWN_ADAPTED)
deallocate_spawn_data();
return global_state;
}
/*
 * Clears the blocking flag MAM_I_SPAWN_ADAPT_POSTPONE for the auxiliary
 * threads. This flag blocks them so that the Merge shrink method does not
 * advance until the data redistribution completes. Therefore, once the
 * flag is modified the threads can continue.
 *
 * For safety, the flag is only changed when all
 * 3 conditions are met.
 */
void unset_spawn_postpone_flag(int outside_state) {
int local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MAM_I_SPAWN_ADAPT_POSTPONE && outside_state == MAM_I_SPAWN_ADAPT_PENDING && spawn_data->spawn_is_async) {
set_spawn_state(MAM_I_SPAWN_PENDING, spawn_data->spawn_is_async);
wakeup_redistribution();
}
}
/*
 * Blocking function for the children to ensure that every task of the
 * children-creation step finishes correctly.
 *
 * In addition, the children obtain basic information from the parents
 * for the data redistribution step (number of processes and id of the root).
 *
 */
void malleability_connect_children(MPI_Comm *parents) {
spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));
MAM_Comm_main_structures(*parents, MAM_ROOT); //FIXME What if root is another id different to 0? Send from spawn to root id?
spawn_data->initial_qty = mall->num_parents;
spawn_data->target_qty = mall->numC;
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, &(spawn_data->spawn_is_multiple));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PARALLEL, &(spawn_data->spawn_is_parallel));
switch(mall_conf->spawn_method) {
case MAM_SPAWN_BASELINE:
spawn_data->spawn_qty = spawn_data->target_qty;
baseline(*spawn_data, parents);
if(!spawn_data->spawn_is_intercomm) {
intracomm_strategy(MAM_TARGETS, parents);
}
break;
case MAM_SPAWN_MERGE:
spawn_data->spawn_qty = spawn_data->target_qty - spawn_data->initial_qty;
merge(*spawn_data, parents, MAM_I_NOT_STARTED);
break;
}
free(spawn_data);
}
//--------------PRIVATE CONFIGURATION FUNCTIONS---------------//
/*
 * Groups all the required configuration data into a single structure
 * and initializes the needed structures.
 */
void set_spawn_configuration(MPI_Comm comm) {
spawn_data = (Spawn_data *) malloc(sizeof(Spawn_data));
spawn_data->total_spawns = 0;
spawn_data->initial_qty = mall->numP;
spawn_data->target_qty = mall->numC;
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_SINGLE, &(spawn_data->spawn_is_single));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PTHREAD, &(spawn_data->spawn_is_async));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, &(spawn_data->spawn_is_intercomm));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_MULTIPLE, &(spawn_data->spawn_is_multiple));
MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_PARALLEL, &(spawn_data->spawn_is_parallel));
spawn_data->comm = comm;
spawn_data->mapping_fill_method = MAM_PHY_TYPE_HOSTFILE;
spawn_data->sets = NULL;
switch(mall_conf->spawn_method) {
case MAM_SPAWN_BASELINE:
spawn_data->spawn_qty = spawn_data->target_qty;
spawn_data->already_created = 0;
break;
case MAM_SPAWN_MERGE:
spawn_data->spawn_qty = spawn_data->target_qty - spawn_data->initial_qty;
spawn_data->already_created = spawn_data->initial_qty;
break;
}
if(spawn_data->spawn_is_async) {
init_spawn_state();
}
}
/*
 * Frees a spawn_data structure,
 * also destroying the structures it uses.
 */
void deallocate_spawn_data() {
int i;
MPI_Info *info;
if(spawn_data == NULL) return;
for(i=0; i<spawn_data->total_spawns; i++) {
info = &(spawn_data->sets[i].mapping);
if(*info != MPI_INFO_NULL) {
MPI_Info_free(info);
*info = MPI_INFO_NULL;
}
}
if(spawn_data->sets != NULL) {
free(spawn_data->sets);
spawn_data->sets = NULL;
}
if(spawn_data->spawn_is_async) {
free_spawn_state();
}
free(spawn_data);
spawn_data = NULL;
}
//--------------PRIVATE SPAWN CREATION FUNCTIONS---------------//
/*
 * Generic function for process creation. It obtains the configuration
 * and, based on it, chooses how the processes should be created.
 *
 * When it finishes, it modifies the global state variable to signal the change
 */
void generic_spawn(MPI_Comm *child, int data_stage) {
int local_state, aux_state;
// WORK
if(mall->myId == mall->root && spawn_data->spawn_qty > 0) { //SET MAPPING FOR NEW PROCESSES
processes_dist(spawn_data);
}
switch(mall_conf->spawn_method) {
case MAM_SPAWN_BASELINE:
local_state = baseline(*spawn_data, child);
if(!spawn_data->spawn_is_intercomm) {
local_state = intracomm_strategy(MAM_SOURCES, child);
}
break;
case MAM_SPAWN_MERGE:
local_state = merge(*spawn_data, child, data_stage);
break;
}
// END WORK
aux_state = get_spawn_state(spawn_data->spawn_is_async);
if(!(aux_state == MAM_I_SPAWN_PENDING && local_state == MAM_I_SPAWN_ADAPT_POSTPONE)) {
set_spawn_state(local_state, spawn_data->spawn_is_async);
}
}
//--------------PRIVATE THREAD FUNCTIONS---------------//
/*
 * Allocates an auxiliary thread dedicated to process creation.
 * The thread cannot be joined; it frees its
 * associated memory on termination.
 */
int allocate_thread_spawn(MPI_Comm *child) {
if(pthread_create(&spawn_thread, NULL, thread_work, (void *) child)) {
printf("Error al crear el hilo de SPAWN\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
if(pthread_detach(spawn_thread)) {
printf("Error when detaching spawning thread\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
return 0;
}
/*
 * Function run by a thread so that it takes care of
 * configuring the creation of a new group of processes.
 *
 * Once the configuration is ready and the processes can be created,
 * the master thread is notified.
 */
void* thread_work(void *args) {
int local_state;
MPI_Comm *child = (MPI_Comm *) args;
generic_spawn(child, MAM_I_NOT_STARTED);
local_state = get_spawn_state(spawn_data->spawn_is_async);
if(local_state == MAM_I_SPAWN_ADAPT_POSTPONE || local_state == MAM_I_SPAWN_PENDING) {
    // The group of processes will finish merging after the data redistribution
local_state = wait_redistribution();
generic_spawn(child, MAM_I_DIST_COMPLETED);
}
wakeup_completion();
pthread_exit(NULL);
}
/*
 * Checks whether an asynchronous process creation has
 * finished the "single" step.
 * If it has not finished, the state
 * "MAM_I_SPAWN_SINGLE_PENDING" is kept.
 *
 * If it has finished, the auxiliary threads for the
 * non-root processes are created and the state
 * "MAM_I_SPAWN_PENDING" is returned.
 */
int check_single_state(MPI_Comm comm, MPI_Comm *child, int global_state, int wait_completed) {
while(wait_completed && mall->myId == mall->root && global_state == MAM_I_SPAWN_SINGLE_PENDING) {
global_state = wait_completion();
}
MPI_Bcast(&global_state, 1, MPI_INT, mall->root, comm);
// Non-root processes join root to finalize the spawn
// They also must join if the application has ended its work
if(global_state == MAM_I_SPAWN_SINGLE_COMPLETED) {
global_state = MAM_I_SPAWN_PENDING;
set_spawn_state(global_state, spawn_data->spawn_is_async);
if(mall->myId != mall->root) {
allocate_thread_spawn(child);
}
}
return global_state;
}
/*
 * Checks whether an asynchronous process creation has
 * finished the "generic" step.
 * If it has not finished, the state
 * "MAM_I_SPAWN_PENDING" is returned.
 *
 * If it has finished, the memory associated to spawn_data
 * is freed and the state "MAM_I_SPAWN_COMPLETED" is returned.
 */
int check_generic_state(MPI_Comm comm, int local_state, int wait_completed) {
int global_state;
while(wait_completed && local_state == MAM_I_SPAWN_PENDING) local_state = wait_completion();
MPI_Allreduce(&local_state, &global_state, 1, MPI_INT, MPI_MIN, comm);
if(global_state == MAM_I_SPAWN_COMPLETED || global_state == MAM_I_SPAWN_ADAPTED) {
set_spawn_state(global_state, spawn_data->spawn_is_async);
}
return global_state;
}
#ifndef MAM_GENERIC_SPAWN_H
#define MAM_GENERIC_SPAWN_H
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "Spawn_DataStructure.h"
int init_spawn(MPI_Comm comm, MPI_Comm *child);
int check_spawn_state(MPI_Comm *child, MPI_Comm comm, int wait_completed);
void malleability_connect_children(MPI_Comm *parents);
void unset_spawn_postpone_flag(int outside_state);
#endif
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
#include "Merge.h"
#include "Baseline.h"
//--------------PRIVATE DECLARATIONS---------------//
void merge_adapt_expand(MPI_Comm *child, int is_children_group);
void merge_adapt_shrink(int numC, MPI_Comm *child, MPI_Comm comm, int myId);
//--------------PUBLIC FUNCTIONS---------------//
int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state) {
MPI_Comm intercomm;
int local_state;
int is_children_group = 1;
if(spawn_data.initial_qty > spawn_data.target_qty) { //Shrink
if(data_state == MAM_I_DIST_COMPLETED) {
merge_adapt_shrink(spawn_data.target_qty, child, spawn_data.comm, mall->myId);
local_state = MAM_I_SPAWN_ADAPTED;
} else {
local_state = MAM_I_SPAWN_ADAPT_POSTPONE;
}
} else { //Expand
MPI_Comm_get_parent(&intercomm);
is_children_group = intercomm == MPI_COMM_NULL ? 0:1;
baseline(spawn_data, child);
merge_adapt_expand(child, is_children_group);
local_state = MAM_I_SPAWN_COMPLETED;
}
return local_state;
}
int intracomm_strategy(int is_children_group, MPI_Comm *child) {
merge_adapt_expand(child, is_children_group);
return MAM_I_SPAWN_COMPLETED;
}
//--------------PRIVATE MERGE TYPE FUNCTIONS---------------//
/*
 * Ensures that the resulting group of processes all
 * end up in a single intracommunicator, merging
 * parents and children into one communicator.
 *
 * It is called before the data redistribution.
 *
 * TODO REFACTOR
 */
void merge_adapt_expand(MPI_Comm *child, int is_children_group) {
MPI_Comm new_comm = MPI_COMM_NULL;
  MPI_Intercomm_merge(*child, is_children_group, &new_comm); // The side passing 0 goes first
//char test;
//MPI_Bcast(&test, 1, MPI_CHAR, 0, new_comm);
//MPI_Barrier(*child);
MPI_Comm_disconnect(child);
*child = new_comm;
}
/*
 * Removes from the resulting group of processes
 * those that are no longer needed.
 * The removed processes remain as zombies.
 *
 * It is called once the data redistribution has finished.
 */
void merge_adapt_shrink(int numC, MPI_Comm *child, MPI_Comm comm, int myId) {
int color = MPI_UNDEFINED;
if(*child != MPI_COMM_NULL && *child != MPI_COMM_WORLD) MPI_Comm_disconnect(child);
if(myId < numC) {
color = 1;
}
MPI_Comm_split(comm, color, myId, child);
}
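/*
 * Example (illustrative): shrinking from 4 processes to numC=2, ranks 0-1
 * call MPI_Comm_split with color=1 and obtain the new communicator, while
 * ranks 2-3 keep color=MPI_UNDEFINED and receive MPI_COMM_NULL, remaining
 * as zombies.
 */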
#ifndef MAM_SPAWN_MERGE_H
#define MAM_SPAWN_MERGE_H
#include <mpi.h>
#include "Spawn_DataStructure.h"
int merge(Spawn_data spawn_data, MPI_Comm *child, int data_state);
int intracomm_strategy(int is_children_group, MPI_Comm *child);
#endif