/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/resource_quota.h"

#include <inttypes.h>
#include <limits.h>
#include <stdint.h>
#include <string.h>

#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>

#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/combiner.h"

grpc_core::TraceFlag grpc_resource_quota_trace(false, "resource_quota");

#define MEMORY_USAGE_ESTIMATION_MAX 65536

/* Internal linked list pointers for a resource user */
typedef struct {
  grpc_resource_user* next;
  grpc_resource_user* prev;
} grpc_resource_user_link;

/* Resource users are kept in (potentially) several intrusive linked lists
   at once. These are the list names. */
typedef enum {
  /* Resource users that are waiting for an allocation */
  GRPC_RULIST_AWAITING_ALLOCATION,
  /* Resource users that have free memory available for internal reclamation */
  GRPC_RULIST_NON_EMPTY_FREE_POOL,
  /* Resource users that have published that a benign reclamation is
     available */
  GRPC_RULIST_RECLAIMER_BENIGN,
  /* Resource users that have published that a destructive reclamation is
     available */
  GRPC_RULIST_RECLAIMER_DESTRUCTIVE,
  /* Number of lists: must be last */
  GRPC_RULIST_COUNT
} grpc_rulist;
61
62 struct grpc_resource_user {
63   /* The quota this resource user consumes from */
64   grpc_resource_quota* resource_quota;
65
66   /* Closure to schedule an allocation under the resource quota combiner lock */
67   grpc_closure allocate_closure;
68   /* Closure to publish a non empty free pool under the resource quota combiner
69      lock */
70   grpc_closure add_to_free_pool_closure;
71
72   /* one ref for each ref call (released by grpc_resource_user_unref), and one
73      ref for each byte allocated (released by grpc_resource_user_free) */
74   gpr_atm refs;
75   /* is this resource user unlocked? starts at 0, increases for each shutdown
76      call */
77   gpr_atm shutdown;
78
79   gpr_mu mu;
80   /* The amount of memory (in bytes) this user has cached for its own use: to
81      avoid quota contention, each resource user can keep some memory in
82      addition to what it is immediately using (e.g., for caching), and the quota
83      can pull it back under memory pressure.
84      This value can become negative if more memory has been requested than
85      existed in the free pool, at which point the quota is consulted to bring
86      this value non-negative (asynchronously). */
87   int64_t free_pool;
88   /* A list of closures to call once free_pool becomes non-negative - ie when
89      all outstanding allocations have been granted. */
90   grpc_closure_list on_allocated;
91   /* True if we are currently trying to allocate from the quota, false if not */
92   bool allocating;
93   /* The amount of memory (in bytes) that has been requested from this user
94    * asynchronously but hasn't been granted yet. */
95   int64_t outstanding_allocations;
96   /* True if we are currently trying to add ourselves to the non-free quota
97      list, false otherwise */
98   bool added_to_free_pool;
99
100   /* The number of threads currently allocated to this resource user */
101   gpr_atm num_threads_allocated;
102
103   /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer
104    */
105   grpc_closure* reclaimers[2];
106   /* Reclaimers just posted: once we're in the combiner lock, we'll move them
107      to the array above */
108   grpc_closure* new_reclaimers[2];
109   /* Trampoline closures to finish reclamation and re-enter the quota combiner
110      lock */
111   grpc_closure post_reclaimer_closure[2];
112
113   /* Closure to execute under the quota combiner to de-register and shutdown the
114      resource user */
115   grpc_closure destroy_closure;
116
117   /* Links in the various grpc_rulist lists */
118   grpc_resource_user_link links[GRPC_RULIST_COUNT];
119
120   /* The name of this resource user, for debugging/tracing */
121   char* name;
122 };
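
/* Editor's note: a worked example (hypothetical numbers) of the refs /
   free_pool accounting above. A user created with one ref that allocates 100
   bytes holds refs == 101 and free_pool == -100 until the quota grants the
   request (free_pool -> 0); freeing those 100 bytes later raises free_pool to
   100 (cached for reuse) and drops refs back to 1. */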

struct grpc_resource_quota {
  /* refcount */
  gpr_refcount refs;

  /* estimate of current memory usage
     scaled to the range [0..MEMORY_USAGE_ESTIMATION_MAX] */
  gpr_atm memory_usage_estimation;

  /* Master combiner lock: all activity on a quota executes under this combiner
   * (so no mutex is needed for this data structure) */
  grpc_combiner* combiner;
  /* Size of the resource quota */
  int64_t size;
  /* Amount of free memory in the resource quota */
  int64_t free_pool;
  /* Used size of memory in the resource quota. Updated as soon as the resource
   * users start to allocate or free the memory. */
  gpr_atm used;

  /* The most recently set size, readable without the combiner lock (see
     grpc_resource_quota_peek_size) */
  gpr_atm last_size;

  /* Mutex to protect max_threads and num_threads_allocated */
  /* Note: We could have used gpr_atm for max_threads and num_threads_allocated
   * and avoided this mutex; but in that case, each invocation of
   * grpc_resource_user_allocate_threads() would have had to do at least two
   * atomic loads (for max_threads and num_threads_allocated) followed by a CAS
   * (on num_threads_allocated).
   * Moreover, we expect grpc_resource_user_allocate_threads() to often be
   * called concurrently, thereby increasing the chances of failing the CAS
   * operation. This additional complexity is not worth the tiny perf gain we
   * may (or may not) get by using atomics. */
  gpr_mu thread_count_mu;

  /* Max number of threads allowed */
  int max_threads;

  /* Number of threads currently allocated via this resource_quota object */
  int num_threads_allocated;

  /* Has rq_step been scheduled to occur? */
  bool step_scheduled;

  /* Are we currently reclaiming memory? */
  bool reclaiming;

  /* Closure around rq_step */
  grpc_closure rq_step_closure;

  /* Closure around rq_reclamation_done */
  grpc_closure rq_reclamation_done_closure;

  /* This is only really usable for debugging: it's always a stale pointer, but
     a stale pointer that might just be fresh enough to guide us to where the
     reclamation system is stuck */
  grpc_closure* debug_only_last_initiated_reclaimer;
  grpc_resource_user* debug_only_last_reclaimer_resource_user;

  /* Roots of all resource user lists */
  grpc_resource_user* roots[GRPC_RULIST_COUNT];

  char* name;
};
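
/* Editor's sketch of the atomics-based alternative rejected in the note
   above; hypothetical, assuming max_threads and num_threads_allocated were
   stored as gpr_atm fields instead:

     for (;;) {
       gpr_atm n = gpr_atm_acq_load(&rq->num_threads_allocated_atm);
       gpr_atm max = gpr_atm_acq_load(&rq->max_threads_atm);
       if (n + thread_count > max) return false;
       if (gpr_atm_full_cas(&rq->num_threads_allocated_atm, n,
                            n + thread_count)) {
         return true;
       }
       // CAS lost a race with a concurrent caller; retry
     }

   Under frequent concurrent calls the retry loop burns cycles, which is the
   contention argument for keeping thread_count_mu. */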

static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount);

/*******************************************************************************
 * list management
 */

static void rulist_add_head(grpc_resource_user* resource_user,
                            grpc_rulist list) {
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  grpc_resource_user** root = &resource_quota->roots[list];
  if (*root == nullptr) {
    *root = resource_user;
    resource_user->links[list].next = resource_user->links[list].prev =
        resource_user;
  } else {
    /* insert before the root, then take over as the new head */
    resource_user->links[list].next = *root;
    resource_user->links[list].prev = (*root)->links[list].prev;
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev->links[list].next = resource_user;
    *root = resource_user;
  }
}

static void rulist_add_tail(grpc_resource_user* resource_user,
                            grpc_rulist list) {
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  grpc_resource_user** root = &resource_quota->roots[list];
  if (*root == nullptr) {
    *root = resource_user;
    resource_user->links[list].next = resource_user->links[list].prev =
        resource_user;
  } else {
    /* insert before the root, i.e. at the tail of the circular list */
    resource_user->links[list].next = *root;
    resource_user->links[list].prev = (*root)->links[list].prev;
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev->links[list].next = resource_user;
  }
}

static bool rulist_empty(grpc_resource_quota* resource_quota,
                         grpc_rulist list) {
  return resource_quota->roots[list] == nullptr;
}

static grpc_resource_user* rulist_pop_head(grpc_resource_quota* resource_quota,
                                           grpc_rulist list) {
  grpc_resource_user** root = &resource_quota->roots[list];
  grpc_resource_user* resource_user = *root;
  if (resource_user == nullptr) {
    return nullptr;
  }
  if (resource_user->links[list].next == resource_user) {
    *root = nullptr;
  } else {
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev;
    resource_user->links[list].prev->links[list].next =
        resource_user->links[list].next;
    *root = resource_user->links[list].next;
  }
  resource_user->links[list].next = resource_user->links[list].prev = nullptr;
  return resource_user;
}

static void rulist_remove(grpc_resource_user* resource_user, grpc_rulist list) {
  if (resource_user->links[list].next == nullptr) return;
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  if (resource_quota->roots[list] == resource_user) {
    resource_quota->roots[list] = resource_user->links[list].next;
    if (resource_quota->roots[list] == resource_user) {
      resource_quota->roots[list] = nullptr;
    }
  }
  resource_user->links[list].next->links[list].prev =
      resource_user->links[list].prev;
  resource_user->links[list].prev->links[list].next =
      resource_user->links[list].next;
  resource_user->links[list].next = resource_user->links[list].prev = nullptr;
}
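
/* Editor's note: a small worked example (hypothetical users A, B, C) of the
   circular lists above. Starting empty, add_tail(A), add_tail(B), add_tail(C)
   builds the ring A <-> B <-> C with root A, so pop_head yields A, then B,
   then C (FIFO); add_head performs the same insertion but re-points the root
   at the new element. A user's links are nulled on removal, which is how
   rulist_remove detects non-membership. */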

/*******************************************************************************
 * resource quota state machine
 */

static bool rq_alloc(grpc_resource_quota* resource_quota);
static bool rq_reclaim_from_per_user_free_pool(
    grpc_resource_quota* resource_quota);
static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive);

static void rq_step(void* rq, grpc_error* error) {
  grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
  resource_quota->step_scheduled = false;
  do {
    if (rq_alloc(resource_quota)) goto done;
  } while (rq_reclaim_from_per_user_free_pool(resource_quota));

  if (!rq_reclaim(resource_quota, false)) {
    rq_reclaim(resource_quota, true);
  }

done:
  grpc_resource_quota_unref_internal(resource_quota);
}

static void rq_step_sched(grpc_resource_quota* resource_quota) {
  if (resource_quota->step_scheduled) return;
  resource_quota->step_scheduled = true;
  grpc_resource_quota_ref_internal(resource_quota);
  GRPC_CLOSURE_SCHED(&resource_quota->rq_step_closure, GRPC_ERROR_NONE);
}

/* update the atomically available resource estimate - use no barriers since
   timeliness of delivery really doesn't matter much */
static void rq_update_estimate(grpc_resource_quota* resource_quota) {
  gpr_atm memory_usage_estimation = MEMORY_USAGE_ESTIMATION_MAX;
  if (resource_quota->size != 0) {
    memory_usage_estimation =
        GPR_CLAMP((gpr_atm)((1.0 - ((double)resource_quota->free_pool) /
                                       ((double)resource_quota->size)) *
                            MEMORY_USAGE_ESTIMATION_MAX),
                  0, MEMORY_USAGE_ESTIMATION_MAX);
  }
  gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation,
                           memory_usage_estimation);
}
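
/* Editor's example (hypothetical numbers): with size = 100 and free_pool = 25,
   the estimate is (1.0 - 25/100) * 65536 = 49152, i.e. 75% of
   MEMORY_USAGE_ESTIMATION_MAX; size == 0 pins the estimate at the maximum. */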

/* returns true if all allocations are completed */
static bool rq_alloc(grpc_resource_quota* resource_quota) {
  grpc_resource_user* resource_user;
  while ((resource_user = rulist_pop_head(resource_quota,
                                          GRPC_RULIST_AWAITING_ALLOCATION))) {
    gpr_mu_lock(&resource_user->mu);
    if (grpc_resource_quota_trace.enabled()) {
      gpr_log(GPR_INFO,
              "RQ: check allocation for user %p shutdown=%" PRIdPTR
              " free_pool=%" PRId64,
              resource_user, gpr_atm_no_barrier_load(&resource_user->shutdown),
              resource_user->free_pool);
    }
    if (gpr_atm_no_barrier_load(&resource_user->shutdown)) {
      resource_user->allocating = false;
      grpc_closure_list_fail_all(
          &resource_user->on_allocated,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
      int64_t aborted_allocations = resource_user->outstanding_allocations;
      resource_user->outstanding_allocations = 0;
      resource_user->free_pool += aborted_allocations;
      GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated);
      gpr_mu_unlock(&resource_user->mu);
      ru_unref_by(resource_user, static_cast<gpr_atm>(aborted_allocations));
      continue;
    }
    if (resource_user->free_pool < 0 &&
        -resource_user->free_pool <= resource_quota->free_pool) {
      int64_t amt = -resource_user->free_pool;
      resource_user->free_pool = 0;
      resource_quota->free_pool -= amt;
      rq_update_estimate(resource_quota);
      if (grpc_resource_quota_trace.enabled()) {
        gpr_log(GPR_INFO,
                "RQ %s %s: grant alloc %" PRId64
                " bytes; rq_free_pool -> %" PRId64,
                resource_quota->name, resource_user->name, amt,
                resource_quota->free_pool);
      }
    } else if (grpc_resource_quota_trace.enabled() &&
               resource_user->free_pool >= 0) {
      gpr_log(GPR_INFO, "RQ %s %s: discard already satisfied alloc request",
              resource_quota->name, resource_user->name);
    }
    if (resource_user->free_pool >= 0) {
      resource_user->allocating = false;
      resource_user->outstanding_allocations = 0;
      GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated);
      gpr_mu_unlock(&resource_user->mu);
    } else {
      rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
      gpr_mu_unlock(&resource_user->mu);
      return false;
    }
  }
  return true;
}

/* returns true if any memory could be reclaimed from buffers */
static bool rq_reclaim_from_per_user_free_pool(
    grpc_resource_quota* resource_quota) {
  grpc_resource_user* resource_user;
  while ((resource_user = rulist_pop_head(resource_quota,
                                          GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
    gpr_mu_lock(&resource_user->mu);
    resource_user->added_to_free_pool = false;
    if (resource_user->free_pool > 0) {
      int64_t amt = resource_user->free_pool;
      resource_user->free_pool = 0;
      resource_quota->free_pool += amt;
      rq_update_estimate(resource_quota);
      if (grpc_resource_quota_trace.enabled()) {
        gpr_log(GPR_INFO,
                "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64
                " bytes; rq_free_pool -> %" PRId64,
                resource_quota->name, resource_user->name, amt,
                resource_quota->free_pool);
      }
      gpr_mu_unlock(&resource_user->mu);
      return true;
    } else {
      if (grpc_resource_quota_trace.enabled()) {
        gpr_log(GPR_INFO,
                "RQ %s %s: failed to reclaim_from_per_user_free_pool; "
                "free_pool = %" PRId64 "; rq_free_pool = %" PRId64,
                resource_quota->name, resource_user->name,
                resource_user->free_pool, resource_quota->free_pool);
      }
      gpr_mu_unlock(&resource_user->mu);
    }
  }
  return false;
}

/* returns true if reclamation is proceeding */
static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) {
  if (resource_quota->reclaiming) return true;
  grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE
                                 : GRPC_RULIST_RECLAIMER_BENIGN;
  grpc_resource_user* resource_user = rulist_pop_head(resource_quota, list);
  if (resource_user == nullptr) return false;
  if (grpc_resource_quota_trace.enabled()) {
    gpr_log(GPR_INFO, "RQ %s %s: initiate %s reclamation", resource_quota->name,
            resource_user->name, destructive ? "destructive" : "benign");
  }
  resource_quota->reclaiming = true;
  grpc_resource_quota_ref_internal(resource_quota);
  grpc_closure* c = resource_user->reclaimers[destructive];
  GPR_ASSERT(c);
  resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
  resource_quota->debug_only_last_initiated_reclaimer = c;
  resource_user->reclaimers[destructive] = nullptr;
  GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE);
  return true;
}

/*******************************************************************************
 * ru_slice: a slice implementation that is backed by a grpc_resource_user
 */

typedef struct {
  grpc_slice_refcount base;
  gpr_refcount refs;
  grpc_resource_user* resource_user;
  size_t size;
} ru_slice_refcount;

static void ru_slice_ref(void* p) {
  ru_slice_refcount* rc = static_cast<ru_slice_refcount*>(p);
  gpr_ref(&rc->refs);
}

static void ru_slice_unref(void* p) {
  ru_slice_refcount* rc = static_cast<ru_slice_refcount*>(p);
  if (gpr_unref(&rc->refs)) {
    grpc_resource_user_free(rc->resource_user, rc->size);
    gpr_free(rc);
  }
}

static const grpc_slice_refcount_vtable ru_slice_vtable = {
    ru_slice_ref, ru_slice_unref, grpc_slice_default_eq_impl,
    grpc_slice_default_hash_impl};

static grpc_slice ru_slice_create(grpc_resource_user* resource_user,
                                  size_t size) {
  ru_slice_refcount* rc = static_cast<ru_slice_refcount*>(
      gpr_malloc(sizeof(ru_slice_refcount) + size));
  rc->base.vtable = &ru_slice_vtable;
  rc->base.sub_refcount = &rc->base;
  gpr_ref_init(&rc->refs, 1);
  rc->resource_user = resource_user;
  rc->size = size;
  grpc_slice slice;
  slice.refcount = &rc->base;
  slice.data.refcounted.bytes = reinterpret_cast<uint8_t*>(rc + 1);
  slice.data.refcounted.length = size;
  return slice;
}
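
/* Editor's note: the refcount header and the payload share one gpr_malloc
   allocation, so the layout is [ru_slice_refcount | size payload bytes] and
   (rc + 1) points at the first payload byte; when the last slice ref drops,
   ru_slice_unref returns the bytes to the resource user and releases both in
   a single gpr_free. */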

/*******************************************************************************
 * grpc_resource_quota internal implementation: resource user manipulation under
 * the combiner
 */

static void ru_allocate(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_AWAITING_ALLOCATION)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
}

static void ru_add_to_free_pool(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
}

static bool ru_post_reclaimer(grpc_resource_user* resource_user,
                              bool destructive) {
  grpc_closure* closure = resource_user->new_reclaimers[destructive];
  GPR_ASSERT(closure != nullptr);
  resource_user->new_reclaimers[destructive] = nullptr;
  GPR_ASSERT(resource_user->reclaimers[destructive] == nullptr);
  if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
    GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED);
    return false;
  }
  resource_user->reclaimers[destructive] = closure;
  return true;
}

static void ru_post_benign_reclaimer(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (!ru_post_reclaimer(resource_user, false)) return;
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_BENIGN)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
}

static void ru_post_destructive_reclaimer(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (!ru_post_reclaimer(resource_user, true)) return;
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_BENIGN) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
}

static void ru_shutdown(void* ru, grpc_error* error) {
  if (grpc_resource_quota_trace.enabled()) {
    gpr_log(GPR_INFO, "RU shutdown %p", ru);
  }
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  gpr_mu_lock(&resource_user->mu);
  GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
  GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
  resource_user->reclaimers[0] = nullptr;
  resource_user->reclaimers[1] = nullptr;
  rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
  rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
  if (resource_user->allocating) {
    rq_step_sched(resource_user->resource_quota);
  }
  gpr_mu_unlock(&resource_user->mu);
}

static void ru_destroy(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0);
  // Free all the remaining thread quota
  grpc_resource_user_free_threads(resource_user,
                                  static_cast<int>(gpr_atm_no_barrier_load(
                                      &resource_user->num_threads_allocated)));

  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    rulist_remove(resource_user, static_cast<grpc_rulist>(i));
  }
  GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
  GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
  if (resource_user->free_pool != 0) {
    resource_user->resource_quota->free_pool += resource_user->free_pool;
    rq_step_sched(resource_user->resource_quota);
  }
  grpc_resource_quota_unref_internal(resource_user->resource_quota);
  gpr_mu_destroy(&resource_user->mu);
  gpr_free(resource_user->name);
  gpr_free(resource_user);
}

static void ru_allocated_slices(void* arg, grpc_error* error) {
  grpc_resource_user_slice_allocator* slice_allocator =
      static_cast<grpc_resource_user_slice_allocator*>(arg);
  if (error == GRPC_ERROR_NONE) {
    for (size_t i = 0; i < slice_allocator->count; i++) {
      grpc_slice_buffer_add_indexed(
          slice_allocator->dest, ru_slice_create(slice_allocator->resource_user,
                                                 slice_allocator->length));
    }
  }
  GRPC_CLOSURE_RUN(&slice_allocator->on_done, GRPC_ERROR_REF(error));
}

/*******************************************************************************
 * grpc_resource_quota internal implementation: quota manipulation under the
 * combiner
 */

typedef struct {
  int64_t size;
  grpc_resource_quota* resource_quota;
  grpc_closure closure;
} rq_resize_args;

static void rq_resize(void* args, grpc_error* error) {
  rq_resize_args* a = static_cast<rq_resize_args*>(args);
  int64_t delta = a->size - a->resource_quota->size;
  a->resource_quota->size += delta;
  a->resource_quota->free_pool += delta;
  rq_update_estimate(a->resource_quota);
  rq_step_sched(a->resource_quota);
  grpc_resource_quota_unref_internal(a->resource_quota);
  gpr_free(a);
}

static void rq_reclamation_done(void* rq, grpc_error* error) {
  grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
  resource_quota->reclaiming = false;
  rq_step_sched(resource_quota);
  grpc_resource_quota_unref_internal(resource_quota);
}

/*******************************************************************************
 * grpc_resource_quota api
 */

/* Public API */
grpc_resource_quota* grpc_resource_quota_create(const char* name) {
  grpc_resource_quota* resource_quota =
      static_cast<grpc_resource_quota*>(gpr_malloc(sizeof(*resource_quota)));
  gpr_ref_init(&resource_quota->refs, 1);
  resource_quota->combiner = grpc_combiner_create();
  resource_quota->free_pool = INT64_MAX;
  resource_quota->size = INT64_MAX;
  resource_quota->used = 0;
  gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
  gpr_mu_init(&resource_quota->thread_count_mu);
  resource_quota->max_threads = INT_MAX;
  resource_quota->num_threads_allocated = 0;
  resource_quota->step_scheduled = false;
  resource_quota->reclaiming = false;
  gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
  if (name != nullptr) {
    resource_quota->name = gpr_strdup(name);
  } else {
    gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR,
                 (intptr_t)resource_quota);
  }
  GRPC_CLOSURE_INIT(&resource_quota->rq_step_closure, rq_step, resource_quota,
                    grpc_combiner_finally_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_quota->rq_reclamation_done_closure,
                    rq_reclamation_done, resource_quota,
                    grpc_combiner_scheduler(resource_quota->combiner));
  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    resource_quota->roots[i] = nullptr;
  }
  return resource_quota;
}
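
/* Editor's usage sketch (hypothetical caller code, not part of this file):
   a quota is typically created once, given a byte budget, and released when
   its owner goes away:

     grpc_resource_quota* q = grpc_resource_quota_create("my_pool");
     grpc_resource_quota_resize(q, 64 * 1024 * 1024);  // 64 MiB budget
     // ... hand q to channels/servers via channel args (see below) ...
     grpc_resource_quota_unref(q);
*/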

void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) {
  if (gpr_unref(&resource_quota->refs)) {
    // No outstanding thread quota
    GPR_ASSERT(resource_quota->num_threads_allocated == 0);
    GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota");
    gpr_free(resource_quota->name);
    gpr_mu_destroy(&resource_quota->thread_count_mu);
    gpr_free(resource_quota);
  }
}

/* Public API */
void grpc_resource_quota_unref(grpc_resource_quota* resource_quota) {
  grpc_core::ExecCtx exec_ctx;
  grpc_resource_quota_unref_internal(resource_quota);
}

grpc_resource_quota* grpc_resource_quota_ref_internal(
    grpc_resource_quota* resource_quota) {
  gpr_ref(&resource_quota->refs);
  return resource_quota;
}

/* Public API */
void grpc_resource_quota_ref(grpc_resource_quota* resource_quota) {
  grpc_resource_quota_ref_internal(resource_quota);
}

double grpc_resource_quota_get_memory_pressure(
    grpc_resource_quota* resource_quota) {
  return (static_cast<double>(gpr_atm_no_barrier_load(
             &resource_quota->memory_usage_estimation))) /
         (static_cast<double>(MEMORY_USAGE_ESTIMATION_MAX));
}
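
/* Editor's example: continuing the numbers from the rq_update_estimate sketch
   above, an estimate of 49152 reads back as 49152.0 / 65536.0 = 0.75, i.e.
   75% memory pressure. */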

/* Public API */
void grpc_resource_quota_set_max_threads(grpc_resource_quota* resource_quota,
                                         int new_max_threads) {
  GPR_ASSERT(new_max_threads >= 0);
  gpr_mu_lock(&resource_quota->thread_count_mu);
  resource_quota->max_threads = new_max_threads;
  gpr_mu_unlock(&resource_quota->thread_count_mu);
}

/* Public API */
void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
                                size_t size) {
  grpc_core::ExecCtx exec_ctx;
  rq_resize_args* a = static_cast<rq_resize_args*>(gpr_malloc(sizeof(*a)));
  a->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
  a->size = static_cast<int64_t>(size);
  gpr_atm_no_barrier_store(&resource_quota->last_size,
                           (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
  GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_SCHED(&a->closure, GRPC_ERROR_NONE);
}

size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) {
  return static_cast<size_t>(
      gpr_atm_no_barrier_load(&resource_quota->last_size));
}

/*******************************************************************************
 * grpc_resource_user channel args api
 */

grpc_resource_quota* grpc_resource_quota_from_channel_args(
    const grpc_channel_args* channel_args, bool create) {
  for (size_t i = 0; i < channel_args->num_args; i++) {
    if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
      if (channel_args->args[i].type == GRPC_ARG_POINTER) {
        return grpc_resource_quota_ref_internal(
            static_cast<grpc_resource_quota*>(
                channel_args->args[i].value.pointer.p));
      } else {
        gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer");
      }
    }
  }
  return create ? grpc_resource_quota_create(nullptr) : nullptr;
}
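
/* Editor's sketch of the producer side of this lookup (hypothetical caller
   code): a quota travels in channel args as a GRPC_ARG_POINTER keyed by
   GRPC_ARG_RESOURCE_QUOTA, using the vtable below for ref/unref/compare:

     grpc_arg arg;
     arg.type = GRPC_ARG_POINTER;
     arg.key = const_cast<char*>(GRPC_ARG_RESOURCE_QUOTA);
     arg.value.pointer.p = q;  // a grpc_resource_quota* the caller owns
     arg.value.pointer.vtable = grpc_resource_quota_arg_vtable();
     // ... include arg in the grpc_channel_args passed at creation time
*/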

static void* rq_copy(void* rq) {
  grpc_resource_quota_ref(static_cast<grpc_resource_quota*>(rq));
  return rq;
}

static void rq_destroy(void* rq) {
  grpc_resource_quota_unref_internal(static_cast<grpc_resource_quota*>(rq));
}

static int rq_cmp(void* a, void* b) { return GPR_ICMP(a, b); }

const grpc_arg_pointer_vtable* grpc_resource_quota_arg_vtable(void) {
  static const grpc_arg_pointer_vtable vtable = {rq_copy, rq_destroy, rq_cmp};
  return &vtable;
}

/*******************************************************************************
 * grpc_resource_user api
 */

grpc_resource_user* grpc_resource_user_create(
    grpc_resource_quota* resource_quota, const char* name) {
  grpc_resource_user* resource_user =
      static_cast<grpc_resource_user*>(gpr_malloc(sizeof(*resource_user)));
  resource_user->resource_quota =
      grpc_resource_quota_ref_internal(resource_quota);
  GRPC_CLOSURE_INIT(&resource_user->allocate_closure, &ru_allocate,
                    resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_user->add_to_free_pool_closure,
                    &ru_add_to_free_pool, resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[0],
                    &ru_post_benign_reclaimer, resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[1],
                    &ru_post_destructive_reclaimer, resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_user->destroy_closure, &ru_destroy, resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  gpr_mu_init(&resource_user->mu);
  gpr_atm_rel_store(&resource_user->refs, 1);
  gpr_atm_rel_store(&resource_user->shutdown, 0);
  resource_user->free_pool = 0;
  grpc_closure_list_init(&resource_user->on_allocated);
  resource_user->allocating = false;
  resource_user->added_to_free_pool = false;
  gpr_atm_no_barrier_store(&resource_user->num_threads_allocated, 0);
  resource_user->reclaimers[0] = nullptr;
  resource_user->reclaimers[1] = nullptr;
  resource_user->new_reclaimers[0] = nullptr;
  resource_user->new_reclaimers[1] = nullptr;
  resource_user->outstanding_allocations = 0;
  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    resource_user->links[i].next = resource_user->links[i].prev = nullptr;
  }
  if (name != nullptr) {
    resource_user->name = gpr_strdup(name);
  } else {
    gpr_asprintf(&resource_user->name, "anonymous_resource_user_%" PRIxPTR,
                 (intptr_t)resource_user);
  }
  return resource_user;
}

grpc_resource_quota* grpc_resource_user_quota(
    grpc_resource_user* resource_user) {
  return resource_user->resource_quota;
}

static void ru_ref_by(grpc_resource_user* resource_user, gpr_atm amount) {
  GPR_ASSERT(amount > 0);
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount) != 0);
}

static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount) {
  GPR_ASSERT(amount > 0);
  gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
  GPR_ASSERT(old >= amount);
  if (old == amount) {
    GRPC_CLOSURE_SCHED(&resource_user->destroy_closure, GRPC_ERROR_NONE);
  }
}

void grpc_resource_user_ref(grpc_resource_user* resource_user) {
  ru_ref_by(resource_user, 1);
}

void grpc_resource_user_unref(grpc_resource_user* resource_user) {
  ru_unref_by(resource_user, 1);
}

void grpc_resource_user_shutdown(grpc_resource_user* resource_user) {
  if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(
            ru_shutdown, resource_user,
            grpc_combiner_scheduler(resource_user->resource_quota->combiner)),
        GRPC_ERROR_NONE);
  }
}

bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
                                         int thread_count) {
  GPR_ASSERT(thread_count >= 0);
  bool is_success = false;
  gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
  grpc_resource_quota* rq = resource_user->resource_quota;
  if (rq->num_threads_allocated + thread_count <= rq->max_threads) {
    rq->num_threads_allocated += thread_count;
    gpr_atm_no_barrier_fetch_add(&resource_user->num_threads_allocated,
                                 thread_count);
    is_success = true;
  }
  gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
  return is_success;
}
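
/* Editor's usage sketch (hypothetical thread-pool code): reserve thread quota
   before spawning and return it when the thread exits:

     if (grpc_resource_user_allocate_threads(ru, 1)) {
       // spawn the worker; when it exits:
       grpc_resource_user_free_threads(ru, 1);
     } else {
       // over max_threads: fail the request or queue the work instead
     }
*/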

void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
                                     int thread_count) {
  GPR_ASSERT(thread_count >= 0);
  gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
  grpc_resource_quota* rq = resource_user->resource_quota;
  rq->num_threads_allocated -= thread_count;
  int old_count = static_cast<int>(gpr_atm_no_barrier_fetch_add(
      &resource_user->num_threads_allocated, -thread_count));
  if (old_count < thread_count || rq->num_threads_allocated < 0) {
    gpr_log(GPR_ERROR,
            "Releasing more threads (%d) than currently allocated (rq threads: "
            "%d, ru threads: %d)",
            thread_count, rq->num_threads_allocated + thread_count, old_count);
    abort();
  }
  gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
}

static void resource_user_alloc_locked(grpc_resource_user* resource_user,
                                       size_t size,
                                       grpc_closure* optional_on_done) {
  ru_ref_by(resource_user, static_cast<gpr_atm>(size));
  resource_user->free_pool -= static_cast<int64_t>(size);
  if (grpc_resource_quota_trace.enabled()) {
    gpr_log(GPR_INFO, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
            resource_user->resource_quota->name, resource_user->name, size,
            resource_user->free_pool);
  }
  if (resource_user->free_pool < 0) {
    if (optional_on_done != nullptr) {
      resource_user->outstanding_allocations += static_cast<int64_t>(size);
      grpc_closure_list_append(&resource_user->on_allocated, optional_on_done,
                               GRPC_ERROR_NONE);
    }
    if (!resource_user->allocating) {
      resource_user->allocating = true;
      GRPC_CLOSURE_SCHED(&resource_user->allocate_closure, GRPC_ERROR_NONE);
    }
  } else {
    GRPC_CLOSURE_SCHED(optional_on_done, GRPC_ERROR_NONE);
  }
}

bool grpc_resource_user_safe_alloc(grpc_resource_user* resource_user,
                                   size_t size) {
  if (gpr_atm_no_barrier_load(&resource_user->shutdown)) return false;
  gpr_mu_lock(&resource_user->mu);
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  bool cas_success;
  do {
    gpr_atm used = gpr_atm_no_barrier_load(&resource_quota->used);
    gpr_atm new_used = used + size;
    if (static_cast<size_t>(new_used) >
        grpc_resource_quota_peek_size(resource_quota)) {
      gpr_mu_unlock(&resource_user->mu);
      return false;
    }
    cas_success = gpr_atm_full_cas(&resource_quota->used, used, new_used);
  } while (!cas_success);
  resource_user_alloc_locked(resource_user, size, nullptr);
  gpr_mu_unlock(&resource_user->mu);
  return true;
}

void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
                              grpc_closure* optional_on_done) {
  // TODO(juanlishen): Maybe return immediately if shutting down. Deferring this
  // because some tests become flaky after the change.
  gpr_mu_lock(&resource_user->mu);
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  gpr_atm_no_barrier_fetch_add(&resource_quota->used, size);
  resource_user_alloc_locked(resource_user, size, optional_on_done);
  gpr_mu_unlock(&resource_user->mu);
}

void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) {
  gpr_mu_lock(&resource_user->mu);
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  gpr_atm prior = gpr_atm_no_barrier_fetch_add(&resource_quota->used, -size);
  GPR_ASSERT(prior >= static_cast<gpr_atm>(size));
  bool was_zero_or_negative = resource_user->free_pool <= 0;
  resource_user->free_pool += static_cast<int64_t>(size);
  if (grpc_resource_quota_trace.enabled()) {
    gpr_log(GPR_INFO, "RQ %s %s: free %" PRIdPTR "; free_pool -> %" PRId64,
            resource_user->resource_quota->name, resource_user->name, size,
            resource_user->free_pool);
  }
  bool is_bigger_than_zero = resource_user->free_pool > 0;
  if (is_bigger_than_zero && was_zero_or_negative &&
      !resource_user->added_to_free_pool) {
    resource_user->added_to_free_pool = true;
    GRPC_CLOSURE_SCHED(&resource_user->add_to_free_pool_closure,
                       GRPC_ERROR_NONE);
  }
  gpr_mu_unlock(&resource_user->mu);
  ru_unref_by(resource_user, static_cast<gpr_atm>(size));
}

void grpc_resource_user_post_reclaimer(grpc_resource_user* resource_user,
                                       bool destructive,
                                       grpc_closure* closure) {
  GPR_ASSERT(resource_user->new_reclaimers[destructive] == nullptr);
  resource_user->new_reclaimers[destructive] = closure;
  GRPC_CLOSURE_SCHED(&resource_user->post_reclaimer_closure[destructive],
                     GRPC_ERROR_NONE);
}
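
/* Editor's sketch of the reclaimer contract (hypothetical cache code): post a
   benign reclaimer that releases cached memory, and always report completion,
   since rq_reclaim stays blocked until rq_reclamation_done runs:

     static void drop_cache(void* arg, grpc_error* error) {
       if (error != GRPC_ERROR_NONE) return;  // cancelled: user shut down
       // ... return cached bytes via grpc_resource_user_free(ru, n) ...
       grpc_resource_user_finish_reclamation(ru);  // re-arms the quota loop
     }
     // drop_cache_closure wraps drop_cache; ru is the owning resource user
     grpc_resource_user_post_reclaimer(ru, false, &drop_cache_closure);
*/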

void grpc_resource_user_finish_reclamation(grpc_resource_user* resource_user) {
  if (grpc_resource_quota_trace.enabled()) {
    gpr_log(GPR_INFO, "RQ %s %s: reclamation complete",
            resource_user->resource_quota->name, resource_user->name);
  }
  GRPC_CLOSURE_SCHED(
      &resource_user->resource_quota->rq_reclamation_done_closure,
      GRPC_ERROR_NONE);
}

void grpc_resource_user_slice_allocator_init(
    grpc_resource_user_slice_allocator* slice_allocator,
    grpc_resource_user* resource_user, grpc_iomgr_cb_func cb, void* p) {
  GRPC_CLOSURE_INIT(&slice_allocator->on_allocated, ru_allocated_slices,
                    slice_allocator, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&slice_allocator->on_done, cb, p,
                    grpc_schedule_on_exec_ctx);
  slice_allocator->resource_user = resource_user;
}

void grpc_resource_user_alloc_slices(
    grpc_resource_user_slice_allocator* slice_allocator, size_t length,
    size_t count, grpc_slice_buffer* dest) {
  if (gpr_atm_no_barrier_load(&slice_allocator->resource_user->shutdown)) {
    GRPC_CLOSURE_SCHED(
        &slice_allocator->on_allocated,
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
    return;
  }
  slice_allocator->length = length;
  slice_allocator->count = count;
  slice_allocator->dest = dest;
  grpc_resource_user_alloc(slice_allocator->resource_user, count * length,
                           &slice_allocator->on_allocated);
}
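
/* Editor's usage sketch (hypothetical endpoint read path): allocate four
   8192-byte quota-backed slices into a buffer; on_read fires with
   GRPC_ERROR_NONE once all count * length bytes have been granted:

     grpc_resource_user_slice_allocator alloc;
     grpc_resource_user_slice_allocator_init(&alloc, ru, on_read, arg);
     grpc_slice_buffer buf;
     grpc_slice_buffer_init(&buf);
     grpc_resource_user_alloc_slices(&alloc, 8192, 4, &buf);
*/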