Imported Upstream version 1.36.0
[platform/upstream/grpc.git] / src / core / ext / xds / xds_server_config_fetcher.cc
1 //
2 //
3 // Copyright 2020 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/xds/xds_certificate_provider.h"
22 #include "src/core/ext/xds/xds_client.h"
23 #include "src/core/lib/channel/channel_args.h"
24 #include "src/core/lib/security/credentials/xds/xds_credentials.h"
25 #include "src/core/lib/surface/api_trace.h"
26 #include "src/core/lib/surface/server.h"
27
28 namespace grpc_core {
29
30 TraceFlag grpc_xds_server_config_fetcher_trace(false,
31                                                "xds_server_config_fetcher");
32
33 namespace {
34
35 class XdsServerConfigFetcher : public grpc_server_config_fetcher {
36  public:
37   explicit XdsServerConfigFetcher(RefCountedPtr<XdsClient> xds_client)
38       : xds_client_(std::move(xds_client)) {
39     GPR_ASSERT(xds_client_ != nullptr);
40   }
41
42   void StartWatch(std::string listening_address, grpc_channel_args* args,
43                   std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
44                       watcher) override {
45     grpc_server_config_fetcher::WatcherInterface* watcher_ptr = watcher.get();
46     auto listener_watcher = absl::make_unique<ListenerWatcher>(
47         std::move(watcher), args, xds_client_);
48     auto* listener_watcher_ptr = listener_watcher.get();
49     // TODO(yashykt): Get the resource name id from bootstrap
50     listening_address = absl::StrCat(
51         "grpc/server?xds.resource.listening_address=", listening_address);
52     xds_client_->WatchListenerData(listening_address,
53                                    std::move(listener_watcher));
54     MutexLock lock(&mu_);
55     auto& watcher_state = watchers_[watcher_ptr];
56     watcher_state.listening_address = listening_address;
57     watcher_state.listener_watcher = listener_watcher_ptr;
58   }
59
60   void CancelWatch(
61       grpc_server_config_fetcher::WatcherInterface* watcher) override {
62     MutexLock lock(&mu_);
63     auto it = watchers_.find(watcher);
64     if (it != watchers_.end()) {
65       // Cancel the watch on the listener before erasing
66       xds_client_->CancelListenerDataWatch(it->second.listening_address,
67                                            it->second.listener_watcher,
68                                            false /* delay_unsubscription */);
69       watchers_.erase(it);
70     }
71   }
72
73   // Return the interested parties from the xds client so that it can be polled.
74   grpc_pollset_set* interested_parties() override {
75     return xds_client_->interested_parties();
76   }
77
78  private:
79   class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
80    public:
81     explicit ListenerWatcher(
82         std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
83             server_config_watcher,
84         grpc_channel_args* args, RefCountedPtr<XdsClient> xds_client)
85         : server_config_watcher_(std::move(server_config_watcher)),
86           args_(args),
87           xds_client_(std::move(xds_client)) {}
88
89     ~ListenerWatcher() override { grpc_channel_args_destroy(args_); }
90
91     // Deleted due to special handling required for args_. Copy the channel args
92     // if we ever need these.
93     ListenerWatcher(const ListenerWatcher&) = delete;
94     ListenerWatcher& operator=(const ListenerWatcher&) = delete;
95
96     void OnListenerChanged(XdsApi::LdsUpdate listener) override {
97       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_server_config_fetcher_trace)) {
98         gpr_log(
99             GPR_INFO,
100             "[ListenerWatcher %p] Received LDS update from xds client %p: %s",
101             this, xds_client_.get(), listener.ToString().c_str());
102       }
103       grpc_error* error = GRPC_ERROR_NONE;
104       bool update_needed = UpdateXdsCertificateProvider(listener, &error);
105       if (error != GRPC_ERROR_NONE) {
106         OnError(error);
107         return;
108       }
109       // Only send an update, if something changed.
110       if (updated_once_ && !update_needed) {
111         return;
112       }
113       updated_once_ = true;
114       grpc_channel_args* updated_args = nullptr;
115       if (xds_certificate_provider_ != nullptr) {
116         grpc_arg arg_to_add = xds_certificate_provider_->MakeChannelArg();
117         updated_args = grpc_channel_args_copy_and_add(args_, &arg_to_add, 1);
118       } else {
119         updated_args = grpc_channel_args_copy(args_);
120       }
121       server_config_watcher_->UpdateConfig(updated_args);
122     }
123
124     void OnError(grpc_error* error) override {
125       gpr_log(GPR_ERROR, "ListenerWatcher:%p XdsClient reports error: %s", this,
126               grpc_error_string(error));
127       GRPC_ERROR_UNREF(error);
128       // TODO(yashykt): We might want to bubble this error to the application.
129     }
130
131     void OnResourceDoesNotExist() override {
132       gpr_log(GPR_ERROR,
133               "ListenerWatcher:%p XdsClient reports requested listener does "
134               "not exist",
135               this);
136       // TODO(yashykt): We might want to bubble this error to the application.
137     }
138
139    private:
140     // Returns true if the xds certificate provider changed in a way that
141     // required a new security connector to be created, false otherwise.
142     bool UpdateXdsCertificateProvider(const XdsApi::LdsUpdate& listener,
143                                       grpc_error** error) {
144       // Early out if channel is not configured to use xDS security.
145       grpc_server_credentials* server_creds =
146           grpc_find_server_credentials_in_args(args_);
147       if (server_creds == nullptr ||
148           server_creds->type() != kCredentialsTypeXds) {
149         xds_certificate_provider_ = nullptr;
150         return false;
151       }
152       if (xds_certificate_provider_ == nullptr) {
153         xds_certificate_provider_ = MakeRefCounted<XdsCertificateProvider>();
154       }
155       // Configure root cert.
156       absl::string_view root_provider_instance_name =
157           listener.downstream_tls_context.common_tls_context
158               .combined_validation_context
159               .validation_context_certificate_provider_instance.instance_name;
160       absl::string_view root_provider_cert_name =
161           listener.downstream_tls_context.common_tls_context
162               .combined_validation_context
163               .validation_context_certificate_provider_instance
164               .certificate_name;
165       RefCountedPtr<grpc_tls_certificate_provider> new_root_provider;
166       if (!root_provider_instance_name.empty()) {
167         new_root_provider =
168             xds_client_->certificate_provider_store()
169                 .CreateOrGetCertificateProvider(root_provider_instance_name);
170         if (new_root_provider == nullptr) {
171           *error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
172               absl::StrCat("Certificate provider instance name: \"",
173                            root_provider_instance_name, "\" not recognized.")
174                   .c_str());
175           return false;
176         }
177       }
178       // Configure identity cert.
179       absl::string_view identity_provider_instance_name =
180           listener.downstream_tls_context.common_tls_context
181               .tls_certificate_certificate_provider_instance.instance_name;
182       absl::string_view identity_provider_cert_name =
183           listener.downstream_tls_context.common_tls_context
184               .tls_certificate_certificate_provider_instance.certificate_name;
185       RefCountedPtr<grpc_tls_certificate_provider> new_identity_provider;
186       if (!identity_provider_instance_name.empty()) {
187         new_identity_provider = xds_client_->certificate_provider_store()
188                                     .CreateOrGetCertificateProvider(
189                                         identity_provider_instance_name);
190         if (new_identity_provider == nullptr) {
191           *error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
192               absl::StrCat("Certificate provider instance name: \"",
193                            identity_provider_instance_name,
194                            "\" not recognized.")
195                   .c_str());
196           return false;
197         }
198       }
199       bool security_connector_update_required = false;
200       if (((new_root_provider == nullptr) !=
201            (root_certificate_provider_ == nullptr)) ||
202           ((new_identity_provider == nullptr) !=
203            (identity_certificate_provider_ == nullptr)) ||
204           (listener.downstream_tls_context.require_client_certificate !=
205            xds_certificate_provider_->GetRequireClientCertificate(""))) {
206         security_connector_update_required = true;
207       }
208       if (root_certificate_provider_ != new_root_provider) {
209         root_certificate_provider_ = std::move(new_root_provider);
210       }
211       if (identity_certificate_provider_ != new_identity_provider) {
212         identity_certificate_provider_ = std::move(new_identity_provider);
213       }
214       xds_certificate_provider_->UpdateRootCertNameAndDistributor(
215           "", root_provider_cert_name,
216           root_certificate_provider_ == nullptr
217               ? nullptr
218               : root_certificate_provider_->distributor());
219       xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(
220           "", identity_provider_cert_name,
221           identity_certificate_provider_ == nullptr
222               ? nullptr
223               : identity_certificate_provider_->distributor());
224       xds_certificate_provider_->UpdateRequireClientCertificate(
225           "", listener.downstream_tls_context.require_client_certificate);
226       return security_connector_update_required;
227     }
228
229     std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
230         server_config_watcher_;
231     grpc_channel_args* args_;
232     RefCountedPtr<XdsClient> xds_client_;
233     RefCountedPtr<grpc_tls_certificate_provider> root_certificate_provider_;
234     RefCountedPtr<grpc_tls_certificate_provider> identity_certificate_provider_;
235     RefCountedPtr<XdsCertificateProvider> xds_certificate_provider_;
236     bool updated_once_ = false;
237   };
238
239   struct WatcherState {
240     std::string listening_address;
241     ListenerWatcher* listener_watcher = nullptr;
242   };
243
244   RefCountedPtr<XdsClient> xds_client_;
245   Mutex mu_;
246   std::map<grpc_server_config_fetcher::WatcherInterface*, WatcherState>
247       watchers_;
248 };
249
250 }  // namespace
251 }  // namespace grpc_core
252
253 grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create() {
254   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
255   grpc_core::ExecCtx exec_ctx;
256   GRPC_API_TRACE("grpc_server_config_fetcher_xds_create()", 0, ());
257   grpc_error* error = GRPC_ERROR_NONE;
258   grpc_core::RefCountedPtr<grpc_core::XdsClient> xds_client =
259       grpc_core::XdsClient::GetOrCreate(&error);
260   if (error != GRPC_ERROR_NONE) {
261     gpr_log(GPR_ERROR, "Failed to create xds client: %s",
262             grpc_error_string(error));
263     GRPC_ERROR_UNREF(error);
264     return nullptr;
265   }
266   return new grpc_core::XdsServerConfigFetcher(std::move(xds_client));
267 }