Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / src / python / grpcio_tests / tests / unit / _exit_scenarios.py
1 # Copyright 2016 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Defines a number of module-scope gRPC scenarios to test clean exit."""
15
16 import argparse
17 import threading
18 import time
19 import logging
20
21 import grpc
22
23 from tests.unit.framework.common import test_constants
24
25 WAIT_TIME = 1000
26
27 REQUEST = b'request'
28
29 UNSTARTED_SERVER = 'unstarted_server'
30 RUNNING_SERVER = 'running_server'
31 POLL_CONNECTIVITY_NO_SERVER = 'poll_connectivity_no_server'
32 POLL_CONNECTIVITY = 'poll_connectivity'
33 IN_FLIGHT_UNARY_UNARY_CALL = 'in_flight_unary_unary_call'
34 IN_FLIGHT_UNARY_STREAM_CALL = 'in_flight_unary_stream_call'
35 IN_FLIGHT_STREAM_UNARY_CALL = 'in_flight_stream_unary_call'
36 IN_FLIGHT_STREAM_STREAM_CALL = 'in_flight_stream_stream_call'
37 IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL = 'in_flight_partial_unary_stream_call'
38 IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL = 'in_flight_partial_stream_unary_call'
39 IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL = 'in_flight_partial_stream_stream_call'
40
41 UNARY_UNARY = b'/test/UnaryUnary'
42 UNARY_STREAM = b'/test/UnaryStream'
43 STREAM_UNARY = b'/test/StreamUnary'
44 STREAM_STREAM = b'/test/StreamStream'
45 PARTIAL_UNARY_STREAM = b'/test/PartialUnaryStream'
46 PARTIAL_STREAM_UNARY = b'/test/PartialStreamUnary'
47 PARTIAL_STREAM_STREAM = b'/test/PartialStreamStream'
48
49 TEST_TO_METHOD = {
50     IN_FLIGHT_UNARY_UNARY_CALL: UNARY_UNARY,
51     IN_FLIGHT_UNARY_STREAM_CALL: UNARY_STREAM,
52     IN_FLIGHT_STREAM_UNARY_CALL: STREAM_UNARY,
53     IN_FLIGHT_STREAM_STREAM_CALL: STREAM_STREAM,
54     IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL: PARTIAL_UNARY_STREAM,
55     IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL: PARTIAL_STREAM_UNARY,
56     IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL: PARTIAL_STREAM_STREAM,
57 }
58
59
60 def hang_unary_unary(request, servicer_context):
61     time.sleep(WAIT_TIME)
62
63
64 def hang_unary_stream(request, servicer_context):
65     time.sleep(WAIT_TIME)
66
67
68 def hang_partial_unary_stream(request, servicer_context):
69     for _ in range(test_constants.STREAM_LENGTH // 2):
70         yield request
71     time.sleep(WAIT_TIME)
72
73
74 def hang_stream_unary(request_iterator, servicer_context):
75     time.sleep(WAIT_TIME)
76
77
78 def hang_partial_stream_unary(request_iterator, servicer_context):
79     for _ in range(test_constants.STREAM_LENGTH // 2):
80         next(request_iterator)
81     time.sleep(WAIT_TIME)
82
83
84 def hang_stream_stream(request_iterator, servicer_context):
85     time.sleep(WAIT_TIME)
86
87
88 def hang_partial_stream_stream(request_iterator, servicer_context):
89     for _ in range(test_constants.STREAM_LENGTH // 2):
90         yield next(request_iterator)  #pylint: disable=stop-iteration-return
91     time.sleep(WAIT_TIME)
92
93
94 class MethodHandler(grpc.RpcMethodHandler):
95
96     def __init__(self, request_streaming, response_streaming, partial_hang):
97         self.request_streaming = request_streaming
98         self.response_streaming = response_streaming
99         self.request_deserializer = None
100         self.response_serializer = None
101         self.unary_unary = None
102         self.unary_stream = None
103         self.stream_unary = None
104         self.stream_stream = None
105         if self.request_streaming and self.response_streaming:
106             if partial_hang:
107                 self.stream_stream = hang_partial_stream_stream
108             else:
109                 self.stream_stream = hang_stream_stream
110         elif self.request_streaming:
111             if partial_hang:
112                 self.stream_unary = hang_partial_stream_unary
113             else:
114                 self.stream_unary = hang_stream_unary
115         elif self.response_streaming:
116             if partial_hang:
117                 self.unary_stream = hang_partial_unary_stream
118             else:
119                 self.unary_stream = hang_unary_stream
120         else:
121             self.unary_unary = hang_unary_unary
122
123
124 class GenericHandler(grpc.GenericRpcHandler):
125
126     def service(self, handler_call_details):
127         if handler_call_details.method == UNARY_UNARY:
128             return MethodHandler(False, False, False)
129         elif handler_call_details.method == UNARY_STREAM:
130             return MethodHandler(False, True, False)
131         elif handler_call_details.method == STREAM_UNARY:
132             return MethodHandler(True, False, False)
133         elif handler_call_details.method == STREAM_STREAM:
134             return MethodHandler(True, True, False)
135         elif handler_call_details.method == PARTIAL_UNARY_STREAM:
136             return MethodHandler(False, True, True)
137         elif handler_call_details.method == PARTIAL_STREAM_UNARY:
138             return MethodHandler(True, False, True)
139         elif handler_call_details.method == PARTIAL_STREAM_STREAM:
140             return MethodHandler(True, True, True)
141         else:
142             return None
143
144
145 # Traditional executors will not exit until all their
146 # current jobs complete.  Because we submit jobs that will
147 # never finish, we don't want to block exit on these jobs.
148 class DaemonPool(object):
149
150     def submit(self, fn, *args, **kwargs):
151         thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
152         thread.daemon = True
153         thread.start()
154
155     def shutdown(self, wait=True):
156         pass
157
158
159 def infinite_request_iterator():
160     while True:
161         yield REQUEST
162
163
164 if __name__ == '__main__':
165     logging.basicConfig()
166     parser = argparse.ArgumentParser()
167     parser.add_argument('scenario', type=str)
168     parser.add_argument('--wait_for_interrupt',
169                         dest='wait_for_interrupt',
170                         action='store_true')
171     args = parser.parse_args()
172
173     if args.scenario == UNSTARTED_SERVER:
174         server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
175         if args.wait_for_interrupt:
176             time.sleep(WAIT_TIME)
177     elif args.scenario == RUNNING_SERVER:
178         server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
179         port = server.add_insecure_port('[::]:0')
180         server.start()
181         if args.wait_for_interrupt:
182             time.sleep(WAIT_TIME)
183     elif args.scenario == POLL_CONNECTIVITY_NO_SERVER:
184         channel = grpc.insecure_channel('localhost:12345')
185
186         def connectivity_callback(connectivity):
187             pass
188
189         channel.subscribe(connectivity_callback, try_to_connect=True)
190         if args.wait_for_interrupt:
191             time.sleep(WAIT_TIME)
192     elif args.scenario == POLL_CONNECTIVITY:
193         server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
194         port = server.add_insecure_port('[::]:0')
195         server.start()
196         channel = grpc.insecure_channel('localhost:%d' % port)
197
198         def connectivity_callback(connectivity):
199             pass
200
201         channel.subscribe(connectivity_callback, try_to_connect=True)
202         if args.wait_for_interrupt:
203             time.sleep(WAIT_TIME)
204
205     else:
206         handler = GenericHandler()
207         server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
208         port = server.add_insecure_port('[::]:0')
209         server.add_generic_rpc_handlers((handler,))
210         server.start()
211         channel = grpc.insecure_channel('localhost:%d' % port)
212
213         method = TEST_TO_METHOD[args.scenario]
214
215         if args.scenario == IN_FLIGHT_UNARY_UNARY_CALL:
216             multi_callable = channel.unary_unary(method)
217             future = multi_callable.future(REQUEST)
218             result, call = multi_callable.with_call(REQUEST)
219         elif (args.scenario == IN_FLIGHT_UNARY_STREAM_CALL or
220               args.scenario == IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL):
221             multi_callable = channel.unary_stream(method)
222             response_iterator = multi_callable(REQUEST)
223             for response in response_iterator:
224                 pass
225         elif (args.scenario == IN_FLIGHT_STREAM_UNARY_CALL or
226               args.scenario == IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL):
227             multi_callable = channel.stream_unary(method)
228             future = multi_callable.future(infinite_request_iterator())
229             result, call = multi_callable.with_call(
230                 iter([REQUEST] * test_constants.STREAM_LENGTH))
231         elif (args.scenario == IN_FLIGHT_STREAM_STREAM_CALL or
232               args.scenario == IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL):
233             multi_callable = channel.stream_stream(method)
234             response_iterator = multi_callable(infinite_request_iterator())
235             for response in response_iterator:
236                 pass