--- /dev/null
+"""
+Test the functionality of interactive scripted processes
+"""
+
+import lldb
+import lldbsuite.test.lldbutil as lldbutil
+from lldbsuite.test.lldbtest import *
+import json, os
+
+
+class TestInteractiveScriptedProcess(TestBase):
+
+ NO_DEBUG_INFO_TESTCASE = True
+
+ def test_passthrough_launch(self):
+ """Test a simple pass-through process launch"""
+ self.build()
+ self.runCmd("file " + self.getBuildArtifact("a.out"), CURRENT_EXECUTABLE_SET)
+ self.main_source_file = lldb.SBFileSpec("main.cpp")
+ self.script_module = "interactive_scripted_process"
+ self.script_file = self.script_module + ".py"
+ self.passthrough_launch()
+
+ def duplicate_target(self, driving_target):
+ exe = driving_target.executable.fullpath
+ triple = driving_target.triple
+ return self.dbg.CreateTargetWithFileAndTargetTriple(exe, triple)
+
+ def get_launch_info(self, class_name, script_dict):
+ structured_data = lldb.SBStructuredData()
+ structured_data.SetFromJSON(json.dumps(script_dict))
+
+ launch_info = lldb.SBLaunchInfo(None)
+ launch_info.SetProcessPluginName("ScriptedProcess")
+ launch_info.SetScriptedProcessClassName(class_name)
+ launch_info.SetScriptedProcessDictionary(structured_data)
+ return launch_info
+
+ def passthrough_launch(self):
+ """Test that a simple passthrough wrapper functions correctly"""
+ # First build the real target:
+ self.assertEqual(self.dbg.GetNumTargets(), 1)
+ real_target_id = 0
+ real_target = self.dbg.GetTargetAtIndex(real_target_id)
+ lldbutil.run_break_set_by_source_regexp(self, "Break here")
+ self.assertEqual(real_target.GetNumBreakpoints(), 1)
+
+ # Now source in the scripted module:
+ script_path = os.path.join(self.getSourceDir(), self.script_file)
+ self.runCmd(f"command script import '{script_path}'")
+
+ mux_target = self.duplicate_target(real_target)
+ self.assertTrue(mux_target.IsValid(), "duplicate target succeeded")
+
+ mux_class = f"{self.script_module}.MultiplexerScriptedProcess"
+ script_dict = {"driving_target_idx": real_target_id}
+ mux_launch_info = self.get_launch_info(mux_class, script_dict)
+ mux_process_listener = lldb.SBListener(
+ "lldb.test.interactive-scripted-process.listener"
+ )
+ mux_launch_info.SetPassthroughListener(mux_process_listener)
+
+ self.dbg.SetAsync(True)
+ error = lldb.SBError()
+ mux_process = mux_target.Launch(mux_launch_info, error)
+ self.assertSuccess(error, "Launched multiplexer scripted process")
+ self.assertTrue(mux_process.IsValid(), "Got a valid process")
+
+ # Check that the mux process started running
+ event = lldbutil.fetch_next_event(
+ self, mux_process_listener, mux_process.GetBroadcaster(), timeout=60 * 5
+ )
+ self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
+ # Check that the real process started running
+ event = lldbutil.fetch_next_event(
+ self, self.dbg.GetListener(), mux_process.GetBroadcaster()
+ )
+ self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
+
+ # Check that the real process stopped
+ event = lldbutil.fetch_next_event(
+ self, self.dbg.GetListener(), mux_process.GetBroadcaster(), timeout=60 * 5
+ )
+ self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
+ # Check that the mux process stopped
+ event = lldbutil.fetch_next_event(
+ self, mux_process_listener, mux_process.GetBroadcaster(), timeout=60 * 5
+ )
+ self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
+
+ real_process = real_target.GetProcess()
+ self.assertTrue(real_process.IsValid(), "Got a valid process")
+ self.assertState(
+ real_process.GetState(), lldb.eStateStopped, "Process is stopped"
+ )
+
+ # This is a passthrough, so the two processes should have the same state:
+ # Check that we got the right threads:
+ self.assertEqual(
+ len(real_process.threads),
+ len(mux_process.threads),
+ "Same number of threads",
+ )
+ for id in range(len(real_process.threads)):
+ real_pc = real_process.threads[id].frame[0].pc
+ mux_pc = mux_process.threads[id].frame[0].pc
+ self.assertEqual(real_pc, mux_pc, f"PC's equal for {id}")
+
+ lldbutil.run_break_set_by_source_regexp(self, "also break here")
+ self.assertEqual(mux_target.GetNumBreakpoints(), 1)
+ error = mux_process.Continue()
+ self.assertSuccess(error, "Resuming multiplexer scripted process")
+ self.assertTrue(mux_process.IsValid(), "Got a valid process")
+
+ event = lldbutil.fetch_next_event(
+ self, mux_process_listener, mux_process.GetBroadcaster()
+ )
+ self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
+ event = lldbutil.fetch_next_event(
+ self, mux_process_listener, mux_process.GetBroadcaster()
+ )
+ self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
--- /dev/null
+# Usage:
+# ./bin/lldb $LLVM/lldb/test/API/functionalities/interactive_scripted_process/main \
+# -o "br set -p 'Break here'" \
+# -o "command script import $LLVM/lldb/test/API/functionalities/interactive_scripted_process/interactive_scripted_process.py" \
+# -o "create_mux" \
+# -o "create_sub" \
+# -o "br set -p 'also break here'" -o 'continue'
+
+import os, json, struct, signal
+
+from threading import Thread
+from typing import Any, Dict
+
+import lldb
+from lldb.plugins.scripted_process import ScriptedProcess
+from lldb.plugins.scripted_process import ScriptedThread
+
+
+class PassthruScriptedProcess(ScriptedProcess):
+ driving_target = None
+ driving_process = None
+
+ def __init__(
+ self,
+ exe_ctx: lldb.SBExecutionContext,
+ args: lldb.SBStructuredData,
+ launched_driving_process: bool = True,
+ ):
+ super().__init__(exe_ctx, args)
+
+ self.driving_target = None
+ self.driving_process = None
+
+ self.driving_target_idx = args.GetValueForKey("driving_target_idx")
+ if self.driving_target_idx and self.driving_target_idx.IsValid():
+ if self.driving_target_idx.GetType() == lldb.eStructuredDataTypeInteger:
+ idx = self.driving_target_idx.GetIntegerValue(42)
+ if self.driving_target_idx.GetType() == lldb.eStructuredDataTypeString:
+ idx = int(self.driving_target_idx.GetStringValue(100))
+ self.driving_target = self.target.GetDebugger().GetTargetAtIndex(idx)
+
+ if launched_driving_process:
+ self.driving_process = self.driving_target.GetProcess()
+ for driving_thread in self.driving_process:
+ structured_data = lldb.SBStructuredData()
+ structured_data.SetFromJSON(
+ json.dumps(
+ {
+ "driving_target_idx": idx,
+ "thread_idx": driving_thread.GetIndexID(),
+ }
+ )
+ )
+
+ self.threads[driving_thread.GetThreadID()] = PassthruScriptedThread(
+ self, structured_data
+ )
+
+ for module in self.driving_target.modules:
+ path = module.file.fullpath
+ load_addr = module.GetObjectFileHeaderAddress().GetLoadAddress(
+ self.driving_target
+ )
+ self.loaded_images.append({"path": path, "load_addr": load_addr})
+
+ def get_memory_region_containing_address(
+ self, addr: int
+ ) -> lldb.SBMemoryRegionInfo:
+ mem_region = lldb.SBMemoryRegionInfo()
+ error = self.driving_process.GetMemoryRegionInfo(addr, mem_region)
+ if error.Fail():
+ return None
+ return mem_region
+
+ def read_memory_at_address(
+ self, addr: int, size: int, error: lldb.SBError
+ ) -> lldb.SBData:
+ data = lldb.SBData()
+ bytes_read = self.driving_process.ReadMemory(addr, size, error)
+
+ if error.Fail():
+ return data
+
+ data.SetDataWithOwnership(
+ error,
+ bytes_read,
+ self.driving_target.GetByteOrder(),
+ self.driving_target.GetAddressByteSize(),
+ )
+
+ return data
+
+ def write_memory_at_address(
+ self, addr: int, data: lldb.SBData, error: lldb.SBError
+ ) -> int:
+ return self.driving_process.WriteMemory(
+ addr, bytearray(data.uint8.all()), error
+ )
+
+ def get_process_id(self) -> int:
+ return 42
+
+ def is_alive(self) -> bool:
+ return True
+
+ def get_scripted_thread_plugin(self) -> str:
+ return f"{PassthruScriptedThread.__module__}.{PassthruScriptedThread.__name__}"
+
+
+class MultiplexedScriptedProcess(PassthruScriptedProcess):
+ def __init__(self, exe_ctx: lldb.SBExecutionContext, args: lldb.SBStructuredData):
+ super().__init__(exe_ctx, args)
+ self.multiplexer = None
+ if isinstance(self.driving_process, lldb.SBProcess) and self.driving_process:
+ parity = args.GetValueForKey("parity")
+ # TODO: Change to Walrus operator (:=) with oneline if assignment
+ # Requires python 3.8
+ val = extract_value_from_structured_data(parity, 0)
+ if val is not None:
+ self.parity = val
+
+ # Turn PassThruScriptedThread into MultiplexedScriptedThread
+ for thread in self.threads.values():
+ thread.__class__ = MultiplexedScriptedThread
+
+ def get_process_id(self) -> int:
+ return self.parity + 420
+
+ def launch(self, should_stop: bool = True) -> lldb.SBError:
+ self.first_launch = True
+ return lldb.SBError()
+
+ def resume(self, should_stop: bool) -> lldb.SBError:
+ if self.first_launch:
+ self.first_launch = False
+ return super().resume()
+ else:
+ if not self.multiplexer:
+ error = lldb.SBError("Multiplexer is not set.")
+ return error
+ return self.multiplexer.resume(should_stop)
+
+ def get_threads_info(self) -> Dict[int, Any]:
+ if not self.multiplexer:
+ return super().get_threads_info()
+ filtered_threads = self.multiplexer.get_threads_info(pid=self.get_process_id())
+ # Update the filtered thread class from PassthruScriptedThread to MultiplexedScriptedThread
+ return dict(
+ map(
+ lambda pair: (pair[0], MultiplexedScriptedThread(pair[1])),
+ filtered_threads.items(),
+ )
+ )
+
+ def get_scripted_thread_plugin(self) -> str:
+ return f"{MultiplexedScriptedThread.__module__}.{MultiplexedScriptedThread.__name__}"
+
+
+class PassthruScriptedThread(ScriptedThread):
+ def __init__(self, process, args):
+ super().__init__(process, args)
+ driving_target_idx = args.GetValueForKey("driving_target_idx")
+ thread_idx = args.GetValueForKey("thread_idx")
+
+ # TODO: Change to Walrus operator (:=) with oneline if assignment
+ # Requires python 3.8
+ val = extract_value_from_structured_data(thread_idx, 0)
+ if val is not None:
+ self.idx = val
+
+ self.driving_target = None
+ self.driving_process = None
+ self.driving_thread = None
+
+ # TODO: Change to Walrus operator (:=) with oneline if assignment
+ # Requires python 3.8
+ val = extract_value_from_structured_data(driving_target_idx, 42)
+ if val is not None:
+ self.driving_target = self.target.GetDebugger().GetTargetAtIndex(val)
+ self.driving_process = self.driving_target.GetProcess()
+ self.driving_thread = self.driving_process.GetThreadByIndexID(self.idx)
+
+ if self.driving_thread:
+ self.id = self.driving_thread.GetThreadID()
+
+ def get_thread_id(self) -> int:
+ return self.id
+
+ def get_name(self) -> str:
+ return f"{PassthruScriptedThread.__name__}.thread-{self.idx}"
+
+ def get_stop_reason(self) -> Dict[str, Any]:
+ stop_reason = {"type": lldb.eStopReasonInvalid, "data": {}}
+
+ if (
+ self.driving_thread
+ and self.driving_thread.IsValid()
+ and self.get_thread_id() == self.driving_thread.GetThreadID()
+ ):
+ stop_reason["type"] = lldb.eStopReasonNone
+
+ if self.driving_thread.GetStopReason() != lldb.eStopReasonNone:
+ if "arm64" in self.scripted_process.arch:
+ stop_reason["type"] = lldb.eStopReasonException
+ stop_reason["data"][
+ "desc"
+ ] = self.driving_thread.GetStopDescription(100)
+ elif self.scripted_process.arch == "x86_64":
+ stop_reason["type"] = lldb.eStopReasonSignal
+ stop_reason["data"]["signal"] = signal.SIGTRAP
+ else:
+ stop_reason["type"] = self.driving_thread.GetStopReason()
+
+ return stop_reason
+
+ def get_register_context(self) -> str:
+ if not self.driving_thread or self.driving_thread.GetNumFrames() == 0:
+ return None
+ frame = self.driving_thread.GetFrameAtIndex(0)
+
+ GPRs = None
+ registerSet = frame.registers # Returns an SBValueList.
+ for regs in registerSet:
+ if "general purpose" in regs.name.lower():
+ GPRs = regs
+ break
+
+ if not GPRs:
+ return None
+
+ for reg in GPRs:
+ self.register_ctx[reg.name] = int(reg.value, base=16)
+
+ return struct.pack(f"{len(self.register_ctx)}Q", *self.register_ctx.values())
+
+
+class MultiplexedScriptedThread(PassthruScriptedThread):
+ def get_name(self) -> str:
+ parity = "Odd" if self.scripted_process.parity % 2 else "Even"
+ return f"{parity}{MultiplexedScriptedThread.__name__}.thread-{self.idx}"
+
+
+class MultiplexerScriptedProcess(PassthruScriptedProcess):
+ listener = None
+ multiplexed_processes = None
+
+ def wait_for_driving_process_to_stop(self):
+ def handle_process_state_event():
+ # Update multiplexer process
+ log("Updating interactive scripted process threads")
+ dbg = self.driving_target.GetDebugger()
+ log("Clearing interactive scripted process threads")
+ self.threads.clear()
+ for driving_thread in self.driving_process:
+ log(f"{len(self.threads)} New thread {hex(driving_thread.id)}")
+ structured_data = lldb.SBStructuredData()
+ structured_data.SetFromJSON(
+ json.dumps(
+ {
+ "driving_target_idx": dbg.GetIndexOfTarget(
+ self.driving_target
+ ),
+ "thread_idx": driving_thread.GetIndexID(),
+ }
+ )
+ )
+
+ self.threads[driving_thread.GetThreadID()] = PassthruScriptedThread(
+ self, structured_data
+ )
+
+ mux_process = self.target.GetProcess()
+ mux_process.ForceScriptedState(lldb.eStateRunning)
+ mux_process.ForceScriptedState(lldb.eStateStopped)
+
+ for child_process in self.multiplexed_processes.values():
+ child_process.ForceScriptedState(lldb.eStateRunning)
+ child_process.ForceScriptedState(lldb.eStateStopped)
+
+ event = lldb.SBEvent()
+ while True:
+ if self.listener.WaitForEvent(1, event):
+ event_mask = event.GetType()
+ if event_mask & lldb.SBProcess.eBroadcastBitStateChanged:
+ state = lldb.SBProcess.GetStateFromEvent(event)
+ log(f"Received public process state event: {state}")
+ if state == lldb.eStateStopped:
+ # If it's a stop event, iterate over the driving process
+ # thread, looking for a breakpoint stop reason, if internal
+ # continue.
+ handle_process_state_event()
+ else:
+ continue
+
+ def __init__(self, exe_ctx: lldb.SBExecutionContext, args: lldb.SBStructuredData):
+ super().__init__(exe_ctx, args, launched_driving_process=False)
+ if isinstance(self.driving_target, lldb.SBTarget) and self.driving_target:
+ self.listener = lldb.SBListener(
+ "lldb.listener.multiplexer-scripted-process"
+ )
+ self.multiplexed_processes = {}
+
+ self.listener_thread = Thread(
+ target=self.wait_for_driving_process_to_stop, daemon=True
+ )
+ self.listener_thread.start()
+
+ def launch(self, should_stop: bool = True) -> lldb.SBError:
+ if not self.driving_target:
+ return lldb.SBError(
+ f"{self.__class__.__name__}.resume: Invalid driving target."
+ )
+
+ if self.driving_process:
+ return lldb.SBError(
+ f"{self.__class__.__name__}.resume: Invalid driving process."
+ )
+
+ error = lldb.SBError()
+ launch_info = lldb.SBLaunchInfo(None)
+ launch_info.SetListener(self.listener)
+ driving_process = self.driving_target.Launch(launch_info, error)
+
+ if not driving_process or error.Fail():
+ return error
+
+ self.driving_process = driving_process
+
+ for module in self.driving_target.modules:
+ path = module.file.fullpath
+ load_addr = module.GetObjectFileHeaderAddress().GetLoadAddress(
+ self.driving_target
+ )
+ self.loaded_images.append({"path": path, "load_addr": load_addr})
+
+ self.first_resume = True
+ return error
+
+ def resume(self, should_stop: bool = True) -> lldb.SBError:
+ if self.first_resume:
+ # When we resume the multiplexer process for the first time,
+ # we shouldn't do anything because lldb's execution machinery
+ # will resume the driving process by itself.
+
+ # Also, no need to update the multiplexer scripted process state
+ # here because since it's listening for the real process stop events.
+ # Once it receives the stop event from the driving process,
+ # `wait_for_driving_process_to_stop` will update the multiplexer
+ # state for us.
+
+ self.first_resume = False
+ return lldb.SBError()
+
+ if not self.driving_process:
+ return lldb.SBError(
+ f"{self.__class__.__name__}.resume: Invalid driving process."
+ )
+
+ return self.driving_process.Continue()
+
+ def get_threads_info(self, pid: int = None) -> Dict[int, Any]:
+ if not pid:
+ return super().get_threads_info()
+ parity = pid % 2
+ return dict(filter(lambda pair: pair[0] % 2 == parity, self.threads.items()))
+
+
+def multiplex(mux_process, muxed_process):
+ muxed_process.GetScriptedImplementation().multiplexer = (
+ mux_process.GetScriptedImplementation()
+ )
+ mux_process.GetScriptedImplementation().multiplexed_processes[
+ muxed_process.GetProcessID()
+ ] = muxed_process
+
+
+def launch_scripted_process(target, class_name, dictionary):
+ structured_data = lldb.SBStructuredData()
+ structured_data.SetFromJSON(json.dumps(dictionary))
+
+ launch_info = lldb.SBLaunchInfo(None)
+ launch_info.SetProcessPluginName("ScriptedProcess")
+ launch_info.SetScriptedProcessClassName(class_name)
+ launch_info.SetScriptedProcessDictionary(structured_data)
+
+ error = lldb.SBError()
+ return target.Launch(launch_info, error)
+
+
+def duplicate_target(driving_target):
+ error = lldb.SBError()
+ exe = driving_target.executable.fullpath
+ triple = driving_target.triple
+ debugger = driving_target.GetDebugger()
+ return debugger.CreateTargetWithFileAndTargetTriple(exe, triple)
+
+
+def extract_value_from_structured_data(data, default_val):
+ if data and data.IsValid():
+ if data.GetType() == lldb.eStructuredDataTypeInteger:
+ return data.GetIntegerValue(default_val)
+ if data.GetType() == lldb.eStructuredDataTypeString:
+ return int(data.GetStringValue(100))
+ return default_val
+
+
+def create_mux_process(debugger, command, exe_ctx, result, dict):
+ if not debugger.GetNumTargets() > 0:
+ return result.SetError(
+ "Interactive scripted processes requires one non scripted process."
+ )
+
+ debugger.SetAsync(True)
+
+ driving_target = debugger.GetSelectedTarget()
+ if not driving_target:
+ return result.SetError("Driving target is invalid")
+
+ # Create a seconde target for the multiplexer scripted process
+ mux_target = duplicate_target(driving_target)
+ if not mux_target:
+ return result.SetError(
+ "Couldn't duplicate driving target to launch multiplexer scripted process"
+ )
+
+ class_name = f"{__name__}.{MultiplexerScriptedProcess.__name__}"
+ dictionary = {"driving_target_idx": debugger.GetIndexOfTarget(driving_target)}
+ mux_process = launch_scripted_process(mux_target, class_name, dictionary)
+ if not mux_process:
+ return result.SetError("Couldn't launch multiplexer scripted process")
+
+
+def create_child_processes(debugger, command, exe_ctx, result, dict):
+ if not debugger.GetNumTargets() >= 2:
+ return result.SetError("Scripted Multiplexer process not setup")
+
+ debugger.SetAsync(True)
+
+ # Create a seconde target for the multiplexer scripted process
+ mux_target = debugger.GetSelectedTarget()
+ if not mux_target:
+ return result.SetError("Couldn't get multiplexer scripted process target")
+ mux_process = mux_target.GetProcess()
+ if not mux_process:
+ return result.SetError("Couldn't get multiplexer scripted process")
+
+ driving_target = mux_process.GetScriptedImplementation().driving_target
+ if not driving_target:
+ return result.SetError("Driving target is invalid")
+
+ # Create a target for the multiplexed even scripted process
+ even_target = duplicate_target(driving_target)
+ if not even_target:
+ return result.SetError(
+ "Couldn't duplicate driving target to launch multiplexed even scripted process"
+ )
+
+ class_name = f"{__name__}.{MultiplexedScriptedProcess.__name__}"
+ dictionary = {"driving_target_idx": debugger.GetIndexOfTarget(mux_target)}
+ dictionary["parity"] = 0
+ even_process = launch_scripted_process(even_target, class_name, dictionary)
+ if not even_process:
+ return result.SetError("Couldn't launch multiplexed even scripted process")
+ multiplex(mux_process, even_process)
+
+ # Create a target for the multiplexed odd scripted process
+ odd_target = duplicate_target(driving_target)
+ if not odd_target:
+ return result.SetError(
+ "Couldn't duplicate driving target to launch multiplexed odd scripted process"
+ )
+
+ dictionary["parity"] = 1
+ odd_process = launch_scripted_process(odd_target, class_name, dictionary)
+ if not odd_process:
+ return result.SetError("Couldn't launch multiplexed odd scripted process")
+ multiplex(mux_process, odd_process)
+
+
+def log(message):
+ # FIXME: For now, we discard the log message until we can pass it to an lldb
+ # logging channel.
+ should_log = False
+ if should_log:
+ print(message)
+
+
+def __lldb_init_module(dbg, dict):
+ dbg.HandleCommand(
+ "command script add -o -f interactive_scripted_process.create_mux_process create_mux"
+ )
+ dbg.HandleCommand(
+ "command script add -o -f interactive_scripted_process.create_child_processes create_sub"
+ )