1 # Copyright 2017 gRPC authors.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Common utilities for tests of the Cython layer of gRPC Python."""
19 from grpc._cython import cygrpc
25 INVOCATION_METADATA = (
26 ('client-md-key', 'client-md-key'),
27 ('client-md-key-bin', b'\x00\x01' * 3000),
31 ('server-initial-md-key', 'server-initial-md-value'),
32 ('server-initial-md-key-bin', b'\x00\x02' * 3000),
36 ('server-trailing-md-key', 'server-trailing-md-value'),
37 ('server-trailing-md-key-bin', b'\x00\x03' * 3000),
41 class QueueDriver(object):
43 def __init__(self, condition, completion_queue):
44 self._condition = condition
45 self._completion_queue = completion_queue
46 self._due = collections.defaultdict(int)
47 self._events = collections.defaultdict(list)
49 def add_due(self, tags):
54 event = self._completion_queue.poll()
56 self._events[event.tag].append(event)
57 self._due[event.tag] -= 1
58 self._condition.notify_all()
59 if self._due[event.tag] <= 0:
60 self._due.pop(event.tag)
64 thread = threading.Thread(target=in_thread)
69 def event_with_tag(self, tag):
73 return self._events[tag].pop(0)
75 self._condition.wait()
78 def execute_many_times(behavior):
79 return tuple(behavior() for _ in range(RPC_COUNT))
82 class OperationResult(
83 collections.namedtuple('OperationResult', (
91 SUCCESSFUL_OPERATION_RESULT = OperationResult(
92 cygrpc.CallError.ok, cygrpc.CompletionType.operation_complete, True)
95 class RpcTest(object):
98 self.server_completion_queue = cygrpc.CompletionQueue()
99 self.server = cygrpc.Server([(b'grpc.so_reuseport', 0)], False)
100 self.server.register_completion_queue(self.server_completion_queue)
101 port = self.server.add_http2_port(b'[::]:0')
103 self.channel = cygrpc.Channel('localhost:{}'.format(port).encode(), [],
106 self._server_shutdown_tag = 'server_shutdown_tag'
107 self.server_condition = threading.Condition()
108 self.server_driver = QueueDriver(self.server_condition,
109 self.server_completion_queue)
110 with self.server_condition:
111 self.server_driver.add_due({
112 self._server_shutdown_tag,
115 self.client_condition = threading.Condition()
116 self.client_completion_queue = cygrpc.CompletionQueue()
117 self.client_driver = QueueDriver(self.client_condition,
118 self.client_completion_queue)
121 self.server.shutdown(self.server_completion_queue,
122 self._server_shutdown_tag)
123 self.server.cancel_all_calls()