{ "direction":"send", "regex":r"^\$pid:([0-9a-fA-F]+);", "capture":{1:"pid"} }],
True)
+ def add_register_info_collection_packets(self):
+ self.test_sequence.add_log_lines(
+ [ { "type":"multi_response", "query":"qRegisterInfo", "append_iteration_suffix":True,
+ "end_regex":re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
+ "save_key":"reg_info_responses" } ],
+ True)
+
+ def parse_register_info_packets(self, context):
+ """Return an array of register info dictionaries, one per register info."""
+ reg_info_responses = context.get("reg_info_responses")
+ self.assertIsNotNone(reg_info_responses)
+
+ # Parse register infos.
+ return [parse_reg_info_response(reg_info_response) for reg_info_response in reg_info_responses]
+
    def expect_gdbremote_sequence(self):
        # Drive the accumulated self.test_sequence against the connected stub
        # and return the resulting context dict of captured values.
        return expect_lldb_gdbserver_replay(self, self.sock, self.test_sequence, self._TIMEOUT_SECONDS, self.logger)
"invalidate-regs"
]
- def assert_valid_reg_info_packet(self, reginfo_packet):
- keyval_pairs = reginfo_packet.split(";")
- self.assertTrue(len(keyval_pairs) >= 5)
-
- values = {}
- for kv in keyval_pairs:
- (key, val) = kv.split(':')
- values[key] = val
- # Ensure key is something we expect.
+ def assert_valid_reg_info(self, reg_info):
+ # Assert we know about all the reginfo keys parsed.
+ for key in reg_info:
self.assertTrue(key in self._KNOWN_REGINFO_KEYS)
# Check the bare-minimum expected set of register info keys.
- self.assertTrue("name" in values)
- self.assertTrue("bitsize" in values)
- self.assertTrue("offset" in values)
- self.assertTrue("encoding" in values)
- self.assertTrue("format" in values)
-
+ self.assertTrue("name" in reg_info)
+ self.assertTrue("bitsize" in reg_info)
+ self.assertTrue("offset" in reg_info)
+ self.assertTrue("encoding" in reg_info)
+ self.assertTrue("format" in reg_info)
+
@debugserver_test
def test_exe_starts_debugserver(self):
# Run the stream
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
- self.assertIsNotNone(context.get("reginfo_0"))
- self.assert_valid_reg_info_packet(context.get("reginfo_0"))
+
+ reg_info_packet = context.get("reginfo_0")
+ self.assertIsNotNone(reg_info_packet)
+ self.assert_valid_reg_info(parse_reg_info_response(reg_info_packet))
@debugserver_test
@dsym_test
self.buildDwarf()
self.qRegisterInfo_returns_one_valid_result()
+ def qRegisterInfo_returns_all_valid_results(self):
+ server = self.start_server()
+ self.assertIsNotNone(server)
+
+ # Build launch args.
+ launch_args = [os.path.abspath('a.out')]
+
+ # Build the expected protocol stream.
+ self.add_no_ack_remote_stream()
+ self.add_verified_launch_packets(launch_args)
+ self.add_register_info_collection_packets()
+
+ # Run the stream.
+ context = self.expect_gdbremote_sequence()
+ self.assertIsNotNone(context)
+
+ # Validate that each register info returned validates.
+ for reg_info in self.parse_register_info_packets(context):
+ self.assert_valid_reg_info(reg_info)
+
    @debugserver_test
    @dsym_test
    def test_qRegisterInfo_returns_all_valid_results_debugserver_dsym(self):
        # debugserver + dsym variant of the all-valid-results check.
        self.init_debugserver_test()
        self.buildDsym()
        self.qRegisterInfo_returns_all_valid_results()
+
    @llgs_test
    @dwarf_test
    @unittest2.expectedFailure()
    def test_qRegisterInfo_returns_all_valid_results_llgs_dwarf(self):
        # llgs + dwarf variant; marked expectedFailure for llgs.
        self.init_llgs_test()
        self.buildDwarf()
        self.qRegisterInfo_returns_all_valid_results()
+
+ def qRegisterInfo_contains_required_generics(self):
+ server = self.start_server()
+ self.assertIsNotNone(server)
+
+ # Build launch args
+ launch_args = [os.path.abspath('a.out')]
+
+ # Build the expected protocol stream
+ self.add_no_ack_remote_stream()
+ self.add_verified_launch_packets(launch_args)
+ self.add_register_info_collection_packets()
+
+ # Run the packet stream.
+ context = self.expect_gdbremote_sequence()
+ self.assertIsNotNone(context)
+
+ # Gather register info entries.
+ reg_infos = self.parse_register_info_packets(context)
+
+ # Collect all generics found.
+ generic_regs = { reg_info['generic']:1 for reg_info in reg_infos if 'generic' in reg_info }
+
+ # Ensure we have a program counter register.
+ self.assertTrue('pc' in generic_regs)
+
+ # Ensure we have a frame pointer register.
+ self.assertTrue('fp' in generic_regs)
+
+ # Ensure we have a stack pointer register.
+ self.assertTrue('sp' in generic_regs)
+
+ # Ensure we have a flags register.
+ self.assertTrue('flags' in generic_regs)
+
+
    @debugserver_test
    @dsym_test
    def test_qRegisterInfo_contains_required_generics_debugserver_dsym(self):
        # debugserver + dsym variant of the required-generics check.
        self.init_debugserver_test()
        self.buildDsym()
        self.qRegisterInfo_contains_required_generics()
+
    @llgs_test
    @dwarf_test
    @unittest2.expectedFailure()
    def test_qRegisterInfo_contains_required_generics_llgs_dwarf(self):
        # llgs + dwarf variant; marked expectedFailure for llgs.
        self.init_llgs_test()
        self.buildDwarf()
        self.qRegisterInfo_contains_required_generics()
+
+
+ def qRegisterInfo_contains_at_least_one_register_set(self):
+ server = self.start_server()
+ self.assertIsNotNone(server)
+
+ # Build launch args
+ launch_args = [os.path.abspath('a.out')]
+
+ # Build the expected protocol stream
+ self.add_no_ack_remote_stream()
+ self.add_verified_launch_packets(launch_args)
+ self.add_register_info_collection_packets()
+
+ # Run the packet stream.
+ context = self.expect_gdbremote_sequence()
+ self.assertIsNotNone(context)
+
+ # Gather register info entries.
+ reg_infos = self.parse_register_info_packets(context)
+
+ # Collect all generics found.
+ register_sets = { reg_info['set']:1 for reg_info in reg_infos if 'set' in reg_info }
+ self.assertTrue(len(register_sets) >= 1)
+
+
    @debugserver_test
    @dsym_test
    def test_qRegisterInfo_contains_at_least_one_register_set_debugserver_dsym(self):
        # debugserver + dsym variant of the register-set check.
        self.init_debugserver_test()
        self.buildDsym()
        self.qRegisterInfo_contains_at_least_one_register_set()
+
+
    @llgs_test
    @dwarf_test
    @unittest2.expectedFailure()
    def test_qRegisterInfo_contains_at_least_one_register_set_llgs_dwarf(self):
        # llgs + dwarf variant; marked expectedFailure for llgs.
        self.init_llgs_test()
        self.buildDwarf()
        self.qRegisterInfo_contains_at_least_one_register_set()
+
+
# Allow running this test file directly.
if __name__ == '__main__':
    unittest2.main()
# Matches the trailing '#xx' two-hex-digit checksum of a gdb-remote packet.
_STRIP_CHECKSUM_REGEX = re.compile(r'#[0-9a-fA-F]{2}$')
# Matches the leading '$' start-of-packet marker.
_STRIP_COMMAND_PREFIX_REGEX = re.compile(r"^\$")
+
def assert_packets_equal(asserter, actual_packet, expected_packet):
# strip off the checksum digits of the packet. When we're in
"""
# Ensure we have some work to do.
- if len(sequence_entry) < 1:
+ if len(test_sequence.entries) < 1:
return {}
received_lines = []
while sequence_entry:
if sequence_entry.is_send_to_remote():
# This is an entry to send to the remote debug monitor.
+ send_packet = sequence_entry.get_send_packet()
if logger:
- logger.info("sending packet to remote: {}".format(sequence_entry.exact_payload))
- sock.sendall(sequence_entry.get_send_packet())
+ logger.info("sending packet to remote: %s" % send_packet)
+ sock.sendall(send_packet)
else:
# This is an entry to expect to receive from the remote debug monitor.
if logger:
- logger.info("receiving packet from remote, should match: {}".format(sequence_entry.exact_payload))
+ logger.info("receiving packet from remote")
start_time = time.time()
timeout_time = start_time + timeout_seconds
# return the packetized payload
return gdbremote_packet_encode_string(payload)
+
def parse_reg_info_response(response_packet):
    """Parse a qRegisterInfo response into a dict of key -> value strings.

    Strips the optional leading '$' and trailing '#xx' checksum, then splits
    the remaining 'key:value;key:value;...' payload on ';' and the first ':'
    of each pair, so values that themselves contain ':' survive intact.

    Raises an Exception when response_packet is None or empty.
    """
    if not response_packet:
        raise Exception("response_packet cannot be None")

    # Strip off prefix $ and suffix #xx if present.
    if response_packet.startswith("$"):
        response_packet = response_packet[1:]
    response_packet = re.sub(r"#[0-9a-fA-F]{2}$", "", response_packet)

    # Build keyval pairs.
    values = {}
    for kv in response_packet.split(";"):
        if not kv:
            continue
        # Split on the first ':' only -- splitting on every ':' raises
        # ValueError whenever a value contains a colon.
        (key, val) = kv.split(':', 1)
        values[key] = val

    return values
+
class GdbRemoteEntry(object):
def __init__(self, is_send_to_remote=True, exact_payload=None, regex=None, capture=None, expect_captures=None):
context's previously stored "thread_id" key. This arg
only makes sense when regex is specified.
"""
- self.is_send_to_remote = is_send_to_remote
+ self._is_send_to_remote = is_send_to_remote
self.exact_payload = exact_payload
self.regex = regex
self.capture = capture
self.expect_captures = expect_captures
    def is_send_to_remote(self):
        # True when this entry is a packet we send to the remote debug
        # monitor; False when it is a packet we expect to receive from it.
        return self._is_send_to_remote
+
    def is_consumed(self):
        """Return True once this entry is used up and the sequence may advance."""
        # For now, all packets are consumed after first use.
        return True
+
    def get_send_packet(self):
        """Return the exact payload to transmit to the remote debug monitor.

        Raises if this entry is not send-direction or has no exact payload.
        """
        if not self.is_send_to_remote():
            raise Exception("get_send_packet() called on GdbRemoteEntry that is not a send-to-remote packet")
        if not self.exact_payload:
            raise Exception("get_send_packet() called on GdbRemoteEntry but it doesn't have an exact payload")
        return self.exact_payload
    def _assert_exact_payload_match(self, asserter, actual_packet):
        # Delegate to the module-level packet comparison helper.
        assert_packets_equal(asserter, actual_packet, self.exact_payload)
def assert_match(self, asserter, actual_packet, context=None):
# This only makes sense for matching lines coming from the
# remote debug monitor.
- if self.is_send_to_remote:
+ if self.is_send_to_remote():
raise Exception("Attempted to match a packet being sent to the remote debug monitor, doesn't make sense.")
# Create a new context if needed.
else:
raise Exception("Don't know how to match a remote-sent packet when exact_payload isn't specified.")
class MultiResponseGdbRemoteEntry(object):
    """Represents a query/response style packet.

    Assumes the first item is sent to the gdb remote.  An end sequence regex
    indicates the end of the query/response packet sequence.  All responses
    up through (but not including) the end response are stored in a context
    variable.

    Settings accepted from params:

        next_query or query: required.  The typical query packet without the
            $ prefix or #xx suffix.  If there is a special first packet to
            start the iteration query, see the first_query key.

        first_query: optional.  If the first query requires a special query
            command, specify it with this key.  Do not specify the $ prefix
            or #xx suffix.

        append_iteration_suffix: defaults to False.  Specify True if the
            0-based iteration index should be appended as a suffix to the
            command.  e.g. qRegisterInfo with this key set true will generate
            query packets of qRegisterInfo0, qRegisterInfo1, etc.

        end_regex: required.  Specifies a compiled regex object that will
            match the full text of any response that signals an end to the
            iteration.  It must include the initial $ and ending #xx and must
            match the whole packet.

        save_key: required.  Specifies the key within the context where an
            array will be stored.  Each packet received from the gdb remote
            that does not match the end_regex will get appended to the array
            stored within the context at that key.

        runaway_response_count: optional.  Defaults to 10000.  If this many
            responses are retrieved, assume there is something wrong with
            either the response collection or the ending detection regex and
            throw an exception.
    """
    def __init__(self, params):
        # "query" is the fallback when no explicit "next_query" is given.
        self._next_query = params.get("next_query", params["query"])
        # The first query defaults to the ordinary per-iteration query.
        self._first_query = params.get("first_query", self._next_query)
        self._append_iteration_suffix = params.get("append_iteration_suffix", False)
        self._end_regex = params["end_regex"]
        self._save_key = params["save_key"]
        self._runaway_response_count = params.get("runaway_response_count", 10000)
        # State machine: start in send mode, no iterations done, end unseen.
        self._iteration = 0
        self._is_send_to_remote = True
        self._end_matched = False

    def is_send_to_remote(self):
        """True when the next protocol step is sending a query packet."""
        return self._is_send_to_remote

    def get_send_packet(self):
        """Build the next query packet and flip this entry into receive mode."""
        if not self.is_send_to_remote():
            raise Exception("get_send_packet() called on MultiResponseGdbRemoteEntry that is not in the send state")
        if self._end_matched:
            raise Exception("get_send_packet() called on MultiResponseGdbRemoteEntry but end of query/response sequence has already been seen.")

        # First iteration may use a special starter query.
        use_first = (self._iteration == 0 and self._first_query)
        payload = self._first_query if use_first else self._next_query

        # Optionally append the 0-based iteration index.
        if self._append_iteration_suffix:
            payload += "%d" % self._iteration

        # Advance the iteration counter and switch to receive/match mode.
        self._iteration += 1
        self._is_send_to_remote = False

        # Convert to full packet form ($...#xx) before returning.
        return gdbremote_packet_encode_string(payload)

    def is_consumed(self):
        """This entry is exhausted once the end response has been matched."""
        return self._end_matched

    def assert_match(self, asserter, actual_packet, context=None):
        # Matching only makes sense for packets coming from the remote stub.
        if self.is_send_to_remote():
            raise Exception("assert_match() called on MultiResponseGdbRemoteEntry but state is set to send a query packet.")
        if self._end_matched:
            raise Exception("assert_match() called on MultiResponseGdbRemoteEntry but end of query/response sequence has already been seen.")

        # Set up a context as needed.
        if not context:
            context = {}

        # An end-marker response terminates the iteration.
        if self._end_regex.match(actual_packet):
            self._end_matched = True
            return context

        # Otherwise record the response under the save key.
        responses = context.setdefault(self._save_key, [])
        responses.append(actual_packet)

        # Guard against a runaway query/response cycle.
        if len(responses) >= self._runaway_response_count:
            raise Exception("runaway query/response cycle detected: %d responses captured so far. Last response: %s" %
                (len(responses), responses[-1]))

        # Flip back to send mode to generate the next query.
        self._is_send_to_remote = True
        return context
+
class GdbRemoteTestSequence(object):
_LOG_LINE_REGEX = re.compile(r'^.*(read|send)\s+packet:\s+(.+)$')
else:
raise Exception("failed to interpret log line: {}".format(line))
elif type(line) == dict:
- # Handle more explicit control over details via dictionary.
- direction = line.get("direction", None)
- regex = line.get("regex", None)
- capture = line.get("capture", None)
- expect_captures = line.get("expect_captures", None)
-
- # Compile the regex.
- if regex and (type(regex) == str):
- regex = re.compile(regex)
-
- if _is_packet_lldb_gdbserver_input(direction, remote_input_is_read):
- # Handle as something to send to the remote debug monitor.
- if self.logger:
- self.logger.info("processed dict sequence to send to remote")
- self.entries.append(GdbRemoteEntry(is_send_to_remote=True, regex=regex, capture=capture, expect_captures=expect_captures))
+ entry_type = line.get("type", "regex_capture")
+ if entry_type == "regex_capture":
+ # Handle more explicit control over details via dictionary.
+ direction = line.get("direction", None)
+ regex = line.get("regex", None)
+ capture = line.get("capture", None)
+ expect_captures = line.get("expect_captures", None)
+
+ # Compile the regex.
+ if regex and (type(regex) == str):
+ regex = re.compile(regex)
+
+ if _is_packet_lldb_gdbserver_input(direction, remote_input_is_read):
+ # Handle as something to send to the remote debug monitor.
+ if self.logger:
+ self.logger.info("processed dict sequence to send to remote")
+ self.entries.append(GdbRemoteEntry(is_send_to_remote=True, regex=regex, capture=capture, expect_captures=expect_captures))
+ else:
+ # Log line represents content to be expected from the remote debug monitor.
+ if self.logger:
+ self.logger.info("processed dict sequence to match receiving from remote")
+ self.entries.append(GdbRemoteEntry(is_send_to_remote=False, regex=regex, capture=capture, expect_captures=expect_captures))
+ elif entry_type == "multi_response":
+ self.entries.append(MultiResponseGdbRemoteEntry(line))
else:
- # Log line represents content to be expected from the remote debug monitor.
- if self.logger:
- self.logger.info("processed dict sequence to match receiving from remote")
- self.entries.append(GdbRemoteEntry(is_send_to_remote=False, regex=regex, capture=capture, expect_captures=expect_captures))
+ raise Exception("unknown entry type \"%s\"" % entry_type)
def process_is_running(pid, unknown_value=True):
"""If possible, validate that the given pid represents a running process on the local system.