# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defines a number of module-scope gRPC scenarios to test clean exit."""
import argparse
import logging
import threading
import time

import grpc

from tests.unit.framework.common import test_constants
# Seconds a handler or the main thread sleeps to keep a scenario alive.
# NOTE(review): WAIT_TIME is referenced by the __main__ block but its
# definition is not visible in the reviewed source; 1000 is assumed -- confirm.
WAIT_TIME = 1000

# Payload sent on every request.
# NOTE(review): REQUEST is referenced by the __main__ block but its definition
# is not visible in the reviewed source; the value is assumed -- confirm.
REQUEST = b'request'

# Scenario names accepted as the positional `scenario` command-line argument.
UNSTARTED_SERVER = 'unstarted_server'
RUNNING_SERVER = 'running_server'
POLL_CONNECTIVITY_NO_SERVER = 'poll_connectivity_no_server'
POLL_CONNECTIVITY = 'poll_connectivity'
IN_FLIGHT_UNARY_UNARY_CALL = 'in_flight_unary_unary_call'
IN_FLIGHT_UNARY_STREAM_CALL = 'in_flight_unary_stream_call'
IN_FLIGHT_STREAM_UNARY_CALL = 'in_flight_stream_unary_call'
IN_FLIGHT_STREAM_STREAM_CALL = 'in_flight_stream_stream_call'
IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL = 'in_flight_partial_unary_stream_call'
IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL = 'in_flight_partial_stream_unary_call'
IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL = 'in_flight_partial_stream_stream_call'

# Fully-qualified RPC method paths served by the generic handler.
UNARY_UNARY = b'/test/UnaryUnary'
UNARY_STREAM = b'/test/UnaryStream'
STREAM_UNARY = b'/test/StreamUnary'
STREAM_STREAM = b'/test/StreamStream'
PARTIAL_UNARY_STREAM = b'/test/PartialUnaryStream'
PARTIAL_STREAM_UNARY = b'/test/PartialStreamUnary'
PARTIAL_STREAM_STREAM = b'/test/PartialStreamStream'

# Maps each in-flight scenario name to the RPC method path it invokes.
# Scenarios that make no call (server-only and connectivity polling) are
# intentionally absent.
TEST_TO_METHOD = {
    IN_FLIGHT_UNARY_UNARY_CALL: UNARY_UNARY,
    IN_FLIGHT_UNARY_STREAM_CALL: UNARY_STREAM,
    IN_FLIGHT_STREAM_UNARY_CALL: STREAM_UNARY,
    IN_FLIGHT_STREAM_STREAM_CALL: STREAM_STREAM,
    IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL: PARTIAL_UNARY_STREAM,
    IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL: PARTIAL_STREAM_UNARY,
    IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL: PARTIAL_STREAM_STREAM,
}
def hang_unary_unary(request, servicer_context):
    """Unary-unary behavior that blocks instead of producing a response.

    Args:
      request: The request message (unused).
      servicer_context: The servicer context of the RPC (unused).
    """
    # NOTE(review): the body is absent from the reviewed source; sleeping for
    # WAIT_TIME mirrors how the __main__ block keeps scenarios alive -- confirm.
    time.sleep(WAIT_TIME)
def hang_unary_stream(request, servicer_context):
    """Unary-stream behavior that blocks before producing any response.

    Args:
      request: The request message (unused).
      servicer_context: The servicer context of the RPC (unused).
    """
    # NOTE(review): the body is absent from the reviewed source; sleeping for
    # WAIT_TIME mirrors how the __main__ block keeps scenarios alive -- confirm.
    time.sleep(WAIT_TIME)
def hang_partial_unary_stream(request, servicer_context):
    """Unary-stream behavior that emits half a stream and then blocks.

    Args:
      request: The request message.
      servicer_context: The servicer context of the RPC (unused).

    Yields:
      One value per iteration, test_constants.STREAM_LENGTH // 2 times.
    """
    for _ in range(test_constants.STREAM_LENGTH // 2):
        # NOTE(review): the loop body is absent from the reviewed source;
        # echoing the request back is assumed -- confirm.
        yield request
    # NOTE(review): trailing hang reconstructed from the function name --
    # confirm.
    time.sleep(WAIT_TIME)
def hang_stream_unary(request_iterator, servicer_context):
    """Stream-unary behavior that blocks without consuming any request.

    Args:
      request_iterator: Iterator over the request messages (unused).
      servicer_context: The servicer context of the RPC (unused).
    """
    # NOTE(review): the body is absent from the reviewed source; sleeping for
    # WAIT_TIME mirrors how the __main__ block keeps scenarios alive -- confirm.
    time.sleep(WAIT_TIME)
def hang_partial_stream_unary(request_iterator, servicer_context):
    """Stream-unary behavior that reads half the requests and then blocks.

    Args:
      request_iterator: Iterator over the request messages; the first
        test_constants.STREAM_LENGTH // 2 items are consumed and discarded.
      servicer_context: The servicer context of the RPC (unused).
    """
    for _ in range(test_constants.STREAM_LENGTH // 2):
        next(request_iterator)
    # NOTE(review): trailing hang reconstructed from the function name --
    # confirm.
    time.sleep(WAIT_TIME)
def hang_stream_stream(request_iterator, servicer_context):
    """Stream-stream behavior that blocks without reading or responding.

    Args:
      request_iterator: Iterator over the request messages (unused).
      servicer_context: The servicer context of the RPC (unused).
    """
    # NOTE(review): the body is absent from the reviewed source; sleeping for
    # WAIT_TIME mirrors how the __main__ block keeps scenarios alive -- confirm.
    time.sleep(WAIT_TIME)
def hang_partial_stream_stream(request_iterator, servicer_context):
    """Stream-stream behavior that echoes half a stream and then blocks.

    Args:
      request_iterator: Iterator over the request messages.
      servicer_context: The servicer context of the RPC (unused).

    Yields:
      Each of the first test_constants.STREAM_LENGTH // 2 requests, in order.
    """
    for _ in range(test_constants.STREAM_LENGTH // 2):
        yield next(request_iterator)  #pylint: disable=stop-iteration-return
    # NOTE(review): trailing hang reconstructed from the function name --
    # confirm.
    time.sleep(WAIT_TIME)
class MethodHandler(grpc.RpcMethodHandler):
    """Method handler that exposes exactly one hanging behavior.

    The behavior is chosen from the two streaming flags; the remaining three
    behavior attributes stay None.
    """

    def __init__(self, request_streaming, response_streaming, partial_hang):
        """Selects the hanging behavior for one RPC arity.

        Args:
          request_streaming: Whether the RPC takes a stream of requests.
          response_streaming: Whether the RPC returns a stream of responses.
          partial_hang: Whether to use the "partial" variant of a streaming
            behavior. Ignored for unary-unary, which has no partial variant.
        """
        self.request_streaming = request_streaming
        self.response_streaming = response_streaming
        self.request_deserializer = None
        self.response_serializer = None
        self.unary_unary = None
        self.unary_stream = None
        self.stream_unary = None
        self.stream_stream = None
        # NOTE(review): the lines selecting between each pair of assignments
        # are absent from the reviewed source; branching on partial_hang is
        # the only reading that uses the parameter -- confirm.
        if self.request_streaming and self.response_streaming:
            if partial_hang:
                self.stream_stream = hang_partial_stream_stream
            else:
                self.stream_stream = hang_stream_stream
        elif self.request_streaming:
            if partial_hang:
                self.stream_unary = hang_partial_stream_unary
            else:
                self.stream_unary = hang_stream_unary
        elif self.response_streaming:
            if partial_hang:
                self.unary_stream = hang_partial_unary_stream
            else:
                self.unary_stream = hang_unary_stream
        else:
            self.unary_unary = hang_unary_unary
class GenericHandler(grpc.GenericRpcHandler):
    """Generic RPC handler that routes the test method paths."""

    # Method path -> (request_streaming, response_streaming, partial_hang),
    # i.e. the positional arguments passed to MethodHandler.
    _HANDLER_ARGS = {
        UNARY_UNARY: (False, False, False),
        UNARY_STREAM: (False, True, False),
        STREAM_UNARY: (True, False, False),
        STREAM_STREAM: (True, True, False),
        PARTIAL_UNARY_STREAM: (False, True, True),
        PARTIAL_STREAM_UNARY: (True, False, True),
        PARTIAL_STREAM_STREAM: (True, True, True),
    }

    def service(self, handler_call_details):
        """Returns a handler for the requested method, if it is known.

        Args:
          handler_call_details: Call details; only its `method` attribute is
            read.

        Returns:
          A new MethodHandler for a recognized method path, otherwise None.
        """
        handler_args = self._HANDLER_ARGS.get(handler_call_details.method)
        if handler_args is None:
            return None
        return MethodHandler(*handler_args)
# Traditional executors will not exit until all their
# current jobs complete.  Because we submit jobs that will
# never finish, we don't want to block exit on these jobs.
class DaemonPool(object):
    """Minimal executor stand-in that runs every job on its own daemon thread.

    Only `submit` and `shutdown` are provided.
    """

    def submit(self, fn, *args, **kwargs):
        """Runs fn(*args, **kwargs) on a freshly started daemon thread.

        Args:
          fn: The callable to run.
          *args: Positional arguments forwarded to fn.
          **kwargs: Keyword arguments forwarded to fn.
        """
        thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
        # NOTE(review): the two statements below are absent from the reviewed
        # source; they follow from the class name and the comment above the
        # class -- confirm.
        thread.daemon = True
        thread.start()

    def shutdown(self, wait=True):
        """Does nothing; threads are never tracked or joined.

        Args:
          wait: Accepted and ignored.
        """
        # NOTE(review): the body is absent from the reviewed source; a no-op
        # is assumed -- confirm.
        pass
def infinite_request_iterator():
    """Generates an endless stream of request messages.

    Yields:
      REQUEST, repeatedly and without end.
    """
    # NOTE(review): the body is absent from the reviewed source; reconstructed
    # from the function name and its use as a request stream -- confirm.
    while True:
        yield REQUEST
# Script entry point: sets up the gRPC state named by the `scenario` argument
# and then either falls off the end of the script or, with
# --wait_for_interrupt, sleeps so the parent test can interrupt the process.
# NOTE(review): several statements below (action='store_true', the
# server.start() calls, the bodies of the connectivity callbacks and of the
# response loops, and the final `else:`) are absent from the reviewed source
# and were reconstructed from context -- confirm each.
if __name__ == '__main__':
    logging.basicConfig()
    parser = argparse.ArgumentParser()
    parser.add_argument('scenario', type=str)
    parser.add_argument('--wait_for_interrupt',
                        dest='wait_for_interrupt',
                        action='store_true')
    args = parser.parse_args()

    if args.scenario == UNSTARTED_SERVER:
        # A server that is created but never started.
        server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
        if args.wait_for_interrupt:
            time.sleep(WAIT_TIME)
    elif args.scenario == RUNNING_SERVER:
        server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.start()
        if args.wait_for_interrupt:
            time.sleep(WAIT_TIME)
    elif args.scenario == POLL_CONNECTIVITY_NO_SERVER:
        # Nothing is started on this port by the script itself.
        channel = grpc.insecure_channel('localhost:12345')

        def connectivity_callback(connectivity):
            pass

        channel.subscribe(connectivity_callback, try_to_connect=True)
        if args.wait_for_interrupt:
            time.sleep(WAIT_TIME)
    elif args.scenario == POLL_CONNECTIVITY:
        server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.start()
        channel = grpc.insecure_channel('localhost:%d' % port)

        def connectivity_callback(connectivity):
            pass

        channel.subscribe(connectivity_callback, try_to_connect=True)
        if args.wait_for_interrupt:
            time.sleep(WAIT_TIME)
    else:
        # Every remaining scenario issues an RPC against GenericHandler.
        handler = GenericHandler()
        server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.add_generic_rpc_handlers((handler,))
        server.start()
        channel = grpc.insecure_channel('localhost:%d' % port)

        # Raises KeyError for a scenario name that is not an in-flight one.
        method = TEST_TO_METHOD[args.scenario]

        if args.scenario == IN_FLIGHT_UNARY_UNARY_CALL:
            multi_callable = channel.unary_unary(method)
            # One call is started through .future(), then a second, blocking
            # call is made with .with_call().
            future = multi_callable.future(REQUEST)
            result, call = multi_callable.with_call(REQUEST)
        elif (args.scenario == IN_FLIGHT_UNARY_STREAM_CALL or
              args.scenario == IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL):
            multi_callable = channel.unary_stream(method)
            response_iterator = multi_callable(REQUEST)
            for response in response_iterator:
                pass
        elif (args.scenario == IN_FLIGHT_STREAM_UNARY_CALL or
              args.scenario == IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL):
            multi_callable = channel.stream_unary(method)
            future = multi_callable.future(infinite_request_iterator())
            result, call = multi_callable.with_call(
                iter([REQUEST] * test_constants.STREAM_LENGTH))
        elif (args.scenario == IN_FLIGHT_STREAM_STREAM_CALL or
              args.scenario == IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL):
            multi_callable = channel.stream_stream(method)
            response_iterator = multi_callable(infinite_request_iterator())
            for response in response_iterator:
                pass