Upgrade bluez5_37 :Merge the code from private
[platform/upstream/bluez.git] / test / simple-obex-agent
1 #!/usr/bin/python
2
3 from __future__ import absolute_import, print_function, unicode_literals
4
5 import sys
6 import dbus
7 import dbus.service
8 import dbus.mainloop.glib
9 try:
10   from gi.repository import GObject
11 except ImportError:
12   import gobject as GObject
13
14 BUS_NAME = 'org.bluez.obex'
15 PATH = '/org/bluez/obex'
16 AGENT_MANAGER_INTERFACE = 'org.bluez.obex.AgentManager1'
17 AGENT_INTERFACE = 'org.bluez.obex.Agent1'
18 TRANSFER_INTERFACE = 'org.bluez.obex.Transfer1'
19
20 def ask(prompt):
21         try:
22                 return raw_input(prompt)
23         except:
24                 return input(prompt)
25
26 class Agent(dbus.service.Object):
27         def __init__(self, conn=None, obj_path=None):
28                 dbus.service.Object.__init__(self, conn, obj_path)
29                 self.pending_auth = False
30
31         @dbus.service.method(AGENT_INTERFACE, in_signature="o",
32                                                         out_signature="s")
33         def AuthorizePush(self, path):
34                 transfer = dbus.Interface(bus.get_object(BUS_NAME, path),
35                                         'org.freedesktop.DBus.Properties')
36                 properties = transfer.GetAll(TRANSFER_INTERFACE);
37
38                 self.pending_auth = True
39                 auth = ask("Authorize (%s, %s) (Y/n):" % (path,
40                                                         properties['Name']))
41
42                 if auth == "n" or auth == "N":
43                         self.pending_auth = False
44                         raise dbus.DBusException(
45                                         "org.bluez.obex.Error.Rejected: "
46                                         "Not Authorized")
47
48                 self.pending_auth = False
49
50                 return properties['Name']
51
52         @dbus.service.method(AGENT_INTERFACE, in_signature="",
53                                                         out_signature="")
54         def Cancel(self):
55                 print("Authorization Canceled")
56                 self.pending_auth = False
57
58 if __name__ == '__main__':
59         dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
60
61         bus = dbus.SessionBus()
62         manager = dbus.Interface(bus.get_object(BUS_NAME, PATH),
63                                                 AGENT_MANAGER_INTERFACE)
64
65         path = "/test/agent"
66         agent = Agent(bus, path)
67
68         mainloop = GObject.MainLoop()
69
70         manager.RegisterAgent(path)
71         print("Agent registered")
72
73         cont = True
74         while cont:
75                 try:
76                         mainloop.run()
77                 except KeyboardInterrupt:
78                         if agent.pending_auth:
79                                 agent.Cancel()
80                         elif len(transfers) > 0:
81                                 for a in transfers:
82                                         a.cancel()
83                         else:
84                                 cont = False
85
86         # manager.UnregisterAgent(path)
87         # print "Agent unregistered"