#include <grpc/support/port_platform.h>
+#include <deque>
+
#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/connector.h"
#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
-// Channel arg containing a grpc_resolved_address to connect to.
+// Channel arg containing a URI indicating the address to connect to.
#define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
// For debugging refcounting.
ConnectedSubchannel(
grpc_channel_stack* channel_stack, const grpc_channel_args* args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
- ~ConnectedSubchannel();
+ ~ConnectedSubchannel() override;
void StartWatch(grpc_pollset_set* interested_parties,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
class Subchannel {
public:
class ConnectivityStateWatcherInterface
- : public InternallyRefCounted<ConnectivityStateWatcherInterface> {
+ : public RefCounted<ConnectivityStateWatcherInterface> {
public:
- virtual ~ConnectivityStateWatcherInterface() = default;
+ struct ConnectivityStateChange {
+ grpc_connectivity_state state;
+ absl::Status status;
+ RefCountedPtr<ConnectedSubchannel> connected_subchannel;
+ };
+
+ ~ConnectivityStateWatcherInterface() override = default;
// Will be invoked whenever the subchannel's connectivity state
// changes. There will be only one invocation of this method on a
// given watcher instance at any given time.
- //
+ // Implementations should call PopConnectivityStateChange to get the next
+ // connectivity state change.
+ virtual void OnConnectivityStateChange() = 0;
+
+ virtual grpc_pollset_set* interested_parties() = 0;
+
+ // Enqueues connectivity state change notifications.
// When the state changes to READY, connected_subchannel will
// contain a ref to the connected subchannel. When it changes from
// READY to some other state, the implementation must release its
// ref to the connected subchannel.
- virtual void OnConnectivityStateChange(
- grpc_connectivity_state new_state,
- RefCountedPtr<ConnectedSubchannel> connected_subchannel) // NOLINT
- = 0;
+ // TODO(yashkt): This is currently needed to send the state updates in the
+ // right order when asynchronously notifying. This will no longer be
+ // necessary when we have access to EventManager.
+ void PushConnectivityStateChange(ConnectivityStateChange state_change);
- virtual grpc_pollset_set* interested_parties() = 0;
+ // Dequeues connectivity state change notifications.
+ ConnectivityStateChange PopConnectivityStateChange();
+
+ private:
+ // Keeps track of the updates that the watcher instance must be notified of.
+ // TODO(yashkt): This is currently needed to send the state updates in the
+ // right order when asynchronously notifying. This will no longer be
+ // necessary when we have access to EventManager.
+ std::deque<ConnectivityStateChange> connectivity_state_queue_;
+ Mutex mu_; // protects the queue
};
// The ctor and dtor are not intended to use directly.
static Subchannel* Create(OrphanablePtr<SubchannelConnector> connector,
const grpc_channel_args* args);
+ // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time
+ // is larger than the subchannel's current keepalive time. The updated value
+ // will have an affect when the subchannel creates a new ConnectedSubchannel.
+ void ThrottleKeepaliveTime(int new_keepalive_time);
+
// Strong and weak refcounting.
Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void WatchConnectivityState(
grpc_connectivity_state initial_state,
grpc_core::UniquePtr<char> health_check_service_name,
- OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
+ RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
// Cancels a connectivity state watch.
// If the watcher has already been destroyed, this is a no-op.
~ConnectivityStateWatcherList() { Clear(); }
void AddWatcherLocked(
- OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
+ RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
// Notifies all watchers in the list about a change to state.
- void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state);
+ void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state,
+ const absl::Status& status);
void Clear() { watchers_.clear(); }
// TODO(roth): Once we can use C++-14 heterogeneous lookups, this can
// be a set instead of a map.
std::map<ConnectivityStateWatcherInterface*,
- OrphanablePtr<ConnectivityStateWatcherInterface>>
+ RefCountedPtr<ConnectivityStateWatcherInterface>>
watchers_;
};
void AddWatcherLocked(
Subchannel* subchannel, grpc_connectivity_state initial_state,
grpc_core::UniquePtr<char> health_check_service_name,
- OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
+ RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
void RemoveWatcherLocked(const char* health_check_service_name,
ConnectivityStateWatcherInterface* watcher);
// Notifies the watcher when the subchannel's state changes.
- void NotifyLocked(grpc_connectivity_state state);
+ void NotifyLocked(grpc_connectivity_state state,
+ const absl::Status& status);
grpc_connectivity_state CheckConnectivityStateLocked(
Subchannel* subchannel, const char* health_check_service_name);
class ConnectedSubchannelStateWatcher;
+ class AsyncWatcherNotifierLocked;
+
// Sets the subchannel's connectivity state to \a state.
- void SetConnectivityStateLocked(grpc_connectivity_state state);
+ void SetConnectivityStateLocked(grpc_connectivity_state state,
+ const absl::Status& status);
// Methods for connection.
void MaybeStartConnectingLocked();
// Connectivity state tracking.
grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
+ absl::Status status_;
// The list of watchers without a health check service name.
ConnectivityStateWatcherList watcher_list_;
// The map of watchers with health check service names.
bool have_retry_alarm_ = false;
// reset_backoff() was called while alarm was pending.
bool retry_immediately_ = false;
+ // Keepalive time period (-1 for unset)
+ int keepalive_time_ = -1;
// Channelz tracking.
RefCountedPtr<channelz::SubchannelNode> channelz_node_;