Imported Upstream version 1.34.0
[platform/upstream/grpc.git] / src / core / ext / xds / xds_client.h
index c918434..49ec9dc 100644 (file)
@@ -20,6 +20,7 @@
 #include <grpc/support/port_platform.h>
 
 #include <set>
+#include <vector>
 
 #include "absl/strings/string_view.h"
 #include "absl/types/optional.h"
 #include "src/core/ext/xds/xds_api.h"
 #include "src/core/ext/xds/xds_bootstrap.h"
 #include "src/core/ext/xds/xds_client_stats.h"
+#include "src/core/lib/channel/channelz.h"
+#include "src/core/lib/gprpp/dual_ref_counted.h"
 #include "src/core/lib/gprpp/map.h"
 #include "src/core/lib/gprpp/memory.h"
 #include "src/core/lib/gprpp/orphanable.h"
 #include "src/core/lib/gprpp/ref_counted.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
-#include "src/core/lib/iomgr/work_serializer.h"
+#include "src/core/lib/gprpp/sync.h"
 
 namespace grpc_core {
 
-extern TraceFlag xds_client_trace;
+extern TraceFlag grpc_xds_client_trace;
 
-class XdsClient : public InternallyRefCounted<XdsClient> {
+class XdsClient : public DualRefCounted<XdsClient> {
  public:
   // Listener data watcher interface.  Implemented by callers.
   class ListenerWatcherInterface {
    public:
     virtual ~ListenerWatcherInterface() = default;
-
-    virtual void OnListenerChanged(std::vector<XdsApi::Route> routes) = 0;
-
+    virtual void OnListenerChanged(XdsApi::LdsUpdate listener) = 0;
     virtual void OnError(grpc_error* error) = 0;
+    virtual void OnResourceDoesNotExist() = 0;
+  };
 
+  // RouteConfiguration data watcher interface.  Implemented by callers.
+  class RouteConfigWatcherInterface {
+   public:
+    virtual ~RouteConfigWatcherInterface() = default;
+    virtual void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) = 0;
+    virtual void OnError(grpc_error* error) = 0;
     virtual void OnResourceDoesNotExist() = 0;
   };
 
@@ -56,11 +65,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
   class ClusterWatcherInterface {
    public:
     virtual ~ClusterWatcherInterface() = default;
-
     virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0;
-
     virtual void OnError(grpc_error* error) = 0;
-
     virtual void OnResourceDoesNotExist() = 0;
   };
 
@@ -68,24 +74,63 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
   class EndpointWatcherInterface {
    public:
     virtual ~EndpointWatcherInterface() = default;
-
     virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0;
-
     virtual void OnError(grpc_error* error) = 0;
-
     virtual void OnResourceDoesNotExist() = 0;
   };
 
-  // If *error is not GRPC_ERROR_NONE after construction, then there was
+  // Factory function to get or create the global XdsClient instance.
+  // If *error is not GRPC_ERROR_NONE upon return, then there was
   // an error initializing the client.
-  XdsClient(std::shared_ptr<WorkSerializer> work_serializer,
-            grpc_pollset_set* interested_parties, absl::string_view server_name,
-            std::unique_ptr<ListenerWatcherInterface> watcher,
-            const grpc_channel_args& channel_args, grpc_error** error);
-  ~XdsClient();
+  static RefCountedPtr<XdsClient> GetOrCreate(grpc_error** error);
+
+  // Callers should not instantiate directly.  Use GetOrCreate() instead.
+  explicit XdsClient(grpc_error** error);
+  ~XdsClient() override;
+
+  grpc_pollset_set* interested_parties() const { return interested_parties_; }
+
+  // TODO(roth): When we add federation, there will be multiple channels
+  // inside the XdsClient, and the set of channels may change over time,
+  // but not every channel may use every one of the child channels, so
+  // this API will need to change.  At minumum, we will need to hold a
+  // ref to the parent channelz node so that we can update its list of
+  // children as the set of xDS channels changes.  However, we may also
+  // want to make this a bit more selective such that only those
+  // channels on which a given parent channel is actually requesting
+  // resources will actually be marked as its children.
+  void AddChannelzLinkage(channelz::ChannelNode* parent_channelz_node);
+  void RemoveChannelzLinkage(channelz::ChannelNode* parent_channelz_node);
 
   void Orphan() override;
 
+  // Start and cancel listener data watch for a listener.
+  // The XdsClient takes ownership of the watcher, but the caller may
+  // keep a raw pointer to the watcher, which may be used only for
+  // cancellation.  (Because the caller does not own the watcher, the
+  // pointer must not be used for any other purpose.)
+  // If the caller is going to start a new watch after cancelling the
+  // old one, it should set delay_unsubscription to true.
+  void WatchListenerData(absl::string_view listener_name,
+                         std::unique_ptr<ListenerWatcherInterface> watcher);
+  void CancelListenerDataWatch(absl::string_view listener_name,
+                               ListenerWatcherInterface* watcher,
+                               bool delay_unsubscription = false);
+
+  // Start and cancel route config data watch for a listener.
+  // The XdsClient takes ownership of the watcher, but the caller may
+  // keep a raw pointer to the watcher, which may be used only for
+  // cancellation.  (Because the caller does not own the watcher, the
+  // pointer must not be used for any other purpose.)
+  // If the caller is going to start a new watch after cancelling the
+  // old one, it should set delay_unsubscription to true.
+  void WatchRouteConfigData(
+      absl::string_view route_config_name,
+      std::unique_ptr<RouteConfigWatcherInterface> watcher);
+  void CancelRouteConfigDataWatch(absl::string_view route_config_name,
+                                  RouteConfigWatcherInterface* watcher,
+                                  bool delay_unsubscription = false);
+
   // Start and cancel cluster data watch for a cluster.
   // The XdsClient takes ownership of the watcher, but the caller may
   // keep a raw pointer to the watcher, which may be used only for
@@ -136,24 +181,14 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
   // Resets connection backoff state.
   void ResetBackoff();
 
-  // Helpers for encoding the XdsClient object in channel args.
-  grpc_arg MakeChannelArg() const;
-  static RefCountedPtr<XdsClient> GetFromChannelArgs(
-      const grpc_channel_args& args);
-  static grpc_channel_args* RemoveFromChannelArgs(
-      const grpc_channel_args& args);
-
  private:
   // Contains a channel to the xds server and all the data related to the
   // channel.  Holds a ref to the xds client object.
-  // TODO(roth): This is separate from the XdsClient object because it was
-  // originally designed to be able to swap itself out in case the
-  // balancer name changed.  Now that the balancer name is going to be
-  // coming from the bootstrap file, we don't really need this level of
-  // indirection unless we decide to support watching the bootstrap file
-  // for changes.  At some point, if we decide that we're never going to
-  // need to do that, then we can eliminate this class and move its
-  // contents directly into the XdsClient class.
+  //
+  // Currently, there is only one ChannelState object per XdsClient
+  // object, and it has essentially the same lifetime.  But in the
+  // future, when we add federation support, a single XdsClient may have
+  // multiple underlying channels to talk to different xDS servers.
   class ChannelState : public InternallyRefCounted<ChannelState> {
    public:
     template <typename T>
@@ -162,8 +197,9 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
     class AdsCallState;
     class LrsCallState;
 
-    ChannelState(RefCountedPtr<XdsClient> xds_client, grpc_channel* channel);
-    ~ChannelState();
+    ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
+                 const XdsBootstrap::XdsServer& server);
+    ~ChannelState() override;
 
     void Orphan() override;
 
@@ -188,7 +224,9 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
     class StateWatcher;
 
     // The owning xds client.
-    RefCountedPtr<XdsClient> xds_client_;
+    WeakRefCountedPtr<XdsClient> xds_client_;
+
+    const XdsBootstrap::XdsServer& server_;
 
     // The channel and its status.
     grpc_channel* channel_;
@@ -200,6 +238,22 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
     OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_;
   };
 
+  struct ListenerState {
+    std::map<ListenerWatcherInterface*,
+             std::unique_ptr<ListenerWatcherInterface>>
+        watchers;
+    // The latest data seen from LDS.
+    absl::optional<XdsApi::LdsUpdate> update;
+  };
+
+  struct RouteConfigState {
+    std::map<RouteConfigWatcherInterface*,
+             std::unique_ptr<RouteConfigWatcherInterface>>
+        watchers;
+    // The latest data seen from RDS.
+    absl::optional<XdsApi::RdsUpdate> update;
+  };
+
   struct ClusterState {
     std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>>
         watchers;
@@ -217,12 +271,12 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
 
   struct LoadReportState {
     struct LocalityState {
-      std::set<XdsClusterLocalityStats*> locality_stats;
-      std::vector<XdsClusterLocalityStats::Snapshot> deleted_locality_stats;
+      XdsClusterLocalityStats* locality_stats = nullptr;
+      XdsClusterLocalityStats::Snapshot deleted_locality_stats;
     };
 
-    std::set<XdsClusterDropStats*> drop_stats;
-    XdsClusterDropStats::DroppedRequestsMap deleted_drop_stats;
+    XdsClusterDropStats* drop_stats = nullptr;
+    XdsClusterDropStats::Snapshot deleted_drop_stats;
     std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
              XdsLocalityName::Less>
         locality_stats;
@@ -230,47 +284,48 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
   };
 
   // Sends an error notification to all watchers.
-  void NotifyOnError(grpc_error* error);
+  void NotifyOnErrorLocked(grpc_error* error);
 
-  XdsApi::ClusterLoadReportMap BuildLoadReportSnapshot(
+  XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(
       bool send_all_clusters, const std::set<std::string>& clusters);
 
-  // Channel arg vtable functions.
-  static void* ChannelArgCopy(void* p);
-  static void ChannelArgDestroy(void* p);
-  static int ChannelArgCmp(void* p, void* q);
-
-  static const grpc_arg_pointer_vtable kXdsClientVtable;
-
   const grpc_millis request_timeout_;
-
-  std::shared_ptr<WorkSerializer> work_serializer_;
   grpc_pollset_set* interested_parties_;
-
   std::unique_ptr<XdsBootstrap> bootstrap_;
   XdsApi api_;
 
-  const std::string server_name_;
-  std::unique_ptr<ListenerWatcherInterface> listener_watcher_;
+  Mutex mu_;
 
   // The channel for communicating with the xds server.
   OrphanablePtr<ChannelState> chand_;
 
-  absl::optional<XdsApi::LdsUpdate> lds_result_;
-  absl::optional<XdsApi::RdsUpdate> rds_result_;
-
+  // One entry for each watched LDS resource.
+  std::map<std::string /*listener_name*/, ListenerState> listener_map_;
+  // One entry for each watched RDS resource.
+  std::map<std::string /*route_config_name*/, RouteConfigState>
+      route_config_map_;
   // One entry for each watched CDS resource.
   std::map<std::string /*cluster_name*/, ClusterState> cluster_map_;
   // One entry for each watched EDS resource.
   std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_;
+
+  // Load report data.
   std::map<
       std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
       LoadReportState>
       load_report_map_;
 
+  // Stores the most recent accepted resource version for each resource type.
+  std::map<std::string /*type*/, std::string /*version*/> resource_version_map_;
+
   bool shutting_down_ = false;
 };
 
+namespace internal {
+void SetXdsChannelArgsForTest(grpc_channel_args* args);
+void UnsetGlobalXdsClientForTest();
+}  // namespace internal
+
 }  // namespace grpc_core
 
 #endif /* GRPC_CORE_EXT_XDS_XDS_CLIENT_H */