#include <grpc/support/port_platform.h>
+#include <deque>
+
#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/connector.h"
#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
-// Channel arg containing a grpc_resolved_address to connect to.
+// Channel arg containing a URI indicating the address to connect to.
#define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
// For debugging refcounting.
#ifndef NDEBUG
#define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r))
-#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \
- (p)->RefFromWeakRef(__FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
#define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r))
ConnectedSubchannel(
grpc_channel_stack* channel_stack, const grpc_channel_args* args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
- ~ConnectedSubchannel();
+ ~ConnectedSubchannel() override;
void StartWatch(grpc_pollset_set* interested_parties,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
class Subchannel {
public:
class ConnectivityStateWatcherInterface
- : public InternallyRefCounted<ConnectivityStateWatcherInterface> {
+ : public RefCounted<ConnectivityStateWatcherInterface> {
public:
- virtual ~ConnectivityStateWatcherInterface() = default;
+ struct ConnectivityStateChange {
+ grpc_connectivity_state state;
+ absl::Status status;
+ RefCountedPtr<ConnectedSubchannel> connected_subchannel;
+ };
+
+ ~ConnectivityStateWatcherInterface() override = default;
// Will be invoked whenever the subchannel's connectivity state
// changes. There will be only one invocation of this method on a
// given watcher instance at any given time.
- //
+ // Implementations should call PopConnectivityStateChange to get the next
+ // connectivity state change.
+ virtual void OnConnectivityStateChange() = 0;
+
+ virtual grpc_pollset_set* interested_parties() = 0;
+
+ // Enqueues connectivity state change notifications.
// When the state changes to READY, connected_subchannel will
// contain a ref to the connected subchannel. When it changes from
// READY to some other state, the implementation must release its
// ref to the connected subchannel.
- virtual void OnConnectivityStateChange(
- grpc_connectivity_state new_state,
- RefCountedPtr<ConnectedSubchannel> connected_subchannel) // NOLINT
- = 0;
+ // TODO(yashkt): This is currently needed to send the state updates in the
+ // right order when asynchronously notifying. This will no longer be
+ // necessary when we have access to EventManager.
+ void PushConnectivityStateChange(ConnectivityStateChange state_change);
- virtual grpc_pollset_set* interested_parties() = 0;
+ // Dequeues connectivity state change notifications.
+ ConnectivityStateChange PopConnectivityStateChange();
+
+ private:
+ // Keeps track of the updates that the watcher instance must be notified of.
+ // TODO(yashkt): This is currently needed to send the state updates in the
+ // right order when asynchronously notifying. This will no longer be
+ // necessary when we have access to EventManager.
+ std::deque<ConnectivityStateChange> connectivity_state_queue_;
+ Mutex mu_; // protects the queue
};
// The ctor and dtor are not intended to use directly.
- Subchannel(SubchannelKey* key, grpc_connector* connector,
+ Subchannel(SubchannelKey* key, OrphanablePtr<SubchannelConnector> connector,
const grpc_channel_args* args);
~Subchannel();
// Creates a subchannel given \a connector and \a args.
- static Subchannel* Create(grpc_connector* connector,
+ static Subchannel* Create(OrphanablePtr<SubchannelConnector> connector,
const grpc_channel_args* args);
+ // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time
+ // is larger than the subchannel's current keepalive time. The updated value
+ // will have an affect when the subchannel creates a new ConnectedSubchannel.
+ void ThrottleKeepaliveTime(int new_keepalive_time);
+
// Strong and weak refcounting.
Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
// Attempts to return a strong ref when only the weak refcount is guaranteed
// non-zero. If the strong refcount is zero, does not alter the refcount and
// returns null.
- Subchannel* RefFromWeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+ Subchannel* RefFromWeakRef();
// Gets the string representing the subchannel address.
// Caller doesn't take ownership.
// destroyed or when CancelConnectivityStateWatch() is called.
void WatchConnectivityState(
grpc_connectivity_state initial_state,
- UniquePtr<char> health_check_service_name,
- OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
+ grpc_core::UniquePtr<char> health_check_service_name,
+ RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
// Cancels a connectivity state watch.
// If the watcher has already been destroyed, this is a no-op.
~ConnectivityStateWatcherList() { Clear(); }
void AddWatcherLocked(
- OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
+ RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
// Notifies all watchers in the list about a change to state.
- void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state);
+ void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state,
+ const absl::Status& status);
void Clear() { watchers_.clear(); }
private:
// TODO(roth): Once we can use C++-14 heterogeneous lookups, this can
// be a set instead of a map.
- Map<ConnectivityStateWatcherInterface*,
- OrphanablePtr<ConnectivityStateWatcherInterface>>
+ std::map<ConnectivityStateWatcherInterface*,
+ RefCountedPtr<ConnectivityStateWatcherInterface>>
watchers_;
};
public:
void AddWatcherLocked(
Subchannel* subchannel, grpc_connectivity_state initial_state,
- UniquePtr<char> health_check_service_name,
- OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
+ grpc_core::UniquePtr<char> health_check_service_name,
+ RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
void RemoveWatcherLocked(const char* health_check_service_name,
ConnectivityStateWatcherInterface* watcher);
// Notifies the watcher when the subchannel's state changes.
- void NotifyLocked(grpc_connectivity_state state);
+ void NotifyLocked(grpc_connectivity_state state,
+ const absl::Status& status);
grpc_connectivity_state CheckConnectivityStateLocked(
Subchannel* subchannel, const char* health_check_service_name);
private:
class HealthWatcher;
- Map<const char*, OrphanablePtr<HealthWatcher>, StringLess> map_;
+ std::map<const char*, OrphanablePtr<HealthWatcher>, StringLess> map_;
};
class ConnectedSubchannelStateWatcher;
+ class AsyncWatcherNotifierLocked;
+
// Sets the subchannel's connectivity state to \a state.
- void SetConnectivityStateLocked(grpc_connectivity_state state);
+ void SetConnectivityStateLocked(grpc_connectivity_state state,
+ const absl::Status& status);
// Methods for connection.
void MaybeStartConnectingLocked();
gpr_atm ref_pair_;
// Connection states.
- grpc_connector* connector_ = nullptr;
+ OrphanablePtr<SubchannelConnector> connector_;
// Set during connection.
- grpc_connect_out_args connecting_result_;
+ SubchannelConnector::Result connecting_result_;
grpc_closure on_connecting_finished_;
// Active connection, or null.
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
// Connectivity state tracking.
grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
+ absl::Status status_;
// The list of watchers without a health check service name.
ConnectivityStateWatcherList watcher_list_;
// The map of watchers with health check service names.
bool have_retry_alarm_ = false;
// reset_backoff() was called while alarm was pending.
bool retry_immediately_ = false;
+ // Keepalive time period (-1 for unset)
+ int keepalive_time_ = -1;
// Channelz tracking.
RefCountedPtr<channelz::SubchannelNode> channelz_node_;