Imported Upstream version 1.21.0
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / subchannel.h
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
21
22 #include <grpc/support/port_platform.h>
23
24 #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
25 #include "src/core/ext/filters/client_channel/connector.h"
26 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
27 #include "src/core/lib/backoff/backoff.h"
28 #include "src/core/lib/channel/channel_stack.h"
29 #include "src/core/lib/gprpp/arena.h"
30 #include "src/core/lib/gprpp/ref_counted.h"
31 #include "src/core/lib/gprpp/ref_counted_ptr.h"
32 #include "src/core/lib/gprpp/sync.h"
33 #include "src/core/lib/iomgr/polling_entity.h"
34 #include "src/core/lib/iomgr/timer.h"
35 #include "src/core/lib/transport/connectivity_state.h"
36 #include "src/core/lib/transport/metadata.h"
37
38 // Channel arg containing a grpc_resolved_address to connect to.
39 #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
40
41 // For debugging refcounting.
42 #ifndef NDEBUG
43 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r))
44 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \
45   (p)->RefFromWeakRef(__FILE__, __LINE__, (r))
46 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r))
47 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r))
48 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r))
49 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \
50   const char *file, int line, const char *reason
51 #define GRPC_SUBCHANNEL_REF_REASON reason
52 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS \
53   , GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
54 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) , file, line, reason, x
55 #else
56 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref()
57 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
58 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref()
59 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef()
60 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref()
61 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
62 #define GRPC_SUBCHANNEL_REF_REASON ""
63 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS
64 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x)
65 #endif
66
67 namespace grpc_core {
68
69 class SubchannelCall;
70
71 class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
72  public:
73   struct CallArgs {
74     grpc_polling_entity* pollent;
75     grpc_slice path;
76     gpr_timespec start_time;
77     grpc_millis deadline;
78     Arena* arena;
79     grpc_call_context_element* context;
80     grpc_core::CallCombiner* call_combiner;
81     size_t parent_data_size;
82   };
83
84   ConnectedSubchannel(
85       grpc_channel_stack* channel_stack, const grpc_channel_args* args,
86       RefCountedPtr<channelz::SubchannelNode> channelz_subchannel,
87       intptr_t socket_uuid);
88   ~ConnectedSubchannel();
89
90   void NotifyOnStateChange(grpc_pollset_set* interested_parties,
91                            grpc_connectivity_state* state,
92                            grpc_closure* closure);
93   void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
94   RefCountedPtr<SubchannelCall> CreateCall(const CallArgs& args,
95                                            grpc_error** error);
96
97   grpc_channel_stack* channel_stack() const { return channel_stack_; }
98   const grpc_channel_args* args() const { return args_; }
99   channelz::SubchannelNode* channelz_subchannel() const {
100     return channelz_subchannel_.get();
101   }
102   intptr_t socket_uuid() const { return socket_uuid_; }
103
104   size_t GetInitialCallSizeEstimate(size_t parent_data_size) const;
105
106  private:
107   grpc_channel_stack* channel_stack_;
108   grpc_channel_args* args_;
109   // ref counted pointer to the channelz node in this connected subchannel's
110   // owning subchannel.
111   RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
112   // uuid of this subchannel's socket. 0 if this subchannel is not connected.
113   const intptr_t socket_uuid_;
114 };
115
116 // Implements the interface of RefCounted<>.
117 class SubchannelCall {
118  public:
119   SubchannelCall(RefCountedPtr<ConnectedSubchannel> connected_subchannel,
120                  const ConnectedSubchannel::CallArgs& args)
121       : connected_subchannel_(std::move(connected_subchannel)),
122         deadline_(args.deadline) {}
123
124   // Continues processing a transport stream op batch.
125   void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
126
127   // Returns a pointer to the parent data associated with the subchannel call.
128   // The data will be of the size specified in \a parent_data_size field of
129   // the args passed to \a ConnectedSubchannel::CreateCall().
130   void* GetParentData();
131
132   // Returns the call stack of the subchannel call.
133   grpc_call_stack* GetCallStack();
134
135   // Sets the 'then_schedule_closure' argument for call stack destruction.
136   // Must be called once per call.
137   void SetAfterCallStackDestroy(grpc_closure* closure);
138
139   // Interface of RefCounted<>.
140   RefCountedPtr<SubchannelCall> Ref() GRPC_MUST_USE_RESULT;
141   RefCountedPtr<SubchannelCall> Ref(const DebugLocation& location,
142                                     const char* reason) GRPC_MUST_USE_RESULT;
143   // When refcount drops to 0, destroys itself and the associated call stack,
144   // but does NOT free the memory because it's in the call arena.
145   void Unref();
146   void Unref(const DebugLocation& location, const char* reason);
147
148   static void Destroy(void* arg, grpc_error* error);
149
150  private:
151   // Allow RefCountedPtr<> to access IncrementRefCount().
152   template <typename T>
153   friend class RefCountedPtr;
154
155   // If channelz is enabled, intercepts recv_trailing so that we may check the
156   // status and associate it to a subchannel.
157   void MaybeInterceptRecvTrailingMetadata(
158       grpc_transport_stream_op_batch* batch);
159
160   static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
161
162   // Interface of RefCounted<>.
163   void IncrementRefCount();
164   void IncrementRefCount(const DebugLocation& location, const char* reason);
165
166   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
167   grpc_closure* after_call_stack_destroy_ = nullptr;
168   // State needed to support channelz interception of recv trailing metadata.
169   grpc_closure recv_trailing_metadata_ready_;
170   grpc_closure* original_recv_trailing_metadata_ = nullptr;
171   grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
172   grpc_millis deadline_;
173 };
174
175 // A subchannel that knows how to connect to exactly one target address. It
176 // provides a target for load balancing.
177 class Subchannel {
178  public:
179   // The ctor and dtor are not intended to use directly.
180   Subchannel(SubchannelKey* key, grpc_connector* connector,
181              const grpc_channel_args* args);
182   ~Subchannel();
183
184   // Creates a subchannel given \a connector and \a args.
185   static Subchannel* Create(grpc_connector* connector,
186                             const grpc_channel_args* args);
187
188   // Strong and weak refcounting.
189   Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
190   void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
191   Subchannel* WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
192   void WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
193   // Attempts to return a strong ref when only the weak refcount is guaranteed
194   // non-zero. If the strong refcount is zero, does not alter the refcount and
195   // returns null.
196   Subchannel* RefFromWeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
197
198   intptr_t GetChildSocketUuid();
199
200   // Gets the string representing the subchannel address.
201   // Caller doesn't take ownership.
202   const char* GetTargetAddress();
203
204   // Gets the connected subchannel - or nullptr if not connected (which may
205   // happen before it initially connects or during transient failures).
206   RefCountedPtr<ConnectedSubchannel> connected_subchannel();
207
208   channelz::SubchannelNode* channelz_node();
209
210   // Polls the current connectivity state of the subchannel.
211   grpc_connectivity_state CheckConnectivity(bool inhibit_health_checking);
212
213   // When the connectivity state of the subchannel changes from \a *state,
214   // invokes \a notify and updates \a *state with the new state.
215   void NotifyOnStateChange(grpc_pollset_set* interested_parties,
216                            grpc_connectivity_state* state, grpc_closure* notify,
217                            bool inhibit_health_checking);
218
219   // Resets the connection backoff of the subchannel.
220   // TODO(roth): Move connection backoff out of subchannels and up into LB
221   // policy code (probably by adding a SubchannelGroup between
222   // SubchannelList and SubchannelData), at which point this method can
223   // go away.
224   void ResetBackoff();
225
226   // Returns a new channel arg encoding the subchannel address as a URI
227   // string. Caller is responsible for freeing the string.
228   static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr);
229
230   // Returns the URI string from the subchannel address arg in \a args.
231   static const char* GetUriFromSubchannelAddressArg(
232       const grpc_channel_args* args);
233
234   // Sets \a addr from the subchannel address arg in \a args.
235   static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args,
236                                                  grpc_resolved_address* addr);
237
238  private:
239   struct ExternalStateWatcher;
240   class ConnectedSubchannelStateWatcher;
241
242   // Sets the subchannel's connectivity state to \a state.
243   void SetConnectivityStateLocked(grpc_connectivity_state state,
244                                   const char* reason);
245
246   // Methods for connection.
247   void MaybeStartConnectingLocked();
248   static void OnRetryAlarm(void* arg, grpc_error* error);
249   void ContinueConnectingLocked();
250   static void OnConnectingFinished(void* arg, grpc_error* error);
251   bool PublishTransportLocked();
252   void Disconnect();
253
254   gpr_atm RefMutate(gpr_atm delta,
255                     int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS);
256
257   // The subchannel pool this subchannel is in.
258   RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
259   // TODO(juanlishen): Consider using args_ as key_ directly.
260   // Subchannel key that identifies this subchannel in the subchannel pool.
261   SubchannelKey* key_;
262   // Channel args.
263   grpc_channel_args* args_;
264   // pollset_set tracking who's interested in a connection being setup.
265   grpc_pollset_set* pollset_set_;
266   // Protects the other members.
267   Mutex mu_;
268   // Refcount
269   //    - lower INTERNAL_REF_BITS bits are for internal references:
270   //      these do not keep the subchannel open.
271   //    - upper remaining bits are for public references: these do
272   //      keep the subchannel open
273   gpr_atm ref_pair_;
274
275   // Connection states.
276   grpc_connector* connector_ = nullptr;
277   // Set during connection.
278   grpc_connect_out_args connecting_result_;
279   grpc_closure on_connecting_finished_;
280   // Active connection, or null.
281   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
282   OrphanablePtr<ConnectedSubchannelStateWatcher> connected_subchannel_watcher_;
283   bool connecting_ = false;
284   bool disconnected_ = false;
285
286   // Connectivity state tracking.
287   grpc_connectivity_state_tracker state_tracker_;
288   grpc_connectivity_state_tracker state_and_health_tracker_;
289   UniquePtr<char> health_check_service_name_;
290   ExternalStateWatcher* external_state_watcher_list_ = nullptr;
291
292   // Backoff state.
293   BackOff backoff_;
294   grpc_millis next_attempt_deadline_;
295   grpc_millis min_connect_timeout_ms_;
296   bool backoff_begun_ = false;
297
298   // Retry alarm.
299   grpc_timer retry_alarm_;
300   grpc_closure on_retry_alarm_;
301   bool have_retry_alarm_ = false;
302   // reset_backoff() was called while alarm was pending.
303   bool retry_immediately_ = false;
304
305   // Channelz tracking.
306   RefCountedPtr<channelz::SubchannelNode> channelz_node_;
307 };
308
309 }  // namespace grpc_core
310
311 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */