Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / mail / relaymanager.py
1 # -*- test-case-name: twisted.mail.test.test_mail -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Infrastructure for relaying mail through smart host
7
8 Today, internet e-mail has stopped being Peer-to-peer for many problems,
9 spam (unsolicited bulk mail) among them. Instead, most nodes on the
10 internet send all e-mail to a single computer, usually the ISP's though
11 sometimes other schemes, such as SMTP-after-POP, are used. This computer
12 is supposedly permanently up and traceable, and will do the work of
13 figuring out MXs and connecting to them. This kind of configuration
14 is usually termed "smart host", since the host we are connecting to
15 is "smart" (and will find MXs and connect to them) rather then just
16 accepting mail for a small set of domains.
17
18 The classes here are meant to facilitate support for such a configuration
19 for the twisted.mail SMTP server
20 """
21
22 import rfc822
23 import os
24 import time
25
26 try:
27     import cPickle as pickle
28 except ImportError:
29     import pickle
30
31 from twisted.python import log
32 from twisted.python.failure import Failure
33 from twisted.python.compat import set
34 from twisted.mail import relay
35 from twisted.mail import bounce
36 from twisted.internet import protocol
37 from twisted.internet.defer import Deferred, DeferredList
38 from twisted.internet.error import DNSLookupError
39 from twisted.mail import smtp
40 from twisted.application import internet
41
42 class ManagedRelayerMixin:
43     """SMTP Relayer which notifies a manager
44
45     Notify the manager about successful mail, failed mail
46     and broken connections
47     """
48
49     def __init__(self, manager):
50         self.manager = manager
51
52     def sentMail(self, code, resp, numOk, addresses, log):
53         """called when e-mail has been sent
54
55         we will always get 0 or 1 addresses.
56         """
57         message = self.names[0]
58         if code in smtp.SUCCESS:
59             self.manager.notifySuccess(self.factory, message)
60         else:
61             self.manager.notifyFailure(self.factory, message)
62         del self.messages[0]
63         del self.names[0]
64
65     def connectionLost(self, reason):
66         """called when connection is broken
67
68         notify manager we will try to send no more e-mail
69         """
70         self.manager.notifyDone(self.factory)
71
72 class SMTPManagedRelayer(ManagedRelayerMixin, relay.SMTPRelayer):
73     def __init__(self, messages, manager, *args, **kw):
74         """
75         @type messages: C{list} of C{str}
76         @param messages: Filenames of messages to relay
77
78         manager should support .notifySuccess, .notifyFailure
79         and .notifyDone
80         """
81         ManagedRelayerMixin.__init__(self, manager)
82         relay.SMTPRelayer.__init__(self, messages, *args, **kw)
83
84 class ESMTPManagedRelayer(ManagedRelayerMixin, relay.ESMTPRelayer):
85     def __init__(self, messages, manager, *args, **kw):
86         """
87         @type messages: C{list} of C{str}
88         @param messages: Filenames of messages to relay
89
90         manager should support .notifySuccess, .notifyFailure
91         and .notifyDone
92         """
93         ManagedRelayerMixin.__init__(self, manager)
94         relay.ESMTPRelayer.__init__(self, messages, *args, **kw)
95
96 class SMTPManagedRelayerFactory(protocol.ClientFactory):
97     protocol = SMTPManagedRelayer
98
99     def __init__(self, messages, manager, *args, **kw):
100         self.messages = messages
101         self.manager = manager
102         self.pArgs = args
103         self.pKwArgs = kw
104
105     def buildProtocol(self, addr):
106         protocol = self.protocol(self.messages, self.manager, *self.pArgs,
107             **self.pKwArgs)
108         protocol.factory = self
109         return protocol
110
111     def clientConnectionFailed(self, connector, reason):
112         """called when connection could not be made
113
114         our manager should be notified that this happened,
115         it might prefer some other host in that case"""
116         self.manager.notifyNoConnection(self)
117         self.manager.notifyDone(self)
118
119 class ESMTPManagedRelayerFactory(SMTPManagedRelayerFactory):
120     protocol = ESMTPManagedRelayer
121
122     def __init__(self, messages, manager, secret, contextFactory, *args, **kw):
123         self.secret = secret
124         self.contextFactory = contextFactory
125         SMTPManagedRelayerFactory.__init__(self, messages, manager, *args, **kw)
126
127     def buildProtocol(self, addr):
128         s = self.secret and self.secret(addr)
129         protocol = self.protocol(self.messages, self.manager, s,
130             self.contextFactory, *self.pArgs, **self.pKwArgs)
131         protocol.factory = self
132         return protocol
133
134 class Queue:
135     """A queue of ougoing emails."""
136
137     noisy = True
138
139     def __init__(self, directory):
140         self.directory = directory
141         self._init()
142
143     def _init(self):
144         self.n = 0
145         self.waiting = {}
146         self.relayed = {}
147         self.readDirectory()
148
149     def __getstate__(self):
150         """(internal) delete volatile state"""
151         return {'directory' : self.directory}
152
153     def __setstate__(self, state):
154         """(internal) restore volatile state"""
155         self.__dict__.update(state)
156         self._init()
157
158     def readDirectory(self):
159         """Read the messages directory.
160
161         look for new messages.
162         """
163         for message in os.listdir(self.directory):
164             # Skip non data files
165             if message[-2:]!='-D':
166                 continue
167             self.addMessage(message[:-2])
168
169     def getWaiting(self):
170         return self.waiting.keys()
171
172     def hasWaiting(self):
173         return len(self.waiting) > 0
174
175     def getRelayed(self):
176         return self.relayed.keys()
177
178     def setRelaying(self, message):
179         del self.waiting[message]
180         self.relayed[message] = 1
181
182     def setWaiting(self, message):
183         del self.relayed[message]
184         self.waiting[message] = 1
185
186     def addMessage(self, message):
187         if message not in self.relayed:
188             self.waiting[message] = 1
189             if self.noisy:
190                 log.msg('Set ' + message + ' waiting')
191
192     def done(self, message):
193         """Remove message to from queue."""
194         message = os.path.basename(message)
195         os.remove(self.getPath(message) + '-D')
196         os.remove(self.getPath(message) + '-H')
197         del self.relayed[message]
198
199     def getPath(self, message):
200         """Get the path in the filesystem of a message."""
201         return os.path.join(self.directory, message)
202
203     def getEnvelope(self, message):
204         return pickle.load(self.getEnvelopeFile(message))
205
206     def getEnvelopeFile(self, message):
207         return open(os.path.join(self.directory, message+'-H'), 'rb')
208
209     def createNewMessage(self):
210         """Create a new message in the queue.
211
212         Return a tuple - file-like object for headers, and ISMTPMessage.
213         """
214         fname = "%s_%s_%s_%s" % (os.getpid(), time.time(), self.n, id(self))
215         self.n = self.n + 1
216         headerFile = open(os.path.join(self.directory, fname+'-H'), 'wb')
217         tempFilename = os.path.join(self.directory, fname+'-C')
218         finalFilename = os.path.join(self.directory, fname+'-D')
219         messageFile = open(tempFilename, 'wb')
220
221         from twisted.mail.mail import FileMessage
222         return headerFile,FileMessage(messageFile, tempFilename, finalFilename)
223
224
225 class _AttemptManager(object):
226     """
227     Manage the state of a single attempt to flush the relay queue.
228     """
229     def __init__(self, manager):
230         self.manager = manager
231         self._completionDeferreds = []
232
233
234     def getCompletionDeferred(self):
235         self._completionDeferreds.append(Deferred())
236         return self._completionDeferreds[-1]
237
238
239     def _finish(self, relay, message):
240         self.manager.managed[relay].remove(os.path.basename(message))
241         self.manager.queue.done(message)
242
243
244     def notifySuccess(self, relay, message):
245         """a relay sent a message successfully
246
247         Mark it as sent in our lists
248         """
249         if self.manager.queue.noisy:
250             log.msg("success sending %s, removing from queue" % message)
251         self._finish(relay, message)
252
253
254     def notifyFailure(self, relay, message):
255         """Relaying the message has failed."""
256         if self.manager.queue.noisy:
257             log.msg("could not relay "+message)
258         # Moshe - Bounce E-mail here
259         # Be careful: if it's a bounced bounce, silently
260         # discard it
261         message = os.path.basename(message)
262         fp = self.manager.queue.getEnvelopeFile(message)
263         from_, to = pickle.load(fp)
264         fp.close()
265         from_, to, bounceMessage = bounce.generateBounce(open(self.manager.queue.getPath(message)+'-D'), from_, to)
266         fp, outgoingMessage = self.manager.queue.createNewMessage()
267         pickle.dump([from_, to], fp)
268         fp.close()
269         for line in bounceMessage.splitlines():
270              outgoingMessage.lineReceived(line)
271         outgoingMessage.eomReceived()
272         self._finish(relay, self.manager.queue.getPath(message))
273
274
275     def notifyDone(self, relay):
276         """A relaying SMTP client is disconnected.
277
278         unmark all pending messages under this relay's responsibility
279         as being relayed, and remove the relay.
280         """
281         for message in self.manager.managed.get(relay, ()):
282             if self.manager.queue.noisy:
283                 log.msg("Setting " + message + " waiting")
284             self.manager.queue.setWaiting(message)
285         try:
286             del self.manager.managed[relay]
287         except KeyError:
288             pass
289         notifications = self._completionDeferreds
290         self._completionDeferreds = None
291         for d in notifications:
292             d.callback(None)
293
294
295     def notifyNoConnection(self, relay):
296         """Relaying SMTP client couldn't connect.
297
298         Useful because it tells us our upstream server is unavailable.
299         """
300         # Back off a bit
301         try:
302             msgs = self.manager.managed[relay]
303         except KeyError:
304             log.msg("notifyNoConnection passed unknown relay!")
305             return
306
307         if self.manager.queue.noisy:
308             log.msg("Backing off on delivery of " + str(msgs))
309         def setWaiting(queue, messages):
310             map(queue.setWaiting, messages)
311         from twisted.internet import reactor
312         reactor.callLater(30, setWaiting, self.manager.queue, msgs)
313         del self.manager.managed[relay]
314
315
316
317 class SmartHostSMTPRelayingManager:
318     """Manage SMTP Relayers
319
320     Manage SMTP relayers, keeping track of the existing connections,
321     each connection's responsibility in term of messages. Create
322     more relayers if the need arises.
323
324     Someone should press .checkState periodically
325
326     @ivar fArgs: Additional positional arguments used to instantiate
327     C{factory}.
328
329     @ivar fKwArgs: Additional keyword arguments used to instantiate
330     C{factory}.
331
332     @ivar factory: A callable which returns a ClientFactory suitable for
333     making SMTP connections.
334     """
335
336     factory = SMTPManagedRelayerFactory
337
338     PORT = 25
339
340     mxcalc = None
341
342     def __init__(self, queue, maxConnections=2, maxMessagesPerConnection=10):
343         """
344         @type queue: Any implementor of C{IQueue}
345         @param queue: The object used to queue messages on their way to
346         delivery.
347
348         @type maxConnections: C{int}
349         @param maxConnections: The maximum number of SMTP connections to
350         allow to be opened at any given time.
351
352         @type maxMessagesPerConnection: C{int}
353         @param maxMessagesPerConnection: The maximum number of messages a
354         relayer will be given responsibility for.
355
356         Default values are meant for a small box with 1-5 users.
357         """
358         self.maxConnections = maxConnections
359         self.maxMessagesPerConnection = maxMessagesPerConnection
360         self.managed = {} # SMTP clients we're managing
361         self.queue = queue
362         self.fArgs = ()
363         self.fKwArgs = {}
364
365     def __getstate__(self):
366         """(internal) delete volatile state"""
367         dct = self.__dict__.copy()
368         del dct['managed']
369         return dct
370
371     def __setstate__(self, state):
372         """(internal) restore volatile state"""
373         self.__dict__.update(state)
374         self.managed = {}
375
376     def checkState(self):
377         """
378         Synchronize with the state of the world, and maybe launch a new
379         relay.
380
381         Call me periodically to check I am still up to date.
382
383         @return: None or a Deferred which fires when all of the SMTP clients
384         started by this call have disconnected.
385         """
386         self.queue.readDirectory()
387         if (len(self.managed) >= self.maxConnections):
388             return
389         if  not self.queue.hasWaiting():
390             return
391
392         return self._checkStateMX()
393
394     def _checkStateMX(self):
395         nextMessages = self.queue.getWaiting()
396         nextMessages.reverse()
397
398         exchanges = {}
399         for msg in nextMessages:
400             from_, to = self.queue.getEnvelope(msg)
401             name, addr = rfc822.parseaddr(to)
402             parts = addr.split('@', 1)
403             if len(parts) != 2:
404                 log.err("Illegal message destination: " + to)
405                 continue
406             domain = parts[1]
407
408             self.queue.setRelaying(msg)
409             exchanges.setdefault(domain, []).append(self.queue.getPath(msg))
410             if len(exchanges) >= (self.maxConnections - len(self.managed)):
411                 break
412
413         if self.mxcalc is None:
414             self.mxcalc = MXCalculator()
415
416         relays = []
417         for (domain, msgs) in exchanges.iteritems():
418             manager = _AttemptManager(self)
419             factory = self.factory(msgs, manager, *self.fArgs, **self.fKwArgs)
420             self.managed[factory] = map(os.path.basename, msgs)
421             relayAttemptDeferred = manager.getCompletionDeferred()
422             connectSetupDeferred = self.mxcalc.getMX(domain)
423             connectSetupDeferred.addCallback(lambda mx: str(mx.name))
424             connectSetupDeferred.addCallback(self._cbExchange, self.PORT, factory)
425             connectSetupDeferred.addErrback(lambda err: (relayAttemptDeferred.errback(err), err)[1])
426             connectSetupDeferred.addErrback(self._ebExchange, factory, domain)
427             relays.append(relayAttemptDeferred)
428         return DeferredList(relays)
429
430
431     def _cbExchange(self, address, port, factory):
432         from twisted.internet import reactor
433         reactor.connectTCP(address, port, factory)
434
435     def _ebExchange(self, failure, factory, domain):
436         log.err('Error setting up managed relay factory for ' + domain)
437         log.err(failure)
438         def setWaiting(queue, messages):
439             map(queue.setWaiting, messages)
440         from twisted.internet import reactor
441         reactor.callLater(30, setWaiting, self.queue, self.managed[factory])
442         del self.managed[factory]
443
444 class SmartHostESMTPRelayingManager(SmartHostSMTPRelayingManager):
445     factory = ESMTPManagedRelayerFactory
446
447 def _checkState(manager):
448     manager.checkState()
449
450 def RelayStateHelper(manager, delay):
451     return internet.TimerService(delay, _checkState, manager)
452
453
454
455 class CanonicalNameLoop(Exception):
456     """
457     When trying to look up the MX record for a host, a set of CNAME records was
458     found which form a cycle and resolution was abandoned.
459     """
460
461
462 class CanonicalNameChainTooLong(Exception):
463     """
464     When trying to look up the MX record for a host, too many CNAME records
465     which point to other CNAME records were encountered and resolution was
466     abandoned.
467     """
468
469
470 class MXCalculator:
471     """
472     A utility for looking up mail exchange hosts and tracking whether they are
473     working or not.
474
475     @ivar clock: L{IReactorTime} provider which will be used to decide when to
476         retry mail exchanges which have not been working.
477     """
478     timeOutBadMX = 60 * 60 # One hour
479     fallbackToDomain = True
480
481     def __init__(self, resolver=None, clock=None):
482         self.badMXs = {}
483         if resolver is None:
484             from twisted.names.client import createResolver
485             resolver = createResolver()
486         self.resolver = resolver
487         if clock is None:
488             from twisted.internet import reactor as clock
489         self.clock = clock
490
491
492     def markBad(self, mx):
493         """Indicate a given mx host is not currently functioning.
494
495         @type mx: C{str}
496         @param mx: The hostname of the host which is down.
497         """
498         self.badMXs[str(mx)] = self.clock.seconds() + self.timeOutBadMX
499
500     def markGood(self, mx):
501         """Indicate a given mx host is back online.
502
503         @type mx: C{str}
504         @param mx: The hostname of the host which is up.
505         """
506         try:
507             del self.badMXs[mx]
508         except KeyError:
509             pass
510
511     def getMX(self, domain, maximumCanonicalChainLength=3):
512         """
513         Find an MX record for the given domain.
514
515         @type domain: C{str}
516         @param domain: The domain name for which to look up an MX record.
517
518         @type maximumCanonicalChainLength: C{int}
519         @param maximumCanonicalChainLength: The maximum number of unique CNAME
520             records to follow while looking up the MX record.
521
522         @return: A L{Deferred} which is called back with a string giving the
523             name in the found MX record or which is errbacked if no MX record
524             can be found.
525         """
526         mailExchangeDeferred = self.resolver.lookupMailExchange(domain)
527         mailExchangeDeferred.addCallback(self._filterRecords)
528         mailExchangeDeferred.addCallback(
529             self._cbMX, domain, maximumCanonicalChainLength)
530         mailExchangeDeferred.addErrback(self._ebMX, domain)
531         return mailExchangeDeferred
532
533
534     def _filterRecords(self, records):
535         """
536         Convert a DNS response (a three-tuple of lists of RRHeaders) into a
537         mapping from record names to lists of corresponding record payloads.
538         """
539         recordBag = {}
540         for answer in records[0]:
541             recordBag.setdefault(str(answer.name), []).append(answer.payload)
542         return recordBag
543
544
545     def _cbMX(self, answers, domain, cnamesLeft):
546         """
547         Try to find the MX host from the given DNS information.
548
549         This will attempt to resolve CNAME results.  It can recognize loops
550         and will give up on non-cyclic chains after a specified number of
551         lookups.
552         """
553         # Do this import here so that relaymanager.py doesn't depend on
554         # twisted.names, only MXCalculator will.
555         from twisted.names import dns, error
556
557         seenAliases = set()
558         exchanges = []
559         # Examine the answers for the domain we asked about
560         pertinentRecords = answers.get(domain, [])
561         while pertinentRecords:
562             record = pertinentRecords.pop()
563
564             # If it's a CNAME, we'll need to do some more processing
565             if record.TYPE == dns.CNAME:
566
567                 # Remember that this name was an alias.
568                 seenAliases.add(domain)
569
570                 canonicalName = str(record.name)
571                 # See if we have some local records which might be relevant.
572                 if canonicalName in answers:
573
574                     # Make sure it isn't a loop contained entirely within the
575                     # results we have here.
576                     if canonicalName in seenAliases:
577                         return Failure(CanonicalNameLoop(record))
578
579                     pertinentRecords = answers[canonicalName]
580                     exchanges = []
581                 else:
582                     if cnamesLeft:
583                         # Request more information from the server.
584                         return self.getMX(canonicalName, cnamesLeft - 1)
585                     else:
586                         # Give up.
587                         return Failure(CanonicalNameChainTooLong(record))
588
589             # If it's an MX, collect it.
590             if record.TYPE == dns.MX:
591                 exchanges.append((record.preference, record))
592
593         if exchanges:
594             exchanges.sort()
595             for (preference, record) in exchanges:
596                 host = str(record.name)
597                 if host not in self.badMXs:
598                     return record
599                 t = self.clock.seconds() - self.badMXs[host]
600                 if t >= 0:
601                     del self.badMXs[host]
602                     return record
603             return exchanges[0][1]
604         else:
605             # Treat no answers the same as an error - jump to the errback to try
606             # to look up an A record.  This provides behavior described as a
607             # special case in RFC 974 in the section headed I{Interpreting the
608             # List of MX RRs}.
609             return Failure(
610                 error.DNSNameError("No MX records for %r" % (domain,)))
611
612
613     def _ebMX(self, failure, domain):
614         from twisted.names import error, dns
615
616         if self.fallbackToDomain:
617             failure.trap(error.DNSNameError)
618             log.msg("MX lookup failed; attempting to use hostname (%s) directly" % (domain,))
619
620             # Alright, I admit, this is a bit icky.
621             d = self.resolver.getHostByName(domain)
622             def cbResolved(addr):
623                 return dns.Record_MX(name=addr)
624             def ebResolved(err):
625                 err.trap(error.DNSNameError)
626                 raise DNSLookupError()
627             d.addCallbacks(cbResolved, ebResolved)
628             return d
629         elif failure.check(error.DNSNameError):
630             raise IOError("No MX found for %r" % (domain,))
631         return failure