# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test a corner-case at the level of the Cython API."""
import threading
import unittest

from grpc._cython import cygrpc

from tests.unit._cython import _common
from tests.unit._cython import test_utilities
class Test(_common.RpcTest, unittest.TestCase):
    """Message-less RPCs whose server-side call uses a per-call queue.

    Every RPC driven by this test exchanges only metadata, the client's
    half-close and the final status; no message operations are started on
    either side. A new completion queue is created for each server-side call.
    """

    def _do_rpcs(self):
        """Run a single RPC and report how each batch of operations went.

        Returns:
          A 5-tuple of _common.OperationResult, in this order: server
          request-call, client receive-initial-metadata, client complete-RPC,
          server send-initial-metadata, server complete-RPC.
        """
        # Per-call server plumbing: a dedicated completion queue together
        # with the condition and driver that watch it.
        server_call_condition = threading.Condition()
        server_call_completion_queue = cygrpc.CompletionQueue()
        server_call_driver = _common.QueueDriver(server_call_condition,
                                                 server_call_completion_queue)

        server_request_call_tag = 'server_request_call_tag'
        server_send_initial_metadata_tag = 'server_send_initial_metadata_tag'
        server_complete_rpc_tag = 'server_complete_rpc_tag'

        # The request-call event is awaited on the shared server driver; the
        # events of the resulting call are awaited on server_call_driver.
        with self.server_condition:
            server_request_call_start_batch_result = self.server.request_call(
                server_call_completion_queue, self.server_completion_queue,
                server_request_call_tag)
            self.server_driver.add_due({
                server_request_call_tag,
            })

        client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
        client_complete_rpc_tag = 'client_complete_rpc_tag'
        # The call is created with one batch already attached (receive
        # initial metadata); the remaining client operations follow in a
        # second batch under a different tag.
        client_call = self.channel.integrated_call(
            _common.EMPTY_FLAGS, b'/twinkies', None, None,
            _common.INVOCATION_METADATA, None, [(
                [
                    cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
                ],
                client_receive_initial_metadata_tag,
            )])
        client_call.operate([
            cygrpc.SendInitialMetadataOperation(_common.INVOCATION_METADATA,
                                                _common.EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS),
        ], client_complete_rpc_tag)

        # Two client events are expected, one per client tag.
        # NOTE(review): SimpleFuture presumably evaluates the callable off
        # the current thread so the server side below can make progress --
        # confirm against test_utilities.
        client_events_future = test_utilities.SimpleFuture(lambda: [
            self.channel.next_call_event(),
            self.channel.next_call_event(),
        ])

        server_request_call_event = self.server_driver.event_with_tag(
            server_request_call_tag)

        with server_call_condition:
            server_send_initial_metadata_start_batch_result = (
                server_request_call_event.call.start_server_batch([
                    cygrpc.SendInitialMetadataOperation(
                        _common.INITIAL_METADATA, _common.EMPTY_FLAGS),
                ], server_send_initial_metadata_tag))
            server_call_driver.add_due({
                server_send_initial_metadata_tag,
            })
        server_send_initial_metadata_event = server_call_driver.event_with_tag(
            server_send_initial_metadata_tag)

        with server_call_condition:
            server_complete_rpc_start_batch_result = (
                server_request_call_event.call.start_server_batch([
                    cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
                    cygrpc.SendStatusFromServerOperation(
                        _common.TRAILING_METADATA, cygrpc.StatusCode.ok,
                        b'test details', _common.EMPTY_FLAGS),
                ], server_complete_rpc_tag))
            server_call_driver.add_due({
                server_complete_rpc_tag,
            })
        server_complete_rpc_event = server_call_driver.event_with_tag(
            server_complete_rpc_tag)

        client_events = client_events_future.result()
        # The two client events may arrive in either order, so sort them out
        # by tag. Tags are plain strings: compare by value, not identity.
        if client_events[0].tag == client_receive_initial_metadata_tag:
            client_receive_initial_metadata_event = client_events[0]
            client_complete_rpc_event = client_events[1]
        else:
            client_complete_rpc_event = client_events[0]
            client_receive_initial_metadata_event = client_events[1]

        # The client-side batches yield no start-batch result in this test,
        # so cygrpc.CallError.ok is recorded in that position for them.
        return (
            _common.OperationResult(server_request_call_start_batch_result,
                                    server_request_call_event.completion_type,
                                    server_request_call_event.success),
            _common.OperationResult(
                cygrpc.CallError.ok,
                client_receive_initial_metadata_event.completion_type,
                client_receive_initial_metadata_event.success),
            _common.OperationResult(cygrpc.CallError.ok,
                                    client_complete_rpc_event.completion_type,
                                    client_complete_rpc_event.success),
            _common.OperationResult(
                server_send_initial_metadata_start_batch_result,
                server_send_initial_metadata_event.completion_type,
                server_send_initial_metadata_event.success),
            _common.OperationResult(server_complete_rpc_start_batch_result,
                                    server_complete_rpc_event.completion_type,
                                    server_complete_rpc_event.success),
        )

    def test_rpcs(self):
        """Every one of the five operation results of every RPC succeeds."""
        expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) * 5
                    ] * _common.RPC_COUNT
        actuallys = _common.execute_many_times(self._do_rpcs)
        self.assertSequenceEqual(expecteds, actuallys)
if __name__ == '__main__':
    # Allow running this module directly; verbosity=2 lists each test by name.
    unittest.main(verbosity=2)