1 # Copyright 2019 gRPC authors.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 from cpython cimport Py_INCREF, Py_DECREF
19 from libc cimport string
21 import socket as native_socket
23 import ipaddress # CPython 3.3 and above
27 cdef grpc_socket_vtable asyncio_socket_vtable
28 cdef grpc_custom_resolver_vtable asyncio_resolver_vtable
29 cdef grpc_custom_timer_vtable asyncio_timer_vtable
30 cdef grpc_custom_poller_vtable asyncio_pollset_vtable
31 cdef bint so_reuse_port
34 cdef grpc_error* asyncio_socket_init(
35 grpc_custom_socket* grpc_socket,
37 socket = _AsyncioSocket.create(grpc_socket, None, None)
39 grpc_socket.impl = <void*>socket
43 cdef void asyncio_socket_destroy(grpc_custom_socket* grpc_socket) with gil:
44 Py_DECREF(<_AsyncioSocket>grpc_socket.impl)
47 cdef void asyncio_socket_connect(
48 grpc_custom_socket* grpc_socket,
49 const grpc_sockaddr* addr,
51 grpc_custom_connect_callback connect_cb) with gil:
53 host, port = sockaddr_to_tuple(addr, addr_len)
54 socket = <_AsyncioSocket>grpc_socket.impl
55 socket.connect(host, port, connect_cb)
58 cdef void asyncio_socket_close(
59 grpc_custom_socket* grpc_socket,
60 grpc_custom_close_callback close_cb) with gil:
61 socket = (<_AsyncioSocket>grpc_socket.impl)
66 cdef void asyncio_socket_shutdown(grpc_custom_socket* grpc_socket) with gil:
67 socket = (<_AsyncioSocket>grpc_socket.impl)
71 cdef void asyncio_socket_write(
72 grpc_custom_socket* grpc_socket,
73 grpc_slice_buffer* slice_buffer,
74 grpc_custom_write_callback write_cb) with gil:
75 socket = (<_AsyncioSocket>grpc_socket.impl)
76 socket.write(slice_buffer, write_cb)
79 cdef void asyncio_socket_read(
80 grpc_custom_socket* grpc_socket,
83 grpc_custom_read_callback read_cb) with gil:
84 socket = (<_AsyncioSocket>grpc_socket.impl)
85 socket.read(buffer_, length, read_cb)
88 cdef grpc_error* asyncio_socket_getpeername(
89 grpc_custom_socket* grpc_socket,
90 const grpc_sockaddr* addr,
91 int* length) with gil:
92 peer = (<_AsyncioSocket>grpc_socket.impl).peername()
94 cdef grpc_resolved_address c_addr
95 hostname = str_to_bytes(peer[0])
96 grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
97 # TODO(https://github.com/grpc/grpc/issues/20684) Remove the memcpy
98 string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
99 length[0] = c_addr.len
100 return grpc_error_none()
103 cdef grpc_error* asyncio_socket_getsockname(
104 grpc_custom_socket* grpc_socket,
105 const grpc_sockaddr* addr,
106 int* length) with gil:
107 """Supplies sock_addr in add_socket_to_server."""
108 cdef grpc_resolved_address c_addr
109 socket = (<_AsyncioSocket>grpc_socket.impl)
111 peer = ('0.0.0.0', 0)
113 peer = socket.sockname()
114 hostname = str_to_bytes(peer[0])
115 grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
116 # TODO(https://github.com/grpc/grpc/issues/20684) Remove the memcpy
117 string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
118 length[0] = c_addr.len
119 return grpc_error_none()
122 cdef grpc_error* asyncio_socket_listen(grpc_custom_socket* grpc_socket) with gil:
123 (<_AsyncioSocket>grpc_socket.impl).listen()
124 return grpc_error_none()
127 def _asyncio_apply_socket_options(object s, int flags):
128 # Turn SO_REUSEADDR on for TCP sockets; if we want to support UDS, we will
129 # need to update this function.
130 s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEADDR, 1)
131 # SO_REUSEPORT only available in POSIX systems.
132 if platform.system() != 'Windows':
133 if GRPC_CUSTOM_SOCKET_OPT_SO_REUSEPORT & flags:
134 s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEPORT, 1)
135 s.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True)
138 cdef grpc_error* asyncio_socket_bind(
139 grpc_custom_socket* grpc_socket,
140 const grpc_sockaddr* addr,
141 size_t len, int flags) with gil:
142 host, port = sockaddr_to_tuple(addr, len)
144 ip = ipaddress.ip_address(host)
145 if isinstance(ip, ipaddress.IPv6Address):
146 family = native_socket.AF_INET6
148 family = native_socket.AF_INET
150 socket = native_socket.socket(family=family)
151 _asyncio_apply_socket_options(socket, flags)
152 socket.bind((host, port))
153 except IOError as io_error:
154 return socket_error("bind", str(io_error))
156 aio_socket = _AsyncioSocket.create_with_py_socket(grpc_socket, socket)
157 cpython.Py_INCREF(aio_socket) # Py_DECREF in asyncio_socket_destroy
158 grpc_socket.impl = <void*>aio_socket
159 return grpc_error_none()
162 cdef void asyncio_socket_accept(
163 grpc_custom_socket* grpc_socket,
164 grpc_custom_socket* grpc_socket_client,
165 grpc_custom_accept_callback accept_cb) with gil:
166 (<_AsyncioSocket>grpc_socket.impl).accept(grpc_socket_client, accept_cb)
169 cdef grpc_error* asyncio_resolve(
172 grpc_resolved_addresses** res) with gil:
173 result = native_socket.getaddrinfo(host, port)
174 res[0] = tuples_to_resolvaddr(result)
177 cdef void asyncio_resolve_async(
178 grpc_custom_resolver* grpc_resolver,
180 char* port) with gil:
181 resolver = _AsyncioResolver.create(grpc_resolver)
182 resolver.resolve(host, port)
185 cdef void asyncio_timer_start(grpc_custom_timer* grpc_timer) with gil:
186 timer = _AsyncioTimer.create(grpc_timer, grpc_timer.timeout_ms / 1000.0)
188 grpc_timer.timer = <void*>timer
191 cdef void asyncio_timer_stop(grpc_custom_timer* grpc_timer) with gil:
192 timer = <_AsyncioTimer>grpc_timer.timer
197 cdef void asyncio_init_loop() with gil:
201 cdef void asyncio_destroy_loop() with gil:
205 cdef void asyncio_kick_loop() with gil:
209 cdef void asyncio_run_loop(size_t timeout_ms) with gil:
213 def install_asyncio_iomgr():
214 asyncio_resolver_vtable.resolve = asyncio_resolve
215 asyncio_resolver_vtable.resolve_async = asyncio_resolve_async
217 asyncio_socket_vtable.init = asyncio_socket_init
218 asyncio_socket_vtable.connect = asyncio_socket_connect
219 asyncio_socket_vtable.destroy = asyncio_socket_destroy
220 asyncio_socket_vtable.shutdown = asyncio_socket_shutdown
221 asyncio_socket_vtable.close = asyncio_socket_close
222 asyncio_socket_vtable.write = asyncio_socket_write
223 asyncio_socket_vtable.read = asyncio_socket_read
224 asyncio_socket_vtable.getpeername = asyncio_socket_getpeername
225 asyncio_socket_vtable.getsockname = asyncio_socket_getsockname
226 asyncio_socket_vtable.bind = asyncio_socket_bind
227 asyncio_socket_vtable.listen = asyncio_socket_listen
228 asyncio_socket_vtable.accept = asyncio_socket_accept
230 asyncio_timer_vtable.start = asyncio_timer_start
231 asyncio_timer_vtable.stop = asyncio_timer_stop
233 asyncio_pollset_vtable.init = asyncio_init_loop
234 asyncio_pollset_vtable.poll = asyncio_run_loop
235 asyncio_pollset_vtable.kick = asyncio_kick_loop
236 asyncio_pollset_vtable.shutdown = asyncio_destroy_loop
238 grpc_custom_iomgr_init(
239 &asyncio_socket_vtable,
240 &asyncio_resolver_vtable,
241 &asyncio_timer_vtable,
242 &asyncio_pollset_vtable