2 // Copyright 2019 gRPC authors.
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 #ifndef GRPC_CORE_EXT_XDS_XDS_CLIENT_H
18 #define GRPC_CORE_EXT_XDS_XDS_CLIENT_H
20 #include <grpc/support/port_platform.h>
24 #include "absl/strings/string_view.h"
25 #include "absl/types/optional.h"
27 #include "src/core/ext/xds/xds_api.h"
28 #include "src/core/ext/xds/xds_bootstrap.h"
29 #include "src/core/ext/xds/xds_client_stats.h"
30 #include "src/core/lib/gprpp/map.h"
31 #include "src/core/lib/gprpp/memory.h"
32 #include "src/core/lib/gprpp/orphanable.h"
33 #include "src/core/lib/gprpp/ref_counted.h"
34 #include "src/core/lib/gprpp/ref_counted_ptr.h"
35 #include "src/core/lib/iomgr/work_serializer.h"
39 extern TraceFlag xds_client_trace;
41 class XdsClient : public InternallyRefCounted<XdsClient> {
43 // Listener data watcher interface. Implemented by callers.
44 class ListenerWatcherInterface {
46 virtual ~ListenerWatcherInterface() = default;
48 virtual void OnListenerChanged(std::vector<XdsApi::Route> routes) = 0;
50 virtual void OnError(grpc_error* error) = 0;
52 virtual void OnResourceDoesNotExist() = 0;
55 // Cluster data watcher interface. Implemented by callers.
56 class ClusterWatcherInterface {
58 virtual ~ClusterWatcherInterface() = default;
60 virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0;
62 virtual void OnError(grpc_error* error) = 0;
64 virtual void OnResourceDoesNotExist() = 0;
67 // Endpoint data watcher interface. Implemented by callers.
68 class EndpointWatcherInterface {
70 virtual ~EndpointWatcherInterface() = default;
72 virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0;
74 virtual void OnError(grpc_error* error) = 0;
76 virtual void OnResourceDoesNotExist() = 0;
79 // If *error is not GRPC_ERROR_NONE after construction, then there was
80 // an error initializing the client.
81 XdsClient(std::shared_ptr<WorkSerializer> work_serializer,
82 grpc_pollset_set* interested_parties, absl::string_view server_name,
83 std::unique_ptr<ListenerWatcherInterface> watcher,
84 const grpc_channel_args& channel_args, grpc_error** error);
87 void Orphan() override;
89 // Start and cancel cluster data watch for a cluster.
90 // The XdsClient takes ownership of the watcher, but the caller may
91 // keep a raw pointer to the watcher, which may be used only for
92 // cancellation. (Because the caller does not own the watcher, the
93 // pointer must not be used for any other purpose.)
94 // If the caller is going to start a new watch after cancelling the
95 // old one, it should set delay_unsubscription to true.
96 void WatchClusterData(absl::string_view cluster_name,
97 std::unique_ptr<ClusterWatcherInterface> watcher);
98 void CancelClusterDataWatch(absl::string_view cluster_name,
99 ClusterWatcherInterface* watcher,
100 bool delay_unsubscription = false);
102 // Start and cancel endpoint data watch for a cluster.
103 // The XdsClient takes ownership of the watcher, but the caller may
104 // keep a raw pointer to the watcher, which may be used only for
105 // cancellation. (Because the caller does not own the watcher, the
106 // pointer must not be used for any other purpose.)
107 // If the caller is going to start a new watch after cancelling the
108 // old one, it should set delay_unsubscription to true.
109 void WatchEndpointData(absl::string_view eds_service_name,
110 std::unique_ptr<EndpointWatcherInterface> watcher);
111 void CancelEndpointDataWatch(absl::string_view eds_service_name,
112 EndpointWatcherInterface* watcher,
113 bool delay_unsubscription = false);
115 // Adds and removes drop stats for cluster_name and eds_service_name.
116 RefCountedPtr<XdsClusterDropStats> AddClusterDropStats(
117 absl::string_view lrs_server, absl::string_view cluster_name,
118 absl::string_view eds_service_name);
119 void RemoveClusterDropStats(absl::string_view /*lrs_server*/,
120 absl::string_view cluster_name,
121 absl::string_view eds_service_name,
122 XdsClusterDropStats* cluster_drop_stats);
124 // Adds and removes locality stats for cluster_name and eds_service_name
125 // for the specified locality.
126 RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats(
127 absl::string_view lrs_server, absl::string_view cluster_name,
128 absl::string_view eds_service_name,
129 RefCountedPtr<XdsLocalityName> locality);
130 void RemoveClusterLocalityStats(
131 absl::string_view /*lrs_server*/, absl::string_view cluster_name,
132 absl::string_view eds_service_name,
133 const RefCountedPtr<XdsLocalityName>& locality,
134 XdsClusterLocalityStats* cluster_locality_stats);
136 // Resets connection backoff state.
139 // Helpers for encoding the XdsClient object in channel args.
140 grpc_arg MakeChannelArg() const;
141 static RefCountedPtr<XdsClient> GetFromChannelArgs(
142 const grpc_channel_args& args);
143 static grpc_channel_args* RemoveFromChannelArgs(
144 const grpc_channel_args& args);
147 // Contains a channel to the xds server and all the data related to the
148 // channel. Holds a ref to the xds client object.
149 // TODO(roth): This is separate from the XdsClient object because it was
150 // originally designed to be able to swap itself out in case the
151 // balancer name changed. Now that the balancer name is going to be
152 // coming from the bootstrap file, we don't really need this level of
153 // indirection unless we decide to support watching the bootstrap file
154 // for changes. At some point, if we decide that we're never going to
155 // need to do that, then we can eliminate this class and move its
156 // contents directly into the XdsClient class.
157 class ChannelState : public InternallyRefCounted<ChannelState> {
159 template <typename T>
165 ChannelState(RefCountedPtr<XdsClient> xds_client, grpc_channel* channel);
168 void Orphan() override;
170 grpc_channel* channel() const { return channel_; }
171 XdsClient* xds_client() const { return xds_client_.get(); }
172 AdsCallState* ads_calld() const;
173 LrsCallState* lrs_calld() const;
175 void MaybeStartLrsCall();
178 bool HasActiveAdsCall() const;
180 void StartConnectivityWatchLocked();
181 void CancelConnectivityWatchLocked();
183 void Subscribe(const std::string& type_url, const std::string& name);
184 void Unsubscribe(const std::string& type_url, const std::string& name,
185 bool delay_unsubscription);
190 // The owning xds client.
191 RefCountedPtr<XdsClient> xds_client_;
193 // The channel and its status.
194 grpc_channel* channel_;
195 bool shutting_down_ = false;
196 StateWatcher* watcher_ = nullptr;
198 // The retryable XDS calls.
199 OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_;
200 OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_;
203 struct ClusterState {
204 std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>>
206 // The latest data seen from CDS.
207 absl::optional<XdsApi::CdsUpdate> update;
210 struct EndpointState {
211 std::map<EndpointWatcherInterface*,
212 std::unique_ptr<EndpointWatcherInterface>>
214 // The latest data seen from EDS.
215 absl::optional<XdsApi::EdsUpdate> update;
218 struct LoadReportState {
219 struct LocalityState {
220 std::set<XdsClusterLocalityStats*> locality_stats;
221 std::vector<XdsClusterLocalityStats::Snapshot> deleted_locality_stats;
224 std::set<XdsClusterDropStats*> drop_stats;
225 XdsClusterDropStats::DroppedRequestsMap deleted_drop_stats;
226 std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
227 XdsLocalityName::Less>
229 grpc_millis last_report_time = ExecCtx::Get()->Now();
232 // Sends an error notification to all watchers.
233 void NotifyOnError(grpc_error* error);
235 XdsApi::ClusterLoadReportMap BuildLoadReportSnapshot(
236 bool send_all_clusters, const std::set<std::string>& clusters);
238 // Channel arg vtable functions.
239 static void* ChannelArgCopy(void* p);
240 static void ChannelArgDestroy(void* p);
241 static int ChannelArgCmp(void* p, void* q);
243 static const grpc_arg_pointer_vtable kXdsClientVtable;
245 const grpc_millis request_timeout_;
247 std::shared_ptr<WorkSerializer> work_serializer_;
248 grpc_pollset_set* interested_parties_;
250 std::unique_ptr<XdsBootstrap> bootstrap_;
253 const std::string server_name_;
254 std::unique_ptr<ListenerWatcherInterface> listener_watcher_;
256 // The channel for communicating with the xds server.
257 OrphanablePtr<ChannelState> chand_;
259 absl::optional<XdsApi::LdsUpdate> lds_result_;
260 absl::optional<XdsApi::RdsUpdate> rds_result_;
262 // One entry for each watched CDS resource.
263 std::map<std::string /*cluster_name*/, ClusterState> cluster_map_;
264 // One entry for each watched EDS resource.
265 std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_;
267 std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
271 bool shutting_down_ = false;
274 } // namespace grpc_core
276 #endif /* GRPC_CORE_EXT_XDS_XDS_CLIENT_H */