# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
14 """Run an interruptable, cancellable function after debouncing run requests"""
import enum
import logging
import threading
from abc import ABC, abstractmethod
# Module-level logger, named after this module via __name__.
_LOG = logging.getLogger(__name__)
class DebouncedFunction(ABC):
    """Function to be run by Debouncer.

    This is an abstract interface: a concrete subclass must implement every
    method below before it can be instantiated.
    """
    @abstractmethod
    def run(self) -> None:
        """Run the function."""

    @abstractmethod
    def cancel(self) -> bool:
        """Cancel an in-progress run of the function.

        Must be called from a different thread than run().

        Returns:
          True if the run was successfully cancelled, False otherwise.
        """

    @abstractmethod
    def on_complete(self, cancelled: bool = False) -> bool:
        """Called after run() finishes.

        Args:
          cancelled: If True, cancel() was invoked during the last run().

        Returns:
          Annotated as bool. NOTE(review): the meaning of the returned value
          is not documented in this file -- confirm with implementers.
        """

    # Note: The debouncer uses threads. Since there is no way to guarantee
    # which thread receives a KeyboardInterrupt, it is necessary to catch this
    # event in all debouncer threads and forward it to the user.
    @abstractmethod
    def on_keyboard_interrupt(self):
        """Called when a keyboard interrupt reaches a debouncer thread."""
class State(enum.Enum):
    """States of the Debouncer state machine."""
    # Transitions to DEBOUNCING on a press.
    IDLE = 1
    # Stays in DEBOUNCING on a further press (the timer is restarted);
    # transitions to RUNNING when the debounce timer fires.
    DEBOUNCING = 2
    # Transitions to INTERRUPTED on a press, or to COOLDOWN when the run
    # finishes.
    RUNNING = 3
    # Transitions to RERUN when the interrupted run finishes.
    INTERRUPTED = 4
    # Transitions to RERUN on a press, or to IDLE when the cooldown timer
    # fires.
    COOLDOWN = 5
    # Transitions to IDLE when the cooldown timer fires (but triggers a press).
    RERUN = 6
class Debouncer:
    """Run an interruptable, cancellable function with debouncing.

    Calls to press() are coalesced: the wrapped function is started by a
    timer that is restarted on every press received while debouncing. A press
    received while the function is running cancels it and schedules a rerun;
    a press received during the post-run cooldown also schedules a rerun.

    The state machine is guarded by self.lock. Helper methods that assert
    self.lock.locked() must only be called with the lock held.
    """
    def __init__(self, function: 'DebouncedFunction') -> None:
        """Create an idle debouncer around function.

        Args:
          function: Object providing run(), cancel(), on_complete() and
            on_keyboard_interrupt().
        """
        self.function = function

        self.state = State.IDLE

        # Delay, in seconds, between the latest press and starting the run.
        self.debounce_seconds = 1
        self.debounce_timer = None

        # Delay, in seconds, between a run finishing and leaving cooldown.
        self.cooldown_seconds = 1
        self.cooldown_timer = None

        # Description of the event that interrupted a run or arrived during
        # cooldown; reused in the message logged when the rerun is triggered.
        self.rerun_event_description = None

        self.lock = threading.Lock()

    def press(self, event_description=None) -> None:
        """Try to run the function for the class.

        If the function was recently started, this may push out the deadline
        for actually starting. If the function is already running, this will
        interrupt the function.

        Args:
          event_description: Optional description of what caused the press;
            it is only used for log messages.
        """
        with self.lock:
            self._press_unlocked(event_description)

    def _press_unlocked(self, event_description=None) -> None:
        """Handle a press. The caller must already hold self.lock."""
        _LOG.debug('Press - state = %s', str(self.state))
        if self.state == State.IDLE:
            # event_description defaults to None; only log a real description.
            if event_description:
                _LOG.info(event_description)
            self._start_debounce_timer()
            self._transition(State.DEBOUNCING)

        elif self.state == State.DEBOUNCING:
            # Another press before the deadline: push the deadline out.
            self._start_debounce_timer()

        elif self.state == State.RUNNING:
            # When the function is already running but we get an incoming
            # event, go into the INTERRUPTED state to signal that we should
            # re-try running afterwards.

            # Push an empty line to flush ongoing I/O in subprocess.
            print()

            # Surround the error message with newlines to make it stand out.
            print()
            _LOG.error('Event while running: %s', event_description)
            print()

            self.function.cancel()
            self._transition(State.INTERRUPTED)
            self.rerun_event_description = event_description

        elif self.state == State.INTERRUPTED:
            # Function is running but was already interrupted. Do nothing.
            _LOG.debug('Ignoring press - interrupted')

        elif self.state == State.COOLDOWN:
            # Function just finished and we are cooling down; so trigger rerun.
            _LOG.debug('Got event in cooldown; scheduling rerun')
            self._transition(State.RERUN)
            self.rerun_event_description = event_description

    def _transition(self, new_state) -> None:
        """Log the state change and move to new_state."""
        _LOG.debug('State: %s -> %s', str(self.state), str(new_state))
        self.state = new_state

    def _start_debounce_timer(self) -> None:
        """Start, or restart, the debounce timer. Requires self.lock held."""
        assert self.lock.locked()
        if self.state == State.DEBOUNCING:
            # A timer is already pending; cancel it so only the new one fires.
            self.debounce_timer.cancel()
        self.debounce_timer = threading.Timer(self.debounce_seconds,
                                              self._run_function)
        self.debounce_timer.start()

    # Called from debounce_timer thread.
    def _run_function(self) -> None:
        """Run the wrapped function, report completion and start cooldown."""
        try:
            with self.lock:
                assert self.state == State.DEBOUNCING
                self.debounce_timer = None
                self._transition(State.RUNNING)

            # Must run the function without the lock held so further press()
            # calls don't deadlock.
            _LOG.debug('Running debounced function')
            self.function.run()

            _LOG.debug('Finished running debounced function')
            with self.lock:
                if self.state == State.RUNNING:
                    self.function.on_complete(cancelled=False)
                    self._transition(State.COOLDOWN)
                elif self.state == State.INTERRUPTED:
                    # A press arrived during the run; rerun after cooldown.
                    self.function.on_complete(cancelled=True)
                    self._transition(State.RERUN)
                self._start_cooldown_timer()
        # Ctrl-C on Unix generates KeyboardInterrupt
        # Ctrl-Z on Windows generates EOFError
        except (KeyboardInterrupt, EOFError):
            self.function.on_keyboard_interrupt()

    def _start_cooldown_timer(self) -> None:
        """Start the cooldown timer. Requires self.lock to be held."""
        assert self.lock.locked()
        self.cooldown_timer = threading.Timer(self.cooldown_seconds,
                                              self._exit_cooldown)
        self.cooldown_timer.start()

    # Called from cooldown_timer thread.
    def _exit_cooldown(self) -> None:
        """Return to IDLE and, if a rerun was requested, press again."""
        try:
            with self.lock:
                self.cooldown_timer = None
                rerun = (self.state == State.RERUN)
                self._transition(State.IDLE)

                # If we were in the RERUN state, then re-trigger the event.
                if rerun:
                    self._press_unlocked('Rerunning: %s' %
                                         self.rerun_event_description)

        # Ctrl-C on Unix generates KeyboardInterrupt
        # Ctrl-Z on Windows generates EOFError
        except (KeyboardInterrupt, EOFError):
            self.function.on_keyboard_interrupt()