Imported Upstream version 1.19.0
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / subchannel.h
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
21
22 #include <grpc/support/port_platform.h>
23
24 #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
25 #include "src/core/ext/filters/client_channel/connector.h"
26 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
27 #include "src/core/lib/backoff/backoff.h"
28 #include "src/core/lib/channel/channel_stack.h"
29 #include "src/core/lib/gpr/arena.h"
30 #include "src/core/lib/gprpp/ref_counted.h"
31 #include "src/core/lib/gprpp/ref_counted_ptr.h"
32 #include "src/core/lib/iomgr/polling_entity.h"
33 #include "src/core/lib/iomgr/timer.h"
34 #include "src/core/lib/transport/connectivity_state.h"
35 #include "src/core/lib/transport/metadata.h"
36
37 // Channel arg containing a grpc_resolved_address to connect to.
38 #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
39
40 // For debugging refcounting.
41 #ifndef NDEBUG
42 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r))
43 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \
44   (p)->RefFromWeakRef(__FILE__, __LINE__, (r))
45 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r))
46 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r))
47 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r))
48 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \
49   const char *file, int line, const char *reason
50 #define GRPC_SUBCHANNEL_REF_REASON reason
51 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS \
52   , GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
53 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) , file, line, reason, x
54 #else
55 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref()
56 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
57 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref()
58 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef()
59 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref()
60 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
61 #define GRPC_SUBCHANNEL_REF_REASON ""
62 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS
63 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x)
64 #endif
65
66 namespace grpc_core {
67
68 class SubchannelCall;
69
70 class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
71  public:
72   struct CallArgs {
73     grpc_polling_entity* pollent;
74     grpc_slice path;
75     gpr_timespec start_time;
76     grpc_millis deadline;
77     gpr_arena* arena;
78     grpc_call_context_element* context;
79     grpc_call_combiner* call_combiner;
80     size_t parent_data_size;
81   };
82
83   ConnectedSubchannel(
84       grpc_channel_stack* channel_stack, const grpc_channel_args* args,
85       RefCountedPtr<channelz::SubchannelNode> channelz_subchannel,
86       intptr_t socket_uuid);
87   ~ConnectedSubchannel();
88
89   void NotifyOnStateChange(grpc_pollset_set* interested_parties,
90                            grpc_connectivity_state* state,
91                            grpc_closure* closure);
92   void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
93   RefCountedPtr<SubchannelCall> CreateCall(const CallArgs& args,
94                                            grpc_error** error);
95
96   grpc_channel_stack* channel_stack() const { return channel_stack_; }
97   const grpc_channel_args* args() const { return args_; }
98   channelz::SubchannelNode* channelz_subchannel() const {
99     return channelz_subchannel_.get();
100   }
101   intptr_t socket_uuid() const { return socket_uuid_; }
102
103   size_t GetInitialCallSizeEstimate(size_t parent_data_size) const;
104
105  private:
106   grpc_channel_stack* channel_stack_;
107   grpc_channel_args* args_;
108   // ref counted pointer to the channelz node in this connected subchannel's
109   // owning subchannel.
110   RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
111   // uuid of this subchannel's socket. 0 if this subchannel is not connected.
112   const intptr_t socket_uuid_;
113 };
114
115 // Implements the interface of RefCounted<>.
116 class SubchannelCall {
117  public:
118   SubchannelCall(RefCountedPtr<ConnectedSubchannel> connected_subchannel,
119                  const ConnectedSubchannel::CallArgs& args)
120       : connected_subchannel_(std::move(connected_subchannel)),
121         deadline_(args.deadline) {}
122
123   // Continues processing a transport stream op batch.
124   void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
125
126   // Returns a pointer to the parent data associated with the subchannel call.
127   // The data will be of the size specified in \a parent_data_size field of
128   // the args passed to \a ConnectedSubchannel::CreateCall().
129   void* GetParentData();
130
131   // Returns the call stack of the subchannel call.
132   grpc_call_stack* GetCallStack();
133
134   // Sets the 'then_schedule_closure' argument for call stack destruction.
135   // Must be called once per call.
136   void SetAfterCallStackDestroy(grpc_closure* closure);
137
138   // Interface of RefCounted<>.
139   RefCountedPtr<SubchannelCall> Ref() GRPC_MUST_USE_RESULT;
140   RefCountedPtr<SubchannelCall> Ref(const DebugLocation& location,
141                                     const char* reason) GRPC_MUST_USE_RESULT;
142   // When refcount drops to 0, destroys itself and the associated call stack,
143   // but does NOT free the memory because it's in the call arena.
144   void Unref();
145   void Unref(const DebugLocation& location, const char* reason);
146
147   static void Destroy(void* arg, grpc_error* error);
148
149  private:
150   // Allow RefCountedPtr<> to access IncrementRefCount().
151   template <typename T>
152   friend class RefCountedPtr;
153
154   // If channelz is enabled, intercepts recv_trailing so that we may check the
155   // status and associate it to a subchannel.
156   void MaybeInterceptRecvTrailingMetadata(
157       grpc_transport_stream_op_batch* batch);
158
159   static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
160
161   // Interface of RefCounted<>.
162   void IncrementRefCount();
163   void IncrementRefCount(const DebugLocation& location, const char* reason);
164
165   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
166   grpc_closure* after_call_stack_destroy_ = nullptr;
167   // State needed to support channelz interception of recv trailing metadata.
168   grpc_closure recv_trailing_metadata_ready_;
169   grpc_closure* original_recv_trailing_metadata_ = nullptr;
170   grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
171   grpc_millis deadline_;
172 };
173
174 // A subchannel that knows how to connect to exactly one target address. It
175 // provides a target for load balancing.
176 class Subchannel {
177  public:
178   // The ctor and dtor are not intended to use directly.
179   Subchannel(SubchannelKey* key, grpc_connector* connector,
180              const grpc_channel_args* args);
181   ~Subchannel();
182
183   // Creates a subchannel given \a connector and \a args.
184   static Subchannel* Create(grpc_connector* connector,
185                             const grpc_channel_args* args);
186
187   // Strong and weak refcounting.
188   Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
189   void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
190   Subchannel* WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
191   void WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
192   Subchannel* RefFromWeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
193
194   intptr_t GetChildSocketUuid();
195
196   // Gets the string representing the subchannel address.
197   // Caller doesn't take ownership.
198   const char* GetTargetAddress();
199
200   // Gets the connected subchannel - or nullptr if not connected (which may
201   // happen before it initially connects or during transient failures).
202   RefCountedPtr<ConnectedSubchannel> connected_subchannel();
203
204   channelz::SubchannelNode* channelz_node();
205
206   // Polls the current connectivity state of the subchannel.
207   grpc_connectivity_state CheckConnectivity(grpc_error** error,
208                                             bool inhibit_health_checking);
209
210   // When the connectivity state of the subchannel changes from \a *state,
211   // invokes \a notify and updates \a *state with the new state.
212   void NotifyOnStateChange(grpc_pollset_set* interested_parties,
213                            grpc_connectivity_state* state, grpc_closure* notify,
214                            bool inhibit_health_checking);
215
216   // Resets the connection backoff of the subchannel.
217   // TODO(roth): Move connection backoff out of subchannels and up into LB
218   // policy code (probably by adding a SubchannelGroup between
219   // SubchannelList and SubchannelData), at which point this method can
220   // go away.
221   void ResetBackoff();
222
223   // Returns a new channel arg encoding the subchannel address as a URI
224   // string. Caller is responsible for freeing the string.
225   static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr);
226
227   // Returns the URI string from the subchannel address arg in \a args.
228   static const char* GetUriFromSubchannelAddressArg(
229       const grpc_channel_args* args);
230
231   // Sets \a addr from the subchannel address arg in \a args.
232   static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args,
233                                                  grpc_resolved_address* addr);
234
235  private:
236   struct ExternalStateWatcher;
237   class ConnectedSubchannelStateWatcher;
238
239   // Sets the subchannel's connectivity state to \a state.
240   void SetConnectivityStateLocked(grpc_connectivity_state state,
241                                   grpc_error* error, const char* reason);
242
243   // Methods for connection.
244   void MaybeStartConnectingLocked();
245   static void OnRetryAlarm(void* arg, grpc_error* error);
246   void ContinueConnectingLocked();
247   static void OnConnectingFinished(void* arg, grpc_error* error);
248   bool PublishTransportLocked();
249   void Disconnect();
250
251   gpr_atm RefMutate(gpr_atm delta,
252                     int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS);
253
254   // The subchannel pool this subchannel is in.
255   RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
256   // TODO(juanlishen): Consider using args_ as key_ directly.
257   // Subchannel key that identifies this subchannel in the subchannel pool.
258   SubchannelKey* key_;
259   // Channel args.
260   grpc_channel_args* args_;
261   // pollset_set tracking who's interested in a connection being setup.
262   grpc_pollset_set* pollset_set_;
263   // Protects the other members.
264   gpr_mu mu_;
265   // Refcount
266   //    - lower INTERNAL_REF_BITS bits are for internal references:
267   //      these do not keep the subchannel open.
268   //    - upper remaining bits are for public references: these do
269   //      keep the subchannel open
270   gpr_atm ref_pair_;
271
272   // Connection states.
273   grpc_connector* connector_ = nullptr;
274   // Set during connection.
275   grpc_connect_out_args connecting_result_;
276   grpc_closure on_connecting_finished_;
277   // Active connection, or null.
278   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
279   OrphanablePtr<ConnectedSubchannelStateWatcher> connected_subchannel_watcher_;
280   bool connecting_ = false;
281   bool disconnected_ = false;
282
283   // Connectivity state tracking.
284   grpc_connectivity_state_tracker state_tracker_;
285   grpc_connectivity_state_tracker state_and_health_tracker_;
286   UniquePtr<char> health_check_service_name_;
287   ExternalStateWatcher* external_state_watcher_list_ = nullptr;
288
289   // Backoff state.
290   BackOff backoff_;
291   grpc_millis next_attempt_deadline_;
292   grpc_millis min_connect_timeout_ms_;
293   bool backoff_begun_ = false;
294
295   // Retry alarm.
296   grpc_timer retry_alarm_;
297   grpc_closure on_retry_alarm_;
298   bool have_retry_alarm_ = false;
299   // reset_backoff() was called while alarm was pending.
300   bool retry_immediately_ = false;
301
302   // Channelz tracking.
303   RefCountedPtr<channelz::SubchannelNode> channelz_node_;
304 };
305
306 }  // namespace grpc_core
307
308 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */