cloudeebus server: add bus.list_names RPC
[contrib/cloudeebus.git] / cloudeebus / server.py
1 ###############################################################################
2 # Copyright 2012 Intel Corporation.
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 ###############################################################################
16
17
18 import sys, dbus, json
19
20 from twisted.internet import glib2reactor
21 # Configure the twisted mainloop to be run inside the glib mainloop.
22 # This must be done before importing the other twisted modules
23 glib2reactor.install()
24 from twisted.internet import reactor, defer
25
26 from autobahn.websocket import listenWS
27 from autobahn.wamp import exportRpc, WampServerFactory, WampServerProtocol
28
29 from dbus.mainloop.glib import DBusGMainLoop
30
31 import gobject
32 gobject.threads_init()
33
34 from dbus import glib
35 glib.init_threads()
36
37 # enable debug log
38 from twisted.python import log
39 log.startLogging(sys.stdout)
40
41
42
43 ###############################################################################
44 def hashId(list):
45         str = list[0]
46         for item in list[1:len(list)]:
47                 str += "#" + item
48         return str
49
50
51
52 ###############################################################################
53 class DbusSignalHandler:
54         def __init__(self, bus, object, senderName, objectName, interfaceName, signalName):
55                 # publish hash id
56                 self.id = hashId([senderName, objectName, interfaceName, signalName])
57         # connect dbus proxy object to signal
58                 object.connect_to_signal(signalName, self.handleSignal, interfaceName)
59
60
61         def handleSignal(self, *args):
62                 # publish dbus args under topic hash id
63                 factory.dispatch(self.id, json.dumps(args))
64
65
66 ###############################################################################
67 class DbusCallHandler:
68         def __init__(self, method, args):
69         # deferred reply to return dbus results
70                 self.pending = False
71                 self.request = defer.Deferred()
72                 self.method = method
73                 self.args = args
74
75
76         def callMethod(self):
77                 # dbus method async call
78                 self.pending = True
79                 self.method(*self.args, reply_handler=self.dbusSuccess, error_handler=self.dbusError)
80                 return self.request
81
82
83         def dbusSuccess(self, *result):
84                 # return JSON string result array
85                 self.request.callback(json.dumps(result))
86                 self.pending = False
87
88
89         def dbusError(self, error):
90                 # return dbus error message
91                 self.request.errback(error.get_dbus_message())
92                 self.pending = False
93
94
95
96 ###############################################################################
97 class CloudeebusService:
98     def __init__(self):
99         # dbus connexions
100         self.dbusConnexions = {}
101         # proxy objects
102         self.proxyObjects = {}
103         # proxy methods
104         self.proxyMethods = {}
105         # signal handlers
106         self.signalHandlers = {}
107         # pending dbus calls
108         self.pendingCalls = []
109
110
111     def dbusConnexion(self, busName):
112         if not self.dbusConnexions.has_key(busName):
113                 if busName == "session":
114                         self.dbusConnexions[busName] = dbus.SessionBus()
115                 elif busName == "system":
116                         self.dbusConnexions[busName] = dbus.SystemBus()
117                 else:
118                         raise Exception("Error: invalid bus: %s" % busName)
119         return self.dbusConnexions[busName]
120
121
122     def proxyObject(self, bus, serviceName, objectName):
123         id = hashId([serviceName, objectName])
124         if not self.proxyObjects.has_key(id):
125                 self.proxyObjects[id] = bus.get_object(serviceName, objectName)
126         return self.proxyObjects[id]
127
128
129     def proxyMethod(self, bus, serviceName, objectName, interfaceName, methodName):
130         id = hashId([serviceName, objectName, interfaceName, methodName])
131         if not self.proxyMethods.has_key(id):
132                 obj = self.proxyObject(bus, serviceName, objectName)
133                 self.proxyMethods[id] = obj.get_dbus_method(methodName, interfaceName)
134         return self.proxyMethods[id]
135
136
137     @exportRpc
138     def dbusRegister(self, list):
139         # read arguments list by position
140         if len(list) < 5:
141                 raise Exception("Error: expected arguments: bus, sender, object, interface, signal)")
142         
143         # check if a handler exists
144         sigId = hashId(list[1:5])
145         if self.signalHandlers.has_key(sigId):
146                 return sigId
147         
148         # get dbus connexion
149         bus = self.dbusConnexion(list[0])
150         
151         # get dbus proxy
152         object = self.proxyObject(bus, list[1], list[2])
153         
154         # create a handler that will publish the signal
155         dbusSignalHandler = DbusSignalHandler(bus, object, *list[1:5])
156         self.signalHandlers[sigId] = dbusSignalHandler
157         
158         return dbusSignalHandler.id
159
160
161     @exportRpc
162     def dbusSend(self, list):
163         # clear pending calls
164         for call in self.pendingCalls:
165                 if not call.pending:
166                         self.pendingCalls.remove(call)
167         
168         # read arguments list by position
169         if len(list) < 5:
170                 raise Exception("Error: expected arguments: bus, destination, object, interface, message, [args])")
171         
172         # get dbus connexion
173         bus = self.dbusConnexion(list[0])
174         
175         # parse JSON arg list
176         args = []
177         if len(list) == 6:
178                 args = json.loads(list[5])
179         
180         # get dbus proxy
181         method = self.proxyMethod(bus, *list[1:5])
182         
183         # use a deferred call handler to manage dbus results
184         dbusCallHandler = DbusCallHandler(method, args)
185         self.pendingCalls.append(dbusCallHandler)
186         return dbusCallHandler.callMethod()
187
188
189     @exportRpc
190     def listNames(self, list):
191         # read arguments list by position
192         if len(list) < 1:
193                 raise Exception("Error: expected arguments: bus)")
194         
195         # get dbus connexion
196         bus = self.dbusConnexion(list[0])
197         
198         # return bus names as json array
199         return json.dumps(bus.list_names())    
200
201
202
203 ###############################################################################
204 class CloudeebusServerProtocol(WampServerProtocol):
205         def onSessionOpen(self):
206                 # create cloudeebus service instance
207                 self.cloudeebusService = CloudeebusService()
208                 # register it for RPC
209                 self.registerForRpc(self.cloudeebusService)
210                 # register for Publish / Subscribe
211                 self.registerForPubSub("", True)
212
213
214
215 ###############################################################################
216 if __name__ == '__main__':
217         port = "9000"
218         if len(sys.argv) == 2:
219                 port = sys.argv[1]
220
221         uri = "ws://localhost:" + port
222
223         factory = WampServerFactory(uri, debugWamp = True)
224         factory.protocol = CloudeebusServerProtocol
225         factory.setProtocolOptions(allowHixie76 = True)
226
227         listenWS(factory)
228
229         DBusGMainLoop(set_as_default=True)
230
231         reactor.run()
232