Imported Upstream version 1.32.0
[platform/upstream/grpc.git] / src / core / ext / xds / xds_client.h
1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #ifndef GRPC_CORE_EXT_XDS_XDS_CLIENT_H
18 #define GRPC_CORE_EXT_XDS_XDS_CLIENT_H
19
20 #include <grpc/support/port_platform.h>
21
22 #include <set>
23
24 #include "absl/strings/string_view.h"
25 #include "absl/types/optional.h"
26
27 #include "src/core/ext/xds/xds_api.h"
28 #include "src/core/ext/xds/xds_bootstrap.h"
29 #include "src/core/ext/xds/xds_client_stats.h"
30 #include "src/core/lib/gprpp/map.h"
31 #include "src/core/lib/gprpp/memory.h"
32 #include "src/core/lib/gprpp/orphanable.h"
33 #include "src/core/lib/gprpp/ref_counted.h"
34 #include "src/core/lib/gprpp/ref_counted_ptr.h"
35 #include "src/core/lib/iomgr/work_serializer.h"
36
37 namespace grpc_core {
38
39 extern TraceFlag xds_client_trace;
40
41 class XdsClient : public InternallyRefCounted<XdsClient> {
42  public:
43   // Listener data watcher interface.  Implemented by callers.
44   class ListenerWatcherInterface {
45    public:
46     virtual ~ListenerWatcherInterface() = default;
47
48     virtual void OnListenerChanged(std::vector<XdsApi::Route> routes) = 0;
49
50     virtual void OnError(grpc_error* error) = 0;
51
52     virtual void OnResourceDoesNotExist() = 0;
53   };
54
55   // Cluster data watcher interface.  Implemented by callers.
56   class ClusterWatcherInterface {
57    public:
58     virtual ~ClusterWatcherInterface() = default;
59
60     virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0;
61
62     virtual void OnError(grpc_error* error) = 0;
63
64     virtual void OnResourceDoesNotExist() = 0;
65   };
66
67   // Endpoint data watcher interface.  Implemented by callers.
68   class EndpointWatcherInterface {
69    public:
70     virtual ~EndpointWatcherInterface() = default;
71
72     virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0;
73
74     virtual void OnError(grpc_error* error) = 0;
75
76     virtual void OnResourceDoesNotExist() = 0;
77   };
78
79   // If *error is not GRPC_ERROR_NONE after construction, then there was
80   // an error initializing the client.
81   XdsClient(std::shared_ptr<WorkSerializer> work_serializer,
82             grpc_pollset_set* interested_parties, absl::string_view server_name,
83             std::unique_ptr<ListenerWatcherInterface> watcher,
84             const grpc_channel_args& channel_args, grpc_error** error);
85   ~XdsClient();
86
87   void Orphan() override;
88
89   // Start and cancel cluster data watch for a cluster.
90   // The XdsClient takes ownership of the watcher, but the caller may
91   // keep a raw pointer to the watcher, which may be used only for
92   // cancellation.  (Because the caller does not own the watcher, the
93   // pointer must not be used for any other purpose.)
94   // If the caller is going to start a new watch after cancelling the
95   // old one, it should set delay_unsubscription to true.
96   void WatchClusterData(absl::string_view cluster_name,
97                         std::unique_ptr<ClusterWatcherInterface> watcher);
98   void CancelClusterDataWatch(absl::string_view cluster_name,
99                               ClusterWatcherInterface* watcher,
100                               bool delay_unsubscription = false);
101
102   // Start and cancel endpoint data watch for a cluster.
103   // The XdsClient takes ownership of the watcher, but the caller may
104   // keep a raw pointer to the watcher, which may be used only for
105   // cancellation.  (Because the caller does not own the watcher, the
106   // pointer must not be used for any other purpose.)
107   // If the caller is going to start a new watch after cancelling the
108   // old one, it should set delay_unsubscription to true.
109   void WatchEndpointData(absl::string_view eds_service_name,
110                          std::unique_ptr<EndpointWatcherInterface> watcher);
111   void CancelEndpointDataWatch(absl::string_view eds_service_name,
112                                EndpointWatcherInterface* watcher,
113                                bool delay_unsubscription = false);
114
115   // Adds and removes drop stats for cluster_name and eds_service_name.
116   RefCountedPtr<XdsClusterDropStats> AddClusterDropStats(
117       absl::string_view lrs_server, absl::string_view cluster_name,
118       absl::string_view eds_service_name);
119   void RemoveClusterDropStats(absl::string_view /*lrs_server*/,
120                               absl::string_view cluster_name,
121                               absl::string_view eds_service_name,
122                               XdsClusterDropStats* cluster_drop_stats);
123
124   // Adds and removes locality stats for cluster_name and eds_service_name
125   // for the specified locality.
126   RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats(
127       absl::string_view lrs_server, absl::string_view cluster_name,
128       absl::string_view eds_service_name,
129       RefCountedPtr<XdsLocalityName> locality);
130   void RemoveClusterLocalityStats(
131       absl::string_view /*lrs_server*/, absl::string_view cluster_name,
132       absl::string_view eds_service_name,
133       const RefCountedPtr<XdsLocalityName>& locality,
134       XdsClusterLocalityStats* cluster_locality_stats);
135
136   // Resets connection backoff state.
137   void ResetBackoff();
138
139   // Helpers for encoding the XdsClient object in channel args.
140   grpc_arg MakeChannelArg() const;
141   static RefCountedPtr<XdsClient> GetFromChannelArgs(
142       const grpc_channel_args& args);
143   static grpc_channel_args* RemoveFromChannelArgs(
144       const grpc_channel_args& args);
145
146  private:
147   // Contains a channel to the xds server and all the data related to the
148   // channel.  Holds a ref to the xds client object.
149   // TODO(roth): This is separate from the XdsClient object because it was
150   // originally designed to be able to swap itself out in case the
151   // balancer name changed.  Now that the balancer name is going to be
152   // coming from the bootstrap file, we don't really need this level of
153   // indirection unless we decide to support watching the bootstrap file
154   // for changes.  At some point, if we decide that we're never going to
155   // need to do that, then we can eliminate this class and move its
156   // contents directly into the XdsClient class.
157   class ChannelState : public InternallyRefCounted<ChannelState> {
158    public:
159     template <typename T>
160     class RetryableCall;
161
162     class AdsCallState;
163     class LrsCallState;
164
165     ChannelState(RefCountedPtr<XdsClient> xds_client, grpc_channel* channel);
166     ~ChannelState();
167
168     void Orphan() override;
169
170     grpc_channel* channel() const { return channel_; }
171     XdsClient* xds_client() const { return xds_client_.get(); }
172     AdsCallState* ads_calld() const;
173     LrsCallState* lrs_calld() const;
174
175     void MaybeStartLrsCall();
176     void StopLrsCall();
177
178     bool HasActiveAdsCall() const;
179
180     void StartConnectivityWatchLocked();
181     void CancelConnectivityWatchLocked();
182
183     void Subscribe(const std::string& type_url, const std::string& name);
184     void Unsubscribe(const std::string& type_url, const std::string& name,
185                      bool delay_unsubscription);
186
187    private:
188     class StateWatcher;
189
190     // The owning xds client.
191     RefCountedPtr<XdsClient> xds_client_;
192
193     // The channel and its status.
194     grpc_channel* channel_;
195     bool shutting_down_ = false;
196     StateWatcher* watcher_ = nullptr;
197
198     // The retryable XDS calls.
199     OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_;
200     OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_;
201   };
202
203   struct ClusterState {
204     std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>>
205         watchers;
206     // The latest data seen from CDS.
207     absl::optional<XdsApi::CdsUpdate> update;
208   };
209
210   struct EndpointState {
211     std::map<EndpointWatcherInterface*,
212              std::unique_ptr<EndpointWatcherInterface>>
213         watchers;
214     // The latest data seen from EDS.
215     absl::optional<XdsApi::EdsUpdate> update;
216   };
217
218   struct LoadReportState {
219     struct LocalityState {
220       std::set<XdsClusterLocalityStats*> locality_stats;
221       std::vector<XdsClusterLocalityStats::Snapshot> deleted_locality_stats;
222     };
223
224     std::set<XdsClusterDropStats*> drop_stats;
225     XdsClusterDropStats::DroppedRequestsMap deleted_drop_stats;
226     std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
227              XdsLocalityName::Less>
228         locality_stats;
229     grpc_millis last_report_time = ExecCtx::Get()->Now();
230   };
231
232   // Sends an error notification to all watchers.
233   void NotifyOnError(grpc_error* error);
234
235   XdsApi::ClusterLoadReportMap BuildLoadReportSnapshot(
236       bool send_all_clusters, const std::set<std::string>& clusters);
237
238   // Channel arg vtable functions.
239   static void* ChannelArgCopy(void* p);
240   static void ChannelArgDestroy(void* p);
241   static int ChannelArgCmp(void* p, void* q);
242
243   static const grpc_arg_pointer_vtable kXdsClientVtable;
244
245   const grpc_millis request_timeout_;
246
247   std::shared_ptr<WorkSerializer> work_serializer_;
248   grpc_pollset_set* interested_parties_;
249
250   std::unique_ptr<XdsBootstrap> bootstrap_;
251   XdsApi api_;
252
253   const std::string server_name_;
254   std::unique_ptr<ListenerWatcherInterface> listener_watcher_;
255
256   // The channel for communicating with the xds server.
257   OrphanablePtr<ChannelState> chand_;
258
259   absl::optional<XdsApi::LdsUpdate> lds_result_;
260   absl::optional<XdsApi::RdsUpdate> rds_result_;
261
262   // One entry for each watched CDS resource.
263   std::map<std::string /*cluster_name*/, ClusterState> cluster_map_;
264   // One entry for each watched EDS resource.
265   std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_;
266   std::map<
267       std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
268       LoadReportState>
269       load_report_map_;
270
271   bool shutting_down_ = false;
272 };
273
274 }  // namespace grpc_core
275
276 #endif /* GRPC_CORE_EXT_XDS_XDS_CLIENT_H */