c95eb9b1aa37dbc363f1ce41c76784f1785a4944
[contrib/cloudeebus.git] / cloudeebus / cloudeebus.py
1 #!/usr/bin/env python
2
3 # Cloudeebus
4 #
5 # Copyright 2012 Intel Corporation.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # Luc Yriarte <luc.yriarte@intel.com>
20 # Christophe Guiraud <christophe.guiraud@intel.com>
21 #
22
23
24 import argparse, dbus, json, sys
25
26 from twisted.internet import glib2reactor
27 # Configure the twisted mainloop to be run inside the glib mainloop.
28 # This must be done before importing the other twisted modules
29 glib2reactor.install()
30 from twisted.internet import reactor, defer
31
32 from autobahn.websocket import listenWS
33 from autobahn.wamp import exportRpc, WampServerFactory, WampCraServerProtocol
34
35 from dbus.mainloop.glib import DBusGMainLoop
36
37 import gobject
38 import re
39 import dbus.service
40 gobject.threads_init()
41
42 from dbus import glib
43 glib.init_threads()
44
45 # enable debug log
46 from twisted.python import log
47
48 # XML parser module
49 from xml.etree.ElementTree import XMLParser
50
51 # For debug only
52 import os
53
54 ###############################################################################
55
56 VERSION = "0.2.1"
57 OPENDOOR = False
58 CREDENTIALS = {}
59 WHITELIST = []
60
61 ###############################################################################
62 class DbusCache:
63     '''
64     Global cache of DBus connexions and signal handlers
65     '''
66     def __init__(self):
67         self.dbusConnexions = {}
68         self.signalHandlers = {}
69
70
71     def reset(self):
72         '''
73         Disconnect signal handlers before resetting cache.
74         '''
75         self.dbusConnexions = {}
76         # disconnect signal handlers
77         for key in self.signalHandlers:
78             self.signalHandlers[key].disconnect()
79         self.signalHandlers = {}
80
81
82     def dbusConnexion(self, busName):
83         if not self.dbusConnexions.has_key(busName):
84             if busName == "session":
85                 self.dbusConnexions[busName] = dbus.SessionBus()
86             elif busName == "system":
87                 self.dbusConnexions[busName] = dbus.SystemBus()
88             else:
89                 raise Exception("Error: invalid bus: %s" % busName)
90         return self.dbusConnexions[busName]
91
92
93
94 ###############################################################################
95 class DbusSignalHandler:
96     '''
97     signal hash id as busName#senderName#objectName#interfaceName#signalName
98     '''
99     def __init__(self, busName, senderName, objectName, interfaceName, signalName):
100         self.id = "#".join([busName, senderName, objectName, interfaceName, signalName])
101         # connect handler to signal
102         self.bus = cache.dbusConnexion(busName)
103         self.bus.add_signal_receiver(self.handleSignal, signalName, interfaceName, senderName, objectName)
104         
105     
106     def disconnect(self):
107         names = self.id.split("#")
108         self.bus.remove_signal_receiver(self.handleSignal, names[4], names[3], names[1], names[2])
109
110
111     def handleSignal(self, *args):
112         '''
113         publish dbus args under topic hash id
114         '''
115         factory.dispatch(self.id, json.dumps(args))
116
117
118
119 ###############################################################################
120 class DbusCallHandler:
121     '''
122     deferred reply to return dbus results
123     '''
124     def __init__(self, method, args):
125         self.pending = False
126         self.request = defer.Deferred()
127         self.method = method
128         self.args = args
129
130
131     def callMethod(self):
132         '''
133         dbus method async call
134         '''
135         self.pending = True
136         self.method(*self.args, reply_handler=self.dbusSuccess, error_handler=self.dbusError)
137         return self.request
138
139
140     def dbusSuccess(self, *result):
141         '''
142         return JSON string result array
143         '''
144         self.request.callback(json.dumps(result))
145         self.pending = False
146
147
148     def dbusError(self, error):
149         '''
150         return dbus error message
151         '''
152         self.request.errback(Exception(error.get_dbus_message()))
153         self.pending = False
154
155
156
157 ################################################################################       
158 class exec_code:
159     def __init__(self) :
160         self.exec_string = ""
161         self.exec_code = None
162         self.exec_code_valid = 1
163         self.indent_level = 0
164         self.indent_increment = 1
165         self.line = 0
166
167     # __str__ : Return a string representation of the object, for
168     # nice printing.
169     def __str__(self) :
170         return self.exec_string
171
172     def p(self) :
173         print str(self)
174
175     def append_stmt(self, stmt) :
176         self.exec_code_valid = 0
177         self.line += 1
178         if (stmt != "\n"):
179             for x in range(0,self.indent_level):
180                 self.exec_string = self.exec_string + ' '            
181             self.exec_string = self.exec_string + stmt + "\t\t# l:" + str(self.line) + '\n'
182         else:
183             if (stmt == "\n"):
184                 self.exec_string = self.exec_string + "# l:" + str(self.line) + '\n'
185             else:
186                 self.exec_string = self.exec_string + stmt + "\t\t# l:" + str(self.line) + '\n'
187
188     def indent(self) :
189         self.indent_level = self.indent_level + self.indent_increment
190
191     def dedent(self) :
192         self.indent_level = self.indent_level - self.indent_increment
193     
194     # compile : Compile exec_string into exec_code using the builtin
195     # compile function. Skip if already in sync.
196     def compile(self) :
197         if not self.exec_code_valid :
198             self.exec_code = compile(self.exec_string, "<string>", "exec")
199         self.exec_code_valid = 1
200
201     def execute(self) :
202         if not self.exec_code_valid :
203             self.compile()
204         exec self.exec_code
205
206
207
208 ################################################################################       
209 class XmlCb_Parser: # The target object of the parser
210     maxDepth = 0
211     depth = 0
212     def __init__(self, dynDBusClass):
213         self.dynDBusClass = dynDBusClass
214         
215     def start(self, tag, attrib):   # Called for each opening tag.
216         if (tag == 'node'):
217             return
218         # Set interface name
219         if (tag == 'interface'):
220             self.dynDBusClass.set_interface(attrib['name'])
221             return
222         # Set method name
223         if (tag == 'method'):
224             self.current = tag
225             self.dynDBusClass.def_method(attrib['name'])
226             return
227         if (tag == 'signal'):
228             self.current = tag
229             self.dynDBusClass.def_signal(attrib['name'])
230             return
231
232         # Set signature (in/out & name) for method
233         if (tag == 'arg'):
234             if (self.current == 'method'):
235                 self.dynDBusClass.add_signature(attrib['name'],
236                                                 attrib['direction'],
237                                                 attrib['type'])
238                 return
239             if (self.current == 'signal'):
240                 self.dynDBusClass.add_signature(attrib['name'], 'in',
241                                                 attrib['type'])
242                 return
243     def end(self, tag):             # Called for each closing tag.
244         if (tag == 'method'):
245             self.dynDBusClass.add_dbus_method()
246             self.dynDBusClass.add_body_method()
247             self.dynDBusClass.end_method()
248         if (tag == 'signal'):
249             self.dynDBusClass.add_dbus_signal()
250             self.dynDBusClass.add_body_signal()
251             self.dynDBusClass.end_method()
252            
253     def data(self, data):
254         pass            # We do not need to do anything with data.
255     def close(self):    # Called when all data has been parsed.
256         return self.maxDepth
257
258
259        
260 ################################################################################       
261 class dynDBusClass():
262     def __init__(self, className, globalCtx, localCtx):
263         self.className = className
264         self.xmlCB = XmlCb_Parser(self)
265         self.localCtx = localCtx
266         self.globalCtx = globalCtx        
267         self.signature = {}
268         self.class_code = exec_code()  
269         self.class_code.indent_increment = 4
270         self.class_code.append_stmt("import dbus")
271         self.class_code.append_stmt("\n")
272         self.class_code.append_stmt("\n")
273         self.class_code.append_stmt("class " + self.className + "(dbus.service.Object):")
274         self.class_code.indent()
275         
276         ## Overload of __init__ method 
277         self.def_method("__init__")
278         self.add_method("bus, callback=None, objName='/sample', busName='org.cloudeebus'")
279         self.add_stmt("self.bus = bus")
280         self.add_stmt("self.objName = objName")
281         self.add_stmt("self.callback = callback")        
282         self.add_stmt("dbus.service.Object.__init__(self, conn=bus, object_path=objName, bus_name=busName)")
283         self.end_method()
284                
285     def createDBusServiceFromXML(self, xml):
286         self.parser = XMLParser(target=self.xmlCB)
287         self.parser.feed(xml)
288         self.parser.close()
289     
290     def set_interface(self, ifName):
291         self.ifName = ifName
292         
293     def def_method(self, methodName):
294         self.methodToAdd = methodName
295         self.signalToAdd = None
296         self.args_str = str()
297         self.signature = {}
298         self.signature['name'] = str()
299         self.signature['in'] = str()                
300         self.signature['out'] = str()                        
301
302     def def_signal(self, signalName):
303         self.methodToAdd = None
304         self.signalToAdd = signalName
305         self.args_str = str()
306         self.signature = {}
307         self.signature['name'] = str()
308         self.signature['in'] = str()                
309         self.signature['out'] = str()                        
310
311     def add_signature(self, name, direction, signature):
312         if (direction == 'in'):
313             self.signature['in'] += signature
314             if (self.signature['name'] != str()):
315                 self.signature['name'] += ", "
316             self.signature['name'] += name
317         if (direction == 'out'):
318             self.signature['out'] = signature                        
319         
320     def add_method(self, args = None, async_success_cb = None, async_err_cb = None):
321         async_cb_str = str()
322         if (self.methodToAdd != None):
323             name = self.methodToAdd
324         else:
325             name = self.signalToAdd
326         if (args != None):
327             self.args_str = args
328         if (async_success_cb != None):
329             async_cb_str = async_success_cb
330         if (async_err_cb != None):
331             if (async_cb_str != str()):
332                 async_cb_str += ", "
333             async_cb_str += async_err_cb
334                         
335         parameters = self.args_str
336         if (async_cb_str != str()):
337             if (parameters != str()):
338                 parameters += ", "
339             parameters +=async_cb_str       
340         
341         if (parameters != str()):
342             self.class_code.append_stmt("def " + name + "(self, %s):" % parameters)               
343         else:
344             self.class_code.append_stmt("def " + name + "(self):")
345         self.class_code.indent()
346         
347     def end_method(self):
348         self.class_code.append_stmt("\n")
349         self.class_code.append_stmt("\n")        
350         self.class_code.dedent()
351         
352     def add_dbus_method(self):
353         decorator = '@dbus.service.method("' + self.ifName + '"'
354         if (self.signature.has_key('in') and self.signature['in'] != str()):
355                 decorator += ", in_signature='" + self.signature['in'] + "'"
356         if (self.signature.has_key('out') and self.signature['out'] != str()):
357                 decorator += ", out_signature='" + self.signature['out'] + "'"
358         decorator += ", async_callbacks=('dbus_async_cb', 'dbus_async_err_cb')"            
359         decorator += ")"
360         self.class_code.append_stmt(decorator)
361         if (self.signature.has_key('name') and self.signature['name'] != str()):
362             self.add_method(self.signature['name'], async_success_cb='dbus_async_cb', async_err_cb='dbus_async_err_cb')
363         else:
364             self.add_method(async_success_cb='dbus_async_cb', async_err_cb='dbus_async_err_cb')
365
366     def add_dbus_signal(self):
367         decorator = '@dbus.service.signal("' + self.ifName + '"'
368         if (self.signature.has_key('in') and self.signature['in'] != str()):
369                 decorator += ", signature='" + self.signature['in'] + "'"
370         decorator += ")"            
371         self.class_code.append_stmt(decorator)
372         if (self.signature.has_key('name') and self.signature['name'] != str()):
373             self.add_method(self.signature['name'])
374         else:
375             self.add_method()
376
377     def add_body_method(self):
378         if (self.methodToAdd != None):
379             self.class_code.append_stmt("print 'In " + self.methodToAdd + "()'")
380             if (self.args_str != str()):
381                 self.class_code.append_stmt("self.callback('" + self.methodToAdd + "', dbus_async_cb, dbus_async_err_cb, %s)" % self.args_str)
382             else:        
383                 self.class_code.append_stmt("self.callback('" + self.methodToAdd + "', dbus_async_cb, dbus_async_err_cb)")
384
385     def add_body_signal(self):
386         self.class_code.append_stmt("return") ## TODO: Remove and fix with code ad hoc
387         self.class_code.append_stmt("\n")
388
389     def add_stmt(self, stmt) :
390         self.class_code.append_stmt(stmt)
391         
392     def declare(self) :
393         self.class_code.compile()
394         exec(self.class_code.exec_string, self.globalCtx, self.localCtx)
395      
396     def __str__(self) :
397         return self.class_code.exec_string
398
399     # p : Since it is often useful to be able to look at the code
400     # that is generated interactively, this function provides
401     # a shorthand for "print str(some_exec_code_instance)", which
402     # gives a reasonable nice look at the contents of the
403     # exec_code object.
404     def p(self) :
405         print str(self)
406
407
408
409 ###############################################################################
410 class CloudeebusService:
411     '''
412     support for sending DBus messages and registering for DBus signals
413     '''
414     def __init__(self, permissions):
415         self.permissions = permissions;
416         self.proxyObjects = {}
417         self.proxyMethods = {}
418         self.pendingCalls = []
419         self.dynDBusClasses = {} # DBus class source code generated dynamically (a list because one by classname)
420         self.services = {}  # DBus service created
421         self.serviceAgents = {} # Instantiated DBus class previously generated dynamically, for now, one by classname
422         self.servicePendingCalls = {} # JS methods called (and waiting for a Success/error response), containing 'methodId', (successCB, errorCB)
423
424
425     def proxyObject(self, busName, serviceName, objectName):
426         '''
427         object hash id as busName#serviceName#objectName
428         '''
429         id = "#".join([busName, serviceName, objectName])
430         if not self.proxyObjects.has_key(id):
431             if not OPENDOOR:
432                 # check permissions, array.index throws exception
433                 self.permissions.index(serviceName)
434             bus = cache.dbusConnexion(busName)
435             self.proxyObjects[id] = bus.get_object(serviceName, objectName)
436         return self.proxyObjects[id]
437
438
439     def proxyMethod(self, busName, serviceName, objectName, interfaceName, methodName):
440         '''
441         method hash id as busName#serviceName#objectName#interfaceName#methodName
442         '''
443         id = "#".join([busName, serviceName, objectName, interfaceName, methodName])
444         if not self.proxyMethods.has_key(id):
445             obj = self.proxyObject(busName, serviceName, objectName)
446             self.proxyMethods[id] = obj.get_dbus_method(methodName, interfaceName)
447         return self.proxyMethods[id]
448
449
450     @exportRpc
451     def dbusRegister(self, list):
452         '''
453         arguments: bus, sender, object, interface, signal
454         '''
455         if len(list) < 5:
456             raise Exception("Error: expected arguments: bus, sender, object, interface, signal)")
457         
458         if not OPENDOOR:
459             # check permissions, array.index throws exception
460             self.permissions.index(list[1])
461         
462         # check if a handler exists
463         sigId = "#".join(list)
464         if cache.signalHandlers.has_key(sigId):
465             return sigId
466         
467         # create a handler that will publish the signal
468         dbusSignalHandler = DbusSignalHandler(*list)
469         cache.signalHandlers[sigId] = dbusSignalHandler
470         
471         return dbusSignalHandler.id
472
473
474     @exportRpc
475     def dbusSend(self, list):
476         '''
477         arguments: bus, destination, object, interface, message, [args]
478         '''
479         # clear pending calls
480         for call in self.pendingCalls:
481             if not call.pending:
482                 self.pendingCalls.remove(call)
483         
484         if len(list) < 5:
485             raise Exception("Error: expected arguments: bus, destination, object, interface, message, [args])")
486         
487         # parse JSON arg list
488         args = []
489         if len(list) == 6:
490             args = json.loads(list[5])
491         
492         # get dbus proxy method
493         method = self.proxyMethod(*list[0:5])
494         
495         # use a deferred call handler to manage dbus results
496         dbusCallHandler = DbusCallHandler(method, args)
497         self.pendingCalls.append(dbusCallHandler)
498         return dbusCallHandler.callMethod()
499
500
501     @exportRpc
502     def returnMethod(self, list):
503         '''
504         arguments: methodId, success (=true, error otherwise), result (to return)
505         '''
506         methodId = list[0]
507         success = list[1]
508         result = list[2]
509         if (self.servicePendingCalls.has_key(methodId)):
510             cb = self.servicePendingCalls[methodId]
511             if (success):                
512                 successCB = cb["successCB"]
513                 if (result != None):
514                     successCB(result)
515                 else:
516                     successCB()                    
517             else:     
518                 errorCB = cb["errorCB"]        
519                 if (result != None):
520                     errorCB(result)
521                 else:
522                     errorCB()
523         else:
524             print "No methodID %s  !!" % (methodId)                     
525         
526
527     def srvCB(self, name, async_succes_cb, async_error_cb, *args):
528         methodId = self.srvName + "#" + self.agentObjectPath + "#" + name
529         cb = { 'successCB': async_succes_cb, 
530                'errorCB': async_error_cb}
531         self.servicePendingCalls[methodId] = cb       
532         factory.dispatch(methodId, json.dumps(args))
533         
534     @exportRpc
535     def serviceAdd(self, list):
536         '''
537         arguments: busName, srvName
538         '''
539         busName = list[0]
540         self.bus =  cache.dbusConnexion( busName['name'] )
541         self.srvName = list[1]
542         if (self.services.has_key(self.srvName) == False):            
543             self.services[self.srvName] = dbus.service.BusName(name = self.srvName, bus = self.bus)
544             return self.srvName
545
546     @exportRpc
547     def serviceRelease(self, list):
548         '''
549         arguments: busName, srvName
550         '''
551         busName = list[0]
552         self.bus =  cache.dbusConnexion( busName['name'] )
553         self.srvName = list[1]
554         if (self.services.has_key(self.srvName) == True):
555             exe_str = "self.services['" + self.srvName +"']"
556             exec (exe_str, globals(), locals())
557             return self.srvName
558         else:
559             raise Exception(self.srvName + " do not exist")
560                    
561     @exportRpc
562     def serviceAddAgent(self, list):
563         '''
564         arguments: objectPath, xmlTemplate
565         '''
566         self.agentObjectPath = list[0]
567         xmlTemplate = list[1]
568         self.className = re.sub('/', '_', self.agentObjectPath[1:])
569         if (self.dynDBusClasses.has_key(self.className) == False):
570             self.dynDBusClasses[self.className] = dynDBusClass(self.className, globals(), locals())
571             self.dynDBusClasses[self.className].createDBusServiceFromXML(xmlTemplate)
572             
573             # For Debug only
574             if (1):
575                 if (1): ## Force deletion
576                     if os.access('./MyDbusClass.py', os.R_OK) == True:
577                         os.remove('./MyDbusClass.py')
578                     
579                     if os.access('./MyDbusClass.py', os.R_OK) == False:
580                         f = open('./MyDbusClass.py', 'w')
581                         f.write(self.dynDBusClasses[self.className].class_code.exec_string)
582                         f.close()
583 #                self.dynDBusClass[className].p()
584                 self.dynDBusClasses[self.className].declare()
585             
586             if (self.serviceAgents.has_key(self.className) == False):            
587                 exe_str = "self.serviceAgents['" + self.className +"'] = " + self.className + "(self.bus, callback=self.srvCB, objName=self.agentObjectPath, busName=self.srvName)"
588                 exec (exe_str, globals(), locals())
589                 return (self.agentObjectPath)
590         else:
591             raise Exception(self.agentObjectPath + " already exist !!")
592                     
593     @exportRpc
594     def serviceDelAgent(self, list):
595         '''
596         arguments: objectPath, xmlTemplate
597         '''
598         agentObjectPath = list[0]
599         className = re.sub('/', '_', agentObjectPath[1:])
600
601         if (self.serviceAgents.has_key(className)):            
602             exe_str = "self.serviceAgents['" + className +"'] = None"
603             exec (exe_str, globals(), locals())
604             return (self.className)
605         else:
606             raise Exception(agentObjectPath + "doesn't exist!")
607                     
608     @exportRpc
609     def getVersion(self):
610         '''
611         return current version string
612         '''
613         return VERSION
614
615
616
617 ###############################################################################
618 class CloudeebusServerProtocol(WampCraServerProtocol):
619     '''
620     connexion and session authentication management
621     '''
622     
623     def onSessionOpen(self):
624         # CRA authentication options
625         self.clientAuthTimeout = 0
626         self.clientAuthAllowAnonymous = OPENDOOR
627         # CRA authentication init
628         WampCraServerProtocol.onSessionOpen(self)
629     
630     
631     def getAuthPermissions(self, key, extra):
632         return json.loads(extra.get("permissions", "[]"))
633     
634     
635     def getAuthSecret(self, key):
636         secret = CREDENTIALS.get(key, None)
637         if secret is None:
638             return None
639         # secret must be of str type to be hashed
640         return secret.encode('utf-8')
641     
642
643     def onAuthenticated(self, key, permissions):
644         if not OPENDOOR:
645             # check authentication key
646             if key is None:
647                 raise Exception("Authentication failed")
648             # check permissions, array.index throws exception
649             for req in permissions:
650                 WHITELIST.index(req)
651         # create cloudeebus service instance
652         self.cloudeebusService = CloudeebusService(permissions)
653         # register it for RPC
654         self.registerForRpc(self.cloudeebusService)
655         # register for Publish / Subscribe
656         self.registerForPubSub("", True)
657     
658     
659     def connectionLost(self, reason):
660         WampCraServerProtocol.connectionLost(self, reason)
661         if factory.getConnectionCount() == 0:
662             cache.reset()
663
664
665
666 ###############################################################################
667
668 if __name__ == '__main__':
669     
670     cache = DbusCache()
671
672     parser = argparse.ArgumentParser(description='Javascript DBus bridge.')
673     parser.add_argument('-v', '--version', action='store_true', 
674         help='print version and exit')
675     parser.add_argument('-d', '--debug', action='store_true', 
676         help='log debug info on standard output')
677     parser.add_argument('-o', '--opendoor', action='store_true',
678         help='allow anonymous access to all services')
679     parser.add_argument('-p', '--port', default='9000',
680         help='port number')
681     parser.add_argument('-c', '--credentials',
682         help='path to credentials file')
683     parser.add_argument('-w', '--whitelist',
684         help='path to whitelist file')
685     
686     args = parser.parse_args(sys.argv[1:])
687
688     if args.version:
689         print("Cloudeebus version " + VERSION)
690         exit(0)
691     
692     if args.debug:
693         log.startLogging(sys.stdout)
694     
695     OPENDOOR = args.opendoor
696     
697     if args.credentials:
698         jfile = open(args.credentials)
699         CREDENTIALS = json.load(jfile)
700         jfile.close()
701     
702     if args.whitelist:
703         jfile = open(args.whitelist)
704         WHITELIST = json.load(jfile)
705         jfile.close()
706     
707     uri = "ws://localhost:" + args.port
708     
709     factory = WampServerFactory(uri, debugWamp = args.debug)
710     factory.protocol = CloudeebusServerProtocol
711     factory.setProtocolOptions(allowHixie76 = True)
712     
713     listenWS(factory)
714     
715     DBusGMainLoop(set_as_default=True)
716     
717     reactor.run()