"""This module contains functions setting up and executing transactions with
symbolic values."""
import logging
from typing import Optional, List
import fdg.preprocessing.address_collection
from mythril.disassembler.disassembly import Disassembly
from mythril.laser.ethereum.cfg import Node, Edge, JumpType
from mythril.laser.ethereum.state.account import Account
from mythril.laser.ethereum.state.calldata import SymbolicCalldata
from mythril.laser.ethereum.state.world_state import WorldState
from mythril.laser.ethereum.transaction.transaction_models import (
MessageCallTransaction,
ContractCreationTransaction,
tx_id_manager,
BaseTransaction,
)
from mythril.laser.smt import symbol_factory, Or, Bool, BitVec
FUNCTION_HASH_BYTE_LENGTH = 4
log = logging.getLogger(__name__)
[docs]
class Actors:
def __init__(
self,
creator=0xAFFEAFFEAFFEAFFEAFFEAFFEAFFEAFFEAFFEAFFE,
attacker=0xDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF,
someguy=0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA,
):
self.addresses = {
"CREATOR": symbol_factory.BitVecVal(creator, 256),
"ATTACKER": symbol_factory.BitVecVal(attacker, 256),
"SOMEGUY": symbol_factory.BitVecVal(someguy, 256),
}
def __setitem__(self, actor: str, address: Optional[str]):
"""
Sets an actor to a desired address
:param actor: Name of the actor to set
:param address: Address to set the actor to. None to delete the actor
"""
if address is None:
if actor in ("CREATOR", "ATTACKER"):
raise ValueError("Can't delete creator or attacker address")
del self.addresses[actor]
else:
if address[0:2] != "0x":
raise ValueError("Actor address not in valid format")
self.addresses[actor] = symbol_factory.BitVecVal(int(address[2:], 16), 256)
def __getitem__(self, actor: str):
return self.addresses[actor]
@property
def creator(self):
return self.addresses["CREATOR"]
@property
def attacker(self):
return self.addresses["ATTACKER"]
def __len__(self):
return len(self.addresses)
ACTORS = Actors()
#@wei
fdg.preprocessing.address_collection.actors=ACTORS
[docs]
def generate_function_constraints(
calldata: SymbolicCalldata, func_hashes: List[List[int]]
) -> List[Bool]:
"""
This will generate constraints for fixing the function call part of calldata
:param calldata: Calldata
:param func_hashes: The list of function hashes allowed for this transaction
:return: Constraints List
"""
if len(func_hashes) == 0:
return []
constraints = []
for i in range(FUNCTION_HASH_BYTE_LENGTH):
constraint = Bool(False)
for func_hash in func_hashes:
if func_hash == -1:
# Fallback function without calldata
constraint = Or(constraint, calldata.size < 4)
elif func_hash == -2:
# Receive function
constraint = Or(constraint, calldata.size == 0)
else:
constraint = Or(
constraint, calldata[i] == symbol_factory.BitVecVal(func_hash[i], 8)
)
constraints.append(constraint)
return constraints
[docs]
def execute_message_call(
laser_evm, callee_address: BitVec, func_hashes: List[List[int]] = None
) -> None:
"""Executes a message call transaction from all open states.
:param laser_evm:
:param callee_address:
"""
# TODO: Resolve circular import between .transaction and ..svm to import LaserEVM here
open_states = laser_evm.open_states[:]
del laser_evm.open_states[:]
for open_world_state in open_states:
if open_world_state[callee_address].deleted:
log.debug("Can not execute dead contract, skipping.")
continue
next_transaction_id = tx_id_manager.get_next_tx_id()
external_sender = symbol_factory.BitVecSym(
"sender_{}".format(next_transaction_id), 256
)
calldata = SymbolicCalldata(next_transaction_id)
transaction = MessageCallTransaction(
world_state=open_world_state,
identifier=next_transaction_id,
gas_price=symbol_factory.BitVecSym(
"gas_price{}".format(next_transaction_id), 256
),
gas_limit=8000000, # block gas limit
origin=external_sender,
caller=external_sender,
callee_account=open_world_state[callee_address],
call_data=calldata,
call_value=symbol_factory.BitVecSym(
"call_value{}".format(next_transaction_id), 256
),
)
constraints = (
generate_function_constraints(calldata, func_hashes)
if func_hashes
else None
)
_setup_global_state_for_execution(laser_evm, transaction, constraints)
laser_evm.exec()
[docs]
def execute_message_call_preprocessing(laser_evm, callee_address: BitVec) -> None:
"""Executes a message call transaction from all open states.
:param laser_evm:
:param callee_address:
"""
# TODO: Resolve circular import between .transaction and ..svm to import LaserEVM here
open_states = laser_evm.open_states[:]
del laser_evm.open_states[:]
for open_world_state in open_states:
if open_world_state[callee_address].deleted:
log.debug("Can not execute dead contract, skipping.")
continue
# next_transaction_id = get_next_transaction_id() (from old version)
next_transaction_id =1
external_sender = symbol_factory.BitVecSym(
"sender_{}".format(next_transaction_id), 256
)
transaction = MessageCallTransaction(
world_state=open_world_state,
identifier=next_transaction_id,
gas_price=symbol_factory.BitVecSym(
"gas_price{}".format(next_transaction_id), 256
),
gas_limit=8000000, # block gas limit
origin=external_sender,
caller=external_sender,
callee_account=open_world_state[callee_address],
call_data=SymbolicCalldata(next_transaction_id),
call_value=symbol_factory.BitVecSym(
"call_value{}".format(next_transaction_id), 256
),
)
_setup_global_state_for_execution(laser_evm, transaction)
laser_evm.exec_preprocessing()
[docs]
def execute_contract_creation(
laser_evm,
contract_initialization_code,
contract_name=None,
world_state=None,
origin=ACTORS["CREATOR"],
caller=ACTORS["CREATOR"],
) -> Account:
"""Executes a contract creation transaction from all open states.
:param laser_evm:
:param contract_initialization_code:
:param contract_name:
:return:
"""
world_state = world_state or WorldState()
open_states = [world_state]
del laser_evm.open_states[:]
new_account = None
for open_world_state in open_states:
next_transaction_id = tx_id_manager.get_next_tx_id()
# call_data "should" be '[]', but it is easier to model the calldata symbolically
# and add logic in codecopy/codesize/calldatacopy/calldatasize than to model code "correctly"
transaction = ContractCreationTransaction(
world_state=open_world_state,
identifier=next_transaction_id,
gas_price=symbol_factory.BitVecSym(
"gas_price{}".format(next_transaction_id), 256
),
gas_limit=8000000, # block gas limit
origin=origin,
code=Disassembly(contract_initialization_code),
caller=caller,
contract_name=contract_name,
call_data=None,
call_value=symbol_factory.BitVecSym(
"call_value{}".format(next_transaction_id), 256
),
)
_setup_global_state_for_execution(laser_evm, transaction)
new_account = new_account or transaction.callee_account
laser_evm.exec(True)
return new_account
def _setup_global_state_for_execution(
laser_evm,
transaction: BaseTransaction,
initial_constraints: Optional[List[Bool]] = None,
) -> None:
"""Sets up global state and cfg for a transactions execution.
:param laser_evm:
:param transaction:
"""
# TODO: Resolve circular import between .transaction and ..svm to import LaserEVM here
global_state = transaction.initial_global_state()
global_state.transaction_stack.append((transaction, None))
global_state.world_state.constraints += initial_constraints or []
global_state.world_state.constraints.append(
Or(*[transaction.caller == actor for actor in ACTORS.addresses.values()])
)
#@wei remove constraints as there is no constraint solving in preprocesing
if fdg.global_config.flag_preprocessing:
global_state.world_state.constraints=[]
new_node = Node(
global_state.environment.active_account.contract_name,
function_name=global_state.environment.active_function_name,
)
if laser_evm.requires_statespace:
laser_evm.nodes[new_node.uid] = new_node
if transaction.world_state.node:
if laser_evm.requires_statespace:
laser_evm.edges.append(
Edge(
transaction.world_state.node.uid,
new_node.uid,
edge_type=JumpType.Transaction,
condition=None,
)
)
new_node.constraints = global_state.world_state.constraints
global_state.world_state.transaction_sequence.append(transaction)
global_state.node = new_node
new_node.states.append(global_state)
laser_evm.work_list.append(global_state)
[docs]
def execute_transaction(*args, **kwargs):
"""
Chooses the transaction type based on callee address and
executes the transaction
"""
laser_evm = args[0]
if kwargs["callee_address"] == "":
for ws in laser_evm.open_states[:]:
execute_contract_creation(
laser_evm=laser_evm,
contract_initialization_code=kwargs["data"],
world_state=ws,
)
return
execute_message_call(
laser_evm=laser_evm,
callee_address=symbol_factory.BitVecVal(int(kwargs["callee_address"], 16), 256),
)