Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / words / test / test_jabberclient.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for L{twisted.words.protocols.jabber.client}
6 """
7
8 from twisted.internet import defer
9 from twisted.python.hashlib import sha1
10 from twisted.trial import unittest
11 from twisted.words.protocols.jabber import client, error, jid, xmlstream
12 from twisted.words.protocols.jabber.sasl import SASLInitiatingInitializer
13 from twisted.words.xish import utility
14
# XML namespaces of the resource binding and session establishment requests.
NS_BIND = 'urn:ietf:params:xml:ns:xmpp-bind'
NS_SESSION = 'urn:ietf:params:xml:ns:xmpp-session'

# Event patterns used to observe the requests sent by the initializers under
# test. The first two match the get and set requests in the jabber:iq:auth
# namespace; the last two match the bind and session set requests.
IQ_AUTH_GET = '/iq[@type="get"]/query[@xmlns="jabber:iq:auth"]'
IQ_AUTH_SET = '/iq[@type="set"]/query[@xmlns="jabber:iq:auth"]'
IQ_BIND_SET = '/iq[@type="set"]/bind[@xmlns="%s"]' % (NS_BIND,)
IQ_SESSION_SET = '/iq[@type="set"]/session[@xmlns="%s"]' % (NS_SESSION,)
21
class CheckVersionInitializerTest(unittest.TestCase):
    """
    Tests for L{client.CheckVersionInitializer}.
    """

    def setUp(self):
        """
        Create the initializer under test on top of a fresh XML stream.
        """
        stream = xmlstream.XmlStream(xmlstream.Authenticator())
        self.init = client.CheckVersionInitializer(stream)


    def testSupported(self):
        """
        Initializing with stream version 1.0 completes without raising.
        """
        self.init.xmlstream.version = (1, 0)
        self.init.initialize()


    def testNotSupported(self):
        """
        Initializing with stream version 0.0 raises a stream error whose
        condition is C{unsupported-version}.
        """
        self.init.xmlstream.version = (0, 0)
        streamError = self.assertRaises(error.StreamError,
                                        self.init.initialize)
        self.assertEqual('unsupported-version', streamError.condition)
44
45
46
class InitiatingInitializerHarness(object):
    """
    Mix-in that gives initializer tests a stubbed peer to talk to.

    An L{utility.XmlPipe} provides the communication channel. Its sink end
    plays the role of the XML stream handed to the initializer under test and
    gets an authenticator attached; its source end is where the test, acting
    as the receiving entity, observes requests and sends responses.

    A helper for turning an observed event into a deferred is provided as
    well.
    """

    def setUp(self):
        """
        Create the pipe and attach a connect authenticator to its sink.
        """
        pipe = utility.XmlPipe()
        authenticator = xmlstream.ConnectAuthenticator('example.org')
        pipe.sink.authenticator = authenticator

        self.output = []
        self.pipe = pipe
        self.xmlstream = pipe.sink
        self.authenticator = authenticator


    def waitFor(self, event, handler):
        """
        Return a deferred that fires on the next occurrence of an event.

        A one-time observer for C{event} is registered on the source end of
        the L{XmlPipe}. When the event is observed, the returned deferred is
        fired with the event object, and C{handler} runs as its first
        callback.

        @param event: The event to be observed. See
            L{utility.EventDispatcher.addOnetimeObserver}.
        @param handler: The handler to be called with the observed event object.
        @rtype: L{defer.Deferred}.
        """
        observed = defer.Deferred()
        observed.addCallback(handler)
        self.pipe.source.addOnetimeObserver(event, observed.callback)
        return observed
85
86
87
class IQAuthInitializerTest(InitiatingInitializerHarness, unittest.TestCase):
    """
    Tests for L{client.IQAuthInitializer}.

    In every test the test case plays the server: it observes the requests
    the initializer sends on the source end of the pipe and answers them.
    """

    def setUp(self):
        """
        Create the initializer and give the authenticator its credentials.
        """
        super(IQAuthInitializerTest, self).setUp()
        self.init = client.IQAuthInitializer(self.xmlstream)
        self.authenticator.jid = jid.JID('user@example.com/resource')
        self.authenticator.password = 'secret'


    def _offerAuthFields(self, iq, fields, onAuthSet):
        """
        Answer a query for authentication fields and await the next request.

        Acting as the server, reply to C{iq} with a result that holds a
        C{query} element in the C{jabber:iq:auth} namespace, containing one
        empty child element per entry in C{fields}.

        @param iq: The request for authentication fields to respond to.
        @param fields: The names of the child elements to offer, in order.
        @param onAuthSet: The handler to be called with the authentication
            request that is expected next.
        @return: The deferred returned by L{waitFor} for the authentication
            request.
        @rtype: L{defer.Deferred}.
        """
        # Create server response
        response = xmlstream.toResponse(iq, 'result')
        response.addElement(('jabber:iq:auth', 'query'))
        for name in fields:
            response.query.addElement(name)

        # Set up an observer for the next request we expect. This happens
        # before the response is sent, as in the tests this was taken from.
        d = self.waitFor(IQ_AUTH_SET, onAuthSet)

        # Send server response
        self.pipe.source.send(response)

        return d


    def testPlainText(self):
        """
        Test plain-text authentication.

        Act as a server supporting plain-text authentication and expect the
        C{password} field to be filled with the password. Then act as if
        authentication succeeds.
        """

        def onAuthGet(iq):
            """
            Called when the initializer sent a query for authentication methods.

            The response informs the client that plain-text authentication
            is supported.
            """
            return self._offerAuthFields(
                iq, ['username', 'password', 'resource'], onAuthSet)

        def onAuthSet(iq):
            """
            Called when the initializer sent the authentication request.

            The server checks the credentials and responds with an empty result
            signalling success.
            """
            self.assertEqual('user', unicode(iq.query.username))
            self.assertEqual('secret', unicode(iq.query.password))
            self.assertEqual('resource', unicode(iq.query.resource))

            # Send server response
            response = xmlstream.toResponse(iq, 'result')
            self.pipe.source.send(response)

        # Set up an observer for the request for authentication fields
        d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)

        # Start the initializer
        d2 = self.init.initialize()
        return defer.gatherResults([d1, d2])


    def testDigest(self):
        """
        Test digest authentication.

        Act as a server supporting digest authentication and expect the
        C{digest} field to be filled with a sha1 digest of the concatenated
        stream session identifier and password. Then act as if authentication
        succeeds.
        """

        def onAuthGet(iq):
            """
            Called when the initializer sent a query for authentication methods.

            The response informs the client that digest authentication is
            supported.
            """
            return self._offerAuthFields(
                iq, ['username', 'digest', 'resource'], onAuthSet)

        def onAuthSet(iq):
            """
            Called when the initializer sent the authentication request.

            The server checks the credentials and responds with an empty result
            signalling success.
            """
            self.assertEqual('user', unicode(iq.query.username))
            self.assertEqual(sha1('12345secret').hexdigest(),
                              unicode(iq.query.digest).encode('utf-8'))
            self.assertEqual('resource', unicode(iq.query.resource))

            # Send server response
            response = xmlstream.toResponse(iq, 'result')
            self.pipe.source.send(response)

        # Digest authentication relies on the stream session identifier. Set it.
        self.xmlstream.sid = u'12345'

        # Set up an observer for the request for authentication fields
        d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)

        # Start the initializer
        d2 = self.init.initialize()

        return defer.gatherResults([d1, d2])


    def testFailRequestFields(self):
        """
        Test initializer failure of request for fields for authentication.
        """
        def onAuthGet(iq):
            """
            Called when the initializer sent a query for authentication methods.

            The server responds that the client is not authorized to authenticate.
            """
            response = error.StanzaError('not-authorized').toResponse(iq)
            self.pipe.source.send(response)

        # Set up an observer for the request for authentication fields
        d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)

        # Start the initializer
        d2 = self.init.initialize()

        # The initializer should fail with a stanza error.
        self.assertFailure(d2, error.StanzaError)

        return defer.gatherResults([d1, d2])


    def testFailAuth(self):
        """
        Test initializer failure to authenticate.
        """

        def onAuthGet(iq):
            """
            Called when the initializer sent a query for authentication methods.

            The response informs the client that plain-text authentication
            is supported.
            """
            return self._offerAuthFields(
                iq, ['username', 'password', 'resource'], onAuthSet)

        def onAuthSet(iq):
            """
            Called when the initializer sent the authentication request.

            The server checks the credentials and responds with a not-authorized
            stanza error.
            """
            response = error.StanzaError('not-authorized').toResponse(iq)
            self.pipe.source.send(response)

        # Set up an observer for the request for authentication fields
        d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)

        # Start the initializer
        d2 = self.init.initialize()

        # The initializer should fail with a stanza error.
        self.assertFailure(d2, error.StanzaError)

        return defer.gatherResults([d1, d2])
289
290
291
class BindInitializerTest(InitiatingInitializerHarness, unittest.TestCase):
    """
    Tests for L{client.BindInitializer}.
    """

    def setUp(self):
        """
        Create the initializer and set the JID the authenticator starts with.
        """
        super(BindInitializerTest, self).setUp()
        self.init = client.BindInitializer(self.xmlstream)
        self.authenticator.jid = jid.JID('user@example.com/resource')


    def testBasic(self):
        """
        Resource binding succeeds and the authenticator takes over the JID
        carried in the server's response.
        """
        boundJID = 'user@example.com/other resource'

        def onBind(iq):
            # Act as the server: answer with a result carrying the bound JID.
            reply = xmlstream.toResponse(iq, 'result')
            reply.addElement((NS_BIND, 'bind'))
            reply.bind.addElement('jid', content=boundJID)
            self.pipe.source.send(reply)

        def checkJID(result):
            self.assertEqual(jid.JID(boundJID), self.authenticator.jid)

        requested = self.waitFor(IQ_BIND_SET, onBind)
        started = self.init.start()
        started.addCallback(checkJID)
        return defer.gatherResults([requested, started])


    def testFailure(self):
        """
        Resource binding is answered with a C{conflict} stanza error, and the
        initializer fails with L{error.StanzaError}.
        """
        def onBind(iq):
            # Act as the server: reject the bind request.
            reply = error.StanzaError('conflict').toResponse(iq)
            self.pipe.source.send(reply)

        requested = self.waitFor(IQ_BIND_SET, onBind)
        started = self.init.start()
        self.assertFailure(started, error.StanzaError)
        return defer.gatherResults([requested, started])
336
337
338
class SessionInitializerTest(InitiatingInitializerHarness, unittest.TestCase):
    """
    Tests for L{client.SessionInitializer}.
    """

    def setUp(self):
        """
        Create the initializer under test on the harness' XML stream.
        """
        super(SessionInitializerTest, self).setUp()
        self.init = client.SessionInitializer(self.xmlstream)


    def testSuccess(self):
        """
        The session request is answered with an empty result, and the
        initializer completes successfully.
        """
        def onSession(iq):
            # Act as the server: acknowledge the session request.
            reply = xmlstream.toResponse(iq, 'result')
            self.pipe.source.send(reply)

        requested = self.waitFor(IQ_SESSION_SET, onSession)
        started = self.init.start()
        return defer.gatherResults([requested, started])


    def testFailure(self):
        """
        The session request is answered with a C{forbidden} stanza error, and
        the initializer fails with L{error.StanzaError}.
        """
        def onSession(iq):
            # Act as the server: reject the session request.
            reply = error.StanzaError('forbidden').toResponse(iq)
            self.pipe.source.send(reply)

        requested = self.waitFor(IQ_SESSION_SET, onSession)
        started = self.init.start()
        self.assertFailure(started, error.StanzaError)
        return defer.gatherResults([requested, started])
375
376
377
class XMPPAuthenticatorTest(unittest.TestCase):
    """
    Test for both XMPPAuthenticator and XMPPClientFactory.
    """
    def testBasic(self):
        """
        Test basic operations.

        Setup an XMPPClientFactory, which sets up an XMPPAuthenticator, and let
        it produce a protocol instance. Then inspect the instance variables of
        the authenticator and XML stream objects.
        """
        self.client_jid = jid.JID('user@example.com/resource')

        # Get an XmlStream instance. Note that it gets initialized with the
        # XMPPAuthenticator (that has its associateWithXmlStream called) that
        # is in turn initialized with the arguments to the factory.
        xs = client.XMPPClientFactory(self.client_jid,
                                      'secret').buildProtocol(None)

        # test authenticator's instance variables
        self.assertEqual('example.com', xs.authenticator.otherHost)
        self.assertEqual(self.client_jid, xs.authenticator.jid)
        self.assertEqual('secret', xs.authenticator.password)

        # test list of initializers; the unpacking itself verifies that there
        # are exactly five of them.
        version, tls, sasl, bind, session = xs.initializers

        # assertIsInstance reports the offending object on failure, unlike
        # the deprecated assert_(isinstance(...)) form.
        self.assertIsInstance(version, client.CheckVersionInitializer)
        self.assertIsInstance(tls, xmlstream.TLSInitiatingInitializer)
        self.assertIsInstance(sasl, SASLInitiatingInitializer)
        self.assertIsInstance(bind, client.BindInitializer)
        self.assertIsInstance(session, client.SessionInitializer)

        # Only SASL is expected to be marked as required.
        self.assertFalse(tls.required)
        self.assertTrue(sasl.required)
        self.assertFalse(bind.required)
        self.assertFalse(session.required)