f7588a3f27b80137a06f800e5bc7510351bb408f
[contrib/cloudeebus.git] / cloudeebus / cloudeebus.py
1 #!/usr/bin/env python
2
3 # Cloudeebus
4 #
5 # Copyright 2012 Intel Corporation.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # Luc Yriarte <luc.yriarte@intel.com>
20 # Christophe Guiraud <christophe.guiraud@intel.com>
21 #
22
23
24 import argparse, dbus, json, sys
25
26 from twisted.internet import glib2reactor
27 # Configure the twisted mainloop to be run inside the glib mainloop.
28 # This must be done before importing the other twisted modules
29 glib2reactor.install()
30 from twisted.internet import reactor, defer
31
32 from autobahn.websocket import listenWS
33 from autobahn.wamp import exportRpc, WampServerFactory, WampCraServerProtocol
34
35 from dbus.mainloop.glib import DBusGMainLoop
36
37 import gobject
38 import re
39 import dbus.service
40 gobject.threads_init()
41
42 from dbus import glib
43 glib.init_threads()
44
45 # enable debug log
46 from twisted.python import log
47
48 # XML parser module
49 from xml.etree.ElementTree import XMLParser
50
51 # For debug only
52 import os
53
54 ###############################################################################
55
56 VERSION = "0.2.1"
57 OPENDOOR = False
58 CREDENTIALS = {}
59 WHITELIST = []
60
61 ###############################################################################
62 class DbusCache:
63     '''
64     Global cache of DBus connexions and signal handlers
65     '''
66     def __init__(self):
67         self.dbusConnexions = {}
68         self.signalHandlers = {}
69
70
71     def reset(self):
72         '''
73         Disconnect signal handlers before resetting cache.
74         '''
75         self.dbusConnexions = {}
76         # disconnect signal handlers
77         for key in self.signalHandlers:
78             self.signalHandlers[key].disconnect()
79         self.signalHandlers = {}
80
81
82     def dbusConnexion(self, busName):
83         if not self.dbusConnexions.has_key(busName):
84             if busName == "session":
85                 self.dbusConnexions[busName] = dbus.SessionBus()
86             elif busName == "system":
87                 self.dbusConnexions[busName] = dbus.SystemBus()
88             else:
89                 raise Exception("Error: invalid bus: %s" % busName)
90         return self.dbusConnexions[busName]
91
92
93
94 ###############################################################################
95 class DbusSignalHandler:
96     '''
97     signal hash id as busName#senderName#objectName#interfaceName#signalName
98     '''
99     def __init__(self, busName, senderName, objectName, interfaceName, signalName):
100         self.id = "#".join([busName, senderName, objectName, interfaceName, signalName])
101         # connect handler to signal
102         self.bus = cache.dbusConnexion(busName)
103         self.bus.add_signal_receiver(self.handleSignal, signalName, interfaceName, senderName, objectName)
104         
105     
106     def disconnect(self):
107         names = self.id.split("#")
108         self.bus.remove_signal_receiver(self.handleSignal, names[4], names[3], names[1], names[2])
109
110
111     def handleSignal(self, *args):
112         '''
113         publish dbus args under topic hash id
114         '''
115         factory.dispatch(self.id, json.dumps(args))
116
117
118
119 ###############################################################################
120 class DbusCallHandler:
121     '''
122     deferred reply to return dbus results
123     '''
124     def __init__(self, method, args):
125         self.pending = False
126         self.request = defer.Deferred()
127         self.method = method
128         self.args = args
129
130
131     def callMethod(self):
132         '''
133         dbus method async call
134         '''
135         self.pending = True
136         self.method(*self.args, reply_handler=self.dbusSuccess, error_handler=self.dbusError)
137         return self.request
138
139
140     def dbusSuccess(self, *result):
141         '''
142         return JSON string result array
143         '''
144         self.request.callback(json.dumps(result))
145         self.pending = False
146
147
148     def dbusError(self, error):
149         '''
150         return dbus error message
151         '''
152         self.request.errback(error.get_dbus_message())
153         self.pending = False
154
155
156
157 ################################################################################       
158 class exec_code:
159     def __init__(self) :
160         self.exec_string = ""
161         self.exec_code = None
162         self.exec_code_valid = 1
163         self.indent_level = 0
164         self.indent_increment = 1
165         self.line = 0
166
167     # __str__ : Return a string representation of the object, for
168     # nice printing.
169     def __str__(self) :
170         return self.exec_string
171
172     def p(self) :
173         print str(self)
174
175     def append_stmt(self, stmt) :
176         self.exec_code_valid = 0
177         self.line += 1
178         if (stmt != "\n"):
179             for x in range(0,self.indent_level):
180                 self.exec_string = self.exec_string + ' '            
181             self.exec_string = self.exec_string + stmt + "\t\t# l:" + str(self.line) + '\n'
182         else:
183             if (stmt == "\n"):
184                 self.exec_string = self.exec_string + "# l:" + str(self.line) + '\n'
185             else:
186                 self.exec_string = self.exec_string + stmt + "\t\t# l:" + str(self.line) + '\n'
187
188     def indent(self) :
189         self.indent_level = self.indent_level + self.indent_increment
190
191     def dedent(self) :
192         self.indent_level = self.indent_level - self.indent_increment
193     
194     # compile : Compile exec_string into exec_code using the builtin
195     # compile function. Skip if already in sync.
196     def compile(self) :
197         if not self.exec_code_valid :
198             self.exec_code = compile(self.exec_string, "<string>", "exec")
199         self.exec_code_valid = 1
200
201     def execute(self) :
202         if not self.exec_code_valid :
203             self.compile()
204         exec self.exec_code
205
206
207
208 ################################################################################       
209 class XmlCb_Parser: # The target object of the parser
210     maxDepth = 0
211     depth = 0
212     def __init__(self, dynDBusClass):
213         self.dynDBusClass = dynDBusClass
214         
215     def start(self, tag, attrib):   # Called for each opening tag.
216         if (tag == 'node'):
217             return
218         # Set interface name
219         if (tag == 'interface'):
220             self.dynDBusClass.set_interface(attrib['name'])
221             return
222         # Set method name
223         if (tag == 'method'):
224             self.current = tag
225             self.dynDBusClass.def_method(attrib['name'])
226             return
227         if (tag == 'signal'):
228             self.current = tag
229             self.dynDBusClass.def_signal(attrib['name'])
230             return
231
232         # Set signature (in/out & name) for method
233         if (tag == 'arg'):
234             if (self.current == 'method'):
235                 self.dynDBusClass.add_signature(attrib['name'],
236                                                 attrib['direction'],
237                                                 attrib['type'])
238                 return
239             if (self.current == 'signal'):
240                 self.dynDBusClass.add_signature(attrib['name'], 'in',
241                                                 attrib['type'])
242                 return
243     def end(self, tag):             # Called for each closing tag.
244         if (tag == 'method'):
245             self.dynDBusClass.add_dbus_method()
246             self.dynDBusClass.add_body_method()
247             self.dynDBusClass.end_method()
248         if (tag == 'signal'):
249             self.dynDBusClass.add_dbus_signal()
250             self.dynDBusClass.add_body_signal()
251             self.dynDBusClass.end_method()
252            
253     def data(self, data):
254         pass            # We do not need to do anything with data.
255     def close(self):    # Called when all data has been parsed.
256         return self.maxDepth
257
258
259        
260 ################################################################################       
261 class dynDBusClass():
262     def __init__(self, className, globalCtx, localCtx):
263         self.className = className
264         self.xmlCB = XmlCb_Parser(self)
265         self.localCtx = localCtx
266         self.globalCtx = globalCtx        
267         self.signature = {}
268         self.class_code = exec_code()  
269         self.class_code.indent_increment = 4
270         self.class_code.append_stmt("import dbus")
271         self.class_code.append_stmt("\n")
272         self.class_code.append_stmt("\n")
273         self.class_code.append_stmt("class " + self.className + "(dbus.service.Object):")
274         self.class_code.indent()
275         
276         ## Overload of __init__ method 
277         self.def_method("__init__")
278         self.add_method("bus, callback=None, objName='/sample', busName='org.cloudeebus'")
279         self.add_stmt("self.bus = bus")
280         self.add_stmt("self.objName = objName")
281         self.add_stmt("self.callback = callback")        
282         self.add_stmt("dbus.service.Object.__init__(self, conn=bus, object_path=objName, bus_name=busName)")
283         self.end_method()
284                
285     def createDBusServiceFromXML(self, xml):
286         self.parser = XMLParser(target=self.xmlCB)
287         self.parser.feed(xml)
288         self.parser.close()
289     
290     def set_interface(self, ifName):
291         self.ifName = ifName
292         
293     def def_method(self, methodName):
294         self.methodToAdd = methodName
295         self.signalToAdd = None
296         self.args_str = str()
297         self.signature = {}
298         self.signature['name'] = str()
299         self.signature['in'] = str()                
300         self.signature['out'] = str()                        
301
302     def def_signal(self, signalName):
303         self.methodToAdd = None
304         self.signalToAdd = signalName
305         self.args_str = str()
306         self.signature = {}
307         self.signature['name'] = str()
308         self.signature['in'] = str()                
309         self.signature['out'] = str()                        
310
311     def add_signature(self, name, direction, signature):
312         if (direction == 'in'):
313             self.signature['in'] += signature
314             if (self.signature['name'] != str()):
315                 self.signature['name'] += ", "
316             self.signature['name'] += name
317         if (direction == 'out'):
318             self.signature['out'] = signature                        
319         
320     def add_method(self, args = None, async_success_cb = None, async_err_cb = None):
321         async_cb_str = str()
322         if (self.methodToAdd != None):
323             name = self.methodToAdd
324         else:
325             name = self.signalToAdd
326         if (args != None):
327             self.args_str = args
328         if (async_success_cb != None):
329             async_cb_str = async_success_cb
330         if (async_err_cb != None):
331             if (async_cb_str != str()):
332                 async_cb_str += ", "
333             async_cb_str += async_err_cb
334                         
335         parameters = self.args_str
336         if (async_cb_str != str()):
337             if (parameters != str()):
338                 parameters += ", "
339             parameters +=async_cb_str       
340         
341         if (parameters != str()):
342             self.class_code.append_stmt("def " + name + "(self, %s):" % parameters)               
343         else:
344             self.class_code.append_stmt("def " + name + "(self):")
345         self.class_code.indent()
346         
347     def end_method(self):
348         self.class_code.append_stmt("\n")
349         self.class_code.append_stmt("\n")        
350         self.class_code.dedent()
351         
352     def add_dbus_method(self):
353         decorator = '@dbus.service.method("' + self.ifName + '"'
354         if (self.signature.has_key('in') and self.signature['in'] != str()):
355                 decorator += ", in_signature='" + self.signature['in'] + "'"
356         if (self.signature.has_key('out') and self.signature['out'] != str()):
357                 decorator += ", out_signature='" + self.signature['out'] + "'"
358         decorator += ", async_callbacks=('dbus_async_cb', 'dbus_async_err_cb')"            
359         decorator += ")"
360         self.class_code.append_stmt(decorator)
361         if (self.signature.has_key('name') and self.signature['name'] != str()):
362             self.add_method(self.signature['name'], async_success_cb='dbus_async_cb', async_err_cb='dbus_async_err_cb')
363         else:
364             self.add_method(async_success_cb='dbus_async_cb', async_err_cb='dbus_async_err_cb')
365
366     def add_dbus_signal(self):
367         decorator = '@dbus.service.signal("' + self.ifName + '"'
368         if (self.signature.has_key('in') and self.signature['in'] != str()):
369                 decorator += ", signature='" + self.signature['in'] + "'"
370         decorator += ")"            
371         self.class_code.append_stmt(decorator)
372         if (self.signature.has_key('name') and self.signature['name'] != str()):
373             self.add_method(self.signature['name'])
374         else:
375             self.add_method()
376
377     def add_body_method(self):
378         if (self.methodToAdd != None):
379             self.class_code.append_stmt("print 'In " + self.methodToAdd + "()'")
380             if (self.args_str != str()):
381                 self.class_code.append_stmt("self.callback('" + self.methodToAdd + "', dbus_async_cb, dbus_async_err_cb, %s)" % self.args_str)
382             else:        
383                 self.class_code.append_stmt("self.callback('" + self.methodToAdd + "', dbus_async_cb, dbus_async_err_cb)")
384
385     def add_body_signal(self):
386         self.class_code.append_stmt("return") ## TODO: Remove and fix with code ad hoc
387         self.class_code.append_stmt("\n")
388
389     def add_stmt(self, stmt) :
390         self.class_code.append_stmt(stmt)
391         
392     def declare(self) :
393         self.class_code.compile()
394         exec(self.class_code.exec_string, self.globalCtx, self.localCtx)
395      
396     def __str__(self) :
397         return self.class_code.exec_string
398
399     # p : Since it is often useful to be able to look at the code
400     # that is generated interactively, this function provides
401     # a shorthand for "print str(some_exec_code_instance)", which
402     # gives a reasonable nice look at the contents of the
403     # exec_code object.
404     def p(self) :
405         print str(self)
406
407
408
409 ###############################################################################
410 class CloudeebusService:
411     '''
412     support for sending DBus messages and registering for DBus signals
413     '''
414     def __init__(self, permissions):
415         self.permissions = permissions;
416         self.proxyObjects = {}
417         self.proxyMethods = {}
418         self.pendingCalls = []
419         self.dynDBusClasses = {} # DBus class source code generated dynamically (a list because one by classname)
420         self.services = {}  # DBus service created
421         self.serviceAgents = {} # Instantiated DBus class previously generated dynamically, for now, one by classname
422         self.servicePendingCalls = {} # JS methods called (and waiting for a Success/error response), containing 'methodId', (successCB, errorCB)
423
424
425     def proxyObject(self, busName, serviceName, objectName):
426         '''
427         object hash id as busName#serviceName#objectName
428         '''
429         id = "#".join([busName, serviceName, objectName])
430         if not self.proxyObjects.has_key(id):
431             if not OPENDOOR:
432                 # check permissions, array.index throws exception
433                 self.permissions.index(serviceName)
434             bus = cache.dbusConnexion(busName)
435             self.proxyObjects[id] = bus.get_object(serviceName, objectName)
436         return self.proxyObjects[id]
437
438
439     def proxyMethod(self, busName, serviceName, objectName, interfaceName, methodName):
440         '''
441         method hash id as busName#serviceName#objectName#interfaceName#methodName
442         '''
443         id = "#".join([busName, serviceName, objectName, interfaceName, methodName])
444         if not self.proxyMethods.has_key(id):
445             obj = self.proxyObject(busName, serviceName, objectName)
446             self.proxyMethods[id] = obj.get_dbus_method(methodName, interfaceName)
447         return self.proxyMethods[id]
448
449
450     @exportRpc
451     def dbusRegister(self, list):
452         '''
453         arguments: bus, sender, object, interface, signal
454         '''
455         if len(list) < 5:
456             raise Exception("Error: expected arguments: bus, sender, object, interface, signal)")
457         
458         if not OPENDOOR:
459             # check permissions, array.index throws exception
460             self.permissions.index(list[1])
461         
462         # check if a handler exists
463         sigId = "#".join(list)
464         if cache.signalHandlers.has_key(sigId):
465             return sigId
466         
467         # create a handler that will publish the signal
468         dbusSignalHandler = DbusSignalHandler(*list)
469         cache.signalHandlers[sigId] = dbusSignalHandler
470         
471         return dbusSignalHandler.id
472
473
474     @exportRpc
475     def dbusSend(self, list):
476         '''
477         arguments: bus, destination, object, interface, message, [args]
478         '''
479         # clear pending calls
480         for call in self.pendingCalls:
481             if not call.pending:
482                 self.pendingCalls.remove(call)
483         
484         if len(list) < 5:
485             raise Exception("Error: expected arguments: bus, destination, object, interface, message, [args])")
486         
487         # parse JSON arg list
488         args = []
489         if len(list) == 6:
490             args = json.loads(list[5])
491         
492         # get dbus proxy method
493         method = self.proxyMethod(*list[0:5])
494         
495         # use a deferred call handler to manage dbus results
496         dbusCallHandler = DbusCallHandler(method, args)
497         self.pendingCalls.append(dbusCallHandler)
498         return dbusCallHandler.callMethod()
499
500
501     @exportRpc
502     def returnMethod(self, list):
503         '''
504         arguments: methodId, success (=true, error otherwise), result (to return)
505         '''
506         methodId = list[0]
507         success = list[1]
508         result = list[2]
509         if (self.servicePendingCalls.has_key(methodId)):
510             cb = self.servicePendingCalls[methodId]
511             if (success):                
512                 successCB = cb["successCB"]
513                 if (result != None):
514                     successCB(result)
515                 else:
516                     successCB()                    
517             else:     
518                 errorCB = cb["errorCB"]        
519                 if (result != None):
520                     errorCB(result)
521                 else:
522                     errorCB()                    
523         
524
525     def srvCB(self, name, async_succes_cb, async_error_cb, *args):
526         print "self.srvCB(name='%s', args=%s')\n\n" % (name, str(args))
527         methodId = self.srvName + "#" + self.agentObjectPath + "#" + name
528         cb = { 'successCB': async_succes_cb, 
529                'errorCB': async_error_cb}
530         self.servicePendingCalls[methodId] = cb
531         
532         print "factory.dispatch(methodId='%s', json.dumps(args)=%s')\n\n" % (methodId, json.dumps(args))
533         factory.dispatch(methodId, json.dumps(args))
534         
535     @exportRpc
536     def serviceAdd(self, list):
537         '''
538         arguments: busName, srvName
539         '''
540         busName = list[0]
541         self.bus =  cache.dbusConnexion( busName['name'] )
542         self.srvName = list[1]
543         if (self.services.has_key(self.srvName) == False):            
544             self.services[self.srvName] = dbus.service.BusName(name = self.srvName, bus = self.bus)
545         return self.srvName
546                     
547     @exportRpc
548     def serviceAddAgent(self, list):
549         '''
550         arguments: objectPath, xmlTemplate
551         '''
552         self.agentObjectPath = list[0]
553         xmlTemplate = list[1]
554         className = re.sub('/', '_', self.agentObjectPath[1:])
555         if (self.dynDBusClasses.has_key(className) == False):
556             self.dynDBusClasses[className] = dynDBusClass(className, globals(), locals())
557             self.dynDBusClasses[className].createDBusServiceFromXML(xmlTemplate)
558             
559             # For Debug only
560             if (1):
561                 if (1): ## Force deletion
562                     if os.access('./MyDbusClass.py', os.R_OK) == True:
563                         os.remove('./MyDbusClass.py')
564                     
565                     if os.access('./MyDbusClass.py', os.R_OK) == False:
566                         f = open('./MyDbusClass.py', 'w')
567                         f.write(self.dynDBusClasses[className].class_code.exec_string)
568                         f.close()
569 #                self.dynDBusClass[className].p()
570                 self.dynDBusClasses[className].declare()
571             
572             if (self.serviceAgents.has_key(className) == False):            
573                 exe_str = "self.serviceAgents[" + className +"] = " + className + "(self.bus, callback=self.srvCB, objName=self.agentObjectPath, busName=self.srvName)"
574                 exec (exe_str, globals(), locals())
575                     
576     @exportRpc
577     def getVersion(self):
578         '''
579         return current version string
580         '''
581         return VERSION
582
583
584
585 ###############################################################################
586 class CloudeebusServerProtocol(WampCraServerProtocol):
587     '''
588     connexion and session authentication management
589     '''
590     
591     def onSessionOpen(self):
592         # CRA authentication options
593         self.clientAuthTimeout = 0
594         self.clientAuthAllowAnonymous = OPENDOOR
595         # CRA authentication init
596         WampCraServerProtocol.onSessionOpen(self)
597     
598     
599     def getAuthPermissions(self, key, extra):
600         return json.loads(extra.get("permissions", "[]"))
601     
602     
603     def getAuthSecret(self, key):
604         secret = CREDENTIALS.get(key, None)
605         if secret is None:
606             return None
607         # secret must be of str type to be hashed
608         return secret.encode('utf-8')
609     
610
611     def onAuthenticated(self, key, permissions):
612         if not OPENDOOR:
613             # check authentication key
614             if key is None:
615                 raise Exception("Authentication failed")
616             # check permissions, array.index throws exception
617             for req in permissions:
618                 WHITELIST.index(req)
619         # create cloudeebus service instance
620         self.cloudeebusService = CloudeebusService(permissions)
621         # register it for RPC
622         self.registerForRpc(self.cloudeebusService)
623         # register for Publish / Subscribe
624         self.registerForPubSub("", True)
625     
626     
627     def connectionLost(self, reason):
628         WampCraServerProtocol.connectionLost(self, reason)
629         if factory.getConnectionCount() == 0:
630             cache.reset()
631
632
633
634 ###############################################################################
635
636 if __name__ == '__main__':
637     
638     cache = DbusCache()
639
640     parser = argparse.ArgumentParser(description='Javascript DBus bridge.')
641     parser.add_argument('-v', '--version', action='store_true', 
642         help='print version and exit')
643     parser.add_argument('-d', '--debug', action='store_true', 
644         help='log debug info on standard output')
645     parser.add_argument('-o', '--opendoor', action='store_true',
646         help='allow anonymous access to all services')
647     parser.add_argument('-p', '--port', default='9000',
648         help='port number')
649     parser.add_argument('-c', '--credentials',
650         help='path to credentials file')
651     parser.add_argument('-w', '--whitelist',
652         help='path to whitelist file')
653     
654     args = parser.parse_args(sys.argv[1:])
655
656     if args.version:
657         print("Cloudeebus version " + VERSION)
658         exit(0)
659     
660     if args.debug:
661         log.startLogging(sys.stdout)
662     
663     OPENDOOR = args.opendoor
664     
665     if args.credentials:
666         jfile = open(args.credentials)
667         CREDENTIALS = json.load(jfile)
668         jfile.close()
669     
670     if args.whitelist:
671         jfile = open(args.whitelist)
672         WHITELIST = json.load(jfile)
673         jfile.close()
674     
675     uri = "ws://localhost:" + args.port
676     
677     factory = WampServerFactory(uri, debugWamp = args.debug)
678     factory.protocol = CloudeebusServerProtocol
679     factory.setProtocolOptions(allowHixie76 = True)
680     
681     listenWS(factory)
682     
683     DBusGMainLoop(set_as_default=True)
684     
685     reactor.run()