Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / spread / pb.py
1 # -*- test-case-name: twisted.test.test_pb -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Perspective Broker
7
8 \"This isn\'t a professional opinion, but it's probably got enough
9 internet to kill you.\" --glyph
10
11 Introduction
12 ============
13
14 This is a broker for proxies for and copies of objects.  It provides a
15 translucent interface layer to those proxies.
16
17 The protocol is not opaque, because it provides objects which represent the
18 remote proxies and require no context (server references, IDs) to operate on.
19
20 It is not transparent because it does I{not} attempt to make remote objects
21 behave identically, or even similiarly, to local objects.  Method calls are
22 invoked asynchronously, and specific rules are applied when serializing
23 arguments.
24
25 To get started, begin with L{PBClientFactory} and L{PBServerFactory}.
26
27 @author: Glyph Lefkowitz
28 """
29
30 import random
31 import types
32
33 from zope.interface import implements, Interface
34
35 # Twisted Imports
36 from twisted.python import log, failure, reflect
37 from twisted.python.hashlib import md5
38 from twisted.internet import defer, protocol
39 from twisted.cred.portal import Portal
40 from twisted.cred.credentials import IAnonymous, ICredentials
41 from twisted.cred.credentials import IUsernameHashedPassword, Anonymous
42 from twisted.persisted import styles
43 from twisted.python.components import registerAdapter
44
45 from twisted.spread.interfaces import IJellyable, IUnjellyable
46 from twisted.spread.jelly import jelly, unjelly, globalSecurity
47 from twisted.spread import banana
48
49 from twisted.spread.flavors import Serializable
50 from twisted.spread.flavors import Referenceable, NoSuchMethod
51 from twisted.spread.flavors import Root, IPBRoot
52 from twisted.spread.flavors import ViewPoint
53 from twisted.spread.flavors import Viewable
54 from twisted.spread.flavors import Copyable
55 from twisted.spread.flavors import Jellyable
56 from twisted.spread.flavors import Cacheable
57 from twisted.spread.flavors import RemoteCopy
58 from twisted.spread.flavors import RemoteCache
59 from twisted.spread.flavors import RemoteCacheObserver
60 from twisted.spread.flavors import copyTags
61
62 from twisted.spread.flavors import setUnjellyableForClass
63 from twisted.spread.flavors import setUnjellyableFactoryForClass
64 from twisted.spread.flavors import setUnjellyableForClassTree
65 # These three are backwards compatibility aliases for the previous three.
66 # Ultimately they should be deprecated. -exarkun
67 from twisted.spread.flavors import setCopierForClass
68 from twisted.spread.flavors import setFactoryForClass
69 from twisted.spread.flavors import setCopierForClassTree
70
71
72 MAX_BROKER_REFS = 1024
73
74 portno = 8787
75
76
77
78 class ProtocolError(Exception):
79     """
80     This error is raised when an invalid protocol statement is received.
81     """
82
83
84
85 class DeadReferenceError(ProtocolError):
86     """
87     This error is raised when a method is called on a dead reference (one whose
88     broker has been disconnected).
89     """
90
91
92
93 class Error(Exception):
94     """
95     This error can be raised to generate known error conditions.
96
97     When a PB callable method (perspective_, remote_, view_) raises
98     this error, it indicates that a traceback should not be printed,
99     but instead, the string representation of the exception should be
100     sent.
101     """
102
103
104
105 class RemoteError(Exception):
106     """
107     This class is used to wrap a string-ified exception from the remote side to
108     be able to reraise it. (Raising string exceptions is no longer possible in
109     Python 2.6+)
110
111     The value of this exception will be a str() representation of the remote
112     value.
113
114     @ivar remoteType: The full import path of the exception class which was
115         raised on the remote end.
116     @type remoteType: C{str}
117
118     @ivar remoteTraceback: The remote traceback.
119     @type remoteTraceback: C{str}
120
121     @note: It's not possible to include the remoteTraceback if this exception is
122         thrown into a generator. It must be accessed as an attribute.
123     """
124     def __init__(self, remoteType, value, remoteTraceback):
125         Exception.__init__(self, value)
126         self.remoteType = remoteType
127         self.remoteTraceback = remoteTraceback
128
129
130
131 class RemoteMethod:
132     """
133     This is a translucent reference to a remote message.
134     """
135     def __init__(self, obj, name):
136         """
137         Initialize with a L{RemoteReference} and the name of this message.
138         """
139         self.obj = obj
140         self.name = name
141
142
143     def __cmp__(self, other):
144         return cmp((self.obj, self.name), other)
145
146
147     def __hash__(self):
148         return hash((self.obj, self.name))
149
150
151     def __call__(self, *args, **kw):
152         """
153         Asynchronously invoke a remote method.
154         """
155         return self.obj.broker._sendMessage('',self.obj.perspective,
156             self.obj.luid, self.name, args, kw)
157
158
159
160 class PBConnectionLost(Exception):
161     pass
162
163
164
165 class IPerspective(Interface):
166     """
167     per*spec*tive, n. : The relationship of aspects of a subject to each
168     other and to a whole: 'a perspective of history'; 'a need to view
169     the problem in the proper perspective'.
170
171     This is a Perspective Broker-specific wrapper for an avatar. That
172     is to say, a PB-published view on to the business logic for the
173     system's concept of a 'user'.
174
175     The concept of attached/detached is no longer implemented by the
176     framework. The realm is expected to implement such semantics if
177     needed.
178     """
179
180     def perspectiveMessageReceived(broker, message, args, kwargs):
181         """
182         This method is called when a network message is received.
183
184         @arg broker: The Perspective Broker.
185
186         @type message: str
187         @arg message: The name of the method called by the other end.
188
189         @type args: list in jelly format
190         @arg args: The arguments that were passed by the other end. It
191                    is recommend that you use the `unserialize' method of the
192                    broker to decode this.
193
194         @type kwargs: dict in jelly format
195         @arg kwargs: The keyword arguments that were passed by the
196                      other end.  It is recommended that you use the
197                      `unserialize' method of the broker to decode this.
198
199         @rtype: A jelly list.
200         @return: It is recommended that you use the `serialize' method
201                  of the broker on whatever object you need to return to
202                  generate the return value.
203         """
204
205
206
207 class Avatar:
208     """
209     A default IPerspective implementor.
210
211     This class is intended to be subclassed, and a realm should return
212     an instance of such a subclass when IPerspective is requested of
213     it.
214
215     A peer requesting a perspective will receive only a
216     L{RemoteReference} to a pb.Avatar.  When a method is called on
217     that L{RemoteReference}, it will translate to a method on the
218     remote perspective named 'perspective_methodname'.  (For more
219     information on invoking methods on other objects, see
220     L{flavors.ViewPoint}.)
221     """
222
223     implements(IPerspective)
224
225     def perspectiveMessageReceived(self, broker, message, args, kw):
226         """
227         This method is called when a network message is received.
228
229         This will call::
230
231             self.perspective_%(message)s(*broker.unserialize(args),
232                                          **broker.unserialize(kw))
233
234         to handle the method; subclasses of Avatar are expected to
235         implement methods using this naming convention.
236         """
237
238         args = broker.unserialize(args, self)
239         kw = broker.unserialize(kw, self)
240         method = getattr(self, "perspective_%s" % message)
241         try:
242             state = method(*args, **kw)
243         except TypeError:
244             log.msg("%s didn't accept %s and %s" % (method, args, kw))
245             raise
246         return broker.serialize(state, self, method, args, kw)
247
248
249
250 class AsReferenceable(Referenceable):
251     """
252     A reference directed towards another object.
253     """
254
255     def __init__(self, object, messageType="remote"):
256         self.remoteMessageReceived = getattr(
257             object, messageType + "MessageReceived")
258
259
260
261 class RemoteReference(Serializable, styles.Ephemeral):
262     """
263     A translucent reference to a remote object.
264
265     I may be a reference to a L{flavors.ViewPoint}, a
266     L{flavors.Referenceable}, or an L{IPerspective} implementor (e.g.,
267     pb.Avatar).  From the client's perspective, it is not possible to
268     tell which except by convention.
269
270     I am a \"translucent\" reference because although no additional
271     bookkeeping overhead is given to the application programmer for
272     manipulating a reference, return values are asynchronous.
273
274     See also L{twisted.internet.defer}.
275
276     @ivar broker: The broker I am obtained through.
277     @type broker: L{Broker}
278     """
279
280     implements(IUnjellyable)
281
282     def __init__(self, perspective, broker, luid, doRefCount):
283         """(internal) Initialize me with a broker and a locally-unique ID.
284
285         The ID is unique only to the particular Perspective Broker
286         instance.
287         """
288         self.luid = luid
289         self.broker = broker
290         self.doRefCount = doRefCount
291         self.perspective = perspective
292         self.disconnectCallbacks = []
293
294     def notifyOnDisconnect(self, callback):
295         """Register a callback to be called if our broker gets disconnected.
296
297         This callback will be called with one argument, this instance.
298         """
299         assert callable(callback)
300         self.disconnectCallbacks.append(callback)
301         if len(self.disconnectCallbacks) == 1:
302             self.broker.notifyOnDisconnect(self._disconnected)
303
304     def dontNotifyOnDisconnect(self, callback):
305         """Remove a callback that was registered with notifyOnDisconnect."""
306         self.disconnectCallbacks.remove(callback)
307         if not self.disconnectCallbacks:
308             self.broker.dontNotifyOnDisconnect(self._disconnected)
309
310     def _disconnected(self):
311         """Called if we are disconnected and have callbacks registered."""
312         for callback in self.disconnectCallbacks:
313             callback(self)
314         self.disconnectCallbacks = None
315
316     def jellyFor(self, jellier):
317         """If I am being sent back to where I came from, serialize as a local backreference.
318         """
319         if jellier.invoker:
320             assert self.broker == jellier.invoker, "Can't send references to brokers other than their own."
321             return "local", self.luid
322         else:
323             return "unpersistable", "References cannot be serialized"
324
325     def unjellyFor(self, unjellier, unjellyList):
326         self.__init__(unjellier.invoker.unserializingPerspective, unjellier.invoker, unjellyList[1], 1)
327         return self
328
329     def callRemote(self, _name, *args, **kw):
330         """Asynchronously invoke a remote method.
331
332         @type _name: C{str}
333         @param _name:  the name of the remote method to invoke
334         @param args: arguments to serialize for the remote function
335         @param kw:  keyword arguments to serialize for the remote function.
336         @rtype:   L{twisted.internet.defer.Deferred}
337         @returns: a Deferred which will be fired when the result of
338                   this remote call is received.
339         """
340         # note that we use '_name' instead of 'name' so the user can call
341         # remote methods with 'name' as a keyword parameter, like this:
342         #  ref.callRemote("getPeopleNamed", count=12, name="Bob")
343
344         return self.broker._sendMessage('',self.perspective, self.luid,
345                                         _name, args, kw)
346
347     def remoteMethod(self, key):
348         """Get a L{RemoteMethod} for this key.
349         """
350         return RemoteMethod(self, key)
351
352     def __cmp__(self,other):
353         """Compare me [to another L{RemoteReference}].
354         """
355         if isinstance(other, RemoteReference):
356             if other.broker == self.broker:
357                 return cmp(self.luid, other.luid)
358         return cmp(self.broker, other)
359
360     def __hash__(self):
361         """Hash me.
362         """
363         return self.luid
364
365     def __del__(self):
366         """Do distributed reference counting on finalization.
367         """
368         if self.doRefCount:
369             self.broker.sendDecRef(self.luid)
370
371 setUnjellyableForClass("remote", RemoteReference)
372
373 class Local:
374     """(internal) A reference to a local object.
375     """
376
377     def __init__(self, object, perspective=None):
378         """Initialize.
379         """
380         self.object = object
381         self.perspective = perspective
382         self.refcount = 1
383
384     def __repr__(self):
385         return "<pb.Local %r ref:%s>" % (self.object, self.refcount)
386
387     def incref(self):
388         """Increment and return my reference count.
389         """
390         self.refcount = self.refcount + 1
391         return self.refcount
392
393     def decref(self):
394         """Decrement and return my reference count.
395         """
396         self.refcount = self.refcount - 1
397         return self.refcount
398
399
400 ##
401 # Failure
402 ##
403
404 class CopyableFailure(failure.Failure, Copyable):
405     """
406     A L{flavors.RemoteCopy} and L{flavors.Copyable} version of
407     L{twisted.python.failure.Failure} for serialization.
408     """
409
410     unsafeTracebacks = 0
411
412     def getStateToCopy(self):
413         """
414         Collect state related to the exception which occurred, discarding
415         state which cannot reasonably be serialized.
416         """
417         state = self.__dict__.copy()
418         state['tb'] = None
419         state['frames'] = []
420         state['stack'] = []
421         state['value'] = str(self.value) # Exception instance
422         if isinstance(self.type, str):
423             state['type'] = self.type
424         else:
425             state['type'] = reflect.qual(self.type) # Exception class
426         if self.unsafeTracebacks:
427             state['traceback'] = self.getTraceback()
428         else:
429             state['traceback'] = 'Traceback unavailable\n'
430         return state
431
432
433
434 class CopiedFailure(RemoteCopy, failure.Failure):
435     """
436     A L{CopiedFailure} is a L{pb.RemoteCopy} of a L{failure.Failure}
437     transfered via PB.
438
439     @ivar type: The full import path of the exception class which was raised on
440         the remote end.
441     @type type: C{str}
442
443     @ivar value: A str() representation of the remote value.
444     @type value: L{CopiedFailure} or C{str}
445
446     @ivar traceback: The remote traceback.
447     @type traceback: C{str}
448     """
449
450     def printTraceback(self, file=None, elideFrameworkCode=0, detail='default'):
451         if file is None:
452             file = log.logfile
453         file.write("Traceback from remote host -- ")
454         file.write(self.traceback)
455         file.write(self.type + ": " + self.value)
456         file.write('\n')
457
458
459     def throwExceptionIntoGenerator(self, g):
460         """
461         Throw the original exception into the given generator, preserving
462         traceback information if available. In the case of a L{CopiedFailure}
463         where the exception type is a string, a L{pb.RemoteError} is thrown
464         instead.
465
466         @return: The next value yielded from the generator.
467         @raise StopIteration: If there are no more values in the generator.
468         @raise RemoteError: The wrapped remote exception.
469         """
470         return g.throw(RemoteError(self.type, self.value, self.traceback))
471
472     printBriefTraceback = printTraceback
473     printDetailedTraceback = printTraceback
474
475 setUnjellyableForClass(CopyableFailure, CopiedFailure)
476
477
478
479 def failure2Copyable(fail, unsafeTracebacks=0):
480     f = types.InstanceType(CopyableFailure, fail.__dict__)
481     f.unsafeTracebacks = unsafeTracebacks
482     return f
483
484
485
486 class Broker(banana.Banana):
487     """I am a broker for objects.
488     """
489
490     version = 6
491     username = None
492     factory = None
493
494     def __init__(self, isClient=1, security=globalSecurity):
495         banana.Banana.__init__(self, isClient)
496         self.disconnected = 0
497         self.disconnects = []
498         self.failures = []
499         self.connects = []
500         self.localObjects = {}
501         self.security = security
502         self.pageProducers = []
503         self.currentRequestID = 0
504         self.currentLocalID = 0
505         self.unserializingPerspective = None
506         # Some terms:
507         #  PUID: process unique ID; return value of id() function.  type "int".
508         #  LUID: locally unique ID; an ID unique to an object mapped over this
509         #        connection. type "int"
510         #  GUID: (not used yet) globally unique ID; an ID for an object which
511         #        may be on a redirected or meta server.  Type as yet undecided.
512         # Dictionary mapping LUIDs to local objects.
513         # set above to allow root object to be assigned before connection is made
514         # self.localObjects = {}
515         # Dictionary mapping PUIDs to LUIDs.
516         self.luids = {}
517         # Dictionary mapping LUIDs to local (remotely cached) objects. Remotely
518         # cached means that they're objects which originate here, and were
519         # copied remotely.
520         self.remotelyCachedObjects = {}
521         # Dictionary mapping PUIDs to (cached) LUIDs
522         self.remotelyCachedLUIDs = {}
523         # Dictionary mapping (remote) LUIDs to (locally cached) objects.
524         self.locallyCachedObjects = {}
525         self.waitingForAnswers = {}
526
527         # Mapping from LUIDs to weakref objects with callbacks for performing
528         # any local cleanup which may be necessary for the corresponding
529         # object once it no longer exists.
530         self._localCleanup = {}
531
532
533     def resumeProducing(self):
534         """Called when the consumer attached to me runs out of buffer.
535         """
536         # Go backwards over the list so we can remove indexes from it as we go
537         for pageridx in xrange(len(self.pageProducers)-1, -1, -1):
538             pager = self.pageProducers[pageridx]
539             pager.sendNextPage()
540             if not pager.stillPaging():
541                 del self.pageProducers[pageridx]
542         if not self.pageProducers:
543             self.transport.unregisterProducer()
544
545     # Streaming producer methods; not necessary to implement.
546     def pauseProducing(self):
547         pass
548
549     def stopProducing(self):
550         pass
551
552     def registerPageProducer(self, pager):
553         self.pageProducers.append(pager)
554         if len(self.pageProducers) == 1:
555             self.transport.registerProducer(self, 0)
556
557     def expressionReceived(self, sexp):
558         """Evaluate an expression as it's received.
559         """
560         if isinstance(sexp, types.ListType):
561             command = sexp[0]
562             methodName = "proto_%s" % command
563             method = getattr(self, methodName, None)
564             if method:
565                 method(*sexp[1:])
566             else:
567                 self.sendCall("didNotUnderstand", command)
568         else:
569             raise ProtocolError("Non-list expression received.")
570
571
572     def proto_version(self, vnum):
573         """Protocol message: (version version-number)
574
575         Check to make sure that both ends of the protocol are speaking
576         the same version dialect.
577         """
578
579         if vnum != self.version:
580             raise ProtocolError("Version Incompatibility: %s %s" % (self.version, vnum))
581
582
583     def sendCall(self, *exp):
584         """Utility method to send an expression to the other side of the connection.
585         """
586         self.sendEncoded(exp)
587
588     def proto_didNotUnderstand(self, command):
589         """Respond to stock 'C{didNotUnderstand}' message.
590
591         Log the command that was not understood and continue. (Note:
592         this will probably be changed to close the connection or raise
593         an exception in the future.)
594         """
595         log.msg("Didn't understand command: %r" % command)
596
597     def connectionReady(self):
598         """Initialize. Called after Banana negotiation is done.
599         """
600         self.sendCall("version", self.version)
601         for notifier in self.connects:
602             try:
603                 notifier()
604             except:
605                 log.deferr()
606         self.connects = None
607         if self.factory: # in tests we won't have factory
608             self.factory.clientConnectionMade(self)
609
610     def connectionFailed(self):
611         # XXX should never get called anymore? check!
612         for notifier in self.failures:
613             try:
614                 notifier()
615             except:
616                 log.deferr()
617         self.failures = None
618
619     waitingForAnswers = None
620
621     def connectionLost(self, reason):
622         """The connection was lost.
623         """
624         self.disconnected = 1
625         # nuke potential circular references.
626         self.luids = None
627         if self.waitingForAnswers:
628             for d in self.waitingForAnswers.values():
629                 try:
630                     d.errback(failure.Failure(PBConnectionLost(reason)))
631                 except:
632                     log.deferr()
633         # Assure all Cacheable.stoppedObserving are called
634         for lobj in self.remotelyCachedObjects.values():
635             cacheable = lobj.object
636             perspective = lobj.perspective
637             try:
638                 cacheable.stoppedObserving(perspective, RemoteCacheObserver(self, cacheable, perspective))
639             except:
640                 log.deferr()
641         # Loop on a copy to prevent notifiers to mixup
642         # the list by calling dontNotifyOnDisconnect
643         for notifier in self.disconnects[:]:
644             try:
645                 notifier()
646             except:
647                 log.deferr()
648         self.disconnects = None
649         self.waitingForAnswers = None
650         self.localSecurity = None
651         self.remoteSecurity = None
652         self.remotelyCachedObjects = None
653         self.remotelyCachedLUIDs = None
654         self.locallyCachedObjects = None
655         self.localObjects = None
656
657     def notifyOnDisconnect(self, notifier):
658         """Call the given callback when the Broker disconnects."""
659         assert callable(notifier)
660         self.disconnects.append(notifier)
661
662     def notifyOnFail(self, notifier):
663         """Call the given callback if the Broker fails to connect."""
664         assert callable(notifier)
665         self.failures.append(notifier)
666
667     def notifyOnConnect(self, notifier):
668         """Call the given callback when the Broker connects."""
669         assert callable(notifier)
670         if self.connects is None:
671             try:
672                 notifier()
673             except:
674                 log.err()
675         else:
676             self.connects.append(notifier)
677
678     def dontNotifyOnDisconnect(self, notifier):
679         """Remove a callback from list of disconnect callbacks."""
680         try:
681             self.disconnects.remove(notifier)
682         except ValueError:
683             pass
684
685     def localObjectForID(self, luid):
686         """
687         Get a local object for a locally unique ID.
688
689         @return: An object previously stored with L{registerReference} or
690             C{None} if there is no object which corresponds to the given
691             identifier.
692         """
693         lob = self.localObjects.get(luid)
694         if lob is None:
695             return
696         return lob.object
697
698     maxBrokerRefsViolations = 0
699
700     def registerReference(self, object):
701         """Get an ID for a local object.
702
703         Store a persistent reference to a local object and map its id()
704         to a generated, session-unique ID and return that ID.
705         """
706
707         assert object is not None
708         puid = object.processUniqueID()
709         luid = self.luids.get(puid)
710         if luid is None:
711             if len(self.localObjects) > MAX_BROKER_REFS:
712                 self.maxBrokerRefsViolations = self.maxBrokerRefsViolations + 1
713                 if self.maxBrokerRefsViolations > 3:
714                     self.transport.loseConnection()
715                     raise Error("Maximum PB reference count exceeded.  "
716                                 "Goodbye.")
717                 raise Error("Maximum PB reference count exceeded.")
718
719             luid = self.newLocalID()
720             self.localObjects[luid] = Local(object)
721             self.luids[puid] = luid
722         else:
723             self.localObjects[luid].incref()
724         return luid
725
726     def setNameForLocal(self, name, object):
727         """Store a special (string) ID for this object.
728
729         This is how you specify a 'base' set of objects that the remote
730         protocol can connect to.
731         """
732         assert object is not None
733         self.localObjects[name] = Local(object)
734
735     def remoteForName(self, name):
736         """Returns an object from the remote name mapping.
737
738         Note that this does not check the validity of the name, only
739         creates a translucent reference for it.
740         """
741         return RemoteReference(None, self, name, 0)
742
743     def cachedRemotelyAs(self, instance, incref=0):
744         """Returns an ID that says what this instance is cached as remotely, or C{None} if it's not.
745         """
746
747         puid = instance.processUniqueID()
748         luid = self.remotelyCachedLUIDs.get(puid)
749         if (luid is not None) and (incref):
750             self.remotelyCachedObjects[luid].incref()
751         return luid
752
753     def remotelyCachedForLUID(self, luid):
754         """Returns an instance which is cached remotely, with this LUID.
755         """
756         return self.remotelyCachedObjects[luid].object
757
758     def cacheRemotely(self, instance):
759         """
760         XXX"""
761         puid = instance.processUniqueID()
762         luid = self.newLocalID()
763         if len(self.remotelyCachedObjects) > MAX_BROKER_REFS:
764             self.maxBrokerRefsViolations = self.maxBrokerRefsViolations + 1
765             if self.maxBrokerRefsViolations > 3:
766                 self.transport.loseConnection()
767                 raise Error("Maximum PB cache count exceeded.  "
768                             "Goodbye.")
769             raise Error("Maximum PB cache count exceeded.")
770
771         self.remotelyCachedLUIDs[puid] = luid
772         # This table may not be necessary -- for now, it's to make sure that no
773         # monkey business happens with id(instance)
774         self.remotelyCachedObjects[luid] = Local(instance, self.serializingPerspective)
775         return luid
776
777     def cacheLocally(self, cid, instance):
778         """(internal)
779
780         Store a non-filled-out cached instance locally.
781         """
782         self.locallyCachedObjects[cid] = instance
783
784     def cachedLocallyAs(self, cid):
785         instance = self.locallyCachedObjects[cid]
786         return instance
787
788     def serialize(self, object, perspective=None, method=None, args=None, kw=None):
789         """Jelly an object according to the remote security rules for this broker.
790         """
791
792         if isinstance(object, defer.Deferred):
793             object.addCallbacks(self.serialize, lambda x: x,
794                                 callbackKeywords={
795                 'perspective': perspective,
796                 'method': method,
797                 'args': args,
798                 'kw': kw
799                 })
800             return object
801
802         # XXX This call is NOT REENTRANT and testing for reentrancy is just
803         # crazy, so it likely won't be.  Don't ever write methods that call the
804         # broker's serialize() method recursively (e.g. sending a method call
805         # from within a getState (this causes concurrency problems anyway so
806         # you really, really shouldn't do it))
807
808         # self.jellier = _NetJellier(self)
809         self.serializingPerspective = perspective
810         self.jellyMethod = method
811         self.jellyArgs = args
812         self.jellyKw = kw
813         try:
814             return jelly(object, self.security, None, self)
815         finally:
816             self.serializingPerspective = None
817             self.jellyMethod = None
818             self.jellyArgs = None
819             self.jellyKw = None
820
821     def unserialize(self, sexp, perspective = None):
822         """Unjelly an sexp according to the local security rules for this broker.
823         """
824
825         self.unserializingPerspective = perspective
826         try:
827             return unjelly(sexp, self.security, None, self)
828         finally:
829             self.unserializingPerspective = None
830
831     def newLocalID(self):
832         """Generate a new LUID.
833         """
834         self.currentLocalID = self.currentLocalID + 1
835         return self.currentLocalID
836
837     def newRequestID(self):
838         """Generate a new request ID.
839         """
840         self.currentRequestID = self.currentRequestID + 1
841         return self.currentRequestID
842
843     def _sendMessage(self, prefix, perspective, objectID, message, args, kw):
844         pbc = None
845         pbe = None
846         answerRequired = 1
847         if kw.has_key('pbcallback'):
848             pbc = kw['pbcallback']
849             del kw['pbcallback']
850         if kw.has_key('pberrback'):
851             pbe = kw['pberrback']
852             del kw['pberrback']
853         if kw.has_key('pbanswer'):
854             assert (not pbe) and (not pbc), "You can't specify a no-answer requirement."
855             answerRequired = kw['pbanswer']
856             del kw['pbanswer']
857         if self.disconnected:
858             raise DeadReferenceError("Calling Stale Broker")
859         try:
860             netArgs = self.serialize(args, perspective=perspective, method=message)
861             netKw = self.serialize(kw, perspective=perspective, method=message)
862         except:
863             return defer.fail(failure.Failure())
864         requestID = self.newRequestID()
865         if answerRequired:
866             rval = defer.Deferred()
867             self.waitingForAnswers[requestID] = rval
868             if pbc or pbe:
869                 log.msg('warning! using deprecated "pbcallback"')
870                 rval.addCallbacks(pbc, pbe)
871         else:
872             rval = None
873         self.sendCall(prefix+"message", requestID, objectID, message, answerRequired, netArgs, netKw)
874         return rval
875
876     def proto_message(self, requestID, objectID, message, answerRequired, netArgs, netKw):
877         self._recvMessage(self.localObjectForID, requestID, objectID, message, answerRequired, netArgs, netKw)
878     def proto_cachemessage(self, requestID, objectID, message, answerRequired, netArgs, netKw):
879         self._recvMessage(self.cachedLocallyAs, requestID, objectID, message, answerRequired, netArgs, netKw)
880
881     def _recvMessage(self, findObjMethod, requestID, objectID, message, answerRequired, netArgs, netKw):
882         """Received a message-send.
883
884         Look up message based on object, unserialize the arguments, and
885         invoke it with args, and send an 'answer' or 'error' response.
886         """
887         try:
888             object = findObjMethod(objectID)
889             if object is None:
890                 raise Error("Invalid Object ID")
891             netResult = object.remoteMessageReceived(self, message, netArgs, netKw)
892         except Error, e:
893             if answerRequired:
894                 # If the error is Jellyable or explicitly allowed via our
895                 # security options, send it back and let the code on the
896                 # other end deal with unjellying.  If it isn't Jellyable,
897                 # wrap it in a CopyableFailure, which ensures it can be
898                 # unjellied on the other end.  We have to do this because
899                 # all errors must be sent back.
900                 if isinstance(e, Jellyable) or self.security.isClassAllowed(e.__class__):
901                     self._sendError(e, requestID)
902                 else:
903                     self._sendError(CopyableFailure(e), requestID)
904         except:
905             if answerRequired:
906                 log.msg("Peer will receive following PB traceback:", isError=True)
907                 f = CopyableFailure()
908                 self._sendError(f, requestID)
909             log.err()
910         else:
911             if answerRequired:
912                 if isinstance(netResult, defer.Deferred):
913                     args = (requestID,)
914                     netResult.addCallbacks(self._sendAnswer, self._sendFailureOrError,
915                                            callbackArgs=args, errbackArgs=args)
916                     # XXX Should this be done somewhere else?
917                 else:
918                     self._sendAnswer(netResult, requestID)
919     ##
920     # success
921     ##
922
923     def _sendAnswer(self, netResult, requestID):
924         """(internal) Send an answer to a previously sent message.
925         """
926         self.sendCall("answer", requestID, netResult)
927
928     def proto_answer(self, requestID, netResult):
929         """(internal) Got an answer to a previously sent message.
930
931         Look up the appropriate callback and call it.
932         """
933         d = self.waitingForAnswers[requestID]
934         del self.waitingForAnswers[requestID]
935         d.callback(self.unserialize(netResult))
936
937     ##
938     # failure
939     ##
940     def _sendFailureOrError(self, fail, requestID):
941         """
942         Call L{_sendError} or L{_sendFailure}, depending on whether C{fail}
943         represents an L{Error} subclass or not.
944         """
945         if fail.check(Error) is None:
946             self._sendFailure(fail, requestID)
947         else:
948             self._sendError(fail, requestID)
949
950
951     def _sendFailure(self, fail, requestID):
952         """Log error and then send it."""
953         log.msg("Peer will receive following PB traceback:")
954         log.err(fail)
955         self._sendError(fail, requestID)
956
957     def _sendError(self, fail, requestID):
958         """(internal) Send an error for a previously sent message.
959         """
960         if isinstance(fail, failure.Failure):
961             # If the failures value is jellyable or allowed through security,
962             # send the value
963             if (isinstance(fail.value, Jellyable) or
964                 self.security.isClassAllowed(fail.value.__class__)):
965                 fail = fail.value
966             elif not isinstance(fail, CopyableFailure):
967                 fail = failure2Copyable(fail, self.factory.unsafeTracebacks)
968         if isinstance(fail, CopyableFailure):
969             fail.unsafeTracebacks = self.factory.unsafeTracebacks
970         self.sendCall("error", requestID, self.serialize(fail))
971
972     def proto_error(self, requestID, fail):
973         """(internal) Deal with an error.
974         """
975         d = self.waitingForAnswers[requestID]
976         del self.waitingForAnswers[requestID]
977         d.errback(self.unserialize(fail))
978
979     ##
980     # refcounts
981     ##
982
983     def sendDecRef(self, objectID):
984         """(internal) Send a DECREF directive.
985         """
986         self.sendCall("decref", objectID)
987
988     def proto_decref(self, objectID):
989         """(internal) Decrement the reference count of an object.
990
991         If the reference count is zero, it will free the reference to this
992         object.
993         """
994         refs = self.localObjects[objectID].decref()
995         if refs == 0:
996             puid = self.localObjects[objectID].object.processUniqueID()
997             del self.luids[puid]
998             del self.localObjects[objectID]
999             self._localCleanup.pop(puid, lambda: None)()
1000
1001     ##
1002     # caching
1003     ##
1004
1005     def decCacheRef(self, objectID):
1006         """(internal) Send a DECACHE directive.
1007         """
1008         self.sendCall("decache", objectID)
1009
1010     def proto_decache(self, objectID):
1011         """(internal) Decrement the reference count of a cached object.
1012
1013         If the reference count is zero, free the reference, then send an
1014         'uncached' directive.
1015         """
1016         refs = self.remotelyCachedObjects[objectID].decref()
1017         # log.msg('decaching: %s #refs: %s' % (objectID, refs))
1018         if refs == 0:
1019             lobj = self.remotelyCachedObjects[objectID]
1020             cacheable = lobj.object
1021             perspective = lobj.perspective
1022             # TODO: force_decache needs to be able to force-invalidate a
1023             # cacheable reference.
1024             try:
1025                 cacheable.stoppedObserving(perspective, RemoteCacheObserver(self, cacheable, perspective))
1026             except:
1027                 log.deferr()
1028             puid = cacheable.processUniqueID()
1029             del self.remotelyCachedLUIDs[puid]
1030             del self.remotelyCachedObjects[objectID]
1031             self.sendCall("uncache", objectID)
1032
1033     def proto_uncache(self, objectID):
1034         """(internal) Tell the client it is now OK to uncache an object.
1035         """
1036         # log.msg("uncaching locally %d" % objectID)
1037         obj = self.locallyCachedObjects[objectID]
1038         obj.broker = None
1039 ##         def reallyDel(obj=obj):
1040 ##             obj.__really_del__()
1041 ##         obj.__del__ = reallyDel
1042         del self.locallyCachedObjects[objectID]
1043
1044
1045
1046 def respond(challenge, password):
1047     """Respond to a challenge.
1048
1049     This is useful for challenge/response authentication.
1050     """
1051     m = md5()
1052     m.update(password)
1053     hashedPassword = m.digest()
1054     m = md5()
1055     m.update(hashedPassword)
1056     m.update(challenge)
1057     doubleHashedPassword = m.digest()
1058     return doubleHashedPassword
1059
1060 def challenge():
1061     """I return some random data."""
1062     crap = ''
1063     for x in range(random.randrange(15,25)):
1064         crap = crap + chr(random.randint(65,90))
1065     crap = md5(crap).digest()
1066     return crap
1067
1068
1069 class PBClientFactory(protocol.ClientFactory):
1070     """
1071     Client factory for PB brokers.
1072
1073     As with all client factories, use with reactor.connectTCP/SSL/etc..
1074     getPerspective and getRootObject can be called either before or
1075     after the connect.
1076     """
1077
1078     protocol = Broker
1079     unsafeTracebacks = False
1080
1081     def __init__(self, unsafeTracebacks=False, security=globalSecurity):
1082         """
1083         @param unsafeTracebacks: if set, tracebacks for exceptions will be sent
1084             over the wire.
1085         @type unsafeTracebacks: C{bool}
1086
1087         @param security: security options used by the broker, default to
1088             C{globalSecurity}.
1089         @type security: L{twisted.spread.jelly.SecurityOptions}
1090         """
1091         self.unsafeTracebacks = unsafeTracebacks
1092         self.security = security
1093         self._reset()
1094
1095
1096     def buildProtocol(self, addr):
1097         """
1098         Build the broker instance, passing the security options to it.
1099         """
1100         p = self.protocol(isClient=True, security=self.security)
1101         p.factory = self
1102         return p
1103
1104
1105     def _reset(self):
1106         self.rootObjectRequests = [] # list of deferred
1107         self._broker = None
1108         self._root = None
1109
1110     def _failAll(self, reason):
1111         deferreds = self.rootObjectRequests
1112         self._reset()
1113         for d in deferreds:
1114             d.errback(reason)
1115
1116     def clientConnectionFailed(self, connector, reason):
1117         self._failAll(reason)
1118
1119     def clientConnectionLost(self, connector, reason, reconnecting=0):
1120         """Reconnecting subclasses should call with reconnecting=1."""
1121         if reconnecting:
1122             # any pending requests will go to next connection attempt
1123             # so we don't fail them.
1124             self._broker = None
1125             self._root = None
1126         else:
1127             self._failAll(reason)
1128
1129     def clientConnectionMade(self, broker):
1130         self._broker = broker
1131         self._root = broker.remoteForName("root")
1132         ds = self.rootObjectRequests
1133         self.rootObjectRequests = []
1134         for d in ds:
1135             d.callback(self._root)
1136
1137     def getRootObject(self):
1138         """Get root object of remote PB server.
1139
1140         @return: Deferred of the root object.
1141         """
1142         if self._broker and not self._broker.disconnected:
1143            return defer.succeed(self._root)
1144         d = defer.Deferred()
1145         self.rootObjectRequests.append(d)
1146         return d
1147
1148     def disconnect(self):
1149         """If the factory is connected, close the connection.
1150
1151         Note that if you set up the factory to reconnect, you will need to
1152         implement extra logic to prevent automatic reconnection after this
1153         is called.
1154         """
1155         if self._broker:
1156             self._broker.transport.loseConnection()
1157
1158     def _cbSendUsername(self, root, username, password, client):
1159         return root.callRemote("login", username).addCallback(
1160             self._cbResponse, password, client)
1161
1162     def _cbResponse(self, (challenge, challenger), password, client):
1163         return challenger.callRemote("respond", respond(challenge, password), client)
1164
1165
1166     def _cbLoginAnonymous(self, root, client):
1167         """
1168         Attempt an anonymous login on the given remote root object.
1169
1170         @type root: L{RemoteReference}
1171         @param root: The object on which to attempt the login, most likely
1172             returned by a call to L{PBClientFactory.getRootObject}.
1173
1174         @param client: A jellyable object which will be used as the I{mind}
1175             parameter for the login attempt.
1176
1177         @rtype: L{Deferred}
1178         @return: A L{Deferred} which will be called back with a
1179             L{RemoteReference} to an avatar when anonymous login succeeds, or
1180             which will errback if anonymous login fails.
1181         """
1182         return root.callRemote("loginAnonymous", client)
1183
1184
1185     def login(self, credentials, client=None):
1186         """
1187         Login and get perspective from remote PB server.
1188
1189         Currently the following credentials are supported::
1190
1191             L{twisted.cred.credentials.IUsernamePassword}
1192             L{twisted.cred.credentials.IAnonymous}
1193
1194         @rtype: L{Deferred}
1195         @return: A L{Deferred} which will be called back with a
1196             L{RemoteReference} for the avatar logged in to, or which will
1197             errback if login fails.
1198         """
1199         d = self.getRootObject()
1200
1201         if IAnonymous.providedBy(credentials):
1202             d.addCallback(self._cbLoginAnonymous, client)
1203         else:
1204             d.addCallback(
1205                 self._cbSendUsername, credentials.username,
1206                 credentials.password, client)
1207         return d
1208
1209
1210
1211 class PBServerFactory(protocol.ServerFactory):
1212     """
1213     Server factory for perspective broker.
1214
1215     Login is done using a Portal object, whose realm is expected to return
1216     avatars implementing IPerspective. The credential checkers in the portal
1217     should accept IUsernameHashedPassword or IUsernameMD5Password.
1218
1219     Alternatively, any object providing or adaptable to L{IPBRoot} can be
1220     used instead of a portal to provide the root object of the PB server.
1221     """
1222
1223     unsafeTracebacks = False
1224
1225     # object broker factory
1226     protocol = Broker
1227
1228     def __init__(self, root, unsafeTracebacks=False, security=globalSecurity):
1229         """
1230         @param root: factory providing the root Referenceable used by the broker.
1231         @type root: object providing or adaptable to L{IPBRoot}.
1232
1233         @param unsafeTracebacks: if set, tracebacks for exceptions will be sent
1234             over the wire.
1235         @type unsafeTracebacks: C{bool}
1236
1237         @param security: security options used by the broker, default to
1238             C{globalSecurity}.
1239         @type security: L{twisted.spread.jelly.SecurityOptions}
1240         """
1241         self.root = IPBRoot(root)
1242         self.unsafeTracebacks = unsafeTracebacks
1243         self.security = security
1244
1245
1246     def buildProtocol(self, addr):
1247         """
1248         Return a Broker attached to the factory (as the service provider).
1249         """
1250         proto = self.protocol(isClient=False, security=self.security)
1251         proto.factory = self
1252         proto.setNameForLocal("root", self.root.rootObject(proto))
1253         return proto
1254
1255     def clientConnectionMade(self, protocol):
1256         # XXX does this method make any sense?
1257         pass
1258
1259
1260 class IUsernameMD5Password(ICredentials):
1261     """
1262     I encapsulate a username and a hashed password.
1263
1264     This credential is used for username/password over PB. CredentialCheckers
1265     which check this kind of credential must store the passwords in plaintext
1266     form or as a MD5 digest.
1267
1268     @type username: C{str} or C{Deferred}
1269     @ivar username: The username associated with these credentials.
1270     """
1271
1272     def checkPassword(password):
1273         """
1274         Validate these credentials against the correct password.
1275
1276         @type password: C{str}
1277         @param password: The correct, plaintext password against which to
1278             check.
1279
1280         @rtype: C{bool} or L{Deferred}
1281         @return: C{True} if the credentials represented by this object match the
1282             given password, C{False} if they do not, or a L{Deferred} which will
1283             be called back with one of these values.
1284         """
1285
1286     def checkMD5Password(password):
1287         """
1288         Validate these credentials against the correct MD5 digest of the
1289         password.
1290
1291         @type password: C{str}
1292         @param password: The correct MD5 digest of a password against which to
1293             check.
1294
1295         @rtype: C{bool} or L{Deferred}
1296         @return: C{True} if the credentials represented by this object match the
1297             given digest, C{False} if they do not, or a L{Deferred} which will
1298             be called back with one of these values.
1299         """
1300
1301
1302 class _PortalRoot:
1303     """Root object, used to login to portal."""
1304
1305     implements(IPBRoot)
1306
1307     def __init__(self, portal):
1308         self.portal = portal
1309
1310     def rootObject(self, broker):
1311         return _PortalWrapper(self.portal, broker)
1312
1313 registerAdapter(_PortalRoot, Portal, IPBRoot)
1314
1315
1316
1317 class _JellyableAvatarMixin:
1318     """
1319     Helper class for code which deals with avatars which PB must be capable of
1320     sending to a peer.
1321     """
1322     def _cbLogin(self, (interface, avatar, logout)):
1323         """
1324         Ensure that the avatar to be returned to the client is jellyable and
1325         set up disconnection notification to call the realm's logout object.
1326         """
1327         if not IJellyable.providedBy(avatar):
1328             avatar = AsReferenceable(avatar, "perspective")
1329
1330         puid = avatar.processUniqueID()
1331
1332         # only call logout once, whether the connection is dropped (disconnect)
1333         # or a logout occurs (cleanup), and be careful to drop the reference to
1334         # it in either case
1335         logout = [ logout ]
1336         def maybeLogout():
1337             if not logout:
1338                 return
1339             fn = logout[0]
1340             del logout[0]
1341             fn()
1342         self.broker._localCleanup[puid] = maybeLogout
1343         self.broker.notifyOnDisconnect(maybeLogout)
1344
1345         return avatar
1346
1347
1348
1349 class _PortalWrapper(Referenceable, _JellyableAvatarMixin):
1350     """
1351     Root Referenceable object, used to login to portal.
1352     """
1353
1354     def __init__(self, portal, broker):
1355         self.portal = portal
1356         self.broker = broker
1357
1358
1359     def remote_login(self, username):
1360         """
1361         Start of username/password login.
1362         """
1363         c = challenge()
1364         return c, _PortalAuthChallenger(self.portal, self.broker, username, c)
1365
1366
1367     def remote_loginAnonymous(self, mind):
1368         """
1369         Attempt an anonymous login.
1370
1371         @param mind: An object to use as the mind parameter to the portal login
1372             call (possibly None).
1373
1374         @rtype: L{Deferred}
1375         @return: A Deferred which will be called back with an avatar when login
1376             succeeds or which will be errbacked if login fails somehow.
1377         """
1378         d = self.portal.login(Anonymous(), mind, IPerspective)
1379         d.addCallback(self._cbLogin)
1380         return d
1381
1382
1383
1384 class _PortalAuthChallenger(Referenceable, _JellyableAvatarMixin):
1385     """
1386     Called with response to password challenge.
1387     """
1388     implements(IUsernameHashedPassword, IUsernameMD5Password)
1389
1390     def __init__(self, portal, broker, username, challenge):
1391         self.portal = portal
1392         self.broker = broker
1393         self.username = username
1394         self.challenge = challenge
1395
1396
1397     def remote_respond(self, response, mind):
1398         self.response = response
1399         d = self.portal.login(self, mind, IPerspective)
1400         d.addCallback(self._cbLogin)
1401         return d
1402
1403
1404     # IUsernameHashedPassword:
1405     def checkPassword(self, password):
1406         return self.checkMD5Password(md5(password).digest())
1407
1408
1409     # IUsernameMD5Password
1410     def checkMD5Password(self, md5Password):
1411         md = md5()
1412         md.update(md5Password)
1413         md.update(self.challenge)
1414         correct = md.digest()
1415         return self.response == correct
1416
1417
1418 __all__ = [
1419     # Everything from flavors is exposed publically here.
1420     'IPBRoot', 'Serializable', 'Referenceable', 'NoSuchMethod', 'Root',
1421     'ViewPoint', 'Viewable', 'Copyable', 'Jellyable', 'Cacheable',
1422     'RemoteCopy', 'RemoteCache', 'RemoteCacheObserver', 'copyTags',
1423     'setUnjellyableForClass', 'setUnjellyableFactoryForClass',
1424     'setUnjellyableForClassTree',
1425     'setCopierForClass', 'setFactoryForClass', 'setCopierForClassTree',
1426
1427     'MAX_BROKER_REFS', 'portno',
1428
1429     'ProtocolError', 'DeadReferenceError', 'Error', 'PBConnectionLost',
1430     'RemoteMethod', 'IPerspective', 'Avatar', 'AsReferenceable',
1431     'RemoteReference', 'CopyableFailure', 'CopiedFailure', 'failure2Copyable',
1432     'Broker', 'respond', 'challenge', 'PBClientFactory', 'PBServerFactory',
1433     'IUsernameMD5Password',
1434     ]