Imported Upstream version 1.18.0
[platform/upstream/grpc.git] / test / core / util / passthru_endpoint.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20    using that endpoint. Because of various transitive includes in uv.h,
21    including windows.h on Windows, uv.h must be included before other system
22    headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
24
25 #include "test/core/util/passthru_endpoint.h"
26
27 #include <inttypes.h>
28 #include <string.h>
29
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/string_util.h>
32 #include "src/core/lib/iomgr/sockaddr.h"
33
34 #include "src/core/lib/slice/slice_internal.h"
35
36 typedef struct passthru_endpoint passthru_endpoint;
37
38 typedef struct {
39   grpc_endpoint base;
40   passthru_endpoint* parent;
41   grpc_slice_buffer read_buffer;
42   grpc_slice_buffer* on_read_out;
43   grpc_closure* on_read;
44   grpc_resource_user* resource_user;
45 } half;
46
47 struct passthru_endpoint {
48   gpr_mu mu;
49   int halves;
50   grpc_passthru_endpoint_stats* stats;
51   bool shutdown;
52   half client;
53   half server;
54 };
55
56 static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
57                     grpc_closure* cb) {
58   half* m = reinterpret_cast<half*>(ep);
59   gpr_mu_lock(&m->parent->mu);
60   if (m->parent->shutdown) {
61     GRPC_CLOSURE_SCHED(
62         cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
63   } else if (m->read_buffer.count > 0) {
64     grpc_slice_buffer_swap(&m->read_buffer, slices);
65     GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
66   } else {
67     m->on_read = cb;
68     m->on_read_out = slices;
69   }
70   gpr_mu_unlock(&m->parent->mu);
71 }
72
73 static half* other_half(half* h) {
74   if (h == &h->parent->client) return &h->parent->server;
75   return &h->parent->client;
76 }
77
78 static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
79                      grpc_closure* cb, void* arg) {
80   half* m = other_half(reinterpret_cast<half*>(ep));
81   gpr_mu_lock(&m->parent->mu);
82   grpc_error* error = GRPC_ERROR_NONE;
83   gpr_atm_no_barrier_fetch_add(&m->parent->stats->num_writes, (gpr_atm)1);
84   if (m->parent->shutdown) {
85     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
86   } else if (m->on_read != nullptr) {
87     for (size_t i = 0; i < slices->count; i++) {
88       grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i]));
89     }
90     GRPC_CLOSURE_SCHED(m->on_read, GRPC_ERROR_NONE);
91     m->on_read = nullptr;
92   } else {
93     for (size_t i = 0; i < slices->count; i++) {
94       grpc_slice_buffer_add(&m->read_buffer,
95                             grpc_slice_copy(slices->slices[i]));
96     }
97   }
98   gpr_mu_unlock(&m->parent->mu);
99   GRPC_CLOSURE_SCHED(cb, error);
100 }
101
102 static void me_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
103
104 static void me_add_to_pollset_set(grpc_endpoint* ep,
105                                   grpc_pollset_set* pollset) {}
106
107 static void me_delete_from_pollset_set(grpc_endpoint* ep,
108                                        grpc_pollset_set* pollset) {}
109
110 static void me_shutdown(grpc_endpoint* ep, grpc_error* why) {
111   half* m = reinterpret_cast<half*>(ep);
112   gpr_mu_lock(&m->parent->mu);
113   m->parent->shutdown = true;
114   if (m->on_read) {
115     GRPC_CLOSURE_SCHED(
116         m->on_read,
117         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
118     m->on_read = nullptr;
119   }
120   m = other_half(m);
121   if (m->on_read) {
122     GRPC_CLOSURE_SCHED(
123         m->on_read,
124         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
125     m->on_read = nullptr;
126   }
127   gpr_mu_unlock(&m->parent->mu);
128   grpc_resource_user_shutdown(m->resource_user);
129   GRPC_ERROR_UNREF(why);
130 }
131
132 static void me_destroy(grpc_endpoint* ep) {
133   passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
134   gpr_mu_lock(&p->mu);
135   if (0 == --p->halves) {
136     gpr_mu_unlock(&p->mu);
137     gpr_mu_destroy(&p->mu);
138     grpc_passthru_endpoint_stats_destroy(p->stats);
139     grpc_slice_buffer_destroy_internal(&p->client.read_buffer);
140     grpc_slice_buffer_destroy_internal(&p->server.read_buffer);
141     grpc_resource_user_unref(p->client.resource_user);
142     grpc_resource_user_unref(p->server.resource_user);
143     gpr_free(p);
144   } else {
145     gpr_mu_unlock(&p->mu);
146   }
147 }
148
149 static char* me_get_peer(grpc_endpoint* ep) {
150   passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
151   return (reinterpret_cast<half*>(ep)) == &p->client
152              ? gpr_strdup("fake:mock_client_endpoint")
153              : gpr_strdup("fake:mock_server_endpoint");
154 }
155
156 static int me_get_fd(grpc_endpoint* ep) { return -1; }
157
158 static bool me_can_track_err(grpc_endpoint* ep) { return false; }
159
160 static grpc_resource_user* me_get_resource_user(grpc_endpoint* ep) {
161   half* m = reinterpret_cast<half*>(ep);
162   return m->resource_user;
163 }
164
165 static const grpc_endpoint_vtable vtable = {
166     me_read,
167     me_write,
168     me_add_to_pollset,
169     me_add_to_pollset_set,
170     me_delete_from_pollset_set,
171     me_shutdown,
172     me_destroy,
173     me_get_resource_user,
174     me_get_peer,
175     me_get_fd,
176     me_can_track_err,
177 };
178
179 static void half_init(half* m, passthru_endpoint* parent,
180                       grpc_resource_quota* resource_quota,
181                       const char* half_name) {
182   m->base.vtable = &vtable;
183   m->parent = parent;
184   grpc_slice_buffer_init(&m->read_buffer);
185   m->on_read = nullptr;
186   char* name;
187   gpr_asprintf(&name, "passthru_endpoint_%s_%" PRIxPTR, half_name,
188                (intptr_t)parent);
189   m->resource_user = grpc_resource_user_create(resource_quota, name);
190   gpr_free(name);
191 }
192
193 void grpc_passthru_endpoint_create(grpc_endpoint** client,
194                                    grpc_endpoint** server,
195                                    grpc_resource_quota* resource_quota,
196                                    grpc_passthru_endpoint_stats* stats) {
197   passthru_endpoint* m =
198       static_cast<passthru_endpoint*>(gpr_malloc(sizeof(*m)));
199   m->halves = 2;
200   m->shutdown = 0;
201   if (stats == nullptr) {
202     m->stats = grpc_passthru_endpoint_stats_create();
203   } else {
204     gpr_ref(&stats->refs);
205     m->stats = stats;
206   }
207   half_init(&m->client, m, resource_quota, "client");
208   half_init(&m->server, m, resource_quota, "server");
209   gpr_mu_init(&m->mu);
210   *client = &m->client.base;
211   *server = &m->server.base;
212 }
213
214 grpc_passthru_endpoint_stats* grpc_passthru_endpoint_stats_create() {
215   grpc_passthru_endpoint_stats* stats =
216       static_cast<grpc_passthru_endpoint_stats*>(
217           gpr_malloc(sizeof(grpc_passthru_endpoint_stats)));
218   memset(stats, 0, sizeof(*stats));
219   gpr_ref_init(&stats->refs, 1);
220   return stats;
221 }
222
223 void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats) {
224   if (gpr_unref(&stats->refs)) {
225     gpr_free(stats);
226   }
227 }