Upgrade bluez5_37: Merge the code from private
[platform/upstream/bluez.git] / test / test-health
#!/usr/bin/python
# -*- coding: utf-8 -*-

from __future__ import absolute_import, print_function, unicode_literals

import sys

import dbus
import dbus.service
from dbus.mainloop.glib import DBusGMainLoop
try:
    from gi.repository import GObject
except ImportError:
    import gobject as GObject

# D-Bus names and interfaces used by this script to talk to BlueZ.
BUS_NAME = 'org.bluez'
PATH = '/org/bluez'
ADAPTER_INTERFACE = 'org.bluez.Adapter1'
HEALTH_MANAGER_INTERFACE = 'org.bluez.HealthManager1'
HEALTH_DEVICE_INTERFACE = 'org.bluez.HealthDevice1'

# Register the GLib main loop as dbus-python's default before the bus
# connection below is created.
DBusGMainLoop(set_as_default=True)
# NOTE(review): `loop` is not referenced anywhere else in the visible script
# (enter_mainloop() builds its own MainLoop); kept for compatibility.
loop = GObject.MainLoop()

bus = dbus.SystemBus()

def sig_received(*args, **kwargs):
    """Print a received D-Bus signal: its name, object path and payload.

    The signal name and object path are read from the ``member`` and
    ``path`` keyword arguments; if either is missing the call is ignored.

    For a ``PropertyChanged`` signal carrying exactly two arguments both
    are printed (name, then value).  For any other signal only the first
    positional argument is printed.  A signal with no positional arguments
    prints just the name and path instead of raising ``IndexError``, and a
    ``PropertyChanged`` with an unexpected argument count falls back to
    printing its first argument instead of raising ``ValueError``.
    """
    if "member" not in kwargs or "path" not in kwargs:
        return

    sig_name = kwargs["member"]
    path = kwargs["path"]
    print(sig_name)
    print(path)

    if sig_name == "PropertyChanged" and len(args) == 2:
        name, value = args
        print(name)
        print(value)
    elif args:
        print(args[0])


def enter_mainloop():
    """Subscribe to health-device signals and run a GLib main loop.

    Registers ``sig_received`` on the module-level ``bus`` for signals of
    ``HEALTH_DEVICE_INTERFACE`` sent by ``BUS_NAME``, then runs a freshly
    created ``GObject.MainLoop``.  A ``KeyboardInterrupt`` (Ctrl+C) is
    swallowed so the function returns normally; the farewell message is
    printed on every exit path.
    """
    bus.add_signal_receiver(sig_received, bus_name=BUS_NAME,
                            dbus_interface=HEALTH_DEVICE_INTERFACE,
                            path_keyword="path",
                            member_keyword="member",
                            interface_keyword="interface")

    try:
        print("Entering main loop, push Ctrl+C for finish")

        mainloop = GObject.MainLoop()
        mainloop.run()
    except KeyboardInterrupt:
        # Ctrl+C is the documented way to leave the loop.
        pass
    finally:
        print("Exiting, bye")

# Proxy for the health manager interface on the BlueZ root object; used
# below to create and destroy the test application.
hdp_manager = dbus.Interface(bus.get_object(BUS_NAME, PATH),
                             HEALTH_MANAGER_INTERFACE)

# Ask for the application role until a valid choice (1 or 2) is entered.
role = None
while role is None:
    # end="" keeps the cursor on the prompt line; the trailing comma of the
    # old print statement no longer does that with print_function.
    print("Select 1. source or 2. sink: ", end="")
    sys.stdout.flush()
    try:
        line = sys.stdin.readline()
        if not line:
            # EOF: int("") would raise ValueError and loop forever.
            sys.exit("Unexpected end of input, exiting")
        sel = int(line)
        if sel == 1:
            role = "source"
        elif sel == 2:
            role = "sink"
        else:
            raise ValueError
    except (TypeError, ValueError):
        print("Wrong selection, try again: ")
    except KeyboardInterrupt:
        sys.exit()

# Ask for the data type until an integer in the range 0..65535 is entered
# (the value is later wrapped in dbus.types.UInt16).
dtype = None
while dtype is None:
    print("Select a data type: ", end="")
    sys.stdout.flush()
    try:
        line = sys.stdin.readline()
        if not line:
            # EOF: int("") would raise ValueError and loop forever.
            sys.exit("Unexpected end of input, exiting")
        sel = int(line)
        if (sel < 0) or (sel > 65535):
            raise ValueError
        dtype = sel
    except (TypeError, ValueError):
        print("Wrong selection, try again: ")
    except KeyboardInterrupt:
        sys.exit()

# Create the health application.  A source is additionally asked for its
# preferred data channel type, passed as "ChannelType"; a sink is created
# without that entry.
pref = None
if role == "source":
    while pref is None:
        try:
            # One prompt on one line, cursor left after it.
            print("Select a preferred data channel type 1. "
                  "reliable 2. streaming: ", end="")
            sys.stdout.flush()
            line = sys.stdin.readline()
            if not line:
                # EOF: int("") would raise ValueError and loop forever.
                sys.exit("Unexpected end of input, exiting")
            sel = int(line)
            if sel == 1:
                pref = "reliable"
            elif sel == 2:
                pref = "streaming"
            else:
                raise ValueError

        except (TypeError, ValueError):
            print("Wrong selection, try again")
        except KeyboardInterrupt:
            sys.exit()

    app_path = hdp_manager.CreateApplication({
        "DataType": dbus.types.UInt16(dtype),
        "Role": role,
        "Description": "Test Source",
        "ChannelType": pref})
else:
    app_path = hdp_manager.CreateApplication({
        "DataType": dbus.types.UInt16(dtype),
        "Description": "Test sink",
        "Role": role})

print("New application created:", app_path)

# Ask whether to connect to a remote device.  When the answer is no, just
# listen for signals in the main loop and exit afterwards.
con = None
while con is None:
    try:
        print("Connect to a remote device (y/n)? ", end="")
        sys.stdout.flush()
        line = sys.stdin.readline()
        if not line:
            # EOF: an empty read never matches y/n and would loop forever.
            sys.exit("Unexpected end of input, exiting")
        # Normalise so "Yes", trailing blanks and CRLF input are accepted.
        answer = line.strip().lower()
        if answer in ("y", "yes"):
            con = True
        elif answer in ("n", "no"):
            con = False
        else:
            print("Wrong selection, try again.")
    except KeyboardInterrupt:
        sys.exit()

if not con:
    enter_mainloop()
    sys.exit()

# Fetch every managed object from the ObjectManager at "/" and list the
# paths that expose the adapter interface.
manager = dbus.Interface(bus.get_object(BUS_NAME, "/"),
                         "org.freedesktop.DBus.ObjectManager")

objects = manager.GetManagedObjects()
# .items() and `in` work on Python 2 and 3; iteritems()/has_key() exist
# only on Python 2.
adapters = [path for path, ifaces in objects.items()
            if ADAPTER_INTERFACE in ifaces]

if not adapters:
    # With nothing to choose from, the selection loop could never succeed.
    print("No adapters available")
    sys.exit()

for i, ad in enumerate(adapters, 1):
    print("%d. %s" % (i, ad))

# Let the user pick one of the adapters listed above by its 1-based number.
print("Select an adapter: ", end="")
sys.stdout.flush()
select = None
while select is None:
    try:
        line = sys.stdin.readline()
        if not line:
            # EOF: int("") would raise ValueError and loop forever.
            sys.exit("Unexpected end of input, exiting")
        pos = int(line) - 1
        if pos < 0:
            # Reject 0 and negatives; a negative index would otherwise
            # silently select from the end of the list.
            raise TypeError
        select = adapters[pos]
    except (TypeError, IndexError, ValueError):
        # This message doubles as the re-prompt, so stay on the same line.
        print("Wrong selection, try again: ", end="")
        sys.stdout.flush()
    except KeyboardInterrupt:
        sys.exit()

adapter = dbus.Interface(bus.get_object(BUS_NAME, select), ADAPTER_INTERFACE)

# Collect the devices that belong to the selected adapter and expose the
# health device interface, then list them with 1-based numbers.
# .items() works on Python 2 and 3; iteritems() exists only on Python 2.
devices = [
    path for path, interfaces in objects.items()
    if "org.bluez.Device1" in interfaces
    and interfaces["org.bluez.Device1"]["Adapter"] == select
    and HEALTH_DEVICE_INTERFACE in interfaces
]

if not devices:
    print("No devices available")
    sys.exit()

for i, dev in enumerate(devices, 1):
    print("%d. %s" % (i, dev))

# Let the user pick one of the devices listed above by its 1-based number.
# `select` is reused and now holds the chosen device path.
print("Select a device: ", end="")
sys.stdout.flush()
select = None
while select is None:
    try:
        line = sys.stdin.readline()
        if not line:
            # EOF: int("") would raise ValueError and loop forever.
            sys.exit("Unexpected end of input, exiting")
        pos = int(line) - 1
        if pos < 0:
            # Reject 0 and negatives; a negative index would otherwise
            # silently select from the end of the list.
            raise TypeError
        select = devices[pos]
    except (TypeError, IndexError, ValueError):
        # This message doubles as the re-prompt, so stay on the same line.
        print("Wrong selection, try again: ", end="")
        sys.stdout.flush()
    except KeyboardInterrupt:
        sys.exit()

device = dbus.Interface(bus.get_object(BUS_NAME, select),
                        HEALTH_DEVICE_INTERFACE)

# Optionally run Echo() on the device before opening a channel; a false
# result ends the script.
echo = None
while echo is None:
    try:
        print("Perform an echo (y/n)? ", end="")
        sys.stdout.flush()
        line = sys.stdin.readline()
        if not line:
            # EOF: an empty read never matches y/n and would loop forever.
            sys.exit("Unexpected end of input, exiting")
        # Normalise so "Yes", trailing blanks and CRLF input are accepted.
        answer = line.strip().lower()
        if answer in ("y", "yes"):
            echo = True
        elif answer in ("n", "no"):
            echo = False
        else:
            print("Wrong selection, try again.")
    except KeyboardInterrupt:
        sys.exit()

if echo:
    if device.Echo():
        print("Echo was ok")
    else:
        print("Echo was wrong, exiting")
        sys.exit()

# Open a data channel to the selected device, listen for signals until the
# user interrupts the main loop, then remove the application again.
print("Connecting to device %s" % (select))

# A source requests a "reliable" channel; a sink accepts "any".
# NOTE(review): the user's `pref` choice is not used here - presumably
# intentional, confirm against the BlueZ health API.
if role == "source":
    chan = device.CreateChannel(app_path, "reliable")
else:
    chan = device.CreateChannel(app_path, "any")

print(chan)

enter_mainloop()

hdp_manager.DestroyApplication(app_path)