Imported Upstream version 1.23.0
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / subchannel.h
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
21
22 #include <grpc/support/port_platform.h>
23
24 #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
25 #include "src/core/ext/filters/client_channel/connector.h"
26 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
27 #include "src/core/lib/backoff/backoff.h"
28 #include "src/core/lib/channel/channel_stack.h"
29 #include "src/core/lib/gprpp/arena.h"
30 #include "src/core/lib/gprpp/map.h"
31 #include "src/core/lib/gprpp/ref_counted.h"
32 #include "src/core/lib/gprpp/ref_counted_ptr.h"
33 #include "src/core/lib/gprpp/sync.h"
34 #include "src/core/lib/iomgr/polling_entity.h"
35 #include "src/core/lib/iomgr/timer.h"
36 #include "src/core/lib/transport/connectivity_state.h"
37 #include "src/core/lib/transport/metadata.h"
38
39 // Channel arg containing a grpc_resolved_address to connect to.
40 #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
41
42 // For debugging refcounting.
43 #ifndef NDEBUG
44 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r))
45 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \
46   (p)->RefFromWeakRef(__FILE__, __LINE__, (r))
47 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r))
48 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r))
49 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r))
50 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \
51   const char *file, int line, const char *reason
52 #define GRPC_SUBCHANNEL_REF_REASON reason
53 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS \
54   , GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
55 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) , file, line, reason, x
56 #else
57 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref()
58 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
59 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref()
60 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef()
61 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref()
62 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
63 #define GRPC_SUBCHANNEL_REF_REASON ""
64 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS
65 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x)
66 #endif
67
68 namespace grpc_core {
69
70 class SubchannelCall;
71
72 class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
73  public:
74   ConnectedSubchannel(
75       grpc_channel_stack* channel_stack, const grpc_channel_args* args,
76       RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
77   ~ConnectedSubchannel();
78
79   void NotifyOnStateChange(grpc_pollset_set* interested_parties,
80                            grpc_connectivity_state* state,
81                            grpc_closure* closure);
82   void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
83
84   grpc_channel_stack* channel_stack() const { return channel_stack_; }
85   const grpc_channel_args* args() const { return args_; }
86   channelz::SubchannelNode* channelz_subchannel() const {
87     return channelz_subchannel_.get();
88   }
89
90   size_t GetInitialCallSizeEstimate(size_t parent_data_size) const;
91
92  private:
93   grpc_channel_stack* channel_stack_;
94   grpc_channel_args* args_;
95   // ref counted pointer to the channelz node in this connected subchannel's
96   // owning subchannel.
97   RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
98 };
99
100 // Implements the interface of RefCounted<>.
101 class SubchannelCall {
102  public:
103   struct Args {
104     RefCountedPtr<ConnectedSubchannel> connected_subchannel;
105     grpc_polling_entity* pollent;
106     grpc_slice path;
107     gpr_timespec start_time;
108     grpc_millis deadline;
109     Arena* arena;
110     grpc_call_context_element* context;
111     CallCombiner* call_combiner;
112     size_t parent_data_size;
113   };
114   static RefCountedPtr<SubchannelCall> Create(Args args, grpc_error** error);
115
116   // Continues processing a transport stream op batch.
117   void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
118
119   // Returns a pointer to the parent data associated with the subchannel call.
120   // The data will be of the size specified in \a parent_data_size field of
121   // the args passed to \a ConnectedSubchannel::CreateCall().
122   void* GetParentData();
123
124   // Returns the call stack of the subchannel call.
125   grpc_call_stack* GetCallStack();
126
127   // Sets the 'then_schedule_closure' argument for call stack destruction.
128   // Must be called once per call.
129   void SetAfterCallStackDestroy(grpc_closure* closure);
130
131   // Interface of RefCounted<>.
132   RefCountedPtr<SubchannelCall> Ref() GRPC_MUST_USE_RESULT;
133   RefCountedPtr<SubchannelCall> Ref(const DebugLocation& location,
134                                     const char* reason) GRPC_MUST_USE_RESULT;
135   // When refcount drops to 0, destroys itself and the associated call stack,
136   // but does NOT free the memory because it's in the call arena.
137   void Unref();
138   void Unref(const DebugLocation& location, const char* reason);
139
140   static void Destroy(void* arg, grpc_error* error);
141
142  private:
143   // Allow RefCountedPtr<> to access IncrementRefCount().
144   template <typename T>
145   friend class RefCountedPtr;
146
147   SubchannelCall(Args args, grpc_error** error);
148
149   // If channelz is enabled, intercepts recv_trailing so that we may check the
150   // status and associate it to a subchannel.
151   void MaybeInterceptRecvTrailingMetadata(
152       grpc_transport_stream_op_batch* batch);
153
154   static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
155
156   // Interface of RefCounted<>.
157   void IncrementRefCount();
158   void IncrementRefCount(const DebugLocation& location, const char* reason);
159
160   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
161   grpc_closure* after_call_stack_destroy_ = nullptr;
162   // State needed to support channelz interception of recv trailing metadata.
163   grpc_closure recv_trailing_metadata_ready_;
164   grpc_closure* original_recv_trailing_metadata_ = nullptr;
165   grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
166   grpc_millis deadline_;
167 };
168
169 // A subchannel that knows how to connect to exactly one target address. It
170 // provides a target for load balancing.
171 //
172 // Note that this is the "real" subchannel implementation, whose API is
173 // different from the SubchannelInterface that is exposed to LB policy
174 // implementations.  The client channel provides an adaptor class
175 // (SubchannelWrapper) that "converts" between the two.
176 class Subchannel {
177  public:
178   class ConnectivityStateWatcherInterface
179       : public InternallyRefCounted<ConnectivityStateWatcherInterface> {
180    public:
181     virtual ~ConnectivityStateWatcherInterface() = default;
182
183     // Will be invoked whenever the subchannel's connectivity state
184     // changes.  There will be only one invocation of this method on a
185     // given watcher instance at any given time.
186     //
187     // When the state changes to READY, connected_subchannel will
188     // contain a ref to the connected subchannel.  When it changes from
189     // READY to some other state, the implementation must release its
190     // ref to the connected subchannel.
191     virtual void OnConnectivityStateChange(
192         grpc_connectivity_state new_state,
193         RefCountedPtr<ConnectedSubchannel> connected_subchannel)  // NOLINT
194         GRPC_ABSTRACT;
195
196     virtual grpc_pollset_set* interested_parties() GRPC_ABSTRACT;
197
198     GRPC_ABSTRACT_BASE_CLASS
199   };
200
201   // The ctor and dtor are not intended to use directly.
202   Subchannel(SubchannelKey* key, grpc_connector* connector,
203              const grpc_channel_args* args);
204   ~Subchannel();
205
206   // Creates a subchannel given \a connector and \a args.
207   static Subchannel* Create(grpc_connector* connector,
208                             const grpc_channel_args* args);
209
210   // Strong and weak refcounting.
211   Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
212   void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
213   Subchannel* WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
214   void WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
215   // Attempts to return a strong ref when only the weak refcount is guaranteed
216   // non-zero. If the strong refcount is zero, does not alter the refcount and
217   // returns null.
218   Subchannel* RefFromWeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
219
220   // Gets the string representing the subchannel address.
221   // Caller doesn't take ownership.
222   const char* GetTargetAddress();
223
224   const grpc_channel_args* channel_args() const { return args_; }
225
226   channelz::SubchannelNode* channelz_node();
227
228   // Returns the current connectivity state of the subchannel.
229   // If health_check_service_name is non-null, the returned connectivity
230   // state will be based on the state reported by the backend for that
231   // service name.
232   // If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel.
233   grpc_connectivity_state CheckConnectivityState(
234       const char* health_check_service_name,
235       RefCountedPtr<ConnectedSubchannel>* connected_subchannel);
236
237   // Starts watching the subchannel's connectivity state.
238   // The first callback to the watcher will be delivered when the
239   // subchannel's connectivity state becomes a value other than
240   // initial_state, which may happen immediately.
241   // Subsequent callbacks will be delivered as the subchannel's state
242   // changes.
243   // The watcher will be destroyed either when the subchannel is
244   // destroyed or when CancelConnectivityStateWatch() is called.
245   void WatchConnectivityState(
246       grpc_connectivity_state initial_state,
247       UniquePtr<char> health_check_service_name,
248       OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
249
250   // Cancels a connectivity state watch.
251   // If the watcher has already been destroyed, this is a no-op.
252   void CancelConnectivityStateWatch(const char* health_check_service_name,
253                                     ConnectivityStateWatcherInterface* watcher);
254
255   // Attempt to connect to the backend.  Has no effect if already connected.
256   void AttemptToConnect();
257
258   // Resets the connection backoff of the subchannel.
259   // TODO(roth): Move connection backoff out of subchannels and up into LB
260   // policy code (probably by adding a SubchannelGroup between
261   // SubchannelList and SubchannelData), at which point this method can
262   // go away.
263   void ResetBackoff();
264
265   // Returns a new channel arg encoding the subchannel address as a URI
266   // string. Caller is responsible for freeing the string.
267   static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr);
268
269   // Returns the URI string from the subchannel address arg in \a args.
270   static const char* GetUriFromSubchannelAddressArg(
271       const grpc_channel_args* args);
272
273   // Sets \a addr from the subchannel address arg in \a args.
274   static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args,
275                                                  grpc_resolved_address* addr);
276
277  private:
278   // A linked list of ConnectivityStateWatcherInterfaces that are monitoring
279   // the subchannel's state.
280   class ConnectivityStateWatcherList {
281    public:
282     ~ConnectivityStateWatcherList() { Clear(); }
283
284     void AddWatcherLocked(
285         OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
286     void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
287
288     // Notifies all watchers in the list about a change to state.
289     void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state);
290
291     void Clear() { watchers_.clear(); }
292
293     bool empty() const { return watchers_.empty(); }
294
295    private:
296     // TODO(roth): This could be a set instead of a map if we had a set
297     // implementation.
298     Map<ConnectivityStateWatcherInterface*,
299         OrphanablePtr<ConnectivityStateWatcherInterface>>
300         watchers_;
301   };
302
303   // A map that tracks ConnectivityStateWatcherInterfaces using a particular
304   // health check service name.
305   //
306   // There is one entry in the map for each health check service name.
307   // Entries exist only as long as there are watchers using the
308   // corresponding service name.
309   //
310   // A health check client is maintained only while the subchannel is in
311   // state READY.
312   class HealthWatcherMap {
313    public:
314     void AddWatcherLocked(
315         Subchannel* subchannel, grpc_connectivity_state initial_state,
316         UniquePtr<char> health_check_service_name,
317         OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
318     void RemoveWatcherLocked(const char* health_check_service_name,
319                              ConnectivityStateWatcherInterface* watcher);
320
321     // Notifies the watcher when the subchannel's state changes.
322     void NotifyLocked(grpc_connectivity_state state);
323
324     grpc_connectivity_state CheckConnectivityStateLocked(
325         Subchannel* subchannel, const char* health_check_service_name);
326
327     void ShutdownLocked();
328
329    private:
330     class HealthWatcher;
331
332     Map<const char*, OrphanablePtr<HealthWatcher>, StringLess> map_;
333   };
334
335   class ConnectedSubchannelStateWatcher;
336
337   // Sets the subchannel's connectivity state to \a state.
338   void SetConnectivityStateLocked(grpc_connectivity_state state);
339
340   // Methods for connection.
341   void MaybeStartConnectingLocked();
342   static void OnRetryAlarm(void* arg, grpc_error* error);
343   void ContinueConnectingLocked();
344   static void OnConnectingFinished(void* arg, grpc_error* error);
345   bool PublishTransportLocked();
346   void Disconnect();
347
348   gpr_atm RefMutate(gpr_atm delta,
349                     int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS);
350
351   // The subchannel pool this subchannel is in.
352   RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
353   // TODO(juanlishen): Consider using args_ as key_ directly.
354   // Subchannel key that identifies this subchannel in the subchannel pool.
355   SubchannelKey* key_;
356   // Channel args.
357   grpc_channel_args* args_;
358   // pollset_set tracking who's interested in a connection being setup.
359   grpc_pollset_set* pollset_set_;
360   // Protects the other members.
361   Mutex mu_;
362   // Refcount
363   //    - lower INTERNAL_REF_BITS bits are for internal references:
364   //      these do not keep the subchannel open.
365   //    - upper remaining bits are for public references: these do
366   //      keep the subchannel open
367   gpr_atm ref_pair_;
368
369   // Connection states.
370   grpc_connector* connector_ = nullptr;
371   // Set during connection.
372   grpc_connect_out_args connecting_result_;
373   grpc_closure on_connecting_finished_;
374   // Active connection, or null.
375   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
376   bool connecting_ = false;
377   bool disconnected_ = false;
378
379   // Connectivity state tracking.
380   grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
381   // The list of watchers without a health check service name.
382   ConnectivityStateWatcherList watcher_list_;
383   // The map of watchers with health check service names.
384   HealthWatcherMap health_watcher_map_;
385
386   // Backoff state.
387   BackOff backoff_;
388   grpc_millis next_attempt_deadline_;
389   grpc_millis min_connect_timeout_ms_;
390   bool backoff_begun_ = false;
391
392   // Retry alarm.
393   grpc_timer retry_alarm_;
394   grpc_closure on_retry_alarm_;
395   bool have_retry_alarm_ = false;
396   // reset_backoff() was called while alarm was pending.
397   bool retry_immediately_ = false;
398
399   // Channelz tracking.
400   RefCountedPtr<channelz::SubchannelNode> channelz_node_;
401 };
402
403 }  // namespace grpc_core
404
405 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */