3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
22 #include <grpc/support/port_platform.h>
24 #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
25 #include "src/core/ext/filters/client_channel/connector.h"
26 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
27 #include "src/core/lib/backoff/backoff.h"
28 #include "src/core/lib/channel/channel_stack.h"
29 #include "src/core/lib/gprpp/arena.h"
30 #include "src/core/lib/gprpp/map.h"
31 #include "src/core/lib/gprpp/ref_counted.h"
32 #include "src/core/lib/gprpp/ref_counted_ptr.h"
33 #include "src/core/lib/gprpp/sync.h"
34 #include "src/core/lib/iomgr/polling_entity.h"
35 #include "src/core/lib/iomgr/timer.h"
36 #include "src/core/lib/transport/connectivity_state.h"
37 #include "src/core/lib/transport/metadata.h"
39 // Channel arg containing a grpc_resolved_address to connect to.
// The macro expands to the arg's name string. The static Subchannel helpers
// declared in this header (CreateSubchannelAddressArg(),
// GetUriFromSubchannelAddressArg(), GetAddressFromSubchannelAddressArg())
// describe the value as the address encoded as a URI string.
// NOTE(review): presumably those helpers store/look up the arg under this
// name -- confirm in the implementation.
40 #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
42 // For debugging refcounting.
// Convenience wrappers around the Subchannel refcounting methods. Two
// variants of every macro follow:
//   - the first set forwards __FILE__, __LINE__ and the reason string `r` to
//     the refcounting call, and expands the *_EXTRA_ARGS / *_MUTATE_* helpers
//     to the matching (file, line, reason[, purpose]) parameters/arguments;
//   - the second set ignores `r` and expands the helpers to nothing (or to
//     "" for GRPC_SUBCHANNEL_REF_REASON).
// NOTE(review): no preprocessor conditional separating the two sets is
// visible here; presumably an #if/#else/#endif on a debug-build flag selects
// between them -- confirm against the original header.
44 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r))
45 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \
46 (p)->RefFromWeakRef(__FILE__, __LINE__, (r))
47 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r))
48 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r))
49 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r))
// Parameter list appended to the refcounting method declarations.
50 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \
51 const char *file, int line, const char *reason
52 #define GRPC_SUBCHANNEL_REF_REASON reason
// Note the leading comma: these two helpers are meant to be appended directly
// after the last regular parameter/argument (see RefMutate()).
53 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS \
54 , GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
55 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) , file, line, reason, x
// Second set: same macro names without the file/line/reason plumbing.
57 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref()
58 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
59 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref()
60 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef()
61 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref()
62 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
63 #define GRPC_SUBCHANNEL_REF_REASON ""
64 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS
65 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x)
// An established connection of a subchannel. Holds a channel stack together
// with channel args and a channelz subchannel node, and exposes them through
// the accessors below. Ref-counted via RefCounted<>.
72 class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
// NOTE(review): presumably the constructor's parameter list (the line that
// opens the declaration is not visible here) -- confirm.
75 grpc_channel_stack* channel_stack, const grpc_channel_args* args,
76 RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
77 ~ConnectedSubchannel();
// Connectivity-state notification hook.
// NOTE(review): presumably `closure` is scheduled when the state changes,
// with the new state written to `*state` -- confirm in the implementation.
79 void NotifyOnStateChange(grpc_pollset_set* interested_parties,
80 grpc_connectivity_state* state,
81 grpc_closure* closure);
// NOTE(review): presumably sends a ping, running `on_initiate` once the ping
// has been started and `on_ack` once it is acknowledged -- confirm.
82 void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
// Accessors. Each returns the stored pointer; channelz_subchannel() returns
// the raw pointer held by the RefCountedPtr, so no new ref is taken.
84 grpc_channel_stack* channel_stack() const { return channel_stack_; }
85 const grpc_channel_args* args() const { return args_; }
86 channelz::SubchannelNode* channelz_subchannel() const {
87 return channelz_subchannel_.get();
// NOTE(review): presumably an estimate of the initial allocation size for a
// call that reserves `parent_data_size` bytes of parent data -- confirm what
// the implementation includes.
90 size_t GetInitialCallSizeEstimate(size_t parent_data_size) const;
93 grpc_channel_stack* channel_stack_;
94 grpc_channel_args* args_;
95 // Ref-counted pointer to the channelz node in this connected subchannel's
// owning subchannel.
// NOTE(review): the tail of the sentence above was missing and has been
// inferred -- confirm.
97 RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
100 // Implements the interface of RefCounted<>.
// A call on a connected subchannel. The class has no RefCounted<> base;
// instead it declares Ref()/Unref()/IncrementRefCount() itself (see below) so
// that it can be held in a RefCountedPtr<SubchannelCall>.
101 class SubchannelCall {
// Inputs for creating a call.
// NOTE(review): presumably these are the members of the nested Args type
// taken by Create() and by the constructor -- confirm.
104 RefCountedPtr<ConnectedSubchannel> connected_subchannel;
105 grpc_polling_entity* pollent;
107 gpr_timespec start_time;
108 grpc_millis deadline;
110 grpc_call_context_element* context;
111 CallCombiner* call_combiner;
// Size of the parent data area exposed through GetParentData().
112 size_t parent_data_size;
// Factory for subchannel calls; takes its inputs by value in `args`.
// NOTE(review): presumably failures are reported through `*error` -- confirm.
114 static RefCountedPtr<SubchannelCall> Create(Args args, grpc_error** error);
116 // Continues processing a transport stream op batch.
117 void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
119 // Returns a pointer to the parent data associated with the subchannel call.
120 // The data will be of the size specified in \a parent_data_size field of
121 // the args passed to \a Create().
// NOTE(review): this comment previously named
// ConnectedSubchannel::CreateCall(), which is not declared in this header.
122 void* GetParentData();
124 // Returns the call stack of the subchannel call.
125 grpc_call_stack* GetCallStack();
127 // Sets the 'then_schedule_closure' argument for call stack destruction.
128 // Must be called once per call.
129 void SetAfterCallStackDestroy(grpc_closure* closure);
131 // Interface of RefCounted<>.
// NOTE(review): the overloads taking (location, reason) presumably carry
// debugging context for refcount tracing -- confirm.
132 RefCountedPtr<SubchannelCall> Ref() GRPC_MUST_USE_RESULT;
133 RefCountedPtr<SubchannelCall> Ref(const DebugLocation& location,
134 const char* reason) GRPC_MUST_USE_RESULT;
135 // When refcount drops to 0, destroys itself and the associated call stack,
136 // but does NOT free the memory because it's in the call arena.
138 void Unref(const DebugLocation& location, const char* reason);
// Static callback with the (void* arg, grpc_error*) closure signature.
// NOTE(review): presumably the final destruction step reached from Unref()
// -- confirm.
140 static void Destroy(void* arg, grpc_error* error);
143 // Allow RefCountedPtr<> to access IncrementRefCount().
144 template <typename T>
145 friend class RefCountedPtr;
// Constructor; takes the same arguments as Create().
147 SubchannelCall(Args args, grpc_error** error);
149 // If channelz is enabled, intercepts recv_trailing so that we may check the
150 // status and associate it to a subchannel.
151 void MaybeInterceptRecvTrailingMetadata(
152 grpc_transport_stream_op_batch* batch);
// Static callback with the closure signature.
// NOTE(review): presumably installed by MaybeInterceptRecvTrailingMetadata()
// via recv_trailing_metadata_ready_ -- confirm.
154 static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
156 // Interface of RefCounted<>.
157 void IncrementRefCount();
158 void IncrementRefCount(const DebugLocation& location, const char* reason);
// The connected subchannel this call runs on.
160 RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
// Closure registered through SetAfterCallStackDestroy(); null until set.
161 grpc_closure* after_call_stack_destroy_ = nullptr;
162 // State needed to support channelz interception of recv trailing metadata.
163 grpc_closure recv_trailing_metadata_ready_;
164 grpc_closure* original_recv_trailing_metadata_ = nullptr;
165 grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
// NOTE(review): presumably the call deadline taken from the creation args
// -- confirm.
166 grpc_millis deadline_;
169 // A subchannel that knows how to connect to exactly one target address. It
170 // provides a target for load balancing.
172 // Note that this is the "real" subchannel implementation, whose API is
173 // different from the SubchannelInterface that is exposed to LB policy
174 // implementations. The client channel provides an adaptor class
175 // (SubchannelWrapper) that "converts" between the two.
// Interface implemented by objects that observe a subchannel's connectivity
// state. Instances are handed to Subchannel::WatchConnectivityState() as an
// OrphanablePtr<>.
178 class ConnectivityStateWatcherInterface
179 : public InternallyRefCounted<ConnectivityStateWatcherInterface> {
181 virtual ~ConnectivityStateWatcherInterface() = default;
183 // Will be invoked whenever the subchannel's connectivity state
184 // changes. There will be only one invocation of this method on a
185 // given watcher instance at any given time.
187 // When the state changes to READY, connected_subchannel will
188 // contain a ref to the connected subchannel. When it changes from
189 // READY to some other state, the implementation must release its
190 // ref to the connected subchannel.
191 virtual void OnConnectivityStateChange(
192 grpc_connectivity_state new_state,
193 RefCountedPtr<ConnectedSubchannel> connected_subchannel) // NOLINT
// Returns the grpc_pollset_set associated with this watcher; declared with
// GRPC_ABSTRACT, so implementations must provide it.
196 virtual grpc_pollset_set* interested_parties() GRPC_ABSTRACT;
198 GRPC_ABSTRACT_BASE_CLASS
201 // The ctor and dtor are not intended to be used directly; see the static
// Create() factory declared in this header.
202 Subchannel(SubchannelKey* key, grpc_connector* connector,
203 const grpc_channel_args* args);
206 // Creates a subchannel given \a connector and \a args.
// Returns a raw Subchannel pointer.
// NOTE(review): presumably the caller receives a ref that must later be
// released through the refcounting methods -- confirm in the implementation.
207 static Subchannel* Create(grpc_connector* connector,
208 const grpc_channel_args* args);
210 // Strong and weak refcounting.
// Every method takes GRPC_SUBCHANNEL_REF_EXTRA_ARGS, which expands either to
// (file, line, reason) debugging parameters or to nothing. The
// GRPC_SUBCHANNEL_* macros at the top of this header supply the matching
// arguments.
211 Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
212 void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
213 Subchannel* WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
214 void WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
215 // Attempts to return a strong ref when only the weak refcount is guaranteed
216 // non-zero. If the strong refcount is zero, does not alter the refcount and
// returns null.
// NOTE(review): the tail of the sentence above was missing and has been
// inferred -- confirm.
218 Subchannel* RefFromWeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
220 // Gets the string representing the subchannel address.
221 // Caller doesn't take ownership.
// i.e. the returned pointer must not be freed by the caller.
222 const char* GetTargetAddress();
// Returns the subchannel's channel args (the args_ member) as a const
// pointer.
224 const grpc_channel_args* channel_args() const { return args_; }
// Returns the channelz node of this subchannel as a raw pointer; only
// declared here.
226 channelz::SubchannelNode* channelz_node();
228 // Returns the current connectivity state of the subchannel.
229 // If health_check_service_name is non-null, the returned connectivity
230 // state will be based on the state reported by the backend for that
// service name.
232 // If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel.
233 grpc_connectivity_state CheckConnectivityState(
234 const char* health_check_service_name,
235 RefCountedPtr<ConnectedSubchannel>* connected_subchannel);
237 // Starts watching the subchannel's connectivity state.
238 // The first callback to the watcher will be delivered when the
239 // subchannel's connectivity state becomes a value other than
240 // initial_state, which may happen immediately.
241 // Subsequent callbacks will be delivered as the subchannel's state
// changes.
243 // The watcher will be destroyed either when the subchannel is
244 // destroyed or when CancelConnectivityStateWatch() is called.
// Both \a health_check_service_name (UniquePtr) and \a watcher
// (OrphanablePtr) are passed by value, i.e. ownership moves to the callee.
245 void WatchConnectivityState(
246 grpc_connectivity_state initial_state,
247 UniquePtr<char> health_check_service_name,
248 OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
250 // Cancels a connectivity state watch.
251 // If the watcher has already been destroyed, this is a no-op.
// \a watcher is the raw pointer identifying the watcher to cancel.
// NOTE(review): presumably \a health_check_service_name must match the name
// given to WatchConnectivityState() for that watcher -- confirm.
252 void CancelConnectivityStateWatch(const char* health_check_service_name,
253 ConnectivityStateWatcherInterface* watcher);
255 // Attempt to connect to the backend. Has no effect if already connected.
// Returns nothing.
// NOTE(review): presumably the outcome is observed through
// CheckConnectivityState() or a connectivity state watcher -- confirm.
256 void AttemptToConnect();
258 // Resets the connection backoff of the subchannel.
259 // TODO(roth): Move connection backoff out of subchannels and up into LB
260 // policy code (probably by adding a SubchannelGroup between
261 // SubchannelList and SubchannelData), at which point this method can go away.
265 // Returns a new channel arg encoding the subchannel address as a URI
266 // string. Caller is responsible for freeing the string.
// NOTE(review): presumably the arg is named GRPC_ARG_SUBCHANNEL_ADDRESS
// -- confirm in the implementation.
267 static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr);
269 // Returns the URI string from the subchannel address arg in \a args.
// NOTE(review): the behavior when \a args carries no such arg is not visible
// here -- check the implementation before relying on a non-null result.
270 static const char* GetUriFromSubchannelAddressArg(
271 const grpc_channel_args* args);
273 // Sets \a addr from the subchannel address arg in \a args.
// \a addr is an out-parameter.
// NOTE(review): presumably filled by parsing the URI string that
// GetUriFromSubchannelAddressArg() returns -- confirm.
274 static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args,
275 grpc_resolved_address* addr);
278 // A linked list of ConnectivityStateWatcherInterfaces that are monitoring
279 // the subchannel's state.
// NOTE(review): despite "linked list" above, the storage declared at the
// bottom of this class is a Map keyed by the watcher's raw pointer.
280 class ConnectivityStateWatcherList {
// Clears the container on destruction.
282 ~ConnectivityStateWatcherList() { Clear(); }
// Adds / removes a watcher. Adding takes the OrphanablePtr by value, so the
// list becomes the owner; removal identifies the watcher by raw pointer.
// NOTE(review): the "Locked" suffix presumably means the caller must hold
// the subchannel's lock -- confirm.
284 void AddWatcherLocked(
285 OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
286 void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
288 // Notifies all watchers in the list about a change to state.
289 void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state);
// Removes every entry from watchers_.
291 void Clear() { watchers_.clear(); }
// True when no watcher is registered.
293 bool empty() const { return watchers_.empty(); }
296 // TODO(roth): This could be a set instead of a map if we had a set
// implementation.
// Maps each watcher's raw pointer to the OrphanablePtr that owns it.
298 Map<ConnectivityStateWatcherInterface*,
299 OrphanablePtr<ConnectivityStateWatcherInterface>>
303 // A map that tracks ConnectivityStateWatcherInterfaces using a particular
304 // health check service name.
306 // There is one entry in the map for each health check service name.
307 // Entries exist only as long as there are watchers using the
308 // corresponding service name.
310 // A health check client is maintained only while the subchannel is in
// state READY.
// NOTE(review): the tail of the sentence above was missing and has been
// inferred -- confirm.
312 class HealthWatcherMap {
// Registers \a watcher under \a health_check_service_name; both are passed
// by value, so ownership moves to the map.
314 void AddWatcherLocked(
315 Subchannel* subchannel, grpc_connectivity_state initial_state,
316 UniquePtr<char> health_check_service_name,
317 OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
// Removes the watcher identified by raw pointer from the entry for
// \a health_check_service_name.
318 void RemoveWatcherLocked(const char* health_check_service_name,
319 ConnectivityStateWatcherInterface* watcher);
321 // Notifies the watcher when the subchannel's state changes.
322 void NotifyLocked(grpc_connectivity_state state);
// Returns the connectivity state to report for
// \a health_check_service_name.
// NOTE(review): the result when no entry exists for that name is not
// visible here -- confirm in the implementation.
324 grpc_connectivity_state CheckConnectivityStateLocked(
325 Subchannel* subchannel, const char* health_check_service_name);
// NOTE(review): presumably drops all entries when the subchannel shuts down
// -- confirm.
327 void ShutdownLocked();
// One entry per health check service name: C-string key compared with
// StringLess, value owning the HealthWatcher (declared elsewhere).
332 Map<const char*, OrphanablePtr<HealthWatcher>, StringLess> map_;
// Forward declaration only; the class is defined elsewhere.
335 class ConnectedSubchannelStateWatcher;
337 // Sets the subchannel's connectivity state to \a state.
// NOTE(review): presumably also notifies watcher_list_ and
// health_watcher_map_, and requires the subchannel's lock to be held
// ("Locked" suffix) -- confirm.
338 void SetConnectivityStateLocked(grpc_connectivity_state state);
340 // Methods for connection.
// The two static members use the (void* arg, grpc_error*) closure callback
// signature.
// NOTE(review): presumably they are bound to on_retry_alarm_ and
// on_connecting_finished_ respectively, with `arg` being the Subchannel
// -- confirm.
341 void MaybeStartConnectingLocked();
342 static void OnRetryAlarm(void* arg, grpc_error* error);
343 void ContinueConnectingLocked();
344 static void OnConnectingFinished(void* arg, grpc_error* error);
// NOTE(review): presumably returns whether the newly connected transport was
// successfully published as connected_subchannel_ -- confirm.
345 bool PublishTransportLocked();
// Low-level helper behind the refcounting methods.
// NOTE(review): presumably applies \a delta to the packed refcount word
// (with a memory barrier when \a barrier is non-zero) and returns the
// previous value -- confirm.
// There is intentionally no comma after `barrier`:
// GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS either starts with a comma (debug
// variant: file, line, reason, purpose) or expands to nothing.
348 gpr_atm RefMutate(gpr_atm delta,
349 int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS);
351 // The subchannel pool this subchannel is in.
352 RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
353 // TODO(juanlishen): Consider using args_ as key_ directly.
354 // Subchannel key that identifies this subchannel in the subchannel pool.
// NOTE(review): the member described by the line above (presumably key_, per
// the TODO) is not visible here; that comment does not describe args_.
// Channel args of this subchannel; exposed through channel_args().
357 grpc_channel_args* args_;
358 // pollset_set tracking who's interested in a connection being setup.
359 grpc_pollset_set* pollset_set_;
360 // Protects the other members.
// NOTE(review): the lock declaration the line above refers to is not visible
// here, nor is the refcount field whose bit layout is described next.
363 // - lower INTERNAL_REF_BITS bits are for internal references:
364 // these do not keep the subchannel open.
365 // - upper remaining bits are for public references: these do
366 // keep the subchannel open
369 // Connection states.
370 grpc_connector* connector_ = nullptr;
371 // Set during connection.
372 grpc_connect_out_args connecting_result_;
373 grpc_closure on_connecting_finished_;
374 // Active connection, or null.
375 RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
376 bool connecting_ = false;
377 bool disconnected_ = false;
379 // Connectivity state tracking.
380 grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
381 // The list of watchers without a health check service name.
382 ConnectivityStateWatcherList watcher_list_;
383 // The map of watchers with health check service names.
384 HealthWatcherMap health_watcher_map_;
// Connection backoff bookkeeping.
// NOTE(review): presumably next_attempt_deadline_ is when the next connection
// attempt may start and min_connect_timeout_ms_ the minimum time allowed for
// one attempt -- confirm.
388 grpc_millis next_attempt_deadline_;
389 grpc_millis min_connect_timeout_ms_;
390 bool backoff_begun_ = false;
// Retry alarm: the timer, its closure, and a flag for a pending alarm.
393 grpc_timer retry_alarm_;
394 grpc_closure on_retry_alarm_;
395 bool have_retry_alarm_ = false;
396 // reset_backoff() was called while alarm was pending.
397 bool retry_immediately_ = false;
399 // Channelz tracking.
400 RefCountedPtr<channelz::SubchannelNode> channelz_node_;
403 } // namespace grpc_core
405 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */