from copy import deepcopy
import fdg.global_config
from fdg.control.ftn_search_strategy import FunctionSearchStrategy
from fdg.fwrg_manager import FWRG_manager
from fdg.output_data import print_list, print_assigned_functions
from fdg.instruction_modification import InstructionModification
from fdg.preprocessing.preprocess import Preprocessing
from fdg.utils import get_key_1, get_ftn_seq_from_key_1
from mythril.laser.ethereum.state.world_state import WorldState
from mythril.laser.ethereum.svm import LaserEVM
from mythril.laser.plugin.plugins.dependency_pruner import get_ftn_seq_annotation_from_ws
[docs]
class Guider():
def __init__(self, ftn_search_strategy:FunctionSearchStrategy,functions:list):
self.ftn_search_strategy=ftn_search_strategy
self.instructionModification = InstructionModification(fdg.global_config.method_identifiers)
self.all_functions=functions
self.termination=False
self.genesis_states=[]
self.state_index=0 # used to form state keys
[docs]
def init(self, start_functions:list, depth_k_functions:list, preprocess:Preprocessing):
fwrg_manager=FWRG_manager(start_functions, depth_k_functions, preprocess)
if self.ftn_search_strategy.name in ['seq']:
self.ftn_search_strategy.initialize(fwrg_manager.acyclicPaths.main_paths_sf, fwrg_manager.updateFWRG.main_paths_df, fwrg_manager)
elif self.ftn_search_strategy.name in ['mine','bfs','dfs']:
flag_one_start_function=True if len(start_functions)==1 else False #to-do: how to update flag_one_state_dpeht1
if preprocess.coverage is None:
preprocess.coverage=0
self.ftn_search_strategy.initialize(flag_one_start_function,preprocess.timeout,preprocess.coverage,preprocess.write_read_info.all_functions,fwrg_manager)
[docs]
def organize_states(self, states:[WorldState]):
all_states = {}
for state in states:
ftn_seq = get_ftn_seq_annotation_from_ws(state)
if ftn_seq is None or len(ftn_seq) == 0:
key = get_key_1(['constructor'], self.state_index)
else:
# do not save states that are generated at maximum depth
if len(ftn_seq) >= fdg.global_config.seq_len_limit:
continue
key = get_key_1(ftn_seq, self.state_index)
# save state
all_states[key] = [state]
self.state_index += 1
return all_states
[docs]
def start_iteration(self, laserEVM:LaserEVM=None, dk_functions:list=None, iteration:int=0):
"""
prepare for states and functions to be executed on the states
:param laserEVM:
:param deep_function:
:param iteration:
:return:
"""
#============================================
if self.ftn_search_strategy.name in ['seq']:
if iteration == 1:
self.ftn_search_strategy.initialize()
organize_states_dict={'constructor':laserEVM.open_states}
states_functions,flag_world_state_del = self.ftn_search_strategy.assign_states(
dk_functions=None, fdfg=None, states_dict=organize_states_dict)
print_assigned_functions(states_functions)
self._prepare_states(laserEVM, organize_states_dict, states_functions,flag_world_state_del)
else:
organize_states_dict = self.organize_states(laserEVM.open_states)
states_functions,flag_world_state_del = self.ftn_search_strategy.assign_states(
dk_functions=None, fdfg=None,
states_dict=organize_states_dict)
organize_states_dict = {}
if len(states_functions)>0:
for key, function in states_functions.items():
if len(function) > 0:
organize_states_dict[key] = deepcopy(self.ftn_search_strategy.world_states[key])
print_assigned_functions(states_functions)
self._prepare_states(laserEVM, organize_states_dict, states_functions,flag_world_state_del)
return
# ============================================
if self.ftn_search_strategy.name in ['baseline']:
if iteration==1:
laserEVM.open_states = self.genesis_states
sequences= [[ftn] for ftn in self.all_functions]
print_list(sequences, f'current all sequence(s):')
else:
organize_states_dict = self.organize_states(laserEVM.open_states)
states_functions,flag_world_state_del = self.ftn_search_strategy.assign_states(
dk_functions=None, states_dict=organize_states_dict)
print_assigned_functions(states_functions)
if self.ftn_search_strategy.name in ['baseline']:
organize_states_dict = {}
for key, function in states_functions.items():
if len(function) > 0:
organize_states_dict[key] = deepcopy(self.ftn_search_strategy.world_states[key])
self._prepare_states(laserEVM, organize_states_dict, states_functions,False)
return
# =============== bfs,dfs,mine=============================
# prepare for depth 1 execution for deep function collection
if iteration==2 :
laserEVM.open_states=self.genesis_states
sequences=[[ftn] for ftn in self.all_functions]
print_list(sequences, f'current all sequence(s):')
else:
# when iteration >2
organize_states_dict=self.organize_states(laserEVM.open_states)
states_functions,flag_world_state_del=self.ftn_search_strategy.assign_states(dk_functions=dk_functions, states_dict=organize_states_dict, iteration=iteration)
if len(states_functions)==0:
# no functions will be executed
self.termination=True
# get the states for the state keys in data states_functions returned from assign_states
if self.ftn_search_strategy.name in ['dfs','mine','bfs','mine1']:
organize_states_dict = {}
for key,function in states_functions.items():
if len(function)>0:
organize_states_dict[key]=self.ftn_search_strategy.world_states[key]
print_assigned_functions(states_functions)
self._prepare_states(laserEVM, organize_states_dict, states_functions,flag_world_state_del)
def _prepare_states(self, laserEVM:LaserEVM, states_dict:dict, states_functions:dict,flag_wd_del:bool):
cur_iteration_all_sequences = []
# specify the functions to be executed on each open states(world states)
modified_states = []
for key, states in states_dict.items():
ftn_seq = get_ftn_seq_from_key_1(key)
next_functions = states_functions[key]
if len(next_functions) > 0:
# # save sequences to be executed
# for child in next_functions:
# cur_iteration_all_sequences.append(ftn_seq + [child])
# modify function dispatcher so that only specified functions are executed
to_modify_states = deepcopy(states)
self.instructionModification.modity_on_multiple_states(to_modify_states, next_functions)
modified_states += to_modify_states
# update the open states so that the states having no children nodes are removed
laserEVM.open_states = modified_states
if flag_wd_del:
# delete states
# print(f'\nbefore state delete:{self.ftn_search_strategy.world_states.keys()}')
for key in states_dict.keys():
self.ftn_search_strategy.delete_state(key)
# print(f'\n after state delete:{self.ftn_search_strategy.world_states.keys()}')
# print_list(cur_iteration_all_sequences, f'current all sequence(s):')
[docs]
def end_iteration(self,laserEVM:LaserEVM,iteration:int):
state_changing_seq = []
len_seq=0
for state in laserEVM.open_states:
seq = get_ftn_seq_annotation_from_ws(state)
if seq is None:continue
len_seq=len(seq)
if len_seq==0:continue
if seq not in state_changing_seq:
state_changing_seq.append(seq)
print_list(state_changing_seq, f'current state changing sequence(s):')
self.termination=self.ftn_search_strategy.termination(
states_num=len(laserEVM.open_states),
current_seq_length=len_seq,
sequence_depth_limit=fdg.global_config.seq_len_limit,
iteration=iteration)
[docs]
def get_start_sequence(self,laserEVM:LaserEVM):
"""
get the sequences used to annotate the states (world states) at the end of Phase 1
"""
state_chaning_seq = []
for state in laserEVM.open_states:
seq = get_ftn_seq_annotation_from_ws(state)
if seq not in state_chaning_seq:
state_chaning_seq.append(seq)
return state_chaning_seq
[docs]
def should_terminate(self):
return self.termination
[docs]
def save_genesis_states(self,states:list):
self.genesis_states=deepcopy(states)
if self.ftn_search_strategy.name in ['mine']:
states_dict=self.organize_states(states)
# if len(states_dict)>=2:
# print('Check when two or more genesis states are generated (guider.py)')
self.ftn_search_strategy.update_states(states_dict)
self.ftn_search_strategy.state_key_assigned_at_last =list(states_dict.keys())[0]