"""
import abc
+from collections import namedtuple
from typing import List
+StepExpectInfo = namedtuple('StepExpectInfo', 'expression, path, frame_idx, line_range')
+
class CommandBase(object, metaclass=abc.ABCMeta):
def __init__(self):
self.path = None
from itertools import chain
-from dex.command.CommandBase import CommandBase
+from dex.command.CommandBase import CommandBase, StepExpectInfo
from dex.dextIR import ProgramState, SourceLocation, StackFrame, DextIR
def frame_from_dict(source: dict) -> StackFrame:
return __class__.__name__
def get_watches(self):
- frame_expects = chain.from_iterable(frame.watches
- for frame in self.expected_program_state.frames)
- return set(frame_expects)
+ frame_expects = set()
+ for idx, frame in enumerate(self.expected_program_state.frames):
+ path = (frame.location.path if
+ frame.location and frame.location.path else self.path)
+ line_range = (
+ range(frame.location.lineno, frame.location.lineno + 1)
+ if frame.location and frame.location.lineno else None)
+ for watch in frame.watches:
+ frame_expects.add(
+ StepExpectInfo(
+ expression=watch,
+ path=path,
+ frame_idx=idx,
+ line_range=line_range
+ )
+ )
+ return frame_expects
def eval(self, step_collection: DextIR) -> bool:
for step in step_collection.steps:
import abc
import difflib
import os
+from collections import namedtuple
-from dex.command.CommandBase import CommandBase
+from dex.command.CommandBase import CommandBase, StepExpectInfo
from dex.command.StepValueInfo import StepValueInfo
+
class DexExpectWatchBase(CommandBase):
def __init__(self, *args, **kwargs):
if len(args) < 2:
def get_watches(self):
- return [self.expression]
+ return [StepExpectInfo(self.expression, self.path, 0, range(self._from_line, self._to_line + 1))]
@property
def line_range(self):
return differences
def eval(self, step_collection):
+ assert os.path.exists(self.path)
for step in step_collection.steps:
loc = step.current_location
if (loc.path and os.path.exists(loc.path) and
- os.path.exists(self.path) and
os.path.samefile(loc.path, self.path) and
loc.lineno in self.line_range):
try:
import unittest
from types import SimpleNamespace
+from dex.command.CommandBase import StepExpectInfo
from dex.dextIR import DebuggerIR, FrameIR, LocIR, StepIR, ValueIR
from dex.utils.Exceptions import DebuggerException
from dex.utils.Exceptions import NotYetLoadedDebuggerException
from dex.utils.ReturnCode import ReturnCode
+def watch_is_active(watch_info: StepExpectInfo, path, frame_idx, line_no):
+ _, watch_path, watch_frame_idx, watch_line_range = watch_info
+ # If this watch should only be active for a specific file...
+ if watch_path and os.path.isfile(watch_path):
+ # If the current path does not match the expected file, this watch is
+ # not active.
+ if not (path and os.path.isfile(path) and
+ os.path.samefile(path, watch_path)):
+ return False
+ if watch_frame_idx != frame_idx:
+ return False
+ if watch_line_range and line_no not in list(watch_line_range):
+ return False
+ return True
class DebuggerBase(object, metaclass=abc.ABCMeta):
def __init__(self, context):
import os
import platform
-from dex.debugger.DebuggerBase import DebuggerBase
+from dex.debugger.DebuggerBase import DebuggerBase, watch_is_active
from dex.dextIR import FrameIR, LocIR, StepIR, StopReason, ValueIR
from dex.dextIR import ProgramState, StackFrame, SourceLocation
from dex.utils.Exceptions import DebuggerException, LoadDebuggerException
column=0),
watches={})
for expr in map(
- lambda watch, idx=i: self.evaluate_expression(watch, idx),
- watches):
+ # Filter out watches that are not active in the current frame,
+ # and then evaluate all the active watches.
+ lambda watch_info, idx=i:
+ self.evaluate_expression(watch_info.expression, idx),
+ filter(
+ lambda watch_info, idx=i, line_no=loc.lineno, path=loc.path:
+ watch_is_active(watch_info, path, idx, line_no),
+ watches)):
state_frame.watches[expr.expression] = expr
state_frames.append(state_frame)
from subprocess import CalledProcessError, check_output, STDOUT
import sys
-from dex.debugger.DebuggerBase import DebuggerBase
+from dex.debugger.DebuggerBase import DebuggerBase, watch_is_active
from dex.dextIR import FrameIR, LocIR, StepIR, StopReason, ValueIR
from dex.dextIR import StackFrame, SourceLocation, ProgramState
from dex.utils.Exceptions import DebuggerException, LoadDebuggerException
'column': sb_line.GetColumn()
}
loc = LocIR(**loc_dict)
+ valid_loc_for_watch = loc.path and os.path.exists(loc.path)
frame = FrameIR(
function=function, is_inlined=sb_frame.IsInlined(), loc=loc)
is_inlined=frame.is_inlined,
location=SourceLocation(**loc_dict),
watches={})
- for expr in map(
- lambda watch, idx=i: self.evaluate_expression(watch, idx),
- watches):
- state_frame.watches[expr.expression] = expr
- state_frames.append(state_frame)
+ if valid_loc_for_watch:
+ for expr in map(
+ # Filter out watches that are not active in the current frame,
+ # and then evaluate all the active watches.
+ lambda watch_info, idx=i:
+ self.evaluate_expression(watch_info.expression, idx),
+ filter(
+ lambda watch_info, idx=i, line_no=loc.lineno, loc_path=loc.path:
+ watch_is_active(watch_info, loc_path, idx, line_no),
+ watches)):
+ state_frame.watches[expr.expression] = expr
+ state_frames.append(state_frame)
if len(frames) == 1 and frames[0].function is None:
frames = []
import os
import sys
from pathlib import PurePath
-from collections import namedtuple
-from collections import defaultdict
+from collections import defaultdict, namedtuple
-from dex.debugger.DebuggerBase import DebuggerBase
+from dex.command.CommandBase import StepExpectInfo
+from dex.debugger.DebuggerBase import DebuggerBase, watch_is_active
from dex.dextIR import FrameIR, LocIR, StepIR, StopReason, ValueIR
from dex.dextIR import StackFrame, SourceLocation, ProgramState
from dex.utils.Exceptions import Error, LoadDebuggerException
state_frames = []
+ loc = LocIR(**self._location)
+ valid_loc_for_watch = loc.path and os.path.exists(loc.path)
+
for idx, sf in enumerate(stackframes):
frame = FrameIR(
function=self._sanitize_function_name(sf.FunctionName),
if any(name in fname for name in self.frames_below_main):
break
-
state_frame = StackFrame(function=frame.function,
is_inlined=frame.is_inlined,
watches={})
- for watch in watches:
- state_frame.watches[watch] = self.evaluate_expression(
- watch, idx)
+ if valid_loc_for_watch and idx == 0:
+ for watch_info in watches:
+ if watch_is_active(watch_info, loc.path, idx, loc.lineno):
+ watch_expr = watch_info.expression
+ state_frame.watches[watch_expr] = self.evaluate_expression(watch_expr, idx)
state_frames.append(state_frame)
frames.append(frame)
- loc = LocIR(**self._location)
if frames:
frames[0].loc = loc
state_frames[0].location = SourceLocation(**self._location)
]
def evaluate_expression(self, expression, frame_idx=0) -> ValueIR:
- self.set_current_stack_frame(frame_idx)
+ if frame_idx != 0:
+ self.set_current_stack_frame(frame_idx)
result = self._debugger.GetExpression(expression)
- self.set_current_stack_frame(0)
+ if frame_idx != 0:
+ self.set_current_stack_frame(0)
value = result.Value
is_optimized_away = any(s in value for s in [