From b6e04ac5aa881c1fbb66da884b04e48dfb102474 Mon Sep 17 00:00:00 2001 From: Pavel Labath Date: Mon, 30 Nov 2020 09:05:54 +0100 Subject: [PATCH] [lldb/test] Avoid the socket "pump" thread A separate thread is not necessary, as we can do its work on the main thread, while waiting for the packet to arrive. This makes the code easier to understand and debug (other simplifications are possible too, but I'll leave that for separate patches). The new implementation also avoids busy waiting. --- .../test/tools/lldb-server/lldbgdbserverutils.py | 4 +- .../test/tools/lldb-server/socket_packet_pump.py | 80 +++++++++------------- 2 files changed, 34 insertions(+), 50 deletions(-) diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py index 0c01bdf..b5c635a 100644 --- a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py +++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py @@ -236,7 +236,7 @@ def expect_lldb_gdbserver_replay( if sequence_entry.is_output_matcher(): try: # Grab next entry from the output queue. - content = pump_queues.output_queue().get(True, timeout_seconds) + content = pump.get_output(timeout_seconds) except queue.Empty: if logger: logger.warning( @@ -247,7 +247,7 @@ def expect_lldb_gdbserver_replay( pump.get_accumulated_output())) else: try: - content = pump_queues.packet_queue().get(True, timeout_seconds) + content = pump.get_packet(timeout_seconds) except queue.Empty: if logger: logger.warning( diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py index 3de7634..6c41ed4 100644 --- a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py +++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py @@ -5,6 +5,7 @@ from __future__ import print_function import re import select import threading +import time import traceback from six.moves import queue @@ -74,8 +75,6 @@ class SocketPacketPump(object): if not pump_socket: raise Exception("pump_socket cannot be None") - self._thread = None - self._stop_thread = False self._socket = pump_socket self._logger = logger self._receive_buffer = "" @@ -83,29 +82,42 @@ class SocketPacketPump(object): self._pump_queues = pump_queues def __enter__(self): - """Support the python 'with' statement. - - Start the pump thread.""" - self.start_pump_thread() + self._receive_buffer = "" + self._accumulated_output = "" return self def __exit__(self, exit_type, value, the_traceback): - """Support the python 'with' statement. - - Shut down the pump thread.""" - self.stop_pump_thread() + pass + + def _read(self, timeout_seconds, q): + now = time.monotonic() + deadline = now + timeout_seconds + while q.empty() and now <= deadline: + can_read, _, _ = select.select([self._socket], [], [], deadline-now) + now = time.monotonic() + if can_read and self._socket in can_read: + try: + new_bytes = seven.bitcast_to_string(self._socket.recv(4096)) + if self._logger and new_bytes and len(new_bytes) > 0: + self._logger.debug( + "pump received bytes: {}".format(new_bytes)) + except: + # Likely a closed socket. Done with the pump thread. + if self._logger: + self._logger.debug( + "socket read failed, stopping pump read thread\n" + + traceback.format_exc(3)) + break + self._process_new_bytes(new_bytes) + if q.empty(): + raise queue.Empty() + return q.get(True) - def start_pump_thread(self): - if self._thread: - raise Exception("pump thread is already running") - self._stop_thread = False - self._thread = threading.Thread(target=self._run_method) - self._thread.start() + def get_output(self, timeout_seconds): + return self._read(timeout_seconds, self._pump_queues.output_queue()) - def stop_pump_thread(self): - self._stop_thread = True - if self._thread: - self._thread.join() + def get_packet(self, timeout_seconds): + return self._read(timeout_seconds, self._pump_queues.packet_queue()) def _process_new_bytes(self, new_bytes): if not new_bytes: @@ -162,34 +174,6 @@ class SocketPacketPump(object): # packet. Stop trying until we read more. has_more = False - def _run_method(self): - self._receive_buffer = "" - self._accumulated_output = "" - - if self._logger: - self._logger.info("socket pump starting") - - # Keep looping around until we're asked to stop the thread. - while not self._stop_thread: - can_read, _, _ = select.select([self._socket], [], [], 0) - if can_read and self._socket in can_read: - try: - new_bytes = seven.bitcast_to_string(self._socket.recv(4096)) - if self._logger and new_bytes and len(new_bytes) > 0: - self._logger.debug( - "pump received bytes: {}".format(new_bytes)) - except: - # Likely a closed socket. Done with the pump thread. - if self._logger: - self._logger.debug( - "socket read failed, stopping pump read thread\n" + - traceback.format_exc(3)) - break - self._process_new_bytes(new_bytes) - - if self._logger: - self._logger.info("socket pump exiting") - def get_accumulated_output(self): return self._accumulated_output -- 2.7.4