5 # Copyright 2012 Intel Corporation.
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # Luc Yriarte <luc.yriarte@intel.com>
20 # Christophe Guiraud <christophe.guiraud@intel.com>
21 # Frederic Paut <frederic.paut@intel.com>
25 import argparse, dbus, json, sys
27 from twisted.internet import glib2reactor
28 # Configure the twisted mainloop to be run inside the glib mainloop.
29 # This must be done before importing the other twisted modules
30 glib2reactor.install()
31 from twisted.internet import reactor, defer
33 from autobahn.websocket import listenWS
34 from autobahn.wamp import exportRpc, WampServerFactory, WampCraServerProtocol
36 from dbus.mainloop.glib import DBusGMainLoop
41 gobject.threads_init()
47 from twisted.python import log
50 from xml.etree.ElementTree import XMLParser
52 ###############################################################################
60 ###############################################################################
def ipV4ToHex(mask):
    '''
    Convert an IPv4 address or netmask to its 32-bit integer value.

    mask: a string, either dotted-quad ("192.168.2.0", "255.255.255.0")
          or a prefix length ("24", the part after the slash in "ip/24").

    Returns an int; "24" and "255.255.255.0" both give 0xFFFFFF00.

    Raises ValueError (an Exception subclass) when the prefix length is
    outside 0..32, when a dotted value does not have exactly four fields,
    when a field is outside 0..255, or when a field is not a number.
    '''
    if "." not in mask:
        # prefix length: set the 'bits' most significant bits of 32
        bits = int(mask)
        if not 0 <= bits <= 32:
            raise ValueError("Illegal mask (larger than 32 bits) " + mask)
        return ((1 << bits) - 1) << (32 - bits)

    maskField = mask.split(".")
    # Check if mask has four fields (byte)
    if len(maskField) != 4:
        raise ValueError("Illegal ip address / mask (should be 4 bytes) " + mask)
    maskHex = 0
    for maskQuartet in maskField:
        byte = int(maskQuartet)
        # Check if each field is really a byte
        if not 0 <= byte <= 255:
            raise ValueError("Illegal ip address / mask (digit larger than a byte) " + mask)
        maskHex = (maskHex << 8) | byte
    return maskHex
86 ###############################################################################
class DbusCache:
    '''
    Global cache of DBus connexions and signal handlers

    dbusConnexions maps a bus name ("session" or "system") to its bus object.
    signalHandlers maps a signal hash id to the handler connected for it.
    '''
    def __init__(self):
        self.dbusConnexions = {}
        self.signalHandlers = {}


    def reset(self):
        '''
        Disconnect signal handlers before resetting cache.
        '''
        self.dbusConnexions = {}
        # disconnect signal handlers
        for key in self.signalHandlers:
            self.signalHandlers[key].disconnect()
        self.signalHandlers = {}


    def dbusConnexion(self, busName):
        '''
        Return the bus object for busName, opening it on first use.

        Raises Exception when busName is neither "session" nor "system".
        '''
        # 'in' instead of dict.has_key(), which no longer exists in Python 3
        if busName not in self.dbusConnexions:
            if busName == "session":
                self.dbusConnexions[busName] = dbus.SessionBus()
            elif busName == "system":
                self.dbusConnexions[busName] = dbus.SystemBus()
            else:
                raise Exception("Error: invalid bus: %s" % busName)
        return self.dbusConnexions[busName]
119 ###############################################################################
class DbusSignalHandler:
    '''
    Forward one DBus signal to the publish/subscribe factory.

    The topic (self.id) is the signal hash id
    busName#senderName#objectName#interfaceName#signalName
    '''
    def __init__(self, busName, senderName, objectName, interfaceName, signalName):
        topicParts = (busName, senderName, objectName, interfaceName, signalName)
        self.id = "#".join(topicParts)
        # the bus comes from the shared cache; subscribe our callback on it
        self.bus = cache.dbusConnexion(busName)
        self.bus.add_signal_receiver(self.handleSignal,
                                     signalName,
                                     interfaceName,
                                     senderName,
                                     objectName)


    def disconnect(self):
        '''
        Unsubscribe from the signal, using the names stored in the hash id.
        '''
        fields = self.id.split("#")
        signalName = fields[4]
        interfaceName = fields[3]
        senderName = fields[1]
        objectName = fields[2]
        self.bus.remove_signal_receiver(self.handleSignal,
                                        signalName,
                                        interfaceName,
                                        senderName,
                                        objectName)


    def handleSignal(self, *args):
        '''
        publish dbus args under topic hash id
        '''
        payload = json.dumps(args)
        factory.dispatch(self.id, payload)
144 ###############################################################################
class DbusCallHandler:
    '''
    One asynchronous DBus method call whose outcome is delivered through
    a deferred (self.request).

    self.pending is True between callMethod() and the DBus reply/error.
    '''
    def __init__(self, method, args):
        self.method = method
        self.args = args
        self.request = defer.Deferred()
        self.pending = False


    def callMethod(self):
        '''
        Start the asynchronous DBus call and return the deferred.
        '''
        self.pending = True
        callbacks = {
            'reply_handler': self.dbusSuccess,
            'error_handler': self.dbusError,
        }
        self.method(*self.args, **callbacks)
        return self.request


    def dbusSuccess(self, *result):
        '''
        Fire the deferred with the results as a JSON array string.
        '''
        encoded = json.dumps(result)
        self.request.callback(encoded)
        self.pending = False


    def dbusError(self, error):
        '''
        Fail the deferred with the DBus error message.
        '''
        failure = Exception(error.get_dbus_message())
        self.request.errback(failure)
        self.pending = False
182 ################################################################################
class ExecCode:
    '''
    Execute DynDBusClass generated code

    Statements are accumulated as source text in exec_string, one per line,
    each prefixed with indent_level spaces. execute() compiles the text if
    it changed since the last compile, then runs it with the global / local
    contexts given at construction.
    '''
    def __init__(self, globalCtx, localCtx) :
        self.exec_string = ""
        self.exec_code = None
        # Nothing has been compiled yet, so the code object is NOT in sync:
        # starting "valid" would make execute() run exec(None) on an
        # instance that never received a statement.
        self.exec_code_valid = False
        self.indent_level = 0
        self.indent_increment = 1
        self.line = 0   # number of statements appended so far
        self.localCtx = localCtx
        self.globalCtx = globalCtx


    def append_stmt(self, stmt) :
        '''
        Append stmt as a new source line at the current indentation.
        '''
        self.exec_code_valid = False
        self.line += 1
        self.exec_string += ' ' * self.indent_level + stmt + '\n'

    def indent(self) :
        '''Increase the indentation of the following statements.'''
        self.indent_level = self.indent_level + self.indent_increment

    def dedent(self) :
        '''Decrease the indentation of the following statements.'''
        self.indent_level = self.indent_level - self.indent_increment

    # compile : Compile exec_string into exec_code using the builtin
    # compile function. Skip if already in sync.
    def compile(self) :
        if not self.exec_code_valid :
            self.exec_code = compile(self.exec_string, "<string>", "exec")
        self.exec_code_valid = True

    def execute(self) :
        '''
        Run the accumulated code, compiling it first when out of sync.
        '''
        if not self.exec_code_valid :
            self.compile()
        exec(self.exec_code, self.globalCtx, self.localCtx)
225 ################################################################################
class XmlCbParser: # The target object of the parser
    '''
    XMLParser target that forwards DBus introspection elements
    (interface, method, signal, arg) to a DynDBusClass-like builder.
    '''
    maxDepth = 0
    depth = 0

    def __init__(self, dynDBusClass):
        self.dynDBusClass = dynDBusClass
        # kind of the member being parsed ('method' or 'signal'); None until
        # the first one, so a stray <arg> is ignored instead of raising
        self.current = None

    def start(self, tag, attrib):   # Called for each opening tag.
        if (tag == 'node'):
            return
        # Set interface name
        if (tag == 'interface'):
            self.dynDBusClass.set_interface(attrib['name'])
            return
        # Set method name
        if (tag == 'method'):
            self.current = tag
            self.dynDBusClass.def_method(attrib['name'])
            return
        if (tag == 'signal'):
            self.current = tag
            self.dynDBusClass.def_signal(attrib['name'])
            return

        # Set signature (in/out & name) for method
        if (tag == 'arg'):
            if (self.current == 'method'):
                # a method argument without direction is an input
                self.dynDBusClass.add_signature(attrib['name'],
                                                attrib.get('direction', "in"),
                                                attrib['type'])
                return
            if (self.current == 'signal'):
                # signal arguments may be unnamed
                self.dynDBusClass.add_signature(attrib.get('name', 'value'), 'in',
                                                attrib['type'])
                return

    def end(self, tag):             # Called for each closing tag.
        if (tag == 'method'):
            self.dynDBusClass.add_dbus_method()
            self.dynDBusClass.add_body_method()
            self.dynDBusClass.end_method()
        if (tag == 'signal'):
            self.dynDBusClass.add_dbus_signal()
            self.dynDBusClass.add_body_signal()
            self.dynDBusClass.end_method()

    def data(self, data):
        pass            # We do not need to do anything with data.

    def close(self):    # Called when all data has been parsed.
        return self.maxDepth
281 ################################################################################
class DynDBusClass():
    '''
    Generate the Python source of a dbus.service.Object subclass from a DBus
    introspection XML document, then declare it by executing that source.

    The generated class is named className. Each XML method becomes a
    @dbus.service.method that forwards to self.callback; each XML signal
    becomes a @dbus.service.signal whose body is a bare return.

    NOTE(review): className and the interface / member / argument names read
    from the XML are pasted unescaped into source text that is later exec'd.
    If they can come from an untrusted client they must be validated first.
    '''
    def __init__(self, className, globalCtx, localCtx):
        self.className = className
        self.xmlCB = XmlCbParser(self)
        self.signature = {}
        self.class_code = ExecCode(globalCtx, localCtx)
        self.class_code.indent_increment = 4
        self.class_code.append_stmt("import dbus")
        self.class_code.append_stmt("\n")
        self.class_code.append_stmt("class " + self.className + "(dbus.service.Object):")
        self.class_code.indent()

        ## Overload of __init__ method
        self.def_method("__init__")
        self.add_method("bus, callback=None, objPath='/sample', busName='org.cloudeebus'")
        self.add_stmt("self.bus = bus")
        self.add_stmt("self.objPath = objPath")
        self.add_stmt("self.callback = callback")
        self.add_stmt("dbus.service.Object.__init__(self, conn=bus, bus_name=busName)")
        self.end_method()

        ## Create 'add_to_connection' method
        self.def_method("add_to_connection")
        self.add_method("connection=None, path=None")
        self.add_stmt("dbus.service.Object.add_to_connection(self, connection=self.bus, path=self.objPath)")
        self.end_method()

        ## Create 'remove_from_connection' method
        self.def_method("remove_from_connection")
        self.add_method("connection=None, path=None")
        self.add_stmt("dbus.service.Object.remove_from_connection(self, connection=None, path=self.objPath)")
        self.end_method()

    def createDBusServiceFromXML(self, xml):
        '''Feed the introspection XML through the callback parser.'''
        self.parser = XMLParser(target=self.xmlCB)
        self.parser.feed(xml)
        self.parser.close()

    def set_interface(self, ifName):
        '''Set the interface name used by the following decorators.'''
        self.ifName = ifName

    def _reset_signature(self):
        # start a member with no argument names and empty in/out signatures
        self.args_str = str()
        self.signature = {'name': str(), 'in': str(), 'out': str()}

    def def_method(self, methodName):
        '''Start the definition of a method named methodName.'''
        self.methodToAdd = methodName
        self.signalToAdd = None
        self._reset_signature()

    def def_signal(self, signalName):
        '''Start the definition of a signal named signalName.'''
        self.methodToAdd = None
        self.signalToAdd = signalName
        self._reset_signature()

    def add_signature(self, name, direction, signature):
        '''
        Record one argument of the member being defined.

        'in' arguments extend the input signature and the comma separated
        parameter name list; 'out' arguments extend the output signature.
        '''
        if (direction == 'in'):
            self.signature['in'] += signature
            if (self.signature['name'] != str()):
                self.signature['name'] += ", "
            self.signature['name'] += name
        if (direction == 'out'):
            # accumulate: a method with several return values needs all of
            # them in out_signature, not only the last one
            self.signature['out'] += signature

    def add_method(self, args = None, async_success_cb = None, async_err_cb = None):
        '''
        Emit the "def name(self, ...):" line of the member being defined and
        indent for its body.

        args: parameter list as source text; when given it replaces args_str.
        async_success_cb, async_err_cb: names of extra callback parameters
        appended after the regular ones.
        '''
        async_cb_str = ""
        if (self.methodToAdd is not None):
            name = self.methodToAdd
        else:
            name = self.signalToAdd
        if (args is not None):
            self.args_str = args
        if (async_success_cb is not None):
            async_cb_str = async_success_cb
        if (async_err_cb is not None):
            if (async_cb_str != str()):
                async_cb_str += ", "
            async_cb_str += async_err_cb

        parameters = self.args_str
        if (async_cb_str != str()):
            if (parameters != str()):
                parameters += ", "
            parameters += async_cb_str

        if (parameters != str()):
            self.class_code.append_stmt("def " + name + "(self, %s):" % parameters)
        else:
            self.class_code.append_stmt("def " + name + "(self):")
        self.class_code.indent()

    def end_method(self):
        '''Close the member being defined (blank line, then dedent).'''
        self.class_code.append_stmt("\n")
        self.class_code.dedent()

    def add_dbus_method(self):
        '''Emit the @dbus.service.method decorator and the def line.'''
        decorator = '@dbus.service.method("' + self.ifName + '"'
        if (self.signature.get('in', str()) != str()):
            decorator += ", in_signature='" + self.signature['in'] + "'"
        if (self.signature.get('out', str()) != str()):
            decorator += ", out_signature='" + self.signature['out'] + "'"
        decorator += ", async_callbacks=('dbus_async_cb', 'dbus_async_err_cb')"
        decorator += ")"
        self.class_code.append_stmt(decorator)
        if (self.signature.get('name', str()) != str()):
            self.add_method(self.signature['name'], async_success_cb='dbus_async_cb', async_err_cb='dbus_async_err_cb')
        else:
            self.add_method(async_success_cb='dbus_async_cb', async_err_cb='dbus_async_err_cb')

    def add_dbus_signal(self):
        '''Emit the @dbus.service.signal decorator and the def line.'''
        decorator = '@dbus.service.signal("' + self.ifName + '"'
        if (self.signature.get('in', str()) != str()):
            decorator += ", signature='" + self.signature['in'] + "'"
        decorator += ")"
        self.class_code.append_stmt(decorator)
        if (self.signature.get('name', str()) != str()):
            self.add_method(self.signature['name'])
        else:
            self.add_method()

    def add_body_method(self):
        '''Emit the method body: forward the call to self.callback.'''
        if (self.methodToAdd is not None):
            if (self.args_str != str()):
                self.class_code.append_stmt("self.callback('" + self.methodToAdd + "', self.objPath, '" + self.ifName + "', " + "dbus_async_cb, dbus_async_err_cb, %s)" % self.args_str)
            else:
                self.class_code.append_stmt("self.callback('" + self.methodToAdd + "', self.objPath, '" + self.ifName + "', " + "dbus_async_cb, dbus_async_err_cb)")

    def add_body_signal(self):
        '''Emit the signal body.'''
        self.class_code.append_stmt("return") ## TODO: Remove and fix with code ad hoc
        self.class_code.append_stmt("\n")

    def add_stmt(self, stmt) :
        '''Append a raw statement to the generated source.'''
        self.class_code.append_stmt(stmt)

    def declare(self) :
        '''Execute the generated source, which defines the class.'''
        self.class_code.execute()
425 ###############################################################################
class CloudeebusService:
    '''
    support for sending DBus messages and registering for DBus signals

    Methods decorated with @exportRpc take their arguments as a single
    array (named 'list') and are callable remotely once the instance is
    registered for RPC.
    '''
    def __init__(self, permissions):
        self.permissions = {}
        self.permissions['permissions'] = permissions['permissions']
        self.permissions['authextra'] = permissions['authextra']
        self.proxyObjects = {}
        self.proxyMethods = {}
        self.pendingCalls = []
        self.dynDBusClasses = {} # DBus class source code generated dynamically (a list because one by classname)
        self.services = {}  # DBus service created
        self.serviceAgents = {} # Instantiated DBus class previously generated dynamically, for now, one by classname
        self.servicePendingCalls = {} # JS methods called (and waiting for a Success/error response), containing 'methodId', (successCB, errorCB)
        # Evaluation contexts for the generated agent classes: the local
        # one holds 'self', which the eval'd expressions below refer to.
        self.localCtx = locals()
        self.globalCtx = globals()


    def proxyObject(self, busName, serviceName, objectName):
        '''
        object hash id as busName#serviceName#objectName
        '''
        id = "#".join([busName, serviceName, objectName])
        if id not in self.proxyObjects:
            if not OPENDOOR:
                # check permissions, array.index throws exception
                self.permissions['permissions'].index(serviceName)
            bus = cache.dbusConnexion(busName)
            self.proxyObjects[id] = bus.get_object(serviceName, objectName)
        return self.proxyObjects[id]


    def proxyMethod(self, busName, serviceName, objectName, interfaceName, methodName):
        '''
        method hash id as busName#serviceName#objectName#interfaceName#methodName
        '''
        id = "#".join([busName, serviceName, objectName, interfaceName, methodName])
        if id not in self.proxyMethods:
            obj = self.proxyObject(busName, serviceName, objectName)
            self.proxyMethods[id] = obj.get_dbus_method(methodName, interfaceName)
        return self.proxyMethods[id]


    @exportRpc
    def dbusRegister(self, list):
        '''
        arguments: bus, sender, object, interface, signal

        Returns the signal hash id, which is also the topic the signal is
        published under.
        '''
        if len(list) < 5:
            raise Exception("Error: expected arguments: bus, sender, object, interface, signal)")

        if not OPENDOOR:
            # check permissions, array.index throws exception
            self.permissions['permissions'].index(list[1])

        # check if a handler exists
        sigId = "#".join(list)
        if sigId in cache.signalHandlers:
            return sigId

        # create a handler that will publish the signal
        dbusSignalHandler = DbusSignalHandler(*list)
        cache.signalHandlers[sigId] = dbusSignalHandler

        return dbusSignalHandler.id


    @exportRpc
    def dbusSend(self, list):
        '''
        arguments: bus, destination, object, interface, message, [args]

        args, when present, is a JSON encoded array of method arguments.
        Returns what the call handler's callMethod() returns.
        '''
        # clear pending calls; build a new list rather than removing items
        # from the one being iterated, which skips entries
        self.pendingCalls = [call for call in self.pendingCalls if call.pending]

        if len(list) < 5:
            raise Exception("Error: expected arguments: bus, destination, object, interface, message, [args])")

        # parse JSON arg list
        args = []
        if len(list) == 6:
            args = json.loads(list[5])

        # get dbus proxy method
        method = self.proxyMethod(*list[0:5])

        # use a deferred call handler to manage dbus results
        dbusCallHandler = DbusCallHandler(method, args)
        self.pendingCalls.append(dbusCallHandler)
        return dbusCallHandler.callMethod()


    @exportRpc
    def emitSignal(self, list):
        '''
        arguments: agentObjectPath, signalName, result (to emit)
        '''
        objectPath = list[0]
        className = re.sub('/', '_', objectPath[1:])
        signalName = list[1]
        result = list[2]
        if className in self.serviceAgents:
            # NOTE(review): signalName and result are supplied by the remote
            # caller and are evaluated as Python source here; they should be
            # validated or this eval replaced by a getattr() based call.
            exe_str = "self.serviceAgents['"+ className +"']."+ signalName + "(" + str(result) + ")"
            eval(exe_str, self.globalCtx, self.localCtx)
        else:
            raise Exception("No object path " + objectPath)

    @exportRpc
    def returnMethod(self, list):
        '''
        arguments: methodId, callIndex, success (=true, error otherwise), result (to return)
        '''
        methodId = list[0]
        callIndex = list[1]
        success = list[2]
        result = list[3]
        if methodId in self.servicePendingCalls:
            pending = self.servicePendingCalls[methodId]
            try:
                cb = pending['calls'][callIndex]
            except IndexError:
                # unknown index: report it like an already answered call
                cb = None
            if cb is None:
                raise Exception("No pending call " + str(callIndex) + " for methodID " + methodId)
            if (success):
                successCB = cb["successCB"]
                if (result is not None):
                    successCB(result)
                else:
                    successCB()
            else:
                errorCB = cb["errorCB"]
                if (result is not None):
                    errorCB(result)
                else:
                    errorCB()
            # keep the slot so the indexes of the other calls stay valid
            pending['calls'][callIndex] = None
            pending['count'] = pending['count'] - 1
            if pending['count'] == 0:
                del self.servicePendingCalls[methodId]
        else:
            raise Exception("No methodID " + methodId)

    def srvCB(self, name, objPath, ifName, async_succes_cb, async_error_cb, *args):
        '''
        Callback of the generated agent classes: record the DBus reply
        callbacks and publish the call under its method id so that it can
        be answered later through returnMethod.
        '''
        methodId = self.srvName + "#" + objPath + "#" + ifName + "#" + name
        cb = { 'successCB': async_succes_cb,
               'errorCB': async_error_cb}
        if methodId not in self.servicePendingCalls:
            self.servicePendingCalls[methodId] = {'count': 0, 'calls': []}
        pending = self.servicePendingCalls[methodId]
        pendingCallStr = json.dumps({'callIndex': len(pending['calls']), 'args': args})
        pending['calls'].append(cb)
        pending['count'] = pending['count'] + 1
        factory.dispatch(methodId, pendingCallStr)

    @exportRpc
    def serviceAdd(self, list):
        '''
        arguments: busName, srvName

        busName is a mapping whose 'name' entry selects the bus.
        '''
        busName = list[0]
        self.bus = cache.dbusConnexion( busName['name'] )
        self.srvName = list[1]
        if self.srvName not in self.services:
            self.services[self.srvName] = dbus.service.BusName(name = self.srvName, bus = self.bus)
        return self.srvName

    @exportRpc
    def serviceRelease(self, list):
        '''
        arguments: srvName
        '''
        self.srvName = list[0]
        if self.srvName in self.services:
            self.services.pop(self.srvName)
            return self.srvName
        else:
            raise Exception(self.srvName + " do not exist")

    @exportRpc
    def serviceAddAgent(self, list):
        '''
        arguments: objectPath, xmlTemplate
        '''
        self.agentObjectPath = list[0]
        xmlTemplate = list[1]
        self.className = re.sub('/', '_', self.agentObjectPath[1:])
        if self.className not in self.dynDBusClasses:
            self.dynDBusClasses[self.className] = DynDBusClass(self.className, self.globalCtx, self.localCtx)
            self.dynDBusClasses[self.className].createDBusServiceFromXML(xmlTemplate)
            self.dynDBusClasses[self.className].declare()

        ## Class already exists, instantiate it if not already instantiated
        if self.className not in self.serviceAgents:
            # NOTE(review): the class name is derived from the caller
            # supplied object path and is evaluated as Python source.
            self.serviceAgents[self.className] = eval(self.className + "(self.bus, callback=self.srvCB, objPath=self.agentObjectPath, busName=self.srvName)", self.globalCtx, self.localCtx)

        self.serviceAgents[self.className].add_to_connection()
        return (self.agentObjectPath)

    @exportRpc
    def serviceDelAgent(self, list):
        '''
        arguments: objectPath (further items are ignored)
        '''
        agentObjectPath = list[0]
        className = re.sub('/', '_', agentObjectPath[1:])

        if className in self.serviceAgents:
            # use the class name computed from the request, not
            # self.className, which is the agent added most recently
            self.serviceAgents[className].remove_from_connection()
            self.serviceAgents.pop(className)
        else:
            raise Exception(agentObjectPath + " doesn't exist!")

        return (agentObjectPath)

    @exportRpc
    def getVersion(self):
        '''
        return current version string
        '''
        return VERSION
648 ###############################################################################
class CloudeebusServerProtocol(WampCraServerProtocol):
    '''
    connexion and session authentication management
    '''

    def onSessionOpen(self):
        # CRA authentication options
        self.clientAuthTimeout = 0
        self.clientAuthAllowAnonymous = OPENDOOR
        # CRA authentication init
        WampCraServerProtocol.onSessionOpen(self)


    def getAuthPermissions(self, key, extra):
        '''
        Return the 'permissions' and 'authextra' entries of extra
        (None for each one that is absent).
        '''
        return {'permissions': extra.get("permissions", None),
                'authextra': extra.get("authextra", None)}

    def getAuthSecret(self, key):
        '''
        Return the secret stored in CREDENTIALS for key, or None.
        '''
        secret = CREDENTIALS.get(key, None)
        if secret is None:
            return None
        # secret must be of str type to be hashed
        return str(secret)


    def onAuthenticated(self, key, permissions):
        '''
        Check the peer against the net filter, the key and the whitelist
        (all skipped when OPENDOOR is set), then create the session's
        CloudeebusService and register it for RPC and publish / subscribe.

        Raises Exception when the peer host or the key is rejected;
        list.index raises ValueError for a permission not in WHITELIST.
        '''
        if not OPENDOOR:
            # check net filter
            if NETMASK:
                # the peer address does not change: convert it once
                ipHex = ipV4ToHex(self.peer.host)
                ipAllowed = any(
                    (ipHex & netfilter['mask']) == (netfilter['ipAllowed'] & netfilter['mask'])
                    for netfilter in NETMASK)
                if not ipAllowed:
                    raise Exception("host " + self.peer.host + " is not allowed!")
            # check authentication key
            if key is None:
                raise Exception("Authentication failed")
            # check permissions, array.index throws exception
            for req in permissions['permissions']:
                WHITELIST.index(req)
        # create cloudeebus service instance
        self.cloudeebusService = CloudeebusService(permissions)
        # register it for RPC
        self.registerForRpc(self.cloudeebusService)
        # register for Publish / Subscribe
        self.registerForPubSub("", True)


    def connectionLost(self, reason):
        WampCraServerProtocol.connectionLost(self, reason)
        # last client gone: drop cached buses and signal handlers
        if factory.getConnectionCount() == 0:
            cache.reset()
707 ###############################################################################
709 if __name__ == '__main__':
713 parser = argparse.ArgumentParser(description='Javascript DBus bridge.')
714 parser.add_argument('-v', '--version', action='store_true',
715 help='print version and exit')
716 parser.add_argument('-d', '--debug', action='store_true',
717 help='log debug info on standard output')
718 parser.add_argument('-o', '--opendoor', action='store_true',
719 help='allow anonymous access to all services')
720 parser.add_argument('-p', '--port', default='9000',
722 parser.add_argument('-c', '--credentials',
723 help='path to credentials file')
724 parser.add_argument('-w', '--whitelist',
725 help='path to whitelist file')
726 parser.add_argument('-n', '--netmask',
727 help='netmask,IP filter (comma separated.) eg. : -n 127.0.0.1,192.168.2.0/24,10.12.16.0/255.255.255.0')
729 args = parser.parse_args(sys.argv[1:])
732 print("Cloudeebus version " + VERSION)
736 log.startLogging(sys.stdout)
738 OPENDOOR = args.opendoor
741 jfile = open(args.credentials)
742 CREDENTIALS = json.load(jfile)
746 jfile = open(args.whitelist)
747 WHITELIST = json.load(jfile)
751 iplist = args.netmask.split(",")
753 if ip.rfind("/") != -1:
759 mask = "255.255.255.255"
760 NETMASK.append( {'ipAllowed': ipV4ToHex(ipAllowed), 'mask' : ipV4ToHex(mask)} )
762 uri = "ws://localhost:" + args.port
764 factory = WampServerFactory(uri, debugWamp = args.debug)
765 factory.protocol = CloudeebusServerProtocol
766 factory.setProtocolOptions(allowHixie76 = True)
770 DBusGMainLoop(set_as_default=True)