Commit 6633cd95 authored by Iker Martín Álvarez's avatar Iker Martín Álvarez
Browse files

RMA functionality and refactor of many of the codes

parent 2f81e29c
import sys
import glob
import numpy as np
import pandas as pd
from enum import Enum
class G_enum(Enum):
TOTAL_RESIZES = 0
TOTAL_GROUPS = 1
TOTAL_STAGES = 2
GRANULARITY = 3
SDR = 4
ADR = 5
DR = 6
RED_METHOD = 7
RED_STRATEGY = 8
SPAWN_METHOD = 9
SPAWN_STRATEGY = 10
GROUPS = 11
FACTOR_S = 12
DIST = 13
STAGE_TYPES = 14
STAGE_TIMES = 15
STAGE_BYTES = 16
ITERS = 17
ASYNCH_ITERS = 18
T_ITER = 19
T_STAGES = 20
T_SPAWN = 21
T_SPAWN_REAL = 22
T_SR = 23
T_AR = 24
T_MALLEABILITY = 25
T_TOTAL = 26
#Malleability specific
NP = 0
NC = 1
#Iteration specific
IS_DYNAMIC = 11
N_PARENTS = 17
#columnsG = ["Total_Resizes", "Total_Groups", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Redistribution_Method", \
# "Redistribution_Strategy", "Spawn_Method", "Spawn_Strategy", "Groups", "FactorS", "Dist", "Stage_Types", "Stage_Times", \
# "Stage_Bytes", "Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR", "T_Malleability", "T_total"] #27
columnsL = ["NP", "NC", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Redistribution_Method", \
"Redistribution_Strategy", "Spawn_Method", "Spawn_Strategy", "Is_Dynamic", "FactorS", "Dist", "Stage_Types", "Stage_Times", \
"Stage_Bytes", "N_Parents", "Asynch_Iters", "T_iter", "T_stages"] #20
def copy_iteration(row, dataL_it, group, iteration, is_asynch):
basic_indexes = [G_enum.TOTAL_STAGES.value, G_enum.GRANULARITY.value, \
G_enum.STAGE_TYPES.value, G_enum.STAGE_TIMES.value, G_enum.STAGE_BYTES.value]
basic_asynch = [G_enum.SDR.value, G_enum.ADR.value, G_enum.DR.value]
array_asynch_group = [G_enum.RED_METHOD.value, G_enum.RED_STRATEGY.value, \
G_enum.SPAWN_METHOD.value, G_enum.SPAWN_STRATEGY.value, G_enum.DIST.value]
dataL_it[G_enum.FACTOR_S.value] = row[G_enum.FACTOR_S.value][group]
dataL_it[G_enum.NP.value] = row[G_enum.GROUPS.value][group]
dataL_it[G_enum.ASYNCH_ITERS.value] = is_asynch
dataL_it[G_enum.T_ITER.value] = row[G_enum.T_ITER.value][group][iteration]
dataL_it[G_enum.T_STAGES.value] = list(row[G_enum.T_STAGES.value][group][iteration])
dataL_it[G_enum.IS_DYNAMIC.value] = True if group > 0 else False
for index in basic_indexes:
dataL_it[index] = row[index]
for index in array_asynch_group:
dataL_it[index] = [None, -1]
dataL_it[index][0] = row[index][group]
dataL_it[G_enum.N_PARENTS.value] = -1
if group > 0:
dataL_it[G_enum.N_PARENTS.value] = row[G_enum.GROUPS.value][group-1]
if is_asynch:
dataL_it[G_enum.NC.value] = row[G_enum.GROUPS.value][group+1]
for index in basic_asynch:
dataL_it[index] = row[index]
for index in array_asynch_group:
dataL_it[index][1] = row[index][group+1]
for index in array_asynch_group: # Convert to tuple
dataL_it[index] = tuple(dataL_it[index])
#-----------------------------------------------
def write_iter_dataframe(dataL, name, i, first=False):
dfL = pd.DataFrame(dataL, columns=columnsL)
dfL.to_pickle(name + str(i) + '.pkl')
if first:
print(dfL)
#-----------------------------------------------
def create_iter_dataframe(dfG, name, max_it_L):
it = -1
file_i = 0
first = True
dataL = []
for row_index in range(len(dfG)):
row = dfG.iloc[row_index]
groups = row[G_enum.TOTAL_GROUPS.value]
for group in range(groups):
real_iterations = len(row[G_enum.T_ITER.value][group])
real_asynch = row[G_enum.ASYNCH_ITERS.value][group]
is_asynch = False
for iteration in range(real_iterations-real_asynch):
it += 1
dataL.append( [None] * len(columnsL) )
copy_iteration(row, dataL[it], group, iteration, is_asynch)
is_asynch = True
for iteration in range(real_iterations-real_asynch, real_iterations):
it += 1
dataL.append( [None] * len(columnsL) )
copy_iteration(row, dataL[it], group, iteration, is_asynch)
if it >= max_it_L-1: #Var "it" starts at -1, so one more must be extracted for precise cut
write_iter_dataframe(dataL, name, file_i, first)
dataL = []
file_i += 1
first = False
it = -1
if it != -1:
write_iter_dataframe(dataL, name, file_i)
#-----------------------------------------------
if len(sys.argv) < 2:
print("The files name is missing\nUsage: python3 CreateIterDataframe.py input_file.pkl output_name [max_rows_per_file]")
exit(1)
input_name = sys.argv[1]
if len(sys.argv) > 2:
name = sys.argv[2]
else:
name = "dataL"
print("File names will be: " + name + ".pkl")
if len(sys.argv) > 3:
max_it_L = int(sys.argv[3])
else:
max_it_L = 100000
dfG = pd.read_pickle(input_name)
print(dfG)
create_iter_dataframe(dfG, name, max_it_L)
import sys
import glob
import numpy as np
import pandas as pd
from enum import Enum
class G_enum(Enum):
TOTAL_RESIZES = 0
TOTAL_GROUPS = 1
TOTAL_STAGES = 2
GRANULARITY = 3
SDR = 4
ADR = 5
DR = 6
RED_METHOD = 7
RED_STRATEGY = 8
SPAWN_METHOD = 9
SPAWN_STRATEGY = 10
GROUPS = 11
FACTOR_S = 12
DIST = 13
STAGE_TYPES = 14
STAGE_TIMES = 15
STAGE_BYTES = 16
ITERS = 17
ASYNCH_ITERS = 18
T_ITER = 19
T_STAGES = 20
T_SPAWN = 21
T_SPAWN_REAL = 22
T_SR = 23
T_AR = 24
T_MALLEABILITY = 25
T_TOTAL = 26
#Malleability specific
NP = 0
NC = 1
#Iteration specific
IS_DYNAMIC = 11
N_PARENTS = 17
#columnsG = ["Total_Resizes", "Total_Groups", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Redistribution_Method", \
# "Redistribution_Strategy", "Spawn_Method", "Spawn_Strategy", "Groups", "FactorS", "Dist", "Stage_Types", "Stage_Times", \
# "Stage_Bytes", "Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR", "T_Malleability", "T_total"] #27
columnsM = ["NP", "NC", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Redistribution_Method", \
"Redistribution_Strategy", "Spawn_Method", "Spawn_Strategy", "FactorS", "Dist", "Stage_Type", "Stage_Time", \
"Stage_Bytes", "Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR", "T_Malleability"] #25
def copy_resize(row, dataM_it, resize):
basic_indexes = [G_enum.TOTAL_STAGES.value, G_enum.GRANULARITY.value, G_enum.SDR.value, \
G_enum.ADR.value, G_enum.DR.value]
basic_group = [G_enum.STAGE_TYPES.value, G_enum.STAGE_TIMES.value, G_enum.STAGE_BYTES.value]
array_actual_group = [G_enum.FACTOR_S.value, G_enum.ITERS.value, G_enum.ASYNCH_ITERS.value, \
G_enum.T_SPAWN.value, G_enum.T_SPAWN_REAL.value, G_enum.T_SR.value, \
G_enum.T_AR.value, G_enum.T_MALLEABILITY.value, G_enum.T_ITER.value, G_enum.T_STAGES.value]
array_next_group = [G_enum.RED_METHOD.value, G_enum.RED_STRATEGY.value, \
G_enum.SPAWN_METHOD.value, G_enum.SPAWN_STRATEGY.value]
dataM_it[G_enum.NP.value] = row[G_enum.GROUPS.value][resize]
dataM_it[G_enum.NC.value] = row[G_enum.GROUPS.value][resize+1]
dataM_it[G_enum.DIST.value-1] = [None, None]
dataM_it[G_enum.DIST.value-1][0] = row[G_enum.DIST.value][resize]
dataM_it[G_enum.DIST.value-1][1] = row[G_enum.DIST.value][resize+1]
for index in basic_indexes:
dataM_it[index] = row[index]
for index in basic_group:
dataM_it[index-1] = row[index]
for index in array_actual_group:
dataM_it[index-1] = row[index][resize]
for index in array_next_group:
dataM_it[index] = row[index][resize+1]
#-----------------------------------------------
def create_resize_dataframe(dfG, dataM):
it = -1
for row_index in range(len(dfG)):
row = dfG.iloc[row_index]
resizes = row[G_enum.TOTAL_RESIZES.value]
for resize in range(resizes):
it += 1
dataM.append( [None] * len(columnsM) )
copy_resize(row, dataM[it], resize)
#-----------------------------------------------
if len(sys.argv) < 2:
print("The files name is missing\nUsage: python3 CreateResizeDataframe.py input_file.pkl output_name")
exit(1)
input_name = sys.argv[1]
if len(sys.argv) > 2:
name = sys.argv[2]
else:
name = "dataM"
print("File name will be: " + name + ".pkl")
dfG = pd.read_pickle(input_name)
dataM = []
create_resize_dataframe(dfG, dataM)
dfM = pd.DataFrame(dataM, columns=columnsM)
dfM.to_pickle(name + '.pkl')
print(dfG)
print(dfM)
......@@ -12,34 +12,56 @@ class G_enum(Enum):
SDR = 4
ADR = 5
DR = 6
ASYNCH_REDISTRIBUTION_TYPE = 7
SPAWN_METHOD = 8
SPAWN_STRATEGY = 9
GROUPS = 10
FACTOR_S = 11
DIST = 12
STAGE_TYPES = 13
STAGE_TIMES = 14
STAGE_BYTES = 15
ITERS = 16
ASYNCH_ITERS = 17
T_ITER = 18
T_STAGES = 19
T_SPAWN = 20
T_SPAWN_REAL = 21
T_SR = 22
T_AR = 23
T_TOTAL = 24
columnsG = ["Total_Resizes", "Total_Groups", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Asynch_Redistribution_Type", \
"Spawn_Method", "Spawn_Strategy", "Groups", "Factor_S", "Dist", "Stage_Types", "Stage_Times", "Stage_Bytes", \
"Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR", "T_total"] #25
RED_METHOD = 7
RED_STRATEGY = 8
SPAWN_METHOD = 9
SPAWN_STRATEGY = 10
GROUPS = 11
FACTOR_S = 12
DIST = 13
STAGE_TYPES = 14
STAGE_TIMES = 15
STAGE_BYTES = 16
ITERS = 17
ASYNCH_ITERS = 18
T_ITER = 19
T_STAGES = 20
T_SPAWN = 21
T_SPAWN_REAL = 22
T_SR = 23
T_AR = 24
T_MALLEABILITY = 25
T_TOTAL = 26
#Malleability specific
NP = 0
NC = 1
#Iteration specific
IS_DYNAMIC = 11
N_PARENTS = 17
columnsG = ["Total_Resizes", "Total_Groups", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Redistribution_Method", \
"Redistribution_Strategy", "Spawn_Method", "Spawn_Strategy", "Groups", "FactorS", "Dist", "Stage_Types", "Stage_Times", \
"Stage_Bytes", "Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR", "T_Malleability", "T_total"] #27
#-----------------------------------------------
# Obtains the value of a given index in a splited line
# and returns it as a float values
def get_value(line, index):
return float(line[index].split('=')[1].split(',')[0])
# and returns it as a float values if possible, string otherwise
def get_value(line, index, separator=True):
if separator:
value = line[index].split('=')[1].split(',')[0]
else:
value = line[index]
try:
value = float(value)
if value.is_integer():
value = int(value)
except ValueError:
return value
return value
#-----------------------------------------------
# Obtains the general parameters of an execution and
# stores them for creating a global dataframe
def record_config_line(lineS, dataG_it):
......@@ -48,27 +70,25 @@ def record_config_line(lineS, dataG_it):
offset_line = 2
for i in range(len(ordered_indexes)):
value = get_value(lineS, i+offset_line)
if value.is_integer():
value = int(value)
index = ordered_indexes[i]
dataG_it[index] = value
dataG_it[G_enum.TOTAL_GROUPS.value] = dataG_it[G_enum.TOTAL_RESIZES.value]
dataG_it[G_enum.TOTAL_RESIZES.value] -=1 #FIXME Modificar en App sintetica
dataG_it[G_enum.TOTAL_GROUPS.value] = dataG_it[G_enum.TOTAL_RESIZES.value]+1
#FIXME Modificar cuando ADR ya no sea un porcentaje
dataG_it[G_enum.DR.value] = dataG_it[G_enum.SDR.value] + dataG_it[G_enum.ADR.value]
# Init lists for each column
array_groups = [G_enum.GROUPS.value, G_enum.FACTOR_S.value, G_enum.DIST.value, G_enum.ITERS.value, \
G_enum.ASYNCH_ITERS.value, G_enum.T_ITER.value, G_enum.T_STAGES.value]
array_resizes = [G_enum.ASYNCH_REDISTRIBUTION_TYPE.value, G_enum.SPAWN_METHOD.value, \
G_enum.SPAWN_STRATEGY.value, G_enum.T_SPAWN.value, G_enum.T_SPAWN_REAL.value, \
G_enum.T_SR.value, G_enum.T_AR.value]
G_enum.ASYNCH_ITERS.value, G_enum.T_ITER.value, G_enum.T_STAGES.value, G_enum.RED_METHOD.value, \
G_enum.RED_STRATEGY.value, G_enum.SPAWN_METHOD.value, G_enum.SPAWN_STRATEGY.value,]
array_resizes = [ G_enum.T_SPAWN.value, G_enum.T_SPAWN_REAL.value, G_enum.T_SR.value, G_enum.T_AR.value, G_enum.T_MALLEABILITY.value]
array_stages = [G_enum.STAGE_TYPES.value, \
G_enum.STAGE_TIMES.value, G_enum.STAGE_BYTES.value]
for index in array_groups:
dataG_it[index] = [None]*dataG_it[G_enum.TOTAL_GROUPS.value]
for group in range(dataG_it[G_enum.TOTAL_GROUPS.value]):
dataG_it[G_enum.T_ITER.value][group] = []
for index in array_resizes:
dataG_it[index] = [None]*dataG_it[G_enum.TOTAL_RESIZES.value]
......@@ -76,6 +96,7 @@ def record_config_line(lineS, dataG_it):
for index in array_stages:
dataG_it[index] = [None]*dataG_it[G_enum.TOTAL_STAGES.value]
#-----------------------------------------------
# Obtains the parameters of a stage line
# and stores it in the dataframe
# Is needed to indicate in which stage is
......@@ -86,144 +107,194 @@ def record_stage_line(lineS, dataG_it, stage):
offset_lines = 2
for i in range(len(array_stages)):
value = get_value(lineS, i+offset_lines)
if value.is_integer():
value = int(value)
index = array_stage[i]
index = array_stages[i]
dataG_it[index][stage] = value
#-----------------------------------------------
# Obtains the parameters of a resize line
# and stores them in the dataframe
# Is needed to indicate to which group refers
# the resize line
def record_resize_line(lineS, dataG_it, group):
array_stages = [G_enum.ITERS.value, G_enum.GROUPS.value, G_enum.FACTOR_S.value, G_enum.DIST.value, \
G_enum.ASYNCH_REDISTRIBUTION_TYPE.value, G_enum.SPAWN_METHOD.value, G_enum.SPAWN_STRATEGY.value]
def record_group_line(lineS, dataG_it, group):
array_groups = [G_enum.ITERS.value, G_enum.GROUPS.value, G_enum.FACTOR_S.value, G_enum.DIST.value, \
G_enum.RED_METHOD.value, G_enum.RED_STRATEGY.value, G_enum.SPAWN_METHOD.value, G_enum.SPAWN_STRATEGY.value]
offset_lines = 2
for i in range(len(array_stages)):
for i in range(len(array_groups)):
value = get_value(lineS, i+offset_lines)
if value.is_integer():
value = int(value)
index = array_stage[i]
index = array_groups[i]
dataG_it[index][group] = value
#-----------------------------------------------
def record_time_line(lineS, dataG_it):
T_names = ["T_spawn:", "T_spawn_real:", "T_SR:", "T_AR:", "T_total:"]
T_values = [G_enum.T_SPAWN.value, G_enum.T_SPAWN_REAL.value, G_enum.T_SR.value, G_enum.T_AR.value, G_enum.T_TOTAL.value]
T_names = ["T_spawn:", "T_spawn_real:", "T_SR:", "T_AR:", "T_Malleability:", "T_total:"]
T_values = [G_enum.T_SPAWN.value, G_enum.T_SPAWN_REAL.value, G_enum.T_SR.value, G_enum.T_AR.value, G_enum.T_MALLEABILITY.value, G_enum.T_TOTAL.value]
if not (lineS[0] in T_names): # Execute only if line represents a Time
return
index = T_names.index(linesS[0])
index = T_names.index(lineS[0])
index = T_values[index]
offset_lines = 1
for i in range(len(dataG_it[index])):
value = get_value(lineS, i+offset_lines)
dataG_it[index][i] = value
len_index = 1
if dataG_it[index] != None:
len_index = len(dataG_it[index])
for i in range(len_index):
dataG_it[index][i] = get_value(lineS, i+offset_lines, False)
else:
dataG_it[index] = get_value(lineS, offset_lines, False)
#-----------------------------------------------
def read_global_file(f, dataG, it):
resizes = 0
timer = 0
previousNP = 0
def record_multiple_times_line(lineS, dataG_it, group):
T_names = ["T_iter:", "T_stage"]
T_values = [G_enum.T_ITER.value, G_enum.T_STAGES.value]
if not (lineS[0] in T_names): # Execute only if line represents a Time
return
index = T_names.index(lineS[0])
index = T_values[index]
offset_lines = 1
if index == G_enum.T_STAGES.value:
offset_lines += 1
total_iters = len(lineS)-offset_lines
stage = int(lineS[1].split(":")[0])
if stage == 0:
dataG_it[index][group] = [None] * total_iters
for i in range(total_iters):
dataG_it[index][group][i] = [None] * dataG_it[G_enum.TOTAL_STAGES.value]
for i in range(total_iters):
dataG_it[index][group][i][stage] = get_value(lineS, i+offset_lines, False)
else:
total_iters = len(lineS)-offset_lines
for i in range(total_iters):
dataG_it[index][group].append(get_value(lineS, i+offset_lines, False))
#-----------------------------------------------
def read_local_file(f, dataG, it, runs_in_file):
offset = 0
real_it = 0
group = 0
for line in f:
lineS = line.split()
if len(lineS) > 0:
if lineS[0] == "Config": # CONFIG LINE
it += 1
dataG.append([None]*(25+1))
#dataG[it][-1] = None Indicates if local data has been recorded(1) or not(None)
record_config(lineS, dataG[it])
resize = 0
stage = 0
elif lineS[0] == "Stage":
record_stage_line(lineS, dataG[it], stage)
stage+=1
elif lineS[0] == "Resize":
record_resize_line(lineS, dataG[it], resize)
resize+=1
elif lineS[0] == "T_total:":
value = get_value(lineS, 1)
dataG[it][G_enum.T_TOTAL.value] = value
if lineS[0] == "Group": # GROUP number
offset += 1
real_it = it - (runs_in_file-offset)
group = int(lineS[1].split(":")[0])
elif lineS[0] == "Async_Iters:":
offset_line = 1
dataG[real_it][G_enum.ASYNCH_ITERS.value][group] = get_value(lineS, offset_line, False)
else:
record_time_line(lineS, dataG[it])
return it
record_multiple_times_line(lineS, dataG[real_it], group)
#-----------------------------------------------
def read_local_file(f, dataG, it):
resizes = 0
timer = 0
previousNP = 0
def read_global_file(f, dataG, it):
runs_in_file=0
for line in f:
lineS = line.split()
if len(lineS) > 0:
if lineS[0] == "Config": # CONFIG LINE
it += 1
record_config(lineS, dataG[it], dataM[it])
resize = 0
runs_in_file += 1
group = 0
stage = 0
dataG.append([None]*len(columnsG))
record_config_line(lineS, dataG[it])
elif lineS[0] == "Stage":
record_stage_line(lineS, dataG[it], stage)
stage+=1
elif lineS[0] == "Resize":
record_resize_line(lineS, dataG[it], resize)
resize+=1
elif lineS[0] == "T_total:":
value = get_value(lineS, 1)
dataG[it][G_enum.T_TOTAL.value] = value
elif lineS[0] == "Group":
record_group_line(lineS, dataG[it], group)
group+=1
else:
record_time_line(lineS, dataG[it])
return it
return it,runs_in_file
#-----------------------------------------------
#columnsG = ["Total_Resizes", "Total_Groups", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Asynch_Redistribution_Type", \\
# "Spawn_Method", "Spawn_Strategy", "Groups", "Dist", "Stage_Types", "Stage_Times", "Stage_Bytes", \\
# "Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR", "T_total"] #24
#-----------------------------------------------
def convert_to_tuples(dfG):
array_list_items = [G_enum.GROUPS.value, G_enum.FACTOR_S.value, G_enum.DIST.value, G_enum.ITERS.value, \
G_enum.ASYNCH_ITERS.value, G_enum.RED_METHOD.value, G_enum.RED_STRATEGY.value, G_enum.SPAWN_METHOD.value, \
G_enum.SPAWN_STRATEGY.value, G_enum.T_SPAWN.value, G_enum.T_SPAWN_REAL.value, G_enum.T_SR.value, \
G_enum.T_AR.value, G_enum.STAGE_TYPES.value, G_enum.STAGE_TIMES.value, G_enum.STAGE_BYTES.value]
#TODO Falta T_malleability?
array_multiple_list_items = [G_enum.T_ITER.value, G_enum.T_STAGES.value]
for item in array_list_items:
name = columnsG[item]
values = dfG[name].copy()
for index in range(len(values)):
values[index] = tuple(values[index])
dfG[name] = values
for item in array_multiple_list_items:
name = columnsG[item]
values = dfG[name].copy()
for i in range(len(values)):
for j in range(len(values[i])):
if(type(values[i][j][0]) == list):
for r in range(len(values[i][j])):
values[i][j][r] = tuple(values[i][j][r])
values[i][j] = tuple(values[i][j])
values[i] = tuple(values[i])
dfG[name] = values
#-----------------------------------------------
if len(sys.argv) < 2:
print("The files name is missing\nUsage: python3 iterTimes.py resultsName directory csvOutName")
print("The files name is missing\nUsage: python3 MallTimes.py commonName directory OutName")
exit(1)
common_name = sys.argv[1]
if len(sys.argv) >= 3:
BaseDir = sys.argv[2]
print("Searching in directory: "+ BaseDir)
else:
BaseDir = sys.argv[2]
BaseDir = "./"
if len(sys.argv) >= 4:
print("Csv name will be: " + sys.argv[3] + "G.csv & " + sys.argv[3] + "M.csv")
name = sys.argv[3]
else:
name = "data"
print("File name will be: " + name + "G.pkl")
insideDir = "Run"
lista = glob.glob("./" + BaseDir + insideDir + "*/" + sys.argv[1]+ "*Global.o*")
lista += (glob.glob("./" + BaseDir + sys.argv[1]+ "*Global.o*")) # Se utiliza cuando solo hay un nivel de directorios
lista = glob.glob(BaseDir + insideDir + "*/" + common_name + "*_Global.out")
lista += (glob.glob(BaseDir + common_name + "*_Global.out")) # Se utiliza cuando solo hay un nivel de directorios
print("Number of files found: "+ str(len(lista)));
it = -1
dataG = []
dataM = []
columnsG = ["N", "%Async", "Groups", "NP", "NS", "Dist", "Matrix", "CommTam", "Cst", "Css", "Time", "Iters", "TE"] #13
columnsM = ["N", "%Async", "NP", "NS", "Dist", "Matrix", "CommTam", "Cst", "Css", "Time", "Iters", "TC", "TH", "TS", "TA"] #15
for elem in lista:
f = open(elem, "r")
it = read_file(f, dataG, dataM, it)
id_run = elem.split("_Global.out")[0].split(common_name)[1]
path_to_run = elem.split(common_name)[0]
lista_local = glob.glob(path_to_run + common_name + id_run + "_G*NP*.out")
it,runs_in_file = read_global_file(f, dataG, it)
f.close()
for elem_local in lista_local:
f_local = open(elem_local, "r")
read_local_file(f_local, dataG, it, runs_in_file)
f_local.close()
#print(data)
dfG = pd.DataFrame(dataG, columns=columnsG)
dfG.to_csv(name + 'G.csv')
convert_to_tuples(dfG)
print(dfG)
dfG.to_pickle(name + 'G.pkl')
dfM = pd.DataFrame(dataM, columns=columnsM)
#dfM = pd.DataFrame(dataM, columns=columnsM)
#Poner en TC el valor real y en TH el necesario para la app
cond = dfM.TH != 0
dfM.loc[cond, ['TC', 'TH']] = dfB.loc[cond, ['TH', 'TC']].values
dfM.to_csv(name + 'M.csv')
#cond = dfM.TH != 0
#dfM.loc[cond, ['TC', 'TH']] = dfM.loc[cond, ['TH', 'TC']].values
#dfM.to_csv(name + 'M.csv')
This source diff could not be displayed because it is too large. You can view the blob instead.
'''
Created on Oct 24, 2016
@author: David Llorens (dllorens@uji.es)
(c) Universitat Jaume I 2016
@license: GPL2
'''
from abc import ABCMeta, abstractmethod
infinity = float("infinity")
## Esquema para BT básico --------------------------------------------------------------------------
class PartialSolution(metaclass=ABCMeta):
@abstractmethod
def is_solution(self)-> "bool":
pass
@abstractmethod
def get_solution(self) -> "solution":
pass
@abstractmethod
def successors(self) -> "IEnumerable<PartialSolution>":
pass
class BacktrackingSolver(metaclass=ABCMeta):
@staticmethod
def solve(initial_ps : "PartialSolution") -> "IEnumerable<Solution>":
def bt(ps):
if ps.is_solution():
yield ps.get_solution()
else:
for new_ps in ps.successors():
yield from bt(new_ps)
yield from bt(initial_ps)
class BacktrackingSolverOld(metaclass=ABCMeta):
def solve(self, initial_ps : "PartialSolution") -> "IEnumerable<Solution>":
def bt(ps):
if ps.is_solution():
return [ps.get_solution()]
else:
solutions = []
for new_ps in ps.successors():
solutions.extend(bt(new_ps))
return solutions
return bt(initial_ps)
## Esquema para BT con control de visitados --------------------------------------------------------
class PartialSolutionWithVisitedControl(PartialSolution):
@abstractmethod
def state(self)-> "state":
# the returned object must be of an inmutable type
pass
class BacktrackingVCSolver(metaclass=ABCMeta):
@staticmethod
def solve(initial_ps : "PartialSolutionWithVisitedControl") -> "IEnumerable<Solution>":
def bt(ps):
seen.add(ps.state())
if ps.is_solution():
yield ps.get_solution()
else:
for new_ps in ps.successors():
state = new_ps.state()
if state not in seen:
yield from bt(new_ps)
seen = set()
yield from bt(initial_ps)
## Esquema para BT para optimización ----------------------------------------------------------------
class PartialSolutionWithOptimization(PartialSolutionWithVisitedControl):
@abstractmethod
def f(self)-> "int or double":
# result of applying the objective function to the partial solution
pass
class BacktrackingOptSolver(metaclass=ABCMeta):
@staticmethod
def solve(initial_ps : "PartialSolutionWithOptimization") -> "IEnumerable<Solution>":
def bt(ps):
nonlocal best_solution_found_score
ps_score = ps.f()
best_seen[ps.state()] = ps_score
if ps.is_solution() and ps_score < best_solution_found_score: #sólo muestra una solución si mejora la última mostrada
best_solution_found_score = ps_score
yield ps.get_solution()
else:
for new_ps in ps.successors():
state = new_ps.state()
if state not in best_seen or new_ps.f() < best_seen[state]:
yield from bt(new_ps)
best_seen = {}
best_solution_found_score = infinity
yield from bt(initial_ps)
import sys
import glob
import numpy as numpy
import pandas as pd
#-----------------------------------------------
def read_file(f, dataA, dataB, itA, itB):
compute_tam = 0
comm_tam = 0
sdr = 0
adr = 0
dist = 0
css = 0
cst = 0
time = 0
recording = False
it_line = 0
aux_itA = 0
aux_itB = 0
iters = 0
np = 0
np_par = 0
ns = 0
array = []
columnas = ['Titer','Ttype','Top']
#print(f)
for line in f:
lineS = line.split()
if len(lineS) > 1:
if recording and lineS[0].split(':')[0] in columnas: #Record data
aux_itA = 0
lineS.pop(0)
if it_line==0:
for observation in lineS:
dataA.append([None]*15)
dataA[itA+aux_itA][0] = sdr
dataA[itA+aux_itA][1] = adr
dataA[itA+aux_itA][2] = np
dataA[itA+aux_itA][3] = np_par
dataA[itA+aux_itA][4] = ns
dataA[itA+aux_itA][5] = dist
dataA[itA+aux_itA][6] = compute_tam
dataA[itA+aux_itA][7] = comm_tam
dataA[itA+aux_itA][8] = cst
dataA[itA+aux_itA][9] = css
dataA[itA+aux_itA][10] = time
dataA[itA+aux_itA][11] = iters
dataA[itA+aux_itA][12] = float(observation)
array.append(float(observation))
aux_itA+=1
elif it_line==1:
deleted = 0
for observation in lineS:
dataA[itA+aux_itA][13] = float(observation)
if float(observation) == 0:
array.pop(aux_itA - deleted)
deleted+=1
aux_itA+=1
else:
for observation in lineS:
dataA[itA+aux_itA][14] = float(observation)
aux_itA+=1
it_line += 1
if(it_line % 3 == 0): # Comprobar si se ha terminado de mirar esta ejecucion
recording = False
it_line = 0
itA = itA + aux_itA
if ns != 0: # Solo obtener datos de grupos con hijos
dataB.append([None]*14)
dataB[itB][0] = sdr
dataB[itB][1] = adr
dataB[itB][2] = np
dataB[itB][3] = np_par
dataB[itB][4] = ns
dataB[itB][5] = dist
dataB[itB][6] = compute_tam
dataB[itB][7] = comm_tam
dataB[itB][8] = cst
dataB[itB][9] = css
dataB[itB][10] = time
dataB[itB][11] = iters
dataB[itB][12] = tuple(array)
dataB[itB][13] = numpy.sum(array)
itB+=1
array = []
if lineS[0] == "Config:":
compute_tam = int(lineS[1].split('=')[1].split(',')[0])
comm_tam = int(lineS[2].split('=')[1].split(',')[0])
sdr = int(lineS[3].split('=')[1].split(',')[0])
adr = int(lineS[4].split('=')[1].split(',')[0])
css = int(lineS[6].split('=')[1].split(',')[0])
cst = int(lineS[7].split('=')[1].split(',')[0])
time = float(lineS[8].split('=')[1])
elif lineS[0] == "Config":
recording = True
iters = int(lineS[2].split('=')[1].split(',')[0])
dist = int(lineS[4].split('=')[1].split(',')[0])
np = int(lineS[5].split('=')[1].split(',')[0])
np_par = int(lineS[6].split('=')[1].split(',')[0])
ns = int(float(lineS[7].split('=')[1]))
return itA,itB
#-----------------------------------------------
#Config: matrix=1000, sdr=1000000000, adr=0, aib=0 time=2.000000
#Config Group: iters=100, factor=1.000000, phy=2, procs=2, parents=0, sons=4
#Ttype: 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
if len(sys.argv) < 2:
print("The files name is missing\nUsage: python3 iterTimes.py resultsName directory csvOutName")
exit(1)
if len(sys.argv) >= 3:
BaseDir = sys.argv[2]
print("Searching in directory: "+ BaseDir)
else: #FIXME
BaseDir = sys.argv[2]
if len(sys.argv) >= 4:
print("Csv name will be: " + sys.argv[3] + ".csv and "+ sys.argv[3] + "_Total.csv")
name = sys.argv[3]
else:
name = "data"
insideDir = "Run"
lista = glob.glob("./" + BaseDir + insideDir + "*/" + sys.argv[1]+ "*ID*.o*")
print("Number of files found: "+ str(len(lista)));
itA = itB = 0
dataA = []
dataB = [] #0 #1 #2 #3 #4 #5 #6 #7 #8 #9 #10 #11 #12 #13 #14
columnsA = ["N", "%Async", "NP", "N_par", "NS", "Dist", "Compute_tam", "Comm_tam", "Cst", "Css","Time", "Iters", "Ti", "Tt", "To"] #15
columnsB = ["N", "%Async", "NP", "N_par", "NS", "Dist", "Compute_tam", "Comm_tam", "Cst", "Css","Time", "Iters", "Ti", "Sum"] #14
for elem in lista:
f = open(elem, "r")
itA,itB = read_file(f, dataA, dataB, itA, itB)
f.close()
#print(data)
dfA = pd.DataFrame(dataA, columns=columnsA)
dfB = pd.DataFrame(dataB, columns=columnsB)
dfA['N'] += dfA['%Async']
dfA['%Async'] = (dfA['%Async'] / dfA['N']) * 100
dfA.to_csv(name + '.csv')
dfB['N'] += dfB['%Async']
dfB['%Async'] = (dfB['%Async'] / dfB['N']) * 100
dfB.to_csv(name + '_Total.csv')
......@@ -3,22 +3,19 @@ import glob
import numpy as numpy
import pandas as pd
if len(sys.argv) < 3:
print("The files name is missing\nUsage: python3 joinDf.py resultsName1.csv resultsName2.csv csvOutName")
print("The files name is missing\nUsage: python3 joinDf.py resultsName1.pkl resultsName2.pkl OutName")
exit(1)
if len(sys.argv) >= 4:
print("Csv name will be: " + sys.argv[3] + ".csv")
name = sys.argv[3]
else:
name = "dataJOINED"
df1 = pd.read_csv( sys.argv[1] )
df2 = pd.read_csv( sys.argv[2] )
print("File name will be: " + name + ".pkl")
df1 = pd.read_pickle( sys.argv[1] )
df2 = pd.read_pickle( sys.argv[2] )
frames = [df1, df2]
df3 = pd.concat(frames)
df3 = df3.drop(columns=df3.columns[0])
df3.to_csv(name + '.csv')
df3.to_pickle(name + '.pkl')
This source diff could not be displayed because it is too large. You can view the blob instead.
......@@ -42,9 +42,9 @@ static int handler(void* user, const char* section, const char* name,
} else if (MATCH("general", "Granularity")) {
pconfig->granularity = atoi(value);
} else if (MATCH("general", "SDR")) { // TODO Refactor a nombre manual
pconfig->sdr = atoi(value);
pconfig->sdr = strtoul(value, NULL, 10);
} else if (MATCH("general", "ADR")) { // TODO Refactor a nombre manual
pconfig->adr = atoi(value);
pconfig->adr = strtoul(value, NULL, 10);
} else if (MATCH("general", "Rigid")) {
pconfig->rigid_times = atoi(value);
......@@ -72,8 +72,10 @@ static int handler(void* user, const char* section, const char* name,
aux_value = MALL_DIST_SPREAD;
}
pconfig->groups[pconfig->actual_group].phy_dist = aux_value;
} else if (MATCH(resize_name, "Asynch_Redistribution_Type") && LAST(pconfig->actual_group, pconfig->n_groups)) {
pconfig->groups[pconfig->actual_group].at = atoi(value);
} else if (MATCH(resize_name, "Redistribution_Method") && LAST(pconfig->actual_group, pconfig->n_groups)) {
pconfig->groups[pconfig->actual_group].rm = atoi(value);
} else if (MATCH(resize_name, "Redistribution_Strategy") && LAST(pconfig->actual_group, pconfig->n_groups)) {
pconfig->groups[pconfig->actual_group].rs = atoi(value);
} else if (MATCH(resize_name, "Spawn_Method") && LAST(pconfig->actual_group, pconfig->n_groups)) {
pconfig->groups[pconfig->actual_group].sm = atoi(value);
} else if (MATCH(resize_name, "Spawn_Strategy") && LAST(pconfig->actual_group, pconfig->n_groups)) {
......
......@@ -41,14 +41,14 @@ void comm_results(results_data *results, int root, size_t resizes, MPI_Comm inte
* En concreto son tres escalares y dos vectores de tamaño "resizes"
*/
void def_results_type(results_data *results, int resizes, MPI_Datatype *results_type) {
int i, counts = 6;
int blocklengths[] = {1, 1, 1, 1, 1, 1};
int i, counts = 7;
int blocklengths[] = {1, 1, 1, 1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];
// Rellenar vector types
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = MPI_DOUBLE;
blocklengths[2] = blocklengths[3] = blocklengths[4] = blocklengths[5] = resizes;
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = types[6] = MPI_DOUBLE;
blocklengths[2] = blocklengths[3] = blocklengths[4] = blocklengths[5] = blocklengths[6] = resizes;
// Rellenar vector displs
MPI_Get_address(results, &dir);
......@@ -59,6 +59,7 @@ void def_results_type(results_data *results, int resizes, MPI_Datatype *results_
MPI_Get_address(results->async_time, &displs[3]);
MPI_Get_address(results->spawn_real_time, &displs[4]);
MPI_Get_address(results->spawn_time, &displs[5]);
MPI_Get_address(results->malleability_time, &displs[6]);
for(i=0;i<counts;i++) displs[i] -= dir;
......@@ -87,6 +88,7 @@ void set_results_post_reconfig(results_data *results, int grp, int sdr, int adr)
} else {
results->async_time[grp-1] = 0;
}
results->malleability_time[grp-1] = results->malleability_end - results->malleability_time[grp-1];
}
/*
......@@ -100,6 +102,7 @@ void set_results_post_reconfig(results_data *results, int grp, int sdr, int adr)
*/
void reset_results_index(results_data *results) {
results->iter_index = 0;
results->iters_async = 0;
}
//=============================================================== FIXME BORRAR?
......@@ -162,17 +165,18 @@ void compute_results_stages(results_data *results, int myId, int numP, int root,
int i;
if(myId == root) {
for(i=0; i<stages; i++) {
MPI_Reduce(MPI_IN_PLACE, results->stage_times[i], results->iter_index, MPI_DOUBLE, MPI_SUM, root, comm);
for(size_t j=0; j<results->iter_index; j++) {
MPI_Reduce(MPI_IN_PLACE, results->stage_times[i], results->iter_index, MPI_DOUBLE, MPI_MAX, root, comm);
/* for(size_t j=0; j<results->iter_index; j++) {
results->stage_times[i][j] = results->stage_times[i][j] / numP;
}
}*/
}
}
else {
for(i=0; i<stages; i++) {
MPI_Reduce(results->stage_times[i], NULL, results->iter_index, MPI_DOUBLE, MPI_SUM, root, comm);
MPI_Reduce(results->stage_times[i], NULL, results->iter_index, MPI_DOUBLE, MPI_MAX, root, comm);
}
}
//MPI_Barrier(comm); //FIXME Esto debería de borrarse
}
//======================================================||
......@@ -189,12 +193,12 @@ void compute_results_stages(results_data *results, int myId, int numP, int root,
void print_iter_results(results_data results) {
size_t i;
printf("Async_Iters: %ld\n", results.iters_async);
printf("T_iter: ");
for(i=0; i< results.iter_index; i++) {
printf("%lf ", results.iters_time[i]);
}
printf("\nAsync_Iters: %ld\n", results.iters_async);
printf("\n");
}
/*
......@@ -240,6 +244,11 @@ void print_global_results(results_data results, size_t resizes) {
printf("%lf ", results.async_time[i]);
}
printf("\nT_Malleability: ");
for(i=0; i < resizes; i++) {
printf("%lf ", results.malleability_time[i]);
}
printf("\nT_total: %lf\n", results.exec_time);
}
......@@ -262,6 +271,7 @@ void init_results_data(results_data *results, size_t resizes, size_t stages, siz
results->spawn_real_time = calloc(resizes, sizeof(double));
results->sync_time = calloc(resizes, sizeof(double));
results->async_time = calloc(resizes, sizeof(double));
results->malleability_time = calloc(resizes, sizeof(double));
results->wasted_time = 0;
results->iters_size = iters_size + RESULTS_EXTRA_SIZE;
......@@ -280,20 +290,24 @@ void realloc_results_iters(results_data *results, size_t stages, size_t needed)
int error = 0;
double *time_aux;
size_t i;
if(results->iters_size >= needed) return;
time_aux = (double *) realloc(results->iters_time, needed * sizeof(double));
if(time_aux == NULL) error = 1;
for(i=0; i<stages; i++) { //TODO Comprobar que no da error el realloc
results->stage_times[i] = (double *) realloc(results->stage_times[i], needed * sizeof(double));
if(results->stage_times[i] == NULL) error = 1;
}
if(time_aux == NULL) error = 1;
if(error) {
fprintf(stderr, "Fatal error - No se ha podido realojar la memoria de resultados\n");
MPI_Abort(MPI_COMM_WORLD, 1);
}
results->iters_time = time_aux;
results->iters_size = needed;
}
/*
......@@ -318,6 +332,10 @@ void free_results_data(results_data *results, size_t stages) {
free(results->async_time);
results->async_time = NULL;
}
if(results->malleability_time != NULL) {
free(results->malleability_time);
results->malleability_time = NULL;
}
if(results->iters_time != NULL) {
free(results->iters_time);
......
......@@ -14,8 +14,9 @@ typedef struct {
// Spawn, Thread, Sync, Async and Exec time
double spawn_start, *spawn_time, *spawn_real_time;
double sync_start, sync_end, *sync_time;
double async_start, async_end, *async_time;
double sync_end, *sync_time;
double async_end, *async_time;
double malleability_end, *malleability_time;
double exec_start, exec_time;
double wasted_time; // Time spent recalculating iter stages
} results_data;
......
......@@ -12,6 +12,8 @@
#include "../malleability/malleabilityManager.h"
#include "../malleability/malleabilityStates.h"
#define DR_MAX_SIZE 1000000000
int work();
double iterate(int async_comm);
double iterate_relaxed(double *time, double *times_stages);
......@@ -37,6 +39,7 @@ int main(int argc, char *argv[]) {
int numP, myId, res;
int req;
int im_child;
size_t i;
int num_cpus, num_nodes;
char *nodelist = NULL;
......@@ -54,6 +57,8 @@ int main(int argc, char *argv[]) {
if(req != MPI_THREAD_MULTIPLE) {
printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE);
fflush(stdout);
MPI_Abort(MPI_COMM_WORLD, -50);
}
init_group_struct(argv, argc, myId, numP);
......@@ -66,11 +71,27 @@ int main(int argc, char *argv[]) {
set_benchmark_configuration(config_file);
set_benchmark_results(results);
if(config_file->n_groups > 1) {
set_malleability_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss,
config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
set_children_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
malleability_add_data(&(group->grp), 1, MAL_INT, 1, 1);
malleability_add_data(&run_id, 1, MAL_INT, 1, 1);
malleability_add_data(&(group->iter_start), 1, MAL_INT, 1, 1);
if(config_file->sdr) {
for(i=0; i<group->sync_data_groups; i++) {
malleability_add_data(group->sync_array[i], group->sync_qty[i], MAL_CHAR, 0, 1);
}
}
if(config_file->adr) {
for(i=0; i<group->async_data_groups; i++) {
malleability_add_data(group->async_array[i], group->async_qty[i], MAL_CHAR, 0, 0);
}
}
}
MPI_Barrier(comm);
results->exec_start = MPI_Wtime();
} else { //Init hijos
......@@ -82,6 +103,7 @@ int main(int argc, char *argv[]) {
// TODO Refactor - Que sea una unica funcion
// Obtiene las variables que van a utilizar los hijos
void *value = NULL;
size_t entries;
malleability_get_data(&value, 0, 1, 1);
group->grp = *((int *)value);
......@@ -91,7 +113,25 @@ int main(int argc, char *argv[]) {
malleability_get_data(&value, 2, 1, 1);
group->iter_start = *((int *)value);
if(config_file->sdr) {
malleability_get_entries(&entries, 0, 1);
group->sync_array = (char **) malloc(entries * sizeof(char *));
for(i=0; i<entries; i++) {
malleability_get_data(&value, i, 0, 1);
group->sync_array[i] = (char *)value;
}
}
if(config_file->adr) {
malleability_get_entries(&entries, 0, 0);
group->async_array = (char **) malloc(entries * sizeof(char *));
for(i=0; i<entries; i++) {
malleability_get_data(&value, i, 0, 0);
group->async_array[i] = (char *)value;
}
}
group->grp = group->grp + 1;
realloc_results_iters(results, config_file->n_stages, config_file->groups[group->grp].iters);
}
//
......@@ -105,14 +145,15 @@ int main(int argc, char *argv[]) {
MPI_Comm_rank(comm, &(group->myId));
group->grp = group->grp + 1;
set_benchmark_grp(group->grp);
if(group->grp != 0) {
obtain_op_times(1); //Obtener los nuevos valores de tiempo para el computo
obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
set_results_post_reconfig(results, group->grp, config_file->sdr, config_file->adr);
}
if(config_file->n_groups != group->grp + 1) { //TODO Llevar a otra funcion
set_malleability_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss,
config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].at, -1);
config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
set_children_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
if(group->grp != 0) {
......@@ -122,11 +163,11 @@ int main(int argc, char *argv[]) {
res = work();
if(res == MALL_ZOMBIE) break;
if(res==1) { // Se ha llegado al final de la aplicacion
MPI_Barrier(comm); // TODO Posible error al utilizar SHRINK
MPI_Barrier(comm);
results->exec_time = MPI_Wtime() - results->exec_start - results->wasted_time;
}
print_local_results();
reset_results_index(results);
} while(config_file->n_groups > group->grp + 1 && config_file->groups[group->grp+1].sm == MALL_SPAWN_MERGE);
......@@ -180,8 +221,8 @@ int work() {
state = malleability_checkpoint();
iter = 0;
while(state == MALL_DIST_PENDING || state == MALL_SPAWN_PENDING || state == MALL_SPAWN_SINGLE_PENDING || state == MALL_SPAWN_ADAPT_POSTPONE) {
if(iter < config_file->groups[group->grp+1].iters) {
while(state == MALL_DIST_PENDING || state == MALL_SPAWN_PENDING || state == MALL_SPAWN_SINGLE_PENDING || state == MALL_SPAWN_ADAPT_POSTPONE || state == MALL_SPAWN_ADAPT_PENDING) {
if(group->grp+1 < config_file->n_groups && iter < config_file->groups[group->grp+1].iters) {
iterate(state);
iter++;
group->iter_start = iter;
......@@ -227,6 +268,7 @@ double iterate(int async_comm) {
results->iters_async += 1;
}
// TODO Pasar el resto de este código a results.c
if(results->iter_index == results->iters_size) { // Aumentar tamaño de ambos vectores de resultados
realloc_results_iters(results, config_file->n_stages, results->iters_size + 100);
}
......@@ -235,6 +277,7 @@ double iterate(int async_comm) {
results->stage_times[i][results->iter_index] = times_stages_aux[i];
}
results->iter_index = results->iter_index + 1;
// TODO Pasar hasta aqui
free(times_stages_aux);
......@@ -395,6 +438,8 @@ void init_group_struct(char *argv[], int argc, int myId, int numP) {
* se comunican con los padres para inicializar sus datos.
*/
void init_application() {
int i, last_index;
if(group->argc < 2) {
printf("Falta el fichero de configuracion. Uso:\n./programa config.ini id\nEl argumento numerico id es opcional\n");
MPI_Abort(MPI_COMM_WORLD, -1);
......@@ -407,10 +452,29 @@ void init_application() {
results = malloc(sizeof(results_data));
init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
if(config_file->sdr) {
malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
group->sync_data_groups = config_file->sdr % DR_MAX_SIZE ? config_file->sdr/DR_MAX_SIZE+1 : config_file->sdr/DR_MAX_SIZE;
group->sync_qty = (int *) malloc(group->sync_data_groups * sizeof(int));
group->sync_array = (char **) malloc(group->sync_data_groups * sizeof(char *));
last_index = group->sync_data_groups-1;
for(i=0; i<last_index; i++) {
group->sync_qty[i] = DR_MAX_SIZE;
malloc_comm_array(&(group->sync_array[i]), group->sync_qty[i], group->myId, group->numP);
}
group->sync_qty[last_index] = config_file->sdr % DR_MAX_SIZE ? config_file->sdr % DR_MAX_SIZE : DR_MAX_SIZE;
malloc_comm_array(&(group->sync_array[last_index]), group->sync_qty[last_index], group->myId, group->numP);
}
if(config_file->adr) {
malloc_comm_array(&(group->async_array), config_file->adr , group->myId, group->numP);
group->async_data_groups = config_file->adr % DR_MAX_SIZE ? config_file->adr/DR_MAX_SIZE+1 : config_file->adr/DR_MAX_SIZE;
group->async_qty = (int *) malloc(group->async_data_groups * sizeof(int));
group->async_array = (char **) malloc(group->async_data_groups * sizeof(char *));
last_index = group->async_data_groups-1;
for(i=0; i<last_index; i++) {
group->async_qty[i] = DR_MAX_SIZE;
malloc_comm_array(&(group->async_array[i]), group->async_qty[i], group->myId, group->numP);
}
group->async_qty[last_index] = config_file->adr % DR_MAX_SIZE ? config_file->adr % DR_MAX_SIZE : DR_MAX_SIZE;
malloc_comm_array(&(group->async_array[last_index]), group->async_qty[last_index], group->myId, group->numP);
}
obtain_op_times(1);
......@@ -440,13 +504,29 @@ void obtain_op_times(int compute) {
* Libera toda la memoria asociada con la aplicacion
*/
void free_application_data() {
if(config_file->sdr) {
size_t i;
if(config_file->sdr && group->sync_array != NULL) {
for(i=0; i<group->sync_data_groups; i++) {
free(group->sync_array[i]);
group->sync_array[i] = NULL;
}
free(group->sync_qty);
group->sync_qty = NULL;
free(group->sync_array);
group->sync_array = NULL;
}
if(config_file->adr) {
if(config_file->adr && group->async_array != NULL) {
for(i=0; i<group->async_data_groups; i++) {
free(group->async_array[i]);
group->async_array[i] = NULL;
}
free(group->async_qty);
group->async_qty = NULL;
free(group->async_array);
group->async_array = NULL;
}
free_malleability();
free_results_data(results, config_file->n_stages);
......
......@@ -15,13 +15,14 @@ typedef struct {
unsigned int grp;
int iter_start;
int argc;
size_t sync_data_groups, async_data_groups;
int numS; // Cantidad de procesos hijos
MPI_Comm children, parents;
char *compute_comm_array, *compute_comm_recv;
char **argv;
char *sync_array, *async_array;
char **sync_array, **async_array;
int *sync_qty, *async_qty;
} group_data;
......@@ -48,7 +49,7 @@ typedef struct
typedef struct
{
int iters, procs;
int sm, ss, phy_dist, at;
int sm, ss, phy_dist, rm, rs;
float factor;
} group_config_t;
......@@ -57,7 +58,8 @@ typedef struct
size_t n_groups, n_resizes, n_stages; // n_groups==n_resizes+1
size_t actual_group, actual_stage;
int rigid_times;
int granularity, sdr, adr;
int granularity;
size_t sdr, adr;
MPI_Datatype config_type, group_type, iter_stage_type;
iter_stage_t *stages;
......
......@@ -71,7 +71,8 @@ void malloc_config_resizes(configuration *user_config) {
user_config->groups[i].sm = 0;
user_config->groups[i].ss = 1;
user_config->groups[i].phy_dist = 0;
user_config->groups[i].at = 0;
user_config->groups[i].rm = 0;
user_config->groups[i].rs = 1;
user_config->groups[i].factor = 1;
}
def_struct_groups(user_config);
......@@ -135,18 +136,14 @@ void free_config(configuration *user_config) {
}
}
//Liberar tipos derivados
if(user_config->config_type != MPI_DATATYPE_NULL) {
MPI_Type_free(&(user_config->config_type));
user_config->config_type = MPI_DATATYPE_NULL;
}
if(user_config->group_type != MPI_DATATYPE_NULL) {
MPI_Type_free(&(user_config->group_type));
user_config->group_type = MPI_DATATYPE_NULL;
}
if(user_config->iter_stage_type != MPI_DATATYPE_NULL) {
MPI_Type_free(&(user_config->iter_stage_type));
user_config->iter_stage_type = MPI_DATATYPE_NULL;
}
free(user_config->groups);
free(user_config->stages);
......@@ -162,17 +159,17 @@ void free_config(configuration *user_config) {
void print_config(configuration *user_config) {
if(user_config != NULL) {
size_t i;
printf("Config loaded: R=%zu, S=%zu, granularity=%d, SDR=%d, ADR=%d\n",
printf("Config loaded: R=%zu, S=%zu, granularity=%d, SDR=%zu, ADR=%zu\n",
user_config->n_resizes, user_config->n_stages, user_config->granularity, user_config->sdr, user_config->adr);
for(i=0; i<user_config->n_stages; i++) {
printf("Stage %zu: PT=%d, T_stage=%lf, bytes=%d, T_capped=%d\n",
i, user_config->stages[i].pt, user_config->stages[i].t_stage, user_config->stages[i].real_bytes, user_config->stages[i].t_capped);
}
for(i=0; i<user_config->n_groups; i++) {
printf("Group %zu: Iters=%d, Procs=%d, Factors=%f, Dist=%d, AT=%d, SM=%d, SS=%d\n",
printf("Group %zu: Iters=%d, Procs=%d, Factors=%f, Dist=%d, RM=%d, RS=%d, SM=%d, SS=%d\n",
i, user_config->groups[i].iters, user_config->groups[i].procs, user_config->groups[i].factor,
user_config->groups[i].phy_dist, user_config->groups[i].at, user_config->groups[i].sm,
user_config->groups[i].ss);
user_config->groups[i].phy_dist, user_config->groups[i].rm, user_config->groups[i].rs,
user_config->groups[i].sm, user_config->groups[i].ss);
}
}
}
......@@ -194,16 +191,16 @@ void print_config_group(configuration *user_config, size_t grp) {
sons = user_config->groups[grp+1].procs;
}
printf("Config: granularity=%d, SDR=%d, ADR=%d\n",
printf("Config: granularity=%d, SDR=%zu, ADR=%zu\n",
user_config->granularity, user_config->sdr, user_config->adr);
for(i=0; i<user_config->n_stages; i++) {
printf("Stage %zu: PT=%d, T_stage=%lf, bytes=%d, T_capped=%d\n",
i, user_config->stages[i].pt, user_config->stages[i].t_stage, user_config->stages[i].real_bytes, user_config->stages[i].t_capped);
}
printf("Group %zu: Iters=%d, Procs=%d, Factors=%f, Dist=%d, AT=%d, SM=%d, SS=%d, parents=%d, children=%d\n",
printf("Group %zu: Iters=%d, Procs=%d, Factors=%f, Dist=%d, RM=%d, RS=%d, SM=%d, SS=%d, parents=%d, children=%d\n",
grp, user_config->groups[grp].iters, user_config->groups[grp].procs, user_config->groups[grp].factor,
user_config->groups[grp].phy_dist, user_config->groups[grp].at, user_config->groups[grp].sm,
user_config->groups[grp].ss, parents, sons);
user_config->groups[grp].phy_dist, user_config->groups[grp].rm, user_config->groups[grp].rs,
user_config->groups[grp].sm, user_config->groups[grp].ss, parents, sons);
}
}
......@@ -270,17 +267,17 @@ void def_struct_config_file(configuration *config_file) {
MPI_Datatype types[counts];
// Rellenar vector types
types[0] = types[1] = MPI_UNSIGNED_LONG;
types[2] = types[3] = types[4] = types[5] = MPI_INT;
types[0] = types[1] = types[2] = types[3] = MPI_UNSIGNED_LONG;
types[4] = types[5] = MPI_INT;
// Rellenar vector displs
MPI_Get_address(config_file, &dir);
MPI_Get_address(&(config_file->n_groups), &displs[0]);
MPI_Get_address(&(config_file->n_stages), &displs[1]);
MPI_Get_address(&(config_file->granularity), &displs[2]);
MPI_Get_address(&(config_file->sdr), &displs[3]);
MPI_Get_address(&(config_file->adr), &displs[4]);
MPI_Get_address(&(config_file->sdr), &displs[2]);
MPI_Get_address(&(config_file->adr), &displs[3]);
MPI_Get_address(&(config_file->granularity), &displs[4]);
MPI_Get_address(&(config_file->rigid_times), &displs[5]);
for(i=0;i<counts;i++) displs[i] -= dir;
......@@ -295,15 +292,15 @@ void def_struct_config_file(configuration *config_file) {
* en una sola comunicacion.
*/
void def_struct_groups(configuration *config_file) {
int i, counts = 7;
int blocklengths[7] = {1, 1, 1, 1, 1, 1, 1};
int i, counts = 8;
int blocklengths[8] = {1, 1, 1, 1, 1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype aux, types[counts];
group_config_t *groups = config_file->groups;
// Rellenar vector types
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = MPI_INT;
types[6] = MPI_FLOAT;
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = types[6] = MPI_INT;
types[7] = MPI_FLOAT;
// Rellenar vector displs
MPI_Get_address(groups, &dir);
......@@ -313,8 +310,9 @@ void def_struct_groups(configuration *config_file) {
MPI_Get_address(&(groups->sm), &displs[2]);
MPI_Get_address(&(groups->ss), &displs[3]);
MPI_Get_address(&(groups->phy_dist), &displs[4]);
MPI_Get_address(&(groups->at), &displs[5]);
MPI_Get_address(&(groups->factor), &displs[6]);
MPI_Get_address(&(groups->rm), &displs[5]);
MPI_Get_address(&(groups->rs), &displs[6]);
MPI_Get_address(&(groups->factor), &displs[7]);
for(i=0;i<counts;i++) displs[i] -= dir;
......@@ -326,7 +324,7 @@ void def_struct_groups(configuration *config_file) {
// Tipo derivado para enviar N elementos de la estructura
MPI_Type_create_resized(aux, 0, sizeof(group_config_t), &(config_file->group_type));
MPI_Type_commit(&(config_file->group_type));
// MPI_Type_free(&aux); //FIXME It should be freed
MPI_Type_free(&aux);
}
}
......@@ -364,6 +362,6 @@ void def_struct_iter_stage(configuration *config_file) {
// Tipo derivado para enviar N elementos de la estructura
MPI_Type_create_resized(aux, 0, sizeof(iter_stage_t), &(config_file->iter_stage_type));
MPI_Type_commit(&(config_file->iter_stage_type));
// MPI_Type_free(&aux); //FIXME It should be freed
MPI_Type_free(&aux);
}
}
CC = gcc
MCC = mpicc
#C_FLAGS_ALL = -Wconversion -Wpedantic
C_FLAGS = -Wall -Wextra -Wshadow -Wfatal-errors -g
C_FLAGS = -Wall -Wextra -Wshadow -Wfatal-errors
LD_FLAGS = -lm -pthread
DEF =
......
......@@ -5,16 +5,22 @@
#include "distribution_methods/block_distribution.h"
#include "CommDist.h"
void send_sync_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts);
void recv_sync_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts);
void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts);
void check_requests(struct Counts s_counts, struct Counts r_counts, int red_strategies, MPI_Request **requests, size_t *request_qty);
void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req);
void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req);
void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm);
void sync_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method);
void sync_rma_lock(char *recv, struct Counts r_counts, MPI_Win win);
void sync_rma_lockall(char *recv, struct Counts r_counts, MPI_Win win);
void send_async_point_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req);
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req);
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS);
void async_point2point(char *send, char *recv, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests);
void async_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method, MPI_Request *requests, MPI_Win *win);
void async_rma_lock(char *recv, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
void async_rma_lockall(char *recv, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
void perform_manual_communication(char *send, char *recv, int myId, struct Counts s_counts, struct Counts r_counts);
/*
* Reserva memoria para un vector de hasta "qty" elementos.
* Los "qty" elementos se disitribuyen entre los "numP" procesos
......@@ -24,7 +30,7 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) {
struct Dist_data dist_data;
get_block_dist(qty, myId, numP, &dist_data);
if( (*array = malloc(dist_data.tamBl * sizeof(char))) == NULL) {
if( (*array = calloc(dist_data.tamBl, sizeof(char))) == NULL) {
printf("Memory Error (Malloc Arrays(%d))\n", dist_data.tamBl);
exit(1);
}
......@@ -46,102 +52,200 @@ void malloc_comm_array(char **array, int qty, int myId, int numP) {
//================================================================================
/*
* Realiza un envio síncrono del vector array desde este grupo de procesos al grupo
* enlazado por el intercomunicador intercomm.
* Performs a communication to redistribute an array in a block distribution.
* In the redistribution is differenciated parent group from the children and the values each group indicates can be
* different.
*
* El vector array no se modifica en esta funcion.
* - send (IN): Array with the data to send. This data can not be null for parents.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* If the process receives data and is NULL, the behaviour is undefined.
* - qty (IN): Sum of elements shared by all processes that will send data.
* - myId (IN): Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
* - numP (IN): Size of the local group. If it is a children group, this parameter must correspond to using
* "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
* - numO (IN): Amount of processes in the remote group. For the parents is the target quantity of processes after the
* resize, while for the children is the amount of parents.
* - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
* - comm (IN): Communicator to use to perform the redistribution.
*
* returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE). //FIXME In this case is always true...
*/
int send_sync(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child) {
int *idS = NULL;
struct Counts counts;
int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int red_method, MPI_Comm comm) {
int is_intercomm, aux_comm_used = 0;
struct Counts s_counts, r_counts;
struct Dist_data dist_data;
MPI_Comm aux_comm = MPI_COMM_NULL;
get_block_dist(qty, myId, numP, &dist_data); // Distribucion de este proceso en su grupo
dist_data.intercomm = intercomm;
// Create arrays which contains info about how many elements will be send to each created process
mallocCounts(&counts, numP_child);
/* PREPARE COMMUNICATION */
MPI_Comm_test_inter(comm, &is_intercomm);
prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
getIds_intercomm(dist_data, numP_child, &idS); // Obtener rango de Id hijos a los que este proceso manda datos
/* PERFORM COMMUNICATION */
switch(red_method) {
send_sync_arrays(dist_data, array, numP_child, counts);
freeCounts(&counts);
free(idS);
case MALL_RED_RMA_LOCKALL:
case MALL_RED_RMA_LOCK:
if(is_children_group) {
dist_data.tamBl = 0;
} else {
get_block_dist(qty, myId, numO, &dist_data);
}
if(is_intercomm) {
MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
aux_comm_used = 1;
} else { aux_comm = comm; }
sync_rma(send, *recv, r_counts, dist_data.tamBl, aux_comm, red_method);
break;
case MALL_RED_POINT:
sync_point2point(send, *recv, is_intercomm, myId, s_counts, r_counts, comm);
break;
case MALL_RED_BASELINE:
default:
MPI_Alltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, comm);
break;
}
return 1;
if(aux_comm_used) {
MPI_Comm_free(&aux_comm);
}
freeCounts(&s_counts);
freeCounts(&r_counts);
return 1; //FIXME In this case is always true...
}
/*
* Realiza una recepcion síncrona del vector array a este grupo de procesos desde el grupo
* enlazado por el intercomunicador intercomm.
* Performs a series of blocking point2point communications to redistribute an array in a block distribution.
* It should be called after calculating how data should be redistributed.
*
* - send (IN): Array with the data to send. This value can not be NULL for parents.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to
* receive data. If the process receives data and is NULL, the behaviour is undefined.
* - is_intercomm (IN): Indicates wether the communicator is an intercommunicator (TRUE) or an
* intracommunicator (FALSE).
* - myId (IN): Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
* - s_counts (IN): Struct which describes how many elements will send this process to each children and
* the displacements.
* - r_counts (IN): Structure which describes how many elements will receive this process from each parent
* and the displacements.
* - comm (IN): Communicator to use to perform the redistribution.
*
* El vector array se reserva dentro de la funcion y se devuelve en el mismo argumento.
* Tiene que ser liberado posteriormente por el usuario.
*/
void recv_sync(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents) {
int *idS = NULL;
struct Counts counts;
struct Dist_data dist_data;
void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm) {
int i, j, init, end, total_sends;
MPI_Request *sends;
// Obtener distribución para este hijo
get_block_dist(qty, myId, numP, &dist_data);
*array = malloc(dist_data.tamBl * sizeof(char));
//(*array)[dist_data.tamBl] = '\0';
dist_data.intercomm = intercomm;
init = s_counts.idI;
end = s_counts.idE;
if(!is_intercomm && (s_counts.idI == myId || s_counts.idE == myId + 1)) {
perform_manual_communication(send, recv, myId, s_counts, r_counts);
/* PREPARAR DATOS DE RECEPCION SOBRE VECTOR*/
mallocCounts(&counts, numP_parents);
if(s_counts.idI == myId) init = s_counts.idI+1;
else end = s_counts.idE-1;
}
getIds_intercomm(dist_data, numP_parents, &idS); // Obtener el rango de Ids de padres del que este proceso recibira datos
total_sends = end - init;
j = 0;
if(total_sends > 0) {
sends = (MPI_Request *) malloc(total_sends * sizeof(MPI_Request));
}
for(i=init; i<end; i++) {
sends[j] = MPI_REQUEST_NULL;
MPI_Isend(send+s_counts.displs[i], s_counts.counts[i], MPI_CHAR, i, 99, comm, &(sends[j]));
j++;
}
recv_sync_arrays(dist_data, *array, numP_parents, counts);
//printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array);
init = r_counts.idI;
end = r_counts.idE;
if(!is_intercomm) {
if(r_counts.idI == myId) init = r_counts.idI+1;
else if(r_counts.idE == myId + 1) end = r_counts.idE-1;
}
freeCounts(&counts);
free(idS);
for(i=init; i<end; i++) {
MPI_Recv(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, 99, comm, MPI_STATUS_IGNORE);
}
if(total_sends > 0) {
MPI_Waitall(total_sends, sends, MPI_STATUSES_IGNORE);
}
}
/*
* Envia a los hijos un vector que es redistribuido a los procesos
* hijos. Antes de realizar la comunicacion, cada proceso padre calcula sobre que procesos
* del otro grupo se transmiten elementos.
* Performs synchronous MPI-RMA operations to redistribute an array in a block distribution. Is should be called after calculating
* how data should be redistributed
*
* - send (IN): Array with the data to send. This value can be NULL for children.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* If the process receives data and is NULL, the behaviour is undefined.
* - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
* displacements.
* - tamBl (IN): How many elements are stored in the parameter "send".
* - comm (IN): Communicator to use to perform the redistribution. Must be an intracommunicator as MPI-RMA requirements.
* - red_method (IN): Type of data redistribution to use. In this case indicates the RMA operation(Lock or LockAll).
*
*/
void send_sync_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts) {
void sync_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method) {
MPI_Win win;
MPI_Win_create(send, (MPI_Aint)tamBl, sizeof(char), MPI_INFO_NULL, comm, &win);
switch(red_method) {
case MALL_RED_RMA_LOCKALL:
sync_rma_lockall(recv, r_counts, win);
break;
case MALL_RED_RMA_LOCK:
sync_rma_lock(recv, r_counts, win);
break;
}
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);
/* COMUNICACION DE DATOS */
MPI_Alltoallv(array, counts.counts, counts.displs, MPI_CHAR, NULL, counts.zero_arr, counts.zero_arr, MPI_CHAR, dist_data.intercomm);
MPI_Win_free(&win);
}
/*
* Recibe de los padres un vector que es redistribuido a los procesos
* de este grupo. Antes de realizar la comunicacion cada hijo calcula sobre que procesos
* del otro grupo se transmiten elementos.
* Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* If the process receives data and is NULL, the behaviour is undefined.
* - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
* displacements.
* - win (IN): Window to use to perform the redistribution.
*
*/
void recv_sync_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts) {
void sync_rma_lock(char *recv, struct Counts r_counts, MPI_Win win) {
int i, target_displs;
target_displs = r_counts.first_target_displs;
for(i=r_counts.idI; i<r_counts.idE; i++) {
MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
MPI_Get(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, target_displs, r_counts.counts[i], MPI_CHAR, win);
MPI_Win_unlock(i, win);
target_displs=0;
}
}
char aux;
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, &counts);
// Ajustar los valores de recepcion
/*
if(idI == 0) {
set_counts(0, numP_parents, dist_data, counts.counts);
idI++;
/*
* Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lockall/Unlockall.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* If the process receives data and is NULL, the behaviour is undefined.
* - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
* displacements.
* - win (IN): Window to use to perform the redistribution.
*
*/
void sync_rma_lockall(char *recv, struct Counts r_counts, MPI_Win win) {
int i, target_displs;
target_displs = r_counts.first_target_displs;
MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
for(i=r_counts.idI; i<r_counts.idE; i++) {
MPI_Get(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, target_displs, r_counts.counts[i], MPI_CHAR, win);
target_displs=0;
}
for(i=idI; i<idE; i++) {
set_counts(i, numP_parents, dist_data, counts.counts);
counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
}*/
//print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
/* COMUNICACION DE DATOS */
MPI_Alltoallv(&aux, counts.zero_arr, counts.zero_arr, MPI_CHAR, array, counts.counts, counts.displs, MPI_CHAR, dist_data.intercomm);
MPI_Win_unlock_all(win);
}
//================================================================================
//================================================================================
//========================ASYNCHRONOUS FUNCTIONS==================================
......@@ -149,187 +253,270 @@ void recv_sync_arrays(struct Dist_data dist_data, char *array, int numP_parents,
//================================================================================
/*
* Realiza un envio asincrono del vector array desde este grupo de procesos al grupo
* enlazado por el intercomunicador intercomm.
* Performs a communication to redistribute an array in a block distribution with non-blocking MPI functions.
* In the redistribution is differenciated parent group from the children and the values each group indicates can be
* different.
*
* El objeto MPI_Request se devuelve con el manejador para comprobar si la comunicacion
* ha terminado.
* - send (IN): Array with the data to send. This data can not be null for parents.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* If the process receives data and is NULL, the behaviour is undefined.
* - qty (IN): Sum of elements shared by all processes that will send data.
* - myId (IN): Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
* - numP (IN): Size of the local group. If it is a children group, this parameter must correspond to using
* "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
* - numO (IN): Amount of processes in the remote group. For the parents is the target quantity of processes after the
* resize, while for the children is the amount of parents.
* - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
* - comm (IN): Communicator to use to perform the redistribution.
* - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended. If the pointer
* is null or not enough space has been reserved the pointer is allocated/reallocated.
* - request_qty (OUT): Quantity of requests to be used. If a process sends and receives data, this value will be
* modified to the expected value.
*
* El vector array no se modifica en esta funcion.
* returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE). //FIXME In this case is always false...
*/
int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait) {
int i;
int *idS = NULL;
struct Counts counts;
int async_communication_start(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win) {
int is_intercomm, aux_comm_used = 0;
struct Counts s_counts, r_counts;
struct Dist_data dist_data;
MPI_Comm aux_comm = MPI_COMM_NULL;
/* PREPARE COMMUNICATION */
MPI_Comm_test_inter(comm, &is_intercomm);
prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
check_requests(s_counts, r_counts, red_strategies, requests, request_qty);
/* PERFORM COMMUNICATION */
switch(red_method) {
case MALL_RED_RMA_LOCKALL:
case MALL_RED_RMA_LOCK:
if(is_children_group) {
dist_data.tamBl = 0;
} else {
get_block_dist(qty, myId, numO, &dist_data);
}
if(is_intercomm) {
MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
aux_comm_used = 1;
} else { aux_comm = comm; }
async_rma(send, *recv, r_counts, dist_data.tamBl, aux_comm, red_method, *requests, win);
break;
case MALL_RED_POINT:
async_point2point(send, *recv, s_counts, r_counts, comm, *requests);
break;
case MALL_RED_BASELINE:
default:
MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, comm, &((*requests)[0]));
break;
}
get_block_dist(qty, myId, numP, &dist_data); // Distribucion de este proceso en su grupo
dist_data.intercomm = intercomm;
// Create arrays which contains info about how many elements will be send to each created process
mallocCounts(&counts, numP_child);
getIds_intercomm(dist_data, numP_child, &idS); // Obtener rango de Id hijos a los que este proceso manda datos
// MAL_USE_THREAD sigue el camino sincrono
if(parents_wait == MAL_USE_NORMAL) {
//*comm_req = (MPI_Request *) malloc(sizeof(MPI_Request));
*comm_req[0] = MPI_REQUEST_NULL;
send_async_arrays(dist_data, array, numP_child, counts, &(*comm_req[0]));
} else if (parents_wait == MAL_USE_IBARRIER){
//*comm_req = (MPI_Request *) malloc(2 * sizeof(MPI_Request));
*comm_req[0] = MPI_REQUEST_NULL;
*comm_req[1] = MPI_REQUEST_NULL;
send_async_arrays(dist_data, array, numP_child, counts, &((*comm_req)[1]));
MPI_Ibarrier(intercomm, &((*comm_req)[0]) );
} else if (parents_wait == MAL_USE_POINT){
//*comm_req = (MPI_Request *) malloc(numP_child * sizeof(MPI_Request));
for(i=0; i<numP_child; i++){
(*comm_req)[i] = MPI_REQUEST_NULL;
/* POST REQUESTS CHECKS */
if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) {
if(!is_children_group && (is_intercomm || myId >= numO)) {
MPI_Ibarrier(comm, &((*requests)[*request_qty-1]) ); //FIXME Not easy to read...
}
send_async_point_arrays(dist_data, array, numP_child, counts, *comm_req);
} else if (parents_wait == MAL_USE_THREAD) { //TODO
}
freeCounts(&counts);
free(idS);
if(aux_comm_used) {
MPI_Comm_free(&aux_comm);
}
return 1;
freeCounts(&s_counts);
freeCounts(&r_counts);
return 0; //FIXME In this case is always false...
}
/*
* Realiza una recepcion asincrona del vector array a este grupo de procesos desde el grupo
* enlazado por el intercomunicador intercomm.
* Checks if a set of requests have been completed (1) or not (0).
*
* El vector array se reserva dentro de la funcion y se devuelve en el mismo argumento.
* Tiene que ser liberado posteriormente por el usuario.
* - myId (IN): Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
* - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
* - red_strategies (IN):
* - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
* - request_qty (IN): Quantity of requests in "requests".
*
* El argumento "parents_wait" sirve para indicar si se usará la versión en la los padres
* espera a que terminen de enviar, o en la que esperan a que los hijos acaben de recibir.
* returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE).
*/
void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int parents_wait) {
int *idS = NULL;
int wait_err, i;
struct Counts counts;
struct Dist_data dist_data;
MPI_Request *comm_req, aux;
// Obtener distribución para este hijo
get_block_dist(qty, myId, numP, &dist_data);
*array = malloc( dist_data.tamBl * sizeof(char));
dist_data.intercomm = intercomm;
/* PREPARAR DATOS DE RECEPCION SOBRE VECTOR*/
mallocCounts(&counts, numP_parents);
getIds_intercomm(dist_data, numP_parents, &idS); // Obtener el rango de Ids de padres del que este proceso recibira datos
// MAL_USE_THREAD sigue el camino sincrono
if(parents_wait == MAL_USE_POINT) {
comm_req = (MPI_Request *) malloc(numP_parents * sizeof(MPI_Request));
for(i=0; i<numP_parents; i++){
comm_req[i] = MPI_REQUEST_NULL;
int async_communication_check(int myId, int is_children_group, int red_strategies, MPI_Comm comm, MPI_Request *requests, size_t request_qty) {
int completed, req_completed, all_req_null, test_err, aux_condition;
size_t i;
completed = 1;
all_req_null = 1;
test_err = MPI_SUCCESS;
if (is_children_group) return 1;
if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) {
// The Ibarrier should only be posted at this point if the process
// has other requests which has not confirmed as completed yet,
// but are confirmed now.
if (requests[request_qty-1] == MPI_REQUEST_NULL) {
for(i=0; i<request_qty; i++) {
aux_condition = requests[i] == MPI_REQUEST_NULL;
all_req_null = all_req_null && aux_condition;
test_err = MPI_Test(&(requests[i]), &req_completed, MPI_STATUS_IGNORE);
completed = completed && req_completed;
}
recv_async_point_arrays(dist_data, *array, numP_parents, counts, comm_req);
wait_err = MPI_Waitall(numP_parents, comm_req, MPI_STATUSES_IGNORE);
} else if (parents_wait == MAL_USE_NORMAL || parents_wait == MAL_USE_IBARRIER) {
comm_req = (MPI_Request *) malloc(sizeof(MPI_Request));
*comm_req = MPI_REQUEST_NULL;
recv_async_arrays(dist_data, *array, numP_parents, counts, comm_req);
wait_err = MPI_Wait(comm_req, MPI_STATUS_IGNORE);
} else if (parents_wait == MAL_USE_THREAD) { //TODO
if(completed && !all_req_null) { MPI_Ibarrier(comm, &(requests[request_qty-1])); }
}
test_err = MPI_Test(&(requests[request_qty-1]), &completed, MPI_STATUS_IGNORE);
if(wait_err != MPI_SUCCESS) {
MPI_Abort(MPI_COMM_WORLD, wait_err);
} else {
for(i=0; i<request_qty; i++) {
test_err = MPI_Test(&(requests[i]), &req_completed, MPI_STATUS_IGNORE);
completed = completed && req_completed;
}
// test_err = MPI_Testall(request_qty, requests, &completed, MPI_STATUSES_IGNORE); //FIXME Some kind of bug with Mpich.
}
if(parents_wait == MAL_USE_IBARRIER) { //MAL USE IBARRIER END
MPI_Ibarrier(intercomm, &aux);
MPI_Wait(&aux, MPI_STATUS_IGNORE); //Es necesario comprobar que la comunicación ha terminado para desconectar los grupos de procesos
if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
printf("P%d aborting -- Test Async\n", myId);
MPI_Abort(MPI_COMM_WORLD, test_err);
}
//printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array);
freeCounts(&counts);
free(idS);
free(comm_req);
return completed;
}
/*
* Envia a los hijos un vector que es redistribuido a los procesos
* hijos. Antes de realizar la comunicacion, cada proceso padre calcula sobre que procesos
* del otro grupo se transmiten elementos.
* Waits until the completion of a set of requests. If the Ibarrier strategy
* is being used, the corresponding ibarrier is posted.
*
* El envio se realiza a partir de una comunicación colectiva.
* - red_strategies (IN):
* - comm (IN): Communicator to use to confirm finalizations of redistribution
* - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
* - request_qty (IN): Quantity of requests in "requests".
*/
void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req) {
void async_communication_wait(int red_strategies, MPI_Comm comm, MPI_Request *requests, size_t request_qty) {
MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE);
if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) {
MPI_Ibarrier(comm, &(requests[request_qty-1]) );
MPI_Wait(&(requests[request_qty-1]), MPI_STATUS_IGNORE); //TODO Is it really needed? It will be ensured later
}
}
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);
/*
* Frees Requests/Windows associated to a particular redistribution.
* Should be called for each output result of calling "async_communication_start".
*
* - red_method (IN):
* - red_strategies (IN):
* - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
* - request_qty (IN): Quantity of requests in "requests".
* - win (IN): Window to free.
*/
void async_communication_end(int red_method, int red_strategies, MPI_Request *requests, size_t request_qty, MPI_Win *win) {
/* COMUNICACION DE DATOS */
MPI_Ialltoallv(array, counts.counts, counts.displs, MPI_CHAR, NULL, counts.zero_arr, counts.zero_arr, MPI_CHAR, dist_data.intercomm, comm_req);
//Para la desconexión de ambos grupos de procesos es necesario indicar a MPI que esta comm
//ha terminado, aunque solo se pueda llegar a este punto cuando ha terminado
if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) { MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE); }
if(red_method == MALL_RED_RMA_LOCKALL || red_method == MALL_RED_RMA_LOCK) { MPI_Win_free(win); }
}
/*
* Envia a los hijos un vector que es redistribuido a los procesos
* hijos. Antes de realizar la comunicacion, cada proceso padre calcula sobre que procesos
* del otro grupo se transmiten elementos.
* Performs a series of non-blocking point2point communications to redistribute an array in a block distribution.
* It should be called after calculating how data should be redistributed.
*
* - send (IN): Array with the data to send. This value can not be NULL for parents.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to
* receive data. If the process receives data and is NULL, the behaviour is undefined.
* - s_counts (IN): Struct which describes how many elements will send this process to each children and
* the displacements.
* - r_counts (IN): Structure which describes how many elements will receive this process from each parent
* and the displacements.
* - comm (IN): Communicator to use to perform the redistribution.
* - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
*
* El envio se realiza a partir de varias comunicaciones punto a punto.
*/
void send_async_point_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req) {
int i;
// PREPARAR ENVIO DEL VECTOR
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);
void async_point2point(char *send, char *recv, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests) {
int i, j = 0;
for(i=0; i<numP_child; i++) { //TODO Esta propuesta ya no usa el IdI y Ide
if(counts.counts[0] != 0) {
MPI_Isend(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i]));
for(i=s_counts.idI; i<s_counts.idE; i++) {
MPI_Isend(send+s_counts.displs[i], s_counts.counts[i], MPI_CHAR, i, 99, comm, &(requests[j]));
j++;
}
for(i=r_counts.idI; i<r_counts.idE; i++) {
MPI_Irecv(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, 99, comm, &(requests[j]));
j++;
}
//print_counts(dist_data, counts.counts, counts.displs, numP_child, "Padres");
}
/*
* Recibe de los padres un vector que es redistribuido a los procesos
* de este grupo. Antes de realizar la comunicacion cada hijo calcula sobre que procesos
* del otro grupo se transmiten elementos.
* Performs asynchronous MPI-RMA operations to redistribute an array in a block distribution. Is should be called after calculating
* how data should be redistributed.
*
* - send (IN): Array with the data to send. This value can be NULL for children.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* If the process receives data and is NULL, the behaviour is undefined.
* - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
* displacements.
* - tamBl (IN): How many elements are stored in the parameter "send".
* - comm (IN): Communicator to use to perform the redistribution. Must be an intracommunicator as MPI-RMA requirements.
* - red_method (IN): Type of data redistribution to use. In this case indicates the RMA operation(Lock or LockAll).
* - window (OUT): Pointer to a window object used for the RMA operations.
* - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
*
* La recepcion se realiza a partir de una comunicacion colectiva.
*/
void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req) {
char *aux = malloc(1);
// Ajustar los valores de recepcion
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, &counts);
//print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
/* COMUNICACION DE DATOS */
MPI_Ialltoallv(aux, counts.zero_arr, counts.zero_arr, MPI_CHAR, array, counts.counts, counts.displs, MPI_CHAR, dist_data.intercomm, comm_req);
free(aux);
void async_rma(char *send, char *recv, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method, MPI_Request *requests, MPI_Win *win) {
MPI_Win_create(send, (MPI_Aint)tamBl, sizeof(char), MPI_INFO_NULL, comm, win);
switch(red_method) {
case MALL_RED_RMA_LOCKALL:
async_rma_lockall(recv, r_counts, *win, requests);
break;
case MALL_RED_RMA_LOCK:
async_rma_lock(recv, r_counts, *win, requests);
break;
}
}
/*
* Recibe de los padres un vector que es redistribuido a los procesos
* de este grupo. Antes de realizar la comunicacion cada hijo calcula sobre que procesos
* del otro grupo se transmiten elementos.
* Performs an asynchronous and passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* If the process receives data and is NULL, the behaviour is undefined.
* - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
* displacements.
* - win (IN): Window to use to perform the redistribution.
* - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
*
* La recepcion se realiza a partir de varias comunicaciones punto a punto.
*/
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req) {
int i;
// Ajustar los valores de recepcion
prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, &counts);
for(i=0; i<numP_parents; i++) { //TODO Esta propuesta ya no usa el IdI y Ide
if(counts.counts[0] != 0) {
MPI_Irecv(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i])); //FIXME BUffer recv
void async_rma_lock(char *recv, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
int i, target_displs, j = 0;
target_displs = r_counts.first_target_displs;
for(i=r_counts.idI; i<r_counts.idE; i++) {
MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
MPI_Rget(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, target_displs, r_counts.counts[i], MPI_CHAR, win, &(requests[j]));
MPI_Win_unlock(i, win);
target_displs=0;
j++;
}
}
/*
* Performs an asynchronous and passive MPI-RMA data redistribution for a single array using the passive epochs Lockall/Unlockall.
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* If the process receives data and is NULL, the behaviour is undefined.
* - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
* displacements.
* - win (IN): Window to use to perform the redistribution.
* - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
*
*/
void async_rma_lockall(char *recv, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
int i, target_displs, j = 0;
target_displs = r_counts.first_target_displs;
MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
for(i=r_counts.idI; i<r_counts.idE; i++) {
MPI_Rget(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, target_displs, r_counts.counts[i], MPI_CHAR, win, &(requests[j]));
target_displs=0;
j++;
}
//print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
MPI_Win_unlock_all(win);
}
/*
......@@ -341,40 +528,141 @@ void recv_async_point_arrays(struct Dist_data dist_data, char *array, int numP_p
*/
/*
* Obtiene para un proceso de un grupo a que rango procesos de
* otro grupo tiene que enviar o recibir datos.
* Performs a communication to redistribute an array in a block distribution. For each process calculates
* how many elements sends/receives to other processes for the new group.
*
* - qty (IN): Sum of elements shared by all processes that will send data.
* - myId (IN): Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
* - numP (IN): Size of the local group. If it is a children group, this parameter must correspond to using
* "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
* - numO (IN): Amount of processes in the remote group. For the parents is the target quantity of processes after the
* resize, while for the children is the amount of parents.
* - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
* - is_intercomm (IN): Indicates wether the used communicator is a intercomunicator(TRUE) or intracommunicator(FALSE).
* - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
* process receives data and is NULL, the behaviour is undefined.
* - s_counts (OUT): Struct where is indicated how many elements sends this process to processes in the new group.
* - r_counts (OUT): Struct where is indicated how many elements receives this process from other processes in the previous group.
*
* Devuelve el primer identificador y el último (Excluido) con el que
* comunicarse.
*/
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS) {
int idI, idE;
int tamOther = dist_data.qty / numP_other;
int remOther = dist_data.qty % numP_other;
// Indica el punto de corte del grupo de procesos externo que
// divide entre los procesos que tienen
// un tamaño tamOther + 1 y un tamaño tamOther
int middle = (tamOther + 1) * remOther;
// Calcular idI teniendo en cuenta si se comunica con un
// proceso con tamano tamOther o tamOther+1
if(middle > dist_data.ini) { // First subgroup (tamOther+1)
idI = dist_data.ini / (tamOther + 1);
} else { // Second subgroup (tamOther)
idI = ((dist_data.ini - middle) / tamOther) + remOther;
}
// Calcular idR teniendo en cuenta si se comunica con un
// proceso con tamano tamOther o tamOther+1
if(middle >= dist_data.fin) { // First subgroup (tamOther +1)
idE = dist_data.fin / (tamOther + 1);
idE = (dist_data.fin % (tamOther + 1) > 0 && idE+1 <= numP_other) ? idE+1 : idE;
} else { // Second subgroup (tamOther)
idE = ((dist_data.fin - middle) / tamOther) + remOther;
idE = ((dist_data.fin - middle) % tamOther > 0 && idE+1 <= numP_other) ? idE+1 : idE;
}
*idS = malloc(2 * sizeof(int));
(*idS)[0] = idI;
(*idS)[1] = idE;
void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts) {
int array_size = numO;
int offset_ids = 0;
struct Dist_data dist_data;
if(is_intercomm) {
//offset_ids = !is_children_group ? numP : 0; //FIXME Modify only if active?
} else {
array_size = numP > numO ? numP : numO;
}
mallocCounts(s_counts, array_size+offset_ids);
mallocCounts(r_counts, array_size+offset_ids);
if(is_children_group) {
prepare_comm_alltoall(myId, numP, numO, qty, offset_ids, r_counts);
// Obtener distribución para este hijo
get_block_dist(qty, myId, numP, &dist_data);
*recv = malloc(dist_data.tamBl * sizeof(char));
//get_block_dist(qty, myId, numP, &dist_data);
//print_counts(dist_data, r_counts->counts, r_counts->displs, numO+offset_ids, 0, "Children C ");
} else {
//get_block_dist(qty, myId, numP, &dist_data);
prepare_comm_alltoall(myId, numP, numO, qty, offset_ids, s_counts);
if(!is_intercomm && myId < numO) {
prepare_comm_alltoall(myId, numO, numP, qty, offset_ids, r_counts);
// Obtener distribución para este hijo y reservar vector de recibo
get_block_dist(qty, myId, numO, &dist_data);
*recv = malloc(dist_data.tamBl * sizeof(char));
//print_counts(dist_data, r_counts->counts, r_counts->displs, array_size, 0, "Children P ");
}
//print_counts(dist_data, s_counts->counts, s_counts->displs, numO+offset_ids, 0, "Parents ");
}
}
/*
* Ensures that the array of request of a process has an amount of elements equal to the amount of communication
* functions the process will perform. In case the array is not initialized or does not have enough space it is
* allocated/reallocated to the minimum amount of space needed.
*
* - s_counts (IN): Struct where is indicated how many elements sends this process to processes in the new group.
* - r_counts (IN): Struct where is indicated how many elements receives this process from other processes in the previous group.
* - requests (IN/OUT): Pointer to array of requests to be used to determine if the communication has ended. If the pointer
* is null or not enough space has been reserved the pointer is allocated/reallocated.
* - request_qty (IN/OUT): Quantity of requests to be used. If the value is smaller than the amount of communication
* functions to perform, it is modified to the minimum value.
*/
void check_requests(struct Counts s_counts, struct Counts r_counts, int red_strategies, MPI_Request **requests, size_t *request_qty) {
size_t i, sum;
MPI_Request *aux;
sum = (size_t) s_counts.idE - s_counts.idI;
sum += (size_t) r_counts.idE - r_counts.idI;
if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) {
sum++;
}
if (*requests != NULL && sum <= *request_qty) return; // Expected amount of requests
if (*requests == NULL) {
*requests = (MPI_Request *) malloc(sum * sizeof(MPI_Request));
} else { // Array exists, but is too small
aux = (MPI_Request *) realloc(*requests, sum * sizeof(MPI_Request));
*requests = aux;
}
if (*requests == NULL) {
fprintf(stderr, "Fatal error - It was not possible to allocate/reallocate memory for the MPI_Requests before the redistribution\n");
MPI_Abort(MPI_COMM_WORLD, 1);
}
for(i=0; i < sum; i++) {
(*requests)[i] = MPI_REQUEST_NULL;
}
*request_qty = sum;
}
/*
* Special case to perform a manual copy of data when a process has to send data to itself. Only used
* when the MPI communication is not able to hand this situation. An example is when using point to point
* communications and the process has to perform a Send and Recv to itself
* - send (IN): Array with the data to send. This value can not be NULL.
* - recv (OUT): Array where data will be written. This value can not be NULL.
* - myId (IN): Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
* - s_counts (IN): Struct where is indicated how many elements sends this process to processes in the new group.
* - r_counts (IN): Struct where is indicated how many elements receives this process from other processes in the previous group.
*/
void perform_manual_communication(char *send, char *recv, int myId, struct Counts s_counts, struct Counts r_counts) {
int i;
for(i=0; i<s_counts.counts[myId];i++) {
recv[i+r_counts.displs[myId]] = send[i+s_counts.displs[myId]];
}
}
/*
* Función para obtener si entre las estrategias elegidas, se utiliza
* la estrategia pasada como segundo argumento.
*
* Devuelve en "result" 1(Verdadero) si utiliza la estrategia, 0(Falso) en caso
* contrario.
*/
int malleability_red_contains_strat(int comm_strategies, int strategy, int *result) {
int value = comm_strategies % strategy ? 0 : 1;
if(result != NULL) *result = value;
return value;
}
/*
* Función para anyadir una estrategia a un conjunto.
*
* Devuelve en "result" 1(Verdadero) si se ha anyadido, 0(Falso) en caso
* contrario.
*/
int malleability_red_add_strat(int *comm_strategies, int strategy) {
if(malleability_red_contains_strat(*comm_strategies, strategy, NULL)) return 1;
*comm_strategies = *comm_strategies * strategy;
return 1;
}
......@@ -16,13 +16,18 @@
//#define MAL_USE_POINT 2
//#define MAL_USE_THREAD 3
int send_sync(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child);
void recv_sync(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents);
int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int comm_type, MPI_Comm comm);
//int async_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty);
int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait);
void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int parents_wait);
int async_communication_start(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win);
int async_communication_check(int myId, int is_children_group, int red_strategies, MPI_Comm comm, MPI_Request *requests, size_t request_qty);
void async_communication_wait(int red_strategies, MPI_Comm comm, MPI_Request *requests, size_t request_qty);
void async_communication_end(int red_method, int red_strategies, MPI_Request *requests, size_t request_qty, MPI_Win *win);
//int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int red_method, int red_strategies);
//void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int red_method, int red_strategies);
void malloc_comm_array(char **array, int qty, int myId, int numP);
int malleability_red_contains_strat(int comm_strategies, int strategy, int *result);
int malleability_red_add_strat(int *comm_strategies, int strategy);
#endif
......@@ -3,7 +3,7 @@
#include <mpi.h>
#include "block_distribution.h"
void set_interblock_counts(int id, int numP, struct Dist_data data_dist, int *sendcounts);
void set_interblock_counts(int id, int numP, struct Dist_data data_dist, int offset_ids, int *sendcounts);
void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS);
/*
......@@ -13,22 +13,43 @@ void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS);
*
* The struct should be freed with freeCounts
*/
void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, struct Counts *counts) {
int i, *idS;
struct Dist_data dist_data;
void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, int offset_ids, struct Counts *counts) {
int i, *idS, first_id = 0;
struct Dist_data dist_data, dist_target;
if(counts == NULL) {
fprintf(stderr, "Counts is NULL for rank %d/%d ", myId, numP);
MPI_Abort(MPI_COMM_WORLD, -3);
}
get_block_dist(n, myId, numP, &dist_data);
mallocCounts(counts, numP_other);
get_util_ids(dist_data, numP_other, &idS);
if(idS[0] == 0) {
set_interblock_counts(0, numP_other, dist_data, counts->counts);
idS[0]++;
counts->idI = idS[0] + offset_ids;
counts->idE = idS[1] + offset_ids;
get_block_dist(n, idS[0], numP_other, &dist_target); // RMA Specific operation -- uses idS[0], not idI
counts->first_target_displs = dist_data.ini - dist_target.ini; // RMA Specific operation
if(idS[0] == 0) { // Uses idS[0], not idI
set_interblock_counts(counts->idI, numP_other, dist_data, offset_ids, counts->counts);
first_id++;
}
for(i=idS[0]; i<idS[1]; i++) {
set_interblock_counts(i, numP_other, dist_data, counts->counts);
for(i=counts->idI + first_id; i<counts->idE; i++) {
set_interblock_counts(i, numP_other, dist_data, offset_ids, counts->counts);
counts->displs[i] = counts->displs[i-1] + counts->counts[i-1];
}
free(idS);
for(i=0; i<numP_other; i++) {
if(counts->counts[i] < 0) {
fprintf(stderr, "Counts value [i=%d/%d] is negative for rank %d/%d ", i, numP_other, myId, numP);
MPI_Abort(MPI_COMM_WORLD, -3);
}
if(counts->displs[i] < 0) {
fprintf(stderr, "Displs value [i=%d/%d] is negative for rank %d/%d ", i, numP_other, myId, numP);
MPI_Abort(MPI_COMM_WORLD, -3);
}
}
}
/*
......@@ -83,12 +104,8 @@ void get_block_dist(int qty, int id, int numP, struct Dist_data *dist_data) {
dist_data->fin = (id+1) * dist_data->tamBl + rem;
}
if(dist_data->fin > qty) {
dist_data->fin = qty;
}
if(dist_data->ini > dist_data->fin) {
dist_data->ini = dist_data->fin;
}
if(dist_data->fin > qty) { dist_data->fin = qty; }
if(dist_data->ini > dist_data->fin) { dist_data->ini = dist_data->fin; }
dist_data->tamBl = dist_data->fin - dist_data->ini;
}
......@@ -98,11 +115,11 @@ void get_block_dist(int qty, int id, int numP, struct Dist_data *dist_data) {
* Obtiene para el Id de un proceso dado, cuantos elementos
* enviara o recibira desde el proceso indicado en Dist_data.
*/
void set_interblock_counts(int id, int numP, struct Dist_data data_dist, int *sendcounts) {
void set_interblock_counts(int id, int numP, struct Dist_data data_dist, int offset_ids, int *sendcounts) {
struct Dist_data other;
int biggest_ini, smallest_end;
get_block_dist(data_dist.qty, id, numP, &other);
get_block_dist(data_dist.qty, id - offset_ids, numP, &other);
// Si el rango de valores no coincide, se pasa al siguiente proceso
if(data_dist.ini >= other.fin || data_dist.fin <= other.ini) {
......@@ -110,18 +127,10 @@ void set_interblock_counts(int id, int numP, struct Dist_data data_dist, int *se
}
// Obtiene el proceso con mayor ini entre los dos procesos
if(data_dist.ini > other.ini) {
biggest_ini = data_dist.ini;
} else {
biggest_ini = other.ini;
}
biggest_ini = (data_dist.ini > other.ini) ? data_dist.ini : other.ini;
// Obtiene el proceso con menor fin entre los dos procesos
if(data_dist.fin < other.fin) {
smallest_end = data_dist.fin;
} else {
smallest_end = other.fin;
}
smallest_end = (data_dist.fin < other.fin) ? data_dist.fin : other.fin;
sendcounts[id] = smallest_end - biggest_ini; // Numero de elementos a enviar/recibir del proceso Id
}
......@@ -184,18 +193,19 @@ void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS) {
* El vector displs indica los desplazamientos necesarios para cada comunicacion
* con el proceso "i" del otro grupo.
*
* El vector zero_arr se utiliza cuando se quiere indicar un vector incializado
* a 0 en todos sus elementos. Sirve para indicar que no hay comunicacion.
*/
void mallocCounts(struct Counts *counts, size_t numP) {
counts->counts = calloc(numP, sizeof(int));
if(counts->counts == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
counts->displs = calloc(numP, sizeof(int));
if(counts->displs == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
counts->zero_arr = calloc(numP, sizeof(int));
if(counts->zero_arr == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
counts->len = numP;
counts->idI = -1;
counts->idE = -1;
counts->first_target_displs = -1;
}
......@@ -206,12 +216,18 @@ void mallocCounts(struct Counts *counts, size_t numP) {
* de forma dinamica.
*/
void freeCounts(struct Counts *counts) {
if(counts == NULL) {
return;
}
if(counts->counts != NULL) {
free(counts->counts);
free(counts->displs);
free(counts->zero_arr);
counts->counts = NULL;
}
if(counts->displs != NULL) {
free(counts->displs);
counts->displs = NULL;
counts->zero_arr = NULL;
}
}
/*
......
......@@ -18,12 +18,13 @@ struct Dist_data {
};
struct Counts {
int len, idI, idE;
int first_target_displs; // RMA. Indicates displacement for first target when performing a Get.
int *counts;
int *displs;
int *zero_arr;
};
void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, struct Counts *counts);
void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, int offset_ids, struct Counts *counts);
void prepare_comm_allgatherv(int numP, int n, struct Counts *counts);
void get_block_dist(int qty, int id, int numP, struct Dist_data *dist_data);
......
......@@ -30,15 +30,14 @@ int thread_check();
void* thread_async_work();
void print_comms_state();
void malleability_comms_update(MPI_Comm comm);
typedef struct {
int spawn_method;
int spawn_dist;
int spawn_strategies;
//int spawn_is_single;
//int spawn_threaded;
int comm_type;
int comm_threaded;
int red_method;
int red_strategies;
int grp;
configuration *config_file;
......@@ -51,6 +50,7 @@ typedef struct { //FIXME numC_spawned no se esta usando
MPI_Comm comm, thread_comm;
MPI_Comm intercomm;
MPI_Comm user_comm;
int dup_user_comm;
char *name_exec, *nodelist;
int num_cpus, num_nodes, nodelist_len;
......@@ -86,6 +86,7 @@ int init_malleability(int myId, int numP, int root, MPI_Comm comm, char *name_ex
rep_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
dist_a_data = (malleability_data_t *) malloc(sizeof(malleability_data_t));
mall->dup_user_comm = 0;
MPI_Comm_dup(comm, &dup_comm);
MPI_Comm_dup(comm, &thread_comm);
MPI_Comm_set_name(dup_comm, "MPI_COMM_MALL");
......@@ -181,6 +182,7 @@ int malleability_checkpoint() {
break;
case MALL_NOT_STARTED:
// Comprobar si se tiene que realizar un redimensionado
mall_conf->results->malleability_time[mall_conf->grp] = MPI_Wtime();
//if(CHECK_RMS()) {return MALL_DENIED;}
state = spawn_step();
......@@ -208,7 +210,7 @@ int malleability_checkpoint() {
break;
case MALL_DIST_PENDING:
if(mall_conf->comm_type == MAL_USE_THREAD) {
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
state = thread_check();
} else {
state = check_redistribution();
......@@ -225,6 +227,7 @@ int malleability_checkpoint() {
if(!malleability_spawn_contains_strat(mall_conf->spawn_strategies, MALL_SPAWN_PTHREAD, NULL)) {
mall_conf->results->spawn_time[mall_conf->grp] = MPI_Wtime() - mall_conf->results->spawn_start;
malleability_checkpoint();
}
break;
......@@ -234,6 +237,7 @@ int malleability_checkpoint() {
break;
case MALL_DIST_COMPLETED: //TODO No es esto muy feo?
mall_conf->results->malleability_end = MPI_Wtime();
state = MALL_COMPLETED;
break;
}
......@@ -263,12 +267,17 @@ void get_benchmark_results(results_data **results) {
}
//-------------------------------------------------------------------------------------------------------------
void set_malleability_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int comm_type, int comm_threaded) {
void set_malleability_configuration(int spawn_method, int spawn_strategies, int spawn_dist, int red_method, int red_strategies) {
mall_conf->spawn_method = spawn_method;
mall_conf->spawn_strategies = spawn_strategies;
mall_conf->spawn_dist = spawn_dist;
mall_conf->comm_type = comm_type;
mall_conf->comm_threaded = comm_threaded;
mall_conf->red_method = red_method;
mall_conf->red_strategies = red_strategies;
if(!malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL) &&
(mall_conf->red_method == MALL_RED_RMA_LOCK || mall_conf->red_method == MALL_RED_RMA_LOCKALL)) {
malleability_red_add_strat(&(mall_conf->red_strategies), MALL_RED_IBARRIER);
}
}
/*
......@@ -294,6 +303,12 @@ void set_children_number(int numC){
* TODO
*/
void get_malleability_user_comm(MPI_Comm *comm) {
if(mall->dup_user_comm) {
if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm));
MPI_Comm_dup(mall->comm, &(mall->user_comm));
MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MALL_USER");
mall->dup_user_comm = 0;
}
*comm = mall->user_comm;
}
......@@ -304,28 +319,30 @@ void get_malleability_user_comm(MPI_Comm *comm) {
* Los datos variables se tienen que anyadir cuando quieran ser mandados, no antes
*
* Mas informacion en la funcion "add_data".
*
* //FIXME Si es constante se debería ir a asincrono, no sincrono
*/
void malleability_add_data(void *data, size_t total_qty, int type, int is_replicated, int is_constant) {
size_t total_reqs = 0;
if(is_constant) {
if(is_replicated) {
add_data(data, total_qty, type, 0, rep_s_data); //FIXME Numero magico
add_data(data, total_qty, type, total_reqs, rep_s_data);
} else {
add_data(data, total_qty, type, 0, dist_s_data); //FIXME Numero magico
add_data(data, total_qty, type, total_reqs, dist_s_data);
}
} else {
if(is_replicated) {
add_data(data, total_qty, type, 0, rep_a_data); //FIXME Numero magico || Un request?
add_data(data, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ???
} else {
size_t total_reqs = 0;
if(mall_conf->comm_type == MAL_USE_NORMAL) {
if(mall_conf->red_method == MALL_RED_BASELINE) {
total_reqs = 1;
} else if(mall_conf->comm_type == MAL_USE_IBARRIER) {
total_reqs = 2;
} else if(mall_conf->comm_type == MAL_USE_POINT) {
} else if(mall_conf->red_method == MALL_RED_POINT || mall_conf->red_method == MALL_RED_RMA_LOCK || mall_conf->red_method == MALL_RED_RMA_LOCKALL) {
total_reqs = mall->numC;
}
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
total_reqs++;
}
add_data(data, total_qty, type, total_reqs, dist_a_data);
}
......@@ -339,29 +356,31 @@ void malleability_add_data(void *data, size_t total_qty, int type, int is_replic
* Los datos variables se tienen que modificar cuando quieran ser mandados, no antes
*
* Mas informacion en la funcion "modify_data".
* //FIXME Si es constante se debería ir a asincrono, no sincrono
*/
void malleability_modify_data(void *data, size_t index, size_t total_qty, int type, int is_replicated, int is_constant) {
size_t total_reqs = 0;
if(is_constant) {
if(is_replicated) {
modify_data(data, index, total_qty, type, 0, rep_s_data); //FIXME Numero magico
modify_data(data, index, total_qty, type, total_reqs, rep_s_data);
} else {
modify_data(data, index, total_qty, type, 0, dist_s_data); //FIXME Numero magico
modify_data(data, index, total_qty, type, total_reqs, dist_s_data);
}
} else {
if(is_replicated) {
modify_data(data, index, total_qty, type, 0, rep_a_data); //FIXME Numero magico || UN request?
modify_data(data, index, total_qty, type, total_reqs, rep_a_data); //FIXME total_reqs==0 ???
} else {
size_t total_reqs = 0;
if(mall_conf->comm_type == MAL_USE_NORMAL) {
if(mall_conf->red_method == MALL_RED_BASELINE) {
total_reqs = 1;
} else if(mall_conf->comm_type == MAL_USE_IBARRIER) {
total_reqs = 2;
} else if(mall_conf->comm_type == MAL_USE_POINT) {
} else if(mall_conf->red_method == MALL_RED_POINT || mall_conf->red_method == MALL_RED_RMA_LOCK || mall_conf->red_method == MALL_RED_RMA_LOCKALL) {
total_reqs = mall->numC;
}
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_IBARRIER, NULL)) {
total_reqs++;
}
modify_data(data, index, total_qty, type, total_reqs, dist_a_data); //FIXME Numero magico
modify_data(data, index, total_qty, type, total_reqs, dist_a_data);
}
}
}
......@@ -369,6 +388,7 @@ void malleability_modify_data(void *data, size_t index, size_t total_qty, int ty
/*
* Devuelve el numero de entradas para la estructura de descripcion de
* datos elegida.
* //FIXME Si es constante se debería ir a asincrono, no sincrono
*/
void malleability_get_entries(size_t *entries, int is_replicated, int is_constant){
......@@ -393,8 +413,9 @@ void malleability_get_entries(size_t *entries, int is_replicated, int is_constan
* con la funcion "malleability_add_data()".
* Es tarea del usuario saber el tipo de esos datos.
* TODO Refactor a que sea automatico
* //FIXME Si es constante se debería ir a asincrono, no sincrono
*/
void malleability_get_data(void **data, int index, int is_replicated, int is_constant) {
void malleability_get_data(void **data, size_t index, int is_replicated, int is_constant) {
malleability_data_t *data_struct;
if(is_constant) {
......@@ -421,7 +442,6 @@ void malleability_get_data(void **data, int index, int is_replicated, int is_con
//======================================================||
//======================================================||
/*
* Funcion generalizada para enviar datos desde los hijos.
* La asincronizidad se refiere a si el hilo padre e hijo lo hacen
......@@ -429,17 +449,22 @@ void malleability_get_data(void **data, int index, int is_replicated, int is_con
*/
void send_data(int numP_children, malleability_data_t *data_struct, int is_asynchronous) {
size_t i;
char *aux;
char *aux_send, *aux_recv;
if(is_asynchronous) {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char
send_async(aux, data_struct->qty[i], mall->myId, mall->numP, mall->intercomm, numP_children, data_struct->requests, mall_conf->comm_type);
aux_send = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char
aux_recv = NULL;
async_communication_start(aux_send, &aux_recv, data_struct->qty[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall_conf->red_strategies,
mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
if(aux_recv != NULL) data_struct->arrays[i] = (void *) aux_recv;
}
} else {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char
send_sync(aux, data_struct->qty[i], mall->myId, mall->numP, mall->intercomm, numP_children);
aux_send = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char
aux_recv = NULL;
sync_communication(aux_send, &aux_recv, data_struct->qty[i], mall->myId, mall->numP, numP_children, MALLEABILITY_NOT_CHILDREN, mall_conf->red_method, mall->intercomm);
if(aux_recv != NULL) data_struct->arrays[i] = (void *) aux_recv;
}
}
}
......@@ -451,18 +476,19 @@ void send_data(int numP_children, malleability_data_t *data_struct, int is_async
*/
void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynchronous) {
size_t i;
char *aux;
char *aux, aux_s;
if(is_asynchronous) {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char
recv_async(&aux, data_struct->qty[i], mall->myId, mall->numP, mall->intercomm, numP_parents, mall_conf->comm_type);
async_communication_start(&aux_s, &aux, data_struct->qty[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall_conf->red_strategies,
mall->intercomm, &(data_struct->requests[i]), &(data_struct->request_qty[i]), &(data_struct->windows[i]));
data_struct->arrays[i] = (void *) aux;
}
} else {
for(i=0; i < data_struct->entries; i++) {
aux = (char *) data_struct->arrays[i]; //TODO Comprobar que realmente es un char
recv_sync(&aux, data_struct->qty[i], mall->myId, mall->numP, mall->intercomm, numP_parents);
sync_communication(&aux_s, &aux, data_struct->qty[i], mall->myId, mall->numP, numP_parents, MALLEABILITY_CHILDREN, mall_conf->red_method, mall->intercomm);
data_struct->arrays[i] = (void *) aux;
}
}
......@@ -473,7 +499,6 @@ void recv_data(int numP_parents, malleability_data_t *data_struct, int is_asynch
//=====================CHILDREN=========================||
//======================================================||
//======================================================||
/*
* Inicializacion de los datos de los hijos.
* En la misma se reciben datos de los padres: La configuracion
......@@ -487,10 +512,15 @@ void Children_init() {
malleability_connect_children(mall->myId, mall->numP, mall->root, mall->comm, &numP_parents, &root_parents, &(mall->intercomm));
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
// TODO A partir de este punto tener en cuenta si es BASELINE o MERGE
if(!is_intercomm) { // For intracommunicators, these processes will be added
MPI_Comm_rank(mall->intercomm, &(mall->myId));
MPI_Comm_size(mall->intercomm, &(mall->numP));
}
recv_config_file(mall->root, mall->intercomm, &(mall_conf->config_file));
comm_node_data(root_parents, MALLEABILITY_CHILDREN);
MPI_Bcast(&(mall_conf->red_method), 1, MPI_INT, root_parents, mall->intercomm);
MPI_Bcast(&(mall_conf->red_strategies), 1, MPI_INT, root_parents, mall->intercomm);
mall_conf->results = (results_data *) malloc(sizeof(results_data));
init_results_data(mall_conf->results, mall_conf->config_file->n_resizes, mall_conf->config_file->n_stages, RESULTS_INIT_DATA_QTY);
......@@ -498,19 +528,25 @@ void Children_init() {
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
if(dist_a_data->entries || rep_a_data->entries) { // Recibir datos asincronos
if(mall_conf->comm_type == MAL_USE_NORMAL || mall_conf->comm_type == MAL_USE_IBARRIER || mall_conf->comm_type == MAL_USE_POINT) {
recv_data(numP_parents, dist_a_data, 1);
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
} else {
recv_data(numP_parents, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
} else if (mall_conf->comm_type == MAL_USE_THREAD) { //TODO Modificar uso para que tenga sentido comm_threaded
recv_data(numP_parents, dist_a_data, 0);
for(i=0; i<dist_a_data->entries; i++) {
async_communication_wait(mall_conf->red_strategies, mall->intercomm, dist_a_data->requests[i], dist_a_data->request_qty[i]);
}
for(i=0; i<dist_a_data->entries; i++) {
async_communication_end(mall_conf->red_method, mall_conf->red_strategies, dist_a_data->requests[i], dist_a_data->request_qty[i], &(dist_a_data->windows[i]));
}
}
mall_conf->results->async_end= MPI_Wtime(); // Obtener timestamp de cuando termina comm asincrona
}
comm_data_info(rep_s_data, dist_s_data, MALLEABILITY_CHILDREN, mall->myId, root_parents, mall->intercomm);
if(dist_s_data->entries || rep_s_data->entries) { // Recibir datos sincronos
recv_data(numP_parents, dist_s_data, 0);
recv_data(numP_parents, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
mall_conf->results->sync_end = MPI_Wtime(); // Obtener timestamp de cuando termina comm sincrona
// TODO Crear funcion especifica y anyadir para Asinc
......@@ -525,20 +561,15 @@ void Children_init() {
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, root_parents, mall->intercomm);
}
}
mall_conf->results->malleability_end = MPI_Wtime(); // Obtener timestamp de cuando termina maleabilidad
// Guardar los resultados de esta transmision
comm_results(mall_conf->results, mall->root, mall_conf->config_file->n_resizes, mall->intercomm);
if(!is_intercomm) {
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm)); //TODO No es peligroso?
MPI_Comm_dup(mall->intercomm, &(mall->thread_comm));
MPI_Comm_dup(mall->intercomm, &(mall->comm));
MPI_Comm_dup(mall->intercomm, &(mall->user_comm));
malleability_comms_update(mall->intercomm);
}
MPI_Comm_disconnect(&(mall->intercomm));
MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error en OpenMPI + Merge
}
//======================================================||
......@@ -597,11 +628,14 @@ int start_redistribution() {
send_config_file(mall_conf->config_file, rootBcast, mall->intercomm);
comm_node_data(rootBcast, MALLEABILITY_NOT_CHILDREN);
MPI_Bcast(&(mall_conf->red_method), 1, MPI_INT, rootBcast, mall->intercomm);
MPI_Bcast(&(mall_conf->red_strategies), 1, MPI_INT, rootBcast, mall->intercomm);
comm_data_info(rep_a_data, dist_a_data, MALLEABILITY_NOT_CHILDREN, mall->myId, mall->root, mall->intercomm);
if(dist_a_data->entries || rep_a_data->entries) { // Enviar datos asincronos
//FIXME No se envian los datos replicados (rep_a_data)
mall_conf->results->async_time[mall_conf->grp] = MPI_Wtime();
if(mall_conf->comm_type == MAL_USE_THREAD) {
if(malleability_red_contains_strat(mall_conf->red_strategies, MALL_RED_THREAD, NULL)) {
return thread_creation();
} else {
send_data(mall->numC, dist_a_data, MALLEABILITY_USE_ASYNCHRONOUS);
......@@ -613,7 +647,6 @@ int start_redistribution() {
/*
* @deprecated
* Comprueba si la redistribucion asincrona ha terminado.
* Si no ha terminado la funcion termina indicandolo, en caso contrario,
* se continua con la comunicacion sincrona, el envio de resultados y
......@@ -625,38 +658,34 @@ int start_redistribution() {
* terminada cuando los padres terminan de enviar.
* Si se utiliza el modo "MAL_USE_IBARRIER", se considera terminada cuando
* los hijos han terminado de recibir.
* //FIXME Modificar para que se tenga en cuenta rep_a_data
*/
int check_redistribution() {
int completed, all_completed, test_err;
int is_intercomm, completed, local_completed, all_completed;
size_t i, req_qty;
MPI_Request *req_completed;
//dist_a_data->requests[0][X] //FIXME Numero magico 0 -- Modificar para que sea un for?
if (mall_conf->comm_type == MAL_USE_POINT) {
test_err = MPI_Testall(mall->numC, dist_a_data->requests[0], &completed, MPI_STATUSES_IGNORE);
} else {
if(mall_conf->comm_type == MAL_USE_NORMAL) {
req_completed = &(dist_a_data->requests[0][0]);
} else if (mall_conf->comm_type == MAL_USE_IBARRIER) {
req_completed = &(dist_a_data->requests[0][1]);
}
test_err = MPI_Test(req_completed, &completed, MPI_STATUS_IGNORE);
}
MPI_Win window;
local_completed = 1;
if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
printf("P%d aborting -- Test Async\n", mall->myId);
MPI_Abort(MPI_COMM_WORLD, test_err);
for(i=0; i<dist_a_data->entries; i++) {
req_completed = dist_a_data->requests[i];
req_qty = dist_a_data->request_qty[i];
completed = async_communication_check(mall->myId, MALLEABILITY_NOT_CHILDREN, mall_conf->red_strategies, mall->intercomm, req_completed, req_qty);
local_completed = local_completed && completed;
}
MPI_Allreduce(&completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
MPI_Allreduce(&local_completed, &all_completed, 1, MPI_INT, MPI_MIN, mall->comm);
if(!all_completed) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended
if(mall_conf->comm_type == MAL_USE_IBARRIER) {
MPI_Wait(&(dist_a_data->requests[0][0]), MPI_STATUS_IGNORE); // Indicar como completado el envio asincrono
//Para la desconexión de ambos grupos de procesos es necesario indicar a MPI que esta comm
//ha terminado, aunque solo se pueda llegar a este punto cuando ha terminado
for(i=0; i<dist_a_data->entries; i++) {
req_completed = dist_a_data->requests[i];
req_qty = dist_a_data->request_qty[i];
window = dist_a_data->windows[i];
async_communication_end(mall_conf->red_method, mall_conf->red_strategies, req_completed, req_qty, &window);
}
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
if(!is_intercomm) mall_conf->results->async_end = MPI_Wtime(); // Merge method only
return end_redistribution();
}
......@@ -673,14 +702,7 @@ int end_redistribution() {
size_t i;
int is_intercomm, rootBcast, local_state;
is_intercomm = 0;
if(mall->intercomm != MPI_COMM_NULL) {
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
} else {
// Si no tiene comunicador creado, se debe a que se ha pospuesto el Spawn
// y se trata del spawn Merge Shrink
mall->intercomm = mall->comm;
}
if(is_intercomm) {
rootBcast = mall->myId == mall->root ? MPI_ROOT : MPI_PROC_NULL;
} else {
......@@ -691,6 +713,7 @@ int end_redistribution() {
if(dist_s_data->entries || rep_s_data->entries) { // Enviar datos sincronos
mall_conf->results->sync_time[mall_conf->grp] = MPI_Wtime();
send_data(mall->numC, dist_s_data, MALLEABILITY_USE_SYNCHRONOUS);
if(!is_intercomm) mall_conf->results->sync_end = MPI_Wtime(); // Merge method only
// TODO Crear funcion especifica y anyadir para Asinc
// TODO Tener en cuenta el tipo
......@@ -710,25 +733,14 @@ int end_redistribution() {
local_state = MALL_DIST_COMPLETED;
if(!is_intercomm) { // Merge Spawn
if(mall->numP < mall->numC) { // Expand
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm)); //TODO No es peligroso?
MPI_Comm_dup(mall->intercomm, &(mall->thread_comm));
MPI_Comm_dup(mall->intercomm, &(mall->comm));
MPI_Comm_dup(mall->intercomm, &(mall->user_comm));
MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MALL_THREAD");
MPI_Comm_set_name(mall->comm, "MPI_COMM_MALL");
MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MALL_USER");
malleability_comms_update(mall->intercomm);
} else { // Shrink || Merge Shrink requiere de mas tareas
local_state = MALL_SPAWN_ADAPT_PENDING;
}
}
if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) {
MPI_Comm_disconnect(&(mall->intercomm));
MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error en OpenMPI + Merge
}
return local_state;
......@@ -742,21 +754,19 @@ int end_redistribution() {
int shrink_redistribution() {
double time_extra = MPI_Wtime();
//TODO REFACTOR -- Que solo la llamada de collect iters este fuera de los hilos
zombies_collect_suspended(mall->comm, mall->myId, mall->numP, mall->numC, mall->root, (void *) mall_conf->results, mall_conf->config_file->n_stages);
//TODO Create new state before collecting zombies. Processes can perform tasks before that. Then call again Malleability to commit the change
zombies_collect_suspended(mall->user_comm, mall->myId, mall->numP, mall->numC, mall->root, (void *) mall_conf->results, mall_conf->config_file->n_stages);
if(mall->myId < mall->numC) {
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); //FIXME Modificar a que se pida pro el usuario el cambio y se llama a comms_update
if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm)); //TODO No es peligroso?
mall->dup_user_comm = 1;
MPI_Comm_dup(mall->intercomm, &(mall->thread_comm));
MPI_Comm_dup(mall->intercomm, &(mall->comm));
MPI_Comm_dup(mall->intercomm, &(mall->user_comm));
MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MALL_THREAD");
MPI_Comm_set_name(mall->comm, "MPI_COMM_MALL");
MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MALL_USER");
MPI_Comm_free(&(mall->intercomm));
......@@ -821,16 +831,19 @@ void def_nodeinfo_type(MPI_Datatype *node_type) {
//======================================================||
//======================================================||
int comm_state; //FIXME Usar un handler
/*
* Crea una hebra para ejecutar una comunicación en segundo plano.
*/
int thread_creation() {
comm_state = MALL_DIST_PENDING;
if(pthread_create(&(mall->async_thread), NULL, thread_async_work, NULL)) {
printf("Error al crear el hilo\n");
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
return MALL_DIST_PENDING;
return comm_state;
}
/*
......@@ -840,10 +853,10 @@ int thread_creation() {
* El estado de la comunicación es devuelto al finalizar la función.
*/
int thread_check() {
int all_completed = 0;
int all_completed = 0, is_intercomm;
// Comprueba que todos los hilos han terminado la distribucion (Mismo valor en commAsync)
MPI_Allreduce(&state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
MPI_Allreduce(&comm_state, &all_completed, 1, MPI_INT, MPI_MAX, mall->comm);
if(all_completed != MALL_DIST_COMPLETED) return MALL_DIST_PENDING; // Continue only if asynchronous send has ended
//FIXME No se tiene en cuenta el estado MALL_APP_ENDED
......@@ -852,6 +865,8 @@ int thread_check() {
MPI_Abort(MPI_COMM_WORLD, -1);
return -2;
}
MPI_Comm_test_inter(mall->intercomm, &is_intercomm);
if(!is_intercomm) mall_conf->results->async_end = MPI_Wtime(); // Merge method only
return end_redistribution();
}
......@@ -866,7 +881,7 @@ int thread_check() {
*/
void* thread_async_work() {
send_data(mall->numC, dist_a_data, MALLEABILITY_USE_SYNCHRONOUS);
state = MALL_DIST_COMPLETED;
comm_state = MALL_DIST_COMPLETED;
pthread_exit(NULL);
}
......@@ -889,3 +904,17 @@ void print_comms_state() {
}
free(test);
}
void malleability_comms_update(MPI_Comm comm) {
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm)); //TODO No es peligroso?
MPI_Comm_dup(comm, &(mall->thread_comm));
MPI_Comm_dup(comm, &(mall->comm));
MPI_Comm_dup(comm, &(mall->user_comm));
MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MALL_THREAD");
MPI_Comm_set_name(mall->comm, "MPI_COMM_MALL");
MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MALL_USER");
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment