1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
5 Test cases for L{twisted.mail.pop3} module.
13 from zope.interface import implements
15 from twisted.internet import defer
17 from twisted.trial import unittest, util
18 from twisted import mail
19 import twisted.mail.protocols
20 import twisted.mail.pop3
21 import twisted.internet.protocol
22 from twisted import internet
23 from twisted.mail import pop3
24 from twisted.protocols import loopback
25 from twisted.python import failure
27 from twisted import cred
28 import twisted.cred.portal
29 import twisted.cred.checkers
30 import twisted.cred.credentials
32 from twisted.test.proto_helpers import LineSendingProtocol
35 class UtilityTestCase(unittest.TestCase):
37 Test the various helper functions and classes used by the POP3 server
38 protocol implementation.
41 def testLineBuffering(self):
43 Test creating a LineBuffer and feeding it some lines. The lines should
44 build up in its internal buffer for a while and then get spat out to
48 input = iter(itertools.cycle(['012', '345', '6', '7', '8', '9']))
49 c = pop3._IteratorBuffer(output.extend, input, 6)
51 self.assertEqual(output, []) # nothing is buffer
53 self.assertEqual(output, []) # '012' is buffered
55 self.assertEqual(output, []) # '012345' is buffered
57 self.assertEqual(output, ['012', '345', '6']) # nothing is buffered
60 self.assertEqual(output, ['012', '345', '6', '7', '8', '9', '012', '345'])
63 def testFinishLineBuffering(self):
65 Test that a LineBuffer flushes everything when its iterator is
66 exhausted, and itself raises StopIteration.
69 input = iter(['a', 'b', 'c'])
70 c = pop3._IteratorBuffer(output.extend, input, 5)
73 self.assertEqual(output, ['a', 'b', 'c'])
76 def testSuccessResponseFormatter(self):
78 Test that the thing that spits out POP3 'success responses' works
82 pop3.successResponse('Great.'),
86 def testStatLineFormatter(self):
88 Test that the function which formats stat lines does so appropriately.
90 statLine = list(pop3.formatStatResponse([]))[-1]
91 self.assertEqual(statLine, '+OK 0 0\r\n')
93 statLine = list(pop3.formatStatResponse([10, 31, 0, 10101]))[-1]
94 self.assertEqual(statLine, '+OK 4 10142\r\n')
97 def testListLineFormatter(self):
99 Test that the function which formats the lines in response to a LIST
100 command does so appropriately.
102 listLines = list(pop3.formatListResponse([]))
105 ['+OK 0\r\n', '.\r\n'])
107 listLines = list(pop3.formatListResponse([1, 2, 3, 100]))
110 ['+OK 4\r\n', '1 1\r\n', '2 2\r\n', '3 3\r\n', '4 100\r\n', '.\r\n'])
114 def testUIDListLineFormatter(self):
116 Test that the function which formats lines in response to a UIDL
117 command does so appropriately.
119 UIDs = ['abc', 'def', 'ghi']
120 listLines = list(pop3.formatUIDListResponse([], UIDs.__getitem__))
123 ['+OK \r\n', '.\r\n'])
125 listLines = list(pop3.formatUIDListResponse([123, 431, 591], UIDs.__getitem__))
128 ['+OK \r\n', '1 abc\r\n', '2 def\r\n', '3 ghi\r\n', '.\r\n'])
130 listLines = list(pop3.formatUIDListResponse([0, None, 591], UIDs.__getitem__))
133 ['+OK \r\n', '1 abc\r\n', '3 ghi\r\n', '.\r\n'])
137 class MyVirtualPOP3(mail.protocols.VirtualPOP3):
def authenticateUserAPOP(self, user, digest):
    """
    Authenticate an APOP login by delegating to the C{'baz.com'} domain.

    C{lookupDomain} resolves the login name into a user and a domain; those
    two values, the digest and this server's C{magic} are then passed to the
    domain object's own C{authenticateUserAPOP}, whose result is returned.
    """
    localPart, domainName = self.lookupDomain(user)
    bazDomain = self.service.domains['baz.com']
    return bazDomain.authenticateUserAPOP(
        localPart, digest, self.magic, domainName)
def addUser(self, name):
    """
    Store a new, empty message list under C{name} in C{self.users},
    replacing whatever was stored under that name before.
    """
    emptyBox = list()
    self.users[name] = emptyBox
def addMessage(self, name, message):
    """
    Append C{message} to the list stored for C{name} in C{self.users}.

    A C{name} that has no entry yet makes the lookup fail, exactly as a
    plain subscript would.
    """
    mailbox = self.users[name]
    mailbox.append(message)
def authenticateUserAPOP(self, name, digest, magic, domain):
    """
    Accept any APOP login for a known user.

    C{digest}, C{magic} and C{domain} are not inspected.  The result is a
    three-tuple of C{pop3.IMailbox}, a C{ListMailbox} wrapping the messages
    stored for C{name}, and a do-nothing callable.
    """
    interface = pop3.IMailbox
    mailbox = ListMailbox(self.users[name])
    noop = lambda: None
    return interface, mailbox, noop
162 def __init__(self, list):
165 def listMessages(self, i=None):
167 return map(len, self.list)
168 return len(self.list[i])
def getMessage(self, i):
    """
    Return a C{StringIO} file-like object holding the text of the message
    stored at index C{i} of C{self.list}.
    """
    messageText = self.list[i]
    return StringIO.StringIO(messageText)
173 def getUidl(self, i):
176 def deleteMessage(self, i):
182 class MyPOP3Downloader(pop3.POP3Client):
def handle_WELCOME(self, line):
    """
    Run the base C{POP3Client} welcome handling for C{line}, then
    immediately start an APOP login with the fixed test credentials.
    """
    pop3.POP3Client.handle_WELCOME(self, line)
    username, secret = 'hello@baz.com', 'world'
    self.apop(username, secret)
188 def handle_APOP(self, line):
191 data = (parts[1:] or ['NONE'])[0]
194 raise AssertionError, 'code is ' + code
def handle_RETR_continue(self, line):
    """
    Collect one more C{line} of the message being retrieved by appending it
    to C{self.lines}.
    """
    collected = self.lines
    collected.append(line)
201 def handle_RETR_end(self):
202 self.message = '\n'.join(self.lines) + '\n'
def handle_QUIT(self, line):
    """
    Check the server's reply to QUIT.

    @param line: the response line received from the server.
    @raise AssertionError: if C{line} does not begin with C{'+OK'}; the
        message is C{'code is '} followed by the offending line.
    """
    if line[:3] != '+OK':
        # Call form of raise: same behaviour as the old comma form, but it
        # is also valid syntax on Python 3.
        raise AssertionError('code is ' + line)
210 class POP3TestCase(unittest.TestCase):
215 Someone set up us the bomb!
218 expectedOutput = '''\
220 +OK Authentication succeeded\015
227 Someone set up us the bomb!\015
233 self.factory = internet.protocol.Factory()
234 self.factory.domains = {}
235 self.factory.domains['baz.com'] = DummyDomain()
236 self.factory.domains['baz.com'].addUser('hello')
237 self.factory.domains['baz.com'].addMessage('hello', self.message)
239 def testMessages(self):
240 client = LineSendingProtocol([
241 'APOP hello@baz.com world',
246 server = MyVirtualPOP3()
247 server.service = self.factory
249 output = '\r\n'.join(client.response) + '\r\n'
250 self.assertEqual(output, self.expectedOutput)
251 return loopback.loopbackTCP(server, client).addCallback(check)
253 def testLoopback(self):
254 protocol = MyVirtualPOP3()
255 protocol.service = self.factory
256 clientProtocol = MyPOP3Downloader()
258 self.assertEqual(clientProtocol.message, self.message)
259 protocol.connectionLost(
260 failure.Failure(Exception("Test harness disconnect")))
261 d = loopback.loopbackAsync(protocol, clientProtocol)
262 return d.addCallback(check)
263 testLoopback.suppress = [util.suppress(message="twisted.mail.pop3.POP3Client is deprecated")]
267 class DummyPOP3(pop3.POP3):
def authenticateUserAPOP(self, user, password):
    """
    Accept every APOP login without looking at C{user} or C{password}.

    The result is a three-tuple of C{pop3.IMailbox}, a new C{DummyMailbox}
    constructed with C{ValueError}, and a do-nothing callable.
    """
    interface = pop3.IMailbox
    mailbox = DummyMailbox(ValueError)
    noop = lambda: None
    return interface, mailbox, noop
276 class DummyMailbox(pop3.Mailbox):
278 messages = ['From: moshe\nTo: moshe\n\nHow are you, friend?\n']
def __init__(self, exceptionType):
    """
    @param exceptionType: the exception class stored on the instance as
        C{self.exceptionType}.
    """
    self.exceptionType = exceptionType
    # Work on a private copy so the class-level list is never mutated.
    self.messages = list(DummyMailbox.messages)
284 def listMessages(self, i=None):
286 return map(len, self.messages)
287 if i >= len(self.messages):
288 raise self.exceptionType()
289 return len(self.messages[i])
def getMessage(self, i):
    """
    Return a C{StringIO} file-like object holding the text of the message
    stored at index C{i} of C{self.messages}.
    """
    messageText = self.messages[i]
    return StringIO.StringIO(messageText)
294 def getUidl(self, i):
295 if i >= len(self.messages):
296 raise self.exceptionType()
def deleteMessage(self, i):
    """
    Blank out the message at index C{i}: the slot is kept but its content
    becomes the empty string.
    """
    blank = ''
    self.messages[i] = blank
303 class AnotherPOP3TestCase(unittest.TestCase):
305 def runTest(self, lines, expectedOutput):
307 client = LineSendingProtocol(lines)
308 d = loopback.loopbackAsync(dummy, client)
309 return d.addCallback(self._cbRunTest, client, dummy, expectedOutput)
312 def _cbRunTest(self, ignored, client, dummy, expectedOutput):
313 self.assertEqual('\r\n'.join(expectedOutput),
314 '\r\n'.join(client.response))
315 dummy.connectionLost(failure.Failure(Exception("Test harness disconnect")))
319 def test_buffer(self):
321 Test a lot of different POP3 commands in an extremely pipelined
324 This test may cover legitimate behavior, but the intent and
325 granularity are not very good. It would likely be an improvement to
326 split it into a number of smaller, more focused tests.
329 ["APOP moshez dummy",
338 '+OK Authentication succeeded',
349 'How are you, friend?',
351 '-ERR Bad message number argument',
353 '-ERR message deleted',
359 Test the no-op command.
366 '+OK Authentication succeeded',
371 def testAuthListing(self):
373 p.factory = internet.protocol.Factory()
374 p.factory.challengers = {'Auth1': None, 'secondAuth': None, 'authLast': None}
375 client = LineSendingProtocol([
380 d = loopback.loopbackAsync(p, client)
381 return d.addCallback(self._cbTestAuthListing, client)
383 def _cbTestAuthListing(self, ignored, client):
384 self.failUnless(client.response[1].startswith('+OK'))
385 self.assertEqual(client.response[2:6],
386 ["AUTH1", "SECONDAUTH", "AUTHLAST", "."])
388 def testIllegalPASS(self):
390 client = LineSendingProtocol([
394 d = loopback.loopbackAsync(dummy, client)
395 return d.addCallback(self._cbTestIllegalPASS, client, dummy)
def _cbTestIllegalPASS(self, ignored, client, dummy):
    """
    Compare what C{client} received with the expected greeting, the
    rejection of PASS and the final OK, then disconnect C{dummy}.
    """
    wanted = ('+OK <moshez>\r\n'
              '-ERR USER required before PASS\r\n'
              '+OK \r\n')
    received = '\r\n'.join(client.response) + '\r\n'
    self.assertEqual(wanted, received)
    reason = failure.Failure(Exception("Test harness disconnect"))
    dummy.connectionLost(reason)
402 def testEmptyPASS(self):
404 client = LineSendingProtocol([
408 d = loopback.loopbackAsync(dummy, client)
409 return d.addCallback(self._cbTestEmptyPASS, client, dummy)
def _cbTestEmptyPASS(self, ignored, client, dummy):
    """
    Compare what C{client} received with the expected greeting, the
    rejection of PASS and the final OK, then disconnect C{dummy}.
    """
    wanted = ('+OK <moshez>\r\n'
              '-ERR USER required before PASS\r\n'
              '+OK \r\n')
    received = '\r\n'.join(client.response) + '\r\n'
    self.assertEqual(wanted, received)
    reason = failure.Failure(Exception("Test harness disconnect"))
    dummy.connectionLost(reason)
417 class TestServerFactory:
418 implements(pop3.IServerFactory)
def cap_IMPLEMENTATION(self):
    """
    Return the fixed implementation string this test factory reports.
    """
    implementation = "Test Implementation String"
    return implementation
423 def cap_EXPIRE(self):
426 challengers = {"SCHEME_1": None, "SCHEME_2": None}
428 def cap_LOGIN_DELAY(self):
432 def perUserExpiration(self):
436 def perUserLoginDelay(self):
442 messageExpiration = 25
445 class CapabilityTestCase(unittest.TestCase):
447 s = StringIO.StringIO()
449 p.factory = TestServerFactory()
450 p.transport = internet.protocol.FileWrapper(s)
454 self.caps = p.listCapabilities()
455 self.pcaps = s.getvalue().splitlines()
457 s = StringIO.StringIO()
458 p.mbox = TestMailbox()
459 p.transport = internet.protocol.FileWrapper(s)
462 self.lpcaps = s.getvalue().splitlines()
463 p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
465 def contained(self, s, *caps):
470 self.contained("UIDL", self.caps, self.pcaps, self.lpcaps)
473 self.contained("TOP", self.caps, self.pcaps, self.lpcaps)
476 self.contained("USER", self.caps, self.pcaps, self.lpcaps)
478 def testEXPIRE(self):
479 self.contained("EXPIRE 60 USER", self.caps, self.pcaps)
480 self.contained("EXPIRE 25", self.lpcaps)
482 def testIMPLEMENTATION(self):
484 "IMPLEMENTATION Test Implementation String",
485 self.caps, self.pcaps, self.lpcaps
490 "SASL SCHEME_1 SCHEME_2",
491 self.caps, self.pcaps, self.lpcaps
494 def testLOGIN_DELAY(self):
495 self.contained("LOGIN-DELAY 120 USER", self.caps, self.pcaps)
496 self.assertIn("LOGIN-DELAY 100", self.lpcaps)
500 class GlobalCapabilitiesTestCase(unittest.TestCase):
502 s = StringIO.StringIO()
504 p.factory = TestServerFactory()
505 p.factory.pue = p.factory.puld = False
506 p.transport = internet.protocol.FileWrapper(s)
510 self.caps = p.listCapabilities()
511 self.pcaps = s.getvalue().splitlines()
513 s = StringIO.StringIO()
514 p.mbox = TestMailbox()
515 p.transport = internet.protocol.FileWrapper(s)
518 self.lpcaps = s.getvalue().splitlines()
519 p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
521 def contained(self, s, *caps):
525 def testEXPIRE(self):
526 self.contained("EXPIRE 60", self.caps, self.pcaps, self.lpcaps)
528 def testLOGIN_DELAY(self):
529 self.contained("LOGIN-DELAY 120", self.caps, self.pcaps, self.lpcaps)
534 def requestAvatar(self, avatarId, mind, *interfaces):
535 if avatarId == 'testuser':
536 return pop3.IMailbox, DummyMailbox(ValueError), lambda: None
541 class SASLTestCase(unittest.TestCase):
542 def testValidLogin(self):
544 p.factory = TestServerFactory()
545 p.factory.challengers = {'CRAM-MD5': cred.credentials.CramMD5Credentials}
546 p.portal = cred.portal.Portal(TestRealm())
547 ch = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
548 ch.addUser('testuser', 'testpassword')
549 p.portal.registerChecker(ch)
551 s = StringIO.StringIO()
552 p.transport = internet.protocol.FileWrapper(s)
555 p.lineReceived("CAPA")
556 self.failUnless(s.getvalue().find("SASL CRAM-MD5") >= 0)
558 p.lineReceived("AUTH CRAM-MD5")
559 chal = s.getvalue().splitlines()[-1][2:]
560 chal = base64.decodestring(chal)
561 response = hmac.HMAC('testpassword', chal).hexdigest()
563 p.lineReceived(base64.encodestring('testuser ' + response).rstrip('\n'))
564 self.failUnless(p.mbox)
565 self.failUnless(s.getvalue().splitlines()[-1].find("+OK") >= 0)
566 p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
572 Tests for all the commands a POP3 server is allowed to receive.
579 More message text for you.
585 Make a POP3 server protocol instance hooked up to a simple mailbox and
586 a transport that buffers output to a StringIO.
589 p.mbox = self.mailboxType(self.exceptionType)
593 s = StringIO.StringIO()
594 p.transport = internet.protocol.FileWrapper(s)
597 self.pop3Transport = s
602 Disconnect the server protocol so it can clean up anything it might
605 self.pop3Server.connectionLost(failure.Failure(Exception("Test harness disconnect")))
610 Do some of the things that the reactor would take care of, if the
611 reactor were actually running.
613 # Oh man FileWrapper is pooh.
614 self.pop3Server.transport._checkProducer()
619 Test the two forms of list: with a message index number, which should
620 return a short-form response, and without a message index number, which
621 should return a long-form response, one line per message.
624 s = self.pop3Transport
626 p.lineReceived("LIST 1")
628 self.assertEqual(s.getvalue(), "+OK 1 44\r\n")
631 p.lineReceived("LIST")
633 self.assertEqual(s.getvalue(), "+OK 1\r\n1 44\r\n.\r\n")
636 def testLISTWithBadArgument(self):
638 Test that non-integers and out-of-bound integers produce appropriate
642 s = self.pop3Transport
644 p.lineReceived("LIST a")
647 "-ERR Invalid message-number: 'a'\r\n")
650 p.lineReceived("LIST 0")
653 "-ERR Invalid message-number: 0\r\n")
656 p.lineReceived("LIST 2")
659 "-ERR Invalid message-number: 2\r\n")
665 Test the two forms of the UIDL command. These are just like the two
666 forms of the LIST command.
669 s = self.pop3Transport
671 p.lineReceived("UIDL 1")
672 self.assertEqual(s.getvalue(), "+OK 0\r\n")
675 p.lineReceived("UIDL")
677 self.assertEqual(s.getvalue(), "+OK \r\n1 0\r\n.\r\n")
680 def testUIDLWithBadArgument(self):
682 Test that UIDL with a non-integer or an out-of-bounds integer produces
683 the appropriate error response.
686 s = self.pop3Transport
688 p.lineReceived("UIDL a")
691 "-ERR Bad message number argument\r\n")
694 p.lineReceived("UIDL 0")
697 "-ERR Bad message number argument\r\n")
700 p.lineReceived("UIDL 2")
703 "-ERR Bad message number argument\r\n")
709 Test the single form of the STAT command, which returns a short-form
710 response of the number of messages in the mailbox and their total size.
713 s = self.pop3Transport
715 p.lineReceived("STAT")
717 self.assertEqual(s.getvalue(), "+OK 1 44\r\n")
722 Test downloading a message.
725 s = self.pop3Transport
727 p.lineReceived("RETR 1")
735 "How are you, friend?\r\n"
740 def testRETRWithBadArgument(self):
742 Test that trying to download a message with a bad argument, either not
743 an integer or an out-of-bounds integer, fails with the appropriate
747 s = self.pop3Transport
749 p.lineReceived("RETR a")
752 "-ERR Bad message number argument\r\n")
755 p.lineReceived("RETR 0")
758 "-ERR Bad message number argument\r\n")
761 p.lineReceived("RETR 2")
764 "-ERR Bad message number argument\r\n")
770 Test downloading the headers and part of the body of a message.
773 s = self.pop3Transport
774 p.mbox.messages.append(self.extraMessage)
776 p.lineReceived("TOP 1 0")
780 "+OK Top of message follows\r\n"
787 def testTOPWithBadArgument(self):
789 Test that trying to download a message with a bad argument, either a
790 message number which isn't an integer or is an out-of-bounds integer or
791 a number of lines which isn't an integer or is a negative integer,
792 fails with the appropriate error response.
795 s = self.pop3Transport
796 p.mbox.messages.append(self.extraMessage)
798 p.lineReceived("TOP 1 a")
801 "-ERR Bad line count argument\r\n")
804 p.lineReceived("TOP 1 -1")
807 "-ERR Bad line count argument\r\n")
810 p.lineReceived("TOP a 1")
813 "-ERR Bad message number argument\r\n")
816 p.lineReceived("TOP 0 1")
819 "-ERR Bad message number argument\r\n")
822 p.lineReceived("TOP 3 1")
825 "-ERR Bad message number argument\r\n")
831 Test the exceedingly pointless LAST command, which tells you the
832 highest message index which you have already downloaded.
835 s = self.pop3Transport
836 p.mbox.messages.append(self.extraMessage)
838 p.lineReceived('LAST')
845 def testRetrieveUpdatesHighest(self):
847 Test that issuing a RETR command updates the LAST response.
850 s = self.pop3Transport
851 p.mbox.messages.append(self.extraMessage)
853 p.lineReceived('RETR 2')
856 p.lineReceived('LAST')
863 def testTopUpdatesHighest(self):
865 Test that issuing a TOP command updates the LAST response.
868 s = self.pop3Transport
869 p.mbox.messages.append(self.extraMessage)
871 p.lineReceived('TOP 2 10')
874 p.lineReceived('LAST')
880 def testHighestOnlyProgresses(self):
882 Test that downloading a message with a smaller index than the current
883 LAST response doesn't change the LAST response.
886 s = self.pop3Transport
887 p.mbox.messages.append(self.extraMessage)
889 p.lineReceived('RETR 2')
891 p.lineReceived('TOP 1 10')
894 p.lineReceived('LAST')
900 def testResetClearsHighest(self):
902 Test that issuing RSET changes the LAST response to 0.
905 s = self.pop3Transport
906 p.mbox.messages.append(self.extraMessage)
908 p.lineReceived('RETR 2')
910 p.lineReceived('RSET')
912 p.lineReceived('LAST')
# Warning text about IMailbox.listMessages raising IndexError; used as the
# message of the suppression built immediately below.
_listMessageDeprecation = (
    "twisted.mail.pop3.IMailbox.listMessages may not "
    "raise IndexError for out-of-bounds message numbers: "
    "raise ValueError instead.")
# Suppression entry for the message above, restricted to the
# PendingDeprecationWarning category.
_listMessageSuppression = util.suppress(
    message=_listMessageDeprecation,
    category=PendingDeprecationWarning)

# Warning text about IMailbox.getUidl raising IndexError; used as the message
# of the suppression built immediately below.
_getUidlDeprecation = (
    "twisted.mail.pop3.IMailbox.getUidl may not "
    "raise IndexError for out-of-bounds message numbers: "
    "raise ValueError instead.")
# Suppression entry for the message above, restricted to the
# PendingDeprecationWarning category.
_getUidlSuppression = util.suppress(
    message=_getUidlDeprecation,
    category=PendingDeprecationWarning)
class IndexErrorCommandTestCase(CommandMixin, unittest.TestCase):
    """
    Run all of the command tests against a mailbox which raises IndexError
    when an out of bounds request is made.  This behavior will be deprecated
    shortly and then removed.

    The four C{*WithBadArgument} tests are re-declared here only to delegate
    to L{CommandMixin} and to carry a C{suppress} attribute listing a
    C{PendingDeprecationWarning} suppression.
    """
    exceptionType = IndexError
    mailboxType = DummyMailbox

    def testLISTWithBadArgument(self):
        return CommandMixin.testLISTWithBadArgument(self)
    # Attached to this override only; the mixin's method is left untouched.
    testLISTWithBadArgument.suppress = [_listMessageSuppression]


    def testUIDLWithBadArgument(self):
        return CommandMixin.testUIDLWithBadArgument(self)
    # UIDL is the one test here that uses the getUidl suppression.
    testUIDLWithBadArgument.suppress = [_getUidlSuppression]


    def testTOPWithBadArgument(self):
        return CommandMixin.testTOPWithBadArgument(self)
    # TOP uses the listMessages suppression.
    testTOPWithBadArgument.suppress = [_listMessageSuppression]


    def testRETRWithBadArgument(self):
        return CommandMixin.testRETRWithBadArgument(self)
    # RETR uses the listMessages suppression.
    testRETRWithBadArgument.suppress = [_listMessageSuppression]
class ValueErrorCommandTestCase(CommandMixin, unittest.TestCase):
    """
    Run all of the command tests against a mailbox which raises ValueError
    when an out of bounds request is made.  This is the correct behavior and
    after support for mailboxes which raise IndexError is removed, this will
    become just C{CommandTestCase}.
    """
    # Exception class the mailbox under test is configured with.
    exceptionType = ValueError
    # Mailbox class used by the tests inherited from CommandMixin.
    mailboxType = DummyMailbox
class SyncDeferredMailbox(DummyMailbox):
    """
    A L{DummyMailbox} whose C{listMessages} wraps the normal result in a
    Deferred that has already fired by the time it is returned.
    """
    def listMessages(self, n=None):
        # Compute the plain answer first; an exception raised here
        # propagates directly instead of becoming a failed Deferred.
        sizes = DummyMailbox.listMessages(self, n)
        return defer.succeed(sizes)
class IndexErrorSyncDeferredCommandTestCase(IndexErrorCommandTestCase):
    """
    Run all of the L{IndexErrorCommandTestCase} tests with a
    synchronous-Deferred returning IMailbox implementation.
    """
    # Only the mailbox class differs from the base test case.
    mailboxType = SyncDeferredMailbox
class ValueErrorSyncDeferredCommandTestCase(ValueErrorCommandTestCase):
    """
    Run all of the L{ValueErrorCommandTestCase} tests with a
    synchronous-Deferred returning IMailbox implementation.
    """
    # Only the mailbox class differs from the base test case.
    mailboxType = SyncDeferredMailbox
1005 class AsyncDeferredMailbox(DummyMailbox):
1007 Mailbox which has a listMessages implementation which returns a Deferred
1008 which has not yet fired.
1010 def __init__(self, *a, **kw):
1012 DummyMailbox.__init__(self, *a, **kw)
1015 def listMessages(self, n=None):
1016 d = defer.Deferred()
1017 # See AsyncDeferredMailbox._flush
1018 self.waiting.append((d, DummyMailbox.listMessages(self, n)))
1023 class IndexErrorAsyncDeferredCommandTestCase(IndexErrorCommandTestCase):
1025 Run all of the L{IndexErrorCommandTestCase} tests with an asynchronous-Deferred
1026 returning IMailbox implementation.
1028 mailboxType = AsyncDeferredMailbox
1032 Fire whatever Deferreds we've built up in our mailbox.
1034 while self.pop3Server.mbox.waiting:
1035 d, a = self.pop3Server.mbox.waiting.pop()
1037 IndexErrorCommandTestCase._flush(self)
1041 class ValueErrorAsyncDeferredCommandTestCase(ValueErrorCommandTestCase):
1043 Run all of the L{ValueErrorCommandTestCase} tests with an asynchronous-Deferred
1044 returning IMailbox implementation.
1046 mailboxType = AsyncDeferredMailbox
1050 Fire whatever Deferreds we've built up in our mailbox.
1052 while self.pop3Server.mbox.waiting:
1053 d, a = self.pop3Server.mbox.waiting.pop()
1055 ValueErrorCommandTestCase._flush(self)
1057 class POP3MiscTestCase(unittest.TestCase):
1059 Miscellaneous tests more to do with module/package structure than
1060 anything to do with the Post Office Protocol.
1064 This test checks that all names listed in
1065 twisted.mail.pop3.__all__ are actually present in the module.
1067 mod = twisted.mail.pop3
1068 for attr in mod.__all__:
1069 self.failUnless(hasattr(mod, attr))