tizen 2.3.1 release
[framework/connectivity/bluez.git] / test / pbap-client
1 #!/usr/bin/python
2
3 from __future__ import absolute_import, print_function, unicode_literals
4
5 import os
6 import sys
7 import dbus
8 import dbus.service
9 import dbus.mainloop.glib
10 try:
11   from gi.repository import GObject
12 except ImportError:
13   import gobject as GObject
14
# Well-known bus name and root object path used to reach the OBEX client.
BUS_NAME = 'org.bluez.obex'
PATH = '/org/bluez/obex'

# D-Bus interface names used by this script.
CLIENT_INTERFACE = 'org.bluez.obex.Client1'
SESSION_INTERFACE = 'org.bluez.obex.Session1'
PHONEBOOK_ACCESS_INTERFACE = 'org.bluez.obex.PhonebookAccess1'
TRANSFER_INTERFACE = 'org.bluez.obex.Transfer1'
21
class Transfer:
        """Record describing one requested phonebook transfer.

        callback_func is the callable supplied by the requester; path and
        filename start out as None and are filled in by
        PbapClient.register().
        """

        def __init__(self, callback_func):
                # Not known until the transfer has actually been created.
                self.path = self.filename = None
                self.callback_func = callback_func
27
class PbapClient:
        """Helper around the PhonebookAccess1 interface of one OBEX session.

        Tracks the asynchronous transfers started through pull() and
        pull_all(), passes the lines of each downloaded file to the callback
        given with the request, and can run a "flush" callback once no
        transfer is outstanding any more.
        """

        def __init__(self, session_path):
                """Bind to the session object at session_path on the session bus.

                Also subscribes to every PropertiesChanged signal of
                org.freedesktop.DBus.Properties; properties_changed() filters
                them by object path.
                """
                # Requests issued through pull()/pull_all() and not yet completed.
                self.transfers = 0
                # Transfer object path -> Transfer request record.
                self.props = dict()
                # Callback to run once everything has been flushed, if any.
                self.flush_func = None
                bus = dbus.SessionBus()
                obj = bus.get_object(BUS_NAME, session_path)
                self.session = dbus.Interface(obj, SESSION_INTERFACE)
                self.pbap = dbus.Interface(obj, PHONEBOOK_ACCESS_INTERFACE)
                bus.add_signal_receiver(self.properties_changed,
                        dbus_interface="org.freedesktop.DBus.Properties",
                        signal_name="PropertiesChanged",
                        path_keyword="path")

        def register(self, path, properties, transfer):
                """Record a freshly created transfer.

                path is the transfer's object path, properties must contain a
                "Filename" entry, and transfer is the request record to track.
                """
                transfer.path = path
                transfer.filename = properties["Filename"]
                self.props[path] = transfer
                print("Transfer created: %s (file %s)" % (path,
                                                        transfer.filename))

        def error(self, err):
                """Report a failed request and stop the main loop."""
                print(err)
                mainloop.quit()

        def transfer_complete(self, path):
                """Finish the transfer at path and deliver its data.

                Reads the downloaded file, deletes it, and passes the list of
                lines to the request's callback.  The request is always
                dropped from the pending set, even when the file cannot be
                read, so that a pending flush callback still fires.
                """
                req = self.props.pop(path, None)
                if req is None:
                        return
                self.transfers -= 1
                print("Transfer %s complete" % path)
                try:
                        with open(req.filename, "r") as f:
                                lines = f.readlines()
                        os.remove(req.filename)
                except (IOError, OSError) as e:
                        print("Transfer %s: unable to read %s: %s" %
                                                (path, req.filename, e))
                else:
                        # A failing callback must not keep the flush callback
                        # below from running, so report it and carry on.
                        try:
                                req.callback_func(lines)
                        except Exception as e:
                                print("Transfer %s: callback failed: %s" %
                                                                (path, e))

                if not self.props and self.transfers == 0:
                        if self.flush_func is not None:
                                # Clear before calling: the callback may
                                # install a new flush callback.
                                f = self.flush_func
                                self.flush_func = None
                                f()

        def transfer_error(self, path):
                """Report a failed transfer and stop the main loop."""
                print("Transfer %s error" % path)
                mainloop.quit()

        def properties_changed(self, interface, properties, invalidated, path):
                """Signal handler: react to status changes of tracked transfers.

                Signals for unknown paths, and signals that do not carry a
                "Status" entry, are ignored.
                """
                if self.props.get(path) is None:
                        return

                # Not every change notification includes Status.
                status = properties.get('Status')
                if status == 'complete':
                        self.transfer_complete(path)
                elif status == 'error':
                        self.transfer_error(path)

        def pull(self, vcard, params, func):
                """Start an asynchronous Pull of vcard; func receives its lines."""
                req = Transfer(func)
                self.pbap.Pull(vcard, "", params,
                        reply_handler=lambda o, p: self.register(o, p, req),
                        error_handler=self.error)
                self.transfers += 1

        def pull_all(self, params, func):
                """Start an asynchronous PullAll; func receives the file's lines."""
                req = Transfer(func)
                self.pbap.PullAll("", params,
                        reply_handler=lambda o, p: self.register(o, p, req),
                        error_handler=self.error)
                self.transfers += 1

        def flush_transfers(self, func):
                """Arrange for func to run once all pending transfers finished.

                If nothing is pending at the time of the call, func is neither
                stored nor called.
                """
                if not self.props and self.transfers == 0:
                        return
                self.flush_func = func

        def interface(self):
                """Return the PhonebookAccess1 interface proxy of the session."""
                return self.pbap
112
if __name__ == '__main__':
        # Script entry point: create a PBAP session to the device given on
        # the command line, then for each phonebook in turn select it, print
        # its size, list its vCards, pull each listed vCard and finally pull
        # the whole phonebook, printing everything that was downloaded.

        # Check the arguments first so that the usage text is shown without
        # needing a D-Bus connection.
        if len(sys.argv) < 2:
                print("Usage: %s <device>" % (sys.argv[0]))
                sys.exit(1)

        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

        bus = dbus.SessionBus()
        mainloop = GObject.MainLoop()

        client = dbus.Interface(bus.get_object(BUS_NAME, PATH),
                                                        CLIENT_INTERFACE)

        print("Creating Session")
        session_path = client.CreateSession(sys.argv[1], { "Target": "PBAP" })

        pbap_client = PbapClient(session_path)

        def process_result(lines, header):
                """Print header (unless None), the lines, then a blank line."""
                if header is not None:
                        print(header)
                for line in lines:
                        # Lines come from readlines() and already end in a
                        # newline, so do not let print() add another one.
                        print(line, end="")
                print()

        def test_paths(paths):
                """Exercise the first phonebook in paths, then the remaining ones.

                The remaining phonebooks are handled from the flush callback,
                i.e. after all transfers of the current one have completed.
                Quits the main loop once paths is empty.
                """
                if not paths:
                        print()
                        print("FINISHED")
                        mainloop.quit()
                        return

                path = paths[0]

                print("\n--- Select Phonebook %s ---\n" % (path))
                pbap_client.interface().Select("int", path)

                print("\n--- GetSize ---\n")
                ret = pbap_client.interface().GetSize()
                print("Size = %d\n" % (ret))

                print("\n--- List vCard ---\n")
                try:
                        ret = pbap_client.interface().List(dbus.Dictionary())
                except dbus.DBusException:
                        # Best effort: a failed listing is treated as an empty
                        # phonebook and the test carries on with PullAll.
                        ret = []

                params = dbus.Dictionary({ "Format" : "vcard30",
                                                "Fields" : ["PHOTO"] })
                for item in ret:
                        print("%s : %s" % (item[0], item[1]))
                        pbap_client.pull(item[0], params,
                                        lambda x: process_result(x, None))

                pbap_client.pull_all(params, lambda x: process_result(x,
                                                        "\n--- PullAll ---\n"))

                pbap_client.flush_transfers(lambda: test_paths(paths[1:]))

        test_paths(["PB", "ICH", "OCH", "MCH", "CCH", "SPD", "FAV"])

        mainloop.run()