5 # Copyright 2012 Intel Corporation.
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # Luc Yriarte <luc.yriarte@intel.com>
20 # Christophe Guiraud <christophe.guiraud@intel.com>
21 # Frederic Paut <frederic.paut@intel.com>
25 import argparse, dbus, json, sys
27 from twisted.internet import glib2reactor
28 # Configure the twisted mainloop to be run inside the glib mainloop.
29 # This must be done before importing the other twisted modules
30 glib2reactor.install()
31 from twisted.internet import reactor, defer
33 from autobahn.websocket import listenWS
34 from autobahn.wamp import exportRpc, WampServerFactory, WampCraServerProtocol
36 from dbus.mainloop.glib import DBusGMainLoop
41 gobject.threads_init()
47 from twisted.python import log
50 from xml.etree.ElementTree import XMLParser
52 ###############################################################################
60 ###############################################################################
62 ## Convert an ip or an IP mask (such as ip/24 or ip/255.255.255.0) in hex value (32bits)
65 if mask.rfind(".") == -1:
67 maskHex = (2**(int(mask))-1)
68 maskHex = maskHex << (32-int(mask))
70 raise Exception("Illegal mask (larger than 32 bits) " + mask)
72 maskField = mask.split(".")
73 # Check if mask has four fields (byte)
74 if len(maskField) != 4:
75 raise Exception("Illegal ip address / mask (should be 4 bytes) " + mask)
76 for maskQuartet in maskField:
77 byte = int(maskQuartet)
78 # Check if each field is really a byte
80 raise Exception("Illegal ip address / mask (digit larger than a byte) " + mask)
82 maskHex = maskHex << 8
83 maskHex = maskHex >> 8
86 ###############################################################################
89 Global cache of DBus connexions and signal handlers
92 self.dbusConnexions = {}
93 self.signalHandlers = {}
98 Disconnect signal handlers before resetting cache.
100 self.dbusConnexions = {}
101 # disconnect signal handlers
102 for key in self.signalHandlers:
103 self.signalHandlers[key].disconnect()
104 self.signalHandlers = {}
def dbusConnexion(self, busName):
    '''
    Return the cached DBus connection for busName, opening it on first use.

    busName must be "session" or "system"; any other value raises
    Exception("Error: invalid bus: <busName>") and nothing is cached.
    '''
    # `in` replaces dict.has_key(), which was removed in Python 3.
    if busName not in self.dbusConnexions:
        if busName == "session":
            self.dbusConnexions[busName] = dbus.SessionBus()
        elif busName == "system":
            self.dbusConnexions[busName] = dbus.SystemBus()
        else:
            raise Exception("Error: invalid bus: %s" % busName)
    return self.dbusConnexions[busName]
119 ###############################################################################
class DbusSignalHandler:
    '''
    Receives one DBus signal and republishes it under a topic.

    The topic (self.id) is the signal hash id
    busName#senderName#objectName#interfaceName#signalName
    '''
    def __init__(self, busName, senderName, objectName, interfaceName, signalName):
        topicParts = (busName, senderName, objectName, interfaceName, signalName)
        self.id = "#".join(topicParts)
        # register handleSignal as the receiver for this signal
        self.bus = cache.dbusConnexion(busName)
        self.bus.add_signal_receiver(self.handleSignal, signalName, interfaceName, senderName, objectName)


    def disconnect(self):
        '''
        Unregister handleSignal, recovering the signal coordinates from
        the hash id.
        '''
        parts = self.id.split("#")
        senderName, objectName = parts[1], parts[2]
        interfaceName, signalName = parts[3], parts[4]
        self.bus.remove_signal_receiver(self.handleSignal, signalName, interfaceName, senderName, objectName)


    def handleSignal(self, *args):
        '''
        publish the dbus args, JSON encoded, under the topic hash id
        '''
        payload = json.dumps(args)
        factory.dispatch(self.id, payload)
144 ###############################################################################
145 class DbusCallHandler:
147 deferred reply to return dbus results
149 def __init__(self, method, args):
151 self.request = defer.Deferred()
156 def callMethod(self):
158 dbus method async call
161 self.method(*self.args, reply_handler=self.dbusSuccess, error_handler=self.dbusError)
165 def dbusSuccess(self, *result):
167 return JSON string result array
169 self.request.callback(json.dumps(result))
173 def dbusError(self, error):
175 return dbus error message
177 self.request.errback(Exception(error.get_dbus_message()))
182 ################################################################################
185 Execute DynDBusClass generated code
187 def __init__(self, globalCtx, localCtx) :
188 self.exec_string = ""
189 self.exec_code = None
190 self.exec_code_valid = 1
191 self.indent_level = 0
192 self.indent_increment = 1
194 self.localCtx = localCtx
195 self.globalCtx = globalCtx
198 def append_stmt(self, stmt) :
199 self.exec_code_valid = 0
201 for x in range(0,self.indent_level):
202 self.exec_string = self.exec_string + ' '
203 self.exec_string = self.exec_string + stmt + '\n'
206 self.indent_level = self.indent_level + self.indent_increment
209 self.indent_level = self.indent_level - self.indent_increment
211 # compile : Compile exec_string into exec_code using the builtin
212 # compile function. Skip if already in sync.
214 if not self.exec_code_valid :
215 self.exec_code = compile(self.exec_string, "<string>", "exec")
216 self.exec_code_valid = True
219 if not self.exec_code_valid :
221 exec(self.exec_code, self.globalCtx, self.localCtx)
225 ################################################################################
226 class XmlCbParser: # The target object of the parser
229 def __init__(self, dynDBusClass):
230 self.dynDBusClass = dynDBusClass
232 def start(self, tag, attrib): # Called for each opening tag.
236 if (tag == 'interface'):
237 self.dynDBusClass.set_interface(attrib['name'])
240 if (tag == 'method'):
242 self.dynDBusClass.def_method(attrib['name'])
244 if (tag == 'signal'):
246 self.dynDBusClass.def_signal(attrib['name'])
249 # Set signature (in/out & name) for method
251 if (self.current == 'method'):
252 if (attrib.has_key('direction') == False):
253 attrib['direction'] = "in"
254 self.dynDBusClass.add_signature(attrib['name'],
258 if (self.current == 'signal'):
259 if (attrib.has_key('name') == False):
260 attrib['name'] = 'value'
261 self.dynDBusClass.add_signature(attrib['name'], 'in',
264 def end(self, tag): # Called for each closing tag.
265 if (tag == 'method'):
266 self.dynDBusClass.add_dbus_method()
267 self.dynDBusClass.add_body_method()
268 self.dynDBusClass.end_method()
269 if (tag == 'signal'):
270 self.dynDBusClass.add_dbus_signal()
271 self.dynDBusClass.add_body_signal()
272 self.dynDBusClass.end_method()
274 def data(self, data):
275 pass # We do not need to do anything with data.
276 def close(self): # Called when all data has been parsed.
281 ###############################################################################
def createClassName(objectPath):
    '''
    Derive a class name from a DBus object path: drop the first character
    (the leading '/') and turn every remaining '/' into '_'.
    '''
    return objectPath[1:].replace('/', '_')
285 ################################################################################
286 class DynDBusClass():
287 def __init__(self, className, globalCtx, localCtx):
288 self.className = className
289 self.xmlCB = XmlCbParser(self)
291 self.class_code = ExecCode(globalCtx, localCtx)
292 self.class_code.indent_increment = 4
293 self.class_code.append_stmt("import dbus")
294 self.class_code.append_stmt("\n")
295 self.class_code.append_stmt("class " + self.className + "(dbus.service.Object):")
296 self.class_code.indent()
298 ## Overload of __init__ method
299 self.def_method("__init__")
300 self.add_method("bus, callback=None, objPath='/sample', busName='org.cloudeebus'")
301 self.add_stmt("self.bus = bus")
302 self.add_stmt("self.objPath = objPath")
303 self.add_stmt("self.callback = callback")
304 self.add_stmt("dbus.service.Object.__init__(self, conn=bus, bus_name=busName)")
307 ## Create 'add_to_connection' method
308 self.def_method("add_to_connection")
309 self.add_method("connection=None, path=None")
310 self.add_stmt("dbus.service.Object.add_to_connection(self, connection=self.bus, path=self.objPath)")
313 ## Create 'remove_from_connection' method
314 self.def_method("remove_from_connection")
315 self.add_method("connection=None, path=None")
316 self.add_stmt("dbus.service.Object.remove_from_connection(self, connection=None, path=self.objPath)")
319 def createDBusServiceFromXML(self, xml):
320 self.parser = XMLParser(target=self.xmlCB)
321 self.parser.feed(xml)
324 def set_interface(self, ifName):
327 def def_method(self, methodName):
328 self.methodToAdd = methodName
329 self.signalToAdd = None
330 self.args_str = str()
332 self.signature['name'] = str()
333 self.signature['in'] = str()
334 self.signature['out'] = str()
336 def def_signal(self, signalName):
337 self.methodToAdd = None
338 self.signalToAdd = signalName
339 self.args_str = str()
341 self.signature['name'] = str()
342 self.signature['in'] = str()
343 self.signature['out'] = str()
def add_signature(self, name, direction, signature):
    '''
    Record one argument of the method or signal being defined.

    An 'in' argument appends its type to the accumulated 'in' signature
    and its name to the comma-separated 'name' list. An 'out' argument
    replaces the stored 'out' signature. Other directions are ignored.
    '''
    current = self.signature
    if direction == 'in':
        current['in'] += signature
        # comma-separate every name after the first one
        separator = ", " if current['name'] != str() else str()
        current['name'] += separator + name
    elif direction == 'out':
        current['out'] = signature
354 def add_method(self, args = None, async_success_cb = None, async_err_cb = None):
356 if (self.methodToAdd != None):
357 name = self.methodToAdd
359 name = self.signalToAdd
362 if (async_success_cb != None):
363 async_cb_str = async_success_cb
364 if (async_err_cb != None):
365 if (async_cb_str != str()):
367 async_cb_str += async_err_cb
369 parameters = self.args_str
370 if (async_cb_str != str()):
371 if (parameters != str()):
373 parameters +=async_cb_str
375 if (parameters != str()):
376 self.class_code.append_stmt("def " + name + "(self, %s):" % parameters)
378 self.class_code.append_stmt("def " + name + "(self):")
379 self.class_code.indent()
def end_method(self):
    '''
    Finish the definition being generated: emit a newline statement,
    then step the generated code back one indentation level.
    '''
    code = self.class_code
    code.append_stmt("\n")
    code.dedent()
385 def add_dbus_method(self):
386 decorator = '@dbus.service.method("' + self.ifName + '"'
387 if (self.signature.has_key('in') and self.signature['in'] != str()):
388 decorator += ", in_signature='" + self.signature['in'] + "'"
389 if (self.signature.has_key('out') and self.signature['out'] != str()):
390 decorator += ", out_signature='" + self.signature['out'] + "'"
391 decorator += ", async_callbacks=('dbus_async_cb', 'dbus_async_err_cb')"
393 self.class_code.append_stmt(decorator)
394 if (self.signature.has_key('name') and self.signature['name'] != str()):
395 self.add_method(self.signature['name'], async_success_cb='dbus_async_cb', async_err_cb='dbus_async_err_cb')
397 self.add_method(async_success_cb='dbus_async_cb', async_err_cb='dbus_async_err_cb')
399 def add_dbus_signal(self):
400 decorator = '@dbus.service.signal("' + self.ifName + '"'
401 if (self.signature.has_key('in') and self.signature['in'] != str()):
402 decorator += ", signature='" + self.signature['in'] + "'"
404 self.class_code.append_stmt(decorator)
405 if (self.signature.has_key('name') and self.signature['name'] != str()):
406 self.add_method(self.signature['name'])
def add_body_method(self):
    '''
    Emit the body of the method being generated: a single call to
    self.callback carrying the method name, the object path, the
    interface name, the two async callbacks and, when the method has
    arguments, those arguments.

    Nothing is emitted when no method is currently being defined.
    '''
    if self.methodToAdd is None:
        return
    call = ("self.callback('" + self.methodToAdd
            + "', self.objPath, '" + self.ifName
            + "', " + "dbus_async_cb, dbus_async_err_cb")
    if self.args_str != str():
        call += ", " + self.args_str
    self.class_code.append_stmt(call + ")")
def add_body_signal(self):
    '''
    Emit the placeholder body of a generated signal method: a bare
    "return" followed by a newline statement.
    '''
    ## TODO: Remove and fix with code ad hoc
    for stmt in ("return", "\n"):
        self.class_code.append_stmt(stmt)
def add_stmt(self, stmt):
    '''
    Append one statement to the source of the class being generated.
    '''
    emit = self.class_code.append_stmt
    emit(stmt)
425 self.class_code.execute()
429 ###############################################################################
430 class CloudeebusService:
432 support for sending DBus messages and registering for DBus signals
def __init__(self, permissions):
    '''
    Keep the 'permissions' and 'authextra' entries of the given record
    and start with empty proxy, pending-call, service and agent tables.
    '''
    self.permissions = {
        'permissions': permissions['permissions'],
        'authextra': permissions['authextra'],
    }
    # DBus proxies, cached by hash id
    self.proxyObjects, self.proxyMethods = {}, {}
    self.pendingCalls = []
    # DBus class source code generated dynamically, one per class name
    self.dynDBusClasses = {}
    # DBus services created
    self.services = {}
    # Instances of the dynamically generated DBus classes, for now one per class name
    self.serviceAgents = {}
    # JS methods called and waiting for a success/error response,
    # keyed by 'methodId' and holding (successCB, errorCB)
    self.servicePendingCalls = {}
    # Evaluation contexts captured here and handed to the code generator;
    # no extra local may be introduced above or it would leak into localCtx.
    self.localCtx = locals()
    self.globalCtx = globals()
449 def proxyObject(self, busName, serviceName, objectName):
451 object hash id as busName#serviceName#objectName
453 id = "#".join([busName, serviceName, objectName])
454 if not self.proxyObjects.has_key(id):
456 # check permissions, array.index throws exception
457 self.permissions['permissions'].index(serviceName)
458 bus = cache.dbusConnexion(busName)
459 self.proxyObjects[id] = bus.get_object(serviceName, objectName)
460 return self.proxyObjects[id]
def proxyMethod(self, busName, serviceName, objectName, interfaceName, methodName):
    '''
    Return the DBus proxy method for the given coordinates, creating and
    caching it on first use.

    method hash id as busName#serviceName#objectName#interfaceName#methodName
    '''
    # named methodId rather than `id` so the builtin is not shadowed
    methodId = "#".join([busName, serviceName, objectName, interfaceName, methodName])
    # `in` replaces dict.has_key(), which was removed in Python 3.
    if methodId not in self.proxyMethods:
        obj = self.proxyObject(busName, serviceName, objectName)
        self.proxyMethods[methodId] = obj.get_dbus_method(methodName, interfaceName)
    return self.proxyMethods[methodId]
475 def dbusRegister(self, list):
477 arguments: bus, sender, object, interface, signal
480 raise Exception("Error: expected arguments: bus, sender, object, interface, signal)")
483 # check permissions, array.index throws exception
484 self.permissions['permissions'].index(list[1])
486 # check if a handler exists
487 sigId = "#".join(list)
488 if cache.signalHandlers.has_key(sigId):
491 # create a handler that will publish the signal
492 dbusSignalHandler = DbusSignalHandler(*list)
493 cache.signalHandlers[sigId] = dbusSignalHandler
495 return dbusSignalHandler.id
499 def dbusSend(self, list):
501 arguments: bus, destination, object, interface, message, [args]
503 # clear pending calls
504 for call in self.pendingCalls:
506 self.pendingCalls.remove(call)
509 raise Exception("Error: expected arguments: bus, destination, object, interface, message, [args])")
511 # parse JSON arg list
514 args = json.loads(list[5])
516 # get dbus proxy method
517 method = self.proxyMethod(*list[0:5])
519 # use a deferred call handler to manage dbus results
520 dbusCallHandler = DbusCallHandler(method, args)
521 self.pendingCalls.append(dbusCallHandler)
522 return dbusCallHandler.callMethod()
526 def emitSignal(self, list):
528 arguments: agentObjectPath, signalName, result (to emit)
531 className = re.sub('/', '_', objectPath[1:])
534 if (self.serviceAgents.has_key(className) == True):
535 exe_str = "self.serviceAgents['"+ className +"']."+ signalName + "(" + str(result) + ")"
536 eval(exe_str, self.globalCtx, self.localCtx)
538 raise Exception("No object path " + objectPath)
541 def returnMethod(self, list):
543 arguments: methodId, callIndex, success (=true, error otherwise), result (to return)
549 if (self.servicePendingCalls.has_key(methodId)):
550 cb = self.servicePendingCalls[methodId]['calls'][callIndex]
552 raise Exception("No pending call " + str(callIndex) + " for methodID " + methodId)
554 successCB = cb["successCB"]
560 errorCB = cb["errorCB"]
565 self.servicePendingCalls[methodId]['calls'][callIndex] = None
566 self.servicePendingCalls[methodId]['count'] = self.servicePendingCalls[methodId]['count'] - 1
567 if self.servicePendingCalls[methodId]['count'] == 0:
568 del self.servicePendingCalls[methodId]
570 raise Exception("No methodID " + methodId)
572 def srvCB(self, name, objPath, ifName, async_succes_cb, async_error_cb, *args):
573 methodId = self.srvName + "#" + objPath + "#" + ifName + "#" + name
574 cb = { 'successCB': async_succes_cb,
575 'errorCB': async_error_cb}
576 if methodId not in self.servicePendingCalls:
577 self.servicePendingCalls[methodId] = {'count': 0, 'calls': []}
580 pendingCallStr = json.dumps({'callIndex': len(self.servicePendingCalls[methodId]['calls']), 'args': args})
582 args = eval( str(args).replace("dbus.Byte", "dbus.Int16") )
583 pendingCallStr = json.dumps({'callIndex': len(self.servicePendingCalls[methodId]['calls']), 'args': args})
585 self.servicePendingCalls[methodId]['calls'].append(cb)
586 self.servicePendingCalls[methodId]['count'] = self.servicePendingCalls[methodId]['count'] + 1
587 factory.dispatch(methodId, pendingCallStr)
590 def serviceAdd(self, list):
592 arguments: busName, srvName
595 self.bus = cache.dbusConnexion( busName['name'] )
596 self.srvName = list[1]
597 if (self.services.has_key(self.srvName) == False):
598 self.services[self.srvName] = dbus.service.BusName(name = self.srvName, bus = self.bus)
602 def serviceRelease(self, list):
604 arguments: busName, srvName
606 self.srvName = list[0]
607 if (self.services.has_key(self.srvName) == True):
608 self.services.pop(self.srvName)
611 raise Exception(self.srvName + " do not exist")
def serviceAddAgent(self, list):
    '''
    arguments: objectPath, xmlTemplate

    Generate the DBus service class for objectPath from xmlTemplate (only
    the first time its class name is seen), instantiate it (only once per
    class name), call add_to_connection() on the instance and return
    objectPath.
    '''
    self.agentObjectPath = list[0]
    xmlTemplate = list[1]
    self.className = createClassName(self.agentObjectPath)
    # `in` replaces dict.has_key(), which was removed in Python 3.
    if self.className not in self.dynDBusClasses:
        self.dynDBusClasses[self.className] = DynDBusClass(self.className, self.globalCtx, self.localCtx)
        self.dynDBusClasses[self.className].createDBusServiceFromXML(xmlTemplate)
        self.dynDBusClasses[self.className].declare()

    ## Class already exists, instantiate it if not already instantiated
    if self.className not in self.serviceAgents:
        # NOTE(review): className is derived from the objectPath argument and
        # is evaluated as Python source here; if callers can be untrusted the
        # path should be validated before reaching this point.
        self.serviceAgents[self.className] = eval(self.className + "(self.bus, callback=self.srvCB, objPath=self.agentObjectPath, busName=self.srvName)", self.globalCtx, self.localCtx)

    self.serviceAgents[self.className].add_to_connection()
    return (self.agentObjectPath)
def serviceDelAgent(self, list):
    '''
    arguments: objectPath, xmlTemplate

    Detach the agent registered for objectPath from its connection and
    forget it, then return objectPath. Raises Exception when no agent is
    registered for that path.
    '''
    agentObjectPath = list[0]
    className = createClassName(agentObjectPath)

    # `in` replaces dict.has_key(), which was removed in Python 3.
    if className not in self.serviceAgents:
        raise Exception(agentObjectPath + " doesn't exist!")

    # Remove the agent that was asked for. self.className only remembers the
    # agent most recently added, so using it here could remove a different one.
    self.serviceAgents[className].remove_from_connection()
    self.serviceAgents.pop(className)

    return (agentObjectPath)
650 def getVersion(self):
652 return current version string
658 ###############################################################################
659 class CloudeebusServerProtocol(WampCraServerProtocol):
661 connexion and session authentication management
def onSessionOpen(self):
    '''
    Set the CRA authentication options on this session, then hand over
    to the base class session setup.
    '''
    # CRA authentication options (must be in place before the base class init)
    self.clientAuthTimeout, self.clientAuthAllowAnonymous = 0, OPENDOOR
    # CRA authentication init
    WampCraServerProtocol.onSessionOpen(self)
def getAuthPermissions(self, key, extra):
    '''
    Build the permissions record for a client from the "permissions" and
    "authextra" entries of extra; an absent entry is reported as None.
    key is not used.
    '''
    return {field: extra.get(field, None)
            for field in ("permissions", "authextra")}
676 def getAuthSecret(self, key):
677 secret = CREDENTIALS.get(key, None)
680 # secret must be of str type to be hashed
684 def onAuthenticated(self, key, permissions):
689 for netfilter in NETMASK:
690 ipHex=ipV4ToHex(self.peer.host)
691 ipAllowed = (ipHex & netfilter['mask']) == netfilter['ipAllowed'] & netfilter['mask']
695 raise Exception("host " + self.peer.host + " is not allowed!")
696 # check authentication key
698 raise Exception("Authentication failed")
699 # check permissions, array.index throws exception
700 for req in permissions['permissions']:
701 WHITELIST.index(req);
702 # create cloudeebus service instance
703 self.cloudeebusService = CloudeebusService(permissions)
704 # register it for RPC
705 self.registerForRpc(self.cloudeebusService)
706 # register for Publish / Subscribe
707 self.registerForPubSub("", True)
710 def connectionLost(self, reason):
711 WampCraServerProtocol.connectionLost(self, reason)
712 if factory.getConnectionCount() == 0:
717 ###############################################################################
719 if __name__ == '__main__':
723 parser = argparse.ArgumentParser(description='Javascript DBus bridge.')
724 parser.add_argument('-v', '--version', action='store_true',
725 help='print version and exit')
726 parser.add_argument('-d', '--debug', action='store_true',
727 help='log debug info on standard output')
728 parser.add_argument('-o', '--opendoor', action='store_true',
729 help='allow anonymous access to all services')
730 parser.add_argument('-p', '--port', default='9000',
732 parser.add_argument('-c', '--credentials',
733 help='path to credentials file')
734 parser.add_argument('-w', '--whitelist',
735 help='path to whitelist file')
736 parser.add_argument('-n', '--netmask',
737 help='netmask,IP filter (comma separated.) eg. : -n 127.0.0.1,192.168.2.0/24,10.12.16.0/255.255.255.0')
739 args = parser.parse_args(sys.argv[1:])
742 print("Cloudeebus version " + VERSION)
746 log.startLogging(sys.stdout)
748 OPENDOOR = args.opendoor
751 jfile = open(args.credentials)
752 CREDENTIALS = json.load(jfile)
756 jfile = open(args.whitelist)
757 WHITELIST = json.load(jfile)
761 iplist = args.netmask.split(",")
763 if ip.rfind("/") != -1:
769 mask = "255.255.255.255"
770 NETMASK.append( {'ipAllowed': ipV4ToHex(ipAllowed), 'mask' : ipV4ToHex(mask)} )
772 uri = "ws://localhost:" + args.port
774 factory = WampServerFactory(uri, debugWamp = args.debug)
775 factory.protocol = CloudeebusServerProtocol
776 factory.setProtocolOptions(allowHixie76 = True)
780 DBusGMainLoop(set_as_default=True)