Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / test / test_htb.py
1 # -*- Python -*-
2
3 __version__ = '$Revision: 1.3 $'[11:-2]
4
5 from twisted.trial import unittest
6 from twisted.protocols import htb
7
8 class DummyClock:
9     time = 0
10     def set(self, when):
11         self.time = when
12
13     def __call__(self):
14         return self.time
15
16 class SomeBucket(htb.Bucket):
17     maxburst = 100
18     rate = 2
19
20 class TestBucketBase(unittest.TestCase):
21     def setUp(self):
22         self._realTimeFunc = htb.time
23         self.clock = DummyClock()
24         htb.time = self.clock
25
26     def tearDown(self):
27         htb.time = self._realTimeFunc
28
29 class TestBucket(TestBucketBase):
30     def testBucketSize(self):
31         """Testing the size of the bucket."""
32         b = SomeBucket()
33         fit = b.add(1000)
34         self.assertEqual(100, fit)
35
36     def testBucketDrain(self):
37         """Testing the bucket's drain rate."""
38         b = SomeBucket()
39         fit = b.add(1000)
40         self.clock.set(10)
41         fit = b.add(1000)
42         self.assertEqual(20, fit)
43
44     def test_bucketEmpty(self):
45         """
46         L{htb.Bucket.drip} returns C{True} if the bucket is empty after that drip.
47         """
48         b = SomeBucket()
49         b.add(20)
50         self.clock.set(9)
51         empty = b.drip()
52         self.assertFalse(empty)
53         self.clock.set(10)
54         empty = b.drip()
55         self.assertTrue(empty)
56
57 class TestBucketNesting(TestBucketBase):
58     def setUp(self):
59         TestBucketBase.setUp(self)
60         self.parent = SomeBucket()
61         self.child1 = SomeBucket(self.parent)
62         self.child2 = SomeBucket(self.parent)
63
64     def testBucketParentSize(self):
65         # Use up most of the parent bucket.
66         self.child1.add(90)
67         fit = self.child2.add(90)
68         self.assertEqual(10, fit)
69
70     def testBucketParentRate(self):
71         # Make the parent bucket drain slower.
72         self.parent.rate = 1
73         # Fill both child1 and parent.
74         self.child1.add(100)
75         self.clock.set(10)
76         fit = self.child1.add(100)
77         # How much room was there?  The child bucket would have had 20,
78         # but the parent bucket only ten (so no, it wouldn't make too much
79         # sense to have a child bucket draining faster than its parent in a real
80         # application.)
81         self.assertEqual(10, fit)
82
83
84 # TODO: Test the Transport stuff?
85
86 from test_pcp import DummyConsumer
87
88 class ConsumerShaperTest(TestBucketBase):
89     def setUp(self):
90         TestBucketBase.setUp(self)
91         self.underlying = DummyConsumer()
92         self.bucket = SomeBucket()
93         self.shaped = htb.ShapedConsumer(self.underlying, self.bucket)
94
95     def testRate(self):
96         # Start off with a full bucket, so the burst-size dosen't factor in
97         # to the calculations.
98         delta_t = 10
99         self.bucket.add(100)
100         self.shaped.write("x" * 100)
101         self.clock.set(delta_t)
102         self.shaped.resumeProducing()
103         self.assertEqual(len(self.underlying.getvalue()),
104                              delta_t * self.bucket.rate)
105
106     def testBucketRefs(self):
107         self.assertEqual(self.bucket._refcount, 1)
108         self.shaped.stopProducing()
109         self.assertEqual(self.bucket._refcount, 0)