1 # -*- test-case-name: twisted.test.test_process -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
6 Cross-platform process-related functionality used by different
7 L{IReactorProcess} implementations.
10 from twisted.python.reflect import qual
11 from twisted.python.deprecate import getWarningMethod
12 from twisted.python.failure import Failure
13 from twisted.python.log import err
14 from twisted.persisted.styles import Ephemeral
# Warning message template used by BaseProcess._callProcessExited when the
# protocol object has no ``processExited`` attribute. The single ``%s``
# placeholder is filled with the result of ``qual()`` applied to the
# protocol's class.
16 _missingProcessExited = ("Since Twisted 8.2, IProcessProtocol.processExited "
17 "is required. %s must implement it.")
# Shared process-exit notification logic: relays a child's termination to
# the protocol object held in ``self.proto`` by calling its
# ``processExited`` and ``processEnded`` methods.
# NOTE(review): the embedded numbering skips values inside this class, so
# some statements appear to be missing from this listing. The comments
# below describe only the lines that are visible.
19 class BaseProcess(Ephemeral):
# protocol: presumably the object the other methods read as ``self.proto``;
# the constructor body is not visible here -- confirm.
25 def __init__(self, protocol):
# Tell the protocol that the child exited.
# reason: value wrapped in a ``Failure`` before being handed to the
# protocol's ``processExited`` method.
29 def _callProcessExited(self, reason):
# Look the method up with a fallback value so that a protocol lacking
# ``processExited`` can be detected by identity comparison.
# NOTE(review): the binding of ``default`` is not visible in this listing.
31 processExited = getattr(self.proto, 'processExited', default)
32 if processExited is default:
# Arguments of a warning call: the formatted message naming the protocol's
# class, the DeprecationWarning category, and stacklevel=0.
# NOTE(review): the callee is not visible; presumably the function returned
# by getWarningMethod() -- confirm.
34 _missingProcessExited % (qual(self.proto.__class__),),
35 DeprecationWarning, stacklevel=0)
# Invoke the protocol's ``processExited`` with the reason wrapped in a
# Failure. Presumably reached only when the method exists; the branch
# structure is not visible -- confirm.
37 processExited(Failure(reason))
# status: passed to ``self._getReason`` to build the exit reason.
# ``_getReason`` is not defined in the visible part of this class;
# presumably supplied elsewhere (e.g. by subclasses) -- verify.
40 def processEnded(self, status):
42 This is called when the child terminates.
# First report the exit via processExited, then attempt the final
# processEnded notification.
47 self._callProcessExited(self._getReason(status))
48 self.maybeCallProcessEnded()
# Takes no arguments; reads ``self.proto`` and ``self.status``.
51 def maybeCallProcessEnded(self):
53 Call processEnded on protocol after final cleanup.
# Nothing is done when there is no protocol to notify.
55 if self.proto is not None:
# The reason is derived from the stored ``self.status`` rather than from
# an argument.
56 reason = self._getReason(self.status)
# NOTE(review): the binding of the local ``proto`` is not visible here.
60 proto.processEnded(Failure(reason))
# Logs via ``err`` with a fixed message; presumably the handler for an
# exception raised by the protocol's ``processEnded`` -- the surrounding
# try/except is not visible, confirm.
62 err(None, "unexpected error in processEnded")