Imported Upstream version 1.33.1
[platform/upstream/grpc.git] / src / core / ext / xds / xds_client.h
1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #ifndef GRPC_CORE_EXT_XDS_XDS_CLIENT_H
18 #define GRPC_CORE_EXT_XDS_XDS_CLIENT_H
19
20 #include <grpc/support/port_platform.h>
21
22 #include <set>
23 #include <vector>
24
25 #include "absl/strings/string_view.h"
26 #include "absl/types/optional.h"
27
28 #include "src/core/ext/xds/xds_api.h"
29 #include "src/core/ext/xds/xds_bootstrap.h"
30 #include "src/core/ext/xds/xds_client_stats.h"
31 #include "src/core/lib/channel/channelz.h"
32 #include "src/core/lib/gprpp/dual_ref_counted.h"
33 #include "src/core/lib/gprpp/map.h"
34 #include "src/core/lib/gprpp/memory.h"
35 #include "src/core/lib/gprpp/orphanable.h"
36 #include "src/core/lib/gprpp/ref_counted.h"
37 #include "src/core/lib/gprpp/ref_counted_ptr.h"
38 #include "src/core/lib/gprpp/sync.h"
39
40 namespace grpc_core {
41
42 extern TraceFlag xds_client_trace;
43
44 class XdsClient : public DualRefCounted<XdsClient> {
45  public:
46   // Listener data watcher interface.  Implemented by callers.
47   class ListenerWatcherInterface {
48    public:
49     virtual ~ListenerWatcherInterface() = default;
50     virtual void OnListenerChanged(XdsApi::LdsUpdate listener) = 0;
51     virtual void OnError(grpc_error* error) = 0;
52     virtual void OnResourceDoesNotExist() = 0;
53   };
54
55   // RouteConfiguration data watcher interface.  Implemented by callers.
56   class RouteConfigWatcherInterface {
57    public:
58     virtual ~RouteConfigWatcherInterface() = default;
59     virtual void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) = 0;
60     virtual void OnError(grpc_error* error) = 0;
61     virtual void OnResourceDoesNotExist() = 0;
62   };
63
64   // Cluster data watcher interface.  Implemented by callers.
65   class ClusterWatcherInterface {
66    public:
67     virtual ~ClusterWatcherInterface() = default;
68     virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0;
69     virtual void OnError(grpc_error* error) = 0;
70     virtual void OnResourceDoesNotExist() = 0;
71   };
72
73   // Endpoint data watcher interface.  Implemented by callers.
74   class EndpointWatcherInterface {
75    public:
76     virtual ~EndpointWatcherInterface() = default;
77     virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0;
78     virtual void OnError(grpc_error* error) = 0;
79     virtual void OnResourceDoesNotExist() = 0;
80   };
81
82   // Factory function to get or create the global XdsClient instance.
83   // If *error is not GRPC_ERROR_NONE upon return, then there was
84   // an error initializing the client.
85   static RefCountedPtr<XdsClient> GetOrCreate(grpc_error** error);
86
87   // Callers should not instantiate directly.  Use GetOrCreate() instead.
88   explicit XdsClient(grpc_error** error);
89   ~XdsClient();
90
91   grpc_pollset_set* interested_parties() const { return interested_parties_; }
92
93   // TODO(roth): When we add federation, there will be multiple channels
94   // inside the XdsClient, and the set of channels may change over time,
95   // but not every channel may use every one of the child channels, so
96   // this API will need to change.  At minumum, we will need to hold a
97   // ref to the parent channelz node so that we can update its list of
98   // children as the set of xDS channels changes.  However, we may also
99   // want to make this a bit more selective such that only those
100   // channels on which a given parent channel is actually requesting
101   // resources will actually be marked as its children.
102   void AddChannelzLinkage(channelz::ChannelNode* parent_channelz_node);
103   void RemoveChannelzLinkage(channelz::ChannelNode* parent_channelz_node);
104
105   void Orphan() override;
106
107   // Start and cancel listener data watch for a listener.
108   // The XdsClient takes ownership of the watcher, but the caller may
109   // keep a raw pointer to the watcher, which may be used only for
110   // cancellation.  (Because the caller does not own the watcher, the
111   // pointer must not be used for any other purpose.)
112   // If the caller is going to start a new watch after cancelling the
113   // old one, it should set delay_unsubscription to true.
114   void WatchListenerData(absl::string_view listener_name,
115                          std::unique_ptr<ListenerWatcherInterface> watcher);
116   void CancelListenerDataWatch(absl::string_view listener_name,
117                                ListenerWatcherInterface* watcher,
118                                bool delay_unsubscription = false);
119
120   // Start and cancel route config data watch for a listener.
121   // The XdsClient takes ownership of the watcher, but the caller may
122   // keep a raw pointer to the watcher, which may be used only for
123   // cancellation.  (Because the caller does not own the watcher, the
124   // pointer must not be used for any other purpose.)
125   // If the caller is going to start a new watch after cancelling the
126   // old one, it should set delay_unsubscription to true.
127   void WatchRouteConfigData(
128       absl::string_view route_config_name,
129       std::unique_ptr<RouteConfigWatcherInterface> watcher);
130   void CancelRouteConfigDataWatch(absl::string_view route_config_name,
131                                   RouteConfigWatcherInterface* watcher,
132                                   bool delay_unsubscription = false);
133
134   // Start and cancel cluster data watch for a cluster.
135   // The XdsClient takes ownership of the watcher, but the caller may
136   // keep a raw pointer to the watcher, which may be used only for
137   // cancellation.  (Because the caller does not own the watcher, the
138   // pointer must not be used for any other purpose.)
139   // If the caller is going to start a new watch after cancelling the
140   // old one, it should set delay_unsubscription to true.
141   void WatchClusterData(absl::string_view cluster_name,
142                         std::unique_ptr<ClusterWatcherInterface> watcher);
143   void CancelClusterDataWatch(absl::string_view cluster_name,
144                               ClusterWatcherInterface* watcher,
145                               bool delay_unsubscription = false);
146
147   // Start and cancel endpoint data watch for a cluster.
148   // The XdsClient takes ownership of the watcher, but the caller may
149   // keep a raw pointer to the watcher, which may be used only for
150   // cancellation.  (Because the caller does not own the watcher, the
151   // pointer must not be used for any other purpose.)
152   // If the caller is going to start a new watch after cancelling the
153   // old one, it should set delay_unsubscription to true.
154   void WatchEndpointData(absl::string_view eds_service_name,
155                          std::unique_ptr<EndpointWatcherInterface> watcher);
156   void CancelEndpointDataWatch(absl::string_view eds_service_name,
157                                EndpointWatcherInterface* watcher,
158                                bool delay_unsubscription = false);
159
160   // Adds and removes drop stats for cluster_name and eds_service_name.
161   RefCountedPtr<XdsClusterDropStats> AddClusterDropStats(
162       absl::string_view lrs_server, absl::string_view cluster_name,
163       absl::string_view eds_service_name);
164   void RemoveClusterDropStats(absl::string_view /*lrs_server*/,
165                               absl::string_view cluster_name,
166                               absl::string_view eds_service_name,
167                               XdsClusterDropStats* cluster_drop_stats);
168
169   // Adds and removes locality stats for cluster_name and eds_service_name
170   // for the specified locality.
171   RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats(
172       absl::string_view lrs_server, absl::string_view cluster_name,
173       absl::string_view eds_service_name,
174       RefCountedPtr<XdsLocalityName> locality);
175   void RemoveClusterLocalityStats(
176       absl::string_view /*lrs_server*/, absl::string_view cluster_name,
177       absl::string_view eds_service_name,
178       const RefCountedPtr<XdsLocalityName>& locality,
179       XdsClusterLocalityStats* cluster_locality_stats);
180
181   // Resets connection backoff state.
182   void ResetBackoff();
183
184  private:
185   // Contains a channel to the xds server and all the data related to the
186   // channel.  Holds a ref to the xds client object.
187   //
188   // Currently, there is only one ChannelState object per XdsClient
189   // object, and it has essentially the same lifetime.  But in the
190   // future, when we add federation support, a single XdsClient may have
191   // multiple underlying channels to talk to different xDS servers.
192   class ChannelState : public InternallyRefCounted<ChannelState> {
193    public:
194     template <typename T>
195     class RetryableCall;
196
197     class AdsCallState;
198     class LrsCallState;
199
200     ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
201                  grpc_channel* channel);
202     ~ChannelState();
203
204     void Orphan() override;
205
206     grpc_channel* channel() const { return channel_; }
207     XdsClient* xds_client() const { return xds_client_.get(); }
208     AdsCallState* ads_calld() const;
209     LrsCallState* lrs_calld() const;
210
211     void MaybeStartLrsCall();
212     void StopLrsCall();
213
214     bool HasActiveAdsCall() const;
215
216     void StartConnectivityWatchLocked();
217     void CancelConnectivityWatchLocked();
218
219     void Subscribe(const std::string& type_url, const std::string& name);
220     void Unsubscribe(const std::string& type_url, const std::string& name,
221                      bool delay_unsubscription);
222
223    private:
224     class StateWatcher;
225
226     // The owning xds client.
227     WeakRefCountedPtr<XdsClient> xds_client_;
228
229     // The channel and its status.
230     grpc_channel* channel_;
231     bool shutting_down_ = false;
232     StateWatcher* watcher_ = nullptr;
233
234     // The retryable XDS calls.
235     OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_;
236     OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_;
237   };
238
239   struct ListenerState {
240     std::map<ListenerWatcherInterface*,
241              std::unique_ptr<ListenerWatcherInterface>>
242         watchers;
243     // The latest data seen from LDS.
244     absl::optional<XdsApi::LdsUpdate> update;
245   };
246
247   struct RouteConfigState {
248     std::map<RouteConfigWatcherInterface*,
249              std::unique_ptr<RouteConfigWatcherInterface>>
250         watchers;
251     // The latest data seen from RDS.
252     absl::optional<XdsApi::RdsUpdate> update;
253   };
254
255   struct ClusterState {
256     std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>>
257         watchers;
258     // The latest data seen from CDS.
259     absl::optional<XdsApi::CdsUpdate> update;
260   };
261
262   struct EndpointState {
263     std::map<EndpointWatcherInterface*,
264              std::unique_ptr<EndpointWatcherInterface>>
265         watchers;
266     // The latest data seen from EDS.
267     absl::optional<XdsApi::EdsUpdate> update;
268   };
269
270   // TODO(roth): Change this to store exactly one instance of
271   // XdsClusterDropStats and exactly one instance of
272   // XdsClusterLocalityStats per locality.  We can return multiple refs
273   // to the same object instead of registering multiple objects.
274   struct LoadReportState {
275     struct LocalityState {
276       std::set<XdsClusterLocalityStats*> locality_stats;
277       std::vector<XdsClusterLocalityStats::Snapshot> deleted_locality_stats;
278     };
279
280     std::set<XdsClusterDropStats*> drop_stats;
281     XdsClusterDropStats::Snapshot deleted_drop_stats;
282     std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
283              XdsLocalityName::Less>
284         locality_stats;
285     grpc_millis last_report_time = ExecCtx::Get()->Now();
286   };
287
288   // Sends an error notification to all watchers.
289   void NotifyOnErrorLocked(grpc_error* error);
290
291   XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(
292       bool send_all_clusters, const std::set<std::string>& clusters);
293
294   const grpc_millis request_timeout_;
295   grpc_pollset_set* interested_parties_;
296   std::unique_ptr<XdsBootstrap> bootstrap_;
297   XdsApi api_;
298
299   Mutex mu_;
300
301   // The channel for communicating with the xds server.
302   OrphanablePtr<ChannelState> chand_;
303
304   // One entry for each watched LDS resource.
305   std::map<std::string /*listener_name*/, ListenerState> listener_map_;
306   // One entry for each watched RDS resource.
307   std::map<std::string /*route_config_name*/, RouteConfigState>
308       route_config_map_;
309   // One entry for each watched CDS resource.
310   std::map<std::string /*cluster_name*/, ClusterState> cluster_map_;
311   // One entry for each watched EDS resource.
312   std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_;
313
314   // Load report data.
315   std::map<
316       std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
317       LoadReportState>
318       load_report_map_;
319
320   bool shutting_down_ = false;
321 };
322
323 namespace internal {
324 void SetXdsChannelArgsForTest(grpc_channel_args* args);
325 void UnsetGlobalXdsClientForTest();
326 }  // namespace internal
327
328 }  // namespace grpc_core
329
330 #endif /* GRPC_CORE_EXT_XDS_XDS_CLIENT_H */