3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20 using that endpoint. Because of various transitive includes in uv.h,
21 including windows.h on Windows, uv.h must be included before other system
22 headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
25 #include "test/core/util/passthru_endpoint.h"
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/string_util.h>
32 #include "src/core/lib/iomgr/sockaddr.h"
34 #include "src/core/lib/slice/slice_internal.h"
typedef struct passthru_endpoint passthru_endpoint;

/* Per-direction state ("half") of a passthru endpoint pair.
   NOTE(review): this chunk of the file is missing the `typedef struct {`
   opener (with its leading `grpc_endpoint base;` member, written to via
   m->base.vtable in half_init below) and the closing `} half;` — the
   fields below belong to that struct; restore from upstream. */
passthru_endpoint* parent;          // back-pointer to the owning pair
grpc_slice_buffer read_buffer;      // bytes written by the peer, not yet read
grpc_slice_buffer* on_read_out;     // where a parked read wants its slices
grpc_closure* on_read;              // parked read completion (nullptr if none)
grpc_resource_user* resource_user;  // per-half resource accounting handle
/* Shared state for one connected client/server passthru endpoint pair.
   NOTE(review): most members are missing from this extraction — usages
   below reference mu, halves, shutdown, and the client/server halves;
   restore from upstream. */
struct passthru_endpoint {
  grpc_passthru_endpoint_stats* stats;  // shared counters (num_writes, refs)
/* grpc_endpoint read vtable entry for one half of the pair. With the
   parent's mutex held: if the pair is already shut down, fail the read;
   if the peer has already buffered bytes, hand them over and complete the
   read immediately; otherwise park the read until the peer writes.
   NOTE(review): this extraction is missing several lines — the head of
   the shutdown-path GRPC_CLOSURE_SCHED call, the final `else` branch
   (which, per me_write's use of m->on_read, presumably stores `cb` into
   m->on_read — confirm against upstream), and closing braces. */
static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
                    grpc_closure* cb, bool urgent) {
  half* m = reinterpret_cast<half*>(ep);
  gpr_mu_lock(&m->parent->mu);
  if (m->parent->shutdown) {
    /* Endpoint closed: complete the read with an error. */
        cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
  } else if (m->read_buffer.count > 0) {
    /* Data already buffered by the peer's write: deliver it now. */
    grpc_slice_buffer_swap(&m->read_buffer, slices);
    GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
    /* Pending read: remember where to deliver bytes when the peer writes. */
    m->on_read_out = slices;
  gpr_mu_unlock(&m->parent->mu);
73 static half* other_half(half* h) {
74 if (h == &h->parent->client) return &h->parent->server;
75 return &h->parent->client;
/* grpc_endpoint write vtable entry: a write on one half delivers copies of
   the slices to the *other* half. Under the parent lock: bump the shared
   write counter; if shut down, fail; if the peer has a read parked
   (m->on_read != nullptr), copy slices straight into its output buffer and
   wake it; otherwise buffer the copies for a later read. The write
   completion `cb` is always scheduled after unlocking.
   NOTE(review): this extraction is missing the loop/branch closing braces,
   the reset of m->on_read after scheduling it, and the `else` line —
   restore from upstream. */
static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
                     grpc_closure* cb, void* arg) {
  half* m = other_half(reinterpret_cast<half*>(ep));
  gpr_mu_lock(&m->parent->mu);
  grpc_error* error = GRPC_ERROR_NONE;
  gpr_atm_no_barrier_fetch_add(&m->parent->stats->num_writes, (gpr_atm)1);
  if (m->parent->shutdown) {
    error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
  } else if (m->on_read != nullptr) {
    /* Peer read is parked: deliver copies directly and wake it. */
    for (size_t i = 0; i < slices->count; i++) {
      grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i]));
    GRPC_CLOSURE_SCHED(m->on_read, GRPC_ERROR_NONE);
    /* No reader waiting: stash copies until the peer calls read. */
    for (size_t i = 0; i < slices->count; i++) {
      grpc_slice_buffer_add(&m->read_buffer,
                            grpc_slice_copy(slices->slices[i]));
  gpr_mu_unlock(&m->parent->mu);
  GRPC_CLOSURE_SCHED(cb, error);
102 static void me_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
104 static void me_add_to_pollset_set(grpc_endpoint* ep,
105 grpc_pollset_set* pollset) {}
107 static void me_delete_from_pollset_set(grpc_endpoint* ep,
108 grpc_pollset_set* pollset) {}
/* grpc_endpoint shutdown vtable entry: marks the shared pair as shut
   down, fails parked reads (the two identical error-scheduling sites
   below suggest both halves are handled — confirm against upstream),
   then shuts down this half's resource user. Takes ownership of `why`
   (unref'd at the end).
   NOTE(review): this extraction is missing the `if (m->on_read)` guards,
   the GRPC_CLOSURE_SCHED call heads, the switch to the other half between
   the two sites, and closing braces — restore from upstream. */
static void me_shutdown(grpc_endpoint* ep, grpc_error* why) {
  half* m = reinterpret_cast<half*>(ep);
  gpr_mu_lock(&m->parent->mu);
  m->parent->shutdown = true;
      GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
  m->on_read = nullptr;
      GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
  m->on_read = nullptr;
  gpr_mu_unlock(&m->parent->mu);
  grpc_resource_user_shutdown(m->resource_user);
  GRPC_ERROR_UNREF(why);
/* grpc_endpoint destroy vtable entry: each half is destroyed once; the
   last one (when the halves counter reaches zero) tears down the shared
   parent state — mutex, stats, read buffers, resource users.
   NOTE(review): this extraction is missing the initial gpr_mu_lock, the
   gpr_free of the parent after teardown, the `else` line before the final
   unlock (which handles the not-last-half case), and closing braces —
   restore from upstream. */
static void me_destroy(grpc_endpoint* ep) {
  passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
  if (0 == --p->halves) {
    /* Last half: release the lock before destroying the mutex, then free
       all shared resources. */
    gpr_mu_unlock(&p->mu);
    gpr_mu_destroy(&p->mu);
    grpc_passthru_endpoint_stats_destroy(p->stats);
    grpc_slice_buffer_destroy_internal(&p->client.read_buffer);
    grpc_slice_buffer_destroy_internal(&p->server.read_buffer);
    grpc_resource_user_unref(p->client.resource_user);
    grpc_resource_user_unref(p->server.resource_user);
    /* Not the last half: just drop the lock. */
    gpr_mu_unlock(&p->mu);
149 static char* me_get_peer(grpc_endpoint* ep) {
150 passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
151 return (reinterpret_cast<half*>(ep)) == &p->client
152 ? gpr_strdup("fake:mock_client_endpoint")
153 : gpr_strdup("fake:mock_server_endpoint");
156 static int me_get_fd(grpc_endpoint* ep) { return -1; }
158 static bool me_can_track_err(grpc_endpoint* ep) { return false; }
160 static grpc_resource_user* me_get_resource_user(grpc_endpoint* ep) {
161 half* m = reinterpret_cast<half*>(ep);
162 return m->resource_user;
/* grpc_endpoint vtable wiring the me_* implementations above into the
   generic endpoint interface.
   NOTE(review): several initializer entries (read/write/pollset, and the
   shutdown/destroy/get_peer/get_fd/can_track_err tail) plus the closing
   `};` are missing from this extraction — restore from upstream. */
static const grpc_endpoint_vtable vtable = {
    me_add_to_pollset_set,
    me_delete_from_pollset_set,
    me_get_resource_user,
/* Initializes one half of the pair: installs the vtable, starts with an
   empty read buffer and no parked read, and creates a resource user named
   "passthru_endpoint_<half_name>_<ptr>" against `resource_quota`.
   NOTE(review): this extraction is missing the m->parent assignment, the
   `char* name;` declaration, the trailing argument of gpr_asprintf, the
   gpr_free(name) after use, and the closing brace — restore from
   upstream. */
static void half_init(half* m, passthru_endpoint* parent,
                      grpc_resource_quota* resource_quota,
                      const char* half_name) {
  m->base.vtable = &vtable;
  grpc_slice_buffer_init(&m->read_buffer);
  m->on_read = nullptr;
  gpr_asprintf(&name, "passthru_endpoint_%s_%" PRIxPTR, half_name,
  m->resource_user = grpc_resource_user_create(resource_quota, name);
/* Public constructor: allocates a single passthru_endpoint holding two
   linked halves and returns them through *client / *server. If `stats` is
   null a fresh stats object is created; otherwise the caller's stats
   object is ref'd (and, presumably, stored in m->stats by the missing
   else branch — confirm against upstream) so it is shared with the pair.
   NOTE(review): this extraction is missing the zeroing memset / halves
   counter init, the `else` branch of the stats check, the mutex init, and
   the closing brace — restore from upstream. */
void grpc_passthru_endpoint_create(grpc_endpoint** client,
                                   grpc_endpoint** server,
                                   grpc_resource_quota* resource_quota,
                                   grpc_passthru_endpoint_stats* stats) {
  passthru_endpoint* m =
      static_cast<passthru_endpoint*>(gpr_malloc(sizeof(*m)));
  if (stats == nullptr) {
    m->stats = grpc_passthru_endpoint_stats_create();
    gpr_ref(&stats->refs);
  half_init(&m->client, m, resource_quota, "client");
  half_init(&m->server, m, resource_quota, "server");
  /* Hand the embedded grpc_endpoint bases back to the caller. */
  *client = &m->client.base;
  *server = &m->server.base;
214 grpc_passthru_endpoint_stats* grpc_passthru_endpoint_stats_create() {
215 grpc_passthru_endpoint_stats* stats =
216 static_cast<grpc_passthru_endpoint_stats*>(
217 gpr_malloc(sizeof(grpc_passthru_endpoint_stats)));
218 memset(stats, 0, sizeof(*stats));
219 gpr_ref_init(&stats->refs, 1);
/* Drops one reference on `stats`; gpr_unref returns true on the final
   unref, gating the teardown below.
   NOTE(review): the remainder of this function (the free on final unref
   and the closing braces) lies beyond this chunk of the file. */
void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats) {
  if (gpr_unref(&stats->refs)) {