Imported Upstream version 1.33.1
[platform/upstream/grpc.git] / src / core / lib / surface / server.h
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #ifndef GRPC_CORE_LIB_SURFACE_SERVER_H
18 #define GRPC_CORE_LIB_SURFACE_SERVER_H
19
20 #include <grpc/support/port_platform.h>
21
22 #include <list>
23 #include <vector>
24
25 #include "absl/types/optional.h"
26
27 #include <grpc/grpc.h>
28
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/channel/channel_stack.h"
31 #include "src/core/lib/channel/channelz.h"
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/gprpp/atomic.h"
34 #include "src/core/lib/surface/completion_queue.h"
35 #include "src/core/lib/transport/transport.h"
36
37 namespace grpc_core {
38
39 extern TraceFlag grpc_server_channel_trace;
40
41 class Server : public InternallyRefCounted<Server> {
42  public:
43   // Filter vtable.
44   static const grpc_channel_filter kServerTopFilter;
45
46   // Opaque type used for registered methods.
47   struct RegisteredMethod;
48
49   // An object to represent the most relevant characteristics of a
50   // newly-allocated call object when using an AllocatingRequestMatcherBatch.
51   struct BatchCallAllocation {
52     grpc_experimental_completion_queue_functor* tag;
53     grpc_call** call;
54     grpc_metadata_array* initial_metadata;
55     grpc_call_details* details;
56   };
57
58   // An object to represent the most relevant characteristics of a
59   // newly-allocated call object when using an
60   // AllocatingRequestMatcherRegistered.
61   struct RegisteredCallAllocation {
62     grpc_experimental_completion_queue_functor* tag;
63     grpc_call** call;
64     grpc_metadata_array* initial_metadata;
65     gpr_timespec* deadline;
66     grpc_byte_buffer** optional_payload;
67   };
68
69   /// Interface for listeners.
70   /// Implementations must override the Orphan() method, which should stop
71   /// listening and initiate destruction of the listener.
72   class ListenerInterface : public Orphanable {
73    public:
74     virtual ~ListenerInterface() = default;
75
76     /// Starts listening. This listener may refer to the pollset object beyond
77     /// this call, so it is a pointer rather than a reference.
78     virtual void Start(Server* server,
79                        const std::vector<grpc_pollset*>* pollsets) = 0;
80
81     /// Returns the channelz node for the listen socket, or null if not
82     /// supported.
83     virtual channelz::ListenSocketNode* channelz_listen_socket_node() const = 0;
84
85     /// Sets a closure to be invoked by the listener when its destruction
86     /// is complete.
87     virtual void SetOnDestroyDone(grpc_closure* on_destroy_done) = 0;
88   };
89
90   explicit Server(const grpc_channel_args* args);
91   ~Server();
92
93   void Orphan() override;
94
95   const grpc_channel_args* channel_args() const { return channel_args_; }
96   grpc_resource_user* default_resource_user() const {
97     return default_resource_user_;
98   }
99   channelz::ServerNode* channelz_node() const { return channelz_node_.get(); }
100
101   // Do not call this before Start(). Returns the pollsets. The
102   // vector itself is immutable, but the pollsets inside are mutable. The
103   // result is valid for the lifetime of the server.
104   const std::vector<grpc_pollset*>& pollsets() const { return pollsets_; }
105
106   bool HasOpenConnections();
107
108   // Adds a listener to the server.  When the server starts, it will call
109   // the listener's Start() method, and when it shuts down, it will orphan
110   // the listener.
111   void AddListener(OrphanablePtr<ListenerInterface> listener);
112
113   // Starts listening for connections.
114   void Start();
115
116   // Sets up a transport.  Creates a channel stack and binds the transport to
117   // the server.  Called from the listener when a new connection is accepted.
118   void SetupTransport(grpc_transport* transport,
119                       grpc_pollset* accepting_pollset,
120                       const grpc_channel_args* args,
121                       const RefCountedPtr<channelz::SocketNode>& socket_node,
122                       grpc_resource_user* resource_user = nullptr);
123
124   void RegisterCompletionQueue(grpc_completion_queue* cq);
125
126   // Functions to specify that a specific registered method or the unregistered
127   // collection should use a specific allocator for request matching.
128   void SetRegisteredMethodAllocator(
129       grpc_completion_queue* cq, void* method_tag,
130       std::function<RegisteredCallAllocation()> allocator);
131   void SetBatchMethodAllocator(grpc_completion_queue* cq,
132                                std::function<BatchCallAllocation()> allocator);
133
134   RegisteredMethod* RegisterMethod(
135       const char* method, const char* host,
136       grpc_server_register_method_payload_handling payload_handling,
137       uint32_t flags);
138
139   grpc_call_error RequestCall(grpc_call** call, grpc_call_details* details,
140                               grpc_metadata_array* request_metadata,
141                               grpc_completion_queue* cq_bound_to_call,
142                               grpc_completion_queue* cq_for_notification,
143                               void* tag);
144
145   grpc_call_error RequestRegisteredCall(
146       RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline,
147       grpc_metadata_array* request_metadata,
148       grpc_byte_buffer** optional_payload,
149       grpc_completion_queue* cq_bound_to_call,
150       grpc_completion_queue* cq_for_notification, void* tag_new);
151
152   void ShutdownAndNotify(grpc_completion_queue* cq, void* tag);
153
154   void CancelAllCalls();
155
156  private:
157   struct RequestedCall;
158
159   struct ChannelRegisteredMethod {
160     RegisteredMethod* server_registered_method = nullptr;
161     uint32_t flags;
162     bool has_host;
163     ExternallyManagedSlice method;
164     ExternallyManagedSlice host;
165   };
166
167   class RequestMatcherInterface;
168   class RealRequestMatcher;
169   class AllocatingRequestMatcherBase;
170   class AllocatingRequestMatcherBatch;
171   class AllocatingRequestMatcherRegistered;
172
173   class ChannelData {
174    public:
175     ChannelData() = default;
176     ~ChannelData();
177
178     void InitTransport(RefCountedPtr<Server> server, grpc_channel* channel,
179                        size_t cq_idx, grpc_transport* transport,
180                        intptr_t channelz_socket_uuid);
181
182     RefCountedPtr<Server> server() const { return server_; }
183     grpc_channel* channel() const { return channel_; }
184     size_t cq_idx() const { return cq_idx_; }
185
186     ChannelRegisteredMethod* GetRegisteredMethod(const grpc_slice& host,
187                                                  const grpc_slice& path,
188                                                  bool is_idempotent);
189
190     // Filter vtable functions.
191     static grpc_error* InitChannelElement(grpc_channel_element* elem,
192                                           grpc_channel_element_args* args);
193     static void DestroyChannelElement(grpc_channel_element* elem);
194
195    private:
196     class ConnectivityWatcher;
197
198     static void AcceptStream(void* arg, grpc_transport* /*transport*/,
199                              const void* transport_server_data);
200
201     void Destroy();
202
203     static void FinishDestroy(void* arg, grpc_error* error);
204
205     RefCountedPtr<Server> server_;
206     grpc_channel* channel_;
207     // The index into Server::cqs_ of the CQ used as a starting point for
208     // where to publish new incoming calls.
209     size_t cq_idx_;
210     absl::optional<std::list<ChannelData*>::iterator> list_position_;
211     // A hash-table of the methods and hosts of the registered methods.
212     // TODO(vjpai): Convert this to an STL map type as opposed to a direct
213     // bucket implementation. (Consider performance impact, hash function to
214     // use, etc.)
215     std::unique_ptr<std::vector<ChannelRegisteredMethod>> registered_methods_;
216     uint32_t registered_method_max_probes_;
217     grpc_closure finish_destroy_channel_closure_;
218     intptr_t channelz_socket_uuid_;
219   };
220
221   class CallData {
222    public:
223     enum class CallState {
224       NOT_STARTED,  // Waiting for metadata.
225       PENDING,      // Initial metadata read, not flow controlled in yet.
226       ACTIVATED,    // Flow controlled in, on completion queue.
227       ZOMBIED,      // Cancelled before being queued.
228     };
229
230     CallData(grpc_call_element* elem, const grpc_call_element_args& args,
231              RefCountedPtr<Server> server);
232     ~CallData();
233
234     // Starts the recv_initial_metadata batch on the call.
235     // Invoked from ChannelData::AcceptStream().
236     void Start(grpc_call_element* elem);
237
238     void SetState(CallState state);
239
240     // Attempts to move from PENDING to ACTIVATED state.  Returns true
241     // on success.
242     bool MaybeActivate();
243
244     // Publishes an incoming call to the application after it has been
245     // matched.
246     void Publish(size_t cq_idx, RequestedCall* rc);
247
248     void KillZombie();
249
250     void FailCallCreation();
251
252     // Filter vtable functions.
253     static grpc_error* InitCallElement(grpc_call_element* elem,
254                                        const grpc_call_element_args* args);
255     static void DestroyCallElement(grpc_call_element* elem,
256                                    const grpc_call_final_info* /*final_info*/,
257                                    grpc_closure* /*ignored*/);
258     static void StartTransportStreamOpBatch(
259         grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
260
261    private:
262     // Helper functions for handling calls at the top of the call stack.
263     static void RecvInitialMetadataBatchComplete(void* arg, grpc_error* error);
264     void StartNewRpc(grpc_call_element* elem);
265     static void PublishNewRpc(void* arg, grpc_error* error);
266
267     // Functions used inside the call stack.
268     void StartTransportStreamOpBatchImpl(grpc_call_element* elem,
269                                          grpc_transport_stream_op_batch* batch);
270     static void RecvInitialMetadataReady(void* arg, grpc_error* error);
271     static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
272
273     RefCountedPtr<Server> server_;
274
275     grpc_call* call_;
276
277     Atomic<CallState> state_{CallState::NOT_STARTED};
278
279     absl::optional<grpc_slice> path_;
280     absl::optional<grpc_slice> host_;
281     grpc_millis deadline_ = GRPC_MILLIS_INF_FUTURE;
282
283     grpc_completion_queue* cq_new_ = nullptr;
284
285     RequestMatcherInterface* matcher_ = nullptr;
286     grpc_byte_buffer* payload_ = nullptr;
287
288     grpc_closure kill_zombie_closure_;
289
290     grpc_metadata_array initial_metadata_ =
291         grpc_metadata_array();  // Zero-initialize the C struct.
292     grpc_closure recv_initial_metadata_batch_complete_;
293
294     grpc_metadata_batch* recv_initial_metadata_ = nullptr;
295     uint32_t recv_initial_metadata_flags_ = 0;
296     grpc_closure recv_initial_metadata_ready_;
297     grpc_closure* original_recv_initial_metadata_ready_;
298     grpc_error* recv_initial_metadata_error_ = GRPC_ERROR_NONE;
299
300     bool seen_recv_trailing_metadata_ready_ = false;
301     grpc_closure recv_trailing_metadata_ready_;
302     grpc_closure* original_recv_trailing_metadata_ready_;
303     grpc_error* recv_trailing_metadata_error_ = GRPC_ERROR_NONE;
304
305     grpc_closure publish_;
306
307     CallCombiner* call_combiner_;
308   };
309
310   struct Listener {
311     explicit Listener(OrphanablePtr<ListenerInterface> l)
312         : listener(std::move(l)) {}
313     OrphanablePtr<ListenerInterface> listener;
314     grpc_closure destroy_done;
315   };
316
317   struct ShutdownTag {
318     ShutdownTag(void* tag_arg, grpc_completion_queue* cq_arg)
319         : tag(tag_arg), cq(cq_arg) {}
320     void* const tag;
321     grpc_completion_queue* const cq;
322     grpc_cq_completion completion;
323   };
324
325   static void ListenerDestroyDone(void* arg, grpc_error* error);
326
327   static void DoneShutdownEvent(void* server,
328                                 grpc_cq_completion* /*completion*/) {
329     static_cast<Server*>(server)->Unref();
330   }
331
332   static void DoneRequestEvent(void* req, grpc_cq_completion* completion);
333
334   void FailCall(size_t cq_idx, RequestedCall* rc, grpc_error* error);
335   grpc_call_error QueueRequestedCall(size_t cq_idx, RequestedCall* rc);
336
337   void MaybeFinishShutdown();
338
339   void KillPendingWorkLocked(grpc_error* error);
340
341   static grpc_call_error ValidateServerRequest(
342       grpc_completion_queue* cq_for_notification, void* tag,
343       grpc_byte_buffer** optional_payload, RegisteredMethod* rm);
344   grpc_call_error ValidateServerRequestAndCq(
345       size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag,
346       grpc_byte_buffer** optional_payload, RegisteredMethod* rm);
347
348   std::vector<grpc_channel*> GetChannelsLocked() const;
349
350   grpc_channel_args* const channel_args_;
351   grpc_resource_user* default_resource_user_ = nullptr;
352   RefCountedPtr<channelz::ServerNode> channelz_node_;
353
354   std::vector<grpc_completion_queue*> cqs_;
355   std::vector<grpc_pollset*> pollsets_;
356   bool started_ = false;
357
358   // The two following mutexes control access to server-state.
359   // mu_global_ controls access to non-call-related state (e.g., channel state).
360   // mu_call_ controls access to call-related state (e.g., the call lists).
361   //
362   // If they are ever required to be nested, you must lock mu_global_
363   // before mu_call_. This is currently used in shutdown processing
364   // (ShutdownAndNotify() and MaybeFinishShutdown()).
365   Mutex mu_global_;  // mutex for server and channel state
366   Mutex mu_call_;    // mutex for call-specific state
367
368   // startup synchronization: flag is protected by mu_global_, signals whether
369   // we are doing the listener start routine or not.
370   bool starting_ = false;
371   CondVar starting_cv_;
372
373   std::vector<std::unique_ptr<RegisteredMethod>> registered_methods_;
374
375   // Request matcher for unregistered methods.
376   std::unique_ptr<RequestMatcherInterface> unregistered_request_matcher_;
377
378   std::atomic_bool shutdown_flag_{false};
379   bool shutdown_published_ = false;
380   std::vector<ShutdownTag> shutdown_tags_;
381
382   std::list<ChannelData*> channels_;
383
384   std::list<Listener> listeners_;
385   size_t listeners_destroyed_ = 0;
386
387   // The last time we printed a shutdown progress message.
388   gpr_timespec last_shutdown_message_time_;
389 };
390
391 }  // namespace grpc_core
392
393 struct grpc_server {
394   grpc_core::OrphanablePtr<grpc_core::Server> core_server;
395 };
396
397 #endif /* GRPC_CORE_LIB_SURFACE_SERVER_H */