Imported Upstream version 1.21.0
[platform/upstream/grpc.git] / src / python / grpcio / grpc / _cython / _cygrpc / completion_queue.pyx.pxi
1 # Copyright 2015 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 cimport cpython
16
17 import threading
18 import time
19
20 cdef int _INTERRUPT_CHECK_PERIOD_MS = 200
21
22
23 cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) except *:
24   cdef gpr_timespec c_increment
25   cdef gpr_timespec c_timeout
26   cdef gpr_timespec c_deadline
27   c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN)
28   if deadline is None:
29     c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
30   else:
31     c_deadline = _timespec_from_time(deadline)
32
33   while True:
34     with nogil:
35       c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment)
36       if gpr_time_cmp(c_timeout, c_deadline) > 0:
37         c_timeout = c_deadline
38   
39       c_event = grpc_completion_queue_next(c_completion_queue, c_timeout, NULL)
40   
41       if (c_event.type != GRPC_QUEUE_TIMEOUT or
42           gpr_time_cmp(c_timeout, c_deadline) == 0):
43         break
44
45     # Handle any signals
46     cpython.PyErr_CheckSignals()
47   return c_event
48
49
50 cdef _interpret_event(grpc_event c_event):
51   cdef _Tag tag
52   if c_event.type == GRPC_QUEUE_TIMEOUT:
53     # TODO(ericgribkoff) Do not coopt ConnectivityEvent here.
54     return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
55   elif c_event.type == GRPC_QUEUE_SHUTDOWN:
56     # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
57     return None, ConnectivityEvent(GRPC_QUEUE_SHUTDOWN, False, None)
58   else:
59     tag = <_Tag>c_event.tag
60     # We receive event tags only after they've been inc-ref'd elsewhere in
61     # the code.
62     cpython.Py_DECREF(tag)
63     return tag, tag.event(c_event)
64
65
66 cdef _latent_event(grpc_completion_queue *c_completion_queue, object deadline):
67   cdef grpc_event c_event = _next(c_completion_queue, deadline)
68   return _interpret_event(c_event)
69
70
71 cdef class CompletionQueue:
72
73   def __cinit__(self, shutdown_cq=False):
74     cdef grpc_completion_queue_attributes c_attrs
75     fork_handlers_and_grpc_init()
76     if shutdown_cq:
77       c_attrs.version = 1
78       c_attrs.cq_completion_type = GRPC_CQ_NEXT
79       c_attrs.cq_polling_type = GRPC_CQ_NON_LISTENING
80       self.c_completion_queue = grpc_completion_queue_create(
81           grpc_completion_queue_factory_lookup(&c_attrs), &c_attrs, NULL);
82     else:
83       self.c_completion_queue = grpc_completion_queue_create_for_next(NULL)
84     self.is_shutting_down = False
85     self.is_shutdown = False
86
87   cdef _interpret_event(self, grpc_event c_event):
88     unused_tag, event = _interpret_event(c_event)
89     if event.completion_type == GRPC_QUEUE_SHUTDOWN:
90       self.is_shutdown = True
91     return event
92
93   # We name this 'poll' to avoid problems with CPython's expectations for
94   # 'special' methods (like next and __next__).
95   def poll(self, deadline=None):
96     return self._interpret_event(_next(self.c_completion_queue, deadline))
97
98   def shutdown(self):
99     with nogil:
100       grpc_completion_queue_shutdown(self.c_completion_queue)
101     self.is_shutting_down = True
102
103   def clear(self):
104     if not self.is_shutting_down:
105       raise ValueError('queue must be shutting down to be cleared')
106     while self.poll().type != GRPC_QUEUE_SHUTDOWN:
107       pass
108
109   def __dealloc__(self):
110     cdef gpr_timespec c_deadline
111     c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
112     if self.c_completion_queue != NULL:
113       # Ensure shutdown
114       if not self.is_shutting_down:
115         grpc_completion_queue_shutdown(self.c_completion_queue)
116       # Pump the queue (All outstanding calls should have been cancelled)
117       while not self.is_shutdown:
118         event = grpc_completion_queue_next(
119             self.c_completion_queue, c_deadline, NULL)
120         self._interpret_event(event)
121       grpc_completion_queue_destroy(self.c_completion_queue)
122     grpc_shutdown_blocking()