Imported Upstream version 12.1.0
[contrib/python-twisted.git] / twisted / mail / test / pop3testserver.py
1 #!/usr/bin/env python
2 # -*- test-case-name: twisted.mail.test.test_pop3client -*-
3
4 # Copyright (c) Twisted Matrix Laboratories.
5 # See LICENSE for details.
6
7 from twisted.internet.protocol import Factory
8 from twisted.protocols import basic
9 from twisted.internet import reactor
10 import sys, time
11
12 USER = "test"
13 PASS = "twisted"
14
15 PORT = 1100
16
17 SSL_SUPPORT = True
18 UIDL_SUPPORT = True
19 INVALID_SERVER_RESPONSE = False
20 INVALID_CAPABILITY_RESPONSE = False
21 INVALID_LOGIN_RESPONSE = False
22 DENY_CONNECTION = False
23 DROP_CONNECTION = False
24 BAD_TLS_RESPONSE = False
25 TIMEOUT_RESPONSE = False
26 TIMEOUT_DEFERRED = False
27 SLOW_GREETING = False
28
29 """Commands"""
30 CONNECTION_MADE = "+OK POP3 localhost v2003.83 server ready"
31
32 CAPABILITIES = [
33 "TOP",
34 "LOGIN-DELAY 180",
35 "USER",
36 "SASL LOGIN"
37 ]
38
39 CAPABILITIES_SSL = "STLS"
40 CAPABILITIES_UIDL = "UIDL"
41
42  
43 INVALID_RESPONSE = "-ERR Unknown request"
44 VALID_RESPONSE = "+OK Command Completed"
45 AUTH_DECLINED = "-ERR LOGIN failed"
46 AUTH_ACCEPTED = "+OK Mailbox open, 0 messages"
47 TLS_ERROR = "-ERR server side error start TLS handshake"
48 LOGOUT_COMPLETE = "+OK quit completed"
49 NOT_LOGGED_IN = "-ERR Unknown AUHORIZATION state command"
50 STAT = "+OK 0 0"
51 UIDL = "+OK Unique-ID listing follows\r\n."
52 LIST = "+OK Mailbox scan listing follows\r\n."
53 CAP_START = "+OK Capability list follows:"
54
55
56 class POP3TestServer(basic.LineReceiver):
57     def __init__(self, contextFactory = None):
58         self.loggedIn = False
59         self.caps = None
60         self.tmpUser = None
61         self.ctx = contextFactory
62
63     def sendSTATResp(self, req):
64         self.sendLine(STAT)
65
66     def sendUIDLResp(self, req):
67         self.sendLine(UIDL)
68
69     def sendLISTResp(self, req):
70         self.sendLine(LIST)
71
72     def sendCapabilities(self):
73         if self.caps is None:
74             self.caps = [CAP_START]
75
76         if UIDL_SUPPORT:
77             self.caps.append(CAPABILITIES_UIDL)
78
79         if SSL_SUPPORT:
80             self.caps.append(CAPABILITIES_SSL)
81
82         for cap in CAPABILITIES:
83             self.caps.append(cap)
84         resp = '\r\n'.join(self.caps)
85         resp += "\r\n."
86
87         self.sendLine(resp)
88
89
90     def connectionMade(self):
91         if DENY_CONNECTION:
92             self.disconnect()
93             return
94
95         if SLOW_GREETING:
96             reactor.callLater(20, self.sendGreeting)
97
98         else:
99             self.sendGreeting()
100
101     def sendGreeting(self):
102         self.sendLine(CONNECTION_MADE)
103
104     def lineReceived(self, line):
105         """Error Conditions"""
106
107         uline = line.upper()
108         find = lambda s: uline.find(s) != -1
109
110         if TIMEOUT_RESPONSE:
111             # Do not respond to clients request
112             return
113
114         if DROP_CONNECTION:
115             self.disconnect()
116             return
117
118         elif find("CAPA"):
119             if INVALID_CAPABILITY_RESPONSE:
120                 self.sendLine(INVALID_RESPONSE)
121             else:
122                 self.sendCapabilities()
123
124         elif find("STLS") and SSL_SUPPORT:
125             self.startTLS()
126
127         elif find("USER"):
128             if INVALID_LOGIN_RESPONSE:
129                 self.sendLine(INVALID_RESPONSE)
130                 return
131
132             resp = None
133             try:
134                 self.tmpUser = line.split(" ")[1]
135                 resp = VALID_RESPONSE
136             except:
137                 resp = AUTH_DECLINED
138
139             self.sendLine(resp)
140
141         elif find("PASS"):
142             resp = None
143             try:
144                 pwd = line.split(" ")[1]
145
146                 if self.tmpUser is None or pwd is None:
147                     resp = AUTH_DECLINED
148                 elif self.tmpUser == USER and pwd == PASS:
149                     resp = AUTH_ACCEPTED
150                     self.loggedIn = True
151                 else:
152                     resp = AUTH_DECLINED
153             except:
154                 resp = AUTH_DECLINED
155
156             self.sendLine(resp)
157
158         elif find("QUIT"):
159             self.loggedIn = False
160             self.sendLine(LOGOUT_COMPLETE)
161             self.disconnect()
162
163         elif INVALID_SERVER_RESPONSE:
164             self.sendLine(INVALID_RESPONSE)
165
166         elif not self.loggedIn:
167             self.sendLine(NOT_LOGGED_IN)
168
169         elif find("NOOP"):
170             self.sendLine(VALID_RESPONSE)
171
172         elif find("STAT"):
173             if TIMEOUT_DEFERRED:
174                 return
175             self.sendLine(STAT)
176
177         elif find("LIST"):
178             if TIMEOUT_DEFERRED:
179                 return
180             self.sendLine(LIST)
181
182         elif find("UIDL"):
183             if TIMEOUT_DEFERRED:
184                 return
185             elif not UIDL_SUPPORT:
186                 self.sendLine(INVALID_RESPONSE)
187                 return
188
189             self.sendLine(UIDL)
190
191     def startTLS(self):
192         if self.ctx is None:
193             self.getContext()
194
195         if SSL_SUPPORT and self.ctx is not None:
196             self.sendLine('+OK Begin TLS negotiation now')
197             self.transport.startTLS(self.ctx)
198         else:
199             self.sendLine('-ERR TLS not available')
200
201     def disconnect(self):
202         self.transport.loseConnection()
203
204     def getContext(self):
205         try:
206             from twisted.internet import ssl
207         except ImportError:
208            self.ctx = None
209         else:
210             self.ctx = ssl.ClientContextFactory()
211             self.ctx.method = ssl.SSL.TLSv1_METHOD
212
213
214 usage = """popServer.py [arg] (default is Standard POP Server with no messages)
215 no_ssl  - Start with no SSL support
216 no_uidl - Start with no UIDL support
217 bad_resp - Send a non-RFC compliant response to the Client
218 bad_cap_resp - send a non-RFC compliant response when the Client sends a 'CAPABILITY' request
219 bad_login_resp - send a non-RFC compliant response when the Client sends a 'LOGIN' request
220 deny - Deny the connection
221 drop - Drop the connection after sending the greeting
222 bad_tls - Send a bad response to a STARTTLS
223 timeout - Do not return a response to a Client request
224 to_deferred - Do not return a response on a 'Select' request. This
225               will test Deferred callback handling
226 slow - Wait 20 seconds after the connection is made to return a Server Greeting
227 """
228
229 def printMessage(msg):
230     print "Server Starting in %s mode" % msg
231
232 def processArg(arg):
233
234     if arg.lower() == 'no_ssl':
235         global SSL_SUPPORT
236         SSL_SUPPORT = False
237         printMessage("NON-SSL")
238
239     elif arg.lower() == 'no_uidl':
240         global UIDL_SUPPORT
241         UIDL_SUPPORT = False
242         printMessage("NON-UIDL")
243
244     elif arg.lower() == 'bad_resp':
245         global INVALID_SERVER_RESPONSE
246         INVALID_SERVER_RESPONSE = True
247         printMessage("Invalid Server Response")
248
249     elif arg.lower() == 'bad_cap_resp':
250         global INVALID_CAPABILITY_RESPONSE
251         INVALID_CAPABILITY_RESPONSE = True
252         printMessage("Invalid Capability Response")
253
254     elif arg.lower() == 'bad_login_resp':
255         global INVALID_LOGIN_RESPONSE
256         INVALID_LOGIN_RESPONSE = True
257         printMessage("Invalid Capability Response")
258
259     elif arg.lower() == 'deny':
260         global DENY_CONNECTION
261         DENY_CONNECTION = True
262         printMessage("Deny Connection")
263
264     elif arg.lower() == 'drop':
265         global DROP_CONNECTION
266         DROP_CONNECTION = True
267         printMessage("Drop Connection")
268
269
270     elif arg.lower() == 'bad_tls':
271         global BAD_TLS_RESPONSE
272         BAD_TLS_RESPONSE = True
273         printMessage("Bad TLS Response")
274
275     elif arg.lower() == 'timeout':
276         global TIMEOUT_RESPONSE
277         TIMEOUT_RESPONSE = True
278         printMessage("Timeout Response")
279
280     elif arg.lower() == 'to_deferred':
281         global TIMEOUT_DEFERRED
282         TIMEOUT_DEFERRED = True
283         printMessage("Timeout Deferred Response")
284
285     elif arg.lower() == 'slow':
286         global SLOW_GREETING
287         SLOW_GREETING = True
288         printMessage("Slow Greeting")
289
290     elif arg.lower() == '--help':
291         print usage
292         sys.exit()
293
294     else:
295         print usage
296         sys.exit()
297
298 def main():
299
300     if len(sys.argv) < 2:
301         printMessage("POP3 with no messages")
302     else:
303         args = sys.argv[1:]
304
305         for arg in args:
306             processArg(arg)
307
308     f = Factory()
309     f.protocol = POP3TestServer
310     reactor.listenTCP(PORT, f)
311     reactor.run()
312
313 if __name__ == '__main__':
314     main()