3 from __future__ import absolute_import, print_function, unicode_literals
6 Cycling Speed and Cadence test script
9 from optparse import OptionParser, make_option
13 import dbus.mainloop.glib
15 from gi.repository import GObject
17 import gobject as GObject
20 BUS_NAME = 'org.bluez'
21 CYCLINGSPEED_MANAGER_INTERFACE = 'org.bluez.CyclingSpeedManager1'
22 CYCLINGSPEED_WATCHER_INTERFACE = 'org.bluez.CyclingSpeedWatcher1'
23 CYCLINGSPEED_INTERFACE = 'org.bluez.CyclingSpeed1'
26 def __init__(self, wrap_v):
27 self._now = [None, None]
28 self._prev = [None, None]
32 return ((self._now[0] is not None)
33 and (self._now[1] is not None)
34 and (self._prev[0] is not None)
35 and (self._prev[1] is not None))
38 delta = self._now[0] - self._prev[0]
39 if (delta < 0) and (self._wrap_v):
44 delta = self._now[1] - self._prev[1]
50 self._prev = self._now
53 class Watcher(dbus.service.Object):
54 _wheel = MeasurementQ(False)
55 _crank = MeasurementQ(True)
58 def enable_calc(self, v):
59 self._circumference = v
61 @dbus.service.method(CYCLINGSPEED_WATCHER_INTERFACE,
62 in_signature="oa{sv}", out_signature="")
63 def MeasurementReceived(self, device, measure):
64 print("Measurement received from %s" % device)
68 if "WheelRevolutions" in measure:
69 rev = measure["WheelRevolutions"]
70 print("WheelRevolutions: ", measure["WheelRevolutions"])
71 if "LastWheelEventTime" in measure:
72 evt = measure["LastWheelEventTime"]
73 print("LastWheelEventTime: ", measure["LastWheelEventTime"])
74 self._wheel.put( [rev, evt] )
78 if "CrankRevolutions" in measure:
79 rev = measure["CrankRevolutions"]
80 print("CrankRevolutions: ", measure["CrankRevolutions"])
81 if "LastCrankEventTime" in measure:
82 evt = measure["LastCrankEventTime"]
83 print("LastCrankEventTime: ", measure["LastCrankEventTime"])
84 self._crank.put( [rev, evt] )
86 if self._circumference is None:
89 if self._wheel.can_calc():
90 delta_v = self._wheel.delta_v()
91 delta_t = self._wheel.delta_t()
93 if (delta_v >= 0) and (delta_t > 0):
94 speed = delta_v * self._circumference * 1024 / delta_t # mm/s
95 speed = speed * 0.0036 # mm/s -> km/h
96 print("(calculated) Speed: %.2f km/h" % speed)
98 if self._crank.can_calc():
99 delta_v = self._crank.delta_v()
100 delta_t = self._crank.delta_t()
103 cadence = delta_v * 1024 / delta_t
104 print("(calculated) Cadence: %d rpm" % cadence)
106 def properties_changed(interface, changed, invalidated):
107 if "Location" in changed:
108 print("Sensor location: %s" % changed["Location"])
110 if __name__ == "__main__":
111 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
113 bus = dbus.SystemBus()
116 make_option("-i", "--adapter", action="store",
117 type="string", dest="adapter"),
118 make_option("-b", "--device", action="store",
119 type="string", dest="address"),
120 make_option("-c", "--circumference", action="store",
121 type="int", dest="circumference"),
124 parser = OptionParser(option_list=option_list)
126 (options, args) = parser.parse_args()
128 if not options.address:
129 print("Usage: %s [-i <adapter>] -b <bdaddr> [-c <value>] [cmd]" % (sys.argv[0]))
130 print("Possible commands:")
131 print("\tShowSupportedLocations")
132 print("\tSetLocation <location>")
133 print("\tSetCumulativeWheelRevolutions <value>")
136 managed_objects = bluezutils.get_managed_objects()
137 adapter = bluezutils.find_adapter_in_objects(managed_objects,
139 adapter_path = adapter.object_path
141 device = bluezutils.find_device_in_objects(managed_objects,
144 device_path = device.object_path
146 cscmanager = dbus.Interface(bus.get_object(BUS_NAME, adapter_path),
147 CYCLINGSPEED_MANAGER_INTERFACE)
149 watcher_path = "/test/watcher"
150 watcher = Watcher(bus, watcher_path)
151 if options.circumference:
152 watcher.enable_calc(options.circumference)
153 cscmanager.RegisterWatcher(watcher_path)
155 csc = dbus.Interface(bus.get_object(BUS_NAME, device_path),
156 CYCLINGSPEED_INTERFACE)
158 bus.add_signal_receiver(properties_changed, bus_name=BUS_NAME,
160 dbus_interface="org.freedesktop.DBus.Properties",
161 signal_name="PropertiesChanged")
163 device_prop = dbus.Interface(bus.get_object(BUS_NAME, device_path),
164 "org.freedesktop.DBus.Properties")
166 properties = device_prop.GetAll(CYCLINGSPEED_INTERFACE)
168 if "Location" in properties:
169 print("Sensor location: %s" % properties["Location"])
171 print("Sensor location is not supported")
174 if args[0] == "ShowSupportedLocations":
175 if properties["MultipleLocationsSupported"]:
176 print("Supported locations: ", properties["SupportedLocations"])
178 print("Multiple sensor locations not supported")
180 elif args[0] == "SetLocation":
181 if properties["MultipleLocationsSupported"]:
182 device_prop.Set(CYCLINGSPEED_INTERFACE, "Location", args[1])
184 print("Multiple sensor locations not supported")
186 elif args[0] == "SetCumulativeWheelRevolutions":
187 if properties["WheelRevolutionDataSupported"]:
188 csc.SetCumulativeWheelRevolutions(dbus.UInt32(args[1]))
190 print("Wheel revolution data not supported")
193 print("Unknown command")
196 mainloop = GObject.MainLoop()