Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / internet / base.py
1 # -*- test-case-name: twisted.test.test_internet -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Very basic functionality for a Reactor implementation.
7 """
8
9 import socket # needed only for sync-dns
10 from zope.interface import implements, classImplements
11
12 import sys
13 import warnings
14 from heapq import heappush, heappop, heapify
15
16 import traceback
17
18 from twisted.python.compat import set
19 from twisted.python.util import unsignedID
20 from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorThreads
21 from twisted.internet.interfaces import IResolverSimple, IReactorPluggableResolver
22 from twisted.internet.interfaces import IConnector, IDelayedCall
23 from twisted.internet import fdesc, main, error, abstract, defer, threads
24 from twisted.python import log, failure, reflect
25 from twisted.python.runtime import seconds as runtimeSeconds, platform
26 from twisted.internet.defer import Deferred, DeferredList
27 from twisted.persisted import styles
28
29 # This import is for side-effects!  Even if you don't see any code using it
30 # in this module, don't delete it.
31 from twisted.python import threadable
32
33
34 class DelayedCall(styles.Ephemeral):
35
36     implements(IDelayedCall)
37     # enable .debug to record creator call stack, and it will be logged if
38     # an exception occurs while the function is being run
39     debug = False
40     _str = None
41
42     def __init__(self, time, func, args, kw, cancel, reset,
43                  seconds=runtimeSeconds):
44         """
45         @param time: Seconds from the epoch at which to call C{func}.
46         @param func: The callable to call.
47         @param args: The positional arguments to pass to the callable.
48         @param kw: The keyword arguments to pass to the callable.
49         @param cancel: A callable which will be called with this
50             DelayedCall before cancellation.
51         @param reset: A callable which will be called with this
52             DelayedCall after changing this DelayedCall's scheduled
53             execution time. The callable should adjust any necessary
54             scheduling details to ensure this DelayedCall is invoked
55             at the new appropriate time.
56         @param seconds: If provided, a no-argument callable which will be
57             used to determine the current time any time that information is
58             needed.
59         """
60         self.time, self.func, self.args, self.kw = time, func, args, kw
61         self.resetter = reset
62         self.canceller = cancel
63         self.seconds = seconds
64         self.cancelled = self.called = 0
65         self.delayed_time = 0
66         if self.debug:
67             self.creator = traceback.format_stack()[:-2]
68
69     def getTime(self):
70         """Return the time at which this call will fire
71
72         @rtype: C{float}
73         @return: The number of seconds after the epoch at which this call is
74         scheduled to be made.
75         """
76         return self.time + self.delayed_time
77
78     def cancel(self):
79         """Unschedule this call
80
81         @raise AlreadyCancelled: Raised if this call has already been
82         unscheduled.
83
84         @raise AlreadyCalled: Raised if this call has already been made.
85         """
86         if self.cancelled:
87             raise error.AlreadyCancelled
88         elif self.called:
89             raise error.AlreadyCalled
90         else:
91             self.canceller(self)
92             self.cancelled = 1
93             if self.debug:
94                 self._str = str(self)
95             del self.func, self.args, self.kw
96
97     def reset(self, secondsFromNow):
98         """Reschedule this call for a different time
99
100         @type secondsFromNow: C{float}
101         @param secondsFromNow: The number of seconds from the time of the
102         C{reset} call at which this call will be scheduled.
103
104         @raise AlreadyCancelled: Raised if this call has been cancelled.
105         @raise AlreadyCalled: Raised if this call has already been made.
106         """
107         if self.cancelled:
108             raise error.AlreadyCancelled
109         elif self.called:
110             raise error.AlreadyCalled
111         else:
112             newTime = self.seconds() + secondsFromNow
113             if newTime < self.time:
114                 self.delayed_time = 0
115                 self.time = newTime
116                 self.resetter(self)
117             else:
118                 self.delayed_time = newTime - self.time
119
120     def delay(self, secondsLater):
121         """Reschedule this call for a later time
122
123         @type secondsLater: C{float}
124         @param secondsLater: The number of seconds after the originally
125         scheduled time for which to reschedule this call.
126
127         @raise AlreadyCancelled: Raised if this call has been cancelled.
128         @raise AlreadyCalled: Raised if this call has already been made.
129         """
130         if self.cancelled:
131             raise error.AlreadyCancelled
132         elif self.called:
133             raise error.AlreadyCalled
134         else:
135             self.delayed_time += secondsLater
136             if self.delayed_time < 0:
137                 self.activate_delay()
138                 self.resetter(self)
139
140     def activate_delay(self):
141         self.time += self.delayed_time
142         self.delayed_time = 0
143
144     def active(self):
145         """Determine whether this call is still pending
146
147         @rtype: C{bool}
148         @return: True if this call has not yet been made or cancelled,
149         False otherwise.
150         """
151         return not (self.cancelled or self.called)
152
153
154     def __le__(self, other):
155         """
156         Implement C{<=} operator between two L{DelayedCall} instances.
157
158         Comparison is based on the C{time} attribute (unadjusted by the
159         delayed time).
160         """
161         return self.time <= other.time
162
163
164     def __lt__(self, other):
165         """
166         Implement C{<} operator between two L{DelayedCall} instances.
167
168         Comparison is based on the C{time} attribute (unadjusted by the
169         delayed time).
170         """
171         return self.time < other.time
172
173
174     def __str__(self):
175         if self._str is not None:
176             return self._str
177         if hasattr(self, 'func'):
178             if hasattr(self.func, 'func_name'):
179                 func = self.func.func_name
180                 if hasattr(self.func, 'im_class'):
181                     func = self.func.im_class.__name__ + '.' + func
182             else:
183                 func = reflect.safe_repr(self.func)
184         else:
185             func = None
186
187         now = self.seconds()
188         L = ["<DelayedCall 0x%x [%ss] called=%s cancelled=%s" % (
189                 unsignedID(self), self.time - now, self.called,
190                 self.cancelled)]
191         if func is not None:
192             L.extend((" ", func, "("))
193             if self.args:
194                 L.append(", ".join([reflect.safe_repr(e) for e in self.args]))
195                 if self.kw:
196                     L.append(", ")
197             if self.kw:
198                 L.append(", ".join(['%s=%s' % (k, reflect.safe_repr(v)) for (k, v) in self.kw.iteritems()]))
199             L.append(")")
200
201         if self.debug:
202             L.append("\n\ntraceback at creation: \n\n%s" % ('    '.join(self.creator)))
203         L.append('>')
204
205         return "".join(L)
206
207
208
209 class ThreadedResolver(object):
210     """
211     L{ThreadedResolver} uses a reactor, a threadpool, and
212     L{socket.gethostbyname} to perform name lookups without blocking the
213     reactor thread.  It also supports timeouts indepedently from whatever
214     timeout logic L{socket.gethostbyname} might have.
215
216     @ivar reactor: The reactor the threadpool of which will be used to call
217         L{socket.gethostbyname} and the I/O thread of which the result will be
218         delivered.
219     """
220     implements(IResolverSimple)
221
222     def __init__(self, reactor):
223         self.reactor = reactor
224         self._runningQueries = {}
225
226
227     def _fail(self, name, err):
228         err = error.DNSLookupError("address %r not found: %s" % (name, err))
229         return failure.Failure(err)
230
231
232     def _cleanup(self, name, lookupDeferred):
233         userDeferred, cancelCall = self._runningQueries[lookupDeferred]
234         del self._runningQueries[lookupDeferred]
235         userDeferred.errback(self._fail(name, "timeout error"))
236
237
238     def _checkTimeout(self, result, name, lookupDeferred):
239         try:
240             userDeferred, cancelCall = self._runningQueries[lookupDeferred]
241         except KeyError:
242             pass
243         else:
244             del self._runningQueries[lookupDeferred]
245             cancelCall.cancel()
246
247             if isinstance(result, failure.Failure):
248                 userDeferred.errback(self._fail(name, result.getErrorMessage()))
249             else:
250                 userDeferred.callback(result)
251
252
253     def getHostByName(self, name, timeout = (1, 3, 11, 45)):
254         """
255         See L{twisted.internet.interfaces.IResolverSimple.getHostByName}.
256
257         Note that the elements of C{timeout} are summed and the result is used
258         as a timeout for the lookup.  Any intermediate timeout or retry logic
259         is left up to the platform via L{socket.gethostbyname}.
260         """
261         if timeout:
262             timeoutDelay = sum(timeout)
263         else:
264             timeoutDelay = 60
265         userDeferred = defer.Deferred()
266         lookupDeferred = threads.deferToThreadPool(
267             self.reactor, self.reactor.getThreadPool(),
268             socket.gethostbyname, name)
269         cancelCall = self.reactor.callLater(
270             timeoutDelay, self._cleanup, name, lookupDeferred)
271         self._runningQueries[lookupDeferred] = (userDeferred, cancelCall)
272         lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred)
273         return userDeferred
274
275
276
277 class BlockingResolver:
278     implements(IResolverSimple)
279
280     def getHostByName(self, name, timeout = (1, 3, 11, 45)):
281         try:
282             address = socket.gethostbyname(name)
283         except socket.error:
284             msg = "address %r not found" % (name,)
285             err = error.DNSLookupError(msg)
286             return defer.fail(err)
287         else:
288             return defer.succeed(address)
289
290
291 class _ThreePhaseEvent(object):
292     """
293     Collection of callables (with arguments) which can be invoked as a group in
294     a particular order.
295
296     This provides the underlying implementation for the reactor's system event
297     triggers.  An instance of this class tracks triggers for all phases of a
298     single type of event.
299
300     @ivar before: A list of the before-phase triggers containing three-tuples
301         of a callable, a tuple of positional arguments, and a dict of keyword
302         arguments
303
304     @ivar finishedBefore: A list of the before-phase triggers which have
305         already been executed.  This is only populated in the C{'BEFORE'} state.
306
307     @ivar during: A list of the during-phase triggers containing three-tuples
308         of a callable, a tuple of positional arguments, and a dict of keyword
309         arguments
310
311     @ivar after: A list of the after-phase triggers containing three-tuples
312         of a callable, a tuple of positional arguments, and a dict of keyword
313         arguments
314
315     @ivar state: A string indicating what is currently going on with this
316         object.  One of C{'BASE'} (for when nothing in particular is happening;
317         this is the initial value), C{'BEFORE'} (when the before-phase triggers
318         are in the process of being executed).
319     """
320     def __init__(self):
321         self.before = []
322         self.during = []
323         self.after = []
324         self.state = 'BASE'
325
326
327     def addTrigger(self, phase, callable, *args, **kwargs):
328         """
329         Add a trigger to the indicate phase.
330
331         @param phase: One of C{'before'}, C{'during'}, or C{'after'}.
332
333         @param callable: An object to be called when this event is triggered.
334         @param *args: Positional arguments to pass to C{callable}.
335         @param **kwargs: Keyword arguments to pass to C{callable}.
336
337         @return: An opaque handle which may be passed to L{removeTrigger} to
338             reverse the effects of calling this method.
339         """
340         if phase not in ('before', 'during', 'after'):
341             raise KeyError("invalid phase")
342         getattr(self, phase).append((callable, args, kwargs))
343         return phase, callable, args, kwargs
344
345
346     def removeTrigger(self, handle):
347         """
348         Remove a previously added trigger callable.
349
350         @param handle: An object previously returned by L{addTrigger}.  The
351             trigger added by that call will be removed.
352
353         @raise ValueError: If the trigger associated with C{handle} has already
354             been removed or if C{handle} is not a valid handle.
355         """
356         return getattr(self, 'removeTrigger_' + self.state)(handle)
357
358
359     def removeTrigger_BASE(self, handle):
360         """
361         Just try to remove the trigger.
362
363         @see: removeTrigger
364         """
365         try:
366             phase, callable, args, kwargs = handle
367         except (TypeError, ValueError):
368             raise ValueError("invalid trigger handle")
369         else:
370             if phase not in ('before', 'during', 'after'):
371                 raise KeyError("invalid phase")
372             getattr(self, phase).remove((callable, args, kwargs))
373
374
375     def removeTrigger_BEFORE(self, handle):
376         """
377         Remove the trigger if it has yet to be executed, otherwise emit a
378         warning that in the future an exception will be raised when removing an
379         already-executed trigger.
380
381         @see: removeTrigger
382         """
383         phase, callable, args, kwargs = handle
384         if phase != 'before':
385             return self.removeTrigger_BASE(handle)
386         if (callable, args, kwargs) in self.finishedBefore:
387             warnings.warn(
388                 "Removing already-fired system event triggers will raise an "
389                 "exception in a future version of Twisted.",
390                 category=DeprecationWarning,
391                 stacklevel=3)
392         else:
393             self.removeTrigger_BASE(handle)
394
395
396     def fireEvent(self):
397         """
398         Call the triggers added to this event.
399         """
400         self.state = 'BEFORE'
401         self.finishedBefore = []
402         beforeResults = []
403         while self.before:
404             callable, args, kwargs = self.before.pop(0)
405             self.finishedBefore.append((callable, args, kwargs))
406             try:
407                 result = callable(*args, **kwargs)
408             except:
409                 log.err()
410             else:
411                 if isinstance(result, Deferred):
412                     beforeResults.append(result)
413         DeferredList(beforeResults).addCallback(self._continueFiring)
414
415
416     def _continueFiring(self, ignored):
417         """
418         Call the during and after phase triggers for this event.
419         """
420         self.state = 'BASE'
421         self.finishedBefore = []
422         for phase in self.during, self.after:
423             while phase:
424                 callable, args, kwargs = phase.pop(0)
425                 try:
426                     callable(*args, **kwargs)
427                 except:
428                     log.err()
429
430
431
432 class ReactorBase(object):
433     """
434     Default base class for Reactors.
435
436     @type _stopped: C{bool}
437     @ivar _stopped: A flag which is true between paired calls to C{reactor.run}
438         and C{reactor.stop}.  This should be replaced with an explicit state
439         machine.
440
441     @type _justStopped: C{bool}
442     @ivar _justStopped: A flag which is true between the time C{reactor.stop}
443         is called and the time the shutdown system event is fired.  This is
444         used to determine whether that event should be fired after each
445         iteration through the mainloop.  This should be replaced with an
446         explicit state machine.
447
448     @type _started: C{bool}
449     @ivar _started: A flag which is true from the time C{reactor.run} is called
450         until the time C{reactor.run} returns.  This is used to prevent calls
451         to C{reactor.run} on a running reactor.  This should be replaced with
452         an explicit state machine.
453
454     @ivar running: See L{IReactorCore.running}
455
456     @ivar _registerAsIOThread: A flag controlling whether the reactor will
457         register the thread it is running in as the I/O thread when it starts.
458         If C{True}, registration will be done, otherwise it will not be.
459     """
460     implements(IReactorCore, IReactorTime, IReactorPluggableResolver)
461
462     _registerAsIOThread = True
463
464     _stopped = True
465     installed = False
466     usingThreads = False
467     resolver = BlockingResolver()
468
469     __name__ = "twisted.internet.reactor"
470
471     def __init__(self):
472         self.threadCallQueue = []
473         self._eventTriggers = {}
474         self._pendingTimedCalls = []
475         self._newTimedCalls = []
476         self._cancellations = 0
477         self.running = False
478         self._started = False
479         self._justStopped = False
480         self._startedBefore = False
481         # reactor internal readers, e.g. the waker.
482         self._internalReaders = set()
483         self.waker = None
484
485         # Arrange for the running attribute to change to True at the right time
486         # and let a subclass possibly do other things at that time (eg install
487         # signal handlers).
488         self.addSystemEventTrigger(
489             'during', 'startup', self._reallyStartRunning)
490         self.addSystemEventTrigger('during', 'shutdown', self.crash)
491         self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
492
493         if platform.supportsThreads():
494             self._initThreads()
495         self.installWaker()
496
497     # override in subclasses
498
499     _lock = None
500
501     def installWaker(self):
502         raise NotImplementedError(
503             reflect.qual(self.__class__) + " did not implement installWaker")
504
505     def installResolver(self, resolver):
506         assert IResolverSimple.providedBy(resolver)
507         oldResolver = self.resolver
508         self.resolver = resolver
509         return oldResolver
510
511     def wakeUp(self):
512         """
513         Wake up the event loop.
514         """
515         if self.waker:
516             self.waker.wakeUp()
517         # if the waker isn't installed, the reactor isn't running, and
518         # therefore doesn't need to be woken up
519
520     def doIteration(self, delay):
521         """
522         Do one iteration over the readers and writers which have been added.
523         """
524         raise NotImplementedError(
525             reflect.qual(self.__class__) + " did not implement doIteration")
526
527     def addReader(self, reader):
528         raise NotImplementedError(
529             reflect.qual(self.__class__) + " did not implement addReader")
530
531     def addWriter(self, writer):
532         raise NotImplementedError(
533             reflect.qual(self.__class__) + " did not implement addWriter")
534
535     def removeReader(self, reader):
536         raise NotImplementedError(
537             reflect.qual(self.__class__) + " did not implement removeReader")
538
539     def removeWriter(self, writer):
540         raise NotImplementedError(
541             reflect.qual(self.__class__) + " did not implement removeWriter")
542
543     def removeAll(self):
544         raise NotImplementedError(
545             reflect.qual(self.__class__) + " did not implement removeAll")
546
547
548     def getReaders(self):
549         raise NotImplementedError(
550             reflect.qual(self.__class__) + " did not implement getReaders")
551
552
553     def getWriters(self):
554         raise NotImplementedError(
555             reflect.qual(self.__class__) + " did not implement getWriters")
556
557
558     def resolve(self, name, timeout = (1, 3, 11, 45)):
559         """Return a Deferred that will resolve a hostname.
560         """
561         if not name:
562             # XXX - This is *less than* '::', and will screw up IPv6 servers
563             return defer.succeed('0.0.0.0')
564         if abstract.isIPAddress(name):
565             return defer.succeed(name)
566         return self.resolver.getHostByName(name, timeout)
567
568     # Installation.
569
570     # IReactorCore
571     def stop(self):
572         """
573         See twisted.internet.interfaces.IReactorCore.stop.
574         """
575         if self._stopped:
576             raise error.ReactorNotRunning(
577                 "Can't stop reactor that isn't running.")
578         self._stopped = True
579         self._justStopped = True
580         self._startedBefore = True
581
582
583     def crash(self):
584         """
585         See twisted.internet.interfaces.IReactorCore.crash.
586
587         Reset reactor state tracking attributes and re-initialize certain
588         state-transition helpers which were set up in C{__init__} but later
589         destroyed (through use).
590         """
591         self._started = False
592         self.running = False
593         self.addSystemEventTrigger(
594             'during', 'startup', self._reallyStartRunning)
595
596     def sigInt(self, *args):
597         """Handle a SIGINT interrupt.
598         """
599         log.msg("Received SIGINT, shutting down.")
600         self.callFromThread(self.stop)
601
602     def sigBreak(self, *args):
603         """Handle a SIGBREAK interrupt.
604         """
605         log.msg("Received SIGBREAK, shutting down.")
606         self.callFromThread(self.stop)
607
608     def sigTerm(self, *args):
609         """Handle a SIGTERM interrupt.
610         """
611         log.msg("Received SIGTERM, shutting down.")
612         self.callFromThread(self.stop)
613
614     def disconnectAll(self):
615         """Disconnect every reader, and writer in the system.
616         """
617         selectables = self.removeAll()
618         for reader in selectables:
619             log.callWithLogger(reader,
620                                reader.connectionLost,
621                                failure.Failure(main.CONNECTION_LOST))
622
623
624     def iterate(self, delay=0):
625         """See twisted.internet.interfaces.IReactorCore.iterate.
626         """
627         self.runUntilCurrent()
628         self.doIteration(delay)
629
630
631     def fireSystemEvent(self, eventType):
632         """See twisted.internet.interfaces.IReactorCore.fireSystemEvent.
633         """
634         event = self._eventTriggers.get(eventType)
635         if event is not None:
636             event.fireEvent()
637
638
639     def addSystemEventTrigger(self, _phase, _eventType, _f, *args, **kw):
640         """See twisted.internet.interfaces.IReactorCore.addSystemEventTrigger.
641         """
642         assert callable(_f), "%s is not callable" % _f
643         if _eventType not in self._eventTriggers:
644             self._eventTriggers[_eventType] = _ThreePhaseEvent()
645         return (_eventType, self._eventTriggers[_eventType].addTrigger(
646             _phase, _f, *args, **kw))
647
648
649     def removeSystemEventTrigger(self, triggerID):
650         """See twisted.internet.interfaces.IReactorCore.removeSystemEventTrigger.
651         """
652         eventType, handle = triggerID
653         self._eventTriggers[eventType].removeTrigger(handle)
654
655
656     def callWhenRunning(self, _callable, *args, **kw):
657         """See twisted.internet.interfaces.IReactorCore.callWhenRunning.
658         """
659         if self.running:
660             _callable(*args, **kw)
661         else:
662             return self.addSystemEventTrigger('after', 'startup',
663                                               _callable, *args, **kw)
664
665     def startRunning(self):
666         """
667         Method called when reactor starts: do some initialization and fire
668         startup events.
669
670         Don't call this directly, call reactor.run() instead: it should take
671         care of calling this.
672
673         This method is somewhat misnamed.  The reactor will not necessarily be
674         in the running state by the time this method returns.  The only
675         guarantee is that it will be on its way to the running state.
676         """
677         if self._started:
678             raise error.ReactorAlreadyRunning()
679         if self._startedBefore:
680             raise error.ReactorNotRestartable()
681         self._started = True
682         self._stopped = False
683         if self._registerAsIOThread:
684             threadable.registerAsIOThread()
685         self.fireSystemEvent('startup')
686
687
688     def _reallyStartRunning(self):
689         """
690         Method called to transition to the running state.  This should happen
691         in the I{during startup} event trigger phase.
692         """
693         self.running = True
694
695     # IReactorTime
696
697     seconds = staticmethod(runtimeSeconds)
698
699     def callLater(self, _seconds, _f, *args, **kw):
700         """See twisted.internet.interfaces.IReactorTime.callLater.
701         """
702         assert callable(_f), "%s is not callable" % _f
703         assert sys.maxint >= _seconds >= 0, \
704                "%s is not greater than or equal to 0 seconds" % (_seconds,)
705         tple = DelayedCall(self.seconds() + _seconds, _f, args, kw,
706                            self._cancelCallLater,
707                            self._moveCallLaterSooner,
708                            seconds=self.seconds)
709         self._newTimedCalls.append(tple)
710         return tple
711
712     def _moveCallLaterSooner(self, tple):
713         # Linear time find: slow.
714         heap = self._pendingTimedCalls
715         try:
716             pos = heap.index(tple)
717
718             # Move elt up the heap until it rests at the right place.
719             elt = heap[pos]
720             while pos != 0:
721                 parent = (pos-1) // 2
722                 if heap[parent] <= elt:
723                     break
724                 # move parent down
725                 heap[pos] = heap[parent]
726                 pos = parent
727             heap[pos] = elt
728         except ValueError:
729             # element was not found in heap - oh well...
730             pass
731
732     def _cancelCallLater(self, tple):
733         self._cancellations+=1
734
735
736     def getDelayedCalls(self):
737         """Return all the outstanding delayed calls in the system.
738         They are returned in no particular order.
739         This method is not efficient -- it is really only meant for
740         test cases."""
741         return [x for x in (self._pendingTimedCalls + self._newTimedCalls) if not x.cancelled]
742
743     def _insertNewDelayedCalls(self):
744         for call in self._newTimedCalls:
745             if call.cancelled:
746                 self._cancellations-=1
747             else:
748                 call.activate_delay()
749                 heappush(self._pendingTimedCalls, call)
750         self._newTimedCalls = []
751
752     def timeout(self):
753         # insert new delayed calls to make sure to include them in timeout value
754         self._insertNewDelayedCalls()
755
756         if not self._pendingTimedCalls:
757             return None
758
759         return max(0, self._pendingTimedCalls[0].time - self.seconds())
760
761
762     def runUntilCurrent(self):
763         """Run all pending timed calls.
764         """
765         if self.threadCallQueue:
766             # Keep track of how many calls we actually make, as we're
767             # making them, in case another call is added to the queue
768             # while we're in this loop.
769             count = 0
770             total = len(self.threadCallQueue)
771             for (f, a, kw) in self.threadCallQueue:
772                 try:
773                     f(*a, **kw)
774                 except:
775                     log.err()
776                 count += 1
777                 if count == total:
778                     break
779             del self.threadCallQueue[:count]
780             if self.threadCallQueue:
781                 self.wakeUp()
782
783         # insert new delayed calls now
784         self._insertNewDelayedCalls()
785
786         now = self.seconds()
787         while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now):
788             call = heappop(self._pendingTimedCalls)
789             if call.cancelled:
790                 self._cancellations-=1
791                 continue
792
793             if call.delayed_time > 0:
794                 call.activate_delay()
795                 heappush(self._pendingTimedCalls, call)
796                 continue
797
798             try:
799                 call.called = 1
800                 call.func(*call.args, **call.kw)
801             except:
802                 log.deferr()
803                 if hasattr(call, "creator"):
804                     e = "\n"
805                     e += " C: previous exception occurred in " + \
806                          "a DelayedCall created here:\n"
807                     e += " C:"
808                     e += "".join(call.creator).rstrip().replace("\n","\n C:")
809                     e += "\n"
810                     log.msg(e)
811
812
813         if (self._cancellations > 50 and
814              self._cancellations > len(self._pendingTimedCalls) >> 1):
815             self._cancellations = 0
816             self._pendingTimedCalls = [x for x in self._pendingTimedCalls
817                                        if not x.cancelled]
818             heapify(self._pendingTimedCalls)
819
820         if self._justStopped:
821             self._justStopped = False
822             self.fireSystemEvent("shutdown")
823
824     # IReactorProcess
825
826     def _checkProcessArgs(self, args, env):
827         """
828         Check for valid arguments and environment to spawnProcess.
829
830         @return: A two element tuple giving values to use when creating the
831         process.  The first element of the tuple is a C{list} of C{str}
832         giving the values for argv of the child process.  The second element
833         of the tuple is either C{None} if C{env} was C{None} or a C{dict}
834         mapping C{str} environment keys to C{str} environment values.
835         """
836         # Any unicode string which Python would successfully implicitly
837         # encode to a byte string would have worked before these explicit
838         # checks were added.  Anything which would have failed with a
839         # UnicodeEncodeError during that implicit encoding step would have
840         # raised an exception in the child process and that would have been
841         # a pain in the butt to debug.
842         #
843         # So, we will explicitly attempt the same encoding which Python
844         # would implicitly do later.  If it fails, we will report an error
845         # without ever spawning a child process.  If it succeeds, we'll save
846         # the result so that Python doesn't need to do it implicitly later.
847         #
848         # For any unicode which we can actually encode, we'll also issue a
849         # deprecation warning, because no one should be passing unicode here
850         # anyway.
851         #
852         # -exarkun
853         defaultEncoding = sys.getdefaultencoding()
854
855         # Common check function
856         def argChecker(arg):
857             """
858             Return either a str or None.  If the given value is not
859             allowable for some reason, None is returned.  Otherwise, a
860             possibly different object which should be used in place of arg
861             is returned.  This forces unicode encoding to happen now, rather
862             than implicitly later.
863             """
864             if isinstance(arg, unicode):
865                 try:
866                     arg = arg.encode(defaultEncoding)
867                 except UnicodeEncodeError:
868                     return None
869                 warnings.warn(
870                     "Argument strings and environment keys/values passed to "
871                     "reactor.spawnProcess should be str, not unicode.",
872                     category=DeprecationWarning,
873                     stacklevel=4)
874             if isinstance(arg, str) and '\0' not in arg:
875                 return arg
876             return None
877
878         # Make a few tests to check input validity
879         if not isinstance(args, (tuple, list)):
880             raise TypeError("Arguments must be a tuple or list")
881
882         outputArgs = []
883         for arg in args:
884             arg = argChecker(arg)
885             if arg is None:
886                 raise TypeError("Arguments contain a non-string value")
887             else:
888                 outputArgs.append(arg)
889
890         outputEnv = None
891         if env is not None:
892             outputEnv = {}
893             for key, val in env.iteritems():
894                 key = argChecker(key)
895                 if key is None:
896                     raise TypeError("Environment contains a non-string key")
897                 val = argChecker(val)
898                 if val is None:
899                     raise TypeError("Environment contains a non-string value")
900                 outputEnv[key] = val
901         return outputArgs, outputEnv
902
903     # IReactorThreads
904     if platform.supportsThreads():
905         threadpool = None
906         # ID of the trigger starting the threadpool
907         _threadpoolStartupID = None
908         # ID of the trigger stopping the threadpool
909         threadpoolShutdownID = None
910
911         def _initThreads(self):
912             self.usingThreads = True
913             self.resolver = ThreadedResolver(self)
914
915         def callFromThread(self, f, *args, **kw):
916             """
917             See L{twisted.internet.interfaces.IReactorThreads.callFromThread}.
918             """
919             assert callable(f), "%s is not callable" % (f,)
920             # lists are thread-safe in CPython, but not in Jython
921             # this is probably a bug in Jython, but until fixed this code
922             # won't work in Jython.
923             self.threadCallQueue.append((f, args, kw))
924             self.wakeUp()
925
926         def _initThreadPool(self):
927             """
928             Create the threadpool accessible with callFromThread.
929             """
930             from twisted.python import threadpool
931             self.threadpool = threadpool.ThreadPool(
932                 0, 10, 'twisted.internet.reactor')
933             self._threadpoolStartupID = self.callWhenRunning(
934                 self.threadpool.start)
935             self.threadpoolShutdownID = self.addSystemEventTrigger(
936                 'during', 'shutdown', self._stopThreadPool)
937
938         def _uninstallHandler(self):
939             pass
940
941         def _stopThreadPool(self):
942             """
943             Stop the reactor threadpool.  This method is only valid if there
944             is currently a threadpool (created by L{_initThreadPool}).  It
945             is not intended to be called directly; instead, it will be
946             called by a shutdown trigger created in L{_initThreadPool}.
947             """
948             triggers = [self._threadpoolStartupID, self.threadpoolShutdownID]
949             for trigger in filter(None, triggers):
950                 try:
951                     self.removeSystemEventTrigger(trigger)
952                 except ValueError:
953                     pass
954             self._threadpoolStartupID = None
955             self.threadpoolShutdownID = None
956             self.threadpool.stop()
957             self.threadpool = None
958
959
960         def getThreadPool(self):
961             """
962             See L{twisted.internet.interfaces.IReactorThreads.getThreadPool}.
963             """
964             if self.threadpool is None:
965                 self._initThreadPool()
966             return self.threadpool
967
968
969         def callInThread(self, _callable, *args, **kwargs):
970             """
971             See L{twisted.internet.interfaces.IReactorThreads.callInThread}.
972             """
973             self.getThreadPool().callInThread(_callable, *args, **kwargs)
974
975         def suggestThreadPoolSize(self, size):
976             """
977             See L{twisted.internet.interfaces.IReactorThreads.suggestThreadPoolSize}.
978             """
979             self.getThreadPool().adjustPoolsize(maxthreads=size)
980     else:
981         # This is for signal handlers.
982         def callFromThread(self, f, *args, **kw):
983             assert callable(f), "%s is not callable" % (f,)
984             # See comment in the other callFromThread implementation.
985             self.threadCallQueue.append((f, args, kw))
986
987 if platform.supportsThreads():
988     classImplements(ReactorBase, IReactorThreads)
989
990
991 class BaseConnector(styles.Ephemeral):
992     """Basic implementation of connector.
993
994     State can be: "connecting", "connected", "disconnected"
995     """
996
997     implements(IConnector)
998
999     timeoutID = None
1000     factoryStarted = 0
1001
1002     def __init__(self, factory, timeout, reactor):
1003         self.state = "disconnected"
1004         self.reactor = reactor
1005         self.factory = factory
1006         self.timeout = timeout
1007
1008     def disconnect(self):
1009         """Disconnect whatever our state is."""
1010         if self.state == 'connecting':
1011             self.stopConnecting()
1012         elif self.state == 'connected':
1013             self.transport.loseConnection()
1014
1015     def connect(self):
1016         """Start connection to remote server."""
1017         if self.state != "disconnected":
1018             raise RuntimeError, "can't connect in this state"
1019
1020         self.state = "connecting"
1021         if not self.factoryStarted:
1022             self.factory.doStart()
1023             self.factoryStarted = 1
1024         self.transport = transport = self._makeTransport()
1025         if self.timeout is not None:
1026             self.timeoutID = self.reactor.callLater(self.timeout, transport.failIfNotConnected, error.TimeoutError())
1027         self.factory.startedConnecting(self)
1028
1029     def stopConnecting(self):
1030         """Stop attempting to connect."""
1031         if self.state != "connecting":
1032             raise error.NotConnectingError, "we're not trying to connect"
1033
1034         self.state = "disconnected"
1035         self.transport.failIfNotConnected(error.UserError())
1036         del self.transport
1037
1038     def cancelTimeout(self):
1039         if self.timeoutID is not None:
1040             try:
1041                 self.timeoutID.cancel()
1042             except ValueError:
1043                 pass
1044             del self.timeoutID
1045
1046     def buildProtocol(self, addr):
1047         self.state = "connected"
1048         self.cancelTimeout()
1049         return self.factory.buildProtocol(addr)
1050
1051     def connectionFailed(self, reason):
1052         self.cancelTimeout()
1053         self.transport = None
1054         self.state = "disconnected"
1055         self.factory.clientConnectionFailed(self, reason)
1056         if self.state == "disconnected":
1057             # factory hasn't called our connect() method
1058             self.factory.doStop()
1059             self.factoryStarted = 0
1060
1061     def connectionLost(self, reason):
1062         self.state = "disconnected"
1063         self.factory.clientConnectionLost(self, reason)
1064         if self.state == "disconnected":
1065             # factory hasn't called our connect() method
1066             self.factory.doStop()
1067             self.factoryStarted = 0
1068
1069     def getDestination(self):
1070         raise NotImplementedError(
1071             reflect.qual(self.__class__) + " did not implement "
1072             "getDestination")
1073
1074
1075
1076 class BasePort(abstract.FileDescriptor):
1077     """Basic implementation of a ListeningPort.
1078
1079     Note: This does not actually implement IListeningPort.
1080     """
1081
1082     addressFamily = None
1083     socketType = None
1084
1085     def createInternetSocket(self):
1086         s = socket.socket(self.addressFamily, self.socketType)
1087         s.setblocking(0)
1088         fdesc._setCloseOnExec(s.fileno())
1089         return s
1090
1091
1092     def doWrite(self):
1093         """Raises a RuntimeError"""
1094         raise RuntimeError, "doWrite called on a %s" % reflect.qual(self.__class__)
1095
1096
1097
1098 class _SignalReactorMixin(object):
1099     """
1100     Private mixin to manage signals: it installs signal handlers at start time,
1101     and define run method.
1102
1103     It can only be used mixed in with L{ReactorBase}, and has to be defined
1104     first in the inheritance (so that method resolution order finds
1105     startRunning first).
1106
1107     @type _installSignalHandlers: C{bool}
1108     @ivar _installSignalHandlers: A flag which indicates whether any signal
1109         handlers will be installed during startup.  This includes handlers for
1110         SIGCHLD to monitor child processes, and SIGINT, SIGTERM, and SIGBREAK
1111         to stop the reactor.
1112     """
1113
1114     _installSignalHandlers = False
1115
1116     def _handleSignals(self):
1117         """
1118         Install the signal handlers for the Twisted event loop.
1119         """
1120         try:
1121             import signal
1122         except ImportError:
1123             log.msg("Warning: signal module unavailable -- "
1124                     "not installing signal handlers.")
1125             return
1126
1127         if signal.getsignal(signal.SIGINT) == signal.default_int_handler:
1128             # only handle if there isn't already a handler, e.g. for Pdb.
1129             signal.signal(signal.SIGINT, self.sigInt)
1130         signal.signal(signal.SIGTERM, self.sigTerm)
1131
1132         # Catch Ctrl-Break in windows
1133         if hasattr(signal, "SIGBREAK"):
1134             signal.signal(signal.SIGBREAK, self.sigBreak)
1135
1136
1137     def startRunning(self, installSignalHandlers=True):
1138         """
1139         Extend the base implementation in order to remember whether signal
1140         handlers should be installed later.
1141
1142         @type installSignalHandlers: C{bool}
1143         @param installSignalHandlers: A flag which, if set, indicates that
1144             handlers for a number of (implementation-defined) signals should be
1145             installed during startup.
1146         """
1147         self._installSignalHandlers = installSignalHandlers
1148         ReactorBase.startRunning(self)
1149
1150
1151     def _reallyStartRunning(self):
1152         """
1153         Extend the base implementation by also installing signal handlers, if
1154         C{self._installSignalHandlers} is true.
1155         """
1156         ReactorBase._reallyStartRunning(self)
1157         if self._installSignalHandlers:
1158             # Make sure this happens before after-startup events, since the
1159             # expectation of after-startup is that the reactor is fully
1160             # initialized.  Don't do it right away for historical reasons
1161             # (perhaps some before-startup triggers don't want there to be a
1162             # custom SIGCHLD handler so that they can run child processes with
1163             # some blocking api).
1164             self._handleSignals()
1165
1166
1167     def run(self, installSignalHandlers=True):
1168         self.startRunning(installSignalHandlers=installSignalHandlers)
1169         self.mainLoop()
1170
1171
1172     def mainLoop(self):
1173         while self._started:
1174             try:
1175                 while self._started:
1176                     # Advance simulation time in delayed event
1177                     # processors.
1178                     self.runUntilCurrent()
1179                     t2 = self.timeout()
1180                     t = self.running and t2
1181                     self.doIteration(t)
1182             except:
1183                 log.msg("Unexpected error in main loop.")
1184                 log.err()
1185             else:
1186                 log.msg('Main loop terminated.')
1187
1188
1189
1190 __all__ = []