1 # Copyright 2017 gRPC authors.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """An example gRPC Python-using client-side application."""
22 from tests.unit.framework.common import test_constants
24 from tests.testing.proto import requests_pb2
25 from tests.testing.proto import services_pb2
26 from tests.testing.proto import services_pb2_grpc
28 from tests.testing import _application_common
class Scenario(enum.Enum):
    """Client application scenarios that may be passed to run().

    Each member's value is a short human-readable name for the scenario.
    """
    UNARY_UNARY = 'unary unary'
    UNARY_STREAM = 'unary stream'
    STREAM_UNARY = 'stream unary'
    STREAM_STREAM = 'stream stream'
    CONCURRENT_STREAM_UNARY = 'concurrent stream unary'
    CONCURRENT_STREAM_STREAM = 'concurrent stream stream'
    CANCEL_UNARY_UNARY = 'cancel unary unary'
    # NOTE(review): no implementation for this member appears among the
    # scenario-to-function entries visible later in this file -- confirm.
    CANCEL_UNARY_STREAM = 'cancel unary stream'
    INFINITE_REQUEST_STREAM = 'infinite request stream'
class Outcome(collections.namedtuple('Outcome', ('kind', 'code', 'details'))):
    """Outcome of a client application scenario.

    Attributes:
      kind: A Kind value describing the overall kind of scenario execution.
      code: A grpc.StatusCode value. Only valid if kind is Kind.RPC_ERROR.
      details: A status details string. Only valid if kind is Kind.RPC_ERROR.
    """

    class Kind(enum.Enum):
        """Overall kind of a scenario execution, stored in Outcome.kind."""
        SATISFACTORY = 'satisfactory'
        UNSATISFACTORY = 'unsatisfactory'
        RPC_ERROR = 'rpc error'
# Shared outcomes returned by the scenario functions. The code and details
# fields are only meaningful for Kind.RPC_ERROR, so both are None here.
_SATISFACTORY_OUTCOME = Outcome(
    kind=Outcome.Kind.SATISFACTORY, code=None, details=None)
_UNSATISFACTORY_OUTCOME = Outcome(
    kind=Outcome.Kind.UNSATISFACTORY, code=None, details=None)
67 self._condition = threading.Condition()
78 return self._values.pop(0)
82 self._condition.wait()
84 def __next__(self): # (Python 3 Iterator Protocol)
87 def next(self): # (Python 2 Iterator Protocol)
92 self._values.append(value)
93 self._condition.notify_all()
98 self._condition.notify_all()
def _run_unary_unary(stub):
    """Invoke UnUn once and compare its response with the expected one.

    Args:
      stub: The service stub whose UnUn method is invoked.

    Returns:
      _SATISFACTORY_OUTCOME if _application_common.UNARY_UNARY_RESPONSE equals
        the received response, _UNSATISFACTORY_OUTCOME otherwise.
    """
    received = stub.UnUn(_application_common.UNARY_UNARY_REQUEST)
    matched = _application_common.UNARY_UNARY_RESPONSE == received
    return _SATISFACTORY_OUTCOME if matched else _UNSATISFACTORY_OUTCOME
def _run_unary_stream(stub):
    """Invoke UnStre and require that its response stream yields nothing.

    Args:
      stub: The service stub whose UnStre method is invoked.

    Returns:
      _SATISFACTORY_OUTCOME if the response iterator is exhausted on the first
        read, _UNSATISFACTORY_OUTCOME if it produces a response.
    """
    exhausted = object()
    responses = stub.UnStre(_application_common.UNARY_STREAM_REQUEST)
    # The default is returned only when the iterator raises StopIteration; any
    # other exception raised while reading still propagates to the caller.
    if next(responses, exhausted) is exhausted:
        return _SATISFACTORY_OUTCOME
    return _UNSATISFACTORY_OUTCOME
def _run_stream_unary(stub):
    """Send three requests through StreUn and check response and status code.

    Args:
      stub: The service stub whose StreUn method is invoked via with_call.

    Returns:
      _SATISFACTORY_OUTCOME if the response equals
        _application_common.STREAM_UNARY_RESPONSE and the call's code is
        grpc.StatusCode.OK, _UNSATISFACTORY_OUTCOME otherwise.
    """
    requests = (_application_common.STREAM_UNARY_REQUEST,) * 3
    response, call = stub.StreUn.with_call(iter(requests))
    response_matches = _application_common.STREAM_UNARY_RESPONSE == response
    # The status code is only consulted once the response itself has matched.
    if response_matches and call.code() is grpc.StatusCode.OK:
        return _SATISFACTORY_OUTCOME
    return _UNSATISFACTORY_OUTCOME
def _run_stream_stream(stub):
    """Exchange two requests over StreStre and check the paired responses.

    One request is added to a _Pipe and two responses are read; this is done
    twice, after which the response iterator is read once more to see whether
    it is exhausted.

    Args:
      stub: The service stub whose StreStre method is invoked.

    Returns:
      _SATISFACTORY_OUTCOME if both response pairs equal
        _application_common.TWO_STREAM_STREAM_RESPONSES and no further response
        arrives, _UNSATISFACTORY_OUTCOME otherwise.
    """
    request_pipe = _Pipe()
    response_iterator = stub.StreStre(iter(request_pipe))
    request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
    first_responses = next(response_iterator), next(response_iterator)
    request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
    second_responses = next(response_iterator), next(response_iterator)
    # NOTE(review): nothing visible in this function signals the end of the
    # request stream before the extra read below -- confirm that no statement
    # is missing at this point.
    try:
        next(response_iterator)
    except StopIteration:
        unexpected_extra_response = False
    else:
        unexpected_extra_response = True
    if (first_responses == _application_common.TWO_STREAM_STREAM_RESPONSES and
            second_responses == _application_common.TWO_STREAM_STREAM_RESPONSES
            and not unexpected_extra_response):
        return _SATISFACTORY_OUTCOME
    else:
        return _UNSATISFACTORY_OUTCOME
def _run_concurrent_stream_unary(stub):
    """Start several StreUn calls as futures and check each one's result.

    Args:
      stub: The service stub whose StreUn method is invoked via future.

    Returns:
      _UNSATISFACTORY_OUTCOME as soon as a call's code is not
        grpc.StatusCode.OK or its result differs from
        _application_common.STREAM_UNARY_RESPONSE; _SATISFACTORY_OUTCOME if
        every call passes both checks.
    """
    # All calls are started before any of them is inspected.
    # NOTE(review): the repeat count of 3 is assumed, mirroring the request
    # sequence built in _run_stream_unary -- confirm.
    future_calls = tuple(
        stub.StreUn.future(iter((_application_common.STREAM_UNARY_REQUEST,) *
                                3))
        for _ in range(test_constants.THREAD_CONCURRENCY))
    for future_call in future_calls:
        if future_call.code() is grpc.StatusCode.OK:
            response = future_call.result()
            if _application_common.STREAM_UNARY_RESPONSE != response:
                return _UNSATISFACTORY_OUTCOME
        else:
            return _UNSATISFACTORY_OUTCOME
    else:
        return _SATISFACTORY_OUTCOME
167 def _run_concurrent_stream_stream(stub):
168 condition = threading.Condition()
169 outcomes = [None] * test_constants.RPC_CONCURRENCY
171 def run_stream_stream(index):
172 outcome = _run_stream_stream(stub)
174 outcomes[index] = outcome
177 for index in range(test_constants.RPC_CONCURRENCY):
178 thread = threading.Thread(target=run_stream_stream, args=(index,))
183 for outcome in outcomes:
184 if outcome.kind is not Outcome.Kind.SATISFACTORY:
185 return _UNSATISFACTORY_OUTCOME
187 return _SATISFACTORY_OUTCOME
def _run_cancel_unary_unary(stub):
    """Start UnUn as a future, read its initial metadata, then cancel it.

    Args:
      stub: The service stub whose UnUn method is invoked via future.

    Returns:
      _SATISFACTORY_OUTCOME if the initial metadata is not None and cancel()
        reported success, _UNSATISFACTORY_OUTCOME otherwise.
    """
    future_call = stub.UnUn.future(_application_common.UNARY_UNARY_REQUEST)
    # Metadata is fetched before cancelling, in that order.
    metadata = future_call.initial_metadata()
    was_cancelled = future_call.cancel()
    if metadata is None or not was_cancelled:
        return _UNSATISFACTORY_OUTCOME
    return _SATISFACTORY_OUTCOME
def _run_infinite_request_stream(stub):
    """Feed StreUn a never-ending request stream and expect a deadline error.

    Args:
      stub: The service stub whose StreUn method is invoked via future.

    Returns:
      _SATISFACTORY_OUTCOME if the call's code is
        grpc.StatusCode.DEADLINE_EXCEEDED, _UNSATISFACTORY_OUTCOME otherwise.
    """

    def endless_requests():
        # Never terminates on its own; the call's timeout is what ends it.
        while True:
            yield _application_common.STREAM_UNARY_REQUEST

    future_call = stub.StreUn.future(
        endless_requests(),
        timeout=_application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
    if future_call.code() is not grpc.StatusCode.DEADLINE_EXCEEDED:
        return _UNSATISFACTORY_OUTCOME
    return _SATISFACTORY_OUTCOME
# Maps each supported Scenario to the function that runs it against a stub.
# NOTE(review): Scenario.CANCEL_UNARY_STREAM has no entry here, so indexing
# this mapping with it raises KeyError -- confirm that is intended.
_IMPLEMENTATIONS = {
    Scenario.UNARY_UNARY: _run_unary_unary,
    Scenario.UNARY_STREAM: _run_unary_stream,
    Scenario.STREAM_UNARY: _run_stream_unary,
    Scenario.STREAM_STREAM: _run_stream_stream,
    Scenario.CONCURRENT_STREAM_UNARY: _run_concurrent_stream_unary,
    Scenario.CONCURRENT_STREAM_STREAM: _run_concurrent_stream_stream,
    Scenario.CANCEL_UNARY_UNARY: _run_cancel_unary_unary,
    Scenario.INFINITE_REQUEST_STREAM: _run_infinite_request_stream,
}
230 def run(scenario, channel):
231 stub = services_pb2_grpc.FirstServiceStub(channel)
233 return _IMPLEMENTATIONS[scenario](stub)
234 except grpc.RpcError as rpc_error:
235 return Outcome(Outcome.Kind.RPC_ERROR, rpc_error.code(),