Imported Upstream version 1.34.0
[platform/upstream/grpc.git] / src / python / grpcio / grpc / _cython / _cygrpc / server.pyx.pxi
1 # Copyright 2015 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15
16 cdef class Server:
17
18   def __cinit__(self, object arguments):
19     fork_handlers_and_grpc_init()
20     self.references = []
21     self.registered_completion_queues = []
22     self.is_started = False
23     self.is_shutting_down = False
24     self.is_shutdown = False
25     self.c_server = NULL
26     cdef _ChannelArgs channel_args = _ChannelArgs(arguments)
27     self.c_server = grpc_server_create(channel_args.c_args(), NULL)
28     self.references.append(arguments)
29
30   def request_call(
31       self, CompletionQueue call_queue not None,
32       CompletionQueue server_queue not None, tag):
33     if not self.is_started or self.is_shutting_down:
34       raise ValueError("server must be started and not shutting down")
35     if server_queue not in self.registered_completion_queues:
36       raise ValueError("server_queue must be a registered completion queue")
37     cdef _RequestCallTag request_call_tag = _RequestCallTag(tag)
38     request_call_tag.prepare()
39     cpython.Py_INCREF(request_call_tag)
40     return grpc_server_request_call(
41         self.c_server, &request_call_tag.call.c_call,
42         &request_call_tag.call_details.c_details,
43         &request_call_tag.c_invocation_metadata,
44         call_queue.c_completion_queue, server_queue.c_completion_queue,
45         <cpython.PyObject *>request_call_tag)
46
47   def register_completion_queue(
48       self, CompletionQueue queue not None):
49     if self.is_started:
50       raise ValueError("cannot register completion queues after start")
51     with nogil:
52       grpc_server_register_completion_queue(
53           self.c_server, queue.c_completion_queue, NULL)
54     self.registered_completion_queues.append(queue)
55
56   def start(self, backup_queue=True):
57     """Start the Cython gRPC Server.
58     
59     Args:
60       backup_queue: a bool indicates whether to spawn a backup completion
61         queue. In the case that no CQ is bound to the server, and the shutdown
62         of server becomes un-observable.
63     """
64     if self.is_started:
65       raise ValueError("the server has already started")
66     if backup_queue:
67       self.backup_shutdown_queue = CompletionQueue(shutdown_cq=True)
68       self.register_completion_queue(self.backup_shutdown_queue)
69     self.is_started = True
70     with nogil:
71       grpc_server_start(self.c_server)
72     if backup_queue:
73       # Ensure the core has gotten a chance to do the start-up work
74       self.backup_shutdown_queue.poll(deadline=time.time())
75
76   def add_http2_port(self, bytes address,
77                      ServerCredentials server_credentials=None):
78     address = str_to_bytes(address)
79     self.references.append(address)
80     cdef int result
81     cdef char *address_c_string = address
82     if server_credentials is not None:
83       self.references.append(server_credentials)
84       with nogil:
85         result = grpc_server_add_secure_http2_port(
86             self.c_server, address_c_string, server_credentials.c_credentials)
87     else:
88       with nogil:
89         result = grpc_server_add_insecure_http2_port(self.c_server,
90                                                      address_c_string)
91     return result
92
93   cdef _c_shutdown(self, CompletionQueue queue, tag):
94     self.is_shutting_down = True
95     cdef _ServerShutdownTag server_shutdown_tag = _ServerShutdownTag(tag, self)
96     cpython.Py_INCREF(server_shutdown_tag)
97     with nogil:
98       grpc_server_shutdown_and_notify(
99           self.c_server, queue.c_completion_queue,
100           <cpython.PyObject *>server_shutdown_tag)
101
102   def shutdown(self, CompletionQueue queue not None, tag):
103     if queue.is_shutting_down:
104       raise ValueError("queue must be live")
105     elif not self.is_started:
106       raise ValueError("the server hasn't started yet")
107     elif self.is_shutting_down:
108       return
109     elif queue not in self.registered_completion_queues:
110       raise ValueError("expected registered completion queue")
111     else:
112       self._c_shutdown(queue, tag)
113
114   cdef notify_shutdown_complete(self):
115     # called only after our server shutdown tag has emerged from a completion
116     # queue.
117     self.is_shutdown = True
118
119   def cancel_all_calls(self):
120     if not self.is_shutting_down:
121       raise UsageError("the server must be shutting down to cancel all calls")
122     elif self.is_shutdown:
123       return
124     else:
125       with nogil:
126         grpc_server_cancel_all_calls(self.c_server)
127
128   # TODO(https://github.com/grpc/grpc/issues/17515) Determine what, if any,
129   # portion of this is safe to call from __dealloc__, and potentially remove
130   # backup_shutdown_queue.
131   def destroy(self):
132     if self.c_server != NULL:
133       if not self.is_started:
134         pass
135       elif self.is_shutdown:
136         pass
137       elif not self.is_shutting_down:
138         if self.backup_shutdown_queue is None:
139           raise InternalError('Server shutdown failed: no completion queue.')
140         else:
141           # the user didn't call shutdown - use our backup queue
142           self._c_shutdown(self.backup_shutdown_queue, None)
143           # and now we wait
144           while not self.is_shutdown:
145             self.backup_shutdown_queue.poll()
146       else:
147         # We're in the process of shutting down, but have not shutdown; can't do
148         # much but repeatedly release the GIL and wait
149         while not self.is_shutdown:
150           time.sleep(0)
151       with nogil:
152         grpc_server_destroy(self.c_server)
153         self.c_server = NULL
154
155   def __dealloc__(self):
156     if self.c_server == NULL:
157       grpc_shutdown()