Imported Upstream version 1.21.0
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / resolver / dns / c_ares / grpc_ares_ev_driver_libuv.cc
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 #include <grpc/support/port_platform.h>
19
20 #include "src/core/lib/iomgr/port.h"
21 #if GRPC_ARES == 1 && defined(GRPC_UV)
22
23 #include <ares.h>
24 #include <uv.h>
25
26 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
27
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/string_util.h>
31 #include <grpc/support/time.h>
32 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
33 #include "src/core/lib/gpr/string.h"
34 #include "src/core/lib/iomgr/combiner.h"
35
36 namespace grpc_core {
37
38 void ares_uv_poll_cb(uv_poll_t* handle, int status, int events);
39
40 void ares_uv_poll_close_cb(uv_handle_t* handle) { Delete(handle); }
41
42 class GrpcPolledFdLibuv : public GrpcPolledFd {
43  public:
44   GrpcPolledFdLibuv(ares_socket_t as, grpc_combiner* combiner)
45       : as_(as), combiner_(combiner) {
46     gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, (intptr_t)as);
47     handle_ = New<uv_poll_t>();
48     uv_poll_init_socket(uv_default_loop(), handle_, as);
49     handle_->data = this;
50     GRPC_COMBINER_REF(combiner_, "libuv ares event driver");
51   }
52
53   ~GrpcPolledFdLibuv() {
54     gpr_free(name_);
55     GRPC_COMBINER_UNREF(combiner_, "libuv ares event driver");
56   }
57
58   void RegisterForOnReadableLocked(grpc_closure* read_closure) override {
59     GPR_ASSERT(read_closure_ == nullptr);
60     GPR_ASSERT((poll_events_ & UV_READABLE) == 0);
61     read_closure_ = read_closure;
62     poll_events_ |= UV_READABLE;
63     uv_poll_start(handle_, poll_events_, ares_uv_poll_cb);
64   }
65
66   void RegisterForOnWriteableLocked(grpc_closure* write_closure) override {
67     GPR_ASSERT(write_closure_ == nullptr);
68     GPR_ASSERT((poll_events_ & UV_WRITABLE) == 0);
69     write_closure_ = write_closure;
70     poll_events_ |= UV_WRITABLE;
71     uv_poll_start(handle_, poll_events_, ares_uv_poll_cb);
72   }
73
74   bool IsFdStillReadableLocked() override {
75     /* uv_poll_t is based on poll, which is level triggered. So, if cares
76      * leaves some data unread, the event will trigger again. */
77     return false;
78   }
79
80   void ShutdownInternalLocked(grpc_error* error) {
81     uv_poll_stop(handle_);
82     uv_close(reinterpret_cast<uv_handle_t*>(handle_), ares_uv_poll_close_cb);
83     if (read_closure_ != nullptr) {
84       GRPC_CLOSURE_SCHED(read_closure_, GRPC_ERROR_CANCELLED);
85     }
86     if (write_closure_ != nullptr) {
87       GRPC_CLOSURE_SCHED(write_closure_, GRPC_ERROR_CANCELLED);
88     }
89   }
90
91   void ShutdownLocked(grpc_error* error) override {
92     if (grpc_core::ExecCtx::Get() == nullptr) {
93       grpc_core::ExecCtx exec_ctx;
94       ShutdownInternalLocked(error);
95     } else {
96       ShutdownInternalLocked(error);
97     }
98   }
99
100   ares_socket_t GetWrappedAresSocketLocked() override { return as_; }
101
102   const char* GetName() override { return name_; }
103
104   char* name_;
105   ares_socket_t as_;
106   uv_poll_t* handle_;
107   grpc_closure* read_closure_ = nullptr;
108   grpc_closure* write_closure_ = nullptr;
109   int poll_events_ = 0;
110   grpc_combiner* combiner_;
111 };
112
113 struct AresUvPollCbArg {
114   AresUvPollCbArg(uv_poll_t* handle, int status, int events)
115       : handle(handle), status(status), events(events) {}
116
117   uv_poll_t* handle;
118   int status;
119   int events;
120 };
121
122 static void ares_uv_poll_cb_locked(void* arg, grpc_error* error) {
123   grpc_core::UniquePtr<AresUvPollCbArg> arg_struct(
124       reinterpret_cast<AresUvPollCbArg*>(arg));
125   uv_poll_t* handle = arg_struct->handle;
126   int status = arg_struct->status;
127   int events = arg_struct->events;
128   GrpcPolledFdLibuv* polled_fd =
129       reinterpret_cast<GrpcPolledFdLibuv*>(handle->data);
130   if (status < 0) {
131     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("cares polling error");
132     error =
133         grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
134                            grpc_slice_from_static_string(uv_strerror(status)));
135   }
136   if (events & UV_READABLE) {
137     GPR_ASSERT(polled_fd->read_closure_ != nullptr);
138     GRPC_CLOSURE_SCHED(polled_fd->read_closure_, error);
139     polled_fd->read_closure_ = nullptr;
140     polled_fd->poll_events_ &= ~UV_READABLE;
141   }
142   if (events & UV_WRITABLE) {
143     GPR_ASSERT(polled_fd->write_closure_ != nullptr);
144     GRPC_CLOSURE_SCHED(polled_fd->write_closure_, error);
145     polled_fd->write_closure_ = nullptr;
146     polled_fd->poll_events_ &= ~UV_WRITABLE;
147   }
148   uv_poll_start(handle, polled_fd->poll_events_, ares_uv_poll_cb);
149 }
150
151 void ares_uv_poll_cb(uv_poll_t* handle, int status, int events) {
152   grpc_core::ExecCtx exec_ctx;
153   GrpcPolledFdLibuv* polled_fd =
154       reinterpret_cast<GrpcPolledFdLibuv*>(handle->data);
155   AresUvPollCbArg* arg = New<AresUvPollCbArg>(handle, status, events);
156   GRPC_CLOSURE_SCHED(
157       GRPC_CLOSURE_CREATE(ares_uv_poll_cb_locked, arg,
158                           grpc_combiner_scheduler(polled_fd->combiner_)),
159       GRPC_ERROR_NONE);
160 }
161
162 class GrpcPolledFdFactoryLibuv : public GrpcPolledFdFactory {
163  public:
164   GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
165                                       grpc_pollset_set* driver_pollset_set,
166                                       grpc_combiner* combiner) override {
167     return New<GrpcPolledFdLibuv>(as, combiner);
168   }
169
170   void ConfigureAresChannelLocked(ares_channel channel) override {}
171 };
172
173 UniquePtr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(grpc_combiner* combiner) {
174   return UniquePtr<GrpcPolledFdFactory>(New<GrpcPolledFdFactoryLibuv>());
175 }
176
177 }  // namespace grpc_core
178
179 #endif /* GRPC_ARES == 1 && defined(GRPC_UV) */