10d633c0dd443016cad9863b9fe582dc33f547cf
[platform/upstream/grpc.git] / src / cpp / common / alarm.cc
1 /*
2  * Copyright 2018 gRPC authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17
18 #include <grpcpp/alarm.h>
19
20 #include <memory>
21
22 #include <grpc/support/log.h>
23 #include <grpc/support/port_platform.h>
24 #include <grpcpp/completion_queue.h>
25 #include <grpcpp/impl/grpc_library.h>
26 #include <grpcpp/support/time.h>
27 #include "src/core/lib/iomgr/exec_ctx.h"
28 #include "src/core/lib/iomgr/executor.h"
29 #include "src/core/lib/iomgr/timer.h"
30 #include "src/core/lib/surface/completion_queue.h"
31
32 #include <grpc/support/log.h>
33 #include "src/core/lib/debug/trace.h"
34
35 namespace grpc {
36
37 namespace internal {
38 class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
39  public:
40   AlarmImpl() : cq_(nullptr), tag_(nullptr) {
41     gpr_ref_init(&refs_, 1);
42     grpc_timer_init_unset(&timer_);
43   }
44   ~AlarmImpl() override {}
45   bool FinalizeResult(void** tag, bool* /*status*/) override {
46     *tag = tag_;
47     Unref();
48     return true;
49   }
50   void Set(::grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) {
51     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
52     grpc_core::ExecCtx exec_ctx;
53     GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm");
54     cq_ = cq->cq();
55     tag_ = tag;
56     GPR_ASSERT(grpc_cq_begin_op(cq_, this));
57     GRPC_CLOSURE_INIT(
58         &on_alarm_,
59         [](void* arg, grpc_error_handle error) {
60           // queue the op on the completion queue
61           AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
62           alarm->Ref();
63           // Preserve the cq and reset the cq_ so that the alarm
64           // can be reset when the alarm tag is delivered.
65           grpc_completion_queue* cq = alarm->cq_;
66           alarm->cq_ = nullptr;
67           grpc_cq_end_op(
68               cq, alarm, error,
69               [](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, arg,
70               &alarm->completion_);
71           GRPC_CQ_INTERNAL_UNREF(cq, "alarm");
72         },
73         this, grpc_schedule_on_exec_ctx);
74     grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline),
75                     &on_alarm_);
76   }
77   void Set(gpr_timespec deadline, std::function<void(bool)> f) {
78     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
79     grpc_core::ExecCtx exec_ctx;
80     // Don't use any CQ at all. Instead just use the timer to fire the function
81     callback_ = std::move(f);
82     Ref();
83     GRPC_CLOSURE_INIT(
84         &on_alarm_,
85         [](void* arg, grpc_error_handle error) {
86           grpc_core::Executor::Run(
87               GRPC_CLOSURE_CREATE(
88                   [](void* arg, grpc_error_handle error) {
89                     AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
90                     alarm->callback_(error == GRPC_ERROR_NONE);
91                     alarm->Unref();
92                   },
93                   arg, nullptr),
94               error);
95         },
96         this, grpc_schedule_on_exec_ctx);
97     grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline),
98                     &on_alarm_);
99   }
100   void Cancel() {
101     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
102     grpc_core::ExecCtx exec_ctx;
103     grpc_timer_cancel(&timer_);
104   }
105   void Destroy() {
106     Cancel();
107     Unref();
108   }
109
110  private:
111   void Ref() { gpr_ref(&refs_); }
112   void Unref() {
113     if (gpr_unref(&refs_)) {
114       delete this;
115     }
116   }
117
118   grpc_timer timer_;
119   gpr_refcount refs_;
120   grpc_closure on_alarm_;
121   grpc_cq_completion completion_;
122   // completion queue where events about this alarm will be posted
123   grpc_completion_queue* cq_;
124   void* tag_;
125   std::function<void(bool)> callback_;
126 };
127 }  // namespace internal
128
129 static ::grpc::internal::GrpcLibraryInitializer g_gli_initializer;
130
131 Alarm::Alarm() : alarm_(new internal::AlarmImpl()) {
132   g_gli_initializer.summon();
133 }
134
135 void Alarm::SetInternal(::grpc::CompletionQueue* cq, gpr_timespec deadline,
136                         void* tag) {
137   // Note that we know that alarm_ is actually an internal::AlarmImpl
138   // but we declared it as the base pointer to avoid a forward declaration
139   // or exposing core data structures in the C++ public headers.
140   // Thus it is safe to use a static_cast to the subclass here, and the
141   // C++ style guide allows us to do so in this case
142   static_cast<internal::AlarmImpl*>(alarm_)->Set(cq, deadline, tag);
143 }
144
145 void Alarm::SetInternal(gpr_timespec deadline, std::function<void(bool)> f) {
146   // Note that we know that alarm_ is actually an internal::AlarmImpl
147   // but we declared it as the base pointer to avoid a forward declaration
148   // or exposing core data structures in the C++ public headers.
149   // Thus it is safe to use a static_cast to the subclass here, and the
150   // C++ style guide allows us to do so in this case
151   static_cast<internal::AlarmImpl*>(alarm_)->Set(deadline, std::move(f));
152 }
153
154 Alarm::~Alarm() {
155   if (alarm_ != nullptr) {
156     static_cast<internal::AlarmImpl*>(alarm_)->Destroy();
157   }
158 }
159
160 void Alarm::Cancel() { static_cast<internal::AlarmImpl*>(alarm_)->Cancel(); }
161 }  // namespace grpc