Imported Upstream version 1.34.0
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / subchannel.h
index e3efc89..46ffb2f 100644 (file)
 
 #include <grpc/support/port_platform.h>
 
+#include <deque>
+
 #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
 #include "src/core/ext/filters/client_channel/connector.h"
 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
 #include "src/core/lib/backoff/backoff.h"
 #include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/gpr/time_precise.h"
 #include "src/core/lib/gprpp/arena.h"
+#include "src/core/lib/gprpp/map.h"
 #include "src/core/lib/gprpp/ref_counted.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
 #include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/transport/connectivity_state.h"
 #include "src/core/lib/transport/metadata.h"
 
-// Channel arg containing a grpc_resolved_address to connect to.
+// Channel arg containing a URI indicating the address to connect to.
 #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
 
 // For debugging refcounting.
 #ifndef NDEBUG
 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r))
-#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \
-  (p)->RefFromWeakRef(__FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r))
 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r))
 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r))
@@ -70,36 +73,21 @@ class SubchannelCall;
 
 class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
  public:
-  struct CallArgs {
-    grpc_polling_entity* pollent;
-    grpc_slice path;
-    gpr_timespec start_time;
-    grpc_millis deadline;
-    Arena* arena;
-    grpc_call_context_element* context;
-    grpc_core::CallCombiner* call_combiner;
-    size_t parent_data_size;
-  };
-
   ConnectedSubchannel(
       grpc_channel_stack* channel_stack, const grpc_channel_args* args,
-      RefCountedPtr<channelz::SubchannelNode> channelz_subchannel,
-      intptr_t socket_uuid);
-  ~ConnectedSubchannel();
+      RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
+  ~ConnectedSubchannel() override;
+
+  void StartWatch(grpc_pollset_set* interested_parties,
+                  OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
 
-  void NotifyOnStateChange(grpc_pollset_set* interested_parties,
-                           grpc_connectivity_state* state,
-                           grpc_closure* closure);
   void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
-  RefCountedPtr<SubchannelCall> CreateCall(const CallArgs& args,
-                                           grpc_error** error);
 
   grpc_channel_stack* channel_stack() const { return channel_stack_; }
   const grpc_channel_args* args() const { return args_; }
   channelz::SubchannelNode* channelz_subchannel() const {
     return channelz_subchannel_.get();
   }
-  intptr_t socket_uuid() const { return socket_uuid_; }
 
   size_t GetInitialCallSizeEstimate(size_t parent_data_size) const;
 
@@ -109,17 +97,23 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
   // ref counted pointer to the channelz node in this connected subchannel's
   // owning subchannel.
   RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
-  // uuid of this subchannel's socket. 0 if this subchannel is not connected.
-  const intptr_t socket_uuid_;
 };
 
 // Implements the interface of RefCounted<>.
 class SubchannelCall {
  public:
-  SubchannelCall(RefCountedPtr<ConnectedSubchannel> connected_subchannel,
-                 const ConnectedSubchannel::CallArgs& args)
-      : connected_subchannel_(std::move(connected_subchannel)),
-        deadline_(args.deadline) {}
+  struct Args {
+    RefCountedPtr<ConnectedSubchannel> connected_subchannel;
+    grpc_polling_entity* pollent;
+    grpc_slice path;
+    gpr_cycle_counter start_time;
+    grpc_millis deadline;
+    Arena* arena;
+    grpc_call_context_element* context;
+    CallCombiner* call_combiner;
+    size_t parent_data_size;
+  };
+  static RefCountedPtr<SubchannelCall> Create(Args args, grpc_error** error);
 
   // Continues processing a transport stream op batch.
   void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
@@ -152,6 +146,8 @@ class SubchannelCall {
   template <typename T>
   friend class RefCountedPtr;
 
+  SubchannelCall(Args args, grpc_error** error);
+
   // If channelz is enabled, intercepts recv_trailing so that we may check the
   // status and associate it to a subchannel.
   void MaybeInterceptRecvTrailingMetadata(
@@ -174,17 +170,69 @@ class SubchannelCall {
 
 // A subchannel that knows how to connect to exactly one target address. It
 // provides a target for load balancing.
+//
+// Note that this is the "real" subchannel implementation, whose API is
+// different from the SubchannelInterface that is exposed to LB policy
+// implementations.  The client channel provides an adaptor class
+// (SubchannelWrapper) that "converts" between the two.
 class Subchannel {
  public:
+  class ConnectivityStateWatcherInterface
+      : public RefCounted<ConnectivityStateWatcherInterface> {
+   public:
+    struct ConnectivityStateChange {
+      grpc_connectivity_state state;
+      absl::Status status;
+      RefCountedPtr<ConnectedSubchannel> connected_subchannel;
+    };
+
+    ~ConnectivityStateWatcherInterface() override = default;
+
+    // Will be invoked whenever the subchannel's connectivity state
+    // changes.  There will be only one invocation of this method on a
+    // given watcher instance at any given time.
+    // Implementations should call PopConnectivityStateChange to get the next
+    // connectivity state change.
+    virtual void OnConnectivityStateChange() = 0;
+
+    virtual grpc_pollset_set* interested_parties() = 0;
+
+    // Enqueues connectivity state change notifications.
+    // When the state changes to READY, connected_subchannel will
+    // contain a ref to the connected subchannel.  When it changes from
+    // READY to some other state, the implementation must release its
+    // ref to the connected subchannel.
+    // TODO(yashkt): This is currently needed to send the state updates in the
+    // right order when asynchronously notifying. This will no longer be
+    // necessary when we have access to EventManager.
+    void PushConnectivityStateChange(ConnectivityStateChange state_change);
+
+    // Dequeues connectivity state change notifications.
+    ConnectivityStateChange PopConnectivityStateChange();
+
+   private:
+    // Keeps track of the updates that the watcher instance must be notified of.
+    // TODO(yashkt): This is currently needed to send the state updates in the
+    // right order when asynchronously notifying. This will no longer be
+    // necessary when we have access to EventManager.
+    std::deque<ConnectivityStateChange> connectivity_state_queue_;
+    Mutex mu_;  // protects the queue
+  };
+
   // The ctor and dtor are not intended to use directly.
-  Subchannel(SubchannelKey* key, grpc_connector* connector,
+  Subchannel(SubchannelKey* key, OrphanablePtr<SubchannelConnector> connector,
              const grpc_channel_args* args);
   ~Subchannel();
 
   // Creates a subchannel given \a connector and \a args.
-  static Subchannel* Create(grpc_connector* connector,
+  static Subchannel* Create(OrphanablePtr<SubchannelConnector> connector,
                             const grpc_channel_args* args);
 
+  // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time
+  // is larger than the subchannel's current keepalive time. The updated value
+  // will have an affect when the subchannel creates a new ConnectedSubchannel.
+  void ThrottleKeepaliveTime(int new_keepalive_time);
+
   // Strong and weak refcounting.
   Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
   void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
@@ -193,28 +241,45 @@ class Subchannel {
   // Attempts to return a strong ref when only the weak refcount is guaranteed
   // non-zero. If the strong refcount is zero, does not alter the refcount and
   // returns null.
-  Subchannel* RefFromWeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-
-  intptr_t GetChildSocketUuid();
+  Subchannel* RefFromWeakRef();
 
   // Gets the string representing the subchannel address.
   // Caller doesn't take ownership.
   const char* GetTargetAddress();
 
-  // Gets the connected subchannel - or nullptr if not connected (which may
-  // happen before it initially connects or during transient failures).
-  RefCountedPtr<ConnectedSubchannel> connected_subchannel();
+  const grpc_channel_args* channel_args() const { return args_; }
 
   channelz::SubchannelNode* channelz_node();
 
-  // Polls the current connectivity state of the subchannel.
-  grpc_connectivity_state CheckConnectivity(bool inhibit_health_checking);
-
-  // When the connectivity state of the subchannel changes from \a *state,
-  // invokes \a notify and updates \a *state with the new state.
-  void NotifyOnStateChange(grpc_pollset_set* interested_parties,
-                           grpc_connectivity_state* state, grpc_closure* notify,
-                           bool inhibit_health_checking);
+  // Returns the current connectivity state of the subchannel.
+  // If health_check_service_name is non-null, the returned connectivity
+  // state will be based on the state reported by the backend for that
+  // service name.
+  // If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel.
+  grpc_connectivity_state CheckConnectivityState(
+      const char* health_check_service_name,
+      RefCountedPtr<ConnectedSubchannel>* connected_subchannel);
+
+  // Starts watching the subchannel's connectivity state.
+  // The first callback to the watcher will be delivered when the
+  // subchannel's connectivity state becomes a value other than
+  // initial_state, which may happen immediately.
+  // Subsequent callbacks will be delivered as the subchannel's state
+  // changes.
+  // The watcher will be destroyed either when the subchannel is
+  // destroyed or when CancelConnectivityStateWatch() is called.
+  void WatchConnectivityState(
+      grpc_connectivity_state initial_state,
+      grpc_core::UniquePtr<char> health_check_service_name,
+      RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
+
+  // Cancels a connectivity state watch.
+  // If the watcher has already been destroyed, this is a no-op.
+  void CancelConnectivityStateWatch(const char* health_check_service_name,
+                                    ConnectivityStateWatcherInterface* watcher);
+
+  // Attempt to connect to the backend.  Has no effect if already connected.
+  void AttemptToConnect();
 
   // Resets the connection backoff of the subchannel.
   // TODO(roth): Move connection backoff out of subchannels and up into LB
@@ -236,12 +301,72 @@ class Subchannel {
                                                  grpc_resolved_address* addr);
 
  private:
-  struct ExternalStateWatcher;
+  // A linked list of ConnectivityStateWatcherInterfaces that are monitoring
+  // the subchannel's state.
+  class ConnectivityStateWatcherList {
+   public:
+    ~ConnectivityStateWatcherList() { Clear(); }
+
+    void AddWatcherLocked(
+        RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
+    void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
+
+    // Notifies all watchers in the list about a change to state.
+    void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state,
+                      const absl::Status& status);
+
+    void Clear() { watchers_.clear(); }
+
+    bool empty() const { return watchers_.empty(); }
+
+   private:
+    // TODO(roth): Once we can use C++-14 heterogeneous lookups, this can
+    // be a set instead of a map.
+    std::map<ConnectivityStateWatcherInterface*,
+             RefCountedPtr<ConnectivityStateWatcherInterface>>
+        watchers_;
+  };
+
+  // A map that tracks ConnectivityStateWatcherInterfaces using a particular
+  // health check service name.
+  //
+  // There is one entry in the map for each health check service name.
+  // Entries exist only as long as there are watchers using the
+  // corresponding service name.
+  //
+  // A health check client is maintained only while the subchannel is in
+  // state READY.
+  class HealthWatcherMap {
+   public:
+    void AddWatcherLocked(
+        Subchannel* subchannel, grpc_connectivity_state initial_state,
+        grpc_core::UniquePtr<char> health_check_service_name,
+        RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
+    void RemoveWatcherLocked(const char* health_check_service_name,
+                             ConnectivityStateWatcherInterface* watcher);
+
+    // Notifies the watcher when the subchannel's state changes.
+    void NotifyLocked(grpc_connectivity_state state,
+                      const absl::Status& status);
+
+    grpc_connectivity_state CheckConnectivityStateLocked(
+        Subchannel* subchannel, const char* health_check_service_name);
+
+    void ShutdownLocked();
+
+   private:
+    class HealthWatcher;
+
+    std::map<const char*, OrphanablePtr<HealthWatcher>, StringLess> map_;
+  };
+
   class ConnectedSubchannelStateWatcher;
 
+  class AsyncWatcherNotifierLocked;
+
   // Sets the subchannel's connectivity state to \a state.
   void SetConnectivityStateLocked(grpc_connectivity_state state,
-                                  const char* reason);
+                                  const absl::Status& status);
 
   // Methods for connection.
   void MaybeStartConnectingLocked();
@@ -273,21 +398,22 @@ class Subchannel {
   gpr_atm ref_pair_;
 
   // Connection states.
-  grpc_connector* connector_ = nullptr;
+  OrphanablePtr<SubchannelConnector> connector_;
   // Set during connection.
-  grpc_connect_out_args connecting_result_;
+  SubchannelConnector::Result connecting_result_;
   grpc_closure on_connecting_finished_;
   // Active connection, or null.
   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
-  OrphanablePtr<ConnectedSubchannelStateWatcher> connected_subchannel_watcher_;
   bool connecting_ = false;
   bool disconnected_ = false;
 
   // Connectivity state tracking.
-  grpc_connectivity_state_tracker state_tracker_;
-  grpc_connectivity_state_tracker state_and_health_tracker_;
-  UniquePtr<char> health_check_service_name_;
-  ExternalStateWatcher* external_state_watcher_list_ = nullptr;
+  grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
+  absl::Status status_;
+  // The list of watchers without a health check service name.
+  ConnectivityStateWatcherList watcher_list_;
+  // The map of watchers with health check service names.
+  HealthWatcherMap health_watcher_map_;
 
   // Backoff state.
   BackOff backoff_;
@@ -301,6 +427,8 @@ class Subchannel {
   bool have_retry_alarm_ = false;
   // reset_backoff() was called while alarm was pending.
   bool retry_immediately_ = false;
+  // Keepalive time period (-1 for unset)
+  int keepalive_time_ = -1;
 
   // Channelz tracking.
   RefCountedPtr<channelz::SubchannelNode> channelz_node_;