Imported Upstream version 1.41.0
[platform/upstream/grpc.git] / src / cpp / server / server_builder.cc
1 /*
2  *
3  * Copyright 2015-2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <utility>
20
21 #include <grpc/support/cpu.h>
22 #include <grpc/support/log.h>
23 #include <grpcpp/impl/service_type.h>
24 #include <grpcpp/resource_quota.h>
25 #include <grpcpp/server.h>
26 #include <grpcpp/server_builder.h>
27
28 #include "src/core/lib/channel/channel_args.h"
29 #include "src/core/lib/gpr/string.h"
30 #include "src/core/lib/gpr/useful.h"
31 #include "src/cpp/server/external_connection_acceptor_impl.h"
32 #include "src/cpp/server/thread_pool_interface.h"
33
34 namespace grpc {
35
36 static std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>*
37     g_plugin_factory_list;
38 static gpr_once once_init_plugin_list = GPR_ONCE_INIT;
39
40 static void do_plugin_list_init(void) {
41   g_plugin_factory_list =
42       new std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>();
43 }
44
45 ServerBuilder::ServerBuilder()
46     : max_receive_message_size_(INT_MIN),
47       max_send_message_size_(INT_MIN),
48       sync_server_settings_(SyncServerSettings()),
49       resource_quota_(nullptr) {
50   gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
51   for (const auto& value : *g_plugin_factory_list) {
52     plugins_.emplace_back(value());
53   }
54
55   // all compression algorithms enabled by default.
56   enabled_compression_algorithms_bitset_ =
57       (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
58   memset(&maybe_default_compression_level_, 0,
59          sizeof(maybe_default_compression_level_));
60   memset(&maybe_default_compression_algorithm_, 0,
61          sizeof(maybe_default_compression_algorithm_));
62 }
63
64 ServerBuilder::~ServerBuilder() {
65   if (resource_quota_ != nullptr) {
66     grpc_resource_quota_unref(resource_quota_);
67   }
68 }
69
70 std::unique_ptr<grpc::ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
71     bool is_frequently_polled) {
72   grpc::ServerCompletionQueue* cq = new grpc::ServerCompletionQueue(
73       GRPC_CQ_NEXT,
74       is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING,
75       nullptr);
76   cqs_.push_back(cq);
77   return std::unique_ptr<grpc::ServerCompletionQueue>(cq);
78 }
79
80 ServerBuilder& ServerBuilder::RegisterService(Service* service) {
81   services_.emplace_back(new NamedService(service));
82   return *this;
83 }
84
85 ServerBuilder& ServerBuilder::RegisterService(const std::string& host,
86                                               Service* service) {
87   services_.emplace_back(new NamedService(host, service));
88   return *this;
89 }
90
91 ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
92     AsyncGenericService* service) {
93   if (generic_service_ || callback_generic_service_) {
94     gpr_log(GPR_ERROR,
95             "Adding multiple generic services is unsupported for now. "
96             "Dropping the service %p",
97             service);
98   } else {
99     generic_service_ = service;
100   }
101   return *this;
102 }
103
104 ServerBuilder& ServerBuilder::RegisterCallbackGenericService(
105     CallbackGenericService* service) {
106   if (generic_service_ || callback_generic_service_) {
107     gpr_log(GPR_ERROR,
108             "Adding multiple generic services is unsupported for now. "
109             "Dropping the service %p",
110             service);
111   } else {
112     callback_generic_service_ = service;
113   }
114   return *this;
115 }
116
117 ServerBuilder& ServerBuilder::SetContextAllocator(
118     std::unique_ptr<grpc::ContextAllocator> context_allocator) {
119   context_allocator_ = std::move(context_allocator);
120   return *this;
121 }
122
123 std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor>
124 ServerBuilder::experimental_type::AddExternalConnectionAcceptor(
125     experimental_type::ExternalConnectionType type,
126     std::shared_ptr<ServerCredentials> creds) {
127   std::string name_prefix("external:");
128   char count_str[GPR_LTOA_MIN_BUFSIZE];
129   gpr_ltoa(static_cast<long>(builder_->acceptors_.size()), count_str);
130   builder_->acceptors_.emplace_back(
131       std::make_shared<grpc::internal::ExternalConnectionAcceptorImpl>(
132           name_prefix.append(count_str), type, creds));
133   return builder_->acceptors_.back()->GetAcceptor();
134 }
135
136 void ServerBuilder::experimental_type::SetAuthorizationPolicyProvider(
137     std::shared_ptr<experimental::AuthorizationPolicyProviderInterface>
138         provider) {
139   builder_->authorization_provider_ = std::move(provider);
140 }
141
142 ServerBuilder& ServerBuilder::SetOption(
143     std::unique_ptr<ServerBuilderOption> option) {
144   options_.push_back(std::move(option));
145   return *this;
146 }
147
148 ServerBuilder& ServerBuilder::SetSyncServerOption(
149     ServerBuilder::SyncServerOption option, int val) {
150   switch (option) {
151     case NUM_CQS:
152       sync_server_settings_.num_cqs = val;
153       break;
154     case MIN_POLLERS:
155       sync_server_settings_.min_pollers = val;
156       break;
157     case MAX_POLLERS:
158       sync_server_settings_.max_pollers = val;
159       break;
160     case CQ_TIMEOUT_MSEC:
161       sync_server_settings_.cq_timeout_msec = val;
162       break;
163   }
164   return *this;
165 }
166
167 ServerBuilder& ServerBuilder::SetCompressionAlgorithmSupportStatus(
168     grpc_compression_algorithm algorithm, bool enabled) {
169   if (enabled) {
170     GPR_BITSET(&enabled_compression_algorithms_bitset_, algorithm);
171   } else {
172     GPR_BITCLEAR(&enabled_compression_algorithms_bitset_, algorithm);
173   }
174   return *this;
175 }
176
177 ServerBuilder& ServerBuilder::SetDefaultCompressionLevel(
178     grpc_compression_level level) {
179   maybe_default_compression_level_.is_set = true;
180   maybe_default_compression_level_.level = level;
181   return *this;
182 }
183
184 ServerBuilder& ServerBuilder::SetDefaultCompressionAlgorithm(
185     grpc_compression_algorithm algorithm) {
186   maybe_default_compression_algorithm_.is_set = true;
187   maybe_default_compression_algorithm_.algorithm = algorithm;
188   return *this;
189 }
190
191 ServerBuilder& ServerBuilder::SetResourceQuota(
192     const grpc::ResourceQuota& resource_quota) {
193   if (resource_quota_ != nullptr) {
194     grpc_resource_quota_unref(resource_quota_);
195   }
196   resource_quota_ = resource_quota.c_resource_quota();
197   grpc_resource_quota_ref(resource_quota_);
198   return *this;
199 }
200
201 ServerBuilder& ServerBuilder::AddListeningPort(
202     const std::string& addr_uri, std::shared_ptr<ServerCredentials> creds,
203     int* selected_port) {
204   const std::string uri_scheme = "dns:";
205   std::string addr = addr_uri;
206   if (addr_uri.compare(0, uri_scheme.size(), uri_scheme) == 0) {
207     size_t pos = uri_scheme.size();
208     while (addr_uri[pos] == '/') ++pos;  // Skip slashes.
209     addr = addr_uri.substr(pos);
210   }
211   Port port = {addr, std::move(creds), selected_port};
212   ports_.push_back(port);
213   return *this;
214 }
215
216 ChannelArguments ServerBuilder::BuildChannelArgs() {
217   ChannelArguments args;
218   if (max_receive_message_size_ >= -1) {
219     args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_);
220   }
221   if (max_send_message_size_ >= -1) {
222     args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_);
223   }
224   for (const auto& option : options_) {
225     option->UpdateArguments(&args);
226     option->UpdatePlugins(&plugins_);
227   }
228   args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
229               enabled_compression_algorithms_bitset_);
230   if (maybe_default_compression_level_.is_set) {
231     args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL,
232                 maybe_default_compression_level_.level);
233   }
234   if (maybe_default_compression_algorithm_.is_set) {
235     args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
236                 maybe_default_compression_algorithm_.algorithm);
237   }
238   if (resource_quota_ != nullptr) {
239     args.SetPointerWithVtable(GRPC_ARG_RESOURCE_QUOTA, resource_quota_,
240                               grpc_resource_quota_arg_vtable());
241   }
242   for (const auto& plugin : plugins_) {
243     plugin->UpdateServerBuilder(this);
244     plugin->UpdateChannelArguments(&args);
245   }
246   if (authorization_provider_ != nullptr) {
247     args.SetPointerWithVtable(GRPC_ARG_AUTHORIZATION_POLICY_PROVIDER,
248                               authorization_provider_->c_provider(),
249                               grpc_authorization_policy_provider_arg_vtable());
250   }
251   return args;
252 }
253
254 std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
255   ChannelArguments args = BuildChannelArgs();
256
257   // == Determine if the server has any syncrhonous methods ==
258   bool has_sync_methods = false;
259   for (const auto& value : services_) {
260     if (value->service->has_synchronous_methods()) {
261       has_sync_methods = true;
262       break;
263     }
264   }
265
266   if (!has_sync_methods) {
267     for (const auto& value : plugins_) {
268       if (value->has_sync_methods()) {
269         has_sync_methods = true;
270         break;
271       }
272     }
273   }
274
275   // If this is a Sync server, i.e a server expositing sync API, then the server
276   // needs to create some completion queues to listen for incoming requests.
277   // 'sync_server_cqs' are those internal completion queues.
278   //
279   // This is different from the completion queues added to the server via
280   // ServerBuilder's AddCompletionQueue() method (those completion queues
281   // are in 'cqs_' member variable of ServerBuilder object)
282   std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>
283       sync_server_cqs(
284           std::make_shared<
285               std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>());
286
287   bool has_frequently_polled_cqs = false;
288   for (const auto& cq : cqs_) {
289     if (cq->IsFrequentlyPolled()) {
290       has_frequently_polled_cqs = true;
291       break;
292     }
293   }
294
295   // == Determine if the server has any callback methods ==
296   bool has_callback_methods = false;
297   for (const auto& service : services_) {
298     if (service->service->has_callback_methods()) {
299       has_callback_methods = true;
300       has_frequently_polled_cqs = true;
301       break;
302     }
303   }
304
305   if (callback_generic_service_ != nullptr) {
306     has_frequently_polled_cqs = true;
307   }
308
309   const bool is_hybrid_server = has_sync_methods && has_frequently_polled_cqs;
310
311   if (has_sync_methods) {
312     grpc_cq_polling_type polling_type =
313         is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING;
314
315     // Create completion queues to listen to incoming rpc requests
316     for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
317       sync_server_cqs->emplace_back(
318           new grpc::ServerCompletionQueue(GRPC_CQ_NEXT, polling_type, nullptr));
319     }
320   }
321
322   // TODO(vjpai): Add a section here for plugins once they can support callback
323   // methods
324
325   if (has_sync_methods) {
326     // This is a Sync server
327     gpr_log(GPR_INFO,
328             "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: "
329             "%d, CQ timeout (msec): %d",
330             sync_server_settings_.num_cqs, sync_server_settings_.min_pollers,
331             sync_server_settings_.max_pollers,
332             sync_server_settings_.cq_timeout_msec);
333   }
334
335   if (has_callback_methods) {
336     gpr_log(GPR_INFO, "Callback server.");
337   }
338
339   std::unique_ptr<grpc::Server> server(new grpc::Server(
340       &args, sync_server_cqs, sync_server_settings_.min_pollers,
341       sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec,
342       std::move(acceptors_), server_config_fetcher_, resource_quota_,
343       std::move(interceptor_creators_)));
344
345   ServerInitializer* initializer = server->initializer();
346
347   // Register all the completion queues with the server. i.e
348   //  1. sync_server_cqs: internal completion queues created IF this is a sync
349   //     server
350   //  2. cqs_: Completion queues added via AddCompletionQueue() call
351
352   for (const auto& cq : *sync_server_cqs) {
353     grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
354     has_frequently_polled_cqs = true;
355   }
356
357   if (has_callback_methods || callback_generic_service_ != nullptr) {
358     auto* cq = server->CallbackCQ();
359     grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
360   }
361
362   // cqs_ contains the completion queue added by calling the ServerBuilder's
363   // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by
364   // calling Next() or AsyncNext()) and hence are not safe to be used for
365   // listening to incoming channels. Such completion queues must be registered
366   // as non-listening queues. In debug mode, these should have their server list
367   // tracked since these are provided the user and must be Shutdown by the user
368   // after the server is shutdown.
369   for (const auto& cq : cqs_) {
370     grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
371     cq->RegisterServer(server.get());
372   }
373
374   if (!has_frequently_polled_cqs) {
375     gpr_log(GPR_ERROR,
376             "At least one of the completion queues must be frequently polled");
377     return nullptr;
378   }
379
380   server->RegisterContextAllocator(std::move(context_allocator_));
381
382   for (const auto& value : services_) {
383     if (!server->RegisterService(value->host.get(), value->service)) {
384       return nullptr;
385     }
386   }
387
388   for (const auto& value : plugins_) {
389     value->InitServer(initializer);
390   }
391
392   if (generic_service_) {
393     server->RegisterAsyncGenericService(generic_service_);
394   } else if (callback_generic_service_) {
395     server->RegisterCallbackGenericService(callback_generic_service_);
396   } else {
397     for (const auto& value : services_) {
398       if (value->service->has_generic_methods()) {
399         gpr_log(GPR_ERROR,
400                 "Some methods were marked generic but there is no "
401                 "generic service registered.");
402         return nullptr;
403       }
404     }
405   }
406
407   bool added_port = false;
408   for (auto& port : ports_) {
409     int r = server->AddListeningPort(port.addr, port.creds.get());
410     if (!r) {
411       if (added_port) server->Shutdown();
412       return nullptr;
413     }
414     added_port = true;
415     if (port.selected_port != nullptr) {
416       *port.selected_port = r;
417     }
418   }
419
420   auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0];
421   server->Start(cqs_data, cqs_.size());
422
423   for (const auto& value : plugins_) {
424     value->Finish(initializer);
425   }
426
427   return server;
428 }
429
430 void ServerBuilder::InternalAddPluginFactory(
431     std::unique_ptr<ServerBuilderPlugin> (*CreatePlugin)()) {
432   gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
433   (*g_plugin_factory_list).push_back(CreatePlugin);
434 }
435
436 ServerBuilder& ServerBuilder::EnableWorkaround(grpc_workaround_list id) {
437   switch (id) {
438     case GRPC_WORKAROUND_ID_CRONET_COMPRESSION:
439       return AddChannelArgument(GRPC_ARG_WORKAROUND_CRONET_COMPRESSION, 1);
440     default:
441       gpr_log(GPR_ERROR, "Workaround %u does not exist or is obsolete.", id);
442       return *this;
443   }
444 }
445
446 }  // namespace grpc