1 # Copyright 2015 gRPC authors.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 def __cinit__(self, object arguments, bint xds):
19 fork_handlers_and_grpc_init()
21 self.registered_completion_queues = []
22 self.is_started = False
23 self.is_shutting_down = False
24 self.is_shutdown = False
26 cdef _ChannelArgs channel_args = _ChannelArgs(arguments)
27 self.c_server = grpc_server_create(channel_args.c_args(), NULL)
29 grpc_server_set_config_fetcher(self.c_server, grpc_server_config_fetcher_xds_create())
30 self.references.append(arguments)
33 self, CompletionQueue call_queue not None,
34 CompletionQueue server_queue not None, tag):
35 if not self.is_started or self.is_shutting_down:
36 raise ValueError("server must be started and not shutting down")
37 if server_queue not in self.registered_completion_queues:
38 raise ValueError("server_queue must be a registered completion queue")
39 cdef _RequestCallTag request_call_tag = _RequestCallTag(tag)
40 request_call_tag.prepare()
41 cpython.Py_INCREF(request_call_tag)
42 return grpc_server_request_call(
43 self.c_server, &request_call_tag.call.c_call,
44 &request_call_tag.call_details.c_details,
45 &request_call_tag.c_invocation_metadata,
46 call_queue.c_completion_queue, server_queue.c_completion_queue,
47 <cpython.PyObject *>request_call_tag)
49 def register_completion_queue(
50 self, CompletionQueue queue not None):
52 raise ValueError("cannot register completion queues after start")
54 grpc_server_register_completion_queue(
55 self.c_server, queue.c_completion_queue, NULL)
56 self.registered_completion_queues.append(queue)
58 def start(self, backup_queue=True):
59 """Start the Cython gRPC Server.
62 backup_queue: a bool indicates whether to spawn a backup completion
63 queue. In the case that no CQ is bound to the server, and the shutdown
64 of server becomes un-observable.
67 raise ValueError("the server has already started")
69 self.backup_shutdown_queue = CompletionQueue(shutdown_cq=True)
70 self.register_completion_queue(self.backup_shutdown_queue)
71 self.is_started = True
73 grpc_server_start(self.c_server)
75 # Ensure the core has gotten a chance to do the start-up work
76 self.backup_shutdown_queue.poll(deadline=time.time())
78 def add_http2_port(self, bytes address,
79 ServerCredentials server_credentials=None):
80 address = str_to_bytes(address)
81 self.references.append(address)
83 cdef char *address_c_string = address
84 if server_credentials is not None:
85 self.references.append(server_credentials)
87 result = grpc_server_add_secure_http2_port(
88 self.c_server, address_c_string, server_credentials.c_credentials)
91 result = grpc_server_add_insecure_http2_port(self.c_server,
95 cdef _c_shutdown(self, CompletionQueue queue, tag):
96 self.is_shutting_down = True
97 cdef _ServerShutdownTag server_shutdown_tag = _ServerShutdownTag(tag, self)
98 cpython.Py_INCREF(server_shutdown_tag)
100 grpc_server_shutdown_and_notify(
101 self.c_server, queue.c_completion_queue,
102 <cpython.PyObject *>server_shutdown_tag)
104 def shutdown(self, CompletionQueue queue not None, tag):
105 if queue.is_shutting_down:
106 raise ValueError("queue must be live")
107 elif not self.is_started:
108 raise ValueError("the server hasn't started yet")
109 elif self.is_shutting_down:
111 elif queue not in self.registered_completion_queues:
112 raise ValueError("expected registered completion queue")
114 self._c_shutdown(queue, tag)
116 cdef notify_shutdown_complete(self):
117 # called only after our server shutdown tag has emerged from a completion
119 self.is_shutdown = True
121 def cancel_all_calls(self):
122 if not self.is_shutting_down:
123 raise UsageError("the server must be shutting down to cancel all calls")
124 elif self.is_shutdown:
128 grpc_server_cancel_all_calls(self.c_server)
130 # TODO(https://github.com/grpc/grpc/issues/17515) Determine what, if any,
131 # portion of this is safe to call from __dealloc__, and potentially remove
132 # backup_shutdown_queue.
134 if self.c_server != NULL:
135 if not self.is_started:
137 elif self.is_shutdown:
139 elif not self.is_shutting_down:
140 if self.backup_shutdown_queue is None:
141 raise InternalError('Server shutdown failed: no completion queue.')
143 # the user didn't call shutdown - use our backup queue
144 self._c_shutdown(self.backup_shutdown_queue, None)
146 while not self.is_shutdown:
147 self.backup_shutdown_queue.poll()
149 # We're in the process of shutting down, but have not shutdown; can't do
150 # much but repeatedly release the GIL and wait
151 while not self.is_shutdown:
154 grpc_server_destroy(self.c_server)
157 def __dealloc__(self):
158 if self.c_server == NULL: