Imported Upstream version 1.21.0
[platform/upstream/grpc.git] / src / core / ext / transport / inproc / inproc_transport.cc
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/string_util.h>
23 #include <grpc/support/sync.h>
24 #include <grpc/support/time.h>
25 #include <string.h>
26 #include "src/core/ext/transport/inproc/inproc_transport.h"
27 #include "src/core/lib/channel/channel_args.h"
28 #include "src/core/lib/gprpp/manual_constructor.h"
29 #include "src/core/lib/slice/slice_internal.h"
30 #include "src/core/lib/surface/api_trace.h"
31 #include "src/core/lib/surface/channel.h"
32 #include "src/core/lib/surface/channel_stack_type.h"
33 #include "src/core/lib/surface/server.h"
34 #include "src/core/lib/transport/connectivity_state.h"
35 #include "src/core/lib/transport/error_utils.h"
36 #include "src/core/lib/transport/transport_impl.h"
37
38 #define INPROC_LOG(...)                               \
39   do {                                                \
40     if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) { \
41       gpr_log(__VA_ARGS__);                           \
42     }                                                 \
43   } while (0)
44
45 namespace {
46 grpc_slice g_empty_slice;
47 grpc_slice g_fake_path_key;
48 grpc_slice g_fake_path_value;
49 grpc_slice g_fake_auth_key;
50 grpc_slice g_fake_auth_value;
51
52 struct inproc_stream;
53 bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
54 void op_state_machine(void* arg, grpc_error* error);
55 void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
56                   bool is_initial);
57 grpc_error* fill_in_metadata(inproc_stream* s,
58                              const grpc_metadata_batch* metadata,
59                              uint32_t flags, grpc_metadata_batch* out_md,
60                              uint32_t* outflags, bool* markfilled);
61
62 struct shared_mu {
63   shared_mu() {
64     // Share one lock between both sides since both sides get affected
65     gpr_mu_init(&mu);
66     gpr_ref_init(&refs, 2);
67   }
68
69   ~shared_mu() { gpr_mu_destroy(&mu); }
70
71   gpr_mu mu;
72   gpr_refcount refs;
73 };
74
75 struct inproc_transport {
76   inproc_transport(const grpc_transport_vtable* vtable, shared_mu* mu,
77                    bool is_client)
78       : mu(mu), is_client(is_client) {
79     base.vtable = vtable;
80     // Start each side of transport with 2 refs since they each have a ref
81     // to the other
82     gpr_ref_init(&refs, 2);
83     grpc_connectivity_state_init(&connectivity, GRPC_CHANNEL_READY,
84                                  is_client ? "inproc_client" : "inproc_server");
85   }
86
87   ~inproc_transport() {
88     grpc_connectivity_state_destroy(&connectivity);
89     if (gpr_unref(&mu->refs)) {
90       mu->~shared_mu();
91       gpr_free(mu);
92     }
93   }
94
95   void ref() {
96     INPROC_LOG(GPR_INFO, "ref_transport %p", this);
97     gpr_ref(&refs);
98   }
99
100   void unref() {
101     INPROC_LOG(GPR_INFO, "unref_transport %p", this);
102     if (!gpr_unref(&refs)) {
103       return;
104     }
105     INPROC_LOG(GPR_INFO, "really_destroy_transport %p", this);
106     this->~inproc_transport();
107     gpr_free(this);
108   }
109
110   grpc_transport base;
111   shared_mu* mu;
112   gpr_refcount refs;
113   bool is_client;
114   grpc_connectivity_state_tracker connectivity;
115   void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
116                            const void* server_data);
117   void* accept_stream_data;
118   bool is_closed = false;
119   struct inproc_transport* other_side;
120   struct inproc_stream* stream_list = nullptr;
121 };
122
123 struct inproc_stream {
124   inproc_stream(inproc_transport* t, grpc_stream_refcount* refcount,
125                 const void* server_data, grpc_core::Arena* arena)
126       : t(t), refs(refcount), arena(arena) {
127     // Ref this stream right now for ctor and list.
128     ref("inproc_init_stream:init");
129     ref("inproc_init_stream:list");
130
131     grpc_metadata_batch_init(&to_read_initial_md);
132     grpc_metadata_batch_init(&to_read_trailing_md);
133     GRPC_CLOSURE_INIT(&op_closure, op_state_machine, this,
134                       grpc_schedule_on_exec_ctx);
135     grpc_metadata_batch_init(&write_buffer_initial_md);
136     grpc_metadata_batch_init(&write_buffer_trailing_md);
137
138     stream_list_prev = nullptr;
139     gpr_mu_lock(&t->mu->mu);
140     stream_list_next = t->stream_list;
141     if (t->stream_list) {
142       t->stream_list->stream_list_prev = this;
143     }
144     t->stream_list = this;
145     gpr_mu_unlock(&t->mu->mu);
146
147     if (!server_data) {
148       t->ref();
149       inproc_transport* st = t->other_side;
150       st->ref();
151       other_side = nullptr;  // will get filled in soon
152       // Pass the client-side stream address to the server-side for a ref
153       ref("inproc_init_stream:clt");  // ref it now on behalf of server
154                                       // side to avoid destruction
155       INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p",
156                  st->accept_stream_cb, st->accept_stream_data);
157       (*st->accept_stream_cb)(st->accept_stream_data, &st->base, (void*)this);
158     } else {
159       // This is the server-side and is being called through accept_stream_cb
160       inproc_stream* cs = (inproc_stream*)server_data;
161       other_side = cs;
162       // Ref the server-side stream on behalf of the client now
163       ref("inproc_init_stream:srv");
164
165       // Now we are about to affect the other side, so lock the transport
166       // to make sure that it doesn't get destroyed
167       gpr_mu_lock(&t->mu->mu);
168       cs->other_side = this;
169       // Now transfer from the other side's write_buffer if any to the to_read
170       // buffer
171       if (cs->write_buffer_initial_md_filled) {
172         fill_in_metadata(this, &cs->write_buffer_initial_md,
173                          cs->write_buffer_initial_md_flags, &to_read_initial_md,
174                          &to_read_initial_md_flags, &to_read_initial_md_filled);
175         deadline = GPR_MIN(deadline, cs->write_buffer_deadline);
176         grpc_metadata_batch_clear(&cs->write_buffer_initial_md);
177         cs->write_buffer_initial_md_filled = false;
178       }
179       if (cs->write_buffer_trailing_md_filled) {
180         fill_in_metadata(this, &cs->write_buffer_trailing_md, 0,
181                          &to_read_trailing_md, nullptr,
182                          &to_read_trailing_md_filled);
183         grpc_metadata_batch_clear(&cs->write_buffer_trailing_md);
184         cs->write_buffer_trailing_md_filled = false;
185       }
186       if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) {
187         cancel_other_error = cs->write_buffer_cancel_error;
188         cs->write_buffer_cancel_error = GRPC_ERROR_NONE;
189       }
190
191       gpr_mu_unlock(&t->mu->mu);
192     }
193   }
194
195   ~inproc_stream() {
196     GRPC_ERROR_UNREF(write_buffer_cancel_error);
197     GRPC_ERROR_UNREF(cancel_self_error);
198     GRPC_ERROR_UNREF(cancel_other_error);
199
200     if (recv_inited) {
201       grpc_slice_buffer_destroy_internal(&recv_message);
202     }
203
204     t->unref();
205
206     if (closure_at_destroy) {
207       GRPC_CLOSURE_SCHED(closure_at_destroy, GRPC_ERROR_NONE);
208     }
209   }
210
211 #ifndef NDEBUG
212 #define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason)
213 #define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason)
214 #else
215 #define STREAM_REF(refs, reason) grpc_stream_ref(refs)
216 #define STREAM_UNREF(refs, reason) grpc_stream_unref(refs)
217 #endif
218   void ref(const char* reason) {
219     INPROC_LOG(GPR_INFO, "ref_stream %p %s", this, reason);
220     STREAM_REF(refs, reason);
221   }
222
223   void unref(const char* reason) {
224     INPROC_LOG(GPR_INFO, "unref_stream %p %s", this, reason);
225     STREAM_UNREF(refs, reason);
226   }
227 #undef STREAM_REF
228 #undef STREAM_UNREF
229
230   inproc_transport* t;
231   grpc_metadata_batch to_read_initial_md;
232   uint32_t to_read_initial_md_flags = 0;
233   bool to_read_initial_md_filled = false;
234   grpc_metadata_batch to_read_trailing_md;
235   bool to_read_trailing_md_filled = false;
236   bool ops_needed = false;
237   bool op_closure_scheduled = false;
238   grpc_closure op_closure;
239   // Write buffer used only during gap at init time when client-side
240   // stream is set up but server side stream is not yet set up
241   grpc_metadata_batch write_buffer_initial_md;
242   bool write_buffer_initial_md_filled = false;
243   uint32_t write_buffer_initial_md_flags = 0;
244   grpc_millis write_buffer_deadline = GRPC_MILLIS_INF_FUTURE;
245   grpc_metadata_batch write_buffer_trailing_md;
246   bool write_buffer_trailing_md_filled = false;
247   grpc_error* write_buffer_cancel_error = GRPC_ERROR_NONE;
248
249   struct inproc_stream* other_side;
250   bool other_side_closed = false;               // won't talk anymore
251   bool write_buffer_other_side_closed = false;  // on hold
252   grpc_stream_refcount* refs;
253   grpc_closure* closure_at_destroy = nullptr;
254
255   grpc_core::Arena* arena;
256
257   grpc_transport_stream_op_batch* send_message_op = nullptr;
258   grpc_transport_stream_op_batch* send_trailing_md_op = nullptr;
259   grpc_transport_stream_op_batch* recv_initial_md_op = nullptr;
260   grpc_transport_stream_op_batch* recv_message_op = nullptr;
261   grpc_transport_stream_op_batch* recv_trailing_md_op = nullptr;
262
263   grpc_slice_buffer recv_message;
264   grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> recv_stream;
265   bool recv_inited = false;
266
267   bool initial_md_sent = false;
268   bool trailing_md_sent = false;
269   bool initial_md_recvd = false;
270   bool trailing_md_recvd = false;
271
272   bool closed = false;
273
274   grpc_error* cancel_self_error = GRPC_ERROR_NONE;
275   grpc_error* cancel_other_error = GRPC_ERROR_NONE;
276
277   grpc_millis deadline = GRPC_MILLIS_INF_FUTURE;
278
279   bool listed = true;
280   struct inproc_stream* stream_list_prev;
281   struct inproc_stream* stream_list_next;
282 };
283
284 void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
285                   bool is_initial) {
286   for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
287        md = md->next) {
288     char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
289     char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
290     gpr_log(GPR_INFO, "INPROC:%s:%s: %s: %s", is_initial ? "HDR" : "TRL",
291             is_client ? "CLI" : "SVR", key, value);
292     gpr_free(key);
293     gpr_free(value);
294   }
295 }
296
297 grpc_error* fill_in_metadata(inproc_stream* s,
298                              const grpc_metadata_batch* metadata,
299                              uint32_t flags, grpc_metadata_batch* out_md,
300                              uint32_t* outflags, bool* markfilled) {
301   if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
302     log_metadata(metadata, s->t->is_client, outflags != nullptr);
303   }
304
305   if (outflags != nullptr) {
306     *outflags = flags;
307   }
308   if (markfilled != nullptr) {
309     *markfilled = true;
310   }
311   grpc_error* error = GRPC_ERROR_NONE;
312   for (grpc_linked_mdelem* elem = metadata->list.head;
313        (elem != nullptr) && (error == GRPC_ERROR_NONE); elem = elem->next) {
314     grpc_linked_mdelem* nelem =
315         static_cast<grpc_linked_mdelem*>(s->arena->Alloc(sizeof(*nelem)));
316     nelem->md =
317         grpc_mdelem_from_slices(grpc_slice_intern(GRPC_MDKEY(elem->md)),
318                                 grpc_slice_intern(GRPC_MDVALUE(elem->md)));
319
320     error = grpc_metadata_batch_link_tail(out_md, nelem);
321   }
322   return error;
323 }
324
325 int init_stream(grpc_transport* gt, grpc_stream* gs,
326                 grpc_stream_refcount* refcount, const void* server_data,
327                 grpc_core::Arena* arena) {
328   INPROC_LOG(GPR_INFO, "init_stream %p %p %p", gt, gs, server_data);
329   inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
330   new (gs) inproc_stream(t, refcount, server_data, arena);
331   return 0;  // return value is not important
332 }
333
334 void close_stream_locked(inproc_stream* s) {
335   if (!s->closed) {
336     // Release the metadata that we would have written out
337     grpc_metadata_batch_destroy(&s->write_buffer_initial_md);
338     grpc_metadata_batch_destroy(&s->write_buffer_trailing_md);
339
340     if (s->listed) {
341       inproc_stream* p = s->stream_list_prev;
342       inproc_stream* n = s->stream_list_next;
343       if (p != nullptr) {
344         p->stream_list_next = n;
345       } else {
346         s->t->stream_list = n;
347       }
348       if (n != nullptr) {
349         n->stream_list_prev = p;
350       }
351       s->listed = false;
352       s->unref("close_stream:list");
353     }
354     s->closed = true;
355     s->unref("close_stream:closing");
356   }
357 }
358
359 // This function means that we are done talking/listening to the other side
360 void close_other_side_locked(inproc_stream* s, const char* reason) {
361   if (s->other_side != nullptr) {
362     // First release the metadata that came from the other side's arena
363     grpc_metadata_batch_destroy(&s->to_read_initial_md);
364     grpc_metadata_batch_destroy(&s->to_read_trailing_md);
365
366     s->other_side->unref(reason);
367     s->other_side_closed = true;
368     s->other_side = nullptr;
369   } else if (!s->other_side_closed) {
370     s->write_buffer_other_side_closed = true;
371   }
372 }
373
374 // Call the on_complete closure associated with this stream_op_batch if
375 // this stream_op_batch is only one of the pending operations for this
376 // stream. This is called when one of the pending operations for the stream
377 // is done and about to be NULLed out
378 void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
379                                   grpc_transport_stream_op_batch* op,
380                                   const char* msg) {
381   int is_sm = static_cast<int>(op == s->send_message_op);
382   int is_stm = static_cast<int>(op == s->send_trailing_md_op);
383   // TODO(vjpai): We should not consider the recv ops here, since they
384   // have their own callbacks.  We should invoke a batch's on_complete
385   // as soon as all of the batch's send ops are complete, even if there
386   // are still recv ops pending.
387   int is_rim = static_cast<int>(op == s->recv_initial_md_op);
388   int is_rm = static_cast<int>(op == s->recv_message_op);
389   int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
390
391   if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
392     INPROC_LOG(GPR_INFO, "%s %p %p %p", msg, s, op, error);
393     GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_REF(error));
394   }
395 }
396
397 void maybe_schedule_op_closure_locked(inproc_stream* s, grpc_error* error) {
398   if (s && s->ops_needed && !s->op_closure_scheduled) {
399     GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_REF(error));
400     s->op_closure_scheduled = true;
401     s->ops_needed = false;
402   }
403 }
404
405 void fail_helper_locked(inproc_stream* s, grpc_error* error) {
406   INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s);
407   // If we're failing this side, we need to make sure that
408   // we also send or have already sent trailing metadata
409   if (!s->trailing_md_sent) {
410     // Send trailing md to the other side indicating cancellation
411     s->trailing_md_sent = true;
412
413     grpc_metadata_batch fake_md;
414     grpc_metadata_batch_init(&fake_md);
415
416     inproc_stream* other = s->other_side;
417     grpc_metadata_batch* dest = (other == nullptr)
418                                     ? &s->write_buffer_trailing_md
419                                     : &other->to_read_trailing_md;
420     bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
421                                           : &other->to_read_trailing_md_filled;
422     fill_in_metadata(s, &fake_md, 0, dest, nullptr, destfilled);
423     grpc_metadata_batch_destroy(&fake_md);
424
425     if (other != nullptr) {
426       if (other->cancel_other_error == GRPC_ERROR_NONE) {
427         other->cancel_other_error = GRPC_ERROR_REF(error);
428       }
429       maybe_schedule_op_closure_locked(other, error);
430     } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
431       s->write_buffer_cancel_error = GRPC_ERROR_REF(error);
432     }
433   }
434   if (s->recv_initial_md_op) {
435     grpc_error* err;
436     if (!s->t->is_client) {
437       // If this is a server, provide initial metadata with a path and authority
438       // since it expects that as well as no error yet
439       grpc_metadata_batch fake_md;
440       grpc_metadata_batch_init(&fake_md);
441       grpc_linked_mdelem* path_md =
442           static_cast<grpc_linked_mdelem*>(s->arena->Alloc(sizeof(*path_md)));
443       path_md->md = grpc_mdelem_from_slices(g_fake_path_key, g_fake_path_value);
444       GPR_ASSERT(grpc_metadata_batch_link_tail(&fake_md, path_md) ==
445                  GRPC_ERROR_NONE);
446       grpc_linked_mdelem* auth_md =
447           static_cast<grpc_linked_mdelem*>(s->arena->Alloc(sizeof(*auth_md)));
448       auth_md->md = grpc_mdelem_from_slices(g_fake_auth_key, g_fake_auth_value);
449       GPR_ASSERT(grpc_metadata_batch_link_tail(&fake_md, auth_md) ==
450                  GRPC_ERROR_NONE);
451
452       fill_in_metadata(
453           s, &fake_md, 0,
454           s->recv_initial_md_op->payload->recv_initial_metadata
455               .recv_initial_metadata,
456           s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
457           nullptr);
458       grpc_metadata_batch_destroy(&fake_md);
459       err = GRPC_ERROR_NONE;
460     } else {
461       err = GRPC_ERROR_REF(error);
462     }
463     if (s->recv_initial_md_op->payload->recv_initial_metadata
464             .trailing_metadata_available != nullptr) {
465       // Set to true unconditionally, because we're failing the call, so even
466       // if we haven't actually seen the send_trailing_metadata op from the
467       // other side, we're going to return trailing metadata anyway.
468       *s->recv_initial_md_op->payload->recv_initial_metadata
469            .trailing_metadata_available = true;
470     }
471     INPROC_LOG(GPR_INFO,
472                "fail_helper %p scheduling initial-metadata-ready %p %p", s,
473                error, err);
474     GRPC_CLOSURE_SCHED(s->recv_initial_md_op->payload->recv_initial_metadata
475                            .recv_initial_metadata_ready,
476                        err);
477     // Last use of err so no need to REF and then UNREF it
478
479     complete_if_batch_end_locked(
480         s, error, s->recv_initial_md_op,
481         "fail_helper scheduling recv-initial-metadata-on-complete");
482     s->recv_initial_md_op = nullptr;
483   }
484   if (s->recv_message_op) {
485     INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %p", s,
486                error);
487     GRPC_CLOSURE_SCHED(
488         s->recv_message_op->payload->recv_message.recv_message_ready,
489         GRPC_ERROR_REF(error));
490     complete_if_batch_end_locked(
491         s, error, s->recv_message_op,
492         "fail_helper scheduling recv-message-on-complete");
493     s->recv_message_op = nullptr;
494   }
495   if (s->send_message_op) {
496     s->send_message_op->payload->send_message.send_message.reset();
497     complete_if_batch_end_locked(
498         s, error, s->send_message_op,
499         "fail_helper scheduling send-message-on-complete");
500     s->send_message_op = nullptr;
501   }
502   if (s->send_trailing_md_op) {
503     complete_if_batch_end_locked(
504         s, error, s->send_trailing_md_op,
505         "fail_helper scheduling send-trailng-md-on-complete");
506     s->send_trailing_md_op = nullptr;
507   }
508   if (s->recv_trailing_md_op) {
509     INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
510                s, error);
511     GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
512                            .recv_trailing_metadata_ready,
513                        GRPC_ERROR_REF(error));
514     INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p",
515                s, error);
516     complete_if_batch_end_locked(
517         s, error, s->recv_trailing_md_op,
518         "fail_helper scheduling recv-trailing-metadata-on-complete");
519     s->recv_trailing_md_op = nullptr;
520   }
521   close_other_side_locked(s, "fail_helper:other_side");
522   close_stream_locked(s);
523
524   GRPC_ERROR_UNREF(error);
525 }
526
527 // TODO(vjpai): It should not be necessary to drain the incoming byte
528 // stream and create a new one; instead, we should simply pass the byte
529 // stream from the sender directly to the receiver as-is.
530 //
531 // Note that fixing this will also avoid the assumption in this code
532 // that the incoming byte stream's next() call will always return
533 // synchronously.  That assumption is true today but may not always be
534 // true in the future.
535 void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) {
536   size_t remaining =
537       sender->send_message_op->payload->send_message.send_message->length();
538   if (receiver->recv_inited) {
539     grpc_slice_buffer_destroy_internal(&receiver->recv_message);
540   }
541   grpc_slice_buffer_init(&receiver->recv_message);
542   receiver->recv_inited = true;
543   do {
544     grpc_slice message_slice;
545     grpc_closure unused;
546     GPR_ASSERT(
547         sender->send_message_op->payload->send_message.send_message->Next(
548             SIZE_MAX, &unused));
549     grpc_error* error =
550         sender->send_message_op->payload->send_message.send_message->Pull(
551             &message_slice);
552     if (error != GRPC_ERROR_NONE) {
553       cancel_stream_locked(sender, GRPC_ERROR_REF(error));
554       break;
555     }
556     GPR_ASSERT(error == GRPC_ERROR_NONE);
557     remaining -= GRPC_SLICE_LENGTH(message_slice);
558     grpc_slice_buffer_add(&receiver->recv_message, message_slice);
559   } while (remaining > 0);
560   sender->send_message_op->payload->send_message.send_message.reset();
561
562   receiver->recv_stream.Init(&receiver->recv_message, 0);
563   receiver->recv_message_op->payload->recv_message.recv_message->reset(
564       receiver->recv_stream.get());
565   INPROC_LOG(GPR_INFO, "message_transfer_locked %p scheduling message-ready",
566              receiver);
567   GRPC_CLOSURE_SCHED(
568       receiver->recv_message_op->payload->recv_message.recv_message_ready,
569       GRPC_ERROR_NONE);
570   complete_if_batch_end_locked(
571       sender, GRPC_ERROR_NONE, sender->send_message_op,
572       "message_transfer scheduling sender on_complete");
573   complete_if_batch_end_locked(
574       receiver, GRPC_ERROR_NONE, receiver->recv_message_op,
575       "message_transfer scheduling receiver on_complete");
576
577   receiver->recv_message_op = nullptr;
578   sender->send_message_op = nullptr;
579 }
580
581 void op_state_machine(void* arg, grpc_error* error) {
582   // This function gets called when we have contents in the unprocessed reads
583   // Get what we want based on our ops wanted
584   // Schedule our appropriate closures
585   // and then return to ops_needed state if still needed
586
587   // Since this is a closure directly invoked by the combiner, it should not
588   // unref the error parameter explicitly; the combiner will do that implicitly
589   grpc_error* new_err = GRPC_ERROR_NONE;
590
591   bool needs_close = false;
592
593   INPROC_LOG(GPR_INFO, "op_state_machine %p", arg);
594   inproc_stream* s = static_cast<inproc_stream*>(arg);
595   gpr_mu* mu = &s->t->mu->mu;  // keep aside in case s gets closed
596   gpr_mu_lock(mu);
597   s->op_closure_scheduled = false;
598   // cancellation takes precedence
599   inproc_stream* other = s->other_side;
600
601   if (s->cancel_self_error != GRPC_ERROR_NONE) {
602     fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_self_error));
603     goto done;
604   } else if (s->cancel_other_error != GRPC_ERROR_NONE) {
605     fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_other_error));
606     goto done;
607   } else if (error != GRPC_ERROR_NONE) {
608     fail_helper_locked(s, GRPC_ERROR_REF(error));
609     goto done;
610   }
611
612   if (s->send_message_op && other) {
613     if (other->recv_message_op) {
614       message_transfer_locked(s, other);
615       maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
616     } else if (!s->t->is_client && s->trailing_md_sent) {
617       // A server send will never be matched if the server already sent status
618       s->send_message_op->payload->send_message.send_message.reset();
619       complete_if_batch_end_locked(
620           s, GRPC_ERROR_NONE, s->send_message_op,
621           "op_state_machine scheduling send-message-on-complete");
622       s->send_message_op = nullptr;
623     }
624   }
625   // Pause a send trailing metadata if there is still an outstanding
626   // send message unless we know that the send message will never get
627   // matched to a receive. This happens on the client if the server has
628   // already sent status or on the server if the client has requested
629   // status
630   if (s->send_trailing_md_op &&
631       (!s->send_message_op ||
632        (s->t->is_client &&
633         (s->trailing_md_recvd || s->to_read_trailing_md_filled)) ||
634        (!s->t->is_client && other &&
635         (other->trailing_md_recvd || other->to_read_trailing_md_filled ||
636          other->recv_trailing_md_op)))) {
637     grpc_metadata_batch* dest = (other == nullptr)
638                                     ? &s->write_buffer_trailing_md
639                                     : &other->to_read_trailing_md;
640     bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
641                                           : &other->to_read_trailing_md_filled;
642     if (*destfilled || s->trailing_md_sent) {
643       // The buffer is already in use; that's an error!
644       INPROC_LOG(GPR_INFO, "Extra trailing metadata %p", s);
645       new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
646       fail_helper_locked(s, GRPC_ERROR_REF(new_err));
647       goto done;
648     } else {
649       if (!other || !other->closed) {
650         fill_in_metadata(s,
651                          s->send_trailing_md_op->payload->send_trailing_metadata
652                              .send_trailing_metadata,
653                          0, dest, nullptr, destfilled);
654       }
655       s->trailing_md_sent = true;
656       if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
657         INPROC_LOG(GPR_INFO,
658                    "op_state_machine %p scheduling trailing-metadata-ready", s);
659         GRPC_CLOSURE_SCHED(
660             s->recv_trailing_md_op->payload->recv_trailing_metadata
661                 .recv_trailing_metadata_ready,
662             GRPC_ERROR_NONE);
663         INPROC_LOG(GPR_INFO,
664                    "op_state_machine %p scheduling trailing-md-on-complete", s);
665         GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
666                            GRPC_ERROR_NONE);
667         s->recv_trailing_md_op = nullptr;
668         needs_close = true;
669       }
670     }
671     maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
672     complete_if_batch_end_locked(
673         s, GRPC_ERROR_NONE, s->send_trailing_md_op,
674         "op_state_machine scheduling send-trailing-metadata-on-complete");
675     s->send_trailing_md_op = nullptr;
676   }
677   if (s->recv_initial_md_op) {
678     if (s->initial_md_recvd) {
679       new_err =
680           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md");
681       INPROC_LOG(
682           GPR_INFO,
683           "op_state_machine %p scheduling on_complete errors for already "
684           "recvd initial md %p",
685           s, new_err);
686       fail_helper_locked(s, GRPC_ERROR_REF(new_err));
687       goto done;
688     }
689
690     if (s->to_read_initial_md_filled) {
691       s->initial_md_recvd = true;
692       new_err = fill_in_metadata(
693           s, &s->to_read_initial_md, s->to_read_initial_md_flags,
694           s->recv_initial_md_op->payload->recv_initial_metadata
695               .recv_initial_metadata,
696           s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
697           nullptr);
698       s->recv_initial_md_op->payload->recv_initial_metadata
699           .recv_initial_metadata->deadline = s->deadline;
700       if (s->recv_initial_md_op->payload->recv_initial_metadata
701               .trailing_metadata_available != nullptr) {
702         *s->recv_initial_md_op->payload->recv_initial_metadata
703              .trailing_metadata_available =
704             (other != nullptr && other->send_trailing_md_op != nullptr);
705       }
706       grpc_metadata_batch_clear(&s->to_read_initial_md);
707       s->to_read_initial_md_filled = false;
708       INPROC_LOG(GPR_INFO,
709                  "op_state_machine %p scheduling initial-metadata-ready %p", s,
710                  new_err);
711       GRPC_CLOSURE_SCHED(s->recv_initial_md_op->payload->recv_initial_metadata
712                              .recv_initial_metadata_ready,
713                          GRPC_ERROR_REF(new_err));
714       complete_if_batch_end_locked(
715           s, new_err, s->recv_initial_md_op,
716           "op_state_machine scheduling recv-initial-metadata-on-complete");
717       s->recv_initial_md_op = nullptr;
718
719       if (new_err != GRPC_ERROR_NONE) {
720         INPROC_LOG(GPR_INFO,
721                    "op_state_machine %p scheduling on_complete errors2 %p", s,
722                    new_err);
723         fail_helper_locked(s, GRPC_ERROR_REF(new_err));
724         goto done;
725       }
726     }
727   }
728   if (s->recv_message_op) {
729     if (other && other->send_message_op) {
730       message_transfer_locked(other, s);
731       maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
732     }
733   }
734   if (s->to_read_trailing_md_filled) {
735     if (s->trailing_md_recvd) {
736       new_err =
737           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md");
738       INPROC_LOG(
739           GPR_INFO,
740           "op_state_machine %p scheduling on_complete errors for already "
741           "recvd trailing md %p",
742           s, new_err);
743       fail_helper_locked(s, GRPC_ERROR_REF(new_err));
744       goto done;
745     }
746     if (s->recv_message_op != nullptr) {
747       // This message needs to be wrapped up because it will never be
748       // satisfied
749       *s->recv_message_op->payload->recv_message.recv_message = nullptr;
750       INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
751       GRPC_CLOSURE_SCHED(
752           s->recv_message_op->payload->recv_message.recv_message_ready,
753           GRPC_ERROR_NONE);
754       complete_if_batch_end_locked(
755           s, new_err, s->recv_message_op,
756           "op_state_machine scheduling recv-message-on-complete");
757       s->recv_message_op = nullptr;
758     }
759     if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
760       // Nothing further will try to receive from this stream, so finish off
761       // any outstanding send_message op
762       s->send_message_op->payload->send_message.send_message.reset();
763       complete_if_batch_end_locked(
764           s, new_err, s->send_message_op,
765           "op_state_machine scheduling send-message-on-complete");
766       s->send_message_op = nullptr;
767     }
768     if (s->recv_trailing_md_op != nullptr) {
769       // We wanted trailing metadata and we got it
770       s->trailing_md_recvd = true;
771       new_err =
772           fill_in_metadata(s, &s->to_read_trailing_md, 0,
773                            s->recv_trailing_md_op->payload
774                                ->recv_trailing_metadata.recv_trailing_metadata,
775                            nullptr, nullptr);
776       grpc_metadata_batch_clear(&s->to_read_trailing_md);
777       s->to_read_trailing_md_filled = false;
778
779       // We should schedule the recv_trailing_md_op completion if
780       // 1. this stream is the client-side
781       // 2. this stream is the server-side AND has already sent its trailing md
782       //    (If the server hasn't already sent its trailing md, it doesn't have
783       //     a final status, so don't mark this op complete)
784       if (s->t->is_client || s->trailing_md_sent) {
785         INPROC_LOG(GPR_INFO,
786                    "op_state_machine %p scheduling trailing-md-on-complete %p",
787                    s, new_err);
788         GRPC_CLOSURE_SCHED(
789             s->recv_trailing_md_op->payload->recv_trailing_metadata
790                 .recv_trailing_metadata_ready,
791             GRPC_ERROR_REF(new_err));
792         GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
793                            GRPC_ERROR_REF(new_err));
794         s->recv_trailing_md_op = nullptr;
795         needs_close = true;
796       } else {
797         INPROC_LOG(GPR_INFO,
798                    "op_state_machine %p server needs to delay handling "
799                    "trailing-md-on-complete %p",
800                    s, new_err);
801       }
802     } else {
803       INPROC_LOG(
804           GPR_INFO,
805           "op_state_machine %p has trailing md but not yet waiting for it", s);
806     }
807   }
808   if (s->trailing_md_recvd && s->recv_message_op) {
809     // No further message will come on this stream, so finish off the
810     // recv_message_op
811     INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
812     *s->recv_message_op->payload->recv_message.recv_message = nullptr;
813     GRPC_CLOSURE_SCHED(
814         s->recv_message_op->payload->recv_message.recv_message_ready,
815         GRPC_ERROR_NONE);
816     complete_if_batch_end_locked(
817         s, new_err, s->recv_message_op,
818         "op_state_machine scheduling recv-message-on-complete");
819     s->recv_message_op = nullptr;
820   }
821   if (s->trailing_md_recvd && (s->trailing_md_sent || s->t->is_client) &&
822       s->send_message_op) {
823     // Nothing further will try to receive from this stream, so finish off
824     // any outstanding send_message op
825     s->send_message_op->payload->send_message.send_message.reset();
826     complete_if_batch_end_locked(
827         s, new_err, s->send_message_op,
828         "op_state_machine scheduling send-message-on-complete");
829     s->send_message_op = nullptr;
830   }
831   if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
832       s->recv_message_op || s->recv_trailing_md_op) {
833     // Didn't get the item we wanted so we still need to get
834     // rescheduled
835     INPROC_LOG(
836         GPR_INFO, "op_state_machine %p still needs closure %p %p %p %p %p", s,
837         s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
838         s->recv_message_op, s->recv_trailing_md_op);
839     s->ops_needed = true;
840   }
841 done:
842   if (needs_close) {
843     close_other_side_locked(s, "op_state_machine");
844     close_stream_locked(s);
845   }
846   gpr_mu_unlock(mu);
847   GRPC_ERROR_UNREF(new_err);
848 }
849
850 bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
851   bool ret = false;  // was the cancel accepted
852   INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s, grpc_error_string(error));
853   if (s->cancel_self_error == GRPC_ERROR_NONE) {
854     ret = true;
855     s->cancel_self_error = GRPC_ERROR_REF(error);
856     maybe_schedule_op_closure_locked(s, s->cancel_self_error);
857     // Send trailing md to the other side indicating cancellation, even if we
858     // already have
859     s->trailing_md_sent = true;
860
861     grpc_metadata_batch cancel_md;
862     grpc_metadata_batch_init(&cancel_md);
863
864     inproc_stream* other = s->other_side;
865     grpc_metadata_batch* dest = (other == nullptr)
866                                     ? &s->write_buffer_trailing_md
867                                     : &other->to_read_trailing_md;
868     bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
869                                           : &other->to_read_trailing_md_filled;
870     fill_in_metadata(s, &cancel_md, 0, dest, nullptr, destfilled);
871     grpc_metadata_batch_destroy(&cancel_md);
872
873     if (other != nullptr) {
874       if (other->cancel_other_error == GRPC_ERROR_NONE) {
875         other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error);
876       }
877       maybe_schedule_op_closure_locked(other, other->cancel_other_error);
878     } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
879       s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error);
880     }
881
882     // if we are a server and already received trailing md but
883     // couldn't complete that because we hadn't yet sent out trailing
884     // md, now's the chance
885     if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
886       GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
887                              .recv_trailing_metadata_ready,
888                          GRPC_ERROR_REF(s->cancel_self_error));
889       complete_if_batch_end_locked(
890           s, s->cancel_self_error, s->recv_trailing_md_op,
891           "cancel_stream scheduling trailing-md-on-complete");
892       s->recv_trailing_md_op = nullptr;
893     }
894   }
895
896   close_other_side_locked(s, "cancel_stream:other_side");
897   close_stream_locked(s);
898
899   GRPC_ERROR_UNREF(error);
900   return ret;
901 }
902
903 void do_nothing(void* arg, grpc_error* error) {}
904
905 void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
906                        grpc_transport_stream_op_batch* op) {
907   INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
908   inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
909   gpr_mu* mu = &s->t->mu->mu;  // save aside in case s gets closed
910   gpr_mu_lock(mu);
911
912   if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
913     if (op->send_initial_metadata) {
914       log_metadata(op->payload->send_initial_metadata.send_initial_metadata,
915                    s->t->is_client, true);
916     }
917     if (op->send_trailing_metadata) {
918       log_metadata(op->payload->send_trailing_metadata.send_trailing_metadata,
919                    s->t->is_client, false);
920     }
921   }
922   grpc_error* error = GRPC_ERROR_NONE;
923   grpc_closure* on_complete = op->on_complete;
924   // TODO(roth): This is a hack needed because we use data inside of the
925   // closure itself to do the barrier calculation (i.e., to ensure that
926   // we don't schedule the closure until all ops in the batch have been
927   // completed).  This can go away once we move to a new C++ closure API
928   // that provides the ability to create a barrier closure.
929   if (on_complete == nullptr) {
930     on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
931                                     nullptr, grpc_schedule_on_exec_ctx);
932   }
933
934   if (op->cancel_stream) {
935     // Call cancel_stream_locked without ref'ing the cancel_error because
936     // this function is responsible to make sure that that field gets unref'ed
937     cancel_stream_locked(s, op->payload->cancel_stream.cancel_error);
938     // this op can complete without an error
939   } else if (s->cancel_self_error != GRPC_ERROR_NONE) {
940     // already self-canceled so still give it an error
941     error = GRPC_ERROR_REF(s->cancel_self_error);
942   } else {
943     INPROC_LOG(GPR_INFO, "perform_stream_op %p %s%s%s%s%s%s%s", s,
944                s->t->is_client ? "client" : "server",
945                op->send_initial_metadata ? " send_initial_metadata" : "",
946                op->send_message ? " send_message" : "",
947                op->send_trailing_metadata ? " send_trailing_metadata" : "",
948                op->recv_initial_metadata ? " recv_initial_metadata" : "",
949                op->recv_message ? " recv_message" : "",
950                op->recv_trailing_metadata ? " recv_trailing_metadata" : "");
951   }
952
953   bool needs_close = false;
954
955   inproc_stream* other = s->other_side;
956   if (error == GRPC_ERROR_NONE &&
957       (op->send_initial_metadata || op->send_trailing_metadata)) {
958     if (s->t->is_closed) {
959       error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
960     }
961     if (error == GRPC_ERROR_NONE && op->send_initial_metadata) {
962       grpc_metadata_batch* dest = (other == nullptr)
963                                       ? &s->write_buffer_initial_md
964                                       : &other->to_read_initial_md;
965       uint32_t* destflags = (other == nullptr)
966                                 ? &s->write_buffer_initial_md_flags
967                                 : &other->to_read_initial_md_flags;
968       bool* destfilled = (other == nullptr) ? &s->write_buffer_initial_md_filled
969                                             : &other->to_read_initial_md_filled;
970       if (*destfilled || s->initial_md_sent) {
971         // The buffer is already in use; that's an error!
972         INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s);
973         error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata");
974       } else {
975         if (!other || !other->closed) {
976           fill_in_metadata(
977               s, op->payload->send_initial_metadata.send_initial_metadata,
978               op->payload->send_initial_metadata.send_initial_metadata_flags,
979               dest, destflags, destfilled);
980         }
981         if (s->t->is_client) {
982           grpc_millis* dl =
983               (other == nullptr) ? &s->write_buffer_deadline : &other->deadline;
984           *dl = GPR_MIN(*dl, op->payload->send_initial_metadata
985                                  .send_initial_metadata->deadline);
986           s->initial_md_sent = true;
987         }
988       }
989       maybe_schedule_op_closure_locked(other, error);
990     }
991   }
992
993   if (error == GRPC_ERROR_NONE &&
994       (op->send_message || op->send_trailing_metadata ||
995        op->recv_initial_metadata || op->recv_message ||
996        op->recv_trailing_metadata)) {
997     // Mark ops that need to be processed by the closure
998     if (op->send_message) {
999       s->send_message_op = op;
1000     }
1001     if (op->send_trailing_metadata) {
1002       s->send_trailing_md_op = op;
1003     }
1004     if (op->recv_initial_metadata) {
1005       s->recv_initial_md_op = op;
1006     }
1007     if (op->recv_message) {
1008       s->recv_message_op = op;
1009     }
1010     if (op->recv_trailing_metadata) {
1011       s->recv_trailing_md_op = op;
1012     }
1013
1014     // We want to initiate the closure if:
1015     // 1. We want to send a message and the other side wants to receive
1016     // 2. We want to send trailing metadata and there isn't an unmatched send
1017     //    or the other side wants trailing metadata
1018     // 3. We want initial metadata and the other side has sent it
1019     // 4. We want to receive a message and there is a message ready
1020     // 5. There is trailing metadata, even if nothing specifically wants
1021     //    that because that can shut down the receive message as well
1022     if ((op->send_message && other && other->recv_message_op != nullptr) ||
1023         (op->send_trailing_metadata &&
1024          (!s->send_message_op || (other && other->recv_trailing_md_op))) ||
1025         (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
1026         (op->recv_message && other && other->send_message_op != nullptr) ||
1027         (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
1028       if (!s->op_closure_scheduled) {
1029         GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_NONE);
1030         s->op_closure_scheduled = true;
1031       }
1032     } else {
1033       s->ops_needed = true;
1034     }
1035   } else {
1036     if (error != GRPC_ERROR_NONE) {
1037       // Consume any send message that was sent here but that we are not pushing
1038       // to the other side
1039       if (op->send_message) {
1040         op->payload->send_message.send_message.reset();
1041       }
1042       // Schedule op's closures that we didn't push to op state machine
1043       if (op->recv_initial_metadata) {
1044         if (op->payload->recv_initial_metadata.trailing_metadata_available !=
1045             nullptr) {
1046           // Set to true unconditionally, because we're failing the call, so
1047           // even if we haven't actually seen the send_trailing_metadata op
1048           // from the other side, we're going to return trailing metadata
1049           // anyway.
1050           *op->payload->recv_initial_metadata.trailing_metadata_available =
1051               true;
1052         }
1053         INPROC_LOG(
1054             GPR_INFO,
1055             "perform_stream_op error %p scheduling initial-metadata-ready %p",
1056             s, error);
1057         GRPC_CLOSURE_SCHED(
1058             op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1059             GRPC_ERROR_REF(error));
1060       }
1061       if (op->recv_message) {
1062         INPROC_LOG(
1063             GPR_INFO,
1064             "perform_stream_op error %p scheduling recv message-ready %p", s,
1065             error);
1066         GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
1067                            GRPC_ERROR_REF(error));
1068       }
1069       if (op->recv_trailing_metadata) {
1070         INPROC_LOG(
1071             GPR_INFO,
1072             "perform_stream_op error %p scheduling trailing-metadata-ready %p",
1073             s, error);
1074         GRPC_CLOSURE_SCHED(
1075             op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1076             GRPC_ERROR_REF(error));
1077       }
1078     }
1079     INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
1080                error);
1081     GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
1082   }
1083   if (needs_close) {
1084     close_other_side_locked(s, "perform_stream_op:other_side");
1085     close_stream_locked(s);
1086   }
1087   gpr_mu_unlock(mu);
1088   GRPC_ERROR_UNREF(error);
1089 }
1090
1091 void close_transport_locked(inproc_transport* t) {
1092   INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed);
1093   grpc_connectivity_state_set(&t->connectivity, GRPC_CHANNEL_SHUTDOWN,
1094                               "close transport");
1095   if (!t->is_closed) {
1096     t->is_closed = true;
1097     /* Also end all streams on this transport */
1098     while (t->stream_list != nullptr) {
1099       // cancel_stream_locked also adjusts stream list
1100       cancel_stream_locked(
1101           t->stream_list,
1102           grpc_error_set_int(
1103               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"),
1104               GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1105     }
1106   }
1107 }
1108
1109 void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
1110   inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
1111   INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", t, op);
1112   gpr_mu_lock(&t->mu->mu);
1113   if (op->on_connectivity_state_change) {
1114     grpc_connectivity_state_notify_on_state_change(
1115         &t->connectivity, op->connectivity_state,
1116         op->on_connectivity_state_change);
1117   }
1118   if (op->set_accept_stream) {
1119     t->accept_stream_cb = op->set_accept_stream_fn;
1120     t->accept_stream_data = op->set_accept_stream_user_data;
1121   }
1122   if (op->on_consumed) {
1123     GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
1124   }
1125
1126   bool do_close = false;
1127   if (op->goaway_error != GRPC_ERROR_NONE) {
1128     do_close = true;
1129     GRPC_ERROR_UNREF(op->goaway_error);
1130   }
1131   if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1132     do_close = true;
1133     GRPC_ERROR_UNREF(op->disconnect_with_error);
1134   }
1135
1136   if (do_close) {
1137     close_transport_locked(t);
1138   }
1139   gpr_mu_unlock(&t->mu->mu);
1140 }
1141
1142 void destroy_stream(grpc_transport* gt, grpc_stream* gs,
1143                     grpc_closure* then_schedule_closure) {
1144   INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure);
1145   inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
1146   s->closure_at_destroy = then_schedule_closure;
1147   s->~inproc_stream();
1148 }
1149
1150 void destroy_transport(grpc_transport* gt) {
1151   inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
1152   INPROC_LOG(GPR_INFO, "destroy_transport %p", t);
1153   gpr_mu_lock(&t->mu->mu);
1154   close_transport_locked(t);
1155   gpr_mu_unlock(&t->mu->mu);
1156   t->other_side->unref();
1157   t->unref();
1158 }
1159
1160 /*******************************************************************************
1161  * INTEGRATION GLUE
1162  */
1163
1164 void set_pollset(grpc_transport* gt, grpc_stream* gs, grpc_pollset* pollset) {
1165   // Nothing to do here
1166 }
1167
1168 void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
1169                      grpc_pollset_set* pollset_set) {
1170   // Nothing to do here
1171 }
1172
1173 grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; }
1174
1175 const grpc_transport_vtable inproc_vtable = {
1176     sizeof(inproc_stream), "inproc",        init_stream,
1177     set_pollset,           set_pollset_set, perform_stream_op,
1178     perform_transport_op,  destroy_stream,  destroy_transport,
1179     get_endpoint};
1180
1181 /*******************************************************************************
1182  * Main inproc transport functions
1183  */
1184 void inproc_transports_create(grpc_transport** server_transport,
1185                               const grpc_channel_args* server_args,
1186                               grpc_transport** client_transport,
1187                               const grpc_channel_args* client_args) {
1188   INPROC_LOG(GPR_INFO, "inproc_transports_create");
1189   shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu();
1190   inproc_transport* st = new (gpr_malloc(sizeof(*st)))
1191       inproc_transport(&inproc_vtable, mu, /*is_client=*/false);
1192   inproc_transport* ct = new (gpr_malloc(sizeof(*ct)))
1193       inproc_transport(&inproc_vtable, mu, /*is_client=*/true);
1194   st->other_side = ct;
1195   ct->other_side = st;
1196   *server_transport = reinterpret_cast<grpc_transport*>(st);
1197   *client_transport = reinterpret_cast<grpc_transport*>(ct);
1198 }
1199 }  // namespace
1200
1201 /*******************************************************************************
1202  * GLOBAL INIT AND DESTROY
1203  */
1204 void grpc_inproc_transport_init(void) {
1205   grpc_core::ExecCtx exec_ctx;
1206   g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0);
1207
1208   grpc_slice key_tmp = grpc_slice_from_static_string(":path");
1209   g_fake_path_key = grpc_slice_intern(key_tmp);
1210   grpc_slice_unref_internal(key_tmp);
1211
1212   g_fake_path_value = grpc_slice_from_static_string("/");
1213
1214   grpc_slice auth_tmp = grpc_slice_from_static_string(":authority");
1215   g_fake_auth_key = grpc_slice_intern(auth_tmp);
1216   grpc_slice_unref_internal(auth_tmp);
1217
1218   g_fake_auth_value = grpc_slice_from_static_string("inproc-fail");
1219 }
1220
1221 grpc_channel* grpc_inproc_channel_create(grpc_server* server,
1222                                          grpc_channel_args* args,
1223                                          void* reserved) {
1224   GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2,
1225                  (server, args));
1226
1227   grpc_core::ExecCtx exec_ctx;
1228
1229   const grpc_channel_args* server_args = grpc_server_get_channel_args(server);
1230
1231   // Add a default authority channel argument for the client
1232
1233   grpc_arg default_authority_arg;
1234   default_authority_arg.type = GRPC_ARG_STRING;
1235   default_authority_arg.key = (char*)GRPC_ARG_DEFAULT_AUTHORITY;
1236   default_authority_arg.value.string = (char*)"inproc.authority";
1237   grpc_channel_args* client_args =
1238       grpc_channel_args_copy_and_add(args, &default_authority_arg, 1);
1239
1240   grpc_transport* server_transport;
1241   grpc_transport* client_transport;
1242   inproc_transports_create(&server_transport, server_args, &client_transport,
1243                            client_args);
1244
1245   // TODO(ncteisen): design and support channelz GetSocket for inproc.
1246   grpc_server_setup_transport(server, server_transport, nullptr, server_args,
1247                               nullptr);
1248   grpc_channel* channel = grpc_channel_create(
1249       "inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport);
1250
1251   // Free up created channel args
1252   grpc_channel_args_destroy(client_args);
1253
1254   // Now finish scheduled operations
1255
1256   return channel;
1257 }
1258
1259 void grpc_inproc_transport_shutdown(void) {
1260   grpc_core::ExecCtx exec_ctx;
1261   grpc_slice_unref_internal(g_empty_slice);
1262   grpc_slice_unref_internal(g_fake_path_key);
1263   grpc_slice_unref_internal(g_fake_path_value);
1264   grpc_slice_unref_internal(g_fake_auth_key);
1265   grpc_slice_unref_internal(g_fake_auth_value);
1266 }