Imported Upstream version 1.25.0
[platform/upstream/grpc.git] / test / core / util / passthru_endpoint.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20    using that endpoint. Because of various transitive includes in uv.h,
21    including windows.h on Windows, uv.h must be included before other system
22    headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
24
25 #include "test/core/util/passthru_endpoint.h"
26
27 #include <inttypes.h>
28 #include <string.h>
29
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/string_util.h>
32 #include "src/core/lib/iomgr/sockaddr.h"
33
34 #include "src/core/lib/slice/slice_internal.h"
35
36 typedef struct passthru_endpoint passthru_endpoint;
37
38 typedef struct {
39   grpc_endpoint base;
40   passthru_endpoint* parent;
41   grpc_slice_buffer read_buffer;
42   grpc_slice_buffer* on_read_out;
43   grpc_closure* on_read;
44   grpc_resource_user* resource_user;
45 } half;
46
47 struct passthru_endpoint {
48   gpr_mu mu;
49   int halves;
50   grpc_passthru_endpoint_stats* stats;
51   bool shutdown;
52   half client;
53   half server;
54 };
55
56 static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
57                     grpc_closure* cb, bool /*urgent*/) {
58   half* m = reinterpret_cast<half*>(ep);
59   gpr_mu_lock(&m->parent->mu);
60   if (m->parent->shutdown) {
61     GRPC_CLOSURE_SCHED(
62         cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
63   } else if (m->read_buffer.count > 0) {
64     grpc_slice_buffer_swap(&m->read_buffer, slices);
65     GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
66   } else {
67     m->on_read = cb;
68     m->on_read_out = slices;
69   }
70   gpr_mu_unlock(&m->parent->mu);
71 }
72
73 static half* other_half(half* h) {
74   if (h == &h->parent->client) return &h->parent->server;
75   return &h->parent->client;
76 }
77
78 static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
79                      grpc_closure* cb, void* /*arg*/) {
80   half* m = other_half(reinterpret_cast<half*>(ep));
81   gpr_mu_lock(&m->parent->mu);
82   grpc_error* error = GRPC_ERROR_NONE;
83   gpr_atm_no_barrier_fetch_add(&m->parent->stats->num_writes, (gpr_atm)1);
84   if (m->parent->shutdown) {
85     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
86   } else if (m->on_read != nullptr) {
87     for (size_t i = 0; i < slices->count; i++) {
88       grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i]));
89     }
90     GRPC_CLOSURE_SCHED(m->on_read, GRPC_ERROR_NONE);
91     m->on_read = nullptr;
92   } else {
93     for (size_t i = 0; i < slices->count; i++) {
94       grpc_slice_buffer_add(&m->read_buffer,
95                             grpc_slice_copy(slices->slices[i]));
96     }
97   }
98   gpr_mu_unlock(&m->parent->mu);
99   GRPC_CLOSURE_SCHED(cb, error);
100 }
101
102 static void me_add_to_pollset(grpc_endpoint* /*ep*/,
103                               grpc_pollset* /*pollset*/) {}
104
105 static void me_add_to_pollset_set(grpc_endpoint* /*ep*/,
106                                   grpc_pollset_set* /*pollset*/) {}
107
108 static void me_delete_from_pollset_set(grpc_endpoint* /*ep*/,
109                                        grpc_pollset_set* /*pollset*/) {}
110
111 static void me_shutdown(grpc_endpoint* ep, grpc_error* why) {
112   half* m = reinterpret_cast<half*>(ep);
113   gpr_mu_lock(&m->parent->mu);
114   m->parent->shutdown = true;
115   if (m->on_read) {
116     GRPC_CLOSURE_SCHED(
117         m->on_read,
118         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
119     m->on_read = nullptr;
120   }
121   m = other_half(m);
122   if (m->on_read) {
123     GRPC_CLOSURE_SCHED(
124         m->on_read,
125         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
126     m->on_read = nullptr;
127   }
128   gpr_mu_unlock(&m->parent->mu);
129   grpc_resource_user_shutdown(m->resource_user);
130   GRPC_ERROR_UNREF(why);
131 }
132
133 static void me_destroy(grpc_endpoint* ep) {
134   passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
135   gpr_mu_lock(&p->mu);
136   if (0 == --p->halves) {
137     gpr_mu_unlock(&p->mu);
138     gpr_mu_destroy(&p->mu);
139     grpc_passthru_endpoint_stats_destroy(p->stats);
140     grpc_slice_buffer_destroy_internal(&p->client.read_buffer);
141     grpc_slice_buffer_destroy_internal(&p->server.read_buffer);
142     grpc_resource_user_unref(p->client.resource_user);
143     grpc_resource_user_unref(p->server.resource_user);
144     gpr_free(p);
145   } else {
146     gpr_mu_unlock(&p->mu);
147   }
148 }
149
150 static char* me_get_peer(grpc_endpoint* ep) {
151   passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
152   return (reinterpret_cast<half*>(ep)) == &p->client
153              ? gpr_strdup("fake:mock_client_endpoint")
154              : gpr_strdup("fake:mock_server_endpoint");
155 }
156
157 static int me_get_fd(grpc_endpoint* /*ep*/) { return -1; }
158
159 static bool me_can_track_err(grpc_endpoint* /*ep*/) { return false; }
160
161 static grpc_resource_user* me_get_resource_user(grpc_endpoint* ep) {
162   half* m = reinterpret_cast<half*>(ep);
163   return m->resource_user;
164 }
165
/* Endpoint vtable shared by both halves; half_init stores its address in
   each half's base.vtable. The entries are positional, so their order has to
   match the member order of grpc_endpoint_vtable, which is declared outside
   this file. */
static const grpc_endpoint_vtable vtable = {
    me_read,
    me_write,
    me_add_to_pollset,
    me_add_to_pollset_set,
    me_delete_from_pollset_set,
    me_shutdown,
    me_destroy,
    me_get_resource_user,
    me_get_peer,
    me_get_fd,
    me_can_track_err,
};
179
180 static void half_init(half* m, passthru_endpoint* parent,
181                       grpc_resource_quota* resource_quota,
182                       const char* half_name) {
183   m->base.vtable = &vtable;
184   m->parent = parent;
185   grpc_slice_buffer_init(&m->read_buffer);
186   m->on_read = nullptr;
187   char* name;
188   gpr_asprintf(&name, "passthru_endpoint_%s_%" PRIxPTR, half_name,
189                (intptr_t)parent);
190   m->resource_user = grpc_resource_user_create(resource_quota, name);
191   gpr_free(name);
192 }
193
194 void grpc_passthru_endpoint_create(grpc_endpoint** client,
195                                    grpc_endpoint** server,
196                                    grpc_resource_quota* resource_quota,
197                                    grpc_passthru_endpoint_stats* stats) {
198   passthru_endpoint* m =
199       static_cast<passthru_endpoint*>(gpr_malloc(sizeof(*m)));
200   m->halves = 2;
201   m->shutdown = 0;
202   if (stats == nullptr) {
203     m->stats = grpc_passthru_endpoint_stats_create();
204   } else {
205     gpr_ref(&stats->refs);
206     m->stats = stats;
207   }
208   half_init(&m->client, m, resource_quota, "client");
209   half_init(&m->server, m, resource_quota, "server");
210   gpr_mu_init(&m->mu);
211   *client = &m->client.base;
212   *server = &m->server.base;
213 }
214
215 grpc_passthru_endpoint_stats* grpc_passthru_endpoint_stats_create() {
216   grpc_passthru_endpoint_stats* stats =
217       static_cast<grpc_passthru_endpoint_stats*>(
218           gpr_malloc(sizeof(grpc_passthru_endpoint_stats)));
219   memset(stats, 0, sizeof(*stats));
220   gpr_ref_init(&stats->refs, 1);
221   return stats;
222 }
223
224 void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats) {
225   if (gpr_unref(&stats->refs)) {
226     gpr_free(stats);
227   }
228 }