3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
22 #include <grpc/support/port_platform.h>
24 #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
25 #include "src/core/ext/filters/client_channel/connector.h"
26 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
27 #include "src/core/lib/backoff/backoff.h"
28 #include "src/core/lib/channel/channel_stack.h"
29 #include "src/core/lib/gpr/time_precise.h"
30 #include "src/core/lib/gprpp/arena.h"
31 #include "src/core/lib/gprpp/map.h"
32 #include "src/core/lib/gprpp/ref_counted.h"
33 #include "src/core/lib/gprpp/ref_counted_ptr.h"
34 #include "src/core/lib/gprpp/sync.h"
35 #include "src/core/lib/iomgr/polling_entity.h"
36 #include "src/core/lib/iomgr/timer.h"
37 #include "src/core/lib/transport/connectivity_state.h"
38 #include "src/core/lib/transport/metadata.h"
40 // Channel arg containing a grpc_resolved_address to connect to.
41 #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
43 // For debugging refcounting.
// Two variants of the refcounting macros follow. In both, (p) is the object
// the refcounting method is invoked on and (r) is a reason string.
// NOTE(review): both variants define the same macro names -- confirm that a
// build-mode conditional selects exactly one of them.
//
// Variant 1: forwards the call site (__FILE__, __LINE__) and the reason to
// Ref() / Unref() / WeakRef() / WeakUnref().
45 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r))
// RefFromWeakRef() is invoked without arguments, so (r) is unused here.
46 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
47 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r))
48 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r))
49 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r))
// Parameter list matching the (file, line, reason) arguments passed above.
50 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \
51 const char *file, int line, const char *reason
// Name of the reason parameter declared by GRPC_SUBCHANNEL_REF_EXTRA_ARGS.
52 #define GRPC_SUBCHANNEL_REF_REASON reason
// The same parameters preceded by a comma, plus a trailing "purpose" string,
// for appending to an existing parameter list.
53 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS \
54 , GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
// Matching argument list: forwards file, line and reason, then x as purpose.
55 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) , file, line, reason, x
// Variant 2: no call-site arguments are passed and (r) is dropped. The
// EXTRA_ARGS / MUTATE macros expand to nothing and the reason expands to "".
57 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref()
58 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
59 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref()
60 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef()
61 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref()
62 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
63 #define GRPC_SUBCHANNEL_REF_REASON ""
64 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS
65 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x)
// Ref-counted object (via RefCounted<>) that holds a channel stack, the
// associated channel args, and a reference to a channelz subchannel node.
72 class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
75 grpc_channel_stack* channel_stack, const grpc_channel_args* args,
76 RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
77 ~ConnectedSubchannel();
// Takes ownership of \a watcher (it is passed as an OrphanablePtr<>).
// NOTE(review): presumably starts delivering connectivity state updates to
// the watcher -- confirm against the implementation.
79 void StartWatch(grpc_pollset_set* interested_parties,
80 OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
// NOTE(review): presumably sends a ping, with \a on_initiate and \a on_ack
// run when the ping is initiated and acknowledged -- confirm.
82 void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
// Accessors. Each returns the raw pointer held by this object; no new ref is
// taken on behalf of the caller.
84 grpc_channel_stack* channel_stack() const { return channel_stack_; }
85 const grpc_channel_args* args() const { return args_; }
86 channelz::SubchannelNode* channelz_subchannel() const {
87 return channelz_subchannel_.get();
// NOTE(review): presumably returns an estimate of the memory needed for a
// call including \a parent_data_size bytes of parent data -- confirm.
90 size_t GetInitialCallSizeEstimate(size_t parent_data_size) const;
// Data members backing the accessors above.
93 grpc_channel_stack* channel_stack_;
94 grpc_channel_args* args_;
95 // ref counted pointer to the channelz node in this connected subchannel's
97 RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
// A call made on a ConnectedSubchannel; holds a ref to that connected
// subchannel (connected_subchannel_).
100 // Implements the interface of RefCounted<>.
101 class SubchannelCall {
// Call-creation parameters, passed to Create() as Args.
104 RefCountedPtr<ConnectedSubchannel> connected_subchannel;
105 grpc_polling_entity* pollent;
107 gpr_cycle_counter start_time;
108 grpc_millis deadline;
110 grpc_call_context_element* context;
111 CallCombiner* call_combiner;
112 size_t parent_data_size;
// Creates a subchannel call from \a args.
// NOTE(review): \a error is presumably set if creation fails -- confirm.
114 static RefCountedPtr<SubchannelCall> Create(Args args, grpc_error** error);
116 // Continues processing a transport stream op batch.
117 void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
119 // Returns a pointer to the parent data associated with the subchannel call.
120 // The data will be of the size specified in \a parent_data_size field of
121 // the args passed to \a SubchannelCall::Create().
122 void* GetParentData();
124 // Returns the call stack of the subchannel call.
125 grpc_call_stack* GetCallStack();
127 // Sets the 'then_schedule_closure' argument for call stack destruction.
128 // Must be called once per call.
129 void SetAfterCallStackDestroy(grpc_closure* closure);
131 // Interface of RefCounted<>.
132 RefCountedPtr<SubchannelCall> Ref() GRPC_MUST_USE_RESULT;
133 RefCountedPtr<SubchannelCall> Ref(const DebugLocation& location,
134 const char* reason) GRPC_MUST_USE_RESULT;
135 // When refcount drops to 0, destroys itself and the associated call stack,
136 // but does NOT free the memory because it's in the call arena.
138 void Unref(const DebugLocation& location, const char* reason);
// Callback with a closure-style signature (void* arg, grpc_error* error).
// NOTE(review): presumably run as part of call stack destruction -- confirm.
140 static void Destroy(void* arg, grpc_error* error);
143 // Allow RefCountedPtr<> to access IncrementRefCount().
144 template <typename T>
145 friend class RefCountedPtr;
147 SubchannelCall(Args args, grpc_error** error);
149 // If channelz is enabled, intercepts recv_trailing so that we may check the
150 // status and associate it with a subchannel.
151 void MaybeInterceptRecvTrailingMetadata(
152 grpc_transport_stream_op_batch* batch);
// Callback with a closure-style signature (void* arg, grpc_error* error).
// NOTE(review): presumably the callback installed by
// MaybeInterceptRecvTrailingMetadata() -- confirm.
154 static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
156 // Interface of RefCounted<>.
157 void IncrementRefCount();
158 void IncrementRefCount(const DebugLocation& location, const char* reason);
// Data members.
160 RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
161 grpc_closure* after_call_stack_destroy_ = nullptr;
162 // State needed to support channelz interception of recv trailing metadata.
163 grpc_closure recv_trailing_metadata_ready_;
164 grpc_closure* original_recv_trailing_metadata_ = nullptr;
165 grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
166 grpc_millis deadline_;
169 // A subchannel that knows how to connect to exactly one target address. It
170 // provides a target for load balancing.
172 // Note that this is the "real" subchannel implementation, whose API is
173 // different from the SubchannelInterface that is exposed to LB policy
174 // implementations. The client channel provides an adaptor class
175 // (SubchannelWrapper) that "converts" between the two.
// Interface implemented by objects that want to be notified of changes to a
// subchannel's connectivity state. Instances are internally ref-counted
// (InternallyRefCounted<>). Implementations must provide both
// OnConnectivityStateChange() and interested_parties().
178 class ConnectivityStateWatcherInterface
179 : public InternallyRefCounted<ConnectivityStateWatcherInterface> {
181 virtual ~ConnectivityStateWatcherInterface() = default;
183 // Will be invoked whenever the subchannel's connectivity state
184 // changes. There will be only one invocation of this method on a
185 // given watcher instance at any given time.
187 // When the state changes to READY, connected_subchannel will
188 // contain a ref to the connected subchannel. When it changes from
189 // READY to some other state, the implementation must release its
190 // ref to the connected subchannel.
191 virtual void OnConnectivityStateChange(
192 grpc_connectivity_state new_state,
193 RefCountedPtr<ConnectedSubchannel> connected_subchannel) // NOLINT
196 virtual grpc_pollset_set* interested_parties() = 0;
199 // The ctor and dtor are not intended to be used directly.
200 Subchannel(SubchannelKey* key, OrphanablePtr<SubchannelConnector> connector,
201 const grpc_channel_args* args);
204 // Creates a subchannel given \a connector and \a args.
205 static Subchannel* Create(OrphanablePtr<SubchannelConnector> connector,
206 const grpc_channel_args* args);
208 // Strong and weak refcounting.
// The parameter lists come from GRPC_SUBCHANNEL_REF_EXTRA_ARGS, so they are
// either (file, line, reason) or empty depending on which macro variant is
// active. Prefer calling these through the GRPC_SUBCHANNEL_* macros.
209 Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
210 void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
211 Subchannel* WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
212 void WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
213 // Attempts to return a strong ref when only the weak refcount is guaranteed
214 // non-zero. If the strong refcount is zero, does not alter the refcount and
216 Subchannel* RefFromWeakRef();
218 // Gets the string representing the subchannel address.
219 // Caller doesn't take ownership.
220 const char* GetTargetAddress();
// Returns the subchannel's channel args (the args_ member).
222 const grpc_channel_args* channel_args() const { return args_; }
// Returns the channelz node used for channelz tracking of this subchannel.
224 channelz::SubchannelNode* channelz_node();
226 // Returns the current connectivity state of the subchannel.
227 // If health_check_service_name is non-null, the returned connectivity
228 // state will be based on the state reported by the backend for that
230 // If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel.
231 grpc_connectivity_state CheckConnectivityState(
232 const char* health_check_service_name,
233 RefCountedPtr<ConnectedSubchannel>* connected_subchannel);
235 // Starts watching the subchannel's connectivity state.
236 // The first callback to the watcher will be delivered when the
237 // subchannel's connectivity state becomes a value other than
238 // initial_state, which may happen immediately.
239 // Subsequent callbacks will be delivered as the subchannel's state
241 // The watcher will be destroyed either when the subchannel is
242 // destroyed or when CancelConnectivityStateWatch() is called.
243 void WatchConnectivityState(
244 grpc_connectivity_state initial_state,
245 grpc_core::UniquePtr<char> health_check_service_name,
246 OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
248 // Cancels a connectivity state watch.
249 // If the watcher has already been destroyed, this is a no-op.
250 void CancelConnectivityStateWatch(const char* health_check_service_name,
251 ConnectivityStateWatcherInterface* watcher);
253 // Attempts to connect to the backend. Has no effect if already connected.
254 void AttemptToConnect();
256 // Resets the connection backoff of the subchannel.
257 // TODO(roth): Move connection backoff out of subchannels and up into LB
258 // policy code (probably by adding a SubchannelGroup between
259 // SubchannelList and SubchannelData), at which point this method can
263 // Returns a new channel arg encoding the subchannel address as a URI
264 // string. Caller is responsible for freeing the string.
265 static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr);
267 // Returns the URI string from the subchannel address arg in \a args.
268 static const char* GetUriFromSubchannelAddressArg(
269 const grpc_channel_args* args);
271 // Sets \a addr from the subchannel address arg in \a args.
272 static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args,
273 grpc_resolved_address* addr);
276 // A collection of ConnectivityStateWatcherInterfaces that are monitoring
277 // the subchannel's state. Watchers are stored in a std::map keyed by their
// raw pointer, with the owning OrphanablePtr<> as the mapped value.
// NOTE(review): the "Locked" suffix presumably means the caller must already
// hold the subchannel's mutex -- confirm.
278 class ConnectivityStateWatcherList {
// Drops all remaining watchers on destruction.
280 ~ConnectivityStateWatcherList() { Clear(); }
// Takes ownership of \a watcher (it is passed as an OrphanablePtr<>).
282 void AddWatcherLocked(
283 OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
// Identifies the watcher to remove by its raw pointer.
284 void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
286 // Notifies all watchers in the list about a change to state.
287 void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state);
// Removes every entry from the underlying map.
289 void Clear() { watchers_.clear(); }
// Returns true if no watchers are registered.
291 bool empty() const { return watchers_.empty(); }
294 // TODO(roth): Once we can use C++14 heterogeneous lookups, this can
295 // be a set instead of a map.
296 std::map<ConnectivityStateWatcherInterface*,
297 OrphanablePtr<ConnectivityStateWatcherInterface>>
301 // A map that tracks ConnectivityStateWatcherInterfaces using a particular
302 // health check service name.
304 // There is one entry in the map for each health check service name.
305 // Entries exist only as long as there are watchers using the
306 // corresponding service name.
308 // A health check client is maintained only while the subchannel is in
// Container of HealthWatcher objects, one per health check service name.
// NOTE(review): the "Locked" suffix presumably means the caller must already
// hold the subchannel's mutex -- confirm.
310 class HealthWatcherMap {
// Registers \a watcher under \a health_check_service_name. Takes ownership of
// both the name string and the watcher (UniquePtr<> / OrphanablePtr<>).
312 void AddWatcherLocked(
313 Subchannel* subchannel, grpc_connectivity_state initial_state,
314 grpc_core::UniquePtr<char> health_check_service_name,
315 OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
// Identifies the watcher to remove by service name and raw pointer.
316 void RemoveWatcherLocked(const char* health_check_service_name,
317 ConnectivityStateWatcherInterface* watcher);
319 // Notifies the watcher when the subchannel's state changes.
320 void NotifyLocked(grpc_connectivity_state state);
// NOTE(review): presumably returns the connectivity state as seen for
// \a health_check_service_name -- confirm against the implementation.
322 grpc_connectivity_state CheckConnectivityStateLocked(
323 Subchannel* subchannel, const char* health_check_service_name);
325 void ShutdownLocked();
// Keyed by C string and ordered with StringLess; each value owns a
// HealthWatcher.
330 std::map<const char*, OrphanablePtr<HealthWatcher>, StringLess> map_;
// Forward declaration of a nested helper class; it is defined elsewhere.
333 class ConnectedSubchannelStateWatcher;
335 // Sets the subchannel's connectivity state to \a state.
336 void SetConnectivityStateLocked(grpc_connectivity_state state);
338 // Methods for connection.
// OnRetryAlarm() and OnConnectingFinished() are static callbacks with a
// closure-style signature (void* arg, grpc_error* error).
339 void MaybeStartConnectingLocked();
340 static void OnRetryAlarm(void* arg, grpc_error* error);
341 void ContinueConnectingLocked();
342 static void OnConnectingFinished(void* arg, grpc_error* error);
343 bool PublishTransportLocked();
// NOTE(review): presumably applies \a delta to the packed refcount word
// described further down; the meaning of \a barrier is set by the
// implementation -- confirm. The trailing macro adds debug parameters when
// the debugging variant of the refcount macros is active.
346 gpr_atm RefMutate(gpr_atm delta,
347 int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS);
349 // The subchannel pool this subchannel is in.
350 RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
351 // TODO(juanlishen): Consider using args_ as key_ directly.
352 // Subchannel key that identifies this subchannel in the subchannel pool.
355 grpc_channel_args* args_;
356 // pollset_set tracking who's interested in a connection being set up.
357 grpc_pollset_set* pollset_set_;
358 // Protects the other members.
361 // - lower INTERNAL_REF_BITS bits are for internal references:
362 // these do not keep the subchannel open.
363 // - upper remaining bits are for public references: these do
364 // keep the subchannel open.
367 // Connection states.
368 OrphanablePtr<SubchannelConnector> connector_;
369 // Set during connection.
370 SubchannelConnector::Result connecting_result_;
371 grpc_closure on_connecting_finished_;
372 // Active connection, or null.
373 RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
374 bool connecting_ = false;
375 bool disconnected_ = false;
377 // Connectivity state tracking. state_ starts out as GRPC_CHANNEL_IDLE.
378 grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
379 // The list of watchers without a health check service name.
380 ConnectivityStateWatcherList watcher_list_;
381 // The map of watchers with health check service names.
382 HealthWatcherMap health_watcher_map_;
386 grpc_millis next_attempt_deadline_;
387 grpc_millis min_connect_timeout_ms_;
388 bool backoff_begun_ = false;
391 grpc_timer retry_alarm_;
392 grpc_closure on_retry_alarm_;
393 bool have_retry_alarm_ = false;
394 // reset_backoff() was called while alarm was pending.
395 bool retry_immediately_ = false;
397 // Channelz tracking.
398 RefCountedPtr<channelz::SubchannelNode> channelz_node_;
401 } // namespace grpc_core
403 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */