0614f4fc8ff96e703cf8e71e3cc11a49f5bd3860
[contrib/cloudeebus.git] / cloudeebus / cloudeebus.py
1 #!/usr/bin/env python
2
3 # Cloudeebus
4 #
5 # Copyright 2012 Intel Corporation.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # Luc Yriarte <luc.yriarte@intel.com>
20 # Christophe Guiraud <christophe.guiraud@intel.com>
21 #
22
23
24 import argparse, dbus, json, sys
25
26 from twisted.internet import glib2reactor
27 # Configure the twisted mainloop to be run inside the glib mainloop.
28 # This must be done before importing the other twisted modules
29 glib2reactor.install()
30 from twisted.internet import reactor, defer
31
32 from autobahn.websocket import listenWS
33 from autobahn.wamp import exportRpc, WampServerFactory, WampCraServerProtocol
34
35 from dbus.mainloop.glib import DBusGMainLoop
36
37 import gobject
38 import re
39 import dbus.service
40 gobject.threads_init()
41
42 from dbus import glib
43 glib.init_threads()
44
45 # enable debug log
46 from twisted.python import log
47
48 # XML parser module
49 from xml.etree.ElementTree import XMLParser
50
51
52 ###############################################################################
53
54 VERSION = "0.3.0"
55 OPENDOOR = False
56 CREDENTIALS = {}
57 WHITELIST = []
58
59 ###############################################################################
60 class DbusCache:
61     '''
62     Global cache of DBus connexions and signal handlers
63     '''
64     def __init__(self):
65         self.dbusConnexions = {}
66         self.signalHandlers = {}
67
68
69     def reset(self):
70         '''
71         Disconnect signal handlers before resetting cache.
72         '''
73         self.dbusConnexions = {}
74         # disconnect signal handlers
75         for key in self.signalHandlers:
76             self.signalHandlers[key].disconnect()
77         self.signalHandlers = {}
78
79
80     def dbusConnexion(self, busName):
81         if not self.dbusConnexions.has_key(busName):
82             if busName == "session":
83                 self.dbusConnexions[busName] = dbus.SessionBus()
84             elif busName == "system":
85                 self.dbusConnexions[busName] = dbus.SystemBus()
86             else:
87                 raise Exception("Error: invalid bus: %s" % busName)
88         return self.dbusConnexions[busName]
89
90
91
92 ###############################################################################
93 class DbusSignalHandler:
94     '''
95     signal hash id as busName#senderName#objectName#interfaceName#signalName
96     '''
97     def __init__(self, busName, senderName, objectName, interfaceName, signalName):
98         self.id = "#".join([busName, senderName, objectName, interfaceName, signalName])
99         # connect handler to signal
100         self.bus = cache.dbusConnexion(busName)
101         self.bus.add_signal_receiver(self.handleSignal, signalName, interfaceName, senderName, objectName)
102         
103     
104     def disconnect(self):
105         names = self.id.split("#")
106         self.bus.remove_signal_receiver(self.handleSignal, names[4], names[3], names[1], names[2])
107
108
109     def handleSignal(self, *args):
110         '''
111         publish dbus args under topic hash id
112         '''
113         factory.dispatch(self.id, json.dumps(args))
114
115
116
117 ###############################################################################
118 class DbusCallHandler:
119     '''
120     deferred reply to return dbus results
121     '''
122     def __init__(self, method, args):
123         self.pending = False
124         self.request = defer.Deferred()
125         self.method = method
126         self.args = args
127
128
129     def callMethod(self):
130         '''
131         dbus method async call
132         '''
133         self.pending = True
134         self.method(*self.args, reply_handler=self.dbusSuccess, error_handler=self.dbusError)
135         return self.request
136
137
138     def dbusSuccess(self, *result):
139         '''
140         return JSON string result array
141         '''
142         self.request.callback(json.dumps(result))
143         self.pending = False
144
145
146     def dbusError(self, error):
147         '''
148         return dbus error message
149         '''
150         self.request.errback(Exception(error.get_dbus_message()))
151         self.pending = False
152
153
154
155 ################################################################################       
156 class exec_code:
157     def __init__(self, globalCtx, localCtx) :
158         self.exec_string = ""
159         self.exec_code = None
160         self.exec_code_valid = 1
161         self.indent_level = 0
162         self.indent_increment = 1
163         self.line = 0
164         self.localCtx = localCtx
165         self.globalCtx = globalCtx
166         
167
168     def append_stmt(self, stmt) :
169         self.exec_code_valid = 0
170         self.line += 1
171         for x in range(0,self.indent_level):
172             self.exec_string = self.exec_string + ' '            
173         self.exec_string = self.exec_string + stmt + '\n'
174
175     def indent(self) :
176         self.indent_level = self.indent_level + self.indent_increment
177
178     def dedent(self) :
179         self.indent_level = self.indent_level - self.indent_increment
180     
181     # compile : Compile exec_string into exec_code using the builtin
182     # compile function. Skip if already in sync.
183     def compile(self) :
184         if not self.exec_code_valid :
185             self.exec_code = compile(self.exec_string, "<string>", "exec")
186         self.exec_code_valid = True
187
188     def execute(self) :
189         if not self.exec_code_valid :
190             self.compile()
191         exec(self.exec_code, self.globalCtx, self.localCtx)
192
193
194
195 ################################################################################       
196 class XmlCb_Parser: # The target object of the parser
197     maxDepth = 0
198     depth = 0
199     def __init__(self, dynDBusClass):
200         self.dynDBusClass = dynDBusClass
201         
202     def start(self, tag, attrib):   # Called for each opening tag.
203         if (tag == 'node'):
204             return
205         # Set interface name
206         if (tag == 'interface'):
207             self.dynDBusClass.set_interface(attrib['name'])
208             return
209         # Set method name
210         if (tag == 'method'):
211             self.current = tag
212             self.dynDBusClass.def_method(attrib['name'])
213             return
214         if (tag == 'signal'):
215             self.current = tag
216             self.dynDBusClass.def_signal(attrib['name'])
217             return
218
219         # Set signature (in/out & name) for method
220         if (tag == 'arg'):
221             if (self.current == 'method'):
222                 if (attrib.has_key('direction') == False):
223                     attrib['direction'] = "in"
224                 self.dynDBusClass.add_signature(attrib['name'],
225                                                 attrib['direction'],
226                                                 attrib['type'])
227                 return
228             if (self.current == 'signal'):
229                 self.dynDBusClass.add_signature(attrib['name'], 'in',
230                                                 attrib['type'])
231                 return
232     def end(self, tag):             # Called for each closing tag.
233         if (tag == 'method'):
234             self.dynDBusClass.add_dbus_method()
235             self.dynDBusClass.add_body_method()
236             self.dynDBusClass.end_method()
237         if (tag == 'signal'):
238             self.dynDBusClass.add_dbus_signal()
239             self.dynDBusClass.add_body_signal()
240             self.dynDBusClass.end_method()
241            
242     def data(self, data):
243         pass            # We do not need to do anything with data.
244     def close(self):    # Called when all data has been parsed.
245         return self.maxDepth
246
247
248        
249 ################################################################################       
250 class dynDBusClass():
251     def __init__(self, className, globalCtx, localCtx):
252         self.className = className
253         self.xmlCB = XmlCb_Parser(self)
254         self.signature = {}
255         self.class_code = exec_code(globalCtx, localCtx)  
256         self.class_code.indent_increment = 4
257         self.class_code.append_stmt("import dbus")
258         self.class_code.append_stmt("\n")
259         self.class_code.append_stmt("class " + self.className + "(dbus.service.Object):")
260         self.class_code.indent()
261         
262         ## Overload of __init__ method 
263         self.def_method("__init__")
264         self.add_method("bus, callback=None, objName='/sample', busName='org.cloudeebus'")
265         self.add_stmt("self.bus = bus")
266         self.add_stmt("self.objName = objName")
267         self.add_stmt("self.callback = callback")        
268         self.add_stmt("dbus.service.Object.__init__(self, conn=bus, bus_name=busName)")
269         self.end_method()
270                
271         ## Create 'add_to_connection' method 
272         self.def_method("add_to_connection")
273         self.add_method("connection=None, path=None")
274         self.add_stmt("dbus.service.Object.add_to_connection(self, connection=self.bus, path=self.objName)")
275         self.end_method()
276                
277         ## Create 'remove_from_connection' method 
278         self.def_method("remove_from_connection")
279         self.add_method("connection=None, path=None")
280         self.add_stmt("dbus.service.Object.remove_from_connection(self, connection=None, path=self.objName)")
281         self.end_method()
282                
283     def createDBusServiceFromXML(self, xml):
284         self.parser = XMLParser(target=self.xmlCB)
285         self.parser.feed(xml)
286         self.parser.close()
287     
288     def set_interface(self, ifName):
289         self.ifName = ifName
290         
291     def def_method(self, methodName):
292         self.methodToAdd = methodName
293         self.signalToAdd = None
294         self.args_str = str()
295         self.signature = {}
296         self.signature['name'] = str()
297         self.signature['in'] = str()                
298         self.signature['out'] = str()                        
299
300     def def_signal(self, signalName):
301         self.methodToAdd = None
302         self.signalToAdd = signalName
303         self.args_str = str()
304         self.signature = {}
305         self.signature['name'] = str()
306         self.signature['in'] = str()                
307         self.signature['out'] = str()                        
308
309     def add_signature(self, name, direction, signature):
310         if (direction == 'in'):
311             self.signature['in'] += signature
312             if (self.signature['name'] != str()):
313                 self.signature['name'] += ", "
314             self.signature['name'] += name
315         if (direction == 'out'):
316             self.signature['out'] = signature                        
317         
318     def add_method(self, args = None, async_success_cb = None, async_err_cb = None):
319         async_cb_str = str()
320         if (self.methodToAdd != None):
321             name = self.methodToAdd
322         else:
323             name = self.signalToAdd
324         if (args != None):
325             self.args_str = args
326         if (async_success_cb != None):
327             async_cb_str = async_success_cb
328         if (async_err_cb != None):
329             if (async_cb_str != str()):
330                 async_cb_str += ", "
331             async_cb_str += async_err_cb
332                         
333         parameters = self.args_str
334         if (async_cb_str != str()):
335             if (parameters != str()):
336                 parameters += ", "
337             parameters +=async_cb_str       
338         
339         if (parameters != str()):
340             self.class_code.append_stmt("def " + name + "(self, %s):" % parameters)               
341         else:
342             self.class_code.append_stmt("def " + name + "(self):")
343         self.class_code.indent()
344         
345     def end_method(self):
346         self.class_code.append_stmt("\n")
347         self.class_code.dedent()
348         
349     def add_dbus_method(self):
350         decorator = '@dbus.service.method("' + self.ifName + '"'
351         if (self.signature.has_key('in') and self.signature['in'] != str()):
352                 decorator += ", in_signature='" + self.signature['in'] + "'"
353         if (self.signature.has_key('out') and self.signature['out'] != str()):
354                 decorator += ", out_signature='" + self.signature['out'] + "'"
355         decorator += ", async_callbacks=('dbus_async_cb', 'dbus_async_err_cb')"            
356         decorator += ")"
357         self.class_code.append_stmt(decorator)
358         if (self.signature.has_key('name') and self.signature['name'] != str()):
359             self.add_method(self.signature['name'], async_success_cb='dbus_async_cb', async_err_cb='dbus_async_err_cb')
360         else:
361             self.add_method(async_success_cb='dbus_async_cb', async_err_cb='dbus_async_err_cb')
362
363     def add_dbus_signal(self):
364         decorator = '@dbus.service.signal("' + self.ifName + '"'
365         if (self.signature.has_key('in') and self.signature['in'] != str()):
366                 decorator += ", signature='" + self.signature['in'] + "'"
367         decorator += ")"            
368         self.class_code.append_stmt(decorator)
369         if (self.signature.has_key('name') and self.signature['name'] != str()):
370             self.add_method(self.signature['name'])
371         else:
372             self.add_method()
373
374     def add_body_method(self):
375         if (self.methodToAdd != None):
376             if (self.args_str != str()):
377                 self.class_code.append_stmt("self.callback('" + self.methodToAdd + "', dbus_async_cb, dbus_async_err_cb, %s)" % self.args_str)
378             else:        
379                 self.class_code.append_stmt("self.callback('" + self.methodToAdd + "', dbus_async_cb, dbus_async_err_cb)")
380
381     def add_body_signal(self):
382         self.class_code.append_stmt("return") ## TODO: Remove and fix with code ad hoc
383         self.class_code.append_stmt("\n")
384
385     def add_stmt(self, stmt) :
386         self.class_code.append_stmt(stmt)
387         
388     def declare(self) :
389         self.class_code.execute()
390
391
392
393 ###############################################################################
394 class CloudeebusService:
395     '''
396     support for sending DBus messages and registering for DBus signals
397     '''
398     def __init__(self, permissions):
399         self.permissions = permissions;
400         self.proxyObjects = {}
401         self.proxyMethods = {}
402         self.pendingCalls = []
403         self.dynDBusClasses = {} # DBus class source code generated dynamically (a list because one by classname)
404         self.services = {}  # DBus service created
405         self.serviceAgents = {} # Instantiated DBus class previously generated dynamically, for now, one by classname
406         self.servicePendingCalls = {} # JS methods called (and waiting for a Success/error response), containing 'methodId', (successCB, errorCB)
407         self.localCtx = locals()
408         self.globalCtx = globals()
409
410
411     def proxyObject(self, busName, serviceName, objectName):
412         '''
413         object hash id as busName#serviceName#objectName
414         '''
415         id = "#".join([busName, serviceName, objectName])
416         if not self.proxyObjects.has_key(id):
417             if not OPENDOOR:
418                 # check permissions, array.index throws exception
419                 self.permissions.index(serviceName)
420             bus = cache.dbusConnexion(busName)
421             self.proxyObjects[id] = bus.get_object(serviceName, objectName)
422         return self.proxyObjects[id]
423
424
425     def proxyMethod(self, busName, serviceName, objectName, interfaceName, methodName):
426         '''
427         method hash id as busName#serviceName#objectName#interfaceName#methodName
428         '''
429         id = "#".join([busName, serviceName, objectName, interfaceName, methodName])
430         if not self.proxyMethods.has_key(id):
431             obj = self.proxyObject(busName, serviceName, objectName)
432             self.proxyMethods[id] = obj.get_dbus_method(methodName, interfaceName)
433         return self.proxyMethods[id]
434
435
436     @exportRpc
437     def dbusRegister(self, list):
438         '''
439         arguments: bus, sender, object, interface, signal
440         '''
441         if len(list) < 5:
442             raise Exception("Error: expected arguments: bus, sender, object, interface, signal)")
443         
444         if not OPENDOOR:
445             # check permissions, array.index throws exception
446             self.permissions.index(list[1])
447         
448         # check if a handler exists
449         sigId = "#".join(list)
450         if cache.signalHandlers.has_key(sigId):
451             return sigId
452         
453         # create a handler that will publish the signal
454         dbusSignalHandler = DbusSignalHandler(*list)
455         cache.signalHandlers[sigId] = dbusSignalHandler
456         
457         return dbusSignalHandler.id
458
459
460     @exportRpc
461     def dbusSend(self, list):
462         '''
463         arguments: bus, destination, object, interface, message, [args]
464         '''
465         # clear pending calls
466         for call in self.pendingCalls:
467             if not call.pending:
468                 self.pendingCalls.remove(call)
469         
470         if len(list) < 5:
471             raise Exception("Error: expected arguments: bus, destination, object, interface, message, [args])")
472         
473         # parse JSON arg list
474         args = []
475         if len(list) == 6:
476             args = json.loads(list[5])
477         
478         # get dbus proxy method
479         method = self.proxyMethod(*list[0:5])
480         
481         # use a deferred call handler to manage dbus results
482         dbusCallHandler = DbusCallHandler(method, args)
483         self.pendingCalls.append(dbusCallHandler)
484         return dbusCallHandler.callMethod()
485
486
487     @exportRpc
488     def returnMethod(self, list):
489         '''
490         arguments: methodId, success (=true, error otherwise), result (to return)
491         '''
492         methodId = list[0]
493         success = list[1]
494         result = list[2]
495         if (self.servicePendingCalls.has_key(methodId)):
496             cb = self.servicePendingCalls[methodId]
497             if (success):                
498                 successCB = cb["successCB"]
499                 if (result != None):
500                     successCB(result)
501                 else:
502                     successCB()                    
503             else:     
504                 errorCB = cb["errorCB"]        
505                 if (result != None):
506                     errorCB(result)
507                 else:
508                     errorCB()
509             self.servicePendingCalls[methodId] = None
510         else:
511             raise Exception("No methodID " + methodId)
512
513     def jsonEncodeTupleKeyDict(self, data):
514         ndict = dict()
515         # creates new dictionary with the original tuple converted to json string
516         dataLen = len(data)
517         string = ""
518         for index in range(dataLen):
519             for key in data[index]:
520                 value = data[index][key]
521                 print "key=" + key
522                 print "value=" + str(value)
523                 nkey = str(key)
524                 nvalue = ""
525                 print "JSON key=" + nkey
526                 if (isinstance(value, dbus.Array)):
527                     # Searching dbus byte in array...
528                     ValueLen = len(value)
529                     nvalue = []
530                     for indexValue in range(ValueLen):
531                         a = value[indexValue]
532                         if (isinstance(a, dbus.Byte)):
533                             a = int(value[indexValue])
534                             nvalue.append(a)
535                         else:
536                             nvalue = str(value[indexValue])
537                             
538                 print "JSON value=" + str(nvalue)                
539                 ndict[nkey] =  nvalue
540
541         return ndict
542
543     def srvCB(self, name, async_succes_cb, async_error_cb, *args):
544         methodId = self.srvName + "#" + self.agentObjectPath + "#" + name
545         cb = { 'successCB': async_succes_cb, 
546                'errorCB': async_error_cb}
547         self.servicePendingCalls[methodId] = cb
548
549         if (len(args) > 0):
550             print "Received args=%s" % (str(args))
551         else:                     
552             print "No args received"
553             
554         try:               
555             print "factory.dispatch(methodId=%s, args=%s)" % (methodId, json.dumps(args))                     
556             factory.dispatch(methodId, json.dumps(args))
557             return
558         except Exception, e :
559             print "Error=%s" % (str(e))
560             
561         print "Trying to decode dbus.Dictionnary..."
562         try:
563             params = self.jsonEncodeTupleKeyDict(args)                
564             print "factory.dispatch(methodId=%s, args=%s)" % (methodId, params)                     
565             factory.dispatch(methodId, params)
566             return
567         except Exception, e :
568             print "Error=%s" % (str(e))
569                     
570         print "Trying to pass args as string..."
571         try:               
572             print "factory.dispatch(methodId=%s, args=%s)" % (methodId, str(args))                     
573             factory.dispatch(methodId, str(args))
574             return
575         except Exception, e :
576             print "Error=%s" % (str(e))
577                     
578     @exportRpc
579     def serviceAdd(self, list):
580         '''
581         arguments: busName, srvName
582         '''
583         busName = list[0]
584         self.bus =  cache.dbusConnexion( busName['name'] )
585         self.srvName = list[1]
586         if (self.services.has_key(self.srvName) == False):            
587             self.services[self.srvName] = dbus.service.BusName(name = self.srvName, bus = self.bus)
588         return self.srvName
589
590     @exportRpc
591     def serviceRelease(self, list):
592         '''
593         arguments: busName, srvName
594         '''
595         self.srvName = list[0]
596         if (self.services.has_key(self.srvName) == True):
597             self.services.pop(self.srvName)
598             return self.srvName
599         else:
600             raise Exception(self.srvName + " do not exist")
601                    
602     @exportRpc
603     def serviceAddAgent(self, list):
604         '''
605         arguments: objectPath, xmlTemplate
606         '''
607         self.agentObjectPath = list[0]
608         xmlTemplate = list[1]
609         self.className = re.sub('/', '_', self.agentObjectPath[1:])
610         if (self.dynDBusClasses.has_key(self.className) == False):
611             self.dynDBusClasses[self.className] = dynDBusClass(self.className, self.globalCtx, self.localCtx)
612             self.dynDBusClasses[self.className].createDBusServiceFromXML(xmlTemplate)
613             self.dynDBusClasses[self.className].declare()
614
615         ## Class already exist, instanciate it if not already instanciated
616         if (self.serviceAgents.has_key(self.className) == False):
617             self.serviceAgents[self.className] = eval(self.className + "(self.bus, callback=self.srvCB, objName=self.agentObjectPath, busName=self.srvName)", self.globalCtx, self.localCtx)
618             
619         self.serviceAgents[self.className].add_to_connection()
620         return (self.agentObjectPath)
621                     
622     @exportRpc
623     def serviceDelAgent(self, list):
624         '''
625         arguments: objectPath, xmlTemplate
626         '''
627         agentObjectPath = list[0]
628         className = re.sub('/', '_', agentObjectPath[1:])
629
630         if (self.serviceAgents.has_key(className)):
631             self.serviceAgents[self.className].remove_from_connection()
632             self.serviceAgents.pop(self.className)
633         else:
634             raise Exception(agentObjectPath + " doesn't exist!")
635         
636         return (agentObjectPath)
637                     
638     @exportRpc
639     def getVersion(self):
640         '''
641         return current version string
642         '''
643         return VERSION
644
645
646
647 ###############################################################################
648 class CloudeebusServerProtocol(WampCraServerProtocol):
649     '''
650     connexion and session authentication management
651     '''
652     
653     def onSessionOpen(self):
654         # CRA authentication options
655         self.clientAuthTimeout = 0
656         self.clientAuthAllowAnonymous = OPENDOOR
657         # CRA authentication init
658         WampCraServerProtocol.onSessionOpen(self)
659     
660     
661     def getAuthPermissions(self, key, extra):
662         return json.loads(extra.get("permissions", "[]"))
663     
664     
665     def getAuthSecret(self, key):
666         secret = CREDENTIALS.get(key, None)
667         if secret is None:
668             return None
669         # secret must be of str type to be hashed
670         return secret.encode('utf-8')
671     
672
673     def onAuthenticated(self, key, permissions):
674         if not OPENDOOR:
675             # check authentication key
676             if key is None:
677                 raise Exception("Authentication failed")
678             # check permissions, array.index throws exception
679             for req in permissions:
680                 WHITELIST.index(req)
681         # create cloudeebus service instance
682         self.cloudeebusService = CloudeebusService(permissions)
683         # register it for RPC
684         self.registerForRpc(self.cloudeebusService)
685         # register for Publish / Subscribe
686         self.registerForPubSub("", True)
687     
688     
689     def connectionLost(self, reason):
690         WampCraServerProtocol.connectionLost(self, reason)
691         if factory.getConnectionCount() == 0:
692             cache.reset()
693
694
695
696 ###############################################################################
697
698 if __name__ == '__main__':
699     
700     cache = DbusCache()
701
702     parser = argparse.ArgumentParser(description='Javascript DBus bridge.')
703     parser.add_argument('-v', '--version', action='store_true', 
704         help='print version and exit')
705     parser.add_argument('-d', '--debug', action='store_true', 
706         help='log debug info on standard output')
707     parser.add_argument('-o', '--opendoor', action='store_true',
708         help='allow anonymous access to all services')
709     parser.add_argument('-p', '--port', default='9000',
710         help='port number')
711     parser.add_argument('-c', '--credentials',
712         help='path to credentials file')
713     parser.add_argument('-w', '--whitelist',
714         help='path to whitelist file')
715     
716     args = parser.parse_args(sys.argv[1:])
717
718     if args.version:
719         print("Cloudeebus version " + VERSION)
720         exit(0)
721     
722     if args.debug:
723         log.startLogging(sys.stdout)
724     
725     OPENDOOR = args.opendoor
726     
727     if args.credentials:
728         jfile = open(args.credentials)
729         CREDENTIALS = json.load(jfile)
730         jfile.close()
731     
732     if args.whitelist:
733         jfile = open(args.whitelist)
734         WHITELIST = json.load(jfile)
735         jfile.close()
736     
737     uri = "ws://localhost:" + args.port
738     
739     factory = WampServerFactory(uri, debugWamp = args.debug)
740     factory.protocol = CloudeebusServerProtocol
741     factory.setProtocolOptions(allowHixie76 = True)
742     
743     listenWS(factory)
744     
745     DBusGMainLoop(set_as_default=True)
746     
747     reactor.run()