3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20 using that endpoint. Because of various transitive includes in uv.h,
21 including windows.h on Windows, uv.h must be included before other system
22 headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
25 #include "test/core/util/passthru_endpoint.h"
32 #include "absl/strings/str_format.h"
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/string_util.h>
36 #include "src/core/lib/iomgr/sockaddr.h"
38 #include "src/core/lib/slice/slice_internal.h"
40 typedef struct passthru_endpoint passthru_endpoint;
44 passthru_endpoint* parent;
45 grpc_slice_buffer read_buffer;
46 grpc_slice_buffer* on_read_out;
47 grpc_closure* on_read;
48 grpc_resource_user* resource_user;
51 struct passthru_endpoint {
54 grpc_passthru_endpoint_stats* stats;
60 static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
61 grpc_closure* cb, bool /*urgent*/) {
62 half* m = reinterpret_cast<half*>(ep);
63 gpr_mu_lock(&m->parent->mu);
64 if (m->parent->shutdown) {
65 grpc_core::ExecCtx::Run(
67 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
68 } else if (m->read_buffer.count > 0) {
69 grpc_slice_buffer_swap(&m->read_buffer, slices);
70 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
73 m->on_read_out = slices;
75 gpr_mu_unlock(&m->parent->mu);
78 static half* other_half(half* h) {
79 if (h == &h->parent->client) return &h->parent->server;
80 return &h->parent->client;
83 static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
84 grpc_closure* cb, void* /*arg*/) {
85 half* m = other_half(reinterpret_cast<half*>(ep));
86 gpr_mu_lock(&m->parent->mu);
87 grpc_error* error = GRPC_ERROR_NONE;
88 gpr_atm_no_barrier_fetch_add(&m->parent->stats->num_writes, (gpr_atm)1);
89 if (m->parent->shutdown) {
90 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
91 } else if (m->on_read != nullptr) {
92 for (size_t i = 0; i < slices->count; i++) {
93 grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i]));
95 grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->on_read, GRPC_ERROR_NONE);
98 for (size_t i = 0; i < slices->count; i++) {
99 grpc_slice_buffer_add(&m->read_buffer,
100 grpc_slice_copy(slices->slices[i]));
103 gpr_mu_unlock(&m->parent->mu);
104 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
107 static void me_add_to_pollset(grpc_endpoint* /*ep*/,
108 grpc_pollset* /*pollset*/) {}
110 static void me_add_to_pollset_set(grpc_endpoint* /*ep*/,
111 grpc_pollset_set* /*pollset*/) {}
113 static void me_delete_from_pollset_set(grpc_endpoint* /*ep*/,
114 grpc_pollset_set* /*pollset*/) {}
116 static void me_shutdown(grpc_endpoint* ep, grpc_error* why) {
117 half* m = reinterpret_cast<half*>(ep);
118 gpr_mu_lock(&m->parent->mu);
119 m->parent->shutdown = true;
121 grpc_core::ExecCtx::Run(
122 DEBUG_LOCATION, m->on_read,
123 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
124 m->on_read = nullptr;
128 grpc_core::ExecCtx::Run(
129 DEBUG_LOCATION, m->on_read,
130 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
131 m->on_read = nullptr;
133 gpr_mu_unlock(&m->parent->mu);
134 grpc_resource_user_shutdown(m->resource_user);
135 GRPC_ERROR_UNREF(why);
138 static void me_destroy(grpc_endpoint* ep) {
139 passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
141 if (0 == --p->halves) {
142 gpr_mu_unlock(&p->mu);
143 gpr_mu_destroy(&p->mu);
144 grpc_passthru_endpoint_stats_destroy(p->stats);
145 grpc_slice_buffer_destroy_internal(&p->client.read_buffer);
146 grpc_slice_buffer_destroy_internal(&p->server.read_buffer);
147 grpc_resource_user_unref(p->client.resource_user);
148 grpc_resource_user_unref(p->server.resource_user);
151 gpr_mu_unlock(&p->mu);
155 static absl::string_view me_get_peer(grpc_endpoint* ep) {
156 passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
157 return (reinterpret_cast<half*>(ep)) == &p->client
158 ? "fake:mock_client_endpoint"
159 : "fake:mock_server_endpoint";
162 static absl::string_view me_get_local_address(grpc_endpoint* ep) {
163 passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
164 return (reinterpret_cast<half*>(ep)) == &p->client
165 ? "fake:mock_client_endpoint"
166 : "fake:mock_server_endpoint";
169 static int me_get_fd(grpc_endpoint* /*ep*/) { return -1; }
171 static bool me_can_track_err(grpc_endpoint* /*ep*/) { return false; }
173 static grpc_resource_user* me_get_resource_user(grpc_endpoint* ep) {
174 half* m = reinterpret_cast<half*>(ep);
175 return m->resource_user;
178 static const grpc_endpoint_vtable vtable = {
182 me_add_to_pollset_set,
183 me_delete_from_pollset_set,
186 me_get_resource_user,
188 me_get_local_address,
193 static void half_init(half* m, passthru_endpoint* parent,
194 grpc_resource_quota* resource_quota,
195 const char* half_name) {
196 m->base.vtable = &vtable;
198 grpc_slice_buffer_init(&m->read_buffer);
199 m->on_read = nullptr;
200 std::string name = absl::StrFormat("passthru_endpoint_%s_%" PRIxPTR,
201 half_name, (intptr_t)parent);
202 m->resource_user = grpc_resource_user_create(resource_quota, name.c_str());
205 void grpc_passthru_endpoint_create(grpc_endpoint** client,
206 grpc_endpoint** server,
207 grpc_resource_quota* resource_quota,
208 grpc_passthru_endpoint_stats* stats) {
209 passthru_endpoint* m =
210 static_cast<passthru_endpoint*>(gpr_malloc(sizeof(*m)));
213 if (stats == nullptr) {
214 m->stats = grpc_passthru_endpoint_stats_create();
216 gpr_ref(&stats->refs);
219 half_init(&m->client, m, resource_quota, "client");
220 half_init(&m->server, m, resource_quota, "server");
222 *client = &m->client.base;
223 *server = &m->server.base;
226 grpc_passthru_endpoint_stats* grpc_passthru_endpoint_stats_create() {
227 grpc_passthru_endpoint_stats* stats =
228 static_cast<grpc_passthru_endpoint_stats*>(
229 gpr_malloc(sizeof(grpc_passthru_endpoint_stats)));
230 memset(stats, 0, sizeof(*stats));
231 gpr_ref_init(&stats->refs, 1);
235 void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats) {
236 if (gpr_unref(&stats->refs)) {