2 // Copyright 2015 gRPC authors.
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 #ifndef GRPC_CORE_LIB_SURFACE_SERVER_H
18 #define GRPC_CORE_LIB_SURFACE_SERVER_H
20 #include <grpc/support/port_platform.h>
25 #include "absl/types/optional.h"
27 #include <grpc/grpc.h>
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/channel/channel_stack.h"
31 #include "src/core/lib/channel/channelz.h"
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/gprpp/atomic.h"
34 #include "src/core/lib/surface/completion_queue.h"
35 #include "src/core/lib/transport/transport.h"
// Trace flag declared here and defined in another translation unit.
// Presumably gates server-channel trace logging — TODO confirm at the
// definition.
39 extern TraceFlag grpc_server_channel_trace;
// Core server object. Derives from InternallyRefCounted<Server> and
// overrides Orphan() (declared further down in this class).
41 class Server : public InternallyRefCounted<Server> {
// Channel filter exposed by the server. Given the name, presumably the filter
// installed at the top of each server channel stack — TODO confirm.
44 static const grpc_channel_filter kServerTopFilter;
46 // Opaque type used for registered methods.
// Only forward-declared in this header; handles to it are returned by
// RegisterMethod() and consumed by RequestRegisteredCall().
47 struct RegisteredMethod;
49 // An object to represent the most relevant characteristics of a
50 // newly-allocated call object when using an AllocatingRequestMatcherBatch.
// This is the value type produced by the allocator callback passed to
// SetBatchMethodAllocator() (std::function<BatchCallAllocation()>).
51 struct BatchCallAllocation {
// Completion-queue functor associated with the allocated call; presumably
// run when the call is delivered — TODO confirm.
52 grpc_experimental_completion_queue_functor* tag;
// Metadata array for the call's initial metadata.
54 grpc_metadata_array* initial_metadata;
// Call details structure for the allocated call.
55 grpc_call_details* details;
58 // An object to represent the most relevant characteristics of a
59 // newly-allocated call object when using an
60 // AllocatingRequestMatcherRegistered.
// This is the value type produced by the allocator callback passed to
// SetRegisteredMethodAllocator() (std::function<RegisteredCallAllocation()>).
61 struct RegisteredCallAllocation {
// Completion-queue functor associated with the allocated call; presumably
// run when the call is delivered — TODO confirm.
62 grpc_experimental_completion_queue_functor* tag;
// Metadata array for the call's initial metadata.
64 grpc_metadata_array* initial_metadata;
// Location for the call's deadline.
65 gpr_timespec* deadline;
// Location for the request payload. Named "optional"; presumably may be null
// when the method does not read a payload up front — TODO confirm.
66 grpc_byte_buffer** optional_payload;
69 /// Interface for listeners.
70 /// Implementations must override the Orphan() method, which should stop
71 /// listening and initiate destruction of the listener.
72 class ListenerInterface : public Orphanable {
/// Virtual so that implementations can be destroyed through a
/// ListenerInterface pointer.
74 virtual ~ListenerInterface() = default;
76 /// Starts listening. This listener may refer to the pollset object beyond
77 /// this call, so it is a pointer rather than a reference.
/// Per the comment on Server::AddListener(), the server calls this on each
/// added listener when the server starts.
78 virtual void Start(Server* server,
79 const std::vector<grpc_pollset*>* pollsets) = 0;
81 /// Returns the channelz node for the listen socket, or null if not
/// supported. NOTE(review): the end of this sentence was missing from the
/// text; "supported" is a reconstruction — confirm.
83 virtual channelz::ListenSocketNode* channelz_listen_socket_node() const = 0;
85 /// Sets a closure to be invoked by the listener when its destruction
/// is complete. NOTE(review): the end of this sentence was missing from the
/// text; "is complete" is a reconstruction — confirm.
87 virtual void SetOnDestroyDone(grpc_closure* on_destroy_done) = 0;
// Constructs a server from the given channel args.
// NOTE(review): whether args is copied or only referenced is decided in the
// definition, which is not part of this header.
90 explicit Server(const grpc_channel_args* args);
// Overrides the base class's virtual Orphan().
93 void Orphan() override;
// Returns the server's channel args (the channel_args_ member).
95 const grpc_channel_args* channel_args() const { return channel_args_; }
// Returns the default resource user. The backing member is initialized to
// nullptr in-class, so callers should be prepared for a null result.
96 grpc_resource_user* default_resource_user() const {
97 return default_resource_user_;
// Returns a non-owning pointer to the server's channelz node; the server
// holds its own reference in channelz_node_.
99 channelz::ServerNode* channelz_node() const { return channelz_node_.get(); }
101 // Do not call this before Start(). Returns the pollsets. The
102 // vector itself is immutable, but the pollsets inside are mutable. The
103 // result is valid for the lifetime of the server.
// Returns a reference to the pollsets_ member; no copy is made.
104 const std::vector<grpc_pollset*>& pollsets() const { return pollsets_; }
// Reports whether the server has open connections.
// NOTE(review): presumably derived from the channels_ list — confirm in the
// definition.
106 bool HasOpenConnections();
108 // Adds a listener to the server. When the server starts, it will call
109 // the listener's Start() method, and when it shuts down, it will orphan
// the listener. NOTE(review): the end of this sentence was missing from the
// text; "the listener" is a reconstruction — confirm.
// The listener is passed as an OrphanablePtr by value, so ownership is
// handed to the server.
111 void AddListener(OrphanablePtr<ListenerInterface> listener);
113 // Starts listening for connections.
116 // Sets up a transport. Creates a channel stack and binds the transport to
117 // the server. Called from the listener when a new connection is accepted.
// resource_user is optional and defaults to nullptr. socket_node is taken by
// const reference to a RefCountedPtr, so the call itself does not add a
// reference unless the implementation copies it.
118 void SetupTransport(grpc_transport* transport,
119 grpc_pollset* accepting_pollset,
120 const grpc_channel_args* args,
121 const RefCountedPtr<channelz::SocketNode>& socket_node,
122 grpc_resource_user* resource_user = nullptr);
// Registers a completion queue with the server.
// NOTE(review): presumably recorded in cqs_ — confirm in the definition.
124 void RegisterCompletionQueue(grpc_completion_queue* cq);
126 // Functions to specify that a specific registered method or the unregistered
127 // collection should use a specific allocator for request matching.
// In both functions the allocator is a callable that returns a fresh
// allocation record each time it is invoked.
// method_tag identifies the registered method; presumably it is the handle
// returned by RegisterMethod() — TODO confirm.
128 void SetRegisteredMethodAllocator(
129 grpc_completion_queue* cq, void* method_tag,
130 std::function<RegisteredCallAllocation()> allocator);
// Presumably the variant for the unregistered collection — TODO confirm.
131 void SetBatchMethodAllocator(grpc_completion_queue* cq,
132 std::function<BatchCallAllocation()> allocator);
// Registers a method/host pair with the given payload handling and returns
// an opaque RegisteredMethod handle.
// NOTE(review): the parameter list continues beyond the lines shown here.
134 RegisteredMethod* RegisterMethod(
135 const char* method, const char* host,
136 grpc_server_register_method_payload_handling payload_handling,
// Requests notification of a new call. Returns a grpc_call_error status code.
// Takes two completion queues: one bound to the call and one used for the
// notification.
// NOTE(review): the parameter list continues beyond the lines shown here.
139 grpc_call_error RequestCall(grpc_call** call, grpc_call_details* details,
140 grpc_metadata_array* request_metadata,
141 grpc_completion_queue* cq_bound_to_call,
142 grpc_completion_queue* cq_for_notification,
// Same as RequestCall(), but for the registered method identified by rm.
// Outputs are delivered through the deadline, request_metadata and
// optional_payload pointers; tag_new is the caller-supplied tag.
145 grpc_call_error RequestRegisteredCall(
146 RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline,
147 grpc_metadata_array* request_metadata,
148 grpc_byte_buffer** optional_payload,
149 grpc_completion_queue* cq_bound_to_call,
150 grpc_completion_queue* cq_for_notification, void* tag_new);
// Begins shutdown; presumably posts tag on cq once shutdown has completed —
// TODO confirm.
152 void ShutdownAndNotify(grpc_completion_queue* cq, void* tag);
// Cancels all calls on the server.
// NOTE(review): exact scope (all channels vs. pending only) is defined in the
// implementation — confirm.
154 void CancelAllCalls();
// Forward declaration; instances are passed to Publish(), FailCall() and
// QueueRequestedCall() elsewhere in this class.
157 struct RequestedCall;
// Per-channel entry for a registered method; returned (by pointer) from
// ChannelData::GetRegisteredMethod().
159 struct ChannelRegisteredMethod {
// The server-level registration this entry refers to; null by default.
160 RegisteredMethod* server_registered_method = nullptr;
// Method and host stored as externally managed slices.
163 ExternallyManagedSlice method;
164 ExternallyManagedSlice host;
// Request-matcher hierarchy, forward-declared here and defined elsewhere.
// RequestMatcherInterface is the type held by unregistered_request_matcher_
// and CallData::matcher_.
167 class RequestMatcherInterface;
168 class RealRequestMatcher;
169 class AllocatingRequestMatcherBase;
170 class AllocatingRequestMatcherBatch;
171 class AllocatingRequestMatcherRegistered;
// Default construction only; the real setup happens in InitTransport().
175 ChannelData() = default;
// Binds this channel data to its server, channel, completion-queue index,
// transport and channelz socket uuid.
178 void InitTransport(RefCountedPtr<Server> server, grpc_channel* channel,
179 size_t cq_idx, grpc_transport* transport,
180 intptr_t channelz_socket_uuid);
// Returns a copy of the RefCountedPtr (i.e. a new reference to the server).
182 RefCountedPtr<Server> server() const { return server_; }
// Returns the channel pointer without transferring ownership.
183 grpc_channel* channel() const { return channel_; }
// Returns the completion-queue index stored for this channel.
184 size_t cq_idx() const { return cq_idx_; }
// Looks up the registered-method entry for the given host and path.
// NOTE(review): the parameter list continues beyond the lines shown here.
186 ChannelRegisteredMethod* GetRegisteredMethod(const grpc_slice& host,
187 const grpc_slice& path,
190 // Filter vtable functions.
191 static grpc_error* InitChannelElement(grpc_channel_element* elem,
192 grpc_channel_element_args* args);
193 static void DestroyChannelElement(grpc_channel_element* elem);
// Defined elsewhere; presumably observes channel connectivity — TODO confirm.
196 class ConnectivityWatcher;
// Static callback taking an opaque arg; the transport argument is unused
// (its name is commented out).
198 static void AcceptStream(void* arg, grpc_transport* /*transport*/,
199 const void* transport_server_data);
// Closure-style callback (void* arg, grpc_error*); presumably scheduled via
// finish_destroy_channel_closure_ — TODO confirm.
203 static void FinishDestroy(void* arg, grpc_error* error);
// Reference to the owning server.
205 RefCountedPtr<Server> server_;
// Raw channel pointer; no in-class initializer.
206 grpc_channel* channel_;
207 // The index into Server::cqs_ of the CQ used as a starting point for
208 // where to publish new incoming calls.
// Iterator into a std::list<ChannelData*> (presumably Server::channels_);
// empty when no position has been recorded — TODO confirm.
210 absl::optional<std::list<ChannelData*>::iterator> list_position_;
211 // A hash-table of the methods and hosts of the registered methods.
212 // TODO(vjpai): Convert this to an STL map type as opposed to a direct
213 // bucket implementation. (Consider performance impact, hash function to
// use, etc.) NOTE(review): the end of this TODO was missing from the text;
// "use, etc.)" is a reconstruction — confirm.
215 std::unique_ptr<std::vector<ChannelRegisteredMethod>> registered_methods_;
// Presumably the probe limit for lookups in registered_methods_ — TODO
// confirm. No in-class initializer.
216 uint32_t registered_method_max_probes_;
217 grpc_closure finish_destroy_channel_closure_;
// Channelz socket uuid, as passed to InitTransport().
218 intptr_t channelz_socket_uuid_;
// Lifecycle states of a call. The current state is held in an
// Atomic<CallState> (state_), which starts at NOT_STARTED.
223 enum class CallState {
224 NOT_STARTED, // Waiting for metadata.
225 PENDING, // Initial metadata read, not flow controlled in yet.
226 ACTIVATED, // Flow controlled in, on completion queue.
227 ZOMBIED, // Cancelled before being queued.
// Constructs per-call data for the given call element; takes a reference to
// the server (RefCountedPtr passed by value).
230 CallData(grpc_call_element* elem, const grpc_call_element_args& args,
231 RefCountedPtr<Server> server);
234 // Starts the recv_initial_metadata batch on the call.
235 // Invoked from ChannelData::AcceptStream().
236 void Start(grpc_call_element* elem);
// Sets the call's state.
238 void SetState(CallState state);
240 // Attempts to move from PENDING to ACTIVATED state. Returns true
// on success. NOTE(review): the end of this sentence was missing from the
// text; "on success" is a reconstruction — confirm.
242 bool MaybeActivate();
244 // Publishes an incoming call to the application after it has been
// matched. NOTE(review): the end of this sentence was missing from the text;
// "matched" is a reconstruction — confirm.
246 void Publish(size_t cq_idx, RequestedCall* rc);
// Handles failure to create the call.
// NOTE(review): presumably transitions the call to ZOMBIED — confirm.
250 void FailCallCreation();
252 // Filter vtable functions.
253 static grpc_error* InitCallElement(grpc_call_element* elem,
254 const grpc_call_element_args* args);
// The final_info and closure arguments are unused (names commented out).
255 static void DestroyCallElement(grpc_call_element* elem,
256 const grpc_call_final_info* /*final_info*/,
257 grpc_closure* /*ignored*/);
258 static void StartTransportStreamOpBatch(
259 grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
262 // Helper functions for handling calls at the top of the call stack.
263 static void RecvInitialMetadataBatchComplete(void* arg, grpc_error* error);
264 void StartNewRpc(grpc_call_element* elem);
265 static void PublishNewRpc(void* arg, grpc_error* error);
267 // Functions used inside the call stack.
268 void StartTransportStreamOpBatchImpl(grpc_call_element* elem,
269 grpc_transport_stream_op_batch* batch);
270 static void RecvInitialMetadataReady(void* arg, grpc_error* error);
271 static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
// Reference to the server this call belongs to.
273 RefCountedPtr<Server> server_;
// Current lifecycle state; atomic, initially NOT_STARTED.
277 Atomic<CallState> state_{CallState::NOT_STARTED};
// Request path and host; empty until set. Presumably filled in from the
// received initial metadata — TODO confirm.
279 absl::optional<grpc_slice> path_;
280 absl::optional<grpc_slice> host_;
// Call deadline; defaults to "infinitely far in the future".
281 grpc_millis deadline_ = GRPC_MILLIS_INF_FUTURE;
283 grpc_completion_queue* cq_new_ = nullptr;
// Request matcher selected for this call; null until one is chosen.
285 RequestMatcherInterface* matcher_ = nullptr;
286 grpc_byte_buffer* payload_ = nullptr;
// Presumably used to destroy a call in the ZOMBIED state — TODO confirm.
288 grpc_closure kill_zombie_closure_;
290 grpc_metadata_array initial_metadata_ =
291 grpc_metadata_array(); // Zero-initialize the C struct.
292 grpc_closure recv_initial_metadata_batch_complete_;
// State for the recv_initial_metadata op. The "original_" closure has no
// in-class initializer; presumably it is the callback this filter wraps —
// TODO confirm.
294 grpc_metadata_batch* recv_initial_metadata_ = nullptr;
295 uint32_t recv_initial_metadata_flags_ = 0;
296 grpc_closure recv_initial_metadata_ready_;
297 grpc_closure* original_recv_initial_metadata_ready_;
298 grpc_error* recv_initial_metadata_error_ = GRPC_ERROR_NONE;
// State for the recv_trailing_metadata op, mirroring the fields above.
300 bool seen_recv_trailing_metadata_ready_ = false;
301 grpc_closure recv_trailing_metadata_ready_;
302 grpc_closure* original_recv_trailing_metadata_ready_;
303 grpc_error* recv_trailing_metadata_error_ = GRPC_ERROR_NONE;
// Presumably the closure that runs PublishNewRpc() — TODO confirm.
305 grpc_closure publish_;
// Call combiner pointer; no in-class initializer, so it must be set by the
// constructor.
307 CallCombiner* call_combiner_;
// Wraps an owned ListenerInterface; the constructor moves the pointer into
// the listener member.
311 explicit Listener(OrphanablePtr<ListenerInterface> l)
312 : listener(std::move(l)) {}
313 OrphanablePtr<ListenerInterface> listener;
// Closure associated with listener destruction; presumably the one handed to
// ListenerInterface::SetOnDestroyDone() — TODO confirm.
314 grpc_closure destroy_done;
// Records one shutdown notification request: the tag and the completion
// queue supplied together (cf. ShutdownAndNotify(cq, tag)).
318 ShutdownTag(void* tag_arg, grpc_completion_queue* cq_arg)
319 : tag(tag_arg), cq(cq_arg) {}
// The completion queue pointer is fixed at construction (const member).
321 grpc_completion_queue* const cq;
// Storage for the completion-queue completion record.
322 grpc_cq_completion completion;
// Closure-style callback (void* arg, grpc_error*); presumably run when a
// listener finishes being destroyed — TODO confirm.
325 static void ListenerDestroyDone(void* arg, grpc_error* error);
// Completion callback: casts the opaque argument to Server* and drops one
// reference on it. The completion argument is unused.
327 static void DoneShutdownEvent(void* server,
328 grpc_cq_completion* /*completion*/) {
329 static_cast<Server*>(server)->Unref();
// Completion callback for a requested call; req is an opaque pointer.
332 static void DoneRequestEvent(void* req, grpc_cq_completion* completion);
// Fails the requested call rc on the completion queue at index cq_idx with
// the given error.
334 void FailCall(size_t cq_idx, RequestedCall* rc, grpc_error* error);
// Queues the requested call rc for the completion queue at index cq_idx and
// returns a grpc_call_error status.
335 grpc_call_error QueueRequestedCall(size_t cq_idx, RequestedCall* rc);
// One of the two functions named in the mutex comment below as using nested
// locking (mu_global_ before mu_call_).
337 void MaybeFinishShutdown();
// "Locked" suffix: presumably requires the caller to hold the relevant
// mutex — TODO confirm which one.
339 void KillPendingWorkLocked(grpc_error* error);
// Validates the arguments of a server call request; static, so it uses no
// server state.
341 static grpc_call_error ValidateServerRequest(
342 grpc_completion_queue* cq_for_notification, void* tag,
343 grpc_byte_buffer** optional_payload, RegisteredMethod* rm);
// Non-static variant that additionally takes a cq_idx out-parameter;
// presumably resolves cq_for_notification to its index in cqs_ — TODO
// confirm.
344 grpc_call_error ValidateServerRequestAndCq(
345 size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag,
346 grpc_byte_buffer** optional_payload, RegisteredMethod* rm);
// Returns the channels by value. "Locked" suffix: presumably requires
// mu_global_ to be held — TODO confirm.
348 std::vector<grpc_channel*> GetChannelsLocked() const;
// Channel args for the server; the pointer itself is const after
// construction. Exposed through channel_args().
350 grpc_channel_args* const channel_args_;
// Exposed through default_resource_user(); null unless set.
351 grpc_resource_user* default_resource_user_ = nullptr;
// Exposed (non-owning) through channelz_node().
352 RefCountedPtr<channelz::ServerNode> channelz_node_;
// Completion queues and pollsets; pollsets_ is exposed through pollsets().
354 std::vector<grpc_completion_queue*> cqs_;
355 std::vector<grpc_pollset*> pollsets_;
356 bool started_ = false;
358 // The two following mutexes control access to server-state.
359 // mu_global_ controls access to non-call-related state (e.g., channel state).
360 // mu_call_ controls access to call-related state (e.g., the call lists).
362 // If they are ever required to be nested, you must lock mu_global_
363 // before mu_call_. This is currently used in shutdown processing
364 // (ShutdownAndNotify() and MaybeFinishShutdown()).
365 Mutex mu_global_; // mutex for server and channel state
366 Mutex mu_call_; // mutex for call-specific state
368 // startup synchronization: flag is protected by mu_global_, signals whether
369 // we are doing the listener start routine or not.
370 bool starting_ = false;
371 CondVar starting_cv_;
// Owned registrations; the opaque handles are RegisteredMethod pointers.
373 std::vector<std::unique_ptr<RegisteredMethod>> registered_methods_;
375 // Request matcher for unregistered methods.
376 std::unique_ptr<RequestMatcherInterface> unregistered_request_matcher_;
// Shutdown bookkeeping. shutdown_flag_ is atomic; the other two fields are
// plain and presumably guarded by mu_global_ — TODO confirm.
378 std::atomic_bool shutdown_flag_{false};
379 bool shutdown_published_ = false;
380 std::vector<ShutdownTag> shutdown_tags_;
// Non-owning pointers to per-channel data. ChannelData::list_position_ holds
// an iterator of this list type.
382 std::list<ChannelData*> channels_;
// Listeners added via AddListener(), plus a count of those destroyed so far.
384 std::list<Listener> listeners_;
385 size_t listeners_destroyed_ = 0;
387 // The last time we printed a shutdown progress message.
// No in-class initializer; must be set before it is first read.
388 gpr_timespec last_shutdown_message_time_;
391 } // namespace grpc_core
394 grpc_core::OrphanablePtr<grpc_core::Server> core_server;
397 #endif /* GRPC_CORE_LIB_SURFACE_SERVER_H */