Commit 74d354ac authored by Iker Martín Álvarez's avatar Iker Martín Álvarez
Browse files

Merge branch 'RMA-Distributions' into 'Extrae-Executions'

Update Extrae-Executions with current code

See merge request martini/malleability_benchmark!3
parents 88a1e68f 412646ab
import sys
import glob
import numpy as np
import pandas as pd
from enum import Enum
class G_enum(Enum):
TOTAL_RESIZES = 0
TOTAL_GROUPS = 1
TOTAL_STAGES = 2
GRANULARITY = 3
SDR = 4
ADR = 5
DR = 6
RED_METHOD = 7
RED_STRATEGY = 8
SPAWN_METHOD = 9
SPAWN_STRATEGY = 10
GROUPS = 11
FACTOR_S = 12
DIST = 13
STAGE_TYPES = 14
STAGE_TIMES = 15
STAGE_BYTES = 16
ITERS = 17
ASYNCH_ITERS = 18
T_ITER = 19
T_STAGES = 20
T_SPAWN = 21
T_SPAWN_REAL = 22
T_SR = 23
T_AR = 24
T_TOTAL = 25
#Malleability specific
NP = 0
NC = 1
#Iteration specific
IS_DYNAMIC = 11
N_PARENTS = 17
#columnsG = ["Total_Resizes", "Total_Groups", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Redistribution_Method", \
# "Redistribution_Strategy", "Spawn_Method", "Spawn_Strategy", "Groups", "FactorS", "Dist", "Stage_Types", "Stage_Times", \
# "Stage_Bytes", "Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR", "T_total"] #26
#columnsM = ["NP", "NC", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Redistribution_Method", \
# "Redistribution_Strategy", "Spawn_Method", "Spawn_Strategy", "FactorS", "Dist", "Stage_Type", "Stage_Time", \
# "Stage_Bytes", "Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR"] #24
columnsL = ["NP", "NC", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Redistribution_Method", \
"Redistribution_Strategy", "Spawn_Method", "Spawn_Strategy", "Is_Dynamic", "FactorS", "Dist", "Stage_Types", "Stage_Times", \
"Stage_Bytes", "N_Parents", "Asynch_Iters", "T_iter", "T_stages"] #20
def copy_iteration(row, dataL_it, group, iteration, is_asynch):
basic_indexes = [G_enum.TOTAL_STAGES.value, G_enum.GRANULARITY.value, \
G_enum.STAGE_TYPES.value, G_enum.STAGE_TIMES.value, G_enum.STAGE_BYTES.value]
basic_asynch = [G_enum.SDR.value, G_enum.ADR.value, G_enum.DR.value]
array_asynch_group = [G_enum.RED_METHOD.value, G_enum.RED_STRATEGY.value, \
G_enum.SPAWN_METHOD.value, G_enum.SPAWN_STRATEGY.value, G_enum.DIST.value]
dataL_it[G_enum.FACTOR_S.value] = row[G_enum.FACTOR_S.value][group]
dataL_it[G_enum.NP.value] = row[G_enum.GROUPS.value][group]
dataL_it[G_enum.ASYNCH_ITERS.value] = is_asynch
dataL_it[G_enum.T_ITER.value] = row[G_enum.T_ITER.value][group][iteration]
dataL_it[G_enum.T_STAGES.value] = list(row[G_enum.T_STAGES.value][group][iteration])
dataL_it[G_enum.IS_DYNAMIC.value] = True if group > 0 else False
for index in basic_indexes:
dataL_it[index] = row[index]
for index in array_asynch_group:
dataL_it[index] = [None, -1]
dataL_it[index][0] = row[index][group]
dataL_it[G_enum.N_PARENTS.value] = -1
if group > 0:
dataL_it[G_enum.N_PARENTS.value] = row[G_enum.GROUPS.value][group-1]
if is_asynch:
dataL_it[G_enum.NC.value] = row[G_enum.GROUPS.value][group+1]
for index in basic_asynch:
dataL_it[index] = row[index]
for index in array_asynch_group:
dataL_it[index][1] = row[index][group+1]
for index in array_asynch_group: # Convert to tuple
dataL_it[index] = tuple(dataL_it[index])
#-----------------------------------------------
def create_iter_dataframe(dfG, dataL):
it = -1
for row_index in range(len(dfG)):
row = dfG.iloc[row_index]
groups = row[G_enum.TOTAL_GROUPS.value]
for group in range(groups):
real_iterations = len(row[G_enum.T_ITER.value][group])
real_asynch = row[G_enum.ASYNCH_ITERS.value][group]
is_asynch = False
for iteration in range(real_iterations-real_asynch):
it += 1
dataL.append( [None] * len(columnsL) )
copy_iteration(row, dataL[it], group, iteration, is_asynch)
is_asynch = True
for iteration in range(real_iterations-real_asynch, real_iterations):
it += 1
dataL.append( [None] * len(columnsL) )
copy_iteration(row, dataL[it], group, iteration, is_asynch)
#-----------------------------------------------
if len(sys.argv) < 2:
print("The files name is missing\nUsage: python3 CreateIterDataframe.py input_file.pkl output_name")
exit(1)
input_name = sys.argv[1]
if len(sys.argv) > 2:
name = sys.argv[2]
else:
name = "dataL"
print("File name will be: " + name + ".pkl")
dfG = pd.read_pickle(input_name)
dataL = []
create_iter_dataframe(dfG, dataL)
dfL = pd.DataFrame(dataL, columns=columnsL)
dfL.to_pickle(name + '.pkl')
#dfL.to_excel(name + '.xlsx')
print(dfG)
print(dfL)
import sys
import glob
import numpy as np
import pandas as pd
from enum import Enum
class G_enum(Enum):
TOTAL_RESIZES = 0
TOTAL_GROUPS = 1
TOTAL_STAGES = 2
GRANULARITY = 3
SDR = 4
ADR = 5
DR = 6
RED_METHOD = 7
RED_STRATEGY = 8
SPAWN_METHOD = 9
SPAWN_STRATEGY = 10
GROUPS = 11
FACTOR_S = 12
DIST = 13
STAGE_TYPES = 14
STAGE_TIMES = 15
STAGE_BYTES = 16
ITERS = 17
ASYNCH_ITERS = 18
T_ITER = 19
T_STAGES = 20
T_SPAWN = 21
T_SPAWN_REAL = 22
T_SR = 23
T_AR = 24
T_TOTAL = 25
#Malleability specific
NP = 0
NC = 1
#Iteration specific
IS_DYNAMIC = 11
#columnsG = ["Total_Resizes", "Total_Groups", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Redistribution_Method", \
# "Redistribution_Strategy", "Spawn_Method", "Spawn_Strategy", "Groups", "FactorS", "Dist", "Stage_Types", "Stage_Times", \
# "Stage_Bytes", "Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR", "T_total"] #26
columnsM = ["NP", "NC", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Redistribution_Method", \
"Redistribution_Strategy", "Spawn_Method", "Spawn_Strategy", "FactorS", "Dist", "Stage_Type", "Stage_Time", \
"Stage_Bytes", "Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR"] #24
def copy_resize(row, dataM_it, resize):
basic_indexes = [G_enum.TOTAL_STAGES.value, G_enum.GRANULARITY.value, G_enum.SDR.value, \
G_enum.ADR.value, G_enum.DR.value]
basic_group = [G_enum.STAGE_TYPES.value, G_enum.STAGE_TIMES.value, G_enum.STAGE_BYTES.value]
array_actual_group = [G_enum.FACTOR_S.value, G_enum.ITERS.value, G_enum.ASYNCH_ITERS.value, \
G_enum.T_SPAWN.value, G_enum.T_SPAWN_REAL.value, G_enum.T_SR.value, \
G_enum.T_AR.value, G_enum.T_ITER.value, G_enum.T_STAGES.value]
array_next_group = [G_enum.RED_METHOD.value, G_enum.RED_STRATEGY.value, \
G_enum.SPAWN_METHOD.value, G_enum.SPAWN_STRATEGY.value]
dataM_it[G_enum.NP.value] = row[G_enum.GROUPS.value][resize]
dataM_it[G_enum.NC.value] = row[G_enum.GROUPS.value][resize+1]
dataM_it[G_enum.DIST.value-1] = [None, None]
dataM_it[G_enum.DIST.value-1][0] = row[G_enum.DIST.value][resize]
dataM_it[G_enum.DIST.value-1][1] = row[G_enum.DIST.value][resize+1]
for index in basic_indexes:
dataM_it[index] = row[index]
for index in basic_group:
dataM_it[index-1] = row[index]
for index in array_actual_group:
dataM_it[index-1] = row[index][resize]
for index in array_next_group:
dataM_it[index] = row[index][resize+1]
#-----------------------------------------------
def create_resize_dataframe(dfG, dataM):
it = -1
for row_index in range(len(dfG)):
row = dfG.iloc[row_index]
resizes = row[G_enum.TOTAL_RESIZES.value]
for resize in range(resizes):
it += 1
dataM.append( [None] * len(columnsM) )
copy_resize(row, dataM[it], resize)
#-----------------------------------------------
if len(sys.argv) < 2:
print("The files name is missing\nUsage: python3 CreateResizeDataframe.py input_file.pkl output_name")
exit(1)
input_name = sys.argv[1]
if len(sys.argv) > 2:
name = sys.argv[2]
else:
name = "dataM"
print("File name will be: " + name + ".pkl")
dfG = pd.read_pickle(input_name)
dataM = []
create_resize_dataframe(dfG, dataM)
dfM = pd.DataFrame(dataM, columns=columnsM)
dfM.to_pickle(name + '.pkl')
print(dfG)
print(dfM)
...@@ -12,34 +12,54 @@ class G_enum(Enum): ...@@ -12,34 +12,54 @@ class G_enum(Enum):
SDR = 4 SDR = 4
ADR = 5 ADR = 5
DR = 6 DR = 6
ASYNCH_REDISTRIBUTION_TYPE = 7 RED_METHOD = 7
SPAWN_METHOD = 8 RED_STRATEGY = 8
SPAWN_STRATEGY = 9 SPAWN_METHOD = 9
GROUPS = 10 SPAWN_STRATEGY = 10
FACTOR_S = 11 GROUPS = 11
DIST = 12 FACTOR_S = 12
STAGE_TYPES = 13 DIST = 13
STAGE_TIMES = 14 STAGE_TYPES = 14
STAGE_BYTES = 15 STAGE_TIMES = 15
ITERS = 16 STAGE_BYTES = 16
ASYNCH_ITERS = 17 ITERS = 17
T_ITER = 18 ASYNCH_ITERS = 18
T_STAGES = 19 T_ITER = 19
T_SPAWN = 20 T_STAGES = 20
T_SPAWN_REAL = 21 T_SPAWN = 21
T_SR = 22 T_SPAWN_REAL = 22
T_AR = 23 T_SR = 23
T_TOTAL = 24 T_AR = 24
T_TOTAL = 25
columnsG = ["Total_Resizes", "Total_Groups", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Asynch_Redistribution_Type", \ #Malleability specific
"Spawn_Method", "Spawn_Strategy", "Groups", "Factor_S", "Dist", "Stage_Types", "Stage_Times", "Stage_Bytes", \ NP = 0
"Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR", "T_total"] #25 NC = 1
#Iteration specific
IS_DYNAMIC = 11
columnsG = ["Total_Resizes", "Total_Groups", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Redistribution_Method", \
"Redistribution_Strategy", "Spawn_Method", "Spawn_Strategy", "Groups", "FactorS", "Dist", "Stage_Types", "Stage_Times", \
"Stage_Bytes", "Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR", "T_total"] #26
#-----------------------------------------------
# Obtains the value of a given index in a splited line # Obtains the value of a given index in a splited line
# and returns it as a float values # and returns it as a float values if possible, string otherwise
def get_value(line, index): def get_value(line, index, separator=True):
return float(line[index].split('=')[1].split(',')[0]) if separator:
value = line[index].split('=')[1].split(',')[0]
else:
value = line[index]
try:
value = float(value)
if value.is_integer():
value = int(value)
except ValueError:
return value
return value
#-----------------------------------------------
# Obtains the general parameters of an execution and # Obtains the general parameters of an execution and
# stores them for creating a global dataframe # stores them for creating a global dataframe
def record_config_line(lineS, dataG_it): def record_config_line(lineS, dataG_it):
...@@ -48,27 +68,25 @@ def record_config_line(lineS, dataG_it): ...@@ -48,27 +68,25 @@ def record_config_line(lineS, dataG_it):
offset_line = 2 offset_line = 2
for i in range(len(ordered_indexes)): for i in range(len(ordered_indexes)):
value = get_value(lineS, i+offset_line) value = get_value(lineS, i+offset_line)
if value.is_integer():
value = int(value)
index = ordered_indexes[i] index = ordered_indexes[i]
dataG_it[index] = value dataG_it[index] = value
dataG_it[G_enum.TOTAL_GROUPS.value] = dataG_it[G_enum.TOTAL_RESIZES.value] dataG_it[G_enum.TOTAL_GROUPS.value] = dataG_it[G_enum.TOTAL_RESIZES.value]+1
dataG_it[G_enum.TOTAL_RESIZES.value] -=1 #FIXME Modificar en App sintetica
#FIXME Modificar cuando ADR ya no sea un porcentaje #FIXME Modificar cuando ADR ya no sea un porcentaje
dataG_it[G_enum.DR.value] = dataG_it[G_enum.SDR.value] + dataG_it[G_enum.ADR.value] dataG_it[G_enum.DR.value] = dataG_it[G_enum.SDR.value] + dataG_it[G_enum.ADR.value]
# Init lists for each column # Init lists for each column
array_groups = [G_enum.GROUPS.value, G_enum.FACTOR_S.value, G_enum.DIST.value, G_enum.ITERS.value, \ array_groups = [G_enum.GROUPS.value, G_enum.FACTOR_S.value, G_enum.DIST.value, G_enum.ITERS.value, \
G_enum.ASYNCH_ITERS.value, G_enum.T_ITER.value, G_enum.T_STAGES.value] G_enum.ASYNCH_ITERS.value, G_enum.T_ITER.value, G_enum.T_STAGES.value, G_enum.RED_METHOD.value, \
array_resizes = [G_enum.ASYNCH_REDISTRIBUTION_TYPE.value, G_enum.SPAWN_METHOD.value, \ G_enum.RED_STRATEGY.value, G_enum.SPAWN_METHOD.value, G_enum.SPAWN_STRATEGY.value,]
G_enum.SPAWN_STRATEGY.value, G_enum.T_SPAWN.value, G_enum.T_SPAWN_REAL.value, \ array_resizes = [ G_enum.T_SPAWN.value, G_enum.T_SPAWN_REAL.value, G_enum.T_SR.value, G_enum.T_AR.value]
G_enum.T_SR.value, G_enum.T_AR.value]
array_stages = [G_enum.STAGE_TYPES.value, \ array_stages = [G_enum.STAGE_TYPES.value, \
G_enum.STAGE_TIMES.value, G_enum.STAGE_BYTES.value] G_enum.STAGE_TIMES.value, G_enum.STAGE_BYTES.value]
for index in array_groups: for index in array_groups:
dataG_it[index] = [None]*dataG_it[G_enum.TOTAL_GROUPS.value] dataG_it[index] = [None]*dataG_it[G_enum.TOTAL_GROUPS.value]
for group in range(dataG_it[G_enum.TOTAL_GROUPS.value]):
dataG_it[G_enum.T_ITER.value][group] = []
for index in array_resizes: for index in array_resizes:
dataG_it[index] = [None]*dataG_it[G_enum.TOTAL_RESIZES.value] dataG_it[index] = [None]*dataG_it[G_enum.TOTAL_RESIZES.value]
...@@ -76,6 +94,7 @@ def record_config_line(lineS, dataG_it): ...@@ -76,6 +94,7 @@ def record_config_line(lineS, dataG_it):
for index in array_stages: for index in array_stages:
dataG_it[index] = [None]*dataG_it[G_enum.TOTAL_STAGES.value] dataG_it[index] = [None]*dataG_it[G_enum.TOTAL_STAGES.value]
#-----------------------------------------------
# Obtains the parameters of a stage line # Obtains the parameters of a stage line
# and stores it in the dataframe # and stores it in the dataframe
# Is needed to indicate in which stage is # Is needed to indicate in which stage is
...@@ -86,144 +105,193 @@ def record_stage_line(lineS, dataG_it, stage): ...@@ -86,144 +105,193 @@ def record_stage_line(lineS, dataG_it, stage):
offset_lines = 2 offset_lines = 2
for i in range(len(array_stages)): for i in range(len(array_stages)):
value = get_value(lineS, i+offset_lines) value = get_value(lineS, i+offset_lines)
if value.is_integer(): index = array_stages[i]
value = int(value)
index = array_stage[i]
dataG_it[index][stage] = value dataG_it[index][stage] = value
#-----------------------------------------------
# Obtains the parameters of a resize line # Obtains the parameters of a resize line
# and stores them in the dataframe # and stores them in the dataframe
# Is needed to indicate to which group refers # Is needed to indicate to which group refers
# the resize line # the resize line
def record_resize_line(lineS, dataG_it, group): def record_group_line(lineS, dataG_it, group):
array_stages = [G_enum.ITERS.value, G_enum.GROUPS.value, G_enum.FACTOR_S.value, G_enum.DIST.value, \ array_groups = [G_enum.ITERS.value, G_enum.GROUPS.value, G_enum.FACTOR_S.value, G_enum.DIST.value, \
G_enum.ASYNCH_REDISTRIBUTION_TYPE.value, G_enum.SPAWN_METHOD.value, G_enum.SPAWN_STRATEGY.value] G_enum.RED_METHOD.value, G_enum.RED_STRATEGY.value, G_enum.SPAWN_METHOD.value, G_enum.SPAWN_STRATEGY.value]
offset_lines = 2 offset_lines = 2
for i in range(len(array_stages)): for i in range(len(array_groups)):
value = get_value(lineS, i+offset_lines) value = get_value(lineS, i+offset_lines)
if value.is_integer(): index = array_groups[i]
value = int(value)
index = array_stage[i]
dataG_it[index][group] = value dataG_it[index][group] = value
#-----------------------------------------------
def record_time_line(lineS, dataG_it): def record_time_line(lineS, dataG_it):
T_names = ["T_spawn:", "T_spawn_real:", "T_SR:", "T_AR:", "T_total:"] T_names = ["T_spawn:", "T_spawn_real:", "T_SR:", "T_AR:", "T_total:"]
T_values = [G_enum.T_SPAWN.value, G_enum.T_SPAWN_REAL.value, G_enum.T_SR.value, G_enum.T_AR.value, G_enum.T_TOTAL.value] T_values = [G_enum.T_SPAWN.value, G_enum.T_SPAWN_REAL.value, G_enum.T_SR.value, G_enum.T_AR.value, G_enum.T_TOTAL.value]
if not (lineS[0] in T_names): # Execute only if line represents a Time if not (lineS[0] in T_names): # Execute only if line represents a Time
return return
index = T_names.index(linesS[0]) index = T_names.index(lineS[0])
index = T_values[index]
offset_lines = 1 offset_lines = 1
for i in range(len(dataG_it[index])):
value = get_value(lineS, i+offset_lines) len_index = 1
dataG_it[index][i] = value if dataG_it[index] != None:
len_index = len(dataG_it[index])
for i in range(len_index):
dataG_it[index][i] = get_value(lineS, i+offset_lines, False)
else:
dataG_it[index] = get_value(lineS, offset_lines, False)
#----------------------------------------------- #-----------------------------------------------
def read_global_file(f, dataG, it): def record_multiple_times_line(lineS, dataG_it, group):
resizes = 0 T_names = ["T_iter:", "T_stage"]
timer = 0 T_values = [G_enum.T_ITER.value, G_enum.T_STAGES.value]
previousNP = 0 if not (lineS[0] in T_names): # Execute only if line represents a Time
return
index = T_names.index(lineS[0])
index = T_values[index]
offset_lines = 1
if index == G_enum.T_STAGES.value:
offset_lines += 1
total_iters = len(lineS)-offset_lines
stage = int(lineS[1].split(":")[0])
if stage == 0:
dataG_it[index][group] = [None] * total_iters
for i in range(total_iters):
dataG_it[index][group][i] = [None] * dataG_it[G_enum.TOTAL_STAGES.value]
for i in range(total_iters):
dataG_it[index][group][i][stage] = get_value(lineS, i+offset_lines, False)
else:
total_iters = len(lineS)-offset_lines
for i in range(total_iters):
dataG_it[index][group].append(get_value(lineS, i+offset_lines, False))
#-----------------------------------------------
def read_local_file(f, dataG, it, runs_in_file):
offset = 0
real_it = 0
group = 0
for line in f: for line in f:
lineS = line.split() lineS = line.split()
if len(lineS) > 0: if len(lineS) > 0:
if lineS[0] == "Config": # CONFIG LINE if lineS[0] == "Group": # GROUP number
it += 1 offset += 1
dataG.append([None]*(25+1)) real_it = it - (runs_in_file-offset)
#dataG[it][-1] = None Indicates if local data has been recorded(1) or not(None) group = int(lineS[1].split(":")[0])
record_config(lineS, dataG[it]) elif lineS[0] == "Async_Iters:":
resize = 0 offset_line = 1
stage = 0 dataG[real_it][G_enum.ASYNCH_ITERS.value][group] = get_value(lineS, offset_line, False)
elif lineS[0] == "Stage":
record_stage_line(lineS, dataG[it], stage)
stage+=1
elif lineS[0] == "Resize":
record_resize_line(lineS, dataG[it], resize)
resize+=1
elif lineS[0] == "T_total:":
value = get_value(lineS, 1)
dataG[it][G_enum.T_TOTAL.value] = value
else: else:
record_time_line(lineS, dataG[it]) record_multiple_times_line(lineS, dataG[real_it], group)
return it
#----------------------------------------------- #-----------------------------------------------
def read_local_file(f, dataG, it): def read_global_file(f, dataG, it):
resizes = 0 runs_in_file=0
timer = 0
previousNP = 0
for line in f: for line in f:
lineS = line.split() lineS = line.split()
if len(lineS) > 0: if len(lineS) > 0:
if lineS[0] == "Config": # CONFIG LINE if lineS[0] == "Config": # CONFIG LINE
it += 1 it += 1
record_config(lineS, dataG[it], dataM[it]) runs_in_file += 1
resize = 0 group = 0
stage = 0 stage = 0
dataG.append([None]*len(columnsG))
record_config_line(lineS, dataG[it])
elif lineS[0] == "Stage": elif lineS[0] == "Stage":
record_stage_line(lineS, dataG[it], stage) record_stage_line(lineS, dataG[it], stage)
stage+=1 stage+=1
elif lineS[0] == "Resize": elif lineS[0] == "Group":
record_resize_line(lineS, dataG[it], resize) record_group_line(lineS, dataG[it], group)
resize+=1 group+=1
elif lineS[0] == "T_total:":
value = get_value(lineS, 1)
dataG[it][G_enum.T_TOTAL.value] = value
else: else:
record_time_line(lineS, dataG[it]) record_time_line(lineS, dataG[it])
return it return it,runs_in_file
#-----------------------------------------------
#columnsG = ["Total_Resizes", "Total_Groups", "Total_Stages", "Granularity", "SDR", "ADR", "DR", "Asynch_Redistribution_Type", \\ #-----------------------------------------------
# "Spawn_Method", "Spawn_Strategy", "Groups", "Dist", "Stage_Types", "Stage_Times", "Stage_Bytes", \\ def convert_to_tuples(dfG):
# "Iters", "Asynch_Iters", "T_iter", "T_stages", "T_spawn", "T_spawn_real", "T_SR", "T_AR", "T_total"] #24 array_list_items = [G_enum.GROUPS.value, G_enum.FACTOR_S.value, G_enum.DIST.value, G_enum.ITERS.value, \
G_enum.ASYNCH_ITERS.value, G_enum.RED_METHOD.value, G_enum.RED_STRATEGY.value, G_enum.SPAWN_METHOD.value, \
G_enum.SPAWN_STRATEGY.value, G_enum.T_SPAWN.value, G_enum.T_SPAWN_REAL.value, G_enum.T_SR.value, \
G_enum.T_AR.value, G_enum.STAGE_TYPES.value, G_enum.STAGE_TIMES.value, G_enum.STAGE_BYTES.value]
array_multiple_list_items = [G_enum.T_ITER.value, G_enum.T_STAGES.value]
for item in array_list_items:
name = columnsG[item]
values = dfG[name].copy()
for index in range(len(values)):
values[index] = tuple(values[index])
dfG[name] = values
for item in array_multiple_list_items:
name = columnsG[item]
values = dfG[name].copy()
for i in range(len(values)):
for j in range(len(values[i])):
if(type(values[i][j][0]) == list):
for r in range(len(values[i][j])):
values[i][j][r] = tuple(values[i][j][r])
values[i][j] = tuple(values[i][j])
values[i] = tuple(values[i])
dfG[name] = values
#----------------------------------------------- #-----------------------------------------------
if len(sys.argv) < 2: if len(sys.argv) < 2:
print("The files name is missing\nUsage: python3 iterTimes.py resultsName directory csvOutName") print("The files name is missing\nUsage: python3 MallTimes.py commonName directory OutName")
exit(1) exit(1)
common_name = sys.argv[1]
if len(sys.argv) >= 3: if len(sys.argv) >= 3:
BaseDir = sys.argv[2] BaseDir = sys.argv[2]
print("Searching in directory: "+ BaseDir) print("Searching in directory: "+ BaseDir)
else: else:
BaseDir = sys.argv[2] BaseDir = "./"
if len(sys.argv) >= 4: if len(sys.argv) >= 4:
print("Csv name will be: " + sys.argv[3] + "G.csv & " + sys.argv[3] + "M.csv")
name = sys.argv[3] name = sys.argv[3]
else: else:
name = "data" name = "data"
print("File name will be: " + name + "G.pkl")
insideDir = "Run" insideDir = "Run"
lista = glob.glob("./" + BaseDir + insideDir + "*/" + sys.argv[1]+ "*Global.o*") lista = glob.glob(BaseDir + insideDir + "*/" + common_name + "*_Global.out")
lista += (glob.glob("./" + BaseDir + sys.argv[1]+ "*Global.o*")) # Se utiliza cuando solo hay un nivel de directorios lista += (glob.glob(BaseDir + common_name + "*_Global.out")) # Se utiliza cuando solo hay un nivel de directorios
print("Number of files found: "+ str(len(lista))); print("Number of files found: "+ str(len(lista)));
it = -1 it = -1
dataG = [] dataG = []
dataM = []
columnsG = ["N", "%Async", "Groups", "NP", "NS", "Dist", "Matrix", "CommTam", "Cst", "Css", "Time", "Iters", "TE"] #13
columnsM = ["N", "%Async", "NP", "NS", "Dist", "Matrix", "CommTam", "Cst", "Css", "Time", "Iters", "TC", "TH", "TS", "TA"] #15
for elem in lista: for elem in lista:
f = open(elem, "r") f = open(elem, "r")
it = read_file(f, dataG, dataM, it) id_run = elem.split("_Global.out")[0].split(common_name)[1]
path_to_run = elem.split(common_name)[0]
lista_local = glob.glob(path_to_run + common_name + id_run + "_G*NP*.out")
it,runs_in_file = read_global_file(f, dataG, it)
f.close() f.close()
for elem_local in lista_local:
f_local = open(elem_local, "r")
read_local_file(f_local, dataG, it, runs_in_file)
f_local.close()
#print(data)
dfG = pd.DataFrame(dataG, columns=columnsG) dfG = pd.DataFrame(dataG, columns=columnsG)
dfG.to_csv(name + 'G.csv') convert_to_tuples(dfG)
print(dfG)
dfG.to_pickle(name + 'G.pkl')
dfM = pd.DataFrame(dataM, columns=columnsM) #dfM = pd.DataFrame(dataM, columns=columnsM)
#Poner en TC el valor real y en TH el necesario para la app #Poner en TC el valor real y en TH el necesario para la app
cond = dfM.TH != 0 #cond = dfM.TH != 0
dfM.loc[cond, ['TC', 'TH']] = dfB.loc[cond, ['TH', 'TC']].values #dfM.loc[cond, ['TC', 'TH']] = dfM.loc[cond, ['TH', 'TC']].values
dfM.to_csv(name + 'M.csv') #dfM.to_csv(name + 'M.csv')
This diff is collapsed.
This diff is collapsed.
import sys
import glob
import numpy as numpy
import pandas as pd
#-----------------------------------------------
def read_file(f, dataA, dataB, itA, itB):
compute_tam = 0
comm_tam = 0
sdr = 0
adr = 0
dist = 0
css = 0
cst = 0
time = 0
recording = False
it_line = 0
aux_itA = 0
aux_itB = 0
iters = 0
np = 0
np_par = 0
ns = 0
array = []
columnas = ['Titer','Ttype','Top']
#print(f)
for line in f:
lineS = line.split()
if len(lineS) > 1:
if recording and lineS[0].split(':')[0] in columnas: #Record data
aux_itA = 0
lineS.pop(0)
if it_line==0:
for observation in lineS:
dataA.append([None]*15)
dataA[itA+aux_itA][0] = sdr
dataA[itA+aux_itA][1] = adr
dataA[itA+aux_itA][2] = np
dataA[itA+aux_itA][3] = np_par
dataA[itA+aux_itA][4] = ns
dataA[itA+aux_itA][5] = dist
dataA[itA+aux_itA][6] = compute_tam
dataA[itA+aux_itA][7] = comm_tam
dataA[itA+aux_itA][8] = cst
dataA[itA+aux_itA][9] = css
dataA[itA+aux_itA][10] = time
dataA[itA+aux_itA][11] = iters
dataA[itA+aux_itA][12] = float(observation)
array.append(float(observation))
aux_itA+=1
elif it_line==1:
deleted = 0
for observation in lineS:
dataA[itA+aux_itA][13] = float(observation)
if float(observation) == 0:
array.pop(aux_itA - deleted)
deleted+=1
aux_itA+=1
else:
for observation in lineS:
dataA[itA+aux_itA][14] = float(observation)
aux_itA+=1
it_line += 1
if(it_line % 3 == 0): # Comprobar si se ha terminado de mirar esta ejecucion
recording = False
it_line = 0
itA = itA + aux_itA
if ns != 0: # Solo obtener datos de grupos con hijos
dataB.append([None]*14)
dataB[itB][0] = sdr
dataB[itB][1] = adr
dataB[itB][2] = np
dataB[itB][3] = np_par
dataB[itB][4] = ns
dataB[itB][5] = dist
dataB[itB][6] = compute_tam
dataB[itB][7] = comm_tam
dataB[itB][8] = cst
dataB[itB][9] = css
dataB[itB][10] = time
dataB[itB][11] = iters
dataB[itB][12] = tuple(array)
dataB[itB][13] = numpy.sum(array)
itB+=1
array = []
if lineS[0] == "Config:":
compute_tam = int(lineS[1].split('=')[1].split(',')[0])
comm_tam = int(lineS[2].split('=')[1].split(',')[0])
sdr = int(lineS[3].split('=')[1].split(',')[0])
adr = int(lineS[4].split('=')[1].split(',')[0])
css = int(lineS[6].split('=')[1].split(',')[0])
cst = int(lineS[7].split('=')[1].split(',')[0])
time = float(lineS[8].split('=')[1])
elif lineS[0] == "Config":
recording = True
iters = int(lineS[2].split('=')[1].split(',')[0])
dist = int(lineS[4].split('=')[1].split(',')[0])
np = int(lineS[5].split('=')[1].split(',')[0])
np_par = int(lineS[6].split('=')[1].split(',')[0])
ns = int(float(lineS[7].split('=')[1]))
return itA,itB
#-----------------------------------------------
#Config: matrix=1000, sdr=1000000000, adr=0, aib=0 time=2.000000
#Config Group: iters=100, factor=1.000000, phy=2, procs=2, parents=0, sons=4
#Ttype: 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
if len(sys.argv) < 2:
print("The files name is missing\nUsage: python3 iterTimes.py resultsName directory csvOutName")
exit(1)
if len(sys.argv) >= 3:
BaseDir = sys.argv[2]
print("Searching in directory: "+ BaseDir)
else: #FIXME
BaseDir = sys.argv[2]
if len(sys.argv) >= 4:
print("Csv name will be: " + sys.argv[3] + ".csv and "+ sys.argv[3] + "_Total.csv")
name = sys.argv[3]
else:
name = "data"
insideDir = "Run"
lista = glob.glob("./" + BaseDir + insideDir + "*/" + sys.argv[1]+ "*ID*.o*")
print("Number of files found: "+ str(len(lista)));
itA = itB = 0
dataA = []
dataB = [] #0 #1 #2 #3 #4 #5 #6 #7 #8 #9 #10 #11 #12 #13 #14
columnsA = ["N", "%Async", "NP", "N_par", "NS", "Dist", "Compute_tam", "Comm_tam", "Cst", "Css","Time", "Iters", "Ti", "Tt", "To"] #15
columnsB = ["N", "%Async", "NP", "N_par", "NS", "Dist", "Compute_tam", "Comm_tam", "Cst", "Css","Time", "Iters", "Ti", "Sum"] #14
for elem in lista:
f = open(elem, "r")
itA,itB = read_file(f, dataA, dataB, itA, itB)
f.close()
#print(data)
dfA = pd.DataFrame(dataA, columns=columnsA)
dfB = pd.DataFrame(dataB, columns=columnsB)
dfA['N'] += dfA['%Async']
dfA['%Async'] = (dfA['%Async'] / dfA['N']) * 100
dfA.to_csv(name + '.csv')
dfB['N'] += dfB['%Async']
dfB['%Async'] = (dfB['%Async'] / dfB['N']) * 100
dfB.to_csv(name + '_Total.csv')
...@@ -3,22 +3,19 @@ import glob ...@@ -3,22 +3,19 @@ import glob
import numpy as numpy import numpy as numpy
import pandas as pd import pandas as pd
if len(sys.argv) < 3: if len(sys.argv) < 3:
print("The files name is missing\nUsage: python3 joinDf.py resultsName1.csv resultsName2.csv csvOutName") print("The files name is missing\nUsage: python3 joinDf.py resultsName1.pkl resultsName2.pkl OutName")
exit(1) exit(1)
if len(sys.argv) >= 4: if len(sys.argv) >= 4:
print("Csv name will be: " + sys.argv[3] + ".csv")
name = sys.argv[3] name = sys.argv[3]
else: else:
name = "dataJOINED" name = "dataJOINED"
df1 = pd.read_csv( sys.argv[1] ) print("File name will be: " + name + ".pkl")
df2 = pd.read_csv( sys.argv[2] ) df1 = pd.read_pickle( sys.argv[1] )
df2 = pd.read_pickle( sys.argv[2] )
frames = [df1, df2] frames = [df1, df2]
df3 = pd.concat(frames) df3 = pd.concat(frames)
df3 = df3.drop(columns=df3.columns[0])
df3.to_csv(name + '.csv') df3.to_pickle(name + '.pkl')
This source diff could not be displayed because it is too large. You can view the blob instead.
...@@ -41,14 +41,14 @@ void comm_results(results_data *results, int root, size_t resizes, MPI_Comm inte ...@@ -41,14 +41,14 @@ void comm_results(results_data *results, int root, size_t resizes, MPI_Comm inte
* En concreto son tres escalares y dos vectores de tamaño "resizes" * En concreto son tres escalares y dos vectores de tamaño "resizes"
*/ */
void def_results_type(results_data *results, int resizes, MPI_Datatype *results_type) { void def_results_type(results_data *results, int resizes, MPI_Datatype *results_type) {
int i, counts = 6; int i, counts = 7;
int blocklengths[] = {1, 1, 1, 1, 1, 1}; int blocklengths[] = {1, 1, 1, 1, 1, 1, 1};
MPI_Aint displs[counts], dir; MPI_Aint displs[counts], dir;
MPI_Datatype types[counts]; MPI_Datatype types[counts];
// Rellenar vector types // Rellenar vector types
types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = MPI_DOUBLE; types[0] = types[1] = types[2] = types[3] = types[4] = types[5] = types[6] = MPI_DOUBLE;
blocklengths[2] = blocklengths[3] = blocklengths[4] = blocklengths[5] = resizes; blocklengths[2] = blocklengths[3] = blocklengths[4] = blocklengths[5] = blocklengths[6] = resizes;
// Rellenar vector displs // Rellenar vector displs
MPI_Get_address(results, &dir); MPI_Get_address(results, &dir);
...@@ -59,6 +59,7 @@ void def_results_type(results_data *results, int resizes, MPI_Datatype *results_ ...@@ -59,6 +59,7 @@ void def_results_type(results_data *results, int resizes, MPI_Datatype *results_
MPI_Get_address(results->async_time, &displs[3]); MPI_Get_address(results->async_time, &displs[3]);
MPI_Get_address(results->spawn_real_time, &displs[4]); MPI_Get_address(results->spawn_real_time, &displs[4]);
MPI_Get_address(results->spawn_time, &displs[5]); MPI_Get_address(results->spawn_time, &displs[5]);
MPI_Get_address(results->malleability_time, &displs[6]);
for(i=0;i<counts;i++) displs[i] -= dir; for(i=0;i<counts;i++) displs[i] -= dir;
...@@ -87,6 +88,7 @@ void set_results_post_reconfig(results_data *results, int grp, int sdr, int adr) ...@@ -87,6 +88,7 @@ void set_results_post_reconfig(results_data *results, int grp, int sdr, int adr)
} else { } else {
results->async_time[grp-1] = 0; results->async_time[grp-1] = 0;
} }
results->malleability_time[grp-1] = results->malleability_end - results->malleability_time[grp-1];
} }
/* /*
...@@ -191,12 +193,12 @@ void compute_results_stages(results_data *results, int myId, int numP, int root, ...@@ -191,12 +193,12 @@ void compute_results_stages(results_data *results, int myId, int numP, int root,
void print_iter_results(results_data results) { void print_iter_results(results_data results) {
size_t i; size_t i;
printf("Async_Iters: %ld\n", results.iters_async);
printf("T_iter: "); printf("T_iter: ");
for(i=0; i< results.iter_index; i++) { for(i=0; i< results.iter_index; i++) {
printf("%lf ", results.iters_time[i]); printf("%lf ", results.iters_time[i]);
} }
printf("\n");
printf("\nAsync_Iters: %ld\n", results.iters_async);
} }
/* /*
...@@ -242,6 +244,11 @@ void print_global_results(results_data results, size_t resizes) { ...@@ -242,6 +244,11 @@ void print_global_results(results_data results, size_t resizes) {
printf("%lf ", results.async_time[i]); printf("%lf ", results.async_time[i]);
} }
printf("\nT_Malleability: ");
for(i=0; i < resizes; i++) {
printf("%lf ", results.malleability_time[i]);
}
printf("\nT_total: %lf\n", results.exec_time); printf("\nT_total: %lf\n", results.exec_time);
} }
...@@ -264,6 +271,7 @@ void init_results_data(results_data *results, size_t resizes, size_t stages, siz ...@@ -264,6 +271,7 @@ void init_results_data(results_data *results, size_t resizes, size_t stages, siz
results->spawn_real_time = calloc(resizes, sizeof(double)); results->spawn_real_time = calloc(resizes, sizeof(double));
results->sync_time = calloc(resizes, sizeof(double)); results->sync_time = calloc(resizes, sizeof(double));
results->async_time = calloc(resizes, sizeof(double)); results->async_time = calloc(resizes, sizeof(double));
results->malleability_time = calloc(resizes, sizeof(double));
results->wasted_time = 0; results->wasted_time = 0;
results->iters_size = iters_size + RESULTS_EXTRA_SIZE; results->iters_size = iters_size + RESULTS_EXTRA_SIZE;
...@@ -324,6 +332,10 @@ void free_results_data(results_data *results, size_t stages) { ...@@ -324,6 +332,10 @@ void free_results_data(results_data *results, size_t stages) {
free(results->async_time); free(results->async_time);
results->async_time = NULL; results->async_time = NULL;
} }
if(results->malleability_time != NULL) {
free(results->malleability_time);
results->malleability_time = NULL;
}
if(results->iters_time != NULL) { if(results->iters_time != NULL) {
free(results->iters_time); free(results->iters_time);
......
...@@ -14,8 +14,9 @@ typedef struct { ...@@ -14,8 +14,9 @@ typedef struct {
// Spawn, Thread, Sync, Async and Exec time // Spawn, Thread, Sync, Async and Exec time
double spawn_start, *spawn_time, *spawn_real_time; double spawn_start, *spawn_time, *spawn_real_time;
double sync_start, sync_end, *sync_time; double sync_end, *sync_time;
double async_start, async_end, *async_time; double async_end, *async_time;
double malleability_end, *malleability_time;
double exec_start, exec_time; double exec_start, exec_time;
double wasted_time; // Time spent recalculating iter stages double wasted_time; // Time spent recalculating iter stages
} results_data; } results_data;
......
...@@ -57,6 +57,8 @@ int main(int argc, char *argv[]) { ...@@ -57,6 +57,8 @@ int main(int argc, char *argv[]) {
if(req != MPI_THREAD_MULTIPLE) { if(req != MPI_THREAD_MULTIPLE) {
printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE); printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE);
fflush(stdout);
MPI_Abort(MPI_COMM_WORLD, -50);
} }
init_group_struct(argv, argc, myId, numP); init_group_struct(argv, argc, myId, numP);
...@@ -218,7 +220,7 @@ int work() { ...@@ -218,7 +220,7 @@ int work() {
iter = 0; iter = 0;
while(state == MALL_DIST_PENDING || state == MALL_SPAWN_PENDING || state == MALL_SPAWN_SINGLE_PENDING || state == MALL_SPAWN_ADAPT_POSTPONE || state == MALL_SPAWN_ADAPT_PENDING) { while(state == MALL_DIST_PENDING || state == MALL_SPAWN_PENDING || state == MALL_SPAWN_SINGLE_PENDING || state == MALL_SPAWN_ADAPT_POSTPONE || state == MALL_SPAWN_ADAPT_PENDING) {
if(iter < config_file->groups[group->grp+1].iters) { if(group->grp+1 < config_file->n_groups && iter < config_file->groups[group->grp+1].iters) {
iterate(state); iterate(state);
iter++; iter++;
group->iter_start = iter; group->iter_start = iter;
......
CC = gcc CC = gcc
MCC = mpicc MCC = mpicc
#C_FLAGS_ALL = -Wconversion -Wpedantic #C_FLAGS_ALL = -Wconversion -Wpedantic
C_FLAGS = -Wall -Wextra -Wshadow -Wfatal-errors -g C_FLAGS = -Wall -Wextra -Wshadow -Wfatal-errors
LD_FLAGS = -lm -pthread LD_FLAGS = -lm -pthread
DEF = DEF =
......
...@@ -129,7 +129,8 @@ int sync_communication(char *send, char **recv, int qty, int myId, int numP, int ...@@ -129,7 +129,8 @@ int sync_communication(char *send, char **recv, int qty, int myId, int numP, int
* *
*/ */
void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm) { void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm) {
int i, init, end; int i, j, init, end, total_sends;
MPI_Request *sends;
init = s_counts.idI; init = s_counts.idI;
end = s_counts.idE; end = s_counts.idE;
...@@ -140,8 +141,15 @@ void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct ...@@ -140,8 +141,15 @@ void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct
else end = s_counts.idE-1; else end = s_counts.idE-1;
} }
total_sends = end - init;
j = 0;
if(total_sends > 0) {
sends = (MPI_Request *) malloc(total_sends * sizeof(MPI_Request));
}
for(i=init; i<end; i++) { for(i=init; i<end; i++) {
MPI_Send(send+s_counts.displs[i], s_counts.counts[i], MPI_CHAR, i, 99, comm); sends[j] = MPI_REQUEST_NULL;
MPI_Isend(send+s_counts.displs[i], s_counts.counts[i], MPI_CHAR, i, 99, comm, &(sends[j]));
j++;
} }
init = r_counts.idI; init = r_counts.idI;
...@@ -150,9 +158,14 @@ void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct ...@@ -150,9 +158,14 @@ void sync_point2point(char *send, char *recv, int is_intercomm, int myId, struct
if(r_counts.idI == myId) init = r_counts.idI+1; if(r_counts.idI == myId) init = r_counts.idI+1;
else if(r_counts.idE == myId + 1) end = r_counts.idE-1; else if(r_counts.idE == myId + 1) end = r_counts.idE-1;
} }
for(i=init; i<end; i++) { for(i=init; i<end; i++) {
MPI_Recv(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, 99, comm, MPI_STATUS_IGNORE); MPI_Recv(recv+r_counts.displs[i], r_counts.counts[i], MPI_CHAR, i, 99, comm, MPI_STATUS_IGNORE);
} }
if(total_sends > 0) {
MPI_Waitall(total_sends, sends, MPI_STATUSES_IGNORE);
}
} }
/* /*
......
...@@ -30,6 +30,7 @@ int thread_check(); ...@@ -30,6 +30,7 @@ int thread_check();
void* thread_async_work(); void* thread_async_work();
void print_comms_state(); void print_comms_state();
void malleability_comms_update(MPI_Comm comm);
typedef struct { typedef struct {
int spawn_method; int spawn_method;
...@@ -181,6 +182,7 @@ int malleability_checkpoint() { ...@@ -181,6 +182,7 @@ int malleability_checkpoint() {
break; break;
case MALL_NOT_STARTED: case MALL_NOT_STARTED:
// Comprobar si se tiene que realizar un redimensionado // Comprobar si se tiene que realizar un redimensionado
mall_conf->results->malleability_time[mall_conf->grp] = MPI_Wtime();
//if(CHECK_RMS()) {return MALL_DENIED;} //if(CHECK_RMS()) {return MALL_DENIED;}
state = spawn_step(); state = spawn_step();
...@@ -235,6 +237,7 @@ int malleability_checkpoint() { ...@@ -235,6 +237,7 @@ int malleability_checkpoint() {
break; break;
case MALL_DIST_COMPLETED: //TODO No es esto muy feo? case MALL_DIST_COMPLETED: //TODO No es esto muy feo?
mall_conf->results->malleability_end = MPI_Wtime();
state = MALL_COMPLETED; state = MALL_COMPLETED;
break; break;
} }
...@@ -547,20 +550,15 @@ void Children_init() { ...@@ -547,20 +550,15 @@ void Children_init() {
MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, root_parents, mall->intercomm); MPI_Bcast(rep_s_data->arrays[i], rep_s_data->qty[i], datatype, root_parents, mall->intercomm);
} }
} }
mall_conf->results->malleability_end = MPI_Wtime(); // Obtener timestamp de cuando termina maleabilidad
// Guardar los resultados de esta transmision // Guardar los resultados de esta transmision
comm_results(mall_conf->results, mall->root, mall_conf->config_file->n_resizes, mall->intercomm); comm_results(mall_conf->results, mall->root, mall_conf->config_file->n_resizes, mall->intercomm);
if(!is_intercomm) { if(!is_intercomm) {
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); malleability_comms_update(mall->intercomm);
if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm)); //TODO No es peligroso?
MPI_Comm_dup(mall->intercomm, &(mall->thread_comm));
MPI_Comm_dup(mall->intercomm, &(mall->comm));
MPI_Comm_dup(mall->intercomm, &(mall->user_comm));
} }
MPI_Comm_disconnect(&(mall->intercomm)); MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error en OpenMPI + Merge
} }
//======================================================|| //======================================================||
...@@ -638,7 +636,6 @@ int start_redistribution() { ...@@ -638,7 +636,6 @@ int start_redistribution() {
/* /*
* @deprecated
* Comprueba si la redistribucion asincrona ha terminado. * Comprueba si la redistribucion asincrona ha terminado.
* Si no ha terminado la funcion termina indicandolo, en caso contrario, * Si no ha terminado la funcion termina indicandolo, en caso contrario,
* se continua con la comunicacion sincrona, el envio de resultados y * se continua con la comunicacion sincrona, el envio de resultados y
...@@ -738,24 +735,14 @@ int end_redistribution() { ...@@ -738,24 +735,14 @@ int end_redistribution() {
local_state = MALL_DIST_COMPLETED; local_state = MALL_DIST_COMPLETED;
if(!is_intercomm) { // Merge Spawn if(!is_intercomm) { // Merge Spawn
if(mall->numP < mall->numC) { // Expand if(mall->numP < mall->numC) { // Expand
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); malleability_comms_update(mall->intercomm);
if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm)); //TODO No es peligroso?
MPI_Comm_dup(mall->intercomm, &(mall->thread_comm));
MPI_Comm_dup(mall->intercomm, &(mall->comm));
MPI_Comm_dup(mall->intercomm, &(mall->user_comm));
MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MALL_THREAD");
MPI_Comm_set_name(mall->comm, "MPI_COMM_MALL");
MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MALL_USER");
} else { // Shrink || Merge Shrink requiere de mas tareas } else { // Shrink || Merge Shrink requiere de mas tareas
local_state = MALL_SPAWN_ADAPT_PENDING; local_state = MALL_SPAWN_ADAPT_PENDING;
} }
} }
if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) { if(mall->intercomm != MPI_COMM_NULL && mall->intercomm != MPI_COMM_WORLD) {
MPI_Comm_disconnect(&(mall->intercomm)); MPI_Comm_disconnect(&(mall->intercomm)); //FIXME Error en OpenMPI + Merge
} }
return local_state; return local_state;
...@@ -773,7 +760,7 @@ int shrink_redistribution() { ...@@ -773,7 +760,7 @@ int shrink_redistribution() {
zombies_collect_suspended(mall->user_comm, mall->myId, mall->numP, mall->numC, mall->root, (void *) mall_conf->results, mall_conf->config_file->n_stages); zombies_collect_suspended(mall->user_comm, mall->myId, mall->numP, mall->numC, mall->root, (void *) mall_conf->results, mall_conf->config_file->n_stages);
if(mall->myId < mall->numC) { if(mall->myId < mall->numC) {
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm)); //FIXME Modificar a que se pida pro el usuario el cambio y se llama a comms_update
if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm)); if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
mall->dup_user_comm = 1; mall->dup_user_comm = 1;
...@@ -919,3 +906,17 @@ void print_comms_state() { ...@@ -919,3 +906,17 @@ void print_comms_state() {
} }
free(test); free(test);
} }
void malleability_comms_update(MPI_Comm comm) {
if(mall->thread_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->thread_comm));
if(mall->comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->comm));
if(mall->user_comm != MPI_COMM_WORLD) MPI_Comm_free(&(mall->user_comm)); //TODO No es peligroso?
MPI_Comm_dup(comm, &(mall->thread_comm));
MPI_Comm_dup(comm, &(mall->comm));
MPI_Comm_dup(comm, &(mall->user_comm));
MPI_Comm_set_name(mall->thread_comm, "MPI_COMM_MALL_THREAD");
MPI_Comm_set_name(mall->comm, "MPI_COMM_MALL");
MPI_Comm_set_name(mall->user_comm, "MPI_COMM_MALL_USER");
}
...@@ -290,7 +290,7 @@ void deallocate_spawn_data() { ...@@ -290,7 +290,7 @@ void deallocate_spawn_data() {
* Cuando termina, modifica la variable global para indicar este cambio * Cuando termina, modifica la variable global para indicar este cambio
*/ */
void generic_spawn(MPI_Comm *child, int data_stage) { void generic_spawn(MPI_Comm *child, int data_stage) {
int local_state; int local_state, aux_state;
// WORK // WORK
if(spawn_data->myId == spawn_data->root && spawn_data->spawn_qty > 0) { //SET MAPPING FOR NEW PROCESSES if(spawn_data->myId == spawn_data->root && spawn_data->spawn_qty > 0) { //SET MAPPING FOR NEW PROCESSES
...@@ -306,7 +306,10 @@ void generic_spawn(MPI_Comm *child, int data_stage) { ...@@ -306,7 +306,10 @@ void generic_spawn(MPI_Comm *child, int data_stage) {
} }
// END WORK // END WORK
end_time = MPI_Wtime(); end_time = MPI_Wtime();
aux_state = get_spawn_state(spawn_data->spawn_is_async);
if(!(aux_state == MALL_SPAWN_PENDING && local_state == MALL_SPAWN_ADAPT_POSTPONE)) {
set_spawn_state(local_state, spawn_data->spawn_is_async); set_spawn_state(local_state, spawn_data->spawn_is_async);
}
} }
...@@ -345,7 +348,7 @@ void* thread_work() { ...@@ -345,7 +348,7 @@ void* thread_work() {
generic_spawn(returned_comm, MALL_NOT_STARTED); generic_spawn(returned_comm, MALL_NOT_STARTED);
local_state = get_spawn_state(MALL_SPAWN_PTHREAD); local_state = get_spawn_state(MALL_SPAWN_PTHREAD);
if(local_state == MALL_SPAWN_ADAPT_POSTPONE) { if(local_state == MALL_SPAWN_ADAPT_POSTPONE || local_state == MALL_SPAWN_PENDING) {
// El grupo de procesos se terminara de juntar tras la redistribucion de datos // El grupo de procesos se terminara de juntar tras la redistribucion de datos
local_state = wait_wakeup(); local_state = wait_wakeup();
......
...@@ -168,7 +168,7 @@ void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) { ...@@ -168,7 +168,7 @@ void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) {
int tamBl, remainder; int tamBl, remainder;
tamBl = dist.num_cpus / dist.num_nodes; tamBl = dist.num_cpus / dist.num_nodes;
asigCores = 0; asigCores = dist.already_created;
i = *used_nodes = dist.already_created / tamBl; i = *used_nodes = dist.already_created / tamBl;
remainder = dist.already_created % tamBl; remainder = dist.already_created % tamBl;
...@@ -176,12 +176,13 @@ void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) { ...@@ -176,12 +176,13 @@ void compact_dist(struct physical_dist dist, int *used_nodes, int *procs) {
//First nodes could already have existing procs //First nodes could already have existing procs
//Start from the first with free spaces //Start from the first with free spaces
if (remainder) { if (remainder) {
procs[i] = asigCores = tamBl - remainder; procs[i] = tamBl - remainder;
asigCores += procs[i];
i = (i+1) % dist.num_nodes; i = (i+1) % dist.num_nodes;
(*used_nodes)++; (*used_nodes)++;
} }
//Assing tamBl to each node //Assign tamBl to each node
while(asigCores+tamBl <= dist.target_qty) { while(asigCores+tamBl <= dist.target_qty) {
asigCores += tamBl; asigCores += tamBl;
procs[i] += tamBl; procs[i] += tamBl;
......
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <pthread.h> #include <pthread.h>
#include "Spawn_state.h"
pthread_mutex_t spawn_mutex; pthread_mutex_t spawn_mutex;
pthread_cond_t spawn_cond; pthread_cond_t spawn_cond;
int spawn_state; int spawn_state;
int waiting_redistribution=0;
void init_spawn_state() { void init_spawn_state() {
pthread_mutex_init(&spawn_mutex,NULL); pthread_mutex_init(&spawn_mutex,NULL);
pthread_cond_init(&spawn_cond,NULL); pthread_cond_init(&spawn_cond,NULL);
set_spawn_state(1,0); //FIXME First parameter is a horrible magical number
} }
void free_spawn_state() { void free_spawn_state() {
...@@ -40,13 +43,20 @@ void set_spawn_state(int value, int is_async) { ...@@ -40,13 +43,20 @@ void set_spawn_state(int value, int is_async) {
int wait_wakeup() { int wait_wakeup() {
pthread_mutex_lock(&spawn_mutex); pthread_mutex_lock(&spawn_mutex);
if(!waiting_redistribution) {
waiting_redistribution=1;
pthread_cond_wait(&spawn_cond, &spawn_mutex); pthread_cond_wait(&spawn_cond, &spawn_mutex);
}
waiting_redistribution=0;
pthread_mutex_unlock(&spawn_mutex); pthread_mutex_unlock(&spawn_mutex);
return get_spawn_state(1); return get_spawn_state(1);
} }
void wakeup() { void wakeup() {
pthread_mutex_lock(&spawn_mutex); pthread_mutex_lock(&spawn_mutex);
if(waiting_redistribution) {
pthread_cond_signal(&spawn_cond); pthread_cond_signal(&spawn_cond);
}
waiting_redistribution=1;
pthread_mutex_unlock(&spawn_mutex); pthread_mutex_unlock(&spawn_mutex);
} }
...@@ -5,7 +5,11 @@ ...@@ -5,7 +5,11 @@
#SBATCH --exclude=c01,c00,c02 #SBATCH --exclude=c01,c00,c02
dir="/home/martini/malleability_benchmark" dir="/home/martini/malleability_benchmark"
partition='P1'
codeDir="/Codes" codeDir="/Codes"
execDir="/Exec"
cores=$(bash $dir$execDir/BashScripts/getCores.sh $partition)
nodelist=$SLURM_JOB_NODELIST nodelist=$SLURM_JOB_NODELIST
nodes=$SLURM_JOB_NUM_NODES nodes=$SLURM_JOB_NUM_NODES
...@@ -18,21 +22,14 @@ then ...@@ -18,21 +22,14 @@ then
fi fi
echo "MPICH" echo "MPICH"
#module load mpich-3.4.1-noucx
#export HYDRA_DEBUG=1 #export HYDRA_DEBUG=1
aux=$(grep "\[resize0\]" -n $configFile | cut -d ":" -f1) numP=$(bash $dir$execDir/BashScripts/getNumPNeeded.sh $configFile 0)
read -r ini fin <<<$(echo $aux) initial_nodelist=$(bash $dir$execDir/BashScripts/createInitialNodelist.sh $numP $cores $nodelist)
diff=$(( fin - ini )) echo $initial_nodelist
numP=$(head -$fin $configFile | tail -$diff | cut -d ';' -f1 | grep Procs | cut -d '=' -f2) echo "Test PreRUN $numP $nodelist"
mpirun -hosts $initial_nodelist -np $numP $dir$codeDir/build/a.out $configFile $outIndex $nodelist $nodes
ls /home/martini/malleability_benchmark/Codes/build/a.out
echo "Test PreRUN $numP $nodes"
mpirun -np $numP $dir$codeDir/build/a.out $configFile $outIndex $nodelist $nodes
echo "END RUN" echo "END RUN"
sed -i 's/application called MPI_Abort(MPI_COMM_WORLD, -100) - process/shrink cleaning/g' slurm-$SLURM_JOB_ID.out sed -i 's/application called MPI_Abort(MPI_COMM_WORLD, -100) - process/shrink cleaning/g' slurm-$SLURM_JOB_ID.out
sed -i 's/Abort(-100)/shrink cleaning/g' slurm-$SLURM_JOB_ID.out sed -i 's/Abort(-100)/shrink cleaning/g' slurm-$SLURM_JOB_ID.out
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
dir="/home/martini/malleability_benchmark" dir="/home/martini/malleability_benchmark"
codeDir="/Codes/build" codeDir="/Codes/build"
resultsDir="/Results" resultsDir="/Results"
execDir="/Exec"
nodelist=$SLURM_JOB_NODELIST nodelist=$SLURM_JOB_NODELIST
nodes=$SLURM_JOB_NUM_NODES nodes=$SLURM_JOB_NUM_NODES
...@@ -15,10 +16,7 @@ outIndex=$2 ...@@ -15,10 +16,7 @@ outIndex=$2
echo "MPICH" echo "MPICH"
aux=$(grep "\[resize0\]" -n $configFile | cut -d ":" -f1) numP=$(bash $dir$execDir/BashScripts/getNumPNeeded.sh $configFile 0)
read -r ini fin <<<$(echo $aux)
diff=$(( fin - ini ))
numP=$(head -$fin $configFile | tail -$diff | cut -d ';' -f1 | grep Procs | cut -d '=' -f2)
name_res="Extrae_"$nodes"_Test_"$numP name_res="Extrae_"$nodes"_Test_"$numP
dir_name_res=$dir$resultsDir"/"$name_res dir_name_res=$dir$resultsDir"/"$name_res
...@@ -28,6 +26,7 @@ srun -n$numP --mpi=pmi2 ./trace.sh $dir$codeDir/a.out $configFile $outIndex $nod ...@@ -28,6 +26,7 @@ srun -n$numP --mpi=pmi2 ./trace.sh $dir$codeDir/a.out $configFile $outIndex $nod
echo "END RUN" echo "END RUN"
sed -i 's/application called MPI_Abort(MPI_COMM_WORLD, -100) - process/shrink cleaning/g' slurm-$SLURM_JOB_ID.out sed -i 's/application called MPI_Abort(MPI_COMM_WORLD, -100) - process/shrink cleaning/g' slurm-$SLURM_JOB_ID.out
sed -i 's/Abort(-100)/shrink cleaning/g' slurm-$SLURM_JOB_ID.out
rm hostfile.o$SLURM_JOB_ID rm hostfile.o$SLURM_JOB_ID
echo "MOVING DATA" echo "MOVING DATA"
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
dir="/home/martini/malleability_benchmark" dir="/home/martini/malleability_benchmark"
codeDir="/Codes" codeDir="/Codes"
execDir="/Exec"
nodelist="localhost" nodelist="localhost"
nodes=1 nodes=1
...@@ -16,12 +17,9 @@ echo "MPICH" ...@@ -16,12 +17,9 @@ echo "MPICH"
#module load mpich-3.4.1-noucx #module load mpich-3.4.1-noucx
#export HYDRA_DEBUG=1 #export HYDRA_DEBUG=1
aux=$(grep "\[resize0\]" -n $1 | cut -d ":" -f1) numP=$(bash $dir$execDir/BashScripts/getNumPNeeded.sh $configFile 0)
read -r ini fin <<<$(echo $aux)
diff=$(( fin - ini ))
numP=$(head -$fin $1 | tail -$diff | cut -d ';' -f1 | grep Procs | cut -d '=' -f2)
mpirun -np $numP valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --trace-children=yes --log-file=nc.vg.%p $dir$codeDir/build/a.out $configFile $outIndex $nodelist $nodes mpirun -np $numP valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --trace-children=yes --log-file=nc.vg.%p $dir$codeDir/build/a.out $configFile $outIndex $nodelist $nodes
echo "END RUN" echo "END RUN"
sed -i 's/application called MPI_Abort(MPI_COMM_WORLD, -100) - process/shrink cleaning/g' slurm-$SLURM_JOB_ID.out sed -i 's/application called MPI_Abort(MPI_COMM_WORLD, -100) - process/shrink cleaning/g' slurm-$SLURM_JOB_ID.out
sed -i 's/Abort(-100)/shrink cleaning/g' slurm-$SLURM_JOB_ID.out
#!/bin/bash
dir="/home/martini/malleability_benchmark" #FIXME Obtain from another way
# Runs in a given current directory all .ini files
# Parameter 1(Optional) - Amount of executions per file. Must be a positive number
#====== Do not modify these values =======
codeDir="/Codes/build"
execDir="/Exec"
ResultsDir="/Results"
numP=$1
cores=$2
nodelist=$3
initial_node_qty=$(($numP / $cores))
if [ $initial_node_qty -eq 0 ]
then
initial_node_qty=1
fi
common_node_name="n" #FIXME What if it uses another type of node?
if [[ $nodelist == *"["* ]]; then
common_node_name=$(echo $nodelist | cut -d '[' -f1)
fi
node_array=($(echo $nodelist | sed -e 's/[\[n]//g' -e 's/\]/ /g' -e 's/,/ /g'))
actual_node_qty=0
for ((i=0; $actual_node_qty<$initial_node_qty; i++))
do
element=($(echo ${node_array[$i]} | sed -e 's/-/ /g'))
nodes_qty=1
if [ "${#element[@]}" -gt 1 ];
then
nodes_qty=$((10#${element[1]}-10#${element[0]}+1))
fi
expected_node_qty=$(($actual_node_qty + $nodes_qty))
if [ "$expected_node_qty" -le "$initial_node_qty" ];
then
added_qty=$nodes_qty
actual_node_qty=$expected_node_qty
else
added_qty=$(($initial_node_qty - $actual_node_qty))
actual_node_qty=$initial_node_qty
fi
for ((j=0; j<$added_qty; j++))
do
index=$((10#${element[0]} + $j))
index=0$index # FIXME What if there are more than 9 nodes?
for ((core=0; core<$cores; core++)) # FIXME What if the user asks for a spread distribution
do
initial_nodelist="${initial_nodelist:+$initial_nodelist,}"$common_node_name$index
done
done
done
#Print result
echo $initial_nodelist
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment