Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / protocols / htb.py
1 # -*- test-case-name: twisted.test.test_htb -*-
2 #
3 # Copyright (c) Twisted Matrix Laboratories.
4 # See LICENSE for details.
5
6
7 """Hierarchical Token Bucket traffic shaping.
8
9 Patterned after U{Martin Devera's Hierarchical Token Bucket traffic
10 shaper for the Linux kernel<http://luxik.cdi.cz/~devik/qos/htb/>}.
11
12 @seealso: U{HTB Linux queuing discipline manual - user guide
13   <http://luxik.cdi.cz/~devik/qos/htb/manual/userg.htm>}
14 @seealso: U{Token Bucket Filter in Linux Advanced Routing & Traffic Control
15     HOWTO<http://lartc.org/howto/lartc.qdisc.classless.html#AEN682>}
16 @author: Kevin Turner
17 """
18
19 from __future__ import nested_scopes
20
21 __version__ = '$Revision: 1.5 $'[11:-2]
22
23
24 # TODO: Investigate whether we should be using os.times()[-1] instead of
25 # time.time.  time.time, it has been pointed out, can go backwards.  Is
26 # the same true of os.times?
27 from time import time
28 from zope.interface import implements, Interface
29
30 from twisted.protocols import pcp
31
32
33 class Bucket:
34     """Token bucket, or something like it.
35
36     I can hold up to a certain number of tokens, and I drain over time.
37
38     @cvar maxburst: Size of the bucket, in bytes.  If None, the bucket is
39         never full.
40     @type maxburst: int
41     @cvar rate: Rate the bucket drains, in bytes per second.  If None,
42         the bucket drains instantaneously.
43     @type rate: int
44     """
45
46     maxburst = None
47     rate = None
48
49     _refcount = 0
50
51     def __init__(self, parentBucket=None):
52         self.content = 0
53         self.parentBucket=parentBucket
54         self.lastDrip = time()
55
56     def add(self, amount):
57         """Add tokens to me.
58
59         @param amount: A quanity of tokens to add.
60         @type amount: int
61
62         @returns: The number of tokens that fit.
63         @returntype: int
64         """
65         self.drip()
66         if self.maxburst is None:
67             allowable = amount
68         else:
69             allowable = min(amount, self.maxburst - self.content)
70
71         if self.parentBucket is not None:
72             allowable = self.parentBucket.add(allowable)
73         self.content += allowable
74         return allowable
75
76     def drip(self):
77         """
78         Let some of the bucket drain.
79
80         How much of the bucket drains depends on how long it has been
81         since I was last called.
82
83         @returns: C{True} if the bucket is empty after this drip.
84         @returntype: bool
85         """
86         if self.parentBucket is not None:
87             self.parentBucket.drip()
88
89         if self.rate is None:
90             self.content = 0
91         else:
92             now = time()
93             deltaT = now - self.lastDrip
94             self.content = long(max(0, self.content - deltaT * self.rate))
95             self.lastDrip = now
96         return self.content == 0
97
98
99 class IBucketFilter(Interface):
100     def getBucketFor(*somethings, **some_kw):
101         """I'll give you a bucket for something.
102
103         @returntype: L{Bucket}
104         """
105
106 class HierarchicalBucketFilter:
107     """I filter things into buckets, and I am nestable.
108
109     @cvar bucketFactory: Class of buckets to make.
110     @type bucketFactory: L{Bucket} class
111     @cvar sweepInterval: Seconds between sweeping out the bucket cache.
112     @type sweepInterval: int
113     """
114
115     implements(IBucketFilter)
116
117     bucketFactory = Bucket
118     sweepInterval = None
119
120     def __init__(self, parentFilter=None):
121         self.buckets = {}
122         self.parentFilter = parentFilter
123         self.lastSweep = time()
124
125     def getBucketFor(self, *a, **kw):
126         """You want a bucket for that?  I'll give you a bucket.
127
128         Any parameters are passed on to L{getBucketKey}, from them it
129         decides which bucket you get.
130
131         @returntype: L{Bucket}
132         """
133         if ((self.sweepInterval is not None)
134             and ((time() - self.lastSweep) > self.sweepInterval)):
135             self.sweep()
136
137         if self.parentFilter:
138             parentBucket = self.parentFilter.getBucketFor(self, *a, **kw)
139         else:
140             parentBucket = None
141
142         key = self.getBucketKey(*a, **kw)
143         bucket = self.buckets.get(key)
144         if bucket is None:
145             bucket = self.bucketFactory(parentBucket)
146             self.buckets[key] = bucket
147         return bucket
148
149     def getBucketKey(self, *a, **kw):
150         """I determine who gets which bucket.
151
152         Unless I'm overridden, everything gets the same bucket.
153
154         @returns: something to be used as a key in the bucket cache.
155         """
156         return None
157
158     def sweep(self):
159         """I throw away references to empty buckets."""
160         for key, bucket in self.buckets.items():
161             if (bucket._refcount == 0) and bucket.drip():
162                 del self.buckets[key]
163
164         self.lastSweep = time()
165
166
167 class FilterByHost(HierarchicalBucketFilter):
168     """A bucket filter with a bucket for each host.
169     """
170     sweepInterval = 60 * 20
171
172     def getBucketKey(self, transport):
173         return transport.getPeer()[1]
174
175
176 class FilterByServer(HierarchicalBucketFilter):
177     """A bucket filter with a bucket for each service.
178     """
179     sweepInterval = None
180
181     def getBucketKey(self, transport):
182         return transport.getHost()[2]
183
184
185 class ShapedConsumer(pcp.ProducerConsumerProxy):
186     """I wrap a Consumer and shape the rate at which it receives data.
187     """
188     # Providing a Pull interface means I don't have to try to schedule
189     # traffic with callLaters.
190     iAmStreaming = False
191
192     def __init__(self, consumer, bucket):
193         pcp.ProducerConsumerProxy.__init__(self, consumer)
194         self.bucket = bucket
195         self.bucket._refcount += 1
196
197     def _writeSomeData(self, data):
198         # In practice, this actually results in obscene amounts of
199         # overhead, as a result of generating lots and lots of packets
200         # with twelve-byte payloads.  We may need to do a version of
201         # this with scheduled writes after all.
202         amount = self.bucket.add(len(data))
203         return pcp.ProducerConsumerProxy._writeSomeData(self, data[:amount])
204
205     def stopProducing(self):
206         pcp.ProducerConsumerProxy.stopProducing(self)
207         self.bucket._refcount -= 1
208
209
210 class ShapedTransport(ShapedConsumer):
211     """I wrap a Transport and shape the rate at which it receives data.
212
213     I am a L{ShapedConsumer} with a little bit of magic to provide for
214     the case where the consumer I wrap is also a Transport and people
215     will be attempting to access attributes I do not proxy as a
216     Consumer (e.g. loseConnection).
217     """
218     # Ugh.  We only wanted to filter IConsumer, not ITransport.
219
220     iAmStreaming = False
221     def __getattr__(self, name):
222         # Because people will be doing things like .getPeer and
223         # .loseConnection on me.
224         return getattr(self.consumer, name)
225
226
227 class ShapedProtocolFactory:
228     """I dispense Protocols with traffic shaping on their transports.
229
230     Usage::
231
232         myserver = SomeFactory()
233         myserver.protocol = ShapedProtocolFactory(myserver.protocol,
234                                                   bucketFilter)
235
236     Where SomeServerFactory is a L{twisted.internet.protocol.Factory}, and
237     bucketFilter is an instance of L{HierarchicalBucketFilter}.
238     """
239     def __init__(self, protoClass, bucketFilter):
240         """Tell me what to wrap and where to get buckets.
241
242         @param protoClass: The class of Protocol I will generate
243           wrapped instances of.
244         @type protoClass: L{Protocol<twisted.internet.interfaces.IProtocol>}
245           class
246         @param bucketFilter: The filter which will determine how
247           traffic is shaped.
248         @type bucketFilter: L{HierarchicalBucketFilter}.
249         """
250         # More precisely, protoClass can be any callable that will return
251         # instances of something that implements IProtocol.
252         self.protocol = protoClass
253         self.bucketFilter = bucketFilter
254
255     def __call__(self, *a, **kw):
256         """Make a Protocol instance with a shaped transport.
257
258         Any parameters will be passed on to the protocol's initializer.
259
260         @returns: a Protocol instance with a L{ShapedTransport}.
261         """
262         proto = self.protocol(*a, **kw)
263         origMakeConnection = proto.makeConnection
264         def makeConnection(transport):
265             bucket = self.bucketFilter.getBucketFor(transport)
266             shapedTransport = ShapedTransport(transport, bucket)
267             return origMakeConnection(shapedTransport)
268         proto.makeConnection = makeConnection
269         return proto