Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / src / python / grpcio_tests / tests / testing / _client_application.py
1 # Copyright 2017 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """An example gRPC Python-using client-side application."""
15
16 import collections
17 import enum
18 import threading
19 import time
20
21 import grpc
22 from tests.unit.framework.common import test_constants
23
24 from tests.testing.proto import requests_pb2
25 from tests.testing.proto import services_pb2
26 from tests.testing.proto import services_pb2_grpc
27
28 from tests.testing import _application_common
29
30
@enum.unique
class Scenario(enum.Enum):
    """Identifiers for the client scenarios that `run` can be asked to execute.

    Each value is a human-readable name; `enum.unique` guarantees that no two
    members share a value.
    """
    # Plain single-RPC scenarios, one per request/response arity.
    UNARY_UNARY = 'unary unary'
    UNARY_STREAM = 'unary stream'
    STREAM_UNARY = 'stream unary'
    STREAM_STREAM = 'stream stream'
    # Several RPCs in flight at the same time.
    CONCURRENT_STREAM_UNARY = 'concurrent stream unary'
    CONCURRENT_STREAM_STREAM = 'concurrent stream stream'
    # Cancellation scenarios.
    CANCEL_UNARY_UNARY = 'cancel unary unary'
    # NOTE(review): no implementation for CANCEL_UNARY_STREAM is registered in
    # _IMPLEMENTATIONS in this file, so passing it to `run` raises KeyError.
    CANCEL_UNARY_STREAM = 'cancel unary stream'
    # A request stream that never ends on its own.
    INFINITE_REQUEST_STREAM = 'infinite request stream'
42
43
class Outcome(collections.namedtuple('Outcome', 'kind code details')):
    """The result of executing one client application scenario.

    Attributes:
      kind: An Outcome.Kind value classifying how the scenario went.
      code: A grpc.StatusCode value; meaningful only when kind is
        Kind.RPC_ERROR.
      details: A status details string; meaningful only when kind is
        Kind.RPC_ERROR.
    """

    @enum.unique
    class Kind(enum.Enum):
        """Overall classification of a scenario execution."""
        SATISFACTORY = 'satisfactory'
        UNSATISFACTORY = 'unsatisfactory'
        RPC_ERROR = 'rpc error'
58
59
# Shared outcomes for scenarios that complete without raising an RPC error;
# code and details are left as None because they only apply to RPC_ERROR.
_SATISFACTORY_OUTCOME = Outcome(
    kind=Outcome.Kind.SATISFACTORY, code=None, details=None)
_UNSATISFACTORY_OUTCOME = Outcome(
    kind=Outcome.Kind.UNSATISFACTORY, code=None, details=None)
62
63
class _Pipe(object):
    """A closable FIFO of values that is also its own blocking iterator.

    Values passed to `add` are handed out in insertion order by iteration.
    Iterating blocks while the pipe is empty and still open; once the pipe has
    been closed and drained, iteration raises StopIteration. All state is
    guarded by a single condition variable.
    """

    def __init__(self):
        self._condition = threading.Condition()
        # A deque removes from the front in O(1); list.pop(0) is O(n).
        self._values = collections.deque()
        self._open = True

    def __iter__(self):
        return self

    def _next(self):
        """Returns the oldest queued value, blocking until one is available.

        Raises:
          StopIteration: If the pipe is closed and no values remain.
        """
        with self._condition:
            while True:
                if self._values:
                    return self._values.popleft()
                elif not self._open:
                    raise StopIteration()
                else:
                    # Woken by add() or close(); re-check state on wake-up.
                    self._condition.wait()

    def __next__(self):  # (Python 3 Iterator Protocol)
        return self._next()

    def next(self):  # (Python 2 Iterator Protocol)
        return self._next()

    def add(self, value):
        """Appends value to the pipe and wakes any blocked iterators."""
        with self._condition:
            self._values.append(value)
            self._condition.notify_all()

    def close(self):
        """Marks the pipe closed and wakes any blocked iterators.

        Values already queued are still delivered. Calling close more than once
        has no further effect.
        """
        with self._condition:
            self._open = False
            self._condition.notify_all()
99
100
def _run_unary_unary(stub):
    """Invokes UnUn once and checks the response against the expected one.

    Args:
      stub: The service stub whose UnUn method is invoked.

    Returns:
      _SATISFACTORY_OUTCOME if the response equals
        _application_common.UNARY_UNARY_RESPONSE, otherwise
        _UNSATISFACTORY_OUTCOME.
    """
    reply = stub.UnUn(_application_common.UNARY_UNARY_REQUEST)
    as_expected = _application_common.UNARY_UNARY_RESPONSE == reply
    return _SATISFACTORY_OUTCOME if as_expected else _UNSATISFACTORY_OUTCOME
107
108
def _run_unary_stream(stub):
    """Invokes UnStre and checks that the response stream yields nothing.

    Args:
      stub: The service stub whose UnStre method is invoked.

    Returns:
      _SATISFACTORY_OUTCOME if the response iterator is exhausted on the first
        read, otherwise _UNSATISFACTORY_OUTCOME.
    """
    responses = stub.UnStre(_application_common.UNARY_STREAM_REQUEST)
    # A private sentinel distinguishes "exhausted" from any real response.
    exhausted = object()
    if next(responses, exhausted) is exhausted:
        return _SATISFACTORY_OUTCOME
    return _UNSATISFACTORY_OUTCOME
117
118
def _run_stream_unary(stub):
    """Sends three requests to StreUn and checks the response and status code.

    Args:
      stub: The service stub whose StreUn method is invoked.

    Returns:
      _SATISFACTORY_OUTCOME if the response equals
        _application_common.STREAM_UNARY_RESPONSE and the call's code is
        grpc.StatusCode.OK, otherwise _UNSATISFACTORY_OUTCOME.
    """
    three_requests = (_application_common.STREAM_UNARY_REQUEST,) * 3
    reply, rpc = stub.StreUn.with_call(iter(three_requests))
    if not (_application_common.STREAM_UNARY_RESPONSE == reply):
        return _UNSATISFACTORY_OUTCOME
    if rpc.code() is not grpc.StatusCode.OK:
        return _UNSATISFACTORY_OUTCOME
    return _SATISFACTORY_OUTCOME
127
128
def _run_stream_stream(stub):
    """Runs one StreStre RPC, interleaving requests with reads of responses.

    Sends one request and reads two responses, twice over, then closes the
    request stream and checks that no further response arrives.

    Args:
      stub: The service stub whose StreStre method is invoked.

    Returns:
      _SATISFACTORY_OUTCOME if both response pairs equal
        _application_common.TWO_STREAM_STREAM_RESPONSES and the response stream
        then ends, otherwise _UNSATISFACTORY_OUTCOME.
    """
    request_pipe = _Pipe()
    try:
        response_iterator = stub.StreStre(iter(request_pipe))
        request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
        first_responses = next(response_iterator), next(response_iterator)
        request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
        second_responses = next(response_iterator), next(response_iterator)
        # End the request stream before probing for an extra response.
        request_pipe.close()
        try:
            next(response_iterator)
        except StopIteration:
            unexpected_extra_response = False
        else:
            unexpected_extra_response = True
    finally:
        # Iterating an open, empty _Pipe blocks until it is closed. Close it
        # even when a read above raises so that whatever is consuming the
        # request iterator is not left blocked forever. Closing twice is
        # harmless.
        request_pipe.close()
    if (first_responses == _application_common.TWO_STREAM_STREAM_RESPONSES and
            second_responses == _application_common.TWO_STREAM_STREAM_RESPONSES
            and not unexpected_extra_response):
        return _SATISFACTORY_OUTCOME
    else:
        return _UNSATISFACTORY_OUTCOME
149
150
def _run_concurrent_stream_unary(stub):
    """Starts several StreUn calls at once and checks every one of them.

    test_constants.THREAD_CONCURRENCY future calls are all started before any
    of them is inspected; each is sent three requests.

    Args:
      stub: The service stub whose StreUn method is invoked.

    Returns:
      _SATISFACTORY_OUTCOME if every call's code is grpc.StatusCode.OK and
        every result equals _application_common.STREAM_UNARY_RESPONSE,
        otherwise _UNSATISFACTORY_OUTCOME.
    """
    # Build the whole collection eagerly so all calls are in flight together.
    in_flight = [
        stub.StreUn.future(
            iter((_application_common.STREAM_UNARY_REQUEST,) * 3))
        for _ in range(test_constants.THREAD_CONCURRENCY)
    ]
    for rpc in in_flight:
        if rpc.code() is not grpc.StatusCode.OK:
            return _UNSATISFACTORY_OUTCOME
        reply = rpc.result()
        if _application_common.STREAM_UNARY_RESPONSE != reply:
            return _UNSATISFACTORY_OUTCOME
    return _SATISFACTORY_OUTCOME
165
166
def _run_concurrent_stream_stream(stub):
    """Runs the stream-stream scenario on several threads at the same time.

    Starts test_constants.RPC_CONCURRENCY threads, each of which executes
    _run_stream_stream against the same stub, and waits until every thread has
    recorded an outcome.

    Args:
      stub: The service stub passed through to _run_stream_stream.

    Returns:
      _SATISFACTORY_OUTCOME if every thread's outcome is satisfactory,
        otherwise _UNSATISFACTORY_OUTCOME.
    """
    condition = threading.Condition()
    outcomes = [None] * test_constants.RPC_CONCURRENCY

    def run_stream_stream(index):
        # Start from a failing outcome so the slot is always filled; an
        # unfilled slot would leave the waiting thread below blocked forever.
        outcome = _UNSATISFACTORY_OUTCOME
        try:
            outcome = _run_stream_stream(stub)
        except grpc.RpcError as rpc_error:
            outcome = Outcome(Outcome.Kind.RPC_ERROR, rpc_error.code(),
                              rpc_error.details())
        finally:
            with condition:
                outcomes[index] = outcome
                condition.notify()

    for index in range(test_constants.RPC_CONCURRENCY):
        thread = threading.Thread(target=run_stream_stream, args=(index,))
        thread.start()
    with condition:
        # Wait until every worker has written its slot.
        while not all(outcome is not None for outcome in outcomes):
            condition.wait()
    for outcome in outcomes:
        if outcome.kind is not Outcome.Kind.SATISFACTORY:
            return _UNSATISFACTORY_OUTCOME
    return _SATISFACTORY_OUTCOME
190
191
def _run_cancel_unary_unary(stub):
    """Starts a UnUn future call, reads its initial metadata, then cancels it.

    Args:
      stub: The service stub whose UnUn method is invoked.

    Returns:
      _SATISFACTORY_OUTCOME if the initial metadata is not None and cancel()
        returned a true value, otherwise _UNSATISFACTORY_OUTCOME.
    """
    rpc = stub.UnUn.future(_application_common.UNARY_UNARY_REQUEST)
    # Order matters: the metadata is read before cancellation is requested.
    metadata = rpc.initial_metadata()
    was_cancelled = rpc.cancel()
    if metadata is None or not was_cancelled:
        return _UNSATISFACTORY_OUTCOME
    return _SATISFACTORY_OUTCOME
201
202
def _run_infinite_request_stream(stub):
    """Feeds StreUn an endless request stream under a timeout.

    Args:
      stub: The service stub whose StreUn method is invoked.

    Returns:
      _SATISFACTORY_OUTCOME if the call's code is
        grpc.StatusCode.DEADLINE_EXCEEDED, otherwise _UNSATISFACTORY_OUTCOME.
    """

    def endless_requests():
        # Never terminates on its own; only the timeout can end the call.
        while True:
            yield _application_common.STREAM_UNARY_REQUEST

    rpc = stub.StreUn.future(
        endless_requests(),
        timeout=_application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
    timed_out = rpc.code() is grpc.StatusCode.DEADLINE_EXCEEDED
    return _SATISFACTORY_OUTCOME if timed_out else _UNSATISFACTORY_OUTCOME
216
217
# Dispatch table used by `run`: maps each Scenario to the function that
# executes it. Every function takes the service stub and returns an Outcome.
# NOTE(review): Scenario.CANCEL_UNARY_STREAM has no entry here, so requesting
# it from `run` raises KeyError.
_IMPLEMENTATIONS = {
    Scenario.UNARY_UNARY: _run_unary_unary,
    Scenario.UNARY_STREAM: _run_unary_stream,
    Scenario.STREAM_UNARY: _run_stream_unary,
    Scenario.STREAM_STREAM: _run_stream_stream,
    Scenario.CONCURRENT_STREAM_UNARY: _run_concurrent_stream_unary,
    Scenario.CONCURRENT_STREAM_STREAM: _run_concurrent_stream_stream,
    Scenario.CANCEL_UNARY_UNARY: _run_cancel_unary_unary,
    Scenario.INFINITE_REQUEST_STREAM: _run_infinite_request_stream,
}
228
229
def run(scenario, channel):
    """Executes one scenario over the given channel and reports how it went.

    Args:
      scenario: The Scenario value to execute.
      channel: The channel from which the FirstService stub is built.

    Returns:
      The Outcome returned by the scenario's implementation, or an Outcome of
        kind Outcome.Kind.RPC_ERROR carrying the error's code and details if
        the implementation raised grpc.RpcError.

    Raises:
      KeyError: If scenario has no entry in _IMPLEMENTATIONS.
    """
    stub = services_pb2_grpc.FirstServiceStub(channel)
    implementation = _IMPLEMENTATIONS[scenario]
    try:
        outcome = implementation(stub)
    except grpc.RpcError as rpc_error:
        return Outcome(
            kind=Outcome.Kind.RPC_ERROR,
            code=rpc_error.code(),
            details=rpc_error.details())
    else:
        return outcome