3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20 using that endpoint. Because of various transitive includes in uv.h,
21 including windows.h on Windows, uv.h must be included before other system
22 headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
25 #include "test/core/util/passthru_endpoint.h"
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/string_util.h>
32 #include "src/core/lib/iomgr/sockaddr.h"
34 #include "src/core/lib/slice/slice_internal.h"
36 typedef struct passthru_endpoint passthru_endpoint;
40 passthru_endpoint* parent;
41 grpc_slice_buffer read_buffer;
42 grpc_slice_buffer* on_read_out;
43 grpc_closure* on_read;
44 grpc_resource_user* resource_user;
47 struct passthru_endpoint {
50 grpc_passthru_endpoint_stats* stats;
56 static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
57 grpc_closure* cb, bool /*urgent*/) {
58 half* m = reinterpret_cast<half*>(ep);
59 gpr_mu_lock(&m->parent->mu);
60 if (m->parent->shutdown) {
61 grpc_core::ExecCtx::Run(
63 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
64 } else if (m->read_buffer.count > 0) {
65 grpc_slice_buffer_swap(&m->read_buffer, slices);
66 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
69 m->on_read_out = slices;
71 gpr_mu_unlock(&m->parent->mu);
74 static half* other_half(half* h) {
75 if (h == &h->parent->client) return &h->parent->server;
76 return &h->parent->client;
79 static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
80 grpc_closure* cb, void* /*arg*/) {
81 half* m = other_half(reinterpret_cast<half*>(ep));
82 gpr_mu_lock(&m->parent->mu);
83 grpc_error* error = GRPC_ERROR_NONE;
84 gpr_atm_no_barrier_fetch_add(&m->parent->stats->num_writes, (gpr_atm)1);
85 if (m->parent->shutdown) {
86 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
87 } else if (m->on_read != nullptr) {
88 for (size_t i = 0; i < slices->count; i++) {
89 grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i]));
91 grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->on_read, GRPC_ERROR_NONE);
94 for (size_t i = 0; i < slices->count; i++) {
95 grpc_slice_buffer_add(&m->read_buffer,
96 grpc_slice_copy(slices->slices[i]));
99 gpr_mu_unlock(&m->parent->mu);
100 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
103 static void me_add_to_pollset(grpc_endpoint* /*ep*/,
104 grpc_pollset* /*pollset*/) {}
106 static void me_add_to_pollset_set(grpc_endpoint* /*ep*/,
107 grpc_pollset_set* /*pollset*/) {}
109 static void me_delete_from_pollset_set(grpc_endpoint* /*ep*/,
110 grpc_pollset_set* /*pollset*/) {}
112 static void me_shutdown(grpc_endpoint* ep, grpc_error* why) {
113 half* m = reinterpret_cast<half*>(ep);
114 gpr_mu_lock(&m->parent->mu);
115 m->parent->shutdown = true;
117 grpc_core::ExecCtx::Run(
118 DEBUG_LOCATION, m->on_read,
119 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
120 m->on_read = nullptr;
124 grpc_core::ExecCtx::Run(
125 DEBUG_LOCATION, m->on_read,
126 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
127 m->on_read = nullptr;
129 gpr_mu_unlock(&m->parent->mu);
130 grpc_resource_user_shutdown(m->resource_user);
131 GRPC_ERROR_UNREF(why);
134 static void me_destroy(grpc_endpoint* ep) {
135 passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
137 if (0 == --p->halves) {
138 gpr_mu_unlock(&p->mu);
139 gpr_mu_destroy(&p->mu);
140 grpc_passthru_endpoint_stats_destroy(p->stats);
141 grpc_slice_buffer_destroy_internal(&p->client.read_buffer);
142 grpc_slice_buffer_destroy_internal(&p->server.read_buffer);
143 grpc_resource_user_unref(p->client.resource_user);
144 grpc_resource_user_unref(p->server.resource_user);
147 gpr_mu_unlock(&p->mu);
151 static char* me_get_peer(grpc_endpoint* ep) {
152 passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
153 return (reinterpret_cast<half*>(ep)) == &p->client
154 ? gpr_strdup("fake:mock_client_endpoint")
155 : gpr_strdup("fake:mock_server_endpoint");
158 static int me_get_fd(grpc_endpoint* /*ep*/) { return -1; }
160 static bool me_can_track_err(grpc_endpoint* /*ep*/) { return false; }
162 static grpc_resource_user* me_get_resource_user(grpc_endpoint* ep) {
163 half* m = reinterpret_cast<half*>(ep);
164 return m->resource_user;
167 static const grpc_endpoint_vtable vtable = {
171 me_add_to_pollset_set,
172 me_delete_from_pollset_set,
175 me_get_resource_user,
181 static void half_init(half* m, passthru_endpoint* parent,
182 grpc_resource_quota* resource_quota,
183 const char* half_name) {
184 m->base.vtable = &vtable;
186 grpc_slice_buffer_init(&m->read_buffer);
187 m->on_read = nullptr;
189 gpr_asprintf(&name, "passthru_endpoint_%s_%" PRIxPTR, half_name,
191 m->resource_user = grpc_resource_user_create(resource_quota, name);
195 void grpc_passthru_endpoint_create(grpc_endpoint** client,
196 grpc_endpoint** server,
197 grpc_resource_quota* resource_quota,
198 grpc_passthru_endpoint_stats* stats) {
199 passthru_endpoint* m =
200 static_cast<passthru_endpoint*>(gpr_malloc(sizeof(*m)));
203 if (stats == nullptr) {
204 m->stats = grpc_passthru_endpoint_stats_create();
206 gpr_ref(&stats->refs);
209 half_init(&m->client, m, resource_quota, "client");
210 half_init(&m->server, m, resource_quota, "server");
212 *client = &m->client.base;
213 *server = &m->server.base;
216 grpc_passthru_endpoint_stats* grpc_passthru_endpoint_stats_create() {
217 grpc_passthru_endpoint_stats* stats =
218 static_cast<grpc_passthru_endpoint_stats*>(
219 gpr_malloc(sizeof(grpc_passthru_endpoint_stats)));
220 memset(stats, 0, sizeof(*stats));
221 gpr_ref_init(&stats->refs, 1);
225 void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats) {
226 if (gpr_unref(&stats->refs)) {