self.set_inferior_startup_launch()
# Uncomment this code to force only a single test to run (by name).
- # if not re.search(r"m_packet_reads_memory", self._testMethodName):
+ # if not re.search(r"qMemoryRegionInfo", self._testMethodName):
# self.skipTest("focusing on one test")
def reset_test_sequence(self):
self.assertTrue("encoding" in reg_info)
self.assertTrue("format" in reg_info)
+ def assert_address_within_range(self, test_address, range_start, range_size):
+ range_end = range_start + range_size
+
+ if test_address < range_start:
+ self.fail("address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
+ elif test_address >= range_end:
+ self.fail("address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
+
def add_threadinfo_collection_packets(self):
self.test_sequence.add_log_lines(
[ { "type":"multi_response", "first_query":"qfThreadInfo", "next_query":"qsThreadInfo",
self.set_inferior_startup_launch()
self.qMemoryRegionInfo_is_supported()
+ def qMemoryRegionInfo_reports_code_address_as_executable(self):
+ # Start up the inferior.
+ procs = self.prep_debug_monitor_and_inferior(
+ inferior_args=["get-code-address-hex:", "sleep:5"])
+
+ # Run the process
+ self.test_sequence.add_log_lines(
+ [
+ # Start running after initial stop.
+ "read packet: $c#00",
+ # Match output line that prints the memory address of the message buffer within the inferior.
+ # Note we require launch-only testing so we can get inferior otuput.
+ { "type":"output_match", "regex":r"^code address: 0x([0-9a-fA-F]+)\r\n$", "capture":{ 1:"code_address"} },
+ # Now stop the inferior.
+ "read packet: {}".format(chr(03)),
+ # And wait for the stop notification.
+ {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} }],
+ True)
+
+ # Run the packet stream.
+ context = self.expect_gdbremote_sequence()
+ self.assertIsNotNone(context)
+
+ # Grab the code address.
+ self.assertIsNotNone(context.get("code_address"))
+ code_address = int(context.get("code_address"), 16)
+
+ # Grab memory region info from the inferior.
+ self.reset_test_sequence()
+ self.test_sequence.add_log_lines(
+ ["read packet: $qMemoryRegionInfo:{0:x}#00".format(code_address),
+ {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"memory_region_response"} }],
+ True)
+
+ # Run the packet stream.
+ context = self.expect_gdbremote_sequence()
+ self.assertIsNotNone(context)
+
+ # Parse the memory region response
+ self.assertIsNotNone(context.get("memory_region_response"))
+ mem_region_dict = {match.group(1):match.group(2) for match in re.finditer(r"([^:]+):([^;]+);", context.get("memory_region_response"))}
+ # print "mem_region_dict: {}".format(mem_region_dict)
+
+ for (key, val) in mem_region_dict.items():
+ self.assertTrue(key in ["start", "size", "permissions", "error"])
+ self.assertIsNotNone(val)
+
+ # Ensure code address is readable and executable.
+ self.assertTrue("permissions" in mem_region_dict)
+ self.assertTrue("r" in mem_region_dict["permissions"])
+ self.assertTrue("x" in mem_region_dict["permissions"])
+
+ # Ensure it has a start address and a size.
+ self.assertTrue("start" in mem_region_dict)
+ self.assertTrue("size" in mem_region_dict)
+
+ # Ensure the start address and size encompass the address we queried.
+ self.assert_address_within_range(code_address, int(mem_region_dict["start"], 16), int(mem_region_dict["size"], 16))
+
+
+ @debugserver_test
+ @dsym_test
+ def test_qMemoryRegionInfo_reports_code_address_as_executable_debugserver_dsym(self):
+ self.init_debugserver_test()
+ self.buildDsym()
+ self.set_inferior_startup_launch()
+ self.qMemoryRegionInfo_reports_code_address_as_executable()
+
+ @llgs_test
+ @dwarf_test
+ @unittest2.expectedFailure()
+ def test_qMemoryRegionInfo_reports_code_address_as_executable_llgs_dwarf(self):
+ self.init_llgs_test()
+ self.buildDwarf()
+ self.set_inferior_startup_launch()
+ self.qMemoryRegionInfo_reports_code_address_as_executable()
+
+
if __name__ == '__main__':
unittest2.main()
static const char *const STDERR_PREFIX = "stderr:";
static const char *const SET_MESSAGE_PREFIX = "set-message:";
static const char *const GET_MESSAGE_ADDRESS_COMMAND = "get-message-address-hex:";
+static const char *const GET_STACK_ADDRESS_COMMAND = "get-stack-address-hex:";
+static const char *const GET_HEAP_ADDRESS_COMMAND = "get-heap-address-hex:";
+static const char *const GET_CODE_ADDRESS_COMMAND = "get-code-address-hex:";
static const char *const THREAD_PREFIX = "thread:";
static const char *const THREAD_COMMAND_NEW = "new";
int main (int argc, char **argv)
{
std::vector<pthread_t> threads;
+ std::unique_ptr<uint8_t[]> heap_array_up;
int return_value = 0;
// Set the signal handler.
printf ("message address: %p\n", &g_message[0]);
pthread_mutex_unlock (&g_print_mutex);
}
+ else if (std::strstr (argv[i], GET_HEAP_ADDRESS_COMMAND))
+ {
+ // Create a byte array if not already present.
+ if (!heap_array_up)
+ heap_array_up.reset (new uint8_t[32]);
+
+ pthread_mutex_lock (&g_print_mutex);
+ printf ("heap address: %p\n", heap_array_up.get ());
+ pthread_mutex_unlock (&g_print_mutex);
+ }
+ else if (std::strstr (argv[i], GET_STACK_ADDRESS_COMMAND))
+ {
+ pthread_mutex_lock (&g_print_mutex);
+ printf ("stack address: %p\n", &return_value);
+ pthread_mutex_unlock (&g_print_mutex);
+ }
+ else if (std::strstr (argv[i], GET_CODE_ADDRESS_COMMAND))
+ {
+ pthread_mutex_lock (&g_print_mutex);
+ printf ("code address: %p\n", main);
+ pthread_mutex_unlock (&g_print_mutex);
+ }
else if (std::strstr (argv[i], THREAD_PREFIX))
{
// Check if we're creating a new thread.