#include <grpc/support/port_platform.h>
#include <set>
+#include <vector>
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "src/core/ext/xds/xds_api.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_client_stats.h"
+#include "src/core/lib/channel/channelz.h"
+#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
-#include "src/core/lib/iomgr/work_serializer.h"
+#include "src/core/lib/gprpp/sync.h"
namespace grpc_core {
-extern TraceFlag xds_client_trace;
+extern TraceFlag grpc_xds_client_trace;
-class XdsClient : public InternallyRefCounted<XdsClient> {
+class XdsClient : public DualRefCounted<XdsClient> {
public:
// Listener data watcher interface. Implemented by callers.
class ListenerWatcherInterface {
public:
virtual ~ListenerWatcherInterface() = default;
-
- virtual void OnListenerChanged(std::vector<XdsApi::Route> routes) = 0;
-
+ virtual void OnListenerChanged(XdsApi::LdsUpdate listener) = 0;
virtual void OnError(grpc_error* error) = 0;
+ virtual void OnResourceDoesNotExist() = 0;
+ };
+ // RouteConfiguration data watcher interface. Implemented by callers.
+ class RouteConfigWatcherInterface {
+ public:
+ virtual ~RouteConfigWatcherInterface() = default;
+ virtual void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) = 0;
+ virtual void OnError(grpc_error* error) = 0;
virtual void OnResourceDoesNotExist() = 0;
};
class ClusterWatcherInterface {
public:
virtual ~ClusterWatcherInterface() = default;
-
virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0;
-
virtual void OnError(grpc_error* error) = 0;
-
virtual void OnResourceDoesNotExist() = 0;
};
class EndpointWatcherInterface {
public:
virtual ~EndpointWatcherInterface() = default;
-
virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0;
-
virtual void OnError(grpc_error* error) = 0;
-
virtual void OnResourceDoesNotExist() = 0;
};
- // If *error is not GRPC_ERROR_NONE after construction, then there was
+ // Factory function to get or create the global XdsClient instance.
+ // If *error is not GRPC_ERROR_NONE upon return, then there was
// an error initializing the client.
- XdsClient(std::shared_ptr<WorkSerializer> work_serializer,
- grpc_pollset_set* interested_parties, absl::string_view server_name,
- std::unique_ptr<ListenerWatcherInterface> watcher,
- const grpc_channel_args& channel_args, grpc_error** error);
- ~XdsClient();
+ static RefCountedPtr<XdsClient> GetOrCreate(grpc_error** error);
+
+ // Callers should not instantiate directly. Use GetOrCreate() instead.
+ explicit XdsClient(grpc_error** error);
+ ~XdsClient() override;
+
+ grpc_pollset_set* interested_parties() const { return interested_parties_; }
+
+ // TODO(roth): When we add federation, there will be multiple channels
+ // inside the XdsClient, and the set of channels may change over time,
+ // but not every channel may use every one of the child channels, so
+ // this API will need to change. At minumum, we will need to hold a
+ // ref to the parent channelz node so that we can update its list of
+ // children as the set of xDS channels changes. However, we may also
+ // want to make this a bit more selective such that only those
+ // channels on which a given parent channel is actually requesting
+ // resources will actually be marked as its children.
+ void AddChannelzLinkage(channelz::ChannelNode* parent_channelz_node);
+ void RemoveChannelzLinkage(channelz::ChannelNode* parent_channelz_node);
void Orphan() override;
+ // Start and cancel listener data watch for a listener.
+ // The XdsClient takes ownership of the watcher, but the caller may
+ // keep a raw pointer to the watcher, which may be used only for
+ // cancellation. (Because the caller does not own the watcher, the
+ // pointer must not be used for any other purpose.)
+ // If the caller is going to start a new watch after cancelling the
+ // old one, it should set delay_unsubscription to true.
+ void WatchListenerData(absl::string_view listener_name,
+ std::unique_ptr<ListenerWatcherInterface> watcher);
+ void CancelListenerDataWatch(absl::string_view listener_name,
+ ListenerWatcherInterface* watcher,
+ bool delay_unsubscription = false);
+
+ // Start and cancel route config data watch for a listener.
+ // The XdsClient takes ownership of the watcher, but the caller may
+ // keep a raw pointer to the watcher, which may be used only for
+ // cancellation. (Because the caller does not own the watcher, the
+ // pointer must not be used for any other purpose.)
+ // If the caller is going to start a new watch after cancelling the
+ // old one, it should set delay_unsubscription to true.
+ void WatchRouteConfigData(
+ absl::string_view route_config_name,
+ std::unique_ptr<RouteConfigWatcherInterface> watcher);
+ void CancelRouteConfigDataWatch(absl::string_view route_config_name,
+ RouteConfigWatcherInterface* watcher,
+ bool delay_unsubscription = false);
+
// Start and cancel cluster data watch for a cluster.
// The XdsClient takes ownership of the watcher, but the caller may
// keep a raw pointer to the watcher, which may be used only for
// Resets connection backoff state.
void ResetBackoff();
- // Helpers for encoding the XdsClient object in channel args.
- grpc_arg MakeChannelArg() const;
- static RefCountedPtr<XdsClient> GetFromChannelArgs(
- const grpc_channel_args& args);
- static grpc_channel_args* RemoveFromChannelArgs(
- const grpc_channel_args& args);
-
private:
// Contains a channel to the xds server and all the data related to the
// channel. Holds a ref to the xds client object.
- // TODO(roth): This is separate from the XdsClient object because it was
- // originally designed to be able to swap itself out in case the
- // balancer name changed. Now that the balancer name is going to be
- // coming from the bootstrap file, we don't really need this level of
- // indirection unless we decide to support watching the bootstrap file
- // for changes. At some point, if we decide that we're never going to
- // need to do that, then we can eliminate this class and move its
- // contents directly into the XdsClient class.
+ //
+ // Currently, there is only one ChannelState object per XdsClient
+ // object, and it has essentially the same lifetime. But in the
+ // future, when we add federation support, a single XdsClient may have
+ // multiple underlying channels to talk to different xDS servers.
class ChannelState : public InternallyRefCounted<ChannelState> {
public:
template <typename T>
class AdsCallState;
class LrsCallState;
- ChannelState(RefCountedPtr<XdsClient> xds_client, grpc_channel* channel);
- ~ChannelState();
+ ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
+ const XdsBootstrap::XdsServer& server);
+ ~ChannelState() override;
void Orphan() override;
class StateWatcher;
// The owning xds client.
- RefCountedPtr<XdsClient> xds_client_;
+ WeakRefCountedPtr<XdsClient> xds_client_;
+
+ const XdsBootstrap::XdsServer& server_;
// The channel and its status.
grpc_channel* channel_;
OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_;
};
+ struct ListenerState {
+ std::map<ListenerWatcherInterface*,
+ std::unique_ptr<ListenerWatcherInterface>>
+ watchers;
+ // The latest data seen from LDS.
+ absl::optional<XdsApi::LdsUpdate> update;
+ };
+
+ struct RouteConfigState {
+ std::map<RouteConfigWatcherInterface*,
+ std::unique_ptr<RouteConfigWatcherInterface>>
+ watchers;
+ // The latest data seen from RDS.
+ absl::optional<XdsApi::RdsUpdate> update;
+ };
+
struct ClusterState {
std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>>
watchers;
struct LoadReportState {
struct LocalityState {
- std::set<XdsClusterLocalityStats*> locality_stats;
- std::vector<XdsClusterLocalityStats::Snapshot> deleted_locality_stats;
+ XdsClusterLocalityStats* locality_stats = nullptr;
+ XdsClusterLocalityStats::Snapshot deleted_locality_stats;
};
- std::set<XdsClusterDropStats*> drop_stats;
- XdsClusterDropStats::DroppedRequestsMap deleted_drop_stats;
+ XdsClusterDropStats* drop_stats = nullptr;
+ XdsClusterDropStats::Snapshot deleted_drop_stats;
std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
XdsLocalityName::Less>
locality_stats;
};
// Sends an error notification to all watchers.
- void NotifyOnError(grpc_error* error);
+ void NotifyOnErrorLocked(grpc_error* error);
- XdsApi::ClusterLoadReportMap BuildLoadReportSnapshot(
+ XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(
bool send_all_clusters, const std::set<std::string>& clusters);
- // Channel arg vtable functions.
- static void* ChannelArgCopy(void* p);
- static void ChannelArgDestroy(void* p);
- static int ChannelArgCmp(void* p, void* q);
-
- static const grpc_arg_pointer_vtable kXdsClientVtable;
-
const grpc_millis request_timeout_;
-
- std::shared_ptr<WorkSerializer> work_serializer_;
grpc_pollset_set* interested_parties_;
-
std::unique_ptr<XdsBootstrap> bootstrap_;
XdsApi api_;
- const std::string server_name_;
- std::unique_ptr<ListenerWatcherInterface> listener_watcher_;
+ Mutex mu_;
// The channel for communicating with the xds server.
OrphanablePtr<ChannelState> chand_;
- absl::optional<XdsApi::LdsUpdate> lds_result_;
- absl::optional<XdsApi::RdsUpdate> rds_result_;
-
+ // One entry for each watched LDS resource.
+ std::map<std::string /*listener_name*/, ListenerState> listener_map_;
+ // One entry for each watched RDS resource.
+ std::map<std::string /*route_config_name*/, RouteConfigState>
+ route_config_map_;
// One entry for each watched CDS resource.
std::map<std::string /*cluster_name*/, ClusterState> cluster_map_;
// One entry for each watched EDS resource.
std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_;
+
+ // Load report data.
std::map<
std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
LoadReportState>
load_report_map_;
+ // Stores the most recent accepted resource version for each resource type.
+ std::map<std::string /*type*/, std::string /*version*/> resource_version_map_;
+
bool shutting_down_ = false;
};
+namespace internal {
+void SetXdsChannelArgsForTest(grpc_channel_args* args);
+void UnsetGlobalXdsClientForTest();
+} // namespace internal
+
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_XDS_XDS_CLIENT_H */