9 import dbus.mainloop.glib
# Constructor of a small per-transfer record (the enclosing class header is
# not visible in this excerpt). It stores the caller-supplied callback on the
# instance.
# NOTE(review): transfer_complete() calls `callback_func(lines)` on the objects
# kept in self.props -- presumably instances of this class; confirm.
12 def __init__(self, callback_func):
13 self.callback_func = callback_func
# Constructor of the PBAP client wrapper (instantiated below as
# PbapClient(session_path)).
#
# session_path: D-Bus object path of the OBEX session; it is used to look up
#               the object on the "org.bluez.obex.client" service.
#
# NOTE(review): other methods read self.props and self.transfers; their
# initialisation is not visible in this excerpt -- confirm it happens here.
18 def __init__(self, session_path):
# Continuation to run once all pending transfers are done (see
# flush_transfers); nothing is pending at construction time.
21 self.flush_func = None
22 bus = dbus.SessionBus()
23 obj = bus.get_object("org.bluez.obex.client", session_path)
# The same remote object is exposed through two interfaces: the generic
# session interface and the phonebook-access interface.
24 self.session = dbus.Interface(obj, "org.bluez.obex.Session")
25 self.pbap = dbus.Interface(obj,
26 "org.bluez.obex.PhonebookAccess")
# Route the Transfer interface's "Complete" signal to transfer_complete().
27 bus.add_signal_receiver(self.transfer_complete,
28 dbus_interface="org.bluez.obex.Transfer",
29 signal_name="Complete",
# Route a second Transfer-interface signal to transfer_error(); the signal
# name and any further keyword arguments are not visible in this excerpt.
31 bus.add_signal_receiver(self.transfer_error,
32 dbus_interface="org.bluez.obex.Transfer",
# Reply handler for the asynchronous pull calls (see pull / pull_all, which
# pass `lambda r: self.register(r, req)`).
#
# reply:    a 2-item sequence unpacked as (path, properties); `properties`
#           must contain the key "Filename".
# transfer: the per-request record to track; it gains a `filename` attribute
#           and is stored in self.props under `path`.
36 def register(self, reply, transfer):
37 (path, properties) = reply
39 transfer.filename = properties["Filename"]
# Index the record by transfer path so the signal handlers, which receive
# the path, can find it again.
40 self.props[path] = transfer
41 print "Transfer created: %s (file %s)" % (path,
# Signal handler registered in __init__ for the Transfer "Complete" signal.
#
# path: object path of the finished transfer; used as the key into self.props.
#
# Several statements of this handler are not visible in this excerpt (the
# handling of an unknown path, reading the file, and the bookkeeping before
# the callback), so the comments below cover only what is shown.
48 def transfer_complete(self, path):
# .get() yields None when the path is not one of our registered transfers.
49 req = self.props.get(path)
53 print "Transfer %s finished" % path
# NOTE(review): the file is opened and then removed on the very next line; no
# read or close of `f` is visible in this excerpt -- confirm the handle is
# read and closed (a `with` block would make that explicit).
54 f = open(req.filename, "r")
55 os.remove(req.filename)
# Hand the result to the callback supplied when the request was created.
58 req.callback_func(lines)
# Once nothing is registered and the transfer counter is zero, a pending
# flush continuation (if any) is consumed: flush_func is reset to None.
# The call of the continuation itself is not visible here.
60 if (len(self.props) == 0) and (self.transfers == 0):
61 if self.flush_func != None:
63 self.flush_func = None
# Signal handler registered in __init__ for a Transfer-interface signal
# (the signal name is not visible in this excerpt).
#
# code, message: error details carried by the signal; both are printed.
# path:          object path of the failed transfer, used to look up its
#                record in self.props.
#
# What is done with `req` afterwards is not visible in this excerpt.
66 def transfer_error(self, code, message, path):
67 req = self.props.get(path)
70 print "Transfer finished with error %s: %s" % (code, message)
# Start a pull of a single vCard via the PhonebookAccess interface.
#
# vcard: identifier passed through unchanged as the first argument of Pull
#        (the caller below passes item[0] of a List() entry).
# func:  completion callback.
#
# NOTE(review): `req` is created on a line not visible in this excerpt --
# presumably a transfer record wrapping `func`; confirm.
73 def pull(self, vcard, func):
# Supplying reply_handler/error_handler makes dbus-python issue the call
# asynchronously; the reply is forwarded to register() together with `req`.
# self.error is defined outside the visible lines.
75 self.pbap.Pull(vcard, "",
76 reply_handler=lambda r: self.register(r, req),
77 error_handler=self.error)
# Start a pull of the whole phonebook.
#
# func: completion callback.
#
# The D-Bus method call itself and the creation of `req` are on lines not
# visible in this excerpt; only the handler arguments are shown. They mirror
# pull(): the reply goes to register() with `req`, errors go to self.error.
80 def pull_all(self, func):
83 reply_handler=lambda r: self.register(r, req),
84 error_handler=self.error)
# Arrange for `func` to run once no transfers are outstanding.
#
# func: zero-argument continuation (the caller below passes a lambda taking
#       no arguments).
#
# "Idle" is the same condition transfer_complete() checks: self.props is empty
# and self.transfers is zero.
# NOTE(review): the branch bodies are only partly visible -- presumably `func`
# is invoked immediately when idle and stored in flush_func otherwise; confirm.
87 def flush_transfers(self, func):
88 if (len(self.props) == 0) and (self.transfers == 0):
90 self.flush_func = func
# Script entry point: connect to the OBEX client service, create a PBAP
# session to the device named on the command line, and wrap it in PbapClient.
95 if __name__ == '__main__':
# Install the GLib main loop as dbus-python's default before any bus
# connection is made.
97 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
99 bus = dbus.SessionBus()
100 mainloop = gobject.MainLoop()
# Proxy for the root object of the OBEX client service, used to create sessions.
102 client = dbus.Interface(bus.get_object("org.bluez.obex.client", "/"),
103 "org.bluez.obex.Client")
# Exactly one positional argument (the device) is required; what happens
# after printing the usage line is not visible in this excerpt.
105 if (len(sys.argv) < 2):
106 print "Usage: %s <device>" % (sys.argv[0])
109 print "Creating Session"
110 session_path = client.CreateSession(sys.argv[1], { "Target": "PBAP" })
112 pbap_client = PbapClient(session_path)
# Completion callback used by the pulls started in test_paths().
#
# lines:  the value delivered to the transfer callback.
# header: None for per-vCard pulls, or a banner string ("\n--- PullAll ---\n")
#         for the full-phonebook pull, as passed by the callers below.
#
# The body of this function is not visible in this excerpt.
114 def process_result(lines, header):
# Exercise the PhonebookAccess interface for each phonebook in `paths`,
# one per invocation: after queuing the pulls for the current entry it asks
# flush_transfers() to call test_paths() again with the remaining entries.
#
# paths: list of phonebook names; the tail paths[1:] is processed next.
#
# NOTE(review): the assignment of `path`, the stop condition for an empty
# list, and the loop headers over the List() result are on lines not visible
# in this excerpt. pbap_client.interface() is likewise defined outside the
# visible lines -- presumably it returns the PhonebookAccess proxy; confirm.
121 def test_paths(paths):
130 print "\n--- Select Phonebook %s ---\n" % (path)
131 pbap_client.interface().Select("int", path)
133 print "\n--- GetSize ---\n"
134 ret = pbap_client.interface().GetSize()
135 print "Size = %d\n" % (ret)
137 print "\n--- List vCard ---\n"
138 ret = pbap_client.interface().List()
# Each listed item is indexed as a pair: item[0] is later passed to pull(),
# item[1] is only printed.
140 print "%s : %s" % (item[0], item[1])
141 pbap_client.interface().SetFormat("vcard30")
142 pbap_client.interface().SetFilter(["VERSION", "FN",
# Per-entry pull; no banner is passed to process_result (header=None).
144 pbap_client.pull(item[0],
145 lambda x: process_result(x, None))
# Full-phonebook pull with a reduced field filter and a banner header.
147 pbap_client.interface().SetFormat("vcard30")
148 pbap_client.interface().SetFilter(["VERSION", "FN", "TEL"]);
149 pbap_client.pull_all(lambda x: process_result(x,
150 "\n--- PullAll ---\n"))
# Continue with the next phonebook only after every queued transfer is done.
152 pbap_client.flush_transfers(lambda: test_paths(paths[1:]))
# Kick off the walk over the five phonebook names (presumably the standard
# PBAP phonebook objects -- confirm against the PBAP specification).
154 test_paths(["PB", "ICH", "OCH", "MCH", "CCH"])