3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/lib/iomgr/resource_quota.h"
28 #include <grpc/slice_buffer.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
33 #include "src/core/lib/gpr/useful.h"
34 #include "src/core/lib/iomgr/combiner.h"
36 grpc_core::TraceFlag grpc_resource_quota_trace(false, "resource_quota");
38 #define MEMORY_USAGE_ESTIMATION_MAX 65536
/* Node of an intrusive doubly-linked list; one such link per grpc_rulist is
   embedded in every grpc_resource_user so a user can sit on several lists at
   once without extra allocation.  NOTE(review): the opening "typedef struct {"
   line is missing from this listing. */
40 /* Internal linked list pointers for a resource user */
42 grpc_resource_user* next;
43 grpc_resource_user* prev;
44 } grpc_resource_user_link;
/* Identifiers for the intrusive lists maintained per quota (indexes into
   grpc_resource_quota::roots and grpc_resource_user::links).
   NOTE(review): the "typedef enum {" opener and the trailing
   GRPC_RULIST_COUNT enumerator fall on lines missing from this listing. */
46 /* Resource users are kept in (potentially) several intrusive linked lists
47 at once. These are the list names. */
49 /* Resource users that are waiting for an allocation */
50 GRPC_RULIST_AWAITING_ALLOCATION,
51 /* Resource users that have free memory available for internal reclamation */
52 GRPC_RULIST_NON_EMPTY_FREE_POOL,
53 /* Resource users that have published a benign reclamation is available */
54 GRPC_RULIST_RECLAIMER_BENIGN,
55 /* Resource users that have published a destructive reclamation is
57 GRPC_RULIST_RECLAIMER_DESTRUCTIVE,
58 /* Number of lists: must be last */
/* One consumer of a grpc_resource_quota.  Tracks the user's cached share of
   quota memory (free_pool), pending asynchronous allocations, posted
   reclaimer closures, and its membership links in the quota's intrusive
   lists.  NOTE(review): several field declarations (mu, refs, shutdown,
   free_pool, allocating, name, closing brace) fall on lines missing from
   this listing. */
62 struct grpc_resource_user {
63 /* The quota this resource user consumes from */
64 grpc_resource_quota* resource_quota;
66 /* Closure to schedule an allocation under the resource quota combiner lock */
67 grpc_closure allocate_closure;
68 /* Closure to publish a non empty free pool under the resource quota combiner
70 grpc_closure add_to_free_pool_closure;
72 /* one ref for each ref call (released by grpc_resource_user_unref), and one
73 ref for each byte allocated (released by grpc_resource_user_free) */
75 /* is this resource user unlocked? starts at 0, increases for each shutdown
80 /* The amount of memory (in bytes) this user has cached for its own use: to
81 avoid quota contention, each resource user can keep some memory in
82 addition to what it is immediately using (e.g., for caching), and the quota
83 can pull it back under memory pressure.
84 This value can become negative if more memory has been requested than
85 existed in the free pool, at which point the quota is consulted to bring
86 this value non-negative (asynchronously). */
88 /* A list of closures to call once free_pool becomes non-negative - ie when
89 all outstanding allocations have been granted. */
90 grpc_closure_list on_allocated;
91 /* True if we are currently trying to allocate from the quota, false if not */
93 /* The amount of memory (in bytes) that has been requested from this user
94 * asynchronously but hasn't been granted yet. */
95 int64_t outstanding_allocations;
96 /* True if we are currently trying to add ourselves to the non-free quota
97 list, false otherwise */
98 bool added_to_free_pool;
100 /* The number of threads currently allocated to this resource user */
101 gpr_atm num_threads_allocated;
103 /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer
105 grpc_closure* reclaimers[2];
106 /* Reclaimers just posted: once we're in the combiner lock, we'll move them
107 to the array above */
108 grpc_closure* new_reclaimers[2];
109 /* Trampoline closures to finish reclamation and re-enter the quota combiner
111 grpc_closure post_reclaimer_closure[2];
113 /* Closure to execute under the quota combiner to de-register and shutdown the
115 grpc_closure destroy_closure;
117 /* Links in the various grpc_rulist lists */
118 grpc_resource_user_link links[GRPC_RULIST_COUNT];
120 /* The name of this resource user, for debugging/tracing */
/* A pool of memory (and threads) shared by many resource users.  All memory
   bookkeeping runs under `combiner`; thread counts are guarded by
   thread_count_mu.  NOTE(review): several field declarations (refs, name,
   size, free_pool, used, last_size, max_threads, step_scheduled, reclaiming,
   closing brace) fall on lines missing from this listing. */
124 struct grpc_resource_quota {
128 /* estimate of current memory usage
129 scaled to the range [0..RESOURCE_USAGE_ESTIMATION_MAX] */
130 gpr_atm memory_usage_estimation;
132 /* Master combiner lock: all activity on a quota executes under this combiner
133 * (so no mutex is needed for this data structure) */
134 grpc_combiner* combiner;
135 /* Size of the resource quota */
137 /* Amount of free memory in the resource quota */
139 /* Used size of memory in the resource quota. Updated as soon as the resource
140 * users start to allocate or free the memory. */
145 /* Mutex to protect max_threads and num_threads_allocated */
146 /* Note: We could have used gpr_atm for max_threads and num_threads_allocated
147 * and avoid having this mutex; but in that case, each invocation of the
148 * function grpc_resource_user_allocate_threads() would have had to do at
149 * least two atomic loads (for max_threads and num_threads_allocated) followed
150 * by a CAS (on num_threads_allocated).
151 * Moreover, we expect grpc_resource_user_allocate_threads() to be often
152 * called concurrently thereby increasing the chances of failing the CAS
153 * operation. This additional complexity is not worth the tiny perf gain we
154 * may (or may not) have by using atomics */
155 gpr_mu thread_count_mu;
157 /* Max number of threads allowed */
160 /* Number of threads currently allocated via this resource_quota object */
161 int num_threads_allocated;
163 /* Has rq_step been scheduled to occur? */
166 /* Are we currently reclaiming memory */
169 /* Closure around rq_step */
170 grpc_closure rq_step_closure;
172 /* Closure around rq_reclamation_done */
173 grpc_closure rq_reclamation_done_closure;
175 /* This is only really usable for debugging: it's always a stale pointer, but
176 a stale pointer that might just be fresh enough to guide us to where the
177 reclamation system is stuck */
178 grpc_closure* debug_only_last_initiated_reclaimer;
179 grpc_resource_user* debug_only_last_reclaimer_resource_user;
181 /* Roots of all resource user lists */
182 grpc_resource_user* roots[GRPC_RULIST_COUNT];
187 static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount);
189 /*******************************************************************************
/* Insert `resource_user` at the head of circular list `list` on its quota.
   Empty list: the user becomes the root and links to itself; otherwise it is
   spliced in before the current root and the root pointer is moved to it.
   Must run under the quota combiner.  NOTE(review): the parameter line,
   the else keyword and closing braces fall on lines missing from this
   listing. */
193 static void rulist_add_head(grpc_resource_user* resource_user,
195 grpc_resource_quota* resource_quota = resource_user->resource_quota;
196 grpc_resource_user** root = &resource_quota->roots[list];
197 if (*root == nullptr) {
198 *root = resource_user;
199 resource_user->links[list].next = resource_user->links[list].prev =
202 resource_user->links[list].next = *root;
203 resource_user->links[list].prev = (*root)->links[list].prev;
204 resource_user->links[list].next->links[list].prev =
205 resource_user->links[list].prev->links[list].next = resource_user;
206 *root = resource_user;
/* Insert `resource_user` at the tail of circular list `list` (just after the
   root, which marks the head; the root pointer is NOT moved).  Must run under
   the quota combiner.  NOTE(review): parameter line, else keyword and closing
   braces fall on lines missing from this listing. */
210 static void rulist_add_tail(grpc_resource_user* resource_user,
212 grpc_resource_quota* resource_quota = resource_user->resource_quota;
213 grpc_resource_user** root = &resource_quota->roots[list];
214 if (*root == nullptr) {
215 *root = resource_user;
216 resource_user->links[list].next = resource_user->links[list].prev =
219 resource_user->links[list].next = (*root)->links[list].next;
220 resource_user->links[list].prev = *root;
221 resource_user->links[list].next->links[list].prev =
222 resource_user->links[list].prev->links[list].next = resource_user;
/* True iff list `list` on `resource_quota` has no members (null root). */
226 static bool rulist_empty(grpc_resource_quota* resource_quota,
228 return resource_quota->roots[list] == nullptr;
/* Remove and return the head (root) of list `list`, or nullptr if empty.
   Single-element list: clears the root; otherwise unsplices the head and
   advances the root.  The popped user's links are nulled so later
   rulist_remove calls are no-ops.  Must run under the quota combiner.
   NOTE(review): a `*root = nullptr;` branch for the single-element case
   appears to fall on lines missing from this listing. */
231 static grpc_resource_user* rulist_pop_head(grpc_resource_quota* resource_quota,
233 grpc_resource_user** root = &resource_quota->roots[list];
234 grpc_resource_user* resource_user = *root;
235 if (resource_user == nullptr) {
238 if (resource_user->links[list].next == resource_user) {
241 resource_user->links[list].next->links[list].prev =
242 resource_user->links[list].prev;
243 resource_user->links[list].prev->links[list].next =
244 resource_user->links[list].next;
245 *root = resource_user->links[list].next;
247 resource_user->links[list].next = resource_user->links[list].prev = nullptr;
248 return resource_user;
/* Unlink `resource_user` from list `list` if it is a member (next == nullptr
   means not enlisted; early return).  If it was the root, the root advances —
   and if it still points at the same user the list had one element and
   becomes empty.  Must run under the quota combiner. */
251 static void rulist_remove(grpc_resource_user* resource_user, grpc_rulist list) {
252 if (resource_user->links[list].next == nullptr) return;
253 grpc_resource_quota* resource_quota = resource_user->resource_quota;
254 if (resource_quota->roots[list] == resource_user) {
255 resource_quota->roots[list] = resource_user->links[list].next;
256 if (resource_quota->roots[list] == resource_user) {
257 resource_quota->roots[list] = nullptr;
260 resource_user->links[list].next->links[list].prev =
261 resource_user->links[list].prev;
262 resource_user->links[list].prev->links[list].next =
263 resource_user->links[list].next;
264 resource_user->links[list].next = resource_user->links[list].prev = nullptr;
267 /*******************************************************************************
268 * resource quota state machine
271 static bool rq_alloc(grpc_resource_quota* resource_quota);
272 static bool rq_reclaim_from_per_user_free_pool(
273 grpc_resource_quota* resource_quota);
274 static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive);
/* One pass of the quota state machine (runs under the combiner): satisfy
   pending allocations; if that fails, pull back per-user free pools; if that
   also fails, try benign then destructive reclamation.  Drops the ref taken
   by rq_step_sched.  NOTE(review): the `do {`/`done:` scaffolding around the
   loop falls on lines missing from this listing. */
276 static void rq_step(void* rq, grpc_error* error) {
277 grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
278 resource_quota->step_scheduled = false;
280 if (rq_alloc(resource_quota)) goto done;
281 } while (rq_reclaim_from_per_user_free_pool(resource_quota));
283 if (!rq_reclaim(resource_quota, false)) {
284 rq_reclaim(resource_quota, true);
288 grpc_resource_quota_unref_internal(resource_quota);
/* Schedule rq_step on the combiner at most once (step_scheduled dedupes);
   takes a quota ref that rq_step releases when it runs. */
291 static void rq_step_sched(grpc_resource_quota* resource_quota) {
292 if (resource_quota->step_scheduled) return;
293 resource_quota->step_scheduled = true;
294 grpc_resource_quota_ref_internal(resource_quota);
295 GRPC_CLOSURE_SCHED(&resource_quota->rq_step_closure, GRPC_ERROR_NONE);
298 /* update the atomically available resource estimate - use no barriers since
299 timeliness of delivery really doesn't matter much */
/* Publishes (1 - free_pool/size) scaled to [0, MEMORY_USAGE_ESTIMATION_MAX];
   a zero-sized quota reports maximum usage. */
300 static void rq_update_estimate(grpc_resource_quota* resource_quota) {
301 gpr_atm memory_usage_estimation = MEMORY_USAGE_ESTIMATION_MAX;
302 if (resource_quota->size != 0) {
303 memory_usage_estimation =
304 GPR_CLAMP((gpr_atm)((1.0 - ((double)resource_quota->free_pool) /
305 ((double)resource_quota->size)) *
306 MEMORY_USAGE_ESTIMATION_MAX),
307 0, MEMORY_USAGE_ESTIMATION_MAX);
309 gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation,
310 memory_usage_estimation);
313 /* returns true if all allocations are completed */
/* Drains GRPC_RULIST_AWAITING_ALLOCATION under the combiner.  For each user:
   a shut-down user has its pending closures failed and its outstanding bytes
   returned; otherwise, if the quota can cover the user's deficit it is
   granted and waiters are scheduled; if not, the user is pushed back on the
   head of the list and the function reports false so rq_step escalates to
   reclamation.  NOTE(review): several gpr_log call heads, closing braces and
   the early-`continue`/`return` lines are missing from this listing. */
314 static bool rq_alloc(grpc_resource_quota* resource_quota) {
315 grpc_resource_user* resource_user;
316 while ((resource_user = rulist_pop_head(resource_quota,
317 GRPC_RULIST_AWAITING_ALLOCATION))) {
318 gpr_mu_lock(&resource_user->mu);
319 if (grpc_resource_quota_trace.enabled()) {
321 "RQ: check allocation for user %p shutdown=%" PRIdPTR
322 " free_pool=%" PRId64,
323 resource_user, gpr_atm_no_barrier_load(&resource_user->shutdown),
324 resource_user->free_pool);
/* Shutdown path: fail all waiters and give back the aborted bytes (each
   allocated byte holds a user ref, released below via ru_unref_by). */
326 if (gpr_atm_no_barrier_load(&resource_user->shutdown)) {
327 resource_user->allocating = false;
328 grpc_closure_list_fail_all(
329 &resource_user->on_allocated,
330 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
331 int64_t aborted_allocations = resource_user->outstanding_allocations;
332 resource_user->outstanding_allocations = 0;
333 resource_user->free_pool += aborted_allocations;
334 GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated);
335 gpr_mu_unlock(&resource_user->mu);
336 ru_unref_by(resource_user, static_cast<gpr_atm>(aborted_allocations));
/* Grant path: quota free pool covers the user's deficit exactly. */
339 if (resource_user->free_pool < 0 &&
340 -resource_user->free_pool <= resource_quota->free_pool) {
341 int64_t amt = -resource_user->free_pool;
342 resource_user->free_pool = 0;
343 resource_quota->free_pool -= amt;
344 rq_update_estimate(resource_quota);
345 if (grpc_resource_quota_trace.enabled()) {
347 "RQ %s %s: grant alloc %" PRId64
348 " bytes; rq_free_pool -> %" PRId64,
349 resource_quota->name, resource_user->name, amt,
350 resource_quota->free_pool);
352 } else if (grpc_resource_quota_trace.enabled() &&
353 resource_user->free_pool >= 0) {
354 gpr_log(GPR_INFO, "RQ %s %s: discard already satisfied alloc request",
355 resource_quota->name, resource_user->name);
/* Satisfied: wake the waiters.  Otherwise requeue at the head so this user
   is retried first after reclamation. */
357 if (resource_user->free_pool >= 0) {
358 resource_user->allocating = false;
359 resource_user->outstanding_allocations = 0;
360 GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated);
361 gpr_mu_unlock(&resource_user->mu);
363 rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
364 gpr_mu_unlock(&resource_user->mu);
371 /* returns true if any memory could be reclaimed from buffers */
/* Pops users off GRPC_RULIST_NON_EMPTY_FREE_POOL and, for the first one with
   a positive free_pool, moves those bytes back into the quota's free pool
   (returning true).  Users found with nothing to give are logged and skipped.
   Runs under the combiner; takes each user's mutex around its free_pool.
   NOTE(review): `return true;`/`return false;` and some gpr_log heads are on
   lines missing from this listing. */
372 static bool rq_reclaim_from_per_user_free_pool(
373 grpc_resource_quota* resource_quota) {
374 grpc_resource_user* resource_user;
375 while ((resource_user = rulist_pop_head(resource_quota,
376 GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
377 gpr_mu_lock(&resource_user->mu);
378 resource_user->added_to_free_pool = false;
379 if (resource_user->free_pool > 0) {
380 int64_t amt = resource_user->free_pool;
381 resource_user->free_pool = 0;
382 resource_quota->free_pool += amt;
383 rq_update_estimate(resource_quota);
384 if (grpc_resource_quota_trace.enabled()) {
386 "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64
387 " bytes; rq_free_pool -> %" PRId64,
388 resource_quota->name, resource_user->name, amt,
389 resource_quota->free_pool);
391 gpr_mu_unlock(&resource_user->mu);
394 if (grpc_resource_quota_trace.enabled()) {
396 "RQ %s %s: failed to reclaim_from_per_user_free_pool; "
397 "free_pool = %" PRId64 "; rq_free_pool = %" PRId64,
398 resource_quota->name, resource_user->name,
399 resource_user->free_pool, resource_quota->free_pool);
401 gpr_mu_unlock(&resource_user->mu);
407 /* returns true if reclamation is proceeding */
/* Kick one reclaimer (benign or destructive).  Already-reclaiming counts as
   progress.  Pops the next user from the matching reclaimer list, marks the
   quota reclaiming, takes a quota ref (released in rq_reclamation_done), and
   schedules the user's posted reclaimer closure.  Runs under the combiner. */
408 static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) {
409 if (resource_quota->reclaiming) return true;
410 grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE
411 : GRPC_RULIST_RECLAIMER_BENIGN;
412 grpc_resource_user* resource_user = rulist_pop_head(resource_quota, list);
413 if (resource_user == nullptr) return false;
414 if (grpc_resource_quota_trace.enabled()) {
415 gpr_log(GPR_INFO, "RQ %s %s: initiate %s reclamation", resource_quota->name,
416 resource_user->name, destructive ? "destructive" : "benign");
418 resource_quota->reclaiming = true;
419 grpc_resource_quota_ref_internal(resource_quota);
420 grpc_closure* c = resource_user->reclaimers[destructive];
/* Stash stale debug pointers; useful when diagnosing a stuck reclamation. */
422 resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
423 resource_quota->debug_only_last_initiated_reclaimer = c;
424 resource_user->reclaimers[destructive] = nullptr;
425 GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE);
429 /*******************************************************************************
430 * ru_slice: a slice implementation that is backed by a grpc_resource_user
/* Refcount header placed in front of slice bytes; frees the tracked memory
   back to the owning resource user on final unref.  NOTE(review): the struct
   opener, `gpr_refcount refs;` and `size_t size;` fields are on lines missing
   from this listing. */
434 grpc_slice_refcount base;
436 grpc_resource_user* resource_user;
/* Slice vtable ref: bump the slice's refcount.  NOTE(review): the gpr_ref
   call and closing brace are on lines missing from this listing. */
440 static void ru_slice_ref(void* p) {
441 ru_slice_refcount* rc = static_cast<ru_slice_refcount*>(p);
/* Slice vtable unref: on the final unref, return the slice's bytes to the
   resource user's accounting (grpc_resource_user_free); the gpr_free of the
   header falls on a line missing from this listing. */
445 static void ru_slice_unref(void* p) {
446 ru_slice_refcount* rc = static_cast<ru_slice_refcount*>(p);
447 if (gpr_unref(&rc->refs)) {
448 grpc_resource_user_free(rc->resource_user, rc->size);
/* Vtable wiring ru_slice_ref/unref into the slice machinery; eq and hash use
   the library defaults. */
453 static const grpc_slice_refcount_vtable ru_slice_vtable = {
454 ru_slice_ref, ru_slice_unref, grpc_slice_default_eq_impl,
455 grpc_slice_default_hash_impl};
/* Allocate a slice of `size` bytes whose storage sits immediately after the
   refcount header (rc + 1) in a single malloc; memory is charged to
   `resource_user` and released via ru_slice_unref.  NOTE(review): the size
   parameter line, `rc->size = size;`, the `grpc_slice slice;` declaration and
   the return are on lines missing from this listing. */
457 static grpc_slice ru_slice_create(grpc_resource_user* resource_user,
459 ru_slice_refcount* rc = static_cast<ru_slice_refcount*>(
460 gpr_malloc(sizeof(ru_slice_refcount) + size));
461 rc->base.vtable = &ru_slice_vtable;
462 rc->base.sub_refcount = &rc->base;
463 gpr_ref_init(&rc->refs, 1);
464 rc->resource_user = resource_user;
467 slice.refcount = &rc->base;
468 slice.data.refcounted.bytes = reinterpret_cast<uint8_t*>(rc + 1);
469 slice.data.refcounted.length = size;
473 /*******************************************************************************
474 * grpc_resource_quota internal implementation: resource user manipulation under
/* Combiner callback: enqueue this user as awaiting allocation; kick the quota
   state machine only if the list was previously empty (it is already being
   drained otherwise). */
478 static void ru_allocate(void* ru, grpc_error* error) {
479 grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
480 if (rulist_empty(resource_user->resource_quota,
481 GRPC_RULIST_AWAITING_ALLOCATION)) {
482 rq_step_sched(resource_user->resource_quota);
484 rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
/* Combiner callback: advertise this user's surplus on the non-empty-free-pool
   list; schedule a step only when someone is waiting for memory and no other
   donor was already listed. */
487 static void ru_add_to_free_pool(void* ru, grpc_error* error) {
488 grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
489 if (!rulist_empty(resource_user->resource_quota,
490 GRPC_RULIST_AWAITING_ALLOCATION) &&
491 rulist_empty(resource_user->resource_quota,
492 GRPC_RULIST_NON_EMPTY_FREE_POOL)) {
493 rq_step_sched(resource_user->resource_quota);
495 rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
/* Move the staged reclaimer (new_reclaimers) into the active slot under the
   combiner.  If the user is already shutting down, cancel the closure instead
   and report failure so the caller does not enlist the user.
   NOTE(review): the bool destructive parameter line, `return false;` and
   `return true;` are on lines missing from this listing. */
498 static bool ru_post_reclaimer(grpc_resource_user* resource_user,
500 grpc_closure* closure = resource_user->new_reclaimers[destructive];
501 GPR_ASSERT(closure != nullptr);
502 resource_user->new_reclaimers[destructive] = nullptr;
503 GPR_ASSERT(resource_user->reclaimers[destructive] == nullptr);
504 if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
505 GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED);
508 resource_user->reclaimers[destructive] = closure;
/* Combiner callback: register the staged benign reclaimer and enlist the user
   on the benign list; kick the state machine only when allocations are
   pending and every cheaper source (free pools, other benign reclaimers) is
   exhausted. */
512 static void ru_post_benign_reclaimer(void* ru, grpc_error* error) {
513 grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
514 if (!ru_post_reclaimer(resource_user, false)) return;
515 if (!rulist_empty(resource_user->resource_quota,
516 GRPC_RULIST_AWAITING_ALLOCATION) &&
517 rulist_empty(resource_user->resource_quota,
518 GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
519 rulist_empty(resource_user->resource_quota,
520 GRPC_RULIST_RECLAIMER_BENIGN)) {
521 rq_step_sched(resource_user->resource_quota);
523 rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
/* Combiner callback: register the staged destructive reclaimer; a step is
   scheduled only when allocations are pending and free pools, benign
   reclaimers AND destructive reclaimers are all empty — destructive
   reclamation is the last resort. */
526 static void ru_post_destructive_reclaimer(void* ru, grpc_error* error) {
527 grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
528 if (!ru_post_reclaimer(resource_user, true)) return;
529 if (!rulist_empty(resource_user->resource_quota,
530 GRPC_RULIST_AWAITING_ALLOCATION) &&
531 rulist_empty(resource_user->resource_quota,
532 GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
533 rulist_empty(resource_user->resource_quota,
534 GRPC_RULIST_RECLAIMER_BENIGN) &&
535 rulist_empty(resource_user->resource_quota,
536 GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) {
537 rq_step_sched(resource_user->resource_quota);
539 rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
/* Combiner callback on user shutdown: cancel both posted reclaimers, drop the
   user from the reclaimer lists, and — if an allocation is still in flight —
   kick the state machine so rq_alloc can fail its waiters. */
542 static void ru_shutdown(void* ru, grpc_error* error) {
543 if (grpc_resource_quota_trace.enabled()) {
544 gpr_log(GPR_INFO, "RU shutdown %p", ru);
546 grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
547 gpr_mu_lock(&resource_user->mu);
/* GRPC_CLOSURE_SCHED on a null closure is assumed to be a no-op here —
   TODO(review): confirm against closure.h. */
548 GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
549 GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
550 resource_user->reclaimers[0] = nullptr;
551 resource_user->reclaimers[1] = nullptr;
552 rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
553 rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
554 if (resource_user->allocating) {
555 rq_step_sched(resource_user->resource_quota);
557 gpr_mu_unlock(&resource_user->mu);
/* Combiner callback run once the user's refcount hits zero (scheduled by
   ru_unref_by): release thread quota, unlink from every list, cancel
   reclaimers, return any remaining free_pool to the quota, then free the
   user and drop its quota ref. */
560 static void ru_destroy(void* ru, grpc_error* error) {
561 grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
562 GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0);
563 // Free all the remaining thread quota
564 grpc_resource_user_free_threads(resource_user,
565 static_cast<int>(gpr_atm_no_barrier_load(
566 &resource_user->num_threads_allocated)));
568 for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
569 rulist_remove(resource_user, static_cast<grpc_rulist>(i));
571 GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
572 GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
573 if (resource_user->free_pool != 0) {
574 resource_user->resource_quota->free_pool += resource_user->free_pool;
575 rq_step_sched(resource_user->resource_quota);
577 grpc_resource_quota_unref_internal(resource_user->resource_quota);
578 gpr_mu_destroy(&resource_user->mu);
579 gpr_free(resource_user->name);
580 gpr_free(resource_user);
/* Completion for grpc_resource_user_slice_allocator: on success, materialize
   `count` slices of `length` bytes into `dest`; in all cases forward `error`
   to the allocator's on_done closure. */
583 static void ru_allocated_slices(void* arg, grpc_error* error) {
584 grpc_resource_user_slice_allocator* slice_allocator =
585 static_cast<grpc_resource_user_slice_allocator*>(arg);
586 if (error == GRPC_ERROR_NONE) {
587 for (size_t i = 0; i < slice_allocator->count; i++) {
588 grpc_slice_buffer_add_indexed(
589 slice_allocator->dest, ru_slice_create(slice_allocator->resource_user,
590 slice_allocator->length));
593 GRPC_CLOSURE_RUN(&slice_allocator->on_done, GRPC_ERROR_REF(error));
596 /*******************************************************************************
597 * grpc_resource_quota internal implementation: quota manipulation under the
/* Argument bundle for the rq_resize closure (quota + requested size +
   closure storage).  NOTE(review): the struct opener, `int64_t size;` field
   and typedef close are on lines missing from this listing. */
603 grpc_resource_quota* resource_quota;
604 grpc_closure closure;
/* Exec-ctx closure applying a resize: shift both size and free_pool by the
   delta (free_pool may go negative, triggering reclamation via the scheduled
   step), refresh the usage estimate, and drop the ref taken by
   grpc_resource_quota_resize.  The gpr_free(a) falls on a line missing from
   this listing. */
607 static void rq_resize(void* args, grpc_error* error) {
608 rq_resize_args* a = static_cast<rq_resize_args*>(args);
609 int64_t delta = a->size - a->resource_quota->size;
610 a->resource_quota->size += delta;
611 a->resource_quota->free_pool += delta;
612 rq_update_estimate(a->resource_quota);
613 rq_step_sched(a->resource_quota);
614 grpc_resource_quota_unref_internal(a->resource_quota);
/* Combiner callback run when a reclaimer finishes: clear the reclaiming flag,
   retry the state machine, and drop the ref taken in rq_reclaim. */
618 static void rq_reclamation_done(void* rq, grpc_error* error) {
619 grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
620 resource_quota->reclaiming = false;
621 rq_step_sched(resource_quota);
622 grpc_resource_quota_unref_internal(resource_quota);
625 /*******************************************************************************
626 * grpc_resource_quota api
/* Public API: allocate a quota with effectively-unlimited size (INT64_MAX),
   its own combiner, and all lists empty.  `name` may be nullptr, in which
   case an "anonymous_pool_<ptr>" name is synthesized.  Caller owns one ref
   (release with grpc_resource_quota_unref). */
630 grpc_resource_quota* grpc_resource_quota_create(const char* name) {
631 grpc_resource_quota* resource_quota =
632 static_cast<grpc_resource_quota*>(gpr_malloc(sizeof(*resource_quota)));
633 gpr_ref_init(&resource_quota->refs, 1);
634 resource_quota->combiner = grpc_combiner_create();
635 resource_quota->free_pool = INT64_MAX;
636 resource_quota->size = INT64_MAX;
637 resource_quota->used = 0;
638 gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
639 gpr_mu_init(&resource_quota->thread_count_mu);
640 resource_quota->max_threads = INT_MAX;
641 resource_quota->num_threads_allocated = 0;
642 resource_quota->step_scheduled = false;
643 resource_quota->reclaiming = false;
644 gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
645 if (name != nullptr) {
646 resource_quota->name = gpr_strdup(name);
648 gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR,
649 (intptr_t)resource_quota);
/* rq_step runs via the combiner's "finally" scheduler so it executes after
   all other queued combiner work for this quota. */
651 GRPC_CLOSURE_INIT(&resource_quota->rq_step_closure, rq_step, resource_quota,
652 grpc_combiner_finally_scheduler(resource_quota->combiner));
653 GRPC_CLOSURE_INIT(&resource_quota->rq_reclamation_done_closure,
654 rq_reclamation_done, resource_quota,
655 grpc_combiner_scheduler(resource_quota->combiner));
656 for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
657 resource_quota->roots[i] = nullptr;
659 return resource_quota;
/* Drop one quota ref; on the final unref, assert no threads are still
   allocated, then release the combiner, name, mutex and the quota itself.
   Requires an ExecCtx on the calling thread (see grpc_resource_quota_unref). */
662 void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) {
663 if (gpr_unref(&resource_quota->refs)) {
664 // No outstanding thread quota
665 GPR_ASSERT(resource_quota->num_threads_allocated == 0);
666 GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota");
667 gpr_free(resource_quota->name);
668 gpr_mu_destroy(&resource_quota->thread_count_mu);
669 gpr_free(resource_quota);
/* Public unref wrapper: establishes an ExecCtx (required for the combiner
   teardown on final release) before delegating to the internal unref. */
674 void grpc_resource_quota_unref(grpc_resource_quota* resource_quota) {
675 grpc_core::ExecCtx exec_ctx;
676 grpc_resource_quota_unref_internal(resource_quota);
/* Take a quota ref and return the same pointer (convenient for chaining into
   assignments). */
679 grpc_resource_quota* grpc_resource_quota_ref_internal(
680 grpc_resource_quota* resource_quota) {
681 gpr_ref(&resource_quota->refs);
682 return resource_quota;
/* Public ref wrapper (void return, per the public API surface). */
686 void grpc_resource_quota_ref(grpc_resource_quota* resource_quota) {
687 grpc_resource_quota_ref_internal(resource_quota);
/* Return the quota's usage estimate as a fraction in [0, 1]; reads the
   atomically-published value from rq_update_estimate, so it may be slightly
   stale. */
690 double grpc_resource_quota_get_memory_pressure(
691 grpc_resource_quota* resource_quota) {
692 return (static_cast<double>(gpr_atm_no_barrier_load(
693 &resource_quota->memory_usage_estimation))) /
694 (static_cast<double>(MEMORY_USAGE_ESTIMATION_MAX));
/* Set the thread cap under thread_count_mu.  Lowering the cap below the
   current allocation is tolerated here; the assertion that this is eventually
   reconciled falls on a line missing from this listing. */
698 void grpc_resource_quota_set_max_threads(grpc_resource_quota* resource_quota,
699 int new_max_threads) {
700 GPR_ASSERT(new_max_threads >= 0);
701 gpr_mu_lock(&resource_quota->thread_count_mu);
702 resource_quota->max_threads = new_max_threads;
703 gpr_mu_unlock(&resource_quota->thread_count_mu);
/* Public resize: publish the new size immediately via `last_size` (clamped to
   GPR_ATM_MAX for 32-bit atomics), then apply the actual resize
   asynchronously on the exec ctx via rq_resize, which consumes the ref taken
   here.  NOTE(review): the size_t size parameter line is missing from this
   listing. */
707 void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
709 grpc_core::ExecCtx exec_ctx;
710 rq_resize_args* a = static_cast<rq_resize_args*>(gpr_malloc(sizeof(*a)));
711 a->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
712 a->size = static_cast<int64_t>(size);
713 gpr_atm_no_barrier_store(&resource_quota->last_size,
714 (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
715 GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
716 GRPC_CLOSURE_SCHED(&a->closure, GRPC_ERROR_NONE);
/* Lock-free read of the most recently requested quota size (set eagerly in
   grpc_resource_quota_resize, before the resize is applied). */
719 size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) {
720 return static_cast<size_t>(
721 gpr_atm_no_barrier_load(&resource_quota->last_size));
724 /*******************************************************************************
725 * grpc_resource_user channel args api
/* Fetch (and ref) the quota carried in channel args under
   GRPC_ARG_RESOURCE_QUOTA; a non-pointer arg of that key is logged and
   skipped.  If absent: create an anonymous quota when `create` is true, else
   return nullptr. */
728 grpc_resource_quota* grpc_resource_quota_from_channel_args(
729 const grpc_channel_args* channel_args, bool create) {
730 for (size_t i = 0; i < channel_args->num_args; i++) {
731 if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
732 if (channel_args->args[i].type == GRPC_ARG_POINTER) {
733 return grpc_resource_quota_ref_internal(
734 static_cast<grpc_resource_quota*>(
735 channel_args->args[i].value.pointer.p));
737 gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer");
741 return create ? grpc_resource_quota_create(nullptr) : nullptr;
/* Channel-arg vtable copy: ref the quota; the `return rq;` falls on a line
   missing from this listing. */
744 static void* rq_copy(void* rq) {
745 grpc_resource_quota_ref(static_cast<grpc_resource_quota*>(rq));
/* Channel-arg vtable destroy: drop one quota ref (callers hold an ExecCtx
   when destroying channel args). */
749 static void rq_destroy(void* rq) {
750 grpc_resource_quota_unref_internal(static_cast<grpc_resource_quota*>(rq));
/* Channel-arg vtable compare: identity comparison on the quota pointers. */
753 static int rq_cmp(void* a, void* b) { return GPR_ICMP(a, b); }
/* Expose the pointer-arg vtable (copy/destroy/cmp) for embedding a quota in
   channel args; the `return &vtable;` falls on a line missing from this
   listing. */
755 const grpc_arg_pointer_vtable* grpc_resource_quota_arg_vtable(void) {
756 static const grpc_arg_pointer_vtable vtable = {rq_copy, rq_destroy, rq_cmp};
760 /*******************************************************************************
761 * grpc_resource_user api
/* Public API: create a resource user attached to `resource_quota` (which it
   refs).  All combiner-bound closures are pre-initialized so later operations
   only need GRPC_CLOSURE_SCHED.  `name` may be nullptr (an anonymous name is
   synthesized).  Caller owns one ref; released via grpc_resource_user_unref. */
764 grpc_resource_user* grpc_resource_user_create(
765 grpc_resource_quota* resource_quota, const char* name) {
766 grpc_resource_user* resource_user =
767 static_cast<grpc_resource_user*>(gpr_malloc(sizeof(*resource_user)));
768 resource_user->resource_quota =
769 grpc_resource_quota_ref_internal(resource_quota);
770 GRPC_CLOSURE_INIT(&resource_user->allocate_closure, &ru_allocate,
772 grpc_combiner_scheduler(resource_quota->combiner));
773 GRPC_CLOSURE_INIT(&resource_user->add_to_free_pool_closure,
774 &ru_add_to_free_pool, resource_user,
775 grpc_combiner_scheduler(resource_quota->combiner));
776 GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[0],
777 &ru_post_benign_reclaimer, resource_user,
778 grpc_combiner_scheduler(resource_quota->combiner));
779 GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[1],
780 &ru_post_destructive_reclaimer, resource_user,
781 grpc_combiner_scheduler(resource_quota->combiner));
782 GRPC_CLOSURE_INIT(&resource_user->destroy_closure, &ru_destroy, resource_user,
783 grpc_combiner_scheduler(resource_quota->combiner));
784 gpr_mu_init(&resource_user->mu);
785 gpr_atm_rel_store(&resource_user->refs, 1);
786 gpr_atm_rel_store(&resource_user->shutdown, 0);
787 resource_user->free_pool = 0;
788 grpc_closure_list_init(&resource_user->on_allocated);
789 resource_user->allocating = false;
790 resource_user->added_to_free_pool = false;
791 gpr_atm_no_barrier_store(&resource_user->num_threads_allocated, 0);
792 resource_user->reclaimers[0] = nullptr;
793 resource_user->reclaimers[1] = nullptr;
794 resource_user->new_reclaimers[0] = nullptr;
795 resource_user->new_reclaimers[1] = nullptr;
796 resource_user->outstanding_allocations = 0;
797 for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
798 resource_user->links[i].next = resource_user->links[i].prev = nullptr;
800 if (name != nullptr) {
801 resource_user->name = gpr_strdup(name);
803 gpr_asprintf(&resource_user->name, "anonymous_resource_user_%" PRIxPTR,
804 (intptr_t)resource_user);
806 return resource_user;
/* Accessor: the quota this user draws from (borrowed pointer, no ref taken). */
809 grpc_resource_quota* grpc_resource_user_quota(
810 grpc_resource_user* resource_user) {
811 return resource_user->resource_quota;
/* Add `amount` refs; asserts the user was still alive (previous count
   non-zero).  One ref is held per API ref and per allocated byte. */
814 static void ru_ref_by(grpc_resource_user* resource_user, gpr_atm amount) {
815 GPR_ASSERT(amount > 0);
816 GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount) != 0);
/* Remove `amount` refs; when the count reaches zero, schedule ru_destroy on
   the quota combiner.  NOTE(review): the `if (old == amount)` guard before
   the destroy-schedule falls on a line missing from this listing. */
819 static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount) {
820 GPR_ASSERT(amount > 0);
821 gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
822 GPR_ASSERT(old >= amount);
824 GRPC_CLOSURE_SCHED(&resource_user->destroy_closure, GRPC_ERROR_NONE);
/* Public API: take one ref on the resource user. */
828 void grpc_resource_user_ref(grpc_resource_user* resource_user) {
829 ru_ref_by(resource_user, 1);
/* Public API: drop one ref; final release destroys the user asynchronously
   via ru_destroy. */
832 void grpc_resource_user_unref(grpc_resource_user* resource_user) {
833 ru_unref_by(resource_user, 1);
/* Public API: begin shutdown.  Only the transition from 0 schedules the
   ru_shutdown closure on the quota combiner (shutdown is a counter; repeat
   calls just bump it).  The closure here is created inline; its
   GRPC_CLOSURE_CREATE head falls on a line missing from this listing. */
836 void grpc_resource_user_shutdown(grpc_resource_user* resource_user) {
837 if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
840 ru_shutdown, resource_user,
841 grpc_combiner_scheduler(resource_user->resource_quota->combiner)),
/* Try to reserve `thread_count` threads against the quota's max_threads cap.
   All-or-nothing under thread_count_mu; mirrors the grant into the user's
   atomic counter so ru_destroy can return leftovers.  Returns whether the
   reservation succeeded.  NOTE(review): the thread_count parameter line,
   `is_success = true;` and `return is_success;` fall on lines missing from
   this listing. */
846 bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
848 GPR_ASSERT(thread_count >= 0);
849 bool is_success = false;
850 gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
851 grpc_resource_quota* rq = resource_user->resource_quota;
852 if (rq->num_threads_allocated + thread_count <= rq->max_threads) {
853 rq->num_threads_allocated += thread_count;
854 gpr_atm_no_barrier_fetch_add(&resource_user->num_threads_allocated,
858 gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
/* Return `thread_count` threads to the quota under thread_count_mu,
   decrementing both the quota total and this user's atomic counter.
   Over-release is a fatal error (the gpr_log shown feeds a GPR_ERROR/abort
   path whose head falls on a line missing from this listing). */
862 void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
864 GPR_ASSERT(thread_count >= 0);
865 gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
866 grpc_resource_quota* rq = resource_user->resource_quota;
867 rq->num_threads_allocated -= thread_count;
868 int old_count = static_cast<int>(gpr_atm_no_barrier_fetch_add(
869 &resource_user->num_threads_allocated, -thread_count));
870 if (old_count < thread_count || rq->num_threads_allocated < 0) {
872 "Releasing more threads (%d) than currently allocated (rq threads: "
873 "%d, ru threads: %d)",
874 thread_count, rq->num_threads_allocated + thread_count, old_count);
877 gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
/* Core allocation path; caller holds resource_user->mu.  Takes one user ref
   per byte and debits the user's free_pool.  If the pool goes negative the
   caller must wait: on_done is queued on on_allocated and the quota-side
   allocate closure is scheduled (once).  Otherwise on_done completes
   immediately.  NOTE(review): the size parameter line and some
   brace/else lines are missing from this listing. */
880 static void resource_user_alloc_locked(grpc_resource_user* resource_user,
882 grpc_closure* optional_on_done) {
883 ru_ref_by(resource_user, static_cast<gpr_atm>(size));
884 resource_user->free_pool -= static_cast<int64_t>(size);
885 if (grpc_resource_quota_trace.enabled()) {
886 gpr_log(GPR_INFO, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
887 resource_user->resource_quota->name, resource_user->name, size,
888 resource_user->free_pool);
890 if (resource_user->free_pool < 0) {
891 if (optional_on_done != nullptr) {
892 resource_user->outstanding_allocations += static_cast<int64_t>(size);
893 grpc_closure_list_append(&resource_user->on_allocated, optional_on_done,
/* `allocating` dedupes the combiner kick: only the first deficit schedules
   ru_allocate. */
896 if (!resource_user->allocating) {
897 resource_user->allocating = true;
898 GRPC_CLOSURE_SCHED(&resource_user->allocate_closure, GRPC_ERROR_NONE);
901 GRPC_CLOSURE_SCHED(optional_on_done, GRPC_ERROR_NONE);
/* Like grpc_resource_user_alloc, but refuses (returns false) instead of
   over-committing when the allocation would push the quota past its size.
   Uses a CAS loop on resource_quota->used so concurrent safe_allocs cannot
   jointly exceed the cap.  NOTE(review): interior lines are elided (the size
   parameter line, the do { opener for the CAS loop, and the
   return false / return true statements). */
905 bool grpc_resource_user_safe_alloc(grpc_resource_user* resource_user,
    /* Fast-path rejection once the user has been shut down. */
907   if (gpr_atm_no_barrier_load(&resource_user->shutdown)) return false;
908   gpr_mu_lock(&resource_user->mu);
909   grpc_resource_quota* resource_quota = resource_user->resource_quota;
912     gpr_atm used = gpr_atm_no_barrier_load(&resource_quota->used);
913     gpr_atm new_used = used + size;
    /* Reject if the tentative total exceeds the quota's current size. */
914     if (static_cast<size_t>(new_used) >
915         grpc_resource_quota_peek_size(resource_quota)) {
916       gpr_mu_unlock(&resource_user->mu);
    /* Retry until we win the race to publish new_used. */
919     cas_success = gpr_atm_full_cas(&resource_quota->used, used, new_used);
920   } while (!cas_success);
    /* Reservation made against the quota; now account it on the user.  No
       completion closure: success is reported synchronously. */
921   resource_user_alloc_locked(resource_user, size, nullptr);
922   gpr_mu_unlock(&resource_user->mu);
/* Allocate |size| bytes for |resource_user|, invoking |optional_on_done|
   when the memory becomes available (possibly synchronously).  Unlike
   grpc_resource_user_safe_alloc this may push resource_quota->used past the
   quota size; reclamation handles the overshoot.
   NOTE(review): the closing brace line is elided from this excerpt. */
926 void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
927                               grpc_closure* optional_on_done) {
928   // TODO(juanlishen): Maybe return immediately if shutting down. Deferring this
929   // because some tests become flaky after the change.
930   gpr_mu_lock(&resource_user->mu);
931   grpc_resource_quota* resource_quota = resource_user->resource_quota;
    /* Unconditionally charge the quota-wide usage estimate. */
932   gpr_atm_no_barrier_fetch_add(&resource_quota->used, size);
933   resource_user_alloc_locked(resource_user, size, optional_on_done);
934   gpr_mu_unlock(&resource_user->mu);
/* Return |size| previously allocated bytes to the user's free pool, and if
   the pool transitions from exhausted (<= 0) to positive, advertise the user
   to the quota as a source of reclaimable memory.
   NOTE(review): closing-brace lines are elided from this excerpt. */
937 void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) {
938   gpr_mu_lock(&resource_user->mu);
939   grpc_resource_quota* resource_quota = resource_user->resource_quota;
    /* fetch_add returns the pre-subtraction value; freeing more than was
       allocated is a caller bug. */
940   gpr_atm prior = gpr_atm_no_barrier_fetch_add(&resource_quota->used, -size);
941   GPR_ASSERT(prior >= static_cast<long>(size));
942   bool was_zero_or_negative = resource_user->free_pool <= 0;
943   resource_user->free_pool += static_cast<int64_t>(size);
944   if (grpc_resource_quota_trace.enabled()) {
945     gpr_log(GPR_INFO, "RQ %s %s: free %" PRIdPTR "; free_pool -> %" PRId64,
946             resource_user->resource_quota->name, resource_user->name, size,
947             resource_user->free_pool);
    /* Only advertise on the <=0 -> >0 transition, and only once until the
       quota consumes the listing (added_to_free_pool is reset elsewhere). */
949   bool is_bigger_than_zero = resource_user->free_pool > 0;
950   if (is_bigger_than_zero && was_zero_or_negative &&
951       !resource_user->added_to_free_pool) {
952     resource_user->added_to_free_pool = true;
953     GRPC_CLOSURE_SCHED(&resource_user->add_to_free_pool_closure,
956   gpr_mu_unlock(&resource_user->mu);
    /* Drop the ref taken at allocation time. */
957   ru_unref_by(resource_user, static_cast<gpr_atm>(size));
/* Register |closure| as this user's benign (destructive == false) or
   destructive (destructive == true) memory reclaimer.  Only one pending
   reclaimer of each kind is allowed at a time (hence the assert).
   NOTE(review): the signature's bool destructive parameter line and the
   trailing GRPC_CLOSURE_SCHED argument/brace lines are elided here. */
960 void grpc_resource_user_post_reclaimer(grpc_resource_user* resource_user,
962                                        grpc_closure* closure) {
963   GPR_ASSERT(resource_user->new_reclaimers[destructive] == nullptr);
964   resource_user->new_reclaimers[destructive] = closure;
    /* Hand the registration to the quota combiner for publication. */
965   GRPC_CLOSURE_SCHED(&resource_user->post_reclaimer_closure[destructive],
/* Called by a reclaimer when it has finished reclaiming memory; notifies the
   quota (via rq_reclamation_done_closure) so it can resume its allocation
   loop.  NOTE(review): the GRPC_CLOSURE_SCHED( opener for the closure on
   line 975 and the call's trailing arguments/brace are elided here. */
969 void grpc_resource_user_finish_reclamation(grpc_resource_user* resource_user) {
970   if (grpc_resource_quota_trace.enabled()) {
971     gpr_log(GPR_INFO, "RQ %s %s: reclamation complete",
972             resource_user->resource_quota->name, resource_user->name);
975       &resource_user->resource_quota->rq_reclamation_done_closure,
/* Initialize |slice_allocator| so it allocates slices against
   |resource_user|, invoking |cb| with |p| once slices have been produced.
   on_allocated (internal) fires when the memory is granted and in turn
   builds the slices; on_done is the caller's completion.
   NOTE(review): the closing brace line is elided from this excerpt. */
979 void grpc_resource_user_slice_allocator_init(
980     grpc_resource_user_slice_allocator* slice_allocator,
981     grpc_resource_user* resource_user, grpc_iomgr_cb_func cb, void* p) {
982   GRPC_CLOSURE_INIT(&slice_allocator->on_allocated, ru_allocated_slices,
983                     slice_allocator, grpc_schedule_on_exec_ctx);
984   GRPC_CLOSURE_INIT(&slice_allocator->on_done, cb, p,
985                     grpc_schedule_on_exec_ctx);
986   slice_allocator->resource_user = resource_user;
/* Request |count| slices of |length| bytes each, appended to |dest| once the
   underlying count * length byte allocation is granted; completion is
   signalled through the allocator's on_allocated -> on_done chain.
   NOTE(review): interior/trailing lines are elided here (the
   GRPC_CLOSURE_SCHED( opener for the shutdown error path, the early return /
   closing braces).  The shutdown branch fails the request with an error
   rather than allocating. */
989 void grpc_resource_user_alloc_slices(
990     grpc_resource_user_slice_allocator* slice_allocator, size_t length,
991     size_t count, grpc_slice_buffer* dest) {
992   if (gpr_atm_no_barrier_load(&slice_allocator->resource_user->shutdown)) {
994         &slice_allocator->on_allocated,
995         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
    /* Stash the request parameters for ru_allocated_slices to consume. */
998   slice_allocator->length = length;
999   slice_allocator->count = count;
1000   slice_allocator->dest = dest;
    /* NOTE(review): count * length may overflow size_t for hostile inputs --
       confirm callers bound these values upstream. */
1001   grpc_resource_user_alloc(slice_allocator->resource_user, count * length,
1002                            &slice_allocator->on_allocated);