e3c31882d4919a8baf6ab8cec8b825a237d22102
[platform/upstream/grpc.git] / src / python / grpcio / grpc / _cython / _cygrpc / aio / completion_queue.pyx.pxi
1 # Copyright 2020 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import socket
16
17 cdef gpr_timespec _GPR_INF_FUTURE = gpr_inf_future(GPR_CLOCK_REALTIME)
18 cdef float _POLL_AWAKE_INTERVAL_S = 0.2
19
20 # This bool indicates if the event loop impl can monitor a given fd, or has
21 # loop.add_reader method.
22 cdef bint _has_fd_monitoring = True
23
24 IF UNAME_SYSNAME == "Windows":
25     cdef void _unified_socket_write(int fd) nogil:
26         win_socket_send(<WIN_SOCKET>fd, b"1", 1, 0)
27 ELSE:
28     from posix cimport unistd
29
30     cdef void _unified_socket_write(int fd) nogil:
31         unistd.write(fd, b"1", 1)
32
33
34 def _handle_callback_wrapper(CallbackWrapper callback_wrapper, int success):
35     CallbackWrapper.functor_run(callback_wrapper.c_functor(), success)
36
37
38 cdef class BaseCompletionQueue:
39
40     cdef grpc_completion_queue* c_ptr(self):
41         return self._cq
42
43
44 cdef class _BoundEventLoop:
45
46     def __cinit__(self, object loop, object read_socket, object handler):
47         global _has_fd_monitoring
48         self.loop = loop
49         self.read_socket = read_socket
50         reader_function = functools.partial(
51             handler,
52             loop
53         )
54         # NOTE(lidiz) There isn't a way to cleanly pre-check if fd monitoring
55         # support is available or not. Checking the event loop policy is not
56         # good enough. The application can has its own loop implementation, or
57         # uses different types of event loops (e.g., 1 Proactor, 3 Selectors).
58         if _has_fd_monitoring:
59             try:
60                 self.loop.add_reader(self.read_socket, reader_function)
61                 self._has_reader = True
62             except NotImplementedError:
63                 _has_fd_monitoring = False
64                 self._has_reader = False
65
66     def close(self):
67         if self.loop:
68             if self._has_reader:
69                 self.loop.remove_reader(self.read_socket)
70
71
72 cdef class PollerCompletionQueue(BaseCompletionQueue):
73
74     def __cinit__(self):
75         self._cq = grpc_completion_queue_create_for_next(NULL)
76         self._shutdown = False
77         self._poller_thread = threading.Thread(target=self._poll_wrapper, daemon=True)
78         self._poller_thread.start()
79
80         self._read_socket, self._write_socket = socket.socketpair()
81         self._write_fd = self._write_socket.fileno()
82         self._loops = {}
83
84         # The read socket might be read by multiple threads. But only one of them will
85         # read the 1 byte sent by the poller thread. This setting is essential to allow
86         # multiple loops in multiple threads bound to the same poller.
87         self._read_socket.setblocking(False)
88
89         self._queue = cpp_event_queue()
90
91     def bind_loop(self, object loop):
92         if loop in self._loops:
93             return
94         else:
95             self._loops[loop] = _BoundEventLoop(loop, self._read_socket, self._handle_events)
96
97     cdef void _poll(self) nogil:
98         cdef grpc_event event
99         cdef CallbackContext *context
100
101         while not self._shutdown:
102             event = grpc_completion_queue_next(self._cq,
103                                                _GPR_INF_FUTURE,
104                                                NULL)
105
106             if event.type == GRPC_QUEUE_TIMEOUT:
107                 with gil:
108                     raise AssertionError("Core should not return GRPC_QUEUE_TIMEOUT!")
109             elif event.type == GRPC_QUEUE_SHUTDOWN:
110                 self._shutdown = True
111             else:
112                 self._queue_mutex.lock()
113                 self._queue.push(event)
114                 self._queue_mutex.unlock()
115                 if _has_fd_monitoring:
116                     _unified_socket_write(self._write_fd)
117                 else:
118                     with gil:
119                         # Event loops can be paused or killed at any time. So,
120                         # instead of deligate to any thread, the polling thread
121                         # should handle the distribution of the event.
122                         self._handle_events(None)
123
124     def _poll_wrapper(self):
125         with nogil:
126             self._poll()
127
128     cdef shutdown(self):
129         # Removes the socket hook from loops
130         for loop in self._loops:
131             self._loops.get(loop).close()
132
133         # TODO(https://github.com/grpc/grpc/issues/22365) perform graceful shutdown
134         grpc_completion_queue_shutdown(self._cq)
135         while not self._shutdown:
136             self._poller_thread.join(timeout=_POLL_AWAKE_INTERVAL_S)
137         grpc_completion_queue_destroy(self._cq)
138
139         # Clean up socket resources
140         self._read_socket.close()
141         self._write_socket.close()
142
143     def _handle_events(self, object context_loop):
144         cdef bytes data
145         if _has_fd_monitoring:
146             # If fd monitoring is working, clean the socket without blocking.
147             data = self._read_socket.recv(1)
148         cdef grpc_event event
149         cdef CallbackContext *context
150
151         while True:
152             self._queue_mutex.lock()
153             if self._queue.empty():
154                 self._queue_mutex.unlock()
155                 break
156             else:
157                 event = self._queue.front()
158                 self._queue.pop()
159                 self._queue_mutex.unlock()
160
161             context = <CallbackContext *>event.tag
162             loop = <object>context.loop
163             if loop is context_loop:
164                 # Executes callbacks: complete the future
165                 CallbackWrapper.functor_run(
166                     <grpc_completion_queue_functor *>event.tag,
167                     event.success
168                 )
169             else:
170                 loop.call_soon_threadsafe(
171                     _handle_callback_wrapper,
172                     <CallbackWrapper>context.callback_wrapper,
173                     event.success
174                 )
175
176
177 cdef class CallbackCompletionQueue(BaseCompletionQueue):
178
179     def __cinit__(self):
180         self._loop = get_working_loop()
181         self._shutdown_completed = self._loop.create_future()
182         self._wrapper = CallbackWrapper(
183             self._shutdown_completed,
184             self._loop,
185             CQ_SHUTDOWN_FAILURE_HANDLER)
186         self._cq = grpc_completion_queue_create_for_callback(
187             self._wrapper.c_functor(),
188             NULL
189         )
190
191     async def shutdown(self):
192         grpc_completion_queue_shutdown(self._cq)
193         await self._shutdown_completed
194         grpc_completion_queue_destroy(self._cq)