1 # -*- test-case-name: twisted.conch.test.test_insults -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
6 VT102 and VT220 terminal manipulation.
11 from zope.interface import implements, Interface
13 from twisted.internet import protocol, defer, interfaces as iinternet
15 class ITerminalProtocol(Interface):
16 def makeConnection(transport):
17 """Called with an L{ITerminalTransport} when a connection is established.
20 def keystrokeReceived(keyID, modifier):
21 """A keystroke was received.
23 Each keystroke corresponds to one invocation of this method.
24 keyID is a string identifier for that key. Printable characters
25 are represented by themselves. Control keys, such as arrows and
26 function keys, are represented with symbolic constants on
30 def terminalSize(width, height):
31 """Called to indicate the size of the terminal.
33 A terminal of 80x24 should be assumed if this method is not
34 called. This method might not be called for real terminals.
37 def unhandledControlSequence(seq):
38 """Called when an unsupported control sequence is received.
41 @param seq: The whole control sequence which could not be interpreted.
44 def connectionLost(reason):
45 """Called when the connection has been lost.
47 reason is a Failure describing why.
50 class TerminalProtocol(object):
51 implements(ITerminalProtocol)
53 def makeConnection(self, terminal):
54 # assert ITerminalTransport.providedBy(transport), "TerminalProtocol.makeConnection must be passed an ITerminalTransport implementor"
55 self.terminal = terminal
58 def connectionMade(self):
59 """Called after a connection has been established.
62 def keystrokeReceived(self, keyID, modifier):
65 def terminalSize(self, width, height):
68 def unhandledControlSequence(self, seq):
71 def connectionLost(self, reason):
74 class ITerminalTransport(iinternet.ITransport):
76 """Move the cursor up n lines.
80 """Move the cursor down n lines.
83 def cursorForward(n=1):
84 """Move the cursor right n columns.
87 def cursorBackward(n=1):
88 """Move the cursor left n columns.
91 def cursorPosition(column, line):
92 """Move the cursor to the given line and column.
96 """Move the cursor home.
100 """Move the cursor down one line, performing scrolling if necessary.
104 """Move the cursor up one line, performing scrolling if necessary.
108 """Move the cursor to the first position on the next line, performing scrolling if necessary.
112 """Save the cursor position, character attribute, character set, and origin mode selection.
116 """Restore the previously saved cursor position, character attribute, character set, and origin mode selection.
118 If no cursor state was previously saved, move the cursor to the home position.
122 """Set the given modes on the terminal.
125 def resetModes(mode):
126 """Reset the given modes on the terminal.
130 def setPrivateModes(modes):
132 Set the given DEC private modes on the terminal.
136 def resetPrivateModes(modes):
138 Reset the given DEC private modes on the terminal.
142 def applicationKeypadMode():
143 """Cause keypad to generate control functions.
145 Cursor key mode selects the type of characters generated by cursor keys.
148 def numericKeypadMode():
149 """Cause keypad to generate normal characters.
152 def selectCharacterSet(charSet, which):
153 """Select a character set.
155 charSet should be one of CS_US, CS_UK, CS_DRAWING, CS_ALTERNATE, or
156 CS_ALTERNATE_SPECIAL.
158 which should be one of G0 or G1.
162 """Activate the G0 character set.
166 """Activate the G1 character set.
170 """Shift to the G2 character set for a single character.
174 """Shift to the G3 character set for a single character.
177 def selectGraphicRendition(*attributes):
178 """Enabled one or more character attributes.
180 Arguments should be one or more of UNDERLINE, REVERSE_VIDEO, BLINK, or BOLD.
181 NORMAL may also be specified to disable all character attributes.
184 def horizontalTabulationSet():
185 """Set a tab stop at the current cursor position.
188 def tabulationClear():
189 """Clear the tab stop at the current cursor position.
192 def tabulationClearAll():
193 """Clear all tab stops.
196 def doubleHeightLine(top=True):
197 """Make the current line the top or bottom half of a double-height, double-width line.
199 If top is True, the current line is the top half. Otherwise, it is the bottom half.
202 def singleWidthLine():
203 """Make the current line a single-width, single-height line.
206 def doubleWidthLine():
207 """Make the current line a double-width line.
210 def eraseToLineEnd():
211 """Erase from the cursor to the end of line, including cursor position.
214 def eraseToLineBeginning():
215 """Erase from the cursor to the beginning of the line, including the cursor position.
219 """Erase the entire cursor line.
222 def eraseToDisplayEnd():
223 """Erase from the cursor to the end of the display, including the cursor position.
226 def eraseToDisplayBeginning():
227 """Erase from the cursor to the beginning of the display, including the cursor position.
231 """Erase the entire display.
234 def deleteCharacter(n=1):
235 """Delete n characters starting at the cursor position.
237 Characters to the right of deleted characters are shifted to the left.
241 """Insert n lines at the cursor position.
243 Lines below the cursor are shifted down. Lines moved past the bottom margin are lost.
244 This command is ignored when the cursor is outside the scroll region.
248 """Delete n lines starting at the cursor position.
250 Lines below the cursor are shifted up. This command is ignored when the cursor is outside
254 def reportCursorPosition():
255 """Return a Deferred that fires with a two-tuple of (x, y) indicating the cursor position.
259 """Reset the terminal to its initial state.
262 def unhandledControlSequence(seq):
263 """Called when an unsupported control sequence is received.
266 @param seq: The whole control sequence which could not be interpreted.
274 """ECMA 48 standardized modes
# Keyboard Action Mode: when set, the keyboard is locked and keystrokes are ignored.
278 KEYBOARD_ACTION = KAM = 2
280 # When set, enables character insertion. New display characters
281 # move old display characters to the right. Characters moved past
282 # the right margin are lost.
284 # When reset, enables replacement mode (disables character
285 # insertion). New display characters replace old display
286 # characters at cursor position. The old character is erased.
287 INSERTION_REPLACEMENT = IRM = 4
289 # Set causes a received linefeed, form feed, or vertical tab to
290 # move cursor to first column of next line. RETURN transmits both
291 # a carriage return and linefeed. This selection is also called
294 # Reset causes a received linefeed, form feed, or vertical tab to
295 # move cursor to next line in current column. RETURN transmits a
297 LINEFEED_NEWLINE = LNM = 20
301 """ANSI-Compatible Private Modes
312 PRINTER_FORM_FEED = 18
315 # Toggle cursor visibility (reset hides it)
322 CS_DRAWING = 'CS_DRAWING'
323 CS_ALTERNATE = 'CS_ALTERNATE'
324 CS_ALTERNATE_SPECIAL = 'CS_ALTERNATE_SPECIAL'
326 # Groupings (or something?? These are like variables that can be bound to character sets)
330 # G2 and G3 cannot be changed, but they can be shifted to.
334 # Character attributes
343 def __init__(self, x, y):
348 file('log', 'a').write(str(s) + '\n')
350 # XXX TODO - These attributes are really part of the
351 # ITerminalTransport interface, I think.
352 _KEY_NAMES = ('UP_ARROW', 'DOWN_ARROW', 'RIGHT_ARROW', 'LEFT_ARROW',
353 'HOME', 'INSERT', 'DELETE', 'END', 'PGUP', 'PGDN', 'NUMPAD_MIDDLE',
354 'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8', 'F9',
357 'ALT', 'SHIFT', 'CONTROL')
359 class _const(object):
361 @ivar name: A string naming this constant
363 def __init__(self, name):
367 return '[' + self.name + ']'
371 _const(_name) for _name in _KEY_NAMES]
373 class ServerProtocol(protocol.Protocol):
374 implements(ITerminalTransport)
376 protocolFactory = None
377 terminalProtocol = None
387 termSize = Vector(80, 24)
388 cursorPos = Vector(0, 0)
391 # Factory who instantiated me
394 def __init__(self, protocolFactory=None, *a, **kw):
396 @param protocolFactory: A callable which will be invoked with
397 *a, **kw and should return an ITerminalProtocol implementor.
398 This will be invoked when a connection to this ServerProtocol
401 @param a: Any positional arguments to pass to protocolFactory.
402 @param kw: Any keyword arguments to pass to protocolFactory.
404 # assert protocolFactory is None or ITerminalProtocol.implementedBy(protocolFactory), "ServerProtocol.__init__ must be passed an ITerminalProtocol implementor"
405 if protocolFactory is not None:
406 self.protocolFactory = protocolFactory
407 self.protocolArgs = a
408 self.protocolKwArgs = kw
410 self._cursorReports = []
412 def connectionMade(self):
413 if self.protocolFactory is not None:
414 self.terminalProtocol = self.protocolFactory(*self.protocolArgs, **self.protocolKwArgs)
417 factory = self.factory
418 except AttributeError:
421 self.terminalProtocol.factory = factory
423 self.terminalProtocol.makeConnection(self)
425 def dataReceived(self, data):
427 if self.state == 'data':
429 self.state = 'escaped'
431 self.terminalProtocol.keystrokeReceived(ch, None)
432 elif self.state == 'escaped':
434 self.state = 'bracket-escaped'
437 self.state = 'low-function-escaped'
440 self._handleShortControlSequence(ch)
441 elif self.state == 'bracket-escaped':
443 self.state = 'low-function-escaped'
444 elif ch.isalpha() or ch == '~':
445 self._handleControlSequence(''.join(self.escBuf) + ch)
449 self.escBuf.append(ch)
450 elif self.state == 'low-function-escaped':
451 self._handleLowFunctionControlSequence(ch)
454 raise ValueError("Illegal state")
456 def _handleShortControlSequence(self, ch):
457 self.terminalProtocol.keystrokeReceived(ch, self.ALT)
459 def _handleControlSequence(self, buf):
461 f = getattr(self.controlSequenceParser, CST.get(buf[-1], buf[-1]), None)
463 self.unhandledControlSequence(buf)
465 f(self, self.terminalProtocol, buf[:-1])
def unhandledControlSequence(self, buf):
    """
    Hand a control sequence which could not be interpreted to the terminal
    protocol.

    @param buf: The control sequence which was not understood.
    """
    handler = self.terminalProtocol
    handler.unhandledControlSequence(buf)
470 def _handleLowFunctionControlSequence(self, ch):
471 map = {'P': self.F1, 'Q': self.F2, 'R': self.F3, 'S': self.F4}
473 if keyID is not None:
474 self.terminalProtocol.keystrokeReceived(keyID, None)
476 self.terminalProtocol.unhandledControlSequence('\x1b[O' + ch)
478 class ControlSequenceParser:
479 def A(self, proto, handler, buf):
481 handler.keystrokeReceived(proto.UP_ARROW, None)
483 handler.unhandledControlSequence(buf + 'A')
485 def B(self, proto, handler, buf):
487 handler.keystrokeReceived(proto.DOWN_ARROW, None)
489 handler.unhandledControlSequence(buf + 'B')
491 def C(self, proto, handler, buf):
493 handler.keystrokeReceived(proto.RIGHT_ARROW, None)
495 handler.unhandledControlSequence(buf + 'C')
497 def D(self, proto, handler, buf):
499 handler.keystrokeReceived(proto.LEFT_ARROW, None)
501 handler.unhandledControlSequence(buf + 'D')
503 def E(self, proto, handler, buf):
505 handler.keystrokeReceived(proto.NUMPAD_MIDDLE, None)
507 handler.unhandledControlSequence(buf + 'E')
509 def F(self, proto, handler, buf):
511 handler.keystrokeReceived(proto.END, None)
513 handler.unhandledControlSequence(buf + 'F')
515 def H(self, proto, handler, buf):
517 handler.keystrokeReceived(proto.HOME, None)
519 handler.unhandledControlSequence(buf + 'H')
521 def R(self, proto, handler, buf):
522 if not proto._cursorReports:
523 handler.unhandledControlSequence(buf + 'R')
524 elif buf.startswith('\x1b['):
526 parts = report.split(';')
528 handler.unhandledControlSequence(buf + 'R')
532 Pl, Pc = int(Pl), int(Pc)
534 handler.unhandledControlSequence(buf + 'R')
536 d = proto._cursorReports.pop(0)
537 d.callback((Pc - 1, Pl - 1))
539 handler.unhandledControlSequence(buf + 'R')
541 def Z(self, proto, handler, buf):
543 handler.keystrokeReceived(proto.TAB, proto.SHIFT)
545 handler.unhandledControlSequence(buf + 'Z')
547 def tilde(self, proto, handler, buf):
548 map = {1: proto.HOME, 2: proto.INSERT, 3: proto.DELETE,
549 4: proto.END, 5: proto.PGUP, 6: proto.PGDN,
551 15: proto.F5, 17: proto.F6, 18: proto.F7,
552 19: proto.F8, 20: proto.F9, 21: proto.F10,
553 23: proto.F11, 24: proto.F12}
555 if buf.startswith('\x1b['):
560 handler.unhandledControlSequence(buf + '~')
562 symbolic = map.get(v)
563 if symbolic is not None:
564 handler.keystrokeReceived(map[v], None)
566 handler.unhandledControlSequence(buf + '~')
568 handler.unhandledControlSequence(buf + '~')
570 controlSequenceParser = ControlSequenceParser()
573 def cursorUp(self, n=1):
575 self.cursorPos.y = max(self.cursorPos.y - n, 0)
576 self.write('\x1b[%dA' % (n,))
578 def cursorDown(self, n=1):
580 self.cursorPos.y = min(self.cursorPos.y + n, self.termSize.y - 1)
581 self.write('\x1b[%dB' % (n,))
583 def cursorForward(self, n=1):
585 self.cursorPos.x = min(self.cursorPos.x + n, self.termSize.x - 1)
586 self.write('\x1b[%dC' % (n,))
588 def cursorBackward(self, n=1):
590 self.cursorPos.x = max(self.cursorPos.x - n, 0)
591 self.write('\x1b[%dD' % (n,))
def cursorPosition(self, column, line):
    """
    Move the cursor to the given column and line.

    Both coordinates are zero-based here; they are converted to the
    one-based values used on the wire before being written.

    @param column: The zero-based column to move to.
    @param line: The zero-based line to move to.
    """
    # Keep the locally tracked position in step with what was requested, as
    # cursorHome and the relative cursor movements already do.  The tracked
    # value is clamped to the known terminal size like those movements; the
    # sequence itself is sent with the caller's coordinates unchanged.
    self.cursorPos.x = min(max(column, 0), self.termSize.x - 1)
    self.cursorPos.y = min(max(line, 0), self.termSize.y - 1)
    self.write('\x1b[%d;%dH' % (line + 1, column + 1))
596 def cursorHome(self):
597 self.cursorPos.x = self.cursorPos.y = 0
601 self.cursorPos.y = min(self.cursorPos.y + 1, self.termSize.y - 1)
604 def reverseIndex(self):
605 self.cursorPos.y = max(self.cursorPos.y - 1, 0)
610 self.cursorPos.y = min(self.cursorPos.y + 1, self.termSize.y - 1)
613 def saveCursor(self):
614 self._savedCursorPos = Vector(self.cursorPos.x, self.cursorPos.y)
617 def restoreCursor(self):
618 self.cursorPos = self._savedCursorPos
619 del self._savedCursorPos
def setModes(self, modes):
    """
    Write the control sequence which sets the given modes.

    @param modes: An iterable of mode identifiers.  Each one is converted
        with C{str} and the results are joined with semicolons.
    """
    # XXX Support ANSI-Compatible private modes
    parameters = ';'.join([str(mode) for mode in modes])
    self.write('\x1b[%sh' % (parameters,))
def setPrivateModes(self, modes):
    """
    Write the control sequence which sets the given DEC private modes.

    @param modes: An iterable of private mode identifiers.  Each one is
        converted with C{str} and the results are joined with semicolons.
    """
    parameters = ';'.join([str(mode) for mode in modes])
    self.write('\x1b[?%sh' % (parameters,))
def resetModes(self, modes):
    """
    Write the control sequence which resets the given modes.

    @param modes: An iterable of mode identifiers.  Each one is converted
        with C{str} and the results are joined with semicolons.
    """
    # XXX Support ANSI-Compatible private modes
    parameters = ';'.join([str(mode) for mode in modes])
    self.write('\x1b[%sl' % (parameters,))
def resetPrivateModes(self, modes):
    """
    Write the control sequence which resets the given DEC private modes.

    @param modes: An iterable of private mode identifiers.  Each one is
        converted with C{str} and the results are joined with semicolons.
    """
    parameters = ';'.join([str(mode) for mode in modes])
    self.write('\x1b[?%sl' % (parameters,))
636 def applicationKeypadMode(self):
639 def numericKeypadMode(self):
642 def selectCharacterSet(self, charSet, which):
643 # XXX Rewrite these as dict lookups
649 raise ValueError("`which' argument to selectCharacterSet must be G0 or G1")
652 elif charSet == CS_US:
654 elif charSet == CS_DRAWING:
656 elif charSet == CS_ALTERNATE:
658 elif charSet == CS_ALTERNATE_SPECIAL:
661 raise ValueError("Invalid `charSet' argument to selectCharacterSet")
662 self.write('\x1b' + which + charSet)
670 def singleShift2(self):
673 def singleShift3(self):
676 def selectGraphicRendition(self, *attributes):
680 self.write('\x1b[%sm' % (';'.join(attrs),))
682 def horizontalTabulationSet(self):
685 def tabulationClear(self):
def tabulationClearAll(self):
    """
    Clear all tab stops.

    Writes the Tabulation Clear (TBC) control sequence with parameter 3.
    """
    # TBC is terminated by 'g'.  The previous final byte, 'q', selects
    # DECLL (load LEDs) on VT100-family terminals, so no tab stops were
    # being cleared.
    self.write('\x1b[3g')
691 def doubleHeightLine(self, top=True):
697 def singleWidthLine(self):
700 def doubleWidthLine(self):
703 def eraseToLineEnd(self):
def eraseToLineBeginning(self):
    """
    Write the sequence which erases from the beginning of the line up to
    and including the cursor position.
    """
    sequence = '\x1b[1K'
    self.write(sequence)
710 self.write('\x1b[2K')
712 def eraseToDisplayEnd(self):
def eraseToDisplayBeginning(self):
    """
    Write the sequence which erases from the beginning of the display up to
    and including the cursor position.
    """
    sequence = '\x1b[1J'
    self.write(sequence)
def eraseDisplay(self):
    """
    Write the sequence which erases the entire display.
    """
    sequence = '\x1b[2J'
    self.write(sequence)
def deleteCharacter(self, n=1):
    """
    Write the sequence which deletes C{n} characters starting at the cursor
    position.

    @param n: The number of characters to delete; formatted with C{%d}.
    """
    sequence = '\x1b[%dP' % (n,)
    self.write(sequence)
def insertLine(self, n=1):
    """
    Write the sequence which inserts C{n} lines at the cursor position.

    @param n: The number of lines to insert; formatted with C{%d}.
    """
    sequence = '\x1b[%dL' % (n,)
    self.write(sequence)
def deleteLine(self, n=1):
    """
    Write the sequence which deletes C{n} lines starting at the cursor
    position.

    @param n: The number of lines to delete; formatted with C{%d}.
    """
    sequence = '\x1b[%dM' % (n,)
    self.write(sequence)
730 def setScrollRegion(self, first=None, last=None):
731 if first is not None:
732 first = '%d' % (first,)
736 last = '%d' % (last,)
739 self.write('\x1b[%s;%sr' % (first, last))
def resetScrollRegion(self):
    """
    Reset the scroll region by calling C{setScrollRegion} with no
    arguments, leaving both bounds at their defaults.
    """
    applyDefaults = self.setScrollRegion
    applyDefaults()
744 def reportCursorPosition(self):
746 self._cursorReports.append(d)
747 self.write('\x1b[6n')
751 self.cursorPos.x = self.cursorPos.y = 0
753 del self._savedCursorPos
754 except AttributeError:
759 def write(self, bytes):
761 self.lastWrite = bytes
762 self.transport.write('\r\n'.join(bytes.split('\n')))
def writeSequence(self, bytes):
    """
    Concatenate the given strings and pass the result to C{write} in a
    single call.

    @param bytes: An iterable of strings to join with no separator.
    """
    joined = ''.join(bytes)
    self.write(joined)
767 def loseConnection(self):
769 self.transport.loseConnection()
771 def connectionLost(self, reason):
772 if self.terminalProtocol is not None:
774 self.terminalProtocol.connectionLost(reason)
776 self.terminalProtocol = None
777 # Add symbolic names for function keys
778 for name, const in zip(_KEY_NAMES, FUNCTION_KEYS):
779 setattr(ServerProtocol, name, const)
783 class ClientProtocol(protocol.Protocol):
785 terminalFactory = None
797 '8': 'restoreCursor',
798 '=': 'applicationKeypadMode',
799 '>': 'numericKeypadMode',
802 'H': 'horizontalTabulationSet',
806 '[': 'bracket-escape',
809 '#': 'select-height-width'}
816 '2': CS_ALTERNATE_SPECIAL}
818 # Factory who instantiated me
821 def __init__(self, terminalFactory=None, *a, **kw):
823 @param terminalFactory: A callable which will be invoked with
824 *a, **kw and should return an ITerminalTransport provider.
825 This will be invoked when this ClientProtocol establishes a
828 @param a: Any positional arguments to pass to terminalFactory.
829 @param kw: Any keyword arguments to pass to terminalFactory.
831 # assert terminalFactory is None or ITerminalTransport.implementedBy(terminalFactory), "ClientProtocol.__init__ must be passed an ITerminalTransport implementor"
832 if terminalFactory is not None:
833 self.terminalFactory = terminalFactory
834 self.terminalArgs = a
835 self.terminalKwArgs = kw
def connectionMade(self):
    """
    Create the terminal, if a terminal factory has been set, and connect
    it to this protocol.

    The factory is called with the stored positional and keyword
    arguments.  The resulting terminal is given this protocol's C{factory}
    and then has C{makeConnection} called with this protocol.  Nothing is
    done when no terminal factory is set.
    """
    makeTerminal = self.terminalFactory
    if makeTerminal is None:
        return
    self.terminal = makeTerminal(*self.terminalArgs, **self.terminalKwArgs)
    self.terminal.factory = self.factory
    self.terminal.makeConnection(self)
843 def connectionLost(self, reason):
844 if self.terminal is not None:
846 self.terminal.connectionLost(reason)
850 def dataReceived(self, bytes):
852 Parse the given data from a terminal server, dispatching to event
853 handlers defined by C{self.terminal}.
857 if self.state == 'data':
860 self.terminal.write(''.join(toWrite))
862 self.state = 'escaped'
865 self.terminal.write(''.join(toWrite))
867 self.terminal.shiftOut()
870 self.terminal.write(''.join(toWrite))
872 self.terminal.shiftIn()
875 self.terminal.write(''.join(toWrite))
877 self.terminal.cursorBackward()
880 elif self.state == 'escaped':
881 fName = self._shorts.get(b)
882 if fName is not None:
884 getattr(self.terminal, fName)()
886 state = self._longs.get(b)
887 if state is not None:
890 self.terminal.unhandledControlSequence('\x1b' + b)
892 elif self.state == 'bracket-escape':
893 if self._escBuf is None:
895 if b.isalpha() or b == '~':
896 self._handleControlSequence(''.join(self._escBuf), b)
900 self._escBuf.append(b)
901 elif self.state == 'select-g0':
902 self.terminal.selectCharacterSet(self._charsets.get(b, b), G0)
904 elif self.state == 'select-g1':
905 self.terminal.selectCharacterSet(self._charsets.get(b, b), G1)
907 elif self.state == 'select-height-width':
908 self._handleHeightWidth(b)
911 raise ValueError("Illegal state")
913 self.terminal.write(''.join(toWrite))
916 def _handleControlSequence(self, buf, terminal):
917 f = getattr(self.controlSequenceParser, CST.get(terminal, terminal), None)
919 self.terminal.unhandledControlSequence('\x1b[' + buf + terminal)
921 f(self, self.terminal, buf)
923 class ControlSequenceParser:
924 def _makeSimple(ch, fName):
926 def simple(self, proto, handler, buf):
928 getattr(handler, n)(1)
933 handler.unhandledControlSequence('\x1b[' + buf + ch)
935 getattr(handler, n)(m)
937 for (ch, fName) in (('A', 'Up'),
941 exec ch + " = _makeSimple(ch, fName)"
944 def h(self, proto, handler, buf):
945 # XXX - Handle '?' to introduce ANSI-Compatible private modes.
947 modes = map(int, buf.split(';'))
949 handler.unhandledControlSequence('\x1b[' + buf + 'h')
951 handler.setModes(modes)
953 def l(self, proto, handler, buf):
954 # XXX - Handle '?' to introduce ANSI-Compatible private modes.
956 modes = map(int, buf.split(';'))
958 handler.unhandledControlSequence('\x1b[' + buf + 'l')
960 handler.resetModes(modes)
962 def r(self, proto, handler, buf):
963 parts = buf.split(';')
965 handler.setScrollRegion(None, None)
966 elif len(parts) == 2:
977 handler.unhandledControlSequence('\x1b[' + buf + 'r')
979 handler.setScrollRegion(pt, pb)
981 handler.unhandledControlSequence('\x1b[' + buf + 'r')
983 def K(self, proto, handler, buf):
985 handler.eraseToLineEnd()
987 handler.eraseToLineBeginning()
991 handler.unhandledControlSequence('\x1b[' + buf + 'K')
993 def H(self, proto, handler, buf):
996 def J(self, proto, handler, buf):
998 handler.eraseToDisplayEnd()
1000 handler.eraseToDisplayBeginning()
1002 handler.eraseDisplay()
1004 handler.unhandledControlSequence('\x1b[' + buf + 'J')
1006 def P(self, proto, handler, buf):
1008 handler.deleteCharacter(1)
1013 handler.unhandledControlSequence('\x1b[' + buf + 'P')
1015 handler.deleteCharacter(n)
1017 def L(self, proto, handler, buf):
1019 handler.insertLine(1)
1024 handler.unhandledControlSequence('\x1b[' + buf + 'L')
1026 handler.insertLine(n)
1028 def M(self, proto, handler, buf):
1030 handler.deleteLine(1)
1035 handler.unhandledControlSequence('\x1b[' + buf + 'M')
1037 handler.deleteLine(n)
1039 def n(self, proto, handler, buf):
1041 x, y = handler.reportCursorPosition()
1042 proto.transport.write('\x1b[%d;%dR' % (x + 1, y + 1))
1044 handler.unhandledControlSequence('\x1b[' + buf + 'n')
1046 def m(self, proto, handler, buf):
1048 handler.selectGraphicRendition(NORMAL)
1051 for a in buf.split(';'):
1057 handler.selectGraphicRendition(*attrs)
1059 controlSequenceParser = ControlSequenceParser()
1061 def _handleHeightWidth(self, b):
1063 self.terminal.doubleHeightLine(True)
1065 self.terminal.doubleHeightLine(False)
1067 self.terminal.singleWidthLine()
1069 self.terminal.doubleWidthLine()
1071 self.terminal.unhandledControlSequence('\x1b#' + b)
1076 'ITerminalProtocol', 'ITerminalTransport',
1078 # Symbolic constants
1079 'modes', 'privateModes', 'FUNCTION_KEYS',
1081 'CS_US', 'CS_UK', 'CS_DRAWING', 'CS_ALTERNATE', 'CS_ALTERNATE_SPECIAL',
1082 'G0', 'G1', 'G2', 'G3',
1084 'UNDERLINE', 'REVERSE_VIDEO', 'BLINK', 'BOLD', 'NORMAL',
1087 'ServerProtocol', 'ClientProtocol']