Imported Upstream version 1.36.0
[platform/upstream/grpc.git] / src / python / grpcio / grpc / _cython / _cygrpc / server.pyx.pxi
1 # Copyright 2015 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15
16 cdef class Server:
17
18   def __cinit__(self, object arguments, bint xds):
19     fork_handlers_and_grpc_init()
20     self.references = []
21     self.registered_completion_queues = []
22     self.is_started = False
23     self.is_shutting_down = False
24     self.is_shutdown = False
25     self.c_server = NULL
26     cdef _ChannelArgs channel_args = _ChannelArgs(arguments)
27     self.c_server = grpc_server_create(channel_args.c_args(), NULL)
28     if xds:
29       grpc_server_set_config_fetcher(self.c_server, grpc_server_config_fetcher_xds_create())
30     self.references.append(arguments)
31
32   def request_call(
33       self, CompletionQueue call_queue not None,
34       CompletionQueue server_queue not None, tag):
35     if not self.is_started or self.is_shutting_down:
36       raise ValueError("server must be started and not shutting down")
37     if server_queue not in self.registered_completion_queues:
38       raise ValueError("server_queue must be a registered completion queue")
39     cdef _RequestCallTag request_call_tag = _RequestCallTag(tag)
40     request_call_tag.prepare()
41     cpython.Py_INCREF(request_call_tag)
42     return grpc_server_request_call(
43         self.c_server, &request_call_tag.call.c_call,
44         &request_call_tag.call_details.c_details,
45         &request_call_tag.c_invocation_metadata,
46         call_queue.c_completion_queue, server_queue.c_completion_queue,
47         <cpython.PyObject *>request_call_tag)
48
49   def register_completion_queue(
50       self, CompletionQueue queue not None):
51     if self.is_started:
52       raise ValueError("cannot register completion queues after start")
53     with nogil:
54       grpc_server_register_completion_queue(
55           self.c_server, queue.c_completion_queue, NULL)
56     self.registered_completion_queues.append(queue)
57
58   def start(self, backup_queue=True):
59     """Start the Cython gRPC Server.
60     
61     Args:
62       backup_queue: a bool indicates whether to spawn a backup completion
63         queue. In the case that no CQ is bound to the server, and the shutdown
64         of server becomes un-observable.
65     """
66     if self.is_started:
67       raise ValueError("the server has already started")
68     if backup_queue:
69       self.backup_shutdown_queue = CompletionQueue(shutdown_cq=True)
70       self.register_completion_queue(self.backup_shutdown_queue)
71     self.is_started = True
72     with nogil:
73       grpc_server_start(self.c_server)
74     if backup_queue:
75       # Ensure the core has gotten a chance to do the start-up work
76       self.backup_shutdown_queue.poll(deadline=time.time())
77
78   def add_http2_port(self, bytes address,
79                      ServerCredentials server_credentials=None):
80     address = str_to_bytes(address)
81     self.references.append(address)
82     cdef int result
83     cdef char *address_c_string = address
84     if server_credentials is not None:
85       self.references.append(server_credentials)
86       with nogil:
87         result = grpc_server_add_secure_http2_port(
88             self.c_server, address_c_string, server_credentials.c_credentials)
89     else:
90       with nogil:
91         result = grpc_server_add_insecure_http2_port(self.c_server,
92                                                      address_c_string)
93     return result
94
95   cdef _c_shutdown(self, CompletionQueue queue, tag):
96     self.is_shutting_down = True
97     cdef _ServerShutdownTag server_shutdown_tag = _ServerShutdownTag(tag, self)
98     cpython.Py_INCREF(server_shutdown_tag)
99     with nogil:
100       grpc_server_shutdown_and_notify(
101           self.c_server, queue.c_completion_queue,
102           <cpython.PyObject *>server_shutdown_tag)
103
104   def shutdown(self, CompletionQueue queue not None, tag):
105     if queue.is_shutting_down:
106       raise ValueError("queue must be live")
107     elif not self.is_started:
108       raise ValueError("the server hasn't started yet")
109     elif self.is_shutting_down:
110       return
111     elif queue not in self.registered_completion_queues:
112       raise ValueError("expected registered completion queue")
113     else:
114       self._c_shutdown(queue, tag)
115
116   cdef notify_shutdown_complete(self):
117     # called only after our server shutdown tag has emerged from a completion
118     # queue.
119     self.is_shutdown = True
120
121   def cancel_all_calls(self):
122     if not self.is_shutting_down:
123       raise UsageError("the server must be shutting down to cancel all calls")
124     elif self.is_shutdown:
125       return
126     else:
127       with nogil:
128         grpc_server_cancel_all_calls(self.c_server)
129
130   # TODO(https://github.com/grpc/grpc/issues/17515) Determine what, if any,
131   # portion of this is safe to call from __dealloc__, and potentially remove
132   # backup_shutdown_queue.
133   def destroy(self):
134     if self.c_server != NULL:
135       if not self.is_started:
136         pass
137       elif self.is_shutdown:
138         pass
139       elif not self.is_shutting_down:
140         if self.backup_shutdown_queue is None:
141           raise InternalError('Server shutdown failed: no completion queue.')
142         else:
143           # the user didn't call shutdown - use our backup queue
144           self._c_shutdown(self.backup_shutdown_queue, None)
145           # and now we wait
146           while not self.is_shutdown:
147             self.backup_shutdown_queue.poll()
148       else:
149         # We're in the process of shutting down, but have not shutdown; can't do
150         # much but repeatedly release the GIL and wait
151         while not self.is_shutdown:
152           time.sleep(0)
153       with nogil:
154         grpc_server_destroy(self.c_server)
155         self.c_server = NULL
156
157   def __dealloc__(self):
158     if self.c_server == NULL:
159       grpc_shutdown()