7209aea8e6058b0e0c400a0454f4252845611587
[contrib/cloudeebus.git] / cloudeebus / cloudeebus.py
1 #!/usr/bin/env python
2
3 # Cloudeebus
4 #
5 # Copyright 2012 Intel Corporation.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # Luc Yriarte <luc.yriarte@intel.com>
20 # Christophe Guiraud <christophe.guiraud@intel.com>
21 #
22
23
24 import argparse, dbus, json, sys
25
26 from twisted.internet import glib2reactor
27 # Configure the twisted mainloop to be run inside the glib mainloop.
28 # This must be done before importing the other twisted modules
29 glib2reactor.install()
30 from twisted.internet import reactor, defer
31
32 from autobahn.websocket import listenWS
33 from autobahn.wamp import exportRpc, WampServerFactory, WampCraServerProtocol
34
35 from dbus.mainloop.glib import DBusGMainLoop
36
37 import gobject
38 gobject.threads_init()
39
40 from dbus import glib
41 glib.init_threads()
42
43 # enable debug log
44 from twisted.python import log
45
46
47
48 ###############################################################################
49
50 VERSION = "0.3.0"
51 OPENDOOR = False
52 CREDENTIALS = {}
53 WHITELIST = []
54 NETMASK =  []
55
56 ###############################################################################
57 def ipV4ToHex(mask):
58     ## Convert an ip or an IP mask (such as ip/24 or ip/255.255.255.0) in hex value (32bits)
59     invalidMask = False
60     maskHex = 0
61     byte = 0
62     if mask.rfind(".") == -1:
63         if (int(mask) < 32):
64             maskHex = (2**(int(mask))-1)
65             maskHex = maskHex << (32-int(mask))
66         else:
67             invalidMask = invalidMask or True
68     else:
69         maskField = mask.split(".")
70         # Check if mask has four fields (byte)
71         invalidMask = invalidMask or len(maskField) != 4                    
72         for maskQuartet in maskField:
73             byte = int(maskQuartet)
74             # Check if each field is really a byte
75             if byte > 255:
76                 invalidMask = invalidMask or True                
77             maskHex += byte
78             maskHex = maskHex << 8
79         maskHex = maskHex >> 8
80     if invalidMask:
81         msg = "Illegal mask (or IP address) " + mask
82         log.msg(msg)
83         raise Exception(msg)
84     return maskHex
85
86 ###############################################################################
87 class DbusCache:
88     '''
89     Global cache of DBus connexions and signal handlers
90     '''
91     def __init__(self):
92         self.dbusConnexions = {}
93         self.signalHandlers = {}
94
95
96     def reset(self):
97         '''
98         Disconnect signal handlers before resetting cache.
99         '''
100         self.dbusConnexions = {}
101         # disconnect signal handlers
102         for key in self.signalHandlers:
103             self.signalHandlers[key].disconnect()
104         self.signalHandlers = {}
105
106
107     def dbusConnexion(self, busName):
108         if not self.dbusConnexions.has_key(busName):
109             if busName == "session":
110                 self.dbusConnexions[busName] = dbus.SessionBus()
111             elif busName == "system":
112                 self.dbusConnexions[busName] = dbus.SystemBus()
113             else:
114                 raise Exception("Error: invalid bus: %s" % busName)
115         return self.dbusConnexions[busName]
116
117
118
119 ###############################################################################
120 class DbusSignalHandler:
121     '''
122     signal hash id as busName#senderName#objectName#interfaceName#signalName
123     '''
124     def __init__(self, busName, senderName, objectName, interfaceName, signalName):
125         self.id = "#".join([busName, senderName, objectName, interfaceName, signalName])
126         # connect handler to signal
127         self.bus = cache.dbusConnexion(busName)
128         self.bus.add_signal_receiver(self.handleSignal, signalName, interfaceName, senderName, objectName)
129         
130     
131     def disconnect(self):
132         names = self.id.split("#")
133         self.bus.remove_signal_receiver(self.handleSignal, names[4], names[3], names[1], names[2])
134
135
136     def handleSignal(self, *args):
137         '''
138         publish dbus args under topic hash id
139         '''
140         factory.dispatch(self.id, json.dumps(args))
141
142
143
144 ###############################################################################
145 class DbusCallHandler:
146     '''
147     deferred reply to return dbus results
148     '''
149     def __init__(self, method, args):
150         self.pending = False
151         self.request = defer.Deferred()
152         self.method = method
153         self.args = args
154
155
156     def callMethod(self):
157         '''
158         dbus method async call
159         '''
160         self.pending = True
161         self.method(*self.args, reply_handler=self.dbusSuccess, error_handler=self.dbusError)
162         return self.request
163
164
165     def dbusSuccess(self, *result):
166         '''
167         return JSON string result array
168         '''
169         self.request.callback(json.dumps(result))
170         self.pending = False
171
172
173     def dbusError(self, error):
174         '''
175         return dbus error message
176         '''
177         self.request.errback(Exception(error.get_dbus_message()))
178         self.pending = False
179
180
181
182 ###############################################################################
183 class CloudeebusService:
184     '''
185     support for sending DBus messages and registering for DBus signals
186     '''
187     def __init__(self, permissions):
188         self.permissions = permissions;
189         self.proxyObjects = {}
190         self.proxyMethods = {}
191         self.pendingCalls = []
192
193
194     def proxyObject(self, busName, serviceName, objectName):
195         '''
196         object hash id as busName#serviceName#objectName
197         '''
198         id = "#".join([busName, serviceName, objectName])
199         if not self.proxyObjects.has_key(id):
200             if not OPENDOOR:
201                 # check permissions, array.index throws exception
202                 self.permissions.index(serviceName)
203             bus = cache.dbusConnexion(busName)
204             self.proxyObjects[id] = bus.get_object(serviceName, objectName)
205         return self.proxyObjects[id]
206
207
208     def proxyMethod(self, busName, serviceName, objectName, interfaceName, methodName):
209         '''
210         method hash id as busName#serviceName#objectName#interfaceName#methodName
211         '''
212         id = "#".join([busName, serviceName, objectName, interfaceName, methodName])
213         if not self.proxyMethods.has_key(id):
214             obj = self.proxyObject(busName, serviceName, objectName)
215             self.proxyMethods[id] = obj.get_dbus_method(methodName, interfaceName)
216         return self.proxyMethods[id]
217
218
219     @exportRpc
220     def dbusRegister(self, list):
221         '''
222         arguments: bus, sender, object, interface, signal
223         '''
224         if len(list) < 5:
225             raise Exception("Error: expected arguments: bus, sender, object, interface, signal)")
226         
227         if not OPENDOOR:
228             # check permissions, array.index throws exception
229             self.permissions.index(list[1])
230         
231         # check if a handler exists
232         sigId = "#".join(list)
233         if cache.signalHandlers.has_key(sigId):
234             return sigId
235         
236         # create a handler that will publish the signal
237         dbusSignalHandler = DbusSignalHandler(*list)
238         cache.signalHandlers[sigId] = dbusSignalHandler
239         
240         return dbusSignalHandler.id
241
242
243     @exportRpc
244     def dbusSend(self, list):
245         '''
246         arguments: bus, destination, object, interface, message, [args]
247         '''
248         # clear pending calls
249         for call in self.pendingCalls:
250             if not call.pending:
251                 self.pendingCalls.remove(call)
252         
253         if len(list) < 5:
254             raise Exception("Error: expected arguments: bus, destination, object, interface, message, [args])")
255         
256         # parse JSON arg list
257         args = []
258         if len(list) == 6:
259             args = json.loads(list[5])
260         
261         # get dbus proxy method
262         method = self.proxyMethod(*list[0:5])
263         
264         # use a deferred call handler to manage dbus results
265         dbusCallHandler = DbusCallHandler(method, args)
266         self.pendingCalls.append(dbusCallHandler)
267         return dbusCallHandler.callMethod()
268
269
270     @exportRpc
271     def getVersion(self):
272         '''
273         return current version string
274         '''
275         return VERSION
276
277
278
279 ###############################################################################
280 class CloudeebusServerProtocol(WampCraServerProtocol):
281     '''
282     connexion and session authentication management
283     '''
284     
285     def onSessionOpen(self):
286         # CRA authentication options
287         self.clientAuthTimeout = 0
288         self.clientAuthAllowAnonymous = OPENDOOR
289         # CRA authentication init
290         WampCraServerProtocol.onSessionOpen(self)
291     
292     
293     def getAuthPermissions(self, key, extra):
294         return json.loads(extra.get("permissions", "[]"))
295     
296     
297     def getAuthSecret(self, key):
298         secret = CREDENTIALS.get(key, None)
299         if secret is None:
300             return None
301         # secret must be of str type to be hashed
302         return secret.encode('utf-8')
303     
304
305     def onAuthenticated(self, key, permissions):
306         if not OPENDOOR:
307             # check net filter
308             if NETMASK != []:
309                 ipAllowed = False
310                 for netfilter in NETMASK:
311                     ipHex=ipV4ToHex(self.peer.host)
312                     ipAllowed = (ipHex & netfilter['mask']) == netfilter['ipAllowed'] & netfilter['mask']
313                     if ipAllowed:
314                         break
315                         
316                 if not ipAllowed:
317                     raise Exception("host " + self.peer.host + " is not allowed!")
318             # check authentication key
319             if key is None:
320                 raise Exception("Authentication failed")
321             # check permissions, array.index throws exception
322             for req in permissions:
323                 WHITELIST.index(req)
324         # create cloudeebus service instance
325         self.cloudeebusService = CloudeebusService(permissions)
326         # register it for RPC
327         self.registerForRpc(self.cloudeebusService)
328         # register for Publish / Subscribe
329         self.registerForPubSub("", True)
330     
331     
332     def connectionLost(self, reason):
333         WampCraServerProtocol.connectionLost(self, reason)
334         if factory.getConnectionCount() == 0:
335             cache.reset()
336
337
338
339 ###############################################################################
340
341 if __name__ == '__main__':
342     
343     cache = DbusCache()
344
345     parser = argparse.ArgumentParser(description='Javascript DBus bridge.')
346     parser.add_argument('-v', '--version', action='store_true', 
347         help='print version and exit')
348     parser.add_argument('-d', '--debug', action='store_true', 
349         help='log debug info on standard output')
350     parser.add_argument('-o', '--opendoor', action='store_true',
351         help='allow anonymous access to all services')
352     parser.add_argument('-p', '--port', default='9000',
353         help='port number')
354     parser.add_argument('-c', '--credentials',
355         help='path to credentials file')
356     parser.add_argument('-w', '--whitelist',
357         help='path to whitelist file')
358     parser.add_argument('-n', '--netmask',
359         help='netmask,IP filter (comma separated.) eg. : -n 127.0.0.1,192.168.2.0/24,10.12.16.0/255.255.255.0')
360     
361     args = parser.parse_args(sys.argv[1:])
362
363     if args.version:
364         print("Cloudeebus version " + VERSION)
365         exit(0)
366     
367     if args.debug:
368         log.startLogging(sys.stdout)
369     
370     OPENDOOR = args.opendoor
371     
372     if args.credentials:
373         jfile = open(args.credentials)
374         CREDENTIALS = json.load(jfile)
375         jfile.close()
376     
377     if args.whitelist:
378         jfile = open(args.whitelist)
379         WHITELIST = json.load(jfile)
380         jfile.close()
381         
382     if args.netmask:
383         iplist = args.netmask.split(",")
384         for ip in iplist:
385             if ip.rfind("/") != -1:
386                 ip=ip.split("/")
387                 ipAllowed = ip[0]
388                 mask = ip[1]
389             else:
390                 ipAllowed = ip
391                 mask = "255.255.255.255" 
392             NETMASK.append( {'ipAllowed': ipV4ToHex(ipAllowed), 'mask' : ipV4ToHex(mask)} )
393     
394     uri = "ws://localhost:" + args.port
395     
396     factory = WampServerFactory(uri, debugWamp = args.debug)
397     factory.protocol = CloudeebusServerProtocol
398     factory.setProtocolOptions(allowHixie76 = True)
399     
400     listenWS(factory)
401     
402     DBusGMainLoop(set_as_default=True)
403     
404     reactor.run()