5 # Copyright 2012 Intel Corporation.
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # Luc Yriarte <luc.yriarte@intel.com>
20 # Christophe Guiraud <christophe.guiraud@intel.com>
21 # Frederic Paut <frederic.paut@intel.com>
25 import argparse, dbus, json, sys
27 from twisted.internet import glib2reactor
28 # Configure the twisted mainloop to be run inside the glib mainloop.
29 # This must be done before importing the other twisted modules
30 glib2reactor.install()
31 from twisted.internet import reactor, defer
33 from autobahn.websocket import listenWS
34 from autobahn.wamp import exportRpc, WampServerFactory, WampCraServerProtocol
36 from dbus.mainloop.glib import DBusGMainLoop
41 gobject.threads_init()
47 from twisted.python import log
50 from xml.etree.ElementTree import XMLParser
52 ###############################################################################
60 ###############################################################################
62 ## Convert an ip or an IP mask (such as ip/24 or ip/255.255.255.0) in hex value (32bits)
65 if mask.rfind(".") == -1:
67 maskHex = (2**(int(mask))-1)
68 maskHex = maskHex << (32-int(mask))
70 raise Exception("Illegal mask (larger than 32 bits) " + mask)
72 maskField = mask.split(".")
73 # Check if mask has four fields (byte)
74 if len(maskField) != 4:
75 raise Exception("Illegal ip address / mask (should be 4 bytes) " + mask)
76 for maskQuartet in maskField:
77 byte = int(maskQuartet)
78 # Check if each field is really a byte
80 raise Exception("Illegal ip address / mask (digit larger than a byte) " + mask)
82 maskHex = maskHex << 8
83 maskHex = maskHex >> 8
86 ###############################################################################
89 Global cache of DBus connexions and signal handlers
92 self.dbusConnexions = {}
93 self.signalHandlers = {}
98 Disconnect signal handlers before resetting cache.
100 self.dbusConnexions = {}
101 # disconnect signal handlers
102 for key in self.signalHandlers:
103 self.signalHandlers[key].disconnect()
104 self.signalHandlers = {}
def dbusConnexion(self, busName):
    """
    Return the DBus connexion for busName, opening and caching it on first use.

    busName: "session" or "system".
    Raises Exception for any other bus name; nothing is cached in that case.
    """
    # 'in' replaces dict.has_key(), which only exists on Python 2 dicts
    if busName not in self.dbusConnexions:
        if busName == "session":
            self.dbusConnexions[busName] = dbus.SessionBus()
        elif busName == "system":
            self.dbusConnexions[busName] = dbus.SystemBus()
        else:
            raise Exception("Error: invalid bus: %s" % busName)
    return self.dbusConnexions[busName]
119 ###############################################################################
class DbusSignalHandler:
    """
    Receive one DBus signal and publish it through the module-level factory.

    signal hash id as busName#senderName#objectName#interfaceName#signalName
    """
    def __init__(self, busName, senderName, objectName, interfaceName, signalName):
        self.id = "#".join([busName, senderName, objectName, interfaceName, signalName])
        # connect handler to signal
        self.bus = cache.dbusConnexion(busName)
        self.bus.add_signal_receiver(self.handleSignal, signalName, interfaceName, senderName, objectName)

    def disconnect(self):
        """
        remove the signal receiver installed by __init__
        """
        # the match arguments are recovered from the hash id:
        # [0] busName, [1] senderName, [2] objectName, [3] interfaceName, [4] signalName
        names = self.id.split("#")
        senderName, objectName, interfaceName, signalName = names[1], names[2], names[3], names[4]
        self.bus.remove_signal_receiver(self.handleSignal, signalName, interfaceName, senderName, objectName)

    def handleSignal(self, *args):
        """
        publish dbus args under topic hash id
        """
        factory.dispatch(self.id, json.dumps(args))
144 ###############################################################################
145 class DbusCallHandler:
147 deferred reply to return dbus results
149 def __init__(self, method, args):
151 self.request = defer.Deferred()
156 def callMethod(self):
158 dbus method async call
161 self.method(*self.args, reply_handler=self.dbusSuccess, error_handler=self.dbusError)
165 def dbusSuccess(self, *result):
167 return JSON string result array
169 self.request.callback(json.dumps(result))
173 def dbusError(self, error):
175 return dbus error message
177 self.request.errback(Exception(error.get_dbus_message()))
182 ################################################################################
185 Execute DynDBusClass generated code
187 def __init__(self, globalCtx, localCtx) :
188 self.exec_string = ""
189 self.exec_code = None
190 self.exec_code_valid = 1
191 self.indent_level = 0
192 self.indent_increment = 1
194 self.localCtx = localCtx
195 self.globalCtx = globalCtx
198 def append_stmt(self, stmt) :
199 self.exec_code_valid = 0
201 for x in range(0,self.indent_level):
202 self.exec_string = self.exec_string + ' '
203 self.exec_string = self.exec_string + stmt + '\n'
206 self.indent_level = self.indent_level + self.indent_increment
209 self.indent_level = self.indent_level - self.indent_increment
211 # compile : Compile exec_string into exec_code using the builtin
212 # compile function. Skip if already in sync.
214 if not self.exec_code_valid :
215 self.exec_code = compile(self.exec_string, "<string>", "exec")
216 self.exec_code_valid = True
219 if not self.exec_code_valid :
221 exec(self.exec_code, self.globalCtx, self.localCtx)
225 ################################################################################
226 class XmlCbParser: # The target object of the parser
229 def __init__(self, dynDBusClass):
230 self.dynDBusClass = dynDBusClass
232 def start(self, tag, attrib): # Called for each opening tag.
236 if (tag == 'interface'):
237 self.dynDBusClass.set_interface(attrib['name'])
240 if (tag == 'method'):
242 self.dynDBusClass.def_method(attrib['name'])
244 if (tag == 'signal'):
246 self.dynDBusClass.def_signal(attrib['name'])
249 # Set signature (in/out & name) for method
251 if (self.current == 'method'):
252 if (attrib.has_key('direction') == False):
253 attrib['direction'] = "in"
254 self.dynDBusClass.add_signature(attrib['name'],
258 if (self.current == 'signal'):
259 if (attrib.has_key('name') == False):
260 attrib['name'] = 'value'
261 self.dynDBusClass.add_signature(attrib['name'], 'in',
def end(self, tag): # Called for each closing tag.
    """
    Finish the definition opened by a 'method' or 'signal' tag.

    For either tag the decorator/header is emitted first, then the body,
    then the definition is closed. Any other tag is ignored.
    """
    if tag == 'method':
        self.dynDBusClass.add_dbus_method()
        self.dynDBusClass.add_body_method()
        self.dynDBusClass.end_method()
    elif tag == 'signal':
        self.dynDBusClass.add_dbus_signal()
        self.dynDBusClass.add_body_signal()
        self.dynDBusClass.end_method()
def data(self, data):
    """
    Parser target callback for text content; deliberately a no-op.
    """
    pass # We do not need to do anything with data.
276 def close(self): # Called when all data has been parsed.
281 ################################################################################
282 class DynDBusClass():
283 def __init__(self, className, globalCtx, localCtx):
284 self.className = className
285 self.xmlCB = XmlCbParser(self)
287 self.class_code = ExecCode(globalCtx, localCtx)
288 self.class_code.indent_increment = 4
289 self.class_code.append_stmt("import dbus")
290 self.class_code.append_stmt("\n")
291 self.class_code.append_stmt("class " + self.className + "(dbus.service.Object):")
292 self.class_code.indent()
294 ## Overload of __init__ method
295 self.def_method("__init__")
296 self.add_method("bus, callback=None, objPath='/sample', busName='org.cloudeebus'")
297 self.add_stmt("self.bus = bus")
298 self.add_stmt("self.objPath = objPath")
299 self.add_stmt("self.callback = callback")
300 self.add_stmt("dbus.service.Object.__init__(self, conn=bus, bus_name=busName)")
303 ## Create 'add_to_connection' method
304 self.def_method("add_to_connection")
305 self.add_method("connection=None, path=None")
306 self.add_stmt("dbus.service.Object.add_to_connection(self, connection=self.bus, path=self.objPath)")
309 ## Create 'remove_from_connection' method
310 self.def_method("remove_from_connection")
311 self.add_method("connection=None, path=None")
312 self.add_stmt("dbus.service.Object.remove_from_connection(self, connection=None, path=self.objPath)")
315 def createDBusServiceFromXML(self, xml):
316 self.parser = XMLParser(target=self.xmlCB)
317 self.parser.feed(xml)
320 def set_interface(self, ifName):
323 def def_method(self, methodName):
324 self.methodToAdd = methodName
325 self.signalToAdd = None
326 self.args_str = str()
328 self.signature['name'] = str()
329 self.signature['in'] = str()
330 self.signature['out'] = str()
332 def def_signal(self, signalName):
333 self.methodToAdd = None
334 self.signalToAdd = signalName
335 self.args_str = str()
337 self.signature['name'] = str()
338 self.signature['in'] = str()
339 self.signature['out'] = str()
def add_signature(self, name, direction, signature):
    """
    Record one argument of the method or signal currently being defined.

    name: argument name, appended to the comma separated self.signature['name'].
    direction: 'in' or 'out'; any other value is ignored.
    signature: signature string of the argument.
    """
    if direction == 'in':
        self.signature['in'] += signature
        # separate the names only once a first one has been recorded
        if self.signature['name']:
            self.signature['name'] += ", "
        self.signature['name'] += name
    elif direction == 'out':
        # the 'out' signature is replaced, not accumulated
        self.signature['out'] = signature
350 def add_method(self, args = None, async_success_cb = None, async_err_cb = None):
352 if (self.methodToAdd != None):
353 name = self.methodToAdd
355 name = self.signalToAdd
358 if (async_success_cb != None):
359 async_cb_str = async_success_cb
360 if (async_err_cb != None):
361 if (async_cb_str != str()):
363 async_cb_str += async_err_cb
365 parameters = self.args_str
366 if (async_cb_str != str()):
367 if (parameters != str()):
369 parameters +=async_cb_str
371 if (parameters != str()):
372 self.class_code.append_stmt("def " + name + "(self, %s):" % parameters)
374 self.class_code.append_stmt("def " + name + "(self):")
375 self.class_code.indent()
def end_method(self):
    """
    Close the definition being generated: append a separator line, then
    dedent the generated code.
    """
    self.class_code.append_stmt("\n")
    self.class_code.dedent()
381 def add_dbus_method(self):
382 decorator = '@dbus.service.method("' + self.ifName + '"'
383 if (self.signature.has_key('in') and self.signature['in'] != str()):
384 decorator += ", in_signature='" + self.signature['in'] + "'"
385 if (self.signature.has_key('out') and self.signature['out'] != str()):
386 decorator += ", out_signature='" + self.signature['out'] + "'"
387 decorator += ", async_callbacks=('dbus_async_cb', 'dbus_async_err_cb')"
389 self.class_code.append_stmt(decorator)
390 if (self.signature.has_key('name') and self.signature['name'] != str()):
391 self.add_method(self.signature['name'], async_success_cb='dbus_async_cb', async_err_cb='dbus_async_err_cb')
393 self.add_method(async_success_cb='dbus_async_cb', async_err_cb='dbus_async_err_cb')
395 def add_dbus_signal(self):
396 decorator = '@dbus.service.signal("' + self.ifName + '"'
397 if (self.signature.has_key('in') and self.signature['in'] != str()):
398 decorator += ", signature='" + self.signature['in'] + "'"
400 self.class_code.append_stmt(decorator)
401 if (self.signature.has_key('name') and self.signature['name'] != str()):
402 self.add_method(self.signature['name'])
def add_body_method(self):
    """
    Append the body of the generated DBus method: one self.callback(...) call
    passing the method name, object path, interface name and the two async
    callbacks, followed by self.args_str when it is not empty.

    Nothing is appended when no method is being defined.
    """
    if self.methodToAdd is None:
        return
    # NOTE(review): methodToAdd and ifName are pasted unescaped into source
    # text that is later compiled; confirm the XML they come from is trusted.
    call = ("self.callback('" + self.methodToAdd + "', self.objPath, '" + self.ifName + "', "
            + "dbus_async_cb, dbus_async_err_cb")
    if self.args_str:
        call += ", " + self.args_str
    self.class_code.append_stmt(call + ")")
def add_body_signal(self):
    """
    Append the body of the generated signal method: a bare return followed
    by a separator line.
    """
    self.class_code.append_stmt("return") ## TODO: Remove and fix with code ad hoc
    self.class_code.append_stmt("\n")
def add_stmt(self, stmt):
    """
    Append stmt to the class code being generated (delegates to
    self.class_code.append_stmt).
    """
    self.class_code.append_stmt(stmt)
421 self.class_code.execute()
425 ###############################################################################
426 class CloudeebusService:
428 support for sending DBus messages and registering for DBus signals
def __init__(self, permissions):
    """
    Create the per-session service state.

    permissions: list of names this session may access; the other methods
    look service names up in it with list.index().
    """
    self.permissions = permissions
    self.proxyObjects = {}
    self.proxyMethods = {}
    self.pendingCalls = []
    self.dynDBusClasses = {} # DBus class source code generated dynamically, one per class name
    self.services = {} # DBus service created
    self.serviceAgents = {} # Instantiated DBus class previously generated dynamically, for now, one per class name
    self.servicePendingCalls = {} # JS methods called (and waiting for a success/error response), keyed by methodId
    # Evaluation contexts for the dynamically generated code. locals() is
    # captured here because it contains 'self', which the eval'd expression
    # strings refer to; do not introduce other local names above this line.
    self.localCtx = locals()
    self.globalCtx = globals()
443 def proxyObject(self, busName, serviceName, objectName):
445 object hash id as busName#serviceName#objectName
447 id = "#".join([busName, serviceName, objectName])
448 if not self.proxyObjects.has_key(id):
450 # check permissions, array.index throws exception
451 self.permissions.index(serviceName)
452 bus = cache.dbusConnexion(busName)
453 self.proxyObjects[id] = bus.get_object(serviceName, objectName)
454 return self.proxyObjects[id]
def proxyMethod(self, busName, serviceName, objectName, interfaceName, methodName):
    """
    Return the DBus proxy method, creating and caching it on first use.

    method hash id as busName#serviceName#objectName#interfaceName#methodName

    Exceptions raised by self.proxyObject() propagate and nothing is cached.
    """
    # named methodId so the builtin id() is not shadowed
    methodId = "#".join([busName, serviceName, objectName, interfaceName, methodName])
    # 'in' replaces dict.has_key(), which only exists on Python 2 dicts
    if methodId not in self.proxyMethods:
        obj = self.proxyObject(busName, serviceName, objectName)
        self.proxyMethods[methodId] = obj.get_dbus_method(methodName, interfaceName)
    return self.proxyMethods[methodId]
469 def dbusRegister(self, list):
471 arguments: bus, sender, object, interface, signal
474 raise Exception("Error: expected arguments: bus, sender, object, interface, signal)")
477 # check permissions, array.index throws exception
478 self.permissions.index(list[1])
480 # check if a handler exists
481 sigId = "#".join(list)
482 if cache.signalHandlers.has_key(sigId):
485 # create a handler that will publish the signal
486 dbusSignalHandler = DbusSignalHandler(*list)
487 cache.signalHandlers[sigId] = dbusSignalHandler
489 return dbusSignalHandler.id
493 def dbusSend(self, list):
495 arguments: bus, destination, object, interface, message, [args]
497 # clear pending calls
498 for call in self.pendingCalls:
500 self.pendingCalls.remove(call)
503 raise Exception("Error: expected arguments: bus, destination, object, interface, message, [args])")
505 # parse JSON arg list
508 args = json.loads(list[5])
510 # get dbus proxy method
511 method = self.proxyMethod(*list[0:5])
513 # use a deferred call handler to manage dbus results
514 dbusCallHandler = DbusCallHandler(method, args)
515 self.pendingCalls.append(dbusCallHandler)
516 return dbusCallHandler.callMethod()
520 def emitSignal(self, list):
522 arguments: agentObjectPath, signalName, result (to emit)
525 className = re.sub('/', '_', objectPath[1:])
528 if (self.serviceAgents.has_key(className) == True):
529 exe_str = "self.serviceAgents['"+ className +"']."+ signalName + "(" + str(result) + ")"
530 eval(exe_str, self.globalCtx, self.localCtx)
532 raise Exception("No object path " + objectPath)
535 def returnMethod(self, list):
537 arguments: methodId, callIndex, success (=true, error otherwise), result (to return)
543 if (self.servicePendingCalls.has_key(methodId)):
544 cb = self.servicePendingCalls[methodId]['calls'][callIndex]
546 raise Exception("No pending call " + str(callIndex) + " for methodID " + methodId)
548 successCB = cb["successCB"]
554 errorCB = cb["errorCB"]
559 self.servicePendingCalls[methodId]['calls'][callIndex] = None
560 self.servicePendingCalls[methodId]['count'] = self.servicePendingCalls[methodId]['count'] - 1
561 if self.servicePendingCalls[methodId]['count'] == 0:
562 del self.servicePendingCalls[methodId]
564 raise Exception("No methodID " + methodId)
def srvCB(self, name, objPath, ifName, async_succes_cb, async_error_cb, *args):
    """
    Record a pending call to a service method and publish it.

    The call is published through the module-level factory under the topic
    srvName#objPath#ifName#name, as a JSON object holding the call index
    and the call arguments. The two callbacks are stored under that index
    in self.servicePendingCalls.
    """
    methodId = self.srvName + "#" + objPath + "#" + ifName + "#" + name
    pending = self.servicePendingCalls.setdefault(methodId, {'count': 0, 'calls': []})
    # the call index is the slot the callbacks are about to occupy
    pendingCallStr = json.dumps({'callIndex': len(pending['calls']), 'args': args})
    pending['calls'].append({'successCB': async_succes_cb,
                             'errorCB': async_error_cb})
    pending['count'] += 1
    factory.dispatch(methodId, pendingCallStr)
578 def serviceAdd(self, list):
580 arguments: busName, srvName
583 self.bus = cache.dbusConnexion( busName['name'] )
584 self.srvName = list[1]
585 if (self.services.has_key(self.srvName) == False):
586 self.services[self.srvName] = dbus.service.BusName(name = self.srvName, bus = self.bus)
590 def serviceRelease(self, list):
592 arguments: busName, srvName
594 self.srvName = list[0]
595 if (self.services.has_key(self.srvName) == True):
596 self.services.pop(self.srvName)
599 raise Exception(self.srvName + " do not exist")
602 def serviceAddAgent(self, list):
604 arguments: objectPath, xmlTemplate
606 self.agentObjectPath = list[0]
607 xmlTemplate = list[1]
608 self.className = re.sub('/', '_', self.agentObjectPath[1:])
609 if (self.dynDBusClasses.has_key(self.className) == False):
610 self.dynDBusClasses[self.className] = DynDBusClass(self.className, self.globalCtx, self.localCtx)
611 self.dynDBusClasses[self.className].createDBusServiceFromXML(xmlTemplate)
612 self.dynDBusClasses[self.className].declare()
614 ## Class already exist, instanciate it if not already instanciated
615 if (self.serviceAgents.has_key(self.className) == False):
616 self.serviceAgents[self.className] = eval(self.className + "(self.bus, callback=self.srvCB, objPath=self.agentObjectPath, busName=self.srvName)", self.globalCtx, self.localCtx)
618 self.serviceAgents[self.className].add_to_connection()
619 return (self.agentObjectPath)
def serviceDelAgent(self, list):
    """
    arguments: objectPath

    Remove the agent registered for objectPath from its connection and
    forget it. Returns the object path.
    Raises Exception when no agent exists for that object path.
    """
    agentObjectPath = list[0]
    className = re.sub('/', '_', agentObjectPath[1:])

    if className not in self.serviceAgents:
        raise Exception(agentObjectPath + " doesn't exist!")
    # Use the class name derived from this request. self.className belongs to
    # whichever agent serviceAddAgent handled last, so using it here could
    # remove a different agent than the one asked for.
    self.serviceAgents[className].remove_from_connection()
    self.serviceAgents.pop(className)

    return (agentObjectPath)
638 def getVersion(self):
640 return current version string
646 ###############################################################################
647 class CloudeebusServerProtocol(WampCraServerProtocol):
649 connexion and session authentication management
def onSessionOpen(self):
    """
    Set the CRA authentication options, then run the base class session setup.
    """
    # CRA authentication options
    self.clientAuthTimeout = 0
    # anonymous clients are accepted only when the module-level OPENDOOR flag is set
    self.clientAuthAllowAnonymous = OPENDOOR
    # CRA authentication init
    WampCraServerProtocol.onSessionOpen(self)
def getAuthPermissions(self, key, extra):
    """
    Return the permissions carried in extra.

    key: not used here.
    extra: mapping with an optional "permissions" entry holding JSON text;
    a missing entry yields an empty list.
    """
    return json.loads(extra.get("permissions", "[]"))
664 def getAuthSecret(self, key):
665 secret = CREDENTIALS.get(key, None)
668 # secret must be of str type to be hashed
669 return secret.encode('utf-8')
672 def onAuthenticated(self, key, permissions):
677 for netfilter in NETMASK:
678 ipHex=ipV4ToHex(self.peer.host)
679 ipAllowed = (ipHex & netfilter['mask']) == netfilter['ipAllowed'] & netfilter['mask']
683 raise Exception("host " + self.peer.host + " is not allowed!")
684 # check authentication key
686 raise Exception("Authentication failed")
687 # check permissions, array.index throws exception
688 for req in permissions:
690 # create cloudeebus service instance
691 self.cloudeebusService = CloudeebusService(permissions)
692 # register it for RPC
693 self.registerForRpc(self.cloudeebusService)
694 # register for Publish / Subscribe
695 self.registerForPubSub("", True)
698 def connectionLost(self, reason):
699 WampCraServerProtocol.connectionLost(self, reason)
700 if factory.getConnectionCount() == 0:
705 ###############################################################################
707 if __name__ == '__main__':
711 parser = argparse.ArgumentParser(description='Javascript DBus bridge.')
712 parser.add_argument('-v', '--version', action='store_true',
713 help='print version and exit')
714 parser.add_argument('-d', '--debug', action='store_true',
715 help='log debug info on standard output')
716 parser.add_argument('-o', '--opendoor', action='store_true',
717 help='allow anonymous access to all services')
718 parser.add_argument('-p', '--port', default='9000',
720 parser.add_argument('-c', '--credentials',
721 help='path to credentials file')
722 parser.add_argument('-w', '--whitelist',
723 help='path to whitelist file')
724 parser.add_argument('-n', '--netmask',
725 help='netmask,IP filter (comma separated.) eg. : -n 127.0.0.1,192.168.2.0/24,10.12.16.0/255.255.255.0')
727 args = parser.parse_args(sys.argv[1:])
730 print("Cloudeebus version " + VERSION)
734 log.startLogging(sys.stdout)
736 OPENDOOR = args.opendoor
739 jfile = open(args.credentials)
740 CREDENTIALS = json.load(jfile)
744 jfile = open(args.whitelist)
745 WHITELIST = json.load(jfile)
749 iplist = args.netmask.split(",")
751 if ip.rfind("/") != -1:
757 mask = "255.255.255.255"
758 NETMASK.append( {'ipAllowed': ipV4ToHex(ipAllowed), 'mask' : ipV4ToHex(mask)} )
760 uri = "ws://localhost:" + args.port
762 factory = WampServerFactory(uri, debugWamp = args.debug)
763 factory.protocol = CloudeebusServerProtocol
764 factory.setProtocolOptions(allowHixie76 = True)
768 DBusGMainLoop(set_as_default=True)