cdef class Message
cdef class PendingCall
cdef class Watch
+cdef class MessageIter
cdef void cunregister_function_handler (DBusConnection *connection,
void *user_data):
#FIXME: this is totally busted, don't use a class shared member like parsed_path
def _build_parsed_path(self, path_element_list):
- cdef char **cpatharray
+ cdef char **cpatharray
+ cdef int i
+ cdef int size
size = len(path_element_list)
cpatharray = <char **>malloc(sizeof(char*) * (size + 1))
- for i in range(size):
+ for i from 0 <= i < size:
path_element = path_element_list[i]
cpatharray[i] = path_element
cdef DBusMessageIter *iter
cdef DBusMessageIter real_iter
-
- def __init__(self, Message message):
- cdef DBusMessage *msg
+ def __init__(self):
self.iter = &self.real_iter
- msg = message._get_msg()
- dbus_message_iter_init(msg, self.iter)
+
+ cdef __cinit__(self, DBusMessageIter *iter):
+ self.real_iter = iter[0]
cdef DBusMessageIter *_get_iter(self):
return self.iter
return retval
def get_dict(self):
- cdef DBusMessageIter dict_iter
- cdef DBusMessageIter* old_iter
+ cdef DBusMessageIter c_dict_iter
+ cdef MessageIter dict_iter
+
+ dbus_message_iter_init_dict_iterator(self.iter, &c_dict_iter)
+
+ dict_iter = MessageIter()
+ dict_iter.__cinit__(&c_dict_iter)
dict = {}
- dbus_message_iter_init_dict_iterator(self.iter, &dict_iter)
- # FIXME: nasty hack so we can use existing self.get() method
- old_iter = self.iter
- self.iter = &dict_iter
+
+ end_of_dict = False
while True:
- key = self.get_dict_key()
- value = self.get()
+ key = dict_iter.get_dict_key()
+ value = dict_iter.get()
dict[key] = value
- if not self.has_next():
+ if not dict_iter.has_next():
break
- self.next()
+ dict_iter.next()
- self.iter = old_iter
return dict
def get_arg_type(self):
def get_byte_array(self):
cdef int len
cdef unsigned char *retval
+ cdef int i
dbus_message_iter_get_byte_array(self.iter, &retval, <int*>&len)
list = []
for i from 0 <= i < len:
def get_int32_array(self):
cdef int len
cdef dbus_int32_t *retval
+ cdef int i
dbus_message_iter_get_int32_array(self.iter, &retval, <int*>&len)
python_list = []
for i from 0 <= i < len:
def get_uint32_array(self):
cdef int len
cdef dbus_uint32_t *retval
+ cdef int i
dbus_message_iter_get_uint32_array(self.iter, &retval, <int*>&len)
python_list = []
for i from 0 <= i < len:
def get_double_array(self):
cdef int len
cdef double *retval
+ cdef int i
dbus_message_iter_get_double_array(self.iter, &retval, <int*>&len)
python_list = []
for i from 0 <= i < len:
def get_string_array(self):
cdef int len
cdef char **retval
-
+ cdef int i
dbus_message_iter_get_string_array(self.iter, &retval, <int*>&len)
list = []
for i from 0 <= i < len:
def get_object_path_array(self):
cdef int len
cdef char **retval
-
+ cdef int i
dbus_message_iter_get_object_path_array(self.iter, &retval, <int*>&len)
list = []
for i from 0 <= i < len:
retval = self.append_boolean(value)
elif value_type == int:
retval = self.append_int32(value)
- elif value_type == float:
- retval = self.append_double(value)
elif value_type == str:
retval = self.append_string(value)
+ elif value_type == float:
+ retval = self.append_double(value)
+ elif value_type == dict:
+ retval = self.append_dict(value)
elif value_type == list:
if (len(value) == 0):
- raise TypeError, "Empty list"
+ raise TypeError, "Empty lists are currently not supported, return None instead"
list_type = type(value[0])
if list_type == str:
self.append_string_array(value)
def append_object_path(self, value):
return dbus_message_iter_append_object_path(self.iter, value)
- # FIXME: append_array, append_dict_array, append_boolean_array, append_uint32_array
+ # FIXME: append_array, append_boolean_array, append_uint32_array
+
+ def append_dict(self, python_dict):
+ cdef DBusMessageIter c_dict_iter
+ cdef MessageIter dict_iter
+
+ dbus_message_iter_append_dict(self.iter, &c_dict_iter)
+
+ dict_iter = MessageIter()
+ dict_iter.__cinit__(&c_dict_iter)
+
+ for key, value in python_dict.iteritems():
+ if type(key) != str:
+ raise TypeError, "DBus dict keys must be strings"
+ dict_iter.append_dict_key(key)
+ dict_iter.append(value)
def append_byte_array(self, python_list):
cdef unsigned char * value
cdef int length
+ cdef int i
length = len(python_list)
value = <unsigned char*>malloc(length * sizeof(unsigned char))
for i from 0 <= i < length:
def append_int32_array(self, python_list):
cdef dbus_int32_t *value
cdef int length
+ cdef int i
length = len(python_list)
value = <dbus_int32_t*>malloc(length * sizeof(dbus_int32_t))
for i from 0 <= i < length:
def append_double_array(self, python_list):
cdef double *value
cdef int length
+ cdef int i
length = len(python_list)
value = <double*>malloc(length * sizeof(double))
for i from 0 <= i < length:
def append_object_path_array(self, list):
cdef char **value
cdef int length
+ cdef int i
length = len(list)
value = <char**>malloc(length * sizeof(char *))
for i from 0 <= i < length:
cdef char **value
cdef int length
cdef dbus_bool_t return_code
+ cdef int i
length = len(python_list)
value = <char**>malloc(length * sizeof(char *))
for i from 0 <= i < length:
return self.msg
def get_iter(self):
- return MessageIter(self)
+ cdef DBusMessageIter iter
+ cdef MessageIter message_iter
+ cdef DBusMessage *msg
+
+ msg = self._get_msg()
+ dbus_message_iter_init(msg, &iter)
+
+ message_iter = MessageIter()
+ message_iter.__cinit__(&iter)
+
+ return message_iter
def get_args_list(self):
retval = [ ]
for i in range(len(sent_list)):
ensure_same(sent_list[i], reply_list[i])
+
+def TestEchoDict(sent_dict):
+ assert(type(sent_dict) == dict)
+
+ global remote_object
+
+ reply_dict = remote_object.Echo(sent_dict)
+
+
+ assert(type(reply_dict) == dict)
+
+ assert(len(reply_dict) == len(sent_dict))
+
+ for key in sent_dict.keys():
+ ensure_same(reply_dict[key], sent_dict[key])
session_bus = dbus.SessionBus()
for i in range(200):
double_list.append(float(i) / 1000)
TestEchoList(double_list)
+
+#FIXME: this currently fails!
+#empty_list = []
+#TestEchoList(empty_list)
+
+string_to_int_dict = {}
+for i in range(200):
+ string_to_int_dict["key" + str(i)] = i
+TestEchoDict(string_to_int_dict)
+
+string_to_double_dict = {}
+for i in range(200):
+ string_to_double_dict["key" + str(i)] = float(i) / 1000
+TestEchoDict(string_to_double_dict)
+
+string_to_string_dict = {}
+for i in range(200):
+ string_to_string_dict["key" + str(i)] = "value" + str(i)
+TestEchoDict(string_to_string_dict)
+
+#FIXME: this currently crashes dbus in c code
+#empty_dict = {}
+#TestEchoDict(empty_dict)