Imported Upstream version 1.34.0
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / subchannel.h
index fde19dc..46ffb2f 100644 (file)
@@ -21,6 +21,8 @@
 
 #include <grpc/support/port_platform.h>
 
+#include <deque>
+
 #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
 #include "src/core/ext/filters/client_channel/connector.h"
 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
 #include "src/core/lib/transport/connectivity_state.h"
 #include "src/core/lib/transport/metadata.h"
 
-// Channel arg containing a grpc_resolved_address to connect to.
+// Channel arg containing a URI indicating the address to connect to.
 #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
 
 // For debugging refcounting.
 #ifndef NDEBUG
 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r))
-#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \
-  (p)->RefFromWeakRef(__FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r))
 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r))
 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r))
@@ -75,7 +76,7 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
   ConnectedSubchannel(
       grpc_channel_stack* channel_stack, const grpc_channel_args* args,
       RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
-  ~ConnectedSubchannel();
+  ~ConnectedSubchannel() override;
 
   void StartWatch(grpc_pollset_set* interested_parties,
                   OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
@@ -177,35 +178,61 @@ class SubchannelCall {
 class Subchannel {
  public:
   class ConnectivityStateWatcherInterface
-      : public InternallyRefCounted<ConnectivityStateWatcherInterface> {
+      : public RefCounted<ConnectivityStateWatcherInterface> {
    public:
-    virtual ~ConnectivityStateWatcherInterface() = default;
+    struct ConnectivityStateChange {
+      grpc_connectivity_state state;
+      absl::Status status;
+      RefCountedPtr<ConnectedSubchannel> connected_subchannel;
+    };
+
+    ~ConnectivityStateWatcherInterface() override = default;
 
     // Will be invoked whenever the subchannel's connectivity state
     // changes.  There will be only one invocation of this method on a
     // given watcher instance at any given time.
-    //
+    // Implementations should call PopConnectivityStateChange to get the next
+    // connectivity state change.
+    virtual void OnConnectivityStateChange() = 0;
+
+    virtual grpc_pollset_set* interested_parties() = 0;
+
+    // Enqueues connectivity state change notifications.
     // When the state changes to READY, connected_subchannel will
     // contain a ref to the connected subchannel.  When it changes from
     // READY to some other state, the implementation must release its
     // ref to the connected subchannel.
-    virtual void OnConnectivityStateChange(
-        grpc_connectivity_state new_state,
-        RefCountedPtr<ConnectedSubchannel> connected_subchannel)  // NOLINT
-        = 0;
+    // TODO(yashkt): This is currently needed to send the state updates in the
+    // right order when asynchronously notifying. This will no longer be
+    // necessary when we have access to EventManager.
+    void PushConnectivityStateChange(ConnectivityStateChange state_change);
 
-    virtual grpc_pollset_set* interested_parties() = 0;
+    // Dequeues connectivity state change notifications.
+    ConnectivityStateChange PopConnectivityStateChange();
+
+   private:
+    // Keeps track of the updates that the watcher instance must be notified of.
+    // TODO(yashkt): This is currently needed to send the state updates in the
+    // right order when asynchronously notifying. This will no longer be
+    // necessary when we have access to EventManager.
+    std::deque<ConnectivityStateChange> connectivity_state_queue_;
+    Mutex mu_;  // protects the queue
   };
 
   // The ctor and dtor are not intended to use directly.
-  Subchannel(SubchannelKey* key, grpc_connector* connector,
+  Subchannel(SubchannelKey* key, OrphanablePtr<SubchannelConnector> connector,
              const grpc_channel_args* args);
   ~Subchannel();
 
   // Creates a subchannel given \a connector and \a args.
-  static Subchannel* Create(grpc_connector* connector,
+  static Subchannel* Create(OrphanablePtr<SubchannelConnector> connector,
                             const grpc_channel_args* args);
 
+  // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time
+  // is larger than the subchannel's current keepalive time. The updated value
+  // will have an affect when the subchannel creates a new ConnectedSubchannel.
+  void ThrottleKeepaliveTime(int new_keepalive_time);
+
   // Strong and weak refcounting.
   Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
   void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
@@ -214,7 +241,7 @@ class Subchannel {
   // Attempts to return a strong ref when only the weak refcount is guaranteed
   // non-zero. If the strong refcount is zero, does not alter the refcount and
   // returns null.
-  Subchannel* RefFromWeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+  Subchannel* RefFromWeakRef();
 
   // Gets the string representing the subchannel address.
   // Caller doesn't take ownership.
@@ -243,8 +270,8 @@ class Subchannel {
   // destroyed or when CancelConnectivityStateWatch() is called.
   void WatchConnectivityState(
       grpc_connectivity_state initial_state,
-      UniquePtr<char> health_check_service_name,
-      OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
+      grpc_core::UniquePtr<char> health_check_service_name,
+      RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
 
   // Cancels a connectivity state watch.
   // If the watcher has already been destroyed, this is a no-op.
@@ -281,11 +308,12 @@ class Subchannel {
     ~ConnectivityStateWatcherList() { Clear(); }
 
     void AddWatcherLocked(
-        OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
+        RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
     void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
 
     // Notifies all watchers in the list about a change to state.
-    void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state);
+    void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state,
+                      const absl::Status& status);
 
     void Clear() { watchers_.clear(); }
 
@@ -294,8 +322,8 @@ class Subchannel {
    private:
     // TODO(roth): Once we can use C++-14 heterogeneous lookups, this can
     // be a set instead of a map.
-    Map<ConnectivityStateWatcherInterface*,
-        OrphanablePtr<ConnectivityStateWatcherInterface>>
+    std::map<ConnectivityStateWatcherInterface*,
+             RefCountedPtr<ConnectivityStateWatcherInterface>>
         watchers_;
   };
 
@@ -312,13 +340,14 @@ class Subchannel {
    public:
     void AddWatcherLocked(
         Subchannel* subchannel, grpc_connectivity_state initial_state,
-        UniquePtr<char> health_check_service_name,
-        OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
+        grpc_core::UniquePtr<char> health_check_service_name,
+        RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
     void RemoveWatcherLocked(const char* health_check_service_name,
                              ConnectivityStateWatcherInterface* watcher);
 
     // Notifies the watcher when the subchannel's state changes.
-    void NotifyLocked(grpc_connectivity_state state);
+    void NotifyLocked(grpc_connectivity_state state,
+                      const absl::Status& status);
 
     grpc_connectivity_state CheckConnectivityStateLocked(
         Subchannel* subchannel, const char* health_check_service_name);
@@ -328,13 +357,16 @@ class Subchannel {
    private:
     class HealthWatcher;
 
-    Map<const char*, OrphanablePtr<HealthWatcher>, StringLess> map_;
+    std::map<const char*, OrphanablePtr<HealthWatcher>, StringLess> map_;
   };
 
   class ConnectedSubchannelStateWatcher;
 
+  class AsyncWatcherNotifierLocked;
+
   // Sets the subchannel's connectivity state to \a state.
-  void SetConnectivityStateLocked(grpc_connectivity_state state);
+  void SetConnectivityStateLocked(grpc_connectivity_state state,
+                                  const absl::Status& status);
 
   // Methods for connection.
   void MaybeStartConnectingLocked();
@@ -366,9 +398,9 @@ class Subchannel {
   gpr_atm ref_pair_;
 
   // Connection states.
-  grpc_connector* connector_ = nullptr;
+  OrphanablePtr<SubchannelConnector> connector_;
   // Set during connection.
-  grpc_connect_out_args connecting_result_;
+  SubchannelConnector::Result connecting_result_;
   grpc_closure on_connecting_finished_;
   // Active connection, or null.
   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
@@ -377,6 +409,7 @@ class Subchannel {
 
   // Connectivity state tracking.
   grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
+  absl::Status status_;
   // The list of watchers without a health check service name.
   ConnectivityStateWatcherList watcher_list_;
   // The map of watchers with health check service names.
@@ -394,6 +427,8 @@ class Subchannel {
   bool have_retry_alarm_ = false;
   // reset_backoff() was called while alarm was pending.
   bool retry_immediately_ = false;
+  // Keepalive time period (-1 for unset)
+  int keepalive_time_ = -1;
 
   // Channelz tracking.
   RefCountedPtr<channelz::SubchannelNode> channelz_node_;