"""The metaclasses used by the mojo python bindings."""
import itertools
+import logging
+import sys
# pylint: disable=F0401
+import mojo.bindings.messaging as messaging
+import mojo.bindings.promise as promise
import mojo.bindings.serialization as serialization
],
},
'fields': [
- FieldDescriptor('x', _descriptor.TYPE_INT32, 0),
+ SingleFieldGroup('x', _descriptor.TYPE_INT32, 0, 0),
],
}
fields = list(
itertools.chain.from_iterable([group.descriptors for group in groups]))
+ fields.sort(key=lambda f: f.index)
for field in fields:
dictionary[field.name] = _BuildProperty(field)
# Add init
- dictionary['__init__'] = _StructInit
+ dictionary['__init__'] = _StructInit(fields)
# Add serialization method
serialization_object = serialization.Serialization(groups)
return serialization_object.Serialize(self, handle_offset)
dictionary['Serialize'] = Serialize
+ # pylint: disable=W0212
+ def AsDict(self):
+ return self._fields
+ dictionary['AsDict'] = AsDict
+
def Deserialize(cls, data, handles):
result = cls.__new__(cls)
fields = {}
return result
dictionary['Deserialize'] = classmethod(Deserialize)
+ dictionary['__eq__'] = _StructEq(fields)
+ dictionary['__ne__'] = _StructNe
+
return type.__new__(mcs, name, bases, dictionary)
# Prevent adding new attributes, or mutating constants.
raise AttributeError, 'can\'t delete attribute'
-def _StructInit(self, **kwargs):
- self._fields = {}
- for name in kwargs:
- self.__setattr__(name, kwargs[name])
+class MojoInterfaceType(type):
+ """Meta class for interfaces.
+
+ Usage:
+ class MyInterface(object):
+ __metaclass__ = MojoInterfaceType
+ DESCRIPTOR = {
+ 'client': MyInterfaceClient,
+ 'methods': [
+ {
+ 'name': 'FireAndForget',
+ 'ordinal': 0,
+ 'parameters': [
+ SingleFieldGroup('x', _descriptor.TYPE_INT32, 0, 0),
+ ]
+ },
+ {
+ 'name': 'Ping',
+ 'ordinal': 1,
+ 'parameters': [
+ SingleFieldGroup('x', _descriptor.TYPE_INT32, 0, 0),
+ ],
+ 'responses': [
+ SingleFieldGroup('x', _descriptor.TYPE_INT32, 0, 0),
+ ],
+ },
+ ],
+ }
+ """
+
+ def __new__(mcs, name, bases, dictionary):
+ # If one of the base class is already an interface type, do not edit the
+ # class.
+ for base in bases:
+ if isinstance(base, mcs):
+ return type.__new__(mcs, name, bases, dictionary)
+
+ descriptor = dictionary.pop('DESCRIPTOR', {})
+
+ methods = [_MethodDescriptor(x) for x in descriptor.get('methods', [])]
+ for method in methods:
+ dictionary[method.name] = _NotImplemented
+ client_class_getter = descriptor.get('client', None)
+
+ interface_manager = InterfaceManager(name, methods, client_class_getter)
+ dictionary.update({
+ 'client': None,
+ 'manager': None,
+ '_interface_manager': interface_manager,
+ })
+
+ interface_class = type.__new__(mcs, name, bases, dictionary)
+ interface_manager.interface_class = interface_class
+ return interface_class
+
+ @property
+ def manager(mcs):
+ return mcs._interface_manager
+
+ # Prevent adding new attributes, or mutating constants.
+ def __setattr__(mcs, key, value):
+ raise AttributeError, 'can\'t set attribute'
+
+ # Prevent deleting constants.
+ def __delattr__(mcs, key):
+ raise AttributeError, 'can\'t delete attribute'
+
+
+class InterfaceProxy(object):
+ """
+ A proxy allows to access a remote interface through a message pipe.
+ """
+ pass
+
+
+class InterfaceRequest(object):
+ """
+ An interface request allows to send a request for an interface to a remote
+ object and start using it immediately.
+ """
+
+ def __init__(self, handle):
+ self._handle = handle
+
+ def IsPending(self):
+ return self._handle.IsValid()
+
+ def PassMessagePipe(self):
+ result = self._handle
+ self._handle = None
+ return result
+
+
+class InterfaceManager(object):
+ """
+ Manager for an interface class. The manager contains the operation that allows
+ to bind an implementation to a pipe, or to generate a proxy for an interface
+ over a pipe.
+ """
+
+ def __init__(self, name, methods, client_class_getter):
+ self.name = name
+ self.methods = methods
+ self.interface_class = None
+ self._client_class_getter = client_class_getter
+ self._client_manager = None
+ self._client_manager_computed = False
+ self._proxy_class = None
+ self._stub_class = None
+
+ @property
+ def client_manager(self):
+ if not self._client_manager_computed:
+ self._client_manager_computed = True
+ if self._client_class_getter:
+ self._client_manager = self._client_class_getter().manager
+ return self._client_manager
+
+ def Proxy(self, handle):
+ router = messaging.Router(handle)
+ error_handler = _ProxyErrorHandler()
+ router.SetErrorHandler(error_handler)
+ router.Start()
+ return self._InternalProxy(router, error_handler)
+
+ # pylint: disable=W0212
+ def Bind(self, impl, handle):
+ router = messaging.Router(handle)
+ router.SetIncomingMessageReceiver(self._Stub(impl))
+ error_handler = _ProxyErrorHandler()
+ router.SetErrorHandler(error_handler)
+
+ # Retain the router, until an error happen.
+ retainer = _Retainer(router)
+ def Cleanup(_):
+ retainer.release()
+ error_handler.AddCallback(Cleanup)
+
+ if self.client_manager:
+ impl.client = self.client_manager._InternalProxy(router, error_handler)
+
+ # Give an instance manager to the implementation to allow it to close
+ # the connection.
+ impl.manager = InstanceManager(router)
+
+ router.Start()
+
+ def _InternalProxy(self, router, error_handler):
+ if not self._proxy_class:
+ dictionary = {
+ '__module__': __name__,
+ '__init__': _ProxyInit,
+ }
+ if self.client_manager:
+ dictionary['client'] = property(_ProxyGetClient, _ProxySetClient)
+ dictionary['manager'] = None
+ dictionary['_client_manager'] = self.client_manager
+ for method in self.methods:
+ dictionary[method.name] = _ProxyMethodCall(method)
+ self._proxy_class = type('%sProxy' % self.name,
+ (self.interface_class, InterfaceProxy),
+ dictionary)
+
+ proxy = self._proxy_class(router, error_handler)
+ # Give an instance manager to the proxy to allow to close the connection.
+ proxy.manager = InstanceManager(router)
+ return proxy
+
+ def _Stub(self, impl):
+ if not self._stub_class:
+ accept_method = _StubAccept(self.methods)
+ dictionary = {
+ '__module__': __name__,
+ '__init__': _StubInit,
+ 'Accept': accept_method,
+ 'AcceptWithResponder': accept_method,
+ }
+ self._stub_class = type('%sStub' % self.name,
+ (messaging.MessageReceiverWithResponder,),
+ dictionary)
+ return self._stub_class(impl)
+
+
+class InstanceManager(object):
+ """
+ Manager for the implementation of an interface or a proxy. The manager allows
+ to control the connection over the pipe.
+ """
+ def __init__(self, router):
+ self.router = router
+
+ def Close(self):
+ self.router.Close()
+
+ def PassMessagePipe(self):
+ return self.router.PassMessagePipe()
+
+
+class _MethodDescriptor(object):
+ def __init__(self, descriptor):
+ self.name = descriptor['name']
+ self.ordinal = descriptor['ordinal']
+ self.parameters_struct = _ConstructParameterStruct(
+ descriptor['parameters'], self.name, "Parameters")
+ self.response_struct = _ConstructParameterStruct(
+ descriptor.get('responses'), self.name, "Responses")
+
+
+def _ConstructParameterStruct(descriptor, name, suffix):
+ if descriptor is None:
+ return None
+ parameter_dictionary = {
+ '__metaclass__': MojoStructType,
+ '__module__': __name__,
+ 'DESCRIPTOR': descriptor,
+ }
+ return MojoStructType(
+ '%s%s' % (name, suffix),
+ (object,),
+ parameter_dictionary)
+
+
+class _ProxyErrorHandler(messaging.ConnectionErrorHandler):
+ def __init__(self):
+ messaging.ConnectionErrorHandler.__init__(self)
+ self._callbacks = set()
+
+ def OnError(self, result):
+ exception = messaging.MessagingException('Mojo error: %d' % result)
+ for callback in list(self._callbacks):
+ callback(exception)
+ self._callbacks = None
+
+ def AddCallback(self, callback):
+ if self._callbacks is not None:
+ self._callbacks.add(callback)
+
+ def RemoveCallback(self, callback):
+ if self._callbacks:
+ self._callbacks.remove(callback)
+
+
+class _Retainer(object):
+
+ # Set to force instances to be retained.
+ _RETAINED = set()
+
+ def __init__(self, retained):
+ self._retained = retained
+ _Retainer._RETAINED.add(self)
+
+ def release(self):
+ self._retained = None
+ _Retainer._RETAINED.remove(self)
+
+
+def _StructInit(fields):
+ def _Init(self, *args, **kwargs):
+ if len(args) + len(kwargs) > len(fields):
+ raise TypeError('__init__() takes %d argument (%d given)' %
+ (len(fields), len(args) + len(kwargs)))
+ self._fields = {}
+ for f, a in zip(fields, args):
+ self.__setattr__(f.name, a)
+ remaining_fields = set(x.name for x in fields[len(args):])
+ for name in kwargs:
+ if not name in remaining_fields:
+ if name in (x.name for x in fields[:len(args)]):
+ raise TypeError(
+ '__init__() got multiple values for keyword argument %r' % name)
+ raise TypeError('__init__() got an unexpected keyword argument %r' %
+ name)
+ self.__setattr__(name, kwargs[name])
+ return _Init
def _BuildProperty(field):
self._fields[field.name] = field.field_type.Convert(value)
return property(Get, Set)
+
+
+def _StructEq(fields):
+ def _Eq(self, other):
+ if type(self) is not type(other):
+ return False
+ for field in fields:
+ if getattr(self, field.name) != getattr(other, field.name):
+ return False
+ return True
+ return _Eq
+
+def _StructNe(self, other):
+ return not self.__eq__(other)
+
+
+def _ProxyInit(self, router, error_handler):
+ self._router = router
+ self._error_handler = error_handler
+ self._client = None
+
+
+# pylint: disable=W0212
+def _ProxyGetClient(self):
+ return self._client
+
+
+# pylint: disable=W0212
+def _ProxySetClient(self, client):
+ self._client = client
+ stub = self._client_manager._Stub(client)
+ self._router.SetIncomingMessageReceiver(stub)
+
+
+# pylint: disable=W0212
+def _ProxyMethodCall(method):
+ flags = messaging.NO_FLAG
+ if method.response_struct:
+ flags = messaging.MESSAGE_EXPECTS_RESPONSE_FLAG
+ def _Call(self, *args, **kwargs):
+ def GenerationMethod(resolve, reject):
+ message = _GetMessage(method, flags, *args, **kwargs)
+ if method.response_struct:
+ def Accept(message):
+ try:
+ assert message.header.message_type == method.ordinal
+ payload = message.payload
+ response = method.response_struct.Deserialize(payload.data,
+ payload.handles)
+ as_dict = response.AsDict()
+ if len(as_dict) == 1:
+ value = as_dict.values()[0]
+ if not isinstance(value, dict):
+ response = value
+ resolve(response)
+ return True
+ except Exception as e:
+ # Adding traceback similarly to python 3.0 (pep-3134)
+ e.__traceback__ = sys.exc_info()[2]
+ reject(e)
+ return False
+ finally:
+ self._error_handler.RemoveCallback(reject)
+
+ self._error_handler.AddCallback(reject)
+ if not self._router.AcceptWithResponder(
+ message, messaging.ForwardingMessageReceiver(Accept)):
+ self._error_handler.RemoveCallback(reject)
+ reject(messaging.MessagingException("Unable to send message."))
+ else:
+ if (self._router.Accept(message)):
+ resolve(None)
+ else:
+ reject(messaging.MessagingException("Unable to send message."))
+ return promise.Promise(GenerationMethod)
+ return _Call
+
+
+def _GetMessage(method, flags, *args, **kwargs):
+ if flags == messaging.MESSAGE_IS_RESPONSE_FLAG:
+ struct = method.response_struct(*args, **kwargs)
+ else:
+ struct = method.parameters_struct(*args, **kwargs)
+ header = messaging.MessageHeader(method.ordinal, flags)
+ data = header.Serialize()
+ (payload, handles) = struct.Serialize()
+ data.extend(payload)
+ return messaging.Message(data, handles, header)
+
+
+def _StubInit(self, impl):
+ self.impl = impl
+
+
+def _StubAccept(methods):
+ methods_by_ordinal = dict((m.ordinal, m) for m in methods)
+ def Accept(self, message, responder=None):
+ try:
+ header = message.header
+ assert header.expects_response == bool(responder)
+ assert header.message_type in methods_by_ordinal
+ method = methods_by_ordinal[header.message_type]
+ payload = message.payload
+ parameters = method.parameters_struct.Deserialize(
+ payload.data, payload.handles).AsDict()
+ response = getattr(self.impl, method.name)(**parameters)
+ if header.expects_response:
+ def SendResponse(response):
+ if isinstance(response, dict):
+ response_message = _GetMessage(method,
+ messaging.MESSAGE_IS_RESPONSE_FLAG,
+ **response)
+ else:
+ response_message = _GetMessage(method,
+ messaging.MESSAGE_IS_RESPONSE_FLAG,
+ response)
+ response_message.header.request_id = header.request_id
+ responder.Accept(response_message)
+ p = promise.Promise.Resolve(response).Then(SendResponse)
+ if self.impl.manager:
+ # Close the connection in case of error.
+ p.Catch(lambda _: self.impl.manager.Close())
+ return True
+ # pylint: disable=W0702
+ except:
+ # Close the connection in case of error.
+ logging.warning(
+ 'Error occured in accept method. Connection will be closed.')
+ if self.impl.manager:
+ self.impl.manager.Close()
+ return False
+ return Accept
+
+
+def _NotImplemented(*_1, **_2):
+ raise NotImplementedError()