3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
20 #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
22 #include <grpc/support/port_platform.h>
26 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
27 #include "src/core/lib/gpr/useful.h"
28 #include "src/core/lib/gprpp/abstract.h"
29 #include "src/core/lib/gprpp/manual_constructor.h"
30 #include "src/core/lib/transport/bdp_estimator.h"
31 #include "src/core/lib/transport/pid_controller.h"
33 struct grpc_chttp2_transport;
34 struct grpc_chttp2_stream;
36 extern grpc_core::TraceFlag grpc_flowctl_trace;
40 class TrickledCHTTP2; // to make this a friend
41 } // namespace testing
// Window and frame-size constants shared by the flow control classes below.
// kDefaultWindow is the starting value of the remote, target-initial and
// announced windows held by TransportFlowControlBase.
// NOTE(review): presumably the HTTP/2 default initial window size — confirm.
47 static constexpr uint32_t kDefaultWindow = 65535;
// Largest window value: 2^31 - 1.
48 static constexpr int64_t kMaxWindow = static_cast<int64_t>((1u << 31) - 1);
49 // TODO(ncteisen): Tune this
// 1024 * 1024 bytes (1 MiB).
50 static constexpr uint32_t kFrameSize = 1024 * 1024;
// Forward declarations; both classes are defined later in this file and are
// referenced by FlowControlTrace before their definitions.
52 class TransportFlowControl;
53 class StreamFlowControl;
55 // Encapsulates a collection of actions the transport needs to take with
56 // regard to flow control. Each action comes with an urgency that tells the
57 // transport how quickly the action must take place.
58 class FlowControlAction {
60 enum class Urgency : uint8_t {
61 // Nothing to be done.
63 // Initiate a write to update the initial window immediately.
65 // Push the flow control update into a send buffer, to be sent
66 // out the next time a write is initiated.
// Read accessors: the urgency currently recorded for each kind of update.
70 Urgency send_stream_update() const { return send_stream_update_; }
71 Urgency send_transport_update() const { return send_transport_update_; }
72 Urgency send_initial_window_update() const {
73 return send_initial_window_update_;
75 Urgency send_max_frame_size_update() const {
76 return send_max_frame_size_update_;
// Values stored together with the initial-window and max-frame-size
// urgencies by the corresponding setters.
78 uint32_t initial_window_size() const { return initial_window_size_; }
79 uint32_t max_frame_size() const { return max_frame_size_; }
// Setters. Each returns FlowControlAction& (presumably *this, so that calls
// can be chained — confirm).
81 FlowControlAction& set_send_stream_update(Urgency u) {
82 send_stream_update_ = u;
85 FlowControlAction& set_send_transport_update(Urgency u) {
86 send_transport_update_ = u;
// Records the urgency and the accompanying initial window size value.
89 FlowControlAction& set_send_initial_window_update(Urgency u,
91 send_initial_window_update_ = u;
92 initial_window_size_ = update;
// Records the urgency and the accompanying max frame size value.
95 FlowControlAction& set_send_max_frame_size_update(Urgency u,
97 send_max_frame_size_update_ = u;
98 max_frame_size_ = update;
// Declared here, defined out of line. UrgencyString presumably maps an
// Urgency to a printable name for use by Trace — confirm in the definition.
102 static const char* UrgencyString(Urgency u);
103 void Trace(grpc_chttp2_transport* t) const;
// A default-constructed action has every urgency set to NO_ACTION_NEEDED and
// both size values set to 0.
106 Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED;
107 Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED;
108 Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED;
109 Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED;
110 uint32_t initial_window_size_ = 0;
111 uint32_t max_frame_size_ = 0;
// Scoped tracing helper for flow control operations. If grpc_flowctl_trace is
// enabled when the object is constructed, the constructor calls Init() and the
// destructor calls Finish(); otherwise neither is called.
114 class FlowControlTrace {
// reason: short label for the traced operation (callers in this file pass
// string literals such as "t updt recv").
// tfc / sfc: the transport and stream flow control objects being traced;
// transport-level callers in this file pass nullptr for sfc.
116 FlowControlTrace(const char* reason, TransportFlowControl* tfc,
117 StreamFlowControl* sfc) {
118 if (enabled_) Init(reason, tfc, sfc);
121 ~FlowControlTrace() {
122 if (enabled_) Finish();
// Defined out of line; only invoked when tracing is enabled.
126 void Init(const char* reason, TransportFlowControl* tfc,
127 StreamFlowControl* sfc);
// Sampled once per object, so Init() and Finish() are always paired even if
// the trace flag changes during the object's lifetime.
130 const bool enabled_ = grpc_flowctl_trace.enabled();
// The members below have no default initializers and the constructor sets
// them only through Init(); they are left uninitialized when tracing is off.
132 TransportFlowControl* tfc_;
133 StreamFlowControl* sfc_;
// NOTE(review): presumably snapshots of the transport windows and stream
// window deltas captured by Init() for Finish() to report — confirm in the
// out-of-line definitions.
135 int64_t remote_window_;
136 int64_t target_window_;
137 int64_t announced_window_;
138 int64_t remote_window_delta_;
139 int64_t local_window_delta_;
140 int64_t announced_window_delta_;
143 // Fat interface with all methods a flow control implementation needs to
144 // support. gRPC C Core does not support pure virtual functions, so instead
145 // we abort in any methods which require implementation in the base class.
146 class TransportFlowControlBase {
148 TransportFlowControlBase() {}
149 virtual ~TransportFlowControlBase() {}
151 // Is flow control enabled? This is needed in other codepaths like the checks
152 // in parsing and in writing.
153 virtual bool flow_control_enabled() const { abort(); }
155 // Called to check if the transport needs to send a WINDOW_UPDATE frame.
156 virtual uint32_t MaybeSendUpdate(bool writing_anyway) { abort(); }
158 // Using the protected members, returns an Action to be taken by the
// transport.
160 virtual FlowControlAction MakeAction() { abort(); }
162 // Using the protected members, returns an Action to be taken by the
163 // transport. Also checks for updates to our BDP estimate and acts
// accordingly.
165 virtual FlowControlAction PeriodicUpdate() { abort(); }
167 // Called to do bookkeeping when a stream owned by this transport sends
// data.
169 virtual void StreamSentData(int64_t size) { abort(); }
171 // Called to do bookkeeping when a stream owned by this transport receives
172 // data from the wire. Also does error checking for frame size.
173 virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); }
175 // Called to do bookkeeping when we receive a WINDOW_UPDATE frame.
176 virtual void RecvUpdate(uint32_t size) { abort(); }
178 // Returns the BdpEstimator held by this object. Caller is responsible for
179 // checking for nullptr. TODO(ncteisen): consider fully encapsulating all
180 // bdp estimator actions inside TransportFlowControl
// The base implementation holds no estimator and returns nullptr.
181 virtual BdpEstimator* bdp_estimator() { return nullptr; }
// Window accessors. target_window() is virtual; the base version returns
// target_initial_window_size_ unchanged.
184 int64_t remote_window() const { return remote_window_; }
185 virtual int64_t target_window() const { return target_initial_window_size_; }
186 int64_t announced_window() const { return announced_window_; }
188 // Used in certain benchmarks in which we don't want FlowControl to be a
// concern. The base implementation does nothing.
190 virtual void TestOnlyForceHugeWindow() {}
192 GRPC_ABSTRACT_BASE_CLASS
195 friend class ::grpc::testing::TrickledCHTTP2;
// All three windows start at kDefaultWindow.
196 int64_t remote_window_ = kDefaultWindow;
197 int64_t target_initial_window_size_ = kDefaultWindow;
198 int64_t announced_window_ = kDefaultWindow;
201 // Implementation of flow control that does NOTHING. Always returns maximum
202 // values, never initiates writes, and assumes that the remote peer is doing
203 // the same. To be used to narrow down on flow control as the cause of negative
// behavior.
205 class TransportFlowControlDisabled final : public TransportFlowControlBase {
207 // Maxes out all values
208 TransportFlowControlDisabled(grpc_chttp2_transport* t);
210 bool flow_control_enabled() const override { return false; }
212 // Never do anything: no window update is ever requested (0), the returned
// actions are default-constructed, sent and received data is not tracked,
// and received data is always accepted (GRPC_ERROR_NONE).
213 uint32_t MaybeSendUpdate(bool writing_anyway) override { return 0; }
214 FlowControlAction MakeAction() override { return FlowControlAction(); }
215 FlowControlAction PeriodicUpdate() override { return FlowControlAction(); }
216 void StreamSentData(int64_t size) override {}
217 grpc_error* RecvData(int64_t incoming_frame_size) override {
218 return GRPC_ERROR_NONE;
220 void RecvUpdate(uint32_t size) override {}
223 // Implementation of flow control that abides to HTTP/2 spec and attempts
224 // to be as performant as possible.
225 class TransportFlowControl final : public TransportFlowControlBase {
// Constructor defined out of line; enable_bdp_probe is reported by
// bdp_probe().
227 TransportFlowControl(const grpc_chttp2_transport* t, bool enable_bdp_probe);
228 ~TransportFlowControl() {}
230 bool flow_control_enabled() const override { return true; }
232 bool bdp_probe() const { return enable_bdp_probe_; }
234 // returns an announce if we should send a transport update to our peer,
235 // else returns zero; writing_anyway indicates if a write would happen
236 // regardless of the send - if it is false and this function returns non-zero,
237 // this announce will cause a write to occur
238 uint32_t MaybeSendUpdate(bool writing_anyway) override;
240 // Reads the flow control data and returns an actionable struct that will
241 // tell chttp2 exactly what it needs to do
242 FlowControlAction MakeAction() override {
243 return UpdateAction(FlowControlAction());
246 // Call periodically (at a low-ish rate, 100ms - 10s makes sense)
247 // to perform more complex flow control calculations and return an action
248 // to let chttp2 change its parameters
249 FlowControlAction PeriodicUpdate() override;
// A stream sent `size` bytes: shrink the remote window by that amount.
251 void StreamSentData(int64_t size) override { remote_window_ -= size; }
// RecvData() is split into a validation step (defined out of line) and a
// commit step that subtracts the frame size from the announced window.
253 grpc_error* ValidateRecvData(int64_t incoming_frame_size);
254 void CommitRecvData(int64_t incoming_frame_size) {
255 announced_window_ -= incoming_frame_size;
// Validates the incoming frame first; on error returns it without touching
// the announced window, otherwise commits and returns GRPC_ERROR_NONE.
258 grpc_error* RecvData(int64_t incoming_frame_size) override {
259 FlowControlTrace trace(" data recv", this, nullptr);
260 grpc_error* error = ValidateRecvData(incoming_frame_size);
261 if (error != GRPC_ERROR_NONE) return error;
262 CommitRecvData(incoming_frame_size);
263 return GRPC_ERROR_NONE;
266 // we have received a WINDOW_UPDATE frame for a transport
267 void RecvUpdate(uint32_t size) override {
268 FlowControlTrace trace("t updt recv", this, nullptr);
269 remote_window_ += size;
272 // See comment above announced_stream_total_over_incoming_window_ for the
273 // logic behind this decision.
// Combines 2^31 - 1 with (total over incoming window + target initial window
// size) via GPR_MIN (presumably a minimum macro — confirm), then casts the
// result to uint32_t.
// NOTE(review): the literal (1u << 31) - 1 has the same value as kMaxWindow;
// consider using the named constant and a static_cast instead of the C-style
// cast.
274 int64_t target_window() const override {
275 return static_cast<uint32_t> GPR_MIN(
276 (int64_t)((1u << 31) - 1),
277 announced_stream_total_over_incoming_window_ +
278 target_initial_window_size_);
281 const grpc_chttp2_transport* transport() const { return t_; }
// Pre/Post pair used by StreamFlowControl around a change to a stream's
// announced window delta: Pre backs the old delta out of the over/under
// totals, Post applies the new delta.
// NOTE(review): which of the two totals is adjusted presumably depends on the
// sign of delta — confirm.
283 void PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) {
285 announced_stream_total_over_incoming_window_ -= delta;
287 announced_stream_total_under_incoming_window_ += -delta;
291 void PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) {
293 announced_stream_total_over_incoming_window_ += delta;
295 announced_stream_total_under_incoming_window_ -= -delta;
// Unlike the base class, always returns a non-null estimator (the member).
299 BdpEstimator* bdp_estimator() override { return &bdp_estimator_; }
// Sets both the announced and the remote window to 1024^3 (1 GiB).
301 void TestOnlyForceHugeWindow() override {
302 announced_window_ = 1024 * 1024 * 1024;
303 remote_window_ = 1024 * 1024 * 1024;
// Helpers defined out of line.
307 double TargetLogBdp();
308 double SmoothLogBdp(double value);
309 FlowControlAction::Urgency DeltaUrgency(int64_t value,
310 grpc_chttp2_setting_id setting_id);
// Marks the transport update as UPDATE_IMMEDIATELY once the announced window
// has dropped below half of target_window().
312 FlowControlAction UpdateAction(FlowControlAction action) {
313 if (announced_window_ < target_window() / 2) {
314 action.set_send_transport_update(
315 FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
320 const grpc_chttp2_transport* const t_;
322 /** calculating what we should give for local window:
323 we track the total amount of flow control over initial window size
324 across all streams: this is data that we want to receive right now (it
325 has an outstanding read)
326 and the total amount of flow control under initial window size across all
327 streams: this is data we've read early
328 we want to adjust incoming_window such that:
329 incoming_window = total_over - max(bdp - total_under, 0) */
330 int64_t announced_stream_total_over_incoming_window_ = 0;
331 int64_t announced_stream_total_under_incoming_window_ = 0;
333 /** should we probe bdp? */
334 const bool enable_bdp_probe_;
337 grpc_core::BdpEstimator bdp_estimator_;
340 grpc_core::PidController pid_controller_;
341 grpc_millis last_pid_update_ = 0;
344 // Fat interface with all methods a stream flow control implementation needs
345 // to support. gRPC C Core does not support pure virtual functions, so instead
346 // we abort in any methods which require implementation in the base class.
347 class StreamFlowControlBase {
349 StreamFlowControlBase() {}
350 virtual ~StreamFlowControlBase() {}
352 // Updates an action using the protected members.
353 virtual FlowControlAction UpdateAction(FlowControlAction action) { abort(); }
355 // Using the protected members, returns an Action for this stream to be
356 // taken by the transport.
357 virtual FlowControlAction MakeAction() { abort(); }
359 // Bookkeeping for when data is sent on this stream.
360 virtual void SentData(int64_t outgoing_frame_size) { abort(); }
362 // Bookkeeping and error checking for when data is received by this stream.
363 virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); }
365 // Called to check if this stream needs to send a WINDOW_UPDATE frame.
366 virtual uint32_t MaybeSendUpdate() { abort(); }
368 // Bookkeeping for receiving a WINDOW_UPDATE frame for this stream.
369 virtual void RecvUpdate(uint32_t size) { abort(); }
371 // Bookkeeping for when a call pulls bytes out of the transport. At this
372 // point we consider the data 'used' and can thus let our peer know we are
373 // ready for more data.
374 virtual void IncomingByteStreamUpdate(size_t max_size_hint,
375 size_t have_already) {
379 // Used in certain benchmarks in which we don't want FlowControl to be a
// concern. The base implementation does nothing.
381 virtual void TestOnlyForceHugeWindow() {}
// Accessors for the three per-stream window deltas.
// NOTE(review): these are not const-qualified, while StreamFlowControl
// declares const accessors with the same names; consider making these const.
384 int64_t remote_window_delta() { return remote_window_delta_; }
385 int64_t local_window_delta() { return local_window_delta_; }
386 int64_t announced_window_delta() { return announced_window_delta_; }
388 GRPC_ABSTRACT_BASE_CLASS
391 friend class ::grpc::testing::TrickledCHTTP2;
// All deltas start at 0.
392 int64_t remote_window_delta_ = 0;
393 int64_t local_window_delta_ = 0;
394 int64_t announced_window_delta_ = 0;
397 // Implementation of flow control that does NOTHING. Always returns maximum
398 // values, never initiates writes, and assumes that the remote peer is doing
399 // the same. To be used to narrow down on flow control as the cause of negative
// behavior.
// NOTE(review): unlike TransportFlowControlDisabled and StreamFlowControl,
// this class is not marked final — confirm whether that is intentional.
401 class StreamFlowControlDisabled : public StreamFlowControlBase {
403 FlowControlAction UpdateAction(FlowControlAction action) override {
// MakeAction returns a default-constructed action, RecvData always reports
// GRPC_ERROR_NONE, MaybeSendUpdate never requests an update (0), and the
// remaining bookkeeping overrides are empty.
406 FlowControlAction MakeAction() override { return FlowControlAction(); }
407 void SentData(int64_t outgoing_frame_size) override {}
408 grpc_error* RecvData(int64_t incoming_frame_size) override {
409 return GRPC_ERROR_NONE;
411 uint32_t MaybeSendUpdate() override { return 0; }
412 void RecvUpdate(uint32_t size) override {}
413 void IncomingByteStreamUpdate(size_t max_size_hint,
414 size_t have_already) override {}
417 // Implementation of flow control that abides to HTTP/2 spec and attempts
418 // to be as performant as possible.
419 class StreamFlowControl final : public StreamFlowControlBase {
// Constructor defined out of line. tfc is the owning transport's flow
// control object; it is dereferenced by the destructor and by several
// methods, so it must be non-null and outlive this object.
421 StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s);
// On destruction, back this stream's announced window delta out of the
// transport-level over/under totals.
422 ~StreamFlowControl() {
423 tfc_->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
426 FlowControlAction UpdateAction(FlowControlAction action) override;
// Starts from the transport's action and passes it through this stream's
// UpdateAction().
427 FlowControlAction MakeAction() override {
428 return UpdateAction(tfc_->MakeAction());
431 // we have sent data on the wire, we must track this in our bookkeeping for
432 // the remote peer's flow control.
// Decrements both the transport's remote window (via StreamSentData) and
// this stream's remote window delta by the frame size.
433 void SentData(int64_t outgoing_frame_size) override {
434 FlowControlTrace tracer(" data sent", tfc_, this);
435 tfc_->StreamSentData(outgoing_frame_size);
436 remote_window_delta_ -= outgoing_frame_size;
439 // we have received data from the wire
440 grpc_error* RecvData(int64_t incoming_frame_size) override;
442 // returns an announce if we should send a stream update to our peer, else
// returns zero
444 uint32_t MaybeSendUpdate() override;
446 // we have received a WINDOW_UPDATE frame for a stream
447 void RecvUpdate(uint32_t size) override {
448 FlowControlTrace trace("s updt recv", tfc_, this);
449 remote_window_delta_ += size;
452 // the application is asking for a certain amount of bytes
453 void IncomingByteStreamUpdate(size_t max_size_hint,
454 size_t have_already) override;
// Const-qualified accessors; they share their names with the non-const
// accessors declared in StreamFlowControlBase.
456 int64_t remote_window_delta() const { return remote_window_delta_; }
457 int64_t local_window_delta() const { return local_window_delta_; }
458 int64_t announced_window_delta() const { return announced_window_delta_; }
460 const grpc_chttp2_stream* stream() const { return s_; }
// Sets all three deltas to 1024^3 (1 GiB).
// NOTE(review): announced_window_delta_ is assigned directly here, without
// the Pre/Post calls that UpdateAnnouncedWindowDelta() makes, so the
// transport-level totals are not adjusted — confirm this is acceptable for
// test-only use.
462 void TestOnlyForceHugeWindow() override {
463 announced_window_delta_ = 1024 * 1024 * 1024;
464 local_window_delta_ = 1024 * 1024 * 1024;
465 remote_window_delta_ = 1024 * 1024 * 1024;
469 TransportFlowControl* const tfc_;
470 const grpc_chttp2_stream* const s_;
// Applies `change` to announced_window_delta_ while keeping the transport's
// over/under totals in sync: the old delta is removed first, then the new
// delta is added.
// NOTE(review): takes tfc as a parameter although tfc_ is a member; callers
// presumably pass tfc_ — confirm.
472 void UpdateAnnouncedWindowDelta(TransportFlowControl* tfc, int64_t change) {
473 tfc->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
474 announced_window_delta_ += change;
475 tfc->PostUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
479 } // namespace chttp2
480 } // namespace grpc_core