2 // Copyright 2019 gRPC authors.
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 #ifndef GRPC_CORE_EXT_XDS_XDS_CLIENT_H
18 #define GRPC_CORE_EXT_XDS_XDS_CLIENT_H
20 #include <grpc/support/port_platform.h>
25 #include "absl/strings/string_view.h"
26 #include "absl/types/optional.h"
28 #include "src/core/ext/xds/xds_api.h"
29 #include "src/core/ext/xds/xds_bootstrap.h"
30 #include "src/core/ext/xds/xds_client_stats.h"
31 #include "src/core/lib/channel/channelz.h"
32 #include "src/core/lib/gprpp/dual_ref_counted.h"
33 #include "src/core/lib/gprpp/map.h"
34 #include "src/core/lib/gprpp/memory.h"
35 #include "src/core/lib/gprpp/orphanable.h"
36 #include "src/core/lib/gprpp/ref_counted.h"
37 #include "src/core/lib/gprpp/ref_counted_ptr.h"
38 #include "src/core/lib/gprpp/sync.h"
42 extern TraceFlag grpc_xds_client_trace;
44 class XdsClient : public DualRefCounted<XdsClient> {
46 // Listener data watcher interface. Implemented by callers.
47 class ListenerWatcherInterface {
49 virtual ~ListenerWatcherInterface() = default;
50 virtual void OnListenerChanged(XdsApi::LdsUpdate listener) = 0;
51 virtual void OnError(grpc_error* error) = 0;
52 virtual void OnResourceDoesNotExist() = 0;
55 // RouteConfiguration data watcher interface. Implemented by callers.
56 class RouteConfigWatcherInterface {
58 virtual ~RouteConfigWatcherInterface() = default;
59 virtual void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) = 0;
60 virtual void OnError(grpc_error* error) = 0;
61 virtual void OnResourceDoesNotExist() = 0;
64 // Cluster data watcher interface. Implemented by callers.
65 class ClusterWatcherInterface {
67 virtual ~ClusterWatcherInterface() = default;
68 virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0;
69 virtual void OnError(grpc_error* error) = 0;
70 virtual void OnResourceDoesNotExist() = 0;
73 // Endpoint data watcher interface. Implemented by callers.
74 class EndpointWatcherInterface {
76 virtual ~EndpointWatcherInterface() = default;
77 virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0;
78 virtual void OnError(grpc_error* error) = 0;
79 virtual void OnResourceDoesNotExist() = 0;
82 // Factory function to get or create the global XdsClient instance.
83 // If *error is not GRPC_ERROR_NONE upon return, then there was
84 // an error initializing the client.
85 static RefCountedPtr<XdsClient> GetOrCreate(grpc_error** error);
87 // Callers should not instantiate directly. Use GetOrCreate() instead.
88 explicit XdsClient(grpc_error** error);
89 ~XdsClient() override;
91 grpc_pollset_set* interested_parties() const { return interested_parties_; }
93 // TODO(roth): When we add federation, there will be multiple channels
94 // inside the XdsClient, and the set of channels may change over time,
95 // but not every channel may use every one of the child channels, so
96 // this API will need to change. At minumum, we will need to hold a
97 // ref to the parent channelz node so that we can update its list of
98 // children as the set of xDS channels changes. However, we may also
99 // want to make this a bit more selective such that only those
100 // channels on which a given parent channel is actually requesting
101 // resources will actually be marked as its children.
102 void AddChannelzLinkage(channelz::ChannelNode* parent_channelz_node);
103 void RemoveChannelzLinkage(channelz::ChannelNode* parent_channelz_node);
105 void Orphan() override;
107 // Start and cancel listener data watch for a listener.
108 // The XdsClient takes ownership of the watcher, but the caller may
109 // keep a raw pointer to the watcher, which may be used only for
110 // cancellation. (Because the caller does not own the watcher, the
111 // pointer must not be used for any other purpose.)
112 // If the caller is going to start a new watch after cancelling the
113 // old one, it should set delay_unsubscription to true.
114 void WatchListenerData(absl::string_view listener_name,
115 std::unique_ptr<ListenerWatcherInterface> watcher);
116 void CancelListenerDataWatch(absl::string_view listener_name,
117 ListenerWatcherInterface* watcher,
118 bool delay_unsubscription = false);
120 // Start and cancel route config data watch for a listener.
121 // The XdsClient takes ownership of the watcher, but the caller may
122 // keep a raw pointer to the watcher, which may be used only for
123 // cancellation. (Because the caller does not own the watcher, the
124 // pointer must not be used for any other purpose.)
125 // If the caller is going to start a new watch after cancelling the
126 // old one, it should set delay_unsubscription to true.
127 void WatchRouteConfigData(
128 absl::string_view route_config_name,
129 std::unique_ptr<RouteConfigWatcherInterface> watcher);
130 void CancelRouteConfigDataWatch(absl::string_view route_config_name,
131 RouteConfigWatcherInterface* watcher,
132 bool delay_unsubscription = false);
134 // Start and cancel cluster data watch for a cluster.
135 // The XdsClient takes ownership of the watcher, but the caller may
136 // keep a raw pointer to the watcher, which may be used only for
137 // cancellation. (Because the caller does not own the watcher, the
138 // pointer must not be used for any other purpose.)
139 // If the caller is going to start a new watch after cancelling the
140 // old one, it should set delay_unsubscription to true.
141 void WatchClusterData(absl::string_view cluster_name,
142 std::unique_ptr<ClusterWatcherInterface> watcher);
143 void CancelClusterDataWatch(absl::string_view cluster_name,
144 ClusterWatcherInterface* watcher,
145 bool delay_unsubscription = false);
147 // Start and cancel endpoint data watch for a cluster.
148 // The XdsClient takes ownership of the watcher, but the caller may
149 // keep a raw pointer to the watcher, which may be used only for
150 // cancellation. (Because the caller does not own the watcher, the
151 // pointer must not be used for any other purpose.)
152 // If the caller is going to start a new watch after cancelling the
153 // old one, it should set delay_unsubscription to true.
154 void WatchEndpointData(absl::string_view eds_service_name,
155 std::unique_ptr<EndpointWatcherInterface> watcher);
156 void CancelEndpointDataWatch(absl::string_view eds_service_name,
157 EndpointWatcherInterface* watcher,
158 bool delay_unsubscription = false);
160 // Adds and removes drop stats for cluster_name and eds_service_name.
161 RefCountedPtr<XdsClusterDropStats> AddClusterDropStats(
162 absl::string_view lrs_server, absl::string_view cluster_name,
163 absl::string_view eds_service_name);
164 void RemoveClusterDropStats(absl::string_view /*lrs_server*/,
165 absl::string_view cluster_name,
166 absl::string_view eds_service_name,
167 XdsClusterDropStats* cluster_drop_stats);
169 // Adds and removes locality stats for cluster_name and eds_service_name
170 // for the specified locality.
171 RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats(
172 absl::string_view lrs_server, absl::string_view cluster_name,
173 absl::string_view eds_service_name,
174 RefCountedPtr<XdsLocalityName> locality);
175 void RemoveClusterLocalityStats(
176 absl::string_view /*lrs_server*/, absl::string_view cluster_name,
177 absl::string_view eds_service_name,
178 const RefCountedPtr<XdsLocalityName>& locality,
179 XdsClusterLocalityStats* cluster_locality_stats);
181 // Resets connection backoff state.
185 // Contains a channel to the xds server and all the data related to the
186 // channel. Holds a ref to the xds client object.
188 // Currently, there is only one ChannelState object per XdsClient
189 // object, and it has essentially the same lifetime. But in the
190 // future, when we add federation support, a single XdsClient may have
191 // multiple underlying channels to talk to different xDS servers.
192 class ChannelState : public InternallyRefCounted<ChannelState> {
194 template <typename T>
200 ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
201 grpc_channel* channel);
202 ~ChannelState() override;
204 void Orphan() override;
206 grpc_channel* channel() const { return channel_; }
207 XdsClient* xds_client() const { return xds_client_.get(); }
208 AdsCallState* ads_calld() const;
209 LrsCallState* lrs_calld() const;
211 void MaybeStartLrsCall();
214 bool HasActiveAdsCall() const;
216 void StartConnectivityWatchLocked();
217 void CancelConnectivityWatchLocked();
219 void Subscribe(const std::string& type_url, const std::string& name);
220 void Unsubscribe(const std::string& type_url, const std::string& name,
221 bool delay_unsubscription);
226 // The owning xds client.
227 WeakRefCountedPtr<XdsClient> xds_client_;
229 // The channel and its status.
230 grpc_channel* channel_;
231 bool shutting_down_ = false;
232 StateWatcher* watcher_ = nullptr;
234 // The retryable XDS calls.
235 OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_;
236 OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_;
239 struct ListenerState {
240 std::map<ListenerWatcherInterface*,
241 std::unique_ptr<ListenerWatcherInterface>>
243 // The latest data seen from LDS.
244 absl::optional<XdsApi::LdsUpdate> update;
247 struct RouteConfigState {
248 std::map<RouteConfigWatcherInterface*,
249 std::unique_ptr<RouteConfigWatcherInterface>>
251 // The latest data seen from RDS.
252 absl::optional<XdsApi::RdsUpdate> update;
255 struct ClusterState {
256 std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>>
258 // The latest data seen from CDS.
259 absl::optional<XdsApi::CdsUpdate> update;
262 struct EndpointState {
263 std::map<EndpointWatcherInterface*,
264 std::unique_ptr<EndpointWatcherInterface>>
266 // The latest data seen from EDS.
267 absl::optional<XdsApi::EdsUpdate> update;
270 struct LoadReportState {
271 struct LocalityState {
272 XdsClusterLocalityStats* locality_stats = nullptr;
273 XdsClusterLocalityStats::Snapshot deleted_locality_stats;
276 XdsClusterDropStats* drop_stats = nullptr;
277 XdsClusterDropStats::Snapshot deleted_drop_stats;
278 std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
279 XdsLocalityName::Less>
281 grpc_millis last_report_time = ExecCtx::Get()->Now();
284 // Sends an error notification to all watchers.
285 void NotifyOnErrorLocked(grpc_error* error);
287 XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(
288 bool send_all_clusters, const std::set<std::string>& clusters);
290 const grpc_millis request_timeout_;
291 grpc_pollset_set* interested_parties_;
292 std::unique_ptr<XdsBootstrap> bootstrap_;
297 // The channel for communicating with the xds server.
298 OrphanablePtr<ChannelState> chand_;
300 // One entry for each watched LDS resource.
301 std::map<std::string /*listener_name*/, ListenerState> listener_map_;
302 // One entry for each watched RDS resource.
303 std::map<std::string /*route_config_name*/, RouteConfigState>
305 // One entry for each watched CDS resource.
306 std::map<std::string /*cluster_name*/, ClusterState> cluster_map_;
307 // One entry for each watched EDS resource.
308 std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_;
312 std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
316 // Stores the most recent accepted resource version for each resource type.
317 std::map<std::string /*type*/, std::string /*version*/> resource_version_map_;
319 bool shutting_down_ = false;
323 void SetXdsChannelArgsForTest(grpc_channel_args* args);
324 void UnsetGlobalXdsClientForTest();
325 } // namespace internal
327 } // namespace grpc_core
329 #endif /* GRPC_CORE_EXT_XDS_XDS_CLIENT_H */