317526a563a74c7df105e3464a56c272ee4c8a61
[platform/upstream/grpc.git] / test / core / bad_connection / close_fd_test.cc
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * close_fd_test tests the behavior of grpc core when the transport gets
18  * disconnected.
19  * The test creates an http2 transport over a socket pair and closes the
20  * client or server file descriptor to simulate connection breakage while
21  * an RPC call is in progress.
22  *
23  */
24 #include "src/core/lib/iomgr/port.h"
25
26 // This test won't work except with posix sockets enabled
27 #ifdef GRPC_POSIX_SOCKET
28
29 #include "test/core/util/test_config.h"
30
31 #include <stdio.h>
32 #include <string.h>
33 #include <unistd.h>
34
35 #include <grpc/byte_buffer.h>
36 #include <grpc/byte_buffer_reader.h>
37 #include <grpc/grpc.h>
38 #include <grpc/support/alloc.h>
39 #include <grpc/support/log.h>
40 #include <grpc/support/time.h>
41 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
42 #include "src/core/lib/gpr/env.h"
43 #include "src/core/lib/iomgr/endpoint_pair.h"
44 #include "src/core/lib/surface/channel.h"
45 #include "src/core/lib/surface/completion_queue.h"
46 #include "src/core/lib/surface/server.h"
47
48 static void* tag(intptr_t t) { return (void*)t; }
49
50 typedef struct test_ctx test_ctx;
51
52 struct test_ctx {
53   /* completion queue for call notifications on the server */
54   grpc_completion_queue* cq;
55   /* completion queue registered to server for shutdown events */
56   grpc_completion_queue* shutdown_cq;
57   /* client's completion queue */
58   grpc_completion_queue* client_cq;
59   /* completion queue bound to call on the server */
60   grpc_completion_queue* bound_cq;
61   /* Server responds to client calls */
62   grpc_server* server;
63   /* Client calls are sent over the channel */
64   grpc_channel* client;
65   /* encapsulates client, server endpoints */
66   grpc_endpoint_pair* ep;
67 };
68
69 static test_ctx g_ctx;
70
71 /* chttp2 transport that is immediately available (used for testing
72    connected_channel without a client_channel */
73
74 static void server_setup_transport(grpc_transport* transport) {
75   grpc_core::ExecCtx exec_ctx;
76   grpc_endpoint_add_to_pollset(g_ctx.ep->server, grpc_cq_pollset(g_ctx.cq));
77   grpc_server_setup_transport(g_ctx.server, transport, nullptr,
78                               grpc_server_get_channel_args(g_ctx.server),
79                               nullptr);
80 }
81
82 static void client_setup_transport(grpc_transport* transport) {
83   grpc_core::ExecCtx exec_ctx;
84   grpc_endpoint_add_to_pollset(g_ctx.ep->client,
85                                grpc_cq_pollset(g_ctx.client_cq));
86   grpc_arg authority_arg = grpc_channel_arg_string_create(
87       const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),
88       const_cast<char*>("test-authority"));
89   grpc_channel_args* args =
90       grpc_channel_args_copy_and_add(nullptr, &authority_arg, 1);
91   /* TODO (pjaikumar): use GRPC_CLIENT_CHANNEL instead of
92    * GRPC_CLIENT_DIRECT_CHANNEL */
93   g_ctx.client = grpc_channel_create("socketpair-target", args,
94                                      GRPC_CLIENT_DIRECT_CHANNEL, transport);
95   grpc_channel_args_destroy(args);
96 }
97
98 static void init_client() {
99   grpc_core::ExecCtx exec_ctx;
100   grpc_transport* transport;
101   transport = grpc_create_chttp2_transport(nullptr, g_ctx.ep->client, true);
102   client_setup_transport(transport);
103   GPR_ASSERT(g_ctx.client);
104   grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
105 }
106
107 static void init_server() {
108   grpc_core::ExecCtx exec_ctx;
109   grpc_transport* transport;
110   GPR_ASSERT(!g_ctx.server);
111   g_ctx.server = grpc_server_create(nullptr, nullptr);
112   grpc_server_register_completion_queue(g_ctx.server, g_ctx.cq, nullptr);
113   grpc_server_start(g_ctx.server);
114   transport = grpc_create_chttp2_transport(nullptr, g_ctx.ep->server, false);
115   server_setup_transport(transport);
116   grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
117 }
118
119 static void test_init() {
120   grpc_endpoint_pair* sfd =
121       static_cast<grpc_endpoint_pair*>(gpr_malloc(sizeof(grpc_endpoint_pair)));
122   memset(&g_ctx, 0, sizeof(g_ctx));
123   g_ctx.ep = sfd;
124   g_ctx.cq = grpc_completion_queue_create_for_next(nullptr);
125   g_ctx.shutdown_cq = grpc_completion_queue_create_for_pluck(nullptr);
126   g_ctx.bound_cq = grpc_completion_queue_create_for_next(nullptr);
127   g_ctx.client_cq = grpc_completion_queue_create_for_next(nullptr);
128
129   /* Create endpoints */
130   *sfd = grpc_iomgr_create_endpoint_pair("fixture", nullptr);
131   /* Create client, server and setup transport over endpoint pair */
132   init_server();
133   init_client();
134 }
135
136 static void drain_cq(grpc_completion_queue* cq) {
137   grpc_event event;
138   do {
139     event = grpc_completion_queue_next(cq, grpc_timeout_seconds_to_deadline(1),
140                                        nullptr);
141   } while (event.type != GRPC_QUEUE_SHUTDOWN);
142 }
143
144 static void drain_and_destroy_cq(grpc_completion_queue* cq) {
145   grpc_completion_queue_shutdown(cq);
146   drain_cq(cq);
147   grpc_completion_queue_destroy(cq);
148 }
149
150 static void shutdown_server() {
151   if (!g_ctx.server) return;
152   grpc_server_shutdown_and_notify(g_ctx.server, g_ctx.shutdown_cq, tag(1000));
153   GPR_ASSERT(grpc_completion_queue_pluck(g_ctx.shutdown_cq, tag(1000),
154                                          grpc_timeout_seconds_to_deadline(1),
155                                          nullptr)
156                  .type == GRPC_OP_COMPLETE);
157   grpc_server_destroy(g_ctx.server);
158   g_ctx.server = nullptr;
159 }
160
161 static void shutdown_client() {
162   if (!g_ctx.client) return;
163   grpc_channel_destroy(g_ctx.client);
164   g_ctx.client = nullptr;
165 }
166
167 static void end_test() {
168   shutdown_server();
169   shutdown_client();
170
171   drain_and_destroy_cq(g_ctx.cq);
172   drain_and_destroy_cq(g_ctx.client_cq);
173   drain_and_destroy_cq(g_ctx.bound_cq);
174   grpc_completion_queue_destroy(g_ctx.shutdown_cq);
175   gpr_free(g_ctx.ep);
176 }
177
178 typedef enum fd_type { CLIENT_FD, SERVER_FD } fd_type;
179
180 static const char* fd_type_str(fd_type fdtype) {
181   if (fdtype == CLIENT_FD) {
182     return "client";
183   } else if (fdtype == SERVER_FD) {
184     return "server";
185   } else {
186     gpr_log(GPR_ERROR, "Unexpected fd_type %d", fdtype);
187     abort();
188   }
189 }
190
191 static void _test_close_before_server_recv(fd_type fdtype) {
192   grpc_core::ExecCtx exec_ctx;
193   grpc_call* call;
194   grpc_call* server_call;
195   grpc_event event;
196   grpc_slice request_payload_slice =
197       grpc_slice_from_copied_string("hello world");
198   grpc_slice response_payload_slice =
199       grpc_slice_from_copied_string("hello you");
200   grpc_byte_buffer* request_payload =
201       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
202   grpc_byte_buffer* response_payload =
203       grpc_raw_byte_buffer_create(&response_payload_slice, 1);
204   gpr_log(GPR_INFO, "Running test: test_close_%s_before_server_recv",
205           fd_type_str(fdtype));
206   test_init();
207
208   grpc_op ops[6];
209   grpc_op* op;
210   grpc_metadata_array initial_metadata_recv;
211   grpc_metadata_array trailing_metadata_recv;
212   grpc_metadata_array request_metadata_recv;
213   grpc_byte_buffer* request_payload_recv = nullptr;
214   grpc_byte_buffer* response_payload_recv = nullptr;
215   grpc_call_details call_details;
216   grpc_status_code status = GRPC_STATUS__DO_NOT_USE;
217   grpc_call_error error;
218   grpc_slice details;
219
220   gpr_timespec deadline = grpc_timeout_seconds_to_deadline(1);
221   call = grpc_channel_create_call(
222       g_ctx.client, nullptr, GRPC_PROPAGATE_DEFAULTS, g_ctx.client_cq,
223       grpc_slice_from_static_string("/foo"), nullptr, deadline, nullptr);
224   GPR_ASSERT(call);
225
226   grpc_metadata_array_init(&initial_metadata_recv);
227   grpc_metadata_array_init(&trailing_metadata_recv);
228   grpc_metadata_array_init(&request_metadata_recv);
229   grpc_call_details_init(&call_details);
230
231   memset(ops, 0, sizeof(ops));
232   op = ops;
233   op->op = GRPC_OP_SEND_INITIAL_METADATA;
234   op->data.send_initial_metadata.count = 0;
235   op->flags = 0;
236   op->reserved = nullptr;
237   op++;
238   op->op = GRPC_OP_SEND_MESSAGE;
239   op->data.send_message.send_message = request_payload;
240   op->flags = 0;
241   op->reserved = nullptr;
242   op++;
243   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
244   op->flags = 0;
245   op->reserved = nullptr;
246   op++;
247   op->op = GRPC_OP_RECV_INITIAL_METADATA;
248   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
249   op->flags = 0;
250   op->reserved = nullptr;
251   op++;
252   op->op = GRPC_OP_RECV_MESSAGE;
253   op->data.recv_message.recv_message = &response_payload_recv;
254   op->flags = 0;
255   op->reserved = nullptr;
256   op++;
257   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
258   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
259   op->data.recv_status_on_client.status = &status;
260   op->data.recv_status_on_client.status_details = &details;
261   op->flags = 0;
262   op->reserved = nullptr;
263   op++;
264   error = grpc_call_start_batch(call, ops, static_cast<size_t>(op - ops),
265                                 tag(1), nullptr);
266   GPR_ASSERT(GRPC_CALL_OK == error);
267
268   error = grpc_server_request_call(g_ctx.server, &server_call, &call_details,
269                                    &request_metadata_recv, g_ctx.bound_cq,
270                                    g_ctx.cq, tag(101));
271   GPR_ASSERT(GRPC_CALL_OK == error);
272   event = grpc_completion_queue_next(
273       g_ctx.cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
274   GPR_ASSERT(event.success == 1);
275   GPR_ASSERT(event.tag == tag(101));
276   GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
277
278   memset(ops, 0, sizeof(ops));
279   op = ops;
280   op->op = GRPC_OP_SEND_INITIAL_METADATA;
281   op->data.send_initial_metadata.count = 0;
282   op->flags = 0;
283   op->reserved = nullptr;
284   op++;
285   op->op = GRPC_OP_RECV_MESSAGE;
286   op->data.recv_message.recv_message = &request_payload_recv;
287   op->flags = 0;
288   op->reserved = nullptr;
289   op++;
290
291   grpc_endpoint_pair* sfd = g_ctx.ep;
292   int fd;
293   if (fdtype == SERVER_FD) {
294     fd = sfd->server->vtable->get_fd(sfd->server);
295   } else {
296     GPR_ASSERT(fdtype == CLIENT_FD);
297     fd = sfd->client->vtable->get_fd(sfd->client);
298   }
299   /* Connection is closed before the server receives the client's message. */
300   close(fd);
301
302   error = grpc_call_start_batch(server_call, ops, static_cast<size_t>(op - ops),
303                                 tag(102), nullptr);
304   GPR_ASSERT(GRPC_CALL_OK == error);
305
306   event = grpc_completion_queue_next(
307       g_ctx.bound_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
308
309   /* Batch operation completes on the server side.
310    * event.success will be true if the op completes successfully.
311    * event.success will be false if the op completes with an error. This can
312    * happen due to a race with closing the fd resulting in pending writes
313    * failing due to stream closure.
314    * */
315   GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
316   GPR_ASSERT(event.tag == tag(102));
317
318   event = grpc_completion_queue_next(
319       g_ctx.client_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
320   /* When the client fd is closed, the server gets EPIPE.
321    * When server fd is closed, server gets EBADF.
322    * In both cases server sends GRPC_STATUS_UNAVALABLE to the client. However,
323    * the client may not receive this grpc_status as it's socket is being closed.
324    * If the client didn't get grpc_status from the server it will time out
325    * waiting on the completion queue. So there 2 2 possibilities:
326    * 1. client times out waiting for server's response
327    * 2. client receives GRPC_STATUS_UNAVAILABLE from server
328    */
329   if (event.type == GRPC_QUEUE_TIMEOUT) {
330     GPR_ASSERT(event.success == 0);
331     GPR_ASSERT(event.tag == nullptr);
332     /* status is not initialized */
333     GPR_ASSERT(status == GRPC_STATUS__DO_NOT_USE);
334   } else {
335     GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
336     GPR_ASSERT(event.success == 1);
337     GPR_ASSERT(event.tag == tag(1));
338     GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
339   }
340
341   grpc_metadata_array_destroy(&initial_metadata_recv);
342   grpc_metadata_array_destroy(&trailing_metadata_recv);
343   grpc_metadata_array_destroy(&request_metadata_recv);
344   grpc_call_details_destroy(&call_details);
345
346   grpc_call_unref(call);
347   grpc_call_unref(server_call);
348
349   grpc_byte_buffer_destroy(request_payload);
350   grpc_byte_buffer_destroy(response_payload);
351   grpc_byte_buffer_destroy(request_payload_recv);
352   grpc_byte_buffer_destroy(response_payload_recv);
353
354   end_test();
355 }
356
357 static void test_close_before_server_recv() {
358   /* Close client side of the connection before server receives message from
359    * client */
360   _test_close_before_server_recv(CLIENT_FD);
361   /* Close server side of the connection before server receives message from
362    * client */
363   _test_close_before_server_recv(SERVER_FD);
364 }
365
366 static void _test_close_before_server_send(fd_type fdtype) {
367   grpc_core::ExecCtx exec_ctx;
368   grpc_call* call;
369   grpc_call* server_call;
370   grpc_event event;
371   grpc_slice request_payload_slice =
372       grpc_slice_from_copied_string("hello world");
373   grpc_slice response_payload_slice =
374       grpc_slice_from_copied_string("hello you");
375   grpc_byte_buffer* request_payload =
376       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
377   grpc_byte_buffer* response_payload =
378       grpc_raw_byte_buffer_create(&response_payload_slice, 1);
379   gpr_log(GPR_INFO, "Running test: test_close_%s_before_server_send",
380           fd_type_str(fdtype));
381   test_init();
382
383   grpc_op ops[6];
384   grpc_op* op;
385   grpc_metadata_array initial_metadata_recv;
386   grpc_metadata_array trailing_metadata_recv;
387   grpc_metadata_array request_metadata_recv;
388   grpc_byte_buffer* request_payload_recv = nullptr;
389   grpc_byte_buffer* response_payload_recv = nullptr;
390   grpc_call_details call_details;
391   grpc_status_code status = GRPC_STATUS__DO_NOT_USE;
392   grpc_call_error error;
393   grpc_slice details;
394   int was_cancelled = 2;
395
396   gpr_timespec deadline = grpc_timeout_seconds_to_deadline(1);
397   call = grpc_channel_create_call(
398       g_ctx.client, nullptr, GRPC_PROPAGATE_DEFAULTS, g_ctx.client_cq,
399       grpc_slice_from_static_string("/foo"), nullptr, deadline, nullptr);
400   GPR_ASSERT(call);
401
402   grpc_metadata_array_init(&initial_metadata_recv);
403   grpc_metadata_array_init(&trailing_metadata_recv);
404   grpc_metadata_array_init(&request_metadata_recv);
405   grpc_call_details_init(&call_details);
406
407   memset(ops, 0, sizeof(ops));
408   op = ops;
409   op->op = GRPC_OP_SEND_INITIAL_METADATA;
410   op->data.send_initial_metadata.count = 0;
411   op->flags = 0;
412   op->reserved = nullptr;
413   op++;
414   op->op = GRPC_OP_SEND_MESSAGE;
415   op->data.send_message.send_message = request_payload;
416   op->flags = 0;
417   op->reserved = nullptr;
418   op++;
419   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
420   op->flags = 0;
421   op->reserved = nullptr;
422   op++;
423   op->op = GRPC_OP_RECV_INITIAL_METADATA;
424   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
425   op->flags = 0;
426   op->reserved = nullptr;
427   op++;
428   op->op = GRPC_OP_RECV_MESSAGE;
429   op->data.recv_message.recv_message = &response_payload_recv;
430   op->flags = 0;
431   op->reserved = nullptr;
432   op++;
433   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
434   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
435   op->data.recv_status_on_client.status = &status;
436   op->data.recv_status_on_client.status_details = &details;
437   op->flags = 0;
438   op->reserved = nullptr;
439   op++;
440   error = grpc_call_start_batch(call, ops, static_cast<size_t>(op - ops),
441                                 tag(1), nullptr);
442   GPR_ASSERT(GRPC_CALL_OK == error);
443
444   error = grpc_server_request_call(g_ctx.server, &server_call, &call_details,
445                                    &request_metadata_recv, g_ctx.bound_cq,
446                                    g_ctx.cq, tag(101));
447   GPR_ASSERT(GRPC_CALL_OK == error);
448   event = grpc_completion_queue_next(
449       g_ctx.cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
450   GPR_ASSERT(event.success == 1);
451   GPR_ASSERT(event.tag == tag(101));
452   GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
453
454   memset(ops, 0, sizeof(ops));
455   op = ops;
456   op->op = GRPC_OP_SEND_INITIAL_METADATA;
457   op->data.send_initial_metadata.count = 0;
458   op->flags = 0;
459   op->reserved = nullptr;
460   op++;
461   op->op = GRPC_OP_RECV_MESSAGE;
462   op->data.recv_message.recv_message = &request_payload_recv;
463   op->flags = 0;
464   op->reserved = nullptr;
465   op++;
466   error = grpc_call_start_batch(server_call, ops, static_cast<size_t>(op - ops),
467                                 tag(102), nullptr);
468   GPR_ASSERT(GRPC_CALL_OK == error);
469
470   event = grpc_completion_queue_next(
471       g_ctx.bound_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
472   GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
473   GPR_ASSERT(event.success == 1);
474   GPR_ASSERT(event.tag == tag(102));
475
476   memset(ops, 0, sizeof(ops));
477   op = ops;
478   op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
479   op->data.recv_close_on_server.cancelled = &was_cancelled;
480   op->flags = 0;
481   op->reserved = nullptr;
482   op++;
483   op->op = GRPC_OP_SEND_MESSAGE;
484   op->data.send_message.send_message = response_payload;
485   op->flags = 0;
486   op->reserved = nullptr;
487   op++;
488   op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
489   op->data.send_status_from_server.trailing_metadata_count = 0;
490   op->data.send_status_from_server.status = GRPC_STATUS_OK;
491   grpc_slice status_details = grpc_slice_from_static_string("xyz");
492   op->data.send_status_from_server.status_details = &status_details;
493   op->flags = 0;
494   op->reserved = nullptr;
495   op++;
496
497   grpc_endpoint_pair* sfd = g_ctx.ep;
498   int fd;
499   if (fdtype == SERVER_FD) {
500     fd = sfd->server->vtable->get_fd(sfd->server);
501   } else {
502     GPR_ASSERT(fdtype == CLIENT_FD);
503     fd = sfd->client->vtable->get_fd(sfd->client);
504   }
505
506   /* Connection is closed before the server sends message and status to the
507    * client. */
508   close(fd);
509   error = grpc_call_start_batch(server_call, ops, static_cast<size_t>(op - ops),
510                                 tag(103), nullptr);
511   GPR_ASSERT(GRPC_CALL_OK == error);
512
513   /* Batch operation succeeds on the server side */
514   event = grpc_completion_queue_next(
515       g_ctx.bound_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
516   GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
517   GPR_ASSERT(event.success == 1);
518   GPR_ASSERT(event.tag == tag(103));
519
520   event = grpc_completion_queue_next(
521       g_ctx.client_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
522   /* In both cases server sends GRPC_STATUS_UNAVALABLE to the client. However,
523    * the client may not receive this grpc_status as it's socket is being closed.
524    * If the client didn't get grpc_status from the server it will time out
525    * waiting on the completion queue
526    */
527   if (event.type == GRPC_OP_COMPLETE) {
528     GPR_ASSERT(event.success == 1);
529     GPR_ASSERT(event.tag == tag(1));
530     GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
531   } else {
532     GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
533     GPR_ASSERT(event.success == 0);
534     GPR_ASSERT(event.tag == nullptr);
535     /* status is not initialized */
536     GPR_ASSERT(status == GRPC_STATUS__DO_NOT_USE);
537   }
538   GPR_ASSERT(was_cancelled == 0);
539
540   grpc_metadata_array_destroy(&initial_metadata_recv);
541   grpc_metadata_array_destroy(&trailing_metadata_recv);
542   grpc_metadata_array_destroy(&request_metadata_recv);
543   grpc_call_details_destroy(&call_details);
544
545   grpc_call_unref(call);
546   grpc_call_unref(server_call);
547
548   grpc_byte_buffer_destroy(request_payload);
549   grpc_byte_buffer_destroy(response_payload);
550   grpc_byte_buffer_destroy(request_payload_recv);
551   grpc_byte_buffer_destroy(response_payload_recv);
552
553   end_test();
554 }
555
556 static void test_close_before_server_send() {
557   /* Close client side of the connection before server sends message to client
558    * */
559   _test_close_before_server_send(CLIENT_FD);
560   /* Close server side of the connection before server sends message to client
561    * */
562   _test_close_before_server_send(SERVER_FD);
563 }
564
565 static void _test_close_before_client_send(fd_type fdtype) {
566   grpc_core::ExecCtx exec_ctx;
567   grpc_call* call;
568   grpc_event event;
569   grpc_slice request_payload_slice =
570       grpc_slice_from_copied_string("hello world");
571   grpc_slice response_payload_slice =
572       grpc_slice_from_copied_string("hello you");
573   grpc_byte_buffer* request_payload =
574       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
575   grpc_byte_buffer* response_payload =
576       grpc_raw_byte_buffer_create(&response_payload_slice, 1);
577   gpr_log(GPR_INFO, "Running test: test_close_%s_before_client_send",
578           fd_type_str(fdtype));
579   test_init();
580
581   grpc_op ops[6];
582   grpc_op* op;
583   grpc_metadata_array initial_metadata_recv;
584   grpc_metadata_array trailing_metadata_recv;
585   grpc_metadata_array request_metadata_recv;
586   grpc_byte_buffer* request_payload_recv = nullptr;
587   grpc_byte_buffer* response_payload_recv = nullptr;
588   grpc_call_details call_details;
589   grpc_status_code status;
590   grpc_call_error error;
591   grpc_slice details;
592
593   gpr_timespec deadline = grpc_timeout_seconds_to_deadline(1);
594   call = grpc_channel_create_call(
595       g_ctx.client, nullptr, GRPC_PROPAGATE_DEFAULTS, g_ctx.client_cq,
596       grpc_slice_from_static_string("/foo"), nullptr, deadline, nullptr);
597   GPR_ASSERT(call);
598
599   grpc_metadata_array_init(&initial_metadata_recv);
600   grpc_metadata_array_init(&trailing_metadata_recv);
601   grpc_metadata_array_init(&request_metadata_recv);
602   grpc_call_details_init(&call_details);
603
604   memset(ops, 0, sizeof(ops));
605   op = ops;
606   op->op = GRPC_OP_SEND_INITIAL_METADATA;
607   op->data.send_initial_metadata.count = 0;
608   op->flags = 0;
609   op->reserved = nullptr;
610   op++;
611   op->op = GRPC_OP_SEND_MESSAGE;
612   op->data.send_message.send_message = request_payload;
613   op->flags = 0;
614   op->reserved = nullptr;
615   op++;
616   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
617   op->flags = 0;
618   op->reserved = nullptr;
619   op++;
620   op->op = GRPC_OP_RECV_INITIAL_METADATA;
621   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
622   op->flags = 0;
623   op->reserved = nullptr;
624   op++;
625   op->op = GRPC_OP_RECV_MESSAGE;
626   op->data.recv_message.recv_message = &response_payload_recv;
627   op->flags = 0;
628   op->reserved = nullptr;
629   op++;
630   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
631   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
632   op->data.recv_status_on_client.status = &status;
633   op->data.recv_status_on_client.status_details = &details;
634   op->flags = 0;
635   op->reserved = nullptr;
636   op++;
637
638   grpc_endpoint_pair* sfd = g_ctx.ep;
639   int fd;
640   if (fdtype == SERVER_FD) {
641     fd = sfd->server->vtable->get_fd(sfd->server);
642   } else {
643     GPR_ASSERT(fdtype == CLIENT_FD);
644     fd = sfd->client->vtable->get_fd(sfd->client);
645   }
646   /* Connection is closed before the client sends a batch to the server */
647   close(fd);
648
649   error = grpc_call_start_batch(call, ops, static_cast<size_t>(op - ops),
650                                 tag(1), nullptr);
651   GPR_ASSERT(GRPC_CALL_OK == error);
652
653   /* Status unavailable is returned to the client when client or server fd is
654    * closed */
655   event = grpc_completion_queue_next(
656       g_ctx.client_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
657   GPR_ASSERT(event.success == 1);
658   GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
659   GPR_ASSERT(event.tag == tag(1));
660   GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
661
662   /* No event is received on the server */
663   event = grpc_completion_queue_next(
664       g_ctx.cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
665   GPR_ASSERT(event.success == 0);
666   GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
667   GPR_ASSERT(event.tag == nullptr);
668
669   grpc_slice_unref(details);
670   grpc_metadata_array_destroy(&initial_metadata_recv);
671   grpc_metadata_array_destroy(&trailing_metadata_recv);
672   grpc_metadata_array_destroy(&request_metadata_recv);
673   grpc_call_details_destroy(&call_details);
674
675   grpc_call_unref(call);
676
677   grpc_byte_buffer_destroy(request_payload);
678   grpc_byte_buffer_destroy(response_payload);
679   grpc_byte_buffer_destroy(request_payload_recv);
680   grpc_byte_buffer_destroy(response_payload_recv);
681
682   end_test();
683 }
684 static void test_close_before_client_send() {
685   /* Close client side of the connection before client sends message to server
686    * */
687   _test_close_before_client_send(CLIENT_FD);
688   /* Close server side of the connection before client sends message to server
689    * */
690   _test_close_before_client_send(SERVER_FD);
691 }
692
693 static void _test_close_before_call_create(fd_type fdtype) {
694   grpc_core::ExecCtx exec_ctx;
695   grpc_call* call;
696   grpc_event event;
697   test_init();
698
699   gpr_timespec deadline = grpc_timeout_milliseconds_to_deadline(100);
700
701   grpc_endpoint_pair* sfd = g_ctx.ep;
702   int fd;
703   if (fdtype == SERVER_FD) {
704     fd = sfd->server->vtable->get_fd(sfd->server);
705   } else {
706     GPR_ASSERT(fdtype == CLIENT_FD);
707     fd = sfd->client->vtable->get_fd(sfd->client);
708   }
709   /* Connection is closed before the client creates a call */
710   close(fd);
711
712   call = grpc_channel_create_call(
713       g_ctx.client, nullptr, GRPC_PROPAGATE_DEFAULTS, g_ctx.client_cq,
714       grpc_slice_from_static_string("/foo"), nullptr, deadline, nullptr);
715   GPR_ASSERT(call);
716
717   /* Client and server time out waiting on their completion queues and nothing
718    * is sent or received */
719   event = grpc_completion_queue_next(
720       g_ctx.client_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
721   GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
722   GPR_ASSERT(event.success == 0);
723   GPR_ASSERT(event.tag == nullptr);
724
725   event = grpc_completion_queue_next(
726       g_ctx.cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
727   GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
728   GPR_ASSERT(event.success == 0);
729   GPR_ASSERT(event.tag == nullptr);
730
731   grpc_call_unref(call);
732   end_test();
733 }
734
735 static void test_close_before_call_create() {
736   /* Close client side of the connection before client creates a call */
737   _test_close_before_call_create(CLIENT_FD);
738   /* Close server side of the connection before client creates a call */
739   _test_close_before_call_create(SERVER_FD);
740 }
741
742 int main(int argc, char** argv) {
743   grpc::testing::TestEnvironment env(argc, argv);
744   /* Init grpc */
745   grpc_init();
746   int iterations = 10;
747
748   for (int i = 0; i < iterations; ++i) {
749     test_close_before_call_create();
750     test_close_before_client_send();
751     test_close_before_server_recv();
752     test_close_before_server_send();
753   }
754
755   grpc_shutdown();
756
757   return 0;
758 }
759
760 #else /* GRPC_POSIX_SOCKET */
761
762 int main(int argc, char** argv) { return 1; }
763
764 #endif /* GRPC_POSIX_SOCKET */