Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / mail / test / test_pop3.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Test cases for Ltwisted.mail.pop3} module.
6 """
7
8 import StringIO
9 import hmac
10 import base64
11 import itertools
12
13 from zope.interface import implements
14
15 from twisted.internet import defer
16
17 from twisted.trial import unittest, util
18 from twisted import mail
19 import twisted.mail.protocols
20 import twisted.mail.pop3
21 import twisted.internet.protocol
22 from twisted import internet
23 from twisted.mail import pop3
24 from twisted.protocols import loopback
25 from twisted.python import failure
26
27 from twisted import cred
28 import twisted.cred.portal
29 import twisted.cred.checkers
30 import twisted.cred.credentials
31
32 from twisted.test.proto_helpers import LineSendingProtocol
33
34
35 class UtilityTestCase(unittest.TestCase):
36     """
37     Test the various helper functions and classes used by the POP3 server
38     protocol implementation.
39     """
40
41     def testLineBuffering(self):
42         """
43         Test creating a LineBuffer and feeding it some lines.  The lines should
44         build up in its internal buffer for a while and then get spat out to
45         the writer.
46         """
47         output = []
48         input = iter(itertools.cycle(['012', '345', '6', '7', '8', '9']))
49         c = pop3._IteratorBuffer(output.extend, input, 6)
50         i = iter(c)
51         self.assertEqual(output, []) # nothing is buffer
52         i.next()
53         self.assertEqual(output, []) # '012' is buffered
54         i.next()
55         self.assertEqual(output, []) # '012345' is buffered
56         i.next()
57         self.assertEqual(output, ['012', '345', '6']) # nothing is buffered
58         for n in range(5):
59             i.next()
60         self.assertEqual(output, ['012', '345', '6', '7', '8', '9', '012', '345'])
61
62
63     def testFinishLineBuffering(self):
64         """
65         Test that a LineBuffer flushes everything when its iterator is
66         exhausted, and itself raises StopIteration.
67         """
68         output = []
69         input = iter(['a', 'b', 'c'])
70         c = pop3._IteratorBuffer(output.extend, input, 5)
71         for i in c:
72             pass
73         self.assertEqual(output, ['a', 'b', 'c'])
74
75
76     def testSuccessResponseFormatter(self):
77         """
78         Test that the thing that spits out POP3 'success responses' works
79         right.
80         """
81         self.assertEqual(
82             pop3.successResponse('Great.'),
83             '+OK Great.\r\n')
84
85
86     def testStatLineFormatter(self):
87         """
88         Test that the function which formats stat lines does so appropriately.
89         """
90         statLine = list(pop3.formatStatResponse([]))[-1]
91         self.assertEqual(statLine, '+OK 0 0\r\n')
92
93         statLine = list(pop3.formatStatResponse([10, 31, 0, 10101]))[-1]
94         self.assertEqual(statLine, '+OK 4 10142\r\n')
95
96
97     def testListLineFormatter(self):
98         """
99         Test that the function which formats the lines in response to a LIST
100         command does so appropriately.
101         """
102         listLines = list(pop3.formatListResponse([]))
103         self.assertEqual(
104             listLines,
105             ['+OK 0\r\n', '.\r\n'])
106
107         listLines = list(pop3.formatListResponse([1, 2, 3, 100]))
108         self.assertEqual(
109             listLines,
110             ['+OK 4\r\n', '1 1\r\n', '2 2\r\n', '3 3\r\n', '4 100\r\n', '.\r\n'])
111
112
113
114     def testUIDListLineFormatter(self):
115         """
116         Test that the function which formats lines in response to a UIDL
117         command does so appropriately.
118         """
119         UIDs = ['abc', 'def', 'ghi']
120         listLines = list(pop3.formatUIDListResponse([], UIDs.__getitem__))
121         self.assertEqual(
122             listLines,
123             ['+OK \r\n', '.\r\n'])
124
125         listLines = list(pop3.formatUIDListResponse([123, 431, 591], UIDs.__getitem__))
126         self.assertEqual(
127             listLines,
128             ['+OK \r\n', '1 abc\r\n', '2 def\r\n', '3 ghi\r\n', '.\r\n'])
129
130         listLines = list(pop3.formatUIDListResponse([0, None, 591], UIDs.__getitem__))
131         self.assertEqual(
132             listLines,
133             ['+OK \r\n', '1 abc\r\n', '3 ghi\r\n', '.\r\n'])
134
135
136
137 class MyVirtualPOP3(mail.protocols.VirtualPOP3):
138
139     magic = '<moshez>'
140
141     def authenticateUserAPOP(self, user, digest):
142         user, domain = self.lookupDomain(user)
143         return self.service.domains['baz.com'].authenticateUserAPOP(user, digest, self.magic, domain)
144
145 class DummyDomain:
146
147    def __init__(self):
148        self.users = {}
149
150    def addUser(self, name):
151        self.users[name] = []
152
153    def addMessage(self, name, message):
154        self.users[name].append(message)
155
156    def authenticateUserAPOP(self, name, digest, magic, domain):
157        return pop3.IMailbox, ListMailbox(self.users[name]), lambda: None
158
159
160 class ListMailbox:
161
162     def __init__(self, list):
163         self.list = list
164
165     def listMessages(self, i=None):
166         if i is None:
167             return map(len, self.list)
168         return len(self.list[i])
169
170     def getMessage(self, i):
171         return StringIO.StringIO(self.list[i])
172
173     def getUidl(self, i):
174         return i
175
176     def deleteMessage(self, i):
177         self.list[i] = ''
178
179     def sync(self):
180         pass
181
182 class MyPOP3Downloader(pop3.POP3Client):
183
184     def handle_WELCOME(self, line):
185         pop3.POP3Client.handle_WELCOME(self, line)
186         self.apop('hello@baz.com', 'world')
187
188     def handle_APOP(self, line):
189         parts = line.split()
190         code = parts[0]
191         data = (parts[1:] or ['NONE'])[0]
192         if code != '+OK':
193             print parts
194             raise AssertionError, 'code is ' + code
195         self.lines = []
196         self.retr(1)
197
198     def handle_RETR_continue(self, line):
199         self.lines.append(line)
200
201     def handle_RETR_end(self):
202         self.message = '\n'.join(self.lines) + '\n'
203         self.quit()
204
205     def handle_QUIT(self, line):
206         if line[:3] != '+OK':
207             raise AssertionError, 'code is ' + line
208
209
210 class POP3TestCase(unittest.TestCase):
211
212     message = '''\
213 Subject: urgent
214
215 Someone set up us the bomb!
216 '''
217
218     expectedOutput = '''\
219 +OK <moshez>\015
220 +OK Authentication succeeded\015
221 +OK \015
222 1 0\015
223 .\015
224 +OK %d\015
225 Subject: urgent\015
226 \015
227 Someone set up us the bomb!\015
228 .\015
229 +OK \015
230 ''' % len(message)
231
232     def setUp(self):
233         self.factory = internet.protocol.Factory()
234         self.factory.domains = {}
235         self.factory.domains['baz.com'] = DummyDomain()
236         self.factory.domains['baz.com'].addUser('hello')
237         self.factory.domains['baz.com'].addMessage('hello', self.message)
238
239     def testMessages(self):
240         client = LineSendingProtocol([
241             'APOP hello@baz.com world',
242             'UIDL',
243             'RETR 1',
244             'QUIT',
245         ])
246         server =  MyVirtualPOP3()
247         server.service = self.factory
248         def check(ignored):
249             output = '\r\n'.join(client.response) + '\r\n'
250             self.assertEqual(output, self.expectedOutput)
251         return loopback.loopbackTCP(server, client).addCallback(check)
252
253     def testLoopback(self):
254         protocol =  MyVirtualPOP3()
255         protocol.service = self.factory
256         clientProtocol = MyPOP3Downloader()
257         def check(ignored):
258             self.assertEqual(clientProtocol.message, self.message)
259             protocol.connectionLost(
260                 failure.Failure(Exception("Test harness disconnect")))
261         d = loopback.loopbackAsync(protocol, clientProtocol)
262         return d.addCallback(check)
263     testLoopback.suppress = [util.suppress(message="twisted.mail.pop3.POP3Client is deprecated")]
264
265
266
267 class DummyPOP3(pop3.POP3):
268
269     magic = '<moshez>'
270
271     def authenticateUserAPOP(self, user, password):
272         return pop3.IMailbox, DummyMailbox(ValueError), lambda: None
273
274
275
276 class DummyMailbox(pop3.Mailbox):
277
278     messages = ['From: moshe\nTo: moshe\n\nHow are you, friend?\n']
279
280     def __init__(self, exceptionType):
281         self.messages = DummyMailbox.messages[:]
282         self.exceptionType = exceptionType
283
284     def listMessages(self, i=None):
285         if i is None:
286             return map(len, self.messages)
287         if i >= len(self.messages):
288             raise self.exceptionType()
289         return len(self.messages[i])
290
291     def getMessage(self, i):
292         return StringIO.StringIO(self.messages[i])
293
294     def getUidl(self, i):
295         if i >= len(self.messages):
296             raise self.exceptionType()
297         return str(i)
298
299     def deleteMessage(self, i):
300         self.messages[i] = ''
301
302
303 class AnotherPOP3TestCase(unittest.TestCase):
304
305     def runTest(self, lines, expectedOutput):
306         dummy = DummyPOP3()
307         client = LineSendingProtocol(lines)
308         d = loopback.loopbackAsync(dummy, client)
309         return d.addCallback(self._cbRunTest, client, dummy, expectedOutput)
310
311
312     def _cbRunTest(self, ignored, client, dummy, expectedOutput):
313         self.assertEqual('\r\n'.join(expectedOutput),
314                              '\r\n'.join(client.response))
315         dummy.connectionLost(failure.Failure(Exception("Test harness disconnect")))
316         return ignored
317
318
319     def test_buffer(self):
320         """
321         Test a lot of different POP3 commands in an extremely pipelined
322         scenario.
323
324         This test may cover legitimate behavior, but the intent and
325         granularity are not very good.  It would likely be an improvement to
326         split it into a number of smaller, more focused tests.
327         """
328         return self.runTest(
329             ["APOP moshez dummy",
330              "LIST",
331              "UIDL",
332              "RETR 1",
333              "RETR 2",
334              "DELE 1",
335              "RETR 1",
336              "QUIT"],
337             ['+OK <moshez>',
338              '+OK Authentication succeeded',
339              '+OK 1',
340              '1 44',
341              '.',
342              '+OK ',
343              '1 0',
344              '.',
345              '+OK 44',
346              'From: moshe',
347              'To: moshe',
348              '',
349              'How are you, friend?',
350              '.',
351              '-ERR Bad message number argument',
352              '+OK ',
353              '-ERR message deleted',
354              '+OK '])
355
356
357     def test_noop(self):
358         """
359         Test the no-op command.
360         """
361         return self.runTest(
362             ['APOP spiv dummy',
363              'NOOP',
364              'QUIT'],
365             ['+OK <moshez>',
366              '+OK Authentication succeeded',
367              '+OK ',
368              '+OK '])
369
370
371     def testAuthListing(self):
372         p = DummyPOP3()
373         p.factory = internet.protocol.Factory()
374         p.factory.challengers = {'Auth1': None, 'secondAuth': None, 'authLast': None}
375         client = LineSendingProtocol([
376             "AUTH",
377             "QUIT",
378         ])
379
380         d = loopback.loopbackAsync(p, client)
381         return d.addCallback(self._cbTestAuthListing, client)
382
383     def _cbTestAuthListing(self, ignored, client):
384         self.failUnless(client.response[1].startswith('+OK'))
385         self.assertEqual(client.response[2:6],
386                           ["AUTH1", "SECONDAUTH", "AUTHLAST", "."])
387
388     def testIllegalPASS(self):
389         dummy = DummyPOP3()
390         client = LineSendingProtocol([
391             "PASS fooz",
392             "QUIT"
393         ])
394         d = loopback.loopbackAsync(dummy, client)
395         return d.addCallback(self._cbTestIllegalPASS, client, dummy)
396
397     def _cbTestIllegalPASS(self, ignored, client, dummy):
398         expected_output = '+OK <moshez>\r\n-ERR USER required before PASS\r\n+OK \r\n'
399         self.assertEqual(expected_output, '\r\n'.join(client.response) + '\r\n')
400         dummy.connectionLost(failure.Failure(Exception("Test harness disconnect")))
401
402     def testEmptyPASS(self):
403         dummy = DummyPOP3()
404         client = LineSendingProtocol([
405             "PASS ",
406             "QUIT"
407         ])
408         d = loopback.loopbackAsync(dummy, client)
409         return d.addCallback(self._cbTestEmptyPASS, client, dummy)
410
411     def _cbTestEmptyPASS(self, ignored, client, dummy):
412         expected_output = '+OK <moshez>\r\n-ERR USER required before PASS\r\n+OK \r\n'
413         self.assertEqual(expected_output, '\r\n'.join(client.response) + '\r\n')
414         dummy.connectionLost(failure.Failure(Exception("Test harness disconnect")))
415
416
417 class TestServerFactory:
418     implements(pop3.IServerFactory)
419
420     def cap_IMPLEMENTATION(self):
421         return "Test Implementation String"
422
423     def cap_EXPIRE(self):
424         return 60
425
426     challengers = {"SCHEME_1": None, "SCHEME_2": None}
427
428     def cap_LOGIN_DELAY(self):
429         return 120
430
431     pue = True
432     def perUserExpiration(self):
433         return self.pue
434
435     puld = True
436     def perUserLoginDelay(self):
437         return self.puld
438
439
440 class TestMailbox:
441     loginDelay = 100
442     messageExpiration = 25
443
444
445 class CapabilityTestCase(unittest.TestCase):
446     def setUp(self):
447         s = StringIO.StringIO()
448         p = pop3.POP3()
449         p.factory = TestServerFactory()
450         p.transport = internet.protocol.FileWrapper(s)
451         p.connectionMade()
452         p.do_CAPA()
453
454         self.caps = p.listCapabilities()
455         self.pcaps = s.getvalue().splitlines()
456
457         s = StringIO.StringIO()
458         p.mbox = TestMailbox()
459         p.transport = internet.protocol.FileWrapper(s)
460         p.do_CAPA()
461
462         self.lpcaps = s.getvalue().splitlines()
463         p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
464
465     def contained(self, s, *caps):
466         for c in caps:
467             self.assertIn(s, c)
468
469     def testUIDL(self):
470         self.contained("UIDL", self.caps, self.pcaps, self.lpcaps)
471
472     def testTOP(self):
473         self.contained("TOP", self.caps, self.pcaps, self.lpcaps)
474
475     def testUSER(self):
476         self.contained("USER", self.caps, self.pcaps, self.lpcaps)
477
478     def testEXPIRE(self):
479         self.contained("EXPIRE 60 USER", self.caps, self.pcaps)
480         self.contained("EXPIRE 25", self.lpcaps)
481
482     def testIMPLEMENTATION(self):
483         self.contained(
484             "IMPLEMENTATION Test Implementation String",
485             self.caps, self.pcaps, self.lpcaps
486         )
487
488     def testSASL(self):
489         self.contained(
490             "SASL SCHEME_1 SCHEME_2",
491             self.caps, self.pcaps, self.lpcaps
492         )
493
494     def testLOGIN_DELAY(self):
495         self.contained("LOGIN-DELAY 120 USER", self.caps, self.pcaps)
496         self.assertIn("LOGIN-DELAY 100", self.lpcaps)
497
498
499
500 class GlobalCapabilitiesTestCase(unittest.TestCase):
501     def setUp(self):
502         s = StringIO.StringIO()
503         p = pop3.POP3()
504         p.factory = TestServerFactory()
505         p.factory.pue = p.factory.puld = False
506         p.transport = internet.protocol.FileWrapper(s)
507         p.connectionMade()
508         p.do_CAPA()
509
510         self.caps = p.listCapabilities()
511         self.pcaps = s.getvalue().splitlines()
512
513         s = StringIO.StringIO()
514         p.mbox = TestMailbox()
515         p.transport = internet.protocol.FileWrapper(s)
516         p.do_CAPA()
517
518         self.lpcaps = s.getvalue().splitlines()
519         p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
520
521     def contained(self, s, *caps):
522         for c in caps:
523             self.assertIn(s, c)
524
525     def testEXPIRE(self):
526         self.contained("EXPIRE 60", self.caps, self.pcaps, self.lpcaps)
527
528     def testLOGIN_DELAY(self):
529         self.contained("LOGIN-DELAY 120", self.caps, self.pcaps, self.lpcaps)
530
531
532
533 class TestRealm:
534     def requestAvatar(self, avatarId, mind, *interfaces):
535         if avatarId == 'testuser':
536             return pop3.IMailbox, DummyMailbox(ValueError), lambda: None
537         assert False
538
539
540
541 class SASLTestCase(unittest.TestCase):
542     def testValidLogin(self):
543         p = pop3.POP3()
544         p.factory = TestServerFactory()
545         p.factory.challengers = {'CRAM-MD5': cred.credentials.CramMD5Credentials}
546         p.portal = cred.portal.Portal(TestRealm())
547         ch = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
548         ch.addUser('testuser', 'testpassword')
549         p.portal.registerChecker(ch)
550
551         s = StringIO.StringIO()
552         p.transport = internet.protocol.FileWrapper(s)
553         p.connectionMade()
554
555         p.lineReceived("CAPA")
556         self.failUnless(s.getvalue().find("SASL CRAM-MD5") >= 0)
557
558         p.lineReceived("AUTH CRAM-MD5")
559         chal = s.getvalue().splitlines()[-1][2:]
560         chal = base64.decodestring(chal)
561         response = hmac.HMAC('testpassword', chal).hexdigest()
562
563         p.lineReceived(base64.encodestring('testuser ' + response).rstrip('\n'))
564         self.failUnless(p.mbox)
565         self.failUnless(s.getvalue().splitlines()[-1].find("+OK") >= 0)
566         p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
567
568
569
570 class CommandMixin:
571     """
572     Tests for all the commands a POP3 server is allowed to receive.
573     """
574
575     extraMessage = '''\
576 From: guy
577 To: fellow
578
579 More message text for you.
580 '''
581
582
583     def setUp(self):
584         """
585         Make a POP3 server protocol instance hooked up to a simple mailbox and
586         a transport that buffers output to a StringIO.
587         """
588         p = pop3.POP3()
589         p.mbox = self.mailboxType(self.exceptionType)
590         p.schedule = list
591         self.pop3Server = p
592
593         s = StringIO.StringIO()
594         p.transport = internet.protocol.FileWrapper(s)
595         p.connectionMade()
596         s.truncate(0)
597         self.pop3Transport = s
598
599
600     def tearDown(self):
601         """
602         Disconnect the server protocol so it can clean up anything it might
603         need to clean up.
604         """
605         self.pop3Server.connectionLost(failure.Failure(Exception("Test harness disconnect")))
606
607
608     def _flush(self):
609         """
610         Do some of the things that the reactor would take care of, if the
611         reactor were actually running.
612         """
613         # Oh man FileWrapper is pooh.
614         self.pop3Server.transport._checkProducer()
615
616
617     def testLIST(self):
618         """
619         Test the two forms of list: with a message index number, which should
620         return a short-form response, and without a message index number, which
621         should return a long-form response, one line per message.
622         """
623         p = self.pop3Server
624         s = self.pop3Transport
625
626         p.lineReceived("LIST 1")
627         self._flush()
628         self.assertEqual(s.getvalue(), "+OK 1 44\r\n")
629         s.truncate(0)
630
631         p.lineReceived("LIST")
632         self._flush()
633         self.assertEqual(s.getvalue(), "+OK 1\r\n1 44\r\n.\r\n")
634
635
636     def testLISTWithBadArgument(self):
637         """
638         Test that non-integers and out-of-bound integers produce appropriate
639         error responses.
640         """
641         p = self.pop3Server
642         s = self.pop3Transport
643
644         p.lineReceived("LIST a")
645         self.assertEqual(
646             s.getvalue(),
647             "-ERR Invalid message-number: 'a'\r\n")
648         s.truncate(0)
649
650         p.lineReceived("LIST 0")
651         self.assertEqual(
652             s.getvalue(),
653             "-ERR Invalid message-number: 0\r\n")
654         s.truncate(0)
655
656         p.lineReceived("LIST 2")
657         self.assertEqual(
658             s.getvalue(),
659             "-ERR Invalid message-number: 2\r\n")
660         s.truncate(0)
661
662
663     def testUIDL(self):
664         """
665         Test the two forms of the UIDL command.  These are just like the two
666         forms of the LIST command.
667         """
668         p = self.pop3Server
669         s = self.pop3Transport
670
671         p.lineReceived("UIDL 1")
672         self.assertEqual(s.getvalue(), "+OK 0\r\n")
673         s.truncate(0)
674
675         p.lineReceived("UIDL")
676         self._flush()
677         self.assertEqual(s.getvalue(), "+OK \r\n1 0\r\n.\r\n")
678
679
680     def testUIDLWithBadArgument(self):
681         """
682         Test that UIDL with a non-integer or an out-of-bounds integer produces
683         the appropriate error response.
684         """
685         p = self.pop3Server
686         s = self.pop3Transport
687
688         p.lineReceived("UIDL a")
689         self.assertEqual(
690             s.getvalue(),
691             "-ERR Bad message number argument\r\n")
692         s.truncate(0)
693
694         p.lineReceived("UIDL 0")
695         self.assertEqual(
696             s.getvalue(),
697             "-ERR Bad message number argument\r\n")
698         s.truncate(0)
699
700         p.lineReceived("UIDL 2")
701         self.assertEqual(
702             s.getvalue(),
703             "-ERR Bad message number argument\r\n")
704         s.truncate(0)
705
706
707     def testSTAT(self):
708         """
709         Test the single form of the STAT command, which returns a short-form
710         response of the number of messages in the mailbox and their total size.
711         """
712         p = self.pop3Server
713         s = self.pop3Transport
714
715         p.lineReceived("STAT")
716         self._flush()
717         self.assertEqual(s.getvalue(), "+OK 1 44\r\n")
718
719
720     def testRETR(self):
721         """
722         Test downloading a message.
723         """
724         p = self.pop3Server
725         s = self.pop3Transport
726
727         p.lineReceived("RETR 1")
728         self._flush()
729         self.assertEqual(
730             s.getvalue(),
731             "+OK 44\r\n"
732             "From: moshe\r\n"
733             "To: moshe\r\n"
734             "\r\n"
735             "How are you, friend?\r\n"
736             ".\r\n")
737         s.truncate(0)
738
739
740     def testRETRWithBadArgument(self):
741         """
742         Test that trying to download a message with a bad argument, either not
743         an integer or an out-of-bounds integer, fails with the appropriate
744         error response.
745         """
746         p = self.pop3Server
747         s = self.pop3Transport
748
749         p.lineReceived("RETR a")
750         self.assertEqual(
751             s.getvalue(),
752             "-ERR Bad message number argument\r\n")
753         s.truncate(0)
754
755         p.lineReceived("RETR 0")
756         self.assertEqual(
757             s.getvalue(),
758             "-ERR Bad message number argument\r\n")
759         s.truncate(0)
760
761         p.lineReceived("RETR 2")
762         self.assertEqual(
763             s.getvalue(),
764             "-ERR Bad message number argument\r\n")
765         s.truncate(0)
766
767
768     def testTOP(self):
769         """
770         Test downloading the headers and part of the body of a message.
771         """
772         p = self.pop3Server
773         s = self.pop3Transport
774         p.mbox.messages.append(self.extraMessage)
775
776         p.lineReceived("TOP 1 0")
777         self._flush()
778         self.assertEqual(
779             s.getvalue(),
780             "+OK Top of message follows\r\n"
781             "From: moshe\r\n"
782             "To: moshe\r\n"
783             "\r\n"
784             ".\r\n")
785
786
787     def testTOPWithBadArgument(self):
788         """
789         Test that trying to download a message with a bad argument, either a
790         message number which isn't an integer or is an out-of-bounds integer or
791         a number of lines which isn't an integer or is a negative integer,
792         fails with the appropriate error response.
793         """
794         p = self.pop3Server
795         s = self.pop3Transport
796         p.mbox.messages.append(self.extraMessage)
797
798         p.lineReceived("TOP 1 a")
799         self.assertEqual(
800             s.getvalue(),
801             "-ERR Bad line count argument\r\n")
802         s.truncate(0)
803
804         p.lineReceived("TOP 1 -1")
805         self.assertEqual(
806             s.getvalue(),
807             "-ERR Bad line count argument\r\n")
808         s.truncate(0)
809
810         p.lineReceived("TOP a 1")
811         self.assertEqual(
812             s.getvalue(),
813             "-ERR Bad message number argument\r\n")
814         s.truncate(0)
815
816         p.lineReceived("TOP 0 1")
817         self.assertEqual(
818             s.getvalue(),
819             "-ERR Bad message number argument\r\n")
820         s.truncate(0)
821
822         p.lineReceived("TOP 3 1")
823         self.assertEqual(
824             s.getvalue(),
825             "-ERR Bad message number argument\r\n")
826         s.truncate(0)
827
828
829     def testLAST(self):
830         """
831         Test the exceedingly pointless LAST command, which tells you the
832         highest message index which you have already downloaded.
833         """
834         p = self.pop3Server
835         s = self.pop3Transport
836         p.mbox.messages.append(self.extraMessage)
837
838         p.lineReceived('LAST')
839         self.assertEqual(
840             s.getvalue(),
841             "+OK 0\r\n")
842         s.truncate(0)
843
844
845     def testRetrieveUpdatesHighest(self):
846         """
847         Test that issuing a RETR command updates the LAST response.
848         """
849         p = self.pop3Server
850         s = self.pop3Transport
851         p.mbox.messages.append(self.extraMessage)
852
853         p.lineReceived('RETR 2')
854         self._flush()
855         s.truncate(0)
856         p.lineReceived('LAST')
857         self.assertEqual(
858             s.getvalue(),
859             '+OK 2\r\n')
860         s.truncate(0)
861
862
863     def testTopUpdatesHighest(self):
864         """
865         Test that issuing a TOP command updates the LAST response.
866         """
867         p = self.pop3Server
868         s = self.pop3Transport
869         p.mbox.messages.append(self.extraMessage)
870
871         p.lineReceived('TOP 2 10')
872         self._flush()
873         s.truncate(0)
874         p.lineReceived('LAST')
875         self.assertEqual(
876             s.getvalue(),
877             '+OK 2\r\n')
878
879
880     def testHighestOnlyProgresses(self):
881         """
882         Test that downloading a message with a smaller index than the current
883         LAST response doesn't change the LAST response.
884         """
885         p = self.pop3Server
886         s = self.pop3Transport
887         p.mbox.messages.append(self.extraMessage)
888
889         p.lineReceived('RETR 2')
890         self._flush()
891         p.lineReceived('TOP 1 10')
892         self._flush()
893         s.truncate(0)
894         p.lineReceived('LAST')
895         self.assertEqual(
896             s.getvalue(),
897             '+OK 2\r\n')
898
899
900     def testResetClearsHighest(self):
901         """
902         Test that issuing RSET changes the LAST response to 0.
903         """
904         p = self.pop3Server
905         s = self.pop3Transport
906         p.mbox.messages.append(self.extraMessage)
907
908         p.lineReceived('RETR 2')
909         self._flush()
910         p.lineReceived('RSET')
911         s.truncate(0)
912         p.lineReceived('LAST')
913         self.assertEqual(
914             s.getvalue(),
915             '+OK 0\r\n')
916
917
918
919 _listMessageDeprecation = (
920     "twisted.mail.pop3.IMailbox.listMessages may not "
921     "raise IndexError for out-of-bounds message numbers: "
922     "raise ValueError instead.")
923 _listMessageSuppression = util.suppress(
924     message=_listMessageDeprecation,
925     category=PendingDeprecationWarning)
926
927 _getUidlDeprecation = (
928     "twisted.mail.pop3.IMailbox.getUidl may not "
929     "raise IndexError for out-of-bounds message numbers: "
930     "raise ValueError instead.")
931 _getUidlSuppression = util.suppress(
932     message=_getUidlDeprecation,
933     category=PendingDeprecationWarning)
934
935 class IndexErrorCommandTestCase(CommandMixin, unittest.TestCase):
936     """
937     Run all of the command tests against a mailbox which raises IndexError
938     when an out of bounds request is made.  This behavior will be deprecated
939     shortly and then removed.
940     """
941     exceptionType = IndexError
942     mailboxType = DummyMailbox
943
944     def testLISTWithBadArgument(self):
945         return CommandMixin.testLISTWithBadArgument(self)
946     testLISTWithBadArgument.suppress = [_listMessageSuppression]
947
948
949     def testUIDLWithBadArgument(self):
950         return CommandMixin.testUIDLWithBadArgument(self)
951     testUIDLWithBadArgument.suppress = [_getUidlSuppression]
952
953
954     def testTOPWithBadArgument(self):
955         return CommandMixin.testTOPWithBadArgument(self)
956     testTOPWithBadArgument.suppress = [_listMessageSuppression]
957
958
959     def testRETRWithBadArgument(self):
960         return CommandMixin.testRETRWithBadArgument(self)
961     testRETRWithBadArgument.suppress = [_listMessageSuppression]
962
963
964
965 class ValueErrorCommandTestCase(CommandMixin, unittest.TestCase):
966     """
967     Run all of the command tests against a mailbox which raises ValueError
968     when an out of bounds request is made.  This is the correct behavior and
969     after support for mailboxes which raise IndexError is removed, this will
970     become just C{CommandTestCase}.
971     """
972     exceptionType = ValueError
973     mailboxType = DummyMailbox
974
975
976
977 class SyncDeferredMailbox(DummyMailbox):
978     """
979     Mailbox which has a listMessages implementation which returns a Deferred
980     which has already fired.
981     """
982     def listMessages(self, n=None):
983         return defer.succeed(DummyMailbox.listMessages(self, n))
984
985
986
987 class IndexErrorSyncDeferredCommandTestCase(IndexErrorCommandTestCase):
988     """
989     Run all of the L{IndexErrorCommandTestCase} tests with a
990     synchronous-Deferred returning IMailbox implementation.
991     """
992     mailboxType = SyncDeferredMailbox
993
994
995
996 class ValueErrorSyncDeferredCommandTestCase(ValueErrorCommandTestCase):
997     """
998     Run all of the L{ValueErrorCommandTestCase} tests with a
999     synchronous-Deferred returning IMailbox implementation.
1000     """
1001     mailboxType = SyncDeferredMailbox
1002
1003
1004
1005 class AsyncDeferredMailbox(DummyMailbox):
1006     """
1007     Mailbox which has a listMessages implementation which returns a Deferred
1008     which has not yet fired.
1009     """
1010     def __init__(self, *a, **kw):
1011         self.waiting = []
1012         DummyMailbox.__init__(self, *a, **kw)
1013
1014
1015     def listMessages(self, n=None):
1016         d = defer.Deferred()
1017         # See AsyncDeferredMailbox._flush
1018         self.waiting.append((d, DummyMailbox.listMessages(self, n)))
1019         return d
1020
1021
1022
1023 class IndexErrorAsyncDeferredCommandTestCase(IndexErrorCommandTestCase):
1024     """
1025     Run all of the L{IndexErrorCommandTestCase} tests with an asynchronous-Deferred
1026     returning IMailbox implementation.
1027     """
1028     mailboxType = AsyncDeferredMailbox
1029
1030     def _flush(self):
1031         """
1032         Fire whatever Deferreds we've built up in our mailbox.
1033         """
1034         while self.pop3Server.mbox.waiting:
1035             d, a = self.pop3Server.mbox.waiting.pop()
1036             d.callback(a)
1037         IndexErrorCommandTestCase._flush(self)
1038
1039
1040
1041 class ValueErrorAsyncDeferredCommandTestCase(ValueErrorCommandTestCase):
1042     """
1043     Run all of the L{IndexErrorCommandTestCase} tests with an asynchronous-Deferred
1044     returning IMailbox implementation.
1045     """
1046     mailboxType = AsyncDeferredMailbox
1047
1048     def _flush(self):
1049         """
1050         Fire whatever Deferreds we've built up in our mailbox.
1051         """
1052         while self.pop3Server.mbox.waiting:
1053             d, a = self.pop3Server.mbox.waiting.pop()
1054             d.callback(a)
1055         ValueErrorCommandTestCase._flush(self)
1056
1057 class POP3MiscTestCase(unittest.TestCase):
1058     """
1059     Miscellaneous tests more to do with module/package structure than
1060     anything to do with the Post Office Protocol.
1061     """
1062     def test_all(self):
1063         """
1064         This test checks that all names listed in
1065         twisted.mail.pop3.__all__ are actually present in the module.
1066         """
1067         mod = twisted.mail.pop3
1068         for attr in mod.__all__:
1069             self.failUnless(hasattr(mod, attr))