upgrade obexd to 0.47
[profile/ivi/obexd.git] / test / pbap-client
index 7456c01..498f8a3 100755 (executable)
 #!/usr/bin/python
 
+import gobject
+
 import sys
+import os
 import dbus
+import dbus.service
+import dbus.mainloop.glib
+
+class Transfer:
+       def __init__(self, callback_func):
+               self.callback_func = callback_func
+               self.path = None
+               self.filename = None
+
+class PbapClient:
+       def __init__(self, session_path):
+               self.transfers = 0
+               self.props = dict()
+               self.flush_func = None
+               bus = dbus.SessionBus()
+               obj = bus.get_object("org.bluez.obex.client", session_path)
+               self.session = dbus.Interface(obj, "org.bluez.obex.Session")
+               self.pbap = dbus.Interface(obj,
+                                       "org.bluez.obex.PhonebookAccess")
+               bus.add_signal_receiver(self.transfer_complete,
+                               dbus_interface="org.bluez.obex.Transfer",
+                               signal_name="Complete",
+                               path_keyword="path")
+               bus.add_signal_receiver(self.transfer_error,
+                               dbus_interface="org.bluez.obex.Transfer",
+                               signal_name="Error",
+                               path_keyword="path")
+
+       def register(self, reply, transfer):
+               (path, properties) = reply
+               transfer.path = path
+               transfer.filename = properties["Filename"]
+               self.props[path] = transfer
+               print "Transfer created: %s (file %s)" % (path,
+                                                       transfer.filename)
+
+       def error(self, err):
+               print err
+               mainloop.quit()
+
+       def transfer_complete(self, path):
+               req = self.props.get(path)
+               if req == None:
+                       return
+               self.transfers -= 1
+               print "Transfer %s finished" % path
+               f = open(req.filename, "r")
+               os.remove(req.filename)
+               lines = f.readlines()
+               del self.props[path]
+               req.callback_func(lines)
+
+               if (len(self.props) == 0) and (self.transfers == 0):
+                       if self.flush_func != None:
+                               f = self.flush_func
+                               self.flush_func = None
+                               f()
+
+       def transfer_error(self, code, message, path):
+               req = self.props.get(path)
+               if req == None:
+                       return
+               print "Transfer finished with error %s: %s" % (code, message)
+               mainloop.quit()
+
+       def pull(self, vcard, func):
+               req = Transfer(func)
+               self.pbap.Pull(vcard, "",
+                               reply_handler=lambda r: self.register(r, req),
+                               error_handler=self.error)
+               self.transfers += 1
+
+       def pull_all(self, func):
+               req = Transfer(func)
+               self.pbap.PullAll("",
+                               reply_handler=lambda r: self.register(r, req),
+                               error_handler=self.error)
+               self.transfers += 1
+
+       def flush_transfers(self, func):
+               if (len(self.props) == 0) and (self.transfers == 0):
+                       return
+               self.flush_func = func
+
+       def interface(self):
+               return self.pbap
+
+if  __name__ == '__main__':
+
+       dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+       bus = dbus.SessionBus()
+       mainloop = gobject.MainLoop()
+
+       client = dbus.Interface(bus.get_object("org.bluez.obex.client", "/"),
+                               "org.bluez.obex.Client")
+
+       if (len(sys.argv) < 2):
+               print "Usage: %s <device>" % (sys.argv[0])
+               sys.exit(1)
+
+       print "Creating Session"
+       session_path = client.CreateSession(sys.argv[1], { "Target": "PBAP" })
+
+       pbap_client = PbapClient(session_path)
+
+       def process_result(lines, header):
+               if header != None:
+                       print header
+               for line in lines:
+                       print line,
+               print
+
+       def test_paths(paths):
+               if len(paths) == 0:
+                       print
+                       print "FINISHED"
+                       mainloop.quit()
+                       return
+
+               path = paths[0]
+
+               print "\n--- Select Phonebook %s ---\n" % (path)
+               pbap_client.interface().Select("int", path)
+
+               print "\n--- GetSize ---\n"
+               ret = pbap_client.interface().GetSize()
+               print "Size = %d\n" % (ret)
+
+               print "\n--- List vCard ---\n"
+               ret = pbap_client.interface().List()
+               for item in ret:
+                       print "%s : %s" % (item[0], item[1])
+                       pbap_client.interface().SetFormat("vcard30")
+                       pbap_client.interface().SetFilter(["VERSION", "FN",
+                                                               "TEL"]);
+                       pbap_client.pull(item[0],
+                                       lambda x: process_result(x, None))
+
+               pbap_client.interface().SetFormat("vcard30")
+               pbap_client.interface().SetFilter(["VERSION", "FN", "TEL"]);
+               pbap_client.pull_all(lambda x: process_result(x,
+                                                       "\n--- PullAll ---\n"))
+
+               pbap_client.flush_transfers(lambda: test_paths(paths[1:]))
+
+       test_paths(["PB", "ICH", "OCH", "MCH", "CCH"])
 
-bus = dbus.SessionBus()
-
-client = dbus.Interface(bus.get_object("org.openobex.client", "/"),
-                                               "org.openobex.Client")
-
-print "Creating Session"
-session_path = client.CreateSession({"Destination": sys.argv[1], "Target": "PBAP"})
-pbap = dbus.Interface(bus.get_object("org.openobex.client", session_path),
-                                               "org.openobex.PhonebookAccess")
-session = dbus.Interface(bus.get_object("org.openobex.client", session_path),
-                                                       "org.openobex.Session")
-
-paths = ["PB", "ICH", "OCH", "MCH", "CCH"]
-
-for path in paths:
-       print "\n--- Select Phonebook %s ---\n" % (path)
-       pbap.Select("int", path)
-
-       print "\n--- GetSize ---\n"
-       ret = pbap.GetSize()
-       print "Size = %d\n" % (ret)
-
-       print "\n--- List vCard ---\n"
-       ret = pbap.List()
-       for item in ret:
-               print "%s : %s" % (item[0], item[1])
-               pbap.SetFormat("vcard30")
-               pbap.SetFilter(["VERSION", "FN", "TEL"]);
-               ret = pbap.Pull(item[0])
-               print "%s" % (ret)
-
-       print "\n--- PullAll ---\n"
-       pbap.SetFormat("vcard30")
-       pbap.SetFilter(["VERSION", "FN", "TEL"]);
-       ret = pbap.PullAll()
-       print "%s" % (ret)
+       mainloop.run()