1 // Copyright 2019 The Pigweed Authors
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
7 // https://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
15 package pw_target_runner
26 pb "pigweed.dev/proto/pw_target_runner/target_runner_pb"
29 // RunRequest represents a client request to run a single executable on-device.
30 type RunRequest struct {
31 // Filesystem path to the executable.
34 // Channel to which the response is sent back.
35 ResponseChannel chan<- *RunResponse
37 // Time when the request was queued. Internal to the worker pool.
41 // RunResponse is the response sent after a run request is processed.
42 type RunResponse struct {
43 // Length of time that the run request was queued before being handled
44 // by a worker. Set by the worker pool.
45 QueueTime time.Duration
47 // Length of time the runner command took to run the executable.
48 // Set by the worker pool.
51 // Raw output of the execution.
57 // Error that occurred during the run, if any. If this is not nil, none
58 // of the other fields in this struct are guaranteed to be valid.
62 // DeviceRunner represents a worker which handles run requests.
63 type DeviceRunner interface {
64 // WorkerStart is the lifecycle hook called when the worker routine is
65 // started. Any resources required by the worker should be initialized
69 // HandleRunRequest is the method called when an executable is scheduled
70 // to run on the worker by the worker pool. It processes the request,
71 // runs the executable, and returns an appropriate response.
72 HandleRunRequest(*RunRequest) *RunResponse
74 // WorkerExit is the lifecycle hook called before the worker exits.
75 // Should be used to clean up any resources used by the worker.
79 // WorkerPool represents a collection of device runners which run on-device
80 // binaries. The worker pool distributes requests to run binaries among its
82 type WorkerPool struct {
85 workers []DeviceRunner
86 waitGroup sync.WaitGroup
87 reqChannel chan *RunRequest
92 errWorkerPoolActive = errors.New("Worker pool is running")
93 errNoRegisteredWorkers = errors.New("No workers registered in pool")
96 // newWorkerPool creates an empty worker pool.
97 func newWorkerPool(name string) *WorkerPool {
98 logPrefix := fmt.Sprintf("[%s] ", name)
100 logger: log.New(os.Stdout, logPrefix, log.LstdFlags),
101 workers: make([]DeviceRunner, 0),
102 reqChannel: make(chan *RunRequest, 1024),
103 quitChannel: make(chan bool, 64),
107 // RegisterWorker adds a new worker to the pool. This cannot be done when the
108 // pool is processing requests; Stop() must be called first.
109 func (p *WorkerPool) RegisterWorker(worker DeviceRunner) error {
111 return errWorkerPoolActive
113 p.workers = append(p.workers, worker)
117 // Start launches all registered workers in the pool.
118 func (p *WorkerPool) Start() error {
120 return errWorkerPoolActive
123 p.logger.Printf("Starting %d workers\n", len(p.workers))
124 for _, worker := range p.workers {
126 atomic.AddUint32(&p.activeWorkers, 1)
127 go p.runWorker(worker)
133 // Stop terminates all running workers in the pool. The work queue is not
134 // cleared; queued requests persist and can be processed by calling Start()
136 func (p *WorkerPool) Stop() {
141 // Send N quit commands to the workers and wait for them to exit.
142 for i := uint32(0); i < p.activeWorkers; i++ {
143 p.quitChannel <- true
147 p.logger.Println("All workers in pool stopped")
150 // Active returns true if any worker routines are currently running.
151 func (p *WorkerPool) Active() bool {
152 return p.activeWorkers > 0
155 // QueueExecutable adds an executable to the worker pool's queue. If no workers
156 // are registered in the pool, this operation fails and an immediate response is
157 // sent back to the requester indicating the error.
158 func (p *WorkerPool) QueueExecutable(req *RunRequest) {
159 if len(p.workers) == 0 {
160 p.logger.Printf("Attempt to queue executable %s with no active workers", req.Path)
161 req.ResponseChannel <- &RunResponse{
162 Err: errNoRegisteredWorkers,
167 p.logger.Printf("Queueing executable %s\n", req.Path)
169 // Start tracking how long the request is queued.
170 req.queueStart = time.Now()
174 // runWorker is a function run by the worker pool in a separate goroutine for
175 // each of its registered workers. The function is responsible for calling the
176 // appropriate worker lifecycle hooks and processing requests as they come in
177 // through the worker pool's queue.
178 func (p *WorkerPool) runWorker(worker DeviceRunner) {
180 atomic.AddUint32(&p.activeWorkers, ^uint32(0))
184 if err := worker.WorkerStart(); err != nil {
190 // Force the quit channel to be processed before the request
191 // channel by using a select statement with an empty default
192 // case to make the read non-blocking. If the quit channel is
193 // empty, the code will fall through to the main select below.
195 case q, ok := <-p.quitChannel:
203 case q, ok := <-p.quitChannel:
207 case req, ok := <-p.reqChannel:
212 queueTime := time.Since(req.queueStart)
214 runStart := time.Now()
215 res := worker.HandleRunRequest(req)
216 res.RunTime = time.Since(runStart)
218 res.QueueTime = queueTime
219 req.ResponseChannel <- res