1 # Copyright 2015 gRPC authors.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
# Longest time, in milliseconds, that a single poll of the completion queue
# may block before control returns to check for pending Python signals.
cdef int _INTERRUPT_CHECK_PERIOD_MS = 200


cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) except *:
  # Polls `c_completion_queue` until it yields a non-timeout event or the
  # overall deadline is reached, and returns that event.
  #
  # `deadline` is either None (no deadline: wait indefinitely) or a value
  # accepted by _timespec_from_time.
  #
  # The wait is cut into slices of at most _INTERRUPT_CHECK_PERIOD_MS so that
  # pending signals are checked between slices; an exception raised by
  # PyErr_CheckSignals propagates to the caller (hence `except *`).
  cdef gpr_timespec c_increment
  cdef gpr_timespec c_timeout
  cdef gpr_timespec c_deadline
  cdef grpc_event c_event
  c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN)
  if deadline is None:
    c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
  else:
    c_deadline = _timespec_from_time(deadline)

  while True:
    with nogil:
      # Never wait past the caller's deadline, even within a single slice.
      c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment)
      if gpr_time_cmp(c_timeout, c_deadline) > 0:
        c_timeout = c_deadline

      c_event = grpc_completion_queue_next(c_completion_queue, c_timeout, NULL)

      # Stop on any real event, or on a timeout of the final slice (the one
      # that was clamped to the overall deadline).
      if (c_event.type != GRPC_QUEUE_TIMEOUT or
          gpr_time_cmp(c_timeout, c_deadline) == 0):
        break

    # Handle any signals (the GIL is held again at this point).
    cpython.PyErr_CheckSignals()
  return c_event
cdef _interpret_event(grpc_event c_event):
  # Translates a raw completion-queue event into a `(tag, event)` pair.
  #
  # Timeout and shutdown events carry no tag: the first element is None and
  # the second is a ConnectivityEvent whose first argument is the queue event
  # type. For every other event type the tag object stored in `c_event.tag`
  # is returned together with the result of `tag.event(c_event)`.
  cdef _Tag tag
  if c_event.type == GRPC_QUEUE_TIMEOUT:
    # TODO(ericgribkoff) Do not coopt ConnectivityEvent here.
    return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
  elif c_event.type == GRPC_QUEUE_SHUTDOWN:
    # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
    return None, ConnectivityEvent(GRPC_QUEUE_SHUTDOWN, False, None)
  else:
    tag = <_Tag>c_event.tag
    # We receive event tags only after they've been inc-ref'd elsewhere in
    # the code. Drop that extra reference here; the local `tag` keeps the
    # object alive for the remainder of this function.
    cpython.Py_DECREF(tag)
    return tag, tag.event(c_event)
cdef _latent_event(grpc_completion_queue *c_completion_queue, object deadline):
  # Waits for the next event on `c_completion_queue` (bounded by `deadline`,
  # as handled by _next) and returns what the module-level _interpret_event
  # produces for it.
  return _interpret_event(_next(c_completion_queue, deadline))
cdef class CompletionQueue:
  """Owns a core `grpc_completion_queue` and exposes polling and shutdown.

  `is_shutting_down` becomes True once shutdown has been requested via
  `shutdown()`; `is_shutdown` becomes True once the queue has delivered its
  GRPC_QUEUE_SHUTDOWN event.
  """

  def __cinit__(self, shutdown_cq=False):
    # When `shutdown_cq` is true the queue is created through the factory
    # lookup as a non-listening "next"-style queue; otherwise the default
    # "next" queue is created.
    cdef grpc_completion_queue_attributes c_attrs
    fork_handlers_and_grpc_init()
    if shutdown_cq:
      # NOTE(review): `c_attrs` is an uninitialised stack struct, so every
      # field consulted by the factory must be set explicitly. Confirm the
      # version value against the grpc_completion_queue_attributes definition.
      c_attrs.version = 1
      c_attrs.cq_completion_type = GRPC_CQ_NEXT
      c_attrs.cq_polling_type = GRPC_CQ_NON_LISTENING
      self.c_completion_queue = grpc_completion_queue_create(
          grpc_completion_queue_factory_lookup(&c_attrs), &c_attrs, NULL)
    else:
      self.c_completion_queue = grpc_completion_queue_create_for_next(NULL)
    self.is_shutting_down = False
    self.is_shutdown = False

  cdef _interpret_event(self, grpc_event c_event):
    # Interprets `c_event`, records queue shutdown when it is observed, and
    # returns the interpreted event (the tag is not needed here).
    unused_tag, event = _interpret_event(c_event)
    if event.completion_type == GRPC_QUEUE_SHUTDOWN:
      self.is_shutdown = True
    return event

  # We name this 'poll' to avoid problems with CPython's expectations for
  # 'special' methods (like next and __next__).
  def poll(self, deadline=None):
    """Return the next interpreted event, waiting until `deadline` at most.

    With `deadline=None` the call waits until an event is available.
    """
    return self._interpret_event(_next(self.c_completion_queue, deadline))

  def shutdown(self):
    """Request shutdown of the underlying queue and mark it shutting down."""
    with nogil:
      grpc_completion_queue_shutdown(self.c_completion_queue)
    self.is_shutting_down = True

  def clear(self):
    """Drain the queue until its shutdown event has been delivered.

    Raises:
      ValueError: if shutdown has not been requested on this queue.
    """
    if not self.is_shutting_down:
      raise ValueError('queue must be shutting down to be cleared')
    # Events returned by poll() expose `completion_type` (see
    # _interpret_event above); they have no `type` attribute.
    while self.poll().completion_type != GRPC_QUEUE_SHUTDOWN:
      pass

  def __dealloc__(self):
    cdef gpr_timespec c_deadline
    c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
    if self.c_completion_queue != NULL:
      # Ensure shutdown
      if not self.is_shutting_down:
        grpc_completion_queue_shutdown(self.c_completion_queue)
      # Pump the queue (All outstanding calls should have been cancelled)
      while not self.is_shutdown:
        event = grpc_completion_queue_next(
            self.c_completion_queue, c_deadline, NULL)
        self._interpret_event(event)
      grpc_completion_queue_destroy(self.c_completion_queue)
    # NOTE(review): __cinit__ calls fork_handlers_and_grpc_init(); no matching
    # library shutdown call is visible in this view of the file. Confirm
    # whether one belongs here.