Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / conch / test / test_telnet.py
1 # -*- test-case-name: twisted.conch.test.test_telnet -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Tests for L{twisted.conch.telnet}.
7 """
8
9 from zope.interface import implements
10 from zope.interface.verify import verifyObject
11
12 from twisted.internet import defer
13
14 from twisted.conch import telnet
15
16 from twisted.trial import unittest
17 from twisted.test import proto_helpers
18
19
20 class TestProtocol:
21     implements(telnet.ITelnetProtocol)
22
23     localEnableable = ()
24     remoteEnableable = ()
25
26     def __init__(self):
27         self.bytes = ''
28         self.subcmd = ''
29         self.calls = []
30
31         self.enabledLocal = []
32         self.enabledRemote = []
33         self.disabledLocal = []
34         self.disabledRemote = []
35
36     def makeConnection(self, transport):
37         d = transport.negotiationMap = {}
38         d['\x12'] = self.neg_TEST_COMMAND
39
40         d = transport.commandMap = transport.commandMap.copy()
41         for cmd in ('NOP', 'DM', 'BRK', 'IP', 'AO', 'AYT', 'EC', 'EL', 'GA'):
42             d[getattr(telnet, cmd)] = lambda arg, cmd=cmd: self.calls.append(cmd)
43
44     def dataReceived(self, bytes):
45         self.bytes += bytes
46
47     def connectionLost(self, reason):
48         pass
49
50     def neg_TEST_COMMAND(self, payload):
51         self.subcmd = payload
52
53     def enableLocal(self, option):
54         if option in self.localEnableable:
55             self.enabledLocal.append(option)
56             return True
57         return False
58
59     def disableLocal(self, option):
60         self.disabledLocal.append(option)
61
62     def enableRemote(self, option):
63         if option in self.remoteEnableable:
64             self.enabledRemote.append(option)
65             return True
66         return False
67
68     def disableRemote(self, option):
69         self.disabledRemote.append(option)
70
71
72
73 class TestInterfaces(unittest.TestCase):
74     def test_interface(self):
75         """
76         L{telnet.TelnetProtocol} implements L{telnet.ITelnetProtocol}
77         """
78         p = telnet.TelnetProtocol()
79         verifyObject(telnet.ITelnetProtocol, p)
80
81
82
83 class TelnetTransportTestCase(unittest.TestCase):
84     """
85     Tests for L{telnet.TelnetTransport}.
86     """
87     def setUp(self):
88         self.p = telnet.TelnetTransport(TestProtocol)
89         self.t = proto_helpers.StringTransport()
90         self.p.makeConnection(self.t)
91
92     def testRegularBytes(self):
93         # Just send a bunch of bytes.  None of these do anything
94         # with telnet.  They should pass right through to the
95         # application layer.
96         h = self.p.protocol
97
98         L = ["here are some bytes la la la",
99              "some more arrive here",
100              "lots of bytes to play with",
101              "la la la",
102              "ta de da",
103              "dum"]
104         for b in L:
105             self.p.dataReceived(b)
106
107         self.assertEqual(h.bytes, ''.join(L))
108
109     def testNewlineHandling(self):
110         # Send various kinds of newlines and make sure they get translated
111         # into \n.
112         h = self.p.protocol
113
114         L = ["here is the first line\r\n",
115              "here is the second line\r\0",
116              "here is the third line\r\n",
117              "here is the last line\r\0"]
118
119         for b in L:
120             self.p.dataReceived(b)
121
122         self.assertEqual(h.bytes, L[0][:-2] + '\n' +
123                           L[1][:-2] + '\r' +
124                           L[2][:-2] + '\n' +
125                           L[3][:-2] + '\r')
126
127     def testIACEscape(self):
128         # Send a bunch of bytes and a couple quoted \xFFs.  Unquoted,
129         # \xFF is a telnet command.  Quoted, one of them from each pair
130         # should be passed through to the application layer.
131         h = self.p.protocol
132
133         L = ["here are some bytes\xff\xff with an embedded IAC",
134              "and here is a test of a border escape\xff",
135              "\xff did you get that IAC?"]
136
137         for b in L:
138             self.p.dataReceived(b)
139
140         self.assertEqual(h.bytes, ''.join(L).replace('\xff\xff', '\xff'))
141
142     def _simpleCommandTest(self, cmdName):
143         # Send a single simple telnet command and make sure
144         # it gets noticed and the appropriate method gets
145         # called.
146         h = self.p.protocol
147
148         cmd = telnet.IAC + getattr(telnet, cmdName)
149         L = ["Here's some bytes, tra la la",
150              "But ono!" + cmd + " an interrupt"]
151
152         for b in L:
153             self.p.dataReceived(b)
154
155         self.assertEqual(h.calls, [cmdName])
156         self.assertEqual(h.bytes, ''.join(L).replace(cmd, ''))
157
158     def testInterrupt(self):
159         self._simpleCommandTest("IP")
160
161     def testNoOperation(self):
162         self._simpleCommandTest("NOP")
163
164     def testDataMark(self):
165         self._simpleCommandTest("DM")
166
167     def testBreak(self):
168         self._simpleCommandTest("BRK")
169
170     def testAbortOutput(self):
171         self._simpleCommandTest("AO")
172
173     def testAreYouThere(self):
174         self._simpleCommandTest("AYT")
175
176     def testEraseCharacter(self):
177         self._simpleCommandTest("EC")
178
179     def testEraseLine(self):
180         self._simpleCommandTest("EL")
181
182     def testGoAhead(self):
183         self._simpleCommandTest("GA")
184
185     def testSubnegotiation(self):
186         # Send a subnegotiation command and make sure it gets
187         # parsed and that the correct method is called.
188         h = self.p.protocol
189
190         cmd = telnet.IAC + telnet.SB + '\x12hello world' + telnet.IAC + telnet.SE
191         L = ["These are some bytes but soon" + cmd,
192              "there will be some more"]
193
194         for b in L:
195             self.p.dataReceived(b)
196
197         self.assertEqual(h.bytes, ''.join(L).replace(cmd, ''))
198         self.assertEqual(h.subcmd, list("hello world"))
199
200     def testSubnegotiationWithEmbeddedSE(self):
201         # Send a subnegotiation command with an embedded SE.  Make sure
202         # that SE gets passed to the correct method.
203         h = self.p.protocol
204
205         cmd = (telnet.IAC + telnet.SB +
206                '\x12' + telnet.SE +
207                telnet.IAC + telnet.SE)
208
209         L = ["Some bytes are here" + cmd + "and here",
210              "and here"]
211
212         for b in L:
213             self.p.dataReceived(b)
214
215         self.assertEqual(h.bytes, ''.join(L).replace(cmd, ''))
216         self.assertEqual(h.subcmd, [telnet.SE])
217
218     def testBoundarySubnegotiation(self):
219         # Send a subnegotiation command.  Split it at every possible byte boundary
220         # and make sure it always gets parsed and that it is passed to the correct
221         # method.
222         cmd = (telnet.IAC + telnet.SB +
223                '\x12' + telnet.SE + 'hello' +
224                telnet.IAC + telnet.SE)
225
226         for i in range(len(cmd)):
227             h = self.p.protocol = TestProtocol()
228             h.makeConnection(self.p)
229
230             a, b = cmd[:i], cmd[i:]
231             L = ["first part" + a,
232                  b + "last part"]
233
234             for bytes in L:
235                 self.p.dataReceived(bytes)
236
237             self.assertEqual(h.bytes, ''.join(L).replace(cmd, ''))
238             self.assertEqual(h.subcmd, [telnet.SE] + list('hello'))
239
240     def _enabledHelper(self, o, eL=[], eR=[], dL=[], dR=[]):
241         self.assertEqual(o.enabledLocal, eL)
242         self.assertEqual(o.enabledRemote, eR)
243         self.assertEqual(o.disabledLocal, dL)
244         self.assertEqual(o.disabledRemote, dR)
245
246     def testRefuseWill(self):
247         # Try to enable an option.  The server should refuse to enable it.
248         cmd = telnet.IAC + telnet.WILL + '\x12'
249
250         bytes = "surrounding bytes" + cmd + "to spice things up"
251         self.p.dataReceived(bytes)
252
253         self.assertEqual(self.p.protocol.bytes, bytes.replace(cmd, ''))
254         self.assertEqual(self.t.value(), telnet.IAC + telnet.DONT + '\x12')
255         self._enabledHelper(self.p.protocol)
256
257     def testRefuseDo(self):
258         # Try to enable an option.  The server should refuse to enable it.
259         cmd = telnet.IAC + telnet.DO + '\x12'
260
261         bytes = "surrounding bytes" + cmd + "to spice things up"
262         self.p.dataReceived(bytes)
263
264         self.assertEqual(self.p.protocol.bytes, bytes.replace(cmd, ''))
265         self.assertEqual(self.t.value(), telnet.IAC + telnet.WONT + '\x12')
266         self._enabledHelper(self.p.protocol)
267
268     def testAcceptDo(self):
269         # Try to enable an option.  The option is in our allowEnable
270         # list, so we will allow it to be enabled.
271         cmd = telnet.IAC + telnet.DO + '\x19'
272         bytes = 'padding' + cmd + 'trailer'
273
274         h = self.p.protocol
275         h.localEnableable = ('\x19',)
276         self.p.dataReceived(bytes)
277
278         self.assertEqual(self.t.value(), telnet.IAC + telnet.WILL + '\x19')
279         self._enabledHelper(h, eL=['\x19'])
280
281     def testAcceptWill(self):
282         # Same as testAcceptDo, but reversed.
283         cmd = telnet.IAC + telnet.WILL + '\x91'
284         bytes = 'header' + cmd + 'padding'
285
286         h = self.p.protocol
287         h.remoteEnableable = ('\x91',)
288         self.p.dataReceived(bytes)
289
290         self.assertEqual(self.t.value(), telnet.IAC + telnet.DO + '\x91')
291         self._enabledHelper(h, eR=['\x91'])
292
293     def testAcceptWont(self):
294         # Try to disable an option.  The server must allow any option to
295         # be disabled at any time.  Make sure it disables it and sends
296         # back an acknowledgement of this.
297         cmd = telnet.IAC + telnet.WONT + '\x29'
298
299         # Jimmy it - after these two lines, the server will be in a state
300         # such that it believes the option to have been previously enabled
301         # via normal negotiation.
302         s = self.p.getOptionState('\x29')
303         s.him.state = 'yes'
304
305         bytes = "fiddle dee" + cmd
306         self.p.dataReceived(bytes)
307
308         self.assertEqual(self.p.protocol.bytes, bytes.replace(cmd, ''))
309         self.assertEqual(self.t.value(), telnet.IAC + telnet.DONT + '\x29')
310         self.assertEqual(s.him.state, 'no')
311         self._enabledHelper(self.p.protocol, dR=['\x29'])
312
313     def testAcceptDont(self):
314         # Try to disable an option.  The server must allow any option to
315         # be disabled at any time.  Make sure it disables it and sends
316         # back an acknowledgement of this.
317         cmd = telnet.IAC + telnet.DONT + '\x29'
318
319         # Jimmy it - after these two lines, the server will be in a state
320         # such that it believes the option to have beenp previously enabled
321         # via normal negotiation.
322         s = self.p.getOptionState('\x29')
323         s.us.state = 'yes'
324
325         bytes = "fiddle dum " + cmd
326         self.p.dataReceived(bytes)
327
328         self.assertEqual(self.p.protocol.bytes, bytes.replace(cmd, ''))
329         self.assertEqual(self.t.value(), telnet.IAC + telnet.WONT + '\x29')
330         self.assertEqual(s.us.state, 'no')
331         self._enabledHelper(self.p.protocol, dL=['\x29'])
332
333     def testIgnoreWont(self):
334         # Try to disable an option.  The option is already disabled.  The
335         # server should send nothing in response to this.
336         cmd = telnet.IAC + telnet.WONT + '\x47'
337
338         bytes = "dum de dum" + cmd + "tra la la"
339         self.p.dataReceived(bytes)
340
341         self.assertEqual(self.p.protocol.bytes, bytes.replace(cmd, ''))
342         self.assertEqual(self.t.value(), '')
343         self._enabledHelper(self.p.protocol)
344
345     def testIgnoreDont(self):
346         # Try to disable an option.  The option is already disabled.  The
347         # server should send nothing in response to this.  Doing so could
348         # lead to a negotiation loop.
349         cmd = telnet.IAC + telnet.DONT + '\x47'
350
351         bytes = "dum de dum" + cmd + "tra la la"
352         self.p.dataReceived(bytes)
353
354         self.assertEqual(self.p.protocol.bytes, bytes.replace(cmd, ''))
355         self.assertEqual(self.t.value(), '')
356         self._enabledHelper(self.p.protocol)
357
358     def testIgnoreWill(self):
359         # Try to enable an option.  The option is already enabled.  The
360         # server should send nothing in response to this.  Doing so could
361         # lead to a negotiation loop.
362         cmd = telnet.IAC + telnet.WILL + '\x56'
363
364         # Jimmy it - after these two lines, the server will be in a state
365         # such that it believes the option to have been previously enabled
366         # via normal negotiation.
367         s = self.p.getOptionState('\x56')
368         s.him.state = 'yes'
369
370         bytes = "tra la la" + cmd + "dum de dum"
371         self.p.dataReceived(bytes)
372
373         self.assertEqual(self.p.protocol.bytes, bytes.replace(cmd, ''))
374         self.assertEqual(self.t.value(), '')
375         self._enabledHelper(self.p.protocol)
376
377     def testIgnoreDo(self):
378         # Try to enable an option.  The option is already enabled.  The
379         # server should send nothing in response to this.  Doing so could
380         # lead to a negotiation loop.
381         cmd = telnet.IAC + telnet.DO + '\x56'
382
383         # Jimmy it - after these two lines, the server will be in a state
384         # such that it believes the option to have been previously enabled
385         # via normal negotiation.
386         s = self.p.getOptionState('\x56')
387         s.us.state = 'yes'
388
389         bytes = "tra la la" + cmd + "dum de dum"
390         self.p.dataReceived(bytes)
391
392         self.assertEqual(self.p.protocol.bytes, bytes.replace(cmd, ''))
393         self.assertEqual(self.t.value(), '')
394         self._enabledHelper(self.p.protocol)
395
396     def testAcceptedEnableRequest(self):
397         # Try to enable an option through the user-level API.  This
398         # returns a Deferred that fires when negotiation about the option
399         # finishes.  Make sure it fires, make sure state gets updated
400         # properly, make sure the result indicates the option was enabled.
401         d = self.p.do('\x42')
402
403         h = self.p.protocol
404         h.remoteEnableable = ('\x42',)
405
406         self.assertEqual(self.t.value(), telnet.IAC + telnet.DO + '\x42')
407
408         self.p.dataReceived(telnet.IAC + telnet.WILL + '\x42')
409
410         d.addCallback(self.assertEqual, True)
411         d.addCallback(lambda _:  self._enabledHelper(h, eR=['\x42']))
412         return d
413
414
415     def test_refusedEnableRequest(self):
416         """
417         If the peer refuses to enable an option we request it to enable, the
418         L{Deferred} returned by L{TelnetProtocol.do} fires with an
419         L{OptionRefused} L{Failure}.
420         """
421         # Try to enable an option through the user-level API.  This returns a
422         # Deferred that fires when negotiation about the option finishes.  Make
423         # sure it fires, make sure state gets updated properly, make sure the
424         # result indicates the option was enabled.
425         self.p.protocol.remoteEnableable = ('\x42',)
426         d = self.p.do('\x42')
427
428         self.assertEqual(self.t.value(), telnet.IAC + telnet.DO + '\x42')
429
430         s = self.p.getOptionState('\x42')
431         self.assertEqual(s.him.state, 'no')
432         self.assertEqual(s.us.state, 'no')
433         self.assertEqual(s.him.negotiating, True)
434         self.assertEqual(s.us.negotiating, False)
435
436         self.p.dataReceived(telnet.IAC + telnet.WONT + '\x42')
437
438         d = self.assertFailure(d, telnet.OptionRefused)
439         d.addCallback(lambda ignored: self._enabledHelper(self.p.protocol))
440         d.addCallback(
441             lambda ignored: self.assertEqual(s.him.negotiating, False))
442         return d
443
444
445     def test_refusedEnableOffer(self):
446         """
447         If the peer refuses to allow us to enable an option, the L{Deferred}
448         returned by L{TelnetProtocol.will} fires with an L{OptionRefused}
449         L{Failure}.
450         """
451         # Try to offer an option through the user-level API.  This returns a
452         # Deferred that fires when negotiation about the option finishes.  Make
453         # sure it fires, make sure state gets updated properly, make sure the
454         # result indicates the option was enabled.
455         self.p.protocol.localEnableable = ('\x42',)
456         d = self.p.will('\x42')
457
458         self.assertEqual(self.t.value(), telnet.IAC + telnet.WILL + '\x42')
459
460         s = self.p.getOptionState('\x42')
461         self.assertEqual(s.him.state, 'no')
462         self.assertEqual(s.us.state, 'no')
463         self.assertEqual(s.him.negotiating, False)
464         self.assertEqual(s.us.negotiating, True)
465
466         self.p.dataReceived(telnet.IAC + telnet.DONT + '\x42')
467
468         d = self.assertFailure(d, telnet.OptionRefused)
469         d.addCallback(lambda ignored: self._enabledHelper(self.p.protocol))
470         d.addCallback(
471             lambda ignored: self.assertEqual(s.us.negotiating, False))
472         return d
473
474
475     def testAcceptedDisableRequest(self):
476         # Try to disable an option through the user-level API.  This
477         # returns a Deferred that fires when negotiation about the option
478         # finishes.  Make sure it fires, make sure state gets updated
479         # properly, make sure the result indicates the option was enabled.
480         s = self.p.getOptionState('\x42')
481         s.him.state = 'yes'
482
483         d = self.p.dont('\x42')
484
485         self.assertEqual(self.t.value(), telnet.IAC + telnet.DONT + '\x42')
486
487         self.p.dataReceived(telnet.IAC + telnet.WONT + '\x42')
488
489         d.addCallback(self.assertEqual, True)
490         d.addCallback(lambda _: self._enabledHelper(self.p.protocol,
491                                                     dR=['\x42']))
492         return d
493
494     def testNegotiationBlocksFurtherNegotiation(self):
495         # Try to disable an option, then immediately try to enable it, then
496         # immediately try to disable it.  Ensure that the 2nd and 3rd calls
497         # fail quickly with the right exception.
498         s = self.p.getOptionState('\x24')
499         s.him.state = 'yes'
500         d2 = self.p.dont('\x24') # fires after the first line of _final
501
502         def _do(x):
503             d = self.p.do('\x24')
504             return self.assertFailure(d, telnet.AlreadyNegotiating)
505
506         def _dont(x):
507             d = self.p.dont('\x24')
508             return self.assertFailure(d, telnet.AlreadyNegotiating)
509
510         def _final(x):
511             self.p.dataReceived(telnet.IAC + telnet.WONT + '\x24')
512             # an assertion that only passes if d2 has fired
513             self._enabledHelper(self.p.protocol, dR=['\x24'])
514             # Make sure we allow this
515             self.p.protocol.remoteEnableable = ('\x24',)
516             d = self.p.do('\x24')
517             self.p.dataReceived(telnet.IAC + telnet.WILL + '\x24')
518             d.addCallback(self.assertEqual, True)
519             d.addCallback(lambda _: self._enabledHelper(self.p.protocol,
520                                                         eR=['\x24'],
521                                                         dR=['\x24']))
522             return d
523
524         d = _do(None)
525         d.addCallback(_dont)
526         d.addCallback(_final)
527         return d
528
529     def testSuperfluousDisableRequestRaises(self):
530         # Try to disable a disabled option.  Make sure it fails properly.
531         d = self.p.dont('\xab')
532         return self.assertFailure(d, telnet.AlreadyDisabled)
533
534     def testSuperfluousEnableRequestRaises(self):
535         # Try to disable a disabled option.  Make sure it fails properly.
536         s = self.p.getOptionState('\xab')
537         s.him.state = 'yes'
538         d = self.p.do('\xab')
539         return self.assertFailure(d, telnet.AlreadyEnabled)
540
541     def testLostConnectionFailsDeferreds(self):
542         d1 = self.p.do('\x12')
543         d2 = self.p.do('\x23')
544         d3 = self.p.do('\x34')
545
546         class TestException(Exception):
547             pass
548
549         self.p.connectionLost(TestException("Total failure!"))
550
551         d1 = self.assertFailure(d1, TestException)
552         d2 = self.assertFailure(d2, TestException)
553         d3 = self.assertFailure(d3, TestException)
554         return defer.gatherResults([d1, d2, d3])
555
556
557 class TestTelnet(telnet.Telnet):
558     """
559     A trivial extension of the telnet protocol class useful to unit tests.
560     """
561     def __init__(self):
562         telnet.Telnet.__init__(self)
563         self.events = []
564
565
566     def applicationDataReceived(self, bytes):
567         """
568         Record the given data in C{self.events}.
569         """
570         self.events.append(('bytes', bytes))
571
572
573     def unhandledCommand(self, command, bytes):
574         """
575         Record the given command in C{self.events}.
576         """
577         self.events.append(('command', command, bytes))
578
579
580     def unhandledSubnegotiation(self, command, bytes):
581         """
582         Record the given subnegotiation command in C{self.events}.
583         """
584         self.events.append(('negotiate', command, bytes))
585
586
587
588 class TelnetTests(unittest.TestCase):
589     """
590     Tests for L{telnet.Telnet}.
591
592     L{telnet.Telnet} implements the TELNET protocol (RFC 854), including option
593     and suboption negotiation, and option state tracking.
594     """
595     def setUp(self):
596         """
597         Create an unconnected L{telnet.Telnet} to be used by tests.
598         """
599         self.protocol = TestTelnet()
600
601
602     def test_enableLocal(self):
603         """
604         L{telnet.Telnet.enableLocal} should reject all options, since
605         L{telnet.Telnet} does not know how to implement any options.
606         """
607         self.assertFalse(self.protocol.enableLocal('\0'))
608
609
610     def test_enableRemote(self):
611         """
612         L{telnet.Telnet.enableRemote} should reject all options, since
613         L{telnet.Telnet} does not know how to implement any options.
614         """
615         self.assertFalse(self.protocol.enableRemote('\0'))
616
617
618     def test_disableLocal(self):
619         """
620         It is an error for L{telnet.Telnet.disableLocal} to be called, since
621         L{telnet.Telnet.enableLocal} will never allow any options to be enabled
622         locally.  If a subclass overrides enableLocal, it must also override
623         disableLocal.
624         """
625         self.assertRaises(NotImplementedError, self.protocol.disableLocal, '\0')
626
627
628     def test_disableRemote(self):
629         """
630         It is an error for L{telnet.Telnet.disableRemote} to be called, since
631         L{telnet.Telnet.enableRemote} will never allow any options to be
632         enabled remotely.  If a subclass overrides enableRemote, it must also
633         override disableRemote.
634         """
635         self.assertRaises(NotImplementedError, self.protocol.disableRemote, '\0')
636
637
638     def test_requestNegotiation(self):
639         """
640         L{telnet.Telnet.requestNegotiation} formats the feature byte and the
641         payload bytes into the subnegotiation format and sends them.
642
643         See RFC 855.
644         """
645         transport = proto_helpers.StringTransport()
646         self.protocol.makeConnection(transport)
647         self.protocol.requestNegotiation('\x01', '\x02\x03')
648         self.assertEqual(
649             transport.value(),
650             # IAC SB feature bytes IAC SE
651             '\xff\xfa\x01\x02\x03\xff\xf0')
652
653
654     def test_requestNegotiationEscapesIAC(self):
655         """
656         If the payload for a subnegotiation includes I{IAC}, it is escaped by
657         L{telnet.Telnet.requestNegotiation} with another I{IAC}.
658
659         See RFC 855.
660         """
661         transport = proto_helpers.StringTransport()
662         self.protocol.makeConnection(transport)
663         self.protocol.requestNegotiation('\x01', '\xff')
664         self.assertEqual(
665             transport.value(),
666             '\xff\xfa\x01\xff\xff\xff\xf0')
667
668
669     def _deliver(self, bytes, *expected):
670         """
671         Pass the given bytes to the protocol's C{dataReceived} method and
672         assert that the given events occur.
673         """
674         received = self.protocol.events = []
675         self.protocol.dataReceived(bytes)
676         self.assertEqual(received, list(expected))
677
678
679     def test_oneApplicationDataByte(self):
680         """
681         One application-data byte in the default state gets delivered right
682         away.
683         """
684         self._deliver('a', ('bytes', 'a'))
685
686
687     def test_twoApplicationDataBytes(self):
688         """
689         Two application-data bytes in the default state get delivered
690         together.
691         """
692         self._deliver('bc', ('bytes', 'bc'))
693
694
695     def test_threeApplicationDataBytes(self):
696         """
697         Three application-data bytes followed by a control byte get
698         delivered, but the control byte doesn't.
699         """
700         self._deliver('def' + telnet.IAC, ('bytes', 'def'))
701
702
703     def test_escapedControl(self):
704         """
705         IAC in the escaped state gets delivered and so does another
706         application-data byte following it.
707         """
708         self._deliver(telnet.IAC)
709         self._deliver(telnet.IAC + 'g', ('bytes', telnet.IAC + 'g'))
710
711
712     def test_carriageReturn(self):
713         """
714         A carriage return only puts the protocol into the newline state.  A
715         linefeed in the newline state causes just the newline to be
716         delivered.  A nul in the newline state causes a carriage return to
717         be delivered.  An IAC in the newline state causes a carriage return
718         to be delivered and puts the protocol into the escaped state. 
719         Anything else causes a carriage return and that thing to be
720         delivered.
721         """
722         self._deliver('\r')
723         self._deliver('\n', ('bytes', '\n'))
724         self._deliver('\r\n', ('bytes', '\n'))
725
726         self._deliver('\r')
727         self._deliver('\0', ('bytes', '\r'))
728         self._deliver('\r\0', ('bytes', '\r'))
729
730         self._deliver('\r')
731         self._deliver('a', ('bytes', '\ra'))
732         self._deliver('\ra', ('bytes', '\ra'))
733
734         self._deliver('\r')
735         self._deliver(
736             telnet.IAC + telnet.IAC + 'x', ('bytes', '\r' + telnet.IAC + 'x'))
737
738
739     def test_applicationDataBeforeSimpleCommand(self):
740         """
741         Application bytes received before a command are delivered before the
742         command is processed.
743         """
744         self._deliver(
745             'x' + telnet.IAC + telnet.NOP,
746             ('bytes', 'x'), ('command', telnet.NOP, None))
747
748
749     def test_applicationDataBeforeCommand(self):
750         """
751         Application bytes received before a WILL/WONT/DO/DONT are delivered
752         before the command is processed.
753         """
754         self.protocol.commandMap = {}
755         self._deliver(
756             'y' + telnet.IAC + telnet.WILL + '\x00',
757             ('bytes', 'y'), ('command', telnet.WILL, '\x00'))
758
759
760     def test_applicationDataBeforeSubnegotiation(self):
761         """
762         Application bytes received before a subnegotiation command are
763         delivered before the negotiation is processed.
764         """
765         self._deliver(
766             'z' + telnet.IAC + telnet.SB + 'Qx' + telnet.IAC + telnet.SE,
767             ('bytes', 'z'), ('negotiate', 'Q', ['x']))