3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
22 #include <grpc/support/port_platform.h>
24 #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
25 #include "src/core/ext/filters/client_channel/connector.h"
26 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
27 #include "src/core/lib/backoff/backoff.h"
28 #include "src/core/lib/channel/channel_stack.h"
29 #include "src/core/lib/gprpp/arena.h"
30 #include "src/core/lib/gprpp/ref_counted.h"
31 #include "src/core/lib/gprpp/ref_counted_ptr.h"
32 #include "src/core/lib/gprpp/sync.h"
33 #include "src/core/lib/iomgr/polling_entity.h"
34 #include "src/core/lib/iomgr/timer.h"
35 #include "src/core/lib/transport/connectivity_state.h"
36 #include "src/core/lib/transport/metadata.h"
38 // Channel arg containing a grpc_resolved_address to connect to.
39 #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
41 // For debugging refcounting.
43 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r))
44 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \
45 (p)->RefFromWeakRef(__FILE__, __LINE__, (r))
46 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r))
47 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r))
48 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r))
49 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \
50 const char *file, int line, const char *reason
51 #define GRPC_SUBCHANNEL_REF_REASON reason
52 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS \
53 , GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
54 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) , file, line, reason, x
56 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref()
57 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
58 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref()
59 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef()
60 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref()
61 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
62 #define GRPC_SUBCHANNEL_REF_REASON ""
63 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS
64 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x)
71 class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
74 grpc_polling_entity* pollent;
76 gpr_timespec start_time;
79 grpc_call_context_element* context;
80 grpc_core::CallCombiner* call_combiner;
81 size_t parent_data_size;
85 grpc_channel_stack* channel_stack, const grpc_channel_args* args,
86 RefCountedPtr<channelz::SubchannelNode> channelz_subchannel,
87 intptr_t socket_uuid);
88 ~ConnectedSubchannel();
90 void NotifyOnStateChange(grpc_pollset_set* interested_parties,
91 grpc_connectivity_state* state,
92 grpc_closure* closure);
93 void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
94 RefCountedPtr<SubchannelCall> CreateCall(const CallArgs& args,
97 grpc_channel_stack* channel_stack() const { return channel_stack_; }
98 const grpc_channel_args* args() const { return args_; }
99 channelz::SubchannelNode* channelz_subchannel() const {
100 return channelz_subchannel_.get();
102 intptr_t socket_uuid() const { return socket_uuid_; }
104 size_t GetInitialCallSizeEstimate(size_t parent_data_size) const;
107 grpc_channel_stack* channel_stack_;
108 grpc_channel_args* args_;
109 // ref counted pointer to the channelz node in this connected subchannel's
110 // owning subchannel.
111 RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
112 // uuid of this subchannel's socket. 0 if this subchannel is not connected.
113 const intptr_t socket_uuid_;
116 // Implements the interface of RefCounted<>.
117 class SubchannelCall {
119 SubchannelCall(RefCountedPtr<ConnectedSubchannel> connected_subchannel,
120 const ConnectedSubchannel::CallArgs& args)
121 : connected_subchannel_(std::move(connected_subchannel)),
122 deadline_(args.deadline) {}
124 // Continues processing a transport stream op batch.
125 void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
127 // Returns a pointer to the parent data associated with the subchannel call.
128 // The data will be of the size specified in \a parent_data_size field of
129 // the args passed to \a ConnectedSubchannel::CreateCall().
130 void* GetParentData();
132 // Returns the call stack of the subchannel call.
133 grpc_call_stack* GetCallStack();
135 // Sets the 'then_schedule_closure' argument for call stack destruction.
136 // Must be called once per call.
137 void SetAfterCallStackDestroy(grpc_closure* closure);
139 // Interface of RefCounted<>.
140 RefCountedPtr<SubchannelCall> Ref() GRPC_MUST_USE_RESULT;
141 RefCountedPtr<SubchannelCall> Ref(const DebugLocation& location,
142 const char* reason) GRPC_MUST_USE_RESULT;
143 // When refcount drops to 0, destroys itself and the associated call stack,
144 // but does NOT free the memory because it's in the call arena.
146 void Unref(const DebugLocation& location, const char* reason);
148 static void Destroy(void* arg, grpc_error* error);
151 // Allow RefCountedPtr<> to access IncrementRefCount().
152 template <typename T>
153 friend class RefCountedPtr;
155 // If channelz is enabled, intercepts recv_trailing so that we may check the
156 // status and associate it to a subchannel.
157 void MaybeInterceptRecvTrailingMetadata(
158 grpc_transport_stream_op_batch* batch);
160 static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
162 // Interface of RefCounted<>.
163 void IncrementRefCount();
164 void IncrementRefCount(const DebugLocation& location, const char* reason);
166 RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
167 grpc_closure* after_call_stack_destroy_ = nullptr;
168 // State needed to support channelz interception of recv trailing metadata.
169 grpc_closure recv_trailing_metadata_ready_;
170 grpc_closure* original_recv_trailing_metadata_ = nullptr;
171 grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
172 grpc_millis deadline_;
175 // A subchannel that knows how to connect to exactly one target address. It
176 // provides a target for load balancing.
179 // The ctor and dtor are not intended to use directly.
180 Subchannel(SubchannelKey* key, grpc_connector* connector,
181 const grpc_channel_args* args);
184 // Creates a subchannel given \a connector and \a args.
185 static Subchannel* Create(grpc_connector* connector,
186 const grpc_channel_args* args);
188 // Strong and weak refcounting.
189 Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
190 void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
191 Subchannel* WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
192 void WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
193 // Attempts to return a strong ref when only the weak refcount is guaranteed
194 // non-zero. If the strong refcount is zero, does not alter the refcount and
196 Subchannel* RefFromWeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
198 intptr_t GetChildSocketUuid();
200 // Gets the string representing the subchannel address.
201 // Caller doesn't take ownership.
202 const char* GetTargetAddress();
204 // Gets the connected subchannel - or nullptr if not connected (which may
205 // happen before it initially connects or during transient failures).
206 RefCountedPtr<ConnectedSubchannel> connected_subchannel();
208 channelz::SubchannelNode* channelz_node();
210 // Polls the current connectivity state of the subchannel.
211 grpc_connectivity_state CheckConnectivity(bool inhibit_health_checking);
213 // When the connectivity state of the subchannel changes from \a *state,
214 // invokes \a notify and updates \a *state with the new state.
215 void NotifyOnStateChange(grpc_pollset_set* interested_parties,
216 grpc_connectivity_state* state, grpc_closure* notify,
217 bool inhibit_health_checking);
219 // Resets the connection backoff of the subchannel.
220 // TODO(roth): Move connection backoff out of subchannels and up into LB
221 // policy code (probably by adding a SubchannelGroup between
222 // SubchannelList and SubchannelData), at which point this method can
226 // Returns a new channel arg encoding the subchannel address as a URI
227 // string. Caller is responsible for freeing the string.
228 static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr);
230 // Returns the URI string from the subchannel address arg in \a args.
231 static const char* GetUriFromSubchannelAddressArg(
232 const grpc_channel_args* args);
234 // Sets \a addr from the subchannel address arg in \a args.
235 static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args,
236 grpc_resolved_address* addr);
239 struct ExternalStateWatcher;
240 class ConnectedSubchannelStateWatcher;
242 // Sets the subchannel's connectivity state to \a state.
243 void SetConnectivityStateLocked(grpc_connectivity_state state,
246 // Methods for connection.
247 void MaybeStartConnectingLocked();
248 static void OnRetryAlarm(void* arg, grpc_error* error);
249 void ContinueConnectingLocked();
250 static void OnConnectingFinished(void* arg, grpc_error* error);
251 bool PublishTransportLocked();
254 gpr_atm RefMutate(gpr_atm delta,
255 int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS);
257 // The subchannel pool this subchannel is in.
258 RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
259 // TODO(juanlishen): Consider using args_ as key_ directly.
260 // Subchannel key that identifies this subchannel in the subchannel pool.
263 grpc_channel_args* args_;
264 // pollset_set tracking who's interested in a connection being setup.
265 grpc_pollset_set* pollset_set_;
266 // Protects the other members.
269 // - lower INTERNAL_REF_BITS bits are for internal references:
270 // these do not keep the subchannel open.
271 // - upper remaining bits are for public references: these do
272 // keep the subchannel open
275 // Connection states.
276 grpc_connector* connector_ = nullptr;
277 // Set during connection.
278 grpc_connect_out_args connecting_result_;
279 grpc_closure on_connecting_finished_;
280 // Active connection, or null.
281 RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
282 OrphanablePtr<ConnectedSubchannelStateWatcher> connected_subchannel_watcher_;
283 bool connecting_ = false;
284 bool disconnected_ = false;
286 // Connectivity state tracking.
287 grpc_connectivity_state_tracker state_tracker_;
288 grpc_connectivity_state_tracker state_and_health_tracker_;
289 UniquePtr<char> health_check_service_name_;
290 ExternalStateWatcher* external_state_watcher_list_ = nullptr;
294 grpc_millis next_attempt_deadline_;
295 grpc_millis min_connect_timeout_ms_;
296 bool backoff_begun_ = false;
299 grpc_timer retry_alarm_;
300 grpc_closure on_retry_alarm_;
301 bool have_retry_alarm_ = false;
302 // reset_backoff() was called while alarm was pending.
303 bool retry_immediately_ = false;
305 // Channelz tracking.
306 RefCountedPtr<channelz::SubchannelNode> channelz_node_;
309 } // namespace grpc_core
311 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */