class ServerProcess:
"""This class provides a wrapper around a subprocess that
implements a simple request/response usage model. The primary benefit
- is that reading responses takes a timeout, so that we don't ever block
+ is that reading responses takes a deadline, so that we don't ever block
indefinitely. The class also handles transparently restarting processes
as necessary to keep issuing commands."""
def _reset(self):
self._proc = None
- self._output = ''
+ self._output = str() # bytesarray() once we require Python 2.6
+ self._error = str() # bytesarray() once we require Python 2.6
self.crashed = False
self.timed_out = False
- self.error = ''
+
+ def process_name(self):
+ return self._name
def _start(self):
if self._proc:
return self._proc.poll()
return None
- def write(self, input):
+ def write(self, bytes):
"""Write a request to the subprocess. The subprocess is (re-)start()'ed
if is not already running."""
if not self._proc:
self._start()
try:
- self._proc.stdin.write(input)
+ self._proc.stdin.write(bytes)
except IOError, e:
self.stop()
# stop() calls _reset(), so we have to set crashed to True after calling stop().
self.crashed = True
- def read_line(self, timeout):
- """Read a single line from the subprocess, waiting until the deadline.
- If the deadline passes, the call times out. Note that even if the
- subprocess has crashed or the deadline has passed, if there is output
- pending, it will be returned.
-
- Args:
- timeout: floating-point number of seconds the call is allowed
- to block for. A zero or negative number will attempt to read
- any existing data, but will not block. There is no way to
- block indefinitely.
- Returns:
- output: data returned, if any. If no data is available and the
- call times out or crashes, an empty string is returned. Note
- that the returned string includes the newline ('\n')."""
- return self._read(timeout, size=0)
-
- def read(self, timeout, size):
- """Attempts to read size characters from the subprocess, waiting until
- the deadline passes. If the deadline passes, any available data will be
- returned. Note that even if the deadline has passed or if the
- subprocess has crashed, any available data will still be returned.
-
- Args:
- timeout: floating-point number of seconds the call is allowed
- to block for. A zero or negative number will attempt to read
- any existing data, but will not block. There is no way to
- block indefinitely.
- size: amount of data to read. Must be a postive integer.
- Returns:
- output: data returned, if any. If no data is available, an empty
- string is returned.
- """
+ def _pop_stdout_line_if_ready(self):
+ index_after_newline = self._output.find('\n') + 1
+ if index_after_newline > 0:
+ return self._pop_output_bytes(index_after_newline)
+ return None
+
+ def _pop_stderr_line_if_ready(self):
+ index_after_newline = self._error.find('\n') + 1
+ if index_after_newline > 0:
+ return self._pop_error_bytes(index_after_newline)
+ return None
+
+ def pop_all_buffered_stderr(self):
+ return self._pop_error_bytes(len(self._error))
+
+ def read_stdout_line(self, deadline):
+ return self._read(deadline, self._pop_stdout_line_if_ready)
+
+ def read_stderr_line(self, deadline):
+ return self._read(deadline, self._pop_stderr_line_if_ready)
+
+ def read_either_stdout_or_stderr_line(self, deadline):
+ def retrieve_bytes_from_buffers():
+ stdout_line = self._pop_stdout_line_if_ready()
+ if stdout_line:
+ return stdout_line, None
+ stderr_line = self._pop_stderr_line_if_ready()
+ if stderr_line:
+ return None, stderr_line
+ return None # Instructs the caller to keep waiting.
+
+ return_value = self._read(deadline, retrieve_bytes_from_buffers)
+ # FIXME: This is a bit of a hack around the fact that _read normally only returns one value, but this caller wants it to return two.
+ if return_value is None:
+ return None, None
+ return return_value
+
+ def read_stdout(self, deadline, size):
if size <= 0:
raise ValueError('ServerProcess.read() called with a non-positive size: %d ' % size)
- return self._read(timeout, size)
+
+ def retrieve_bytes_from_stdout_buffer():
+ if len(self._output) >= size:
+ return self._pop_output_bytes(size)
+ return None
+
+ return self._read(deadline, retrieve_bytes_from_stdout_buffer)
def _check_for_crash(self):
if self._proc.poll() != None:
except ScriptError, e:
self._log('Unable to sample process.')
- def _read(self, timeout, size):
- """Internal routine that actually does the read."""
- index = -1
+ def _handle_timeout(self):
+ self._executive.wait_newest(self._port.is_crash_reporter)
+ if not self.crashed:
+ self._check_for_crash()
+ self.timed_out = True
+ if not self.crashed:
+ self._sample()
+
+ def _split_string_after_index(self, string, index):
+ return string[:index], string[index:]
+
+ def _pop_output_bytes(self, bytes_count):
+ output, self._output = self._split_string_after_index(self._output, bytes_count)
+ return output
+
+ def _pop_error_bytes(self, bytes_count):
+ output, self._error = self._split_string_after_index(self._error, bytes_count)
+ return output
+
+ def _wait_for_data_and_update_buffers(self, deadline):
out_fd = self._proc.stdout.fileno()
err_fd = self._proc.stderr.fileno()
select_fds = (out_fd, err_fd)
- deadline = time.time() + timeout
- while not self.timed_out and not self.crashed:
- self._check_for_crash()
+ read_fds, _, _ = select.select(select_fds, [], select_fds, deadline - time.time())
+ try:
+ if out_fd in read_fds:
+ self._output += self._proc.stdout.read()
+ if err_fd in read_fds:
+ self._error += self._proc.stderr.read()
+ except IOError, e:
+ # FIXME: Why do we ignore all IOErrors here?
+ pass
+
+ def _check_for_abort(self, deadline):
+ self._check_for_crash()
+
+ if time.time() > deadline:
+ self._handle_timeout()
+
+ return self.crashed or self.timed_out
+
+ # This read function is a bit oddly-designed, as it polls both stdout and stderr, yet
+ # only reads/returns from one of them (buffering both in local self._output/self._error).
+ # It might be cleaner to pass in the file descriptor to poll instead.
+ def _read(self, deadline, fetch_bytes_from_buffers_callback):
+ while True:
+ if self._check_for_abort(deadline):
+ return None
+
+ bytes = fetch_bytes_from_buffers_callback()
+ if bytes is not None:
+ return bytes
- now = time.time()
- if now > deadline:
- self._executive.wait_newest(self._port.is_crash_reporter)
- if not self.crashed:
- self._check_for_crash()
- self.timed_out = True
- if not self.crashed:
- self._sample()
-
- # Check to see if we have any output we can return.
- if size and len(self._output) >= size:
- index = size
- elif size == 0:
- index = self._output.find('\n') + 1
-
- if index > 0 or self.crashed or self.timed_out:
- output = self._output[0:index]
- self._output = self._output[index:]
- return output
-
- # Nope - wait for more data.
- (read_fds, write_fds, err_fds) = select.select(select_fds, [], select_fds, deadline - now)
- try:
- if out_fd in read_fds:
- self._output += self._proc.stdout.read()
- if err_fd in read_fds:
- self.error += self._proc.stderr.read()
- except IOError, e:
- pass
+ self._wait_for_data_and_update_buffers(deadline)
def stop(self):
- """Stop (shut down) the subprocess), if it is running."""
if not self._proc:
return