Initial import to Tizen
[profile/ivi/python-twisted.git] / doc / core / examples / cred.py
1
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5
6
7 import sys
8 from zope.interface import implements, Interface
9
10 from twisted.protocols import basic
11 from twisted.internet import protocol
12 from twisted.python import log
13
14 from twisted.cred import error
15 from twisted.cred import portal
16 from twisted.cred import checkers
17 from twisted.cred import credentials
18
class IProtocolUser(Interface):
    """Interface for the avatars this example hands out after a login."""

    def getPrivileges():
        """Return the list of privileges held by this user."""

    def logout():
        """Release any per-login resources that were allocated to this avatar."""
25
class AnonymousUser:
    """Avatar given to clients that log in anonymously."""
    implements(IProtocolUser)

    def getPrivileges(self):
        """Return the privilege numbers granted to anonymous users."""
        privileges = [1, 2, 3]
        return privileges

    def logout(self):
        """Announce on stdout that this avatar is being cleaned up."""
        # A single parenthesised argument prints the same text whether
        # ``print`` is a statement or a function.
        print("Cleaning up anonymous user resources")
34
class RegularUser:
    """Avatar given to authenticated users that are not administrators."""
    implements(IProtocolUser)

    def getPrivileges(self):
        """Return the privilege numbers granted to regular users."""
        privileges = [1, 2, 3, 5, 6]
        return privileges

    def logout(self):
        """Announce on stdout that this avatar is being cleaned up."""
        # A single parenthesised argument prints the same text whether
        # ``print`` is a statement or a function.
        print("Cleaning up regular user resources")
43
class Administrator:
    """Avatar given to administrator accounts."""
    implements(IProtocolUser)

    def getPrivileges(self):
        """Return privilege numbers 0 through 49 as a list."""
        return list(range(50))

    def logout(self):
        """Announce on stdout that this avatar is being cleaned up."""
        # A single parenthesised argument prints the same text whether
        # ``print`` is a statement or a function.
        print("Cleaning up administrator resources")
52
class Protocol(basic.LineReceiver):
    """
    Line-oriented demo protocol that logs clients in through a cred portal.

    Each received line is split on whitespace.  The first word, upper-cased,
    selects a C{cmd_<WORD>} method and the remaining words are passed to it
    as positional arguments.
    """
    # Name given by the most recent USER command, waiting for a PASS.
    user = None
    # Portal to authenticate against; ServerFactory.buildProtocol assigns it.
    portal = None
    # Avatar and logout callable from the last successful login, if any.
    avatar = None
    logout = None

    def connectionMade(self):
        """Greet the client with brief usage instructions."""
        self.sendLine("Login with USER <name> followed by PASS <password> or ANON")
        self.sendLine("Check privileges with PRIVS")

    def connectionLost(self, reason):
        """Log the current avatar out, if there is one, and forget it."""
        if self.logout:
            self.logout()
            self.avatar = None
            self.logout = None

    def lineReceived(self, line):
        """
        Dispatch one line from the client to the matching C{cmd_*} method.

        Blank lines are ignored and unknown commands are reported to the
        client instead of raising out of this method.
        """
        words = line.split()
        if not words:
            # Nothing to dispatch on; indexing words[0] would raise IndexError.
            return
        handler = getattr(self, 'cmd_' + words[0].upper(), None)
        if handler is None:
            self.sendLine("Unknown command.")
            return
        try:
            handler(*words[1:])
        except TypeError:
            # Usually an arity mismatch from the call above; a TypeError
            # raised inside the handler itself is reported the same way.
            self.sendLine("Wrong number of arguments.")
        except Exception:
            # Record the traceback rather than discarding it, and let
            # KeyboardInterrupt / SystemExit propagate.
            log.err()
            self.sendLine("Server error (probably your fault)")

    def _login(self, creds):
        """Log in with C{creds} through the portal, or refuse if none is set."""
        if not self.portal:
            self.sendLine("DENIED")
            return
        d = self.portal.login(creds, None, IProtocolUser)
        d.addCallbacks(self._cbLogin, self._ebLogin)

    def cmd_ANON(self):
        """Handle ANON: attempt an anonymous login."""
        self._login(credentials.Anonymous())

    def cmd_USER(self, name):
        """Handle USER <name>: remember the name for a following PASS."""
        self.user = name
        self.sendLine("Alright.  Now PASS?")

    def cmd_PASS(self, password):
        """Handle PASS <password>: log in as the previously named user."""
        if not self.user:
            self.sendLine("USER required before PASS")
            return
        self._login(credentials.UsernamePassword(self.user, password))

    def cmd_PRIVS(self):
        """Handle PRIVS: send the logged-in avatar's privileges."""
        if self.avatar is None:
            # Without this guard the header line would be sent and then the
            # attribute lookup on None would fail.
            self.sendLine("Login required before PRIVS")
            return
        self.sendLine("You have the following privileges: ")
        self.sendLine(" ".join(map(str, self.avatar.getPrivileges())))

    def _cbLogin(self, result):
        """
        Store the avatar and logout callable from a successful login.

        @param result: the C{(interface, avatar, logout)} tuple the login
            Deferred fired with.
        """
        interface, avatar, logout = result
        assert interface is IProtocolUser
        if self.logout is not None:
            # A second login on the same connection replaces the avatar;
            # release the previous one's resources first.
            self.logout()
        self.avatar = avatar
        self.logout = logout
        self.sendLine("Login successful.  Available commands: PRIVS")

    def _ebLogin(self, failure):
        """Tell the client the login was refused; other failures propagate."""
        failure.trap(error.UnauthorizedLogin)
        self.sendLine("Login denied!  Go away.")
118
class ServerFactory(protocol.ServerFactory):
    """Factory that hands its cred portal to every protocol it builds."""
    protocol = Protocol

    def __init__(self, portal):
        """Remember the portal that new connections will authenticate against."""
        self.portal = portal

    def buildProtocol(self, addr):
        """Build a protocol with the base factory and attach the portal to it."""
        connection = protocol.ServerFactory.buildProtocol(self, addr)
        connection.portal = self.portal
        return connection
129
class Realm:
    """Realm that maps an avatar ID onto one of the example user avatars."""
    implements(portal.IRealm)

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Return C{(IProtocolUser, avatar, avatar.logout)} for C{avatarId}.

        Raises NotImplementedError when IProtocolUser is not among the
        requested interfaces.
        """
        if IProtocolUser not in interfaces:
            raise NotImplementedError("Only IProtocolUser interface is supported by this realm")

        if avatarId == checkers.ANONYMOUS:
            user = AnonymousUser()
        elif avatarId.isupper():
            # All-uppercase usernames are administrators.
            user = Administrator()
        else:
            user = RegularUser()
        return IProtocolUser, user, user.logout
144
def main():
    """Set up the realm, portal and checkers, then serve on TCP port 4738."""
    realm = Realm()
    demoPortal = portal.Portal(realm)

    # In-memory username/password checker with the two demo accounts.
    passwords = checkers.InMemoryUsernamePasswordDatabaseDontUse()
    for username, password in (("auser", "thepass"), ("SECONDUSER", "secret")):
        passwords.addUser(username, password)
    demoPortal.registerChecker(passwords)
    demoPortal.registerChecker(checkers.AllowAnonymousAccess())

    factory = ServerFactory(demoPortal)

    log.startLogging(sys.stdout)

    # The reactor is imported here, inside main(), as in the original.
    from twisted.internet import reactor
    reactor.listenTCP(4738, factory)
    reactor.run()
161
# Start the demo server only when this file is executed as a script.
if __name__ == "__main__":
    main()