/*
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <memory>

#include <grpc/grpc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpcpp/completion_queue.h>
#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/support/time.h>

#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"

namespace grpc {
namespace {

internal::GrpcLibraryInitializer g_gli_initializer;

gpr_once g_once_init_callback_alternative = GPR_ONCE_INIT;
grpc_core::Mutex* g_callback_alternative_mu;

// Implement a ref-counted callback CQ for global use in the alternative
// implementation so that its threads are only created once. Do this using
// explicit ref-counts and raw pointers rather than a shared_ptr, since a
// shared_ptr has a non-trivial destructor and thus can't be used for
// global variables.
struct CallbackAlternativeCQ {
  int refs ABSL_GUARDED_BY(g_callback_alternative_mu) = 0;
  CompletionQueue* cq ABSL_GUARDED_BY(g_callback_alternative_mu);
  std::vector<grpc_core::Thread>* nexting_threads
      ABSL_GUARDED_BY(g_callback_alternative_mu);

  CompletionQueue* Ref() {
    grpc_core::MutexLock lock(g_callback_alternative_mu);
    refs++;
    if (refs == 1) {
      cq = new CompletionQueue;
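      // Use roughly half the available cores for nexting threads,
      // clamped to the range [2, 16].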
      int num_nexting_threads = GPR_CLAMP(gpr_cpu_num_cores() / 2, 2, 16);
      nexting_threads = new std::vector<grpc_core::Thread>;
      for (int i = 0; i < num_nexting_threads; i++) {
        nexting_threads->emplace_back(
            "nexting_thread",
            [](void* arg) {
              grpc_completion_queue* cq =
                  static_cast<CompletionQueue*>(arg)->cq();
              while (true) {
                // Use the raw Core next function rather than the C++ Next,
                // since Next incorporates FinalizeResult and we actually want
                // that called from the callback functor itself.
                // TODO(vjpai): Migrate below to next without a timeout or idle
                // phase. That's currently starving out some other polling,
                // though.
                auto ev = grpc_completion_queue_next(
                    cq,
                    gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                 gpr_time_from_millis(1000, GPR_TIMESPAN)),
                    nullptr);
                if (ev.type == GRPC_QUEUE_SHUTDOWN) {
                  return;
                }
                if (ev.type == GRPC_QUEUE_TIMEOUT) {
                  gpr_sleep_until(
                      gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                   gpr_time_from_millis(100, GPR_TIMESPAN)));
                  continue;
                }
                GPR_DEBUG_ASSERT(ev.type == GRPC_OP_COMPLETE);
                // We can always execute the callback inline rather than
                // pushing it to another Executor thread because this thread
                // is definitely running on a background thread, does not hold
                // any application locks before executing the callback, and
                // cannot be entered recursively.
                auto* functor =
                    static_cast<grpc_completion_queue_functor*>(ev.tag);
                functor->functor_run(functor, ev.success);
              }
            },
            cq);
      }
      for (auto& th : *nexting_threads) {
        th.Start();
      }
    }
    return cq;
  }

  void Unref() {
    grpc_core::MutexLock lock(g_callback_alternative_mu);
    refs--;
    if (refs == 0) {
      cq->Shutdown();
      for (auto& th : *nexting_threads) {
        th.Join();
      }
      delete nexting_threads;
      delete cq;
    }
  }
};

CallbackAlternativeCQ g_callback_alternative_cq;

}  // namespace

// The 'CompletionQueue' constructor can safely call GrpcLibraryCodegen(false)
// here, i.e., not have GrpcLibraryCodegen call grpc_init(). This is because,
// to create a 'grpc_completion_queue' instance (which is being passed as the
// input to this constructor), one must have already called grpc_init().
CompletionQueue::CompletionQueue(grpc_completion_queue* take)
    : GrpcLibraryCodegen(false), cq_(take) {
  InitialAvalanching();  // reserve this for the future shutdown
}
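
// A minimal usage sketch for this constructor (illustrative only, assuming
// grpc_init() has already run): the queue takes ownership of the raw core
// queue it is given, e.g.
//   grpc::CompletionQueue cq(grpc_completion_queue_create_for_next(nullptr));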

void CompletionQueue::Shutdown() {
  g_gli_initializer.summon();
#ifndef NDEBUG
  if (!ServerListEmpty()) {
    gpr_log(GPR_ERROR,
            "CompletionQueue is being shut down before its server.");
  }
#endif
  CompleteAvalanching();
}

CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
    void** tag, bool* ok, gpr_timespec deadline) {
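  // Poll the core queue until the deadline. An event is surfaced to the
  // caller only if its tag's FinalizeResult() asks for it; otherwise the
  // event is consumed internally and polling continues.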
  for (;;) {
    auto ev = grpc_completion_queue_next(cq_, deadline, nullptr);
    switch (ev.type) {
      case GRPC_QUEUE_TIMEOUT:
        return TIMEOUT;
      case GRPC_QUEUE_SHUTDOWN:
        return SHUTDOWN;
      case GRPC_OP_COMPLETE:
        auto core_cq_tag =
            static_cast<::grpc::internal::CompletionQueueTag*>(ev.tag);
        *ok = ev.success != 0;
        *tag = core_cq_tag;
        if (core_cq_tag->FinalizeResult(tag, ok)) {
          return GOT_EVENT;
        }
        break;
    }
  }
}

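// The thread-local cache allows a completion that is posted on this same
// thread to be returned directly to the caller without a trip through the
// completion queue's polling path.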
CompletionQueue::CompletionQueueTLSCache::CompletionQueueTLSCache(
    CompletionQueue* cq)
    : cq_(cq), flushed_(false) {
  grpc_completion_queue_thread_local_cache_init(cq_->cq_);
}

CompletionQueue::CompletionQueueTLSCache::~CompletionQueueTLSCache() {
  GPR_ASSERT(flushed_);
}

bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
  int res = 0;
  void* res_tag;
  flushed_ = true;
  // The core flush returns nonzero only if a completion was cached on this
  // thread; in that case, hand it to FinalizeResult like any polled event.
  if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag,
                                                     &res)) {
    auto core_cq_tag =
        static_cast<::grpc::internal::CompletionQueueTag*>(res_tag);
    *ok = res == 1;
    if (core_cq_tag->FinalizeResult(tag, ok)) {
      return true;
    }
  }
  return false;
}

CompletionQueue* CompletionQueue::CallbackAlternativeCQ() {
  gpr_once_init(&g_once_init_callback_alternative,
                [] { g_callback_alternative_mu = new grpc_core::Mutex(); });
  return g_callback_alternative_cq.Ref();
}

void CompletionQueue::ReleaseCallbackAlternativeCQ(CompletionQueue* cq)
    ABSL_NO_THREAD_SAFETY_ANALYSIS {
  (void)cq;  // cq is only used by the debug-only assertion below
  // This accesses g_callback_alternative_cq without acquiring the mutex,
  // but it's considered safe because it only reads the pointer value.
  GPR_DEBUG_ASSERT(cq == g_callback_alternative_cq.cq);
  g_callback_alternative_cq.Unref();
}
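
// A minimal usage sketch of the alternative callback CQ (illustrative):
//   grpc::CompletionQueue* cq =
//       grpc::CompletionQueue::CallbackAlternativeCQ();
//   // ... drive callback-based completions against *cq ...
//   grpc::CompletionQueue::ReleaseCallbackAlternativeCQ(cq);
// Ref/Unref pairing matters: the shared queue and its nexting threads are
// created on the first Ref() and torn down when the last user releases it.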

}  // namespace grpc