1 # -*- test-case-name: twisted.conch.test.test_helper -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
6 Partial in-memory terminal emulator
13 from zope.interface import implements
15 from twisted.internet import defer, protocol, reactor
16 from twisted.python import log
18 from twisted.conch.insults import insults
22 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, N_COLORS = range(9)
24 class CharacterAttribute:
25 """Represents the attributes of a single character.
27 Character set, intensity, underlinedness, blinkitude, video
28 reversal, as well as foreground and background colors made up a
29 character's attributes.
31 def __init__(self, charset=insults.G0,
32 bold=False, underline=False,
33 blink=False, reverseVideo=False,
34 foreground=WHITE, background=BLACK,
37 self.charset = charset
39 self.underline = underline
41 self.reverseVideo = reverseVideo
42 self.foreground = foreground
43 self.background = background
45 self._subtracting = _subtracting
47 def __eq__(self, other):
48 return vars(self) == vars(other)
50 def __ne__(self, other):
51 return not self.__eq__(other)
55 c.__dict__.update(vars(self))
58 def wantOne(self, **kw):
60 if getattr(self, k) != v:
62 attr._subtracting = not v
69 # Spit out a vt102 control sequence that will set up
70 # all the attributes set here. Except charset.
75 attrs.append(insults.BOLD)
77 attrs.append(insults.UNDERLINE)
79 attrs.append(insults.BLINK)
81 attrs.append(insults.REVERSE_VIDEO)
82 if self.foreground != WHITE:
83 attrs.append(FOREGROUND + self.foreground)
84 if self.background != BLACK:
85 attrs.append(BACKGROUND + self.background)
87 return '\x1b[' + ';'.join(map(str, attrs)) + 'm'
90 # XXX - need to support scroll regions and scroll history
91 class TerminalBuffer(protocol.Protocol):
93 An in-memory terminal emulator.
95 implements(insults.ITerminalTransport)
97 for keyID in ('UP_ARROW', 'DOWN_ARROW', 'RIGHT_ARROW', 'LEFT_ARROW',
98 'HOME', 'INSERT', 'DELETE', 'END', 'PGUP', 'PGDN',
99 'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8', 'F9',
100 'F10', 'F11', 'F12'):
101 exec '%s = object()' % (keyID,)
112 def getCharacter(self, x, y):
113 return self.lines[y][x]
115 def connectionMade(self):
118 def write(self, bytes):
120 Add the given printable bytes to the terminal.
122 Line feeds in C{bytes} will be replaced with carriage return / line
125 for b in bytes.replace('\n', '\r\n'):
126 self.insertAtCursor(b)
128 def _currentCharacterAttributes(self):
129 return CharacterAttribute(self.activeCharset, **self.graphicRendition)
131 def insertAtCursor(self, b):
133 Add one byte to the terminal at the cursor and make consequent state
136 If b is a carriage return, move the cursor to the beginning of the
139 If b is a line feed, move the cursor to the next row or scroll down if
140 the cursor is already in the last row.
142 Otherwise, if b is printable, put it at the cursor position (inserting
143 or overwriting as dictated by the current mode) and move the cursor.
149 elif b in string.printable:
150 if self.x >= self.width:
152 ch = (b, self._currentCharacterAttributes())
153 if self.modes.get(insults.modes.IRM):
154 self.lines[self.y][self.x:self.x] = [ch]
155 self.lines[self.y].pop()
157 self.lines[self.y][self.x] = ch
160 def _emptyLine(self, width):
161 return [(self.void, self._currentCharacterAttributes()) for i in xrange(width)]
163 def _scrollDown(self):
165 if self.y >= self.height:
168 self.lines.append(self._emptyLine(self.width))
175 self.lines.insert(0, self._emptyLine(self.width))
177 def cursorUp(self, n=1):
178 self.y = max(0, self.y - n)
180 def cursorDown(self, n=1):
181 self.y = min(self.height - 1, self.y + n)
183 def cursorBackward(self, n=1):
184 self.x = max(0, self.x - n)
186 def cursorForward(self, n=1):
187 self.x = min(self.width, self.x + n)
189 def cursorPosition(self, column, line):
193 def cursorHome(self):
200 def reverseIndex(self):
205 Update the cursor position attributes and scroll down if appropriate.
210 def saveCursor(self):
211 self._savedCursor = (self.x, self.y)
213 def restoreCursor(self):
214 self.x, self.y = self._savedCursor
215 del self._savedCursor
217 def setModes(self, modes):
221 def resetModes(self, modes):
229 def setPrivateModes(self, modes):
231 Enable the given modes.
233 Track which modes have been enabled so that the implementations of
234 other L{insults.ITerminalTransport} methods can be properly implemented
235 to respect these settings.
237 @see: L{resetPrivateModes}
238 @see: L{insults.ITerminalTransport.setPrivateModes}
241 self.privateModes[m] = True
244 def resetPrivateModes(self, modes):
246 Disable the given modes.
248 @see: L{setPrivateModes}
249 @see: L{insults.ITerminalTransport.resetPrivateModes}
253 del self.privateModes[m]
258 def applicationKeypadMode(self):
259 self.keypadMode = 'app'
261 def numericKeypadMode(self):
262 self.keypadMode = 'num'
264 def selectCharacterSet(self, charSet, which):
265 self.charsets[which] = charSet
268 self.activeCharset = insults.G0
271 self.activeCharset = insults.G1
273 def singleShift2(self):
274 oldActiveCharset = self.activeCharset
275 self.activeCharset = insults.G2
276 f = self.insertAtCursor
277 def insertAtCursor(b):
279 del self.insertAtCursor
280 self.activeCharset = oldActiveCharset
281 self.insertAtCursor = insertAtCursor
283 def singleShift3(self):
284 oldActiveCharset = self.activeCharset
285 self.activeCharset = insults.G3
286 f = self.insertAtCursor
287 def insertAtCursor(b):
289 del self.insertAtCursor
290 self.activeCharset = oldActiveCharset
291 self.insertAtCursor = insertAtCursor
293 def selectGraphicRendition(self, *attributes):
295 if a == insults.NORMAL:
296 self.graphicRendition = {
300 'reverseVideo': False,
303 elif a == insults.BOLD:
304 self.graphicRendition['bold'] = True
305 elif a == insults.UNDERLINE:
306 self.graphicRendition['underline'] = True
307 elif a == insults.BLINK:
308 self.graphicRendition['blink'] = True
309 elif a == insults.REVERSE_VIDEO:
310 self.graphicRendition['reverseVideo'] = True
315 log.msg("Unknown graphic rendition attribute: " + repr(a))
317 if FOREGROUND <= v <= FOREGROUND + N_COLORS:
318 self.graphicRendition['foreground'] = v - FOREGROUND
319 elif BACKGROUND <= v <= BACKGROUND + N_COLORS:
320 self.graphicRendition['background'] = v - BACKGROUND
322 log.msg("Unknown graphic rendition attribute: " + repr(a))
325 self.lines[self.y] = self._emptyLine(self.width)
327 def eraseToLineEnd(self):
328 width = self.width - self.x
329 self.lines[self.y][self.x:] = self._emptyLine(width)
331 def eraseToLineBeginning(self):
332 self.lines[self.y][:self.x + 1] = self._emptyLine(self.x + 1)
334 def eraseDisplay(self):
335 self.lines = [self._emptyLine(self.width) for i in xrange(self.height)]
337 def eraseToDisplayEnd(self):
338 self.eraseToLineEnd()
339 height = self.height - self.y - 1
340 self.lines[self.y + 1:] = [self._emptyLine(self.width) for i in range(height)]
342 def eraseToDisplayBeginning(self):
343 self.eraseToLineBeginning()
344 self.lines[:self.y] = [self._emptyLine(self.width) for i in range(self.y)]
346 def deleteCharacter(self, n=1):
347 del self.lines[self.y][self.x:self.x+n]
348 self.lines[self.y].extend(self._emptyLine(min(self.width - self.x, n)))
350 def insertLine(self, n=1):
351 self.lines[self.y:self.y] = [self._emptyLine(self.width) for i in range(n)]
352 del self.lines[self.height:]
354 def deleteLine(self, n=1):
355 del self.lines[self.y:self.y+n]
356 self.lines.extend([self._emptyLine(self.width) for i in range(n)])
358 def reportCursorPosition(self):
359 return (self.x, self.y)
362 self.home = insults.Vector(0, 0)
365 self.privateModes = {}
366 self.setPrivateModes([insults.privateModes.AUTO_WRAP,
367 insults.privateModes.CURSOR_MODE])
368 self.numericKeypad = 'app'
369 self.activeCharset = insults.G0
370 self.graphicRendition = {
374 'reverseVideo': False,
378 insults.G0: insults.CS_US,
379 insults.G1: insults.CS_US,
380 insults.G2: insults.CS_ALTERNATE,
381 insults.G3: insults.CS_ALTERNATE_SPECIAL}
384 def unhandledControlSequence(self, buf):
385 print 'Could not handle', repr(buf)
393 if ch is not self.void:
397 buf.append(self.fill)
398 lines.append(''.join(buf[:length]))
399 return '\n'.join(lines)
401 class ExpectationTimeout(Exception):
404 class ExpectableBuffer(TerminalBuffer):
407 def connectionMade(self):
408 TerminalBuffer.connectionMade(self)
411 def write(self, bytes):
412 TerminalBuffer.write(self, bytes)
413 self._checkExpected()
415 def cursorHome(self):
416 TerminalBuffer.cursorHome(self)
419 def _timeoutExpected(self, d):
420 d.errback(ExpectationTimeout())
421 self._checkExpected()
423 def _checkExpected(self):
424 s = str(self)[self._mark:]
425 while self._expecting:
426 expr, timer, deferred = self._expecting[0]
427 if timer and not timer.active():
428 del self._expecting[0]
430 for match in expr.finditer(s):
433 del self._expecting[0]
434 self._mark += match.end()
436 deferred.callback(match)
441 def expect(self, expression, timeout=None, scheduler=reactor):
445 timer = scheduler.callLater(timeout, self._timeoutExpected, d)
446 self._expecting.append((re.compile(expression), timer, d))
447 self._checkExpected()
450 __all__ = ['CharacterAttribute', 'TerminalBuffer', 'ExpectableBuffer']