upgrade obexd to 0.47
[profile/ivi/obexd.git] / test / simple-agent
1 #!/usr/bin/python
2
3 import gobject
4
5 import sys
6 import dbus
7 import dbus.service
8 import dbus.mainloop.glib
9
class Agent(dbus.service.Object):
        """Interactive agent exported on D-Bus as "org.bluez.obex.Agent".

        Incoming transfers are authorized by asking the user on the
        terminal.  ``pending_auth`` is True while Authorize() is waiting
        for the Y/n answer, so the main script can tell whether there is
        an authorization to cancel.
        """

        def __init__(self, conn=None, obj_path=None):
                dbus.service.Object.__init__(self, conn, obj_path)
                self.pending_auth = False

        @dbus.service.method("org.bluez.obex.Agent", in_signature="osssii",
                                                        out_signature="s")
        def Authorize(self, dpath, device, filename, ftype, length, time):
                """Ask the user whether to accept an incoming transfer.

                dpath    -- object path of the transfer being authorized
                device   -- device string, shown in the prompt
                filename -- name proposed for the incoming file
                ftype    -- unused here
                length   -- stored as the size of the tracked Transfer
                time     -- unused here

                Returns the full destination filename typed by the user.
                Raises dbus.DBusException when the user answers "n" or "N".
                """
                self.pending_auth = True
                # Show the transfer's own object path.  The previous code
                # formatted the unrelated global `path` (the agent's path).
                print("Authorize (%s, %s, %s) Y/n" % (dpath, device, filename))
                auth = raw_input().strip("\n ")

                if auth in ("n", "N"):
                        self.pending_auth = False
                        raise dbus.DBusException(
                                        "org.bluez.obex.Error.Rejected: "
                                        "Not Autorized")

                print("Full filename (including path):")
                self.pending_auth = False

                # `transfers` is the module-level list created by the main
                # script; the transfer is tracked from zero bytes received.
                transfers.append(Transfer(dpath, filename, 0, length))
                return raw_input().strip("\n ")

        @dbus.service.method("org.bluez.obex.Agent", in_signature="",
                                                        out_signature="")
        def Cancel(self):
                """Report a canceled authorization and clear pending_auth."""
                print("Authorization Canceled")
                self.pending_auth = False
41
42
class Transfer(object):
        """Progress record for one transfer, identified by its D-Bus path.

        dpath      -- object path of the transfer
        filename   -- name shown when the transfer is printed
        transfered -- amount transferred so far
        size       -- total amount expected
        """

        def __init__(self, dpath, filename=None, transfered=-1, size=-1):
                self.dpath = dpath
                self.filename = filename
                self.transfered = transfered
                self.size = size

        def update(self, filename=None, transfered=-1, total=-1):
                """Store new progress values; filename changes only if given."""
                if filename:
                        self.filename = filename
                self.transfered = transfered
                self.size = total

        def cancel(self):
                """Call Cancel() on this transfer's org.bluez.obex.Transfer object."""
                # `bus` is the module-level session bus created by the main script.
                transfer_iface = dbus.Interface(bus.get_object(
                                                "org.bluez.obex", self.dpath),
                                                "org.bluez.obex.Transfer")
                transfer_iface.Cancel()

        def __str__(self):
                """Return "<filename> (<dpath>) (<percent>%)".

                A size of zero or less cannot yield a meaningful ratio and
                zero would raise ZeroDivisionError, so it is shown as 0.00%.
                """
                if self.size > 0:
                        p = float(self.transfered) / float(self.size) * 100
                else:
                        p = 0.0
                return "%s (%s) (%.2f%%)" % (self.filename, self.dpath, p)

        __repr__ = __str__
67
68
if __name__ == '__main__':
        # Script entry point: export an Agent on the session bus, register
        # it with the org.bluez.obex manager and run the GLib main loop,
        # printing transfer progress as signals arrive.

        def new_transfer(dpath):
                # Handler for Manager.TransferStarted.
                print("new transfer")

        def transfer_completed(dpath, success):
                # Handler for Manager.TransferCompleted: report the outcome
                # and stop tracking the finished transfer.
                global transfers

                print("\ntransfer completed => %s"
                                % ("Success" if success else "Fail"))
                transfers = [t for t in transfers if t.dpath != dpath]

        def progress(total, current, dpath):
                # Handler for Transfer.Progress: update the matching transfer
                # and redraw a one-line status of every tracked transfer.
                s = []
                for t in transfers:
                        if t.dpath == dpath:
                                t.update(None, current, total)
                        s.append("%s" % (t))
                # "\r" returns to the start of the line so that the next
                # update overwrites this one.
                sys.stdout.write(" ".join(s) + "\r")
                sys.stdout.flush()


        transfers = []

        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

        bus = dbus.SessionBus()
        manager = dbus.Interface(bus.get_object("org.bluez.obex", "/"),
                                                "org.bluez.obex.Manager")
        bus.add_signal_receiver(new_transfer,
                                dbus_interface="org.bluez.obex.Manager",
                                signal_name="TransferStarted")

        bus.add_signal_receiver(transfer_completed,
                                dbus_interface="org.bluez.obex.Manager",
                                signal_name="TransferCompleted")

        # Register the progress handler once.  The match is not tied to a
        # particular transfer, so adding it again for every TransferStarted
        # signal (as was done before) only stacked up duplicate handlers.
        bus.add_signal_receiver(progress,
                                dbus_interface="org.bluez.obex.Transfer",
                                signal_name="Progress",
                                path_keyword="dpath")

        path = "/test/agent"
        agent = Agent(bus, path)

        mainloop = gobject.MainLoop()

        manager.RegisterAgent(path)
        print("Agent registered")

        # Ctrl-C cancels a pending authorization first, then any running
        # transfers; only with nothing left to cancel does it end the loop.
        while True:
                try:
                        mainloop.run()
                except KeyboardInterrupt:
                        if agent.pending_auth:
                                agent.Cancel()
                        elif transfers:
                                for a in transfers:
                                        a.cancel()
                        else:
                                break

        # manager.UnregisterAgent(path)
        # print "Agent unregistered"