3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
22 #include <grpc/support/port_platform.h>
26 #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
27 #include "src/core/ext/filters/client_channel/connector.h"
28 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
29 #include "src/core/lib/backoff/backoff.h"
30 #include "src/core/lib/channel/channel_stack.h"
31 #include "src/core/lib/gpr/time_precise.h"
32 #include "src/core/lib/gprpp/arena.h"
33 #include "src/core/lib/gprpp/map.h"
34 #include "src/core/lib/gprpp/ref_counted.h"
35 #include "src/core/lib/gprpp/ref_counted_ptr.h"
36 #include "src/core/lib/gprpp/sync.h"
37 #include "src/core/lib/iomgr/polling_entity.h"
38 #include "src/core/lib/iomgr/timer.h"
39 #include "src/core/lib/transport/connectivity_state.h"
40 #include "src/core/lib/transport/metadata.h"
42 // Channel arg containing a URI indicating the address to connect to.
43 #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
45 // For debugging refcounting.
47 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r))
48 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
49 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r))
50 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r))
51 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r))
52 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \
53 const char *file, int line, const char *reason
54 #define GRPC_SUBCHANNEL_REF_REASON reason
55 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS \
56 , GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
57 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) , file, line, reason, x
59 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref()
60 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
61 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref()
62 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef()
63 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref()
64 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
65 #define GRPC_SUBCHANNEL_REF_REASON ""
66 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS
67 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x)
74 class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
77 grpc_channel_stack* channel_stack, const grpc_channel_args* args,
78 RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
79 ~ConnectedSubchannel();
81 void StartWatch(grpc_pollset_set* interested_parties,
82 OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
84 void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
86 grpc_channel_stack* channel_stack() const { return channel_stack_; }
87 const grpc_channel_args* args() const { return args_; }
88 channelz::SubchannelNode* channelz_subchannel() const {
89 return channelz_subchannel_.get();
92 size_t GetInitialCallSizeEstimate(size_t parent_data_size) const;
95 grpc_channel_stack* channel_stack_;
96 grpc_channel_args* args_;
97 // ref counted pointer to the channelz node in this connected subchannel's
99 RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
102 // Implements the interface of RefCounted<>.
103 class SubchannelCall {
106 RefCountedPtr<ConnectedSubchannel> connected_subchannel;
107 grpc_polling_entity* pollent;
109 gpr_cycle_counter start_time;
110 grpc_millis deadline;
112 grpc_call_context_element* context;
113 CallCombiner* call_combiner;
114 size_t parent_data_size;
116 static RefCountedPtr<SubchannelCall> Create(Args args, grpc_error** error);
118 // Continues processing a transport stream op batch.
119 void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
121 // Returns a pointer to the parent data associated with the subchannel call.
122 // The data will be of the size specified in \a parent_data_size field of
123 // the args passed to \a ConnectedSubchannel::CreateCall().
124 void* GetParentData();
126 // Returns the call stack of the subchannel call.
127 grpc_call_stack* GetCallStack();
129 // Sets the 'then_schedule_closure' argument for call stack destruction.
130 // Must be called once per call.
131 void SetAfterCallStackDestroy(grpc_closure* closure);
133 // Interface of RefCounted<>.
134 RefCountedPtr<SubchannelCall> Ref() GRPC_MUST_USE_RESULT;
135 RefCountedPtr<SubchannelCall> Ref(const DebugLocation& location,
136 const char* reason) GRPC_MUST_USE_RESULT;
137 // When refcount drops to 0, destroys itself and the associated call stack,
138 // but does NOT free the memory because it's in the call arena.
140 void Unref(const DebugLocation& location, const char* reason);
142 static void Destroy(void* arg, grpc_error* error);
145 // Allow RefCountedPtr<> to access IncrementRefCount().
146 template <typename T>
147 friend class RefCountedPtr;
149 SubchannelCall(Args args, grpc_error** error);
151 // If channelz is enabled, intercepts recv_trailing so that we may check the
152 // status and associate it to a subchannel.
153 void MaybeInterceptRecvTrailingMetadata(
154 grpc_transport_stream_op_batch* batch);
156 static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
158 // Interface of RefCounted<>.
159 void IncrementRefCount();
160 void IncrementRefCount(const DebugLocation& location, const char* reason);
162 RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
163 grpc_closure* after_call_stack_destroy_ = nullptr;
164 // State needed to support channelz interception of recv trailing metadata.
165 grpc_closure recv_trailing_metadata_ready_;
166 grpc_closure* original_recv_trailing_metadata_ = nullptr;
167 grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
168 grpc_millis deadline_;
171 // A subchannel that knows how to connect to exactly one target address. It
172 // provides a target for load balancing.
174 // Note that this is the "real" subchannel implementation, whose API is
175 // different from the SubchannelInterface that is exposed to LB policy
176 // implementations. The client channel provides an adaptor class
177 // (SubchannelWrapper) that "converts" between the two.
180 class ConnectivityStateWatcherInterface
181 : public RefCounted<ConnectivityStateWatcherInterface> {
183 struct ConnectivityStateChange {
184 grpc_connectivity_state state;
186 RefCountedPtr<ConnectedSubchannel> connected_subchannel;
189 virtual ~ConnectivityStateWatcherInterface() = default;
191 // Will be invoked whenever the subchannel's connectivity state
192 // changes. There will be only one invocation of this method on a
193 // given watcher instance at any given time.
194 // Implementations should call PopConnectivityStateChange to get the next
195 // connectivity state change.
196 virtual void OnConnectivityStateChange() = 0;
198 virtual grpc_pollset_set* interested_parties() = 0;
200 // Enqueues connectivity state change notifications.
201 // When the state changes to READY, connected_subchannel will
202 // contain a ref to the connected subchannel. When it changes from
203 // READY to some other state, the implementation must release its
204 // ref to the connected subchannel.
205 // TODO(yashkt): This is currently needed to send the state updates in the
206 // right order when asynchronously notifying. This will no longer be
207 // necessary when we have access to EventManager.
208 void PushConnectivityStateChange(ConnectivityStateChange state_change);
210 // Dequeues connectivity state change notifications.
211 ConnectivityStateChange PopConnectivityStateChange();
214 // Keeps track of the updates that the watcher instance must be notified of.
215 // TODO(yashkt): This is currently needed to send the state updates in the
216 // right order when asynchronously notifying. This will no longer be
217 // necessary when we have access to EventManager.
218 std::deque<ConnectivityStateChange> connectivity_state_queue_;
219 Mutex mu_; // protects the queue
222 // The ctor and dtor are not intended to use directly.
223 Subchannel(SubchannelKey* key, OrphanablePtr<SubchannelConnector> connector,
224 const grpc_channel_args* args);
227 // Creates a subchannel given \a connector and \a args.
228 static Subchannel* Create(OrphanablePtr<SubchannelConnector> connector,
229 const grpc_channel_args* args);
231 // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time
232 // is larger than the subchannel's current keepalive time. The updated value
233 // will have an affect when the subchannel creates a new ConnectedSubchannel.
234 void ThrottleKeepaliveTime(int new_keepalive_time);
236 // Strong and weak refcounting.
237 Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
238 void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
239 Subchannel* WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
240 void WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
241 // Attempts to return a strong ref when only the weak refcount is guaranteed
242 // non-zero. If the strong refcount is zero, does not alter the refcount and
244 Subchannel* RefFromWeakRef();
246 // Gets the string representing the subchannel address.
247 // Caller doesn't take ownership.
248 const char* GetTargetAddress();
250 const grpc_channel_args* channel_args() const { return args_; }
252 channelz::SubchannelNode* channelz_node();
254 // Returns the current connectivity state of the subchannel.
255 // If health_check_service_name is non-null, the returned connectivity
256 // state will be based on the state reported by the backend for that
258 // If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel.
259 grpc_connectivity_state CheckConnectivityState(
260 const char* health_check_service_name,
261 RefCountedPtr<ConnectedSubchannel>* connected_subchannel);
263 // Starts watching the subchannel's connectivity state.
264 // The first callback to the watcher will be delivered when the
265 // subchannel's connectivity state becomes a value other than
266 // initial_state, which may happen immediately.
267 // Subsequent callbacks will be delivered as the subchannel's state
269 // The watcher will be destroyed either when the subchannel is
270 // destroyed or when CancelConnectivityStateWatch() is called.
271 void WatchConnectivityState(
272 grpc_connectivity_state initial_state,
273 grpc_core::UniquePtr<char> health_check_service_name,
274 RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
276 // Cancels a connectivity state watch.
277 // If the watcher has already been destroyed, this is a no-op.
278 void CancelConnectivityStateWatch(const char* health_check_service_name,
279 ConnectivityStateWatcherInterface* watcher);
281 // Attempt to connect to the backend. Has no effect if already connected.
282 void AttemptToConnect();
284 // Resets the connection backoff of the subchannel.
285 // TODO(roth): Move connection backoff out of subchannels and up into LB
286 // policy code (probably by adding a SubchannelGroup between
287 // SubchannelList and SubchannelData), at which point this method can
291 // Returns a new channel arg encoding the subchannel address as a URI
292 // string. Caller is responsible for freeing the string.
293 static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr);
295 // Returns the URI string from the subchannel address arg in \a args.
296 static const char* GetUriFromSubchannelAddressArg(
297 const grpc_channel_args* args);
299 // Sets \a addr from the subchannel address arg in \a args.
300 static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args,
301 grpc_resolved_address* addr);
304 // A linked list of ConnectivityStateWatcherInterfaces that are monitoring
305 // the subchannel's state.
306 class ConnectivityStateWatcherList {
308 ~ConnectivityStateWatcherList() { Clear(); }
310 void AddWatcherLocked(
311 RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
312 void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
314 // Notifies all watchers in the list about a change to state.
315 void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state,
316 const absl::Status& status);
318 void Clear() { watchers_.clear(); }
320 bool empty() const { return watchers_.empty(); }
323 // TODO(roth): Once we can use C++-14 heterogeneous lookups, this can
324 // be a set instead of a map.
325 std::map<ConnectivityStateWatcherInterface*,
326 RefCountedPtr<ConnectivityStateWatcherInterface>>
330 // A map that tracks ConnectivityStateWatcherInterfaces using a particular
331 // health check service name.
333 // There is one entry in the map for each health check service name.
334 // Entries exist only as long as there are watchers using the
335 // corresponding service name.
337 // A health check client is maintained only while the subchannel is in
339 class HealthWatcherMap {
341 void AddWatcherLocked(
342 Subchannel* subchannel, grpc_connectivity_state initial_state,
343 grpc_core::UniquePtr<char> health_check_service_name,
344 RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
345 void RemoveWatcherLocked(const char* health_check_service_name,
346 ConnectivityStateWatcherInterface* watcher);
348 // Notifies the watcher when the subchannel's state changes.
349 void NotifyLocked(grpc_connectivity_state state,
350 const absl::Status& status);
352 grpc_connectivity_state CheckConnectivityStateLocked(
353 Subchannel* subchannel, const char* health_check_service_name);
355 void ShutdownLocked();
360 std::map<const char*, OrphanablePtr<HealthWatcher>, StringLess> map_;
363 class ConnectedSubchannelStateWatcher;
365 class AsyncWatcherNotifierLocked;
367 // Sets the subchannel's connectivity state to \a state.
368 void SetConnectivityStateLocked(grpc_connectivity_state state,
369 const absl::Status& status);
371 // Methods for connection.
372 void MaybeStartConnectingLocked();
373 static void OnRetryAlarm(void* arg, grpc_error* error);
374 void ContinueConnectingLocked();
375 static void OnConnectingFinished(void* arg, grpc_error* error);
376 bool PublishTransportLocked();
379 gpr_atm RefMutate(gpr_atm delta,
380 int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS);
382 // The subchannel pool this subchannel is in.
383 RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
384 // TODO(juanlishen): Consider using args_ as key_ directly.
385 // Subchannel key that identifies this subchannel in the subchannel pool.
388 grpc_channel_args* args_;
389 // pollset_set tracking who's interested in a connection being setup.
390 grpc_pollset_set* pollset_set_;
391 // Protects the other members.
394 // - lower INTERNAL_REF_BITS bits are for internal references:
395 // these do not keep the subchannel open.
396 // - upper remaining bits are for public references: these do
397 // keep the subchannel open
400 // Connection states.
401 OrphanablePtr<SubchannelConnector> connector_;
402 // Set during connection.
403 SubchannelConnector::Result connecting_result_;
404 grpc_closure on_connecting_finished_;
405 // Active connection, or null.
406 RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
407 bool connecting_ = false;
408 bool disconnected_ = false;
410 // Connectivity state tracking.
411 grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
412 absl::Status status_;
413 // The list of watchers without a health check service name.
414 ConnectivityStateWatcherList watcher_list_;
415 // The map of watchers with health check service names.
416 HealthWatcherMap health_watcher_map_;
420 grpc_millis next_attempt_deadline_;
421 grpc_millis min_connect_timeout_ms_;
422 bool backoff_begun_ = false;
425 grpc_timer retry_alarm_;
426 grpc_closure on_retry_alarm_;
427 bool have_retry_alarm_ = false;
428 // reset_backoff() was called while alarm was pending.
429 bool retry_immediately_ = false;
430 // Keepalive time period (-1 for unset)
431 int keepalive_time_ = -1;
433 // Channelz tracking.
434 RefCountedPtr<channelz::SubchannelNode> channelz_node_;
437 } // namespace grpc_core
439 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */