Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / internet / test / test_posixprocess.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for POSIX-based L{IReactorProcess} implementations.
6 """
7
8 import errno, os, sys
9
10 try:
11     import fcntl
12 except ImportError:
13     platformSkip = "non-POSIX platform"
14 else:
15     from twisted.internet import process
16     platformSkip = None
17
18 from twisted.trial.unittest import TestCase
19
20
21 class FakeFile(object):
22     """
23     A dummy file object which records when it is closed.
24     """
25     def __init__(self, testcase, fd):
26         self.testcase = testcase
27         self.fd = fd
28
29
30     def close(self):
31         self.testcase._files.remove(self.fd)
32
33
34
35 class FakeResourceModule(object):
36     """
37     Fake version of L{resource} which hard-codes a particular rlimit for maximum
38     open files.
39
40     @ivar _limit: The value to return for the hard limit of number of open files.
41     """
42     RLIMIT_NOFILE = 1
43
44     def __init__(self, limit):
45         self._limit = limit
46
47
48     def getrlimit(self, no):
49         """
50         A fake of L{resource.getrlimit} which returns a pre-determined result.
51         """
52         if no == self.RLIMIT_NOFILE:
53             return [0, self._limit]
54         return [123, 456]
55
56
57
58 class FDDetectorTests(TestCase):
59     """
60     Tests for _FDDetector class in twisted.internet.process, which detects
61     which function to drop in place for the _listOpenFDs method.
62
63     @ivar devfs: A flag indicating whether the filesystem fake will indicate
64         that /dev/fd exists.
65
66     @ivar accurateDevFDResults: A flag indicating whether the /dev/fd fake
67         returns accurate open file information.
68
69     @ivar procfs: A flag indicating whether the filesystem fake will indicate
70         that /proc/<pid>/fd exists.
71     """
72     skip = platformSkip
73
74     devfs = False
75     accurateDevFDResults = False
76
77     procfs = False
78
79     def getpid(self):
80         """
81         Fake os.getpid, always return the same thing
82         """
83         return 123
84
85
86     def listdir(self, arg):
87         """
88         Fake os.listdir, depending on what mode we're in to simulate behaviour.
89
90         @param arg: the directory to list
91         """
92         accurate = map(str, self._files)
93         if self.procfs and arg == ('/proc/%d/fd' % (self.getpid(),)):
94             return accurate
95         if self.devfs and arg == '/dev/fd':
96             if self.accurateDevFDResults:
97                 return accurate
98             return ["0", "1", "2"]
99         raise OSError()
100
101
102     def openfile(self, fname, mode):
103         """
104         This is a mock for L{open}.  It keeps track of opened files so extra
105         descriptors can be returned from the mock for L{os.listdir} when used on
106         one of the list-of-filedescriptors directories.
107
108         A L{FakeFile} is returned which can be closed to remove the new
109         descriptor from the open list.
110         """
111         # Find the smallest unused file descriptor and give it to the new file.
112         f = FakeFile(self, min(set(range(1024)) - set(self._files)))
113         self._files.append(f.fd)
114         return f
115
116
117     def hideResourceModule(self):
118         """
119         Make the L{resource} module unimportable for the remainder of the
120         current test method.
121         """
122         sys.modules['resource'] = None
123
124
125     def revealResourceModule(self, limit):
126         """
127         Make a L{FakeResourceModule} instance importable at the L{resource}
128         name.
129
130         @param limit: The value which will be returned for the hard limit of
131             number of open files by the fake resource module's C{getrlimit}
132             function.
133         """
134         sys.modules['resource'] = FakeResourceModule(limit)
135
136
137     def replaceResourceModule(self, value):
138         """
139         Restore the original resource module to L{sys.modules}.
140         """
141         if value is None:
142             try:
143                 del sys.modules['resource']
144             except KeyError:
145                 pass
146         else:
147             sys.modules['resource'] = value
148
149
150     def setUp(self):
151         """
152         Set up the tests, giving ourselves a detector object to play with and
153         setting up its testable knobs to refer to our mocked versions.
154         """
155         self.detector = process._FDDetector()
156         self.detector.listdir = self.listdir
157         self.detector.getpid = self.getpid
158         self.detector.openfile = self.openfile
159         self._files = [0, 1, 2]
160         self.addCleanup(
161             self.replaceResourceModule, sys.modules.get('resource'))
162
163
164     def test_selectFirstWorking(self):
165         """
166         L{FDDetector._getImplementation} returns the first method from its
167         C{_implementations} list which returns results which reflect a newly
168         opened file descriptor.
169         """
170         def failWithException():
171             raise ValueError("This does not work")
172
173         def failWithWrongResults():
174             return [0, 1, 2]
175
176         def correct():
177             return self._files[:]
178
179         self.detector._implementations = [
180             failWithException, failWithWrongResults, correct]
181
182         self.assertIdentical(correct, self.detector._getImplementation())
183
184
185     def test_selectLast(self):
186         """
187         L{FDDetector._getImplementation} returns the last method from its
188         C{_implementations} list if none of the implementations manage to return
189         results which reflect a newly opened file descriptor.
190         """
191         def failWithWrongResults():
192             return [3, 5, 9]
193
194         def failWithOtherWrongResults():
195             return [0, 1, 2]
196
197         self.detector._implementations = [
198             failWithWrongResults, failWithOtherWrongResults]
199
200         self.assertIdentical(
201             failWithOtherWrongResults, self.detector._getImplementation())
202
203
204     def test_identityOfListOpenFDsChanges(self):
205         """
206         Check that the identity of _listOpenFDs changes after running
207         _listOpenFDs the first time, but not after the second time it's run.
208
209         In other words, check that the monkey patching actually works.
210         """
211         # Create a new instance
212         detector = process._FDDetector()
213
214         first = detector._listOpenFDs.func_name
215         detector._listOpenFDs()
216         second = detector._listOpenFDs.func_name
217         detector._listOpenFDs()
218         third = detector._listOpenFDs.func_name
219
220         self.assertNotEqual(first, second)
221         self.assertEqual(second, third)
222
223
224     def test_devFDImplementation(self):
225         """
226         L{_FDDetector._devFDImplementation} raises L{OSError} if there is no
227         I{/dev/fd} directory, otherwise it returns the basenames of its children
228         interpreted as integers.
229         """
230         self.devfs = False
231         self.assertRaises(OSError, self.detector._devFDImplementation)
232         self.devfs = True
233         self.accurateDevFDResults = False
234         self.assertEqual([0, 1, 2], self.detector._devFDImplementation())
235
236
237     def test_procFDImplementation(self):
238         """
239         L{_FDDetector._procFDImplementation} raises L{OSError} if there is no
240         I{/proc/<pid>/fd} directory, otherwise it returns the basenames of its
241         children interpreted as integers.
242         """
243         self.procfs = False
244         self.assertRaises(OSError, self.detector._procFDImplementation)
245         self.procfs = True
246         self.assertEqual([0, 1, 2], self.detector._procFDImplementation())
247
248
249     def test_resourceFDImplementation(self):
250         """
251         L{_FDDetector._fallbackFDImplementation} uses the L{resource} module if
252         it is available, returning a range of integers from 0 to the the
253         minimum of C{1024} and the hard I{NOFILE} limit.
254         """
255         # When the resource module is here, use its value.
256         self.revealResourceModule(512)
257         self.assertEqual(
258             range(512), self.detector._fallbackFDImplementation())
259
260         # But limit its value to the arbitrarily selected value 1024.
261         self.revealResourceModule(2048)
262         self.assertEqual(
263             range(1024), self.detector._fallbackFDImplementation())
264
265
266     def test_fallbackFDImplementation(self):
267         """
268         L{_FDDetector._fallbackFDImplementation}, the implementation of last
269         resort, succeeds with a fixed range of integers from 0 to 1024 when the
270         L{resource} module is not importable.
271         """
272         self.hideResourceModule()
273         self.assertEqual(range(1024), self.detector._fallbackFDImplementation())
274
275
276
277 class FileDescriptorTests(TestCase):
278     """
279     Tests for L{twisted.internet.process._listOpenFDs}
280     """
281     skip = platformSkip
282
283     def test_openFDs(self):
284         """
285         File descriptors returned by L{_listOpenFDs} are mostly open.
286
287         This test assumes that zero-legth writes fail with EBADF on closed
288         file descriptors.
289         """
290         for fd in process._listOpenFDs():
291             try:
292                 fcntl.fcntl(fd, fcntl.F_GETFL)
293             except IOError, err:
294                 self.assertEqual(
295                     errno.EBADF, err.errno,
296                     "fcntl(%d, F_GETFL) failed with unexpected errno %d" % (
297                         fd, err.errno))
298
299
300     def test_expectedFDs(self):
301         """
302         L{_listOpenFDs} lists expected file descriptors.
303         """
304         # This is a tricky test.  A priori, there is no way to know what file
305         # descriptors are open now, so there is no way to know what _listOpenFDs
306         # should return.  Work around this by creating some new file descriptors
307         # which we can know the state of and then just making assertions about
308         # their presence or absence in the result.
309
310         # Expect a file we just opened to be listed.
311         f = file(os.devnull)
312         openfds = process._listOpenFDs()
313         self.assertIn(f.fileno(), openfds)
314
315         # Expect a file we just closed not to be listed - with a caveat.  The
316         # implementation may need to open a file to discover the result.  That
317         # open file descriptor will be allocated the same number as the one we
318         # just closed.  So, instead, create a hole in the file descriptor space
319         # to catch that internal descriptor and make the assertion about a
320         # different closed file descriptor.
321
322         # This gets allocated a file descriptor larger than f's, since nothing
323         # has been closed since we opened f.
324         fd = os.dup(f.fileno())
325
326         # But sanity check that; if it fails the test is invalid.
327         self.assertTrue(
328             fd > f.fileno(),
329             "Expected duplicate file descriptor to be greater than original")
330
331         try:
332             # Get rid of the original, creating the hole.  The copy should still
333             # be open, of course.
334             f.close()
335             self.assertIn(fd, process._listOpenFDs())
336         finally:
337             # Get rid of the copy now
338             os.close(fd)
339         # And it should not appear in the result.
340         self.assertNotIn(fd, process._listOpenFDs())