82cb354d06bab4cc854f489cf6e1a32f34835ae6
[contrib/cloudeebus.git] / cloudeebus / cloudeebus.py
1 #!/usr/bin/env python
2
3 # Cloudeebus
4 #
5 # Copyright 2012 Intel Corporation.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # Luc Yriarte <luc.yriarte@intel.com>
20 # Christophe Guiraud <christophe.guiraud@intel.com>
21 #
22
23
24 import argparse, dbus, json, sys
25
26 from twisted.internet import glib2reactor
27 # Configure the twisted mainloop to be run inside the glib mainloop.
28 # This must be done before importing the other twisted modules
29 glib2reactor.install()
30 from twisted.internet import reactor, defer
31
32 from autobahn.websocket import listenWS
33 from autobahn.wamp import exportRpc, WampServerFactory, WampCraServerProtocol
34
35 from dbus.mainloop.glib import DBusGMainLoop
36
37 import gobject
38 import re
39 import dbus.service
40 gobject.threads_init()
41
42 from dbus import glib
43 glib.init_threads()
44
45 # enable debug log
46 from twisted.python import log
47
48 # XML parser module
49 from xml.etree.ElementTree import XMLParser
50
51
52 ###############################################################################
53
54 VERSION = "0.3.0"
55 OPENDOOR = False
56 CREDENTIALS = {}
57 WHITELIST = []
58
59 ###############################################################################
60 class DbusCache:
61     '''
62     Global cache of DBus connexions and signal handlers
63     '''
64     def __init__(self):
65         self.dbusConnexions = {}
66         self.signalHandlers = {}
67
68
69     def reset(self):
70         '''
71         Disconnect signal handlers before resetting cache.
72         '''
73         self.dbusConnexions = {}
74         # disconnect signal handlers
75         for key in self.signalHandlers:
76             self.signalHandlers[key].disconnect()
77         self.signalHandlers = {}
78
79
80     def dbusConnexion(self, busName):
81         if not self.dbusConnexions.has_key(busName):
82             if busName == "session":
83                 self.dbusConnexions[busName] = dbus.SessionBus()
84             elif busName == "system":
85                 self.dbusConnexions[busName] = dbus.SystemBus()
86             else:
87                 raise Exception("Error: invalid bus: %s" % busName)
88         return self.dbusConnexions[busName]
89
90
91
92 ###############################################################################
93 class DbusSignalHandler:
94     '''
95     signal hash id as busName#senderName#objectName#interfaceName#signalName
96     '''
97     def __init__(self, busName, senderName, objectName, interfaceName, signalName):
98         self.id = "#".join([busName, senderName, objectName, interfaceName, signalName])
99         # connect handler to signal
100         self.bus = cache.dbusConnexion(busName)
101         self.bus.add_signal_receiver(self.handleSignal, signalName, interfaceName, senderName, objectName)
102         
103     
104     def disconnect(self):
105         names = self.id.split("#")
106         self.bus.remove_signal_receiver(self.handleSignal, names[4], names[3], names[1], names[2])
107
108
109     def handleSignal(self, *args):
110         '''
111         publish dbus args under topic hash id
112         '''
113         factory.dispatch(self.id, json.dumps(args))
114
115
116
117 ###############################################################################
118 class DbusCallHandler:
119     '''
120     deferred reply to return dbus results
121     '''
122     def __init__(self, method, args):
123         self.pending = False
124         self.request = defer.Deferred()
125         self.method = method
126         self.args = args
127
128
129     def callMethod(self):
130         '''
131         dbus method async call
132         '''
133         self.pending = True
134         self.method(*self.args, reply_handler=self.dbusSuccess, error_handler=self.dbusError)
135         return self.request
136
137
138     def dbusSuccess(self, *result):
139         '''
140         return JSON string result array
141         '''
142         self.request.callback(json.dumps(result))
143         self.pending = False
144
145
146     def dbusError(self, error):
147         '''
148         return dbus error message
149         '''
150         self.request.errback(Exception(error.get_dbus_message()))
151         self.pending = False
152
153
154
155 ################################################################################       
156 class ExecCode:
157     '''
158     Execute DynDBusClass generated code
159     '''
160     def __init__(self, globalCtx, localCtx) :
161         self.exec_string = ""
162         self.exec_code = None
163         self.exec_code_valid = 1
164         self.indent_level = 0
165         self.indent_increment = 1
166         self.line = 0
167         self.localCtx = localCtx
168         self.globalCtx = globalCtx
169         
170
171     def append_stmt(self, stmt) :
172         self.exec_code_valid = 0
173         self.line += 1
174         for x in range(0,self.indent_level):
175             self.exec_string = self.exec_string + ' '            
176         self.exec_string = self.exec_string + stmt + '\n'
177
178     def indent(self) :
179         self.indent_level = self.indent_level + self.indent_increment
180
181     def dedent(self) :
182         self.indent_level = self.indent_level - self.indent_increment
183     
184     # compile : Compile exec_string into exec_code using the builtin
185     # compile function. Skip if already in sync.
186     def compile(self) :
187         if not self.exec_code_valid :
188             self.exec_code = compile(self.exec_string, "<string>", "exec")
189         self.exec_code_valid = True
190
191     def execute(self) :
192         if not self.exec_code_valid :
193             self.compile()
194         exec(self.exec_code, self.globalCtx, self.localCtx)
195
196
197
198 ################################################################################       
199 class XmlCbParser: # The target object of the parser
200     maxDepth = 0
201     depth = 0
202     def __init__(self, dynDBusClass):
203         self.dynDBusClass = dynDBusClass
204         
205     def start(self, tag, attrib):   # Called for each opening tag.
206         if (tag == 'node'):
207             return
208         # Set interface name
209         if (tag == 'interface'):
210             self.dynDBusClass.set_interface(attrib['name'])
211             return
212         # Set method name
213         if (tag == 'method'):
214             self.current = tag
215             self.dynDBusClass.def_method(attrib['name'])
216             return
217         if (tag == 'signal'):
218             self.current = tag
219             self.dynDBusClass.def_signal(attrib['name'])
220             return
221
222         # Set signature (in/out & name) for method
223         if (tag == 'arg'):
224             if (self.current == 'method'):
225                 if (attrib.has_key('direction') == False):
226                     attrib['direction'] = "in"
227                 self.dynDBusClass.add_signature(attrib['name'],
228                                                 attrib['direction'],
229                                                 attrib['type'])
230                 return
231             if (self.current == 'signal'):
232                 if (attrib.has_key('name') == False):
233                     attrib['name'] = 'value'
234                 self.dynDBusClass.add_signature(attrib['name'], 'in',
235                                                 attrib['type'])
236                 return
237     def end(self, tag):             # Called for each closing tag.
238         if (tag == 'method'):
239             self.dynDBusClass.add_dbus_method()
240             self.dynDBusClass.add_body_method()
241             self.dynDBusClass.end_method()
242         if (tag == 'signal'):
243             self.dynDBusClass.add_dbus_signal()
244             self.dynDBusClass.add_body_signal()
245             self.dynDBusClass.end_method()
246            
247     def data(self, data):
248         pass            # We do not need to do anything with data.
249     def close(self):    # Called when all data has been parsed.
250         return self.maxDepth
251
252
253        
254 ################################################################################       
255 class DynDBusClass():
256     def __init__(self, className, globalCtx, localCtx):
257         self.className = className
258         self.xmlCB = XmlCbParser(self)
259         self.signature = {}
260         self.class_code = ExecCode(globalCtx, localCtx)  
261         self.class_code.indent_increment = 4
262         self.class_code.append_stmt("import dbus")
263         self.class_code.append_stmt("\n")
264         self.class_code.append_stmt("class " + self.className + "(dbus.service.Object):")
265         self.class_code.indent()
266         
267         ## Overload of __init__ method 
268         self.def_method("__init__")
269         self.add_method("bus, callback=None, objName='/sample', busName='org.cloudeebus'")
270         self.add_stmt("self.bus = bus")
271         self.add_stmt("self.objName = objName")
272         self.add_stmt("self.callback = callback")        
273         self.add_stmt("dbus.service.Object.__init__(self, conn=bus, bus_name=busName)")
274         self.end_method()
275                
276         ## Create 'add_to_connection' method 
277         self.def_method("add_to_connection")
278         self.add_method("connection=None, path=None")
279         self.add_stmt("dbus.service.Object.add_to_connection(self, connection=self.bus, path=self.objName)")
280         self.end_method()
281                
282         ## Create 'remove_from_connection' method 
283         self.def_method("remove_from_connection")
284         self.add_method("connection=None, path=None")
285         self.add_stmt("dbus.service.Object.remove_from_connection(self, connection=None, path=self.objName)")
286         self.end_method()
287                
288     def createDBusServiceFromXML(self, xml):
289         self.parser = XMLParser(target=self.xmlCB)
290         self.parser.feed(xml)
291         self.parser.close()
292     
293     def set_interface(self, ifName):
294         self.ifName = ifName
295         
296     def def_method(self, methodName):
297         self.methodToAdd = methodName
298         self.signalToAdd = None
299         self.args_str = str()
300         self.signature = {}
301         self.signature['name'] = str()
302         self.signature['in'] = str()                
303         self.signature['out'] = str()                        
304
305     def def_signal(self, signalName):
306         self.methodToAdd = None
307         self.signalToAdd = signalName
308         self.args_str = str()
309         self.signature = {}
310         self.signature['name'] = str()
311         self.signature['in'] = str()                
312         self.signature['out'] = str()                        
313
314     def add_signature(self, name, direction, signature):
315         if (direction == 'in'):
316             self.signature['in'] += signature
317             if (self.signature['name'] != str()):
318                 self.signature['name'] += ", "
319             self.signature['name'] += name
320         if (direction == 'out'):
321             self.signature['out'] = signature                        
322         
323     def add_method(self, args = None, async_success_cb = None, async_err_cb = None):
324         async_cb_str = str()
325         if (self.methodToAdd != None):
326             name = self.methodToAdd
327         else:
328             name = self.signalToAdd
329         if (args != None):
330             self.args_str = args
331         if (async_success_cb != None):
332             async_cb_str = async_success_cb
333         if (async_err_cb != None):
334             if (async_cb_str != str()):
335                 async_cb_str += ", "
336             async_cb_str += async_err_cb
337                         
338         parameters = self.args_str
339         if (async_cb_str != str()):
340             if (parameters != str()):
341                 parameters += ", "
342             parameters +=async_cb_str       
343         
344         if (parameters != str()):
345             self.class_code.append_stmt("def " + name + "(self, %s):" % parameters)               
346         else:
347             self.class_code.append_stmt("def " + name + "(self):")
348         self.class_code.indent()
349         
350     def end_method(self):
351         self.class_code.append_stmt("\n")
352         self.class_code.dedent()
353         
354     def add_dbus_method(self):
355         decorator = '@dbus.service.method("' + self.ifName + '"'
356         if (self.signature.has_key('in') and self.signature['in'] != str()):
357                 decorator += ", in_signature='" + self.signature['in'] + "'"
358         if (self.signature.has_key('out') and self.signature['out'] != str()):
359                 decorator += ", out_signature='" + self.signature['out'] + "'"
360         decorator += ", async_callbacks=('dbus_async_cb', 'dbus_async_err_cb')"            
361         decorator += ")"
362         self.class_code.append_stmt(decorator)
363         if (self.signature.has_key('name') and self.signature['name'] != str()):
364             self.add_method(self.signature['name'], async_success_cb='dbus_async_cb', async_err_cb='dbus_async_err_cb')
365         else:
366             self.add_method(async_success_cb='dbus_async_cb', async_err_cb='dbus_async_err_cb')
367
368     def add_dbus_signal(self):
369         decorator = '@dbus.service.signal("' + self.ifName + '"'
370         if (self.signature.has_key('in') and self.signature['in'] != str()):
371                 decorator += ", signature='" + self.signature['in'] + "'"
372         decorator += ")"            
373         self.class_code.append_stmt(decorator)
374         if (self.signature.has_key('name') and self.signature['name'] != str()):
375             self.add_method(self.signature['name'])
376         else:
377             self.add_method()
378
379     def add_body_method(self):
380         if (self.methodToAdd != None):
381             if (self.args_str != str()):
382                 self.class_code.append_stmt("self.callback('" + self.methodToAdd + "', '" + self.ifName + "', " + "dbus_async_cb, dbus_async_err_cb, %s)" % self.args_str)
383             else:        
384                 self.class_code.append_stmt("self.callback('" + self.methodToAdd + "', '" + self.ifName + "', " + "dbus_async_cb, dbus_async_err_cb)")
385
386     def add_body_signal(self):
387         self.class_code.append_stmt("return") ## TODO: Remove and fix with code ad hoc
388         self.class_code.append_stmt("\n")
389
390     def add_stmt(self, stmt) :
391         self.class_code.append_stmt(stmt)
392         
393     def declare(self) :
394         self.class_code.execute()
395
396
397
398 ###############################################################################
399 class CloudeebusService:
400     '''
401     support for sending DBus messages and registering for DBus signals
402     '''
403     def __init__(self, permissions):
404         self.permissions = permissions;
405         self.proxyObjects = {}
406         self.proxyMethods = {}
407         self.pendingCalls = []
408         self.dynDBusClasses = {} # DBus class source code generated dynamically (a list because one by classname)
409         self.services = {}  # DBus service created
410         self.serviceAgents = {} # Instantiated DBus class previously generated dynamically, for now, one by classname
411         self.servicePendingCalls = {} # JS methods called (and waiting for a Success/error response), containing 'methodId', (successCB, errorCB)
412         self.localCtx = locals()
413         self.globalCtx = globals()
414
415
416     def proxyObject(self, busName, serviceName, objectName):
417         '''
418         object hash id as busName#serviceName#objectName
419         '''
420         id = "#".join([busName, serviceName, objectName])
421         if not self.proxyObjects.has_key(id):
422             if not OPENDOOR:
423                 # check permissions, array.index throws exception
424                 self.permissions.index(serviceName)
425             bus = cache.dbusConnexion(busName)
426             self.proxyObjects[id] = bus.get_object(serviceName, objectName)
427         return self.proxyObjects[id]
428
429
430     def proxyMethod(self, busName, serviceName, objectName, interfaceName, methodName):
431         '''
432         method hash id as busName#serviceName#objectName#interfaceName#methodName
433         '''
434         id = "#".join([busName, serviceName, objectName, interfaceName, methodName])
435         if not self.proxyMethods.has_key(id):
436             obj = self.proxyObject(busName, serviceName, objectName)
437             self.proxyMethods[id] = obj.get_dbus_method(methodName, interfaceName)
438         return self.proxyMethods[id]
439
440
441     @exportRpc
442     def dbusRegister(self, list):
443         '''
444         arguments: bus, sender, object, interface, signal
445         '''
446         if len(list) < 5:
447             raise Exception("Error: expected arguments: bus, sender, object, interface, signal)")
448         
449         if not OPENDOOR:
450             # check permissions, array.index throws exception
451             self.permissions.index(list[1])
452         
453         # check if a handler exists
454         sigId = "#".join(list)
455         if cache.signalHandlers.has_key(sigId):
456             return sigId
457         
458         # create a handler that will publish the signal
459         dbusSignalHandler = DbusSignalHandler(*list)
460         cache.signalHandlers[sigId] = dbusSignalHandler
461         
462         return dbusSignalHandler.id
463
464
465     @exportRpc
466     def dbusSend(self, list):
467         '''
468         arguments: bus, destination, object, interface, message, [args]
469         '''
470         # clear pending calls
471         for call in self.pendingCalls:
472             if not call.pending:
473                 self.pendingCalls.remove(call)
474         
475         if len(list) < 5:
476             raise Exception("Error: expected arguments: bus, destination, object, interface, message, [args])")
477         
478         # parse JSON arg list
479         args = []
480         if len(list) == 6:
481             args = json.loads(list[5])
482         
483         # get dbus proxy method
484         method = self.proxyMethod(*list[0:5])
485         
486         # use a deferred call handler to manage dbus results
487         dbusCallHandler = DbusCallHandler(method, args)
488         self.pendingCalls.append(dbusCallHandler)
489         return dbusCallHandler.callMethod()
490
491
492     @exportRpc
493     def emitSignal(self, list):
494         '''
495         arguments: agentObjectPath, signalName, result (to emit)
496         '''
497         objectPath = list[0]
498         className = re.sub('/', '_', objectPath[1:])
499         signalName = list[1]
500         result = list[2]
501         if (self.serviceAgents.has_key(className) == True):
502             exe_str = "self.serviceAgents['"+ className +"']."+ signalName + "(" + str(result) + ")"
503             eval(exe_str, self.globalCtx, self.localCtx)
504         else:
505             raise Exception("No object path " + objectPath)
506
507     @exportRpc
508     def returnMethod(self, list):
509         '''
510         arguments: methodId, callIndex, success (=true, error otherwise), result (to return)
511         '''
512         methodId = list[0]
513         callIndex = list[1]
514         success = list[2]
515         result = list[3]
516         if (self.servicePendingCalls.has_key(methodId)):
517             cb = self.servicePendingCalls[methodId]['calls'][callIndex]
518             if cb is None:
519                 raise Exception("No pending call " + str(callIndex) + " for methodID " + methodId)
520             if (success):                
521                 successCB = cb["successCB"]
522                 if (result != None):
523                     successCB(result)
524                 else:
525                     successCB()                    
526             else:     
527                 errorCB = cb["errorCB"]        
528                 if (result != None):
529                     errorCB(result)
530                 else:
531                     errorCB()
532             self.servicePendingCalls[methodId]['calls'][callIndex] = None
533             self.servicePendingCalls[methodId]['count'] = self.servicePendingCalls[methodId]['count'] - 1
534             if self.servicePendingCalls[methodId]['count'] == 0:
535                 del self.servicePendingCalls[methodId]
536         else:
537             raise Exception("No methodID " + methodId)
538
539     def srvCB(self, name, ifName, async_succes_cb, async_error_cb, *args):
540         methodId = self.srvName + "#" + self.agentObjectPath + "#" + ifName + "#" + name
541         cb = { 'successCB': async_succes_cb, 
542                'errorCB': async_error_cb}
543         if methodId not in self.servicePendingCalls:
544             self.servicePendingCalls[methodId] = {'count': 0, 'calls': []}
545         pendingCallStr = json.dumps({'callIndex': len(self.servicePendingCalls[methodId]['calls']), 'args': args})
546         self.servicePendingCalls[methodId]['calls'].append(cb)
547         self.servicePendingCalls[methodId]['count'] = self.servicePendingCalls[methodId]['count'] + 1
548         factory.dispatch(methodId, pendingCallStr)
549                     
550     @exportRpc
551     def serviceAdd(self, list):
552         '''
553         arguments: busName, srvName
554         '''
555         busName = list[0]
556         self.bus =  cache.dbusConnexion( busName['name'] )
557         self.srvName = list[1]
558         if (self.services.has_key(self.srvName) == False):            
559             self.services[self.srvName] = dbus.service.BusName(name = self.srvName, bus = self.bus)
560         return self.srvName
561
562     @exportRpc
563     def serviceRelease(self, list):
564         '''
565         arguments: busName, srvName
566         '''
567         self.srvName = list[0]
568         if (self.services.has_key(self.srvName) == True):
569             self.services.pop(self.srvName)
570             return self.srvName
571         else:
572             raise Exception(self.srvName + " do not exist")
573                    
574     @exportRpc
575     def serviceAddAgent(self, list):
576         '''
577         arguments: objectPath, xmlTemplate
578         '''
579         self.agentObjectPath = list[0]
580         xmlTemplate = list[1]
581         self.className = re.sub('/', '_', self.agentObjectPath[1:])
582         if (self.dynDBusClasses.has_key(self.className) == False):
583             self.dynDBusClasses[self.className] = DynDBusClass(self.className, self.globalCtx, self.localCtx)
584             self.dynDBusClasses[self.className].createDBusServiceFromXML(xmlTemplate)
585             self.dynDBusClasses[self.className].declare()
586
587         ## Class already exist, instanciate it if not already instanciated
588         if (self.serviceAgents.has_key(self.className) == False):
589             self.serviceAgents[self.className] = eval(self.className + "(self.bus, callback=self.srvCB, objName=self.agentObjectPath, busName=self.srvName)", self.globalCtx, self.localCtx)
590             
591         self.serviceAgents[self.className].add_to_connection()
592         return (self.agentObjectPath)
593                     
594     @exportRpc
595     def serviceDelAgent(self, list):
596         '''
597         arguments: objectPath, xmlTemplate
598         '''
599         agentObjectPath = list[0]
600         className = re.sub('/', '_', agentObjectPath[1:])
601
602         if (self.serviceAgents.has_key(className)):
603             self.serviceAgents[self.className].remove_from_connection()
604             self.serviceAgents.pop(self.className)
605         else:
606             raise Exception(agentObjectPath + " doesn't exist!")
607         
608         return (agentObjectPath)
609                     
610     @exportRpc
611     def getVersion(self):
612         '''
613         return current version string
614         '''
615         return VERSION
616
617
618
619 ###############################################################################
620 class CloudeebusServerProtocol(WampCraServerProtocol):
621     '''
622     connexion and session authentication management
623     '''
624     
625     def onSessionOpen(self):
626         # CRA authentication options
627         self.clientAuthTimeout = 0
628         self.clientAuthAllowAnonymous = OPENDOOR
629         # CRA authentication init
630         WampCraServerProtocol.onSessionOpen(self)
631     
632     
633     def getAuthPermissions(self, key, extra):
634         return json.loads(extra.get("permissions", "[]"))
635     
636     
637     def getAuthSecret(self, key):
638         secret = CREDENTIALS.get(key, None)
639         if secret is None:
640             return None
641         # secret must be of str type to be hashed
642         return secret.encode('utf-8')
643     
644
645     def onAuthenticated(self, key, permissions):
646         if not OPENDOOR:
647             # check authentication key
648             if key is None:
649                 raise Exception("Authentication failed")
650             # check permissions, array.index throws exception
651             for req in permissions:
652                 WHITELIST.index(req)
653         # create cloudeebus service instance
654         self.cloudeebusService = CloudeebusService(permissions)
655         # register it for RPC
656         self.registerForRpc(self.cloudeebusService)
657         # register for Publish / Subscribe
658         self.registerForPubSub("", True)
659     
660     
661     def connectionLost(self, reason):
662         WampCraServerProtocol.connectionLost(self, reason)
663         if factory.getConnectionCount() == 0:
664             cache.reset()
665
666
667
668 ###############################################################################
669
670 if __name__ == '__main__':
671     
672     cache = DbusCache()
673
674     parser = argparse.ArgumentParser(description='Javascript DBus bridge.')
675     parser.add_argument('-v', '--version', action='store_true', 
676         help='print version and exit')
677     parser.add_argument('-d', '--debug', action='store_true', 
678         help='log debug info on standard output')
679     parser.add_argument('-o', '--opendoor', action='store_true',
680         help='allow anonymous access to all services')
681     parser.add_argument('-p', '--port', default='9000',
682         help='port number')
683     parser.add_argument('-c', '--credentials',
684         help='path to credentials file')
685     parser.add_argument('-w', '--whitelist',
686         help='path to whitelist file')
687     
688     args = parser.parse_args(sys.argv[1:])
689
690     if args.version:
691         print("Cloudeebus version " + VERSION)
692         exit(0)
693     
694     if args.debug:
695         log.startLogging(sys.stdout)
696     
697     OPENDOOR = args.opendoor
698     
699     if args.credentials:
700         jfile = open(args.credentials)
701         CREDENTIALS = json.load(jfile)
702         jfile.close()
703     
704     if args.whitelist:
705         jfile = open(args.whitelist)
706         WHITELIST = json.load(jfile)
707         jfile.close()
708     
709     uri = "ws://localhost:" + args.port
710     
711     factory = WampServerFactory(uri, debugWamp = args.debug)
712     factory.protocol = CloudeebusServerProtocol
713     factory.setProtocolOptions(allowHixie76 = True)
714     
715     listenWS(factory)
716     
717     DBusGMainLoop(set_as_default=True)
718     
719     reactor.run()