3 from __future__ import absolute_import, print_function, unicode_literals
9 import dbus.mainloop.glib
11 from gi.repository import GObject
13 import gobject as GObject
# Bus name and object path handed to bus.get_object() when the client
# proxy is created.
BUS_NAME = 'org.bluez.obex'
PATH = '/org/bluez/obex'

# D-Bus interface names used to build dbus.Interface proxies.
CLIENT_INTERFACE = 'org.bluez.obex.Client1'
SESSION_INTERFACE = 'org.bluez.obex.Session1'
PHONEBOOK_ACCESS_INTERFACE = 'org.bluez.obex.PhonebookAccess1'
TRANSFER_INTERFACE = 'org.bluez.obex.Transfer1'
def __init__(self, callback_func):
    """Store *callback_func* on the instance.

    callback_func -- callable kept unchanged as ``self.callback_func``.
    """
    # NOTE(review): the enclosing class header and any further attribute
    # initialisation are not visible in this extract.
    self.callback_func = callback_func
29 def __init__(self, session_path):
32 self.flush_func = None
33 bus = dbus.SessionBus()
34 obj = bus.get_object(BUS_NAME, session_path)
35 self.session = dbus.Interface(obj, SESSION_INTERFACE)
36 self.pbap = dbus.Interface(obj, PHONEBOOK_ACCESS_INTERFACE)
37 bus.add_signal_receiver(self.properties_changed,
38 dbus_interface="org.freedesktop.DBus.Properties",
39 signal_name="PropertiesChanged",
42 def register(self, path, properties, transfer):
44 transfer.filename = properties["Filename"]
45 self.props[path] = transfer
46 print("Transfer created: %s (file %s)" % (path,
53 def transfer_complete(self, path):
54 req = self.props.get(path)
58 print("Transfer %s complete" % path)
60 f = open(req.filename, "r")
61 os.remove(req.filename)
64 req.callback_func(lines)
68 if (len(self.props) == 0) and (self.transfers == 0):
69 if self.flush_func != None:
71 self.flush_func = None
def transfer_error(self, path):
    """Report on standard output that the transfer at *path* failed.

    path -- identifier of the failed transfer, interpolated into the
            message as-is.
    """
    message = "Transfer %s error" % path
    print(message)
def properties_changed(self, interface, properties, invalidated, path):
    """Route a PropertiesChanged notification to the transfer handlers.

    interface   -- interface name delivered with the signal (unused).
    properties  -- mapping of changed property names to new values.
    invalidated -- invalidated property names from the signal (unused).
    path        -- key identifying the transfer in ``self.props``.

    Calls ``transfer_complete(path)`` when the new ``Status`` is
    ``'complete'`` and ``transfer_error(path)`` when it is ``'error'``.
    Notifications for paths that are not present in ``self.props`` and
    updates without a ``Status`` entry are ignored.
    """
    # Only react to transfers that were recorded in self.props; other
    # objects may emit PropertiesChanged as well.
    if self.props.get(path) is None:
        return

    # An update is not guaranteed to include 'Status'; .get() avoids a
    # KeyError for notifications that only carry other properties.
    status = properties.get('Status')
    if status == 'complete':
        self.transfer_complete(path)
    elif status == 'error':
        self.transfer_error(path)
def pull(self, vcard, params, func):
    """Issue ``Pull`` on the phonebook-access proxy for one entry.

    vcard  -- forwarded as the first argument of ``Pull``.
    params -- forwarded as the third argument of ``Pull``.
    func   -- NOTE(review): not referenced by the lines visible in this
              extract; presumably wrapped into ``req`` — confirm.

    The reply is handed to ``self.register`` together with ``req``;
    failures go to ``self.error``.
    """
    # NOTE(review): `req` is bound on a line that is not part of this
    # extract — verify against the full file.
    self.pbap.Pull(vcard, "", params,
                   reply_handler=lambda o, p: self.register(o, p, req),
                   error_handler=self.error)
def pull_all(self, params, func):
    """Issue ``PullAll`` on the phonebook-access proxy.

    params -- forwarded as the second argument of ``PullAll``.
    func   -- NOTE(review): not referenced by the lines visible in this
              extract; presumably wrapped into ``req`` — confirm.

    The reply is handed to ``self.register`` together with ``req``;
    failures go to ``self.error``.
    """
    # NOTE(review): `req` is bound on a line that is not part of this
    # extract — verify against the full file.
    self.pbap.PullAll("", params,
                      reply_handler=lambda o, p: self.register(o, p, req),
                      error_handler=self.error)
105 def flush_transfers(self, func):
106 if (len(self.props) == 0) and (self.transfers == 0):
108 self.flush_func = func
113 if __name__ == '__main__':
115 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
117 bus = dbus.SessionBus()
118 mainloop = GObject.MainLoop()
120 client = dbus.Interface(bus.get_object(BUS_NAME, PATH),
123 if (len(sys.argv) < 2):
124 print("Usage: %s <device>" % (sys.argv[0]))
127 print("Creating Session")
128 session_path = client.CreateSession(sys.argv[1], { "Target": "PBAP" })
130 pbap_client = PbapClient(session_path)
132 def process_result(lines, header):
139 def test_paths(paths):
148 print("\n--- Select Phonebook %s ---\n" % (path))
149 pbap_client.interface().Select("int", path)
151 print("\n--- GetSize ---\n")
152 ret = pbap_client.interface().GetSize()
153 print("Size = %d\n" % (ret))
155 print("\n--- List vCard ---\n")
157 ret = pbap_client.interface().List(dbus.Dictionary())
161 params = dbus.Dictionary({ "Format" : "vcard30",
162 "Fields" : ["PHOTO"] })
164 print("%s : %s" % (item[0], item[1]))
165 pbap_client.pull(item[0], params,
166 lambda x: process_result(x, None))
168 pbap_client.pull_all(params, lambda x: process_result(x,
169 "\n--- PullAll ---\n"))
171 pbap_client.flush_transfers(lambda: test_paths(paths[1:]))
173 test_paths(["PB", "ICH", "OCH", "MCH", "CCH", "SPD", "FAV"])