1 // Copyright 2019 The Pigweed Authors
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
7 // https://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
15 // Package pw_target_runner implements a target runner gRPC server which queues
16 // and distributes executables among a group of worker routines.
17 package pw_target_runner
27 "google.golang.org/grpc"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/reflection"
30 "google.golang.org/grpc/status"
32 pb "pigweed.dev/proto/pw_target_runner/target_runner_pb"
36 errServerNotBound = errors.New("Server not bound to a port")
37 errServerNotRunning = errors.New("Server is not running")
40 // Server is a gRPC server that runs a TargetRunner service.
42 grpcServer *grpc.Server
48 workerPool *WorkerPool
51 // NewServer creates a gRPC server with a registered TargetRunner service.
52 func NewServer() *Server {
54 grpcServer: grpc.NewServer(),
55 workerPool: newWorkerPool("ServerWorkerPool"),
58 reflection.Register(s.grpcServer)
59 pb.RegisterTargetRunnerServer(s.grpcServer, &pwTargetRunnerService{s})
64 // Bind starts a TCP listener on a specified port.
65 func (s *Server) Bind(port int) error {
66 lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
74 // RegisterWorker adds a worker to the server's worker pool.
75 func (s *Server) RegisterWorker(worker DeviceRunner) {
76 s.workerPool.RegisterWorker(worker)
// RunBinary runs an executable through a worker in the server, returning
// the worker's response. The function blocks until the executable has been
// processed and a response is available.
82 func (s *Server) RunBinary(path string) (*RunResponse, error) {
84 return nil, errServerNotRunning
87 resChan := make(chan *RunResponse, 1)
90 s.workerPool.QueueExecutable(&RunRequest{
92 ResponseChannel: resChan,
101 if res.Status == pb.RunStatus_SUCCESS {
// Serve starts the gRPC server on its configured port. Bind must be called
// before Serve; otherwise errServerNotBound is returned. This function blocks
// until the server is terminated.
113 func (s *Server) Serve() error {
114 if s.listener == nil {
115 return errServerNotBound
118 log.Printf("Starting gRPC server on %v\n", s.listener.Addr())
120 s.startTime = time.Now()
124 return s.grpcServer.Serve(s.listener)
// pwTargetRunnerService implements the pw.target_runner.TargetRunner gRPC
// service.
129 type pwTargetRunnerService struct {
133 // RunBinary runs a single executable on-device and returns its result.
134 func (s *pwTargetRunnerService) RunBinary(
136 desc *pb.RunBinaryRequest,
137 ) (*pb.RunBinaryResponse, error) {
138 runRes, err := s.server.RunBinary(desc.FilePath)
140 return nil, status.Error(codes.Internal, "Internal server error")
143 res := &pb.RunBinaryResponse{
144 Result: runRes.Status,
145 QueueTimeNs: uint64(runRes.QueueTime),
146 RunTimeNs: uint64(runRes.RunTime),
147 Output: runRes.Output,
152 // Status returns information about the server.
153 func (s *pwTargetRunnerService) Status(
156 ) (*pb.ServerStatus, error) {
157 resp := &pb.ServerStatus{
158 UptimeNs: uint64(time.Since(s.server.startTime)),
159 TasksPassed: s.server.tasksPassed,
160 TasksFailed: s.server.tasksFailed,