Imported Upstream version 1.39.0
[platform/upstream/grpc.git] / src / core / lib / iomgr / event_engine / endpoint.cc
1 // Copyright 2021 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15
16 #ifdef GRPC_USE_EVENT_ENGINE
17 #include "src/core/lib/iomgr/event_engine/endpoint.h"
18
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/slice.h>
21 #include <grpc/slice_buffer.h>
22 #include <grpc/support/time.h>
23 #include "absl/strings/string_view.h"
24
25 #include "src/core/lib/address_utils/sockaddr_utils.h"
26 #include "src/core/lib/channel/channel_args.h"
27 #include "src/core/lib/iomgr/endpoint.h"
28 #include "src/core/lib/iomgr/error.h"
29 #include "src/core/lib/iomgr/event_engine/closure.h"
30 #include "src/core/lib/iomgr/event_engine/pollset.h"
31 #include "src/core/lib/iomgr/pollset.h"
32 #include "src/core/lib/iomgr/pollset_set.h"
33 #include "src/core/lib/iomgr/resource_quota.h"
34 #include "src/core/lib/transport/error_utils.h"
35
36 extern grpc_core::TraceFlag grpc_tcp_trace;
37
38 namespace {
39
40 using ::grpc_event_engine::experimental::EventEngine;
41 using ::grpc_event_engine::experimental::ResolvedAddressToURI;
42 using ::grpc_event_engine::experimental::SliceBuffer;
43
44 void endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
45                    grpc_closure* cb, bool /* urgent */) {
46   auto* eeep = reinterpret_cast<grpc_event_engine_endpoint*>(ep);
47   if (eeep->endpoint == nullptr) {
48     grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_CANCELLED);
49     return;
50   }
51   SliceBuffer* read_buffer = new (&eeep->read_buffer) SliceBuffer(slices);
52   eeep->endpoint->Read(
53       [eeep, cb](absl::Status status) {
54         auto* read_buffer = reinterpret_cast<SliceBuffer*>(&eeep->read_buffer);
55         read_buffer->~SliceBuffer();
56         grpc_core::ExecCtx exec_ctx;
57         grpc_core::Closure::Run(DEBUG_LOCATION, cb,
58                                 absl_status_to_grpc_error(status));
59         exec_ctx.Flush();
60         grpc_pollset_ee_broadcast_event();
61       },
62       read_buffer, absl::InfiniteFuture());
63 }
64
65 void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
66                     grpc_closure* cb, void* arg) {
67   // TODO(hork): adapt arg to some metrics collection mechanism.
68   (void)arg;
69   auto* eeep = reinterpret_cast<grpc_event_engine_endpoint*>(ep);
70   if (eeep->endpoint == nullptr) {
71     grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_CANCELLED);
72     return;
73   }
74   SliceBuffer* write_buffer = new (&eeep->write_buffer) SliceBuffer(slices);
75   eeep->endpoint->Write(
76       [eeep, cb](absl::Status status) {
77         auto* write_buffer =
78             reinterpret_cast<SliceBuffer*>(&eeep->write_buffer);
79         write_buffer->~SliceBuffer();
80         grpc_core::ExecCtx exec_ctx;
81         grpc_core::Closure::Run(DEBUG_LOCATION, cb,
82                                 absl_status_to_grpc_error(status));
83         exec_ctx.Flush();
84         grpc_pollset_ee_broadcast_event();
85       },
86       write_buffer, absl::InfiniteFuture());
87 }
88 void endpoint_add_to_pollset(grpc_endpoint* /* ep */,
89                              grpc_pollset* /* pollset */) {}
90 void endpoint_add_to_pollset_set(grpc_endpoint* /* ep */,
91                                  grpc_pollset_set* /* pollset */) {}
92 void endpoint_delete_from_pollset_set(grpc_endpoint* /* ep */,
93                                       grpc_pollset_set* /* pollset */) {}
94 /// After shutdown, all endpoint operations except destroy are no-op,
95 /// and will return some kind of sane default (empty strings, nullptrs, etc). It
96 /// is the caller's responsibility to ensure that calls to endpoint_shutdown are
97 /// synchronized.
98 void endpoint_shutdown(grpc_endpoint* ep, grpc_error* why) {
99   auto* eeep = reinterpret_cast<grpc_event_engine_endpoint*>(ep);
100   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
101     const char* str = grpc_error_string(why);
102     gpr_log(GPR_INFO, "TCP Endpoint %p shutdown why=%s", eeep->endpoint.get(),
103             str);
104   }
105   grpc_resource_user_shutdown(eeep->ru);
106   eeep->endpoint.reset();
107 }
108
109 void endpoint_destroy(grpc_endpoint* ep) {
110   auto* eeep = reinterpret_cast<grpc_event_engine_endpoint*>(ep);
111   grpc_resource_user_unref(eeep->ru);
112   delete eeep;
113 }
114
115 grpc_resource_user* endpoint_get_resource_user(grpc_endpoint* ep) {
116   auto* eeep = reinterpret_cast<grpc_event_engine_endpoint*>(ep);
117   return eeep->ru;
118 }
119
120 absl::string_view endpoint_get_peer(grpc_endpoint* ep) {
121   auto* eeep = reinterpret_cast<grpc_event_engine_endpoint*>(ep);
122   if (eeep->endpoint == nullptr) {
123     return "";
124   }
125   if (eeep->peer_address.empty()) {
126     const EventEngine::ResolvedAddress* addr = eeep->endpoint->GetPeerAddress();
127     GPR_ASSERT(addr != nullptr);
128     eeep->peer_address = ResolvedAddressToURI(*addr);
129   }
130   return eeep->peer_address;
131 }
132
133 absl::string_view endpoint_get_local_address(grpc_endpoint* ep) {
134   auto* eeep = reinterpret_cast<grpc_event_engine_endpoint*>(ep);
135   if (eeep->endpoint == nullptr) {
136     return "";
137   }
138   if (eeep->local_address.empty()) {
139     const EventEngine::ResolvedAddress* addr =
140         eeep->endpoint->GetLocalAddress();
141     GPR_ASSERT(addr != nullptr);
142     eeep->local_address = ResolvedAddressToURI(*addr);
143   }
144   return eeep->local_address;
145 }
146
147 int endpoint_get_fd(grpc_endpoint* /* ep */) { return -1; }
148
149 bool endpoint_can_track_err(grpc_endpoint* /* ep */) { return false; }
150
151 grpc_endpoint_vtable grpc_event_engine_endpoint_vtable = {
152     endpoint_read,
153     endpoint_write,
154     endpoint_add_to_pollset,
155     endpoint_add_to_pollset_set,
156     endpoint_delete_from_pollset_set,
157     endpoint_shutdown,
158     endpoint_destroy,
159     endpoint_get_resource_user,
160     endpoint_get_peer,
161     endpoint_get_local_address,
162     endpoint_get_fd,
163     endpoint_can_track_err};
164
165 }  // namespace
166
167 grpc_event_engine_endpoint* grpc_tcp_server_endpoint_create(
168     std::unique_ptr<EventEngine::Endpoint> ee_endpoint) {
169   auto endpoint = new grpc_event_engine_endpoint;
170   endpoint->base.vtable = &grpc_event_engine_endpoint_vtable;
171   // TODO(hork): populate endpoint->ru from the uvEngine's subclass
172   endpoint->endpoint = std::move(ee_endpoint);
173   return endpoint;
174 }
175
176 grpc_endpoint* grpc_tcp_create(const grpc_channel_args* channel_args,
177                                absl::string_view peer_address) {
178   auto endpoint = new grpc_event_engine_endpoint;
179   endpoint->base.vtable = &grpc_event_engine_endpoint_vtable;
180   grpc_resource_quota* resource_quota =
181       grpc_channel_args_find_pointer<grpc_resource_quota>(
182           channel_args, GRPC_ARG_RESOURCE_QUOTA);
183   if (resource_quota != nullptr) {
184     grpc_resource_quota_ref_internal(resource_quota);
185   } else {
186     resource_quota = grpc_resource_quota_create(nullptr);
187   }
188   endpoint->ru = grpc_resource_user_create(resource_quota,
189                                            std::string(peer_address).c_str());
190   grpc_resource_quota_unref_internal(resource_quota);
191   return &endpoint->base;
192 }
193
194 #endif  // GRPC_USE_EVENT_ENGINE