from copy import copy
from fdg.output_data import print_list, print_dict
from mythril.laser.ethereum.state.world_state import WorldState
# (a stray "[docs]" link marker from the HTML documentation export stood here; it is not code)
class InstructionModification():
"""
change the function dispatcher at the beginning part of the instruction list
"""
def __init__(self,ftn_identifier:dict):
self.function_identifier=ftn_identifier
self.contract_address=None # used to identity the instructions of the contract
self.instruction_list = []
self.functions_to_positions = {}
self.positions_to_instructions = {} # divide instructions into groups and use the positions to indicate the order
self.all_positions_related_function=[]
self.jumpdest_in_dispatcher=[]# record the number of jumpdest in the function dispatcher
self.address_jumpdest_revert_block=0
self.positions_contain_GT = [] # the positions are kept in this case as they involve in branches in dispatcher
[docs]
def feed_instructions(self, state:WorldState, contract_address):
"""
new way to group instructions.
:argument
"""
self.contract_address = contract_address
key = contract_address.value
code = state.accounts[key].code
self.instruction_list=code.instruction_list
# print_list(self.instruction_list, "instruction list")
if len(self.function_identifier)>0:
self._feed_instructions_update()
print(f'total instructions: {len(self.instruction_list)}')
def _feed_instructions(self):
"""
new way to group instructions.
:argument
"""
def record_position(position:int,instruction:dict):
self.positions_to_instructions[position] = []
# get the function's signature as a key
function_key = instruction['argument']
if len(function_key) == 8:
function_key = '0x00' + function_key[2:]
print(f'convert {instruction["argument"]} to {function_key}')
if function_key not in self.functions_to_positions.keys():
self.functions_to_positions[function_key] = [position]
else:
self.functions_to_positions[function_key] += [position]
# ============================================
# through PHSH4 and JUMPDEST to sperate instructions
offset_instr = 0
flag_status = 0 # change its value when "CALLDATASIZE" is met and the first PUSH after "CALLDATASIZE" is met
position = 0
self.positions_to_instructions[0] = []
self.address_jumpdest_revert_block = 0 # signal the end of function dispatcher
self.positions_contain_GT=[]
for instruction in self.instruction_list:
print(f'instruction:{instruction}')
opcode = instruction['opcode']
if str(opcode).__eq__('CALLDATASIZE'):
flag_status = 1 # ready to get address of JUMPDEST for the block of revert
elif str(opcode).startswith('PUSH'):
if flag_status==1:
print(f'flag_status==1:{instruction}')
jumpdest_address = int(instruction["argument"], 0)
if jumpdest_address > 0:
self.address_jumpdest_revert_block = jumpdest_address
if flag_status == 2:
if not str(instruction['argument']).__eq__('0xffffffff'):
if str(opcode).__eq__('PUSH4') or str(opcode).__eq__('PUSH3'):
if len(self.positions_to_instructions[position]) > 0:
if self.positions_to_instructions[position][-1]['opcode'] == 'DUP1':
# get a new key-value pair to hold instructions for the function whose signature specified by this PUSH instruction
position += 1
record_position(position, instruction)
elif str(opcode).__eq__('JUMPDEST'): # the entry to the revert block when call data size is less than 4(before the code of functions)
if flag_status == 2:
if self.address_jumpdest_revert_block == instruction["address"]:
# stop when reaching the end of function dispatcher
break
else:
position += 1
self.jumpdest_in_dispatcher.append((position,offset_instr))
self.positions_to_instructions[position] = []
elif str(opcode).__eq__(
'CALLDATALOAD'): # assume that CALLDATALOAD appears before funcion matching
flag_status=2 # ready to get matching instructions
elif str(opcode)in ['GT']:
if flag_status == 2:
self.positions_contain_GT.append(position)# get the positions that contain GT
# save the current instruction
self.positions_to_instructions[position] += [instruction]
offset_instr += 1
print_dict(self.positions_to_instructions,'instruction grouping in instruction modification')
# end of instruction iteration at the beginning section
last_position=position+1
# keep the last portion of instructions
self.positions_to_instructions[last_position] = self.instruction_list[offset_instr:]
# find all positions that are corresponding to functions
positions=[]
for p in self.functions_to_positions.values():
positions+=p
self.all_positions_related_function=positions
def _feed_instructions_update(self):
"""
new way to group instructions.
:argument
"""
def record_position(position:int,instruction:dict):
self.positions_to_instructions[position] = []
# get the function's signature as a key
function_key = get_back_functin_signature(instruction['argument'])
# if len(function_key) == 8:
# function_key = '0x00' + function_key[2:]
# print(f'convert {instruction["argument"]} to {function_key}')
if function_key not in self.functions_to_positions.keys():
self.functions_to_positions[function_key] = [position]
else:
self.functions_to_positions[function_key] += [position]
def get_back_functin_signature(byte_tuple)->str:
key='0x'
if len(byte_tuple)==3:
key+='00'
for item in byte_tuple:
v=hex(item)[2:]
key+=v if len(v)==2 else '0'+v
# print(f'{byte_tuple}=>{key}')
return key
def get_address_from_argument(int_tuple) -> int:
value = 0
exp = 0
for item in reversed(int_tuple):
value += item * 256 ** exp
exp += 1
return value
# ============================================
# through PHSH4 and JUMPDEST to sperate instructions
offset_instr = 0
flag_status = 0 # change its value when "CALLDATASIZE" is met and the first PUSH after "CALLDATASIZE" is met
position = 0
self.positions_to_instructions[0] = []
self.address_jumpdest_revert_block = 0 # signal the end of function dispatcher
self.positions_contain_GT=[]
for instruction in self.instruction_list:
# print(f'instruction:{instruction}')
opcode = instruction['opcode']
if str(opcode).__eq__('CALLDATASIZE'):
flag_status = 1 # ready to get address of JUMPDEST for the block of revert
elif str(opcode).startswith('PUSH'):
if flag_status==1:
jumpdest_address=get_address_from_argument( instruction["argument"])
if jumpdest_address> 0:
self.address_jumpdest_revert_block = jumpdest_address
if flag_status == 2:
if not get_back_functin_signature(instruction['argument']).__eq__('0xffffffff'):
if str(opcode).__eq__('PUSH4') or str(opcode).__eq__('PUSH3'):
if len(self.positions_to_instructions[position]) > 0:
if self.positions_to_instructions[position][-1]['opcode'] == 'DUP1':
# get a new key-value pair to hold instructions for the function whose signature specified by this PUSH instruction
position += 1
record_position(position, instruction)
elif str(opcode).__eq__('JUMPDEST'): # the entry to the revert block when call data size is less than 4(before the code of functions)
if flag_status == 2:
if self.address_jumpdest_revert_block == instruction["address"]:
# stop when reaching the end of function dispatcher
break
else:
position += 1
self.jumpdest_in_dispatcher.append((position,offset_instr))
self.positions_to_instructions[position] = []
elif str(opcode).__eq__(
'CALLDATALOAD'): # assume that CALLDATALOAD appears before funcion matching
flag_status=2 # ready to get matching instructions
elif str(opcode)in ['GT']:
if flag_status == 2:
self.positions_contain_GT.append(position)# get the positions that contain GT
# save the current instruction
self.positions_to_instructions[position] += [instruction]
offset_instr += 1
# print_dict(self.positions_to_instructions,'instruction grouping in instruction modification')
# end of instruction iteration at the beginning section
last_position=position+1
# keep the last portion of instructions
self.positions_to_instructions[last_position] = self.instruction_list[offset_instr:]
# find all positions that are corresponding to functions
positions=[]
for p in self.functions_to_positions.values():
positions+=p
self.all_positions_related_function=positions
[docs]
def modify_on_a_state__str(self, state: WorldState, functions: list):
"""
update the instructions on multiple states
"""
if len(functions)>=len(self.function_identifier.keys()):
final_instructions=self.instruction_list
state.accounts[self.contract_address.value].code.instruction_list = final_instructions
else:
fct_selectors=[]
for ftn in functions:
if ftn in ['fallback']:continue
if ftn not in self.function_identifier.keys():continue # can not do anything
fct_selectors.append(self.function_identifier[ftn])
final_instructions=self._get_modified_instructions_1(fct_selectors)
# update instructions for states
state.accounts[self.contract_address.value].code.instruction_list = final_instructions
state.accounts[self.contract_address.value].code.func_hashes = fct_selectors
[docs]
def modity_on_multiple_states(self,states:[WorldState],functions:list):
if 'original_instruction_list' in functions:
final_instructions = self.instruction_list
print(f'keep the original instruction list in instruction_modification.py')
for state in states:
state.accounts[self.contract_address.value].code.instruction_list = final_instructions
return
if len(functions) >= len(self.function_identifier.keys()):
final_instructions = self.instruction_list
for state in states:
state.accounts[self.contract_address.value].code.instruction_list = final_instructions
return
fct_selectors = []
for ftn in functions:
if ftn in ['fallback']: continue
if ftn not in self.function_identifier.keys(): continue # can not do anything
fct_selectors.append(self.function_identifier[ftn])
final_instructions = self._get_modified_instructions_1(fct_selectors)
for state in states:
# update instructions for states
state.accounts[self.contract_address.value].code.instruction_list = final_instructions
state.accounts[self.contract_address.value].code.func_hashes = fct_selectors
def _get_modified_instructions_1(self, fct_selectors: list) -> list:
"""
replace the matching instructions of other functions with EMPTY instruction
keep the matching instructions of the specified functions
* handle fallback() which has no selector
* make sure that the last "DUP1" is replaced when the max position of the kept functions is not the last function
"""
ftn_selectors_valid = [selector for selector in fct_selectors if selector in self.functions_to_positions.keys()]
# find positions not kept
# keep the functions having two positions(there are branches in the function dispatcher)
keep = copy(self.positions_contain_GT)
for ftn_selector in ftn_selectors_valid:
keep += self.functions_to_positions[ftn_selector]
not_kept = [p for p in self.all_positions_related_function if p not in keep]
# combine instruction groups
combined_instructions = []
for p in range(0,len(self.positions_to_instructions) - 1):
if p not in not_kept:
combined_instructions += self.positions_to_instructions[p]
else:
# replace with EMPTY instructions
empty_instructions = [] # do not remove them, it will cause inconsistency in terms of the total number of instructions
for instruction in self.positions_to_instructions[p]:
empty_instructions.append({"address": instruction["address"], "opcode": "EMPTY"})
combined_instructions += empty_instructions
# before combining the last instruction group (the instructions of regular functions)
# If the last non-EMPTY opcode is DUP,remove it in the already combined instructions
for index in range(len(combined_instructions) - 1, 0, -1):
if str(combined_instructions[index]['opcode']).__eq__('EMPTY'):
continue
else:
if str(combined_instructions[index]['opcode']).__eq__('DUP1'):
instruction = combined_instructions[index]
combined_instructions[index] = {"address": instruction["address"], "opcode": "EMPTY"}
break
#
for p,idx in self.jumpdest_in_dispatcher:
idx=idx-1
while True:
instruction=combined_instructions[idx]
opcode=instruction['opcode']
if str(opcode).__eq__('EMPTY'):
idx = idx - 1
continue
else:
if str(opcode) in ['JUMPDEST','PUSH4']:
break
if str(opcode).__eq__('DUP1'):
combined_instructions[idx] = {"address": instruction["address"], "opcode": "EMPTY"}
break
# print_list(combined_instructions)
combined_instructions += self.positions_to_instructions[len(self.positions_to_instructions) - 1]
return combined_instructions
[docs]
def modify_no_modification(self,state:WorldState):
state.accounts[self.contract_address.value].code.instruction_list = self.instruction_list