Imported Upstream version 1.33.1
[platform/upstream/grpc.git] / test / core / util / passthru_endpoint.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20    using that endpoint. Because of various transitive includes in uv.h,
21    including windows.h on Windows, uv.h must be included before other system
22    headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
24
25 #include "test/core/util/passthru_endpoint.h"
26
27 #include <inttypes.h>
28 #include <string.h>
29
30 #include <string>
31
32 #include "absl/strings/str_format.h"
33
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/string_util.h>
36 #include "src/core/lib/iomgr/sockaddr.h"
37
38 #include "src/core/lib/slice/slice_internal.h"
39
40 typedef struct passthru_endpoint passthru_endpoint;
41
42 typedef struct {
43   grpc_endpoint base;
44   passthru_endpoint* parent;
45   grpc_slice_buffer read_buffer;
46   grpc_slice_buffer* on_read_out;
47   grpc_closure* on_read;
48   grpc_resource_user* resource_user;
49 } half;
50
51 struct passthru_endpoint {
52   gpr_mu mu;
53   int halves;
54   grpc_passthru_endpoint_stats* stats;
55   bool shutdown;
56   half client;
57   half server;
58 };
59
60 static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
61                     grpc_closure* cb, bool /*urgent*/) {
62   half* m = reinterpret_cast<half*>(ep);
63   gpr_mu_lock(&m->parent->mu);
64   if (m->parent->shutdown) {
65     grpc_core::ExecCtx::Run(
66         DEBUG_LOCATION, cb,
67         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
68   } else if (m->read_buffer.count > 0) {
69     grpc_slice_buffer_swap(&m->read_buffer, slices);
70     grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
71   } else {
72     m->on_read = cb;
73     m->on_read_out = slices;
74   }
75   gpr_mu_unlock(&m->parent->mu);
76 }
77
78 static half* other_half(half* h) {
79   if (h == &h->parent->client) return &h->parent->server;
80   return &h->parent->client;
81 }
82
83 static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
84                      grpc_closure* cb, void* /*arg*/) {
85   half* m = other_half(reinterpret_cast<half*>(ep));
86   gpr_mu_lock(&m->parent->mu);
87   grpc_error* error = GRPC_ERROR_NONE;
88   gpr_atm_no_barrier_fetch_add(&m->parent->stats->num_writes, (gpr_atm)1);
89   if (m->parent->shutdown) {
90     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
91   } else if (m->on_read != nullptr) {
92     for (size_t i = 0; i < slices->count; i++) {
93       grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i]));
94     }
95     grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->on_read, GRPC_ERROR_NONE);
96     m->on_read = nullptr;
97   } else {
98     for (size_t i = 0; i < slices->count; i++) {
99       grpc_slice_buffer_add(&m->read_buffer,
100                             grpc_slice_copy(slices->slices[i]));
101     }
102   }
103   gpr_mu_unlock(&m->parent->mu);
104   grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
105 }
106
107 static void me_add_to_pollset(grpc_endpoint* /*ep*/,
108                               grpc_pollset* /*pollset*/) {}
109
110 static void me_add_to_pollset_set(grpc_endpoint* /*ep*/,
111                                   grpc_pollset_set* /*pollset*/) {}
112
113 static void me_delete_from_pollset_set(grpc_endpoint* /*ep*/,
114                                        grpc_pollset_set* /*pollset*/) {}
115
116 static void me_shutdown(grpc_endpoint* ep, grpc_error* why) {
117   half* m = reinterpret_cast<half*>(ep);
118   gpr_mu_lock(&m->parent->mu);
119   m->parent->shutdown = true;
120   if (m->on_read) {
121     grpc_core::ExecCtx::Run(
122         DEBUG_LOCATION, m->on_read,
123         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
124     m->on_read = nullptr;
125   }
126   m = other_half(m);
127   if (m->on_read) {
128     grpc_core::ExecCtx::Run(
129         DEBUG_LOCATION, m->on_read,
130         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
131     m->on_read = nullptr;
132   }
133   gpr_mu_unlock(&m->parent->mu);
134   grpc_resource_user_shutdown(m->resource_user);
135   GRPC_ERROR_UNREF(why);
136 }
137
138 static void me_destroy(grpc_endpoint* ep) {
139   passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
140   gpr_mu_lock(&p->mu);
141   if (0 == --p->halves) {
142     gpr_mu_unlock(&p->mu);
143     gpr_mu_destroy(&p->mu);
144     grpc_passthru_endpoint_stats_destroy(p->stats);
145     grpc_slice_buffer_destroy_internal(&p->client.read_buffer);
146     grpc_slice_buffer_destroy_internal(&p->server.read_buffer);
147     grpc_resource_user_unref(p->client.resource_user);
148     grpc_resource_user_unref(p->server.resource_user);
149     gpr_free(p);
150   } else {
151     gpr_mu_unlock(&p->mu);
152   }
153 }
154
155 static absl::string_view me_get_peer(grpc_endpoint* ep) {
156   passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
157   return (reinterpret_cast<half*>(ep)) == &p->client
158              ? "fake:mock_client_endpoint"
159              : "fake:mock_server_endpoint";
160 }
161
162 static absl::string_view me_get_local_address(grpc_endpoint* ep) {
163   passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
164   return (reinterpret_cast<half*>(ep)) == &p->client
165              ? "fake:mock_client_endpoint"
166              : "fake:mock_server_endpoint";
167 }
168
169 static int me_get_fd(grpc_endpoint* /*ep*/) { return -1; }
170
171 static bool me_can_track_err(grpc_endpoint* /*ep*/) { return false; }
172
173 static grpc_resource_user* me_get_resource_user(grpc_endpoint* ep) {
174   half* m = reinterpret_cast<half*>(ep);
175   return m->resource_user;
176 }
177
178 static const grpc_endpoint_vtable vtable = {
179     me_read,
180     me_write,
181     me_add_to_pollset,
182     me_add_to_pollset_set,
183     me_delete_from_pollset_set,
184     me_shutdown,
185     me_destroy,
186     me_get_resource_user,
187     me_get_peer,
188     me_get_local_address,
189     me_get_fd,
190     me_can_track_err,
191 };
192
193 static void half_init(half* m, passthru_endpoint* parent,
194                       grpc_resource_quota* resource_quota,
195                       const char* half_name) {
196   m->base.vtable = &vtable;
197   m->parent = parent;
198   grpc_slice_buffer_init(&m->read_buffer);
199   m->on_read = nullptr;
200   std::string name = absl::StrFormat("passthru_endpoint_%s_%" PRIxPTR,
201                                      half_name, (intptr_t)parent);
202   m->resource_user = grpc_resource_user_create(resource_quota, name.c_str());
203 }
204
205 void grpc_passthru_endpoint_create(grpc_endpoint** client,
206                                    grpc_endpoint** server,
207                                    grpc_resource_quota* resource_quota,
208                                    grpc_passthru_endpoint_stats* stats) {
209   passthru_endpoint* m =
210       static_cast<passthru_endpoint*>(gpr_malloc(sizeof(*m)));
211   m->halves = 2;
212   m->shutdown = 0;
213   if (stats == nullptr) {
214     m->stats = grpc_passthru_endpoint_stats_create();
215   } else {
216     gpr_ref(&stats->refs);
217     m->stats = stats;
218   }
219   half_init(&m->client, m, resource_quota, "client");
220   half_init(&m->server, m, resource_quota, "server");
221   gpr_mu_init(&m->mu);
222   *client = &m->client.base;
223   *server = &m->server.base;
224 }
225
226 grpc_passthru_endpoint_stats* grpc_passthru_endpoint_stats_create() {
227   grpc_passthru_endpoint_stats* stats =
228       static_cast<grpc_passthru_endpoint_stats*>(
229           gpr_malloc(sizeof(grpc_passthru_endpoint_stats)));
230   memset(stats, 0, sizeof(*stats));
231   gpr_ref_init(&stats->refs, 1);
232   return stats;
233 }
234
235 void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats) {
236   if (gpr_unref(&stats->refs)) {
237     gpr_free(stats);
238   }
239 }