3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
20 #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
22 #include <grpc/support/port_platform.h>
26 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
27 #include "src/core/lib/gpr/useful.h"
28 #include "src/core/lib/gprpp/manual_constructor.h"
29 #include "src/core/lib/transport/bdp_estimator.h"
30 #include "src/core/lib/transport/pid_controller.h"
32 struct grpc_chttp2_transport;
33 struct grpc_chttp2_stream;
35 extern grpc_core::TraceFlag grpc_flowctl_trace;
39 class TrickledCHTTP2; // to make this a friend
40 } // namespace testing
46 static constexpr uint32_t kDefaultWindow = 65535;
47 static constexpr int64_t kMaxWindow = static_cast<int64_t>((1u << 31) - 1);
48 // TODO(ncteisen): Tune this
49 static constexpr uint32_t kFrameSize = 1024 * 1024;
51 class TransportFlowControl;
52 class StreamFlowControl;
54 // Encapsulates a collections of actions the transport needs to take with
55 // regard to flow control. Each action comes with urgencies that tell the
56 // transport how quickly the action must take place.
57 class FlowControlAction {
59 enum class Urgency : uint8_t {
60 // Nothing to be done.
62 // Initiate a write to update the initial window immediately.
64 // Push the flow control update into a send buffer, to be sent
65 // out the next time a write is initiated.
69 Urgency send_stream_update() const { return send_stream_update_; }
70 Urgency send_transport_update() const { return send_transport_update_; }
71 Urgency send_initial_window_update() const {
72 return send_initial_window_update_;
74 Urgency send_max_frame_size_update() const {
75 return send_max_frame_size_update_;
77 uint32_t initial_window_size() const { return initial_window_size_; }
78 uint32_t max_frame_size() const { return max_frame_size_; }
80 FlowControlAction& set_send_stream_update(Urgency u) {
81 send_stream_update_ = u;
84 FlowControlAction& set_send_transport_update(Urgency u) {
85 send_transport_update_ = u;
88 FlowControlAction& set_send_initial_window_update(Urgency u,
90 send_initial_window_update_ = u;
91 initial_window_size_ = update;
94 FlowControlAction& set_send_max_frame_size_update(Urgency u,
96 send_max_frame_size_update_ = u;
97 max_frame_size_ = update;
101 static const char* UrgencyString(Urgency u);
102 void Trace(grpc_chttp2_transport* t) const;
105 Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED;
106 Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED;
107 Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED;
108 Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED;
109 uint32_t initial_window_size_ = 0;
110 uint32_t max_frame_size_ = 0;
113 class FlowControlTrace {
115 FlowControlTrace(const char* reason, TransportFlowControl* tfc,
116 StreamFlowControl* sfc) {
117 if (enabled_) Init(reason, tfc, sfc);
120 ~FlowControlTrace() {
121 if (enabled_) Finish();
125 void Init(const char* reason, TransportFlowControl* tfc,
126 StreamFlowControl* sfc);
129 const bool enabled_ = GRPC_TRACE_FLAG_ENABLED(grpc_flowctl_trace);
131 TransportFlowControl* tfc_;
132 StreamFlowControl* sfc_;
134 int64_t remote_window_;
135 int64_t target_window_;
136 int64_t announced_window_;
137 int64_t remote_window_delta_;
138 int64_t local_window_delta_;
139 int64_t announced_window_delta_;
142 // Fat interface with all methods a flow control implementation needs to
144 class TransportFlowControlBase {
146 TransportFlowControlBase() {}
147 virtual ~TransportFlowControlBase() {}
149 // Is flow control enabled? This is needed in other codepaths like the checks
150 // in parsing and in writing.
151 virtual bool flow_control_enabled() const = 0;
153 // Called to check if the transport needs to send a WINDOW_UPDATE frame
154 virtual uint32_t MaybeSendUpdate(bool /* writing_anyway */) = 0;
156 // Using the protected members, returns and Action to be taken by the
158 virtual FlowControlAction MakeAction() = 0;
160 // Using the protected members, returns and Action to be taken by the
161 // tranport. Also checks for updates to our BDP estimate and acts
163 virtual FlowControlAction PeriodicUpdate() = 0;
165 // Called to do bookkeeping when a stream owned by this transport sends
167 virtual void StreamSentData(int64_t /* size */) = 0;
169 // Called to do bookkeeping when a stream owned by this transport receives
170 // data from the wire. Also does error checking for frame size.
171 virtual grpc_error* RecvData(int64_t /* incoming_frame_size */) = 0;
173 // Called to do bookkeeping when we receive a WINDOW_UPDATE frame.
174 virtual void RecvUpdate(uint32_t /* size */) = 0;
176 // Returns the BdpEstimator held by this object. Caller is responsible for
177 // checking for nullptr. TODO(ncteisen): consider fully encapsulating all
178 // bdp estimator actions inside TransportFlowControl
179 virtual BdpEstimator* bdp_estimator() { return nullptr; }
182 int64_t remote_window() const { return remote_window_; }
183 virtual int64_t target_window() const { return target_initial_window_size_; }
184 int64_t announced_window() const { return announced_window_; }
186 // Used in certain benchmarks in which we don't want FlowControl to be a
188 virtual void TestOnlyForceHugeWindow() {}
191 friend class ::grpc::testing::TrickledCHTTP2;
192 int64_t remote_window_ = kDefaultWindow;
193 int64_t target_initial_window_size_ = kDefaultWindow;
194 int64_t announced_window_ = kDefaultWindow;
197 // Implementation of flow control that does NOTHING. Always returns maximum
198 // values, never initiates writes, and assumes that the remote peer is doing
199 // the same. To be used to narrow down on flow control as the cause of negative
201 class TransportFlowControlDisabled final : public TransportFlowControlBase {
203 // Maxes out all values
204 TransportFlowControlDisabled(grpc_chttp2_transport* t);
206 bool flow_control_enabled() const override { return false; }
208 // Never do anything.
209 uint32_t MaybeSendUpdate(bool /* writing_anyway */) override { return 0; }
210 FlowControlAction MakeAction() override { return FlowControlAction(); }
211 FlowControlAction PeriodicUpdate() override { return FlowControlAction(); }
212 void StreamSentData(int64_t /* size */) override {}
213 grpc_error* RecvData(int64_t /* incoming_frame_size */) override {
214 return GRPC_ERROR_NONE;
216 void RecvUpdate(uint32_t /* size */) override {}
219 // Implementation of flow control that abides to HTTP/2 spec and attempts
220 // to be as performant as possible.
221 class TransportFlowControl final : public TransportFlowControlBase {
223 TransportFlowControl(const grpc_chttp2_transport* t, bool enable_bdp_probe);
224 ~TransportFlowControl() {}
226 bool flow_control_enabled() const override { return true; }
228 bool bdp_probe() const { return enable_bdp_probe_; }
230 // returns an announce if we should send a transport update to our peer,
231 // else returns zero; writing_anyway indicates if a write would happen
232 // regardless of the send - if it is false and this function returns non-zero,
233 // this announce will cause a write to occur
234 uint32_t MaybeSendUpdate(bool writing_anyway) override;
236 // Reads the flow control data and returns and actionable struct that will
237 // tell chttp2 exactly what it needs to do
238 FlowControlAction MakeAction() override {
239 return UpdateAction(FlowControlAction());
242 // Call periodically (at a low-ish rate, 100ms - 10s makes sense)
243 // to perform more complex flow control calculations and return an action
244 // to let chttp2 change its parameters
245 FlowControlAction PeriodicUpdate() override;
247 void StreamSentData(int64_t size) override { remote_window_ -= size; }
249 grpc_error* ValidateRecvData(int64_t incoming_frame_size);
250 void CommitRecvData(int64_t incoming_frame_size) {
251 announced_window_ -= incoming_frame_size;
254 grpc_error* RecvData(int64_t incoming_frame_size) override {
255 FlowControlTrace trace(" data recv", this, nullptr);
256 grpc_error* error = ValidateRecvData(incoming_frame_size);
257 if (error != GRPC_ERROR_NONE) return error;
258 CommitRecvData(incoming_frame_size);
259 return GRPC_ERROR_NONE;
262 // we have received a WINDOW_UPDATE frame for a transport
263 void RecvUpdate(uint32_t size) override {
264 FlowControlTrace trace("t updt recv", this, nullptr);
265 remote_window_ += size;
268 // See comment above announced_stream_total_over_incoming_window_ for the
269 // logic behind this decision.
270 int64_t target_window() const override {
271 return static_cast<uint32_t> GPR_MIN(
272 (int64_t)((1u << 31) - 1),
273 announced_stream_total_over_incoming_window_ +
274 target_initial_window_size_);
277 const grpc_chttp2_transport* transport() const { return t_; }
279 void PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) {
281 announced_stream_total_over_incoming_window_ -= delta;
285 void PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) {
287 announced_stream_total_over_incoming_window_ += delta;
291 BdpEstimator* bdp_estimator() override { return &bdp_estimator_; }
293 void TestOnlyForceHugeWindow() override {
294 announced_window_ = 1024 * 1024 * 1024;
295 remote_window_ = 1024 * 1024 * 1024;
299 double TargetLogBdp();
300 double SmoothLogBdp(double value);
301 FlowControlAction::Urgency DeltaUrgency(int64_t value,
302 grpc_chttp2_setting_id setting_id);
304 FlowControlAction UpdateAction(FlowControlAction action) {
305 if (announced_window_ < target_window() / 2) {
306 action.set_send_transport_update(
307 FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
312 const grpc_chttp2_transport* const t_;
314 /** calculating what we should give for local window:
315 we track the total amount of flow control over initial window size
316 across all streams: this is data that we want to receive right now (it
317 has an outstanding read)
318 and the total amount of flow control under initial window size across all
319 streams: this is data we've read early
320 we want to adjust incoming_window such that:
321 incoming_window = total_over - max(bdp - total_under, 0) */
322 int64_t announced_stream_total_over_incoming_window_ = 0;
324 /** should we probe bdp? */
325 const bool enable_bdp_probe_;
328 grpc_core::BdpEstimator bdp_estimator_;
331 grpc_core::PidController pid_controller_;
332 grpc_millis last_pid_update_ = 0;
335 // Fat interface with all methods a stream flow control implementation needs
337 class StreamFlowControlBase {
339 StreamFlowControlBase() {}
340 virtual ~StreamFlowControlBase() {}
342 // Updates an action using the protected members.
343 virtual FlowControlAction UpdateAction(FlowControlAction /* action */) {
347 // Using the protected members, returns an Action for this stream to be
348 // taken by the tranport.
349 virtual FlowControlAction MakeAction() = 0;
351 // Bookkeeping for when data is sent on this stream.
352 virtual void SentData(int64_t /* outgoing_frame_size */) = 0;
354 // Bookkeeping and error checking for when data is received by this stream.
355 virtual grpc_error* RecvData(int64_t /* incoming_frame_size */) = 0;
357 // Called to check if this stream needs to send a WINDOW_UPDATE frame.
358 virtual uint32_t MaybeSendUpdate() = 0;
360 // Bookkeeping for receiving a WINDOW_UPDATE from for this stream.
361 virtual void RecvUpdate(uint32_t /* size */) = 0;
363 // Bookkeeping for when a call pulls bytes out of the transport. At this
364 // point we consider the data 'used' and can thus let out peer know we are
365 // ready for more data.
366 virtual void IncomingByteStreamUpdate(size_t /* max_size_hint */,
367 size_t /* have_already */) {
371 // Used in certain benchmarks in which we don't want FlowControl to be a
373 virtual void TestOnlyForceHugeWindow() {}
376 int64_t remote_window_delta() { return remote_window_delta_; }
377 int64_t local_window_delta() { return local_window_delta_; }
378 int64_t announced_window_delta() { return announced_window_delta_; }
381 friend class ::grpc::testing::TrickledCHTTP2;
382 int64_t remote_window_delta_ = 0;
383 int64_t local_window_delta_ = 0;
384 int64_t announced_window_delta_ = 0;
387 // Implementation of flow control that does NOTHING. Always returns maximum
388 // values, never initiates writes, and assumes that the remote peer is doing
389 // the same. To be used to narrow down on flow control as the cause of negative
391 class StreamFlowControlDisabled : public StreamFlowControlBase {
393 FlowControlAction UpdateAction(FlowControlAction action) override {
396 FlowControlAction MakeAction() override { return FlowControlAction(); }
397 void SentData(int64_t /* outgoing_frame_size */) override {}
398 grpc_error* RecvData(int64_t /* incoming_frame_size */) override {
399 return GRPC_ERROR_NONE;
401 uint32_t MaybeSendUpdate() override { return 0; }
402 void RecvUpdate(uint32_t /* size */) override {}
403 void IncomingByteStreamUpdate(size_t /* max_size_hint */,
404 size_t /* have_already */) override {}
407 // Implementation of flow control that abides to HTTP/2 spec and attempts
408 // to be as performant as possible.
409 class StreamFlowControl final : public StreamFlowControlBase {
411 StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s);
412 ~StreamFlowControl() {
413 tfc_->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
416 FlowControlAction UpdateAction(FlowControlAction action) override;
417 FlowControlAction MakeAction() override {
418 return UpdateAction(tfc_->MakeAction());
421 // we have sent data on the wire, we must track this in our bookkeeping for
422 // the remote peer's flow control.
423 void SentData(int64_t outgoing_frame_size) override {
424 FlowControlTrace tracer(" data sent", tfc_, this);
425 tfc_->StreamSentData(outgoing_frame_size);
426 remote_window_delta_ -= outgoing_frame_size;
429 // we have received data from the wire
430 grpc_error* RecvData(int64_t incoming_frame_size) override;
432 // returns an announce if we should send a stream update to our peer, else
434 uint32_t MaybeSendUpdate() override;
436 // we have received a WINDOW_UPDATE frame for a stream
437 void RecvUpdate(uint32_t size) override {
438 FlowControlTrace trace("s updt recv", tfc_, this);
439 remote_window_delta_ += size;
442 // the application is asking for a certain amount of bytes
443 void IncomingByteStreamUpdate(size_t max_size_hint,
444 size_t have_already) override;
446 int64_t remote_window_delta() const { return remote_window_delta_; }
447 int64_t local_window_delta() const { return local_window_delta_; }
448 int64_t announced_window_delta() const { return announced_window_delta_; }
450 const grpc_chttp2_stream* stream() const { return s_; }
452 void TestOnlyForceHugeWindow() override {
453 announced_window_delta_ = 1024 * 1024 * 1024;
454 local_window_delta_ = 1024 * 1024 * 1024;
455 remote_window_delta_ = 1024 * 1024 * 1024;
459 TransportFlowControl* const tfc_;
460 const grpc_chttp2_stream* const s_;
462 void UpdateAnnouncedWindowDelta(TransportFlowControl* tfc, int64_t change) {
463 tfc->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
464 announced_window_delta_ += change;
465 tfc->PostUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
469 class TestOnlyTransportTargetWindowEstimatesMocker {
471 virtual ~TestOnlyTransportTargetWindowEstimatesMocker() {}
472 virtual double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
473 double current_target) = 0;
476 extern TestOnlyTransportTargetWindowEstimatesMocker*
477 g_test_only_transport_target_window_estimates_mocker;
479 } // namespace chttp2
480 } // namespace grpc_core