1 # Copyright 2015 gRPC authors.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 require_relative '../grpc'
16 require_relative 'active_call'
17 require_relative 'service'
20 # GRPC contains the General RPC module.
22 # Pool is a simple thread pool.
24 # Default keep alive period is 1s
25 DEFAULT_KEEP_ALIVE = 1
27 def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
28 fail 'pool size must be positive' unless size > 0
32 @stop_mutex = Mutex.new # needs to be held when accessing @stopped
33 @stop_cond = ConditionVariable.new
35 @keep_alive = keep_alive
37 # Each worker thread has its own queue to push and pull jobs
38 # these queues are put into @ready_workers when that worker is idle
39 @ready_workers = Queue.new
42 # Returns the number of jobs waiting
48 # Busy worker threads are either doing work, or have a single job
49 # waiting on them. Workers that are idle with no jobs waiting
50 # have their "queues" in @ready_workers
51 !@ready_workers.empty?
54 # Runs the given block on the queue with the provided args.
56 # @param args the args passed blk when it is called
57 # @param blk the block to call
58 def schedule(*args, &blk)
60 @stop_mutex.synchronize do
62 GRPC.logger.warn('did not schedule job, already stopped')
65 GRPC.logger.info('schedule another job')
66 fail 'No worker threads available' if @ready_workers.empty?
67 worker_queue = @ready_workers.pop
69 fail 'worker already has a task waiting' unless worker_queue.empty?
70 worker_queue << [blk, args]
74 # Starts running the jobs in the thread pool.
76 @stop_mutex.synchronize do
77 fail 'already stopped' if @stopped
79 until @workers.size == @size.to_i
80 new_worker_queue = Queue.new
81 @ready_workers << new_worker_queue
82 next_thread = Thread.new(new_worker_queue) do |jobs|
83 catch(:exit) do # allows { throw :exit } to kill a thread
84 loop_execute_jobs(jobs)
88 @workers << next_thread
92 # Stops the jobs in the pool
94 GRPC.logger.info('stopping, will wait for all the workers to exit')
95 @stop_mutex.synchronize do # wait @keep_alive seconds for workers to stop
98 break unless ready_for_work?
99 worker_queue = @ready_workers.pop
100 worker_queue << [proc { throw :exit }, []]
102 @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
104 forcibly_stop_workers
105 GRPC.logger.info('stopped, all workers are shutdown')
110 # Forcibly shutdown any threads that are still alive.
111 def forcibly_stop_workers
112 return unless @workers.size > 0
113 GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)")
118 rescue StandardError => e
119 GRPC.logger.warn('error while terminating a worker')
# Removes the calling thread from @workers and, once no workers remain,
# signals @stop_cond.
#
# Both the removal and the signal happen while holding @stop_mutex, so the
# emptiness check always sees the list as this call left it.
def remove_current_thread
  @stop_mutex.synchronize do
    @workers.delete(Thread.current)
    @stop_cond.signal if @workers.empty?
  end
end
134 def loop_execute_jobs(worker_queue)
137 blk, args = worker_queue.pop
139 rescue StandardError, GRPC::Core::CallError => e
140 GRPC.logger.warn('Error in worker thread')
143 # there shouldn't be any work given to this thread while it's busy
144 fail('received a task while busy') unless worker_queue.empty?
145 @stop_mutex.synchronize do
147 @ready_workers << worker_queue
153 # RpcServer hosts a number of services and makes them available on the
156 include Core::CallOps
157 include Core::TimeConsts
160 def_delegators :@server, :add_http2_port
162 # Default thread pool size is 30
163 DEFAULT_POOL_SIZE = 30
165 # Deprecated due to internal changes to the thread pool
166 DEFAULT_MAX_WAITING_REQUESTS = 20
168 # Default poll period is 1s
169 DEFAULT_POLL_PERIOD = 1
171 # Signal check period is 0.25s
172 SIGNAL_CHECK_PERIOD = 0.25
174 # setup_connect_md_proc is used by #initialize to validate the
176 def self.setup_connect_md_proc(a_proc)
177 return nil if a_proc.nil?
178 fail(TypeError, '!Proc') unless a_proc.is_a? Proc
182 # Creates a new RpcServer.
184 # The RPC server is configured using keyword arguments.
186 # There are some specific keyword args used to configure the RpcServer
189 # * pool_size: the size of the thread pool the server uses to run its
190 # threads. No more concurrent requests can be made than the size
193 # * max_waiting_requests: Deprecated due to internal changes to the thread
194 # pool. This is still an argument for compatibility but is ignored.
196 # * poll_period: The amount of time in seconds to wait for
197 # currently-serviced RPC's to finish before cancelling them when shutting
200 # * pool_keep_alive: The amount of time in seconds to wait
201 # for currently busy thread-pool threads to finish before
202 # forcing an abrupt exit to each thread.
205 # when non-nil, is a proc for determining metadata to send back to the client
206 # on receiving an invocation req. The proc signature is:
207 # {key: val, ..} func(method_name, {key: val, ...})
210 # A server arguments hash to be passed down to the underlying core server
213 # An array of GRPC::ServerInterceptor objects that will be used for
214 # intercepting server handlers to provide extra functionality.
215 # Interceptors are an EXPERIMENTAL API.
217 def initialize(pool_size: DEFAULT_POOL_SIZE,
218 max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS,
219 poll_period: DEFAULT_POLL_PERIOD,
220 pool_keep_alive: Pool::DEFAULT_KEEP_ALIVE,
221 connect_md_proc: nil,
224 @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
225 @max_waiting_requests = max_waiting_requests
226 @poll_period = poll_period
227 @pool_size = pool_size
228 @pool = Pool.new(@pool_size, keep_alive: pool_keep_alive)
229 @run_cond = ConditionVariable.new
230 @run_mutex = Mutex.new
231 # running_state can take 4 values: :not_started, :running, :stopping, and
232 # :stopped. State transitions can only proceed in that order.
233 @running_state = :not_started
234 @server = Core::Server.new(server_args)
235 @interceptors = InterceptorRegistry.new(interceptors)
238 # stops a running server
240 # the call has no impact if the server is already stopped, otherwise
241 # server's current call loop is its last.
243 # if called via run_till_terminated_or_interrupted,
244 # signal stop_server_thread and don't do anything
245 if @stop_server.nil? == false && @stop_server == false
247 @stop_server_cv.broadcast
250 @run_mutex.synchronize do
251 fail 'Cannot stop before starting' if @running_state == :not_started
252 return if @running_state != :running
253 transition_running_state(:stopping)
254 deadline = from_relative_time(@poll_period)
255 @server.shutdown_and_notify(deadline)
261 @run_mutex.synchronize do
262 return @running_state
# Moves @running_state to target_state if that is the next step in the
# server lifecycle.
#
# The lifecycle is strictly linear:
#   :not_started -> :running -> :stopping -> :stopped
# Any other move, including staying in the same state, is rejected.
#
# Can only be called while holding @run_mutex.
#
# @param target_state [Symbol] the state to move to
# @return [Symbol] target_state, once it has been stored
# @raise [RuntimeError] if target_state does not directly follow the current
#   state; @running_state is left unchanged in that case
def transition_running_state(target_state)
  next_state = {
    not_started: :running,
    running: :stopping,
    stopping: :stopped
  }
  unless next_state[@running_state] == target_state
    fail "Bad server state transition: #{@running_state}->#{target_state}"
  end
  @running_state = target_state
end
281 running_state == :running
285 running_state == :stopped
# Called from other threads to wait for #run to start up the server.
#
# While the server is still :not_started this blocks on @run_cond until it is
# signalled or timeout seconds elapse (with a nil timeout it waits until
# signalled). In any other state it returns without waiting.
#
# @param timeout [Numeric, nil] maximum number of seconds to wait
# @return [true, false] true if the server is running, false otherwise
def wait_till_running(timeout = nil)
  @run_mutex.synchronize do
    @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started
    @running_state == :running
  end
end
301 # handle registration of classes
303 # service is either a class that includes GRPC::GenericService and whose
304 # #new function can be called without argument or any instance of such a
310 # include GRPC::GenericService
311 # rpc :div, DivArgs, DivReply # single request, single response
312 # def initialize(optional_arg='default option') # no args
316 # srv = GRPC::RpcServer.new(...)
318 # # Either of these works
320 # srv.handle(Divider)
324 # srv.handle(Divider.new('replace optional arg'))
326 # It raises RuntimeError:
327 # - if service is not a valid service class or object
328 # - if its handler methods are already registered
329 # - if the server is already running
331 # @param service [Object|Class] a service class or object as described
334 @run_mutex.synchronize do
335 unless @running_state == :not_started
336 fail 'cannot add services if the server has been started'
338 cls = service.is_a?(Class) ? service : service.class
339 assert_valid_service_class(cls)
340 add_rpc_descs_for(service)
346 # - if no rpc_descs are registered, this exits immediately, otherwise it
347 # continues running permanently and does not return until program exit.
349 # - #running? returns true after this is called, until #stop causes the
350 # server to stop.
352 @run_mutex.synchronize do
353 fail 'cannot run without registering services' if rpc_descs.size.zero?
356 transition_running_state(:running)
359 loop_handle_server_calls
362 alias_method :run_till_terminated, :run
364 # runs the server with signal handlers
366 # List of String, Integer or both representing signals that the user
367 # would like to send to the server for graceful shutdown
368 # @param wait_interval (optional)
369 # Integer seconds that user would like stop_server_thread to poll
371 def run_till_terminated_or_interrupted(signals, wait_interval = 60)
373 @stop_server_mu = Mutex.new
374 @stop_server_cv = ConditionVariable.new
376 @stop_server_thread = Thread.new do
378 break if @stop_server
379 @stop_server_mu.synchronize do
380 @stop_server_cv.wait(@stop_server_mu, wait_interval)
384 # stop is surrounded by mutex, should handle multiple calls to stop
389 valid_signals = Signal.list
391 # register signal handlers
392 signals.each do |sig|
394 if sig.class == String
396 if sig.start_with?('SIG')
397 # cut out the SIG prefix to see if valid signal
402 # register signal traps for all valid signals
403 if valid_signals.value?(sig) || valid_signals.key?(sig)
406 @stop_server_cv.broadcast
409 fail "#{sig} not a valid signal"
415 @stop_server_thread.join
418 # Sends RESOURCE_EXHAUSTED if no worker thread is free to take the call
419 def available?(an_rpc)
420 return an_rpc if @pool.ready_for_work?
421 GRPC.logger.warn('no free worker threads currently')
422 noop = proc { |x| x }
424 # Create a new active call that knows that metadata hasn't been
426 c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
427 metadata_received: true, started: false)
428 c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED,
429 'No free threads in thread pool')
433 # Sends UNIMPLEMENTED if the method is not implemented by this server
434 def implemented?(an_rpc)
435 mth = an_rpc.method.to_sym
436 return an_rpc if rpc_descs.key?(mth)
437 GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
438 noop = proc { |x| x }
440 # Create a new active call that knows that
441 # metadata hasn't been sent yet
442 c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
443 metadata_received: true, started: false)
444 c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
448 # handles calls to the server
449 def loop_handle_server_calls
450 fail 'not started' if running_state == :not_started
451 while running_state == :running
453 an_rpc = @server.request_call
454 break if (!an_rpc.nil?) && an_rpc.call.nil?
455 active_call = new_active_server_call(an_rpc)
456 unless active_call.nil?
457 @pool.schedule(active_call) do |ac|
460 rpc_descs[mth].run_server_method(
463 @interceptors.build_context
466 c.send_status(GRPC::Core::StatusCodes::INTERNAL,
467 'Server handler failed')
471 rescue Core::CallError, RuntimeError => e
472 # these might happen for various reasons. The correct behavior of
473 # the server is to log them and continue, if it's not shutting down.
474 if running_state == :running
475 GRPC.logger.warn("server call failed: #{e}")
480 # @running_state should be :stopping here
481 @run_mutex.synchronize do
482 transition_running_state(:stopped)
483 GRPC.logger.info("stopped: #{self}")
488 def new_active_server_call(an_rpc)
489 return nil if an_rpc.nil? || an_rpc.call.nil?
491 # allow the metadata to be accessed from the call
492 an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers
494 unless @connect_md_proc.nil?
495 connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
498 return nil unless available?(an_rpc)
499 return nil unless implemented?(an_rpc)
501 # Create the ActiveCall. Indicate that metadata hasn't been sent yet.
502 GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
503 rpc_desc = rpc_descs[an_rpc.method.to_sym]
504 c = ActiveCall.new(an_rpc.call,
505 rpc_desc.marshal_proc,
506 rpc_desc.unmarshal_proc(:input),
508 metadata_received: true,
510 metadata_to_send: connect_md)
511 c.attach_peer_cert(an_rpc.call.peer_cert)
512 mth = an_rpc.method.to_sym
# Checks that cls is usable as a service class.
#
# @param cls [Class] the service class to validate
# @return [nil] when cls passes both checks
# @raise [RuntimeError] if cls does not include GenericService, or if its
#   rpc_descs collection is empty
def assert_valid_service_class(cls)
  unless cls.include?(GenericService)
    fail "#{cls} must 'include GenericService'"
  end
  fail "#{cls} should specify some rpc descriptions" if cls.rpc_descs.empty?
end
534 # This should be called while holding @run_mutex
535 def add_rpc_descs_for(service)
536 cls = service.is_a?(Class) ? service : service.class
537 specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {})
538 cls.rpc_descs.each_pair do |name, spec|
539 route = "/#{cls.service_name}/#{name}".to_sym
540 fail "already registered: rpc #{route} from #{spec}" if specs.key? route
542 rpc_name = GenericService.underscore(name.to_s).to_sym
543 if service.is_a?(Class)
544 handlers[route] = cls.new.method(rpc_name)
546 handlers[route] = service.method(rpc_name)
548 GRPC.logger.info("handling #{route} with #{handlers[route]}")