There should be nothing unwanted there and a simpe main.cpp which includes SB*.h
should compile and link with the LLDB framework."""
-import os, re, StringIO
+import lldb_shared
+
+import os, re
import unittest2
from lldbtest import *
import lldbutil
"""Test the lldb public C++ api when doing multiple debug sessions simultaneously."""
-import os, re, StringIO
+import lldb_shared
+
+import os, re
import unittest2
from lldbtest import *
import lldbutil
"""Test the lldb public C++ api breakpoint callbacks."""
-import os, re, StringIO
+import lldb_shared
+
+import os, re
import unittest2
from lldbtest import *
import lldbutil
from __future__ import print_function
+import lldb_shared
+
# system packages and modules
import asyncore
import distutils.version
import multiprocessing.pool
import os
import platform
-import Queue
import re
import signal
import sys
import threading
+from six.moves import queue
+
# Add our local test_runner/lib dir to the python path.
sys.path.append(os.path.join(os.path.dirname(__file__), "test_runner", "lib"))
result = process_dir(job[0], job[1], job[2], job[3],
inferior_pid_events)
result_queue.put(result)
- except Queue.Empty:
+ except queue.Empty:
# Fine, we're done.
pass
result = process_dir(job[0], job[1], job[2], job[3],
inferior_pid_events)
result_queue.put(result)
- except Queue.Empty:
+ except queue.Empty:
# Fine, we're done.
pass
try:
# Just drain it to stop more work from being started.
job_queue.get_nowait()
- except Queue.Empty:
+ except queue.Empty:
pass
with output_lock:
print("Stopped more work from being started.")
initialize_global_vars_threading(num_threads, test_work_items)
# Create jobs.
- job_queue = Queue.Queue()
+ job_queue = queue.Queue()
for test_work_item in test_work_items:
job_queue.put(test_work_item)
- result_queue = Queue.Queue()
+ result_queue = queue.Queue()
# Create queues for started child pids. Terminating
# the threading threads does not terminate the
# child processes they spawn.
- inferior_pid_events = Queue.Queue()
+ inferior_pid_events = queue.Queue()
# Create workers. We don't use multiprocessing.pool.ThreadedPool
# due to challenges with handling ^C keyboard interrupts.
framework.
"""
+import lldb_shared
+
import asyncore
-import cPickle
import socket
+from six.moves import cPickle
class UnpicklingForwardingReaderChannel(asyncore.dispatcher):
"""Provides an unpickling, forwarding asyncore dispatch channel reader.
"""
Test that argdumper is a viable launching strategy.
"""
-import commands
import lldb
import os
import time
"""
Test some lldb command abbreviations.
"""
-import commands
import lldb
import os
import time
"""
Test that argdumper is a viable launching strategy.
"""
-import commands
import lldb
import os
import time
Test lldb target stop-hook command.
"""
+import lldb_shared
+
import os
import unittest2
-import StringIO
import lldb
from lldbtest import *
import lldbutil
import re
import signal
from subprocess import *
-import StringIO
import time
import types
import unittest2
import lldbutil
from six import add_metaclass
-
-if sys.version_info.major < 3:
- import urlparse
-else:
- import urllib.parse as urlparse
+from six import StringIO as SixStringIO
+from six.moves.urllib import parse as urlparse
# dosep.py starts lots and lots of dotest instances
# This option helps you find if two (or more) dotest instances are using the same
return exe_file
return None
-class recording(StringIO.StringIO):
+class recording(SixStringIO):
"""
A nice little context manager for recording the debugger interactions into
our session object. If trace flag is ON, it also emits the interactions
into the stderr.
"""
def __init__(self, test, trace):
- """Create a StringIO instance; record the session obj and trace flag."""
- StringIO.StringIO.__init__(self)
+ """Create a SixStringIO instance; record the session obj and trace flag."""
+ SixStringIO.__init__(self)
# The test might not have undergone the 'setUp(self)' phase yet, so that
# the attribute 'session' might not even exist yet.
self.session = getattr(test, "session", None) if test else None
def __enter__(self):
"""
Context management protocol on entry to the body of the with statement.
- Just return the StringIO object.
+ Just return the SixStringIO object.
"""
return self
"""
Context management protocol on exit from the body of the with statement.
If trace is ON, it emits the recordings into stderr. Always add the
- recordings to our session object. And close the StringIO object, too.
+ recordings to our session object. And close the SixStringIO object, too.
"""
if self.trace:
print(self.getvalue(), file=sys.stderr)
from __future__ import print_function
+import lldb_shared
+
import lldb
import os, sys
-import StringIO
import re
+from six import StringIO as SixStringIO
+
# ===================================================
# Utilities for locating/checking executable programs
# ===================================================
It returns the disassembly content in a string object.
"""
- buf = StringIO.StringIO()
+ buf = SixStringIO()
insts = function_or_symbol.GetInstructions(target)
for i in insts:
print(i, file=buf)
def print_stacktrace(thread, string_buffer = False):
"""Prints a simple stack trace of this thread."""
- output = StringIO.StringIO() if string_buffer else sys.stdout
+ output = SixStringIO() if string_buffer else sys.stdout
target = thread.GetProcess().GetTarget()
depth = thread.GetNumFrames()
def print_stacktraces(process, string_buffer = False):
"""Prints the stack traces of all the threads."""
- output = StringIO.StringIO() if string_buffer else sys.stdout
+ output = SixStringIO() if string_buffer else sys.stdout
print("Stack traces for " + str(process), file=output)
def print_registers(frame, string_buffer = False):
"""Prints all the register sets of the frame."""
- output = StringIO.StringIO() if string_buffer else sys.stdout
+ output = SixStringIO() if string_buffer else sys.stdout
print("Register sets for " + str(frame), file=output)
"""The basic formatter inspects the value object and prints the value."""
def format(self, value, buffer=None, indent=0):
if not buffer:
- output = StringIO.StringIO()
+ output = SixStringIO()
else:
output = buffer
# If there is a summary, it suffices.
self.cindent = indent_child
def format(self, value, buffer=None):
if not buffer:
- output = StringIO.StringIO()
+ output = SixStringIO()
else:
output = buffer
self.cindent = indent_child
def format(self, value, buffer=None):
if not buffer:
- output = StringIO.StringIO()
+ output = SixStringIO()
else:
output = buffer
And other SBFrame API tests.
"""
+import lldb_shared
+
import os, time
import re
import unittest2
# depth of 3 of the 'c' leaf function.
callsOfA = 0
- import StringIO
- session = StringIO.StringIO()
+ from six import StringIO as SixStringIO
+ session = SixStringIO()
while process.GetState() == lldb.eStateStopped:
thread = process.GetThreadAtIndex(0)
# Inspect at most 3 frames.
within the LLDB test suite.
"""
+import lldb_shared
+
import argparse
-import cPickle
import inspect
import os
import pprint
import traceback
import xml.sax.saxutils
+from six.moves import cPickle
+
class EventBuilder(object):
"""Helper class to build test result event dictionaries."""
"""Module for supporting unit testing of the lldb-server debug monitor exe.
"""
+import lldb_shared
+
import os
import os.path
import platform
-import Queue
import re
import socket_packet_pump
import subprocess
import time
from lldbtest import *
+from six.moves import queue
+
def _get_debug_monitor_from_lldb(lldb_exe, debug_monitor_basename):
"""Return the debug monitor exe path given the lldb exe path.
try:
# Grab next entry from the output queue.
content = pump.output_queue().get(True, timeout_seconds)
- except Queue.Empty:
+ except queue.Empty:
if logger:
logger.warning("timeout waiting for stub output (accumulated output:{})".format(pump.get_accumulated_output()))
raise Exception("timed out while waiting for output match (accumulated output: {})".format(pump.get_accumulated_output()))
else:
try:
content = pump.packet_queue().get(True, timeout_seconds)
- except Queue.Empty:
+ except queue.Empty:
if logger:
logger.warning("timeout waiting for packet match (receive buffer: {})".format(pump.get_receive_buffer()))
raise Exception("timed out while waiting for packet match (receive buffer: {})".format(pump.get_receive_buffer()))
-import Queue
+
+import lldb_shared
+
import re
import select
import threading
import traceback
import codecs
+from six.moves import queue
+
def _handle_output_packet_string(packet_contents):
if (not packet_contents) or (len(packet_contents) < 1):
return None
if not pump_socket:
raise Exception("pump_socket cannot be None")
- self._output_queue = Queue.Queue()
- self._packet_queue = Queue.Queue()
+ self._output_queue = queue.Queue()
+ self._packet_queue = queue.Queue()
self._thread = None
self._stop_thread = False
self._socket = pump_socket
"""Test result object"""
+import lldb_shared
+
import sys
import traceback
import unittest
-from StringIO import StringIO
+from six import StringIO as SixStringIO
from unittest2 import util
from unittest2.compatibility import wraps
self._mirrorOutput = False
if self.buffer:
if self._stderr_buffer is None:
- self._stderr_buffer = StringIO()
- self._stdout_buffer = StringIO()
+ self._stderr_buffer = SixStringIO()
+ self._stdout_buffer = SixStringIO()
sys.stdout = self._stdout_buffer
sys.stderr = self._stderr_buffer