Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / src / python / grpcio / grpc / _cython / _cygrpc / aio / iomgr / iomgr.pyx.pxi
1 # Copyright 2019 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15
16 import platform
17
18 from cpython cimport Py_INCREF, Py_DECREF
19 from libc cimport string
20
21 import socket as native_socket
22 try:
23     import ipaddress  # CPython 3.3 and above
24 except ImportError:
25     pass
26
27 cdef grpc_socket_vtable asyncio_socket_vtable
28 cdef grpc_custom_resolver_vtable asyncio_resolver_vtable
29 cdef grpc_custom_timer_vtable asyncio_timer_vtable
30 cdef grpc_custom_poller_vtable asyncio_pollset_vtable
31 cdef bint so_reuse_port
32
33
34 cdef grpc_error* asyncio_socket_init(
35         grpc_custom_socket* grpc_socket,
36         int domain) with gil:
37     socket = _AsyncioSocket.create(grpc_socket, None, None)
38     Py_INCREF(socket)
39     grpc_socket.impl = <void*>socket
40     return <grpc_error*>0
41
42
43 cdef void asyncio_socket_destroy(grpc_custom_socket* grpc_socket) with gil:
44     Py_DECREF(<_AsyncioSocket>grpc_socket.impl)
45
46
47 cdef void asyncio_socket_connect(
48         grpc_custom_socket* grpc_socket,
49         const grpc_sockaddr* addr,
50         size_t addr_len,
51         grpc_custom_connect_callback connect_cb) with gil:
52
53     host, port = sockaddr_to_tuple(addr, addr_len)
54     socket = <_AsyncioSocket>grpc_socket.impl
55     socket.connect(host, port, connect_cb)
56
57
58 cdef void asyncio_socket_close(
59         grpc_custom_socket* grpc_socket,
60         grpc_custom_close_callback close_cb) with gil:
61     socket = (<_AsyncioSocket>grpc_socket.impl)
62     socket.close()
63     close_cb(grpc_socket)
64
65
66 cdef void asyncio_socket_shutdown(grpc_custom_socket* grpc_socket) with gil:
67     socket = (<_AsyncioSocket>grpc_socket.impl)
68     socket.close()
69
70
71 cdef void asyncio_socket_write(
72         grpc_custom_socket* grpc_socket,
73         grpc_slice_buffer* slice_buffer,
74         grpc_custom_write_callback write_cb) with gil:
75     socket = (<_AsyncioSocket>grpc_socket.impl)
76     socket.write(slice_buffer, write_cb)
77
78
79 cdef void asyncio_socket_read(
80         grpc_custom_socket* grpc_socket,
81         char* buffer_,
82         size_t length,
83         grpc_custom_read_callback read_cb) with gil:
84     socket = (<_AsyncioSocket>grpc_socket.impl)
85     socket.read(buffer_, length, read_cb)
86
87
88 cdef grpc_error* asyncio_socket_getpeername(
89         grpc_custom_socket* grpc_socket,
90         const grpc_sockaddr* addr,
91         int* length) with gil:
92     peer = (<_AsyncioSocket>grpc_socket.impl).peername()
93
94     cdef grpc_resolved_address c_addr
95     hostname = str_to_bytes(peer[0])
96     grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
97     # TODO(https://github.com/grpc/grpc/issues/20684) Remove the memcpy
98     string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
99     length[0] = c_addr.len
100     return grpc_error_none()
101
102
103 cdef grpc_error* asyncio_socket_getsockname(
104         grpc_custom_socket* grpc_socket,
105         const grpc_sockaddr* addr,
106         int* length) with gil:
107     """Supplies sock_addr in add_socket_to_server."""
108     cdef grpc_resolved_address c_addr
109     socket = (<_AsyncioSocket>grpc_socket.impl)
110     if socket is None:
111         peer = ('0.0.0.0', 0)
112     else:
113         peer = socket.sockname()
114     hostname = str_to_bytes(peer[0])
115     grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
116     # TODO(https://github.com/grpc/grpc/issues/20684) Remove the memcpy
117     string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
118     length[0] = c_addr.len
119     return grpc_error_none()
120
121
122 cdef grpc_error* asyncio_socket_listen(grpc_custom_socket* grpc_socket) with gil:
123     (<_AsyncioSocket>grpc_socket.impl).listen()
124     return grpc_error_none()
125
126
127 def _asyncio_apply_socket_options(object s, int flags):
128     # Turn SO_REUSEADDR on for TCP sockets; if we want to support UDS, we will
129     # need to update this function.
130     s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEADDR, 1)
131     # SO_REUSEPORT only available in POSIX systems.
132     if platform.system() != 'Windows':
133         if GRPC_CUSTOM_SOCKET_OPT_SO_REUSEPORT & flags:
134             s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEPORT, 1)
135     s.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True)
136
137
138 cdef grpc_error* asyncio_socket_bind(
139         grpc_custom_socket* grpc_socket,
140         const grpc_sockaddr* addr,
141         size_t len, int flags) with gil:
142     host, port = sockaddr_to_tuple(addr, len)
143     try:
144         ip = ipaddress.ip_address(host)
145         if isinstance(ip, ipaddress.IPv6Address):
146             family = native_socket.AF_INET6
147         else:
148             family = native_socket.AF_INET
149
150         socket = native_socket.socket(family=family)
151         _asyncio_apply_socket_options(socket, flags)
152         socket.bind((host, port))
153     except IOError as io_error:
154         return socket_error("bind", str(io_error))
155     else:
156         aio_socket = _AsyncioSocket.create_with_py_socket(grpc_socket, socket)
157         cpython.Py_INCREF(aio_socket)  # Py_DECREF in asyncio_socket_destroy
158         grpc_socket.impl = <void*>aio_socket
159         return grpc_error_none()
160
161
162 cdef void asyncio_socket_accept(
163         grpc_custom_socket* grpc_socket,
164         grpc_custom_socket* grpc_socket_client,
165         grpc_custom_accept_callback accept_cb) with gil:
166     (<_AsyncioSocket>grpc_socket.impl).accept(grpc_socket_client, accept_cb)
167
168
169 cdef grpc_error* asyncio_resolve(
170         char* host,
171         char* port,
172         grpc_resolved_addresses** res) with gil:
173     result = native_socket.getaddrinfo(host, port)
174     res[0] = tuples_to_resolvaddr(result)
175
176
177 cdef void asyncio_resolve_async(
178         grpc_custom_resolver* grpc_resolver,
179         char* host,
180         char* port) with gil:
181     resolver = _AsyncioResolver.create(grpc_resolver)
182     resolver.resolve(host, port)
183
184
185 cdef void asyncio_timer_start(grpc_custom_timer* grpc_timer) with gil:
186     timer = _AsyncioTimer.create(grpc_timer, grpc_timer.timeout_ms / 1000.0)
187     Py_INCREF(timer)
188     grpc_timer.timer = <void*>timer
189
190
191 cdef void asyncio_timer_stop(grpc_custom_timer* grpc_timer) with gil:
192     timer = <_AsyncioTimer>grpc_timer.timer
193     timer.stop()
194     Py_DECREF(timer)
195
196
197 cdef void asyncio_init_loop() with gil:
198     pass
199
200
201 cdef void asyncio_destroy_loop() with gil:
202     pass
203
204
205 cdef void asyncio_kick_loop() with gil:
206     pass
207
208
209 cdef void asyncio_run_loop(size_t timeout_ms) with gil:
210     pass
211
212
213 def install_asyncio_iomgr():
214     asyncio_resolver_vtable.resolve = asyncio_resolve
215     asyncio_resolver_vtable.resolve_async = asyncio_resolve_async
216
217     asyncio_socket_vtable.init = asyncio_socket_init
218     asyncio_socket_vtable.connect = asyncio_socket_connect
219     asyncio_socket_vtable.destroy = asyncio_socket_destroy
220     asyncio_socket_vtable.shutdown = asyncio_socket_shutdown
221     asyncio_socket_vtable.close = asyncio_socket_close
222     asyncio_socket_vtable.write = asyncio_socket_write
223     asyncio_socket_vtable.read = asyncio_socket_read
224     asyncio_socket_vtable.getpeername = asyncio_socket_getpeername
225     asyncio_socket_vtable.getsockname = asyncio_socket_getsockname
226     asyncio_socket_vtable.bind = asyncio_socket_bind
227     asyncio_socket_vtable.listen = asyncio_socket_listen
228     asyncio_socket_vtable.accept = asyncio_socket_accept
229
230     asyncio_timer_vtable.start = asyncio_timer_start
231     asyncio_timer_vtable.stop = asyncio_timer_stop
232
233     asyncio_pollset_vtable.init = asyncio_init_loop
234     asyncio_pollset_vtable.poll = asyncio_run_loop
235     asyncio_pollset_vtable.kick = asyncio_kick_loop
236     asyncio_pollset_vtable.shutdown = asyncio_destroy_loop
237
238     grpc_custom_iomgr_init(
239         &asyncio_socket_vtable,
240         &asyncio_resolver_vtable,
241         &asyncio_timer_vtable,
242         &asyncio_pollset_vtable
243     )