Fix for x86_64 build fail
[platform/upstream/connectedhomeip.git] / third_party / pigweed / repo / pw_target_runner / go / src / pigweed.dev / pw_target_runner / worker_pool.go
1 // Copyright 2019 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 package pw_target_runner
16
17 import (
18         "errors"
19         "fmt"
20         "log"
21         "os"
22         "sync"
23         "sync/atomic"
24         "time"
25
26         pb "pigweed.dev/proto/pw_target_runner/target_runner_pb"
27 )
28
29 // RunRequest represents a client request to run a single executable on-device.
30 type RunRequest struct {
31         // Filesystem path to the executable.
32         Path string
33
34         // Channel to which the response is sent back.
35         ResponseChannel chan<- *RunResponse
36
37         // Time when the request was queued. Internal to the worker pool.
38         queueStart time.Time
39 }
40
41 // RunResponse is the response sent after a run request is processed.
42 type RunResponse struct {
43         // Length of time that the run request was queued before being handled
44         // by a worker. Set by the worker pool.
45         QueueTime time.Duration
46
47         // Length of time the runner command took to run the executable.
48         // Set by the worker pool.
49         RunTime time.Duration
50
51         // Raw output of the execution.
52         Output []byte
53
54         // Result of the run.
55         Status pb.RunStatus
56
57         // Error that occurred during the run, if any. If this is not nil, none
58         // of the other fields in this struct are guaranteed to be valid.
59         Err error
60 }
61
62 // DeviceRunner represents a worker which handles run requests.
63 type DeviceRunner interface {
64         // WorkerStart is the lifecycle hook called when the worker routine is
65         // started. Any resources required by the worker should be initialized
66         // here.
67         WorkerStart() error
68
69         // HandleRunRequest is the method called when an executable is scheduled
70         // to run on the worker by the worker pool. It processes the request,
71         // runs the executable, and returns an appropriate response.
72         HandleRunRequest(*RunRequest) *RunResponse
73
74         // WorkerExit is the lifecycle hook called before the worker exits.
75         // Should be used to clean up any resources used by the worker.
76         WorkerExit()
77 }
78
79 // WorkerPool represents a collection of device runners which run on-device
80 // binaries. The worker pool distributes requests to run binaries among its
81 // available workers.
82 type WorkerPool struct {
83         activeWorkers uint32
84         logger        *log.Logger
85         workers       []DeviceRunner
86         waitGroup     sync.WaitGroup
87         reqChannel    chan *RunRequest
88         quitChannel   chan bool
89 }
90
91 var (
92         errWorkerPoolActive    = errors.New("Worker pool is running")
93         errNoRegisteredWorkers = errors.New("No workers registered in pool")
94 )
95
96 // newWorkerPool creates an empty worker pool.
97 func newWorkerPool(name string) *WorkerPool {
98         logPrefix := fmt.Sprintf("[%s] ", name)
99         return &WorkerPool{
100                 logger:      log.New(os.Stdout, logPrefix, log.LstdFlags),
101                 workers:     make([]DeviceRunner, 0),
102                 reqChannel:  make(chan *RunRequest, 1024),
103                 quitChannel: make(chan bool, 64),
104         }
105 }
106
107 // RegisterWorker adds a new worker to the pool. This cannot be done when the
108 // pool is processing requests; Stop() must be called first.
109 func (p *WorkerPool) RegisterWorker(worker DeviceRunner) error {
110         if p.Active() {
111                 return errWorkerPoolActive
112         }
113         p.workers = append(p.workers, worker)
114         return nil
115 }
116
117 // Start launches all registered workers in the pool.
118 func (p *WorkerPool) Start() error {
119         if p.Active() {
120                 return errWorkerPoolActive
121         }
122
123         p.logger.Printf("Starting %d workers\n", len(p.workers))
124         for _, worker := range p.workers {
125                 p.waitGroup.Add(1)
126                 atomic.AddUint32(&p.activeWorkers, 1)
127                 go p.runWorker(worker)
128         }
129
130         return nil
131 }
132
133 // Stop terminates all running workers in the pool. The work queue is not
134 // cleared; queued requests persist and can be processed by calling Start()
135 // again.
136 func (p *WorkerPool) Stop() {
137         if !p.Active() {
138                 return
139         }
140
141         // Send N quit commands to the workers and wait for them to exit.
142         for i := uint32(0); i < p.activeWorkers; i++ {
143                 p.quitChannel <- true
144         }
145         p.waitGroup.Wait()
146
147         p.logger.Println("All workers in pool stopped")
148 }
149
150 // Active returns true if any worker routines are currently running.
151 func (p *WorkerPool) Active() bool {
152         return p.activeWorkers > 0
153 }
154
155 // QueueExecutable adds an executable to the worker pool's queue. If no workers
156 // are registered in the pool, this operation fails and an immediate response is
157 // sent back to the requester indicating the error.
158 func (p *WorkerPool) QueueExecutable(req *RunRequest) {
159         if len(p.workers) == 0 {
160                 p.logger.Printf("Attempt to queue executable %s with no active workers", req.Path)
161                 req.ResponseChannel <- &RunResponse{
162                         Err: errNoRegisteredWorkers,
163                 }
164                 return
165         }
166
167         p.logger.Printf("Queueing executable %s\n", req.Path)
168
169         // Start tracking how long the request is queued.
170         req.queueStart = time.Now()
171         p.reqChannel <- req
172 }
173
174 // runWorker is a function run by the worker pool in a separate goroutine for
175 // each of its registered workers. The function is responsible for calling the
176 // appropriate worker lifecycle hooks and processing requests as they come in
177 // through the worker pool's queue.
178 func (p *WorkerPool) runWorker(worker DeviceRunner) {
179         defer func() {
180                 atomic.AddUint32(&p.activeWorkers, ^uint32(0))
181                 p.waitGroup.Done()
182         }()
183
184         if err := worker.WorkerStart(); err != nil {
185                 return
186         }
187
188 processLoop:
189         for {
190                 // Force the quit channel to be processed before the request
191                 // channel by using a select statement with an empty default
192                 // case to make the read non-blocking. If the quit channel is
193                 // empty, the code will fall through to the main select below.
194                 select {
195                 case q, ok := <-p.quitChannel:
196                         if q || !ok {
197                                 break processLoop
198                         }
199                 default:
200                 }
201
202                 select {
203                 case q, ok := <-p.quitChannel:
204                         if q || !ok {
205                                 break processLoop
206                         }
207                 case req, ok := <-p.reqChannel:
208                         if !ok {
209                                 continue
210                         }
211
212                         queueTime := time.Since(req.queueStart)
213
214                         runStart := time.Now()
215                         res := worker.HandleRunRequest(req)
216                         res.RunTime = time.Since(runStart)
217
218                         res.QueueTime = queueTime
219                         req.ResponseChannel <- res
220                 }
221         }
222
223         worker.WorkerExit()
224 }