bufferevent_ratelim.c (libevent, upstream version 2.1.10)
/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "evconfig-private.h"

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
        if (reinitialize) {
                /* on reinitialization, we only clip downwards, since we've
                   already used who-knows-how-much bandwidth this tick.  We
                   leave "last_updated" as it is; the next update will add the
                   appropriate amount of bandwidth to the bucket.
                */
                if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
                        bucket->read_limit = cfg->read_maximum;
                if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
                        bucket->write_limit = cfg->write_maximum;
        } else {
                bucket->read_limit = cfg->read_rate;
                bucket->write_limit = cfg->write_rate;
                bucket->last_updated = current_tick;
        }
        return 0;
}

int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
        /* It's okay if the tick number overflows, since we'll just
         * wrap around when we do the unsigned subtraction. */
        unsigned n_ticks = current_tick - bucket->last_updated;

        /* Make sure some ticks actually happened, and that time didn't
         * roll back. */
        if (n_ticks == 0 || n_ticks > INT_MAX)
                return 0;

        /* Naively, we would say
                bucket->limit += n_ticks * cfg->rate;

                if (bucket->limit > cfg->maximum)
                        bucket->limit = cfg->maximum;

           But we're worried about overflow, so we do it like this:
        */

        if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
                bucket->read_limit = cfg->read_maximum;
        else
                bucket->read_limit += n_ticks * cfg->read_rate;

        if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
                bucket->write_limit = cfg->write_maximum;
        else
                bucket->write_limit += n_ticks * cfg->write_rate;

        bucket->last_updated = current_tick;

        return 1;
}
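
/* Worked example of the overflow-safe refill above (illustrative numbers):
 * with read_maximum = 100, read_limit = 40, read_rate = 30 and n_ticks = 3,
 * the test computes (100 - 40) / 3 = 20, and 20 < 30 means three full ticks
 * of refill would overshoot, so read_limit is clamped to 100.  With
 * read_rate = 15 instead, 20 < 15 is false, so read_limit becomes
 * 40 + 3 * 15 = 85.  The division-based test avoids ever computing
 * n_ticks * rate in a case where the addition could overflow. */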

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
        /* Must hold lock on bev. */
        struct timeval now;
        unsigned tick;
        event_base_gettimeofday_cached(bev->bev.ev_base, &now);
        tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
        if (tick != bev->rate_limiting->limit.last_updated)
                ev_token_bucket_update_(&bev->rate_limiting->limit,
                    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
        /* This computation uses two multiplies and a divide.  We could do
         * fewer if we knew that the tick length was an integer number of
         * seconds, or if we knew it divided evenly into a second.  We should
         * investigate that more.
         */

        /* We cast to an ev_uint64_t first, since we don't want to overflow
         * before we do the final divide. */
        ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
        return (unsigned)(msec / cfg->msec_per_tick);
}
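
/* Illustrative tick arithmetic: with msec_per_tick = 250 (a 250 msec tick)
 * and tv = { .tv_sec = 2, .tv_usec = 600000 }, msec = 2 * 1000 + 600 = 2600,
 * so this returns tick 2600 / 250 = 10.  Truncating the 64-bit quotient to
 * unsigned is deliberate: tick numbers are allowed to wrap, and
 * ev_token_bucket_update_() copes with wraparound via unsigned
 * subtraction. */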

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
        struct ev_token_bucket_cfg *r;
        struct timeval g;
        if (! tick_len) {
                g.tv_sec = 1;
                g.tv_usec = 0;
                tick_len = &g;
        }
        if (read_rate > read_burst || write_rate > write_burst ||
            read_rate < 1 || write_rate < 1)
                return NULL;
        if (read_rate > EV_RATE_LIMIT_MAX ||
            write_rate > EV_RATE_LIMIT_MAX ||
            read_burst > EV_RATE_LIMIT_MAX ||
            write_burst > EV_RATE_LIMIT_MAX)
                return NULL;
        r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
        if (!r)
                return NULL;
        r->read_rate = read_rate;
        r->write_rate = write_rate;
        r->read_maximum = read_burst;
        r->write_maximum = write_burst;
        memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
        r->msec_per_tick = (tick_len->tv_sec * 1000) +
            (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
        return r;
}
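
/* Minimal usage sketch (illustrative; "bev" stands for a bufferevent the
 * caller has already created):
 *
 *     struct ev_token_bucket_cfg *cfg;
 *     // Refill 32768 bytes per 1-second tick (the default tick length),
 *     // holding at most 65536 bytes, in each direction.
 *     cfg = ev_token_bucket_cfg_new(32768, 65536, 32768, 65536, NULL);
 *     if (cfg)
 *             bufferevent_set_rate_limit(bev, cfg);
 *
 * bufferevent_set_rate_limit() stores the cfg pointer rather than copying
 * it, so the cfg must outlive every bufferevent that uses it; call
 * ev_token_bucket_cfg_free() only after detaching it everywhere. */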

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
        mm_free(cfg);
}

/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read if not.  Return that maximum, or
    0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
        /* needs lock on bev. */
        ev_ssize_t max_so_far =
            is_write ? bev->max_single_write : bev->max_single_read;

#define LIM(x)                                          \
        (is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)                      \
        (is_write ? (g)->write_suspended : (g)->read_suspended)

        /* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)                              \
        do {                                    \
                if (max_so_far > (x))           \
                        max_so_far = (x);       \
        } while (0)

        if (!bev->rate_limiting)
                return max_so_far;

        /* If rate-limiting is enabled at all, update the appropriate
           bucket, and take the smaller of our rate limit and the group
           rate limit.
         */

        if (bev->rate_limiting->cfg) {
                bufferevent_update_buckets(bev);
                max_so_far = LIM(bev->rate_limiting->limit);
        }
        if (bev->rate_limiting->group) {
                struct bufferevent_rate_limit_group *g =
                    bev->rate_limiting->group;
                ev_ssize_t share;
                LOCK_GROUP(g);
                if (GROUP_SUSPENDED(g)) {
                        /* We can get here if we failed to lock this
                         * particular bufferevent while suspending the whole
                         * group. */
                        if (is_write)
                                bufferevent_suspend_write_(&bev->bev,
                                    BEV_SUSPEND_BW_GROUP);
                        else
                                bufferevent_suspend_read_(&bev->bev,
                                    BEV_SUSPEND_BW_GROUP);
                        share = 0;
                } else {
                        /* XXXX probably we should divide among the active
                         * members, not the total members. */
                        share = LIM(g->rate_limit) / g->n_members;
                        if (share < g->min_share)
                                share = g->min_share;
                }
                UNLOCK_GROUP(g);
                CLAMPTO(share);
        }

        if (max_so_far < 0)
                max_so_far = 0;
        return max_so_far;
}
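
/* Illustrative share computation for the group branch above: a group whose
 * read bucket currently holds 12000 tokens and has 4 members gives each
 * member share = 12000 / 4 = 3000 (raised to min_share if it came out
 * smaller).  A member whose own read bucket holds 5000 tokens then gets
 * min(5000, 3000) = 3000 as its per-call read maximum. */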

ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
        return bufferevent_get_rlim_max_(bev, 0);
}

ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
        return bufferevent_get_rlim_max_(bev, 1);
}

int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
        /* XXXXX Make sure all users of this function check its return value */
        int r = 0;
        /* need to hold lock on bev */
        if (!bev->rate_limiting)
                return 0;

        if (bev->rate_limiting->cfg) {
                bev->rate_limiting->limit.read_limit -= bytes;
                if (bev->rate_limiting->limit.read_limit <= 0) {
                        bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
                        if (event_add(&bev->rate_limiting->refill_bucket_event,
                                &bev->rate_limiting->cfg->tick_timeout) < 0)
                                r = -1;
                } else if (bev->read_suspended & BEV_SUSPEND_BW) {
                        if (!(bev->write_suspended & BEV_SUSPEND_BW))
                                event_del(&bev->rate_limiting->refill_bucket_event);
                        bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
                }
        }

        if (bev->rate_limiting->group) {
                LOCK_GROUP(bev->rate_limiting->group);
                bev->rate_limiting->group->rate_limit.read_limit -= bytes;
                bev->rate_limiting->group->total_read += bytes;
                if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
                        bev_group_suspend_reading_(bev->rate_limiting->group);
                } else if (bev->rate_limiting->group->read_suspended) {
                        bev_group_unsuspend_reading_(bev->rate_limiting->group);
                }
                UNLOCK_GROUP(bev->rate_limiting->group);
        }

        return r;
}

int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
        /* XXXXX Make sure all users of this function check its return value */
        int r = 0;
        /* need to hold lock */
        if (!bev->rate_limiting)
                return 0;

        if (bev->rate_limiting->cfg) {
                bev->rate_limiting->limit.write_limit -= bytes;
                if (bev->rate_limiting->limit.write_limit <= 0) {
                        bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
                        if (event_add(&bev->rate_limiting->refill_bucket_event,
                                &bev->rate_limiting->cfg->tick_timeout) < 0)
                                r = -1;
                } else if (bev->write_suspended & BEV_SUSPEND_BW) {
                        if (!(bev->read_suspended & BEV_SUSPEND_BW))
                                event_del(&bev->rate_limiting->refill_bucket_event);
                        bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
                }
        }

        if (bev->rate_limiting->group) {
                LOCK_GROUP(bev->rate_limiting->group);
                bev->rate_limiting->group->rate_limit.write_limit -= bytes;
                bev->rate_limiting->group->total_written += bytes;
                if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
                        bev_group_suspend_writing_(bev->rate_limiting->group);
                } else if (bev->rate_limiting->group->write_suspended) {
                        bev_group_unsuspend_writing_(bev->rate_limiting->group);
                }
                UNLOCK_GROUP(bev->rate_limiting->group);
        }

        return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
        /* Needs group lock */
        struct bufferevent_private *bev;
        g->read_suspended = 1;
        g->pending_unsuspend_read = 0;

        /* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
           to prevent a deadlock.  (Ordinarily, the group lock nests inside
           the bufferevent locks.  If we are unable to lock any individual
           bufferevent, it will find out later when it looks at its limit
           and sees that its group is suspended.)
        */
        LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
                if (EVLOCK_TRY_LOCK_(bev->lock)) {
                        bufferevent_suspend_read_(&bev->bev,
                            BEV_SUSPEND_BW_GROUP);
                        EVLOCK_UNLOCK(bev->lock, 0);
                }
        }
        return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
        /* Needs group lock */
        struct bufferevent_private *bev;
        g->write_suspended = 1;
        g->pending_unsuspend_write = 0;
        LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
                if (EVLOCK_TRY_LOCK_(bev->lock)) {
                        bufferevent_suspend_write_(&bev->bev,
                            BEV_SUSPEND_BW_GROUP);
                        EVLOCK_UNLOCK(bev->lock, 0);
                }
        }
        return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
        unsigned tick;
        struct timeval now;
        struct bufferevent_private *bev = arg;
        int again = 0;
        BEV_LOCK(&bev->bev);
        if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
                BEV_UNLOCK(&bev->bev);
                return;
        }

        /* First, update the bucket */
        event_base_gettimeofday_cached(bev->bev.ev_base, &now);
        tick = ev_token_bucket_get_tick_(&now,
            bev->rate_limiting->cfg);
        ev_token_bucket_update_(&bev->rate_limiting->limit,
            bev->rate_limiting->cfg,
            tick);

        /* Now unsuspend any read/write operations as appropriate. */
        if ((bev->read_suspended & BEV_SUSPEND_BW)) {
                if (bev->rate_limiting->limit.read_limit > 0)
                        bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
                else
                        again = 1;
        }
        if ((bev->write_suspended & BEV_SUSPEND_BW)) {
                if (bev->rate_limiting->limit.write_limit > 0)
                        bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
                else
                        again = 1;
        }
        if (again) {
                /* One or more of the buckets may need another refill if they
                   started negative.

                   XXXX if we need to be quiet for more ticks, we should
                   maybe figure out what timeout we really want.
                */
                /* XXXX Handle event_add failure somehow */
                event_add(&bev->rate_limiting->refill_bucket_event,
                    &bev->rate_limiting->cfg->tick_timeout);
        }
        BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group.
 *
 * Requires that we hold the lock on the group.
 */
static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
{
        int which;
        struct bufferevent_private *bev;

        /* requires group lock */

        if (!group->n_members)
                return NULL;

        EVUTIL_ASSERT(! LIST_EMPTY(&group->members));

        which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);

        bev = LIST_FIRST(&group->members);
        while (which--)
                bev = LIST_NEXT(bev, rate_limiting->next_in_group);

        return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)                     \
        do {                                            \
                first = bev_group_random_element_(g);   \
                for (bev = first; bev != LIST_END(&g->members); \
                    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
                        block ;                                  \
                }                                                \
                for (bev = LIST_FIRST(&g->members); bev && bev != first; \
                    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
                        block ;                                         \
                }                                                       \
        } while (0)
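
/* Illustrative traversal order: if the member list is A -> B -> C -> D and
 * bev_group_random_element_() picks C, the first loop visits C and D and
 * the second loop visits A and B, so every member is visited exactly once
 * per invocation, starting from a randomly chosen offset. */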

static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
        int again = 0;
        struct bufferevent_private *bev, *first;

        g->read_suspended = 0;
        FOREACH_RANDOM_ORDER({
                if (EVLOCK_TRY_LOCK_(bev->lock)) {
                        bufferevent_unsuspend_read_(&bev->bev,
                            BEV_SUSPEND_BW_GROUP);
                        EVLOCK_UNLOCK(bev->lock, 0);
                } else {
                        again = 1;
                }
        });
        g->pending_unsuspend_read = again;
}

static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
        int again = 0;
        struct bufferevent_private *bev, *first;
        g->write_suspended = 0;

        FOREACH_RANDOM_ORDER({
                if (EVLOCK_TRY_LOCK_(bev->lock)) {
                        bufferevent_unsuspend_write_(&bev->bev,
                            BEV_SUSPEND_BW_GROUP);
                        EVLOCK_UNLOCK(bev->lock, 0);
                } else {
                        again = 1;
                }
        });
        g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
        struct bufferevent_rate_limit_group *g = arg;
        unsigned tick;
        struct timeval now;

        event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

        LOCK_GROUP(g);

        tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
        ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

        if (g->pending_unsuspend_read ||
            (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
                bev_group_unsuspend_reading_(g);
        }
        if (g->pending_unsuspend_write ||
            (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))) {
                bev_group_unsuspend_writing_(g);
        }

        /* XXXX Rather than waiting to the next tick to unsuspend stuff
         * with pending_unsuspend_write/read, we should do it on the
         * next iteration of the mainloop.
         */

        UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
        struct bufferevent_private *bevp = BEV_UPCAST(bev);
        int r = -1;
        struct bufferevent_rate_limit *rlim;
        struct timeval now;
        ev_uint32_t tick;
        int reinit = 0, suspended = 0;
        /* XXX reference-count cfg */

        BEV_LOCK(bev);

        if (cfg == NULL) {
                if (bevp->rate_limiting) {
                        rlim = bevp->rate_limiting;
                        rlim->cfg = NULL;
                        bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
                        bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
                        if (event_initialized(&rlim->refill_bucket_event))
                                event_del(&rlim->refill_bucket_event);
                }
                r = 0;
                goto done;
        }

        event_base_gettimeofday_cached(bev->ev_base, &now);
        tick = ev_token_bucket_get_tick_(&now, cfg);

        if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
                /* no-op */
                r = 0;
                goto done;
        }
        if (bevp->rate_limiting == NULL) {
                rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
                if (!rlim)
                        goto done;
                bevp->rate_limiting = rlim;
        } else {
                rlim = bevp->rate_limiting;
        }
        reinit = rlim->cfg != NULL;

        rlim->cfg = cfg;
        ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

        if (reinit) {
                EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
                event_del(&rlim->refill_bucket_event);
        }
        event_assign(&rlim->refill_bucket_event, bev->ev_base,
            -1, EV_FINALIZE, bev_refill_callback_, bevp);

        if (rlim->limit.read_limit > 0) {
                bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
        } else {
                bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
                suspended = 1;
        }
        if (rlim->limit.write_limit > 0) {
                bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
        } else {
                bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
                suspended = 1;
        }

        if (suspended)
                event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

        r = 0;

done:
        BEV_UNLOCK(bev);
        return r;
}
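
/* Illustrative call sequence (using a cfg like the one sketched earlier):
 *
 *     bufferevent_set_rate_limit(bev, cfg);   // attach or replace a limit
 *     ...
 *     bufferevent_set_rate_limit(bev, NULL);  // detach the limit again
 *
 * Passing NULL only detaches the cfg; the caller still owns it and frees
 * it with ev_token_bucket_cfg_free() once nothing uses it. */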

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
        struct bufferevent_rate_limit_group *g;
        struct timeval now;
        ev_uint32_t tick;

        event_base_gettimeofday_cached(base, &now);
        tick = ev_token_bucket_get_tick_(&now, cfg);

        g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
        if (!g)
                return NULL;
        memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
        LIST_INIT(&g->members);

        ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

        event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
            bev_group_refill_callback_, g);
        /*XXXX handle event_add failure */
        event_add(&g->master_refill_event, &cfg->tick_timeout);

        EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

        bufferevent_rate_limit_group_set_min_share(g, 64);

        evutil_weakrand_seed_(&g->weakrand_seed,
            (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

        return g;
}
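
/* Minimal group sketch (illustrative; "base", "bev1" and "bev2" are assumed
 * to already exist, and "cfg" is an ev_token_bucket_cfg as above):
 *
 *     struct bufferevent_rate_limit_group *grp;
 *     grp = bufferevent_rate_limit_group_new(base, cfg);
 *     if (grp) {
 *             bufferevent_add_to_rate_limit_group(bev1, grp);
 *             bufferevent_add_to_rate_limit_group(bev2, grp);
 *     }
 *
 * All members then draw from one shared pair of token buckets.  Unlike the
 * per-bufferevent case, the group copies cfg (see the memcpy above).  Free
 * the group with bufferevent_rate_limit_group_free() only after every
 * member has been removed. */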

int
bufferevent_rate_limit_group_set_cfg(
        struct bufferevent_rate_limit_group *g,
        const struct ev_token_bucket_cfg *cfg)
{
        int same_tick;
        if (!g || !cfg)
                return -1;

        LOCK_GROUP(g);
        same_tick = evutil_timercmp(
                &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
        memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

        if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
                g->rate_limit.read_limit = cfg->read_maximum;
        if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
                g->rate_limit.write_limit = cfg->write_maximum;

        if (!same_tick) {
                /* This can cause a hiccup in the schedule */
                event_add(&g->master_refill_event, &cfg->tick_timeout);
        }

        /* The new limits might force us to adjust min_share differently. */
        bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

        UNLOCK_GROUP(g);
        return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
        struct bufferevent_rate_limit_group *g,
        size_t share)
{
        if (share > EV_SSIZE_MAX)
                return -1;

        g->configured_min_share = share;

        /* The effective share must not exceed one tick's worth of refill;
         * IOW, at steady state, at least one connection can make progress
         * per tick. */
        if (share > g->rate_limit_cfg.read_rate)
                share = g->rate_limit_cfg.read_rate;
        if (share > g->rate_limit_cfg.write_rate)
                share = g->rate_limit_cfg.write_rate;

        g->min_share = share;
        return 0;
}

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
        LOCK_GROUP(g);
        EVUTIL_ASSERT(0 == g->n_members);
        event_del(&g->master_refill_event);
        UNLOCK_GROUP(g);
        EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
        mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
        int wsuspend, rsuspend;
        struct bufferevent_private *bevp = BEV_UPCAST(bev);
        BEV_LOCK(bev);

        if (!bevp->rate_limiting) {
                struct bufferevent_rate_limit *rlim;
                rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
                if (!rlim) {
                        BEV_UNLOCK(bev);
                        return -1;
                }
                event_assign(&rlim->refill_bucket_event, bev->ev_base,
                    -1, EV_FINALIZE, bev_refill_callback_, bevp);
                bevp->rate_limiting = rlim;
        }

        if (bevp->rate_limiting->group == g) {
                BEV_UNLOCK(bev);
                return 0;
        }
        if (bevp->rate_limiting->group)
                bufferevent_remove_from_rate_limit_group(bev);

        LOCK_GROUP(g);
        bevp->rate_limiting->group = g;
        ++g->n_members;
        LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

        rsuspend = g->read_suspended;
        wsuspend = g->write_suspended;

        UNLOCK_GROUP(g);

        if (rsuspend)
                bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
        if (wsuspend)
                bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

        BEV_UNLOCK(bev);
        return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
        return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
        struct bufferevent_private *bevp = BEV_UPCAST(bev);
        BEV_LOCK(bev);
        if (bevp->rate_limiting && bevp->rate_limiting->group) {
                struct bufferevent_rate_limit_group *g =
                    bevp->rate_limiting->group;
                LOCK_GROUP(g);
                bevp->rate_limiting->group = NULL;
                --g->n_members;
                LIST_REMOVE(bevp, rate_limiting->next_in_group);
                UNLOCK_GROUP(g);
        }
        if (unsuspend) {
                bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
                bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
        }
        BEV_UNLOCK(bev);
        return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
        ev_ssize_t r;
        struct bufferevent_private *bevp;
        BEV_LOCK(bev);
        bevp = BEV_UPCAST(bev);
        if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
                bufferevent_update_buckets(bevp);
                r = bevp->rate_limiting->limit.read_limit;
        } else {
                r = EV_SSIZE_MAX;
        }
        BEV_UNLOCK(bev);
        return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
        ev_ssize_t r;
        struct bufferevent_private *bevp;
        BEV_LOCK(bev);
        bevp = BEV_UPCAST(bev);
        if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
                bufferevent_update_buckets(bevp);
                r = bevp->rate_limiting->limit.write_limit;
        } else {
                r = EV_SSIZE_MAX;
        }
        BEV_UNLOCK(bev);
        return r;
}

int
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
{
        struct bufferevent_private *bevp;
        BEV_LOCK(bev);
        bevp = BEV_UPCAST(bev);
        if (size == 0 || size > EV_SSIZE_MAX)
                bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
        else
                bevp->max_single_read = size;
        BEV_UNLOCK(bev);
        return 0;
}

int
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
{
        struct bufferevent_private *bevp;
        BEV_LOCK(bev);
        bevp = BEV_UPCAST(bev);
        if (size == 0 || size > EV_SSIZE_MAX)
                bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
        else
                bevp->max_single_write = size;
        BEV_UNLOCK(bev);
        return 0;
}

ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent *bev)
{
        ev_ssize_t r;

        BEV_LOCK(bev);
        r = BEV_UPCAST(bev)->max_single_read;
        BEV_UNLOCK(bev);
        return r;
}

ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent *bev)
{
        ev_ssize_t r;

        BEV_LOCK(bev);
        r = BEV_UPCAST(bev)->max_single_write;
        BEV_UNLOCK(bev);
        return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
        ev_ssize_t r;
        BEV_LOCK(bev);
        r = bufferevent_get_read_max_(BEV_UPCAST(bev));
        BEV_UNLOCK(bev);
        return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
        ev_ssize_t r;
        BEV_LOCK(bev);
        r = bufferevent_get_write_max_(BEV_UPCAST(bev));
        BEV_UNLOCK(bev);
        return r;
}

const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev)
{
        struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
        struct ev_token_bucket_cfg *cfg;

        BEV_LOCK(bev);

        if (bufev_private->rate_limiting) {
                cfg = bufev_private->rate_limiting->cfg;
        } else {
                cfg = NULL;
        }

        BEV_UNLOCK(bev);

        return cfg;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
        struct bufferevent_rate_limit_group *grp)
{
        ev_ssize_t r;
        LOCK_GROUP(grp);
        r = grp->rate_limit.read_limit;
        UNLOCK_GROUP(grp);
        return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
        struct bufferevent_rate_limit_group *grp)
{
        ev_ssize_t r;
        LOCK_GROUP(grp);
        r = grp->rate_limit.write_limit;
        UNLOCK_GROUP(grp);
        return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
        int r = 0;
        ev_ssize_t old_limit, new_limit;
        struct bufferevent_private *bevp;
        BEV_LOCK(bev);
        bevp = BEV_UPCAST(bev);
        EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
        old_limit = bevp->rate_limiting->limit.read_limit;

        new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
        if (old_limit > 0 && new_limit <= 0) {
                bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
                if (event_add(&bevp->rate_limiting->refill_bucket_event,
                        &bevp->rate_limiting->cfg->tick_timeout) < 0)
                        r = -1;
        } else if (old_limit <= 0 && new_limit > 0) {
                if (!(bevp->write_suspended & BEV_SUSPEND_BW))
                        event_del(&bevp->rate_limiting->refill_bucket_event);
                bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
        }

        BEV_UNLOCK(bev);
        return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
        /* XXXX this is mostly copy-and-paste from
         * bufferevent_decrement_read_limit */
        int r = 0;
        ev_ssize_t old_limit, new_limit;
        struct bufferevent_private *bevp;
        BEV_LOCK(bev);
        bevp = BEV_UPCAST(bev);
        EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
        old_limit = bevp->rate_limiting->limit.write_limit;

        new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
        if (old_limit > 0 && new_limit <= 0) {
                bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
                if (event_add(&bevp->rate_limiting->refill_bucket_event,
                        &bevp->rate_limiting->cfg->tick_timeout) < 0)
                        r = -1;
        } else if (old_limit <= 0 && new_limit > 0) {
                if (!(bevp->read_suspended & BEV_SUSPEND_BW))
                        event_del(&bevp->rate_limiting->refill_bucket_event);
                bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
        }

        BEV_UNLOCK(bev);
        return r;
}
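
/* Illustrative manual accounting: a caller can charge traffic that bypassed
 * the bufferevent (say, protocol overhead it measured itself) against the
 * bucket.  This is only valid while a rate-limit cfg is attached; note the
 * EVUTIL_ASSERT above.
 *
 *     // Deduct 512 bytes; if this drives the bucket to zero or below,
 *     // reading is suspended until the next refill tick.
 *     bufferevent_decrement_read_limit(bev, 512);
 *
 * A negative decr adds tokens back instead, and may unsuspend reading. */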

int
bufferevent_rate_limit_group_decrement_read(
        struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
        int r = 0;
        ev_ssize_t old_limit, new_limit;
        LOCK_GROUP(grp);
        old_limit = grp->rate_limit.read_limit;
        new_limit = (grp->rate_limit.read_limit -= decr);

        if (old_limit > 0 && new_limit <= 0) {
                bev_group_suspend_reading_(grp);
        } else if (old_limit <= 0 && new_limit > 0) {
                bev_group_unsuspend_reading_(grp);
        }

        UNLOCK_GROUP(grp);
        return r;
}

int
bufferevent_rate_limit_group_decrement_write(
        struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
        int r = 0;
        ev_ssize_t old_limit, new_limit;
        LOCK_GROUP(grp);
        old_limit = grp->rate_limit.write_limit;
        new_limit = (grp->rate_limit.write_limit -= decr);

        if (old_limit > 0 && new_limit <= 0) {
                bev_group_suspend_writing_(grp);
        } else if (old_limit <= 0 && new_limit > 0) {
                bev_group_unsuspend_writing_(grp);
        }

        UNLOCK_GROUP(grp);
        return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
        EVUTIL_ASSERT(grp != NULL);
        if (total_read_out)
                *total_read_out = grp->total_read;
        if (total_written_out)
                *total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
        grp->total_read = grp->total_written = 0;
}

int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
        bev->rate_limiting = NULL;
        bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
        bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

        return 0;
}