1 # -*- test-case-name: twisted.test.test_pb -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
8 \"This isn\'t a professional opinion, but it's probably got enough
9 internet to kill you.\" --glyph
14 This is a broker for proxies for and copies of objects. It provides a
15 translucent interface layer to those proxies.
17 The protocol is not opaque, because it provides objects which represent the
18 remote proxies and require no context (server references, IDs) to operate on.
20 It is not transparent because it does I{not} attempt to make remote objects
21 behave identically, or even similiarly, to local objects. Method calls are
22 invoked asynchronously, and specific rules are applied when serializing
25 To get started, begin with L{PBClientFactory} and L{PBServerFactory}.
27 @author: Glyph Lefkowitz
33 from zope.interface import implements, Interface
36 from twisted.python import log, failure, reflect
37 from twisted.python.hashlib import md5
38 from twisted.internet import defer, protocol
39 from twisted.cred.portal import Portal
40 from twisted.cred.credentials import IAnonymous, ICredentials
41 from twisted.cred.credentials import IUsernameHashedPassword, Anonymous
42 from twisted.persisted import styles
43 from twisted.python.components import registerAdapter
45 from twisted.spread.interfaces import IJellyable, IUnjellyable
46 from twisted.spread.jelly import jelly, unjelly, globalSecurity
47 from twisted.spread import banana
49 from twisted.spread.flavors import Serializable
50 from twisted.spread.flavors import Referenceable, NoSuchMethod
51 from twisted.spread.flavors import Root, IPBRoot
52 from twisted.spread.flavors import ViewPoint
53 from twisted.spread.flavors import Viewable
54 from twisted.spread.flavors import Copyable
55 from twisted.spread.flavors import Jellyable
56 from twisted.spread.flavors import Cacheable
57 from twisted.spread.flavors import RemoteCopy
58 from twisted.spread.flavors import RemoteCache
59 from twisted.spread.flavors import RemoteCacheObserver
60 from twisted.spread.flavors import copyTags
62 from twisted.spread.flavors import setUnjellyableForClass
63 from twisted.spread.flavors import setUnjellyableFactoryForClass
64 from twisted.spread.flavors import setUnjellyableForClassTree
65 # These three are backwards compatibility aliases for the previous three.
66 # Ultimately they should be deprecated. -exarkun
67 from twisted.spread.flavors import setCopierForClass
68 from twisted.spread.flavors import setFactoryForClass
69 from twisted.spread.flavors import setCopierForClassTree
72 MAX_BROKER_REFS = 1024
78 class ProtocolError(Exception):
80 This error is raised when an invalid protocol statement is received.
85 class DeadReferenceError(ProtocolError):
87 This error is raised when a method is called on a dead reference (one whose
88 broker has been disconnected).
93 class Error(Exception):
95 This error can be raised to generate known error conditions.
97 When a PB callable method (perspective_, remote_, view_) raises
98 this error, it indicates that a traceback should not be printed,
99 but instead, the string representation of the exception should be
105 class RemoteError(Exception):
107 This class is used to wrap a string-ified exception from the remote side to
108 be able to reraise it. (Raising string exceptions is no longer possible in
111 The value of this exception will be a str() representation of the remote
114 @ivar remoteType: The full import path of the exception class which was
115 raised on the remote end.
116 @type remoteType: C{str}
118 @ivar remoteTraceback: The remote traceback.
119 @type remoteTraceback: C{str}
121 @note: It's not possible to include the remoteTraceback if this exception is
122 thrown into a generator. It must be accessed as an attribute.
124 def __init__(self, remoteType, value, remoteTraceback):
125 Exception.__init__(self, value)
126 self.remoteType = remoteType
127 self.remoteTraceback = remoteTraceback
133 This is a translucent reference to a remote message.
135 def __init__(self, obj, name):
137 Initialize with a L{RemoteReference} and the name of this message.
143 def __cmp__(self, other):
144 return cmp((self.obj, self.name), other)
148 return hash((self.obj, self.name))
151 def __call__(self, *args, **kw):
153 Asynchronously invoke a remote method.
155 return self.obj.broker._sendMessage('',self.obj.perspective,
156 self.obj.luid, self.name, args, kw)
160 class PBConnectionLost(Exception):
165 class IPerspective(Interface):
167 per*spec*tive, n. : The relationship of aspects of a subject to each
168 other and to a whole: 'a perspective of history'; 'a need to view
169 the problem in the proper perspective'.
171 This is a Perspective Broker-specific wrapper for an avatar. That
172 is to say, a PB-published view on to the business logic for the
173 system's concept of a 'user'.
175 The concept of attached/detached is no longer implemented by the
176 framework. The realm is expected to implement such semantics if
180 def perspectiveMessageReceived(broker, message, args, kwargs):
182 This method is called when a network message is received.
184 @arg broker: The Perspective Broker.
187 @arg message: The name of the method called by the other end.
189 @type args: list in jelly format
190 @arg args: The arguments that were passed by the other end. It
191 is recommend that you use the `unserialize' method of the
192 broker to decode this.
194 @type kwargs: dict in jelly format
195 @arg kwargs: The keyword arguments that were passed by the
196 other end. It is recommended that you use the
197 `unserialize' method of the broker to decode this.
199 @rtype: A jelly list.
200 @return: It is recommended that you use the `serialize' method
201 of the broker on whatever object you need to return to
202 generate the return value.
209 A default IPerspective implementor.
211 This class is intended to be subclassed, and a realm should return
212 an instance of such a subclass when IPerspective is requested of
215 A peer requesting a perspective will receive only a
216 L{RemoteReference} to a pb.Avatar. When a method is called on
217 that L{RemoteReference}, it will translate to a method on the
218 remote perspective named 'perspective_methodname'. (For more
219 information on invoking methods on other objects, see
220 L{flavors.ViewPoint}.)
223 implements(IPerspective)
225 def perspectiveMessageReceived(self, broker, message, args, kw):
227 This method is called when a network message is received.
231 self.perspective_%(message)s(*broker.unserialize(args),
232 **broker.unserialize(kw))
234 to handle the method; subclasses of Avatar are expected to
235 implement methods using this naming convention.
238 args = broker.unserialize(args, self)
239 kw = broker.unserialize(kw, self)
240 method = getattr(self, "perspective_%s" % message)
242 state = method(*args, **kw)
244 log.msg("%s didn't accept %s and %s" % (method, args, kw))
246 return broker.serialize(state, self, method, args, kw)
250 class AsReferenceable(Referenceable):
252 A reference directed towards another object.
255 def __init__(self, object, messageType="remote"):
256 self.remoteMessageReceived = getattr(
257 object, messageType + "MessageReceived")
261 class RemoteReference(Serializable, styles.Ephemeral):
263 A translucent reference to a remote object.
265 I may be a reference to a L{flavors.ViewPoint}, a
266 L{flavors.Referenceable}, or an L{IPerspective} implementor (e.g.,
267 pb.Avatar). From the client's perspective, it is not possible to
268 tell which except by convention.
270 I am a \"translucent\" reference because although no additional
271 bookkeeping overhead is given to the application programmer for
272 manipulating a reference, return values are asynchronous.
274 See also L{twisted.internet.defer}.
276 @ivar broker: The broker I am obtained through.
277 @type broker: L{Broker}
280 implements(IUnjellyable)
282 def __init__(self, perspective, broker, luid, doRefCount):
283 """(internal) Initialize me with a broker and a locally-unique ID.
285 The ID is unique only to the particular Perspective Broker
290 self.doRefCount = doRefCount
291 self.perspective = perspective
292 self.disconnectCallbacks = []
294 def notifyOnDisconnect(self, callback):
295 """Register a callback to be called if our broker gets disconnected.
297 This callback will be called with one argument, this instance.
299 assert callable(callback)
300 self.disconnectCallbacks.append(callback)
301 if len(self.disconnectCallbacks) == 1:
302 self.broker.notifyOnDisconnect(self._disconnected)
304 def dontNotifyOnDisconnect(self, callback):
305 """Remove a callback that was registered with notifyOnDisconnect."""
306 self.disconnectCallbacks.remove(callback)
307 if not self.disconnectCallbacks:
308 self.broker.dontNotifyOnDisconnect(self._disconnected)
310 def _disconnected(self):
311 """Called if we are disconnected and have callbacks registered."""
312 for callback in self.disconnectCallbacks:
314 self.disconnectCallbacks = None
316 def jellyFor(self, jellier):
317 """If I am being sent back to where I came from, serialize as a local backreference.
320 assert self.broker == jellier.invoker, "Can't send references to brokers other than their own."
321 return "local", self.luid
323 return "unpersistable", "References cannot be serialized"
325 def unjellyFor(self, unjellier, unjellyList):
326 self.__init__(unjellier.invoker.unserializingPerspective, unjellier.invoker, unjellyList[1], 1)
329 def callRemote(self, _name, *args, **kw):
330 """Asynchronously invoke a remote method.
333 @param _name: the name of the remote method to invoke
334 @param args: arguments to serialize for the remote function
335 @param kw: keyword arguments to serialize for the remote function.
336 @rtype: L{twisted.internet.defer.Deferred}
337 @returns: a Deferred which will be fired when the result of
338 this remote call is received.
340 # note that we use '_name' instead of 'name' so the user can call
341 # remote methods with 'name' as a keyword parameter, like this:
342 # ref.callRemote("getPeopleNamed", count=12, name="Bob")
344 return self.broker._sendMessage('',self.perspective, self.luid,
347 def remoteMethod(self, key):
348 """Get a L{RemoteMethod} for this key.
350 return RemoteMethod(self, key)
352 def __cmp__(self,other):
353 """Compare me [to another L{RemoteReference}].
355 if isinstance(other, RemoteReference):
356 if other.broker == self.broker:
357 return cmp(self.luid, other.luid)
358 return cmp(self.broker, other)
366 """Do distributed reference counting on finalization.
369 self.broker.sendDecRef(self.luid)
371 setUnjellyableForClass("remote", RemoteReference)
374 """(internal) A reference to a local object.
377 def __init__(self, object, perspective=None):
381 self.perspective = perspective
385 return "<pb.Local %r ref:%s>" % (self.object, self.refcount)
388 """Increment and return my reference count.
390 self.refcount = self.refcount + 1
394 """Decrement and return my reference count.
396 self.refcount = self.refcount - 1
404 class CopyableFailure(failure.Failure, Copyable):
406 A L{flavors.RemoteCopy} and L{flavors.Copyable} version of
407 L{twisted.python.failure.Failure} for serialization.
412 def getStateToCopy(self):
414 Collect state related to the exception which occurred, discarding
415 state which cannot reasonably be serialized.
417 state = self.__dict__.copy()
421 state['value'] = str(self.value) # Exception instance
422 if isinstance(self.type, str):
423 state['type'] = self.type
425 state['type'] = reflect.qual(self.type) # Exception class
426 if self.unsafeTracebacks:
427 state['traceback'] = self.getTraceback()
429 state['traceback'] = 'Traceback unavailable\n'
434 class CopiedFailure(RemoteCopy, failure.Failure):
436 A L{CopiedFailure} is a L{pb.RemoteCopy} of a L{failure.Failure}
439 @ivar type: The full import path of the exception class which was raised on
443 @ivar value: A str() representation of the remote value.
444 @type value: L{CopiedFailure} or C{str}
446 @ivar traceback: The remote traceback.
447 @type traceback: C{str}
450 def printTraceback(self, file=None, elideFrameworkCode=0, detail='default'):
453 file.write("Traceback from remote host -- ")
454 file.write(self.traceback)
455 file.write(self.type + ": " + self.value)
459 def throwExceptionIntoGenerator(self, g):
461 Throw the original exception into the given generator, preserving
462 traceback information if available. In the case of a L{CopiedFailure}
463 where the exception type is a string, a L{pb.RemoteError} is thrown
466 @return: The next value yielded from the generator.
467 @raise StopIteration: If there are no more values in the generator.
468 @raise RemoteError: The wrapped remote exception.
470 return g.throw(RemoteError(self.type, self.value, self.traceback))
472 printBriefTraceback = printTraceback
473 printDetailedTraceback = printTraceback
475 setUnjellyableForClass(CopyableFailure, CopiedFailure)
479 def failure2Copyable(fail, unsafeTracebacks=0):
480 f = types.InstanceType(CopyableFailure, fail.__dict__)
481 f.unsafeTracebacks = unsafeTracebacks
486 class Broker(banana.Banana):
487 """I am a broker for objects.
494 def __init__(self, isClient=1, security=globalSecurity):
495 banana.Banana.__init__(self, isClient)
496 self.disconnected = 0
497 self.disconnects = []
500 self.localObjects = {}
501 self.security = security
502 self.pageProducers = []
503 self.currentRequestID = 0
504 self.currentLocalID = 0
505 self.unserializingPerspective = None
507 # PUID: process unique ID; return value of id() function. type "int".
508 # LUID: locally unique ID; an ID unique to an object mapped over this
509 # connection. type "int"
510 # GUID: (not used yet) globally unique ID; an ID for an object which
511 # may be on a redirected or meta server. Type as yet undecided.
512 # Dictionary mapping LUIDs to local objects.
513 # set above to allow root object to be assigned before connection is made
514 # self.localObjects = {}
515 # Dictionary mapping PUIDs to LUIDs.
517 # Dictionary mapping LUIDs to local (remotely cached) objects. Remotely
518 # cached means that they're objects which originate here, and were
520 self.remotelyCachedObjects = {}
521 # Dictionary mapping PUIDs to (cached) LUIDs
522 self.remotelyCachedLUIDs = {}
523 # Dictionary mapping (remote) LUIDs to (locally cached) objects.
524 self.locallyCachedObjects = {}
525 self.waitingForAnswers = {}
527 # Mapping from LUIDs to weakref objects with callbacks for performing
528 # any local cleanup which may be necessary for the corresponding
529 # object once it no longer exists.
530 self._localCleanup = {}
533 def resumeProducing(self):
534 """Called when the consumer attached to me runs out of buffer.
536 # Go backwards over the list so we can remove indexes from it as we go
537 for pageridx in xrange(len(self.pageProducers)-1, -1, -1):
538 pager = self.pageProducers[pageridx]
540 if not pager.stillPaging():
541 del self.pageProducers[pageridx]
542 if not self.pageProducers:
543 self.transport.unregisterProducer()
545 # Streaming producer methods; not necessary to implement.
546 def pauseProducing(self):
549 def stopProducing(self):
552 def registerPageProducer(self, pager):
553 self.pageProducers.append(pager)
554 if len(self.pageProducers) == 1:
555 self.transport.registerProducer(self, 0)
557 def expressionReceived(self, sexp):
558 """Evaluate an expression as it's received.
560 if isinstance(sexp, types.ListType):
562 methodName = "proto_%s" % command
563 method = getattr(self, methodName, None)
567 self.sendCall("didNotUnderstand", command)
569 raise ProtocolError("Non-list expression received.")
572 def proto_version(self, vnum):
573 """Protocol message: (version version-number)
575 Check to make sure that both ends of the protocol are speaking
576 the same version dialect.
579 if vnum != self.version:
580 raise ProtocolError("Version Incompatibility: %s %s" % (self.version, vnum))
583 def sendCall(self, *exp):
584 """Utility method to send an expression to the other side of the connection.
586 self.sendEncoded(exp)
588 def proto_didNotUnderstand(self, command):
589 """Respond to stock 'C{didNotUnderstand}' message.
591 Log the command that was not understood and continue. (Note:
592 this will probably be changed to close the connection or raise
593 an exception in the future.)
595 log.msg("Didn't understand command: %r" % command)
597 def connectionReady(self):
598 """Initialize. Called after Banana negotiation is done.
600 self.sendCall("version", self.version)
601 for notifier in self.connects:
607 if self.factory: # in tests we won't have factory
608 self.factory.clientConnectionMade(self)
610 def connectionFailed(self):
611 # XXX should never get called anymore? check!
612 for notifier in self.failures:
619 waitingForAnswers = None
621 def connectionLost(self, reason):
622 """The connection was lost.
624 self.disconnected = 1
625 # nuke potential circular references.
627 if self.waitingForAnswers:
628 for d in self.waitingForAnswers.values():
630 d.errback(failure.Failure(PBConnectionLost(reason)))
633 # Assure all Cacheable.stoppedObserving are called
634 for lobj in self.remotelyCachedObjects.values():
635 cacheable = lobj.object
636 perspective = lobj.perspective
638 cacheable.stoppedObserving(perspective, RemoteCacheObserver(self, cacheable, perspective))
641 # Loop on a copy to prevent notifiers to mixup
642 # the list by calling dontNotifyOnDisconnect
643 for notifier in self.disconnects[:]:
648 self.disconnects = None
649 self.waitingForAnswers = None
650 self.localSecurity = None
651 self.remoteSecurity = None
652 self.remotelyCachedObjects = None
653 self.remotelyCachedLUIDs = None
654 self.locallyCachedObjects = None
655 self.localObjects = None
657 def notifyOnDisconnect(self, notifier):
658 """Call the given callback when the Broker disconnects."""
659 assert callable(notifier)
660 self.disconnects.append(notifier)
662 def notifyOnFail(self, notifier):
663 """Call the given callback if the Broker fails to connect."""
664 assert callable(notifier)
665 self.failures.append(notifier)
667 def notifyOnConnect(self, notifier):
668 """Call the given callback when the Broker connects."""
669 assert callable(notifier)
670 if self.connects is None:
676 self.connects.append(notifier)
678 def dontNotifyOnDisconnect(self, notifier):
679 """Remove a callback from list of disconnect callbacks."""
681 self.disconnects.remove(notifier)
685 def localObjectForID(self, luid):
687 Get a local object for a locally unique ID.
689 @return: An object previously stored with L{registerReference} or
690 C{None} if there is no object which corresponds to the given
693 lob = self.localObjects.get(luid)
698 maxBrokerRefsViolations = 0
700 def registerReference(self, object):
701 """Get an ID for a local object.
703 Store a persistent reference to a local object and map its id()
704 to a generated, session-unique ID and return that ID.
707 assert object is not None
708 puid = object.processUniqueID()
709 luid = self.luids.get(puid)
711 if len(self.localObjects) > MAX_BROKER_REFS:
712 self.maxBrokerRefsViolations = self.maxBrokerRefsViolations + 1
713 if self.maxBrokerRefsViolations > 3:
714 self.transport.loseConnection()
715 raise Error("Maximum PB reference count exceeded. "
717 raise Error("Maximum PB reference count exceeded.")
719 luid = self.newLocalID()
720 self.localObjects[luid] = Local(object)
721 self.luids[puid] = luid
723 self.localObjects[luid].incref()
726 def setNameForLocal(self, name, object):
727 """Store a special (string) ID for this object.
729 This is how you specify a 'base' set of objects that the remote
730 protocol can connect to.
732 assert object is not None
733 self.localObjects[name] = Local(object)
735 def remoteForName(self, name):
736 """Returns an object from the remote name mapping.
738 Note that this does not check the validity of the name, only
739 creates a translucent reference for it.
741 return RemoteReference(None, self, name, 0)
743 def cachedRemotelyAs(self, instance, incref=0):
744 """Returns an ID that says what this instance is cached as remotely, or C{None} if it's not.
747 puid = instance.processUniqueID()
748 luid = self.remotelyCachedLUIDs.get(puid)
749 if (luid is not None) and (incref):
750 self.remotelyCachedObjects[luid].incref()
753 def remotelyCachedForLUID(self, luid):
754 """Returns an instance which is cached remotely, with this LUID.
756 return self.remotelyCachedObjects[luid].object
758 def cacheRemotely(self, instance):
761 puid = instance.processUniqueID()
762 luid = self.newLocalID()
763 if len(self.remotelyCachedObjects) > MAX_BROKER_REFS:
764 self.maxBrokerRefsViolations = self.maxBrokerRefsViolations + 1
765 if self.maxBrokerRefsViolations > 3:
766 self.transport.loseConnection()
767 raise Error("Maximum PB cache count exceeded. "
769 raise Error("Maximum PB cache count exceeded.")
771 self.remotelyCachedLUIDs[puid] = luid
772 # This table may not be necessary -- for now, it's to make sure that no
773 # monkey business happens with id(instance)
774 self.remotelyCachedObjects[luid] = Local(instance, self.serializingPerspective)
777 def cacheLocally(self, cid, instance):
780 Store a non-filled-out cached instance locally.
782 self.locallyCachedObjects[cid] = instance
784 def cachedLocallyAs(self, cid):
785 instance = self.locallyCachedObjects[cid]
788 def serialize(self, object, perspective=None, method=None, args=None, kw=None):
789 """Jelly an object according to the remote security rules for this broker.
792 if isinstance(object, defer.Deferred):
793 object.addCallbacks(self.serialize, lambda x: x,
795 'perspective': perspective,
802 # XXX This call is NOT REENTRANT and testing for reentrancy is just
803 # crazy, so it likely won't be. Don't ever write methods that call the
804 # broker's serialize() method recursively (e.g. sending a method call
805 # from within a getState (this causes concurrency problems anyway so
806 # you really, really shouldn't do it))
808 # self.jellier = _NetJellier(self)
809 self.serializingPerspective = perspective
810 self.jellyMethod = method
811 self.jellyArgs = args
814 return jelly(object, self.security, None, self)
816 self.serializingPerspective = None
817 self.jellyMethod = None
818 self.jellyArgs = None
821 def unserialize(self, sexp, perspective = None):
822 """Unjelly an sexp according to the local security rules for this broker.
825 self.unserializingPerspective = perspective
827 return unjelly(sexp, self.security, None, self)
829 self.unserializingPerspective = None
831 def newLocalID(self):
832 """Generate a new LUID.
834 self.currentLocalID = self.currentLocalID + 1
835 return self.currentLocalID
837 def newRequestID(self):
838 """Generate a new request ID.
840 self.currentRequestID = self.currentRequestID + 1
841 return self.currentRequestID
843 def _sendMessage(self, prefix, perspective, objectID, message, args, kw):
847 if kw.has_key('pbcallback'):
848 pbc = kw['pbcallback']
850 if kw.has_key('pberrback'):
851 pbe = kw['pberrback']
853 if kw.has_key('pbanswer'):
854 assert (not pbe) and (not pbc), "You can't specify a no-answer requirement."
855 answerRequired = kw['pbanswer']
857 if self.disconnected:
858 raise DeadReferenceError("Calling Stale Broker")
860 netArgs = self.serialize(args, perspective=perspective, method=message)
861 netKw = self.serialize(kw, perspective=perspective, method=message)
863 return defer.fail(failure.Failure())
864 requestID = self.newRequestID()
866 rval = defer.Deferred()
867 self.waitingForAnswers[requestID] = rval
869 log.msg('warning! using deprecated "pbcallback"')
870 rval.addCallbacks(pbc, pbe)
873 self.sendCall(prefix+"message", requestID, objectID, message, answerRequired, netArgs, netKw)
876 def proto_message(self, requestID, objectID, message, answerRequired, netArgs, netKw):
877 self._recvMessage(self.localObjectForID, requestID, objectID, message, answerRequired, netArgs, netKw)
878 def proto_cachemessage(self, requestID, objectID, message, answerRequired, netArgs, netKw):
879 self._recvMessage(self.cachedLocallyAs, requestID, objectID, message, answerRequired, netArgs, netKw)
881 def _recvMessage(self, findObjMethod, requestID, objectID, message, answerRequired, netArgs, netKw):
882 """Received a message-send.
884 Look up message based on object, unserialize the arguments, and
885 invoke it with args, and send an 'answer' or 'error' response.
888 object = findObjMethod(objectID)
890 raise Error("Invalid Object ID")
891 netResult = object.remoteMessageReceived(self, message, netArgs, netKw)
894 # If the error is Jellyable or explicitly allowed via our
895 # security options, send it back and let the code on the
896 # other end deal with unjellying. If it isn't Jellyable,
897 # wrap it in a CopyableFailure, which ensures it can be
898 # unjellied on the other end. We have to do this because
899 # all errors must be sent back.
900 if isinstance(e, Jellyable) or self.security.isClassAllowed(e.__class__):
901 self._sendError(e, requestID)
903 self._sendError(CopyableFailure(e), requestID)
906 log.msg("Peer will receive following PB traceback:", isError=True)
907 f = CopyableFailure()
908 self._sendError(f, requestID)
912 if isinstance(netResult, defer.Deferred):
914 netResult.addCallbacks(self._sendAnswer, self._sendFailureOrError,
915 callbackArgs=args, errbackArgs=args)
916 # XXX Should this be done somewhere else?
918 self._sendAnswer(netResult, requestID)
923 def _sendAnswer(self, netResult, requestID):
924 """(internal) Send an answer to a previously sent message.
926 self.sendCall("answer", requestID, netResult)
928 def proto_answer(self, requestID, netResult):
929 """(internal) Got an answer to a previously sent message.
931 Look up the appropriate callback and call it.
933 d = self.waitingForAnswers[requestID]
934 del self.waitingForAnswers[requestID]
935 d.callback(self.unserialize(netResult))
940 def _sendFailureOrError(self, fail, requestID):
942 Call L{_sendError} or L{_sendFailure}, depending on whether C{fail}
943 represents an L{Error} subclass or not.
945 if fail.check(Error) is None:
946 self._sendFailure(fail, requestID)
948 self._sendError(fail, requestID)
951 def _sendFailure(self, fail, requestID):
952 """Log error and then send it."""
953 log.msg("Peer will receive following PB traceback:")
955 self._sendError(fail, requestID)
957 def _sendError(self, fail, requestID):
958 """(internal) Send an error for a previously sent message.
960 if isinstance(fail, failure.Failure):
961 # If the failures value is jellyable or allowed through security,
963 if (isinstance(fail.value, Jellyable) or
964 self.security.isClassAllowed(fail.value.__class__)):
966 elif not isinstance(fail, CopyableFailure):
967 fail = failure2Copyable(fail, self.factory.unsafeTracebacks)
968 if isinstance(fail, CopyableFailure):
969 fail.unsafeTracebacks = self.factory.unsafeTracebacks
970 self.sendCall("error", requestID, self.serialize(fail))
972 def proto_error(self, requestID, fail):
973 """(internal) Deal with an error.
975 d = self.waitingForAnswers[requestID]
976 del self.waitingForAnswers[requestID]
977 d.errback(self.unserialize(fail))
983 def sendDecRef(self, objectID):
984 """(internal) Send a DECREF directive.
986 self.sendCall("decref", objectID)
988 def proto_decref(self, objectID):
989 """(internal) Decrement the reference count of an object.
991 If the reference count is zero, it will free the reference to this
994 refs = self.localObjects[objectID].decref()
996 puid = self.localObjects[objectID].object.processUniqueID()
998 del self.localObjects[objectID]
999 self._localCleanup.pop(puid, lambda: None)()
1005 def decCacheRef(self, objectID):
1006 """(internal) Send a DECACHE directive.
1008 self.sendCall("decache", objectID)
1010 def proto_decache(self, objectID):
1011 """(internal) Decrement the reference count of a cached object.
1013 If the reference count is zero, free the reference, then send an
1014 'uncached' directive.
1016 refs = self.remotelyCachedObjects[objectID].decref()
1017 # log.msg('decaching: %s #refs: %s' % (objectID, refs))
1019 lobj = self.remotelyCachedObjects[objectID]
1020 cacheable = lobj.object
1021 perspective = lobj.perspective
1022 # TODO: force_decache needs to be able to force-invalidate a
1023 # cacheable reference.
1025 cacheable.stoppedObserving(perspective, RemoteCacheObserver(self, cacheable, perspective))
1028 puid = cacheable.processUniqueID()
1029 del self.remotelyCachedLUIDs[puid]
1030 del self.remotelyCachedObjects[objectID]
1031 self.sendCall("uncache", objectID)
1033 def proto_uncache(self, objectID):
1034 """(internal) Tell the client it is now OK to uncache an object.
1036 # log.msg("uncaching locally %d" % objectID)
1037 obj = self.locallyCachedObjects[objectID]
1039 ## def reallyDel(obj=obj):
1040 ## obj.__really_del__()
1041 ## obj.__del__ = reallyDel
1042 del self.locallyCachedObjects[objectID]
1046 def respond(challenge, password):
1047 """Respond to a challenge.
1049 This is useful for challenge/response authentication.
1053 hashedPassword = m.digest()
1055 m.update(hashedPassword)
1057 doubleHashedPassword = m.digest()
1058 return doubleHashedPassword
1061 """I return some random data."""
1063 for x in range(random.randrange(15,25)):
1064 crap = crap + chr(random.randint(65,90))
1065 crap = md5(crap).digest()
1069 class PBClientFactory(protocol.ClientFactory):
1071 Client factory for PB brokers.
1073 As with all client factories, use with reactor.connectTCP/SSL/etc..
1074 getPerspective and getRootObject can be called either before or
1079 unsafeTracebacks = False
1081 def __init__(self, unsafeTracebacks=False, security=globalSecurity):
1083 @param unsafeTracebacks: if set, tracebacks for exceptions will be sent
1085 @type unsafeTracebacks: C{bool}
1087 @param security: security options used by the broker, default to
1089 @type security: L{twisted.spread.jelly.SecurityOptions}
1091 self.unsafeTracebacks = unsafeTracebacks
1092 self.security = security
1096 def buildProtocol(self, addr):
1098 Build the broker instance, passing the security options to it.
1100 p = self.protocol(isClient=True, security=self.security)
1106 self.rootObjectRequests = [] # list of deferred
1110 def _failAll(self, reason):
1111 deferreds = self.rootObjectRequests
1116 def clientConnectionFailed(self, connector, reason):
1117 self._failAll(reason)
1119 def clientConnectionLost(self, connector, reason, reconnecting=0):
1120 """Reconnecting subclasses should call with reconnecting=1."""
1122 # any pending requests will go to next connection attempt
1123 # so we don't fail them.
1127 self._failAll(reason)
1129 def clientConnectionMade(self, broker):
1130 self._broker = broker
1131 self._root = broker.remoteForName("root")
1132 ds = self.rootObjectRequests
1133 self.rootObjectRequests = []
1135 d.callback(self._root)
1137 def getRootObject(self):
1138 """Get root object of remote PB server.
1140 @return: Deferred of the root object.
1142 if self._broker and not self._broker.disconnected:
1143 return defer.succeed(self._root)
1144 d = defer.Deferred()
1145 self.rootObjectRequests.append(d)
1148 def disconnect(self):
1149 """If the factory is connected, close the connection.
1151 Note that if you set up the factory to reconnect, you will need to
1152 implement extra logic to prevent automatic reconnection after this
1156 self._broker.transport.loseConnection()
1158 def _cbSendUsername(self, root, username, password, client):
1159 return root.callRemote("login", username).addCallback(
1160 self._cbResponse, password, client)
1162 def _cbResponse(self, (challenge, challenger), password, client):
1163 return challenger.callRemote("respond", respond(challenge, password), client)
1166 def _cbLoginAnonymous(self, root, client):
1168 Attempt an anonymous login on the given remote root object.
1170 @type root: L{RemoteReference}
1171 @param root: The object on which to attempt the login, most likely
1172 returned by a call to L{PBClientFactory.getRootObject}.
1174 @param client: A jellyable object which will be used as the I{mind}
1175 parameter for the login attempt.
1178 @return: A L{Deferred} which will be called back with a
1179 L{RemoteReference} to an avatar when anonymous login succeeds, or
1180 which will errback if anonymous login fails.
1182 return root.callRemote("loginAnonymous", client)
1185 def login(self, credentials, client=None):
1187 Login and get perspective from remote PB server.
1189 Currently the following credentials are supported::
1191 L{twisted.cred.credentials.IUsernamePassword}
1192 L{twisted.cred.credentials.IAnonymous}
1195 @return: A L{Deferred} which will be called back with a
1196 L{RemoteReference} for the avatar logged in to, or which will
1197 errback if login fails.
1199 d = self.getRootObject()
1201 if IAnonymous.providedBy(credentials):
1202 d.addCallback(self._cbLoginAnonymous, client)
1205 self._cbSendUsername, credentials.username,
1206 credentials.password, client)
1211 class PBServerFactory(protocol.ServerFactory):
1213 Server factory for perspective broker.
1215 Login is done using a Portal object, whose realm is expected to return
1216 avatars implementing IPerspective. The credential checkers in the portal
1217 should accept IUsernameHashedPassword or IUsernameMD5Password.
1219 Alternatively, any object providing or adaptable to L{IPBRoot} can be
1220 used instead of a portal to provide the root object of the PB server.
1223 unsafeTracebacks = False
1225 # object broker factory
1228 def __init__(self, root, unsafeTracebacks=False, security=globalSecurity):
1230 @param root: factory providing the root Referenceable used by the broker.
1231 @type root: object providing or adaptable to L{IPBRoot}.
1233 @param unsafeTracebacks: if set, tracebacks for exceptions will be sent
1235 @type unsafeTracebacks: C{bool}
1237 @param security: security options used by the broker, default to
1239 @type security: L{twisted.spread.jelly.SecurityOptions}
1241 self.root = IPBRoot(root)
1242 self.unsafeTracebacks = unsafeTracebacks
1243 self.security = security
1246 def buildProtocol(self, addr):
1248 Return a Broker attached to the factory (as the service provider).
1250 proto = self.protocol(isClient=False, security=self.security)
1251 proto.factory = self
1252 proto.setNameForLocal("root", self.root.rootObject(proto))
1255 def clientConnectionMade(self, protocol):
1256 # XXX does this method make any sense?
1260 class IUsernameMD5Password(ICredentials):
1262 I encapsulate a username and a hashed password.
1264 This credential is used for username/password over PB. CredentialCheckers
1265 which check this kind of credential must store the passwords in plaintext
1266 form or as a MD5 digest.
1268 @type username: C{str} or C{Deferred}
1269 @ivar username: The username associated with these credentials.
1272 def checkPassword(password):
1274 Validate these credentials against the correct password.
1276 @type password: C{str}
1277 @param password: The correct, plaintext password against which to
1280 @rtype: C{bool} or L{Deferred}
1281 @return: C{True} if the credentials represented by this object match the
1282 given password, C{False} if they do not, or a L{Deferred} which will
1283 be called back with one of these values.
1286 def checkMD5Password(password):
1288 Validate these credentials against the correct MD5 digest of the
1291 @type password: C{str}
1292 @param password: The correct MD5 digest of a password against which to
1295 @rtype: C{bool} or L{Deferred}
1296 @return: C{True} if the credentials represented by this object match the
1297 given digest, C{False} if they do not, or a L{Deferred} which will
1298 be called back with one of these values.
1303 """Root object, used to login to portal."""
1307 def __init__(self, portal):
1308 self.portal = portal
1310 def rootObject(self, broker):
1311 return _PortalWrapper(self.portal, broker)
1313 registerAdapter(_PortalRoot, Portal, IPBRoot)
1317 class _JellyableAvatarMixin:
1319 Helper class for code which deals with avatars which PB must be capable of
1322 def _cbLogin(self, (interface, avatar, logout)):
1324 Ensure that the avatar to be returned to the client is jellyable and
1325 set up disconnection notification to call the realm's logout object.
1327 if not IJellyable.providedBy(avatar):
1328 avatar = AsReferenceable(avatar, "perspective")
1330 puid = avatar.processUniqueID()
1332 # only call logout once, whether the connection is dropped (disconnect)
1333 # or a logout occurs (cleanup), and be careful to drop the reference to
1342 self.broker._localCleanup[puid] = maybeLogout
1343 self.broker.notifyOnDisconnect(maybeLogout)
1349 class _PortalWrapper(Referenceable, _JellyableAvatarMixin):
1351 Root Referenceable object, used to login to portal.
1354 def __init__(self, portal, broker):
1355 self.portal = portal
1356 self.broker = broker
1359 def remote_login(self, username):
1361 Start of username/password login.
1364 return c, _PortalAuthChallenger(self.portal, self.broker, username, c)
1367 def remote_loginAnonymous(self, mind):
1369 Attempt an anonymous login.
1371 @param mind: An object to use as the mind parameter to the portal login
1372 call (possibly None).
1375 @return: A Deferred which will be called back with an avatar when login
1376 succeeds or which will be errbacked if login fails somehow.
1378 d = self.portal.login(Anonymous(), mind, IPerspective)
1379 d.addCallback(self._cbLogin)
1384 class _PortalAuthChallenger(Referenceable, _JellyableAvatarMixin):
1386 Called with response to password challenge.
1388 implements(IUsernameHashedPassword, IUsernameMD5Password)
1390 def __init__(self, portal, broker, username, challenge):
1391 self.portal = portal
1392 self.broker = broker
1393 self.username = username
1394 self.challenge = challenge
1397 def remote_respond(self, response, mind):
1398 self.response = response
1399 d = self.portal.login(self, mind, IPerspective)
1400 d.addCallback(self._cbLogin)
1404 # IUsernameHashedPassword:
1405 def checkPassword(self, password):
1406 return self.checkMD5Password(md5(password).digest())
1409 # IUsernameMD5Password
1410 def checkMD5Password(self, md5Password):
1412 md.update(md5Password)
1413 md.update(self.challenge)
1414 correct = md.digest()
1415 return self.response == correct
1419 # Everything from flavors is exposed publically here.
1420 'IPBRoot', 'Serializable', 'Referenceable', 'NoSuchMethod', 'Root',
1421 'ViewPoint', 'Viewable', 'Copyable', 'Jellyable', 'Cacheable',
1422 'RemoteCopy', 'RemoteCache', 'RemoteCacheObserver', 'copyTags',
1423 'setUnjellyableForClass', 'setUnjellyableFactoryForClass',
1424 'setUnjellyableForClassTree',
1425 'setCopierForClass', 'setFactoryForClass', 'setCopierForClassTree',
1427 'MAX_BROKER_REFS', 'portno',
1429 'ProtocolError', 'DeadReferenceError', 'Error', 'PBConnectionLost',
1430 'RemoteMethod', 'IPerspective', 'Avatar', 'AsReferenceable',
1431 'RemoteReference', 'CopyableFailure', 'CopiedFailure', 'failure2Copyable',
1432 'Broker', 'respond', 'challenge', 'PBClientFactory', 'PBServerFactory',
1433 'IUsernameMD5Password',