Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / src / python / grpcio / grpc / _cython / _cygrpc / aio / call.pyx.pxi
1 # Copyright 2019 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 cimport cpython
16 import grpc
17
18
19 _EMPTY_FLAGS = 0
20 _EMPTY_MASK = 0
21 _EMPTY_METADATA = None
22
23 _UNKNOWN_CANCELLATION_DETAILS = 'RPC cancelled for unknown reason.'
24
25
26 cdef class _AioCall(GrpcCallWrapper):
27
28     def __cinit__(self,
29                   AioChannel channel,
30                   object deadline,
31                   bytes method,
32                   CallCredentials call_credentials):
33         self.call = NULL
34         self._channel = channel
35         self._references = []
36         self._loop = asyncio.get_event_loop()
37         self._create_grpc_call(deadline, method, call_credentials)
38         self._is_locally_cancelled = False
39         self._deadline = deadline
40
41     def __dealloc__(self):
42         if self.call:
43             grpc_call_unref(self.call)
44
45     def __repr__(self):
46         class_name = self.__class__.__name__
47         id_ = id(self)
48         return f"<{class_name} {id_}>"
49
50     cdef void _create_grpc_call(self,
51                                 object deadline,
52                                 bytes method,
53                                 CallCredentials credentials) except *:
54         """Creates the corresponding Core object for this RPC.
55
56         For unary calls, the grpc_call lives shortly and can be destroyed after
57         invoke start_batch. However, if either side is streaming, the grpc_call
58         life span will be longer than one function. So, it would better save it
59         as an instance variable than a stack variable, which reflects its
60         nature in Core.
61         """
62         cdef grpc_slice method_slice
63         cdef gpr_timespec c_deadline = _timespec_from_time(deadline)
64         cdef grpc_call_error set_credentials_error
65
66         method_slice = grpc_slice_from_copied_buffer(
67             <const char *> method,
68             <size_t> len(method)
69         )
70         self.call = grpc_channel_create_call(
71             self._channel.channel,
72             NULL,
73             _EMPTY_MASK,
74             self._channel.cq.c_ptr(),
75             method_slice,
76             NULL,
77             c_deadline,
78             NULL
79         )
80
81         if credentials is not None:
82             set_credentials_error = grpc_call_set_credentials(self.call, credentials.c())
83             if set_credentials_error != GRPC_CALL_OK:
84                 raise Exception("Credentials couldn't have been set")
85
86         grpc_slice_unref(method_slice)
87
88     def time_remaining(self):
89         if self._deadline is None:
90             return None
91         else:
92             return max(0, self._deadline - time.time())
93
94     def cancel(self, AioRpcStatus status):
95         """Cancels the RPC in Core with given RPC status.
96         
97         Above abstractions must invoke this method to set Core objects into
98         proper state.
99         """
100         self._is_locally_cancelled = True
101
102         cdef object details
103         cdef char *c_details
104         cdef grpc_call_error error
105         # Try to fetch application layer cancellation details in the future.
106         # * If cancellation details present, cancel with status;
107         # * If details not present, cancel with unknown reason.
108         if status is not None:
109             details = str_to_bytes(status.details())
110             self._references.append(details)
111             c_details = <char *>details
112             # By implementation, grpc_call_cancel_with_status always return OK
113             error = grpc_call_cancel_with_status(
114                 self.call,
115                 status.c_code(),
116                 c_details,
117                 NULL,
118             )
119             assert error == GRPC_CALL_OK
120         else:
121             # By implementation, grpc_call_cancel always return OK
122             error = grpc_call_cancel(self.call, NULL)
123             assert error == GRPC_CALL_OK
124
125     async def unary_unary(self,
126                           bytes request,
127                           tuple outbound_initial_metadata,
128                           object initial_metadata_observer,
129                           object status_observer):
130         """Performs a unary unary RPC.
131         
132         Args:
133           method: name of the calling method in bytes.
134           request: the serialized requests in bytes.
135           deadline: optional deadline of the RPC in float.
136           cancellation_future: the future that meant to transport the
137             cancellation reason from the application layer.
138           initial_metadata_observer: a callback for received initial metadata.
139           status_observer: a callback for received final status.
140         """
141         cdef tuple ops
142
143         cdef SendInitialMetadataOperation initial_metadata_op = SendInitialMetadataOperation(
144             outbound_initial_metadata,
145             GRPC_INITIAL_METADATA_USED_MASK)
146         cdef SendMessageOperation send_message_op = SendMessageOperation(request, _EMPTY_FLAGS)
147         cdef SendCloseFromClientOperation send_close_op = SendCloseFromClientOperation(_EMPTY_FLAGS)
148         cdef ReceiveInitialMetadataOperation receive_initial_metadata_op = ReceiveInitialMetadataOperation(_EMPTY_FLAGS)
149         cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS)
150         cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
151
152         ops = (initial_metadata_op, send_message_op, send_close_op,
153                receive_initial_metadata_op, receive_message_op,
154                receive_status_on_client_op)
155
156         # Executes all operations in one batch.
157         # Might raise CancelledError, handling it in Python UnaryUnaryCall.
158         await execute_batch(self,
159                             ops,
160                             self._loop)
161
162         # Reports received initial metadata.
163         initial_metadata_observer(receive_initial_metadata_op.initial_metadata())
164
165         status = AioRpcStatus(
166             receive_status_on_client_op.code(),
167             receive_status_on_client_op.details(),
168             receive_status_on_client_op.trailing_metadata(),
169             receive_status_on_client_op.error_string(),
170         )
171         # Reports the final status of the RPC to Python layer. The observer
172         # pattern is used here to unify unary and streaming code path.
173         status_observer(status)
174
175         if status.code() == StatusCode.ok:
176             return receive_message_op.message()
177         else:
178             return None
179
180     async def _handle_status_once_received(self, object status_observer):
181         """Handles the status sent by peer once received."""
182         cdef ReceiveStatusOnClientOperation op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
183         cdef tuple ops = (op,)
184         await execute_batch(self, ops, self._loop)
185
186         # Halts if the RPC is locally cancelled
187         if self._is_locally_cancelled:
188             return
189
190         cdef AioRpcStatus status = AioRpcStatus(
191             op.code(),
192             op.details(),
193             op.trailing_metadata(),
194             op.error_string(),
195         )
196         status_observer(status)
197
198     async def receive_serialized_message(self):
199         """Receives one single raw message in bytes."""
200         cdef bytes received_message
201
202         # Receives a message. Returns None when failed:
203         # * EOF, no more messages to read;
204         # * The client application cancels;
205         # * The server sends final status.
206         received_message = await _receive_message(
207             self,
208             self._loop
209         )
210         if received_message:
211             return received_message
212         else:
213             return EOF
214
215     async def send_serialized_message(self, bytes message):
216         """Sends one single raw message in bytes."""
217         await _send_message(self,
218                             message,
219                             True,
220                             self._loop)
221
222     async def send_receive_close(self):
223         """Half close the RPC on the client-side."""
224         cdef SendCloseFromClientOperation op = SendCloseFromClientOperation(_EMPTY_FLAGS)
225         cdef tuple ops = (op,)
226         await execute_batch(self, ops, self._loop)
227
228     async def initiate_unary_stream(self,
229                            bytes request,
230                            tuple outbound_initial_metadata,
231                            object initial_metadata_observer,
232                            object status_observer):
233         """Implementation of the start of a unary-stream call."""
234         # Peer may prematurely end this RPC at any point. We need a corutine
235         # that watches if the server sends the final status.
236         self._loop.create_task(self._handle_status_once_received(status_observer))
237
238         cdef tuple outbound_ops
239         cdef Operation initial_metadata_op = SendInitialMetadataOperation(
240             outbound_initial_metadata,
241             GRPC_INITIAL_METADATA_USED_MASK)
242         cdef Operation send_message_op = SendMessageOperation(
243             request,
244             _EMPTY_FLAGS)
245         cdef Operation send_close_op = SendCloseFromClientOperation(
246             _EMPTY_FLAGS)
247
248         outbound_ops = (
249             initial_metadata_op,
250             send_message_op,
251             send_close_op,
252         )
253
254         # Sends out the request message.
255         await execute_batch(self,
256                             outbound_ops,
257                             self._loop)
258
259         # Receives initial metadata.
260         initial_metadata_observer(
261             await _receive_initial_metadata(self,
262                                             self._loop),
263         )
264
265     async def stream_unary(self,
266                            tuple outbound_initial_metadata,
267                            object metadata_sent_observer,
268                            object initial_metadata_observer,
269                            object status_observer):
270         """Actual implementation of the complete unary-stream call.
271         
272         Needs to pay extra attention to the raise mechanism. If we want to
273         propagate the final status exception, then we have to raise it.
274         Othersize, it would end normally and raise `StopAsyncIteration()`.
275         """
276         # Sends out initial_metadata ASAP.
277         await _send_initial_metadata(self,
278                                      outbound_initial_metadata,
279                                      self._loop)
280         # Notify upper level that sending messages are allowed now.
281         metadata_sent_observer()
282
283         # Receives initial metadata.
284         initial_metadata_observer(
285             await _receive_initial_metadata(self,
286                                             self._loop),
287         )
288
289         cdef tuple inbound_ops
290         cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS)
291         cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
292         inbound_ops = (receive_message_op, receive_status_on_client_op)
293
294         # Executes all operations in one batch.
295         await execute_batch(self,
296                             inbound_ops,
297                             self._loop)
298
299         status = AioRpcStatus(
300             receive_status_on_client_op.code(),
301             receive_status_on_client_op.details(),
302             receive_status_on_client_op.trailing_metadata(),
303             receive_status_on_client_op.error_string(),
304         )
305         # Reports the final status of the RPC to Python layer. The observer
306         # pattern is used here to unify unary and streaming code path.
307         status_observer(status)
308
309         if status.code() == StatusCode.ok:
310             return receive_message_op.message()
311         else:
312             return None
313
314     async def initiate_stream_stream(self,
315                            tuple outbound_initial_metadata,
316                            object metadata_sent_observer,
317                            object initial_metadata_observer,
318                            object status_observer):
319         """Actual implementation of the complete stream-stream call.
320
321         Needs to pay extra attention to the raise mechanism. If we want to
322         propagate the final status exception, then we have to raise it.
323         Othersize, it would end normally and raise `StopAsyncIteration()`.
324         """
325         # Peer may prematurely end this RPC at any point. We need a corutine
326         # that watches if the server sends the final status.
327         self._loop.create_task(self._handle_status_once_received(status_observer))
328
329         # Sends out initial_metadata ASAP.
330         await _send_initial_metadata(self,
331                                      outbound_initial_metadata,
332                                      self._loop)
333         # Notify upper level that sending messages are allowed now.   
334         metadata_sent_observer()
335
336         # Receives initial metadata.
337         initial_metadata_observer(
338             await _receive_initial_metadata(self,
339                                             self._loop),
340         )