1 # Copyright 2019 gRPC authors.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 _EMPTY_METADATA = None
23 _UNKNOWN_CANCELLATION_DETAILS = 'RPC cancelled for unknown reason.'
26 cdef class _AioCall(GrpcCallWrapper):
32 CallCredentials call_credentials):
34 self._channel = channel
36 self._loop = asyncio.get_event_loop()
37 self._create_grpc_call(deadline, method, call_credentials)
38 self._is_locally_cancelled = False
39 self._deadline = deadline
41 def __dealloc__(self):
43 grpc_call_unref(self.call)
46 class_name = self.__class__.__name__
48 return f"<{class_name} {id_}>"
50 cdef void _create_grpc_call(self,
53 CallCredentials credentials) except *:
54 """Creates the corresponding Core object for this RPC.
56 For unary calls, the grpc_call lives shortly and can be destroyed after
57 invoke start_batch. However, if either side is streaming, the grpc_call
58 life span will be longer than one function. So, it would better save it
59 as an instance variable than a stack variable, which reflects its
62 cdef grpc_slice method_slice
63 cdef gpr_timespec c_deadline = _timespec_from_time(deadline)
64 cdef grpc_call_error set_credentials_error
66 method_slice = grpc_slice_from_copied_buffer(
67 <const char *> method,
70 self.call = grpc_channel_create_call(
71 self._channel.channel,
74 self._channel.cq.c_ptr(),
81 if credentials is not None:
82 set_credentials_error = grpc_call_set_credentials(self.call, credentials.c())
83 if set_credentials_error != GRPC_CALL_OK:
84 raise Exception("Credentials couldn't have been set")
86 grpc_slice_unref(method_slice)
88 def time_remaining(self):
89 if self._deadline is None:
92 return max(0, self._deadline - time.time())
94 def cancel(self, AioRpcStatus status):
95 """Cancels the RPC in Core with given RPC status.
97 Above abstractions must invoke this method to set Core objects into
100 self._is_locally_cancelled = True
104 cdef grpc_call_error error
105 # Try to fetch application layer cancellation details in the future.
106 # * If cancellation details present, cancel with status;
107 # * If details not present, cancel with unknown reason.
108 if status is not None:
109 details = str_to_bytes(status.details())
110 self._references.append(details)
111 c_details = <char *>details
112 # By implementation, grpc_call_cancel_with_status always return OK
113 error = grpc_call_cancel_with_status(
119 assert error == GRPC_CALL_OK
121 # By implementation, grpc_call_cancel always return OK
122 error = grpc_call_cancel(self.call, NULL)
123 assert error == GRPC_CALL_OK
125 async def unary_unary(self,
127 tuple outbound_initial_metadata,
128 object initial_metadata_observer,
129 object status_observer):
130 """Performs a unary unary RPC.
133 method: name of the calling method in bytes.
134 request: the serialized requests in bytes.
135 deadline: optional deadline of the RPC in float.
136 cancellation_future: the future that meant to transport the
137 cancellation reason from the application layer.
138 initial_metadata_observer: a callback for received initial metadata.
139 status_observer: a callback for received final status.
143 cdef SendInitialMetadataOperation initial_metadata_op = SendInitialMetadataOperation(
144 outbound_initial_metadata,
145 GRPC_INITIAL_METADATA_USED_MASK)
146 cdef SendMessageOperation send_message_op = SendMessageOperation(request, _EMPTY_FLAGS)
147 cdef SendCloseFromClientOperation send_close_op = SendCloseFromClientOperation(_EMPTY_FLAGS)
148 cdef ReceiveInitialMetadataOperation receive_initial_metadata_op = ReceiveInitialMetadataOperation(_EMPTY_FLAGS)
149 cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS)
150 cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
152 ops = (initial_metadata_op, send_message_op, send_close_op,
153 receive_initial_metadata_op, receive_message_op,
154 receive_status_on_client_op)
156 # Executes all operations in one batch.
157 # Might raise CancelledError, handling it in Python UnaryUnaryCall.
158 await execute_batch(self,
162 # Reports received initial metadata.
163 initial_metadata_observer(receive_initial_metadata_op.initial_metadata())
165 status = AioRpcStatus(
166 receive_status_on_client_op.code(),
167 receive_status_on_client_op.details(),
168 receive_status_on_client_op.trailing_metadata(),
169 receive_status_on_client_op.error_string(),
171 # Reports the final status of the RPC to Python layer. The observer
172 # pattern is used here to unify unary and streaming code path.
173 status_observer(status)
175 if status.code() == StatusCode.ok:
176 return receive_message_op.message()
180 async def _handle_status_once_received(self, object status_observer):
181 """Handles the status sent by peer once received."""
182 cdef ReceiveStatusOnClientOperation op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
183 cdef tuple ops = (op,)
184 await execute_batch(self, ops, self._loop)
186 # Halts if the RPC is locally cancelled
187 if self._is_locally_cancelled:
190 cdef AioRpcStatus status = AioRpcStatus(
193 op.trailing_metadata(),
196 status_observer(status)
198 async def receive_serialized_message(self):
199 """Receives one single raw message in bytes."""
200 cdef bytes received_message
202 # Receives a message. Returns None when failed:
203 # * EOF, no more messages to read;
204 # * The client application cancels;
205 # * The server sends final status.
206 received_message = await _receive_message(
211 return received_message
215 async def send_serialized_message(self, bytes message):
216 """Sends one single raw message in bytes."""
217 await _send_message(self,
222 async def send_receive_close(self):
223 """Half close the RPC on the client-side."""
224 cdef SendCloseFromClientOperation op = SendCloseFromClientOperation(_EMPTY_FLAGS)
225 cdef tuple ops = (op,)
226 await execute_batch(self, ops, self._loop)
228 async def initiate_unary_stream(self,
230 tuple outbound_initial_metadata,
231 object initial_metadata_observer,
232 object status_observer):
233 """Implementation of the start of a unary-stream call."""
234 # Peer may prematurely end this RPC at any point. We need a corutine
235 # that watches if the server sends the final status.
236 self._loop.create_task(self._handle_status_once_received(status_observer))
238 cdef tuple outbound_ops
239 cdef Operation initial_metadata_op = SendInitialMetadataOperation(
240 outbound_initial_metadata,
241 GRPC_INITIAL_METADATA_USED_MASK)
242 cdef Operation send_message_op = SendMessageOperation(
245 cdef Operation send_close_op = SendCloseFromClientOperation(
254 # Sends out the request message.
255 await execute_batch(self,
259 # Receives initial metadata.
260 initial_metadata_observer(
261 await _receive_initial_metadata(self,
265 async def stream_unary(self,
266 tuple outbound_initial_metadata,
267 object metadata_sent_observer,
268 object initial_metadata_observer,
269 object status_observer):
270 """Actual implementation of the complete unary-stream call.
272 Needs to pay extra attention to the raise mechanism. If we want to
273 propagate the final status exception, then we have to raise it.
274 Othersize, it would end normally and raise `StopAsyncIteration()`.
276 # Sends out initial_metadata ASAP.
277 await _send_initial_metadata(self,
278 outbound_initial_metadata,
280 # Notify upper level that sending messages are allowed now.
281 metadata_sent_observer()
283 # Receives initial metadata.
284 initial_metadata_observer(
285 await _receive_initial_metadata(self,
289 cdef tuple inbound_ops
290 cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS)
291 cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
292 inbound_ops = (receive_message_op, receive_status_on_client_op)
294 # Executes all operations in one batch.
295 await execute_batch(self,
299 status = AioRpcStatus(
300 receive_status_on_client_op.code(),
301 receive_status_on_client_op.details(),
302 receive_status_on_client_op.trailing_metadata(),
303 receive_status_on_client_op.error_string(),
305 # Reports the final status of the RPC to Python layer. The observer
306 # pattern is used here to unify unary and streaming code path.
307 status_observer(status)
309 if status.code() == StatusCode.ok:
310 return receive_message_op.message()
314 async def initiate_stream_stream(self,
315 tuple outbound_initial_metadata,
316 object metadata_sent_observer,
317 object initial_metadata_observer,
318 object status_observer):
319 """Actual implementation of the complete stream-stream call.
321 Needs to pay extra attention to the raise mechanism. If we want to
322 propagate the final status exception, then we have to raise it.
323 Othersize, it would end normally and raise `StopAsyncIteration()`.
325 # Peer may prematurely end this RPC at any point. We need a corutine
326 # that watches if the server sends the final status.
327 self._loop.create_task(self._handle_status_once_received(status_observer))
329 # Sends out initial_metadata ASAP.
330 await _send_initial_metadata(self,
331 outbound_initial_metadata,
333 # Notify upper level that sending messages are allowed now.
334 metadata_sent_observer()
336 # Receives initial metadata.
337 initial_metadata_observer(
338 await _receive_initial_metadata(self,