Imported Upstream version 12.1.0
[contrib/python-twisted.git] / twisted / conch / test / test_manhole.py
1 # -*- test-case-name: twisted.conch.test.test_manhole -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Tests for L{twisted.conch.manhole}.
7 """
8
9 import traceback
10
11 from twisted.trial import unittest
12 from twisted.internet import error, defer
13 from twisted.test.proto_helpers import StringTransport
14 from twisted.conch.test.test_recvline import _TelnetMixin, _SSHMixin, _StdioMixin, stdio, ssh
15 from twisted.conch import manhole
16 from twisted.conch.insults import insults
17
18
19 def determineDefaultFunctionName():
20     """
21     Return the string used by Python as the name for code objects which are
22     compiled from interactive input or at the top-level of modules.
23     """
24     try:
25         1 / 0
26     except:
27         # The last frame is this function.  The second to last frame is this
28         # function's caller, which is module-scope, which is what we want,
29         # so -2.
30         return traceback.extract_stack()[-2][2]
31 defaultFunctionName = determineDefaultFunctionName()
32
33
34
35 class ManholeInterpreterTests(unittest.TestCase):
36     """
37     Tests for L{manhole.ManholeInterpreter}.
38     """
39     def test_resetBuffer(self):
40         """
41         L{ManholeInterpreter.resetBuffer} should empty the input buffer.
42         """
43         interpreter = manhole.ManholeInterpreter(None)
44         interpreter.buffer.extend(["1", "2"])
45         interpreter.resetBuffer()
46         self.assertFalse(interpreter.buffer)
47
48
49
50 class ManholeProtocolTests(unittest.TestCase):
51     """
52     Tests for L{manhole.Manhole}.
53     """
54     def test_interruptResetsInterpreterBuffer(self):
55         """
56         L{manhole.Manhole.handle_INT} should cause the interpreter input buffer
57         to be reset.
58         """
59         transport = StringTransport()
60         terminal = insults.ServerProtocol(manhole.Manhole)
61         terminal.makeConnection(transport)
62         protocol = terminal.terminalProtocol
63         interpreter = protocol.interpreter
64         interpreter.buffer.extend(["1", "2"])
65         protocol.handle_INT()
66         self.assertFalse(interpreter.buffer)
67
68
69
70 class WriterTestCase(unittest.TestCase):
71     def testInteger(self):
72         manhole.lastColorizedLine("1")
73
74
75     def testDoubleQuoteString(self):
76         manhole.lastColorizedLine('"1"')
77
78
79     def testSingleQuoteString(self):
80         manhole.lastColorizedLine("'1'")
81
82
83     def testTripleSingleQuotedString(self):
84         manhole.lastColorizedLine("'''1'''")
85
86
87     def testTripleDoubleQuotedString(self):
88         manhole.lastColorizedLine('"""1"""')
89
90
91     def testFunctionDefinition(self):
92         manhole.lastColorizedLine("def foo():")
93
94
95     def testClassDefinition(self):
96         manhole.lastColorizedLine("class foo:")
97
98
99 class ManholeLoopbackMixin:
100     serverProtocol = manhole.ColoredManhole
101
102     def wfd(self, d):
103         return defer.waitForDeferred(d)
104
105     def testSimpleExpression(self):
106         done = self.recvlineClient.expect("done")
107
108         self._testwrite(
109             "1 + 1\n"
110             "done")
111
112         def finished(ign):
113             self._assertBuffer(
114                 [">>> 1 + 1",
115                  "2",
116                  ">>> done"])
117
118         return done.addCallback(finished)
119
120     def testTripleQuoteLineContinuation(self):
121         done = self.recvlineClient.expect("done")
122
123         self._testwrite(
124             "'''\n'''\n"
125             "done")
126
127         def finished(ign):
128             self._assertBuffer(
129                 [">>> '''",
130                  "... '''",
131                  "'\\n'",
132                  ">>> done"])
133
134         return done.addCallback(finished)
135
136     def testFunctionDefinition(self):
137         done = self.recvlineClient.expect("done")
138
139         self._testwrite(
140             "def foo(bar):\n"
141             "\tprint bar\n\n"
142             "foo(42)\n"
143             "done")
144
145         def finished(ign):
146             self._assertBuffer(
147                 [">>> def foo(bar):",
148                  "...     print bar",
149                  "... ",
150                  ">>> foo(42)",
151                  "42",
152                  ">>> done"])
153
154         return done.addCallback(finished)
155
156     def testClassDefinition(self):
157         done = self.recvlineClient.expect("done")
158
159         self._testwrite(
160             "class Foo:\n"
161             "\tdef bar(self):\n"
162             "\t\tprint 'Hello, world!'\n\n"
163             "Foo().bar()\n"
164             "done")
165
166         def finished(ign):
167             self._assertBuffer(
168                 [">>> class Foo:",
169                  "...     def bar(self):",
170                  "...         print 'Hello, world!'",
171                  "... ",
172                  ">>> Foo().bar()",
173                  "Hello, world!",
174                  ">>> done"])
175
176         return done.addCallback(finished)
177
178     def testException(self):
179         done = self.recvlineClient.expect("done")
180
181         self._testwrite(
182             "raise Exception('foo bar baz')\n"
183             "done")
184
185         def finished(ign):
186             self._assertBuffer(
187                 [">>> raise Exception('foo bar baz')",
188                  "Traceback (most recent call last):",
189                  '  File "<console>", line 1, in ' + defaultFunctionName,
190                  "Exception: foo bar baz",
191                  ">>> done"])
192
193         return done.addCallback(finished)
194
195     def testControlC(self):
196         done = self.recvlineClient.expect("done")
197
198         self._testwrite(
199             "cancelled line" + manhole.CTRL_C +
200             "done")
201
202         def finished(ign):
203             self._assertBuffer(
204                 [">>> cancelled line",
205                  "KeyboardInterrupt",
206                  ">>> done"])
207
208         return done.addCallback(finished)
209
210
211     def test_interruptDuringContinuation(self):
212         """
213         Sending ^C to Manhole while in a state where more input is required to
214         complete a statement should discard the entire ongoing statement and
215         reset the input prompt to the non-continuation prompt.
216         """
217         continuing = self.recvlineClient.expect("things")
218
219         self._testwrite("(\nthings")
220
221         def gotContinuation(ignored):
222             self._assertBuffer(
223                 [">>> (",
224                  "... things"])
225             interrupted = self.recvlineClient.expect(">>> ")
226             self._testwrite(manhole.CTRL_C)
227             return interrupted
228         continuing.addCallback(gotContinuation)
229
230         def gotInterruption(ignored):
231             self._assertBuffer(
232                 [">>> (",
233                  "... things",
234                  "KeyboardInterrupt",
235                  ">>> "])
236         continuing.addCallback(gotInterruption)
237         return continuing
238
239
240     def testControlBackslash(self):
241         self._testwrite("cancelled line")
242         partialLine = self.recvlineClient.expect("cancelled line")
243
244         def gotPartialLine(ign):
245             self._assertBuffer(
246                 [">>> cancelled line"])
247             self._testwrite(manhole.CTRL_BACKSLASH)
248
249             d = self.recvlineClient.onDisconnection
250             return self.assertFailure(d, error.ConnectionDone)
251
252         def gotClearedLine(ign):
253             self._assertBuffer(
254                 [""])
255
256         return partialLine.addCallback(gotPartialLine).addCallback(gotClearedLine)
257
258     def testControlD(self):
259         self._testwrite("1 + 1")
260         helloWorld = self.wfd(self.recvlineClient.expect(r"\+ 1"))
261         yield helloWorld
262         helloWorld.getResult()
263         self._assertBuffer([">>> 1 + 1"])
264
265         self._testwrite(manhole.CTRL_D + " + 1")
266         cleared = self.wfd(self.recvlineClient.expect(r"\+ 1"))
267         yield cleared
268         cleared.getResult()
269         self._assertBuffer([">>> 1 + 1 + 1"])
270
271         self._testwrite("\n")
272         printed = self.wfd(self.recvlineClient.expect("3\n>>> "))
273         yield printed
274         printed.getResult()
275
276         self._testwrite(manhole.CTRL_D)
277         d = self.recvlineClient.onDisconnection
278         disconnected = self.wfd(self.assertFailure(d, error.ConnectionDone))
279         yield disconnected
280         disconnected.getResult()
281     testControlD = defer.deferredGenerator(testControlD)
282
283
284     def testControlL(self):
285         """
286         CTRL+L is generally used as a redraw-screen command in terminal
287         applications.  Manhole doesn't currently respect this usage of it,
288         but it should at least do something reasonable in response to this
289         event (rather than, say, eating your face).
290         """
291         # Start off with a newline so that when we clear the display we can
292         # tell by looking for the missing first empty prompt line.
293         self._testwrite("\n1 + 1")
294         helloWorld = self.wfd(self.recvlineClient.expect(r"\+ 1"))
295         yield helloWorld
296         helloWorld.getResult()
297         self._assertBuffer([">>> ", ">>> 1 + 1"])
298
299         self._testwrite(manhole.CTRL_L + " + 1")
300         redrew = self.wfd(self.recvlineClient.expect(r"1 \+ 1 \+ 1"))
301         yield redrew
302         redrew.getResult()
303         self._assertBuffer([">>> 1 + 1 + 1"])
304     testControlL = defer.deferredGenerator(testControlL)
305
306
307     def test_controlA(self):
308         """
309         CTRL-A can be used as HOME - returning cursor to beginning of
310         current line buffer.
311         """
312         self._testwrite('rint "hello"' + '\x01' + 'p')
313         d = self.recvlineClient.expect('print "hello"')
314         def cb(ignore):
315             self._assertBuffer(['>>> print "hello"'])
316         return d.addCallback(cb)
317
318
319     def test_controlE(self):
320         """
321         CTRL-E can be used as END - setting cursor to end of current
322         line buffer.
323         """
324         self._testwrite('rint "hello' + '\x01' + 'p' + '\x05' + '"')
325         d = self.recvlineClient.expect('print "hello"')
326         def cb(ignore):
327             self._assertBuffer(['>>> print "hello"'])
328         return d.addCallback(cb)
329
330
331     def testDeferred(self):
332         self._testwrite(
333             "from twisted.internet import defer, reactor\n"
334             "d = defer.Deferred()\n"
335             "d\n")
336
337         deferred = self.wfd(self.recvlineClient.expect("<Deferred #0>"))
338         yield deferred
339         deferred.getResult()
340
341         self._testwrite(
342             "c = reactor.callLater(0.1, d.callback, 'Hi!')\n")
343         delayed = self.wfd(self.recvlineClient.expect(">>> "))
344         yield delayed
345         delayed.getResult()
346
347         called = self.wfd(self.recvlineClient.expect("Deferred #0 called back: 'Hi!'\n>>> "))
348         yield called
349         called.getResult()
350         self._assertBuffer(
351             [">>> from twisted.internet import defer, reactor",
352              ">>> d = defer.Deferred()",
353              ">>> d",
354              "<Deferred #0>",
355              ">>> c = reactor.callLater(0.1, d.callback, 'Hi!')",
356              "Deferred #0 called back: 'Hi!'",
357              ">>> "])
358
359     testDeferred = defer.deferredGenerator(testDeferred)
360
361 class ManholeLoopbackTelnet(_TelnetMixin, unittest.TestCase, ManholeLoopbackMixin):
362     pass
363
364 class ManholeLoopbackSSH(_SSHMixin, unittest.TestCase, ManholeLoopbackMixin):
365     if ssh is None:
366         skip = "Crypto requirements missing, can't run manhole tests over ssh"
367
368 class ManholeLoopbackStdio(_StdioMixin, unittest.TestCase, ManholeLoopbackMixin):
369     if stdio is None:
370         skip = "Terminal requirements missing, can't run manhole tests over stdio"
371     else:
372         serverProtocol = stdio.ConsoleManhole