Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / src / python / grpcio_testing / grpc_testing / _server / _server.py
1 # Copyright 2017 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import threading
16
17 import grpc_testing
18 from grpc_testing import _common
19 from grpc_testing._server import _handler
20 from grpc_testing._server import _rpc
21 from grpc_testing._server import _server_rpc
22 from grpc_testing._server import _service
23 from grpc_testing._server import _servicer_context
24
25
def _implementation(descriptors_to_servicers, method_descriptor):
    """Finds the servicer attribute that implements a method.

    Args:
      descriptors_to_servicers: A mapping indexed by the value of
        method_descriptor.containing_service; its values are servicers.
      method_descriptor: An object exposing "containing_service" and "name"
        attributes.

    Returns:
      The attribute of the selected servicer whose name is
        method_descriptor.name.
    """
    service_descriptor = method_descriptor.containing_service
    servicer = descriptors_to_servicers[service_descriptor]
    method_name = method_descriptor.name
    return getattr(servicer, method_name)
29
30
def _unary_unary_service(request):
    """Builds a service behavior bound to one unary-unary request.

    Args:
      request: The request value handed to _service.unary_unary when the
        returned behavior runs.

    Returns:
      A three-argument callable (implementation, rpc, servicer context) that
        forwards its arguments, together with the captured request, to
        _service.unary_unary.
    """

    def service(behavior, rpc_state, context):
        # The request is captured from the enclosing call, so the caller of
        # this behavior only supplies the per-RPC objects.
        _service.unary_unary(behavior, rpc_state, request, context)

    return service
37
38
def _unary_stream_service(request):
    """Builds a service behavior bound to one unary-stream request.

    Args:
      request: The request value handed to _service.unary_stream when the
        returned behavior runs.

    Returns:
      A three-argument callable (implementation, rpc, servicer context) that
        forwards its arguments, together with the captured request, to
        _service.unary_stream.
    """

    def service(behavior, rpc_state, context):
        # The request is captured from the enclosing call, so the caller of
        # this behavior only supplies the per-RPC objects.
        _service.unary_stream(behavior, rpc_state, request, context)

    return service
45
46
def _stream_unary_service(handler):
    """Builds a service behavior bound to a stream-unary RPC's handler.

    Args:
      handler: The handler object handed to _service.stream_unary when the
        returned behavior runs.

    Returns:
      A three-argument callable (implementation, rpc, servicer context) that
        forwards its arguments, together with the captured handler, to
        _service.stream_unary.
    """

    def service(behavior, rpc_state, context):
        # The handler is captured from the enclosing call, so the caller of
        # this behavior only supplies the per-RPC objects.
        _service.stream_unary(behavior, rpc_state, handler, context)

    return service
53
54
def _stream_stream_service(handler):
    """Builds a service behavior bound to a stream-stream RPC's handler.

    Args:
      handler: The handler object handed to _service.stream_stream when the
        returned behavior runs.

    Returns:
      A three-argument callable (implementation, rpc, servicer context) that
        forwards its arguments, together with the captured handler, to
        _service.stream_stream.
    """

    def service(behavior, rpc_state, context):
        # The handler is captured from the enclosing call, so the caller of
        # this behavior only supplies the per-RPC objects.
        _service.stream_stream(behavior, rpc_state, handler, context)

    return service
61
62
class _Serverish(_common.Serverish):
    """Dispatches invoked RPCs to servicer methods on dedicated threads."""

    def __init__(self, descriptors_to_servicers, time):
        """Stores the servicer mapping and the time object.

        Args:
          descriptors_to_servicers: A mapping from service descriptors to the
            servicers implementing them.
          time: The time object passed on to each ServicerContext.
        """
        self._time = time
        self._descriptors_to_servicers = descriptors_to_servicers

    def _invoke(self, service_behavior, method_descriptor, handler,
                invocation_metadata, deadline):
        """Starts a thread that services one RPC.

        Args:
          service_behavior: A callable run on the new thread with the
            implementation, the Rpc and the ServicerContext as arguments.
          method_descriptor: Identifies the servicer method to run.
          handler: The RPC's handler; it backs the Rpc and receives the
            Rpc's extrinsic_abort as a termination callback.
          invocation_metadata: The metadata the Rpc is constructed with.
          deadline: The deadline given to the ServicerContext.
        """
        servicer_method = _implementation(self._descriptors_to_servicers,
                                          method_descriptor)
        rpc_state = _rpc.Rpc(handler, invocation_metadata)
        if not handler.add_termination_callback(rpc_state.extrinsic_abort):
            # The handler did not accept the callback, so no servicer thread
            # is started for this RPC.
            return
        context = _servicer_context.ServicerContext(rpc_state, self._time,
                                                    deadline)
        threading.Thread(
            target=service_behavior,
            args=(servicer_method, rpc_state, context),
        ).start()

    def invoke_unary_unary(self, method_descriptor, handler,
                           invocation_metadata, request, deadline):
        """Services a unary-unary RPC carrying the given request."""
        behavior = _unary_unary_service(request)
        self._invoke(behavior, method_descriptor, handler, invocation_metadata,
                     deadline)

    def invoke_unary_stream(self, method_descriptor, handler,
                            invocation_metadata, request, deadline):
        """Services a unary-stream RPC carrying the given request."""
        behavior = _unary_stream_service(request)
        self._invoke(behavior, method_descriptor, handler, invocation_metadata,
                     deadline)

    def invoke_stream_unary(self, method_descriptor, handler,
                            invocation_metadata, deadline):
        """Services a stream-unary RPC; the handler is given to the behavior."""
        behavior = _stream_unary_service(handler)
        self._invoke(behavior, method_descriptor, handler, invocation_metadata,
                     deadline)

    def invoke_stream_stream(self, method_descriptor, handler,
                             invocation_metadata, deadline):
        """Services a stream-stream RPC; the handler is given to the behavior."""
        behavior = _stream_stream_service(handler)
        self._invoke(behavior, method_descriptor, handler, invocation_metadata,
                     deadline)
104
105
def _deadline_and_handler(requests_closed, time, timeout):
    """Creates the handler for a new RPC along with its deadline.

    Args:
      requests_closed: Passed through to the handler factory unchanged.
      time: An object with a time() method; also passed to the handler
        factory when a timeout is given.
      timeout: A duration added to time.time() to form the deadline, or None
        for an RPC without a deadline.

    Returns:
      A (deadline, handler) pair. The deadline is None when timeout is None.
    """
    if timeout is not None:
        deadline = time.time() + timeout
        return deadline, _handler.handler_with_deadline(requests_closed, time,
                                                        deadline)
    return None, _handler.handler_without_deadline(requests_closed)
114
115
class _Server(grpc_testing.Server):
    """A grpc_testing.Server that forwards invocations to a serverish."""

    def __init__(self, serverish, time):
        """Stores the serverish and the time object.

        Args:
          serverish: The object whose invoke_* methods service each RPC.
          time: The time object used to compute deadlines from timeouts.
        """
        self._time = time
        self._serverish = serverish

    def invoke_unary_unary(self, method_descriptor, invocation_metadata,
                           request, timeout):
        """Invokes a unary-unary RPC and returns its UnaryUnaryServerRpc."""
        # Requests are marked closed up front for unary-request RPCs.
        deadline, rpc_handler = _deadline_and_handler(requests_closed=True,
                                                      time=self._time,
                                                      timeout=timeout)
        self._serverish.invoke_unary_unary(method_descriptor, rpc_handler,
                                           invocation_metadata, request,
                                           deadline)
        return _server_rpc.UnaryUnaryServerRpc(rpc_handler)

    def invoke_unary_stream(self, method_descriptor, invocation_metadata,
                            request, timeout):
        """Invokes a unary-stream RPC and returns its UnaryStreamServerRpc."""
        # Requests are marked closed up front for unary-request RPCs.
        deadline, rpc_handler = _deadline_and_handler(requests_closed=True,
                                                      time=self._time,
                                                      timeout=timeout)
        self._serverish.invoke_unary_stream(method_descriptor, rpc_handler,
                                            invocation_metadata, request,
                                            deadline)
        return _server_rpc.UnaryStreamServerRpc(rpc_handler)

    def invoke_stream_unary(self, method_descriptor, invocation_metadata,
                            timeout):
        """Invokes a stream-unary RPC and returns its StreamUnaryServerRpc."""
        # Requests are left open for stream-request RPCs.
        deadline, rpc_handler = _deadline_and_handler(requests_closed=False,
                                                      time=self._time,
                                                      timeout=timeout)
        self._serverish.invoke_stream_unary(method_descriptor, rpc_handler,
                                            invocation_metadata, deadline)
        return _server_rpc.StreamUnaryServerRpc(rpc_handler)

    def invoke_stream_stream(self, method_descriptor, invocation_metadata,
                             timeout):
        """Invokes a stream-stream RPC and returns its StreamStreamServerRpc."""
        # Requests are left open for stream-request RPCs.
        deadline, rpc_handler = _deadline_and_handler(requests_closed=False,
                                                      time=self._time,
                                                      timeout=timeout)
        self._serverish.invoke_stream_stream(method_descriptor, rpc_handler,
                                             invocation_metadata, deadline)
        return _server_rpc.StreamStreamServerRpc(rpc_handler)
151
152
def server_from_descriptor_to_servicers(descriptors_to_servicers, time):
    """Creates a _Server backed by the given servicers.

    Args:
      descriptors_to_servicers: A mapping from service descriptors to the
        servicers implementing them.
      time: The time object shared by the _Server and its _Serverish.

    Returns:
      A _Server wrapping a new _Serverish built from the same arguments.
    """
    serverish = _Serverish(descriptors_to_servicers, time)
    return _Server(serverish, time)