Imported Upstream version 12.1.0
[contrib/python-twisted.git] / twisted / test / test_iutils.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Test running processes with the APIs in L{twisted.internet.utils}.
6 """
7
8 import warnings, os, stat, sys, signal
9
10 from twisted.python.runtime import platform
11 from twisted.trial import unittest
12 from twisted.internet import error, reactor, utils, interfaces
13
14
15 class ProcessUtilsTests(unittest.TestCase):
16     """
17     Test running a process using L{getProcessOutput}, L{getProcessValue}, and
18     L{getProcessOutputAndValue}.
19     """
20
21     if interfaces.IReactorProcess(reactor, None) is None:
22         skip = "reactor doesn't implement IReactorProcess"
23
24     output = None
25     value = None
26     exe = sys.executable
27
28     def makeSourceFile(self, sourceLines):
29         """
30         Write the given list of lines to a text file and return the absolute
31         path to it.
32         """
33         script = self.mktemp()
34         scriptFile = file(script, 'wt')
35         scriptFile.write(os.linesep.join(sourceLines) + os.linesep)
36         scriptFile.close()
37         return os.path.abspath(script)
38
39
40     def test_output(self):
41         """
42         L{getProcessOutput} returns a L{Deferred} which fires with the complete
43         output of the process it runs after that process exits.
44         """
45         scriptFile = self.makeSourceFile([
46                 "import sys",
47                 "for s in 'hello world\\n':",
48                 "    sys.stdout.write(s)",
49                 "    sys.stdout.flush()"])
50         d = utils.getProcessOutput(self.exe, ['-u', scriptFile])
51         return d.addCallback(self.assertEqual, "hello world\n")
52
53
54     def test_outputWithErrorIgnored(self):
55         """
56         The L{Deferred} returned by L{getProcessOutput} is fired with an
57         L{IOError} L{Failure} if the child process writes to stderr.
58         """
59         # make sure stderr raises an error normally
60         scriptFile = self.makeSourceFile([
61             'import sys',
62             'sys.stderr.write("hello world\\n")'
63             ])
64
65         d = utils.getProcessOutput(self.exe, ['-u', scriptFile])
66         d = self.assertFailure(d, IOError)
67         def cbFailed(err):
68             return self.assertFailure(err.processEnded, error.ProcessDone)
69         d.addCallback(cbFailed)
70         return d
71
72
73     def test_outputWithErrorCollected(self):
74         """
75         If a C{True} value is supplied for the C{errortoo} parameter to
76         L{getProcessOutput}, the returned L{Deferred} fires with the child's
77         stderr output as well as its stdout output.
78         """
79         scriptFile = self.makeSourceFile([
80             'import sys',
81             # Write the same value to both because ordering isn't guaranteed so
82             # this simplifies the test.
83             'sys.stdout.write("foo")',
84             'sys.stdout.flush()',
85             'sys.stderr.write("foo")',
86             'sys.stderr.flush()'])
87
88         d = utils.getProcessOutput(self.exe, ['-u', scriptFile], errortoo=True)
89         return d.addCallback(self.assertEqual, "foofoo")
90
91
92     def test_value(self):
93         """
94         The L{Deferred} returned by L{getProcessValue} is fired with the exit
95         status of the child process.
96         """
97         scriptFile = self.makeSourceFile(["raise SystemExit(1)"])
98
99         d = utils.getProcessValue(self.exe, ['-u', scriptFile])
100         return d.addCallback(self.assertEqual, 1)
101
102
103     def test_outputAndValue(self):
104         """
105         The L{Deferred} returned by L{getProcessOutputAndValue} fires with a
106         three-tuple, the elements of which give the data written to the child's
107         stdout, the data written to the child's stderr, and the exit status of
108         the child.
109         """
110         exe = sys.executable
111         scriptFile = self.makeSourceFile([
112             "import sys",
113             "sys.stdout.write('hello world!\\n')",
114             "sys.stderr.write('goodbye world!\\n')",
115             "sys.exit(1)"
116             ])
117
118         def gotOutputAndValue((out, err, code)):
119             self.assertEqual(out, "hello world!\n")
120             self.assertEqual(err, "goodbye world!" + os.linesep)
121             self.assertEqual(code, 1)
122         d = utils.getProcessOutputAndValue(self.exe, ["-u", scriptFile])
123         return d.addCallback(gotOutputAndValue)
124
125
126     def test_outputSignal(self):
127         """
128         If the child process exits because of a signal, the L{Deferred}
129         returned by L{getProcessOutputAndValue} fires a L{Failure} of a tuple
130         containing the the child's stdout, stderr, and the signal which caused
131         it to exit.
132         """
133         # Use SIGKILL here because it's guaranteed to be delivered. Using
134         # SIGHUP might not work in, e.g., a buildbot slave run under the
135         # 'nohup' command.
136         scriptFile = self.makeSourceFile([
137             "import sys, os, signal",
138             "sys.stdout.write('stdout bytes\\n')",
139             "sys.stderr.write('stderr bytes\\n')",
140             "sys.stdout.flush()",
141             "sys.stderr.flush()",
142             "os.kill(os.getpid(), signal.SIGKILL)"])
143
144         def gotOutputAndValue((out, err, sig)):
145             self.assertEqual(out, "stdout bytes\n")
146             self.assertEqual(err, "stderr bytes\n")
147             self.assertEqual(sig, signal.SIGKILL)
148
149         d = utils.getProcessOutputAndValue(self.exe, ['-u', scriptFile])
150         d = self.assertFailure(d, tuple)
151         return d.addCallback(gotOutputAndValue)
152
153     if platform.isWindows():
154         test_outputSignal.skip = "Windows doesn't have real signals."
155
156
157     def _pathTest(self, utilFunc, check):
158         dir = os.path.abspath(self.mktemp())
159         os.makedirs(dir)
160         scriptFile = self.makeSourceFile([
161                 "import os, sys",
162                 "sys.stdout.write(os.getcwd())"])
163         d = utilFunc(self.exe, ['-u', scriptFile], path=dir)
164         d.addCallback(check, dir)
165         return d
166
167
168     def test_getProcessOutputPath(self):
169         """
170         L{getProcessOutput} runs the given command with the working directory
171         given by the C{path} parameter.
172         """
173         return self._pathTest(utils.getProcessOutput, self.assertEqual)
174
175
176     def test_getProcessValuePath(self):
177         """
178         L{getProcessValue} runs the given command with the working directory
179         given by the C{path} parameter.
180         """
181         def check(result, ignored):
182             self.assertEqual(result, 0)
183         return self._pathTest(utils.getProcessValue, check)
184
185
186     def test_getProcessOutputAndValuePath(self):
187         """
188         L{getProcessOutputAndValue} runs the given command with the working
189         directory given by the C{path} parameter.
190         """
191         def check((out, err, status), dir):
192             self.assertEqual(out, dir)
193             self.assertEqual(status, 0)
194         return self._pathTest(utils.getProcessOutputAndValue, check)
195
196
197     def _defaultPathTest(self, utilFunc, check):
198         # Make another directory to mess around with.
199         dir = os.path.abspath(self.mktemp())
200         os.makedirs(dir)
201
202         scriptFile = self.makeSourceFile([
203                 "import os, sys, stat",
204                 # Fix the permissions so we can report the working directory.
205                 # On OS X (and maybe elsewhere), os.getcwd() fails with EACCES
206                 # if +x is missing from the working directory.
207                 "os.chmod(%r, stat.S_IXUSR)" % (dir,),
208                 "sys.stdout.write(os.getcwd())"])
209
210         # Switch to it, but make sure we switch back
211         self.addCleanup(os.chdir, os.getcwd())
212         os.chdir(dir)
213
214         # Get rid of all its permissions, but make sure they get cleaned up
215         # later, because otherwise it might be hard to delete the trial
216         # temporary directory.
217         self.addCleanup(
218             os.chmod, dir, stat.S_IMODE(os.stat('.').st_mode))
219         os.chmod(dir, 0)
220
221         d = utilFunc(self.exe, ['-u', scriptFile])
222         d.addCallback(check, dir)
223         return d
224
225
226     def test_getProcessOutputDefaultPath(self):
227         """
228         If no value is supplied for the C{path} parameter, L{getProcessOutput}
229         runs the given command in the same working directory as the parent
230         process and succeeds even if the current working directory is not
231         accessible.
232         """
233         return self._defaultPathTest(utils.getProcessOutput, self.assertEqual)
234
235
236     def test_getProcessValueDefaultPath(self):
237         """
238         If no value is supplied for the C{path} parameter, L{getProcessValue}
239         runs the given command in the same working directory as the parent
240         process and succeeds even if the current working directory is not
241         accessible.
242         """
243         def check(result, ignored):
244             self.assertEqual(result, 0)
245         return self._defaultPathTest(utils.getProcessValue, check)
246
247
248     def test_getProcessOutputAndValueDefaultPath(self):
249         """
250         If no value is supplied for the C{path} parameter,
251         L{getProcessOutputAndValue} runs the given command in the same working
252         directory as the parent process and succeeds even if the current
253         working directory is not accessible.
254         """
255         def check((out, err, status), dir):
256             self.assertEqual(out, dir)
257             self.assertEqual(status, 0)
258         return self._defaultPathTest(
259             utils.getProcessOutputAndValue, check)
260
261
262
263 class WarningSuppression(unittest.TestCase):
264     def setUp(self):
265         self.warnings = []
266         self.originalshow = warnings.showwarning
267         warnings.showwarning = self.showwarning
268
269
270     def tearDown(self):
271         warnings.showwarning = self.originalshow
272
273
274     def showwarning(self, *a, **kw):
275         self.warnings.append((a, kw))
276
277
278     def testSuppressWarnings(self):
279         def f(msg):
280             warnings.warn(msg)
281         g = utils.suppressWarnings(f, (('ignore',), dict(message="This is message")))
282
283         # Start off with a sanity check - calling the original function
284         # should emit the warning.
285         f("Sanity check message")
286         self.assertEqual(len(self.warnings), 1)
287
288         # Now that that's out of the way, call the wrapped function, and
289         # make sure no new warnings show up.
290         g("This is message")
291         self.assertEqual(len(self.warnings), 1)
292
293         # Finally, emit another warning which should not be ignored, and
294         # make sure it is not.
295         g("Unignored message")
296         self.assertEqual(len(self.warnings), 2)