1 # -*- test-case-name: twisted.conch.test.test_recvline -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
6 Tests for L{twisted.conch.recvline} and fixtures for testing related
12 from twisted.conch.insults import insults
13 from twisted.conch import recvline
15 from twisted.python import reflect, components
16 from twisted.internet import defer, error
17 from twisted.trial import unittest
18 from twisted.cred import portal
19 from twisted.test.proto_helpers import StringTransport
21 class Arrows(unittest.TestCase):
23 self.underlyingTransport = StringTransport()
24 self.pt = insults.ServerProtocol()
25 self.p = recvline.HistoricRecvLine()
26 self.pt.protocolFactory = lambda: self.p
27 self.pt.factory = self
28 self.pt.makeConnection(self.underlyingTransport)
29 # self.p.makeConnection(self.pt)
31 def test_printableCharacters(self):
33 When L{HistoricRecvLine} receives a printable character,
34 it adds it to the current line buffer.
36 self.p.keystrokeReceived('x', None)
37 self.p.keystrokeReceived('y', None)
38 self.p.keystrokeReceived('z', None)
40 self.assertEqual(self.p.currentLineBuffer(), ('xyz', ''))
42 def test_horizontalArrows(self):
44 When L{HistoricRecvLine} receives an LEFT_ARROW or
45 RIGHT_ARROW keystroke it moves the cursor left or right
46 in the current line buffer, respectively.
48 kR = lambda ch: self.p.keystrokeReceived(ch, None)
52 self.assertEqual(self.p.currentLineBuffer(), ('xyz', ''))
54 kR(self.pt.RIGHT_ARROW)
55 self.assertEqual(self.p.currentLineBuffer(), ('xyz', ''))
57 kR(self.pt.LEFT_ARROW)
58 self.assertEqual(self.p.currentLineBuffer(), ('xy', 'z'))
60 kR(self.pt.LEFT_ARROW)
61 self.assertEqual(self.p.currentLineBuffer(), ('x', 'yz'))
63 kR(self.pt.LEFT_ARROW)
64 self.assertEqual(self.p.currentLineBuffer(), ('', 'xyz'))
66 kR(self.pt.LEFT_ARROW)
67 self.assertEqual(self.p.currentLineBuffer(), ('', 'xyz'))
69 kR(self.pt.RIGHT_ARROW)
70 self.assertEqual(self.p.currentLineBuffer(), ('x', 'yz'))
72 kR(self.pt.RIGHT_ARROW)
73 self.assertEqual(self.p.currentLineBuffer(), ('xy', 'z'))
75 kR(self.pt.RIGHT_ARROW)
76 self.assertEqual(self.p.currentLineBuffer(), ('xyz', ''))
78 kR(self.pt.RIGHT_ARROW)
79 self.assertEqual(self.p.currentLineBuffer(), ('xyz', ''))
81 def test_newline(self):
83 When {HistoricRecvLine} receives a newline, it adds the current
84 line buffer to the end of its history buffer.
86 kR = lambda ch: self.p.keystrokeReceived(ch, None)
88 for ch in 'xyz\nabc\n123\n':
91 self.assertEqual(self.p.currentHistoryBuffer(),
92 (('xyz', 'abc', '123'), ()))
97 self.assertEqual(self.p.currentHistoryBuffer(),
98 (('xyz', 'abc', '123'), ()))
101 self.assertEqual(self.p.currentHistoryBuffer(),
102 (('xyz', 'abc', '123', 'cba'), ()))
104 def test_verticalArrows(self):
106 When L{HistoricRecvLine} receives UP_ARROW or DOWN_ARROW
107 keystrokes it move the current index in the current history
108 buffer up or down, and resets the current line buffer to the
109 previous or next line in history, respectively for each.
111 kR = lambda ch: self.p.keystrokeReceived(ch, None)
113 for ch in 'xyz\nabc\n123\n':
116 self.assertEqual(self.p.currentHistoryBuffer(),
117 (('xyz', 'abc', '123'), ()))
118 self.assertEqual(self.p.currentLineBuffer(), ('', ''))
121 self.assertEqual(self.p.currentHistoryBuffer(),
122 (('xyz', 'abc'), ('123',)))
123 self.assertEqual(self.p.currentLineBuffer(), ('123', ''))
126 self.assertEqual(self.p.currentHistoryBuffer(),
127 (('xyz',), ('abc', '123')))
128 self.assertEqual(self.p.currentLineBuffer(), ('abc', ''))
131 self.assertEqual(self.p.currentHistoryBuffer(),
132 ((), ('xyz', 'abc', '123')))
133 self.assertEqual(self.p.currentLineBuffer(), ('xyz', ''))
136 self.assertEqual(self.p.currentHistoryBuffer(),
137 ((), ('xyz', 'abc', '123')))
138 self.assertEqual(self.p.currentLineBuffer(), ('xyz', ''))
141 kR(self.pt.DOWN_ARROW)
142 self.assertEqual(self.p.currentHistoryBuffer(),
143 (('xyz', 'abc', '123'), ()))
147 When L{HistoricRecvLine} receives a HOME keystroke it moves the
148 cursor to the beginning of the current line buffer.
150 kR = lambda ch: self.p.keystrokeReceived(ch, None)
152 for ch in 'hello, world':
154 self.assertEqual(self.p.currentLineBuffer(), ('hello, world', ''))
157 self.assertEqual(self.p.currentLineBuffer(), ('', 'hello, world'))
161 When L{HistoricRecvLine} receives a END keystroke it moves the cursor
162 to the end of the current line buffer.
164 kR = lambda ch: self.p.keystrokeReceived(ch, None)
166 for ch in 'hello, world':
168 self.assertEqual(self.p.currentLineBuffer(), ('hello, world', ''))
172 self.assertEqual(self.p.currentLineBuffer(), ('hello, world', ''))
174 def test_backspace(self):
176 When L{HistoricRecvLine} receives a BACKSPACE keystroke it deletes
177 the character immediately before the cursor.
179 kR = lambda ch: self.p.keystrokeReceived(ch, None)
183 self.assertEqual(self.p.currentLineBuffer(), ('xyz', ''))
185 kR(self.pt.BACKSPACE)
186 self.assertEqual(self.p.currentLineBuffer(), ('xy', ''))
188 kR(self.pt.LEFT_ARROW)
189 kR(self.pt.BACKSPACE)
190 self.assertEqual(self.p.currentLineBuffer(), ('', 'y'))
192 kR(self.pt.BACKSPACE)
193 self.assertEqual(self.p.currentLineBuffer(), ('', 'y'))
195 def test_delete(self):
197 When L{HistoricRecvLine} receives a DELETE keystroke, it
198 delets the character immediately after the cursor.
200 kR = lambda ch: self.p.keystrokeReceived(ch, None)
204 self.assertEqual(self.p.currentLineBuffer(), ('xyz', ''))
207 self.assertEqual(self.p.currentLineBuffer(), ('xyz', ''))
209 kR(self.pt.LEFT_ARROW)
211 self.assertEqual(self.p.currentLineBuffer(), ('xy', ''))
213 kR(self.pt.LEFT_ARROW)
215 self.assertEqual(self.p.currentLineBuffer(), ('x', ''))
217 kR(self.pt.LEFT_ARROW)
219 self.assertEqual(self.p.currentLineBuffer(), ('', ''))
222 self.assertEqual(self.p.currentLineBuffer(), ('', ''))
224 def test_insert(self):
226 When not in INSERT mode, L{HistoricRecvLine} inserts the typed
227 character at the cursor before the next character.
229 kR = lambda ch: self.p.keystrokeReceived(ch, None)
234 kR(self.pt.LEFT_ARROW)
236 self.assertEqual(self.p.currentLineBuffer(), ('xyA', 'z'))
238 kR(self.pt.LEFT_ARROW)
240 self.assertEqual(self.p.currentLineBuffer(), ('xyB', 'Az'))
242 def test_typeover(self):
244 When in INSERT mode and upon receiving a keystroke with a printable
245 character, L{HistoricRecvLine} replaces the character at
246 the cursor with the typed character rather than inserting before.
247 Ah, the ironies of INSERT mode.
249 kR = lambda ch: self.p.keystrokeReceived(ch, None)
256 kR(self.pt.LEFT_ARROW)
258 self.assertEqual(self.p.currentLineBuffer(), ('xyA', ''))
260 kR(self.pt.LEFT_ARROW)
262 self.assertEqual(self.p.currentLineBuffer(), ('xyB', ''))
265 def test_unprintableCharacters(self):
267 When L{HistoricRecvLine} receives a keystroke for an unprintable
268 function key with no assigned behavior, the line buffer is unmodified.
270 kR = lambda ch: self.p.keystrokeReceived(ch, None)
273 for ch in (pt.F1, pt.F2, pt.F3, pt.F4, pt.F5, pt.F6, pt.F7, pt.F8,
274 pt.F9, pt.F10, pt.F11, pt.F12, pt.PGUP, pt.PGDN):
276 self.assertEqual(self.p.currentLineBuffer(), ('', ''))
279 from twisted.conch import telnet
280 from twisted.conch.insults import helper
281 from twisted.protocols import loopback
283 class EchoServer(recvline.HistoricRecvLine):
284 def lineReceived(self, line):
285 self.terminal.write(line + '\n' + self.ps[self.pn])
287 # An insults API for this would be nice.
298 from twisted.cred import checkers
301 from twisted.conch.ssh import userauth, transport, channel, connection, session
302 from twisted.conch.manhole_ssh import TerminalUser, TerminalSession, TerminalRealm, TerminalSessionTransport, ConchFactory
307 class SessionChannel(channel.SSHChannel):
310 def __init__(self, protocolFactory, protocolArgs, protocolKwArgs, width, height, *a, **kw):
311 channel.SSHChannel.__init__(self, *a, **kw)
313 self.protocolFactory = protocolFactory
314 self.protocolArgs = protocolArgs
315 self.protocolKwArgs = protocolKwArgs
320 def channelOpen(self, data):
321 term = session.packRequest_pty_req("vt102", (self.height, self.width, 0, 0), '')
322 self.conn.sendRequest(self, 'pty-req', term)
323 self.conn.sendRequest(self, 'shell', '')
325 self._protocolInstance = self.protocolFactory(*self.protocolArgs, **self.protocolKwArgs)
326 self._protocolInstance.factory = self
327 self._protocolInstance.makeConnection(self)
330 self._protocolInstance.connectionLost(error.ConnectionDone())
332 def dataReceived(self, data):
333 self._protocolInstance.dataReceived(data)
335 class TestConnection(connection.SSHConnection):
336 def __init__(self, protocolFactory, protocolArgs, protocolKwArgs, width, height, *a, **kw):
337 connection.SSHConnection.__init__(self, *a, **kw)
339 self.protocolFactory = protocolFactory
340 self.protocolArgs = protocolArgs
341 self.protocolKwArgs = protocolKwArgs
346 def serviceStarted(self):
347 self.__channel = SessionChannel(self.protocolFactory, self.protocolArgs, self.protocolKwArgs, self.width, self.height)
348 self.openChannel(self.__channel)
350 def write(self, bytes):
351 return self.__channel.write(bytes)
353 class TestAuth(userauth.SSHUserAuthClient):
354 def __init__(self, username, password, *a, **kw):
355 userauth.SSHUserAuthClient.__init__(self, username, *a, **kw)
356 self.password = password
358 def getPassword(self):
359 return defer.succeed(self.password)
361 class TestTransport(transport.SSHClientTransport):
362 def __init__(self, protocolFactory, protocolArgs, protocolKwArgs, username, password, width, height, *a, **kw):
363 # transport.SSHClientTransport.__init__(self, *a, **kw)
364 self.protocolFactory = protocolFactory
365 self.protocolArgs = protocolArgs
366 self.protocolKwArgs = protocolKwArgs
367 self.username = username
368 self.password = password
372 def verifyHostKey(self, hostKey, fingerprint):
373 return defer.succeed(True)
375 def connectionSecure(self):
376 self.__connection = TestConnection(self.protocolFactory, self.protocolArgs, self.protocolKwArgs, self.width, self.height)
378 TestAuth(self.username, self.password, self.__connection))
380 def write(self, bytes):
381 return self.__connection.write(bytes)
383 class TestSessionTransport(TerminalSessionTransport):
384 def protocolFactory(self):
385 return self.avatar.conn.transport.factory.serverProtocol()
387 class TestSession(TerminalSession):
388 transportFactory = TestSessionTransport
390 class TestUser(TerminalUser):
393 components.registerAdapter(TestSession, TestUser, session.ISession)
396 class LoopbackRelay(loopback.LoopbackRelay):
400 return "LoopbackRelay(%r)" % (self.target.__class__.__name__,)
402 def write(self, bytes):
403 loopback.LoopbackRelay.write(self, bytes)
404 if self.clearCall is not None:
405 self.clearCall.cancel()
407 from twisted.internet import reactor
408 self.clearCall = reactor.callLater(0, self._clearBuffer)
410 def _clearBuffer(self):
411 self.clearCall = None
412 loopback.LoopbackRelay.clearBuffer(self)
415 class NotifyingExpectableBuffer(helper.ExpectableBuffer):
417 self.onConnection = defer.Deferred()
418 self.onDisconnection = defer.Deferred()
420 def connectionMade(self):
421 helper.ExpectableBuffer.connectionMade(self)
422 self.onConnection.callback(self)
424 def connectionLost(self, reason):
425 self.onDisconnection.errback(reason)
432 def _assertBuffer(self, lines):
433 receivedLines = str(self.recvlineClient).splitlines()
434 expectedLines = lines + ([''] * (self.HEIGHT - len(lines) - 1))
435 self.assertEqual(len(receivedLines), len(expectedLines))
436 for i in range(len(receivedLines)):
438 receivedLines[i], expectedLines[i],
439 str(receivedLines[max(0, i-1):i+1]) +
441 str(expectedLines[max(0, i-1):i+1]))
443 def _trivialTest(self, input, output):
444 done = self.recvlineClient.expect("done")
446 self._testwrite(input)
449 self._assertBuffer(output)
451 return done.addCallback(finished)
454 class _SSHMixin(_BaseMixin):
457 raise unittest.SkipTest("Crypto requirements missing, can't run historic recvline tests over ssh")
459 u, p = 'testuser', 'testpass'
460 rlm = TerminalRealm()
461 rlm.userFactory = TestUser
462 rlm.chainedProtocolFactory = lambda: insultsServer
466 [checkers.InMemoryUsernamePasswordDatabaseDontUse(**{u: p})])
467 sshFactory = ConchFactory(ptl)
468 sshFactory.serverProtocol = self.serverProtocol
469 sshFactory.startFactory()
471 recvlineServer = self.serverProtocol()
472 insultsServer = insults.ServerProtocol(lambda: recvlineServer)
473 sshServer = sshFactory.buildProtocol(None)
474 clientTransport = LoopbackRelay(sshServer)
476 recvlineClient = NotifyingExpectableBuffer()
477 insultsClient = insults.ClientProtocol(lambda: recvlineClient)
478 sshClient = TestTransport(lambda: insultsClient, (), {}, u, p, self.WIDTH, self.HEIGHT)
479 serverTransport = LoopbackRelay(sshClient)
481 sshClient.makeConnection(clientTransport)
482 sshServer.makeConnection(serverTransport)
484 self.recvlineClient = recvlineClient
485 self.sshClient = sshClient
486 self.sshServer = sshServer
487 self.clientTransport = clientTransport
488 self.serverTransport = serverTransport
490 return recvlineClient.onConnection
492 def _testwrite(self, bytes):
493 self.sshClient.write(bytes)
495 from twisted.conch.test import test_telnet
497 class TestInsultsClientProtocol(insults.ClientProtocol,
498 test_telnet.TestProtocol):
502 class TestInsultsServerProtocol(insults.ServerProtocol,
503 test_telnet.TestProtocol):
506 class _TelnetMixin(_BaseMixin):
508 recvlineServer = self.serverProtocol()
509 insultsServer = TestInsultsServerProtocol(lambda: recvlineServer)
510 telnetServer = telnet.TelnetTransport(lambda: insultsServer)
511 clientTransport = LoopbackRelay(telnetServer)
513 recvlineClient = NotifyingExpectableBuffer()
514 insultsClient = TestInsultsClientProtocol(lambda: recvlineClient)
515 telnetClient = telnet.TelnetTransport(lambda: insultsClient)
516 serverTransport = LoopbackRelay(telnetClient)
518 telnetClient.makeConnection(clientTransport)
519 telnetServer.makeConnection(serverTransport)
521 serverTransport.clearBuffer()
522 clientTransport.clearBuffer()
524 self.recvlineClient = recvlineClient
525 self.telnetClient = telnetClient
526 self.clientTransport = clientTransport
527 self.serverTransport = serverTransport
529 return recvlineClient.onConnection
531 def _testwrite(self, bytes):
532 self.telnetClient.write(bytes)
535 from twisted.conch import stdio
539 class _StdioMixin(_BaseMixin):
541 # A memory-only terminal emulator, into which the server will
542 # write things and make other state changes. What ends up
543 # here is basically what a user would have seen on their
545 testTerminal = NotifyingExpectableBuffer()
547 # An insults client protocol which will translate bytes
548 # received from the child process into keystroke commands for
549 # an ITerminalProtocol.
550 insultsClient = insults.ClientProtocol(lambda: testTerminal)
552 # A process protocol which will translate stdout and stderr
553 # received from the child process to dataReceived calls and
554 # error reporting on an insults client protocol.
555 processClient = stdio.TerminalProcessProtocol(insultsClient)
557 # Run twisted/conch/stdio.py with the name of a class
558 # implementing ITerminalProtocol. This class will be used to
559 # handle bytes we send to the child process.
561 module = stdio.__file__
562 if module.endswith('.pyc') or module.endswith('.pyo'):
564 args = [exe, module, reflect.qual(self.serverProtocol)]
565 env = os.environ.copy()
566 env["PYTHONPATH"] = os.pathsep.join(sys.path)
568 from twisted.internet import reactor
569 clientTransport = reactor.spawnProcess(processClient, exe, args,
570 env=env, usePTY=True)
572 self.recvlineClient = self.testTerminal = testTerminal
573 self.processClient = processClient
574 self.clientTransport = clientTransport
576 # Wait for the process protocol and test terminal to become
577 # connected before proceeding. The former should always
578 # happen first, but it doesn't hurt to be safe.
579 return defer.gatherResults(filter(None, [
580 processClient.onConnection,
581 testTerminal.expect(">>> ")]))
584 # Kill the child process. We're done with it.
586 self.clientTransport.signalProcess("KILL")
587 except (error.ProcessExitedAlready, OSError):
590 failure.trap(error.ProcessTerminated)
591 self.assertEqual(failure.value.exitCode, None)
592 self.assertEqual(failure.value.status, 9)
593 return self.testTerminal.onDisconnection.addErrback(trap)
595 def _testwrite(self, bytes):
596 self.clientTransport.write(bytes)
598 class RecvlineLoopbackMixin:
599 serverProtocol = EchoServer
601 def testSimple(self):
602 return self._trivialTest(
608 def testLeftArrow(self):
609 return self._trivialTest(
610 insert + 'first line' + left * 4 + "xxxx\ndone",
615 def testRightArrow(self):
616 return self._trivialTest(
617 insert + 'right line' + left * 4 + right * 2 + "xx\ndone",
622 def testBackspace(self):
623 return self._trivialTest(
624 "second line" + backspace * 4 + "xxxx\ndone",
629 def testDelete(self):
630 return self._trivialTest(
631 "delete xxxx" + left * 4 + delete * 4 + "line\ndone",
636 def testInsert(self):
637 return self._trivialTest(
638 "third ine" + left * 3 + "l\ndone",
643 def testTypeover(self):
644 return self._trivialTest(
645 "fourth xine" + left * 4 + insert + "l\ndone",
651 return self._trivialTest(
652 insert + "blah line" + home + "home\ndone",
658 return self._trivialTest(
659 "end " + left * 4 + end + "line\ndone",
664 class RecvlineLoopbackTelnet(_TelnetMixin, unittest.TestCase, RecvlineLoopbackMixin):
667 class RecvlineLoopbackSSH(_SSHMixin, unittest.TestCase, RecvlineLoopbackMixin):
670 class RecvlineLoopbackStdio(_StdioMixin, unittest.TestCase, RecvlineLoopbackMixin):
672 skip = "Terminal requirements missing, can't run recvline tests over stdio"
675 class HistoricRecvlineLoopbackMixin:
676 serverProtocol = EchoServer
678 def testUpArrow(self):
679 return self._trivialTest(
680 "first line\n" + up + "\ndone",
687 def testDownArrow(self):
688 return self._trivialTest(
689 "first line\nsecond line\n" + up * 2 + down + "\ndone",
698 class HistoricRecvlineLoopbackTelnet(_TelnetMixin, unittest.TestCase, HistoricRecvlineLoopbackMixin):
701 class HistoricRecvlineLoopbackSSH(_SSHMixin, unittest.TestCase, HistoricRecvlineLoopbackMixin):
704 class HistoricRecvlineLoopbackStdio(_StdioMixin, unittest.TestCase, HistoricRecvlineLoopbackMixin):
706 skip = "Terminal requirements missing, can't run historic recvline tests over stdio"