3 from __future__ import absolute_import, print_function, unicode_literals
4 # -*- coding: utf-8 -*-
9 from dbus.mainloop.glib import DBusGMainLoop
12 DBusGMainLoop(set_as_default=True)
13 loop = gobject.MainLoop()
15 bus = dbus.SystemBus()
17 def sig_received(*args, **kwargs):
18 if "member" not in kwargs:
20 if "path" not in kwargs:
22 sig_name = kwargs["member"]
26 if sig_name == "PropertyChanged":
36 bus.add_signal_receiver(sig_received, bus_name="org.bluez",
37 dbus_interface = "org.bluez.HealthDevice",
39 member_keyword="member",
40 interface_keyword="interface")
43 print("Entering main lopp, push Ctrl+C for finish")
45 mainloop = gobject.MainLoop()
47 except KeyboardInterrupt:
52 hdp_manager = dbus.Interface(bus.get_object("org.bluez", "/org/bluez"),
53 "org.bluez.HealthManager")
57 print("Select 1. source or 2. sink: ",)
59 sel = int(sys.stdin.readline())
66 except (TypeError, ValueError):
67 print("Wrong selection, try again: ",)
68 except KeyboardInterrupt:
73 print("Select a data type: ",)
75 sel = int(sys.stdin.readline())
76 if (sel < 0) or (sel > 65535):
79 except (TypeError, ValueError):
80 print("Wrong selection, try again: ",)
81 except KeyboardInterrupt:
88 print("Select a preferred data channel type 1.",)
89 print("reliable 2. streaming: ",)
90 sel = int(sys.stdin.readline())
98 except (TypeError, ValueError):
99 print("Wrong selection, try again")
100 except KeyboardInterrupt:
103 app_path = hdp_manager.CreateApplication({
104 "DataType": dbus.types.UInt16(dtype),
106 "Description": "Test Source",
107 "ChannelType": pref})
109 app_path = hdp_manager.CreateApplication({
110 "DataType": dbus.types.UInt16(dtype),
111 "Description": "Test sink",
114 print("New application created:", app_path)
119 print("Connect to a remote device (y/n)? ",)
120 sel = sys.stdin.readline()
121 if sel in ("y\n", "yes\n", "Y\n", "YES\n"):
123 elif sel in ("n\n", "no\n", "N\n", "NO\n"):
126 print("Wrong selection, try again.")
127 except KeyboardInterrupt:
134 manager = dbus.Interface(bus.get_object("org.bluez", "/"),
137 adapters = manager.ListAdapters()
141 print("%d. %s" % (i, ad))
144 print("Select an adapter: ",)
146 while select == None:
148 pos = int(sys.stdin.readline()) - 1
151 select = adapters[pos]
152 except (TypeError, IndexError, ValueError):
153 print("Wrong selection, try again: ",)
154 except KeyboardInterrupt:
157 adapter = dbus.Interface(bus.get_object("org.bluez", select),
160 devices = adapter.ListDevices()
162 if len(devices) == 0:
163 print("No devices available")
168 print("%d. %s" % (i, dev))
171 print("Select a device: ",)
173 while select == None:
175 pos = int(sys.stdin.readline()) - 1
178 select = devices[pos]
179 except (TypeError, IndexError, ValueError):
180 print("Wrong selection, try again: ",)
181 except KeyboardInterrupt:
184 device = dbus.Interface(bus.get_object("org.bluez", select),
185 "org.bluez.HealthDevice")
190 print("Perform an echo (y/n)? ",)
191 sel = sys.stdin.readline()
192 if sel in ("y\n", "yes\n", "Y\n", "YES\n"):
194 elif sel in ("n\n", "no\n", "N\n", "NO\n"):
197 print("Wrong selection, try again.")
198 except KeyboardInterrupt:
205 print("Echo war wrong, exiting")
208 print("Connecting to device %s" % (select))
211 chan = device.CreateChannel(app_path, "Reliable")
213 chan = device.CreateChannel(app_path, "Any")
219 hdp_manager.DestroyApplication(app_path)