1 # Copyright 2020 The gRPC Authors
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 cdef gpr_timespec _GPR_INF_FUTURE = gpr_inf_future(GPR_CLOCK_REALTIME)
18 cdef float _POLL_AWAKE_INTERVAL_S = 0.2
20 # This bool indicates if the event loop impl can monitor a given fd, or has
21 # loop.add_reader method.
22 cdef bint _has_fd_monitoring = True
24 IF UNAME_SYSNAME == "Windows":
25 cdef void _unified_socket_write(int fd) nogil:
26 win_socket_send(<WIN_SOCKET>fd, b"1", 1, 0)
28 from posix cimport unistd
30 cdef void _unified_socket_write(int fd) nogil:
31 unistd.write(fd, b"1", 1)
34 def _handle_callback_wrapper(CallbackWrapper callback_wrapper, int success):
35 CallbackWrapper.functor_run(callback_wrapper.c_functor(), success)
38 cdef class BaseCompletionQueue:
40 cdef grpc_completion_queue* c_ptr(self):
44 cdef class _BoundEventLoop:
46 def __cinit__(self, object loop, object read_socket, object handler):
47 global _has_fd_monitoring
49 self.read_socket = read_socket
50 reader_function = functools.partial(
54 # NOTE(lidiz) There isn't a way to cleanly pre-check if fd monitoring
55 # support is available or not. Checking the event loop policy is not
56 # good enough. The application can has its own loop implementation, or
57 # uses different types of event loops (e.g., 1 Proactor, 3 Selectors).
58 if _has_fd_monitoring:
60 self.loop.add_reader(self.read_socket, reader_function)
61 self._has_reader = True
62 except NotImplementedError:
63 _has_fd_monitoring = False
64 self._has_reader = False
69 self.loop.remove_reader(self.read_socket)
72 cdef class PollerCompletionQueue(BaseCompletionQueue):
75 self._cq = grpc_completion_queue_create_for_next(NULL)
76 self._shutdown = False
77 self._poller_thread = threading.Thread(target=self._poll_wrapper, daemon=True)
78 self._poller_thread.start()
80 self._read_socket, self._write_socket = socket.socketpair()
81 self._write_fd = self._write_socket.fileno()
84 # The read socket might be read by multiple threads. But only one of them will
85 # read the 1 byte sent by the poller thread. This setting is essential to allow
86 # multiple loops in multiple threads bound to the same poller.
87 self._read_socket.setblocking(False)
89 self._queue = cpp_event_queue()
91 def bind_loop(self, object loop):
92 if loop in self._loops:
95 self._loops[loop] = _BoundEventLoop(loop, self._read_socket, self._handle_events)
97 cdef void _poll(self) nogil:
99 cdef CallbackContext *context
101 while not self._shutdown:
102 event = grpc_completion_queue_next(self._cq,
106 if event.type == GRPC_QUEUE_TIMEOUT:
108 raise AssertionError("Core should not return GRPC_QUEUE_TIMEOUT!")
109 elif event.type == GRPC_QUEUE_SHUTDOWN:
110 self._shutdown = True
112 self._queue_mutex.lock()
113 self._queue.push(event)
114 self._queue_mutex.unlock()
115 if _has_fd_monitoring:
116 _unified_socket_write(self._write_fd)
119 # Event loops can be paused or killed at any time. So,
120 # instead of deligate to any thread, the polling thread
121 # should handle the distribution of the event.
122 self._handle_events(None)
124 def _poll_wrapper(self):
129 # Removes the socket hook from loops
130 for loop in self._loops:
131 self._loops.get(loop).close()
133 # TODO(https://github.com/grpc/grpc/issues/22365) perform graceful shutdown
134 grpc_completion_queue_shutdown(self._cq)
135 while not self._shutdown:
136 self._poller_thread.join(timeout=_POLL_AWAKE_INTERVAL_S)
137 grpc_completion_queue_destroy(self._cq)
139 # Clean up socket resources
140 self._read_socket.close()
141 self._write_socket.close()
143 def _handle_events(self, object context_loop):
145 if _has_fd_monitoring:
146 # If fd monitoring is working, clean the socket without blocking.
147 data = self._read_socket.recv(1)
148 cdef grpc_event event
149 cdef CallbackContext *context
152 self._queue_mutex.lock()
153 if self._queue.empty():
154 self._queue_mutex.unlock()
157 event = self._queue.front()
159 self._queue_mutex.unlock()
161 context = <CallbackContext *>event.tag
162 loop = <object>context.loop
163 if loop is context_loop:
164 # Executes callbacks: complete the future
165 CallbackWrapper.functor_run(
166 <grpc_completion_queue_functor *>event.tag,
170 loop.call_soon_threadsafe(
171 _handle_callback_wrapper,
172 <CallbackWrapper>context.callback_wrapper,
177 cdef class CallbackCompletionQueue(BaseCompletionQueue):
180 self._loop = get_working_loop()
181 self._shutdown_completed = self._loop.create_future()
182 self._wrapper = CallbackWrapper(
183 self._shutdown_completed,
185 CQ_SHUTDOWN_FAILURE_HANDLER)
186 self._cq = grpc_completion_queue_create_for_callback(
187 self._wrapper.c_functor(),
191 async def shutdown(self):
192 grpc_completion_queue_shutdown(self._cq)
193 await self._shutdown_completed
194 grpc_completion_queue_destroy(self._cq)