'tools/libinput-measure-touchpad-tap.py',
'tools/libinput-measure-touchpad-pressure.py',
'tools/libinput-measure-touch-size.py',
+ 'tools/libinput-replay.py'
)
foreach t : src_python_tools
install : true,
)
-install_data('tools/libinput-replay',
- install_dir : libinput_tool_path)
-
if get_option('debug-gui')
dep_gtk = dependency('gtk+-3.0', version : '>= 3.20')
dep_cairo = dependency('cairo')
+++ /dev/null
-#!/usr/bin/env python3
-# vim: set expandtab shiftwidth=4:
-# -*- Mode: python; coding: utf-8; indent-tabs-mode: nil -*- */
-#
-# Copyright © 2018 Red Hat, Inc.
-#
-# Permission is hereby granted, free of charge, to any person obtaining a
-# copy of this software and associated documentation files (the "Software"),
-# to deal in the Software without restriction, including without limitation
-# the rights to use, copy, modify, merge, publish, distribute, sublicense,
-# and/or sell copies of the Software, and to permit persons to whom the
-# Software is furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice (including the next
-# paragraph) shall be included in all copies or substantial portions of the
-# Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
-# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
-# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
-# DEALINGS IN THE SOFTWARE.
-
-import os
-import sys
-import time
-import math
-import multiprocessing
-import argparse
-from pathlib import Path
-
-try:
- import libevdev
- import yaml
- import pyudev
-except ModuleNotFoundError as e:
- print("Error: {}".format(e), file=sys.stderr)
- print(
- "One or more python modules are missing. Please install those "
- "modules and re-run this tool."
- )
- sys.exit(1)
-
-
-SUPPORTED_FILE_VERSION = 1
-
-
-def error(msg, **kwargs):
- print(msg, **kwargs, file=sys.stderr)
-
-
-class YamlException(Exception):
- pass
-
-
-def fetch(yaml, key):
- """Helper function to avoid confusing a YAML error with a
- normal KeyError bug"""
- try:
- return yaml[key]
- except KeyError:
- raise YamlException("Failed to get '{}' from recording.".format(key))
-
-
-def check_udev_properties(yaml_data, uinput):
- """
- Compare the properties our new uinput device has with the ones from the
- recording and ring the alarm bell if one of them is off.
- """
- yaml_udev_section = fetch(yaml_data, "udev")
- yaml_udev_props = fetch(yaml_udev_section, "properties")
- yaml_props = {
- k: v for (k, v) in [prop.split("=", maxsplit=1) for prop in yaml_udev_props]
- }
- try:
- # We don't assign this one to virtual devices
- del yaml_props["LIBINPUT_DEVICE_GROUP"]
- except KeyError:
- pass
-
- # give udev some time to catch up
- time.sleep(0.2)
- context = pyudev.Context()
- udev_device = pyudev.Devices.from_device_file(context, uinput.devnode)
- for name, value in udev_device.properties.items():
- if name in yaml_props:
- if yaml_props[name] != value:
- error(
- f"Warning: udev property mismatch: recording has {name}={yaml_props[name]}, device has {name}={value}"
- )
- del yaml_props[name]
- else:
- # The list of properties we add to the recording, see libinput-record.c
- prefixes = (
- "ID_INPUT",
- "LIBINPUT",
- "EVDEV_ABS",
- "MOUSE_DPI",
- "POINTINGSTICK_",
- )
- for prefix in prefixes:
- if name.startswith(prefix):
- error(f"Warning: unexpected property: {name}={value}")
-
- # the ones we found above were removed from the dict
- for name, value in yaml_props.items():
- error(f"Warning: device is missing recorded udev property: {name}={value}")
-
-
-def create(device):
- evdev = fetch(device, "evdev")
-
- d = libevdev.Device()
- d.name = fetch(evdev, "name")
-
- ids = fetch(evdev, "id")
- if len(ids) != 4:
- raise YamlException("Invalid ID format: {}".format(ids))
- d.id = dict(zip(["bustype", "vendor", "product", "version"], ids))
-
- codes = fetch(evdev, "codes")
- for evtype, evcodes in codes.items():
- for code in evcodes:
- data = None
- if evtype == libevdev.EV_ABS.value:
- values = fetch(evdev, "absinfo")[code]
- absinfo = libevdev.InputAbsInfo(
- minimum=values[0],
- maximum=values[1],
- fuzz=values[2],
- flat=values[3],
- resolution=values[4],
- )
- data = absinfo
- elif evtype == libevdev.EV_REP.value:
- if code == libevdev.EV_REP.REP_DELAY.value:
- data = 500
- elif code == libevdev.EV_REP.REP_PERIOD.value:
- data = 20
- d.enable(libevdev.evbit(evtype, code), data=data)
-
- properties = fetch(evdev, "properties")
- for prop in properties:
- d.enable(libevdev.propbit(prop))
-
- uinput = d.create_uinput_device()
-
- check_udev_properties(device, uinput)
-
- return uinput
-
-
-def print_events(devnode, indent, evs):
- devnode = os.path.basename(devnode)
- for e in evs:
- print(
- "{}: {}{:06d}.{:06d} {} / {:<20s} {:4d}".format(
- devnode,
- " " * (indent * 8),
- e.sec,
- e.usec,
- e.type.name,
- e.code.name,
- e.value,
- )
- )
-
-
-def replay(device, verbose):
- events = fetch(device, "events")
- if events is None:
- return
- uinput = device["__uinput"]
-
- # The first event may have a nonzero offset but we want to replay
- # immediately regardless. When replaying multiple devices, the first
- # offset is the offset from the first event on any device.
- offset = time.time() - device["__first_event_offset"]
-
- if offset < 0:
- error("WARNING: event time offset is in the future, refusing to replay")
- return
-
- # each 'evdev' set contains one SYN_REPORT so we only need to check for
- # the time offset once per event
- for event in events:
- try:
- evdev = fetch(event, "evdev")
- except YamlException:
- continue
-
- (sec, usec, evtype, evcode, value) = evdev[0]
- evtime = sec + usec / 1e6 + offset
- now = time.time()
-
- if evtime - now > 150 / 1e6: # 150 µs error margin
- time.sleep(evtime - now - 150 / 1e6)
-
- evs = [
- libevdev.InputEvent(
- libevdev.evbit(e[2], e[3]), value=e[4], sec=e[0], usec=e[1]
- )
- for e in evdev
- ]
- uinput.send_events(evs)
- if verbose:
- print_events(uinput.devnode, device["__index"], evs)
-
-
-def first_timestamp(device):
- events = fetch(device, "events")
- for e in events or []:
- try:
- evdev = fetch(e, "evdev")
- (sec, usec, *_) = evdev[0]
- return sec + usec / 1.0e6
- except YamlException:
- pass
-
- return None
-
-
-def wrap(func, *args):
- try:
- func(*args)
- except KeyboardInterrupt:
- pass
-
-
-def loop(args, recording):
- devices = fetch(recording, "devices")
-
- first_timestamps = tuple(
- filter(lambda x: x is not None, [first_timestamp(d) for d in devices])
- )
- # All devices need to start replaying at the same time, so let's find
- # the very first event and offset everything by that timestamp.
- toffset = min(first_timestamps or [math.inf])
-
- for idx, d in enumerate(devices):
- uinput = create(d)
- print("{}: {}".format(uinput.devnode, uinput.name))
- d["__uinput"] = uinput # cheaper to hide it in the dict then work around it
- d["__index"] = idx
- d["__first_event_offset"] = toffset
-
- if not first_timestamps:
- input("No events in recording. Hit enter to quit")
- return
-
- while True:
- input("Hit enter to start replaying")
-
- processes = []
- for d in devices:
- p = multiprocessing.Process(target=wrap, args=(replay, d, args.verbose))
- processes.append(p)
-
- for p in processes:
- p.start()
-
- for p in processes:
- p.join()
-
- del processes
-
-
-def create_device_quirk(device):
- try:
- quirks = fetch(device, "quirks")
- if not quirks:
- return None
- except YamlException:
- return None
- # Where the device has a quirk, we match on name, vendor and product.
- # That's the best match we can assemble here from the info we have.
- evdev = fetch(device, "evdev")
- name = fetch(evdev, "name")
- id = fetch(evdev, "id")
- quirk = (
- "[libinput-replay {name}]\n"
- "MatchName={name}\n"
- "MatchVendor=0x{id[1]:04X}\n"
- "MatchProduct=0x{id[2]:04X}\n"
- ).format(name=name, id=id)
- quirk += "\n".join(quirks)
- return quirk
-
-
-def setup_quirks(recording):
- devices = fetch(recording, "devices")
- overrides = None
- quirks = []
- for d in devices:
- if "quirks" in d:
- quirk = create_device_quirk(d)
- if quirk:
- quirks.append(quirk)
- if not quirks:
- return None
-
- overrides = Path("/etc/libinput/local-overrides.quirks")
- if overrides.exists():
- print(
- "{} exists, please move it out of the way first".format(overrides),
- file=sys.stderr,
- )
- sys.exit(1)
-
- overrides.parent.mkdir(exist_ok=True)
- with overrides.open("w+") as fd:
- fd.write("# This file was generated by libinput replay\n")
- fd.write("# Unless libinput replay is running right now, remove this file.\n")
- fd.write("\n\n".join(quirks))
-
- return overrides
-
-
-def check_file(recording):
- version = fetch(recording, "version")
- if version != SUPPORTED_FILE_VERSION:
- raise YamlException(
- "Invalid file format: {}, expected {}".format(
- version, SUPPORTED_FILE_VERSION
- )
- )
-
- ndevices = fetch(recording, "ndevices")
- devices = fetch(recording, "devices")
- if ndevices != len(devices):
- error(
- "WARNING: truncated file, expected {} devices, got {}".format(
- ndevices, len(devices)
- )
- )
-
-
-def main():
- parser = argparse.ArgumentParser(description="Replay a device recording")
- parser.add_argument(
- "recording",
- metavar="recorded-file.yaml",
- type=str,
- help="Path to device recording",
- )
- parser.add_argument("--verbose", action="store_true")
- args = parser.parse_args()
-
- quirks_file = None
-
- try:
- with open(args.recording) as f:
- y = yaml.safe_load(f)
- check_file(y)
- quirks_file = setup_quirks(y)
- loop(args, y)
- except KeyboardInterrupt:
- pass
- except (PermissionError, OSError) as e:
- error("Error: failed to open device: {}".format(e))
- except YamlException as e:
- error("Error: failed to parse recording: {}".format(e))
- finally:
- if quirks_file:
- quirks_file.unlink()
-
-
-if __name__ == "__main__":
- main()
--- /dev/null
+#!/usr/bin/env python3
+# vim: set expandtab shiftwidth=4:
+# -*- Mode: python; coding: utf-8; indent-tabs-mode: nil -*- */
+#
+# Copyright © 2018 Red Hat, Inc.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the "Software"),
+# to deal in the Software without restriction, including without limitation
+# the rights to use, copy, modify, merge, publish, distribute, sublicense,
+# and/or sell copies of the Software, and to permit persons to whom the
+# Software is furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice (including the next
+# paragraph) shall be included in all copies or substantial portions of the
+# Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+# DEALINGS IN THE SOFTWARE.
+
+import os
+import sys
+import time
+import math
+import multiprocessing
+import argparse
+from pathlib import Path
+
+try:
+ import libevdev
+ import yaml
+ import pyudev
+except ModuleNotFoundError as e:
+ print("Error: {}".format(e), file=sys.stderr)
+ print(
+ "One or more python modules are missing. Please install those "
+ "modules and re-run this tool."
+ )
+ sys.exit(1)
+
+
+SUPPORTED_FILE_VERSION = 1
+
+
+def error(msg, **kwargs):
+ print(msg, **kwargs, file=sys.stderr)
+
+
+class YamlException(Exception):
+ pass
+
+
+def fetch(yaml, key):
+ """Helper function to avoid confusing a YAML error with a
+ normal KeyError bug"""
+ try:
+ return yaml[key]
+ except KeyError:
+ raise YamlException("Failed to get '{}' from recording.".format(key))
+
+
+def check_udev_properties(yaml_data, uinput):
+ """
+ Compare the properties our new uinput device has with the ones from the
+ recording and ring the alarm bell if one of them is off.
+ """
+ yaml_udev_section = fetch(yaml_data, "udev")
+ yaml_udev_props = fetch(yaml_udev_section, "properties")
+ yaml_props = {
+ k: v for (k, v) in [prop.split("=", maxsplit=1) for prop in yaml_udev_props]
+ }
+ try:
+ # We don't assign this one to virtual devices
+ del yaml_props["LIBINPUT_DEVICE_GROUP"]
+ except KeyError:
+ pass
+
+ # give udev some time to catch up
+ time.sleep(0.2)
+ context = pyudev.Context()
+ udev_device = pyudev.Devices.from_device_file(context, uinput.devnode)
+ for name, value in udev_device.properties.items():
+ if name in yaml_props:
+ if yaml_props[name] != value:
+ error(
+ f"Warning: udev property mismatch: recording has {name}={yaml_props[name]}, device has {name}={value}"
+ )
+ del yaml_props[name]
+ else:
+ # The list of properties we add to the recording, see libinput-record.c
+ prefixes = (
+ "ID_INPUT",
+ "LIBINPUT",
+ "EVDEV_ABS",
+ "MOUSE_DPI",
+ "POINTINGSTICK_",
+ )
+ for prefix in prefixes:
+ if name.startswith(prefix):
+ error(f"Warning: unexpected property: {name}={value}")
+
+ # the ones we found above were removed from the dict
+ for name, value in yaml_props.items():
+ error(f"Warning: device is missing recorded udev property: {name}={value}")
+
+
+def create(device):
+ evdev = fetch(device, "evdev")
+
+ d = libevdev.Device()
+ d.name = fetch(evdev, "name")
+
+ ids = fetch(evdev, "id")
+ if len(ids) != 4:
+ raise YamlException("Invalid ID format: {}".format(ids))
+ d.id = dict(zip(["bustype", "vendor", "product", "version"], ids))
+
+ codes = fetch(evdev, "codes")
+ for evtype, evcodes in codes.items():
+ for code in evcodes:
+ data = None
+ if evtype == libevdev.EV_ABS.value:
+ values = fetch(evdev, "absinfo")[code]
+ absinfo = libevdev.InputAbsInfo(
+ minimum=values[0],
+ maximum=values[1],
+ fuzz=values[2],
+ flat=values[3],
+ resolution=values[4],
+ )
+ data = absinfo
+ elif evtype == libevdev.EV_REP.value:
+ if code == libevdev.EV_REP.REP_DELAY.value:
+ data = 500
+ elif code == libevdev.EV_REP.REP_PERIOD.value:
+ data = 20
+ d.enable(libevdev.evbit(evtype, code), data=data)
+
+ properties = fetch(evdev, "properties")
+ for prop in properties:
+ d.enable(libevdev.propbit(prop))
+
+ uinput = d.create_uinput_device()
+
+ check_udev_properties(device, uinput)
+
+ return uinput
+
+
+def print_events(devnode, indent, evs):
+ devnode = os.path.basename(devnode)
+ for e in evs:
+ print(
+ "{}: {}{:06d}.{:06d} {} / {:<20s} {:4d}".format(
+ devnode,
+ " " * (indent * 8),
+ e.sec,
+ e.usec,
+ e.type.name,
+ e.code.name,
+ e.value,
+ )
+ )
+
+
+def replay(device, verbose):
+ events = fetch(device, "events")
+ if events is None:
+ return
+ uinput = device["__uinput"]
+
+ # The first event may have a nonzero offset but we want to replay
+ # immediately regardless. When replaying multiple devices, the first
+ # offset is the offset from the first event on any device.
+ offset = time.time() - device["__first_event_offset"]
+
+ if offset < 0:
+ error("WARNING: event time offset is in the future, refusing to replay")
+ return
+
+ # each 'evdev' set contains one SYN_REPORT so we only need to check for
+ # the time offset once per event
+ for event in events:
+ try:
+ evdev = fetch(event, "evdev")
+ except YamlException:
+ continue
+
+ (sec, usec, evtype, evcode, value) = evdev[0]
+ evtime = sec + usec / 1e6 + offset
+ now = time.time()
+
+ if evtime - now > 150 / 1e6: # 150 µs error margin
+ time.sleep(evtime - now - 150 / 1e6)
+
+ evs = [
+ libevdev.InputEvent(
+ libevdev.evbit(e[2], e[3]), value=e[4], sec=e[0], usec=e[1]
+ )
+ for e in evdev
+ ]
+ uinput.send_events(evs)
+ if verbose:
+ print_events(uinput.devnode, device["__index"], evs)
+
+
+def first_timestamp(device):
+ events = fetch(device, "events")
+ for e in events or []:
+ try:
+ evdev = fetch(e, "evdev")
+ (sec, usec, *_) = evdev[0]
+ return sec + usec / 1.0e6
+ except YamlException:
+ pass
+
+ return None
+
+
+def wrap(func, *args):
+ try:
+ func(*args)
+ except KeyboardInterrupt:
+ pass
+
+
+def loop(args, recording):
+ devices = fetch(recording, "devices")
+
+ first_timestamps = tuple(
+ filter(lambda x: x is not None, [first_timestamp(d) for d in devices])
+ )
+ # All devices need to start replaying at the same time, so let's find
+ # the very first event and offset everything by that timestamp.
+ toffset = min(first_timestamps or [math.inf])
+
+ for idx, d in enumerate(devices):
+ uinput = create(d)
+ print("{}: {}".format(uinput.devnode, uinput.name))
+ d["__uinput"] = uinput # cheaper to hide it in the dict then work around it
+ d["__index"] = idx
+ d["__first_event_offset"] = toffset
+
+ if not first_timestamps:
+ input("No events in recording. Hit enter to quit")
+ return
+
+ while True:
+ input("Hit enter to start replaying")
+
+ processes = []
+ for d in devices:
+ p = multiprocessing.Process(target=wrap, args=(replay, d, args.verbose))
+ processes.append(p)
+
+ for p in processes:
+ p.start()
+
+ for p in processes:
+ p.join()
+
+ del processes
+
+
+def create_device_quirk(device):
+ try:
+ quirks = fetch(device, "quirks")
+ if not quirks:
+ return None
+ except YamlException:
+ return None
+ # Where the device has a quirk, we match on name, vendor and product.
+ # That's the best match we can assemble here from the info we have.
+ evdev = fetch(device, "evdev")
+ name = fetch(evdev, "name")
+ id = fetch(evdev, "id")
+ quirk = (
+ "[libinput-replay {name}]\n"
+ "MatchName={name}\n"
+ "MatchVendor=0x{id[1]:04X}\n"
+ "MatchProduct=0x{id[2]:04X}\n"
+ ).format(name=name, id=id)
+ quirk += "\n".join(quirks)
+ return quirk
+
+
+def setup_quirks(recording):
+ devices = fetch(recording, "devices")
+ overrides = None
+ quirks = []
+ for d in devices:
+ if "quirks" in d:
+ quirk = create_device_quirk(d)
+ if quirk:
+ quirks.append(quirk)
+ if not quirks:
+ return None
+
+ overrides = Path("/etc/libinput/local-overrides.quirks")
+ if overrides.exists():
+ print(
+ "{} exists, please move it out of the way first".format(overrides),
+ file=sys.stderr,
+ )
+ sys.exit(1)
+
+ overrides.parent.mkdir(exist_ok=True)
+ with overrides.open("w+") as fd:
+ fd.write("# This file was generated by libinput replay\n")
+ fd.write("# Unless libinput replay is running right now, remove this file.\n")
+ fd.write("\n\n".join(quirks))
+
+ return overrides
+
+
+def check_file(recording):
+ version = fetch(recording, "version")
+ if version != SUPPORTED_FILE_VERSION:
+ raise YamlException(
+ "Invalid file format: {}, expected {}".format(
+ version, SUPPORTED_FILE_VERSION
+ )
+ )
+
+ ndevices = fetch(recording, "ndevices")
+ devices = fetch(recording, "devices")
+ if ndevices != len(devices):
+ error(
+ "WARNING: truncated file, expected {} devices, got {}".format(
+ ndevices, len(devices)
+ )
+ )
+
+
+def main():
+ parser = argparse.ArgumentParser(description="Replay a device recording")
+ parser.add_argument(
+ "recording",
+ metavar="recorded-file.yaml",
+ type=str,
+ help="Path to device recording",
+ )
+ parser.add_argument("--verbose", action="store_true")
+ args = parser.parse_args()
+
+ quirks_file = None
+
+ try:
+ with open(args.recording) as f:
+ y = yaml.safe_load(f)
+ check_file(y)
+ quirks_file = setup_quirks(y)
+ loop(args, y)
+ except KeyboardInterrupt:
+ pass
+ except (PermissionError, OSError) as e:
+ error("Error: failed to open device: {}".format(e))
+ except YamlException as e:
+ error("Error: failed to parse recording: {}".format(e))
+ finally:
+ if quirks_file:
+ quirks_file.unlink()
+
+
+if __name__ == "__main__":
+ main()