1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
5 Tests for L{twisted.web.twcgi}.
10 from twisted.trial import unittest
11 from twisted.internet import reactor, interfaces, error
12 from twisted.python import util, failure
13 from twisted.web.http import NOT_FOUND, INTERNAL_SERVER_ERROR
14 from twisted.web import client, twcgi, server, resource
15 from twisted.web.test._util import _render
16 from twisted.web.test.test_web import DummyRequest
24 DUAL_HEADER_CGI = '''\
31 SPECIAL_HEADER_CGI = '''\
32 print "Server: monkeys"
33 print "Date: last year"
39 # this is an example of a correctly-written CGI script which reads a body
40 # from stdin, which only reads env['CONTENT_LENGTH'] bytes.
44 body_length = int(os.environ.get('CONTENT_LENGTH',0))
45 indata = sys.stdin.read(body_length)
51 READALLINPUT_CGI = '''\
52 # this is an example of the typical (incorrect) CGI script which expects
53 # the server to close stdin when the body of the request is complete.
54 # A correct CGI should only read env['CONTENT_LENGTH'] bytes.
58 indata = sys.stdin.read()
61 print "readallinput ok"
64 NO_DUPLICATE_CONTENT_TYPE_HEADER_CGI = '''\
65 print "content-type: text/cgi-duplicate-test"
70 class PythonScript(twcgi.FilteredScript):
71 filter = sys.executable
73 class CGI(unittest.TestCase):
75 Tests for L{twcgi.FilteredScript}.
78 if not interfaces.IReactorProcess.providedBy(reactor):
79 skip = "CGI tests require a functional reactor.spawnProcess()"
81 def startServer(self, cgi):
82 root = resource.Resource()
83 cgipath = util.sibpath(__file__, cgi)
84 root.putChild("cgi", PythonScript(cgipath))
85 site = server.Site(root)
86 self.p = reactor.listenTCP(0, site)
87 return self.p.getHost().port
91 return self.p.stopListening()
94 def writeCGI(self, source):
95 cgiFilename = os.path.abspath(self.mktemp())
96 cgiFile = file(cgiFilename, 'wt')
103 cgiFilename = self.writeCGI(DUMMY_CGI)
105 portnum = self.startServer(cgiFilename)
106 d = client.getPage("http://localhost:%d/cgi" % portnum)
107 d.addCallback(self._testCGI_1)
111 def _testCGI_1(self, res):
112 self.assertEqual(res, "cgi output" + os.linesep)
115 def test_protectedServerAndDate(self):
117 If the CGI script emits a I{Server} or I{Date} header, these are
120 cgiFilename = self.writeCGI(SPECIAL_HEADER_CGI)
122 portnum = self.startServer(cgiFilename)
123 url = "http://localhost:%d/cgi" % (portnum,)
124 factory = client.HTTPClientFactory(url)
125 reactor.connectTCP('localhost', portnum, factory)
126 def checkResponse(ignored):
127 self.assertNotIn('monkeys', factory.response_headers['server'])
128 self.assertNotIn('last year', factory.response_headers['date'])
129 factory.deferred.addCallback(checkResponse)
130 return factory.deferred
133 def test_noDuplicateContentTypeHeaders(self):
135 If the CGI script emits a I{content-type} header, make sure that the
136 server doesn't add an additional (duplicate) one, as per ticket 4786.
138 cgiFilename = self.writeCGI(NO_DUPLICATE_CONTENT_TYPE_HEADER_CGI)
140 portnum = self.startServer(cgiFilename)
141 url = "http://localhost:%d/cgi" % (portnum,)
142 factory = client.HTTPClientFactory(url)
143 reactor.connectTCP('localhost', portnum, factory)
144 def checkResponse(ignored):
146 factory.response_headers['content-type'], ['text/cgi-duplicate-test'])
147 factory.deferred.addCallback(checkResponse)
148 return factory.deferred
151 def test_duplicateHeaderCGI(self):
153 If a CGI script emits two instances of the same header, both are sent in
156 cgiFilename = self.writeCGI(DUAL_HEADER_CGI)
158 portnum = self.startServer(cgiFilename)
159 url = "http://localhost:%d/cgi" % (portnum,)
160 factory = client.HTTPClientFactory(url)
161 reactor.connectTCP('localhost', portnum, factory)
162 def checkResponse(ignored):
164 factory.response_headers['header'], ['spam', 'eggs'])
165 factory.deferred.addCallback(checkResponse)
166 return factory.deferred
169 def testReadEmptyInput(self):
170 cgiFilename = os.path.abspath(self.mktemp())
171 cgiFile = file(cgiFilename, 'wt')
172 cgiFile.write(READINPUT_CGI)
175 portnum = self.startServer(cgiFilename)
176 d = client.getPage("http://localhost:%d/cgi" % portnum)
177 d.addCallback(self._testReadEmptyInput_1)
179 testReadEmptyInput.timeout = 5
180 def _testReadEmptyInput_1(self, res):
181 self.assertEqual(res, "readinput ok%s" % os.linesep)
183 def testReadInput(self):
184 cgiFilename = os.path.abspath(self.mktemp())
185 cgiFile = file(cgiFilename, 'wt')
186 cgiFile.write(READINPUT_CGI)
189 portnum = self.startServer(cgiFilename)
190 d = client.getPage("http://localhost:%d/cgi" % portnum,
192 postdata="Here is your stdin")
193 d.addCallback(self._testReadInput_1)
195 testReadInput.timeout = 5
196 def _testReadInput_1(self, res):
197 self.assertEqual(res, "readinput ok%s" % os.linesep)
200 def testReadAllInput(self):
201 cgiFilename = os.path.abspath(self.mktemp())
202 cgiFile = file(cgiFilename, 'wt')
203 cgiFile.write(READALLINPUT_CGI)
206 portnum = self.startServer(cgiFilename)
207 d = client.getPage("http://localhost:%d/cgi" % portnum,
209 postdata="Here is your stdin")
210 d.addCallback(self._testReadAllInput_1)
212 testReadAllInput.timeout = 5
213 def _testReadAllInput_1(self, res):
214 self.assertEqual(res, "readallinput ok%s" % os.linesep)
218 class CGIDirectoryTests(unittest.TestCase):
220 Tests for L{twcgi.CGIDirectory}.
222 def test_render(self):
224 L{twcgi.CGIDirectory.render} sets the HTTP response code to I{NOT
227 resource = twcgi.CGIDirectory(self.mktemp())
228 request = DummyRequest([''])
229 d = _render(resource, request)
230 def cbRendered(ignored):
231 self.assertEqual(request.responseCode, NOT_FOUND)
232 d.addCallback(cbRendered)
236 def test_notFoundChild(self):
238 L{twcgi.CGIDirectory.getChild} returns a resource which renders an
239 response with the HTTP I{NOT FOUND} status code if the indicated child
240 does not exist as an entry in the directory used to initialized the
241 L{twcgi.CGIDirectory}.
245 resource = twcgi.CGIDirectory(path)
246 request = DummyRequest(['foo'])
247 child = resource.getChild("foo", request)
248 d = _render(child, request)
249 def cbRendered(ignored):
250 self.assertEqual(request.responseCode, NOT_FOUND)
251 d.addCallback(cbRendered)
256 class CGIProcessProtocolTests(unittest.TestCase):
258 Tests for L{twcgi.CGIProcessProtocol}.
260 def test_prematureEndOfHeaders(self):
262 If the process communicating with L{CGIProcessProtocol} ends before
263 finishing writing out headers, the response has I{INTERNAL SERVER
264 ERROR} as its status code.
266 request = DummyRequest([''])
267 protocol = twcgi.CGIProcessProtocol(request)
268 protocol.processEnded(failure.Failure(error.ProcessTerminated()))
269 self.assertEqual(request.responseCode, INTERNAL_SERVER_ERROR)