Commit f1511cb4 authored by Iker Martín Álvarez's avatar Iker Martín Álvarez
Browse files

Merge branch 'RMA-Distributions' into 'dev'

RMA functionality and refactor of many of the codes

See merge request martini/malleability_benchmark!4
parents 2f81e29c 6633cd95
import sys
import glob
import numpy as np
import pandas as pd
from enum import Enum
class G_enum(Enum):
TOTAL_RESIZES = 0
TOTAL_GROUPS = 1
TOTAL_STAGES = 2
GRANULARITY = 3
SDR = 4
ADR = 5
DR = 6
RED_METHOD = 7
RED_STRATEGY = 8
SPAWN_METHOD = 9
SPAWN_STRATEGY = 10
GROUPS = 11
FACTOR_S = 12
DIST = 13
STAGE_TYPES = 14
STAGE_TIMES = 15
STAGE_BYTES = 16
ITERS = 17
ASYNCH_ITERS = 18
T_ITER = 19
T_STAGES = 20
T_SPAWN = 21
T_SPAWN_REAL = 22
T_SR = 23
T_AR = 24
T_MALLEABILITY = 25
T_TOTAL = 26
#Malleability specific
NP = 0
NC = 1
#Iteration specific
IS_DYNAMIC = 11
N_PARENTS = 17
#columnsG = ["Total_Resizes", "Total_Groups", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Redistribution_Method", \
# "Redistribution_Strategy", "Spawn_Method", "Spawn_Strategy", "Groups", "FactorS", "Dist", "Stage_Types", "Stage_Times", \
# "Stage_Bytes", "Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR", "T_Malleability", "T_total"] #27
columnsL = ["NP", "NC", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Redistribution_Method", \
"Redistribution_Strategy", "Spawn_Method", "Spawn_Strategy", "Is_Dynamic", "FactorS", "Dist", "Stage_Types", "Stage_Times", \
"Stage_Bytes", "N_Parents", "Asynch_Iters", "T_iter", "T_stages"] #20
def copy_iteration(row, dataL_it, group, iteration, is_asynch):
basic_indexes = [G_enum.TOTAL_STAGES.value, G_enum.GRANULARITY.value, \
G_enum.STAGE_TYPES.value, G_enum.STAGE_TIMES.value, G_enum.STAGE_BYTES.value]
basic_asynch = [G_enum.SDR.value, G_enum.ADR.value, G_enum.DR.value]
array_asynch_group = [G_enum.RED_METHOD.value, G_enum.RED_STRATEGY.value, \
G_enum.SPAWN_METHOD.value, G_enum.SPAWN_STRATEGY.value, G_enum.DIST.value]
dataL_it[G_enum.FACTOR_S.value] = row[G_enum.FACTOR_S.value][group]
dataL_it[G_enum.NP.value] = row[G_enum.GROUPS.value][group]
dataL_it[G_enum.ASYNCH_ITERS.value] = is_asynch
dataL_it[G_enum.T_ITER.value] = row[G_enum.T_ITER.value][group][iteration]
dataL_it[G_enum.T_STAGES.value] = list(row[G_enum.T_STAGES.value][group][iteration])
dataL_it[G_enum.IS_DYNAMIC.value] = True if group > 0 else False
for index in basic_indexes:
dataL_it[index] = row[index]
for index in array_asynch_group:
dataL_it[index] = [None, -1]
dataL_it[index][0] = row[index][group]
dataL_it[G_enum.N_PARENTS.value] = -1
if group > 0:
dataL_it[G_enum.N_PARENTS.value] = row[G_enum.GROUPS.value][group-1]
if is_asynch:
dataL_it[G_enum.NC.value] = row[G_enum.GROUPS.value][group+1]
for index in basic_asynch:
dataL_it[index] = row[index]
for index in array_asynch_group:
dataL_it[index][1] = row[index][group+1]
for index in array_asynch_group: # Convert to tuple
dataL_it[index] = tuple(dataL_it[index])
#-----------------------------------------------
def write_iter_dataframe(dataL, name, i, first=False):
dfL = pd.DataFrame(dataL, columns=columnsL)
dfL.to_pickle(name + str(i) + '.pkl')
if first:
print(dfL)
#-----------------------------------------------
def create_iter_dataframe(dfG, name, max_it_L):
it = -1
file_i = 0
first = True
dataL = []
for row_index in range(len(dfG)):
row = dfG.iloc[row_index]
groups = row[G_enum.TOTAL_GROUPS.value]
for group in range(groups):
real_iterations = len(row[G_enum.T_ITER.value][group])
real_asynch = row[G_enum.ASYNCH_ITERS.value][group]
is_asynch = False
for iteration in range(real_iterations-real_asynch):
it += 1
dataL.append( [None] * len(columnsL) )
copy_iteration(row, dataL[it], group, iteration, is_asynch)
is_asynch = True
for iteration in range(real_iterations-real_asynch, real_iterations):
it += 1
dataL.append( [None] * len(columnsL) )
copy_iteration(row, dataL[it], group, iteration, is_asynch)
if it >= max_it_L-1: #Var "it" starts at -1, so one more must be extracted for precise cut
write_iter_dataframe(dataL, name, file_i, first)
dataL = []
file_i += 1
first = False
it = -1
if it != -1:
write_iter_dataframe(dataL, name, file_i)
#-----------------------------------------------
if len(sys.argv) < 2:
print("The files name is missing\nUsage: python3 CreateIterDataframe.py input_file.pkl output_name [max_rows_per_file]")
exit(1)
input_name = sys.argv[1]
if len(sys.argv) > 2:
name = sys.argv[2]
else:
name = "dataL"
print("File names will be: " + name + ".pkl")
if len(sys.argv) > 3:
max_it_L = int(sys.argv[3])
else:
max_it_L = 100000
dfG = pd.read_pickle(input_name)
print(dfG)
create_iter_dataframe(dfG, name, max_it_L)
import sys
import glob
import numpy as np
import pandas as pd
from enum import Enum
class G_enum(Enum):
TOTAL_RESIZES = 0
TOTAL_GROUPS = 1
TOTAL_STAGES = 2
GRANULARITY = 3
SDR = 4
ADR = 5
DR = 6
RED_METHOD = 7
RED_STRATEGY = 8
SPAWN_METHOD = 9
SPAWN_STRATEGY = 10
GROUPS = 11
FACTOR_S = 12
DIST = 13
STAGE_TYPES = 14
STAGE_TIMES = 15
STAGE_BYTES = 16
ITERS = 17
ASYNCH_ITERS = 18
T_ITER = 19
T_STAGES = 20
T_SPAWN = 21
T_SPAWN_REAL = 22
T_SR = 23
T_AR = 24
T_MALLEABILITY = 25
T_TOTAL = 26
#Malleability specific
NP = 0
NC = 1
#Iteration specific
IS_DYNAMIC = 11
N_PARENTS = 17
#columnsG = ["Total_Resizes", "Total_Groups", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Redistribution_Method", \
# "Redistribution_Strategy", "Spawn_Method", "Spawn_Strategy", "Groups", "FactorS", "Dist", "Stage_Types", "Stage_Times", \
# "Stage_Bytes", "Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR", "T_Malleability", "T_total"] #27
columnsM = ["NP", "NC", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Redistribution_Method", \
"Redistribution_Strategy", "Spawn_Method", "Spawn_Strategy", "FactorS", "Dist", "Stage_Type", "Stage_Time", \
"Stage_Bytes", "Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR", "T_Malleability"] #25
def copy_resize(row, dataM_it, resize):
basic_indexes = [G_enum.TOTAL_STAGES.value, G_enum.GRANULARITY.value, G_enum.SDR.value, \
G_enum.ADR.value, G_enum.DR.value]
basic_group = [G_enum.STAGE_TYPES.value, G_enum.STAGE_TIMES.value, G_enum.STAGE_BYTES.value]
array_actual_group = [G_enum.FACTOR_S.value, G_enum.ITERS.value, G_enum.ASYNCH_ITERS.value, \
G_enum.T_SPAWN.value, G_enum.T_SPAWN_REAL.value, G_enum.T_SR.value, \
G_enum.T_AR.value, G_enum.T_MALLEABILITY.value, G_enum.T_ITER.value, G_enum.T_STAGES.value]
array_next_group = [G_enum.RED_METHOD.value, G_enum.RED_STRATEGY.value, \
G_enum.SPAWN_METHOD.value, G_enum.SPAWN_STRATEGY.value]
dataM_it[G_enum.NP.value] = row[G_enum.GROUPS.value][resize]
dataM_it[G_enum.NC.value] = row[G_enum.GROUPS.value][resize+1]
dataM_it[G_enum.DIST.value-1] = [None, None]
dataM_it[G_enum.DIST.value-1][0] = row[G_enum.DIST.value][resize]
dataM_it[G_enum.DIST.value-1][1] = row[G_enum.DIST.value][resize+1]
for index in basic_indexes:
dataM_it[index] = row[index]
for index in basic_group:
dataM_it[index-1] = row[index]
for index in array_actual_group:
dataM_it[index-1] = row[index][resize]
for index in array_next_group:
dataM_it[index] = row[index][resize+1]
#-----------------------------------------------
def create_resize_dataframe(dfG, dataM):
it = -1
for row_index in range(len(dfG)):
row = dfG.iloc[row_index]
resizes = row[G_enum.TOTAL_RESIZES.value]
for resize in range(resizes):
it += 1
dataM.append( [None] * len(columnsM) )
copy_resize(row, dataM[it], resize)
#-----------------------------------------------
if len(sys.argv) < 2:
print("The files name is missing\nUsage: python3 CreateResizeDataframe.py input_file.pkl output_name")
exit(1)
input_name = sys.argv[1]
if len(sys.argv) > 2:
name = sys.argv[2]
else:
name = "dataM"
print("File name will be: " + name + ".pkl")
dfG = pd.read_pickle(input_name)
dataM = []
create_resize_dataframe(dfG, dataM)
dfM = pd.DataFrame(dataM, columns=columnsM)
dfM.to_pickle(name + '.pkl')
print(dfG)
print(dfM)
......@@ -12,34 +12,56 @@ class G_enum(Enum):
SDR = 4
ADR = 5
DR = 6
ASYNCH_REDISTRIBUTION_TYPE = 7
SPAWN_METHOD = 8
SPAWN_STRATEGY = 9
GROUPS = 10
FACTOR_S = 11
DIST = 12
STAGE_TYPES = 13
STAGE_TIMES = 14
STAGE_BYTES = 15
ITERS = 16
ASYNCH_ITERS = 17
T_ITER = 18
T_STAGES = 19
T_SPAWN = 20
T_SPAWN_REAL = 21
T_SR = 22
T_AR = 23
T_TOTAL = 24
columnsG = ["Total_Resizes", "Total_Groups", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Asynch_Redistribution_Type", \
"Spawn_Method", "Spawn_Strategy", "Groups", "Factor_S", "Dist", "Stage_Types", "Stage_Times", "Stage_Bytes", \
"Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR", "T_total"] #25
RED_METHOD = 7
RED_STRATEGY = 8
SPAWN_METHOD = 9
SPAWN_STRATEGY = 10
GROUPS = 11
FACTOR_S = 12
DIST = 13
STAGE_TYPES = 14
STAGE_TIMES = 15
STAGE_BYTES = 16
ITERS = 17
ASYNCH_ITERS = 18
T_ITER = 19
T_STAGES = 20
T_SPAWN = 21
T_SPAWN_REAL = 22
T_SR = 23
T_AR = 24
T_MALLEABILITY = 25
T_TOTAL = 26
#Malleability specific
NP = 0
NC = 1
#Iteration specific
IS_DYNAMIC = 11
N_PARENTS = 17
columnsG = ["Total_Resizes", "Total_Groups", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Redistribution_Method", \
"Redistribution_Strategy", "Spawn_Method", "Spawn_Strategy", "Groups", "FactorS", "Dist", "Stage_Types", "Stage_Times", \
"Stage_Bytes", "Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR", "T_Malleability", "T_total"] #27
#-----------------------------------------------
# Obtains the value of a given index in a splited line
# and returns it as a float values
def get_value(line, index):
return float(line[index].split('=')[1].split(',')[0])
# and returns it as a float values if possible, string otherwise
def get_value(line, index, separator=True):
if separator:
value = line[index].split('=')[1].split(',')[0]
else:
value = line[index]
try:
value = float(value)
if value.is_integer():
value = int(value)
except ValueError:
return value
return value
#-----------------------------------------------
# Obtains the general parameters of an execution and
# stores them for creating a global dataframe
def record_config_line(lineS, dataG_it):
......@@ -48,27 +70,25 @@ def record_config_line(lineS, dataG_it):
offset_line = 2
for i in range(len(ordered_indexes)):
value = get_value(lineS, i+offset_line)
if value.is_integer():
value = int(value)
index = ordered_indexes[i]
dataG_it[index] = value
dataG_it[G_enum.TOTAL_GROUPS.value] = dataG_it[G_enum.TOTAL_RESIZES.value]
dataG_it[G_enum.TOTAL_RESIZES.value] -=1 #FIXME Modificar en App sintetica
dataG_it[G_enum.TOTAL_GROUPS.value] = dataG_it[G_enum.TOTAL_RESIZES.value]+1
#FIXME Modificar cuando ADR ya no sea un porcentaje
dataG_it[G_enum.DR.value] = dataG_it[G_enum.SDR.value] + dataG_it[G_enum.ADR.value]
# Init lists for each column
array_groups = [G_enum.GROUPS.value, G_enum.FACTOR_S.value, G_enum.DIST.value, G_enum.ITERS.value, \
G_enum.ASYNCH_ITERS.value, G_enum.T_ITER.value, G_enum.T_STAGES.value]
array_resizes = [G_enum.ASYNCH_REDISTRIBUTION_TYPE.value, G_enum.SPAWN_METHOD.value, \
G_enum.SPAWN_STRATEGY.value, G_enum.T_SPAWN.value, G_enum.T_SPAWN_REAL.value, \
G_enum.T_SR.value, G_enum.T_AR.value]
G_enum.ASYNCH_ITERS.value, G_enum.T_ITER.value, G_enum.T_STAGES.value, G_enum.RED_METHOD.value, \
G_enum.RED_STRATEGY.value, G_enum.SPAWN_METHOD.value, G_enum.SPAWN_STRATEGY.value,]
array_resizes = [ G_enum.T_SPAWN.value, G_enum.T_SPAWN_REAL.value, G_enum.T_SR.value, G_enum.T_AR.value, G_enum.T_MALLEABILITY.value]
array_stages = [G_enum.STAGE_TYPES.value, \
G_enum.STAGE_TIMES.value, G_enum.STAGE_BYTES.value]
for index in array_groups:
dataG_it[index] = [None]*dataG_it[G_enum.TOTAL_GROUPS.value]
for group in range(dataG_it[G_enum.TOTAL_GROUPS.value]):
dataG_it[G_enum.T_ITER.value][group] = []
for index in array_resizes:
dataG_it[index] = [None]*dataG_it[G_enum.TOTAL_RESIZES.value]
......@@ -76,6 +96,7 @@ def record_config_line(lineS, dataG_it):
for index in array_stages:
dataG_it[index] = [None]*dataG_it[G_enum.TOTAL_STAGES.value]
#-----------------------------------------------
# Obtains the parameters of a stage line
# and stores it in the dataframe
# Is needed to indicate in which stage is
......@@ -86,144 +107,194 @@ def record_stage_line(lineS, dataG_it, stage):
offset_lines = 2
for i in range(len(array_stages)):
value = get_value(lineS, i+offset_lines)
if value.is_integer():
value = int(value)
index = array_stage[i]
index = array_stages[i]
dataG_it[index][stage] = value
#-----------------------------------------------
# Obtains the parameters of a resize line
# and stores them in the dataframe
# Is needed to indicate to which group refers
# the resize line
def record_resize_line(lineS, dataG_it, group):
array_stages = [G_enum.ITERS.value, G_enum.GROUPS.value, G_enum.FACTOR_S.value, G_enum.DIST.value, \
G_enum.ASYNCH_REDISTRIBUTION_TYPE.value, G_enum.SPAWN_METHOD.value, G_enum.SPAWN_STRATEGY.value]
def record_group_line(lineS, dataG_it, group):
array_groups = [G_enum.ITERS.value, G_enum.GROUPS.value, G_enum.FACTOR_S.value, G_enum.DIST.value, \
G_enum.RED_METHOD.value, G_enum.RED_STRATEGY.value, G_enum.SPAWN_METHOD.value, G_enum.SPAWN_STRATEGY.value]
offset_lines = 2
for i in range(len(array_stages)):
for i in range(len(array_groups)):
value = get_value(lineS, i+offset_lines)
if value.is_integer():
value = int(value)
index = array_stage[i]
index = array_groups[i]
dataG_it[index][group] = value
#-----------------------------------------------
def record_time_line(lineS, dataG_it):
T_names = ["T_spawn:", "T_spawn_real:", "T_SR:", "T_AR:", "T_total:"]
T_values = [G_enum.T_SPAWN.value, G_enum.T_SPAWN_REAL.value, G_enum.T_SR.value, G_enum.T_AR.value, G_enum.T_TOTAL.value]
T_names = ["T_spawn:", "T_spawn_real:", "T_SR:", "T_AR:", "T_Malleability:", "T_total:"]
T_values = [G_enum.T_SPAWN.value, G_enum.T_SPAWN_REAL.value, G_enum.T_SR.value, G_enum.T_AR.value, G_enum.T_MALLEABILITY.value, G_enum.T_TOTAL.value]
if not (lineS[0] in T_names): # Execute only if line represents a Time
return
index = T_names.index(linesS[0])
index = T_names.index(lineS[0])
index = T_values[index]
offset_lines = 1
for i in range(len(dataG_it[index])):
value = get_value(lineS, i+offset_lines)
dataG_it[index][i] = value
len_index = 1
if dataG_it[index] != None:
len_index = len(dataG_it[index])
for i in range(len_index):
dataG_it[index][i] = get_value(lineS, i+offset_lines, False)
else:
dataG_it[index] = get_value(lineS, offset_lines, False)
#-----------------------------------------------
def read_global_file(f, dataG, it):
resizes = 0
timer = 0
previousNP = 0
def record_multiple_times_line(lineS, dataG_it, group):
T_names = ["T_iter:", "T_stage"]
T_values = [G_enum.T_ITER.value, G_enum.T_STAGES.value]
if not (lineS[0] in T_names): # Execute only if line represents a Time
return
for line in f:
index = T_names.index(lineS[0])
index = T_values[index]
offset_lines = 1
if index == G_enum.T_STAGES.value:
offset_lines += 1
total_iters = len(lineS)-offset_lines
stage = int(lineS[1].split(":")[0])
if stage == 0:
dataG_it[index][group] = [None] * total_iters
for i in range(total_iters):
dataG_it[index][group][i] = [None] * dataG_it[G_enum.TOTAL_STAGES.value]
for i in range(total_iters):
dataG_it[index][group][i][stage] = get_value(lineS, i+offset_lines, False)
else:
total_iters = len(lineS)-offset_lines
for i in range(total_iters):
dataG_it[index][group].append(get_value(lineS, i+offset_lines, False))
#-----------------------------------------------
def read_local_file(f, dataG, it, runs_in_file):
offset = 0
real_it = 0
group = 0
for line in f:
lineS = line.split()
if len(lineS) > 0:
if lineS[0] == "Config": # CONFIG LINE
it += 1
dataG.append([None]*(25+1))
#dataG[it][-1] = None Indicates if local data has been recorded(1) or not(None)
record_config(lineS, dataG[it])
resize = 0
stage = 0
elif lineS[0] == "Stage":
record_stage_line(lineS, dataG[it], stage)
stage+=1
elif lineS[0] == "Resize":
record_resize_line(lineS, dataG[it], resize)
resize+=1
elif lineS[0] == "T_total:":
value = get_value(lineS, 1)
dataG[it][G_enum.T_TOTAL.value] = value
if lineS[0] == "Group": # GROUP number
offset += 1
real_it = it - (runs_in_file-offset)
group = int(lineS[1].split(":")[0])
elif lineS[0] == "Async_Iters:":
offset_line = 1
dataG[real_it][G_enum.ASYNCH_ITERS.value][group] = get_value(lineS, offset_line, False)
else:
record_time_line(lineS, dataG[it])
return it
record_multiple_times_line(lineS, dataG[real_it], group)
#-----------------------------------------------
def read_local_file(f, dataG, it):
resizes = 0
timer = 0
previousNP = 0
def read_global_file(f, dataG, it):
runs_in_file=0
for line in f:
lineS = line.split()
if len(lineS) > 0:
if lineS[0] == "Config": # CONFIG LINE
it += 1
record_config(lineS, dataG[it], dataM[it])
resize = 0
runs_in_file += 1
group = 0
stage = 0
dataG.append([None]*len(columnsG))
record_config_line(lineS, dataG[it])
elif lineS[0] == "Stage":
record_stage_line(lineS, dataG[it], stage)
stage+=1
elif lineS[0] == "Resize":
record_resize_line(lineS, dataG[it], resize)
resize+=1
elif lineS[0] == "T_total:":
value = get_value(lineS, 1)
dataG[it][G_enum.T_TOTAL.value] = value
elif lineS[0] == "Group":
record_group_line(lineS, dataG[it], group)
group+=1
else:
record_time_line(lineS, dataG[it])
return it
#columnsG = ["Total_Resizes", "Total_Groups", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Asynch_Redistribution_Type", \\
# "Spawn_Method", "Spawn_Strategy", "Groups", "Dist", "Stage_Types", "Stage_Times", "Stage_Bytes", \\
# "Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR", "T_total"] #24
return it,runs_in_file
#-----------------------------------------------
#-----------------------------------------------
def convert_to_tuples(dfG):
array_list_items = [G_enum.GROUPS.value, G_enum.FACTOR_S.value, G_enum.DIST.value, G_enum.ITERS.value, \
G_enum.ASYNCH_ITERS.value, G_enum.RED_METHOD.value, G_enum.RED_STRATEGY.value, G_enum.SPAWN_METHOD.value, \
G_enum.SPAWN_STRATEGY.value, G_enum.T_SPAWN.value, G_enum.T_SPAWN_REAL.value, G_enum.T_SR.value, \
G_enum.T_AR.value, G_enum.STAGE_TYPES.value, G_enum.STAGE_TIMES.value, G_enum.STAGE_BYTES.value]
#TODO Falta T_malleability?
array_multiple_list_items = [G_enum.T_ITER.value, G_enum.T_STAGES.value]
for item in array_list_items:
name = columnsG[item]
values = dfG[name].copy()
for index in range(len(values)):
values[index] = tuple(values[index])
dfG[name] = values
for item in array_multiple_list_items:
name = columnsG[item]
values = dfG[name].copy()
for i in range(len(values)):
for j in range(len(values[i])):
if(type(values[i][j][0]) == list):
for r in range(len(values[i][j])):
values[i][j][r] = tuple(values[i][j][r])
values[i][j] = tuple(values[i][j])
values[i] = tuple(values[i])
dfG[name] = values
#-----------------------------------------------
if len(sys.argv) < 2:
print("The files name is missing\nUsage: python3 iterTimes.py resultsName directory csvOutName")
print("The files name is missing\nUsage: python3 MallTimes.py commonName directory OutName")
exit(1)
common_name = sys.argv[1]
if len(sys.argv) >= 3:
BaseDir = sys.argv[2]
print("Searching in directory: "+ BaseDir)
else:
BaseDir = sys.argv[2]
BaseDir = "./"
if len(sys.argv) >= 4:
print("Csv name will be: " + sys.argv[3] + "G.csv & " + sys.argv[3] + "M.csv")
name = sys.argv[3]
else:
name = "data"
print("File name will be: " + name + "G.pkl")
insideDir = "Run"
lista = glob.glob("./" + BaseDir + insideDir + "*/" + sys.argv[1]+ "*Global.o*")
lista += (glob.glob("./" + BaseDir + sys.argv[1]+ "*Global.o*")) # Se utiliza cuando solo hay un nivel de directorios
lista = glob.glob(BaseDir + insideDir + "*/" + common_name + "*_Global.out")
lista += (glob.glob(BaseDir + common_name + "*_Global.out")) # Se utiliza cuando solo hay un nivel de directorios
print("Number of files found: "+ str(len(lista)));
it = -1
dataG = []
dataM = []
columnsG = ["N", "%Async", "Groups", "NP", "NS", "Dist", "Matrix", "CommTam", "Cst", "Css", "Time", "Iters", "TE"] #13
columnsM = ["N", "%Async", "NP", "NS", "Dist", "Matrix", "CommTam", "Cst", "Css", "Time", "Iters", "TC", "TH", "TS", "TA"] #15
for elem in lista:
f = open(elem, "r")
it = read_file(f, dataG, dataM, it)
id_run = elem.split("_Global.out")[0].split(common_name)[1]
path_to_run = elem.split(common_name)[0]
lista_local = glob.glob(path_to_run + common_name + id_run + "_G*NP*.out")
it,runs_in_file = read_global_file(f, dataG, it)
f.close()
for elem_local in lista_local:
f_local = open(elem_local, "r")
read_local_file(f_local, dataG, it, runs_in_file)
f_local.close()
#print(data)
dfG = pd.DataFrame(dataG, columns=columnsG)
dfG.to_csv(name + 'G.csv')
convert_to_tuples(dfG)
print(dfG)
dfG.to_pickle(name + 'G.pkl')
dfM = pd.DataFrame(dataM, columns=columnsM)
#dfM = pd.DataFrame(dataM, columns=columnsM)
#Poner en TC el valor real y en TH el necesario para la app
cond = dfM.TH != 0
dfM.loc[cond, ['TC', 'TH']] = dfB.loc[cond, ['TH', 'TC']].values
dfM.to_csv(name + 'M.csv')
#cond = dfM.TH != 0
#dfM.loc[cond, ['TC', 'TH']] = dfM.loc[cond, ['TH', 'TC']].values
#dfM.to_csv(name + 'M.csv')
This diff is collapsed.
This diff is collapsed.
'''
Created on Oct 24, 2016
@author: David Llorens (dllorens@uji.es)
(c) Universitat Jaume I 2016
@license: GPL2
'''
from abc import ABCMeta, abstractmethod
infinity = float("infinity")
## Esquema para BT básico --------------------------------------------------------------------------
class PartialSolution(metaclass=ABCMeta):
@abstractmethod
def is_solution(self)-> "bool":
pass
@abstractmethod
def get_solution(self) -> "solution":
pass
@abstractmethod
def successors(self) -> "IEnumerable<PartialSolution>":
pass
class BacktrackingSolver(metaclass=ABCMeta):
@staticmethod
def solve(initial_ps : "PartialSolution") -> "IEnumerable<Solution>":
def bt(ps):
if ps.is_solution():
yield ps.get_solution()
else:
for new_ps in ps.successors():
yield from bt(new_ps)
yield from bt(initial_ps)
class BacktrackingSolverOld(metaclass=ABCMeta):
def solve(self, initial_ps : "PartialSolution") -> "IEnumerable<Solution>":
def bt(ps):
if ps.is_solution():
return [ps.get_solution()]
else:
solutions = []
for new_ps in ps.successors():
solutions.extend(bt(new_ps))
return solutions
return bt(initial_ps)
## Esquema para BT con control de visitados --------------------------------------------------------
class PartialSolutionWithVisitedControl(PartialSolution):
@abstractmethod
def state(self)-> "state":
# the returned object must be of an inmutable type
pass
class BacktrackingVCSolver(metaclass=ABCMeta):
@staticmethod
def solve(initial_ps : "PartialSolutionWithVisitedControl") -> "IEnumerable<Solution>":
def bt(ps):
seen.add(ps.state())
if ps.is_solution():
yield ps.get_solution()
else:
for new_ps in ps.successors():
state = new_ps.state()
if state not in seen:
yield from bt(new_ps)
seen = set()
yield from bt(initial_ps)
## Esquema para BT para optimización ----------------------------------------------------------------
class PartialSolutionWithOptimization(PartialSolutionWithVisitedControl):
@abstractmethod
def f(self)-> "int or double":
# result of applying the objective function to the partial solution
pass
class BacktrackingOptSolver(metaclass=ABCMeta):
@staticmethod
def solve(initial_ps : "PartialSolutionWithOptimization") -> "IEnumerable<Solution>":
def bt(ps):
nonlocal best_solution_found_score
ps_score = ps.f()
best_seen[ps.state()] = ps_score
if ps.is_solution() and ps_score < best_solution_found_score: #sólo muestra una solución si mejora la última mostrada
best_solution_found_score = ps_score
yield ps.get_solution()
else:
for new_ps in ps.successors():
state = new_ps.state()
if state not in best_seen or new_ps.f() < best_seen[state]:
yield from bt(new_ps)
best_seen = {}
best_solution_found_score = infinity
yield from bt(initial_ps)
import sys
import glob
import numpy as numpy
import pandas as pd
#-----------------------------------------------
def read_file(f, dataA, dataB, itA, itB):
compute_tam = 0
comm_tam = 0
sdr = 0
adr = 0
dist = 0
css = 0
cst = 0
time = 0
recording = False
it_line = 0
aux_itA = 0
aux_itB = 0
iters = 0
np = 0
np_par = 0
ns = 0
array = []
columnas = ['Titer','Ttype','Top']
#print(f)
for line in f:
lineS = line.split()
if len(lineS) > 1:
if recording and lineS[0].split(':')[0] in columnas: #Record data
aux_itA = 0
lineS.pop(0)
if it_line==0:
for observation in lineS:
dataA.append([None]*15)
dataA[itA+aux_itA][0] = sdr
dataA[itA+aux_itA][1] = adr
dataA[itA+aux_itA][2] = np
dataA[itA+aux_itA][3] = np_par
dataA[itA+aux_itA][4] = ns
dataA[itA+aux_itA][5] = dist
dataA[itA+aux_itA][6] = compute_tam
dataA[itA+aux_itA][7] = comm_tam
dataA[itA+aux_itA][8] = cst
dataA[itA+aux_itA][9] = css
dataA[itA+aux_itA][10] = time
dataA[itA+aux_itA][11] = iters
dataA[itA+aux_itA][12] = float(observation)
array.append(float(observation))
aux_itA+=1
elif it_line==1:
deleted = 0
for observation in lineS:
dataA[itA+aux_itA][13] = float(observation)
if float(observation) == 0:
array.pop(aux_itA - deleted)
deleted+=1
aux_itA+=1
else:
for observation in lineS:
dataA[itA+aux_itA][14] = float(observation)
aux_itA+=1
it_line += 1
if(it_line % 3 == 0): # Comprobar si se ha terminado de mirar esta ejecucion
recording = False
it_line = 0
itA = itA + aux_itA
if ns != 0: # Solo obtener datos de grupos con hijos
dataB.append([None]*14)
dataB[itB][0] = sdr
dataB[itB][1] = adr
dataB[itB][2] = np
dataB[itB][3] = np_par
dataB[itB][4] = ns
dataB[itB][5] = dist
dataB[itB][6] = compute_tam
dataB[itB][7] = comm_tam
dataB[itB][8] = cst
dataB[itB][9] = css
dataB[itB][10] = time
dataB[itB][11] = iters
dataB[itB][12] = tuple(array)
dataB[itB][13] = numpy.sum(array)
itB+=1
array = []
if lineS[0] == "Config:":
compute_tam = int(lineS[1].split('=')[1].split(',')[0])
comm_tam = int(lineS[2].split('=')[1].split(',')[0])
sdr = int(lineS[3].split('=')[1].split(',')[0])
adr = int(lineS[4].split('=')[1].split(',')[0])
css = int(lineS[6].split('=')[1].split(',')[0])
cst = int(lineS[7].split('=')[1].split(',')[0])
time = float(lineS[8].split('=')[1])
elif lineS[0] == "Config":
recording = True
iters = int(lineS[2].split('=')[1].split(',')[0])
dist = int(lineS[4].split('=')[1].split(',')[0])
np = int(lineS[5].split('=')[1].split(',')[0])
np_par = int(lineS[6].split('=')[1].split(',')[0])
ns = int(float(lineS[7].split('=')[1]))
return itA,itB
#-----------------------------------------------
#Config: matrix=1000, sdr=1000000000, adr=0, aib=0 time=2.000000
#Config Group: iters=100, factor=1.000000, phy=2, procs=2, parents=0, sons=4
#Ttype: 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
if len(sys.argv) < 2:
print("The files name is missing\nUsage: python3 iterTimes.py resultsName directory csvOutName")
exit(1)
if len(sys.argv) >= 3:
BaseDir = sys.argv[2]
print("Searching in directory: "+ BaseDir)
else: #FIXME
BaseDir = sys.argv[2]
if len(sys.argv) >= 4:
print("Csv name will be: " + sys.argv[3] + ".csv and "+ sys.argv[3] + "_Total.csv")
name = sys.argv[3]
else:
name = "data"
insideDir = "Run"
lista = glob.glob("./" + BaseDir + insideDir + "*/" + sys.argv[1]+ "*ID*.o*")
print("Number of files found: "+ str(len(lista)));
itA = itB = 0
dataA = []
dataB = [] #0 #1 #2 #3 #4 #5 #6 #7 #8 #9 #10 #11 #12 #13 #14
columnsA = ["N", "%Async", "NP", "N_par", "NS", "Dist", "Compute_tam", "Comm_tam", "Cst", "Css","Time", "Iters", "Ti", "Tt", "To"] #15
columnsB = ["N", "%Async", "NP", "N_par", "NS", "Dist", "Compute_tam", "Comm_tam", "Cst", "Css","Time", "Iters", "Ti", "Sum"] #14
for elem in lista:
f = open(elem, "r")
itA,itB = read_file(f, dataA, dataB, itA, itB)
f.close()
#print(data)
dfA = pd.DataFrame(dataA, columns=columnsA)
dfB = pd.DataFrame(dataB, columns=columnsB)
dfA['N'] += dfA['%Async']
dfA['%Async'] = (dfA['%Async'] / dfA['N']) * 100
dfA.to_csv(name + '.csv')
dfB['N'] += dfB['%Async']
dfB['%Async'] = (dfB['%Async'] / dfB['N']) * 100
dfB.to_csv(name + '_Total.csv')
......@@ -3,22 +3,19 @@ import glob
import numpy as numpy
import pandas as pd
if len(sys.argv) < 3:
print("The files name is missing\nUsage: python3 joinDf.py resultsName1.csv resultsName2.csv csvOutName")
print("The files name is missing\nUsage: python3 joinDf.py resultsName1.pkl resultsName2.pkl OutName")
exit(1)
if len(sys.argv) >= 4:
print("Csv name will be: " + sys.argv[3] + ".csv")
name = sys.argv[3]
else:
name = "dataJOINED"
df1 = pd.read_csv( sys.argv[1] )
df2 = pd.read_csv( sys.argv[2] )
print("File name will be: " + name + ".pkl")
df1 = pd.read_pickle( sys.argv[1] )
df2 = pd.read_pickle( sys.argv[2] )
frames = [df1, df2]
df3 = pd.concat(frames)
df3 = df3.drop(columns=df3.columns[0])
df3.to_csv(name + '.csv')
df3.to_pickle(name + '.pkl')
This diff is collapsed.
This diff is collapsed.
......@@ -42,9 +42,9 @@ static int handler(void* user, const char* section, const char* name,
} else if (MATCH("general", "Granularity")) {
pconfig->granularity = atoi(value);
} else if (MATCH("general", "SDR")) { // TODO Refactor a nombre manual
pconfig->sdr = atoi(value);
pconfig->sdr = strtoul(value, NULL, 10);
} else if (MATCH("general", "ADR")) { // TODO Refactor a nombre manual
pconfig->adr = atoi(value);
pconfig->adr = strtoul(value, NULL, 10);
} else if (MATCH("general", "Rigid")) {
pconfig->rigid_times = atoi(value);
......@@ -72,8 +72,10 @@ static int handler(void* user, const char* section, const char* name,
aux_value = MALL_DIST_SPREAD;
}
pconfig->groups[pconfig->actual_group].phy_dist = aux_value;
} else if (MATCH(resize_name, "Asynch_Redistribution_Type") && LAST(pconfig->actual_group, pconfig->n_groups)) {
pconfig->groups[pconfig->actual_group].at = atoi(value);
} else if (MATCH(resize_name, "Redistribution_Method") && LAST(pconfig->actual_group, pconfig->n_groups)) {
pconfig->groups[pconfig->actual_group].rm = atoi(value);
} else if (MATCH(resize_name, "Redistribution_Strategy") && LAST(pconfig->actual_group, pconfig->n_groups)) {
pconfig->groups[pconfig->actual_group].rs = atoi(value);
} else if (MATCH(resize_name, "Spawn_Method") && LAST(pconfig->actual_group, pconfig->n_groups)) {
pconfig->groups[pconfig->actual_group].sm = atoi(value);
} else if (MATCH(resize_name, "Spawn_Strategy") && LAST(pconfig->actual_group, pconfig->n_groups)) {
......
......@@ -41,14 +41,14 @@ void comm_results(results_data *results, int root, size_t resizes, MPI_Comm inte
* En concreto son tres escalares y dos vectores de tamaño "resizes"
*/
void def_results_type(results_data *results, int resizes, MPI_Datatype *results_type) {
int i, counts = 6;
int blocklengths[] = {1, 1, 1, 1, 1, 1};
int i, counts = 7;
int blocklengths[] = {1, 1, 1, 1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype types[counts];
// Rellenar vector types
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = MPI_DOUBLE;
blocklengths[2] = blocklengths[3] = blocklengths[4] = blocklengths[5] = resizes;
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = types[6] = MPI_DOUBLE;
blocklengths[2] = blocklengths[3] = blocklengths[4] = blocklengths[5] = blocklengths[6] = resizes;
// Rellenar vector displs
MPI_Get_address(results, &dir);
......@@ -59,6 +59,7 @@ void def_results_type(results_data *results, int resizes, MPI_Datatype *results_
MPI_Get_address(results->async_time, &displs[3]);
MPI_Get_address(results->spawn_real_time, &displs[4]);
MPI_Get_address(results->spawn_time, &displs[5]);
MPI_Get_address(results->malleability_time, &displs[6]);
for(i=0;i<counts;i++) displs[i] -= dir;
......@@ -87,6 +88,7 @@ void set_results_post_reconfig(results_data *results, int grp, int sdr, int adr)
} else {
results->async_time[grp-1] = 0;
}
results->malleability_time[grp-1] = results->malleability_end - results->malleability_time[grp-1];
}
/*
......@@ -100,6 +102,7 @@ void set_results_post_reconfig(results_data *results, int grp, int sdr, int adr)
*/
void reset_results_index(results_data *results) {
results->iter_index = 0;
results->iters_async = 0;
}
//=============================================================== FIXME BORRAR?
......@@ -162,17 +165,18 @@ void compute_results_stages(results_data *results, int myId, int numP, int root,
int i;
if(myId == root) {
for(i=0; i<stages; i++) {
MPI_Reduce(MPI_IN_PLACE, results->stage_times[i], results->iter_index, MPI_DOUBLE, MPI_SUM, root, comm);
for(size_t j=0; j<results->iter_index; j++) {
MPI_Reduce(MPI_IN_PLACE, results->stage_times[i], results->iter_index, MPI_DOUBLE, MPI_MAX, root, comm);
/* for(size_t j=0; j<results->iter_index; j++) {
results->stage_times[i][j] = results->stage_times[i][j] / numP;
}
}*/
}
}
else {
for(i=0; i<stages; i++) {
MPI_Reduce(results->stage_times[i], NULL, results->iter_index, MPI_DOUBLE, MPI_SUM, root, comm);
MPI_Reduce(results->stage_times[i], NULL, results->iter_index, MPI_DOUBLE, MPI_MAX, root, comm);
}
}
//MPI_Barrier(comm); //FIXME Esto debería de borrarse
}
//======================================================||
......@@ -189,12 +193,12 @@ void compute_results_stages(results_data *results, int myId, int numP, int root,
void print_iter_results(results_data results) {
size_t i;
printf("Async_Iters: %ld\n", results.iters_async);
printf("T_iter: ");
for(i=0; i< results.iter_index; i++) {
printf("%lf ", results.iters_time[i]);
}
printf("\nAsync_Iters: %ld\n", results.iters_async);
printf("\n");
}
/*
......@@ -240,6 +244,11 @@ void print_global_results(results_data results, size_t resizes) {
printf("%lf ", results.async_time[i]);
}
printf("\nT_Malleability: ");
for(i=0; i < resizes; i++) {
printf("%lf ", results.malleability_time[i]);
}
printf("\nT_total: %lf\n", results.exec_time);
}
......@@ -262,6 +271,7 @@ void init_results_data(results_data *results, size_t resizes, size_t stages, siz
results->spawn_real_time = calloc(resizes, sizeof(double));
results->sync_time = calloc(resizes, sizeof(double));
results->async_time = calloc(resizes, sizeof(double));
results->malleability_time = calloc(resizes, sizeof(double));
results->wasted_time = 0;
results->iters_size = iters_size + RESULTS_EXTRA_SIZE;
......@@ -280,20 +290,24 @@ void realloc_results_iters(results_data *results, size_t stages, size_t needed)
int error = 0;
double *time_aux;
size_t i;
if(results->iters_size >= needed) return;
time_aux = (double *) realloc(results->iters_time, needed * sizeof(double));
if(time_aux == NULL) error = 1;
for(i=0; i<stages; i++) { //TODO Comprobar que no da error el realloc
results->stage_times[i] = (double *) realloc(results->stage_times[i], needed * sizeof(double));
if(results->stage_times[i] == NULL) error = 1;
}
if(time_aux == NULL) error = 1;
if(error) {
fprintf(stderr, "Fatal error - No se ha podido realojar la memoria de resultados\n");
MPI_Abort(MPI_COMM_WORLD, 1);
}
results->iters_time = time_aux;
results->iters_size = needed;
}
/*
......@@ -318,6 +332,10 @@ void free_results_data(results_data *results, size_t stages) {
free(results->async_time);
results->async_time = NULL;
}
if(results->malleability_time != NULL) {
free(results->malleability_time);
results->malleability_time = NULL;
}
if(results->iters_time != NULL) {
free(results->iters_time);
......
......@@ -14,8 +14,9 @@ typedef struct {
// Spawn, Thread, Sync, Async and Exec time
double spawn_start, *spawn_time, *spawn_real_time;
double sync_start, sync_end, *sync_time;
double async_start, async_end, *async_time;
double sync_end, *sync_time;
double async_end, *async_time;
double malleability_end, *malleability_time;
double exec_start, exec_time;
double wasted_time; // Time spent recalculating iter stages
} results_data;
......
......@@ -12,6 +12,8 @@
#include "../malleability/malleabilityManager.h"
#include "../malleability/malleabilityStates.h"
#define DR_MAX_SIZE 1000000000
int work();
double iterate(int async_comm);
double iterate_relaxed(double *time, double *times_stages);
......@@ -37,6 +39,7 @@ int main(int argc, char *argv[]) {
int numP, myId, res;
int req;
int im_child;
size_t i;
int num_cpus, num_nodes;
char *nodelist = NULL;
......@@ -54,6 +57,8 @@ int main(int argc, char *argv[]) {
if(req != MPI_THREAD_MULTIPLE) {
printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE);
fflush(stdout);
MPI_Abort(MPI_COMM_WORLD, -50);
}
init_group_struct(argv, argc, myId, numP);
......@@ -66,10 +71,26 @@ int main(int argc, char *argv[]) {
set_benchmark_configuration(config_file);
set_benchmark_results(results);
if(config_file->n_groups > 1) {
set_malleability_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss,
config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
set_children_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
malleability_add_data(&(group->grp), 1, MAL_INT, 1, 1);
malleability_add_data(&run_id, 1, MAL_INT, 1, 1);
malleability_add_data(&(group->iter_start), 1, MAL_INT, 1, 1);
malleability_add_data(&(group->grp), 1, MAL_INT, 1, 1);
malleability_add_data(&run_id, 1, MAL_INT, 1, 1);
malleability_add_data(&(group->iter_start), 1, MAL_INT, 1, 1);
if(config_file->sdr) {
for(i=0; i<group->sync_data_groups; i++) {
malleability_add_data(group->sync_array[i], group->sync_qty[i], MAL_CHAR, 0, 1);
}
}
if(config_file->adr) {
for(i=0; i<group->async_data_groups; i++) {
malleability_add_data(group->async_array[i], group->async_qty[i], MAL_CHAR, 0, 0);
}
}
}
MPI_Barrier(comm);
results->exec_start = MPI_Wtime();
......@@ -82,6 +103,7 @@ int main(int argc, char *argv[]) {
// TODO Refactor - Que sea una unica funcion
// Obtiene las variables que van a utilizar los hijos
void *value = NULL;
size_t entries;
malleability_get_data(&value, 0, 1, 1);
group->grp = *((int *)value);
......@@ -91,7 +113,25 @@ int main(int argc, char *argv[]) {
malleability_get_data(&value, 2, 1, 1);
group->iter_start = *((int *)value);
if(config_file->sdr) {
malleability_get_entries(&entries, 0, 1);
group->sync_array = (char **) malloc(entries * sizeof(char *));
for(i=0; i<entries; i++) {
malleability_get_data(&value, i, 0, 1);
group->sync_array[i] = (char *)value;
}
}
if(config_file->adr) {
malleability_get_entries(&entries, 0, 0);
group->async_array = (char **) malloc(entries * sizeof(char *));
for(i=0; i<entries; i++) {
malleability_get_data(&value, i, 0, 0);
group->async_array[i] = (char *)value;
}
}
group->grp = group->grp + 1;
realloc_results_iters(results, config_file->n_stages, config_file->groups[group->grp].iters);
}
//
......@@ -105,14 +145,15 @@ int main(int argc, char *argv[]) {
MPI_Comm_rank(comm, &(group->myId));
group->grp = group->grp + 1;
set_benchmark_grp(group->grp);
if(group->grp != 0) {
obtain_op_times(1); //Obtener los nuevos valores de tiempo para el computo
obtain_op_times(0); //Obtener los nuevos valores de tiempo para el computo
set_results_post_reconfig(results, group->grp, config_file->sdr, config_file->adr);
}
if(config_file->n_groups != group->grp + 1) { //TODO Llevar a otra funcion
set_malleability_configuration(config_file->groups[group->grp+1].sm, config_file->groups[group->grp+1].ss,
config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].at, -1);
config_file->groups[group->grp+1].phy_dist, config_file->groups[group->grp+1].rm, config_file->groups[group->grp+1].rs);
set_children_number(config_file->groups[group->grp+1].procs); // TODO TO BE DEPRECATED
if(group->grp != 0) {
......@@ -122,11 +163,11 @@ int main(int argc, char *argv[]) {
res = work();
if(res == MALL_ZOMBIE) break;
if(res==1) { // Se ha llegado al final de la aplicacion
MPI_Barrier(comm); // TODO Posible error al utilizar SHRINK
MPI_Barrier(comm);
results->exec_time = MPI_Wtime() - results->exec_start - results->wasted_time;
}
print_local_results();
reset_results_index(results);
} while(config_file->n_groups > group->grp + 1 && config_file->groups[group->grp+1].sm == MALL_SPAWN_MERGE);
......@@ -180,8 +221,8 @@ int work() {
state = malleability_checkpoint();
iter = 0;
while(state == MALL_DIST_PENDING || state == MALL_SPAWN_PENDING || state == MALL_SPAWN_SINGLE_PENDING || state == MALL_SPAWN_ADAPT_POSTPONE) {
if(iter < config_file->groups[group->grp+1].iters) {
while(state == MALL_DIST_PENDING || state == MALL_SPAWN_PENDING || state == MALL_SPAWN_SINGLE_PENDING || state == MALL_SPAWN_ADAPT_POSTPONE || state == MALL_SPAWN_ADAPT_PENDING) {
if(group->grp+1 < config_file->n_groups && iter < config_file->groups[group->grp+1].iters) {
iterate(state);
iter++;
group->iter_start = iter;
......@@ -227,6 +268,7 @@ double iterate(int async_comm) {
results->iters_async += 1;
}
// TODO Pasar el resto de este código a results.c
if(results->iter_index == results->iters_size) { // Aumentar tamaño de ambos vectores de resultados
realloc_results_iters(results, config_file->n_stages, results->iters_size + 100);
}
......@@ -235,6 +277,7 @@ double iterate(int async_comm) {
results->stage_times[i][results->iter_index] = times_stages_aux[i];
}
results->iter_index = results->iter_index + 1;
// TODO Pasar hasta aqui
free(times_stages_aux);
......@@ -395,6 +438,8 @@ void init_group_struct(char *argv[], int argc, int myId, int numP) {
* se comunican con los padres para inicializar sus datos.
*/
void init_application() {
int i, last_index;
if(group->argc < 2) {
printf("Falta el fichero de configuracion. Uso:\n./programa config.ini id\nEl argumento numerico id es opcional\n");
MPI_Abort(MPI_COMM_WORLD, -1);
......@@ -407,10 +452,29 @@ void init_application() {
results = malloc(sizeof(results_data));
init_results_data(results, config_file->n_resizes, config_file->n_stages, config_file->groups[group->grp].iters);
if(config_file->sdr) {
malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
group->sync_data_groups = config_file->sdr % DR_MAX_SIZE ? config_file->sdr/DR_MAX_SIZE+1 : config_file->sdr/DR_MAX_SIZE;
group->sync_qty = (int *) malloc(group->sync_data_groups * sizeof(int));
group->sync_array = (char **) malloc(group->sync_data_groups * sizeof(char *));
last_index = group->sync_data_groups-1;
for(i=0; i<last_index; i++) {
group->sync_qty[i] = DR_MAX_SIZE;
malloc_comm_array(&(group->sync_array[i]), group->sync_qty[i], group->myId, group->numP);
}
group->sync_qty[last_index] = config_file->sdr % DR_MAX_SIZE ? config_file->sdr % DR_MAX_SIZE : DR_MAX_SIZE;
malloc_comm_array(&(group->sync_array[last_index]), group->sync_qty[last_index], group->myId, group->numP);
}
if(config_file->adr) {
malloc_comm_array(&(group->async_array), config_file->adr , group->myId, group->numP);
group->async_data_groups = config_file->adr % DR_MAX_SIZE ? config_file->adr/DR_MAX_SIZE+1 : config_file->adr/DR_MAX_SIZE;
group->async_qty = (int *) malloc(group->async_data_groups * sizeof(int));
group->async_array = (char **) malloc(group->async_data_groups * sizeof(char *));
last_index = group->async_data_groups-1;
for(i=0; i<last_index; i++) {
group->async_qty[i] = DR_MAX_SIZE;
malloc_comm_array(&(group->async_array[i]), group->async_qty[i], group->myId, group->numP);
}
group->async_qty[last_index] = config_file->adr % DR_MAX_SIZE ? config_file->adr % DR_MAX_SIZE : DR_MAX_SIZE;
malloc_comm_array(&(group->async_array[last_index]), group->async_qty[last_index], group->myId, group->numP);
}
obtain_op_times(1);
......@@ -440,13 +504,29 @@ void obtain_op_times(int compute) {
* Libera toda la memoria asociada con la aplicacion
*/
void free_application_data() {
if(config_file->sdr) {
size_t i;
if(config_file->sdr && group->sync_array != NULL) {
for(i=0; i<group->sync_data_groups; i++) {
free(group->sync_array[i]);
group->sync_array[i] = NULL;
}
free(group->sync_qty);
group->sync_qty = NULL;
free(group->sync_array);
group->sync_array = NULL;
}
if(config_file->adr) {
if(config_file->adr && group->async_array != NULL) {
for(i=0; i<group->async_data_groups; i++) {
free(group->async_array[i]);
group->async_array[i] = NULL;
}
free(group->async_qty);
group->async_qty = NULL;
free(group->async_array);
group->async_array = NULL;
}
free_malleability();
free_results_data(results, config_file->n_stages);
......
......@@ -15,13 +15,14 @@ typedef struct {
unsigned int grp;
int iter_start;
int argc;
size_t sync_data_groups, async_data_groups;
int numS; // Cantidad de procesos hijos
MPI_Comm children, parents;
char *compute_comm_array, *compute_comm_recv;
char **argv;
char *sync_array, *async_array;
char **sync_array, **async_array;
int *sync_qty, *async_qty;
} group_data;
......@@ -48,7 +49,7 @@ typedef struct
typedef struct
{
int iters, procs;
int sm, ss, phy_dist, at;
int sm, ss, phy_dist, rm, rs;
float factor;
} group_config_t;
......@@ -57,7 +58,8 @@ typedef struct
size_t n_groups, n_resizes, n_stages; // n_groups==n_resizes+1
size_t actual_group, actual_stage;
int rigid_times;
int granularity, sdr, adr;
int granularity;
size_t sdr, adr;
MPI_Datatype config_type, group_type, iter_stage_type;
iter_stage_t *stages;
......
......@@ -71,7 +71,8 @@ void malloc_config_resizes(configuration *user_config) {
user_config->groups[i].sm = 0;
user_config->groups[i].ss = 1;
user_config->groups[i].phy_dist = 0;
user_config->groups[i].at = 0;
user_config->groups[i].rm = 0;
user_config->groups[i].rs = 1;
user_config->groups[i].factor = 1;
}
def_struct_groups(user_config);
......@@ -135,18 +136,14 @@ void free_config(configuration *user_config) {
}
}
//Liberar tipos derivados
if(user_config->config_type != MPI_DATATYPE_NULL) {
MPI_Type_free(&(user_config->config_type));
user_config->config_type = MPI_DATATYPE_NULL;
}
if(user_config->group_type != MPI_DATATYPE_NULL) {
MPI_Type_free(&(user_config->group_type));
user_config->group_type = MPI_DATATYPE_NULL;
}
if(user_config->iter_stage_type != MPI_DATATYPE_NULL) {
MPI_Type_free(&(user_config->iter_stage_type));
user_config->iter_stage_type = MPI_DATATYPE_NULL;
}
MPI_Type_free(&(user_config->config_type));
user_config->config_type = MPI_DATATYPE_NULL;
MPI_Type_free(&(user_config->group_type));
user_config->group_type = MPI_DATATYPE_NULL;
MPI_Type_free(&(user_config->iter_stage_type));
user_config->iter_stage_type = MPI_DATATYPE_NULL;
free(user_config->groups);
free(user_config->stages);
......@@ -162,17 +159,17 @@ void free_config(configuration *user_config) {
void print_config(configuration *user_config) {
if(user_config != NULL) {
size_t i;
printf("Config loaded: R=%zu, S=%zu, granularity=%d, SDR=%d, ADR=%d\n",
printf("Config loaded: R=%zu, S=%zu, granularity=%d, SDR=%zu, ADR=%zu\n",
user_config->n_resizes, user_config->n_stages, user_config->granularity, user_config->sdr, user_config->adr);
for(i=0; i<user_config->n_stages; i++) {
printf("Stage %zu: PT=%d, T_stage=%lf, bytes=%d, T_capped=%d\n",
i, user_config->stages[i].pt, user_config->stages[i].t_stage, user_config->stages[i].real_bytes, user_config->stages[i].t_capped);
}
for(i=0; i<user_config->n_groups; i++) {
printf("Group %zu: Iters=%d, Procs=%d, Factors=%f, Dist=%d, AT=%d, SM=%d, SS=%d\n",
printf("Group %zu: Iters=%d, Procs=%d, Factors=%f, Dist=%d, RM=%d, RS=%d, SM=%d, SS=%d\n",
i, user_config->groups[i].iters, user_config->groups[i].procs, user_config->groups[i].factor,
user_config->groups[i].phy_dist, user_config->groups[i].at, user_config->groups[i].sm,
user_config->groups[i].ss);
user_config->groups[i].phy_dist, user_config->groups[i].rm, user_config->groups[i].rs,
user_config->groups[i].sm, user_config->groups[i].ss);
}
}
}
......@@ -194,16 +191,16 @@ void print_config_group(configuration *user_config, size_t grp) {
sons = user_config->groups[grp+1].procs;
}
printf("Config: granularity=%d, SDR=%d, ADR=%d\n",
printf("Config: granularity=%d, SDR=%zu, ADR=%zu\n",
user_config->granularity, user_config->sdr, user_config->adr);
for(i=0; i<user_config->n_stages; i++) {
printf("Stage %zu: PT=%d, T_stage=%lf, bytes=%d, T_capped=%d\n",
i, user_config->stages[i].pt, user_config->stages[i].t_stage, user_config->stages[i].real_bytes, user_config->stages[i].t_capped);
}
printf("Group %zu: Iters=%d, Procs=%d, Factors=%f, Dist=%d, AT=%d, SM=%d, SS=%d, parents=%d, children=%d\n",
printf("Group %zu: Iters=%d, Procs=%d, Factors=%f, Dist=%d, RM=%d, RS=%d, SM=%d, SS=%d, parents=%d, children=%d\n",
grp, user_config->groups[grp].iters, user_config->groups[grp].procs, user_config->groups[grp].factor,
user_config->groups[grp].phy_dist, user_config->groups[grp].at, user_config->groups[grp].sm,
user_config->groups[grp].ss, parents, sons);
user_config->groups[grp].phy_dist, user_config->groups[grp].rm, user_config->groups[grp].rs,
user_config->groups[grp].sm, user_config->groups[grp].ss, parents, sons);
}
}
......@@ -270,17 +267,17 @@ void def_struct_config_file(configuration *config_file) {
MPI_Datatype types[counts];
// Rellenar vector types
types[0] = types[1] = MPI_UNSIGNED_LONG;
types[2] = types[3] = types[4] = types[5] = MPI_INT;
types[0] = types[1] = types[2] = types[3] = MPI_UNSIGNED_LONG;
types[4] = types[5] = MPI_INT;
// Rellenar vector displs
MPI_Get_address(config_file, &dir);
MPI_Get_address(&(config_file->n_groups), &displs[0]);
MPI_Get_address(&(config_file->n_stages), &displs[1]);
MPI_Get_address(&(config_file->granularity), &displs[2]);
MPI_Get_address(&(config_file->sdr), &displs[3]);
MPI_Get_address(&(config_file->adr), &displs[4]);
MPI_Get_address(&(config_file->sdr), &displs[2]);
MPI_Get_address(&(config_file->adr), &displs[3]);
MPI_Get_address(&(config_file->granularity), &displs[4]);
MPI_Get_address(&(config_file->rigid_times), &displs[5]);
for(i=0;i<counts;i++) displs[i] -= dir;
......@@ -295,15 +292,15 @@ void def_struct_config_file(configuration *config_file) {
* en una sola comunicacion.
*/
void def_struct_groups(configuration *config_file) {
int i, counts = 7;
int blocklengths[7] = {1, 1, 1, 1, 1, 1, 1};
int i, counts = 8;
int blocklengths[8] = {1, 1, 1, 1, 1, 1, 1, 1};
MPI_Aint displs[counts], dir;
MPI_Datatype aux, types[counts];
group_config_t *groups = config_file->groups;
// Rellenar vector types
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = MPI_INT;
types[6] = MPI_FLOAT;
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = types[6] = MPI_INT;
types[7] = MPI_FLOAT;
// Rellenar vector displs
MPI_Get_address(groups, &dir);
......@@ -313,8 +310,9 @@ void def_struct_groups(configuration *config_file) {
MPI_Get_address(&(groups->sm), &displs[2]);
MPI_Get_address(&(groups->ss), &displs[3]);
MPI_Get_address(&(groups->phy_dist), &displs[4]);
MPI_Get_address(&(groups->at), &displs[5]);
MPI_Get_address(&(groups->factor), &displs[6]);
MPI_Get_address(&(groups->rm), &displs[5]);
MPI_Get_address(&(groups->rs), &displs[6]);
MPI_Get_address(&(groups->factor), &displs[7]);
for(i=0;i<counts;i++) displs[i] -= dir;
......@@ -326,7 +324,7 @@ void def_struct_groups(configuration *config_file) {
// Tipo derivado para enviar N elementos de la estructura
MPI_Type_create_resized(aux, 0, sizeof(group_config_t), &(config_file->group_type));
MPI_Type_commit(&(config_file->group_type));
// MPI_Type_free(&aux); //FIXME It should be freed
MPI_Type_free(&aux);
}
}
......@@ -364,6 +362,6 @@ void def_struct_iter_stage(configuration *config_file) {
// Tipo derivado para enviar N elementos de la estructura
MPI_Type_create_resized(aux, 0, sizeof(iter_stage_t), &(config_file->iter_stage_type));
MPI_Type_commit(&(config_file->iter_stage_type));
// MPI_Type_free(&aux); //FIXME It should be freed
MPI_Type_free(&aux);
}
}
CC = gcc
MCC = mpicc
#C_FLAGS_ALL = -Wconversion -Wpedantic
C_FLAGS = -Wall -Wextra -Wshadow -Wfatal-errors -g
C_FLAGS = -Wall -Wextra -Wshadow -Wfatal-errors
LD_FLAGS = -lm -pthread
DEF =
......
This diff is collapsed.
......@@ -16,13 +16,18 @@
//#define MAL_USE_POINT 2
//#define MAL_USE_THREAD 3
int send_sync(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child);
void recv_sync(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents);
int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int comm_type, MPI_Comm comm);
//int async_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty);
int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait);
void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int parents_wait);
int async_communication_start(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win);
int async_communication_check(int myId, int is_children_group, int red_strategies, MPI_Comm comm, MPI_Request *requests, size_t request_qty);
void async_communication_wait(int red_strategies, MPI_Comm comm, MPI_Request *requests, size_t request_qty);
void async_communication_end(int red_method, int red_strategies, MPI_Request *requests, size_t request_qty, MPI_Win *win);
//int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int red_method, int red_strategies);
//void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int red_method, int red_strategies);
void malloc_comm_array(char **array, int qty, int myId, int numP);
int malleability_red_contains_strat(int comm_strategies, int strategy, int *result);
int malleability_red_add_strat(int *comm_strategies, int strategy);
#endif
......@@ -3,7 +3,7 @@
#include <mpi.h>
#include "block_distribution.h"
void set_interblock_counts(int id, int numP, struct Dist_data data_dist, int *sendcounts);
void set_interblock_counts(int id, int numP, struct Dist_data data_dist, int offset_ids, int *sendcounts);
void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS);
/*
......@@ -13,22 +13,43 @@ void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS);
*
* The struct should be freed with freeCounts
*/
void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, struct Counts *counts) {
int i, *idS;
struct Dist_data dist_data;
void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, int offset_ids, struct Counts *counts) {
int i, *idS, first_id = 0;
struct Dist_data dist_data, dist_target;
if(counts == NULL) {
fprintf(stderr, "Counts is NULL for rank %d/%d ", myId, numP);
MPI_Abort(MPI_COMM_WORLD, -3);
}
get_block_dist(n, myId, numP, &dist_data);
mallocCounts(counts, numP_other);
get_util_ids(dist_data, numP_other, &idS);
if(idS[0] == 0) {
set_interblock_counts(0, numP_other, dist_data, counts->counts);
idS[0]++;
counts->idI = idS[0] + offset_ids;
counts->idE = idS[1] + offset_ids;
get_block_dist(n, idS[0], numP_other, &dist_target); // RMA Specific operation -- uses idS[0], not idI
counts->first_target_displs = dist_data.ini - dist_target.ini; // RMA Specific operation
if(idS[0] == 0) { // Uses idS[0], not idI
set_interblock_counts(counts->idI, numP_other, dist_data, offset_ids, counts->counts);
first_id++;
}
for(i=idS[0]; i<idS[1]; i++) {
set_interblock_counts(i, numP_other, dist_data, counts->counts);
for(i=counts->idI + first_id; i<counts->idE; i++) {
set_interblock_counts(i, numP_other, dist_data, offset_ids, counts->counts);
counts->displs[i] = counts->displs[i-1] + counts->counts[i-1];
}
free(idS);
for(i=0; i<numP_other; i++) {
if(counts->counts[i] < 0) {
fprintf(stderr, "Counts value [i=%d/%d] is negative for rank %d/%d ", i, numP_other, myId, numP);
MPI_Abort(MPI_COMM_WORLD, -3);
}
if(counts->displs[i] < 0) {
fprintf(stderr, "Displs value [i=%d/%d] is negative for rank %d/%d ", i, numP_other, myId, numP);
MPI_Abort(MPI_COMM_WORLD, -3);
}
}
}
/*
......@@ -83,12 +104,8 @@ void get_block_dist(int qty, int id, int numP, struct Dist_data *dist_data) {
dist_data->fin = (id+1) * dist_data->tamBl + rem;
}
if(dist_data->fin > qty) {
dist_data->fin = qty;
}
if(dist_data->ini > dist_data->fin) {
dist_data->ini = dist_data->fin;
}
if(dist_data->fin > qty) { dist_data->fin = qty; }
if(dist_data->ini > dist_data->fin) { dist_data->ini = dist_data->fin; }
dist_data->tamBl = dist_data->fin - dist_data->ini;
}
......@@ -98,11 +115,11 @@ void get_block_dist(int qty, int id, int numP, struct Dist_data *dist_data) {
* Obtiene para el Id de un proceso dado, cuantos elementos
* enviara o recibira desde el proceso indicado en Dist_data.
*/
void set_interblock_counts(int id, int numP, struct Dist_data data_dist, int *sendcounts) {
void set_interblock_counts(int id, int numP, struct Dist_data data_dist, int offset_ids, int *sendcounts) {
struct Dist_data other;
int biggest_ini, smallest_end;
get_block_dist(data_dist.qty, id, numP, &other);
get_block_dist(data_dist.qty, id - offset_ids, numP, &other);
// Si el rango de valores no coincide, se pasa al siguiente proceso
if(data_dist.ini >= other.fin || data_dist.fin <= other.ini) {
......@@ -110,18 +127,10 @@ void set_interblock_counts(int id, int numP, struct Dist_data data_dist, int *se
}
// Obtiene el proceso con mayor ini entre los dos procesos
if(data_dist.ini > other.ini) {
biggest_ini = data_dist.ini;
} else {
biggest_ini = other.ini;
}
biggest_ini = (data_dist.ini > other.ini) ? data_dist.ini : other.ini;
// Obtiene el proceso con menor fin entre los dos procesos
if(data_dist.fin < other.fin) {
smallest_end = data_dist.fin;
} else {
smallest_end = other.fin;
}
smallest_end = (data_dist.fin < other.fin) ? data_dist.fin : other.fin;
sendcounts[id] = smallest_end - biggest_ini; // Numero de elementos a enviar/recibir del proceso Id
}
......@@ -184,18 +193,19 @@ void get_util_ids(struct Dist_data dist_data, int numP_other, int **idS) {
* El vector displs indica los desplazamientos necesarios para cada comunicacion
* con el proceso "i" del otro grupo.
*
* El vector zero_arr se utiliza cuando se quiere indicar un vector incializado
* a 0 en todos sus elementos. Sirve para indicar que no hay comunicacion.
*/
void mallocCounts(struct Counts *counts, size_t numP) {
counts->counts = calloc(numP, sizeof(int));
if(counts->counts == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
counts->displs = calloc(numP, sizeof(int));
if(counts->displs == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
counts->zero_arr = calloc(numP, sizeof(int));
if(counts->zero_arr == NULL) { MPI_Abort(MPI_COMM_WORLD, -2);}
counts->len = numP;
counts->idI = -1;
counts->idE = -1;
counts->first_target_displs = -1;
}
......@@ -206,12 +216,18 @@ void mallocCounts(struct Counts *counts, size_t numP) {
* de forma dinamica.
*/
void freeCounts(struct Counts *counts) {
free(counts->counts);
free(counts->displs);
free(counts->zero_arr);
counts->counts = NULL;
counts->displs = NULL;
counts->zero_arr = NULL;
if(counts == NULL) {
return;
}
if(counts->counts != NULL) {
free(counts->counts);
counts->counts = NULL;
}
if(counts->displs != NULL) {
free(counts->displs);
counts->displs = NULL;
}
}
/*
......
......@@ -18,12 +18,13 @@ struct Dist_data {
};
struct Counts {
int len, idI, idE;
int first_target_displs; // RMA. Indicates displacement for first target when performing a Get.
int *counts;
int *displs;
int *zero_arr;
};
void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, struct Counts *counts);
void prepare_comm_alltoall(int myId, int numP, int numP_other, int n, int offset_ids, struct Counts *counts);
void prepare_comm_allgatherv(int numP, int n, struct Counts *counts);
void get_block_dist(int qty, int id, int numP, struct Dist_data *dist_data);
......
This diff is collapsed.
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment