5f4c6176e39bb001467c380778b5a476c6c49037
[contrib/cloudeebus.git] / cloudeebus / cloudeebus.py
1 #!/usr/bin/env python
2
3 # Cloudeebus
4 #
5 # Copyright 2012 Intel Corporation.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # Luc Yriarte <luc.yriarte@intel.com>
20 # Christophe Guiraud <christophe.guiraud@intel.com>
21 # Frederic Paut <frederic.paut@intel.com>
22 #
23
24
25 import argparse, dbus, json, sys
26
27 from twisted.internet import glib2reactor
28 # Configure the twisted mainloop to be run inside the glib mainloop.
29 # This must be done before importing the other twisted modules
30 glib2reactor.install()
31 from twisted.internet import reactor, defer
32
33 from autobahn.websocket import listenWS
34 from autobahn.wamp import exportRpc, WampServerFactory, WampCraServerProtocol
35
36 from dbus.mainloop.glib import DBusGMainLoop
37
38 import gobject
39 gobject.threads_init()
40
41 from dbus import glib
42 glib.init_threads()
43
44 # enable debug log
45 from twisted.python import log
46
47
48
49 ###############################################################################
50
51 VERSION = "0.3.0"
52 OPENDOOR = False
53 CREDENTIALS = {}
54 WHITELIST = []
55 NETMASK =  []
56
57 ###############################################################################
58 def ipV4ToHex(mask):
59     ## Convert an ip or an IP mask (such as ip/24 or ip/255.255.255.0) in hex value (32bits)
60     maskHex = 0
61     byte = 0
62     if mask.rfind(".") == -1:
63         if (int(mask) < 32):
64             maskHex = (2**(int(mask))-1)
65             maskHex = maskHex << (32-int(mask))
66         else:
67             raise Exception("Illegal mask (larger than 32 bits) " + mask)
68     else:
69         maskField = mask.split(".")
70         # Check if mask has four fields (byte)
71         if len(maskField) != 4:
72             raise Exception("Illegal ip address / mask (should be 4 bytes) " + mask)
73         for maskQuartet in maskField:
74             byte = int(maskQuartet)
75             # Check if each field is really a byte
76             if byte > 255:
77                 raise Exception("Illegal ip address / mask (digit larger than a byte) " + mask)              
78             maskHex += byte
79             maskHex = maskHex << 8
80         maskHex = maskHex >> 8
81     return maskHex
82
83 ###############################################################################
84 class DbusCache:
85     '''
86     Global cache of DBus connexions and signal handlers
87     '''
88     def __init__(self):
89         self.dbusConnexions = {}
90         self.signalHandlers = {}
91
92
93     def reset(self):
94         '''
95         Disconnect signal handlers before resetting cache.
96         '''
97         self.dbusConnexions = {}
98         # disconnect signal handlers
99         for key in self.signalHandlers:
100             self.signalHandlers[key].disconnect()
101         self.signalHandlers = {}
102
103
104     def dbusConnexion(self, busName):
105         if not self.dbusConnexions.has_key(busName):
106             if busName == "session":
107                 self.dbusConnexions[busName] = dbus.SessionBus()
108             elif busName == "system":
109                 self.dbusConnexions[busName] = dbus.SystemBus()
110             else:
111                 raise Exception("Error: invalid bus: %s" % busName)
112         return self.dbusConnexions[busName]
113
114
115
116 ###############################################################################
117 class DbusSignalHandler:
118     '''
119     signal hash id as busName#senderName#objectName#interfaceName#signalName
120     '''
121     def __init__(self, busName, senderName, objectName, interfaceName, signalName):
122         self.id = "#".join([busName, senderName, objectName, interfaceName, signalName])
123         # connect handler to signal
124         self.bus = cache.dbusConnexion(busName)
125         self.bus.add_signal_receiver(self.handleSignal, signalName, interfaceName, senderName, objectName)
126         
127     
128     def disconnect(self):
129         names = self.id.split("#")
130         self.bus.remove_signal_receiver(self.handleSignal, names[4], names[3], names[1], names[2])
131
132
133     def handleSignal(self, *args):
134         '''
135         publish dbus args under topic hash id
136         '''
137         factory.dispatch(self.id, json.dumps(args))
138
139
140
141 ###############################################################################
142 class DbusCallHandler:
143     '''
144     deferred reply to return dbus results
145     '''
146     def __init__(self, method, args):
147         self.pending = False
148         self.request = defer.Deferred()
149         self.method = method
150         self.args = args
151
152
153     def callMethod(self):
154         '''
155         dbus method async call
156         '''
157         self.pending = True
158         self.method(*self.args, reply_handler=self.dbusSuccess, error_handler=self.dbusError)
159         return self.request
160
161
162     def dbusSuccess(self, *result):
163         '''
164         return JSON string result array
165         '''
166         self.request.callback(json.dumps(result))
167         self.pending = False
168
169
170     def dbusError(self, error):
171         '''
172         return dbus error message
173         '''
174         self.request.errback(Exception(error.get_dbus_message()))
175         self.pending = False
176
177
178
179 ###############################################################################
180 class CloudeebusService:
181     '''
182     support for sending DBus messages and registering for DBus signals
183     '''
184     def __init__(self, permissions):
185         self.permissions = permissions;
186         self.proxyObjects = {}
187         self.proxyMethods = {}
188         self.pendingCalls = []
189
190
191     def proxyObject(self, busName, serviceName, objectName):
192         '''
193         object hash id as busName#serviceName#objectName
194         '''
195         id = "#".join([busName, serviceName, objectName])
196         if not self.proxyObjects.has_key(id):
197             if not OPENDOOR:
198                 # check permissions, array.index throws exception
199                 self.permissions.index(serviceName)
200             bus = cache.dbusConnexion(busName)
201             self.proxyObjects[id] = bus.get_object(serviceName, objectName)
202         return self.proxyObjects[id]
203
204
205     def proxyMethod(self, busName, serviceName, objectName, interfaceName, methodName):
206         '''
207         method hash id as busName#serviceName#objectName#interfaceName#methodName
208         '''
209         id = "#".join([busName, serviceName, objectName, interfaceName, methodName])
210         if not self.proxyMethods.has_key(id):
211             obj = self.proxyObject(busName, serviceName, objectName)
212             self.proxyMethods[id] = obj.get_dbus_method(methodName, interfaceName)
213         return self.proxyMethods[id]
214
215
216     @exportRpc
217     def dbusRegister(self, list):
218         '''
219         arguments: bus, sender, object, interface, signal
220         '''
221         if len(list) < 5:
222             raise Exception("Error: expected arguments: bus, sender, object, interface, signal)")
223         
224         if not OPENDOOR:
225             # check permissions, array.index throws exception
226             self.permissions.index(list[1])
227         
228         # check if a handler exists
229         sigId = "#".join(list)
230         if cache.signalHandlers.has_key(sigId):
231             return sigId
232         
233         # create a handler that will publish the signal
234         dbusSignalHandler = DbusSignalHandler(*list)
235         cache.signalHandlers[sigId] = dbusSignalHandler
236         
237         return dbusSignalHandler.id
238
239
240     @exportRpc
241     def dbusSend(self, list):
242         '''
243         arguments: bus, destination, object, interface, message, [args]
244         '''
245         # clear pending calls
246         for call in self.pendingCalls:
247             if not call.pending:
248                 self.pendingCalls.remove(call)
249         
250         if len(list) < 5:
251             raise Exception("Error: expected arguments: bus, destination, object, interface, message, [args])")
252         
253         # parse JSON arg list
254         args = []
255         if len(list) == 6:
256             args = json.loads(list[5])
257         
258         # get dbus proxy method
259         method = self.proxyMethod(*list[0:5])
260         
261         # use a deferred call handler to manage dbus results
262         dbusCallHandler = DbusCallHandler(method, args)
263         self.pendingCalls.append(dbusCallHandler)
264         return dbusCallHandler.callMethod()
265
266
267     @exportRpc
268     def getVersion(self):
269         '''
270         return current version string
271         '''
272         return VERSION
273
274
275
276 ###############################################################################
277 class CloudeebusServerProtocol(WampCraServerProtocol):
278     '''
279     connexion and session authentication management
280     '''
281     
282     def onSessionOpen(self):
283         # CRA authentication options
284         self.clientAuthTimeout = 0
285         self.clientAuthAllowAnonymous = OPENDOOR
286         # CRA authentication init
287         WampCraServerProtocol.onSessionOpen(self)
288     
289     
290     def getAuthPermissions(self, key, extra):
291         return json.loads(extra.get("permissions", "[]"))
292     
293     
294     def getAuthSecret(self, key):
295         secret = CREDENTIALS.get(key, None)
296         if secret is None:
297             return None
298         # secret must be of str type to be hashed
299         return secret.encode('utf-8')
300     
301
302     def onAuthenticated(self, key, permissions):
303         if not OPENDOOR:
304             # check net filter
305             if NETMASK != []:
306                 ipAllowed = False
307                 for netfilter in NETMASK:
308                     ipHex=ipV4ToHex(self.peer.host)
309                     ipAllowed = (ipHex & netfilter['mask']) == netfilter['ipAllowed'] & netfilter['mask']
310                     if ipAllowed:
311                         break
312                 if not ipAllowed:
313                     raise Exception("host " + self.peer.host + " is not allowed!")
314             # check authentication key
315             if key is None:
316                 raise Exception("Authentication failed")
317             # check permissions, array.index throws exception
318             for req in permissions:
319                 WHITELIST.index(req)
320         # create cloudeebus service instance
321         self.cloudeebusService = CloudeebusService(permissions)
322         # register it for RPC
323         self.registerForRpc(self.cloudeebusService)
324         # register for Publish / Subscribe
325         self.registerForPubSub("", True)
326     
327     
328     def connectionLost(self, reason):
329         WampCraServerProtocol.connectionLost(self, reason)
330         if factory.getConnectionCount() == 0:
331             cache.reset()
332
333
334
335 ###############################################################################
336
337 if __name__ == '__main__':
338     
339     cache = DbusCache()
340
341     parser = argparse.ArgumentParser(description='Javascript DBus bridge.')
342     parser.add_argument('-v', '--version', action='store_true', 
343         help='print version and exit')
344     parser.add_argument('-d', '--debug', action='store_true', 
345         help='log debug info on standard output')
346     parser.add_argument('-o', '--opendoor', action='store_true',
347         help='allow anonymous access to all services')
348     parser.add_argument('-p', '--port', default='9000',
349         help='port number')
350     parser.add_argument('-c', '--credentials',
351         help='path to credentials file')
352     parser.add_argument('-w', '--whitelist',
353         help='path to whitelist file')
354     parser.add_argument('-n', '--netmask',
355         help='netmask,IP filter (comma separated.) eg. : -n 127.0.0.1,192.168.2.0/24,10.12.16.0/255.255.255.0')
356     
357     args = parser.parse_args(sys.argv[1:])
358
359     if args.version:
360         print("Cloudeebus version " + VERSION)
361         exit(0)
362     
363     if args.debug:
364         log.startLogging(sys.stdout)
365     
366     OPENDOOR = args.opendoor
367     
368     if args.credentials:
369         jfile = open(args.credentials)
370         CREDENTIALS = json.load(jfile)
371         jfile.close()
372     
373     if args.whitelist:
374         jfile = open(args.whitelist)
375         WHITELIST = json.load(jfile)
376         jfile.close()
377         
378     if args.netmask:
379         iplist = args.netmask.split(",")
380         for ip in iplist:
381             if ip.rfind("/") != -1:
382                 ip=ip.split("/")
383                 ipAllowed = ip[0]
384                 mask = ip[1]
385             else:
386                 ipAllowed = ip
387                 mask = "255.255.255.255" 
388             NETMASK.append( {'ipAllowed': ipV4ToHex(ipAllowed), 'mask' : ipV4ToHex(mask)} )
389     
390     uri = "ws://localhost:" + args.port
391     
392     factory = WampServerFactory(uri, debugWamp = args.debug)
393     factory.protocol = CloudeebusServerProtocol
394     factory.setProtocolOptions(allowHixie76 = True)
395     
396     listenWS(factory)
397     
398     DBusGMainLoop(set_as_default=True)
399     
400     reactor.run()