Imported Upstream version 1.21.0
[platform/upstream/grpc.git] / src / ruby / lib / grpc / generic / rpc_server.rb
1 # Copyright 2015 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 require_relative '../grpc'
16 require_relative 'active_call'
17 require_relative 'service'
18 require 'thread'
19
20 # GRPC contains the General RPC module.
21 module GRPC
22   # Pool is a simple thread pool.
23   class Pool
24     # Default keep alive period is 1s
25     DEFAULT_KEEP_ALIVE = 1
26
27     def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
28       fail 'pool size must be positive' unless size > 0
29       @jobs = Queue.new
30       @size = size
31       @stopped = false
32       @stop_mutex = Mutex.new # needs to be held when accessing @stopped
33       @stop_cond = ConditionVariable.new
34       @workers = []
35       @keep_alive = keep_alive
36
37       # Each worker thread has its own queue to push and pull jobs
38       # these queues are put into @ready_queues when that worker is idle
39       @ready_workers = Queue.new
40     end
41
42     # Returns the number of jobs waiting
43     def jobs_waiting
44       @jobs.size
45     end
46
47     def ready_for_work?
48       # Busy worker threads are either doing work, or have a single job
49       # waiting on them. Workers that are idle with no jobs waiting
50       # have their "queues" in @ready_workers
51       !@ready_workers.empty?
52     end
53
54     # Runs the given block on the queue with the provided args.
55     #
56     # @param args the args passed blk when it is called
57     # @param blk the block to call
58     def schedule(*args, &blk)
59       return if blk.nil?
60       @stop_mutex.synchronize do
61         if @stopped
62           GRPC.logger.warn('did not schedule job, already stopped')
63           return
64         end
65         GRPC.logger.info('schedule another job')
66         fail 'No worker threads available' if @ready_workers.empty?
67         worker_queue = @ready_workers.pop
68
69         fail 'worker already has a task waiting' unless worker_queue.empty?
70         worker_queue << [blk, args]
71       end
72     end
73
74     # Starts running the jobs in the thread pool.
75     def start
76       @stop_mutex.synchronize do
77         fail 'already stopped' if @stopped
78       end
79       until @workers.size == @size.to_i
80         new_worker_queue = Queue.new
81         @ready_workers << new_worker_queue
82         next_thread = Thread.new(new_worker_queue) do |jobs|
83           catch(:exit) do  # allows { throw :exit } to kill a thread
84             loop_execute_jobs(jobs)
85           end
86           remove_current_thread
87         end
88         @workers << next_thread
89       end
90     end
91
92     # Stops the jobs in the pool
93     def stop
94       GRPC.logger.info('stopping, will wait for all the workers to exit')
95       @stop_mutex.synchronize do  # wait @keep_alive seconds for workers to stop
96         @stopped = true
97         loop do
98           break unless ready_for_work?
99           worker_queue = @ready_workers.pop
100           worker_queue << [proc { throw :exit }, []]
101         end
102         @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
103       end
104       forcibly_stop_workers
105       GRPC.logger.info('stopped, all workers are shutdown')
106     end
107
108     protected
109
110     # Forcibly shutdown any threads that are still alive.
111     def forcibly_stop_workers
112       return unless @workers.size > 0
113       GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)")
114       @workers.each do |t|
115         next unless t.alive?
116         begin
117           t.exit
118         rescue StandardError => e
119           GRPC.logger.warn('error while terminating a worker')
120           GRPC.logger.warn(e)
121         end
122       end
123     end
124
125     # removes the threads from workers, and signal when all the
126     # threads are complete.
127     def remove_current_thread
128       @stop_mutex.synchronize do
129         @workers.delete(Thread.current)
130         @stop_cond.signal if @workers.size.zero?
131       end
132     end
133
134     def loop_execute_jobs(worker_queue)
135       loop do
136         begin
137           blk, args = worker_queue.pop
138           blk.call(*args)
139         rescue StandardError, GRPC::Core::CallError => e
140           GRPC.logger.warn('Error in worker thread')
141           GRPC.logger.warn(e)
142         end
143         # there shouldn't be any work given to this thread while its busy
144         fail('received a task while busy') unless worker_queue.empty?
145         @stop_mutex.synchronize do
146           return if @stopped
147           @ready_workers << worker_queue
148         end
149       end
150     end
151   end
152
153   # RpcServer hosts a number of services and makes them available on the
154   # network.
155   class RpcServer
156     include Core::CallOps
157     include Core::TimeConsts
158     extend ::Forwardable
159
160     def_delegators :@server, :add_http2_port
161
162     # Default thread pool size is 30
163     DEFAULT_POOL_SIZE = 30
164
165     # Deprecated due to internal changes to the thread pool
166     DEFAULT_MAX_WAITING_REQUESTS = 20
167
168     # Default poll period is 1s
169     DEFAULT_POLL_PERIOD = 1
170
171     # Signal check period is 0.25s
172     SIGNAL_CHECK_PERIOD = 0.25
173
174     # setup_connect_md_proc is used by #initialize to validate the
175     # connect_md_proc.
176     def self.setup_connect_md_proc(a_proc)
177       return nil if a_proc.nil?
178       fail(TypeError, '!Proc') unless a_proc.is_a? Proc
179       a_proc
180     end
181
182     # Creates a new RpcServer.
183     #
184     # The RPC server is configured using keyword arguments.
185     #
186     # There are some specific keyword args used to configure the RpcServer
187     # instance.
188     #
189     # * pool_size: the size of the thread pool the server uses to run its
190     # threads. No more concurrent requests can be made than the size
191     # of the thread pool
192     #
193     # * max_waiting_requests: Deprecated due to internal changes to the thread
194     # pool. This is still an argument for compatibility but is ignored.
195     #
196     # * poll_period: The amount of time in seconds to wait for
197     # currently-serviced RPC's to finish before cancelling them when shutting
198     # down the server.
199     #
200     # * pool_keep_alive: The amount of time in seconds to wait
201     # for currently busy thread-pool threads to finish before
202     # forcing an abrupt exit to each thread.
203     #
204     # * connect_md_proc:
205     # when non-nil is a proc for determining metadata to send back the client
206     # on receiving an invocation req.  The proc signature is:
207     #   {key: val, ..} func(method_name, {key: val, ...})
208     #
209     # * server_args:
210     # A server arguments hash to be passed down to the underlying core server
211     #
212     # * interceptors:
213     # Am array of GRPC::ServerInterceptor objects that will be used for
214     # intercepting server handlers to provide extra functionality.
215     # Interceptors are an EXPERIMENTAL API.
216     #
217     def initialize(pool_size: DEFAULT_POOL_SIZE,
218                    max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS,
219                    poll_period: DEFAULT_POLL_PERIOD,
220                    pool_keep_alive: Pool::DEFAULT_KEEP_ALIVE,
221                    connect_md_proc: nil,
222                    server_args: {},
223                    interceptors: [])
224       @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
225       @max_waiting_requests = max_waiting_requests
226       @poll_period = poll_period
227       @pool_size = pool_size
228       @pool = Pool.new(@pool_size, keep_alive: pool_keep_alive)
229       @run_cond = ConditionVariable.new
230       @run_mutex = Mutex.new
231       # running_state can take 4 values: :not_started, :running, :stopping, and
232       # :stopped. State transitions can only proceed in that order.
233       @running_state = :not_started
234       @server = Core::Server.new(server_args)
235       @interceptors = InterceptorRegistry.new(interceptors)
236     end
237
238     # stops a running server
239     #
240     # the call has no impact if the server is already stopped, otherwise
241     # server's current call loop is it's last.
242     def stop
243       # if called via run_till_terminated_or_interrupted,
244       #   signal stop_server_thread and dont do anything
245       if @stop_server.nil? == false && @stop_server == false
246         @stop_server = true
247         @stop_server_cv.broadcast
248         return
249       end
250       @run_mutex.synchronize do
251         fail 'Cannot stop before starting' if @running_state == :not_started
252         return if @running_state != :running
253         transition_running_state(:stopping)
254         deadline = from_relative_time(@poll_period)
255         @server.shutdown_and_notify(deadline)
256       end
257       @pool.stop
258     end
259
260     def running_state
261       @run_mutex.synchronize do
262         return @running_state
263       end
264     end
265
266     # Can only be called while holding @run_mutex
267     def transition_running_state(target_state)
268       state_transitions = {
269         not_started: :running,
270         running: :stopping,
271         stopping: :stopped
272       }
273       if state_transitions[@running_state] == target_state
274         @running_state = target_state
275       else
276         fail "Bad server state transition: #{@running_state}->#{target_state}"
277       end
278     end
279
280     def running?
281       running_state == :running
282     end
283
284     def stopped?
285       running_state == :stopped
286     end
287
288     # Is called from other threads to wait for #run to start up the server.
289     #
290     # If run has not been called, this returns immediately.
291     #
292     # @param timeout [Numeric] number of seconds to wait
293     # @return [true, false] true if the server is running, false otherwise
294     def wait_till_running(timeout = nil)
295       @run_mutex.synchronize do
296         @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started
297         return @running_state == :running
298       end
299     end
300
301     # handle registration of classes
302     #
303     # service is either a class that includes GRPC::GenericService and whose
304     # #new function can be called without argument or any instance of such a
305     # class.
306     #
307     # E.g, after
308     #
309     # class Divider
310     #   include GRPC::GenericService
311     #   rpc :div DivArgs, DivReply    # single request, single response
312     #   def initialize(optional_arg='default option') # no args
313     #     ...
314     #   end
315     #
316     # srv = GRPC::RpcServer.new(...)
317     #
318     # # Either of these works
319     #
320     # srv.handle(Divider)
321     #
322     # # or
323     #
324     # srv.handle(Divider.new('replace optional arg'))
325     #
326     # It raises RuntimeError:
327     # - if service is not valid service class or object
328     # - its handler methods are already registered
329     # - if the server is already running
330     #
331     # @param service [Object|Class] a service class or object as described
332     #        above
333     def handle(service)
334       @run_mutex.synchronize do
335         unless @running_state == :not_started
336           fail 'cannot add services if the server has been started'
337         end
338         cls = service.is_a?(Class) ? service : service.class
339         assert_valid_service_class(cls)
340         add_rpc_descs_for(service)
341       end
342     end
343
344     # runs the server
345     #
346     # - if no rpc_descs are registered, this exits immediately, otherwise it
347     #   continues running permanently and does not return until program exit.
348     #
349     # - #running? returns true after this is called, until #stop cause the
350     #   the server to stop.
351     def run
352       @run_mutex.synchronize do
353         fail 'cannot run without registering services' if rpc_descs.size.zero?
354         @pool.start
355         @server.start
356         transition_running_state(:running)
357         @run_cond.broadcast
358       end
359       loop_handle_server_calls
360     end
361
362     alias_method :run_till_terminated, :run
363
364     # runs the server with signal handlers
365     # @param signals
366     #     List of String, Integer or both representing signals that the user
367     #     would like to send to the server for graceful shutdown
368     # @param wait_interval (optional)
369     #     Integer seconds that user would like stop_server_thread to poll
370     #     stop_server
371     def run_till_terminated_or_interrupted(signals, wait_interval = 60)
372       @stop_server = false
373       @stop_server_mu = Mutex.new
374       @stop_server_cv = ConditionVariable.new
375
376       @stop_server_thread = Thread.new do
377         loop do
378           break if @stop_server
379           @stop_server_mu.synchronize do
380             @stop_server_cv.wait(@stop_server_mu, wait_interval)
381           end
382         end
383
384         # stop is surrounded by mutex, should handle multiple calls to stop
385         #   correctly
386         stop
387       end
388
389       valid_signals = Signal.list
390
391       # register signal handlers
392       signals.each do |sig|
393         # input validation
394         if sig.class == String
395           sig.upcase!
396           if sig.start_with?('SIG')
397             # cut out the SIG prefix to see if valid signal
398             sig = sig[3..-1]
399           end
400         end
401
402         # register signal traps for all valid signals
403         if valid_signals.value?(sig) || valid_signals.key?(sig)
404           Signal.trap(sig) do
405             @stop_server = true
406             @stop_server_cv.broadcast
407           end
408         else
409           fail "#{sig} not a valid signal"
410         end
411       end
412
413       run
414
415       @stop_server_thread.join
416     end
417
418     # Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
419     def available?(an_rpc)
420       return an_rpc if @pool.ready_for_work?
421       GRPC.logger.warn('no free worker threads currently')
422       noop = proc { |x| x }
423
424       # Create a new active call that knows that metadata hasn't been
425       # sent yet
426       c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
427                          metadata_received: true, started: false)
428       c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED,
429                     'No free threads in thread pool')
430       nil
431     end
432
433     # Sends UNIMPLEMENTED if the method is not implemented by this server
434     def implemented?(an_rpc)
435       mth = an_rpc.method.to_sym
436       return an_rpc if rpc_descs.key?(mth)
437       GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
438       noop = proc { |x| x }
439
440       # Create a new active call that knows that
441       # metadata hasn't been sent yet
442       c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
443                          metadata_received: true, started: false)
444       c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
445       nil
446     end
447
448     # handles calls to the server
449     def loop_handle_server_calls
450       fail 'not started' if running_state == :not_started
451       while running_state == :running
452         begin
453           an_rpc = @server.request_call
454           break if (!an_rpc.nil?) && an_rpc.call.nil?
455           active_call = new_active_server_call(an_rpc)
456           unless active_call.nil?
457             @pool.schedule(active_call) do |ac|
458               c, mth = ac
459               begin
460                 rpc_descs[mth].run_server_method(
461                   c,
462                   rpc_handlers[mth],
463                   @interceptors.build_context
464                 )
465               rescue StandardError
466                 c.send_status(GRPC::Core::StatusCodes::INTERNAL,
467                               'Server handler failed')
468               end
469             end
470           end
471         rescue Core::CallError, RuntimeError => e
472           # these might happen for various reasons.  The correct behavior of
473           # the server is to log them and continue, if it's not shutting down.
474           if running_state == :running
475             GRPC.logger.warn("server call failed: #{e}")
476           end
477           next
478         end
479       end
480       # @running_state should be :stopping here
481       @run_mutex.synchronize do
482         transition_running_state(:stopped)
483         GRPC.logger.info("stopped: #{self}")
484         @server.close
485       end
486     end
487
488     def new_active_server_call(an_rpc)
489       return nil if an_rpc.nil? || an_rpc.call.nil?
490
491       # allow the metadata to be accessed from the call
492       an_rpc.call.metadata = an_rpc.metadata  # attaches md to call for handlers
493       connect_md = nil
494       unless @connect_md_proc.nil?
495         connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
496       end
497
498       return nil unless available?(an_rpc)
499       return nil unless implemented?(an_rpc)
500
501       # Create the ActiveCall. Indicate that metadata hasnt been sent yet.
502       GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
503       rpc_desc = rpc_descs[an_rpc.method.to_sym]
504       c = ActiveCall.new(an_rpc.call,
505                          rpc_desc.marshal_proc,
506                          rpc_desc.unmarshal_proc(:input),
507                          an_rpc.deadline,
508                          metadata_received: true,
509                          started: false,
510                          metadata_to_send: connect_md)
511       c.attach_peer_cert(an_rpc.call.peer_cert)
512       mth = an_rpc.method.to_sym
513       [c, mth]
514     end
515
516     protected
517
518     def rpc_descs
519       @rpc_descs ||= {}
520     end
521
522     def rpc_handlers
523       @rpc_handlers ||= {}
524     end
525
526     def assert_valid_service_class(cls)
527       unless cls.include?(GenericService)
528         fail "#{cls} must 'include GenericService'"
529       end
530       fail "#{cls} should specify some rpc descriptions" if
531         cls.rpc_descs.size.zero?
532     end
533
534     # This should be called while holding @run_mutex
535     def add_rpc_descs_for(service)
536       cls = service.is_a?(Class) ? service : service.class
537       specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {})
538       cls.rpc_descs.each_pair do |name, spec|
539         route = "/#{cls.service_name}/#{name}".to_sym
540         fail "already registered: rpc #{route} from #{spec}" if specs.key? route
541         specs[route] = spec
542         rpc_name = GenericService.underscore(name.to_s).to_sym
543         if service.is_a?(Class)
544           handlers[route] = cls.new.method(rpc_name)
545         else
546           handlers[route] = service.method(rpc_name)
547         end
548         GRPC.logger.info("handling #{route} with #{handlers[route]}")
549       end
550     end
551   end
552 end