1 # -*- test-case-name: twisted.runner.test.test_procmon -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
6 Support for starting, monitoring, and restarting child processes.
10 from twisted.python import log
11 from twisted.internet import error, protocol, reactor as _reactor
12 from twisted.application import service
13 from twisted.protocols import basic
# Module-level placeholder transport shared by every LineLogger: it is
# passed to LineLogger.makeConnection in LoggingProtocol.connectionMade.
# DummyTransport is defined outside this view -- presumably a do-nothing
# transport, since LineLogger.lineReceived only logs what it receives
# (TODO confirm against the DummyTransport definition).
19 transport = DummyTransport()
21 class LineLogger(basic.LineReceiver):
def lineReceived(self, line):
    """
    Log a single line of child output, prefixed with C{[tag]}.

    @param line: The line to log; it is interpolated with C{%s}.
    """
    formatted = '[%s] %s' % (self.tag, line)
    log.msg(formatted)
30 class LoggingProtocol(protocol.ProcessProtocol):
def connectionMade(self):
    """
    Create the L{LineLogger} that will receive this process's output.

    Every line it logs is tagged with C{self.name}. The logger is
    connected to the module-level placeholder C{transport} rather than to
    a real transport.
    """
    lineLogger = LineLogger()
    lineLogger.tag = self.name
    self.output = lineLogger
    lineLogger.makeConnection(transport)
def outReceived(self, data):
    """
    Feed a chunk of child-process output to the line logger.

    Also records in C{self.empty} whether the chunk ended with a newline.

    @param data: A chunk of output read from the child process.
    """
    self.output.dataReceived(data)
    # The previous data[-1] raised IndexError on an empty chunk. An empty
    # chunk says nothing about where the last line ended, so keep the
    # previous value of self.empty in that case.
    # NOTE(review): this compares against a str newline, matching the '\n'
    # used in processEnded; on Python 3, where process output is bytes,
    # both would need to become b'\n' -- confirm before porting.
    if data:
        self.empty = data.endswith('\n')

# stderr is handled exactly like stdout and feeds the same LineLogger.
errReceived = outReceived
49 def processEnded(self, reason):
51 self.output.dataReceived('\n')
52 self.service.connectionLost(self.name)
55 class ProcessMonitor(service.Service):
57 ProcessMonitor runs processes, monitors their progress, and restarts them when they die.
60 The ProcessMonitor will not attempt to restart a process that appears to
61 die instantly -- with each "instant" death (less than 1 second, by
62 default), it will delay approximately twice as long before restarting
63 it. A successful run will reset the counter.
65 The primary interface is L{addProcess} and L{removeProcess}. When the
66 service is running (that is, when the application it is attached to is
67 running), adding a process automatically starts it.
69 Each process has a name. This name string must uniquely identify the
70 process. In particular, attempting to add two processes with the same
71 name will result in a C{KeyError}.
73 @type threshold: C{float}
74 @ivar threshold: How long a process has to live before the death is
75 considered instant, in seconds. The default value is 1 second.
77 @type killTime: C{float}
78 @ivar killTime: How long a process being killed has to get its affairs
79 in order before it gets killed with an unmaskable signal. The
80 default value is 5 seconds.
82 @type minRestartDelay: C{float}
83 @ivar minRestartDelay: The minimum time (in seconds) to wait before
84 attempting to restart a process. Default 1s.
86 @type maxRestartDelay: C{float}
87 @ivar maxRestartDelay: The maximum time (in seconds) to wait before
88 attempting to restart a process. Default 3600s (1h).
90 @type _reactor: L{IReactorProcess} provider
91 @ivar _reactor: A provider of L{IReactorProcess} and L{IReactorTime}
92 which will be used to spawn processes and register delayed calls.
98 maxRestartDelay = 3600
101 def __init__(self, reactor=_reactor):
102 self._reactor = reactor
107 self.timeStarted = {}
112 def __getstate__(self):
113 dct = service.Service.__getstate__(self)
115 dct['protocols'] = {}
117 dct['timeStarted'] = {}
123 def addProcess(self, name, args, uid=None, gid=None, env={}):
125 Add a new monitored process and start it immediately if the
126 L{ProcessMonitor} service is running.
128 Note that args are passed to the system call, not to the shell. If
129 running the shell is desired, the common idiom is to use
130 C{ProcessMonitor.addProcess("name", ['/bin/sh', '-c', shell_script])}
132 @param name: A name for this process. This value must be
133 unique across all processes added to this monitor.
135 @param args: The argv sequence for the process to launch.
136 @param uid: The user ID to use to run the process. If C{None},
137 the current UID is used.
139 @param gid: The group ID to use to run the process. If C{None},
140 the current GID is used.
142 @param env: The environment to give to the launched process. See
143 L{IReactorProcess.spawnProcess}'s C{env} parameter.
145 @raises: C{KeyError} if a process with the given name already exists.
148 if name in self.processes:
149 raise KeyError("remove %s first" % (name,))
150 self.processes[name] = args, uid, gid, env
151 self.delay[name] = self.minRestartDelay
153 self.startProcess(name)
def removeProcess(self, name):
    """
    Stop the named process and remove it from the list of monitored
    processes.

    Any restart that has already been scheduled for the process is also
    cancelled, so a removed process is not restarted later.

    @param name: A string that uniquely identifies the process.

    @raises KeyError: If no process with the given name has been added
        (raised by L{stopProcess} before anything is modified).
    """
    self.stopProcess(name)
    del self.processes[name]
    # connectionLost stores a delayed restart in self.restart when a
    # monitored process dies. If the process is removed before that call
    # fires, the call would still run for a name that is no longer in
    # self.processes, so cancel it here, as stopService does for all
    # pending restarts.
    pendingRestart = self.restart.pop(name, None)
    if pendingRestart is not None and pendingRestart.active():
        pendingRestart.cancel()
def startService(self):
    """
    Start the base service, then launch every monitored process.
    """
    service.Service.startService(self)
    for processName in self.processes:
        self.startProcess(processName)
177 def stopService(self):
179 Stop all monitored processes and cancel all scheduled process restarts.
181 service.Service.stopService(self)
183 # Cancel any outstanding restarts
184 for name, delayedCall in self.restart.items():
185 if delayedCall.active():
188 for name in self.processes:
189 self.stopProcess(name)
192 def connectionLost(self, name):
194 Called when a monitored process exits. If
195 L{ProcessMonitor.running} is C{True} (i.e. the service is started), the
196 process will be restarted.
197 If the process had been running for more than
198 L{ProcessMonitor.threshold} seconds it will be restarted immediately.
199 If the process had been running for less than
200 L{ProcessMonitor.threshold} seconds, the restart will be delayed and
201 each time the process dies before the configured threshold, the restart
202 delay will be doubled, up to a maximum delay of maxRestartDelay seconds.
205 @param name: A string that uniquely identifies the process
208 # Cancel the scheduled _forceStopProcess function if the process
210 if name in self.murder:
211 if self.murder[name].active():
212 self.murder[name].cancel()
213 del self.murder[name]
215 del self.protocols[name]
217 if self._reactor.seconds() - self.timeStarted[name] < self.threshold:
218 # The process died too fast - backoff
219 nextDelay = self.delay[name]
220 self.delay[name] = min(self.delay[name] * 2, self.maxRestartDelay)
223 # Process had been running for a significant amount of time
224 # restart immediately
226 self.delay[name] = self.minRestartDelay
228 # Schedule a process restart if the service is running
229 if self.running and name in self.processes:
230 self.restart[name] = self._reactor.callLater(nextDelay,
235 def startProcess(self, name):
237 @param name: The name of the process to be started
239 # If a protocol instance already exists, it means the process is
241 if name in self.protocols:
244 args, uid, gid, env = self.processes[name]
246 proto = LoggingProtocol()
249 self.protocols[name] = proto
250 self.timeStarted[name] = self._reactor.seconds()
251 self._reactor.spawnProcess(proto, args[0], args, uid=uid,
255 def _forceStopProcess(self, proc):
257 @param proc: An L{IProcessTransport} provider
260 proc.signalProcess('KILL')
261 except error.ProcessExitedAlready:
265 def stopProcess(self, name):
267 @param name: The name of the process to be stopped
269 if name not in self.processes:
270 raise KeyError('Unrecognized process name: %s' % (name,))
272 proto = self.protocols.get(name, None)
273 if proto is not None:
274 proc = proto.transport
276 proc.signalProcess('TERM')
277 except error.ProcessExitedAlready:
280 self.murder[name] = self._reactor.callLater(
282 self._forceStopProcess, proc)
def restartAll(self):
    """
    Restart all processes.

    This is useful for third-party management services that let a user
    restart servers after an outside change in circumstances -- for
    example, when a new version of a library is installed.

    This method only stops each process. The restart itself comes from
    L{connectionLost}, which schedules a new start for a monitored process
    only while the service is running. If the service is stopped, the
    processes are only stopped.
    """
    for processName in self.processes:
        self.stopProcess(processName)
298 for name, proc in self.processes.items():
300 if proc[1] is not None:
301 uidgid = str(proc[1])
302 if proc[2] is not None:
303 uidgid += ':'+str(proc[2])
306 uidgid = '(' + uidgid + ')'
307 l.append('%r%s: %r' % (name, uidgid, proc[0]))
308 return ('<' + self.__class__.__name__ + ' '