3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/string_util.h>
23 #include <grpc/support/sync.h>
24 #include <grpc/support/time.h>
26 #include "src/core/ext/transport/inproc/inproc_transport.h"
27 #include "src/core/lib/channel/channel_args.h"
28 #include "src/core/lib/gprpp/manual_constructor.h"
29 #include "src/core/lib/slice/slice_internal.h"
30 #include "src/core/lib/surface/api_trace.h"
31 #include "src/core/lib/surface/channel.h"
32 #include "src/core/lib/surface/channel_stack_type.h"
33 #include "src/core/lib/surface/server.h"
34 #include "src/core/lib/transport/connectivity_state.h"
35 #include "src/core/lib/transport/error_utils.h"
36 #include "src/core/lib/transport/transport_impl.h"
38 #define INPROC_LOG(...) \
40 if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) { \
41 gpr_log(__VA_ARGS__); \
46 grpc_slice g_empty_slice;
47 grpc_slice g_fake_path_key;
48 grpc_slice g_fake_path_value;
49 grpc_slice g_fake_auth_key;
50 grpc_slice g_fake_auth_value;
53 bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
54 void op_state_machine(void* arg, grpc_error* error);
55 void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
57 grpc_error* fill_in_metadata(inproc_stream* s,
58 const grpc_metadata_batch* metadata,
59 uint32_t flags, grpc_metadata_batch* out_md,
60 uint32_t* outflags, bool* markfilled);
64 // Share one lock between both sides since both sides get affected
// NOTE(review): struct header and ctor signature are elided in this dump.
// Refcount starts at 2: one ref per transport side; the last side to
// unref (see ~inproc_transport) frees the shared mutex.
66 gpr_ref_init(&refs, 2);
// Destructor tears down the gpr mutex once both sides are gone.
69 ~shared_mu() { gpr_mu_destroy(&mu); }
// One side (client or server) of an in-process transport pair. The two
// sides share one shared_mu and point at each other via other_side.
// NOTE(review): several lines (ctor tail, dtor header, ref/unref method
// headers, some members) are elided in this dump — confirm against the
// full file before editing.
75 struct inproc_transport {
76 inproc_transport(const grpc_transport_vtable* vtable, shared_mu* mu,
78 : mu(mu), is_client(is_client) {
80 // Start each side of transport with 2 refs since they each have a ref
82 gpr_ref_init(&refs, 2);
// Inproc connections are always READY: there is no real connection setup.
83 grpc_connectivity_state_init(&connectivity, GRPC_CHANNEL_READY,
84 is_client ? "inproc_client" : "inproc_server");
88 grpc_connectivity_state_destroy(&connectivity);
// Last side down releases the shared mutex wrapper.
89 if (gpr_unref(&mu->refs)) {
96 INPROC_LOG(GPR_INFO, "ref_transport %p", this);
101 INPROC_LOG(GPR_INFO, "unref_transport %p", this);
102 if (!gpr_unref(&refs)) {
105 INPROC_LOG(GPR_INFO, "really_destroy_transport %p", this);
// Explicit dtor call: the object lives in manually-managed storage,
// so destruction is decoupled from deallocation.
106 this->~inproc_transport();
114 grpc_connectivity_state_tracker connectivity;
// Server-side callback invoked when a client stream appears.
115 void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
116 const void* server_data);
117 void* accept_stream_data;
118 bool is_closed = false;
119 struct inproc_transport* other_side;
// Head of the doubly-linked list of live streams on this side.
120 struct inproc_stream* stream_list = nullptr;
// One side of an in-process stream. The ctor links the stream into its
// transport's stream list and wires the two stream halves together:
// the client half triggers the server's accept_stream_cb; the server
// half (reached via server_data) drains any metadata the client buffered
// before the server half existed.
// NOTE(review): ctor/dtor branches and closing braces are elided in this
// dump; verify control flow against the full file.
123 struct inproc_stream {
124 inproc_stream(inproc_transport* t, grpc_stream_refcount* refcount,
125 const void* server_data, grpc_core::Arena* arena)
126 : t(t), refs(refcount), arena(arena) {
127 // Ref this stream right now for ctor and list.
128 ref("inproc_init_stream:init");
129 ref("inproc_init_stream:list");
131 grpc_metadata_batch_init(&to_read_initial_md);
132 grpc_metadata_batch_init(&to_read_trailing_md);
133 GRPC_CLOSURE_INIT(&op_closure, op_state_machine, this,
134 grpc_schedule_on_exec_ctx);
135 grpc_metadata_batch_init(&write_buffer_initial_md);
136 grpc_metadata_batch_init(&write_buffer_trailing_md);
// Push onto the front of the transport's stream list under the shared lock.
138 stream_list_prev = nullptr;
139 gpr_mu_lock(&t->mu->mu);
140 stream_list_next = t->stream_list;
141 if (t->stream_list) {
142 t->stream_list->stream_list_prev = this;
144 t->stream_list = this;
145 gpr_mu_unlock(&t->mu->mu);
// Client path: no server_data yet, so kick the server side's accept cb.
149 inproc_transport* st = t->other_side;
151 other_side = nullptr; // will get filled in soon
152 // Pass the client-side stream address to the server-side for a ref
153 ref("inproc_init_stream:clt"); // ref it now on behalf of server
154 // side to avoid destruction
155 INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p",
156 st->accept_stream_cb, st->accept_stream_data);
157 (*st->accept_stream_cb)(st->accept_stream_data, &st->base, (void*)this);
159 // This is the server-side and is being called through accept_stream_cb
160 inproc_stream* cs = (inproc_stream*)server_data;
162 // Ref the server-side stream on behalf of the client now
163 ref("inproc_init_stream:srv");
165 // Now we are about to affect the other side, so lock the transport
166 // to make sure that it doesn't get destroyed
167 gpr_mu_lock(&t->mu->mu);
168 cs->other_side = this;
169 // Now transfer from the other side's write_buffer if any to the to_read
171 if (cs->write_buffer_initial_md_filled) {
172 fill_in_metadata(this, &cs->write_buffer_initial_md,
173 cs->write_buffer_initial_md_flags, &to_read_initial_md,
174 &to_read_initial_md_flags, &to_read_initial_md_filled);
// Take the tighter of the two deadlines.
175 deadline = GPR_MIN(deadline, cs->write_buffer_deadline);
176 grpc_metadata_batch_clear(&cs->write_buffer_initial_md);
177 cs->write_buffer_initial_md_filled = false;
179 if (cs->write_buffer_trailing_md_filled) {
180 fill_in_metadata(this, &cs->write_buffer_trailing_md, 0,
181 &to_read_trailing_md, nullptr,
182 &to_read_trailing_md_filled);
183 grpc_metadata_batch_clear(&cs->write_buffer_trailing_md);
184 cs->write_buffer_trailing_md_filled = false;
// A cancel issued before the server half existed: take ownership of it.
186 if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) {
187 cancel_other_error = cs->write_buffer_cancel_error;
188 cs->write_buffer_cancel_error = GRPC_ERROR_NONE;
191 gpr_mu_unlock(&t->mu->mu);
// Dtor fragment: drop owned errors, the received-message buffer, and
// fire any deferred destroy closure.
196 GRPC_ERROR_UNREF(write_buffer_cancel_error);
197 GRPC_ERROR_UNREF(cancel_self_error);
198 GRPC_ERROR_UNREF(cancel_other_error);
201 grpc_slice_buffer_destroy_internal(&recv_message);
206 if (closure_at_destroy) {
207 GRPC_CLOSURE_SCHED(closure_at_destroy, GRPC_ERROR_NONE);
// STREAM_REF/UNREF forward to the debug (with reason) or release variant
// of grpc_stream_ref/unref; the #ifdef selecting between them is elided.
212 #define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason)
213 #define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason)
215 #define STREAM_REF(refs, reason) grpc_stream_ref(refs)
216 #define STREAM_UNREF(refs, reason) grpc_stream_unref(refs)
218 void ref(const char* reason) {
219 INPROC_LOG(GPR_INFO, "ref_stream %p %s", this, reason);
220 STREAM_REF(refs, reason);
223 void unref(const char* reason) {
224 INPROC_LOG(GPR_INFO, "unref_stream %p %s", this, reason);
225 STREAM_UNREF(refs, reason);
// Inbox: metadata the other side has written for us to read.
231 grpc_metadata_batch to_read_initial_md;
232 uint32_t to_read_initial_md_flags = 0;
233 bool to_read_initial_md_filled = false;
234 grpc_metadata_batch to_read_trailing_md;
235 bool to_read_trailing_md_filled = false;
236 bool ops_needed = false;
237 bool op_closure_scheduled = false;
238 grpc_closure op_closure;
239 // Write buffer used only during gap at init time when client-side
240 // stream is set up but server side stream is not yet set up
241 grpc_metadata_batch write_buffer_initial_md;
242 bool write_buffer_initial_md_filled = false;
243 uint32_t write_buffer_initial_md_flags = 0;
244 grpc_millis write_buffer_deadline = GRPC_MILLIS_INF_FUTURE;
245 grpc_metadata_batch write_buffer_trailing_md;
246 bool write_buffer_trailing_md_filled = false;
247 grpc_error* write_buffer_cancel_error = GRPC_ERROR_NONE;
249 struct inproc_stream* other_side;
250 bool other_side_closed = false; // won't talk anymore
251 bool write_buffer_other_side_closed = false; // on hold
252 grpc_stream_refcount* refs;
253 grpc_closure* closure_at_destroy = nullptr;
255 grpc_core::Arena* arena;
// At most one outstanding batch per op kind; nullptr when none pending.
257 grpc_transport_stream_op_batch* send_message_op = nullptr;
258 grpc_transport_stream_op_batch* send_trailing_md_op = nullptr;
259 grpc_transport_stream_op_batch* recv_initial_md_op = nullptr;
260 grpc_transport_stream_op_batch* recv_message_op = nullptr;
261 grpc_transport_stream_op_batch* recv_trailing_md_op = nullptr;
263 grpc_slice_buffer recv_message;
264 grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> recv_stream;
265 bool recv_inited = false;
// Handshake progress flags for this side of the stream.
267 bool initial_md_sent = false;
268 bool trailing_md_sent = false;
269 bool initial_md_recvd = false;
270 bool trailing_md_recvd = false;
// cancel_self_error: we cancelled; cancel_other_error: peer cancelled us.
274 grpc_error* cancel_self_error = GRPC_ERROR_NONE;
275 grpc_error* cancel_other_error = GRPC_ERROR_NONE;
277 grpc_millis deadline = GRPC_MILLIS_INF_FUTURE;
// Intrusive links for the transport's stream list.
280 struct inproc_stream* stream_list_prev;
281 struct inproc_stream* stream_list_next;
// Debug helper: dump every element of a metadata batch at INFO level,
// tagged HDR/TRL (initial vs trailing) and CLI/SVR.
// NOTE(review): loop increment, gpr_free of key/value, and the closing
// brace are elided in this dump.
284 void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
286 for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
// grpc_slice_to_c_string allocates; the (elided) tail must free both.
288 char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
289 char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
290 gpr_log(GPR_INFO, "INPROC:%s:%s: %s: %s", is_initial ? "HDR" : "TRL",
291 is_client ? "CLI" : "SVR", key, value);
// Copy a metadata batch into out_md, allocating the new linked elements
// from the destination stream's arena and interning key/value slices so
// the copy does not alias the sender's storage. Optionally propagates
// flags (outflags) and marks the destination filled (markfilled).
// Stops at the first link error and returns it.
// NOTE(review): bodies of the outflags/markfilled assignments and the
// nelem->md assignment line are elided in this dump.
297 grpc_error* fill_in_metadata(inproc_stream* s,
298 const grpc_metadata_batch* metadata,
299 uint32_t flags, grpc_metadata_batch* out_md,
300 uint32_t* outflags, bool* markfilled) {
301 if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
// outflags != nullptr only for initial metadata, hence the is_initial arg.
302 log_metadata(metadata, s->t->is_client, outflags != nullptr);
305 if (outflags != nullptr) {
308 if (markfilled != nullptr) {
311 grpc_error* error = GRPC_ERROR_NONE;
312 for (grpc_linked_mdelem* elem = metadata->list.head;
313 (elem != nullptr) && (error == GRPC_ERROR_NONE); elem = elem->next) {
// Arena allocation: freed with the stream, no per-element free needed.
314 grpc_linked_mdelem* nelem =
315 static_cast<grpc_linked_mdelem*>(s->arena->Alloc(sizeof(*nelem)));
317 grpc_mdelem_from_slices(grpc_slice_intern(GRPC_MDKEY(elem->md)),
318 grpc_slice_intern(GRPC_MDVALUE(elem->md)));
320 error = grpc_metadata_batch_link_tail(out_md, nelem);
// Transport vtable hook: placement-new an inproc_stream into the
// caller-provided stream storage (gs). The heavy lifting (list linkage,
// accept callback, write-buffer drain) happens in the stream ctor.
325 int init_stream(grpc_transport* gt, grpc_stream* gs,
326 grpc_stream_refcount* refcount, const void* server_data,
327 grpc_core::Arena* arena) {
328 INPROC_LOG(GPR_INFO, "init_stream %p %p %p", gt, gs, server_data);
329 inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
330 new (gs) inproc_stream(t, refcount, server_data, arena);
331 return 0; // return value is not important
// Mark this side of the stream closed (shared mu must be held): free the
// buffered outbound metadata, unlink from the transport's stream list,
// and drop the list + closing refs taken in the ctor.
// NOTE(review): the guard (likely an is-closed check), list-branch
// conditions, and closing braces are elided in this dump.
334 void close_stream_locked(inproc_stream* s) {
336 // Release the metadata that we would have written out
337 grpc_metadata_batch_destroy(&s->write_buffer_initial_md);
338 grpc_metadata_batch_destroy(&s->write_buffer_trailing_md);
// Standard doubly-linked-list unlink; head pointer lives on the transport.
341 inproc_stream* p = s->stream_list_prev;
342 inproc_stream* n = s->stream_list_next;
344 p->stream_list_next = n;
346 s->t->stream_list = n;
349 n->stream_list_prev = p;
352 s->unref("close_stream:list");
355 s->unref("close_stream:closing");
359 // This function means that we are done talking/listening to the other side
// Must be called with the shared mu held. Destroys the to_read batches
// (their elements came from the other side's writes), drops our ref on
// the peer stream, and records closure. If the peer doesn't exist yet
// (client-before-server gap), flag it in the write buffer instead.
360 void close_other_side_locked(inproc_stream* s, const char* reason) {
361 if (s->other_side != nullptr) {
362 // First release the metadata that came from the other side's arena
363 grpc_metadata_batch_destroy(&s->to_read_initial_md);
364 grpc_metadata_batch_destroy(&s->to_read_trailing_md);
365 s->other_side->unref(reason);
367 s->other_side_closed = true;
368 s->other_side = nullptr;
369 } else if (!s->other_side_closed) {
// Peer not created yet: remember the close for when it appears.
370 s->write_buffer_other_side_closed = true;
374 // Call the on_complete closure associated with this stream_op_batch if
375 // this stream_op_batch is only one of the pending operations for this
376 // stream. This is called when one of the pending operations for the stream
377 // is done and about to be NULLed out
// NOTE(review): the signature tail (the `const char* msg` parameter line)
// is elided in this dump.
378 void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
379 grpc_transport_stream_op_batch* op,
// Count how many of the stream's pending op slots point at this batch.
381 int is_sm = static_cast<int>(op == s->send_message_op);
382 int is_stm = static_cast<int>(op == s->send_trailing_md_op);
383 // TODO(vjpai): We should not consider the recv ops here, since they
384 // have their own callbacks. We should invoke a batch's on_complete
385 // as soon as all of the batch's send ops are complete, even if there
386 // are still recv ops pending.
387 int is_rim = static_cast<int>(op == s->recv_initial_md_op);
388 int is_rm = static_cast<int>(op == s->recv_message_op);
389 int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
// Exactly one pending slot left for this batch => safe to complete it.
391 if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
392 INPROC_LOG(GPR_INFO, "%s %p %p %p", msg, s, op, error);
393 GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_REF(error));
// Schedule the stream's op_state_machine closure if the stream exists,
// has declared it needs ops, and the closure isn't already in flight.
// Tolerates s == nullptr so callers can pass a possibly-absent peer.
397 void maybe_schedule_op_closure_locked(inproc_stream* s, grpc_error* error) {
398 if (s && s->ops_needed && !s->op_closure_scheduled) {
// REF because the closure takes ownership of the error it is run with.
399 GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_REF(error));
400 s->op_closure_scheduled = true;
401 s->ops_needed = false;
// Fail every pending op on this stream with `error` (shared mu held).
// First guarantees trailing metadata reaches the peer (cancellation
// signal), then fails recv-initial-md (faking path/authority metadata on
// the server so core surface code is satisfied), recv-message, and the
// send/recv trailing ops, and finally closes both directions.
// Takes ownership of `error` (unreffed at the end).
// NOTE(review): several branch bodies / else-arms / closing braces are
// elided in this dump; confirm against the full file.
405 void fail_helper_locked(inproc_stream* s, grpc_error* error) {
406 INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s);
407 // If we're failing this side, we need to make sure that
408 // we also send or have already sent trailing metadata
409 if (!s->trailing_md_sent) {
410 // Send trailing md to the other side indicating cancellation
411 s->trailing_md_sent = true;
// Empty batch: the arrival of trailing md itself is the signal.
413 grpc_metadata_batch fake_md;
414 grpc_metadata_batch_init(&fake_md);
416 inproc_stream* other = s->other_side;
// Peer missing => stage in our write buffer for when it appears.
417 grpc_metadata_batch* dest = (other == nullptr)
418 ? &s->write_buffer_trailing_md
419 : &other->to_read_trailing_md;
420 bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
421 : &other->to_read_trailing_md_filled;
422 fill_in_metadata(s, &fake_md, 0, dest, nullptr, destfilled);
423 grpc_metadata_batch_destroy(&fake_md);
425 if (other != nullptr) {
426 if (other->cancel_other_error == GRPC_ERROR_NONE) {
427 other->cancel_other_error = GRPC_ERROR_REF(error);
429 maybe_schedule_op_closure_locked(other, error);
430 } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
431 s->write_buffer_cancel_error = GRPC_ERROR_REF(error);
434 if (s->recv_initial_md_op) {
436 if (!s->t->is_client) {
437 // If this is a server, provide initial metadata with a path and authority
438 // since it expects that as well as no error yet
439 grpc_metadata_batch fake_md;
440 grpc_metadata_batch_init(&fake_md);
// Arena-allocated links: freed with the stream, not individually.
441 grpc_linked_mdelem* path_md =
442 static_cast<grpc_linked_mdelem*>(s->arena->Alloc(sizeof(*path_md)));
443 path_md->md = grpc_mdelem_from_slices(g_fake_path_key, g_fake_path_value);
444 GPR_ASSERT(grpc_metadata_batch_link_tail(&fake_md, path_md) ==
446 grpc_linked_mdelem* auth_md =
447 static_cast<grpc_linked_mdelem*>(s->arena->Alloc(sizeof(*auth_md)));
448 auth_md->md = grpc_mdelem_from_slices(g_fake_auth_key, g_fake_auth_value);
449 GPR_ASSERT(grpc_metadata_batch_link_tail(&fake_md, auth_md) ==
// (Elided) copy fake_md into the op's recv_initial_metadata target.
454 s->recv_initial_md_op->payload->recv_initial_metadata
455 .recv_initial_metadata,
456 s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
458 grpc_metadata_batch_destroy(&fake_md);
459 err = GRPC_ERROR_NONE;
461 err = GRPC_ERROR_REF(error);
463 if (s->recv_initial_md_op->payload->recv_initial_metadata
464 .trailing_metadata_available != nullptr) {
465 // Set to true unconditionally, because we're failing the call, so even
466 // if we haven't actually seen the send_trailing_metadata op from the
467 // other side, we're going to return trailing metadata anyway.
468 *s->recv_initial_md_op->payload->recv_initial_metadata
469 .trailing_metadata_available = true;
472 "fail_helper %p scheduling initial-metadata-ready %p %p", s,
474 GRPC_CLOSURE_SCHED(s->recv_initial_md_op->payload->recv_initial_metadata
475 .recv_initial_metadata_ready,
477 // Last use of err so no need to REF and then UNREF it
479 complete_if_batch_end_locked(
480 s, error, s->recv_initial_md_op,
481 "fail_helper scheduling recv-initial-metadata-on-complete");
482 s->recv_initial_md_op = nullptr;
484 if (s->recv_message_op) {
485 INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %p", s,
488 s->recv_message_op->payload->recv_message.recv_message_ready,
489 GRPC_ERROR_REF(error));
490 complete_if_batch_end_locked(
491 s, error, s->recv_message_op,
492 "fail_helper scheduling recv-message-on-complete");
493 s->recv_message_op = nullptr;
495 if (s->send_message_op) {
// Drop the outbound byte stream; it will never be delivered.
496 s->send_message_op->payload->send_message.send_message.reset();
497 complete_if_batch_end_locked(
498 s, error, s->send_message_op,
499 "fail_helper scheduling send-message-on-complete");
500 s->send_message_op = nullptr;
502 if (s->send_trailing_md_op) {
503 complete_if_batch_end_locked(
504 s, error, s->send_trailing_md_op,
505 "fail_helper scheduling send-trailng-md-on-complete");
506 s->send_trailing_md_op = nullptr;
508 if (s->recv_trailing_md_op) {
509 INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
511 GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
512 .recv_trailing_metadata_ready,
513 GRPC_ERROR_REF(error));
514 INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p",
516 complete_if_batch_end_locked(
517 s, error, s->recv_trailing_md_op,
518 "fail_helper scheduling recv-trailing-metadata-on-complete");
519 s->recv_trailing_md_op = nullptr;
521 close_other_side_locked(s, "fail_helper:other_side");
522 close_stream_locked(s);
// Ownership of `error` ends here.
524 GRPC_ERROR_UNREF(error);
527 // TODO(vjpai): It should not be necessary to drain the incoming byte
528 // stream and create a new one; instead, we should simply pass the byte
529 // stream from the sender directly to the receiver as-is.
531 // Note that fixing this will also avoid the assumption in this code
532 // that the incoming byte stream's next() call will always return
533 // synchronously. That assumption is true today but may not always be
534 // true in the future.
// Copy the sender's pending send_message into the receiver's recv buffer
// slice-by-slice, then surface it to the receiver as a
// SliceBufferByteStream and complete both sides' batches.
// NOTE(review): the do-loop header, error declaration, and several
// intermediate lines are elided in this dump.
535 void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) {
537 sender->send_message_op->payload->send_message.send_message->length();
// Reset (not leak) any previously-received buffer before reuse.
538 if (receiver->recv_inited) {
539 grpc_slice_buffer_destroy_internal(&receiver->recv_message);
541 grpc_slice_buffer_init(&receiver->recv_message);
542 receiver->recv_inited = true;
544 grpc_slice message_slice;
// Relies on Next() completing synchronously for inproc (see TODO above).
547 sender->send_message_op->payload->send_message.send_message->Next(
550 sender->send_message_op->payload->send_message.send_message->Pull(
552 if (error != GRPC_ERROR_NONE) {
553 cancel_stream_locked(sender, GRPC_ERROR_REF(error));
556 GPR_ASSERT(error == GRPC_ERROR_NONE);
557 remaining -= GRPC_SLICE_LENGTH(message_slice);
558 grpc_slice_buffer_add(&receiver->recv_message, message_slice);
559 } while (remaining > 0);
560 sender->send_message_op->payload->send_message.send_message.reset();
562 receiver->recv_stream.Init(&receiver->recv_message, 0);
563 receiver->recv_message_op->payload->recv_message.recv_message->reset(
564 receiver->recv_stream.get());
565 INPROC_LOG(GPR_INFO, "message_transfer_locked %p scheduling message-ready",
568 receiver->recv_message_op->payload->recv_message.recv_message_ready,
570 complete_if_batch_end_locked(
571 sender, GRPC_ERROR_NONE, sender->send_message_op,
572 "message_transfer scheduling sender on_complete");
573 complete_if_batch_end_locked(
574 receiver, GRPC_ERROR_NONE, receiver->recv_message_op,
575 "message_transfer scheduling receiver on_complete");
// Clear AFTER complete_if_batch_end_locked: it inspects these slots.
577 receiver->recv_message_op = nullptr;
578 sender->send_message_op = nullptr;
// Core progress engine for one stream: invoked (as op_closure) whenever
// new data/ops may be matchable. Matches pending sends against the
// peer's pending recvs, moves trailing metadata, and re-arms itself
// (ops_needed) when work remains. Cancellation/errors short-circuit to
// fail_helper_locked. Runs with combiner semantics for `error` (no
// explicit unref of the parameter).
// NOTE(review): lock/unlock lines, several else/goto arms, and closing
// braces are elided in this dump; control flow must be confirmed against
// the full file.
581 void op_state_machine(void* arg, grpc_error* error) {
582 // This function gets called when we have contents in the unprocessed reads
583 // Get what we want based on our ops wanted
584 // Schedule our appropriate closures
585 // and then return to ops_needed state if still needed
587 // Since this is a closure directly invoked by the combiner, it should not
588 // unref the error parameter explicitly; the combiner will do that implicitly
589 grpc_error* new_err = GRPC_ERROR_NONE;
591 bool needs_close = false;
593 INPROC_LOG(GPR_INFO, "op_state_machine %p", arg);
594 inproc_stream* s = static_cast<inproc_stream*>(arg);
595 gpr_mu* mu = &s->t->mu->mu; // keep aside in case s gets closed
597 s->op_closure_scheduled = false;
598 // cancellation takes precedence
599 inproc_stream* other = s->other_side;
601 if (s->cancel_self_error != GRPC_ERROR_NONE) {
602 fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_self_error));
604 } else if (s->cancel_other_error != GRPC_ERROR_NONE) {
605 fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_other_error));
607 } else if (error != GRPC_ERROR_NONE) {
608 fail_helper_locked(s, GRPC_ERROR_REF(error));
// Try to pair our send_message with the peer's recv_message.
612 if (s->send_message_op && other) {
613 if (other->recv_message_op) {
614 message_transfer_locked(s, other);
615 maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
616 } else if (!s->t->is_client && s->trailing_md_sent) {
617 // A server send will never be matched if the server already sent status
618 s->send_message_op->payload->send_message.send_message.reset();
619 complete_if_batch_end_locked(
620 s, GRPC_ERROR_NONE, s->send_message_op,
621 "op_state_machine scheduling send-message-on-complete");
622 s->send_message_op = nullptr;
625 // Pause a send trailing metadata if there is still an outstanding
626 // send message unless we know that the send message will never get
627 // matched to a receive. This happens on the client if the server has
628 // already sent status or on the server if the client has requested
630 if (s->send_trailing_md_op &&
631 (!s->send_message_op ||
633 (s->trailing_md_recvd || s->to_read_trailing_md_filled)) ||
634 (!s->t->is_client && other &&
635 (other->trailing_md_recvd || other->to_read_trailing_md_filled ||
636 other->recv_trailing_md_op)))) {
637 grpc_metadata_batch* dest = (other == nullptr)
638 ? &s->write_buffer_trailing_md
639 : &other->to_read_trailing_md;
640 bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
641 : &other->to_read_trailing_md_filled;
642 if (*destfilled || s->trailing_md_sent) {
643 // The buffer is already in use; that's an error!
644 INPROC_LOG(GPR_INFO, "Extra trailing metadata %p", s);
645 new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
646 fail_helper_locked(s, GRPC_ERROR_REF(new_err));
// Only deliver if the peer hasn't closed (elided fill_in_metadata call).
649 if (!other || !other->closed) {
651 s->send_trailing_md_op->payload->send_trailing_metadata
652 .send_trailing_metadata,
653 0, dest, nullptr, destfilled);
655 s->trailing_md_sent = true;
// Server case: a recv_trailing_md_op deferred until status was sent can
// now be completed.
656 if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
658 "op_state_machine %p scheduling trailing-metadata-ready", s);
660 s->recv_trailing_md_op->payload->recv_trailing_metadata
661 .recv_trailing_metadata_ready,
664 "op_state_machine %p scheduling trailing-md-on-complete", s);
665 GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
667 s->recv_trailing_md_op = nullptr;
671 maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
672 complete_if_batch_end_locked(
673 s, GRPC_ERROR_NONE, s->send_trailing_md_op,
674 "op_state_machine scheduling send-trailing-metadata-on-complete");
675 s->send_trailing_md_op = nullptr;
677 if (s->recv_initial_md_op) {
678 if (s->initial_md_recvd) {
// Duplicate recv of initial metadata is a protocol violation.
680 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md");
683 "op_state_machine %p scheduling on_complete errors for already "
684 "recvd initial md %p",
686 fail_helper_locked(s, GRPC_ERROR_REF(new_err));
690 if (s->to_read_initial_md_filled) {
691 s->initial_md_recvd = true;
692 new_err = fill_in_metadata(
693 s, &s->to_read_initial_md, s->to_read_initial_md_flags,
694 s->recv_initial_md_op->payload->recv_initial_metadata
695 .recv_initial_metadata,
696 s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
698 s->recv_initial_md_op->payload->recv_initial_metadata
699 .recv_initial_metadata->deadline = s->deadline;
700 if (s->recv_initial_md_op->payload->recv_initial_metadata
701 .trailing_metadata_available != nullptr) {
702 *s->recv_initial_md_op->payload->recv_initial_metadata
703 .trailing_metadata_available =
704 (other != nullptr && other->send_trailing_md_op != nullptr);
706 grpc_metadata_batch_clear(&s->to_read_initial_md);
707 s->to_read_initial_md_filled = false;
709 "op_state_machine %p scheduling initial-metadata-ready %p", s,
711 GRPC_CLOSURE_SCHED(s->recv_initial_md_op->payload->recv_initial_metadata
712 .recv_initial_metadata_ready,
713 GRPC_ERROR_REF(new_err));
714 complete_if_batch_end_locked(
715 s, new_err, s->recv_initial_md_op,
716 "op_state_machine scheduling recv-initial-metadata-on-complete");
717 s->recv_initial_md_op = nullptr;
719 if (new_err != GRPC_ERROR_NONE) {
721 "op_state_machine %p scheduling on_complete errors2 %p", s,
723 fail_helper_locked(s, GRPC_ERROR_REF(new_err));
// Pairing direction reversed: peer's send_message into our recv_message.
728 if (s->recv_message_op) {
729 if (other && other->send_message_op) {
730 message_transfer_locked(other, s);
731 maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
734 if (s->to_read_trailing_md_filled) {
735 if (s->trailing_md_recvd) {
737 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md");
740 "op_state_machine %p scheduling on_complete errors for already "
741 "recvd trailing md %p",
743 fail_helper_locked(s, GRPC_ERROR_REF(new_err));
746 if (s->recv_message_op != nullptr) {
747 // This message needs to be wrapped up because it will never be
// Trailing md arrived => no message is coming; deliver nullptr.
749 *s->recv_message_op->payload->recv_message.recv_message = nullptr;
750 INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
752 s->recv_message_op->payload->recv_message.recv_message_ready,
754 complete_if_batch_end_locked(
755 s, new_err, s->recv_message_op,
756 "op_state_machine scheduling recv-message-on-complete");
757 s->recv_message_op = nullptr;
759 if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
760 // Nothing further will try to receive from this stream, so finish off
761 // any outstanding send_message op
762 s->send_message_op->payload->send_message.send_message.reset();
763 complete_if_batch_end_locked(
764 s, new_err, s->send_message_op,
765 "op_state_machine scheduling send-message-on-complete");
766 s->send_message_op = nullptr;
768 if (s->recv_trailing_md_op != nullptr) {
769 // We wanted trailing metadata and we got it
770 s->trailing_md_recvd = true;
772 fill_in_metadata(s, &s->to_read_trailing_md, 0,
773 s->recv_trailing_md_op->payload
774 ->recv_trailing_metadata.recv_trailing_metadata,
776 grpc_metadata_batch_clear(&s->to_read_trailing_md);
777 s->to_read_trailing_md_filled = false;
779 // We should schedule the recv_trailing_md_op completion if
780 // 1. this stream is the client-side
781 // 2. this stream is the server-side AND has already sent its trailing md
782 // (If the server hasn't already sent its trailing md, it doesn't have
783 // a final status, so don't mark this op complete)
784 if (s->t->is_client || s->trailing_md_sent) {
786 "op_state_machine %p scheduling trailing-md-on-complete %p",
789 s->recv_trailing_md_op->payload->recv_trailing_metadata
790 .recv_trailing_metadata_ready,
791 GRPC_ERROR_REF(new_err));
792 GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
793 GRPC_ERROR_REF(new_err));
794 s->recv_trailing_md_op = nullptr;
798 "op_state_machine %p server needs to delay handling "
799 "trailing-md-on-complete %p",
805 "op_state_machine %p has trailing md but not yet waiting for it", s);
808 if (s->trailing_md_recvd && s->recv_message_op) {
809 // No further message will come on this stream, so finish off the
811 INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
812 *s->recv_message_op->payload->recv_message.recv_message = nullptr;
814 s->recv_message_op->payload->recv_message.recv_message_ready,
816 complete_if_batch_end_locked(
817 s, new_err, s->recv_message_op,
818 "op_state_machine scheduling recv-message-on-complete");
819 s->recv_message_op = nullptr;
821 if (s->trailing_md_recvd && (s->trailing_md_sent || s->t->is_client) &&
822 s->send_message_op) {
823 // Nothing further will try to receive from this stream, so finish off
824 // any outstanding send_message op
825 s->send_message_op->payload->send_message.send_message.reset();
826 complete_if_batch_end_locked(
827 s, new_err, s->send_message_op,
828 "op_state_machine scheduling send-message-on-complete");
829 s->send_message_op = nullptr;
831 if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
832 s->recv_message_op || s->recv_trailing_md_op) {
833 // Didn't get the item we wanted so we still need to get
// Re-arm: the next maybe_schedule_op_closure_locked will re-run us.
836 GPR_INFO, "op_state_machine %p still needs closure %p %p %p %p %p", s,
837 s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
838 s->recv_message_op, s->recv_trailing_md_op);
839 s->ops_needed = true;
843 close_other_side_locked(s, "op_state_machine");
844 close_stream_locked(s);
847 GRPC_ERROR_UNREF(new_err);
// Cancel this stream (shared mu held). Records cancel_self_error, pushes
// an (empty) trailing-metadata batch at the peer as the cancellation
// signal, propagates the error to the peer (or stages it in the write
// buffer if the peer doesn't exist yet), and closes both directions.
// Takes ownership of `error`. Returns whether the cancel was accepted
// (i.e., no prior self-cancel).
// NOTE(review): some lines (e.g. needs_close handling, closing braces)
// are elided in this dump.
850 bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
851 bool ret = false; // was the cancel accepted
852 INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s, grpc_error_string(error));
// Only the first cancel wins; later cancels are no-ops.
853 if (s->cancel_self_error == GRPC_ERROR_NONE) {
855 s->cancel_self_error = GRPC_ERROR_REF(error);
856 maybe_schedule_op_closure_locked(s, s->cancel_self_error);
857 // Send trailing md to the other side indicating cancellation, even if we
859 s->trailing_md_sent = true;
861 grpc_metadata_batch cancel_md;
862 grpc_metadata_batch_init(&cancel_md);
864 inproc_stream* other = s->other_side;
865 grpc_metadata_batch* dest = (other == nullptr)
866 ? &s->write_buffer_trailing_md
867 : &other->to_read_trailing_md;
868 bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
869 : &other->to_read_trailing_md_filled;
870 fill_in_metadata(s, &cancel_md, 0, dest, nullptr, destfilled);
871 grpc_metadata_batch_destroy(&cancel_md);
873 if (other != nullptr) {
874 if (other->cancel_other_error == GRPC_ERROR_NONE) {
875 other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error);
877 maybe_schedule_op_closure_locked(other, other->cancel_other_error);
878 } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
// Peer not created yet: park the cancel in the write buffer.
879 s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error);
882 // if we are a server and already received trailing md but
883 // couldn't complete that because we hadn't yet sent out trailing
884 // md, now's the chance
885 if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
886 GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
887 .recv_trailing_metadata_ready,
888 GRPC_ERROR_REF(s->cancel_self_error));
889 complete_if_batch_end_locked(
890 s, s->cancel_self_error, s->recv_trailing_md_op,
891 "cancel_stream scheduling trailing-md-on-complete");
892 s->recv_trailing_md_op = nullptr;
896 close_other_side_locked(s, "cancel_stream:other_side");
897 close_stream_locked(s);
// Ownership of `error` ends here regardless of acceptance.
899 GRPC_ERROR_UNREF(error);
// No-op closure callback: used by perform_stream_op as a stand-in
// on_complete barrier when the batch supplies none.
903 void do_nothing(void* arg, grpc_error* error) {}
// Transport vtable hook: apply one batch of stream operations (send/recv
// initial/trailing metadata, messages, cancellation) to an inproc stream.
// All mutation happens under the transport's shared mutex.  Ops that cannot
// be satisfied immediately are parked on the stream (s->*_op fields) and are
// completed later by op_state_machine via s->op_closure.
void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
                       grpc_transport_stream_op_batch* op) {
  INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
  inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
  gpr_mu* mu = &s->t->mu->mu;  // save aside in case s gets closed
  gpr_mu_lock(mu);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
    if (op->send_initial_metadata) {
      log_metadata(op->payload->send_initial_metadata.send_initial_metadata,
                   s->t->is_client, true);
    }
    if (op->send_trailing_metadata) {
      log_metadata(op->payload->send_trailing_metadata.send_trailing_metadata,
                   s->t->is_client, false);
    }
  }
  grpc_error* error = GRPC_ERROR_NONE;
  grpc_closure* on_complete = op->on_complete;
  // TODO(roth): This is a hack needed because we use data inside of the
  // closure itself to do the barrier calculation (i.e., to ensure that
  // we don't schedule the closure until all ops in the batch have been
  // completed). This can go away once we move to a new C++ closure API
  // that provides the ability to create a barrier closure.
  if (on_complete == nullptr) {
    on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
                                    nullptr, grpc_schedule_on_exec_ctx);
  }

  if (op->cancel_stream) {
    // Call cancel_stream_locked without ref'ing the cancel_error because
    // this function is responsible to make sure that that field gets unref'ed
    cancel_stream_locked(s, op->payload->cancel_stream.cancel_error);
    // this op can complete without an error
  } else if (s->cancel_self_error != GRPC_ERROR_NONE) {
    // already self-canceled so still give it an error
    error = GRPC_ERROR_REF(s->cancel_self_error);
  } else {
    INPROC_LOG(GPR_INFO, "perform_stream_op %p %s%s%s%s%s%s%s", s,
               s->t->is_client ? "client" : "server",
               op->send_initial_metadata ? " send_initial_metadata" : "",
               op->send_message ? " send_message" : "",
               op->send_trailing_metadata ? " send_trailing_metadata" : "",
               op->recv_initial_metadata ? " recv_initial_metadata" : "",
               op->recv_message ? " recv_message" : "",
               op->recv_trailing_metadata ? " recv_trailing_metadata" : "");
  }

  // NOTE(review): needs_close is never set to true anywhere visible in this
  // function (cancel_stream_locked's bool return is discarded above), so the
  // close block at the bottom looks unreachable -- confirm against the full
  // file.
  bool needs_close = false;

  inproc_stream* other = s->other_side;
  if (error == GRPC_ERROR_NONE &&
      (op->send_initial_metadata || op->send_trailing_metadata)) {
    if (s->t->is_closed) {
      error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
    }
    if (error == GRPC_ERROR_NONE && op->send_initial_metadata) {
      // If the peer stream exists, write straight into its to-read buffers;
      // otherwise stage into our own write buffer until the peer is created.
      grpc_metadata_batch* dest = (other == nullptr)
                                      ? &s->write_buffer_initial_md
                                      : &other->to_read_initial_md;
      uint32_t* destflags = (other == nullptr)
                                ? &s->write_buffer_initial_md_flags
                                : &other->to_read_initial_md_flags;
      bool* destfilled = (other == nullptr) ? &s->write_buffer_initial_md_filled
                                            : &other->to_read_initial_md_filled;
      if (*destfilled || s->initial_md_sent) {
        // The buffer is already in use; that's an error!
        INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s);
        error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata");
      } else {
        if (!other || !other->closed) {
          fill_in_metadata(
              s, op->payload->send_initial_metadata.send_initial_metadata,
              op->payload->send_initial_metadata.send_initial_metadata_flags,
              dest, destflags, destfilled);
        }
        if (s->t->is_client) {
          // Propagate (or stage) the tighter of the two deadlines to the peer.
          grpc_millis* dl =
              (other == nullptr) ? &s->write_buffer_deadline : &other->deadline;
          *dl = GPR_MIN(*dl, op->payload->send_initial_metadata
                                 .send_initial_metadata->deadline);
          s->initial_md_sent = true;
        }
      }
      maybe_schedule_op_closure_locked(other, error);
    }
  }

  if (error == GRPC_ERROR_NONE &&
      (op->send_message || op->send_trailing_metadata ||
       op->recv_initial_metadata || op->recv_message ||
       op->recv_trailing_metadata)) {
    // Mark ops that need to be processed by the closure
    if (op->send_message) {
      s->send_message_op = op;
    }
    if (op->send_trailing_metadata) {
      s->send_trailing_md_op = op;
    }
    if (op->recv_initial_metadata) {
      s->recv_initial_md_op = op;
    }
    if (op->recv_message) {
      s->recv_message_op = op;
    }
    if (op->recv_trailing_metadata) {
      s->recv_trailing_md_op = op;
    }

    // We want to initiate the closure if:
    // 1. We want to send a message and the other side wants to receive
    // 2. We want to send trailing metadata and there isn't an unmatched send
    //    or the other side wants trailing metadata
    // 3. We want initial metadata and the other side has sent it
    // 4. We want to receive a message and there is a message ready
    // 5. There is trailing metadata, even if nothing specifically wants
    //    that because that can shut down the receive message as well
    if ((op->send_message && other && other->recv_message_op != nullptr) ||
        (op->send_trailing_metadata &&
         (!s->send_message_op || (other && other->recv_trailing_md_op))) ||
        (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
        (op->recv_message && other && other->send_message_op != nullptr) ||
        (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
      if (!s->op_closure_scheduled) {
        GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_NONE);
        s->op_closure_scheduled = true;
      }
    } else {
      // Nothing can make progress yet; op_state_machine will be kicked when
      // the other side supplies a matching op.
      s->ops_needed = true;
    }
  } else {
    if (error != GRPC_ERROR_NONE) {
      // Consume any send message that was sent here but that we are not
      // pushing to the other side
      if (op->send_message) {
        op->payload->send_message.send_message.reset();
      }
      // Schedule op's closures that we didn't push to op state machine
      if (op->recv_initial_metadata) {
        if (op->payload->recv_initial_metadata.trailing_metadata_available !=
            nullptr) {
          // Set to true unconditionally, because we're failing the call, so
          // even if we haven't actually seen the send_trailing_metadata op
          // from the other side, we're going to return trailing metadata
          // anyway
          *op->payload->recv_initial_metadata.trailing_metadata_available =
              true;
        }
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling initial-metadata-ready %p",
            s, error);
        GRPC_CLOSURE_SCHED(
            op->payload->recv_initial_metadata.recv_initial_metadata_ready,
            GRPC_ERROR_REF(error));
      }
      if (op->recv_message) {
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling recv message-ready %p", s,
            error);
        GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
                           GRPC_ERROR_REF(error));
      }
      if (op->recv_trailing_metadata) {
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling trailing-metadata-ready %p",
            s, error);
        GRPC_CLOSURE_SCHED(
            op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
            GRPC_ERROR_REF(error));
      }
    }
    INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
               error);
    GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
  }
  if (needs_close) {
    close_other_side_locked(s, "perform_stream_op:other_side");
    close_stream_locked(s);
  }
  gpr_mu_unlock(mu);
  // Balance the ref taken when `error` was set/ref'ed above.
  GRPC_ERROR_UNREF(error);
}
// Mark the transport closed (idempotent) and cancel every stream still on its
// stream list with UNAVAILABLE.  Caller must hold t->mu->mu.
void close_transport_locked(inproc_transport* t) {
  INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed);
  grpc_connectivity_state_set(&t->connectivity, GRPC_CHANNEL_SHUTDOWN,
                              "close transport");
  if (!t->is_closed) {
    t->is_closed = true;
    /* Also end all streams on this transport */
    while (t->stream_list != nullptr) {
      // cancel_stream_locked also adjusts stream list
      cancel_stream_locked(
          t->stream_list,
          grpc_error_set_int(
              GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"),
              GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
    }
  }
}
// Transport vtable hook: apply transport-wide ops (connectivity watch,
// accept-stream callback registration, goaway/disconnect) under the shared
// mutex.
void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
  inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
  INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", t, op);
  gpr_mu_lock(&t->mu->mu);
  if (op->on_connectivity_state_change) {
    grpc_connectivity_state_notify_on_state_change(
        &t->connectivity, op->connectivity_state,
        op->on_connectivity_state_change);
  }
  if (op->set_accept_stream) {
    // Server side: remember how to surface newly-created streams.
    t->accept_stream_cb = op->set_accept_stream_fn;
    t->accept_stream_data = op->set_accept_stream_user_data;
  }
  if (op->on_consumed) {
    GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
  }

  // Goaway and disconnect both collapse to a full close for inproc; this
  // function owns the errors and must unref them.
  bool do_close = false;
  if (op->goaway_error != GRPC_ERROR_NONE) {
    do_close = true;
    GRPC_ERROR_UNREF(op->goaway_error);
  }
  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    do_close = true;
    GRPC_ERROR_UNREF(op->disconnect_with_error);
  }

  if (do_close) {
    close_transport_locked(t);
  }
  gpr_mu_unlock(&t->mu->mu);
}
1142 void destroy_stream(grpc_transport* gt, grpc_stream* gs,
1143 grpc_closure* then_schedule_closure) {
1144 INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure);
1145 inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
1146 s->closure_at_destroy = then_schedule_closure;
1147 s->~inproc_stream();
// Transport vtable hook: close this side of the pair and release the ref it
// holds on its peer.
void destroy_transport(grpc_transport* gt) {
  inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
  INPROC_LOG(GPR_INFO, "destroy_transport %p", t);
  gpr_mu_lock(&t->mu->mu);
  close_transport_locked(t);
  gpr_mu_unlock(&t->mu->mu);
  // NOTE(review): only the peer is unref'ed in the lines visible here; the
  // matching unref of this side's own ref is not visible in this chunk --
  // confirm the refcounting balances in the full file.
  t->other_side->unref();
}
/*******************************************************************************
 * INTEGRATION GLUE
 */
1164 void set_pollset(grpc_transport* gt, grpc_stream* gs, grpc_pollset* pollset) {
1165 // Nothing to do here
1168 void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
1169 grpc_pollset_set* pollset_set) {
1170 // Nothing to do here
1173 grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; }
// Vtable wiring the inproc implementation into the generic transport API.
const grpc_transport_vtable inproc_vtable = {
    sizeof(inproc_stream), "inproc", init_stream,
    set_pollset, set_pollset_set, perform_stream_op,
    perform_transport_op, destroy_stream, destroy_transport,
    get_endpoint};
/*******************************************************************************
 * Main inproc transport functions
 */
1184 void inproc_transports_create(grpc_transport** server_transport,
1185 const grpc_channel_args* server_args,
1186 grpc_transport** client_transport,
1187 const grpc_channel_args* client_args) {
1188 INPROC_LOG(GPR_INFO, "inproc_transports_create");
1189 shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu();
1190 inproc_transport* st = new (gpr_malloc(sizeof(*st)))
1191 inproc_transport(&inproc_vtable, mu, /*is_client=*/false);
1192 inproc_transport* ct = new (gpr_malloc(sizeof(*ct)))
1193 inproc_transport(&inproc_vtable, mu, /*is_client=*/true);
1194 st->other_side = ct;
1195 ct->other_side = st;
1196 *server_transport = reinterpret_cast<grpc_transport*>(st);
1197 *client_transport = reinterpret_cast<grpc_transport*>(ct);
/*******************************************************************************
 * GLOBAL INIT AND DESTROY
 */
1204 void grpc_inproc_transport_init(void) {
1205 grpc_core::ExecCtx exec_ctx;
1206 g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0);
1208 grpc_slice key_tmp = grpc_slice_from_static_string(":path");
1209 g_fake_path_key = grpc_slice_intern(key_tmp);
1210 grpc_slice_unref_internal(key_tmp);
1212 g_fake_path_value = grpc_slice_from_static_string("/");
1214 grpc_slice auth_tmp = grpc_slice_from_static_string(":authority");
1215 g_fake_auth_key = grpc_slice_intern(auth_tmp);
1216 grpc_slice_unref_internal(auth_tmp);
1218 g_fake_auth_value = grpc_slice_from_static_string("inproc-fail");
// Public API: create an in-process channel connected directly to `server`.
// Builds the paired transports, hands the server side to the server, and
// wraps the client side in a direct channel.  `args` are the client channel
// args; a default authority is appended for the client.
grpc_channel* grpc_inproc_channel_create(grpc_server* server,
                                         grpc_channel_args* args,
                                         void* reserved) {
  GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2,
                 (server, args));

  grpc_core::ExecCtx exec_ctx;

  const grpc_channel_args* server_args = grpc_server_get_channel_args(server);

  // Add a default authority channel argument for the client
  grpc_arg default_authority_arg;
  default_authority_arg.type = GRPC_ARG_STRING;
  default_authority_arg.key = (char*)GRPC_ARG_DEFAULT_AUTHORITY;
  default_authority_arg.value.string = (char*)"inproc.authority";
  grpc_channel_args* client_args =
      grpc_channel_args_copy_and_add(args, &default_authority_arg, 1);

  grpc_transport* server_transport;
  grpc_transport* client_transport;
  inproc_transports_create(&server_transport, server_args, &client_transport,
                           client_args);

  // TODO(ncteisen): design and support channelz GetSocket for inproc.
  grpc_server_setup_transport(server, server_transport, nullptr, server_args,
                              nullptr);
  grpc_channel* channel = grpc_channel_create(
      "inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport);

  // Free up created channel args
  grpc_channel_args_destroy(client_args);

  // Now finish scheduled operations
  return channel;
}
1259 void grpc_inproc_transport_shutdown(void) {
1260 grpc_core::ExecCtx exec_ctx;
1261 grpc_slice_unref_internal(g_empty_slice);
1262 grpc_slice_unref_internal(g_fake_path_key);
1263 grpc_slice_unref_internal(g_fake_path_value);
1264 grpc_slice_unref_internal(g_fake_auth_key);
1265 grpc_slice_unref_internal(g_fake_auth_value);