upgrade obexd to 0.47
[profile/ivi/obexd.git] / test / pbap-client
1 #!/usr/bin/python
2
3 import gobject
4
5 import sys
6 import os
7 import dbus
8 import dbus.service
9 import dbus.mainloop.glib
10
class Transfer:
    """Bookkeeping record for one pending phonebook transfer.

    callback_func is stored as given; path and filename start out as
    None and are filled in later by whoever registers the transfer.
    """

    def __init__(self, callback_func):
        # Neither the transfer path nor its file name is known yet.
        self.path = self.filename = None
        self.callback_func = callback_func
16
class PbapClient:
    """Helper around the PhonebookAccess interface of one obex session.

    Pull requests are issued asynchronously.  Each request is counted in
    ``transfers`` when issued and, once the service replies, recorded in
    ``props`` under its transfer path.  The "Complete" and "Error" signals
    of org.bluez.obex.Transfer are routed to transfer_complete() and
    transfer_error().

    The error paths call ``mainloop.quit()``; ``mainloop`` is expected to
    be a module-level name set up by the script using this class.
    """

    def __init__(self, session_path):
        """Bind to the session object at session_path on the session bus."""
        # Requests issued via pull()/pull_all() that have not completed.
        self.transfers = 0
        # Transfer path -> request object, filled in by register().
        self.props = dict()
        # One-shot callback run when the last pending transfer completes.
        self.flush_func = None
        bus = dbus.SessionBus()
        obj = bus.get_object("org.bluez.obex.client", session_path)
        self.session = dbus.Interface(obj, "org.bluez.obex.Session")
        self.pbap = dbus.Interface(obj, "org.bluez.obex.PhonebookAccess")
        bus.add_signal_receiver(self.transfer_complete,
                                dbus_interface="org.bluez.obex.Transfer",
                                signal_name="Complete",
                                path_keyword="path")
        bus.add_signal_receiver(self.transfer_error,
                                dbus_interface="org.bluez.obex.Transfer",
                                signal_name="Error",
                                path_keyword="path")

    def register(self, reply, transfer):
        """Record a freshly created transfer.

        reply is the (path, properties) pair returned by the pull call;
        properties must contain a "Filename" entry.  transfer is the
        request object to associate with that path.
        """
        (path, properties) = reply
        transfer.path = path
        transfer.filename = properties["Filename"]
        self.props[path] = transfer
        print("Transfer created: %s (file %s)" % (path, transfer.filename))

    def error(self, err):
        """Error handler for the pull calls: print err and stop the loop."""
        print(err)
        mainloop.quit()

    def transfer_complete(self, path):
        """Handle a "Complete" signal for the transfer at path.

        Signals for paths that were never registered are ignored.
        Otherwise the transfer's file is read, deleted, and its lines are
        handed to the request's callback.  When nothing is pending any
        more, the callback stored by flush_transfers() is run once.
        """
        req = self.props.get(path)
        if req is None:
            return
        self.transfers -= 1
        print("Transfer %s finished" % path)
        # Read inside a with-block so the file handle is always closed
        # before the file is deleted.
        with open(req.filename, "r") as received:
            lines = received.readlines()
        os.remove(req.filename)
        del self.props[path]
        req.callback_func(lines)

        if not self.props and self.transfers == 0:
            if self.flush_func is not None:
                # Clear the slot first so the callback may queue a new one.
                flush = self.flush_func
                self.flush_func = None
                flush()

    def transfer_error(self, code, message, path):
        """Handle an "Error" signal: report it and stop the main loop.

        Signals for paths that were never registered are ignored.
        """
        req = self.props.get(path)
        if req is None:
            return
        print("Transfer finished with error %s: %s" % (code, message))
        mainloop.quit()

    def pull(self, vcard, func):
        """Start an asynchronous Pull of vcard; func receives its lines."""
        req = Transfer(func)
        self.pbap.Pull(vcard, "",
                       reply_handler=lambda r: self.register(r, req),
                       error_handler=self.error)
        self.transfers += 1

    def pull_all(self, func):
        """Start an asynchronous PullAll; func receives the file's lines."""
        req = Transfer(func)
        self.pbap.PullAll("",
                          reply_handler=lambda r: self.register(r, req),
                          error_handler=self.error)
        self.transfers += 1

    def flush_transfers(self, func):
        """Arrange for func to run once all pending transfers complete.

        If nothing is pending at the time of the call, func is neither
        stored nor called.
        """
        if not self.props and self.transfers == 0:
            return
        self.flush_func = func

    def interface(self):
        """Return the PhonebookAccess D-Bus interface proxy."""
        return self.pbap
94
if __name__ == '__main__':
    # Test driver: create a PBAP session with the device named on the
    # command line, then for each phonebook select it, print its size and
    # listing, pull every listed vCard individually and finally pull the
    # whole phonebook.

    # Check the command line before touching D-Bus so that a missing
    # argument always yields the usage text.
    if len(sys.argv) < 2:
        print("Usage: %s <device>" % (sys.argv[0]))
        sys.exit(1)

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SessionBus()
    # Kept as a module-level name: PbapClient's error paths call
    # mainloop.quit().
    mainloop = gobject.MainLoop()

    client = dbus.Interface(bus.get_object("org.bluez.obex.client", "/"),
                            "org.bluez.obex.Client")

    print("Creating Session")
    session_path = client.CreateSession(sys.argv[1], {"Target": "PBAP"})

    pbap_client = PbapClient(session_path)

    def process_result(lines, header):
        # Print an optional header, the received lines, then a blank line.
        if header is not None:
            print(header)
        # Lines keep their own line endings, so write them out verbatim.
        sys.stdout.write("".join(lines))
        print("")

    def test_paths(paths):
        # Exercise the first phonebook in paths; the remaining ones are
        # processed once every transfer started here has completed.
        if not paths:
            print("")
            print("FINISHED")
            mainloop.quit()
            return

        path = paths[0]
        pbap = pbap_client.interface()

        print("\n--- Select Phonebook %s ---\n" % (path))
        pbap.Select("int", path)

        print("\n--- GetSize ---\n")
        ret = pbap.GetSize()
        print("Size = %d\n" % (ret))

        print("\n--- List vCard ---\n")
        ret = pbap.List()

        # Format and filter are the same for every pull below, so send
        # them once instead of before each request.
        # NOTE(review): assumes the service keeps these settings for the
        # session until they are changed again - confirm.
        pbap.SetFormat("vcard30")
        pbap.SetFilter(["VERSION", "FN", "TEL"])

        for item in ret:
            print("%s : %s" % (item[0], item[1]))
            pbap_client.pull(item[0], lambda x: process_result(x, None))

        pbap_client.pull_all(
            lambda x: process_result(x, "\n--- PullAll ---\n"))

        pbap_client.flush_transfers(lambda: test_paths(paths[1:]))

    test_paths(["PB", "ICH", "OCH", "MCH", "CCH"])

    mainloop.run()