Source code for fdg.control.function_assignment

import math
from typing import Optional

import fdg.global_config
from fdg.fwrg_manager import FWRG_manager
from fdg.output_data import print_function_assignmnets
from fdg.utils import get_ftn_seq_from_key_1, random_indices



class FunctionAssignment():
    """Decide which functions to execute after a given function sequence.

    The class keeps a per-function counter (``assignment_times``) of how many
    times each function has been handed out, and uses the graph exposed by
    ``fwrg_manager`` to pick children of the last executed function that are,
    or can lead to, the functions still worth exploring.
    """

    def __init__(self, all_functions: list, fwrg_manager: "FWRG_manager",
                 select_percent: int = 0):
        """Set up the assignment counters.

        Args:
            all_functions: names of the functions that may be assigned.
            fwrg_manager: graph manager queried for the children of a function.
            select_percent: share of functions picked by
                ``assign_functions_for_baseline``, expressed in tenths
                (10 selects everything).
        """
        self.all_functions = all_functions
        self.fwrg_manager = fwrg_manager
        self.assignment_times = {ftn: 0 for ftn in self.all_functions}
        # 'fallback' is always tracked, even when it is not listed explicitly.
        self.assignment_times.setdefault('fallback', 0)
        self.times_limit = fdg.global_config.execution_times_limit
        self.random_select_percent = select_percent  # for the case of baseline

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _times_assigned(self, ftn) -> int:
        """Return how often ``ftn`` was assigned; untracked functions count as 0."""
        # record_assignment() silently ignores untracked functions, so lookups
        # must tolerate them as well instead of raising KeyError.
        return self.assignment_times.get(ftn, 0)

    def _dk_targets_left(self, dk_functions: list) -> list:
        """Return the functions from ``(ftn, cov)`` pairs that are still targets.

        A function is kept while it has been assigned fewer than
        ``times_limit`` times; between ``times_limit`` and twice that limit it
        is kept only when its ``cov`` value is below 70.
        """
        targets = []
        for ftn, cov in dk_functions:
            times = self._times_assigned(ftn)
            if times < self.times_limit:
                targets.append(ftn)
            elif times < 2 * self.times_limit and cov < 70:
                targets.append(ftn)
        return targets

    def _iter_descendants(self, ftn: str, max_depth: int):
        """Yield every child met while walking at most ``max_depth`` edges from ``ftn``.

        A function is expanded only once: expanding it again at a larger depth
        could only yield children that were already yielded earlier.
        """
        frontier = [ftn]
        expanded = set()
        depth = 0
        while depth < max_depth and frontier:
            depth += 1
            discovered = {}  # dict used as an insertion-ordered set
            for func in frontier:
                expanded.add(func)
                for child in self.fwrg_manager.get_children_fwrg_T_A(func):
                    yield child
                    discovered[child] = None
            # prepare for the next depth
            frontier = [func for func in discovered if func not in expanded]

    def _pick_random_subset(self, candidates: list, tenths) -> list:
        """Return a random ``tenths``/10 share of ``candidates``, in original order."""
        if not candidates:
            return []
        select_num = math.ceil((tenths / 10) * len(candidates))
        chosen = set(random_indices(0, len(candidates) - 1, select_num))
        return [ftn for idx, ftn in enumerate(candidates) if idx in chosen]

    def _timeout_children(self, ftn_seq: list, targets: list) -> list:
        """Return the all-reads children of the last function that can reach ``targets``."""
        # due to the preprocessing timeout, read/write info is only partly
        # obtained, so the children are computed from all reads
        children = self.fwrg_manager.get_children_all_reads(ftn_seq[-1])
        remaining_depth = fdg.global_config.seq_len_limit - len(ftn_seq) - 1
        # consider children that are target or can reach a target
        children = [child for child in children
                    if self.can_reach_targets(child, targets, remaining_depth)]
        # permit self dependency once
        if len(ftn_seq) >= 2:
            earlier = ftn_seq[0:-1]
            children = [child for child in children if child not in earlier]
        return children

    # ------------------------------------------------------------------
    # graph queries
    # ------------------------------------------------------------------
    def can_reach_targets(self, ftn: str, targets: list, max_depth: int) -> bool:
        """Check whether ``ftn`` is a target or has a path to one of the targets.

        The length of such a path must be no larger than ``max_depth``.
        """
        if ftn in targets:
            return True
        wanted = set(targets)
        if not wanted:
            return False
        return any(child in wanted
                   for child in self._iter_descendants(ftn, max_depth))

    def get_targets_be_reached(self, ftn: str, targets: list, max_depth: int) -> list:
        """Return the targets that are ``ftn`` itself or lie within ``max_depth`` edges of it.

        Each target appears once, in the order it was first encountered.
        """
        wanted = set(targets)
        reached = {}  # dict used as an insertion-ordered set
        if ftn in wanted:
            reached[ftn] = None
        for child in self._iter_descendants(ftn, max_depth):
            if child in wanted:
                reached.setdefault(child)
        return list(reached)

    # ------------------------------------------------------------------
    # simple assignment strategies
    # ------------------------------------------------------------------
    def assign_functions_for_baseline(self) -> list:
        """Randomly pick ``random_select_percent`` tenths of all functions.

        The stored percentage is clamped to 10. The picks are not recorded in
        ``assignment_times``.
        """
        if self.random_select_percent > 10:
            self.random_select_percent = 10
        return self._pick_random_subset(self.all_functions,
                                        self.random_select_percent)

    def assign_all_functions(self):
        """Assign and record every function.

        Returns ``['original_instruction_list']`` when there are no functions.
        """
        if len(self.all_functions) == 0:
            return ['original_instruction_list']
        self.record_assignment(self.all_functions)
        return self.all_functions

    def select_functions_randomly(self, percentage: int) -> list:
        """Randomly pick ``percentage`` tenths of all functions without recording them."""
        return self._pick_random_subset(self.all_functions, percentage)

    def record_assignment(self, assigned_functions: list):
        """Increase the counter of every tracked function in ``assigned_functions``.

        Functions without a counter are ignored.
        """
        for ftn in assigned_functions:
            if ftn in self.assignment_times:
                self.assignment_times[ftn] += 1

    def fallback_case(self, ftn_seq: list):
        """Handle a sequence that ends with the fallback function.

        Returns ``['original_instruction_list']`` when the sequence ends with
        'fallback'/'Fallback' and there are no functions to assign; otherwise
        an empty list (also for an empty sequence).
        """
        if ftn_seq and ftn_seq[-1] in ['fallback', 'Fallback']:
            if len(self.all_functions) == 0:
                print(f'keep the original instruction list in function_assignment.py')
                return ['original_instruction_list']
        return []

    # ------------------------------------------------------------------
    # graph-guided assignment
    # ------------------------------------------------------------------
    def assign_functions(self, state_key: str, dk_functions: list,
                         to_execute_functions: Optional[list] = None,
                         not_to_execute: Optional[list] = None):
        """Assign the functions to execute on the state identified by ``state_key``.

        Args:
            state_key: key from which the executed function sequence is derived.
            dk_functions: ``(function, cov)`` pairs used to determine the
                remaining targets.
            to_execute_functions: when non-empty, these are recorded and
                returned as-is.
            not_to_execute: children to exclude (i.e. already considered).

        Returns:
            The list of assigned functions.
        """
        print_function_assignmnets(self.assignment_times)
        if to_execute_functions:
            self.record_assignment(to_execute_functions)
            return to_execute_functions
        excluded = not_to_execute if not_to_execute is not None else []

        ftn_seq = get_ftn_seq_from_key_1(state_key)
        fallback_case = self.fallback_case(ftn_seq)
        if len(fallback_case) > 0:
            return fallback_case

        # identify the dk functions for a particular state
        dk_left = self._dk_targets_left(dk_functions)

        # get children from the augmented graph; copy so that the list owned
        # by the graph manager is never modified
        last_ftn = ftn_seq[-1]
        children = list(self.fwrg_manager.get_children_fwrg_T_A(last_ftn))

        # another case to get children: when the flag is set that all reads
        # in a function are considered
        if fdg.global_config.flag_consider_all_reads == 1:
            children.extend(self.fwrg_manager.get_children_all_reads(last_ftn))
            children = list(dict.fromkeys(children))  # de-duplicate, keep order

        # remove children that should not be executed (i.e. already considered)
        children = [child for child in children if child not in excluded]

        # consider children that are target or can reach a target
        remaining_depth = fdg.global_config.seq_len_limit - len(ftn_seq) - 1
        children = [child for child in children
                    if self.can_reach_targets(child, dk_left, remaining_depth)]

        # TODO: if no child or few children are found, is it possible to
        # assign dk functions?

        # consider a function that no function can reach in the graph
        # (because the reads in conditions are not captured)
        for ftn in self.consider_dk_functions_not_reachable():
            if ftn not in children:
                children.append(ftn)

        # permit self dependency once
        if len(ftn_seq) >= 2:
            earlier = ftn_seq[0:-1]
            children = [child for child in children if child not in earlier]

        self.record_assignment(children)
        return children

    def consider_dk_functions_not_reachable(self) -> list:
        """Return the unreachable dk functions that are still below the times limit.

        unreachable: the reads in a condition are not recognized, or there are
        no reads in a condition.
        handle: consider all the reads in a function.
        symbol(), name(), version() are functions that are not reachable.
        """
        # consider it until the execution times reach the limit
        # (on the first several states)
        return [ftn for ftn in self.fwrg_manager.updateFWRG.dk_not_reachable
                if self._times_assigned(ftn) < self.times_limit]

    def when_no_children_assigned(self, dk_functions: list):
        """Placeholder; currently does nothing."""
        pass

    def assign_functions_timeout(self, state_key: str, dk_functions: list,
                                 percent_of_functions: int = 1):
        """Assign functions when only partial read/write info is available.

        The result combines a selection of the remaining targets with the
        all-reads children that can reach a target. When
        ``percent_of_functions`` (in tenths) is below 10 and there are more
        than 3 targets, only a random share of the targets is selected.
        """
        print_function_assignmnets(self.assignment_times)
        ftn_seq = get_ftn_seq_from_key_1(state_key)
        fallback_case = self.fallback_case(ftn_seq)
        if len(fallback_case) > 0:
            return fallback_case

        # identify the functions to be considered
        to_be_considered_functions = self._dk_targets_left(dk_functions)
        children = self._timeout_children(ftn_seq, to_be_considered_functions)

        # select functions from to be considered functions
        selected_functions = to_be_considered_functions
        if percent_of_functions < 10 and len(to_be_considered_functions) > 3:
            selected_functions = self._pick_random_subset(
                to_be_considered_functions, percent_of_functions)

        # de-duplicate while keeping a stable order
        assigned_functions = list(dict.fromkeys([*selected_functions, *children]))
        if len(assigned_functions) > 0:
            self.record_assignment(assigned_functions)
        return assigned_functions

    def assign_functions_timeout_mine(self, state_key: str, dk_functions: list,
                                      randomly_selected_functions: list):
        """Like ``assign_functions_timeout`` but with a caller-supplied selection.

        The result combines ``randomly_selected_functions`` with the all-reads
        children that can reach a remaining target.
        """
        print_function_assignmnets(self.assignment_times)
        ftn_seq = get_ftn_seq_from_key_1(state_key)
        fallback_case = self.fallback_case(ftn_seq)
        if len(fallback_case) > 0:
            return fallback_case

        # identify the functions to be considered
        to_be_considered_functions = self._dk_targets_left(dk_functions)
        children = self._timeout_children(ftn_seq, to_be_considered_functions)

        # de-duplicate while keeping a stable order
        assigned_functions = list(
            dict.fromkeys([*randomly_selected_functions, *children]))
        if len(assigned_functions) > 0:
            self.record_assignment(assigned_functions)
        return assigned_functions