Fix for x86_64 build fail
[platform/upstream/connectedhomeip.git] / third_party / pigweed / repo / pw_watch / py / pw_watch / debounce.py
1 # Copyright 2020 The Pigweed Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 # use this file except in compliance with the License. You may obtain a copy of
5 # the License at
6 #
7 #     https://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations under
13 # the License.
14 """Run an interruptable, cancellable function after debouncing run requests"""
15
16 import enum
17 import logging
18 import threading
19 from abc import ABC, abstractmethod
20
21 _LOG = logging.getLogger(__name__)
22
23
class DebouncedFunction(ABC):
    """Interface for the unit of work that a Debouncer schedules.

    Implementations provide the work itself (run), a way to interrupt it
    (cancel), and two notification hooks (on_complete and
    on_keyboard_interrupt).
    """
    @abstractmethod
    def run(self) -> None:
        """Execute the function."""

    @abstractmethod
    def cancel(self) -> bool:
        """Abort a run() that is currently in progress.

        Must be invoked from a thread other than the one executing run().
        Returns True if the run was successfully cancelled, False otherwise.
        """

    @abstractmethod
    def on_complete(self, cancelled: bool = False) -> bool:
        """Hook invoked once run() has finished.

        cancelled is True when cancel() was invoked during that run.
        """

    # Note: The debouncer does its work on threads. Because there is no way
    # to guarantee which thread receives a KeyboardInterrupt, every debouncer
    # thread has to catch it and forward it to the user through this hook.
    @abstractmethod
    def on_keyboard_interrupt(self):
        """Hook invoked when a debouncer thread gets a keyboard interrupt."""
47
48
class State(enum.Enum):
    """States of the debouncer, each annotated with where it can go next."""
    # Nothing pending. Transitions to: DEBOUNCING
    IDLE = 1
    # Debounce timer is pending. Transitions to: RUNNING
    DEBOUNCING = 2
    # Function is executing. Transitions to: INTERRUPTED or COOLDOWN
    RUNNING = 3
    # Function is executing but was cancelled. Transitions to: RERUN
    INTERRUPTED = 4
    # Function finished; cooldown timer pending. Transitions to: IDLE
    COOLDOWN = 5
    # Cooldown pending with a rerun queued. Transitions to: IDLE (but
    # triggers a press)
    RERUN = 6
56
57
class Debouncer:
    """Run an interruptible, cancellable function with debouncing.

    press() requests a run. The state machine is:

      IDLE        -- press ------------> DEBOUNCING (debounce timer started)
      DEBOUNCING  -- press ------------> DEBOUNCING (debounce timer restarted)
      DEBOUNCING  -- timer expires ----> RUNNING    (function.run() called)
      RUNNING     -- press ------------> INTERRUPTED (function.cancel() called)
      RUNNING     -- run() returns ----> COOLDOWN
      INTERRUPTED -- run() returns ----> RERUN
      COOLDOWN    -- press ------------> RERUN
      COOLDOWN    -- timer expires ----> IDLE
      RERUN       -- timer expires ----> IDLE, immediately followed by a press

    The function runs on the debounce timer's thread; all state and timer
    bookkeeping is done while holding self.lock.
    """
    def __init__(self, function) -> None:
        """Create an idle debouncer.

        Args:
          function: object providing run(), cancel(), on_complete() and
            on_keyboard_interrupt(), as declared by DebouncedFunction.
        """
        super().__init__()
        self.function = function

        self.state = State.IDLE

        # Quiet period, in seconds, between the last press and the run.
        self.debounce_seconds = 1
        self.debounce_timer = None

        # Delay, in seconds, between the end of a run and returning to IDLE.
        self.cooldown_seconds = 1
        self.cooldown_timer = None

        # Description of the press that interrupted a run or arrived during
        # cooldown; reported when the rerun is triggered.
        self.rerun_event_description = None

        # Guards self.state and the timer attributes.
        self.lock = threading.Lock()

    def press(self, event_description=None) -> None:
        """Try to run the function for the class. If the function is recently
        started, this may push out the deadline for actually starting. If the
        function is already running, will interrupt the function"""
        with self.lock:
            self._press_unlocked(event_description)

    def _press_unlocked(self, event_description=None) -> None:
        """Handle a press. The caller must already hold self.lock."""
        _LOG.debug('Press - state = %s', str(self.state))
        if self.state == State.IDLE:
            if event_description:
                _LOG.info(event_description)
            self._start_debounce_timer()
            self._transition(State.DEBOUNCING)

        elif self.state == State.DEBOUNCING:
            self._start_debounce_timer()

        elif self.state == State.RUNNING:
            # When the function is already running but we get an incoming
            # event, go into the INTERRUPTED state to signal that we should
            # re-try running afterwards.

            # Push an empty line to flush ongoing I/O in subprocess.
            print()

            # Surround the error message with newlines to make it stand out.
            print()
            _LOG.error('Event while running: %s', event_description)
            print()

            self.function.cancel()
            self._transition(State.INTERRUPTED)
            self.rerun_event_description = event_description

        elif self.state == State.INTERRUPTED:
            # Function is running but was already interrupted. Do nothing.
            _LOG.debug('Ignoring press - interrupted')

        elif self.state == State.COOLDOWN:
            # Function just finished and we are cooling down; so trigger rerun.
            _LOG.debug('Got event in cooldown; scheduling rerun')
            self._transition(State.RERUN)
            self.rerun_event_description = event_description

        elif self.state == State.RERUN:
            # A rerun is already queued for when the cooldown ends, so there
            # is nothing further to schedule.
            _LOG.debug('Ignoring press - rerun already scheduled')

    def _transition(self, new_state) -> None:
        """Log and apply a state change. The caller must hold self.lock."""
        _LOG.debug('State: %s -> %s', str(self.state), str(new_state))
        self.state = new_state

    def _start_debounce_timer(self) -> None:
        """(Re)start the debounce timer. The caller must hold self.lock."""
        assert self.lock.locked()
        # Restarting: drop the pending timer so that only the newest one is
        # tracked. debounce_timer is only non-None while DEBOUNCING.
        if self.debounce_timer is not None:
            self.debounce_timer.cancel()
        self.debounce_timer = threading.Timer(self.debounce_seconds,
                                              self._run_function)
        self.debounce_timer.start()

    # Called from debounce_timer thread.
    def _run_function(self) -> None:
        """Run the function, then enter cooldown (or queue a rerun).

        Keyboard interrupts are forwarded to function.on_keyboard_interrupt().
        Any other exception raised by run() or on_complete() is re-raised
        after the cooldown has been scheduled, so that a failing run cannot
        leave the debouncer stuck in RUNNING or INTERRUPTED.
        """
        try:
            with self.lock:
                # Timer.cancel() has no effect on a timer that has already
                # expired and is waiting for the lock, so a press() that
                # restarts the debounce timer at that moment leaves two
                # timers alive. Whichever runs second lands here with the
                # state already advanced and must not start a second run.
                if self.state != State.DEBOUNCING:
                    _LOG.debug('Ignoring stale debounce timer - state = %s',
                               str(self.state))
                    return
                # Cancel whichever timer is currently tracked: it is either
                # this one (cancelling is then a no-op) or a replacement
                # whose press is already covered by this run.
                if self.debounce_timer is not None:
                    self.debounce_timer.cancel()
                    self.debounce_timer = None
                self._transition(State.RUNNING)

            # Must run the function without the lock held so further press()
            # calls don't deadlock.
            _LOG.debug('Running debounced function')
            self.function.run()

            _LOG.debug('Finished running debounced function')
            with self.lock:
                if self.state == State.RUNNING:
                    self.function.on_complete(cancelled=False)
                    self._transition(State.COOLDOWN)
                elif self.state == State.INTERRUPTED:
                    self.function.on_complete(cancelled=True)
                    self._transition(State.RERUN)
                self._start_cooldown_timer()
        # Ctrl-C on Unix generates KeyboardInterrupt
        # Ctrl-Z on Windows generates EOFError
        except (KeyboardInterrupt, EOFError):
            self.function.on_keyboard_interrupt()
        except Exception:
            # The lock is not held here: each "with" block above releases it
            # before the exception reaches this handler.
            self._recover_from_failed_run()
            raise

    def _recover_from_failed_run(self) -> None:
        """Move a failed run into cooldown so later presses are not lost.

        Does nothing unless the state is still RUNNING or INTERRUPTED.
        on_complete() is not called from here.
        """
        with self.lock:
            if self.state == State.RUNNING:
                self._transition(State.COOLDOWN)
            elif self.state == State.INTERRUPTED:
                self._transition(State.RERUN)
            else:
                return
            self._start_cooldown_timer()

    def _start_cooldown_timer(self) -> None:
        """Start the cooldown timer. The caller must hold self.lock."""
        assert self.lock.locked()
        self.cooldown_timer = threading.Timer(self.cooldown_seconds,
                                              self._exit_cooldown)
        self.cooldown_timer.start()

    # Called from cooldown_timer thread.
    def _exit_cooldown(self) -> None:
        """Return to IDLE and, if a rerun was queued, press again."""
        try:
            with self.lock:
                self.cooldown_timer = None
                rerun = (self.state == State.RERUN)
                self._transition(State.IDLE)

                # If we were in the RERUN state, then re-trigger the event.
                if rerun:
                    self._press_unlocked('Rerunning: %s' %
                                         self.rerun_event_description)

        # Ctrl-C on Unix generates KeyboardInterrupt
        # Ctrl-Z on Windows generates EOFError
        except (KeyboardInterrupt, EOFError):
            self.function.on_keyboard_interrupt()
181         except (KeyboardInterrupt, EOFError):
182             self.function.on_keyboard_interrupt()