Imported Upstream version 1.26.0
[platform/upstream/grpc.git] / test / core / util / passthru_endpoint.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20    using that endpoint. Because of various transitive includes in uv.h,
21    including windows.h on Windows, uv.h must be included before other system
22    headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
24
25 #include "test/core/util/passthru_endpoint.h"
26
27 #include <inttypes.h>
28 #include <string.h>
29
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/string_util.h>
32 #include "src/core/lib/iomgr/sockaddr.h"
33
34 #include "src/core/lib/slice/slice_internal.h"
35
36 typedef struct passthru_endpoint passthru_endpoint;
37
38 typedef struct {
39   grpc_endpoint base;
40   passthru_endpoint* parent;
41   grpc_slice_buffer read_buffer;
42   grpc_slice_buffer* on_read_out;
43   grpc_closure* on_read;
44   grpc_resource_user* resource_user;
45 } half;
46
/* A connected pair of in-memory endpoints. Both halves live inside this one
   allocation and share its mutex, stats and shutdown flag. */
struct passthru_endpoint {
  /* Held by the read, write, shutdown and destroy callbacks while they touch
     the state below. */
  gpr_mu mu;
  /* Number of halves not yet destroyed; starts at 2, and the pair is freed
     when me_destroy() brings it to 0. */
  int halves;
  /* Counters updated on write; the pair holds one reference on this object. */
  grpc_passthru_endpoint_stats* stats;
  /* Set once either half is shut down; subsequent reads and writes fail. */
  bool shutdown;
  half client;
  half server;
};
55
56 static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
57                     grpc_closure* cb, bool /*urgent*/) {
58   half* m = reinterpret_cast<half*>(ep);
59   gpr_mu_lock(&m->parent->mu);
60   if (m->parent->shutdown) {
61     grpc_core::ExecCtx::Run(
62         DEBUG_LOCATION, cb,
63         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
64   } else if (m->read_buffer.count > 0) {
65     grpc_slice_buffer_swap(&m->read_buffer, slices);
66     grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
67   } else {
68     m->on_read = cb;
69     m->on_read_out = slices;
70   }
71   gpr_mu_unlock(&m->parent->mu);
72 }
73
74 static half* other_half(half* h) {
75   if (h == &h->parent->client) return &h->parent->server;
76   return &h->parent->client;
77 }
78
79 static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
80                      grpc_closure* cb, void* /*arg*/) {
81   half* m = other_half(reinterpret_cast<half*>(ep));
82   gpr_mu_lock(&m->parent->mu);
83   grpc_error* error = GRPC_ERROR_NONE;
84   gpr_atm_no_barrier_fetch_add(&m->parent->stats->num_writes, (gpr_atm)1);
85   if (m->parent->shutdown) {
86     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
87   } else if (m->on_read != nullptr) {
88     for (size_t i = 0; i < slices->count; i++) {
89       grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i]));
90     }
91     grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->on_read, GRPC_ERROR_NONE);
92     m->on_read = nullptr;
93   } else {
94     for (size_t i = 0; i < slices->count; i++) {
95       grpc_slice_buffer_add(&m->read_buffer,
96                             grpc_slice_copy(slices->slices[i]));
97     }
98   }
99   gpr_mu_unlock(&m->parent->mu);
100   grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
101 }
102
103 static void me_add_to_pollset(grpc_endpoint* /*ep*/,
104                               grpc_pollset* /*pollset*/) {}
105
106 static void me_add_to_pollset_set(grpc_endpoint* /*ep*/,
107                                   grpc_pollset_set* /*pollset*/) {}
108
109 static void me_delete_from_pollset_set(grpc_endpoint* /*ep*/,
110                                        grpc_pollset_set* /*pollset*/) {}
111
112 static void me_shutdown(grpc_endpoint* ep, grpc_error* why) {
113   half* m = reinterpret_cast<half*>(ep);
114   gpr_mu_lock(&m->parent->mu);
115   m->parent->shutdown = true;
116   if (m->on_read) {
117     grpc_core::ExecCtx::Run(
118         DEBUG_LOCATION, m->on_read,
119         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
120     m->on_read = nullptr;
121   }
122   m = other_half(m);
123   if (m->on_read) {
124     grpc_core::ExecCtx::Run(
125         DEBUG_LOCATION, m->on_read,
126         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
127     m->on_read = nullptr;
128   }
129   gpr_mu_unlock(&m->parent->mu);
130   grpc_resource_user_shutdown(m->resource_user);
131   GRPC_ERROR_UNREF(why);
132 }
133
134 static void me_destroy(grpc_endpoint* ep) {
135   passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
136   gpr_mu_lock(&p->mu);
137   if (0 == --p->halves) {
138     gpr_mu_unlock(&p->mu);
139     gpr_mu_destroy(&p->mu);
140     grpc_passthru_endpoint_stats_destroy(p->stats);
141     grpc_slice_buffer_destroy_internal(&p->client.read_buffer);
142     grpc_slice_buffer_destroy_internal(&p->server.read_buffer);
143     grpc_resource_user_unref(p->client.resource_user);
144     grpc_resource_user_unref(p->server.resource_user);
145     gpr_free(p);
146   } else {
147     gpr_mu_unlock(&p->mu);
148   }
149 }
150
151 static char* me_get_peer(grpc_endpoint* ep) {
152   passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
153   return (reinterpret_cast<half*>(ep)) == &p->client
154              ? gpr_strdup("fake:mock_client_endpoint")
155              : gpr_strdup("fake:mock_server_endpoint");
156 }
157
158 static int me_get_fd(grpc_endpoint* /*ep*/) { return -1; }
159
160 static bool me_can_track_err(grpc_endpoint* /*ep*/) { return false; }
161
162 static grpc_resource_user* me_get_resource_user(grpc_endpoint* ep) {
163   half* m = reinterpret_cast<half*>(ep);
164   return m->resource_user;
165 }
166
/* Endpoint operations shared by both halves of every passthru pair;
   half_init() installs a pointer to this table in each half's `base`.
   NOTE(review): the initializer is positional, so the entries presumably have
   to follow the member order of grpc_endpoint_vtable -- confirm against the
   endpoint header before reordering or inserting anything here. */
static const grpc_endpoint_vtable vtable = {
    me_read,                    /* deliver buffered data or park the read */
    me_write,                   /* copy slices to the other half */
    me_add_to_pollset,          /* no-op */
    me_add_to_pollset_set,      /* no-op */
    me_delete_from_pollset_set, /* no-op */
    me_shutdown,                /* mark shut down, fail pending reads */
    me_destroy,                 /* drop one half; free the pair on the last */
    me_get_resource_user,       /* this half's resource user */
    me_get_peer,                /* fake client/server peer name */
    me_get_fd,                  /* always -1 */
    me_can_track_err,           /* always false */
};
180
181 static void half_init(half* m, passthru_endpoint* parent,
182                       grpc_resource_quota* resource_quota,
183                       const char* half_name) {
184   m->base.vtable = &vtable;
185   m->parent = parent;
186   grpc_slice_buffer_init(&m->read_buffer);
187   m->on_read = nullptr;
188   char* name;
189   gpr_asprintf(&name, "passthru_endpoint_%s_%" PRIxPTR, half_name,
190                (intptr_t)parent);
191   m->resource_user = grpc_resource_user_create(resource_quota, name);
192   gpr_free(name);
193 }
194
195 void grpc_passthru_endpoint_create(grpc_endpoint** client,
196                                    grpc_endpoint** server,
197                                    grpc_resource_quota* resource_quota,
198                                    grpc_passthru_endpoint_stats* stats) {
199   passthru_endpoint* m =
200       static_cast<passthru_endpoint*>(gpr_malloc(sizeof(*m)));
201   m->halves = 2;
202   m->shutdown = 0;
203   if (stats == nullptr) {
204     m->stats = grpc_passthru_endpoint_stats_create();
205   } else {
206     gpr_ref(&stats->refs);
207     m->stats = stats;
208   }
209   half_init(&m->client, m, resource_quota, "client");
210   half_init(&m->server, m, resource_quota, "server");
211   gpr_mu_init(&m->mu);
212   *client = &m->client.base;
213   *server = &m->server.base;
214 }
215
216 grpc_passthru_endpoint_stats* grpc_passthru_endpoint_stats_create() {
217   grpc_passthru_endpoint_stats* stats =
218       static_cast<grpc_passthru_endpoint_stats*>(
219           gpr_malloc(sizeof(grpc_passthru_endpoint_stats)));
220   memset(stats, 0, sizeof(*stats));
221   gpr_ref_init(&stats->refs, 1);
222   return stats;
223 }
224
225 void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats) {
226   if (gpr_unref(&stats->refs)) {
227     gpr_free(stats);
228   }
229 }