1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
5 Tests for implementations of L{IReactorUNIX} and L{IReactorUNIXDatagram}.
8 import stat, os, sys, types
11 from twisted.internet import interfaces, reactor, protocol, error, address, defer, utils
12 from twisted.python import lockfile
13 from twisted.trial import unittest
15 from twisted.test.test_tcp import MyServerFactory, MyClientFactory
18 class FailedConnectionClientFactory(protocol.ClientFactory):
19 def __init__(self, onFail):
22 def clientConnectionFailed(self, connector, reason):
23 self.onFail.errback(reason)
27 class UnixSocketTestCase(unittest.TestCase):
31 def test_peerBind(self):
33 The address passed to the server factory's C{buildProtocol} method and
34 the address returned by the connected protocol's transport's C{getPeer}
35 method match the address the client socket is bound to.
37 filename = self.mktemp()
38 peername = self.mktemp()
39 serverFactory = MyServerFactory()
40 connMade = serverFactory.protocolConnectionMade = defer.Deferred()
41 unixPort = reactor.listenUNIX(filename, serverFactory)
42 self.addCleanup(unixPort.stopListening)
43 unixSocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
44 self.addCleanup(unixSocket.close)
45 unixSocket.bind(peername)
46 unixSocket.connect(filename)
47 def cbConnMade(proto):
48 expected = address.UNIXAddress(peername)
49 self.assertEqual(serverFactory.peerAddresses, [expected])
50 self.assertEqual(proto.transport.getPeer(), expected)
51 connMade.addCallback(cbConnMade)
55 def test_dumber(self):
57 L{IReactorUNIX.connectUNIX} can be used to connect a client to a server
58 started with L{IReactorUNIX.listenUNIX}.
60 filename = self.mktemp()
61 serverFactory = MyServerFactory()
62 serverConnMade = defer.Deferred()
63 serverFactory.protocolConnectionMade = serverConnMade
64 unixPort = reactor.listenUNIX(filename, serverFactory)
65 self.addCleanup(unixPort.stopListening)
66 clientFactory = MyClientFactory()
67 clientConnMade = defer.Deferred()
68 clientFactory.protocolConnectionMade = clientConnMade
69 c = reactor.connectUNIX(filename, clientFactory)
70 d = defer.gatherResults([serverConnMade, clientConnMade])
71 def allConnected((serverProtocol, clientProtocol)):
73 # Incidental assertion which may or may not be redundant with some
74 # other test. This probably deserves its own test method.
75 self.assertEqual(clientFactory.peerAddresses,
76 [address.UNIXAddress(filename)])
78 clientProtocol.transport.loseConnection()
79 serverProtocol.transport.loseConnection()
80 d.addCallback(allConnected)
84 def test_pidFile(self):
86 A lockfile is created and locked when L{IReactorUNIX.listenUNIX} is
87 called and released when the Deferred returned by the L{IListeningPort}
88 provider's C{stopListening} method is called back.
90 filename = self.mktemp()
91 serverFactory = MyServerFactory()
92 serverConnMade = defer.Deferred()
93 serverFactory.protocolConnectionMade = serverConnMade
94 unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True)
95 self.assertTrue(lockfile.isLocked(filename + ".lock"))
97 # XXX This part would test something about the checkPID parameter, but
98 # it doesn't actually. It should be rewritten to test the several
99 # different possible behaviors. -exarkun
100 clientFactory = MyClientFactory()
101 clientConnMade = defer.Deferred()
102 clientFactory.protocolConnectionMade = clientConnMade
103 c = reactor.connectUNIX(filename, clientFactory, checkPID=1)
105 d = defer.gatherResults([serverConnMade, clientConnMade])
106 def _portStuff((serverProtocol, clientProto)):
108 # Incidental assertion which may or may not be redundant with some
109 # other test. This probably deserves its own test method.
110 self.assertEqual(clientFactory.peerAddresses,
111 [address.UNIXAddress(filename)])
113 clientProto.transport.loseConnection()
114 serverProtocol.transport.loseConnection()
115 return unixPort.stopListening()
116 d.addCallback(_portStuff)
119 self.failIf(lockfile.isLocked(filename + ".lock"), 'locked')
120 d.addCallback(_check)
124 def test_socketLocking(self):
126 L{IReactorUNIX.listenUNIX} raises L{error.CannotListenError} if passed
127 the name of a file on which a server is already listening.
129 filename = self.mktemp()
130 serverFactory = MyServerFactory()
131 unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True)
134 error.CannotListenError,
135 reactor.listenUNIX, filename, serverFactory, wantPID=True)
137 def stoppedListening(ign):
138 unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True)
139 return unixPort.stopListening()
141 return unixPort.stopListening().addCallback(stoppedListening)
144 def _uncleanSocketTest(self, callback):
145 self.filename = self.mktemp()
146 source = ("from twisted.internet import protocol, reactor\n"
147 "reactor.listenUNIX(%r, protocol.ServerFactory(), wantPID=True)\n") % (self.filename,)
148 env = {'PYTHONPATH': os.pathsep.join(sys.path)}
150 d = utils.getProcessValue(sys.executable, ("-u", "-c", source), env=env)
151 d.addCallback(callback)
155 def test_uncleanServerSocketLocking(self):
157 If passed C{True} for the C{wantPID} parameter, a server can be started
158 listening with L{IReactorUNIX.listenUNIX} when passed the name of a
159 file on which a previous server which has not exited cleanly has been
160 listening using the C{wantPID} option.
162 def ranStupidChild(ign):
163 # If this next call succeeds, our lock handling is correct.
164 p = reactor.listenUNIX(self.filename, MyServerFactory(), wantPID=True)
165 return p.stopListening()
166 return self._uncleanSocketTest(ranStupidChild)
169 def test_connectToUncleanServer(self):
171 If passed C{True} for the C{checkPID} parameter, a client connection
172 attempt made with L{IReactorUNIX.connectUNIX} fails with
173 L{error.BadFileError}.
175 def ranStupidChild(ign):
177 f = FailedConnectionClientFactory(d)
178 c = reactor.connectUNIX(self.filename, f, checkPID=True)
179 return self.assertFailure(d, error.BadFileError)
180 return self._uncleanSocketTest(ranStupidChild)
183 def _reprTest(self, serverFactory, factoryName):
185 Test the C{__str__} and C{__repr__} implementations of a UNIX port when
186 used with the given factory.
188 filename = self.mktemp()
189 unixPort = reactor.listenUNIX(filename, serverFactory)
191 connectedString = "<%s on %r>" % (factoryName, filename)
192 self.assertEqual(repr(unixPort), connectedString)
193 self.assertEqual(str(unixPort), connectedString)
195 d = defer.maybeDeferred(unixPort.stopListening)
196 def stoppedListening(ign):
197 unconnectedString = "<%s (not listening)>" % (factoryName,)
198 self.assertEqual(repr(unixPort), unconnectedString)
199 self.assertEqual(str(unixPort), unconnectedString)
200 d.addCallback(stoppedListening)
204 def test_reprWithClassicFactory(self):
206 The two string representations of the L{IListeningPort} returned by
207 L{IReactorUNIX.listenUNIX} contains the name of the classic factory
208 class being used and the filename on which the port is listening or
209 indicates that the port is not listening.
211 class ClassicFactory:
219 self.assertIsInstance(ClassicFactory, types.ClassType)
221 return self._reprTest(
222 ClassicFactory(), "twisted.test.test_unix.ClassicFactory")
225 def test_reprWithNewStyleFactory(self):
227 The two string representations of the L{IListeningPort} returned by
228 L{IReactorUNIX.listenUNIX} contains the name of the new-style factory
229 class being used and the filename on which the port is listening or
230 indicates that the port is not listening.
232 class NewStyleFactory(object):
240 self.assertIsInstance(NewStyleFactory, type)
242 return self._reprTest(
243 NewStyleFactory(), "twisted.test.test_unix.NewStyleFactory")
247 class ClientProto(protocol.ConnectedDatagramProtocol):
248 started = stopped = False
252 self.deferredStarted = defer.Deferred()
253 self.deferredGotBack = defer.Deferred()
255 def stopProtocol(self):
258 def startProtocol(self):
260 self.deferredStarted.callback(None)
262 def datagramReceived(self, data):
264 self.deferredGotBack.callback(None)
266 class ServerProto(protocol.DatagramProtocol):
267 started = stopped = False
268 gotwhat = gotfrom = None
271 self.deferredStarted = defer.Deferred()
272 self.deferredGotWhat = defer.Deferred()
274 def stopProtocol(self):
277 def startProtocol(self):
279 self.deferredStarted.callback(None)
281 def datagramReceived(self, data, addr):
283 self.transport.write("hi back", addr)
285 self.deferredGotWhat.callback(None)
289 class DatagramUnixSocketTestCase(unittest.TestCase):
291 Test datagram UNIX sockets.
293 def test_exchange(self):
295 Test that a datagram can be sent to and received by a server and vice
298 clientaddr = self.mktemp()
299 serveraddr = self.mktemp()
302 s = reactor.listenUNIXDatagram(serveraddr, sp)
303 self.addCleanup(s.stopListening)
304 c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress=clientaddr)
305 self.addCleanup(c.stopListening)
307 d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
309 cp.transport.write("hi")
310 return defer.gatherResults([sp.deferredGotWhat,
313 def _cbTestExchange(ignored):
314 self.assertEqual("hi", sp.gotwhat)
315 self.assertEqual(clientaddr, sp.gotfrom)
316 self.assertEqual("hi back", cp.gotback)
319 d.addCallback(_cbTestExchange)
323 def test_cannotListen(self):
325 L{IReactorUNIXDatagram.listenUNIXDatagram} raises
326 L{error.CannotListenError} if the unix socket specified is already in
331 s = reactor.listenUNIXDatagram(addr, p)
332 self.failUnlessRaises(error.CannotListenError, reactor.listenUNIXDatagram, addr, p)
336 # test connecting to bound and connected (somewhere else) address
338 def _reprTest(self, serverProto, protocolName):
340 Test the C{__str__} and C{__repr__} implementations of a UNIX datagram
341 port when used with the given protocol.
343 filename = self.mktemp()
344 unixPort = reactor.listenUNIXDatagram(filename, serverProto)
346 connectedString = "<%s on %r>" % (protocolName, filename)
347 self.assertEqual(repr(unixPort), connectedString)
348 self.assertEqual(str(unixPort), connectedString)
350 stopDeferred = defer.maybeDeferred(unixPort.stopListening)
351 def stoppedListening(ign):
352 unconnectedString = "<%s (not listening)>" % (protocolName,)
353 self.assertEqual(repr(unixPort), unconnectedString)
354 self.assertEqual(str(unixPort), unconnectedString)
355 stopDeferred.addCallback(stoppedListening)
359 def test_reprWithClassicProtocol(self):
361 The two string representations of the L{IListeningPort} returned by
362 L{IReactorUNIXDatagram.listenUNIXDatagram} contains the name of the
363 classic protocol class being used and the filename on which the port is
364 listening or indicates that the port is not listening.
366 class ClassicProtocol:
367 def makeConnection(self, transport):
374 self.assertIsInstance(ClassicProtocol, types.ClassType)
376 return self._reprTest(
377 ClassicProtocol(), "twisted.test.test_unix.ClassicProtocol")
380 def test_reprWithNewStyleProtocol(self):
382 The two string representations of the L{IListeningPort} returned by
383 L{IReactorUNIXDatagram.listenUNIXDatagram} contains the name of the
384 new-style protocol class being used and the filename on which the port
385 is listening or indicates that the port is not listening.
387 class NewStyleProtocol(object):
388 def makeConnection(self, transport):
395 self.assertIsInstance(NewStyleProtocol, type)
397 return self._reprTest(
398 NewStyleProtocol(), "twisted.test.test_unix.NewStyleProtocol")
402 if not interfaces.IReactorUNIX(reactor, None):
403 UnixSocketTestCase.skip = "This reactor does not support UNIX domain sockets"
404 if not interfaces.IReactorUNIXDatagram(reactor, None):
405 DatagramUnixSocketTestCase.skip = "This reactor does not support UNIX datagram sockets"