2 * Copyright 2015 gRPC authors.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <grpc/grpc.h>
21 #include <grpc/support/cpu.h>
22 #include <grpc/support/log.h>
23 #include <grpcpp/completion_queue.h>
24 #include <grpcpp/impl/grpc_library.h>
25 #include <grpcpp/support/time.h>
27 #include "src/core/lib/gpr/useful.h"
28 #include "src/core/lib/gprpp/manual_constructor.h"
29 #include "src/core/lib/gprpp/sync.h"
30 #include "src/core/lib/gprpp/thd.h"
// Ensures the gRPC core library is initialized before first use and torn
// down on exit (grpc_init/grpc_shutdown bracketing).
internal::GrpcLibraryInitializer g_gli_initializer;

// One-time initializer and global mutex guarding the shared
// callback-alternative completion queue state below. The mutex is
// heap-allocated (raw pointer) so it has a trivial destructor as a global.
gpr_once g_once_init_callback_alternative = GPR_ONCE_INIT;
grpc_core::Mutex* g_callback_alternative_mu;
40 // Implement a ref-counted callback CQ for global use in the alternative
41 // implementation so that its threads are only created once. Do this using
42 // explicit ref-counts and raw pointers rather than a shared-ptr since that
43 // has a non-trivial destructor and thus can't be used for global variables.
44 struct CallbackAlternativeCQ {
45 int refs ABSL_GUARDED_BY(g_callback_alternative_mu) = 0;
46 CompletionQueue* cq ABSL_GUARDED_BY(g_callback_alternative_mu);
47 std::vector<grpc_core::Thread>* nexting_threads
48 ABSL_GUARDED_BY(g_callback_alternative_mu);
50 CompletionQueue* Ref() {
51 grpc_core::MutexLock lock(&*g_callback_alternative_mu);
54 cq = new CompletionQueue;
55 int num_nexting_threads = GPR_CLAMP(gpr_cpu_num_cores() / 2, 2, 16);
56 nexting_threads = new std::vector<grpc_core::Thread>;
57 for (int i = 0; i < num_nexting_threads; i++) {
58 nexting_threads->emplace_back(
61 grpc_completion_queue* cq =
62 static_cast<CompletionQueue*>(arg)->cq();
64 // Use the raw Core next function rather than the C++ Next since
65 // Next incorporates FinalizeResult and we actually want that
66 // called from the callback functor itself.
67 // TODO(vjpai): Migrate below to next without a timeout or idle
68 // phase. That's currently starving out some other polling,
70 auto ev = grpc_completion_queue_next(
72 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
73 gpr_time_from_millis(1000, GPR_TIMESPAN)),
75 if (ev.type == GRPC_QUEUE_SHUTDOWN) {
78 if (ev.type == GRPC_QUEUE_TIMEOUT) {
80 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
81 gpr_time_from_millis(100, GPR_TIMESPAN)));
84 GPR_DEBUG_ASSERT(ev.type == GRPC_OP_COMPLETE);
85 // We can always execute the callback inline rather than
86 // pushing it to another Executor thread because this
87 // thread is definitely running on a background thread, does not
88 // hold any application locks before executing the callback,
89 // and cannot be entered recursively.
91 static_cast<grpc_completion_queue_functor*>(ev.tag);
92 functor->functor_run(functor, ev.success);
97 for (auto& th : *nexting_threads) {
105 grpc_core::MutexLock lock(g_callback_alternative_mu);
109 for (auto& th : *nexting_threads) {
112 delete nexting_threads;
// Global instance; trivially destructible (raw pointers + int), so safe as a
// global variable per the rationale in the comment above the struct.
CallbackAlternativeCQ g_callback_alternative_cq;
122 // 'CompletionQueue' constructor can safely call GrpcLibraryCodegen(false) here
123 // i.e not have GrpcLibraryCodegen call grpc_init(). This is because, to create
124 // a 'grpc_completion_queue' instance (which is being passed as the input to
125 // this constructor), one must have already called grpc_init().
126 CompletionQueue::CompletionQueue(grpc_completion_queue* take)
127 : GrpcLibraryCodegen(false), cq_(take) {
128 InitialAvalanching();
131 void CompletionQueue::Shutdown() {
132 g_gli_initializer.summon();
134 if (!ServerListEmpty()) {
136 "CompletionQueue shutdown being shutdown before its server.");
139 CompleteAvalanching();
142 CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
143 void** tag, bool* ok, gpr_timespec deadline) {
145 auto ev = grpc_completion_queue_next(cq_, deadline, nullptr);
147 case GRPC_QUEUE_TIMEOUT:
149 case GRPC_QUEUE_SHUTDOWN:
151 case GRPC_OP_COMPLETE:
153 static_cast<::grpc::internal::CompletionQueueTag*>(ev.tag);
154 *ok = ev.success != 0;
156 if (core_cq_tag->FinalizeResult(tag, ok)) {
164 CompletionQueue::CompletionQueueTLSCache::CompletionQueueTLSCache(
166 : cq_(cq), flushed_(false) {
167 grpc_completion_queue_thread_local_cache_init(cq_->cq_);
170 CompletionQueue::CompletionQueueTLSCache::~CompletionQueueTLSCache() {
171 GPR_ASSERT(flushed_);
174 bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
178 if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag,
181 static_cast<::grpc::internal::CompletionQueueTag*>(res_tag);
183 if (core_cq_tag->FinalizeResult(tag, ok)) {
190 CompletionQueue* CompletionQueue::CallbackAlternativeCQ() {
191 gpr_once_init(&g_once_init_callback_alternative,
192 [] { g_callback_alternative_mu = new grpc_core::Mutex(); });
193 return g_callback_alternative_cq.Ref();
196 void CompletionQueue::ReleaseCallbackAlternativeCQ(CompletionQueue* cq)
197 ABSL_NO_THREAD_SAFETY_ANALYSIS {
199 // This accesses g_callback_alternative_cq without acquiring the mutex
200 // but it's considered safe because it just reads the pointer address.
201 GPR_DEBUG_ASSERT(cq == g_callback_alternative_cq.cq);
202 g_callback_alternative_cq.Unref();