3 from __future__ import absolute_import, print_function, unicode_literals
4 # -*- coding: utf-8 -*-
9 from dbus.mainloop.glib import DBusGMainLoop
11 from gi.repository import GObject
13 import gobject as GObject
# Bus name and D-Bus interface names passed to the proxy and signal calls
# further down in this script.
15 BUS_NAME = 'org.bluez'
17 ADAPTER_INTERFACE = 'org.bluez.Adapter1'
18 HEALTH_MANAGER_INTERFACE = 'org.bluez.HealthManager1'
19 HEALTH_DEVICE_INTERFACE = 'org.bluez.HealthDevice1'
# Make the GLib main loop the default D-Bus main loop; this is done before
# the bus connection below is created.
21 DBusGMainLoop(set_as_default=True)
# NOTE(review): `loop` is not referenced again in the visible code (a second
# MainLoop is created later as `mainloop`) - confirm whether it is needed.
22 loop = GObject.MainLoop()
# System-bus connection shared by every proxy and signal receiver below.
24 bus = dbus.SystemBus()
# Signal callback passed to bus.add_signal_receiver(). It looks for the
# "member" and "path" keyword arguments and reads the signal name from
# kwargs["member"].
# NOTE(review): the bodies of the two guards and of the final `if` are not
# visible here - presumably the guards return early when a keyword is
# missing; confirm against the full file.
26 def sig_received(*args, **kwargs):
27 if "member" not in kwargs:
29 if "path" not in kwargs:
31 sig_name = kwargs["member"]
# "PropertyChanged" is special-cased relative to other signal names.
35 if sig_name == "PropertyChanged":
# Deliver signals sent by BUS_NAME on HEALTH_DEVICE_INTERFACE to
# sig_received(); the member and interface names are handed to the callback
# as the "member" and "interface" keyword arguments.
# NOTE(review): sig_received() also checks for a "path" keyword, but no
# path_keyword argument is visible in this call - confirm it is passed.
45 bus.add_signal_receiver(sig_received, bus_name=BUS_NAME,
46 dbus_interface=HEALTH_DEVICE_INTERFACE,
48 member_keyword="member",
49 interface_keyword="interface")
# NOTE(review): "lopp" in the message below looks like a typo for "loop".
52 print("Entering main lopp, push Ctrl+C for finish")
# A new GLib main loop object, distinct from the module-level `loop`.
54 mainloop = GObject.MainLoop()
# Ctrl+C surfaces as KeyboardInterrupt; the handler body is not visible here.
56 except KeyboardInterrupt:
# Proxy exposing HEALTH_MANAGER_INTERFACE on the object at PATH; used below
# for CreateApplication() and DestroyApplication().
61 hdp_manager = dbus.Interface(bus.get_object(BUS_NAME, PATH),
62 HEALTH_MANAGER_INTERFACE)
# Prompt for the role to register: 1 for source, 2 for sink.
# NOTE(review): with print_function imported, the trailing comma inside
# print(...) has no effect, so the prompt is followed by a newline. If the
# cursor was meant to stay on the prompt line, end="" is needed - confirm.
66 print("Select 1. source or 2. sink: ",)
# int() raises ValueError on non-numeric input, which is caught below.
68 sel = int(sys.stdin.readline())
75 except (TypeError, ValueError):
76 print("Wrong selection, try again: ",)
# Ctrl+C at the prompt; the handler body is not visible here.
77 except KeyboardInterrupt:
# Prompt for the data type, read from stdin as an integer.
82 print("Select a data type: ",)
84 sel = int(sys.stdin.readline())
# Range check against 0..65535; what happens on failure is not visible here.
# Presumably the accepted value becomes `dtype`, which is later wrapped in
# dbus.types.UInt16 - confirm.
85 if (sel < 0) or (sel > 65535):
88 except (TypeError, ValueError):
89 print("Wrong selection, try again: ",)
# Ctrl+C at the prompt; the handler body is not visible here.
90 except KeyboardInterrupt:
# Prompt for the preferred data channel type: 1 for reliable, 2 for
# streaming. The prompt text is split across two print() calls.
# NOTE(review): the trailing commas do not suppress the newline under
# print_function, so the prompt is emitted as two separate lines.
97 print("Select a preferred data channel type 1.",)
98 print("reliable 2. streaming: ",)
# int() raises ValueError on non-numeric input, which is caught below.
99 sel = int(sys.stdin.readline())
107 except (TypeError, ValueError):
108 print("Wrong selection, try again")
# Ctrl+C at the prompt; the handler body is not visible here.
109 except KeyboardInterrupt:
# Register the application with the health manager. Two variants are visible:
# one described as "Test Source", which also passes the preferred channel
# type `pref`, and one described as "Test sink".
# NOTE(review): the condition selecting between the two calls, and the
# closing lines of the second call, are not visible here - presumably the
# choice follows the role selected earlier; confirm.
112 app_path = hdp_manager.CreateApplication({
113 "DataType": dbus.types.UInt16(dtype),
115 "Description": "Test Source",
116 "ChannelType": pref})
118 app_path = hdp_manager.CreateApplication({
119 "DataType": dbus.types.UInt16(dtype),
120 "Description": "Test sink",
# The value returned by CreateApplication() is kept as app_path and later
# passed to CreateChannel() and DestroyApplication().
123 print("New application created:", app_path)
# Ask whether to connect to a remote device. Accepted answers are y/yes and
# n/no, either all lower-case or all upper-case.
128 print("Connect to a remote device (y/n)? ",)
# readline() keeps the line terminator, hence the "\n" in each literal below.
129 sel = sys.stdin.readline()
130 if sel in ("y\n", "yes\n", "Y\n", "YES\n"):
132 elif sel in ("n\n", "no\n", "N\n", "NO\n"):
135 print("Wrong selection, try again.")
# Ctrl+C at the prompt; the handler body is not visible here.
136 except KeyboardInterrupt:
# Proxy for the org.freedesktop.DBus.ObjectManager interface on the root
# object "/" of BUS_NAME.
143 manager = dbus.Interface(bus.get_object(BUS_NAME, "/"),
144 "org.freedesktop.DBus.ObjectManager")
# `objects` is iterated below as (object path, interfaces) pairs, where
# interfaces maps an interface name to that interface's properties.
146 objects = manager.GetManagedObjects()
# Collect the object path of every managed object that implements
# ADAPTER_INTERFACE.
# dict.iteritems() and dict.has_key() exist only on Python 2; items() and the
# `in` operator behave the same on Python 2 and Python 3.
for path, ifaces in objects.items():
    if ADAPTER_INTERFACE in ifaces:
        adapters.append(path)
# Print one numbered menu entry per adapter, then prompt for a choice.
155 print("%d. %s" % (i, ad))
158 print("Select an adapter: ",)
# Keep asking until `select` holds an adapter path.
# NOTE(review): `select is None` is the idiomatic comparison.
160 while select == None:
# The entered number is shifted down by one to index `adapters`.
# NOTE(review): an input of 0 gives pos == -1, which selects the last
# adapter unless a guard on lines not visible here rejects it - confirm.
162 pos = int(sys.stdin.readline()) - 1
165 select = adapters[pos]
# IndexError covers a number past the end of the list; ValueError covers
# non-numeric input.
166 except (TypeError, IndexError, ValueError):
167 print("Wrong selection, try again: ",)
# Ctrl+C at the prompt; the handler body is not visible here.
168 except KeyboardInterrupt:
# Proxy exposing ADAPTER_INTERFACE on the chosen adapter object.
171 adapter = dbus.Interface(bus.get_object(BUS_NAME, select), ADAPTER_INTERFACE)
# Walk the managed objects looking for devices that belong to the selected
# adapter and expose HEALTH_DEVICE_INTERFACE.
# NOTE(review): dict.iteritems() exists only on Python 2; objects.items()
# works on both Python 2 and 3.
# NOTE(review): the bodies of the three checks below are not visible here -
# presumably each one skips the current object; confirm.
174 for path, interfaces in objects.iteritems():
# Objects without the org.bluez.Device1 interface are not devices.
175 if "org.bluez.Device1" not in interfaces:
177 properties = interfaces["org.bluez.Device1"]
# The "Adapter" property is compared with the adapter path chosen above.
178 if properties["Adapter"] != select:
181 if HEALTH_DEVICE_INTERFACE not in interfaces:
# Report when no matching device was found; what follows the message is not
# visible here.
185 if len(devices) == 0:
186 print("No devices available")
# Print one numbered menu entry per device, then prompt for a choice.
191 print("%d. %s" % (i, dev))
194 print("Select a device: ",)
# Keep asking until `select` holds a device path.
# NOTE(review): `select` is reused from the adapter menu - presumably it is
# reset to None on a line not visible here; confirm.
196 while select == None:
# The entered number is shifted down by one to index `devices`.
# NOTE(review): an input of 0 gives pos == -1, which selects the last device
# unless a guard on lines not visible here rejects it - confirm.
198 pos = int(sys.stdin.readline()) - 1
201 select = devices[pos]
202 except (TypeError, IndexError, ValueError):
203 print("Wrong selection, try again: ",)
# Ctrl+C at the prompt; the handler body is not visible here.
204 except KeyboardInterrupt:
# Proxy exposing HEALTH_DEVICE_INTERFACE on the chosen device object.
207 device = dbus.Interface(bus.get_object(BUS_NAME, select),
208 HEALTH_DEVICE_INTERFACE)
# Ask whether to perform an echo. Accepted answers are y/yes and n/no,
# either all lower-case or all upper-case.
213 print("Perform an echo (y/n)? ",)
# readline() keeps the line terminator, hence the "\n" in each literal below.
214 sel = sys.stdin.readline()
215 if sel in ("y\n", "yes\n", "Y\n", "YES\n"):
217 elif sel in ("n\n", "no\n", "N\n", "NO\n"):
220 print("Wrong selection, try again.")
# Ctrl+C at the prompt; the handler body is not visible here.
221 except KeyboardInterrupt:
# NOTE(review): "war" in the message below looks like a typo for "was".
228 print("Echo war wrong, exiting")
# `select` holds the object path of the device chosen above.
231 print("Connecting to device %s" % (select))
# Open a data channel for the registered application. Two variants are
# visible, requesting a "reliable" channel or "any" channel type.
# NOTE(review): the condition choosing between them is not visible here.
234 chan = device.CreateChannel(app_path, "reliable")
236 chan = device.CreateChannel(app_path, "any")
# Unregister the application created earlier with CreateApplication().
242 hdp_manager.DestroyApplication(app_path)