3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/transport/chttp2/transport/context_list.h"
22 #include "src/core/ext/transport/chttp2/transport/internal.h"
26 #include <grpc/support/log.h>
28 #include "src/core/lib/compression/stream_compression.h"
29 #include "src/core/lib/debug/stats.h"
30 #include "src/core/lib/profiling/timers.h"
31 #include "src/core/lib/slice/slice_internal.h"
32 #include "src/core/lib/transport/http2_errors.h"
34 static void add_to_write_list(grpc_chttp2_write_cb** list,
35 grpc_chttp2_write_cb* cb) {
40 static void finish_write_cb(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
41 grpc_chttp2_write_cb* cb, grpc_error* error) {
42 grpc_chttp2_complete_closure_step(t, s, &cb->closure, error,
44 cb->next = t->write_cb_pool;
45 t->write_cb_pool = cb;
48 static void maybe_initiate_ping(grpc_chttp2_transport* t) {
49 grpc_chttp2_ping_queue* pq = &t->ping_queue;
50 if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
51 /* no ping needed: wait */
54 if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
55 /* ping already in-flight: wait */
56 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
57 GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace)) {
58 gpr_log(GPR_INFO, "%s: Ping delayed [%p]: already pinging",
59 t->is_client ? "CLIENT" : "SERVER", t->peer_string);
63 if (t->ping_state.pings_before_data_required == 0 &&
64 t->ping_policy.max_pings_without_data != 0) {
65 /* need to receive something of substance before sending a ping again */
66 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
67 GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace)) {
68 gpr_log(GPR_INFO, "%s: Ping delayed [%p]: too many recent pings: %d/%d",
69 t->is_client ? "CLIENT" : "SERVER", t->peer_string,
70 t->ping_state.pings_before_data_required,
71 t->ping_policy.max_pings_without_data);
75 grpc_millis now = grpc_core::ExecCtx::Get()->Now();
77 grpc_millis next_allowed_ping_interval =
78 (t->keepalive_permit_without_calls == 0 &&
79 grpc_chttp2_stream_map_size(&t->stream_map) == 0)
80 ? 7200 * GPR_MS_PER_SEC
81 : t->ping_policy.min_sent_ping_interval_without_data;
82 grpc_millis next_allowed_ping =
83 t->ping_state.last_ping_sent_time + next_allowed_ping_interval;
85 if (next_allowed_ping > now) {
86 /* not enough elapsed time between successive pings */
87 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
88 GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace)) {
90 "%s: Ping delayed [%p]: not enough time elapsed since last ping. "
91 " Last ping %f: Next ping %f: Now %f",
92 t->is_client ? "CLIENT" : "SERVER", t->peer_string,
93 static_cast<double>(t->ping_state.last_ping_sent_time),
94 static_cast<double>(next_allowed_ping), static_cast<double>(now));
96 if (!t->ping_state.is_delayed_ping_timer_set) {
97 t->ping_state.is_delayed_ping_timer_set = true;
98 GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked");
99 grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping,
100 &t->retry_initiate_ping_locked);
105 pq->inflight_id = t->ping_ctr;
107 GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
108 grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
109 &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
110 grpc_slice_buffer_add(&t->outbuf,
111 grpc_chttp2_ping_create(false, pq->inflight_id));
112 GRPC_STATS_INC_HTTP2_PINGS_SENT();
113 t->ping_state.last_ping_sent_time = now;
114 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
115 GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace)) {
116 gpr_log(GPR_INFO, "%s: Ping sent [%s]: %d/%d",
117 t->is_client ? "CLIENT" : "SERVER", t->peer_string,
118 t->ping_state.pings_before_data_required,
119 t->ping_policy.max_pings_without_data);
121 t->ping_state.pings_before_data_required -=
122 (t->ping_state.pings_before_data_required != 0);
125 static bool update_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
126 int64_t send_bytes, grpc_chttp2_write_cb** list,
127 int64_t* ctr, grpc_error* error) {
128 bool sched_any = false;
129 grpc_chttp2_write_cb* cb = *list;
133 grpc_chttp2_write_cb* next = cb->next;
134 if (cb->call_at_byte <= *ctr) {
136 finish_write_cb(t, s, cb, GRPC_ERROR_REF(error));
138 add_to_write_list(list, cb);
142 GRPC_ERROR_UNREF(error);
146 static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
147 const char* staller) {
148 if (GRPC_TRACE_FLAG_ENABLED(grpc_flowctl_trace)) {
151 "%s:%p stream %d moved to stalled list by %s. This is FULLY expected "
152 "to happen in a healthy program that is not seeing flow control stalls."
153 " However, if you know that there are unwanted stalls, here is some "
154 "helpful data: [fc:pending=%" PRIdPTR ":pending-compressed=%" PRIdPTR
155 ":flowed=%" PRId64 ":peer_initwin=%d:t_win=%" PRId64
156 ":s_win=%d:s_delta=%" PRId64 "]",
157 t->peer_string, t, s->id, staller, s->flow_controlled_buffer.length,
158 s->stream_compression_method ==
159 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS
161 : s->compressed_data_buffer.length,
162 s->flow_controlled_bytes_flowed,
163 t->settings[GRPC_ACKED_SETTINGS]
164 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
165 t->flow_control->remote_window(),
166 static_cast<uint32_t> GPR_MAX(
168 s->flow_control->remote_window_delta() +
169 (int64_t)t->settings[GRPC_PEER_SETTINGS]
170 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
171 s->flow_control->remote_window_delta());
175 /* How many bytes would we like to put on the wire during a single syscall */
176 static uint32_t target_write_size(grpc_chttp2_transport* t) {
180 // Returns true if initial_metadata contains only default headers.
181 static bool is_default_initial_metadata(grpc_metadata_batch* initial_metadata) {
182 return initial_metadata->list.default_count == initial_metadata->list.count;
186 class StreamWriteContext;
190 WriteContext(grpc_chttp2_transport* t) : t_(t) {
191 GRPC_STATS_INC_HTTP2_WRITES_BEGUN();
192 GPR_TIMER_SCOPE("grpc_chttp2_begin_write", 0);
195 // TODO(ctiller): make this the destructor
197 GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE(
198 initial_metadata_writes_);
199 GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(message_writes_);
200 GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE(
201 trailing_metadata_writes_);
202 GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(flow_control_writes_);
205 void FlushSettings() {
206 if (t_->dirtied_local_settings && !t_->sent_local_settings) {
207 grpc_slice_buffer_add(
208 &t_->outbuf, grpc_chttp2_settings_create(
209 t_->settings[GRPC_SENT_SETTINGS],
210 t_->settings[GRPC_LOCAL_SETTINGS],
211 t_->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
212 t_->force_send_settings = false;
213 t_->dirtied_local_settings = false;
214 t_->sent_local_settings = true;
215 GRPC_STATS_INC_HTTP2_SETTINGS_WRITES();
219 void FlushQueuedBuffers() {
220 /* simple writes are queued to qbuf, and flushed here */
221 grpc_slice_buffer_move_into(&t_->qbuf, &t_->outbuf);
222 GPR_ASSERT(t_->qbuf.count == 0);
225 void FlushWindowUpdates() {
226 uint32_t transport_announce =
227 t_->flow_control->MaybeSendUpdate(t_->outbuf.count > 0);
228 if (transport_announce) {
229 grpc_transport_one_way_stats throwaway_stats;
230 grpc_slice_buffer_add(
231 &t_->outbuf, grpc_chttp2_window_update_create(0, transport_announce,
237 void FlushPingAcks() {
238 for (size_t i = 0; i < t_->ping_ack_count; i++) {
239 grpc_slice_buffer_add(&t_->outbuf,
240 grpc_chttp2_ping_create(true, t_->ping_acks[i]));
242 t_->ping_ack_count = 0;
245 void EnactHpackSettings() {
246 grpc_chttp2_hpack_compressor_set_max_table_size(
247 &t_->hpack_compressor,
248 t_->settings[GRPC_PEER_SETTINGS]
249 [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
252 void UpdateStreamsNoLongerStalled() {
253 grpc_chttp2_stream* s;
254 while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) {
255 if (t_->closed_with_error == GRPC_ERROR_NONE &&
256 grpc_chttp2_list_add_writable_stream(t_, s)) {
257 if (!s->refcount->refs.RefIfNonZero()) {
258 grpc_chttp2_list_remove_writable_stream(t_, s);
264 grpc_chttp2_stream* NextStream() {
265 if (t_->outbuf.length > target_write_size(t_)) {
266 result_.partial = true;
270 grpc_chttp2_stream* s;
271 if (!grpc_chttp2_list_pop_writable_stream(t_, &s)) {
278 void ResetPingClock() {
279 if (!t_->is_client) {
280 t_->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
281 t_->ping_recv_state.ping_strikes = 0;
283 t_->ping_state.pings_before_data_required =
284 t_->ping_policy.max_pings_without_data;
287 void IncInitialMetadataWrites() { ++initial_metadata_writes_; }
288 void IncWindowUpdateWrites() { ++flow_control_writes_; }
289 void IncMessageWrites() { ++message_writes_; }
290 void IncTrailingMetadataWrites() { ++trailing_metadata_writes_; }
292 void NoteScheduledResults() { result_.early_results_scheduled = true; }
294 grpc_chttp2_transport* transport() const { return t_; }
296 grpc_chttp2_begin_write_result Result() {
297 result_.writing = t_->outbuf.count > 0;
302 grpc_chttp2_transport* const t_;
304 /* stats histogram counters: we increment these throughout this function,
305 and at the end publish to the central stats histograms */
306 int flow_control_writes_ = 0;
307 int initial_metadata_writes_ = 0;
308 int trailing_metadata_writes_ = 0;
309 int message_writes_ = 0;
310 grpc_chttp2_begin_write_result result_ = {false, false, false};
313 class DataSendContext {
315 DataSendContext(WriteContext* write_context, grpc_chttp2_transport* t,
316 grpc_chttp2_stream* s)
317 : write_context_(write_context),
320 sending_bytes_before_(s_->sending_bytes) {}
322 uint32_t stream_remote_window() const {
323 return static_cast<uint32_t> GPR_MAX(
324 0, s_->flow_control->remote_window_delta() +
325 (int64_t)t_->settings[GRPC_PEER_SETTINGS]
326 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
329 uint32_t max_outgoing() const {
330 return static_cast<uint32_t> GPR_MIN(
331 t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
332 GPR_MIN(stream_remote_window(), t_->flow_control->remote_window()));
335 bool AnyOutgoing() const { return max_outgoing() > 0; }
337 void FlushUncompressedBytes() {
338 uint32_t send_bytes = static_cast<uint32_t> GPR_MIN(
339 max_outgoing(), s_->flow_controlled_buffer.length);
340 is_last_frame_ = send_bytes == s_->flow_controlled_buffer.length &&
341 s_->fetching_send_message == nullptr &&
342 s_->send_trailing_metadata != nullptr &&
343 grpc_metadata_batch_is_empty(s_->send_trailing_metadata);
344 grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, send_bytes,
345 is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
346 s_->flow_control->SentData(send_bytes);
347 s_->sending_bytes += send_bytes;
350 void FlushCompressedBytes() {
351 GPR_DEBUG_ASSERT(s_->stream_compression_method !=
352 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS);
354 uint32_t send_bytes = static_cast<uint32_t> GPR_MIN(
355 max_outgoing(), s_->compressed_data_buffer.length);
356 bool is_last_data_frame =
357 (send_bytes == s_->compressed_data_buffer.length &&
358 s_->flow_controlled_buffer.length == 0 &&
359 s_->fetching_send_message == nullptr);
360 if (is_last_data_frame && s_->send_trailing_metadata != nullptr &&
361 s_->stream_compression_ctx != nullptr) {
362 if (GPR_UNLIKELY(!grpc_stream_compress(
363 s_->stream_compression_ctx, &s_->flow_controlled_buffer,
364 &s_->compressed_data_buffer, nullptr, MAX_SIZE_T,
365 GRPC_STREAM_COMPRESSION_FLUSH_FINISH))) {
366 gpr_log(GPR_ERROR, "Stream compression failed.");
368 grpc_stream_compression_context_destroy(s_->stream_compression_ctx);
369 s_->stream_compression_ctx = nullptr;
370 /* After finish, bytes in s->compressed_data_buffer may be
371 * more than max_outgoing. Start another round of the current
372 * while loop so that send_bytes and is_last_data_frame are
376 is_last_frame_ = is_last_data_frame &&
377 s_->send_trailing_metadata != nullptr &&
378 grpc_metadata_batch_is_empty(s_->send_trailing_metadata);
379 grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes,
380 is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
381 s_->flow_control->SentData(send_bytes);
382 if (s_->compressed_data_buffer.length == 0) {
383 s_->sending_bytes += s_->uncompressed_data_size;
387 void CompressMoreBytes() {
388 GPR_DEBUG_ASSERT(s_->stream_compression_method !=
389 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS);
391 if (s_->stream_compression_ctx == nullptr) {
392 s_->stream_compression_ctx =
393 grpc_stream_compression_context_create(s_->stream_compression_method);
395 s_->uncompressed_data_size = s_->flow_controlled_buffer.length;
396 if (GPR_UNLIKELY(!grpc_stream_compress(
397 s_->stream_compression_ctx, &s_->flow_controlled_buffer,
398 &s_->compressed_data_buffer, nullptr, MAX_SIZE_T,
399 GRPC_STREAM_COMPRESSION_FLUSH_SYNC))) {
400 gpr_log(GPR_ERROR, "Stream compression failed.");
404 bool is_last_frame() const { return is_last_frame_; }
406 void CallCallbacks() {
409 static_cast<int64_t>(s_->sending_bytes - sending_bytes_before_),
410 &s_->on_flow_controlled_cbs, &s_->flow_controlled_bytes_flowed,
412 write_context_->NoteScheduledResults();
417 WriteContext* write_context_;
418 grpc_chttp2_transport* t_;
419 grpc_chttp2_stream* s_;
420 const size_t sending_bytes_before_;
421 bool is_last_frame_ = false;
424 class StreamWriteContext {
426 StreamWriteContext(WriteContext* write_context, grpc_chttp2_stream* s)
427 : write_context_(write_context), t_(write_context->transport()), s_(s) {
428 GRPC_CHTTP2_IF_TRACING(
429 gpr_log(GPR_INFO, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_,
430 t_->is_client ? "CLIENT" : "SERVER", s->id,
431 s->sent_initial_metadata, s->send_initial_metadata != nullptr,
432 (int)(s->flow_control->local_window_delta() -
433 s->flow_control->announced_window_delta())));
436 void FlushInitialMetadata() {
437 /* send initial metadata if it's available */
438 if (s_->sent_initial_metadata) return;
439 if (s_->send_initial_metadata == nullptr) return;
441 // We skip this on the server side if there is no custom initial
442 // metadata, there are no messages to send, and we are also sending
443 // trailing metadata. This results in a Trailers-Only response,
444 // which is required for retries, as per:
445 // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
446 if (!t_->is_client && s_->fetching_send_message == nullptr &&
447 s_->flow_controlled_buffer.length == 0 &&
448 compressed_data_buffer_len() == 0 &&
449 s_->send_trailing_metadata != nullptr &&
450 is_default_initial_metadata(s_->send_initial_metadata)) {
451 ConvertInitialMetadataToTrailingMetadata();
453 grpc_encode_header_options hopt = {
456 t_->settings[GRPC_PEER_SETTINGS]
457 [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
458 0, // use_true_binary_metadata
459 t_->settings[GRPC_PEER_SETTINGS]
460 [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], // max_frame_size
461 &s_->stats.outgoing // stats
463 grpc_chttp2_encode_header(&t_->hpack_compressor, nullptr, 0,
464 s_->send_initial_metadata, &hopt, &t_->outbuf);
465 write_context_->ResetPingClock();
466 write_context_->IncInitialMetadataWrites();
469 s_->send_initial_metadata = nullptr;
470 s_->sent_initial_metadata = true;
471 write_context_->NoteScheduledResults();
472 grpc_chttp2_complete_closure_step(
473 t_, s_, &s_->send_initial_metadata_finished, GRPC_ERROR_NONE,
474 "send_initial_metadata_finished");
477 bool compressed_data_buffer_len() {
478 return s_->stream_compression_method ==
479 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS
481 : s_->compressed_data_buffer.length;
484 void FlushWindowUpdates() {
485 /* send any window updates */
486 const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate();
487 if (stream_announce == 0) return;
489 grpc_slice_buffer_add(
490 &t_->outbuf, grpc_chttp2_window_update_create(s_->id, stream_announce,
491 &s_->stats.outgoing));
492 write_context_->ResetPingClock();
493 write_context_->IncWindowUpdateWrites();
497 if (!s_->sent_initial_metadata) return;
499 if (s_->flow_controlled_buffer.length == 0 &&
500 compressed_data_buffer_len() == 0) {
501 return; // early out: nothing to do
504 DataSendContext data_send_context(write_context_, t_, s_);
506 if (!data_send_context.AnyOutgoing()) {
507 if (t_->flow_control->remote_window() <= 0) {
508 report_stall(t_, s_, "transport");
509 grpc_chttp2_list_add_stalled_by_transport(t_, s_);
510 } else if (data_send_context.stream_remote_window() <= 0) {
511 report_stall(t_, s_, "stream");
512 grpc_chttp2_list_add_stalled_by_stream(t_, s_);
514 return; // early out: nothing to do
517 if (s_->stream_compression_method ==
518 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
519 while (s_->flow_controlled_buffer.length > 0 &&
520 data_send_context.max_outgoing() > 0) {
521 data_send_context.FlushUncompressedBytes();
524 while ((s_->flow_controlled_buffer.length > 0 ||
525 s_->compressed_data_buffer.length > 0) &&
526 data_send_context.max_outgoing() > 0) {
527 if (s_->compressed_data_buffer.length > 0) {
528 data_send_context.FlushCompressedBytes();
530 data_send_context.CompressMoreBytes();
534 write_context_->ResetPingClock();
535 if (data_send_context.is_last_frame()) {
538 data_send_context.CallCallbacks();
539 stream_became_writable_ = true;
540 if (s_->flow_controlled_buffer.length > 0 ||
541 compressed_data_buffer_len() > 0) {
542 GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork");
543 grpc_chttp2_list_add_writable_stream(t_, s_);
545 write_context_->IncMessageWrites();
548 void FlushTrailingMetadata() {
549 if (!s_->sent_initial_metadata) return;
551 if (s_->send_trailing_metadata == nullptr) return;
552 if (s_->fetching_send_message != nullptr) return;
553 if (s_->flow_controlled_buffer.length != 0) return;
554 if (compressed_data_buffer_len() != 0) return;
556 GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
557 if (grpc_metadata_batch_is_empty(s_->send_trailing_metadata)) {
558 grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true,
559 &s_->stats.outgoing, &t_->outbuf);
561 grpc_encode_header_options hopt = {
563 t_->settings[GRPC_PEER_SETTINGS]
564 [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
567 t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
568 &s_->stats.outgoing};
569 grpc_chttp2_encode_header(&t_->hpack_compressor,
570 extra_headers_for_trailing_metadata_,
571 num_extra_headers_for_trailing_metadata_,
572 s_->send_trailing_metadata, &hopt, &t_->outbuf);
574 write_context_->IncTrailingMetadataWrites();
575 write_context_->ResetPingClock();
578 write_context_->NoteScheduledResults();
579 grpc_chttp2_complete_closure_step(
580 t_, s_, &s_->send_trailing_metadata_finished, GRPC_ERROR_NONE,
581 "send_trailing_metadata_finished");
584 bool stream_became_writable() { return stream_became_writable_; }
587 void ConvertInitialMetadataToTrailingMetadata() {
588 GRPC_CHTTP2_IF_TRACING(
589 gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)"));
590 // When sending Trailers-Only, we need to move the :status and
591 // content-type headers to the trailers.
592 if (s_->send_initial_metadata->idx.named.status != nullptr) {
593 extra_headers_for_trailing_metadata_
594 [num_extra_headers_for_trailing_metadata_++] =
595 &s_->send_initial_metadata->idx.named.status->md;
597 if (s_->send_initial_metadata->idx.named.content_type != nullptr) {
598 extra_headers_for_trailing_metadata_
599 [num_extra_headers_for_trailing_metadata_++] =
600 &s_->send_initial_metadata->idx.named.content_type->md;
604 void SentLastFrame() {
605 s_->send_trailing_metadata = nullptr;
606 s_->sent_trailing_metadata = true;
609 if (!t_->is_client && !s_->read_closed) {
610 grpc_slice_buffer_add(
611 &t_->outbuf, grpc_chttp2_rst_stream_create(
612 s_->id, GRPC_HTTP2_NO_ERROR, &s_->stats.outgoing));
614 grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true,
618 WriteContext* const write_context_;
619 grpc_chttp2_transport* const t_;
620 grpc_chttp2_stream* const s_;
621 bool stream_became_writable_ = false;
622 grpc_mdelem* extra_headers_for_trailing_metadata_[2];
623 size_t num_extra_headers_for_trailing_metadata_ = 0;
627 grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
628 grpc_chttp2_transport* t) {
632 ctx.FlushQueuedBuffers();
633 ctx.EnactHpackSettings();
635 if (t->flow_control->remote_window() > 0) {
636 ctx.UpdateStreamsNoLongerStalled();
639 /* for each grpc_chttp2_stream that's become writable, frame it's data
640 (according to available window sizes) and add to the output buffer */
641 while (grpc_chttp2_stream* s = ctx.NextStream()) {
642 StreamWriteContext stream_ctx(&ctx, s);
643 size_t orig_len = t->outbuf.length;
644 stream_ctx.FlushInitialMetadata();
645 stream_ctx.FlushWindowUpdates();
646 stream_ctx.FlushData();
647 stream_ctx.FlushTrailingMetadata();
648 if (t->outbuf.length > orig_len) {
649 /* Add this stream to the list of the contexts to be traced at TCP */
650 s->byte_counter += t->outbuf.length - orig_len;
651 if (s->traced && grpc_endpoint_can_track_err(t->ep)) {
652 grpc_core::ContextList::Append(&t->cl, s);
655 if (stream_ctx.stream_became_writable()) {
656 if (!grpc_chttp2_list_add_writing_stream(t, s)) {
657 /* already in writing list: drop ref */
658 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:already_writing");
660 /* ref will be dropped at end of write */
663 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:no_write");
667 ctx.FlushWindowUpdates();
669 maybe_initiate_ping(t);
674 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error* error) {
675 GPR_TIMER_SCOPE("grpc_chttp2_end_write", 0);
676 grpc_chttp2_stream* s;
678 if (t->channelz_socket != nullptr) {
679 t->channelz_socket->RecordMessagesSent(t->num_messages_in_next_write);
681 t->num_messages_in_next_write = 0;
683 while (grpc_chttp2_list_pop_writing_stream(t, &s)) {
684 if (s->sending_bytes != 0) {
685 update_list(t, s, static_cast<int64_t>(s->sending_bytes),
686 &s->on_write_finished_cbs, &s->flow_controlled_bytes_written,
687 GRPC_ERROR_REF(error));
688 s->sending_bytes = 0;
690 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:end");
692 grpc_slice_buffer_reset_and_unref_internal(&t->outbuf);
693 GRPC_ERROR_UNREF(error);