Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / src / python / grpcio / grpc / _cython / _cygrpc / aio / iomgr / socket.pyx.pxi
1 # Copyright 2019 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import socket as native_socket
16
17 from libc cimport string
18
19
20 # TODO(https://github.com/grpc/grpc/issues/21348) Better flow control needed.
cdef class _AsyncioSocket:
    """Adapter between a gRPC Core custom socket and asyncio streams.

    An instance wraps a ``grpc_custom_socket`` pointer and is used in one of
    two ways:

    * as a connection, backed by an asyncio reader/writer pair that is either
      handed in through ``create`` or obtained by ``connect``;
    * as a listener, backed by a Python socket handed in through
      ``create_with_py_socket`` and served with ``asyncio.start_server``.

    Results of asynchronous operations are reported back to Core through the
    C callbacks passed to ``connect``, ``read``, ``write`` and ``accept``.
    """

    def __cinit__(self):
        # C-level state: Core socket handles and the callbacks Core supplies.
        self._grpc_socket = NULL
        self._grpc_client_socket = NULL
        self._grpc_connect_cb = NULL
        self._grpc_read_cb = NULL
        # Stays NULL until accept() is called; checked before every use.
        self._grpc_accept_cb = NULL
        self._read_buffer = NULL

        # Python-level state.
        self._reader = None
        self._writer = None
        self._task_connect = None
        self._task_read = None
        self._server = None
        self._py_socket = None
        self._peername = None
        self._loop = asyncio.get_event_loop()

    @staticmethod
    cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket,
                               object reader,
                               object writer):
        """Builds a socket around an (optional) asyncio reader/writer pair.

        The peer name is captured from the writer when one is given.
        """
        cdef _AsyncioSocket instance = _AsyncioSocket()
        instance._grpc_socket = grpc_socket
        instance._reader = reader
        instance._writer = writer
        if writer is not None:
            instance._peername = writer.get_extra_info('peername')
        return instance

    @staticmethod
    cdef _AsyncioSocket create_with_py_socket(grpc_custom_socket * grpc_socket, object py_socket):
        """Builds a socket around an existing Python socket object."""
        cdef _AsyncioSocket instance = _AsyncioSocket()
        instance._grpc_socket = grpc_socket
        instance._py_socket = py_socket
        return instance

    def __repr__(self):
        class_name = self.__class__.__name__
        id_ = id(self)
        connected = self.is_connected()
        return f"<{class_name} {id_} connected={connected}>"

    def _connect_cb(self, future):
        """Done-callback of the connect task; reports the outcome to Core.

        On success the reader/writer pair is stored and TCP_NODELAY is set on
        the underlying socket. On failure Core receives a socket error.
        """
        # Drop the task handle before notifying Core, so that no stale handle
        # is visible while Core's callback runs (same order as _read_cb).
        self._task_connect = None

        try:
            self._reader, self._writer = future.result()
        except Exception as e:
            self._grpc_connect_cb(
                <grpc_custom_socket*>self._grpc_socket,
                grpc_socket_error("Socket connect failed: {}".format(e).encode())
            )
            return

        # gRPC default posix implementation disables nagle
        # algorithm.
        sock = self._writer.transport.get_extra_info('socket')
        sock.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True)

        self._grpc_connect_cb(
            <grpc_custom_socket*>self._grpc_socket,
            <grpc_error*>0
        )

    def _read_cb(self, future):
        """Done-callback of the read task; hands the data (or error) to Core.

        On success the received bytes are copied into the buffer that was
        passed to ``read`` and Core is told how many bytes were written. On
        failure Core receives -1 and a socket error.
        """
        # Must be cleared before Core is notified: read() asserts that no
        # read task is pending.
        self._task_read = None

        try:
            inbound = future.result()
        except Exception as e:
            error_msg = "%s: %s" % (type(e), str(e))
            _LOGGER.exception(e)
            self._grpc_read_cb(
                <grpc_custom_socket*>self._grpc_socket,
                -1,
                grpc_socket_error("Read failed: {}".format(error_msg).encode())
            )
            return

        # The task was created with read(n=length), so at most `length` bytes
        # arrive here. The destination is presumably sized by Core to hold at
        # least `length` bytes -- confirm against the caller of read().
        received = len(inbound)
        string.memcpy(
            <void*>self._read_buffer,
            <char*>inbound,
            received
        )
        self._grpc_read_cb(
            <grpc_custom_socket*>self._grpc_socket,
            received,
            <grpc_error*>0
        )

    cdef void connect(self,
                      object host,
                      object port,
                      grpc_custom_connect_callback grpc_connect_cb):
        """Starts an asynchronous connection attempt to ``host``:``port``.

        The outcome is delivered to ``grpc_connect_cb`` from ``_connect_cb``.
        """
        assert not self._reader
        assert not self._task_connect

        self._grpc_connect_cb = grpc_connect_cb
        # Use the loop captured at construction, like read() and listen(),
        # instead of whichever loop happens to be current right now.
        self._task_connect = self._loop.create_task(
            asyncio.open_connection(host, port)
        )
        self._task_connect.add_done_callback(self._connect_cb)

    cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback grpc_read_cb):
        """Starts an asynchronous read of up to ``length`` bytes.

        The bytes are copied into ``buffer_`` and ``grpc_read_cb`` is invoked
        from ``_read_cb`` once the read finishes.
        """
        assert not self._task_read

        # Record where the result goes before the task is scheduled.
        self._grpc_read_cb = grpc_read_cb
        self._read_buffer = buffer_

        self._task_read = self._loop.create_task(
            self._reader.read(n=length)
        )
        self._task_read.add_done_callback(self._read_cb)

    cdef void write(self, grpc_slice_buffer * g_slice_buffer, grpc_custom_write_callback grpc_write_cb):
        """Performs write to network socket in AsyncIO.

        For each socket, Core guarantees there'll be only one ongoing write.
        When the write is finished, we need to call grpc_write_cb to notify
        Core that the work is done.
        """
        cdef char* start
        cdef bytearray outbound_buffer = bytearray()

        # Flatten all slices into one contiguous buffer so the writer is
        # called once.
        for i in range(g_slice_buffer.count):
            start = grpc_slice_buffer_start(g_slice_buffer, i)
            length = grpc_slice_buffer_length(g_slice_buffer, i)
            outbound_buffer.extend(<bytes>start[:length])

        self._writer.write(outbound_buffer)
        # The data is only handed to the writer here; success is reported to
        # Core immediately, without waiting for it to reach the network.
        grpc_write_cb(
            <grpc_custom_socket*>self._grpc_socket,
            <grpc_error*>0
        )

    cdef bint is_connected(self):
        """Returns True when a reader exists and its transport is not closing."""
        if not self._reader:
            return False
        return not self._reader._transport.is_closing()

    cdef void close(self):
        """Closes the writer, the asyncio server and the Python socket, if any."""
        if self.is_connected():
            self._writer.close()
        if self._server:
            self._server.close()
        # NOTE(lidiz) If the asyncio.Server is created from a Python socket,
        # the server.close() won't release the fd until the close() is called
        # for the Python socket.
        if self._py_socket:
            self._py_socket.close()

    def _new_connection_callback(self, object reader, object writer):
        """asyncio.start_server callback: hands a new connection to Core."""
        # A connection can arrive after listen() but before Core has called
        # accept(). There is no callback to notify in that case, and calling
        # a NULL function pointer would crash the process, so drop the
        # connection instead.
        if self._grpc_accept_cb == NULL:
            writer.close()
            return

        client_socket = _AsyncioSocket.create(
            self._grpc_client_socket,
            reader,
            writer,
        )

        self._grpc_client_socket.impl = <void*>client_socket
        cpython.Py_INCREF(client_socket)  # Py_DECREF in asyncio_socket_destroy
        # Accept callback expects to be called with:
        # * grpc_custom_socket: A grpc custom socket for server
        # * grpc_custom_socket: A grpc custom socket for client (with new Socket instance)
        # * grpc_error: An error object
        self._grpc_accept_cb(self._grpc_socket, self._grpc_client_socket, grpc_error_none())

    cdef listen(self):
        """Schedules creation of an asyncio server on the wrapped Python socket.

        The server is created asynchronously; ``self._server`` is only set
        once the scheduled task has run.
        """
        async def create_asyncio_server():
            self._server = await asyncio.start_server(
                self._new_connection_callback,
                sock=self._py_socket,
            )

        self._loop.create_task(create_asyncio_server())

    cdef accept(self,
                grpc_custom_socket* grpc_socket_client,
                grpc_custom_accept_callback grpc_accept_cb):
        """Records the client socket and callback to use for the next connection."""
        self._grpc_client_socket = grpc_socket_client
        self._grpc_accept_cb = grpc_accept_cb

    cdef tuple peername(self):
        """Returns the peer name captured when the socket was created, if any."""
        return self._peername

    cdef tuple sockname(self):
        """Returns the local address of the wrapped Python socket."""
        return self._py_socket.getsockname()