Imported Upstream version 1.21.0
[platform/upstream/grpc.git] / src / core / ext / transport / chttp2 / transport / writing.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/transport/context_list.h"
22 #include "src/core/ext/transport/chttp2/transport/internal.h"
23
24 #include <limits.h>
25
26 #include <grpc/support/log.h>
27
28 #include "src/core/lib/compression/stream_compression.h"
29 #include "src/core/lib/debug/stats.h"
30 #include "src/core/lib/profiling/timers.h"
31 #include "src/core/lib/slice/slice_internal.h"
32 #include "src/core/lib/transport/http2_errors.h"
33
34 static void add_to_write_list(grpc_chttp2_write_cb** list,
35                               grpc_chttp2_write_cb* cb) {
36   cb->next = *list;
37   *list = cb;
38 }
39
40 static void finish_write_cb(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
41                             grpc_chttp2_write_cb* cb, grpc_error* error) {
42   grpc_chttp2_complete_closure_step(t, s, &cb->closure, error,
43                                     "finish_write_cb");
44   cb->next = t->write_cb_pool;
45   t->write_cb_pool = cb;
46 }
47
48 static void maybe_initiate_ping(grpc_chttp2_transport* t) {
49   grpc_chttp2_ping_queue* pq = &t->ping_queue;
50   if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
51     /* no ping needed: wait */
52     return;
53   }
54   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
55     /* ping already in-flight: wait */
56     if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
57         GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace)) {
58       gpr_log(GPR_INFO, "%s: Ping delayed [%p]: already pinging",
59               t->is_client ? "CLIENT" : "SERVER", t->peer_string);
60     }
61     return;
62   }
63   if (t->ping_state.pings_before_data_required == 0 &&
64       t->ping_policy.max_pings_without_data != 0) {
65     /* need to receive something of substance before sending a ping again */
66     if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
67         GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace)) {
68       gpr_log(GPR_INFO, "%s: Ping delayed [%p]: too many recent pings: %d/%d",
69               t->is_client ? "CLIENT" : "SERVER", t->peer_string,
70               t->ping_state.pings_before_data_required,
71               t->ping_policy.max_pings_without_data);
72     }
73     return;
74   }
75   grpc_millis now = grpc_core::ExecCtx::Get()->Now();
76
77   grpc_millis next_allowed_ping_interval =
78       (t->keepalive_permit_without_calls == 0 &&
79        grpc_chttp2_stream_map_size(&t->stream_map) == 0)
80           ? 7200 * GPR_MS_PER_SEC
81           : t->ping_policy.min_sent_ping_interval_without_data;
82   grpc_millis next_allowed_ping =
83       t->ping_state.last_ping_sent_time + next_allowed_ping_interval;
84
85   if (next_allowed_ping > now) {
86     /* not enough elapsed time between successive pings */
87     if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
88         GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace)) {
89       gpr_log(GPR_INFO,
90               "%s: Ping delayed [%p]: not enough time elapsed since last ping. "
91               " Last ping %f: Next ping %f: Now %f",
92               t->is_client ? "CLIENT" : "SERVER", t->peer_string,
93               static_cast<double>(t->ping_state.last_ping_sent_time),
94               static_cast<double>(next_allowed_ping), static_cast<double>(now));
95     }
96     if (!t->ping_state.is_delayed_ping_timer_set) {
97       t->ping_state.is_delayed_ping_timer_set = true;
98       GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked");
99       grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping,
100                       &t->retry_initiate_ping_locked);
101     }
102     return;
103   }
104
105   pq->inflight_id = t->ping_ctr;
106   t->ping_ctr++;
107   GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
108   grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
109                          &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
110   grpc_slice_buffer_add(&t->outbuf,
111                         grpc_chttp2_ping_create(false, pq->inflight_id));
112   GRPC_STATS_INC_HTTP2_PINGS_SENT();
113   t->ping_state.last_ping_sent_time = now;
114   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
115       GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace)) {
116     gpr_log(GPR_INFO, "%s: Ping sent [%s]: %d/%d",
117             t->is_client ? "CLIENT" : "SERVER", t->peer_string,
118             t->ping_state.pings_before_data_required,
119             t->ping_policy.max_pings_without_data);
120   }
121   t->ping_state.pings_before_data_required -=
122       (t->ping_state.pings_before_data_required != 0);
123 }
124
125 static bool update_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
126                         int64_t send_bytes, grpc_chttp2_write_cb** list,
127                         int64_t* ctr, grpc_error* error) {
128   bool sched_any = false;
129   grpc_chttp2_write_cb* cb = *list;
130   *list = nullptr;
131   *ctr += send_bytes;
132   while (cb) {
133     grpc_chttp2_write_cb* next = cb->next;
134     if (cb->call_at_byte <= *ctr) {
135       sched_any = true;
136       finish_write_cb(t, s, cb, GRPC_ERROR_REF(error));
137     } else {
138       add_to_write_list(list, cb);
139     }
140     cb = next;
141   }
142   GRPC_ERROR_UNREF(error);
143   return sched_any;
144 }
145
146 static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
147                          const char* staller) {
148   if (GRPC_TRACE_FLAG_ENABLED(grpc_flowctl_trace)) {
149     gpr_log(
150         GPR_DEBUG,
151         "%s:%p stream %d moved to stalled list by %s. This is FULLY expected "
152         "to happen in a healthy program that is not seeing flow control stalls."
153         " However, if you know that there are unwanted stalls, here is some "
154         "helpful data: [fc:pending=%" PRIdPTR ":pending-compressed=%" PRIdPTR
155         ":flowed=%" PRId64 ":peer_initwin=%d:t_win=%" PRId64
156         ":s_win=%d:s_delta=%" PRId64 "]",
157         t->peer_string, t, s->id, staller, s->flow_controlled_buffer.length,
158         s->stream_compression_method ==
159                 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS
160             ? 0
161             : s->compressed_data_buffer.length,
162         s->flow_controlled_bytes_flowed,
163         t->settings[GRPC_ACKED_SETTINGS]
164                    [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
165         t->flow_control->remote_window(),
166         static_cast<uint32_t> GPR_MAX(
167             0,
168             s->flow_control->remote_window_delta() +
169                 (int64_t)t->settings[GRPC_PEER_SETTINGS]
170                                     [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
171         s->flow_control->remote_window_delta());
172   }
173 }
174
175 /* How many bytes would we like to put on the wire during a single syscall */
176 static uint32_t target_write_size(grpc_chttp2_transport* t) {
177   return 1024 * 1024;
178 }
179
180 // Returns true if initial_metadata contains only default headers.
181 static bool is_default_initial_metadata(grpc_metadata_batch* initial_metadata) {
182   return initial_metadata->list.default_count == initial_metadata->list.count;
183 }
184
185 namespace {
186 class StreamWriteContext;
187
188 class WriteContext {
189  public:
190   WriteContext(grpc_chttp2_transport* t) : t_(t) {
191     GRPC_STATS_INC_HTTP2_WRITES_BEGUN();
192     GPR_TIMER_SCOPE("grpc_chttp2_begin_write", 0);
193   }
194
195   // TODO(ctiller): make this the destructor
196   void FlushStats() {
197     GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE(
198         initial_metadata_writes_);
199     GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(message_writes_);
200     GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE(
201         trailing_metadata_writes_);
202     GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(flow_control_writes_);
203   }
204
205   void FlushSettings() {
206     if (t_->dirtied_local_settings && !t_->sent_local_settings) {
207       grpc_slice_buffer_add(
208           &t_->outbuf, grpc_chttp2_settings_create(
209                            t_->settings[GRPC_SENT_SETTINGS],
210                            t_->settings[GRPC_LOCAL_SETTINGS],
211                            t_->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
212       t_->force_send_settings = false;
213       t_->dirtied_local_settings = false;
214       t_->sent_local_settings = true;
215       GRPC_STATS_INC_HTTP2_SETTINGS_WRITES();
216     }
217   }
218
219   void FlushQueuedBuffers() {
220     /* simple writes are queued to qbuf, and flushed here */
221     grpc_slice_buffer_move_into(&t_->qbuf, &t_->outbuf);
222     GPR_ASSERT(t_->qbuf.count == 0);
223   }
224
225   void FlushWindowUpdates() {
226     uint32_t transport_announce =
227         t_->flow_control->MaybeSendUpdate(t_->outbuf.count > 0);
228     if (transport_announce) {
229       grpc_transport_one_way_stats throwaway_stats;
230       grpc_slice_buffer_add(
231           &t_->outbuf, grpc_chttp2_window_update_create(0, transport_announce,
232                                                         &throwaway_stats));
233       ResetPingClock();
234     }
235   }
236
237   void FlushPingAcks() {
238     for (size_t i = 0; i < t_->ping_ack_count; i++) {
239       grpc_slice_buffer_add(&t_->outbuf,
240                             grpc_chttp2_ping_create(true, t_->ping_acks[i]));
241     }
242     t_->ping_ack_count = 0;
243   }
244
245   void EnactHpackSettings() {
246     grpc_chttp2_hpack_compressor_set_max_table_size(
247         &t_->hpack_compressor,
248         t_->settings[GRPC_PEER_SETTINGS]
249                     [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
250   }
251
252   void UpdateStreamsNoLongerStalled() {
253     grpc_chttp2_stream* s;
254     while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) {
255       if (t_->closed_with_error == GRPC_ERROR_NONE &&
256           grpc_chttp2_list_add_writable_stream(t_, s)) {
257         if (!s->refcount->refs.RefIfNonZero()) {
258           grpc_chttp2_list_remove_writable_stream(t_, s);
259         }
260       }
261     }
262   }
263
264   grpc_chttp2_stream* NextStream() {
265     if (t_->outbuf.length > target_write_size(t_)) {
266       result_.partial = true;
267       return nullptr;
268     }
269
270     grpc_chttp2_stream* s;
271     if (!grpc_chttp2_list_pop_writable_stream(t_, &s)) {
272       return nullptr;
273     }
274
275     return s;
276   }
277
278   void ResetPingClock() {
279     if (!t_->is_client) {
280       t_->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
281       t_->ping_recv_state.ping_strikes = 0;
282     }
283     t_->ping_state.pings_before_data_required =
284         t_->ping_policy.max_pings_without_data;
285   }
286
287   void IncInitialMetadataWrites() { ++initial_metadata_writes_; }
288   void IncWindowUpdateWrites() { ++flow_control_writes_; }
289   void IncMessageWrites() { ++message_writes_; }
290   void IncTrailingMetadataWrites() { ++trailing_metadata_writes_; }
291
292   void NoteScheduledResults() { result_.early_results_scheduled = true; }
293
294   grpc_chttp2_transport* transport() const { return t_; }
295
296   grpc_chttp2_begin_write_result Result() {
297     result_.writing = t_->outbuf.count > 0;
298     return result_;
299   }
300
301  private:
302   grpc_chttp2_transport* const t_;
303
304   /* stats histogram counters: we increment these throughout this function,
305      and at the end publish to the central stats histograms */
306   int flow_control_writes_ = 0;
307   int initial_metadata_writes_ = 0;
308   int trailing_metadata_writes_ = 0;
309   int message_writes_ = 0;
310   grpc_chttp2_begin_write_result result_ = {false, false, false};
311 };
312
313 class DataSendContext {
314  public:
315   DataSendContext(WriteContext* write_context, grpc_chttp2_transport* t,
316                   grpc_chttp2_stream* s)
317       : write_context_(write_context),
318         t_(t),
319         s_(s),
320         sending_bytes_before_(s_->sending_bytes) {}
321
322   uint32_t stream_remote_window() const {
323     return static_cast<uint32_t> GPR_MAX(
324         0, s_->flow_control->remote_window_delta() +
325                (int64_t)t_->settings[GRPC_PEER_SETTINGS]
326                                     [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
327   }
328
329   uint32_t max_outgoing() const {
330     return static_cast<uint32_t> GPR_MIN(
331         t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
332         GPR_MIN(stream_remote_window(), t_->flow_control->remote_window()));
333   }
334
335   bool AnyOutgoing() const { return max_outgoing() > 0; }
336
337   void FlushUncompressedBytes() {
338     uint32_t send_bytes = static_cast<uint32_t> GPR_MIN(
339         max_outgoing(), s_->flow_controlled_buffer.length);
340     is_last_frame_ = send_bytes == s_->flow_controlled_buffer.length &&
341                      s_->fetching_send_message == nullptr &&
342                      s_->send_trailing_metadata != nullptr &&
343                      grpc_metadata_batch_is_empty(s_->send_trailing_metadata);
344     grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, send_bytes,
345                             is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
346     s_->flow_control->SentData(send_bytes);
347     s_->sending_bytes += send_bytes;
348   }
349
350   void FlushCompressedBytes() {
351     GPR_DEBUG_ASSERT(s_->stream_compression_method !=
352                      GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS);
353
354     uint32_t send_bytes = static_cast<uint32_t> GPR_MIN(
355         max_outgoing(), s_->compressed_data_buffer.length);
356     bool is_last_data_frame =
357         (send_bytes == s_->compressed_data_buffer.length &&
358          s_->flow_controlled_buffer.length == 0 &&
359          s_->fetching_send_message == nullptr);
360     if (is_last_data_frame && s_->send_trailing_metadata != nullptr &&
361         s_->stream_compression_ctx != nullptr) {
362       if (GPR_UNLIKELY(!grpc_stream_compress(
363               s_->stream_compression_ctx, &s_->flow_controlled_buffer,
364               &s_->compressed_data_buffer, nullptr, MAX_SIZE_T,
365               GRPC_STREAM_COMPRESSION_FLUSH_FINISH))) {
366         gpr_log(GPR_ERROR, "Stream compression failed.");
367       }
368       grpc_stream_compression_context_destroy(s_->stream_compression_ctx);
369       s_->stream_compression_ctx = nullptr;
370       /* After finish, bytes in s->compressed_data_buffer may be
371        * more than max_outgoing. Start another round of the current
372        * while loop so that send_bytes and is_last_data_frame are
373        * recalculated. */
374       return;
375     }
376     is_last_frame_ = is_last_data_frame &&
377                      s_->send_trailing_metadata != nullptr &&
378                      grpc_metadata_batch_is_empty(s_->send_trailing_metadata);
379     grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes,
380                             is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
381     s_->flow_control->SentData(send_bytes);
382     if (s_->compressed_data_buffer.length == 0) {
383       s_->sending_bytes += s_->uncompressed_data_size;
384     }
385   }
386
387   void CompressMoreBytes() {
388     GPR_DEBUG_ASSERT(s_->stream_compression_method !=
389                      GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS);
390
391     if (s_->stream_compression_ctx == nullptr) {
392       s_->stream_compression_ctx =
393           grpc_stream_compression_context_create(s_->stream_compression_method);
394     }
395     s_->uncompressed_data_size = s_->flow_controlled_buffer.length;
396     if (GPR_UNLIKELY(!grpc_stream_compress(
397             s_->stream_compression_ctx, &s_->flow_controlled_buffer,
398             &s_->compressed_data_buffer, nullptr, MAX_SIZE_T,
399             GRPC_STREAM_COMPRESSION_FLUSH_SYNC))) {
400       gpr_log(GPR_ERROR, "Stream compression failed.");
401     }
402   }
403
404   bool is_last_frame() const { return is_last_frame_; }
405
406   void CallCallbacks() {
407     if (update_list(
408             t_, s_,
409             static_cast<int64_t>(s_->sending_bytes - sending_bytes_before_),
410             &s_->on_flow_controlled_cbs, &s_->flow_controlled_bytes_flowed,
411             GRPC_ERROR_NONE)) {
412       write_context_->NoteScheduledResults();
413     }
414   }
415
416  private:
417   WriteContext* write_context_;
418   grpc_chttp2_transport* t_;
419   grpc_chttp2_stream* s_;
420   const size_t sending_bytes_before_;
421   bool is_last_frame_ = false;
422 };
423
424 class StreamWriteContext {
425  public:
426   StreamWriteContext(WriteContext* write_context, grpc_chttp2_stream* s)
427       : write_context_(write_context), t_(write_context->transport()), s_(s) {
428     GRPC_CHTTP2_IF_TRACING(
429         gpr_log(GPR_INFO, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_,
430                 t_->is_client ? "CLIENT" : "SERVER", s->id,
431                 s->sent_initial_metadata, s->send_initial_metadata != nullptr,
432                 (int)(s->flow_control->local_window_delta() -
433                       s->flow_control->announced_window_delta())));
434   }
435
436   void FlushInitialMetadata() {
437     /* send initial metadata if it's available */
438     if (s_->sent_initial_metadata) return;
439     if (s_->send_initial_metadata == nullptr) return;
440
441     // We skip this on the server side if there is no custom initial
442     // metadata, there are no messages to send, and we are also sending
443     // trailing metadata.  This results in a Trailers-Only response,
444     // which is required for retries, as per:
445     // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
446     if (!t_->is_client && s_->fetching_send_message == nullptr &&
447         s_->flow_controlled_buffer.length == 0 &&
448         compressed_data_buffer_len() == 0 &&
449         s_->send_trailing_metadata != nullptr &&
450         is_default_initial_metadata(s_->send_initial_metadata)) {
451       ConvertInitialMetadataToTrailingMetadata();
452     } else {
453       grpc_encode_header_options hopt = {
454           s_->id,  // stream_id
455           false,   // is_eof
456           t_->settings[GRPC_PEER_SETTINGS]
457                       [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
458               0,  // use_true_binary_metadata
459           t_->settings[GRPC_PEER_SETTINGS]
460                       [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],  // max_frame_size
461           &s_->stats.outgoing                                 // stats
462       };
463       grpc_chttp2_encode_header(&t_->hpack_compressor, nullptr, 0,
464                                 s_->send_initial_metadata, &hopt, &t_->outbuf);
465       write_context_->ResetPingClock();
466       write_context_->IncInitialMetadataWrites();
467     }
468
469     s_->send_initial_metadata = nullptr;
470     s_->sent_initial_metadata = true;
471     write_context_->NoteScheduledResults();
472     grpc_chttp2_complete_closure_step(
473         t_, s_, &s_->send_initial_metadata_finished, GRPC_ERROR_NONE,
474         "send_initial_metadata_finished");
475   }
476
477   bool compressed_data_buffer_len() {
478     return s_->stream_compression_method ==
479                    GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS
480                ? 0
481                : s_->compressed_data_buffer.length;
482   }
483
484   void FlushWindowUpdates() {
485     /* send any window updates */
486     const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate();
487     if (stream_announce == 0) return;
488
489     grpc_slice_buffer_add(
490         &t_->outbuf, grpc_chttp2_window_update_create(s_->id, stream_announce,
491                                                       &s_->stats.outgoing));
492     write_context_->ResetPingClock();
493     write_context_->IncWindowUpdateWrites();
494   }
495
496   void FlushData() {
497     if (!s_->sent_initial_metadata) return;
498
499     if (s_->flow_controlled_buffer.length == 0 &&
500         compressed_data_buffer_len() == 0) {
501       return;  // early out: nothing to do
502     }
503
504     DataSendContext data_send_context(write_context_, t_, s_);
505
506     if (!data_send_context.AnyOutgoing()) {
507       if (t_->flow_control->remote_window() <= 0) {
508         report_stall(t_, s_, "transport");
509         grpc_chttp2_list_add_stalled_by_transport(t_, s_);
510       } else if (data_send_context.stream_remote_window() <= 0) {
511         report_stall(t_, s_, "stream");
512         grpc_chttp2_list_add_stalled_by_stream(t_, s_);
513       }
514       return;  // early out: nothing to do
515     }
516
517     if (s_->stream_compression_method ==
518         GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
519       while (s_->flow_controlled_buffer.length > 0 &&
520              data_send_context.max_outgoing() > 0) {
521         data_send_context.FlushUncompressedBytes();
522       }
523     } else {
524       while ((s_->flow_controlled_buffer.length > 0 ||
525               s_->compressed_data_buffer.length > 0) &&
526              data_send_context.max_outgoing() > 0) {
527         if (s_->compressed_data_buffer.length > 0) {
528           data_send_context.FlushCompressedBytes();
529         } else {
530           data_send_context.CompressMoreBytes();
531         }
532       }
533     }
534     write_context_->ResetPingClock();
535     if (data_send_context.is_last_frame()) {
536       SentLastFrame();
537     }
538     data_send_context.CallCallbacks();
539     stream_became_writable_ = true;
540     if (s_->flow_controlled_buffer.length > 0 ||
541         compressed_data_buffer_len() > 0) {
542       GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork");
543       grpc_chttp2_list_add_writable_stream(t_, s_);
544     }
545     write_context_->IncMessageWrites();
546   }
547
548   void FlushTrailingMetadata() {
549     if (!s_->sent_initial_metadata) return;
550
551     if (s_->send_trailing_metadata == nullptr) return;
552     if (s_->fetching_send_message != nullptr) return;
553     if (s_->flow_controlled_buffer.length != 0) return;
554     if (compressed_data_buffer_len() != 0) return;
555
556     GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
557     if (grpc_metadata_batch_is_empty(s_->send_trailing_metadata)) {
558       grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true,
559                               &s_->stats.outgoing, &t_->outbuf);
560     } else {
561       grpc_encode_header_options hopt = {
562           s_->id, true,
563           t_->settings[GRPC_PEER_SETTINGS]
564                       [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
565               0,
566
567           t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
568           &s_->stats.outgoing};
569       grpc_chttp2_encode_header(&t_->hpack_compressor,
570                                 extra_headers_for_trailing_metadata_,
571                                 num_extra_headers_for_trailing_metadata_,
572                                 s_->send_trailing_metadata, &hopt, &t_->outbuf);
573     }
574     write_context_->IncTrailingMetadataWrites();
575     write_context_->ResetPingClock();
576     SentLastFrame();
577
578     write_context_->NoteScheduledResults();
579     grpc_chttp2_complete_closure_step(
580         t_, s_, &s_->send_trailing_metadata_finished, GRPC_ERROR_NONE,
581         "send_trailing_metadata_finished");
582   }
583
584   bool stream_became_writable() { return stream_became_writable_; }
585
586  private:
587   void ConvertInitialMetadataToTrailingMetadata() {
588     GRPC_CHTTP2_IF_TRACING(
589         gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)"));
590     // When sending Trailers-Only, we need to move the :status and
591     // content-type headers to the trailers.
592     if (s_->send_initial_metadata->idx.named.status != nullptr) {
593       extra_headers_for_trailing_metadata_
594           [num_extra_headers_for_trailing_metadata_++] =
595               &s_->send_initial_metadata->idx.named.status->md;
596     }
597     if (s_->send_initial_metadata->idx.named.content_type != nullptr) {
598       extra_headers_for_trailing_metadata_
599           [num_extra_headers_for_trailing_metadata_++] =
600               &s_->send_initial_metadata->idx.named.content_type->md;
601     }
602   }
603
604   void SentLastFrame() {
605     s_->send_trailing_metadata = nullptr;
606     s_->sent_trailing_metadata = true;
607     s_->eos_sent = true;
608
609     if (!t_->is_client && !s_->read_closed) {
610       grpc_slice_buffer_add(
611           &t_->outbuf, grpc_chttp2_rst_stream_create(
612                            s_->id, GRPC_HTTP2_NO_ERROR, &s_->stats.outgoing));
613     }
614     grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true,
615                                    GRPC_ERROR_NONE);
616   }
617
618   WriteContext* const write_context_;
619   grpc_chttp2_transport* const t_;
620   grpc_chttp2_stream* const s_;
621   bool stream_became_writable_ = false;
622   grpc_mdelem* extra_headers_for_trailing_metadata_[2];
623   size_t num_extra_headers_for_trailing_metadata_ = 0;
624 };
625 }  // namespace
626
627 grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
628     grpc_chttp2_transport* t) {
629   WriteContext ctx(t);
630   ctx.FlushSettings();
631   ctx.FlushPingAcks();
632   ctx.FlushQueuedBuffers();
633   ctx.EnactHpackSettings();
634
635   if (t->flow_control->remote_window() > 0) {
636     ctx.UpdateStreamsNoLongerStalled();
637   }
638
639   /* for each grpc_chttp2_stream that's become writable, frame it's data
640      (according to available window sizes) and add to the output buffer */
641   while (grpc_chttp2_stream* s = ctx.NextStream()) {
642     StreamWriteContext stream_ctx(&ctx, s);
643     size_t orig_len = t->outbuf.length;
644     stream_ctx.FlushInitialMetadata();
645     stream_ctx.FlushWindowUpdates();
646     stream_ctx.FlushData();
647     stream_ctx.FlushTrailingMetadata();
648     if (t->outbuf.length > orig_len) {
649       /* Add this stream to the list of the contexts to be traced at TCP */
650       s->byte_counter += t->outbuf.length - orig_len;
651       if (s->traced && grpc_endpoint_can_track_err(t->ep)) {
652         grpc_core::ContextList::Append(&t->cl, s);
653       }
654     }
655     if (stream_ctx.stream_became_writable()) {
656       if (!grpc_chttp2_list_add_writing_stream(t, s)) {
657         /* already in writing list: drop ref */
658         GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:already_writing");
659       } else {
660         /* ref will be dropped at end of write */
661       }
662     } else {
663       GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:no_write");
664     }
665   }
666
667   ctx.FlushWindowUpdates();
668
669   maybe_initiate_ping(t);
670
671   return ctx.Result();
672 }
673
674 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error* error) {
675   GPR_TIMER_SCOPE("grpc_chttp2_end_write", 0);
676   grpc_chttp2_stream* s;
677
678   if (t->channelz_socket != nullptr) {
679     t->channelz_socket->RecordMessagesSent(t->num_messages_in_next_write);
680   }
681   t->num_messages_in_next_write = 0;
682
683   while (grpc_chttp2_list_pop_writing_stream(t, &s)) {
684     if (s->sending_bytes != 0) {
685       update_list(t, s, static_cast<int64_t>(s->sending_bytes),
686                   &s->on_write_finished_cbs, &s->flow_controlled_bytes_written,
687                   GRPC_ERROR_REF(error));
688       s->sending_bytes = 0;
689     }
690     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:end");
691   }
692   grpc_slice_buffer_reset_and_unref_internal(&t->outbuf);
693   GRPC_ERROR_UNREF(error);
694 }