Imported Upstream version 1.26.0
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / subchannel.h
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
21
22 #include <grpc/support/port_platform.h>
23
24 #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
25 #include "src/core/ext/filters/client_channel/connector.h"
26 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
27 #include "src/core/lib/backoff/backoff.h"
28 #include "src/core/lib/channel/channel_stack.h"
29 #include "src/core/lib/gpr/time_precise.h"
30 #include "src/core/lib/gprpp/arena.h"
31 #include "src/core/lib/gprpp/map.h"
32 #include "src/core/lib/gprpp/ref_counted.h"
33 #include "src/core/lib/gprpp/ref_counted_ptr.h"
34 #include "src/core/lib/gprpp/sync.h"
35 #include "src/core/lib/iomgr/polling_entity.h"
36 #include "src/core/lib/iomgr/timer.h"
37 #include "src/core/lib/transport/connectivity_state.h"
38 #include "src/core/lib/transport/metadata.h"
39
40 // Channel arg containing a grpc_resolved_address to connect to.
41 #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
42
43 // For debugging refcounting.
44 #ifndef NDEBUG
45 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r))
46 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
47 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r))
48 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r))
49 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r))
50 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \
51   const char *file, int line, const char *reason
52 #define GRPC_SUBCHANNEL_REF_REASON reason
53 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS \
54   , GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
55 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) , file, line, reason, x
56 #else
57 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref()
58 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
59 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref()
60 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef()
61 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref()
62 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
63 #define GRPC_SUBCHANNEL_REF_REASON ""
64 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS
65 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x)
66 #endif
67
68 namespace grpc_core {
69
70 class SubchannelCall;
71
72 class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
73  public:
74   ConnectedSubchannel(
75       grpc_channel_stack* channel_stack, const grpc_channel_args* args,
76       RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
77   ~ConnectedSubchannel();
78
79   void StartWatch(grpc_pollset_set* interested_parties,
80                   OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
81
82   void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
83
84   grpc_channel_stack* channel_stack() const { return channel_stack_; }
85   const grpc_channel_args* args() const { return args_; }
86   channelz::SubchannelNode* channelz_subchannel() const {
87     return channelz_subchannel_.get();
88   }
89
90   size_t GetInitialCallSizeEstimate(size_t parent_data_size) const;
91
92  private:
93   grpc_channel_stack* channel_stack_;
94   grpc_channel_args* args_;
95   // ref counted pointer to the channelz node in this connected subchannel's
96   // owning subchannel.
97   RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
98 };
99
100 // Implements the interface of RefCounted<>.
101 class SubchannelCall {
102  public:
103   struct Args {
104     RefCountedPtr<ConnectedSubchannel> connected_subchannel;
105     grpc_polling_entity* pollent;
106     grpc_slice path;
107     gpr_cycle_counter start_time;
108     grpc_millis deadline;
109     Arena* arena;
110     grpc_call_context_element* context;
111     CallCombiner* call_combiner;
112     size_t parent_data_size;
113   };
114   static RefCountedPtr<SubchannelCall> Create(Args args, grpc_error** error);
115
116   // Continues processing a transport stream op batch.
117   void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
118
119   // Returns a pointer to the parent data associated with the subchannel call.
120   // The data will be of the size specified in \a parent_data_size field of
121   // the args passed to \a ConnectedSubchannel::CreateCall().
122   void* GetParentData();
123
124   // Returns the call stack of the subchannel call.
125   grpc_call_stack* GetCallStack();
126
127   // Sets the 'then_schedule_closure' argument for call stack destruction.
128   // Must be called once per call.
129   void SetAfterCallStackDestroy(grpc_closure* closure);
130
131   // Interface of RefCounted<>.
132   RefCountedPtr<SubchannelCall> Ref() GRPC_MUST_USE_RESULT;
133   RefCountedPtr<SubchannelCall> Ref(const DebugLocation& location,
134                                     const char* reason) GRPC_MUST_USE_RESULT;
135   // When refcount drops to 0, destroys itself and the associated call stack,
136   // but does NOT free the memory because it's in the call arena.
137   void Unref();
138   void Unref(const DebugLocation& location, const char* reason);
139
140   static void Destroy(void* arg, grpc_error* error);
141
142  private:
143   // Allow RefCountedPtr<> to access IncrementRefCount().
144   template <typename T>
145   friend class RefCountedPtr;
146
147   SubchannelCall(Args args, grpc_error** error);
148
149   // If channelz is enabled, intercepts recv_trailing so that we may check the
150   // status and associate it to a subchannel.
151   void MaybeInterceptRecvTrailingMetadata(
152       grpc_transport_stream_op_batch* batch);
153
154   static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
155
156   // Interface of RefCounted<>.
157   void IncrementRefCount();
158   void IncrementRefCount(const DebugLocation& location, const char* reason);
159
160   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
161   grpc_closure* after_call_stack_destroy_ = nullptr;
162   // State needed to support channelz interception of recv trailing metadata.
163   grpc_closure recv_trailing_metadata_ready_;
164   grpc_closure* original_recv_trailing_metadata_ = nullptr;
165   grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
166   grpc_millis deadline_;
167 };
168
169 // A subchannel that knows how to connect to exactly one target address. It
170 // provides a target for load balancing.
171 //
172 // Note that this is the "real" subchannel implementation, whose API is
173 // different from the SubchannelInterface that is exposed to LB policy
174 // implementations.  The client channel provides an adaptor class
175 // (SubchannelWrapper) that "converts" between the two.
176 class Subchannel {
177  public:
178   class ConnectivityStateWatcherInterface
179       : public InternallyRefCounted<ConnectivityStateWatcherInterface> {
180    public:
181     virtual ~ConnectivityStateWatcherInterface() = default;
182
183     // Will be invoked whenever the subchannel's connectivity state
184     // changes.  There will be only one invocation of this method on a
185     // given watcher instance at any given time.
186     //
187     // When the state changes to READY, connected_subchannel will
188     // contain a ref to the connected subchannel.  When it changes from
189     // READY to some other state, the implementation must release its
190     // ref to the connected subchannel.
191     virtual void OnConnectivityStateChange(
192         grpc_connectivity_state new_state,
193         RefCountedPtr<ConnectedSubchannel> connected_subchannel)  // NOLINT
194         = 0;
195
196     virtual grpc_pollset_set* interested_parties() = 0;
197   };
198
199   // The ctor and dtor are not intended to use directly.
200   Subchannel(SubchannelKey* key, OrphanablePtr<SubchannelConnector> connector,
201              const grpc_channel_args* args);
202   ~Subchannel();
203
204   // Creates a subchannel given \a connector and \a args.
205   static Subchannel* Create(OrphanablePtr<SubchannelConnector> connector,
206                             const grpc_channel_args* args);
207
208   // Strong and weak refcounting.
209   Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
210   void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
211   Subchannel* WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
212   void WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
213   // Attempts to return a strong ref when only the weak refcount is guaranteed
214   // non-zero. If the strong refcount is zero, does not alter the refcount and
215   // returns null.
216   Subchannel* RefFromWeakRef();
217
218   // Gets the string representing the subchannel address.
219   // Caller doesn't take ownership.
220   const char* GetTargetAddress();
221
222   const grpc_channel_args* channel_args() const { return args_; }
223
224   channelz::SubchannelNode* channelz_node();
225
226   // Returns the current connectivity state of the subchannel.
227   // If health_check_service_name is non-null, the returned connectivity
228   // state will be based on the state reported by the backend for that
229   // service name.
230   // If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel.
231   grpc_connectivity_state CheckConnectivityState(
232       const char* health_check_service_name,
233       RefCountedPtr<ConnectedSubchannel>* connected_subchannel);
234
235   // Starts watching the subchannel's connectivity state.
236   // The first callback to the watcher will be delivered when the
237   // subchannel's connectivity state becomes a value other than
238   // initial_state, which may happen immediately.
239   // Subsequent callbacks will be delivered as the subchannel's state
240   // changes.
241   // The watcher will be destroyed either when the subchannel is
242   // destroyed or when CancelConnectivityStateWatch() is called.
243   void WatchConnectivityState(
244       grpc_connectivity_state initial_state,
245       grpc_core::UniquePtr<char> health_check_service_name,
246       OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
247
248   // Cancels a connectivity state watch.
249   // If the watcher has already been destroyed, this is a no-op.
250   void CancelConnectivityStateWatch(const char* health_check_service_name,
251                                     ConnectivityStateWatcherInterface* watcher);
252
253   // Attempt to connect to the backend.  Has no effect if already connected.
254   void AttemptToConnect();
255
256   // Resets the connection backoff of the subchannel.
257   // TODO(roth): Move connection backoff out of subchannels and up into LB
258   // policy code (probably by adding a SubchannelGroup between
259   // SubchannelList and SubchannelData), at which point this method can
260   // go away.
261   void ResetBackoff();
262
263   // Returns a new channel arg encoding the subchannel address as a URI
264   // string. Caller is responsible for freeing the string.
265   static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr);
266
267   // Returns the URI string from the subchannel address arg in \a args.
268   static const char* GetUriFromSubchannelAddressArg(
269       const grpc_channel_args* args);
270
271   // Sets \a addr from the subchannel address arg in \a args.
272   static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args,
273                                                  grpc_resolved_address* addr);
274
275  private:
276   // A linked list of ConnectivityStateWatcherInterfaces that are monitoring
277   // the subchannel's state.
278   class ConnectivityStateWatcherList {
279    public:
280     ~ConnectivityStateWatcherList() { Clear(); }
281
282     void AddWatcherLocked(
283         OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
284     void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
285
286     // Notifies all watchers in the list about a change to state.
287     void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state);
288
289     void Clear() { watchers_.clear(); }
290
291     bool empty() const { return watchers_.empty(); }
292
293    private:
294     // TODO(roth): Once we can use C++-14 heterogeneous lookups, this can
295     // be a set instead of a map.
296     std::map<ConnectivityStateWatcherInterface*,
297              OrphanablePtr<ConnectivityStateWatcherInterface>>
298         watchers_;
299   };
300
301   // A map that tracks ConnectivityStateWatcherInterfaces using a particular
302   // health check service name.
303   //
304   // There is one entry in the map for each health check service name.
305   // Entries exist only as long as there are watchers using the
306   // corresponding service name.
307   //
308   // A health check client is maintained only while the subchannel is in
309   // state READY.
310   class HealthWatcherMap {
311    public:
312     void AddWatcherLocked(
313         Subchannel* subchannel, grpc_connectivity_state initial_state,
314         grpc_core::UniquePtr<char> health_check_service_name,
315         OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
316     void RemoveWatcherLocked(const char* health_check_service_name,
317                              ConnectivityStateWatcherInterface* watcher);
318
319     // Notifies the watcher when the subchannel's state changes.
320     void NotifyLocked(grpc_connectivity_state state);
321
322     grpc_connectivity_state CheckConnectivityStateLocked(
323         Subchannel* subchannel, const char* health_check_service_name);
324
325     void ShutdownLocked();
326
327    private:
328     class HealthWatcher;
329
330     std::map<const char*, OrphanablePtr<HealthWatcher>, StringLess> map_;
331   };
332
333   class ConnectedSubchannelStateWatcher;
334
335   // Sets the subchannel's connectivity state to \a state.
336   void SetConnectivityStateLocked(grpc_connectivity_state state);
337
338   // Methods for connection.
339   void MaybeStartConnectingLocked();
340   static void OnRetryAlarm(void* arg, grpc_error* error);
341   void ContinueConnectingLocked();
342   static void OnConnectingFinished(void* arg, grpc_error* error);
343   bool PublishTransportLocked();
344   void Disconnect();
345
346   gpr_atm RefMutate(gpr_atm delta,
347                     int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS);
348
349   // The subchannel pool this subchannel is in.
350   RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
351   // TODO(juanlishen): Consider using args_ as key_ directly.
352   // Subchannel key that identifies this subchannel in the subchannel pool.
353   SubchannelKey* key_;
354   // Channel args.
355   grpc_channel_args* args_;
356   // pollset_set tracking who's interested in a connection being setup.
357   grpc_pollset_set* pollset_set_;
358   // Protects the other members.
359   Mutex mu_;
360   // Refcount
361   //    - lower INTERNAL_REF_BITS bits are for internal references:
362   //      these do not keep the subchannel open.
363   //    - upper remaining bits are for public references: these do
364   //      keep the subchannel open
365   gpr_atm ref_pair_;
366
367   // Connection states.
368   OrphanablePtr<SubchannelConnector> connector_;
369   // Set during connection.
370   SubchannelConnector::Result connecting_result_;
371   grpc_closure on_connecting_finished_;
372   // Active connection, or null.
373   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
374   bool connecting_ = false;
375   bool disconnected_ = false;
376
377   // Connectivity state tracking.
378   grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
379   // The list of watchers without a health check service name.
380   ConnectivityStateWatcherList watcher_list_;
381   // The map of watchers with health check service names.
382   HealthWatcherMap health_watcher_map_;
383
384   // Backoff state.
385   BackOff backoff_;
386   grpc_millis next_attempt_deadline_;
387   grpc_millis min_connect_timeout_ms_;
388   bool backoff_begun_ = false;
389
390   // Retry alarm.
391   grpc_timer retry_alarm_;
392   grpc_closure on_retry_alarm_;
393   bool have_retry_alarm_ = false;
394   // reset_backoff() was called while alarm was pending.
395   bool retry_immediately_ = false;
396
397   // Channelz tracking.
398   RefCountedPtr<channelz::SubchannelNode> channelz_node_;
399 };
400
401 }  // namespace grpc_core
402
403 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */