Imported Upstream version 1.33.1
[platform/upstream/grpc.git] / src / core / ext / transport / chttp2 / transport / flow_control.h
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
20 #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
21
22 #include <grpc/support/port_platform.h>
23
24 #include <stdint.h>
25
26 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
27 #include "src/core/lib/gpr/useful.h"
28 #include "src/core/lib/gprpp/manual_constructor.h"
29 #include "src/core/lib/transport/bdp_estimator.h"
30 #include "src/core/lib/transport/pid_controller.h"
31
32 struct grpc_chttp2_transport;
33 struct grpc_chttp2_stream;
34
35 extern grpc_core::TraceFlag grpc_flowctl_trace;
36
37 namespace grpc {
38 namespace testing {
39 class TrickledCHTTP2;  // to make this a friend
40 }  // namespace testing
41 }  // namespace grpc
42
43 namespace grpc_core {
44 namespace chttp2 {
45
46 static constexpr uint32_t kDefaultWindow = 65535;
47 static constexpr int64_t kMaxWindow = static_cast<int64_t>((1u << 31) - 1);
48 // TODO(ncteisen): Tune this
49 static constexpr uint32_t kFrameSize = 1024 * 1024;
50
51 class TransportFlowControl;
52 class StreamFlowControl;
53
54 // Encapsulates a collections of actions the transport needs to take with
55 // regard to flow control. Each action comes with urgencies that tell the
56 // transport how quickly the action must take place.
57 class FlowControlAction {
58  public:
59   enum class Urgency : uint8_t {
60     // Nothing to be done.
61     NO_ACTION_NEEDED = 0,
62     // Initiate a write to update the initial window immediately.
63     UPDATE_IMMEDIATELY,
64     // Push the flow control update into a send buffer, to be sent
65     // out the next time a write is initiated.
66     QUEUE_UPDATE,
67   };
68
69   Urgency send_stream_update() const { return send_stream_update_; }
70   Urgency send_transport_update() const { return send_transport_update_; }
71   Urgency send_initial_window_update() const {
72     return send_initial_window_update_;
73   }
74   Urgency send_max_frame_size_update() const {
75     return send_max_frame_size_update_;
76   }
77   uint32_t initial_window_size() const { return initial_window_size_; }
78   uint32_t max_frame_size() const { return max_frame_size_; }
79
80   FlowControlAction& set_send_stream_update(Urgency u) {
81     send_stream_update_ = u;
82     return *this;
83   }
84   FlowControlAction& set_send_transport_update(Urgency u) {
85     send_transport_update_ = u;
86     return *this;
87   }
88   FlowControlAction& set_send_initial_window_update(Urgency u,
89                                                     uint32_t update) {
90     send_initial_window_update_ = u;
91     initial_window_size_ = update;
92     return *this;
93   }
94   FlowControlAction& set_send_max_frame_size_update(Urgency u,
95                                                     uint32_t update) {
96     send_max_frame_size_update_ = u;
97     max_frame_size_ = update;
98     return *this;
99   }
100
101   static const char* UrgencyString(Urgency u);
102   void Trace(grpc_chttp2_transport* t) const;
103
104  private:
105   Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED;
106   Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED;
107   Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED;
108   Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED;
109   uint32_t initial_window_size_ = 0;
110   uint32_t max_frame_size_ = 0;
111 };
112
113 class FlowControlTrace {
114  public:
115   FlowControlTrace(const char* reason, TransportFlowControl* tfc,
116                    StreamFlowControl* sfc) {
117     if (enabled_) Init(reason, tfc, sfc);
118   }
119
120   ~FlowControlTrace() {
121     if (enabled_) Finish();
122   }
123
124  private:
125   void Init(const char* reason, TransportFlowControl* tfc,
126             StreamFlowControl* sfc);
127   void Finish();
128
129   const bool enabled_ = GRPC_TRACE_FLAG_ENABLED(grpc_flowctl_trace);
130
131   TransportFlowControl* tfc_;
132   StreamFlowControl* sfc_;
133   const char* reason_;
134   int64_t remote_window_;
135   int64_t target_window_;
136   int64_t announced_window_;
137   int64_t remote_window_delta_;
138   int64_t local_window_delta_;
139   int64_t announced_window_delta_;
140 };
141
142 // Fat interface with all methods a flow control implementation needs to
143 // support.
144 class TransportFlowControlBase {
145  public:
146   TransportFlowControlBase() {}
147   virtual ~TransportFlowControlBase() {}
148
149   // Is flow control enabled? This is needed in other codepaths like the checks
150   // in parsing and in writing.
151   virtual bool flow_control_enabled() const = 0;
152
153   // Called to check if the transport needs to send a WINDOW_UPDATE frame
154   virtual uint32_t MaybeSendUpdate(bool /* writing_anyway */) = 0;
155
156   // Using the protected members, returns and Action to be taken by the
157   // tranport.
158   virtual FlowControlAction MakeAction() = 0;
159
160   // Using the protected members, returns and Action to be taken by the
161   // tranport. Also checks for updates to our BDP estimate and acts
162   // accordingly.
163   virtual FlowControlAction PeriodicUpdate() = 0;
164
165   // Called to do bookkeeping when a stream owned by this transport sends
166   // data on the wire
167   virtual void StreamSentData(int64_t /* size */) = 0;
168
169   // Called to do bookkeeping when a stream owned by this transport receives
170   // data from the wire. Also does error checking for frame size.
171   virtual grpc_error* RecvData(int64_t /* incoming_frame_size */) = 0;
172
173   // Called to do bookkeeping when we receive a WINDOW_UPDATE frame.
174   virtual void RecvUpdate(uint32_t /* size */) = 0;
175
176   // Returns the BdpEstimator held by this object. Caller is responsible for
177   // checking for nullptr. TODO(ncteisen): consider fully encapsulating all
178   // bdp estimator actions inside TransportFlowControl
179   virtual BdpEstimator* bdp_estimator() { return nullptr; }
180
181   // Getters
182   int64_t remote_window() const { return remote_window_; }
183   virtual int64_t target_window() const { return target_initial_window_size_; }
184   int64_t announced_window() const { return announced_window_; }
185
186   // Used in certain benchmarks in which we don't want FlowControl to be a
187   // factor
188   virtual void TestOnlyForceHugeWindow() {}
189
190  protected:
191   friend class ::grpc::testing::TrickledCHTTP2;
192   int64_t remote_window_ = kDefaultWindow;
193   int64_t target_initial_window_size_ = kDefaultWindow;
194   int64_t announced_window_ = kDefaultWindow;
195 };
196
197 // Implementation of flow control that does NOTHING. Always returns maximum
198 // values, never initiates writes, and assumes that the remote peer is doing
199 // the same. To be used to narrow down on flow control as the cause of negative
200 // performance.
201 class TransportFlowControlDisabled final : public TransportFlowControlBase {
202  public:
203   // Maxes out all values
204   TransportFlowControlDisabled(grpc_chttp2_transport* t);
205
206   bool flow_control_enabled() const override { return false; }
207
208   // Never do anything.
209   uint32_t MaybeSendUpdate(bool /* writing_anyway */) override { return 0; }
210   FlowControlAction MakeAction() override { return FlowControlAction(); }
211   FlowControlAction PeriodicUpdate() override { return FlowControlAction(); }
212   void StreamSentData(int64_t /* size */) override {}
213   grpc_error* RecvData(int64_t /* incoming_frame_size */) override {
214     return GRPC_ERROR_NONE;
215   }
216   void RecvUpdate(uint32_t /* size */) override {}
217 };
218
219 // Implementation of flow control that abides to HTTP/2 spec and attempts
220 // to be as performant as possible.
221 class TransportFlowControl final : public TransportFlowControlBase {
222  public:
223   TransportFlowControl(const grpc_chttp2_transport* t, bool enable_bdp_probe);
224   ~TransportFlowControl() {}
225
226   bool flow_control_enabled() const override { return true; }
227
228   bool bdp_probe() const { return enable_bdp_probe_; }
229
230   // returns an announce if we should send a transport update to our peer,
231   // else returns zero; writing_anyway indicates if a write would happen
232   // regardless of the send - if it is false and this function returns non-zero,
233   // this announce will cause a write to occur
234   uint32_t MaybeSendUpdate(bool writing_anyway) override;
235
236   // Reads the flow control data and returns and actionable struct that will
237   // tell chttp2 exactly what it needs to do
238   FlowControlAction MakeAction() override {
239     return UpdateAction(FlowControlAction());
240   }
241
242   // Call periodically (at a low-ish rate, 100ms - 10s makes sense)
243   // to perform more complex flow control calculations and return an action
244   // to let chttp2 change its parameters
245   FlowControlAction PeriodicUpdate() override;
246
247   void StreamSentData(int64_t size) override { remote_window_ -= size; }
248
249   grpc_error* ValidateRecvData(int64_t incoming_frame_size);
250   void CommitRecvData(int64_t incoming_frame_size) {
251     announced_window_ -= incoming_frame_size;
252   }
253
254   grpc_error* RecvData(int64_t incoming_frame_size) override {
255     FlowControlTrace trace("  data recv", this, nullptr);
256     grpc_error* error = ValidateRecvData(incoming_frame_size);
257     if (error != GRPC_ERROR_NONE) return error;
258     CommitRecvData(incoming_frame_size);
259     return GRPC_ERROR_NONE;
260   }
261
262   // we have received a WINDOW_UPDATE frame for a transport
263   void RecvUpdate(uint32_t size) override {
264     FlowControlTrace trace("t updt recv", this, nullptr);
265     remote_window_ += size;
266   }
267
268   // See comment above announced_stream_total_over_incoming_window_ for the
269   // logic behind this decision.
270   int64_t target_window() const override {
271     return static_cast<uint32_t> GPR_MIN(
272         (int64_t)((1u << 31) - 1),
273         announced_stream_total_over_incoming_window_ +
274             target_initial_window_size_);
275   }
276
277   const grpc_chttp2_transport* transport() const { return t_; }
278
279   void PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) {
280     if (delta > 0) {
281       announced_stream_total_over_incoming_window_ -= delta;
282     }
283   }
284
285   void PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) {
286     if (delta > 0) {
287       announced_stream_total_over_incoming_window_ += delta;
288     }
289   }
290
291   BdpEstimator* bdp_estimator() override { return &bdp_estimator_; }
292
293   void TestOnlyForceHugeWindow() override {
294     announced_window_ = 1024 * 1024 * 1024;
295     remote_window_ = 1024 * 1024 * 1024;
296   }
297
298  private:
299   double TargetLogBdp();
300   double SmoothLogBdp(double value);
301   FlowControlAction::Urgency DeltaUrgency(int64_t value,
302                                           grpc_chttp2_setting_id setting_id);
303
304   FlowControlAction UpdateAction(FlowControlAction action) {
305     if (announced_window_ < target_window() / 2) {
306       action.set_send_transport_update(
307           FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
308     }
309     return action;
310   }
311
312   const grpc_chttp2_transport* const t_;
313
314   /** calculating what we should give for local window:
315       we track the total amount of flow control over initial window size
316       across all streams: this is data that we want to receive right now (it
317       has an outstanding read)
318       and the total amount of flow control under initial window size across all
319       streams: this is data we've read early
320       we want to adjust incoming_window such that:
321       incoming_window = total_over - max(bdp - total_under, 0) */
322   int64_t announced_stream_total_over_incoming_window_ = 0;
323
324   /** should we probe bdp? */
325   const bool enable_bdp_probe_;
326
327   /* bdp estimation */
328   grpc_core::BdpEstimator bdp_estimator_;
329
330   /* pid controller */
331   grpc_core::PidController pid_controller_;
332   grpc_millis last_pid_update_ = 0;
333 };
334
335 // Fat interface with all methods a stream flow control implementation needs
336 // to support.
337 class StreamFlowControlBase {
338  public:
339   StreamFlowControlBase() {}
340   virtual ~StreamFlowControlBase() {}
341
342   // Updates an action using the protected members.
343   virtual FlowControlAction UpdateAction(FlowControlAction /* action */) {
344     abort();
345   }
346
347   // Using the protected members, returns an Action for this stream to be
348   // taken by the tranport.
349   virtual FlowControlAction MakeAction() = 0;
350
351   // Bookkeeping for when data is sent on this stream.
352   virtual void SentData(int64_t /* outgoing_frame_size */) = 0;
353
354   // Bookkeeping and error checking for when data is received by this stream.
355   virtual grpc_error* RecvData(int64_t /* incoming_frame_size */) = 0;
356
357   // Called to check if this stream needs to send a WINDOW_UPDATE frame.
358   virtual uint32_t MaybeSendUpdate() = 0;
359
360   // Bookkeeping for receiving a WINDOW_UPDATE from for this stream.
361   virtual void RecvUpdate(uint32_t /* size */) = 0;
362
363   // Bookkeeping for when a call pulls bytes out of the transport. At this
364   // point we consider the data 'used' and can thus let out peer know we are
365   // ready for more data.
366   virtual void IncomingByteStreamUpdate(size_t /* max_size_hint */,
367                                         size_t /* have_already */) {
368     abort();
369   }
370
371   // Used in certain benchmarks in which we don't want FlowControl to be a
372   // factor
373   virtual void TestOnlyForceHugeWindow() {}
374
375   // Getters
376   int64_t remote_window_delta() { return remote_window_delta_; }
377   int64_t local_window_delta() { return local_window_delta_; }
378   int64_t announced_window_delta() { return announced_window_delta_; }
379
380  protected:
381   friend class ::grpc::testing::TrickledCHTTP2;
382   int64_t remote_window_delta_ = 0;
383   int64_t local_window_delta_ = 0;
384   int64_t announced_window_delta_ = 0;
385 };
386
387 // Implementation of flow control that does NOTHING. Always returns maximum
388 // values, never initiates writes, and assumes that the remote peer is doing
389 // the same. To be used to narrow down on flow control as the cause of negative
390 // performance.
391 class StreamFlowControlDisabled : public StreamFlowControlBase {
392  public:
393   FlowControlAction UpdateAction(FlowControlAction action) override {
394     return action;
395   }
396   FlowControlAction MakeAction() override { return FlowControlAction(); }
397   void SentData(int64_t /* outgoing_frame_size */) override {}
398   grpc_error* RecvData(int64_t /* incoming_frame_size */) override {
399     return GRPC_ERROR_NONE;
400   }
401   uint32_t MaybeSendUpdate() override { return 0; }
402   void RecvUpdate(uint32_t /* size */) override {}
403   void IncomingByteStreamUpdate(size_t /* max_size_hint */,
404                                 size_t /* have_already */) override {}
405 };
406
407 // Implementation of flow control that abides to HTTP/2 spec and attempts
408 // to be as performant as possible.
409 class StreamFlowControl final : public StreamFlowControlBase {
410  public:
411   StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s);
412   ~StreamFlowControl() {
413     tfc_->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
414   }
415
416   FlowControlAction UpdateAction(FlowControlAction action) override;
417   FlowControlAction MakeAction() override {
418     return UpdateAction(tfc_->MakeAction());
419   }
420
421   // we have sent data on the wire, we must track this in our bookkeeping for
422   // the remote peer's flow control.
423   void SentData(int64_t outgoing_frame_size) override {
424     FlowControlTrace tracer("  data sent", tfc_, this);
425     tfc_->StreamSentData(outgoing_frame_size);
426     remote_window_delta_ -= outgoing_frame_size;
427   }
428
429   // we have received data from the wire
430   grpc_error* RecvData(int64_t incoming_frame_size) override;
431
432   // returns an announce if we should send a stream update to our peer, else
433   // returns zero
434   uint32_t MaybeSendUpdate() override;
435
436   // we have received a WINDOW_UPDATE frame for a stream
437   void RecvUpdate(uint32_t size) override {
438     FlowControlTrace trace("s updt recv", tfc_, this);
439     remote_window_delta_ += size;
440   }
441
442   // the application is asking for a certain amount of bytes
443   void IncomingByteStreamUpdate(size_t max_size_hint,
444                                 size_t have_already) override;
445
446   int64_t remote_window_delta() const { return remote_window_delta_; }
447   int64_t local_window_delta() const { return local_window_delta_; }
448   int64_t announced_window_delta() const { return announced_window_delta_; }
449
450   const grpc_chttp2_stream* stream() const { return s_; }
451
452   void TestOnlyForceHugeWindow() override {
453     announced_window_delta_ = 1024 * 1024 * 1024;
454     local_window_delta_ = 1024 * 1024 * 1024;
455     remote_window_delta_ = 1024 * 1024 * 1024;
456   }
457
458  private:
459   TransportFlowControl* const tfc_;
460   const grpc_chttp2_stream* const s_;
461
462   void UpdateAnnouncedWindowDelta(TransportFlowControl* tfc, int64_t change) {
463     tfc->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
464     announced_window_delta_ += change;
465     tfc->PostUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
466   }
467 };
468
469 class TestOnlyTransportTargetWindowEstimatesMocker {
470  public:
471   virtual ~TestOnlyTransportTargetWindowEstimatesMocker() {}
472   virtual double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
473       double current_target) = 0;
474 };
475
476 extern TestOnlyTransportTargetWindowEstimatesMocker*
477     g_test_only_transport_target_window_estimates_mocker;
478
479 }  // namespace chttp2
480 }  // namespace grpc_core
481
482 #endif