3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
20 #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
22 #include <grpc/support/port_platform.h>
27 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
28 #include "src/core/ext/transport/chttp2/transport/frame.h"
29 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
30 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
31 #include "src/core/ext/transport/chttp2/transport/frame_ping.h"
32 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
33 #include "src/core/ext/transport/chttp2/transport/frame_settings.h"
34 #include "src/core/ext/transport/chttp2/transport/frame_window_update.h"
35 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
36 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
37 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
38 #include "src/core/ext/transport/chttp2/transport/stream_map.h"
39 #include "src/core/lib/channel/channelz.h"
40 #include "src/core/lib/compression/stream_compression.h"
41 #include "src/core/lib/gprpp/manual_constructor.h"
42 #include "src/core/lib/iomgr/combiner.h"
43 #include "src/core/lib/iomgr/endpoint.h"
44 #include "src/core/lib/iomgr/timer.h"
45 #include "src/core/lib/transport/connectivity_state.h"
46 #include "src/core/lib/transport/transport_impl.h"
52 /* streams are kept in various linked lists depending on what things need to
53 happen to them... this enum labels each list */
55 GRPC_CHTTP2_LIST_WRITABLE,
56 GRPC_CHTTP2_LIST_WRITING,
57 GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
58 GRPC_CHTTP2_LIST_STALLED_BY_STREAM,
59 /** streams that are waiting to start because there are too many concurrent
60 streams on the connection */
61 GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
62 STREAM_LIST_COUNT /* must be last */
63 } grpc_chttp2_stream_list_id;
66 GRPC_CHTTP2_WRITE_STATE_IDLE,
67 GRPC_CHTTP2_WRITE_STATE_WRITING,
68 GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
69 } grpc_chttp2_write_state;
72 GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY,
73 GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT,
74 } grpc_chttp2_optimization_target;
77 GRPC_CHTTP2_PCL_INITIATE = 0,
79 GRPC_CHTTP2_PCL_INFLIGHT,
80 GRPC_CHTTP2_PCL_COUNT /* must be last */
81 } grpc_chttp2_ping_closure_list;
84 GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE,
85 GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM,
86 GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE,
87 GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA,
88 GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA,
89 GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING,
90 GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS,
91 GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT,
92 GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM,
93 GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API,
94 GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
95 GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL,
96 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS,
97 GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING,
98 GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE,
99 GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING,
100 GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING,
101 GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED,
102 GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE,
103 GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM,
104 } grpc_chttp2_initiate_write_reason;
// Maps a grpc_chttp2_initiate_write_reason value to a C string.
// Only the declaration is visible in this header; presumably the result is a
// human-readable name of the reason (for logs/traces) with static storage
// that the caller must not free -- TODO confirm against the definition.
106 const char* grpc_chttp2_initiate_write_reason_string(
107 grpc_chttp2_initiate_write_reason reason);
110 grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT] = {};
111 uint64_t inflight_id = 0;
112 } grpc_chttp2_ping_queue;
115 int max_pings_without_data;
116 int max_ping_strikes;
117 grpc_millis min_sent_ping_interval_without_data;
118 grpc_millis min_recv_ping_interval_without_data;
119 } grpc_chttp2_repeated_ping_policy;
122 grpc_millis last_ping_sent_time;
123 int pings_before_data_required;
124 grpc_timer delayed_ping_timer;
125 bool is_delayed_ping_timer_set;
126 } grpc_chttp2_repeated_ping_state;
129 grpc_millis last_ping_recv_time;
131 } grpc_chttp2_server_ping_recv_state;
133 /* deframer state for the overall http2 stream of bytes */
135 /* prefix: one entry per http2 connection prefix byte */
136 GRPC_DTS_CLIENT_PREFIX_0 = 0,
137 GRPC_DTS_CLIENT_PREFIX_1,
138 GRPC_DTS_CLIENT_PREFIX_2,
139 GRPC_DTS_CLIENT_PREFIX_3,
140 GRPC_DTS_CLIENT_PREFIX_4,
141 GRPC_DTS_CLIENT_PREFIX_5,
142 GRPC_DTS_CLIENT_PREFIX_6,
143 GRPC_DTS_CLIENT_PREFIX_7,
144 GRPC_DTS_CLIENT_PREFIX_8,
145 GRPC_DTS_CLIENT_PREFIX_9,
146 GRPC_DTS_CLIENT_PREFIX_10,
147 GRPC_DTS_CLIENT_PREFIX_11,
148 GRPC_DTS_CLIENT_PREFIX_12,
149 GRPC_DTS_CLIENT_PREFIX_13,
150 GRPC_DTS_CLIENT_PREFIX_14,
151 GRPC_DTS_CLIENT_PREFIX_15,
152 GRPC_DTS_CLIENT_PREFIX_16,
153 GRPC_DTS_CLIENT_PREFIX_17,
154 GRPC_DTS_CLIENT_PREFIX_18,
155 GRPC_DTS_CLIENT_PREFIX_19,
156 GRPC_DTS_CLIENT_PREFIX_20,
157 GRPC_DTS_CLIENT_PREFIX_21,
158 GRPC_DTS_CLIENT_PREFIX_22,
159 GRPC_DTS_CLIENT_PREFIX_23,
160 /* frame header byte 0... */
161 /* must follow from the prefix states */
170 /* ... frame header byte 8 */
172 /* inside a http2 frame */
174 } grpc_chttp2_deframe_transport_state;
177 grpc_chttp2_stream* head;
178 grpc_chttp2_stream* tail;
179 } grpc_chttp2_stream_list;
182 grpc_chttp2_stream* next;
183 grpc_chttp2_stream* prev;
184 } grpc_chttp2_stream_link;
186 /* We keep several sets of connection wide parameters */
188 /* The settings our peer has asked for (and we have acked) */
189 GRPC_PEER_SETTINGS = 0,
190 /* The settings we'd like to have */
192 /* The settings we've published to our peer */
194 /* The settings the peer has acked */
196 GRPC_NUM_SETTING_SETS
197 } grpc_chttp2_setting_set;
200 GRPC_CHTTP2_NO_GOAWAY_SEND,
201 GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED,
202 GRPC_CHTTP2_GOAWAY_SENT,
203 } grpc_chttp2_sent_goaway_state;
// Node that pairs a closure with a byte offset and chains to another node of
// the same type through `next`. In this header the transport holds a
// `write_cb_pool` pointer of this type and each stream holds
// `on_flow_controlled_cbs`, `on_write_finished_cbs` and `finish_after_write`
// pointers of this type.
205 typedef struct grpc_chttp2_write_cb {
// Byte offset associated with `closure`. Presumably the closure is scheduled
// once the stream has written this many bytes -- TODO confirm in the users.
206 int64_t call_at_byte;
// Closure carried by this node.
207 grpc_closure* closure;
// Next node in the chain.
208 struct grpc_chttp2_write_cb* next;
209 } grpc_chttp2_write_cb;
211 namespace grpc_core {
213 class Chttp2IncomingByteStream : public ByteStream {
215 Chttp2IncomingByteStream(grpc_chttp2_transport* transport,
216 grpc_chttp2_stream* stream, uint32_t frame_size,
219 void Orphan() override;
221 bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
222 grpc_error* Pull(grpc_slice* slice) override;
223 void Shutdown(grpc_error* error) override;
225 // TODO(roth): When I converted this class to C++, I wanted to make it
226 // inherit from RefCounted or InternallyRefCounted instead of continuing
227 // to use its own custom ref-counting code. However, that would require
228 // using multiple inheritance, which sucks in general. And to make matters
229 // worse, it causes problems with our New<> and Delete<> wrappers.
230 // Specifically, unless RefCounted is first in the list of parent classes,
231 // it will see a different value of the address of the object than the one
232 // we actually allocated, in which case gpr_free() will be called on a
233 // different address than the one we got from gpr_malloc(), thus causing a
234 // crash. Given the fragility of depending on that, as well as a desire to
235 // avoid multiple inheritance in general, I've decided to leave this
236 // alone for now. We can revisit this once we're able to link against
237 // libc++, at which point we can eliminate New<> and Delete<> and
238 // switch to std::shared_ptr<>.
239 void Ref() { refs_.Ref(); }
241 if (GPR_UNLIKELY(refs_.Unref())) {
242 grpc_core::Delete(this);
246 void PublishError(grpc_error* error);
248 grpc_error* Push(const grpc_slice& slice, grpc_slice* slice_out);
250 grpc_error* Finished(grpc_error* error, bool reset_on_error);
252 uint32_t remaining_bytes() const { return remaining_bytes_; }
255 static void NextLocked(void* arg, grpc_error* error_ignored);
256 static void OrphanLocked(void* arg, grpc_error* error_ignored);
258 void MaybeCreateStreamDecompressionCtx();
260 grpc_chttp2_transport* transport_; // Immutable.
261 grpc_chttp2_stream* stream_; // Immutable.
263 grpc_core::RefCount refs_;
265 /* Accessed only by transport thread when stream->pending_byte_stream == false
266 * Accessed only by application thread when stream->pending_byte_stream ==
* true */
268 uint32_t remaining_bytes_;
270 /* Accessed only by transport thread when stream->pending_byte_stream == false
271 * Accessed only by application thread when stream->pending_byte_stream ==
* true */
274 grpc_closure closure;
275 size_t max_size_hint;
276 grpc_closure* on_complete;
278 grpc_closure destroy_action_;
281 } // namespace grpc_core
284 GRPC_CHTTP2_KEEPALIVE_STATE_WAITING,
285 GRPC_CHTTP2_KEEPALIVE_STATE_PINGING,
286 GRPC_CHTTP2_KEEPALIVE_STATE_DYING,
287 GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
288 } grpc_chttp2_keepalive_state;
290 struct grpc_chttp2_transport {
291 grpc_chttp2_transport(const grpc_channel_args* channel_args,
292 grpc_endpoint* ep, bool is_client,
293 grpc_resource_user* resource_user);
294 ~grpc_chttp2_transport();
296 grpc_transport base; /* must be first */
297 grpc_core::RefCount refs;
301 grpc_resource_user* resource_user;
303 grpc_combiner* combiner;
305 grpc_closure* notify_on_receive_settings = nullptr;
307 /** write execution state of the transport */
308 grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;
309 /** is this the first write in a series of writes?
310 set when we initiate writing from idle, cleared when we
311 initiate writing from writing+more */
312 bool is_first_write_in_batch = false;
314 /** is the transport destroying itself? */
315 uint8_t destroying = false;
316 /** has the upper layer closed the transport? */
317 grpc_error* closed_with_error = GRPC_ERROR_NONE;
319 /** is there a read request to the endpoint outstanding? */
320 uint8_t endpoint_reading = 1;
322 grpc_chttp2_optimization_target opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
324 /** various lists of streams */
325 grpc_chttp2_stream_list lists[STREAM_LIST_COUNT] = {};
327 /** maps stream id to grpc_chttp2_stream objects */
328 grpc_chttp2_stream_map stream_map;
330 grpc_closure write_action_begin_locked;
331 grpc_closure write_action;
332 grpc_closure write_action_end_locked;
334 grpc_closure read_action_locked;
336 /** incoming read bytes */
337 grpc_slice_buffer read_buffer;
339 /** address to place a newly accepted stream - set and unset by
340 grpc_chttp2_parsing_accept_stream; used by init_stream to
341 publish the accepted server stream */
342 grpc_chttp2_stream** accepting_stream = nullptr;
345 /* accept stream callback */
346 void (*accept_stream)(void* user_data, grpc_transport* transport,
347 const void* server_data);
348 void* accept_stream_user_data;
350 /** connectivity tracking */
351 grpc_connectivity_state_tracker state_tracker;
354 /** data to write now */
355 grpc_slice_buffer outbuf;
356 /** hpack encoding */
357 grpc_chttp2_hpack_compressor hpack_compressor;
358 /** is this a client? */
361 /** data to write next write */
362 grpc_slice_buffer qbuf;
364 /** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set? */
366 uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow;
368 /** Set to a grpc_error object if a goaway frame is received. By default, set
369 * to GRPC_ERROR_NONE */
370 grpc_error* goaway_error = GRPC_ERROR_NONE;
372 grpc_chttp2_sent_goaway_state sent_goaway_state = GRPC_CHTTP2_NO_GOAWAY_SEND;
374 /** are the local settings dirty and need to be sent? */
375 bool dirtied_local_settings = true;
376 /** have local settings been sent? */
377 bool sent_local_settings = false;
378 /** bitmask of setting indexes to send out
379 Hack: it's common for implementations to assume 65536 bytes initial send
380 window -- this should by rights be 0 */
381 uint32_t force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
382 /** settings values */
383 uint32_t settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
385 /** what is the next stream id to be allocated by this peer?
386 copied to next_stream_id in parsing when parsing commences */
387 uint32_t next_stream_id = 0;
389 /** last new stream id */
390 uint32_t last_new_stream_id = 0;
392 /** ping queues for various ping insertion points */
393 grpc_chttp2_ping_queue ping_queue = grpc_chttp2_ping_queue();
394 grpc_chttp2_repeated_ping_policy ping_policy;
395 grpc_chttp2_repeated_ping_state ping_state;
396 uint64_t ping_ctr = 0; /* unique id for pings */
397 grpc_closure retry_initiate_ping_locked;
400 size_t ping_ack_count = 0;
401 size_t ping_ack_capacity = 0;
402 uint64_t* ping_acks = nullptr;
403 grpc_chttp2_server_ping_recv_state ping_recv_state;
405 /** parser for headers */
406 grpc_chttp2_hpack_parser hpack_parser;
407 /** simple one shot parsers */
409 grpc_chttp2_window_update_parser window_update;
410 grpc_chttp2_settings_parser settings;
411 grpc_chttp2_ping_parser ping;
412 grpc_chttp2_rst_stream_parser rst_stream;
414 /** parser for goaway frames */
415 grpc_chttp2_goaway_parser goaway_parser;
417 grpc_core::PolymorphicManualConstructor<
418 grpc_core::chttp2::TransportFlowControlBase,
419 grpc_core::chttp2::TransportFlowControl,
420 grpc_core::chttp2::TransportFlowControlDisabled>
422 /** initial window change. This is tracked as we parse settings frames from
423 * the remote peer. If there is a positive delta, then we will make all
424 * streams readable since they may have become unstalled */
425 int64_t initial_window_update = 0;
428 grpc_chttp2_deframe_transport_state deframe_state = GRPC_DTS_CLIENT_PREFIX_0;
429 uint8_t incoming_frame_type = 0;
430 uint8_t incoming_frame_flags = 0;
431 uint8_t header_eof = 0;
432 bool is_first_frame = true;
433 uint32_t expect_continuation_stream_id = 0;
434 uint32_t incoming_frame_size = 0;
435 uint32_t incoming_stream_id = 0;
438 void* parser_data = nullptr;
439 grpc_chttp2_stream* incoming_stream = nullptr;
440 grpc_error* (*parser)(void* parser_user_data, grpc_chttp2_transport* t,
441 grpc_chttp2_stream* s, const grpc_slice& slice,
444 grpc_chttp2_write_cb* write_cb_pool = nullptr;
447 grpc_closure next_bdp_ping_timer_expired_locked;
448 grpc_closure start_bdp_ping_locked;
449 grpc_closure finish_bdp_ping_locked;
451 /* if non-NULL, close the transport with this error when writes are finished */
453 grpc_error* close_transport_on_writes_finished = GRPC_ERROR_NONE;
455 /* a list of closures to run after writes are finished */
456 grpc_closure_list run_after_write = GRPC_CLOSURE_LIST_INIT;
458 /* buffer pool state */
459 /** have we scheduled a benign cleanup? */
460 bool benign_reclaimer_registered = false;
461 /** have we scheduled a destructive cleanup? */
462 bool destructive_reclaimer_registered = false;
463 /** benign cleanup closure */
464 grpc_closure benign_reclaimer_locked;
465 /** destructive cleanup closure */
466 grpc_closure destructive_reclaimer_locked;
468 /* next bdp ping timer */
469 bool have_next_bdp_ping_timer = false;
470 grpc_timer next_bdp_ping_timer;
472 /* keep-alive ping support */
473 /** Closure to initialize a keepalive ping */
474 grpc_closure init_keepalive_ping_locked;
475 /** Closure to run when the keepalive ping is sent */
476 grpc_closure start_keepalive_ping_locked;
477 /** Closure to run when the keepalive ping ack is received */
478 grpc_closure finish_keepalive_ping_locked;
479 /** Closure to run when the keepalive ping times out */
480 grpc_closure keepalive_watchdog_fired_locked;
481 /** timer to initiate ping events */
482 grpc_timer keepalive_ping_timer;
483 /** watchdog to kill the transport when waiting for the keepalive ping */
484 grpc_timer keepalive_watchdog_timer;
485 /** time duration in between pings */
486 grpc_millis keepalive_time;
487 /** grace period for a ping to complete before watchdog kicks in */
488 grpc_millis keepalive_timeout;
489 /** if keepalive pings are allowed when there's no outstanding streams */
490 bool keepalive_permit_without_calls = false;
491 /** keep-alive state machine state */
492 grpc_chttp2_keepalive_state keepalive_state;
493 grpc_core::ContextList* cl = nullptr;
494 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> channelz_socket;
495 uint32_t num_messages_in_next_write = 0;
499 GRPC_METADATA_NOT_PUBLISHED,
500 GRPC_METADATA_SYNTHESIZED_FROM_FAKE,
501 GRPC_METADATA_PUBLISHED_FROM_WIRE,
502 GPRC_METADATA_PUBLISHED_AT_CLOSE
503 } grpc_published_metadata_method;
505 struct grpc_chttp2_stream {
506 grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_stream_refcount* refcount,
507 const void* server_data, grpc_core::Arena* arena);
508 ~grpc_chttp2_stream();
511 grpc_chttp2_transport* t;
512 grpc_stream_refcount* refcount;
513 // Reffer is a 0-len structure, simply reffing `t` and `refcount` in its ctor
514 // before initializing the rest of the stream, to avoid cache misses. This
515 // field MUST be right after `t` and `refcount`.
517 explicit Reffer(grpc_chttp2_stream* s);
520 grpc_closure destroy_stream;
521 grpc_closure* destroy_stream_arg;
523 grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
524 uint8_t included[STREAM_LIST_COUNT] = {};
526 /** HTTP2 stream id for this stream, or zero if one has not been assigned */
529 /** things the upper layers would like to send */
530 grpc_metadata_batch* send_initial_metadata = nullptr;
531 grpc_closure* send_initial_metadata_finished = nullptr;
532 grpc_metadata_batch* send_trailing_metadata = nullptr;
533 grpc_closure* send_trailing_metadata_finished = nullptr;
535 grpc_core::OrphanablePtr<grpc_core::ByteStream> fetching_send_message;
536 uint32_t fetched_send_message_length = 0;
537 grpc_slice fetching_slice = grpc_empty_slice();
538 int64_t next_message_end_offset;
539 int64_t flow_controlled_bytes_written = 0;
540 int64_t flow_controlled_bytes_flowed = 0;
541 grpc_closure complete_fetch_locked;
542 grpc_closure* fetching_send_message_finished = nullptr;
544 grpc_metadata_batch* recv_initial_metadata;
545 grpc_closure* recv_initial_metadata_ready = nullptr;
546 bool* trailing_metadata_available = nullptr;
547 grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
548 grpc_closure* recv_message_ready = nullptr;
549 grpc_metadata_batch* recv_trailing_metadata;
550 grpc_closure* recv_trailing_metadata_finished = nullptr;
552 grpc_transport_stream_stats* collecting_stats = nullptr;
553 grpc_transport_stream_stats stats = grpc_transport_stream_stats();
555 /** Is this stream closed for writing. */
556 bool write_closed = false;
557 /** Is this stream reading half-closed. */
558 bool read_closed = false;
559 /** Are all published incoming byte streams closed. */
560 bool all_incoming_byte_streams_finished = false;
561 /** Has this stream seen an error.
562 If true, then pending incoming frames can be thrown away. */
563 bool seen_error = false;
564 /** Are we buffering writes on this stream? If yes, we won't become writable
565 until there's enough queued up in the flow_controlled_buffer */
566 bool write_buffering = false;
567 /** Has trailing metadata been received. */
568 bool received_trailing_metadata = false;
570 /* have we sent or received the EOS bit? */
571 bool eos_received = false;
572 bool eos_sent = false;
574 /** the error that resulted in this stream being read-closed */
575 grpc_error* read_closed_error = GRPC_ERROR_NONE;
576 /** the error that resulted in this stream being write-closed */
577 grpc_error* write_closed_error = GRPC_ERROR_NONE;
579 grpc_published_metadata_method published_metadata[2] = {};
580 bool final_metadata_requested = false;
582 grpc_chttp2_incoming_metadata_buffer metadata_buffer[2];
584 grpc_slice_buffer frame_storage; /* protected by t combiner */
586 grpc_closure* on_next = nullptr; /* protected by t combiner */
587 bool pending_byte_stream = false; /* protected by t combiner */
588 // cached length of buffer to be used by the transport thread in cases where
589 // stream->pending_byte_stream == true. The value is saved before
590 // application threads are allowed to modify
591 // unprocessed_incoming_frames_buffer
592 size_t unprocessed_incoming_frames_buffer_cached_length = 0;
593 /* Accessed only by transport thread when stream->pending_byte_stream == false
594 * Accessed only by application thread when stream->pending_byte_stream ==
* true */
596 grpc_slice_buffer unprocessed_incoming_frames_buffer;
597 grpc_closure reset_byte_stream;
598 grpc_error* byte_stream_error = GRPC_ERROR_NONE; /* protected by t combiner */
599 bool received_last_frame = false; /* protected by t combiner */
601 grpc_millis deadline = GRPC_MILLIS_INF_FUTURE;
603 /** saw some stream level error */
604 grpc_error* forced_close_error = GRPC_ERROR_NONE;
605 /** how many header frames have we received? */
606 uint8_t header_frames_received = 0;
607 /** parsing state for data frames */
608 /* Accessed only by transport thread when stream->pending_byte_stream == false
609 * Accessed only by application thread when stream->pending_byte_stream ==
* true */
611 grpc_chttp2_data_parser data_parser;
612 /** number of bytes received - reset at end of parse thread execution */
613 int64_t received_bytes = 0;
615 bool sent_initial_metadata = false;
616 bool sent_trailing_metadata = false;
618 grpc_core::PolymorphicManualConstructor<
619 grpc_core::chttp2::StreamFlowControlBase,
620 grpc_core::chttp2::StreamFlowControl,
621 grpc_core::chttp2::StreamFlowControlDisabled>
624 grpc_slice_buffer flow_controlled_buffer;
626 grpc_chttp2_write_cb* on_flow_controlled_cbs = nullptr;
627 grpc_chttp2_write_cb* on_write_finished_cbs = nullptr;
628 grpc_chttp2_write_cb* finish_after_write = nullptr;
629 size_t sending_bytes = 0;
631 /* Stream compression method to be used. */
632 grpc_stream_compression_method stream_compression_method =
633 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
634 /* Stream decompression method to be used. */
635 grpc_stream_compression_method stream_decompression_method =
636 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS;
638 /** Whether the bytes stored in unprocessed_incoming_frames_buffer are decompressed */
640 bool unprocessed_incoming_frames_decompressed = false;
641 /** Whether the bytes need to be traced using Fathom */
643 /** gRPC header bytes that are already decompressed */
644 size_t decompressed_header_bytes = 0;
645 /** Byte counter for number of bytes written */
646 size_t byte_counter = 0;
648 /** Amount of uncompressed bytes sent out when compressed_data_buffer is
650 size_t uncompressed_data_size;
651 /** Stream compression compress context */
652 grpc_stream_compression_context* stream_compression_ctx;
653 /** Buffer storing data that is compressed but not sent */
654 grpc_slice_buffer compressed_data_buffer;
656 /** Stream compression decompress context */
657 grpc_stream_compression_context* stream_decompression_ctx;
658 /** Temporary buffer storing decompressed data.
659 * Initialized, used, and destroyed only when stream uses (non-identity)
662 grpc_slice_buffer decompressed_data_buffer;
665 /** Transport writing call flow:
666 grpc_chttp2_initiate_write() is called anywhere that we know bytes need to
668 If no other write has been started, a task is enqueued onto our workqueue.
669 When that task executes, it obtains the global lock, and gathers the data
671 The global lock is dropped and we do the syscall to write.
672 After writing, a follow-up check is made to see if another round of writing
675 The actual call chain is documented in the implementation of this function.
677 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
678 grpc_chttp2_initiate_write_reason reason);
681 /** are we writing? */
683 /** if writing: was it a complete flush (false) or a partial flush (true) */
685 /** did we queue any completions as part of beginning the write */
686 bool early_results_scheduled;
687 } grpc_chttp2_begin_write_result;
// Begins a write on transport `t` and returns a
// grpc_chttp2_begin_write_result. Per the comments on that type's fields, the
// result reports whether we are writing, whether the flush was partial, and
// whether any completions were queued as part of beginning the write.
689 grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
690 grpc_chttp2_transport* t);
// Counterpart of grpc_chttp2_begin_write; takes the transport and a
// grpc_error*. Presumably invoked once the write has finished, with `error`
// carrying its outcome -- TODO confirm, including ownership of `error`.
691 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error* error);
693 /** Process one slice of incoming data on transport `t`.
Returns a grpc_error*. NOTE(review): this comment used to describe a 1/0
"connection still viable / should be torn down" return value, which does not
match the grpc_error* in the signature. Presumably GRPC_ERROR_NONE means the
connection is still viable after reading and any other value means it should
694 be torn down -- TODO confirm against the definition. */
695 grpc_error* grpc_chttp2_perform_read(grpc_chttp2_transport* t,
696 const grpc_slice& slice);
// Helpers for the "writable" stream list (presumably the list labelled
// GRPC_CHTTP2_LIST_WRITABLE in t->lists -- confirm in the definitions).
// The meaning of the bool returned by add/remove is not visible in this
// header; presumably it reports whether list membership actually changed --
// TODO confirm.
698 bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport* t,
699 grpc_chttp2_stream* s);
700 /** Get a writable stream
701 returns non-zero if there was a stream available */
702 bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport* t,
703 grpc_chttp2_stream** s);
704 bool grpc_chttp2_list_remove_writable_stream(grpc_chttp2_transport* t,
705 grpc_chttp2_stream* s);
// Helpers for the "writing" stream list (presumably the list labelled
// GRPC_CHTTP2_LIST_WRITING in t->lists -- confirm in the definitions).
// pop takes a grpc_chttp2_stream** out-parameter and returns bool; presumably,
// like grpc_chttp2_list_pop_writable_stream, it returns true when a stream was
// available and stores it in *s -- TODO confirm.
707 bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport* t,
708 grpc_chttp2_stream* s);
// Takes only the transport; presumably true when the writing list is
// non-empty -- TODO confirm.
709 bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport* t);
710 bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport* t,
711 grpc_chttp2_stream** s);
// Add/pop helpers for a "written" stream list.
// NOTE(review): none of the grpc_chttp2_stream_list_id values visible in this
// header (WRITABLE, WRITING, STALLED_BY_TRANSPORT, STALLED_BY_STREAM,
// WAITING_FOR_CONCURRENCY) corresponds to a "written" list, so these may be
// stale declarations without a definition -- confirm before relying on them.
713 void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport* t,
714 grpc_chttp2_stream* s);
715 bool grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport* t,
716 grpc_chttp2_stream** s);
// Helpers for the list of streams that are waiting to start because there are
// too many concurrent streams on the connection (described that way on
// GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY). Unlike the writable-list helpers,
// add and remove here return void; pop returns bool and takes a
// grpc_chttp2_stream** out-parameter (presumably true when a stream was
// available and stored in *s -- TODO confirm).
718 void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport* t,
719 grpc_chttp2_stream* s);
720 bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport* t,
721 grpc_chttp2_stream** s);
722 void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport* t,
723 grpc_chttp2_stream* s);
// Helpers for the "stalled by transport" stream list (presumably the list
// labelled GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT in t->lists -- confirm in the
// definitions). add and remove return void; pop returns bool and takes a
// grpc_chttp2_stream** out-parameter (presumably true when a stream was
// available and stored in *s -- TODO confirm).
725 void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport* t,
726 grpc_chttp2_stream* s);
727 bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport* t,
728 grpc_chttp2_stream** s);
729 void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport* t,
730 grpc_chttp2_stream* s);
// Helpers for the "stalled by stream" stream list (presumably the list
// labelled GRPC_CHTTP2_LIST_STALLED_BY_STREAM in t->lists -- confirm in the
// definitions). Note that remove returns bool here, whereas
// grpc_chttp2_list_remove_stalled_by_transport returns void; the meaning of
// that bool is not visible in this header -- TODO confirm.
732 void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport* t,
733 grpc_chttp2_stream* s);
734 bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport* t,
735 grpc_chttp2_stream** s);
736 bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t,
737 grpc_chttp2_stream* s);
739 /********* Flow Control ***************/
741 // Takes in a flow control action and performs all the needed operations.
742 void grpc_chttp2_act_on_flowctl_action(
743 const grpc_core::chttp2::FlowControlAction& action,
744 grpc_chttp2_transport* t, grpc_chttp2_stream* s);
746 /********* End of Flow Control ***************/
748 grpc_chttp2_stream* grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport* t,
750 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
// Reports an incoming GOAWAY to transport `t`, given its numeric error code
// and its text as a slice. Presumably this is what populates t->goaway_error
// (documented as being set when a goaway frame is received) -- TODO confirm
// against the definition.
753 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
754 uint32_t goaway_error,
755 const grpc_slice& goaway_text);
// Declaration only. Presumably switches the transport's current frame parser
// (t->parser / t->parser_data) to one that discards incoming bytes -- TODO
// confirm against the definition.
757 void grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport* t);
// Declaration only. Takes the closure by address (grpc_closure**), an error,
// and a description string. Presumably *pclosure is consumed (and possibly
// reset) by the call and `desc` is used for diagnostics -- TODO confirm
// against the definition, including ownership of `error`.
759 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
760 grpc_chttp2_stream* s,
761 grpc_closure** pclosure,
762 grpc_error* error, const char* desc);
// Size in bytes of a gRPC header. Presumably the gRPC message framing header
// (1-byte compressed flag + 4-byte message length) -- TODO confirm.
764 #define GRPC_HEADER_SIZE_IN_BYTES 5
// All bits of a size_t set, i.e. the largest value a size_t can hold.
765 #define MAX_SIZE_T (~(size_t)0)
// HTTP/2 client connection preface. It is 24 bytes long, matching the 24
// GRPC_DTS_CLIENT_PREFIX_* deframer states (one per connection prefix byte).
767 #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
// Length of the preface excluding the string literal's terminating NUL.
768 #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
769 (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
771 // extern grpc_core::TraceFlag grpc_http_trace;
772 // extern grpc_core::TraceFlag grpc_flowctl_trace;
774 #define GRPC_CHTTP2_IF_TRACING(stmt) \
776 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { \
// Declaration only. Presumably synthesizes status metadata for `stream` from
// `error` (cf. GRPC_METADATA_SYNTHESIZED_FROM_FAKE) -- TODO confirm against
// the definition.
781 void grpc_chttp2_fake_status(grpc_chttp2_transport* t,
782 grpc_chttp2_stream* stream, grpc_error* error);
// Declaration only. `close_reads` and `close_writes` are ints, presumably
// used as boolean flags selecting which half of stream `s` to close (cf. the
// stream's read_closed / write_closed fields), with `error` recorded as the
// reason -- TODO confirm against the definition.
783 void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
784 grpc_chttp2_stream* s, int close_reads,
785 int close_writes, grpc_error* error);
// Declaration only; behavior is not visible in this header.
786 void grpc_chttp2_start_writing(grpc_chttp2_transport* t);
789 #define GRPC_CHTTP2_STREAM_REF(stream, reason) \
790 grpc_chttp2_stream_ref(stream, reason)
791 #define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
792 grpc_chttp2_stream_unref(stream, reason)
793 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason);
794 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason);
796 #define GRPC_CHTTP2_STREAM_REF(stream, reason) grpc_chttp2_stream_ref(stream)
797 #define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
798 grpc_chttp2_stream_unref(stream)
799 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s);
800 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s);
804 #define GRPC_CHTTP2_REF_TRANSPORT(t, r) \
805 grpc_chttp2_ref_transport(t, r, __FILE__, __LINE__)
806 #define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) \
807 grpc_chttp2_unref_transport(t, r, __FILE__, __LINE__)
808 inline void grpc_chttp2_unref_transport(grpc_chttp2_transport* t,
809 const char* reason, const char* file,
811 if (t->refs.Unref(grpc_core::DebugLocation(file, line), reason)) {
812 grpc_core::Delete(t);
815 inline void grpc_chttp2_ref_transport(grpc_chttp2_transport* t,
816 const char* reason, const char* file,
818 t->refs.Ref(grpc_core::DebugLocation(file, line), reason);
821 #define GRPC_CHTTP2_REF_TRANSPORT(t, r) grpc_chttp2_ref_transport(t)
822 #define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) grpc_chttp2_unref_transport(t)
823 inline void grpc_chttp2_unref_transport(grpc_chttp2_transport* t) {
824 if (t->refs.Unref()) {
825 grpc_core::Delete(t);
828 inline void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) {
// Declaration only. Takes the transport and a 64-bit ping id. Presumably
// handles the acknowledgement of the ping identified by `id` (cf.
// ping_queue.inflight_id and ping_ctr on the transport) -- TODO confirm
// against the definition.
833 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id);
835 /** Add a new ping strike to ping_recv_state.ping_strikes. If
836 ping_recv_state.ping_strikes > ping_policy.max_ping_strikes, it sends GOAWAY
837 with error code ENHANCE_YOUR_CALM and additional debug data resembling
838 "too_many_pings" followed by immediately closing the connection. */
839 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t);
841 /** add a ref to the stream and add it to the writable list;
842 ref will be dropped in writing.c */
843 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
844 grpc_chttp2_stream* s);
// Declaration only. Presumably cancels stream `s` on transport `t` because of
// `due_to_error` -- TODO confirm against the definition, including ownership
// of the error.
846 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
847 grpc_error* due_to_error);
// The three maybe_complete_recv_* functions are declarations only. Presumably
// each completes the matching pending receive operation on `s`
// (recv_initial_metadata_ready, recv_message_ready,
// recv_trailing_metadata_finished) only if its conditions are met, and is
// otherwise a no-op -- TODO confirm against the definitions.
849 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
850 grpc_chttp2_stream* s);
851 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
852 grpc_chttp2_stream* s);
853 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
854 grpc_chttp2_stream* s);
// Declaration only. Presumably fails the pending send-side operations of `s`
// with `error` -- TODO confirm against the definition.
856 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
857 grpc_chttp2_stream* s, grpc_error* error);
859 /** Set the default keepalive configurations, must only be called at
861 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
864 #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */