Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / src / python / grpcio_tests / tests / unit / _cython / _no_messages_server_completion_queue_per_call_test.py
1 # Copyright 2017 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Test a corner-case at the level of the Cython API."""
15
16 import threading
17 import unittest
18
19 from grpc._cython import cygrpc
20
21 from tests.unit._cython import _common
22 from tests.unit._cython import test_utilities
23
24
class Test(_common.RpcTest, unittest.TestCase):
    """RPCs without messages, using one server completion queue per call.

    Each RPC exchanges only metadata and status (no message operations are
    issued on either side), and every server-side call is bound to a
    completion queue created specifically for that call.
    """

    def _do_rpcs(self):
        """Drive a single RPC to completion and report how each batch fared.

        Returns:
          A 5-tuple of _common.OperationResult, in this order: the server's
          request-call, the client's receive-initial-metadata batch, the
          client's completing batch, the server's send-initial-metadata
          batch, and the server's completing batch.
        """
        # A dedicated completion queue, with its own condition and driver,
        # is created on every invocation of this method.
        server_call_condition = threading.Condition()
        server_call_completion_queue = cygrpc.CompletionQueue()
        server_call_driver = _common.QueueDriver(server_call_condition,
                                                 server_call_completion_queue)

        server_request_call_tag = 'server_request_call_tag'
        server_send_initial_metadata_tag = 'server_send_initial_metadata_tag'
        server_complete_rpc_tag = 'server_complete_rpc_tag'

        # The request-call tag is awaited through the shared server driver;
        # the later batches on the accepted call are awaited through the
        # per-call driver created above.
        with self.server_condition:
            server_request_call_start_batch_result = self.server.request_call(
                server_call_completion_queue, self.server_completion_queue,
                server_request_call_tag)
            self.server_driver.add_due({
                server_request_call_tag,
            })

        # Client side: the call is created together with a batch that
        # receives initial metadata, then a second batch sends initial
        # metadata, half-closes and receives the status.
        client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
        client_complete_rpc_tag = 'client_complete_rpc_tag'
        client_call = self.channel.integrated_call(
            _common.EMPTY_FLAGS, b'/twinkies', None, None,
            _common.INVOCATION_METADATA, None, [(
                [
                    cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
                ],
                client_receive_initial_metadata_tag,
            )])
        client_call.operate([
            cygrpc.SendInitialMetadataOperation(_common.INVOCATION_METADATA,
                                                _common.EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS),
        ], client_complete_rpc_tag)

        # Two client events are expected, one per client batch. Their
        # collection is wrapped in a SimpleFuture whose result is only
        # retrieved after the server side has finished below.
        # NOTE(review): presumably SimpleFuture evaluates the callable on
        # another thread -- confirm against test_utilities.
        client_events_future = test_utilities.SimpleFuture(lambda: [
            self.channel.next_call_event(),
            self.channel.next_call_event(),
        ])

        server_request_call_event = self.server_driver.event_with_tag(
            server_request_call_tag)

        # Server side, first batch: send initial metadata and wait for it.
        with server_call_condition:
            server_send_initial_metadata_start_batch_result = (
                server_request_call_event.call.start_server_batch([
                    cygrpc.SendInitialMetadataOperation(
                        _common.INITIAL_METADATA, _common.EMPTY_FLAGS),
                ], server_send_initial_metadata_tag))
            server_call_driver.add_due({
                server_send_initial_metadata_tag,
            })
        server_send_initial_metadata_event = server_call_driver.event_with_tag(
            server_send_initial_metadata_tag)

        # Server side, second batch: receive the client's close and send the
        # final status, then wait for that batch to complete.
        with server_call_condition:
            server_complete_rpc_start_batch_result = (
                server_request_call_event.call.start_server_batch([
                    cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
                    cygrpc.SendStatusFromServerOperation(
                        _common.TRAILING_METADATA, cygrpc.StatusCode.ok,
                        b'test details', _common.EMPTY_FLAGS),
                ], server_complete_rpc_tag))
            server_call_driver.add_due({
                server_complete_rpc_tag,
            })
        server_complete_rpc_event = server_call_driver.event_with_tag(
            server_complete_rpc_tag)

        # The two client events are handled in whichever order they were
        # returned, so tell them apart by tag. The tags are plain strings:
        # compare them by value instead of relying on object identity.
        client_events = client_events_future.result()
        if client_events[0].tag == client_receive_initial_metadata_tag:
            client_receive_initial_metadata_event = client_events[0]
            client_complete_rpc_event = client_events[1]
        else:
            client_complete_rpc_event = client_events[0]
            client_receive_initial_metadata_event = client_events[1]

        # No start-batch result is captured for the client batches above, so
        # CallError.ok is supplied directly for those two entries.
        return (
            _common.OperationResult(server_request_call_start_batch_result,
                                    server_request_call_event.completion_type,
                                    server_request_call_event.success),
            _common.OperationResult(
                cygrpc.CallError.ok,
                client_receive_initial_metadata_event.completion_type,
                client_receive_initial_metadata_event.success),
            _common.OperationResult(cygrpc.CallError.ok,
                                    client_complete_rpc_event.completion_type,
                                    client_complete_rpc_event.success),
            _common.OperationResult(
                server_send_initial_metadata_start_batch_result,
                server_send_initial_metadata_event.completion_type,
                server_send_initial_metadata_event.success),
            _common.OperationResult(server_complete_rpc_start_batch_result,
                                    server_complete_rpc_event.completion_type,
                                    server_complete_rpc_event.success),
        )

    def test_rpcs(self):
        """Check that all five operations of every executed RPC succeeded."""
        # One 5-tuple of successful results is expected per RPC.
        # NOTE(review): presumably execute_many_times invokes _do_rpcs
        # _common.RPC_COUNT times and returns the collected results --
        # confirm against _common.
        expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) * 5
                    ] * _common.RPC_COUNT
        actuallys = _common.execute_many_times(self._do_rpcs)
        self.assertSequenceEqual(expecteds, actuallys)
129
130
# When executed as a script (rather than imported), run this module's tests
# with verbose per-test output.
if __name__ == '__main__':
    unittest.main(verbosity=2)