1 # Copyright 2019 gRPC authors.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 import socket as native_socket
17 from libc cimport string
20 # TODO(https://github.com/grpc/grpc/issues/21348) Better flow control needed.
21 cdef class _AsyncioSocket:
23 self._grpc_socket = NULL
24 self._grpc_connect_cb = NULL
25 self._grpc_read_cb = NULL
28 self._task_connect = None
29 self._task_read = None
30 self._read_buffer = NULL
32 self._py_socket = None
34 self._loop = asyncio.get_event_loop()
37 cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket,
40 socket = _AsyncioSocket()
41 socket._grpc_socket = grpc_socket
42 socket._reader = reader
43 socket._writer = writer
44 if writer is not None:
45 socket._peername = writer.get_extra_info('peername')
49 cdef _AsyncioSocket create_with_py_socket(grpc_custom_socket * grpc_socket, object py_socket):
50 socket = _AsyncioSocket()
51 socket._grpc_socket = grpc_socket
52 socket._py_socket = py_socket
56 class_name = self.__class__.__name__
58 connected = self.is_connected()
59 return f"<{class_name} {id_} connected={connected}>"
61 def _connect_cb(self, future):
63 self._reader, self._writer = future.result()
64 except Exception as e:
65 self._grpc_connect_cb(
66 <grpc_custom_socket*>self._grpc_socket,
67 grpc_socket_error("Socket connect failed: {}".format(e).encode())
71 self._task_connect = None
73 # gRPC default posix implementation disables nagle
75 sock = self._writer.transport.get_extra_info('socket')
76 sock.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True)
78 self._grpc_connect_cb(
79 <grpc_custom_socket*>self._grpc_socket,
83 def _read_cb(self, future):
86 buffer_ = future.result()
87 except Exception as e:
89 error_msg = "%s: %s" % (type(e), str(e))
92 self._task_read = None
96 <void*>self._read_buffer,
101 <grpc_custom_socket*>self._grpc_socket,
107 <grpc_custom_socket*>self._grpc_socket,
109 grpc_socket_error("Read failed: {}".format(error_msg).encode())
112 cdef void connect(self,
115 grpc_custom_connect_callback grpc_connect_cb):
116 assert not self._reader
117 assert not self._task_connect
119 self._task_connect = asyncio.ensure_future(
120 asyncio.open_connection(host, port)
122 self._grpc_connect_cb = grpc_connect_cb
123 self._task_connect.add_done_callback(self._connect_cb)
125 cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback grpc_read_cb):
126 assert not self._task_read
128 self._task_read = self._loop.create_task(
129 self._reader.read(n=length)
131 self._grpc_read_cb = grpc_read_cb
132 self._task_read.add_done_callback(self._read_cb)
133 self._read_buffer = buffer_
135 cdef void write(self, grpc_slice_buffer * g_slice_buffer, grpc_custom_write_callback grpc_write_cb):
136 """Performs write to network socket in AsyncIO.
138 For each socket, Core guarantees there'll be only one ongoing write.
139 When the write is finished, we need to call grpc_write_cb to notify
140 Core that the work is done.
143 cdef bytearray outbound_buffer = bytearray()
144 for i in range(g_slice_buffer.count):
145 start = grpc_slice_buffer_start(g_slice_buffer, i)
146 length = grpc_slice_buffer_length(g_slice_buffer, i)
147 outbound_buffer.extend(<bytes>start[:length])
149 self._writer.write(outbound_buffer)
151 <grpc_custom_socket*>self._grpc_socket,
155 cdef bint is_connected(self):
156 return self._reader and not self._reader._transport.is_closing()
158 cdef void close(self):
159 if self.is_connected():
163 # NOTE(lidiz) If the asyncio.Server is created from a Python socket,
164 # the server.close() won't release the fd until the close() is called
165 # for the Python socket.
167 self._py_socket.close()
169 def _new_connection_callback(self, object reader, object writer):
170 client_socket = _AsyncioSocket.create(
171 self._grpc_client_socket,
176 self._grpc_client_socket.impl = <void*>client_socket
177 cpython.Py_INCREF(client_socket) # Py_DECREF in asyncio_socket_destroy
178 # Accept callback expects to be called with:
179 # * grpc_custom_socket: A grpc custom socket for server
180 # * grpc_custom_socket: A grpc custom socket for client (with new Socket instance)
181 # * grpc_error: An error object
182 self._grpc_accept_cb(self._grpc_socket, self._grpc_client_socket, grpc_error_none())
185 async def create_asyncio_server():
186 self._server = await asyncio.start_server(
187 self._new_connection_callback,
188 sock=self._py_socket,
191 self._loop.create_task(create_asyncio_server())
194 grpc_custom_socket* grpc_socket_client,
195 grpc_custom_accept_callback grpc_accept_cb):
196 self._grpc_client_socket = grpc_socket_client
197 self._grpc_accept_cb = grpc_accept_cb
199 cdef tuple peername(self):
200 return self._peername
202 cdef tuple sockname(self):
203 return self._py_socket.getsockname()