/*
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
   using that endpoint. Because of various transitive includes in uv.h,
   including windows.h on Windows, uv.h must be included before other system
   headers. Therefore, sockaddr.h must always be included first. */
23 #include "src/core/lib/iomgr/sockaddr.h"
25 #include "test/core/util/passthru_endpoint.h"
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/string_util.h>
32 #include "src/core/lib/iomgr/sockaddr.h"
34 #include "src/core/lib/slice/slice_internal.h"
/* Type declarations for the in-memory "passthru" endpoint pair: a parent
   passthru_endpoint plus the per-side `half` records that the functions in
   this file obtain by casting a grpc_endpoint pointer.

   NOTE(review): every line of this extract begins with a stray number
   (extraction residue) and the numbering has gaps. The opening and closing
   lines of the `half` struct, and several members that the code in this file
   dereferences (`base`, `mu`, `halves`, `shutdown`, `client`, `server`), are
   not visible here. Restore them from the original file rather than
   inferring their types. */
36 typedef struct passthru_endpoint passthru_endpoint;
/* Back-pointer to the owning pair; used to reach the shared mutex, the
   shutdown flag and the opposite half. */
40 passthru_endpoint* parent;
/* Data copied in by me_write while no read was pending; me_read swaps it
   into the reader's buffer. */
41 grpc_slice_buffer read_buffer;
/* Destination buffer of a pending read; me_write appends copies into it. */
42 grpc_slice_buffer* on_read_out;
/* Closure that me_write schedules when data arrives for a pending read;
   nullptr when none is registered. */
43 grpc_closure* on_read;
/* Resource user created for this half by half_init. */
44 grpc_resource_user* resource_user;
47 struct passthru_endpoint {
/* Statistics object for the pair; me_write increments its num_writes. */
50 grpc_passthru_endpoint_stats* stats;
/* Endpoint read entry point: hands the caller data that the opposite side
   has written.

   With the parent's mutex held:
     - if the pair is shut down, `cb` is paired with an "Already shutdown"
       error. NOTE(review): the line that opens this call is missing from
       the extract -- presumably GRPC_CLOSURE_SCHED, as in the next branch;
       confirm against the original file.
     - else if read_buffer already holds slices, they are swapped into
       `slices` and `cb` is scheduled with GRPC_ERROR_NONE.
     - `slices` is then recorded in on_read_out. NOTE(review): the embedded
       numbering jumps from 65 to 68, so the `else` that presumably guards
       this and the statement that would record `cb` for later completion
       are not visible; restore them before building.

   The stray numeric prefixes and the absent closing braces are extraction
   damage and are left untouched by this documentation-only edit. */
56 static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
57 grpc_closure* cb, bool /*urgent*/) {
58 half* m = reinterpret_cast<half*>(ep);
59 gpr_mu_lock(&m->parent->mu);
60 if (m->parent->shutdown) {
62 cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
63 } else if (m->read_buffer.count > 0) {
/* Buffered data is available: give it to the caller right away. */
64 grpc_slice_buffer_swap(&m->read_buffer, slices);
65 GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
/* Remember where a later me_write should deposit incoming slices. */
68 m->on_read_out = slices;
70 gpr_mu_unlock(&m->parent->mu);
73 static half* other_half(half* h) {
74 if (h == &h->parent->client) return &h->parent->server;
75 return &h->parent->client;
/* Endpoint write entry point: delivers `slices` to the opposite half.

   Note that `m` is the RECEIVING half (other_half of the writer). With the
   parent's mutex held the function increments stats->num_writes, then:
     - if the pair is shut down, prepares an "Endpoint already shutdown"
       error for `cb`;
     - else if the receiver has a read registered (on_read != nullptr),
       copies every slice into the receiver's on_read_out and schedules
       on_read with GRPC_ERROR_NONE;
     - the second visible loop copies every slice into the receiver's
       read_buffer. NOTE(review): the embedded numbering jumps from 90 to
       93, so the `else` that presumably selects this loop (no read
       pending) and any reset of on_read after scheduling are not visible;
       restore them from the original file.
   After unlocking, `cb` is scheduled with the error prepared above
   (GRPC_ERROR_NONE unless shut down).

   The stray numeric prefixes and the absent closing braces are extraction
   damage and are left untouched by this documentation-only edit. */
78 static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
79 grpc_closure* cb, void* /*arg*/) {
80 half* m = other_half(reinterpret_cast<half*>(ep));
81 gpr_mu_lock(&m->parent->mu);
82 grpc_error* error = GRPC_ERROR_NONE;
83 gpr_atm_no_barrier_fetch_add(&m->parent->stats->num_writes, (gpr_atm)1);
84 if (m->parent->shutdown) {
85 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
86 } else if (m->on_read != nullptr) {
/* A reader is waiting: copy straight into the buffer it supplied. */
87 for (size_t i = 0; i < slices->count; i++) {
88 grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i]));
90 GRPC_CLOSURE_SCHED(m->on_read, GRPC_ERROR_NONE);
/* Copy into the receiver's own buffer for a later me_read to pick up. */
93 for (size_t i = 0; i < slices->count; i++) {
94 grpc_slice_buffer_add(&m->read_buffer,
95 grpc_slice_copy(slices->slices[i]));
98 gpr_mu_unlock(&m->parent->mu);
/* The writer's completion is scheduled after the mutex is released. */
99 GRPC_CLOSURE_SCHED(cb, error);
102 static void me_add_to_pollset(grpc_endpoint* /*ep*/,
103 grpc_pollset* /*pollset*/) {}
105 static void me_add_to_pollset_set(grpc_endpoint* /*ep*/,
106 grpc_pollset_set* /*pollset*/) {}
108 static void me_delete_from_pollset_set(grpc_endpoint* /*ep*/,
109 grpc_pollset_set* /*pollset*/) {}
/* Endpoint shutdown entry point.

   Under the parent's mutex the whole pair is flagged as shut down. Two
   visible fragments then build a "Shutdown" error that references `why`
   and clear an on_read pointer. After unlocking, the resource user of the
   half that `m` points to at that moment is shut down and the reference
   held on `why` is dropped.

   NOTE(review): the embedded numbering jumps from 114 to 118 and from 119
   to 125, so the statements that open both calls (and whatever conditions
   guard them) are not visible. Presumably each fragment fails a pending
   read, once for this half and once for the opposite half -- TODO confirm
   against the original file, including which half `m` designates when
   grpc_resource_user_shutdown is reached.

   The stray numeric prefixes and the absent closing braces are extraction
   damage and are left untouched by this documentation-only edit. */
111 static void me_shutdown(grpc_endpoint* ep, grpc_error* why) {
112 half* m = reinterpret_cast<half*>(ep);
113 gpr_mu_lock(&m->parent->mu);
/* The flag lives on the parent, so it is observed through both halves. */
114 m->parent->shutdown = true;
118 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
119 m->on_read = nullptr;
125 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
126 m->on_read = nullptr;
128 gpr_mu_unlock(&m->parent->mu);
129 grpc_resource_user_shutdown(m->resource_user);
/* Release the reference on `why` that was passed in. */
130 GRPC_ERROR_UNREF(why);
/* Endpoint destroy entry point: drops one half's claim on the shared pair.

   The parent's `halves` counter is decremented; when it reaches zero the
   shared state is torn down: the mutex is unlocked and destroyed, the
   reference on the stats object is released, both read buffers are
   destroyed and both resource users are unreffed.

   NOTE(review): the embedded numbering jumps from 134 to 136 and from 143
   to 146. The gpr_mu_lock matching the visible unlocks, the `else` that
   presumably guards the second unlock, and any release of `p` itself are
   not visible in this extract; restore them from the original file.

   The stray numeric prefixes and the absent closing braces are extraction
   damage and are left untouched by this documentation-only edit. */
133 static void me_destroy(grpc_endpoint* ep) {
134 passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
/* The last half to be destroyed tears down everything the pair owns. */
136 if (0 == --p->halves) {
137 gpr_mu_unlock(&p->mu);
138 gpr_mu_destroy(&p->mu);
139 grpc_passthru_endpoint_stats_destroy(p->stats);
140 grpc_slice_buffer_destroy_internal(&p->client.read_buffer);
141 grpc_slice_buffer_destroy_internal(&p->server.read_buffer);
142 grpc_resource_user_unref(p->client.resource_user);
143 grpc_resource_user_unref(p->server.resource_user);
146 gpr_mu_unlock(&p->mu);
150 static char* me_get_peer(grpc_endpoint* ep) {
151 passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
152 return (reinterpret_cast<half*>(ep)) == &p->client
153 ? gpr_strdup("fake:mock_client_endpoint")
154 : gpr_strdup("fake:mock_server_endpoint");
157 static int me_get_fd(grpc_endpoint* /*ep*/) { return -1; }
159 static bool me_can_track_err(grpc_endpoint* /*ep*/) { return false; }
161 static grpc_resource_user* me_get_resource_user(grpc_endpoint* ep) {
162 half* m = reinterpret_cast<half*>(ep);
163 return m->resource_user;
/* Function table that half_init installs on each half (base.vtable).

   NOTE(review): the initializer is positional (no designators), so entry
   order must match the declaration of grpc_endpoint_vtable. Only three
   entries are visible in this extract: the embedded numbering skips
   167-169, 172-173 and 175 onward, and the closing of the initializer is
   missing. Restore the full list from the original file; do not reorder
   the visible entries. */
166 static const grpc_endpoint_vtable vtable = {
170 me_add_to_pollset_set,
171 me_delete_from_pollset_set,
174 me_get_resource_user,
/* Initializes one half of the pair.

   Visible steps: install the shared vtable on the half's base endpoint,
   initialize its read buffer, mark that no read is registered, format a
   name beginning "passthru_endpoint_<half_name>_" followed by a
   hexadecimal value, and create the half's resource user from
   `resource_quota` under that name.

   NOTE(review): the embedded numbering skips 184, 187, 189 and 191 onward.
   The declaration of `name`, the final gpr_asprintf argument, any use of
   `parent` (presumably stored on the half and used for the hexadecimal
   value), and any release of `name` are not visible; restore them from
   the original file.

   The stray numeric prefixes and the absent closing brace are extraction
   damage and are left untouched by this documentation-only edit. */
180 static void half_init(half* m, passthru_endpoint* parent,
181 grpc_resource_quota* resource_quota,
182 const char* half_name) {
183 m->base.vtable = &vtable;
185 grpc_slice_buffer_init(&m->read_buffer);
/* No read is registered on a freshly initialized half. */
186 m->on_read = nullptr;
188 gpr_asprintf(&name, "passthru_endpoint_%s_%" PRIxPTR, half_name,
190 m->resource_user = grpc_resource_user_create(resource_quota, name);
/* Creates a connected client/server endpoint pair backed by one heap
   allocated passthru_endpoint.

   Parameters:
     client, server  - out-parameters; receive the base endpoint of the
                       client half and of the server half respectively.
     resource_quota  - passed to half_init for both halves.
     stats           - optional statistics object. When nullptr a fresh one
                       is created with grpc_passthru_endpoint_stats_create;
                       a gpr_ref on the caller's object is also visible.

   NOTE(review): the embedded numbering skips 200-201, 204, 206-207, 210
   and 213. The `else` that presumably guards the gpr_ref, the store of the
   caller's `stats` into the pair, and the initialization of the fields
   other functions read (`halves`, `shutdown`, `mu`) are not visible;
   restore them from the original file.

   The stray numeric prefixes and the absent closing braces are extraction
   damage and are left untouched by this documentation-only edit. */
194 void grpc_passthru_endpoint_create(grpc_endpoint** client,
195 grpc_endpoint** server,
196 grpc_resource_quota* resource_quota,
197 grpc_passthru_endpoint_stats* stats) {
198 passthru_endpoint* m =
199 static_cast<passthru_endpoint*>(gpr_malloc(sizeof(*m)));
202 if (stats == nullptr) {
203 m->stats = grpc_passthru_endpoint_stats_create();
/* Take a reference on the caller-supplied stats object. */
205 gpr_ref(&stats->refs);
208 half_init(&m->client, m, resource_quota, "client");
209 half_init(&m->server, m, resource_quota, "server");
/* Hand back the embedded base endpoints of the two halves. */
211 *client = &m->client.base;
212 *server = &m->server.base;
215 grpc_passthru_endpoint_stats* grpc_passthru_endpoint_stats_create() {
216 grpc_passthru_endpoint_stats* stats =
217 static_cast<grpc_passthru_endpoint_stats*>(
218 gpr_malloc(sizeof(grpc_passthru_endpoint_stats)));
219 memset(stats, 0, sizeof(*stats));
220 gpr_ref_init(&stats->refs, 1);
224 void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats) {
225 if (gpr_unref(&stats->refs)) {