Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / web / test / test_cgi.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for L{twisted.web.twcgi}.
6 """
7
8 import sys, os
9
10 from twisted.trial import unittest
11 from twisted.internet import reactor, interfaces, error
12 from twisted.python import util, failure
13 from twisted.web.http import NOT_FOUND, INTERNAL_SERVER_ERROR
14 from twisted.web import client, twcgi, server, resource
15 from twisted.web.test._util import _render
16 from twisted.web.test.test_web import DummyRequest
17
18 DUMMY_CGI = '''\
19 print "Header: OK"
20 print
21 print "cgi output"
22 '''
23
24 DUAL_HEADER_CGI = '''\
25 print "Header: spam"
26 print "Header: eggs"
27 print
28 print "cgi output"
29 '''
30
31 SPECIAL_HEADER_CGI = '''\
32 print "Server: monkeys"
33 print "Date: last year"
34 print
35 print "cgi output"
36 '''
37
38 READINPUT_CGI = '''\
39 # this is an example of a correctly-written CGI script which reads a body
40 # from stdin, which only reads env['CONTENT_LENGTH'] bytes.
41
42 import os, sys
43
44 body_length = int(os.environ.get('CONTENT_LENGTH',0))
45 indata = sys.stdin.read(body_length)
46 print "Header: OK"
47 print
48 print "readinput ok"
49 '''
50
51 READALLINPUT_CGI = '''\
52 # this is an example of the typical (incorrect) CGI script which expects
53 # the server to close stdin when the body of the request is complete.
54 # A correct CGI should only read env['CONTENT_LENGTH'] bytes.
55
56 import sys
57
58 indata = sys.stdin.read()
59 print "Header: OK"
60 print
61 print "readallinput ok"
62 '''
63
64 NO_DUPLICATE_CONTENT_TYPE_HEADER_CGI = '''\
65 print "content-type: text/cgi-duplicate-test"
66 print
67 print "cgi output"
68 '''
69
70 class PythonScript(twcgi.FilteredScript):
71     filter = sys.executable
72
73 class CGI(unittest.TestCase):
74     """
75     Tests for L{twcgi.FilteredScript}.
76     """
77
78     if not interfaces.IReactorProcess.providedBy(reactor):
79         skip = "CGI tests require a functional reactor.spawnProcess()"
80
81     def startServer(self, cgi):
82         root = resource.Resource()
83         cgipath = util.sibpath(__file__, cgi)
84         root.putChild("cgi", PythonScript(cgipath))
85         site = server.Site(root)
86         self.p = reactor.listenTCP(0, site)
87         return self.p.getHost().port
88
89     def tearDown(self):
90         if self.p:
91             return self.p.stopListening()
92
93
94     def writeCGI(self, source):
95         cgiFilename = os.path.abspath(self.mktemp())
96         cgiFile = file(cgiFilename, 'wt')
97         cgiFile.write(source)
98         cgiFile.close()
99         return cgiFilename
100
101
102     def testCGI(self):
103         cgiFilename = self.writeCGI(DUMMY_CGI)
104
105         portnum = self.startServer(cgiFilename)
106         d = client.getPage("http://localhost:%d/cgi" % portnum)
107         d.addCallback(self._testCGI_1)
108         return d
109
110
111     def _testCGI_1(self, res):
112         self.assertEqual(res, "cgi output" + os.linesep)
113
114
115     def test_protectedServerAndDate(self):
116         """
117         If the CGI script emits a I{Server} or I{Date} header, these are
118         ignored.
119         """
120         cgiFilename = self.writeCGI(SPECIAL_HEADER_CGI)
121
122         portnum = self.startServer(cgiFilename)
123         url = "http://localhost:%d/cgi" % (portnum,)
124         factory = client.HTTPClientFactory(url)
125         reactor.connectTCP('localhost', portnum, factory)
126         def checkResponse(ignored):
127             self.assertNotIn('monkeys', factory.response_headers['server'])
128             self.assertNotIn('last year', factory.response_headers['date'])
129         factory.deferred.addCallback(checkResponse)
130         return factory.deferred
131
132
133     def test_noDuplicateContentTypeHeaders(self):
134         """
135         If the CGI script emits a I{content-type} header, make sure that the
136         server doesn't add an additional (duplicate) one, as per ticket 4786.
137         """
138         cgiFilename = self.writeCGI(NO_DUPLICATE_CONTENT_TYPE_HEADER_CGI)
139
140         portnum = self.startServer(cgiFilename)
141         url = "http://localhost:%d/cgi" % (portnum,)
142         factory = client.HTTPClientFactory(url)
143         reactor.connectTCP('localhost', portnum, factory)
144         def checkResponse(ignored):
145             self.assertEqual(
146                 factory.response_headers['content-type'], ['text/cgi-duplicate-test'])
147         factory.deferred.addCallback(checkResponse)
148         return factory.deferred
149
150
151     def test_duplicateHeaderCGI(self):
152         """
153         If a CGI script emits two instances of the same header, both are sent in
154         the response.
155         """
156         cgiFilename = self.writeCGI(DUAL_HEADER_CGI)
157
158         portnum = self.startServer(cgiFilename)
159         url = "http://localhost:%d/cgi" % (portnum,)
160         factory = client.HTTPClientFactory(url)
161         reactor.connectTCP('localhost', portnum, factory)
162         def checkResponse(ignored):
163             self.assertEqual(
164                 factory.response_headers['header'], ['spam', 'eggs'])
165         factory.deferred.addCallback(checkResponse)
166         return factory.deferred
167
168
169     def testReadEmptyInput(self):
170         cgiFilename = os.path.abspath(self.mktemp())
171         cgiFile = file(cgiFilename, 'wt')
172         cgiFile.write(READINPUT_CGI)
173         cgiFile.close()
174
175         portnum = self.startServer(cgiFilename)
176         d = client.getPage("http://localhost:%d/cgi" % portnum)
177         d.addCallback(self._testReadEmptyInput_1)
178         return d
179     testReadEmptyInput.timeout = 5
180     def _testReadEmptyInput_1(self, res):
181         self.assertEqual(res, "readinput ok%s" % os.linesep)
182
183     def testReadInput(self):
184         cgiFilename = os.path.abspath(self.mktemp())
185         cgiFile = file(cgiFilename, 'wt')
186         cgiFile.write(READINPUT_CGI)
187         cgiFile.close()
188
189         portnum = self.startServer(cgiFilename)
190         d = client.getPage("http://localhost:%d/cgi" % portnum,
191                            method="POST",
192                            postdata="Here is your stdin")
193         d.addCallback(self._testReadInput_1)
194         return d
195     testReadInput.timeout = 5
196     def _testReadInput_1(self, res):
197         self.assertEqual(res, "readinput ok%s" % os.linesep)
198
199
200     def testReadAllInput(self):
201         cgiFilename = os.path.abspath(self.mktemp())
202         cgiFile = file(cgiFilename, 'wt')
203         cgiFile.write(READALLINPUT_CGI)
204         cgiFile.close()
205
206         portnum = self.startServer(cgiFilename)
207         d = client.getPage("http://localhost:%d/cgi" % portnum,
208                            method="POST",
209                            postdata="Here is your stdin")
210         d.addCallback(self._testReadAllInput_1)
211         return d
212     testReadAllInput.timeout = 5
213     def _testReadAllInput_1(self, res):
214         self.assertEqual(res, "readallinput ok%s" % os.linesep)
215
216
217
218 class CGIDirectoryTests(unittest.TestCase):
219     """
220     Tests for L{twcgi.CGIDirectory}.
221     """
222     def test_render(self):
223         """
224         L{twcgi.CGIDirectory.render} sets the HTTP response code to I{NOT
225         FOUND}.
226         """
227         resource = twcgi.CGIDirectory(self.mktemp())
228         request = DummyRequest([''])
229         d = _render(resource, request)
230         def cbRendered(ignored):
231             self.assertEqual(request.responseCode, NOT_FOUND)
232         d.addCallback(cbRendered)
233         return d
234
235
236     def test_notFoundChild(self):
237         """
238         L{twcgi.CGIDirectory.getChild} returns a resource which renders an
239         response with the HTTP I{NOT FOUND} status code if the indicated child
240         does not exist as an entry in the directory used to initialized the
241         L{twcgi.CGIDirectory}.
242         """
243         path = self.mktemp()
244         os.makedirs(path)
245         resource = twcgi.CGIDirectory(path)
246         request = DummyRequest(['foo'])
247         child = resource.getChild("foo", request)
248         d = _render(child, request)
249         def cbRendered(ignored):
250             self.assertEqual(request.responseCode, NOT_FOUND)
251         d.addCallback(cbRendered)
252         return d
253
254
255
256 class CGIProcessProtocolTests(unittest.TestCase):
257     """
258     Tests for L{twcgi.CGIProcessProtocol}.
259     """
260     def test_prematureEndOfHeaders(self):
261         """
262         If the process communicating with L{CGIProcessProtocol} ends before
263         finishing writing out headers, the response has I{INTERNAL SERVER
264         ERROR} as its status code.
265         """
266         request = DummyRequest([''])
267         protocol = twcgi.CGIProcessProtocol(request)
268         protocol.processEnded(failure.Failure(error.ProcessTerminated()))
269         self.assertEqual(request.responseCode, INTERNAL_SERVER_ERROR)
270