Imported Upstream version 1.32.0
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / subchannel.h
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
21
22 #include <grpc/support/port_platform.h>
23
24 #include <deque>
25
26 #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
27 #include "src/core/ext/filters/client_channel/connector.h"
28 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
29 #include "src/core/lib/backoff/backoff.h"
30 #include "src/core/lib/channel/channel_stack.h"
31 #include "src/core/lib/gpr/time_precise.h"
32 #include "src/core/lib/gprpp/arena.h"
33 #include "src/core/lib/gprpp/map.h"
34 #include "src/core/lib/gprpp/ref_counted.h"
35 #include "src/core/lib/gprpp/ref_counted_ptr.h"
36 #include "src/core/lib/gprpp/sync.h"
37 #include "src/core/lib/iomgr/polling_entity.h"
38 #include "src/core/lib/iomgr/timer.h"
39 #include "src/core/lib/transport/connectivity_state.h"
40 #include "src/core/lib/transport/metadata.h"
41
42 // Channel arg containing a URI indicating the address to connect to.
43 #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
44
45 // For debugging refcounting.
46 #ifndef NDEBUG
47 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r))
48 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
49 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r))
50 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r))
51 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r))
52 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \
53   const char *file, int line, const char *reason
54 #define GRPC_SUBCHANNEL_REF_REASON reason
55 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS \
56   , GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
57 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) , file, line, reason, x
58 #else
59 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref()
60 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
61 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref()
62 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef()
63 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref()
64 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
65 #define GRPC_SUBCHANNEL_REF_REASON ""
66 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS
67 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x)
68 #endif
69
70 namespace grpc_core {
71
72 class SubchannelCall;
73
74 class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
75  public:
76   ConnectedSubchannel(
77       grpc_channel_stack* channel_stack, const grpc_channel_args* args,
78       RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
79   ~ConnectedSubchannel();
80
81   void StartWatch(grpc_pollset_set* interested_parties,
82                   OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
83
84   void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
85
86   grpc_channel_stack* channel_stack() const { return channel_stack_; }
87   const grpc_channel_args* args() const { return args_; }
88   channelz::SubchannelNode* channelz_subchannel() const {
89     return channelz_subchannel_.get();
90   }
91
92   size_t GetInitialCallSizeEstimate(size_t parent_data_size) const;
93
94  private:
95   grpc_channel_stack* channel_stack_;
96   grpc_channel_args* args_;
97   // ref counted pointer to the channelz node in this connected subchannel's
98   // owning subchannel.
99   RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
100 };
101
102 // Implements the interface of RefCounted<>.
103 class SubchannelCall {
104  public:
105   struct Args {
106     RefCountedPtr<ConnectedSubchannel> connected_subchannel;
107     grpc_polling_entity* pollent;
108     grpc_slice path;
109     gpr_cycle_counter start_time;
110     grpc_millis deadline;
111     Arena* arena;
112     grpc_call_context_element* context;
113     CallCombiner* call_combiner;
114     size_t parent_data_size;
115   };
116   static RefCountedPtr<SubchannelCall> Create(Args args, grpc_error** error);
117
118   // Continues processing a transport stream op batch.
119   void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
120
121   // Returns a pointer to the parent data associated with the subchannel call.
122   // The data will be of the size specified in \a parent_data_size field of
123   // the args passed to \a ConnectedSubchannel::CreateCall().
124   void* GetParentData();
125
126   // Returns the call stack of the subchannel call.
127   grpc_call_stack* GetCallStack();
128
129   // Sets the 'then_schedule_closure' argument for call stack destruction.
130   // Must be called once per call.
131   void SetAfterCallStackDestroy(grpc_closure* closure);
132
133   // Interface of RefCounted<>.
134   RefCountedPtr<SubchannelCall> Ref() GRPC_MUST_USE_RESULT;
135   RefCountedPtr<SubchannelCall> Ref(const DebugLocation& location,
136                                     const char* reason) GRPC_MUST_USE_RESULT;
137   // When refcount drops to 0, destroys itself and the associated call stack,
138   // but does NOT free the memory because it's in the call arena.
139   void Unref();
140   void Unref(const DebugLocation& location, const char* reason);
141
142   static void Destroy(void* arg, grpc_error* error);
143
144  private:
145   // Allow RefCountedPtr<> to access IncrementRefCount().
146   template <typename T>
147   friend class RefCountedPtr;
148
149   SubchannelCall(Args args, grpc_error** error);
150
151   // If channelz is enabled, intercepts recv_trailing so that we may check the
152   // status and associate it to a subchannel.
153   void MaybeInterceptRecvTrailingMetadata(
154       grpc_transport_stream_op_batch* batch);
155
156   static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
157
158   // Interface of RefCounted<>.
159   void IncrementRefCount();
160   void IncrementRefCount(const DebugLocation& location, const char* reason);
161
162   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
163   grpc_closure* after_call_stack_destroy_ = nullptr;
164   // State needed to support channelz interception of recv trailing metadata.
165   grpc_closure recv_trailing_metadata_ready_;
166   grpc_closure* original_recv_trailing_metadata_ = nullptr;
167   grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
168   grpc_millis deadline_;
169 };
170
171 // A subchannel that knows how to connect to exactly one target address. It
172 // provides a target for load balancing.
173 //
174 // Note that this is the "real" subchannel implementation, whose API is
175 // different from the SubchannelInterface that is exposed to LB policy
176 // implementations.  The client channel provides an adaptor class
177 // (SubchannelWrapper) that "converts" between the two.
178 class Subchannel {
179  public:
180   class ConnectivityStateWatcherInterface
181       : public RefCounted<ConnectivityStateWatcherInterface> {
182    public:
183     struct ConnectivityStateChange {
184       grpc_connectivity_state state;
185       absl::Status status;
186       RefCountedPtr<ConnectedSubchannel> connected_subchannel;
187     };
188
189     virtual ~ConnectivityStateWatcherInterface() = default;
190
191     // Will be invoked whenever the subchannel's connectivity state
192     // changes.  There will be only one invocation of this method on a
193     // given watcher instance at any given time.
194     // Implementations should call PopConnectivityStateChange to get the next
195     // connectivity state change.
196     virtual void OnConnectivityStateChange() = 0;
197
198     virtual grpc_pollset_set* interested_parties() = 0;
199
200     // Enqueues connectivity state change notifications.
201     // When the state changes to READY, connected_subchannel will
202     // contain a ref to the connected subchannel.  When it changes from
203     // READY to some other state, the implementation must release its
204     // ref to the connected subchannel.
205     // TODO(yashkt): This is currently needed to send the state updates in the
206     // right order when asynchronously notifying. This will no longer be
207     // necessary when we have access to EventManager.
208     void PushConnectivityStateChange(ConnectivityStateChange state_change);
209
210     // Dequeues connectivity state change notifications.
211     ConnectivityStateChange PopConnectivityStateChange();
212
213    private:
214     // Keeps track of the updates that the watcher instance must be notified of.
215     // TODO(yashkt): This is currently needed to send the state updates in the
216     // right order when asynchronously notifying. This will no longer be
217     // necessary when we have access to EventManager.
218     std::deque<ConnectivityStateChange> connectivity_state_queue_;
219     Mutex mu_;  // protects the queue
220   };
221
222   // The ctor and dtor are not intended to use directly.
223   Subchannel(SubchannelKey* key, OrphanablePtr<SubchannelConnector> connector,
224              const grpc_channel_args* args);
225   ~Subchannel();
226
227   // Creates a subchannel given \a connector and \a args.
228   static Subchannel* Create(OrphanablePtr<SubchannelConnector> connector,
229                             const grpc_channel_args* args);
230
231   // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time
232   // is larger than the subchannel's current keepalive time. The updated value
233   // will have an affect when the subchannel creates a new ConnectedSubchannel.
234   void ThrottleKeepaliveTime(int new_keepalive_time);
235
236   // Strong and weak refcounting.
237   Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
238   void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
239   Subchannel* WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
240   void WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
241   // Attempts to return a strong ref when only the weak refcount is guaranteed
242   // non-zero. If the strong refcount is zero, does not alter the refcount and
243   // returns null.
244   Subchannel* RefFromWeakRef();
245
246   // Gets the string representing the subchannel address.
247   // Caller doesn't take ownership.
248   const char* GetTargetAddress();
249
250   const grpc_channel_args* channel_args() const { return args_; }
251
252   channelz::SubchannelNode* channelz_node();
253
254   // Returns the current connectivity state of the subchannel.
255   // If health_check_service_name is non-null, the returned connectivity
256   // state will be based on the state reported by the backend for that
257   // service name.
258   // If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel.
259   grpc_connectivity_state CheckConnectivityState(
260       const char* health_check_service_name,
261       RefCountedPtr<ConnectedSubchannel>* connected_subchannel);
262
263   // Starts watching the subchannel's connectivity state.
264   // The first callback to the watcher will be delivered when the
265   // subchannel's connectivity state becomes a value other than
266   // initial_state, which may happen immediately.
267   // Subsequent callbacks will be delivered as the subchannel's state
268   // changes.
269   // The watcher will be destroyed either when the subchannel is
270   // destroyed or when CancelConnectivityStateWatch() is called.
271   void WatchConnectivityState(
272       grpc_connectivity_state initial_state,
273       grpc_core::UniquePtr<char> health_check_service_name,
274       RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
275
276   // Cancels a connectivity state watch.
277   // If the watcher has already been destroyed, this is a no-op.
278   void CancelConnectivityStateWatch(const char* health_check_service_name,
279                                     ConnectivityStateWatcherInterface* watcher);
280
281   // Attempt to connect to the backend.  Has no effect if already connected.
282   void AttemptToConnect();
283
284   // Resets the connection backoff of the subchannel.
285   // TODO(roth): Move connection backoff out of subchannels and up into LB
286   // policy code (probably by adding a SubchannelGroup between
287   // SubchannelList and SubchannelData), at which point this method can
288   // go away.
289   void ResetBackoff();
290
291   // Returns a new channel arg encoding the subchannel address as a URI
292   // string. Caller is responsible for freeing the string.
293   static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr);
294
295   // Returns the URI string from the subchannel address arg in \a args.
296   static const char* GetUriFromSubchannelAddressArg(
297       const grpc_channel_args* args);
298
299   // Sets \a addr from the subchannel address arg in \a args.
300   static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args,
301                                                  grpc_resolved_address* addr);
302
303  private:
304   // A linked list of ConnectivityStateWatcherInterfaces that are monitoring
305   // the subchannel's state.
306   class ConnectivityStateWatcherList {
307    public:
308     ~ConnectivityStateWatcherList() { Clear(); }
309
310     void AddWatcherLocked(
311         RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
312     void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
313
314     // Notifies all watchers in the list about a change to state.
315     void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state,
316                       const absl::Status& status);
317
318     void Clear() { watchers_.clear(); }
319
320     bool empty() const { return watchers_.empty(); }
321
322    private:
323     // TODO(roth): Once we can use C++-14 heterogeneous lookups, this can
324     // be a set instead of a map.
325     std::map<ConnectivityStateWatcherInterface*,
326              RefCountedPtr<ConnectivityStateWatcherInterface>>
327         watchers_;
328   };
329
330   // A map that tracks ConnectivityStateWatcherInterfaces using a particular
331   // health check service name.
332   //
333   // There is one entry in the map for each health check service name.
334   // Entries exist only as long as there are watchers using the
335   // corresponding service name.
336   //
337   // A health check client is maintained only while the subchannel is in
338   // state READY.
339   class HealthWatcherMap {
340    public:
341     void AddWatcherLocked(
342         Subchannel* subchannel, grpc_connectivity_state initial_state,
343         grpc_core::UniquePtr<char> health_check_service_name,
344         RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
345     void RemoveWatcherLocked(const char* health_check_service_name,
346                              ConnectivityStateWatcherInterface* watcher);
347
348     // Notifies the watcher when the subchannel's state changes.
349     void NotifyLocked(grpc_connectivity_state state,
350                       const absl::Status& status);
351
352     grpc_connectivity_state CheckConnectivityStateLocked(
353         Subchannel* subchannel, const char* health_check_service_name);
354
355     void ShutdownLocked();
356
357    private:
358     class HealthWatcher;
359
360     std::map<const char*, OrphanablePtr<HealthWatcher>, StringLess> map_;
361   };
362
363   class ConnectedSubchannelStateWatcher;
364
365   class AsyncWatcherNotifierLocked;
366
367   // Sets the subchannel's connectivity state to \a state.
368   void SetConnectivityStateLocked(grpc_connectivity_state state,
369                                   const absl::Status& status);
370
371   // Methods for connection.
372   void MaybeStartConnectingLocked();
373   static void OnRetryAlarm(void* arg, grpc_error* error);
374   void ContinueConnectingLocked();
375   static void OnConnectingFinished(void* arg, grpc_error* error);
376   bool PublishTransportLocked();
377   void Disconnect();
378
379   gpr_atm RefMutate(gpr_atm delta,
380                     int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS);
381
382   // The subchannel pool this subchannel is in.
383   RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
384   // TODO(juanlishen): Consider using args_ as key_ directly.
385   // Subchannel key that identifies this subchannel in the subchannel pool.
386   SubchannelKey* key_;
387   // Channel args.
388   grpc_channel_args* args_;
389   // pollset_set tracking who's interested in a connection being setup.
390   grpc_pollset_set* pollset_set_;
391   // Protects the other members.
392   Mutex mu_;
393   // Refcount
394   //    - lower INTERNAL_REF_BITS bits are for internal references:
395   //      these do not keep the subchannel open.
396   //    - upper remaining bits are for public references: these do
397   //      keep the subchannel open
398   gpr_atm ref_pair_;
399
400   // Connection states.
401   OrphanablePtr<SubchannelConnector> connector_;
402   // Set during connection.
403   SubchannelConnector::Result connecting_result_;
404   grpc_closure on_connecting_finished_;
405   // Active connection, or null.
406   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
407   bool connecting_ = false;
408   bool disconnected_ = false;
409
410   // Connectivity state tracking.
411   grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
412   absl::Status status_;
413   // The list of watchers without a health check service name.
414   ConnectivityStateWatcherList watcher_list_;
415   // The map of watchers with health check service names.
416   HealthWatcherMap health_watcher_map_;
417
418   // Backoff state.
419   BackOff backoff_;
420   grpc_millis next_attempt_deadline_;
421   grpc_millis min_connect_timeout_ms_;
422   bool backoff_begun_ = false;
423
424   // Retry alarm.
425   grpc_timer retry_alarm_;
426   grpc_closure on_retry_alarm_;
427   bool have_retry_alarm_ = false;
428   // reset_backoff() was called while alarm was pending.
429   bool retry_immediately_ = false;
430   // Keepalive time period (-1 for unset)
431   int keepalive_time_ = -1;
432
433   // Channelz tracking.
434   RefCountedPtr<channelz::SubchannelNode> channelz_node_;
435 };
436
437 }  // namespace grpc_core
438
439 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */