Imported Upstream version 1.34.0
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / subchannel.h
index cf4ddaf..46ffb2f 100644 (file)
@@ -21,6 +21,8 @@
 
 #include <grpc/support/port_platform.h>
 
+#include <deque>
+
 #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
 #include "src/core/ext/filters/client_channel/connector.h"
 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
@@ -37,7 +39,7 @@
 #include "src/core/lib/transport/connectivity_state.h"
 #include "src/core/lib/transport/metadata.h"
 
-// Channel arg containing a grpc_resolved_address to connect to.
+// Channel arg containing a URI indicating the address to connect to.
 #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
 
 // For debugging refcounting.
@@ -74,7 +76,7 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
   ConnectedSubchannel(
       grpc_channel_stack* channel_stack, const grpc_channel_args* args,
       RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
-  ~ConnectedSubchannel();
+  ~ConnectedSubchannel() override;
 
   void StartWatch(grpc_pollset_set* interested_parties,
                   OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
@@ -176,24 +178,45 @@ class SubchannelCall {
 class Subchannel {
  public:
   class ConnectivityStateWatcherInterface
-      : public InternallyRefCounted<ConnectivityStateWatcherInterface> {
+      : public RefCounted<ConnectivityStateWatcherInterface> {
    public:
-    virtual ~ConnectivityStateWatcherInterface() = default;
+    struct ConnectivityStateChange {
+      grpc_connectivity_state state;
+      absl::Status status;
+      RefCountedPtr<ConnectedSubchannel> connected_subchannel;
+    };
+
+    ~ConnectivityStateWatcherInterface() override = default;
 
     // Will be invoked whenever the subchannel's connectivity state
     // changes.  There will be only one invocation of this method on a
     // given watcher instance at any given time.
-    //
+    // Implementations should call PopConnectivityStateChange to get the next
+    // connectivity state change.
+    virtual void OnConnectivityStateChange() = 0;
+
+    virtual grpc_pollset_set* interested_parties() = 0;
+
+    // Enqueues connectivity state change notifications.
     // When the state changes to READY, connected_subchannel will
     // contain a ref to the connected subchannel.  When it changes from
     // READY to some other state, the implementation must release its
     // ref to the connected subchannel.
-    virtual void OnConnectivityStateChange(
-        grpc_connectivity_state new_state,
-        RefCountedPtr<ConnectedSubchannel> connected_subchannel)  // NOLINT
-        = 0;
+    // TODO(yashkt): This is currently needed to send the state updates in the
+    // right order when asynchronously notifying. This will no longer be
+    // necessary when we have access to EventManager.
+    void PushConnectivityStateChange(ConnectivityStateChange state_change);
 
-    virtual grpc_pollset_set* interested_parties() = 0;
+    // Dequeues connectivity state change notifications.
+    ConnectivityStateChange PopConnectivityStateChange();
+
+   private:
+    // Keeps track of the updates that the watcher instance must be notified of.
+    // TODO(yashkt): This is currently needed to send the state updates in the
+    // right order when asynchronously notifying. This will no longer be
+    // necessary when we have access to EventManager.
+    std::deque<ConnectivityStateChange> connectivity_state_queue_;
+    Mutex mu_;  // protects the queue
   };
 
   // The ctor and dtor are not intended to use directly.
@@ -205,6 +228,11 @@ class Subchannel {
   static Subchannel* Create(OrphanablePtr<SubchannelConnector> connector,
                             const grpc_channel_args* args);
 
+  // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time
+  // is larger than the subchannel's current keepalive time. The updated value
+  // will have an affect when the subchannel creates a new ConnectedSubchannel.
+  void ThrottleKeepaliveTime(int new_keepalive_time);
+
   // Strong and weak refcounting.
   Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
   void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
@@ -243,7 +271,7 @@ class Subchannel {
   void WatchConnectivityState(
       grpc_connectivity_state initial_state,
       grpc_core::UniquePtr<char> health_check_service_name,
-      OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
+      RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
 
   // Cancels a connectivity state watch.
   // If the watcher has already been destroyed, this is a no-op.
@@ -280,11 +308,12 @@ class Subchannel {
     ~ConnectivityStateWatcherList() { Clear(); }
 
     void AddWatcherLocked(
-        OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
+        RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
     void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
 
     // Notifies all watchers in the list about a change to state.
-    void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state);
+    void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state,
+                      const absl::Status& status);
 
     void Clear() { watchers_.clear(); }
 
@@ -294,7 +323,7 @@ class Subchannel {
     // TODO(roth): Once we can use C++-14 heterogeneous lookups, this can
     // be a set instead of a map.
     std::map<ConnectivityStateWatcherInterface*,
-             OrphanablePtr<ConnectivityStateWatcherInterface>>
+             RefCountedPtr<ConnectivityStateWatcherInterface>>
         watchers_;
   };
 
@@ -312,12 +341,13 @@ class Subchannel {
     void AddWatcherLocked(
         Subchannel* subchannel, grpc_connectivity_state initial_state,
         grpc_core::UniquePtr<char> health_check_service_name,
-        OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
+        RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
     void RemoveWatcherLocked(const char* health_check_service_name,
                              ConnectivityStateWatcherInterface* watcher);
 
     // Notifies the watcher when the subchannel's state changes.
-    void NotifyLocked(grpc_connectivity_state state);
+    void NotifyLocked(grpc_connectivity_state state,
+                      const absl::Status& status);
 
     grpc_connectivity_state CheckConnectivityStateLocked(
         Subchannel* subchannel, const char* health_check_service_name);
@@ -332,8 +362,11 @@ class Subchannel {
 
   class ConnectedSubchannelStateWatcher;
 
+  class AsyncWatcherNotifierLocked;
+
   // Sets the subchannel's connectivity state to \a state.
-  void SetConnectivityStateLocked(grpc_connectivity_state state);
+  void SetConnectivityStateLocked(grpc_connectivity_state state,
+                                  const absl::Status& status);
 
   // Methods for connection.
   void MaybeStartConnectingLocked();
@@ -376,6 +409,7 @@ class Subchannel {
 
   // Connectivity state tracking.
   grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
+  absl::Status status_;
   // The list of watchers without a health check service name.
   ConnectivityStateWatcherList watcher_list_;
   // The map of watchers with health check service names.
@@ -393,6 +427,8 @@ class Subchannel {
   bool have_retry_alarm_ = false;
   // reset_backoff() was called while alarm was pending.
   bool retry_immediately_ = false;
+  // Keepalive time period (-1 for unset)
+  int keepalive_time_ = -1;
 
   // Channelz tracking.
   RefCountedPtr<channelz::SubchannelNode> channelz_node_;