2005-10-29 Robert McQueen <robot101@debian.org>
+ * python/decorators.py: Add optional arguments to the method and
+ signal decorators to allow you to specify the signature of arguments
+ and return values. Preserve the doc strings of signal functions in the
+ decorated version, for pydoc and friends.
+
+ * python/dbus_bindings.pyx, python/proxies.py: Replace the
+ parse_signature_block function with an iterable dbus.Signature()
+ type. Fix a bug in MessageIter.append_strict where you could append
+ anything by claiming it was a string.
+
+ * python/service.py: Use the out_signature decoration on methods to
+ marshal return values, meaning you no longer require dbus.Array()
+ or dbus.Dictionary() to indicate the type when returning empty
+ arrays or dictionaries. Fix a bug where exceptions which are defined
+ in __main__ are not turned into error replies.
+
+ * test/python/test-client.py, test/python/test-service.py: Add test
+ for correct marshalling of return values according to out_signature.
+ Fix a bug in the async call test where the error_handler is missing a
+ self argument.
+
+2005-10-29 Robert McQueen <robot101@debian.org>
+
* glib/Makefile.am, glib/examples/Makefile.am,
glib/examples/statemachine/Makefile.am: Merge patch from Ubuntu by
Daniel Stone to replace explicit calls to libtool with $(LIBTOOL).
def __init__(self, value):
str.__init__(self, value)
+class SignatureIter(object):
+ def __init__(self, string):
+ object.__init__(self)
+ self.remaining = string
+
+ def next(self):
+ if self.remaining == '':
+ raise StopIteration
+
+ signature = self.remaining
+ block_depth = 0
+ block_type = None
+ end = len(signature)
+
+ for marker in range(0, end):
+ cur_sig = ord(signature[marker])
+
+ if cur_sig == TYPE_ARRAY:
+ pass
+ elif cur_sig == DICT_ENTRY_BEGIN or cur_sig == STRUCT_BEGIN:
+ if block_type == None:
+ block_type = cur_sig
+
+ if block_type == cur_sig:
+ block_depth = block_depth + 1
+
+ elif cur_sig == DICT_ENTRY_END:
+ if block_type == DICT_ENTRY_BEGIN:
+ block_depth = block_depth - 1
+
+ if block_depth == 0:
+ end = marker
+ break
+
+ elif cur_sig == STRUCT_END:
+ if block_type == STRUCT_BEGIN:
+ block_depth = block_depth - 1
+
+ if block_depth == 0:
+ end = marker
+ break
+
+ else:
+ if block_depth == 0:
+ end = marker
+ break
+
+ end = end + 1
+ self.remaining = signature[end:]
+ return Signature(signature[0:end])
+
class Signature(str):
+ """An iterable method signature. Iterating gives the signature of each
+ argument in turn."""
def __init__(self, value):
- str.__init__(self, value)
+ return str.__init__(self, value)
+
+ def __iter__(self):
+ return SignatureIter(self)
+
+class VariantSignature(object):
+ """A fake method signature which when iterated, is an endless stream
+ of variants (handy with zip()). It has no string representation."""
+ def __iter__(self):
+ return self
+
+ def next(self):
+ return 'v'
class Byte(int):
def __init__(self, value):
return ret
- def parse_signature_block(self, signature):
- remainder = ''
- sig = ''
- block_depth = 0
- block_type = None
-
- for marker in range(0, len(signature)):
- cur_sig = ord(signature[marker])
-
- if cur_sig == TYPE_ARRAY:
- pass
- elif cur_sig == DICT_ENTRY_BEGIN or cur_sig == STRUCT_BEGIN:
- if block_type == None:
- block_type = cur_sig
-
- if block_type == cur_sig:
- block_depth = block_depth + 1
-
- elif cur_sig == DICT_ENTRY_END:
- if block_type == DICT_ENTRY_BEGIN:
- block_depth = block_depth - 1
-
- if block_depth == 0:
- break
-
- elif cur_sig == STRUCT_END:
- if block_type == STRUCT_BEGIN:
- block_depth = block_depth - 1
-
- if block_depth == 0:
- break
-
- else:
- if block_depth == 0:
- break
-
- marker = marker + 1
- sig = signature[0:marker]
- remainder = signature[marker:]
- return (sig, remainder)
-
def append_strict(self, value, sig):
if sig == TYPE_INVALID or sig == None:
sig_type = ord(sig[0])
if sig_type == TYPE_STRING:
- retval = self.append(value)
+ retval = self.append_string(value)
elif sig_type == TYPE_INT16:
retval = self.append_int16(value)
elif sig_type == TYPE_UINT16:
dict_entry_iter.__cinit__(&c_dict_entry_iter)
if signature:
- (tmp_sig, remainder) = self.parse_signature_block(signature)
+ signature_iter = iter(Signature(signature))
+ tmp_sig = signature_iter.next()
if not dict_entry_iter.append_strict(key, tmp_sig):
dbus_message_iter_close_container(dict_iter.iter, dict_entry_iter.iter)
dbus_message_iter_close_container(self.iter, dict_iter.iter)
return False
- (tmp_sig, remainder) = self.parse_signature_block(remainder)
+ tmp_sig = signature_iter.next()
if not dict_entry_iter.append_strict(value, tmp_sig):
dbus_message_iter_close_container(dict_iter.iter, dict_entry_iter.iter)
dbus_message_iter_close_container(self.iter, dict_iter.iter)
struct_iter = MessageIter(level)
struct_iter.__cinit__(&c_struct_iter)
- remainder = signature
+ signature_iter = iter(Signature(signature))
for item in python_struct:
if signature:
- (sig, remainder) = self.parse_signature_block(remainder)
+ sig = signature_iter.next()
if sig == '':
dbus_message_iter_close_container(self.iter, struct_iter.iter)
-import _util
+import _util
import inspect
import dbus_bindings
-def method(dbus_interface):
+def method(dbus_interface, in_signature=None, out_signature=None):
_util._validate_interface_or_name(dbus_interface)
def decorator(func):
func._dbus_is_method = True
func._dbus_interface = dbus_interface
+ func._dbus_in_signature = in_signature
+ func._dbus_out_signature = out_signature
func._dbus_args = inspect.getargspec(func)[0]
func._dbus_args.pop(0)
return func
return decorator
-def signal(dbus_interface):
+def signal(dbus_interface, signature=None):
_util._validate_interface_or_name(dbus_interface)
def decorator(func):
def emit_signal(self, *args, **keywords):
self._connection.send(message)
+ emit_signal.__name__ = func.__name__
+ emit_signal.__doc__ = func.__doc__
emit_signal._dbus_is_signal = True
emit_signal._dbus_interface = dbus_interface
- emit_signal.__name__ = func.__name__
+ emit_signal._dbus_signature = signature
emit_signal._dbus_args = inspect.getargspec(func)[0]
emit_signal._dbus_args.pop(0)
return emit_signal
# Add the arguments to the function
iter = message.get_iter(True)
- remainder = self._introspect_sig
- for arg in args:
- if self._introspect_sig:
- (sig, remainder) = iter.parse_signature_block(remainder)
+ if self._introspect_sig:
+ for (arg, sig) in zip(args, dbus_bindings.Signature(self._introspect_sig)):
iter.append_strict(arg, sig)
- else:
+ else:
+ for arg in args:
iter.append(arg)
-
+
if ignore_reply:
result = self._connection.send(message)
args_tuple = (result,)
import dbus_bindings
import _dbus
+import operator
from exceptions import UnknownMethodException
from decorators import method
from decorators import signal
error_name = e.__class__.__name__
else:
error_name = e.__module__ + '.' + str(e.__class__.__name__)
- error_contents = str(e)
- reply = dbus_bindings.Error(message, error_name, error_contents)
+
+ error_contents = str(e)
+ reply = dbus_bindings.Error(message, error_name, error_contents)
else:
reply = dbus_bindings.MethodReturn(message)
- if retval != None:
+
+ # temporary - about to replace the method lookup code...
+ target_parent = target_method
+ target_name = str(target_method)
+
+ # do strict adding if an output signature was provided
+ if target_parent._dbus_out_signature != None:
+ # iterate signature into list of complete types
+ signature = tuple(dbus_bindings.Signature(target_parent._dbus_out_signature))
+
+ if retval == None:
+ if len(signature) != 0:
+ raise TypeError('%s returned nothing but output signature is %s' %
+ (target_name, target_parent._dbus_out_signature))
+ elif len(signature) == 1:
+ iter = reply.get_iter(append=True)
+ iter.append_strict(retval, signature[0])
+ elif len(signature) > 1:
+ if operator.isSequenceType(retval):
+ if len(signature) > len(retval):
+ raise TypeError('output signature %s is longer than the number of values returned by %s' %
+ (target_parent._dbus_out_signature, target_name))
+ elif len(retval) > len(signature):
+ raise TypeError('output signature %s is shorter than the number of values returned by %s' %
+ (target_parent._dbus_out_signature, target_name))
+ else:
+ iter = reply.get_iter(append=True)
+ for (value, sig) in zip(retval, signature):
+ iter.append_strict(value, sig)
+ else:
+ raise TypeError('output signature %s has multiple values but %s didn\'t return a sequence type' %
+ (target_parent._dbus_out_signature, target_name))
+
+ # try and guess the return type
+ elif retval != None:
iter = reply.get_iter(append=True)
iter.append(retval)
-
return reply
class ObjectType(type):
self.test_controler.assertEquals(val, self.expected_result)
- def error_handler(error):
+ def error_handler(self, error):
print error
if self.do_exit:
main_loop.quit()
main_loop.run()
+ def testReturnMarshalling(self):
+ print "\n********* Testing return marshalling ***********"
+
+ # these values are the same as in the server, and the
+ # methods should only succeed when they are called with
+ # the right value number, because they have out_signature
+ # decorations, and return an unmatching type when called
+ # with a different number
+ values = ["", ("",""), ("","",""), [], {}, ["",""], ["","",""]]
+ methods = [
+ (self.iface.ReturnOneString, set([0]), set([0])),
+ (self.iface.ReturnTwoStrings, set([1, 5]), set([5])),
+ (self.iface.ReturnStruct, set([1, 5]), set([1])),
+ # all of our test values are sequences so will marshall correctly into an array :P
+ (self.iface.ReturnArray, set(range(len(values))), set([3, 5, 6])),
+ (self.iface.ReturnDict, set([0, 3, 4]), set([4]))
+ ]
+
+ for (method, success_values, return_values) in methods:
+ print "\nTrying correct behaviour of", method._method_name
+ for value in range(len(values)):
+ try:
+ ret = method(value)
+ except Exception, e:
+ print "%s(%s) raised %s" % (method._method_name, repr(values[value]), e.__class__)
+
+ # should fail if it tried to marshal the wrong type
+ self.assert_(value not in success_values, "%s should succeed when we ask it to return %s\n%s" % (method._method_name, repr(values[value]), e))
+ else:
+ print "%s(%s) returned %s" % (method._method_name, repr(values[value]), repr(ret))
+
+ # should only succeed if it's the right return type
+ self.assert_(value in success_values, "%s should fail when we ask it to return %s" % (method._method_name, repr(values[value])))
+
+ # check the value is right too :D
+ returns = map(lambda n: values[n], return_values)
+ self.assert_(ret in returns, "%s should return one of %s" % (method._method_name, repr(returns)))
+ print
+
class TestDBusPythonToGLibBindings(unittest.TestCase):
def setUp(self):
self.bus = dbus.SessionBus()
return dbus.Array(ret, signature="(uus)")
+ def returnValue(self, test):
+ if test == 0:
+ return ""
+ elif test == 1:
+ return "",""
+ elif test == 2:
+ return "","",""
+ elif test == 3:
+ return []
+ elif test == 4:
+ return {}
+ elif test == 5:
+ return ["",""]
+ elif test == 6:
+ return ["","",""]
+
+ @dbus.service.method("org.freedesktop.DBus.TestSuiteInterface", in_signature='u', out_signature='s')
+ def ReturnOneString(self, test):
+ return self.returnValue(test)
+
+ @dbus.service.method("org.freedesktop.DBus.TestSuiteInterface", in_signature='u', out_signature='ss')
+ def ReturnTwoStrings(self, test):
+ return self.returnValue(test)
+
+ @dbus.service.method("org.freedesktop.DBus.TestSuiteInterface", in_signature='u', out_signature='(ss)')
+ def ReturnStruct(self, test):
+ return self.returnValue(test)
+
+ @dbus.service.method("org.freedesktop.DBus.TestSuiteInterface", in_signature='u', out_signature='as')
+ def ReturnArray(self, test):
+ return self.returnValue(test)
+
+ @dbus.service.method("org.freedesktop.DBus.TestSuiteInterface", in_signature='u', out_signature='a{ss}')
+ def ReturnDict(self, test):
+ return self.returnValue(test)
+
session_bus = dbus.SessionBus()
name = dbus.service.BusName("org.freedesktop.DBus.TestSuitePythonService", bus=session_bus)
object = TestObject(name)