"""This module contains a representation class for EVM instructions and
transitions between them."""
import logging
import time
from copy import copy, deepcopy
from typing import cast, Callable, List, Union, Tuple
from fdg.expression_slot import map_concrete_hash_key_to_slot_normal
from mythril.exceptions import UnsatError
from mythril.laser.smt import (
Extract,
Expression,
UDiv,
simplify,
Concat,
ULT,
UGT,
BitVec,
is_false,
URem,
SRem,
If,
Bool,
Not,
LShR,
UGE,
)
from mythril.laser.smt import symbol_factory
from mythril.disassembler.disassembly import Disassembly
from mythril.laser.ethereum.state.calldata import ConcreteCalldata, SymbolicCalldata
import mythril.laser.ethereum.util as helper
from mythril.laser.ethereum import util
from mythril.laser.ethereum.function_managers import (
keccak_function_manager,
exponent_function_manager,
)
from mythril.laser.ethereum.call import (
get_call_parameters,
native_call,
get_call_data,
SYMBOLIC_CALLDATA_SIZE,
)
from mythril.laser.ethereum.evm_exceptions import (
VmException,
StackUnderflowException,
InvalidJumpDestination,
InvalidInstruction,
OutOfGasException,
WriteProtection,
)
from mythril.laser.ethereum.instruction_data import get_opcode_gas, calculate_sha3_gas
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.return_data import ReturnData
from mythril.laser.ethereum.transaction import (
MessageCallTransaction,
TransactionStartSignal,
ContractCreationTransaction,
tx_id_manager,
)
from mythril.support.model import get_model
from mythril.support.support_utils import get_code_hash
from mythril.support.loader import DynLoader
import fdg
from mythril.laser.smt.expression import simplify_yes
log = logging.getLogger(__name__)
TT256 = symbol_factory.BitVecVal(0, 256)
TT256M1 = symbol_factory.BitVecVal(2**256 - 1, 256)
[docs]
def transfer_ether(
global_state: GlobalState,
sender: BitVec,
receiver: BitVec,
value: Union[int, BitVec],
):
"""
Perform an Ether transfer between two accounts
:param global_state: The global state in which the Ether transfer occurs
:param sender: The sender of the Ether
:param receiver: The recipient of the Ether
:param value: The amount of Ether to send
:return:
"""
value = value if isinstance(value, BitVec) else symbol_factory.BitVecVal(value, 256)
global_state.world_state.constraints.append(
UGE(global_state.world_state.balances[sender], value)
)
global_state.world_state.balances[receiver] += value
global_state.world_state.balances[sender] -= value
[docs]
class StateTransition(object):
"""Decorator that handles global state copy and original return.
This decorator calls the decorated instruction mutator function on a
copy of the state that is passed to it. After the call, the
resulting new states' program counter is automatically incremented
if `increment_pc=True`.
"""
def __init__(
self, increment_pc=True, enable_gas=True, is_state_mutation_instruction=False
):
"""
:param increment_pc:
:param enable_gas:
:param is_state_mutation_instruction: The function mutates state
"""
self.increment_pc = increment_pc
self.enable_gas = enable_gas
self.is_state_mutation_instruction = is_state_mutation_instruction
[docs]
@staticmethod
def call_on_state_copy(func: Callable, func_obj: "Instruction", state: GlobalState):
"""
:param func:
:param func_obj:
:param state:
:return:
"""
global_state_copy = copy(state)
return func(func_obj, global_state_copy)
[docs]
def increment_states_pc(self, states: List[GlobalState]) -> List[GlobalState]:
"""
:param states:
:return:
"""
if self.increment_pc:
for state in states:
state.mstate.pc += 1
return states
[docs]
@staticmethod
def check_gas_usage_limit(global_state: GlobalState):
"""
:param global_state:
:return:
"""
global_state.mstate.check_gas()
if isinstance(global_state.current_transaction.gas_limit, BitVec):
value = global_state.current_transaction.gas_limit.value
if value is None:
return
global_state.current_transaction.gas_limit = value
if (
global_state.mstate.min_gas_used
>= global_state.current_transaction.gas_limit
):
raise OutOfGasException()
[docs]
def accumulate_gas(self, global_state: GlobalState):
"""
:param global_state:
:return:
"""
if not self.enable_gas:
return global_state
opcode = global_state.instruction["opcode"]
min_gas, max_gas = get_opcode_gas(opcode)
global_state.mstate.min_gas_used += min_gas
global_state.mstate.max_gas_used += max_gas
self.check_gas_usage_limit(global_state)
return global_state
def __call__(self, func: Callable) -> Callable:
def wrapper(
func_obj: "Instruction", global_state: GlobalState
) -> List[GlobalState]:
"""
:param func_obj:
:param global_state:
:return:
"""
if self.is_state_mutation_instruction and global_state.environment.static:
raise WriteProtection(
"The function {} cannot be executed in a static call".format(
func.__name__[:-1]
)
)
new_global_states = self.call_on_state_copy(func, func_obj, global_state)
new_global_states = [
self.accumulate_gas(state) for state in new_global_states
]
return self.increment_states_pc(new_global_states)
return wrapper
[docs]
class Instruction:
"""Instruction class is used to mutate a state according to the current
instruction."""
def __init__(
self,
op_code: str,
dynamic_loader: DynLoader,
pre_hooks: List[Callable] = None,
post_hooks: List[Callable] = None,
) -> None:
"""
:param op_code:
:param dynamic_loader:
:param iprof:
"""
self.dynamic_loader = dynamic_loader
self.op_code = op_code.upper()
self.pre_hook = pre_hooks if pre_hooks else []
self.post_hook = post_hooks if post_hooks else []
def _execute_pre_hooks(self, global_state: GlobalState):
for hook in self.pre_hook:
hook(global_state)
def _execute_post_hooks(self, global_state: GlobalState):
for hook in self.post_hook:
hook(global_state)
[docs]
def evaluate(self, global_state: GlobalState, post=False) -> List[GlobalState]:
"""Performs the mutation for this instruction.
:param global_state:
:param post:
:return:
"""
# Generalize some ops
log.debug("Evaluating %s at %i", self.op_code, global_state.mstate.pc)
op = self.op_code.lower()
if self.op_code.startswith("PUSH"):
op = "push"
elif self.op_code.startswith("DUP"):
op = "dup"
elif self.op_code.startswith("SWAP"):
op = "swap"
elif self.op_code.startswith("LOG"):
op = "log"
elif self.op_code.startswith("EMPTY"):#@wei: to handle opcode "-"
op = "empty"
instruction_mutator = (
getattr(self, op + "_", None)
if not post
else getattr(self, op + "_" + "post", None)
)
if instruction_mutator is None:
raise NotImplementedError
self._execute_pre_hooks(global_state)
result = instruction_mutator(global_state)
self._execute_post_hooks(global_state)
return result
[docs]
@StateTransition()
def jumpdest_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
return [global_state]
[docs]
@StateTransition()
def push_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
push_instruction = global_state.get_current_instruction()
push_value = push_instruction["argument"]
try:
length_of_value = 2 * int(push_instruction["opcode"][4:])
except ValueError:
raise VmException("Invalid Push instruction")
if type(push_value) == tuple:
if type(push_value[0]) == int:
new_value = symbol_factory.BitVecVal(push_value[0], 8)
else:
new_value = push_value[0]
if len(push_value) > 1:
for val in push_value[1:]:
if type(val) == int:
new_value = Concat(new_value, symbol_factory.BitVecVal(val, 8))
else:
new_value = Concat(new_value, val)
pad_length = length_of_value // 2 - len(push_value)
if pad_length > 0:
new_value = Concat(new_value, symbol_factory.BitVecVal(0, pad_length))
if new_value.size() < 256:
new_value = Concat(
symbol_factory.BitVecVal(0, 256 - new_value.size()), new_value
)
# global_state.mstate.stack.append(new_value)
global_state.mstate.stack.append(simplify_yes(new_value))
else:
push_value += "0" * max(length_of_value - (len(push_value) - 2), 0)
# global_state.mstate.stack.append(
# symbol_factory.BitVecVal(int(push_value, 16), 256)
# )
global_state.mstate.stack.append(
simplify_yes(symbol_factory.BitVecVal(int(push_value, 16), 256))
)
return [global_state]
[docs]
@StateTransition()
def dup_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
value = int(global_state.get_current_instruction()["opcode"][3:], 10)
global_state.mstate.stack.append(global_state.mstate.stack[-value])
return [global_state]
[docs]
@StateTransition()
def empty_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
return [global_state]
[docs]
@StateTransition()
def swap_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
depth = int(self.op_code[4:])
stack = global_state.mstate.stack
stack[-depth - 1], stack[-1] = stack[-1], stack[-depth - 1]
return [global_state]
[docs]
@StateTransition()
def pop_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.pop()
return [global_state]
[docs]
@StateTransition()
def and_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
stack = global_state.mstate.stack
op1, op2 = stack.pop(), stack.pop()
if isinstance(op1, Bool):
op1 = If(
op1, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
if isinstance(op2, Bool):
op2 = If(
op2, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
if not isinstance(op1, Expression):
op1 = symbol_factory.BitVecVal(op1, 256)
if not isinstance(op2, Expression):
op2 = symbol_factory.BitVecVal(op2, 256)
stack.append(op1 & op2)
return [global_state]
[docs]
@StateTransition()
def or_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
stack = global_state.mstate.stack
op1, op2 = stack.pop(), stack.pop()
if isinstance(op1, Bool):
op1 = If(
op1, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
if isinstance(op2, Bool):
op2 = If(
op2, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
stack.append(op1 | op2)
return [global_state]
[docs]
@StateTransition()
def xor_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
mstate = global_state.mstate
mstate.stack.append(mstate.stack.pop() ^ mstate.stack.pop())
return [global_state]
[docs]
@StateTransition()
def not_(self, global_state: GlobalState):
"""
:param global_state:
:return:
"""
mstate = global_state.mstate
mstate.stack.append(TT256M1 - mstate.stack.pop())
return [global_state]
[docs]
@StateTransition()
def byte_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
mstate = global_state.mstate
op0, op1 = mstate.stack.pop(), mstate.stack.pop()
if not isinstance(op1, Expression):
op1 = symbol_factory.BitVecVal(op1, 256)
try:
index = util.get_concrete_int(op0)
offset = (31 - index) * 8
if offset >= 0:
result = simplify(
Concat(
symbol_factory.BitVecVal(0, 248),
Extract(offset + 7, offset, op1),
)
) # type: Union[int, Expression]
else:
result = 0
except TypeError:
log.debug("BYTE: Unsupported symbolic byte offset")
result = global_state.new_bitvec(
str(simplify(op1)) + "[" + str(simplify(op0)) + "]", 256
)
mstate.stack.append(result)
return [global_state]
# Arithmetic
[docs]
@StateTransition()
def add_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(
(
helper.pop_bitvec(global_state.mstate)
+ helper.pop_bitvec(global_state.mstate)
)
)
return [global_state]
[docs]
@StateTransition()
def sub_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(
(
helper.pop_bitvec(global_state.mstate)
- helper.pop_bitvec(global_state.mstate)
)
)
return [global_state]
[docs]
@StateTransition()
def mul_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(
(
helper.pop_bitvec(global_state.mstate)
* helper.pop_bitvec(global_state.mstate)
)
)
return [global_state]
[docs]
@StateTransition()
def div_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
op0, op1 = (
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
)
if op1 == 0:
global_state.mstate.stack.append(symbol_factory.BitVecVal(0, 256))
else:
global_state.mstate.stack.append(UDiv(op0, op1))
return [global_state]
[docs]
@StateTransition()
def sdiv_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
s0, s1 = (
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
)
if s1 == 0:
global_state.mstate.stack.append(symbol_factory.BitVecVal(0, 256))
else:
global_state.mstate.stack.append(s0 / s1)
return [global_state]
[docs]
@StateTransition()
def mod_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
s0, s1 = (
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
)
global_state.mstate.stack.append(0 if s1 == 0 else URem(s0, s1))
return [global_state]
[docs]
@StateTransition()
def shl_(self, global_state: GlobalState) -> List[GlobalState]:
shift, value = (
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
)
global_state.mstate.stack.append(value << shift)
return [global_state]
[docs]
@StateTransition()
def shr_(self, global_state: GlobalState) -> List[GlobalState]:
shift, value = (
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
)
global_state.mstate.stack.append(LShR(value, shift))
return [global_state]
[docs]
@StateTransition()
def sar_(self, global_state: GlobalState) -> List[GlobalState]:
shift, value = (
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
)
global_state.mstate.stack.append(value >> shift)
return [global_state]
[docs]
@StateTransition()
def smod_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
s0, s1 = (
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
)
global_state.mstate.stack.append(0 if s1 == 0 else SRem(s0, s1))
return [global_state]
[docs]
@StateTransition()
def addmod_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
s0, s1, s2 = (
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
)
global_state.mstate.stack.append(URem(URem(s0, s2) + URem(s1, s2), s2))
return [global_state]
[docs]
@StateTransition()
def mulmod_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
s0, s1, s2 = (
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
)
global_state.mstate.stack.append(URem(URem(s0, s2) * URem(s1, s2), s2))
return [global_state]
[docs]
@StateTransition()
def exp_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
base, exponent = util.pop_bitvec(state), util.pop_bitvec(state)
exponentiation, constraint = exponent_function_manager.create_condition(
base, exponent
)
state.stack.append(exponentiation)
global_state.world_state.constraints.append(constraint)
return [global_state]
[docs]
@StateTransition()
def signextend_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
mstate = global_state.mstate
s0, s1 = mstate.stack.pop(), mstate.stack.pop()
testbit = s0 * symbol_factory.BitVecVal(8, 256) + symbol_factory.BitVecVal(
7, 256
)
set_testbit = symbol_factory.BitVecVal(1, 256) << testbit
sign_bit_set = simplify(Not(s1 & set_testbit == 0))
mstate.stack.append(
simplify(
If(
s0 <= 31,
If(
sign_bit_set, s1 | (TT256 - set_testbit), s1 & (set_testbit - 1)
),
s1,
)
)
)
return [global_state]
# Comparisons
[docs]
@StateTransition()
def lt_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
exp = ULT(util.pop_bitvec(state), util.pop_bitvec(state))
state.stack.append(exp)
return [global_state]
[docs]
@StateTransition()
def gt_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
op1, op2 = util.pop_bitvec(state), util.pop_bitvec(state)
exp = UGT(op1, op2)
state.stack.append(exp)
return [global_state]
[docs]
@StateTransition()
def slt_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
exp = util.pop_bitvec(state) < util.pop_bitvec(state)
state.stack.append(exp)
return [global_state]
[docs]
@StateTransition()
def sgt_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
exp = util.pop_bitvec(state) > util.pop_bitvec(state)
state.stack.append(exp)
return [global_state]
[docs]
@StateTransition()
def eq_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
op1 = state.stack.pop()
op2 = state.stack.pop()
if isinstance(op1, Bool):
op1 = If(
op1, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
if isinstance(op2, Bool):
op2 = If(
op2, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
exp = op1 == op2
state.stack.append(exp)
return [global_state]
[docs]
@StateTransition()
def iszero_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
val = state.stack.pop()
exp = Not(val) if isinstance(val, Bool) else val == 0
exp = If(
exp, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
state.stack.append(simplify(exp))
return [global_state]
# Call data
[docs]
@StateTransition()
def callvalue_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
environment = global_state.environment
state.stack.append(environment.callvalue)
return [global_state]
[docs]
@StateTransition()
def calldataload_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
environment = global_state.environment
op0 = state.stack.pop()
value = environment.calldata.get_word_at(op0)
state.stack.append(value)
return [global_state]
[docs]
@StateTransition()
def calldatasize_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.environment.calldata.calldatasize)
return [global_state]
@staticmethod
def _calldata_copy_helper(global_state, mstate, mstart, dstart, size):
environment = global_state.environment
try:
mstart = util.get_concrete_int(mstart)
except TypeError:
log.debug("Unsupported symbolic memory offset in CALLDATACOPY")
return [global_state]
try:
dstart = util.get_concrete_int(dstart) # type: Union[int, BitVec]
except TypeError:
log.debug("Unsupported symbolic calldata offset in CALLDATACOPY")
dstart = simplify(dstart)
try:
size = util.get_concrete_int(size) # type: Union[int, BitVec]
except TypeError:
log.debug("Unsupported symbolic size in CALLDATACOPY")
size = SYMBOLIC_CALLDATA_SIZE # The excess size will get overwritten
size = cast(int, size)
if size > 0:
try:
mstate.mem_extend(mstart, size)
except TypeError as e:
log.debug("Memory allocation error: {}".format(e))
mstate.mem_extend(mstart, 1)
mstate.memory[mstart] = global_state.new_bitvec(
"calldata_"
+ str(environment.active_account.contract_name)
+ "["
+ str(dstart)
+ ": + "
+ str(size)
+ "]",
8,
)
return [global_state]
try:
i_data = dstart
new_memory = []
for i in range(size):
value = environment.calldata[i_data]
new_memory.append(value)
i_data = (
i_data + 1
if isinstance(i_data, int)
else simplify(cast(BitVec, i_data) + 1)
)
for i in range(len(new_memory)):
mstate.memory[i + mstart] = new_memory[i]
except IndexError:
log.debug("Exception copying calldata to memory")
mstate.memory[mstart] = global_state.new_bitvec(
"calldata_"
+ str(environment.active_account.contract_name)
+ "["
+ str(dstart)
+ ": + "
+ str(size)
+ "]",
8,
)
return [global_state]
[docs]
@StateTransition()
def calldatacopy_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
op0, op1, op2 = state.stack.pop(), state.stack.pop(), state.stack.pop()
if isinstance(global_state.current_transaction, ContractCreationTransaction):
log.debug("Attempt to use CALLDATACOPY in creation transaction")
return [global_state]
return self._calldata_copy_helper(global_state, state, op0, op1, op2)
# Environment
[docs]
@StateTransition()
def address_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
environment = global_state.environment
state.stack.append(environment.address)
return [global_state]
[docs]
@StateTransition()
def balance_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
address = state.stack.pop()
onchain_access = True
if address.symbolic is False:
try:
balance = global_state.world_state.accounts_exist_or_load(
address.value, self.dynamic_loader
).balance()
except ValueError:
onchain_access = False
else:
onchain_access = False
if onchain_access is False:
balance = symbol_factory.BitVecVal(0, 256)
for account in global_state.world_state.accounts.values():
balance = If(address == account.address, account.balance(), balance)
state.stack.append(balance)
return [global_state]
[docs]
@StateTransition()
def origin_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
environment = global_state.environment
state.stack.append(environment.origin)
return [global_state]
[docs]
@StateTransition()
def caller_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
environment = global_state.environment
state.stack.append(environment.sender)
return [global_state]
[docs]
@StateTransition()
def chainid_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.environment.chainid)
return [global_state]
[docs]
@StateTransition()
def selfbalance_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
balance = global_state.environment.active_account.balance()
global_state.mstate.stack.append(balance)
return [global_state]
[docs]
@StateTransition()
def codesize_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
environment = global_state.environment
disassembly = environment.code
calldata = global_state.environment.calldata
if isinstance(global_state.current_transaction, ContractCreationTransaction):
# Hacky way to ensure constructor arguments work - Pick some reasonably large size.
no_of_bytes = len(disassembly.bytecode) // 2
if isinstance(calldata, ConcreteCalldata):
no_of_bytes += calldata.size
else:
no_of_bytes += 0x200 # space for 16 32-byte arguments
global_state.world_state.constraints.append(
global_state.environment.calldata.size == no_of_bytes
)
else:
no_of_bytes = len(disassembly.bytecode) // 2
state.stack.append(no_of_bytes)
return [global_state]
@staticmethod
def _sha3_gas_helper(global_state, length):
min_gas, max_gas = calculate_sha3_gas(length)
global_state.mstate.min_gas_used += min_gas
global_state.mstate.max_gas_used += max_gas
StateTransition.check_gas_usage_limit(global_state)
return global_state
[docs]
@StateTransition(enable_gas=False)
def sha3_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
index, op1 = state.stack.pop(), state.stack.pop()
try:
length = util.get_concrete_int(op1)
except TypeError:
# Can't access symbolic memory offsets
length = 64
global_state.world_state.constraints.append(op1 == length)
Instruction._sha3_gas_helper(global_state, length)
state.mem_extend(index, length)
data_list = [
b if isinstance(b, BitVec) else symbol_factory.BitVecVal(b, 8)
for b in state.memory[index : index + length]
]
if len(data_list) > 1:
data = simplify(Concat(data_list))
# still need to collect the data when data is a concrete value.
# @wei
if fdg.global_config.flag_preprocessing or fdg.global_config.tx_len == 0:
fdg.preprocessing.slot_location.map_concrete_hash_key_to_slot(
data,data_list[-1])
# @wei
if not fdg.global_config.flag_preprocessing:
# collect the map from data to concrete hash in the normal execution
map_concrete_hash_key_to_slot_normal(data, data_list[-1])
# # # @wei
# # print(f'_=_=_')
# # print(f'in sha3_() of instructions.py: ')
# # print(f'slot index: {data_list[-1]} ')
# # print(f'simplified data: {data}\n')
#
# seconds_start = time.time()
#
# if fdg.global_config.flag_preprocessing or fdg.global_config.tx_len == 0:
# fdg.preprocessing.slot_location.map_key_to_slot(Concat(data_list), data_list)
# seconds_end = time.time()
# print(f'#@time sha3')
# print(f'time used(s):{seconds_end - seconds_start}')
elif len(data_list) == 1:
data = data_list[0]
# # # @wei
# # print(f'_=_=_')
# # print(f'in sha3_() of instructions.py: ')
# # print(f'slot index: {data_list[-1]} ')
# # print(f'simplified data: {data}')
#
# if fdg.global_config.flag_preprocessing or fdg.global_config.tx_len == 0:
# try:
# # sim_data = simplify_yes(data_list[0])
# fdg.preprocessing.slot_location.map_key_to_slot(data_list[0], data_list)
# print(f'instructions.py: sim_data: {data_list[0]}')
# except:
# print(f'instructions.py: has a problem to collect data')
else:
# @wei
print(f'in sha3_() of instructions.py: get empty keccak hash')
# TODO: handle finding x where func(x)==func("")
result = keccak_function_manager.get_empty_keccak_hash()
state.stack.append(result)
return [global_state]
result = keccak_function_manager.create_keccak(data)
# if fdg.global_config.flag_preprocessing or fdg.global_config.tx_len == 0:
# fdg.preprocessing.slot_location.map_key_to_slot(result, data_list)
state.stack.append(result)
return [global_state]
[docs]
@StateTransition()
def gasprice_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.environment.gasprice)
return [global_state]
[docs]
@StateTransition()
def basefee_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.environment.basefee)
return [global_state]
[docs]
@StateTransition()
def codecopy_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
memory_offset, code_offset, size = (
global_state.mstate.stack.pop(),
global_state.mstate.stack.pop(),
global_state.mstate.stack.pop(),
)
code = global_state.environment.code.bytecode
if code[0:2] == "0x":
code = code[2:]
code_size = len(code) // 2
if isinstance(global_state.current_transaction, ContractCreationTransaction):
# Treat creation code after the expected disassembly as calldata.
# This is a slightly hacky way to ensure that symbolic constructor
# arguments work correctly.
mstate = global_state.mstate
offset = code_offset - code_size
log.debug("Copying from code offset: {} with size: {}".format(offset, size))
if isinstance(global_state.environment.calldata, SymbolicCalldata):
if code_offset >= code_size:
return self._calldata_copy_helper(
global_state, mstate, memory_offset, offset, size
)
else:
# Copy from both code and calldata appropriately.
concrete_code_offset = helper.get_concrete_int(code_offset)
concrete_size = helper.get_concrete_int(size)
code_copy_offset = concrete_code_offset
code_copy_size = (
concrete_size
if concrete_code_offset + concrete_size <= code_size
else code_size - concrete_code_offset
)
code_copy_size = code_copy_size if code_copy_size >= 0 else 0
calldata_copy_offset = (
concrete_code_offset - code_size
if concrete_code_offset - code_size > 0
else 0
)
calldata_copy_size = concrete_code_offset + concrete_size - code_size
calldata_copy_size = (
calldata_copy_size if calldata_copy_size >= 0 else 0
)
[global_state] = self._code_copy_helper(
code=global_state.environment.code.bytecode,
memory_offset=memory_offset,
code_offset=code_copy_offset,
size=code_copy_size,
op="CODECOPY",
global_state=global_state,
)
return self._calldata_copy_helper(
global_state=global_state,
mstate=mstate,
mstart=memory_offset + code_copy_size,
dstart=calldata_copy_offset,
size=calldata_copy_size,
)
return self._code_copy_helper(
code=global_state.environment.code.bytecode,
memory_offset=memory_offset,
code_offset=code_offset,
size=size,
op="CODECOPY",
global_state=global_state,
)
[docs]
@StateTransition()
def extcodesize_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
addr = state.stack.pop()
try:
addr = hex(helper.get_concrete_int(addr))
except TypeError:
log.debug("unsupported symbolic address for EXTCODESIZE")
state.stack.append(global_state.new_bitvec("extcodesize_" + str(addr), 256))
return [global_state]
try:
code = global_state.world_state.accounts_exist_or_load(
addr, self.dynamic_loader
).code.bytecode
except (ValueError, AttributeError) as e:
log.debug("error accessing contract storage due to: " + str(e))
state.stack.append(global_state.new_bitvec("extcodesize_" + str(addr), 256))
return [global_state]
state.stack.append(len(code) // 2)
return [global_state]
@staticmethod
def _code_copy_helper(
code: Union[str, Tuple],
memory_offset: Union[int, BitVec],
code_offset: Union[int, BitVec],
size: Union[int, BitVec],
op: str,
global_state: GlobalState,
) -> List[GlobalState]:
try:
concrete_memory_offset = helper.get_concrete_int(memory_offset)
except TypeError:
log.debug("Unsupported symbolic memory offset in {}".format(op))
return [global_state]
try:
concrete_size = helper.get_concrete_int(size)
global_state.mstate.mem_extend(concrete_memory_offset, concrete_size)
except TypeError:
# except both attribute error and Exception
global_state.mstate.mem_extend(concrete_memory_offset, 1)
global_state.mstate.memory[
concrete_memory_offset
] = global_state.new_bitvec(
"code({})".format(
global_state.environment.active_account.contract_name
),
8,
)
return [global_state]
try:
concrete_code_offset = helper.get_concrete_int(code_offset)
except TypeError:
log.debug("Unsupported symbolic code offset in {}".format(op))
global_state.mstate.mem_extend(concrete_memory_offset, concrete_size)
for i in range(concrete_size):
global_state.mstate.memory[
concrete_memory_offset + i
] = global_state.new_bitvec(
"code({})".format(
global_state.environment.active_account.contract_name
),
8,
)
return [global_state]
if code[0:2] == "0x":
code = code[2:]
for i in range(concrete_size):
if isinstance(code, str):
if 2 * (concrete_code_offset + i + 1) > len(code):
break
global_state.mstate.memory[concrete_memory_offset + i] = int(
code[
2
* (concrete_code_offset + i) : 2
* (concrete_code_offset + i + 1)
],
16,
)
else:
if (concrete_code_offset + i + 1) > len(code):
break
global_state.mstate.memory[concrete_memory_offset + i] = code[
concrete_code_offset + i
]
return [global_state]
[docs]
@StateTransition()
def extcodecopy_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
addr, memory_offset, code_offset, size = (
state.stack.pop(),
state.stack.pop(),
state.stack.pop(),
state.stack.pop(),
)
try:
addr = hex(helper.get_concrete_int(addr))
except TypeError:
log.debug("unsupported symbolic address for EXTCODECOPY")
return [global_state]
try:
code = global_state.world_state.accounts_exist_or_load(
addr, self.dynamic_loader
).code.bytecode
except (ValueError, AttributeError) as e:
log.debug("error accessing contract storage due to: " + str(e))
return [global_state]
return self._code_copy_helper(
code=code,
memory_offset=memory_offset,
code_offset=code_offset,
size=size,
op="EXTCODECOPY",
global_state=global_state,
)
[docs]
@StateTransition()
def extcodehash_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return: List of global states possible, list of size 1 in this case
"""
world_state = global_state.world_state
stack = global_state.mstate.stack
address = Extract(159, 0, stack.pop())
if address.symbolic:
code_hash = symbol_factory.BitVecVal(int(get_code_hash(""), 16), 256)
elif address.value not in world_state.accounts:
code_hash = symbol_factory.BitVecVal(0, 256)
else:
addr = "0" * (40 - len(hex(address.value)[2:])) + hex(address.value)[2:]
code = world_state.accounts_exist_or_load(
addr, self.dynamic_loader
).code.bytecode
code_hash = symbol_factory.BitVecVal(int(get_code_hash(code), 16), 256)
stack.append(code_hash)
return [global_state]
[docs]
@StateTransition()
def returndatacopy_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
memory_offset, return_offset, size = (
state.stack.pop(),
state.stack.pop(),
state.stack.pop(),
)
try:
concrete_memory_offset = helper.get_concrete_int(memory_offset)
except TypeError:
log.debug("Unsupported symbolic memory offset in RETURNDATACOPY")
return [global_state]
try:
concrete_return_offset = helper.get_concrete_int(return_offset)
except TypeError:
log.debug("Unsupported symbolic return offset in RETURNDATACOPY")
return [global_state]
try:
concrete_size = helper.get_concrete_int(size)
except TypeError:
log.debug("Unsupported symbolic max_length offset in RETURNDATACOPY")
return [global_state]
if global_state.last_return_data is None:
return [global_state]
global_state.mstate.mem_extend(concrete_memory_offset, concrete_size)
for i in range(concrete_size):
global_state.mstate.memory[concrete_memory_offset + i] = (
global_state.last_return_data[concrete_return_offset + i]
if concrete_return_offset + i < global_state.last_return_data.size
else 0
)
return [global_state]
[docs]
@StateTransition()
def returndatasize_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
if global_state.last_return_data:
global_state.mstate.stack.append(global_state.last_return_data.size)
else:
global_state.mstate.stack.append(0)
return [global_state]
[docs]
@StateTransition()
def blockhash_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
blocknumber = state.stack.pop()
state.stack.append(
global_state.new_bitvec("blockhash_block_" + str(blocknumber), 256)
)
return [global_state]
[docs]
@StateTransition()
def coinbase_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.new_bitvec("coinbase", 256))
return [global_state]
[docs]
@StateTransition()
def timestamp_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.new_bitvec("timestamp", 256))
return [global_state]
[docs]
@StateTransition()
def number_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.environment.block_number)
return [global_state]
[docs]
@StateTransition()
def difficulty_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(
global_state.new_bitvec("block_difficulty", 256)
)
return [global_state]
[docs]
@StateTransition()
def gaslimit_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.mstate.gas_limit)
return [global_state]
# Memory operations
[docs]
@StateTransition()
def mload_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
offset = state.stack.pop()
state.mem_extend(offset, 32)
data = state.memory.get_word_at(offset)
state.stack.append(data)
return [global_state]
[docs]
@StateTransition()
def mstore_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
mstart, value = state.stack.pop(), state.stack.pop()
try:
state.mem_extend(mstart, 32)
except Exception:
log.debug("Error extending memory")
state.memory.write_word_at(mstart, value)
return [global_state]
[docs]
@StateTransition()
def mstore8_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
offset, value = state.stack.pop(), state.stack.pop()
state.mem_extend(offset, 1)
try:
value_to_write = (
util.get_concrete_int(value) % 256
) # type: Union[int, BitVec]
except TypeError: # BitVec
value_to_write = Extract(7, 0, value)
state.memory[offset] = value_to_write
return [global_state]
[docs]
@StateTransition()
def sload_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
index = state.stack.pop()
state.stack.append(global_state.environment.active_account.storage[index])
return [global_state]
[docs]
@StateTransition(is_state_mutation_instruction=True)
def sstore_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
index, value = state.stack.pop(), state.stack.pop()
global_state.environment.active_account.storage[index] = value
return [global_state]
[docs]
@StateTransition(increment_pc=False, enable_gas=False)
def jump_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
disassembly = global_state.environment.code
try:
jump_addr = util.get_concrete_int(state.stack.pop())
except TypeError:
raise InvalidJumpDestination("Invalid jump argument (symbolic address)")
except IndexError:
raise StackUnderflowException()
index = util.get_instruction_index(disassembly.instruction_list, jump_addr)
if index is None:
raise InvalidJumpDestination("JUMP to invalid address")
op_code = disassembly.instruction_list[index]["opcode"]
if op_code != "JUMPDEST":
raise InvalidJumpDestination(
"Skipping JUMP to invalid destination (not JUMPDEST): " + str(jump_addr)
)
new_state = copy(global_state)
# add JUMP gas cost
min_gas, max_gas = get_opcode_gas("JUMP")
new_state.mstate.min_gas_used += min_gas
new_state.mstate.max_gas_used += max_gas
# manually set PC to destination
new_state.mstate.pc = index
return [new_state]
[docs]
@StateTransition(increment_pc=False, enable_gas=False)
def jumpi_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
disassembly = global_state.environment.code
min_gas, max_gas = get_opcode_gas("JUMPI")
states = []
op0, condition = state.stack.pop(), state.stack.pop()
try:
jump_addr = util.get_concrete_int(op0)
except TypeError:
log.debug("Skipping JUMPI to invalid destination.")
global_state.mstate.pc += 1
global_state.mstate.min_gas_used += min_gas
global_state.mstate.max_gas_used += max_gas
return [global_state]
if fdg.global_config.flag_preprocessing:
negated = True
condi = True
negated_cond=True
positive_cond=True
else:
# False case
negated = (
simplify(Not(condition)) if isinstance(condition, Bool) else condition == 0
)
negated.simplify() # it is simplified
# True case
condi = simplify(condition) if isinstance(condition, Bool) else condition != 0
condi.simplify()
negated_cond = (type(negated) == bool and negated) or (
isinstance(negated, Bool) and not is_false(negated)
)
positive_cond = (type(condi) == bool and condi) or (
isinstance(condi, Bool) and not is_false(condi)
)
if negated_cond:
# States have to be deep copied during a fork as summaries assume independence across states.
new_state = deepcopy(global_state)
# add JUMPI gas cost
new_state.mstate.min_gas_used += min_gas
new_state.mstate.max_gas_used += max_gas
# manually increment PC
new_state.mstate.depth += 1
new_state.mstate.pc += 1
new_state.world_state.constraints.append(negated)
states.append(new_state)
else:
log.debug("Pruned unreachable states.")
# Get jump destination
index = util.get_instruction_index(disassembly.instruction_list, jump_addr)
if index is None:
log.debug("Invalid jump destination: " + str(jump_addr))
return states
instr = disassembly.instruction_list[index]
if instr["opcode"] == "JUMPDEST":
if positive_cond:
new_state = deepcopy(global_state)
# add JUMPI gas cost
new_state.mstate.min_gas_used += min_gas
new_state.mstate.max_gas_used += max_gas
# manually set PC to destination
new_state.mstate.pc = index
new_state.mstate.depth += 1
new_state.world_state.constraints.append(condi)
states.append(new_state)
else:
log.debug("Pruned unreachable states.")
return states
[docs]
@StateTransition()
def beginsub_(self, global_state: GlobalState) -> List[GlobalState]:
"""
This opcode depicts the start of the subroutine
"""
raise OutOfGasException("Encountered BEGINSUB")
[docs]
@StateTransition()
def jumpsub_(self, global_state: GlobalState) -> List[GlobalState]:
"""
Jump to the subroutine
"""
disassembly = global_state.environment.code
try:
location = util.get_concrete_int(global_state.mstate.stack.pop())
except TypeError:
raise VmException("Encountered symbolic JUMPSUB location")
index = util.get_instruction_index(disassembly.instruction_list, location)
instr = disassembly.instruction_list[index]
if instr["opcode"] != "BEGINSUB":
raise VmException(
"Encountered invalid JUMPSUB location :{}".format(instr["address"])
)
global_state.mstate.subroutine_stack.append(global_state.mstate.pc + 1)
global_state.mstate.pc = location
return [global_state]
[docs]
@StateTransition(increment_pc=False)
def returnsub_(self, global_state: GlobalState) -> List[GlobalState]:
"""
Returns control to the caller of the subroutine
"""
global_state.mstate.pc = global_state.mstate.subroutine_stack.pop()
return [global_state]
[docs]
@StateTransition()
def pc_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
index = global_state.mstate.pc
program_counter = global_state.environment.code.instruction_list[index][
"address"
]
global_state.mstate.stack.append(symbol_factory.BitVecVal(program_counter, 256))
return [global_state]
[docs]
@StateTransition()
def msize_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.mstate.memory_size)
return [global_state]
[docs]
@StateTransition()
def gas_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
# TODO: Push a Constrained variable which lies between min gas and max gas
global_state.mstate.stack.append(global_state.new_bitvec("gas", 256))
return [global_state]
[docs]
@StateTransition(is_state_mutation_instruction=True)
def log_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
# TODO: implement me
state = global_state.mstate
depth = int(self.op_code[3:])
state.stack.pop(), state.stack.pop()
_ = [state.stack.pop() for _ in range(depth)]
# Not supported
return [global_state]
def _create_transaction_helper(
self, global_state, call_value, mem_offset, mem_size, create2_salt=None
) -> List[GlobalState]:
mstate = global_state.mstate
environment = global_state.environment
world_state = global_state.world_state
call_data = get_call_data(global_state, mem_offset, mem_offset + mem_size)
code_raw = []
code_end = call_data.size
size = call_data.size
if isinstance(size, BitVec):
# Other size restriction checks handle this
if size.symbolic:
size = 10**4
else:
size = size.value
code_raw = []
constraints = global_state.world_state.constraints
try:
model = get_model(constraints)
except UnsatError:
model = None
except TypeError:
# # fix the error:
# # TypeError: unhashable type: 'list'
model = None
if isinstance(call_data, ConcreteCalldata):
for element in call_data.concrete(model):
if isinstance(element, BitVec) and element.symbolic:
break
if isinstance(element, BitVec):
code_raw.append(element.value)
else:
code_raw.append(element)
if len(code_raw) < 1:
global_state.mstate.stack.append(1)
log.debug("No code found for trying to execute a create type instruction.")
return [global_state]
code_str = bytes.hex(bytes(code_raw))
next_transaction_id = tx_id_manager.get_next_tx_id()
constructor_arguments = ConcreteCalldata(
next_transaction_id, call_data[code_end:]
)
code = Disassembly(code_str)
caller = environment.active_account.address
gas_price = environment.gasprice
origin = environment.origin
contract_address = None # type: Union[BitVec, int]
Instruction._sha3_gas_helper(global_state, len(code_str[2:]) // 2)
if create2_salt:
if create2_salt.symbolic:
if create2_salt.size() != 256:
pad = symbol_factory.BitVecVal(0, 256 - create2_salt.size())
create2_salt = Concat(pad, create2_salt)
address = keccak_function_manager.create_keccak(
Concat(
symbol_factory.BitVecVal(255, 8),
caller,
create2_salt,
symbol_factory.BitVecVal(int(get_code_hash(code_str), 16), 256),
)
)
contract_address = Extract(255, 96, address)
else:
salt = hex(create2_salt.value)[2:]
salt = "0" * (64 - len(salt)) + salt
addr = hex(caller.value)[2:]
addr = "0" * (40 - len(addr)) + addr
contract_address = int(
get_code_hash("0xff" + addr + salt + get_code_hash(code_str)[2:])[
26:
],
16,
)
transaction = ContractCreationTransaction(
world_state=world_state,
caller=caller,
code=code,
call_data=constructor_arguments,
gas_price=gas_price,
gas_limit=mstate.gas_limit,
origin=origin,
call_value=call_value,
contract_address=contract_address,
)
log.info("Raise transaction start signal")
raise TransactionStartSignal(transaction, self.op_code, global_state)
[docs]
@StateTransition(is_state_mutation_instruction=True)
def create_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
call_value, mem_offset, mem_size = global_state.mstate.pop(3)
return self._create_transaction_helper(
global_state, call_value, mem_offset, mem_size
)
[docs]
@StateTransition()
def create_post(self, global_state: GlobalState) -> List[GlobalState]:
return self._handle_create_type_post(global_state)
[docs]
@StateTransition(is_state_mutation_instruction=True)
def create2_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
call_value, mem_offset, mem_size, salt = global_state.mstate.pop(4)
return self._create_transaction_helper(
global_state, call_value, mem_offset, mem_size, salt
)
[docs]
@StateTransition()
def create2_post(self, global_state: GlobalState) -> List[GlobalState]:
return self._handle_create_type_post(global_state, opcode="create2")
@staticmethod
def _handle_create_type_post(global_state, opcode="create"):
if opcode == "create2":
global_state.mstate.pop(4)
else:
global_state.mstate.pop(3)
if global_state.last_return_data:
return_val = symbol_factory.BitVecVal(
int(global_state.last_return_data.return_data, 16), 256
)
else:
return_val = symbol_factory.BitVecVal(0, 256)
global_state.mstate.stack.append(return_val)
return [global_state]
[docs]
@StateTransition()
def return_(self, global_state: GlobalState):
"""
:param global_state:
"""
state = global_state.mstate
offset, length = state.stack.pop(), state.stack.pop()
if length.symbolic:
return_data = [global_state.new_bitvec("return_data", 8)]
log.debug("Return with symbolic length or offset. Not supported")
else:
state.mem_extend(offset, length)
StateTransition.check_gas_usage_limit(global_state)
return_data = state.memory[offset : offset + length]
global_state.current_transaction.end(
global_state, ReturnData(return_data, length)
)
[docs]
@StateTransition(is_state_mutation_instruction=True)
def selfdestruct_(self, global_state: GlobalState):
"""
:param global_state:
"""
target = global_state.mstate.stack.pop()
transfer_amount = global_state.environment.active_account.balance()
# Often the target of the selfdestruct instruction will be symbolic
# If it isn't then we'll transfer the balance to the indicated contract
global_state.world_state.balances[target] += transfer_amount
global_state.environment.active_account = deepcopy(
global_state.environment.active_account
)
global_state.accounts[
global_state.environment.active_account.address.value
] = global_state.environment.active_account
global_state.environment.active_account.set_balance(0)
global_state.environment.active_account.deleted = True
global_state.current_transaction.end(global_state)
[docs]
@StateTransition()
def revert_(self, global_state: GlobalState) -> None:
"""
:param global_state:
"""
state = global_state.mstate
offset, length = state.stack.pop(), state.stack.pop()
if length.symbolic is False:
return_data = [
global_state.new_bitvec(
f"{global_state.current_transaction.id}_return_data_{i}", 8
)
for i in range(length.value)
]
else:
return_data = [
If(
i < length,
global_state.new_bitvec(
f"{global_state.current_transaction.id}_return_data_{i}", 8
),
0,
)
for i in range(300)
]
try:
return_data = state.memory[
util.get_concrete_int(offset) : util.get_concrete_int(offset + length)
]
except TypeError:
log.debug("Return with symbolic length or offset. Not supported")
global_state.current_transaction.end(
global_state, return_data=ReturnData(return_data, length), revert=True
)
[docs]
@StateTransition()
def assert_fail_(self, global_state: GlobalState):
"""
:param global_state:
"""
# 0xfe: designated invalid opcode
raise InvalidInstruction
[docs]
@StateTransition()
def invalid_(self, global_state: GlobalState):
"""
:param global_state:
"""
raise InvalidInstruction
[docs]
@StateTransition()
def stop_(self, global_state: GlobalState):
"""
:param global_state:
"""
global_state.current_transaction.end(global_state)
@staticmethod
def _write_symbolic_returndata(
global_state: GlobalState, memory_out_offset: BitVec, memory_out_size: BitVec
):
"""
Writes symbolic return-data into memory, The memory offset and size should be concrete
:param global_state:
:param memory_out_offset:
:param memory_out_size:
:return:
"""
if memory_out_offset.symbolic is True or memory_out_size.symbolic is True:
return
return_data = []
return_data_size = global_state.new_bitvec("returndatasize", 256)
for i in range(memory_out_size.value):
data = global_state.new_bitvec(
"call_output_var({})_{}".format(
simplify(memory_out_offset + i), global_state.mstate.pc
),
8,
)
return_data.append(data)
global_state.mstate.mem_extend(memory_out_offset, memory_out_size)
for i in range(memory_out_size.value):
global_state.mstate.memory[memory_out_offset + i] = If(
i <= return_data_size,
return_data[i],
global_state.mstate.memory[memory_out_offset + i],
)
global_state.last_return_data = ReturnData(
return_data=return_data, return_data_size=return_data_size
)
[docs]
@StateTransition()
def call_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
instr = global_state.get_current_instruction()
environment = global_state.environment
memory_out_size, memory_out_offset = global_state.mstate.stack[-7:-5]
try:
(
callee_address,
callee_account,
call_data,
value,
gas,
memory_out_offset,
memory_out_size,
) = get_call_parameters(global_state, self.dynamic_loader, True)
if callee_account is not None and callee_account.code.bytecode == "":
log.debug("The call is related to ether transfer between accounts")
sender = environment.active_account.address
receiver = callee_account.address
transfer_ether(global_state, sender, receiver, value)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
except ValueError as e:
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
# TODO: decide what to do in this case
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
if environment.static:
if isinstance(value, int) and value > 0:
raise WriteProtection(
"Cannot call with non zero value in a static call"
)
if isinstance(value, BitVec):
if value.symbolic:
global_state.world_state.constraints.append(
value == symbol_factory.BitVecVal(0, 256)
)
elif value.value > 0:
raise WriteProtection(
"Cannot call with non zero value in a static call"
)
native_result = native_call(
global_state, callee_address, call_data, memory_out_offset, memory_out_size
)
if native_result:
return native_result
transaction = MessageCallTransaction(
world_state=global_state.world_state,
gas_price=environment.gasprice,
gas_limit=gas,
origin=environment.origin,
caller=environment.active_account.address,
callee_account=callee_account,
call_data=call_data,
call_value=value,
static=environment.static,
)
raise TransactionStartSignal(transaction, self.op_code, global_state)
[docs]
@StateTransition()
def call_post(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
return self.post_handler(global_state, function_name="call")
[docs]
@StateTransition()
def callcode_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
instr = global_state.get_current_instruction()
environment = global_state.environment
memory_out_size, memory_out_offset = global_state.mstate.stack[-7:-5]
try:
(
callee_address,
callee_account,
call_data,
value,
gas,
_,
_,
) = get_call_parameters(global_state, self.dynamic_loader, True)
if callee_account is not None and callee_account.code.bytecode == "":
log.debug("The call is related to ether transfer between accounts")
sender = global_state.environment.active_account.address
receiver = callee_account.address
transfer_ether(global_state, sender, receiver, value)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
except ValueError as e:
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
native_result = native_call(
global_state, callee_address, call_data, memory_out_offset, memory_out_size
)
if native_result:
return native_result
transaction = MessageCallTransaction(
world_state=global_state.world_state,
gas_price=environment.gasprice,
gas_limit=gas,
origin=environment.origin,
code=callee_account.code,
caller=environment.address,
callee_account=environment.active_account,
call_data=call_data,
call_value=value,
static=environment.static,
)
raise TransactionStartSignal(transaction, self.op_code, global_state)
[docs]
@StateTransition()
def callcode_post(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
instr = global_state.get_current_instruction()
memory_out_size, memory_out_offset = global_state.mstate.stack[-7:-5]
try:
(
_,
_,
_,
_,
_,
memory_out_offset,
memory_out_size,
) = get_call_parameters(global_state, self.dynamic_loader, True)
except ValueError as e:
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
if global_state.last_return_data is None:
# Put return value on stack
return_value = global_state.new_bitvec(
"retval_" + str(instr["address"]), 256
)
global_state.mstate.stack.append(return_value)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.world_state.constraints.append(return_value == 0)
return [global_state]
try:
memory_out_offset = (
util.get_concrete_int(memory_out_offset)
if isinstance(memory_out_offset, Expression)
else memory_out_offset
)
memory_out_size = (
util.get_concrete_int(memory_out_size)
if isinstance(memory_out_size, Expression)
else memory_out_size
)
except TypeError:
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
# Copy memory
global_state.mstate.mem_extend(
memory_out_offset, min(memory_out_size, global_state.last_return_data.size)
)
if global_state.last_return_data.size.symbolic:
ret_size = 500
else:
ret_size = global_state.last_return_data.size.value
for i in range(min(memory_out_size, ret_size)):
global_state.mstate.memory[
i + memory_out_offset
] = global_state.last_return_data[i]
# Put return value on stack
return_value = global_state.new_bitvec("retval_" + str(instr["address"]), 256)
global_state.mstate.stack.append(return_value)
global_state.world_state.constraints.append(return_value == 1)
return [global_state]
[docs]
@StateTransition()
def delegatecall_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
instr = global_state.get_current_instruction()
environment = global_state.environment
memory_out_size, memory_out_offset = global_state.mstate.stack[-6:-4]
try:
(
callee_address,
callee_account,
call_data,
value,
gas,
_,
_,
) = get_call_parameters(global_state, self.dynamic_loader)
if callee_account is not None and callee_account.code.bytecode == "":
log.debug("The call is related to ether transfer between accounts")
sender = global_state.environment.active_account.address
receiver = callee_account.address
transfer_ether(global_state, sender, receiver, value)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
except ValueError as e:
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
native_result = native_call(
global_state, callee_address, call_data, memory_out_offset, memory_out_size
)
if native_result:
return native_result
transaction = MessageCallTransaction(
world_state=global_state.world_state,
gas_price=environment.gasprice,
gas_limit=gas,
origin=environment.origin,
code=callee_account.code,
caller=environment.sender,
callee_account=environment.active_account,
call_data=call_data,
call_value=environment.callvalue,
static=environment.static,
)
raise TransactionStartSignal(transaction, self.op_code, global_state)
[docs]
@StateTransition()
def delegatecall_post(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
instr = global_state.get_current_instruction()
memory_out_size, memory_out_offset = global_state.mstate.stack[-6:-4]
try:
(
_,
_,
_,
_,
_,
memory_out_offset,
memory_out_size,
) = get_call_parameters(global_state, self.dynamic_loader)
except ValueError as e:
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
return [global_state]
if global_state.last_return_data is None:
# Put return value on stack
return_value = global_state.new_bitvec(
"retval_" + str(instr["address"]), 256
)
global_state.mstate.stack.append(return_value)
global_state.world_state.constraints.append(return_value == 0)
return [global_state]
try:
memory_out_offset = (
util.get_concrete_int(memory_out_offset)
if isinstance(memory_out_offset, Expression)
else memory_out_offset
)
memory_out_size = (
util.get_concrete_int(memory_out_size)
if isinstance(memory_out_size, Expression)
else memory_out_size
)
except TypeError:
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
# Copy memory
global_state.mstate.mem_extend(
memory_out_offset, min(memory_out_size, global_state.last_return_data.size)
)
if global_state.last_return_data.size.symbolic:
ret_size = 500
else:
ret_size = global_state.last_return_data.size.value
for i in range(min(memory_out_size, ret_size)):
global_state.mstate.memory[
i + memory_out_offset
] = global_state.last_return_data[i]
# Put return value on stack
return_value = global_state.new_bitvec("retval_" + str(instr["address"]), 256)
global_state.mstate.stack.append(return_value)
global_state.world_state.constraints.append(return_value == 1)
return [global_state]
[docs]
@StateTransition()
def staticcall_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
instr = global_state.get_current_instruction()
environment = global_state.environment
memory_out_size, memory_out_offset = global_state.mstate.stack[-6:-4]
try:
(
callee_address,
callee_account,
call_data,
value,
gas,
memory_out_offset,
memory_out_size,
) = get_call_parameters(global_state, self.dynamic_loader)
if callee_account is not None and callee_account.code.bytecode == "":
log.debug("The call is related to ether transfer between accounts")
sender = environment.active_account.address
receiver = callee_account.address
transfer_ether(global_state, sender, receiver, value)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
except ValueError as e:
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
native_result = native_call(
global_state, callee_address, call_data, memory_out_offset, memory_out_size
)
if native_result:
return native_result
transaction = MessageCallTransaction(
world_state=global_state.world_state,
gas_price=environment.gasprice,
gas_limit=gas,
origin=environment.origin,
code=callee_account.code,
caller=environment.address,
callee_account=callee_account,
call_data=call_data,
call_value=value,
static=True,
)
raise TransactionStartSignal(transaction, self.op_code, global_state)
[docs]
@StateTransition()
def staticcall_post(self, global_state: GlobalState) -> List[GlobalState]:
return self.post_handler(global_state, function_name="staticcall")
[docs]
def post_handler(self, global_state, function_name: str):
instr = global_state.get_current_instruction()
if function_name in ("staticcall", "delegatecall"):
memory_out_size, memory_out_offset = global_state.mstate.stack[-6:-4]
else:
memory_out_size, memory_out_offset = global_state.mstate.stack[-7:-5]
try:
with_value = function_name != "staticcall"
(
_,
_,
_,
_,
_,
memory_out_offset,
memory_out_size,
) = get_call_parameters(global_state, self.dynamic_loader, with_value)
except ValueError as e:
log.debug(
"Could not determine required parameters for {}, putting fresh symbol on the stack. \n{}".format(
function_name, e
)
)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
if global_state.last_return_data is None:
# Put return value on stack
return_value = global_state.new_bitvec(
"retval_" + str(instr["address"]), 256
)
global_state.mstate.stack.append(return_value)
return [global_state]
try:
memory_out_offset = (
util.get_concrete_int(memory_out_offset)
if isinstance(memory_out_offset, Expression)
else memory_out_offset
)
memory_out_size = (
util.get_concrete_int(memory_out_size)
if isinstance(memory_out_size, Expression)
else memory_out_size
)
except TypeError:
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
global_state.mstate.mem_extend(
memory_out_offset, min(memory_out_size, global_state.last_return_data.size)
)
if global_state.last_return_data.size.symbolic:
ret_size = 500
else:
ret_size = global_state.last_return_data.size.value
for i in range(min(memory_out_size, ret_size)):
global_state.mstate.memory[
i + memory_out_offset
] = global_state.last_return_data[i]
# Put return value on stack
return_value = global_state.new_bitvec(
"retval_" + str(global_state.get_current_instruction()["address"]), 256
)
global_state.mstate.stack.append(return_value)
global_state.world_state.constraints.append(return_value == 1)
return [global_state]