0f6e54320674f89f2c49c28e973375d564e7bbfd
[contrib/cloudeebus.git] / cloudeebus / cloudeebus.py
1 ###############################################################################
2 # Copyright 2012 Intel Corporation.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 ###############################################################################
16
17
18 import sys, dbus, json
19
20 from twisted.internet import glib2reactor
21 # Configure the twisted mainloop to be run inside the glib mainloop.
22 # This must be done before importing the other twisted modules
23 glib2reactor.install()
24 from twisted.internet import reactor, defer
25
26 from autobahn.websocket import listenWS
27 from autobahn.wamp import exportRpc, WampServerFactory, WampServerProtocol
28
29 from dbus.mainloop.glib import DBusGMainLoop
30
31 import gobject
32 gobject.threads_init()
33
34 from dbus import glib
35 glib.init_threads()
36
37 # enable debug log
38 from twisted.python import log
39 log.startLogging(sys.stdout)
40
41
42
43 ###############################################################################
44 def hashId(list):
45         str = list[0]
46         for item in list[1:len(list)]:
47                 str += "#" + item
48         return str
49
50
51
52 ###############################################################################
53 class DbusSignalHandler:
54         def __init__(self, bus, object, senderName, objectName, interfaceName, signalName):
55                 # publish hash id
56                 self.id = hashId([senderName, objectName, interfaceName, signalName])
57                 # connect dbus proxy object to signal
58                 object.connect_to_signal(signalName, self.handleSignal, interfaceName)
59
60
61         def handleSignal(self, *args):
62                 # publish dbus args under topic hash id
63                 factory.dispatch(self.id, json.dumps(args))
64
65
66 ###############################################################################
67 class DbusCallHandler:
68         def __init__(self, method, args):
69                 # deferred reply to return dbus results
70                 self.pending = False
71                 self.request = defer.Deferred()
72                 self.method = method
73                 self.args = args
74
75
76         def callMethod(self):
77                 # dbus method async call
78                 self.pending = True
79                 self.method(*self.args, reply_handler=self.dbusSuccess, error_handler=self.dbusError)
80                 return self.request
81
82
83         def dbusSuccess(self, *result):
84                 # return JSON string result array
85                 self.request.callback(json.dumps(result))
86                 self.pending = False
87
88
89         def dbusError(self, error):
90                 # return dbus error message
91                 self.request.errback(error.get_dbus_message())
92                 self.pending = False
93
94
95
96 ###############################################################################
97 class CloudeebusService:
98         def __init__(self):
99                 # dbus connexions
100                 self.dbusConnexions = {}
101                 # proxy objects
102                 self.proxyObjects = {}
103                 # proxy methods
104                 self.proxyMethods = {}
105                 # signal handlers
106                 self.signalHandlers = {}
107                 # pending dbus calls
108                 self.pendingCalls = []
109
110
111         def dbusConnexion(self, busName):
112                 if not self.dbusConnexions.has_key(busName):
113                         if busName == "session":
114                                 self.dbusConnexions[busName] = dbus.SessionBus()
115                         elif busName == "system":
116                                 self.dbusConnexions[busName] = dbus.SystemBus()
117                         else:
118                                 raise Exception("Error: invalid bus: %s" % busName)
119                 return self.dbusConnexions[busName]
120
121
122         def proxyObject(self, bus, serviceName, objectName):
123                 id = hashId([serviceName, objectName])
124                 if not self.proxyObjects.has_key(id):
125                         self.proxyObjects[id] = bus.get_object(serviceName, objectName)
126                 return self.proxyObjects[id]
127
128
129         def proxyMethod(self, bus, serviceName, objectName, interfaceName, methodName):
130                 id = hashId([serviceName, objectName, interfaceName, methodName])
131                 if not self.proxyMethods.has_key(id):
132                         obj = self.proxyObject(bus, serviceName, objectName)
133                         self.proxyMethods[id] = obj.get_dbus_method(methodName, interfaceName)
134                 return self.proxyMethods[id]
135
136
137         @exportRpc
138         def dbusRegister(self, list):
139                 # read arguments list by position
140                 if len(list) < 5:
141                         raise Exception("Error: expected arguments: bus, sender, object, interface, signal)")
142                 
143                 # check if a handler exists
144                 sigId = hashId(list[1:5])
145                 if self.signalHandlers.has_key(sigId):
146                         return sigId
147                 
148                 # get dbus connexion
149                 bus = self.dbusConnexion(list[0])
150                 
151                 # get dbus proxy
152                 object = self.proxyObject(bus, list[1], list[2])
153                 
154                 # create a handler that will publish the signal
155                 dbusSignalHandler = DbusSignalHandler(bus, object, *list[1:5])
156                 self.signalHandlers[sigId] = dbusSignalHandler
157                 
158                 return dbusSignalHandler.id
159
160
161         @exportRpc
162         def dbusSend(self, list):
163                 # clear pending calls
164                 for call in self.pendingCalls:
165                         if not call.pending:
166                                 self.pendingCalls.remove(call)
167                 
168                 # read arguments list by position
169                 if len(list) < 5:
170                         raise Exception("Error: expected arguments: bus, destination, object, interface, message, [args])")
171                 
172                 # get dbus connexion
173                 bus = self.dbusConnexion(list[0])
174                 
175                 # parse JSON arg list
176                 args = []
177                 if len(list) == 6:
178                         args = json.loads(list[5])
179                 
180                 # get dbus proxy
181                 method = self.proxyMethod(bus, *list[1:5])
182                 
183                 # use a deferred call handler to manage dbus results
184                 dbusCallHandler = DbusCallHandler(method, args)
185                 self.pendingCalls.append(dbusCallHandler)
186                 return dbusCallHandler.callMethod()
187
188
189
190 ###############################################################################
191 class CloudeebusServerProtocol(WampServerProtocol):
192         def onSessionOpen(self):
193                 # create cloudeebus service instance
194                 self.cloudeebusService = CloudeebusService()
195                 # register it for RPC
196                 self.registerForRpc(self.cloudeebusService)
197                 # register for Publish / Subscribe
198                 self.registerForPubSub("", True)
199
200
201
202 ###############################################################################
203 if __name__ == '__main__':
204         port = "9000"
205         if len(sys.argv) == 2:
206                 port = sys.argv[1]
207
208         uri = "ws://localhost:" + port
209
210         factory = WampServerFactory(uri, debugWamp = True)
211         factory.protocol = CloudeebusServerProtocol
212         factory.setProtocolOptions(allowHixie76 = True)
213
214         listenWS(factory)
215
216         DBusGMainLoop(set_as_default=True)
217
218         reactor.run()