8 import dbus.mainloop.glib
class Agent(dbus.service.Object):
    """OBEX agent that asks the console user to authorize incoming transfers.

    ``pending_auth`` is True while an ``Authorize`` prompt is waiting for the
    Y/n answer; the script's KeyboardInterrupt handler reads it.
    """

    def __init__(self, conn=None, obj_path=None):
        """Export the agent on ``conn`` at ``obj_path`` with no pending prompt."""
        dbus.service.Object.__init__(self, conn, obj_path)
        self.pending_auth = False

    # NOTE(review): the out_signature line is not visible in this excerpt;
    # "s" is inferred from the single string this method returns.
    @dbus.service.method("org.bluez.obex.Agent", in_signature="osssii",
                         out_signature="s")
    def Authorize(self, dpath, device, filename, ftype, length, time):
        """Prompt on the console to accept or reject an incoming transfer.

        Answering "n" or "N" rejects the request; any other answer accepts
        it, records a ``Transfer`` for ``dpath`` in the ``transfers`` list
        (a name defined outside this class) and returns the destination
        filename the user types next.

        ``ftype`` and ``time`` are part of the D-Bus signature but are not
        used by the visible code.

        Raises:
            dbus.DBusException: when the user rejects the transfer.
        """
        self.pending_auth = True
        # Report the transfer's own object path (the ``dpath`` argument);
        # the previous code formatted an unrelated name, ``path``.
        print("Authorize (%s, %s, %s) Y/n" % (dpath, device, filename))
        auth = raw_input().strip("\n ")

        if auth == "n" or auth == "N":
            self.pending_auth = False
            # NOTE(review): only the first half of this message is visible
            # in the excerpt; confirm the trailing text against the file.
            raise dbus.DBusException(
                "org.bluez.obex.Error.Rejected: "
                "Not Authorized")

        print("Full filename (including path):")
        # The flag is cleared before the filename is read, so only the Y/n
        # prompt counts as a pending authorization.
        self.pending_auth = False

        transfers.append(Transfer(dpath, filename, 0, length))
        return raw_input().strip("\n ")

    # NOTE(review): the out_signature and ``def`` lines are not visible in
    # this excerpt; the name ``Cancel`` and the empty signature are inferred
    # from the body, which takes no input and returns nothing.
    @dbus.service.method("org.bluez.obex.Agent", in_signature="",
                         out_signature="")
    def Cancel(self):
        """Report that the authorization was canceled and clear the flag."""
        print("Authorization Canceled")
        self.pending_auth = False
class Transfer(object):
    """Progress record for one transfer, identified by its D-Bus object path.

    Attributes are plain values: ``dpath``, ``filename``, ``transfered``
    (amount moved so far) and ``size`` (total amount).
    """

    def __init__(self, dpath, filename=None, transfered=-1, size=-1):
        """Store the transfer's path, name and counters as given."""
        # NOTE(review): the dpath/size assignments are not visible in this
        # excerpt; they are required by cancel() and __str__ below.
        self.dpath = dpath
        self.filename = filename
        self.transfered = transfered
        self.size = size

    def update(self, filename=None, transfered=-1, total=-1):
        """Refresh the counters, and the filename when one is supplied."""
        # NOTE(review): the guard is inferred -- the progress callback passes
        # filename=None and must not wipe the stored name.
        if filename:
            self.filename = filename
        self.transfered = transfered
        self.size = total

    # NOTE(review): the ``def`` line of this method is not visible in the
    # excerpt; the name and signature are assumed -- confirm against callers.
    def cancel(self):
        """Ask the OBEX daemon to cancel this transfer.

        Uses the ``bus`` connection defined outside this class.
        """
        transfer_iface = dbus.Interface(
            bus.get_object("org.bluez.obex", self.dpath),
            "org.bluez.obex.Transfer")
        transfer_iface.Cancel()

    def __str__(self):
        """Return ``"<filename> (<dpath>) (<percent>%)"``."""
        # A zero (or missing) size would divide by zero; show 0.00% instead
        # so formatting a transfer never raises.
        if self.size:
            p = float(self.transfered) / float(self.size) * 100
        else:
            p = 0.0
        return "%s (%s) (%.2f%%)" % (self.filename, self.dpath, p)
if __name__ == '__main__':
    # Script entry point: export an Agent on the session bus, register it
    # with the OBEX manager and print transfer progress until interrupted.

    def new_transfer(dpath):
        """TransferStarted handler: subscribe ``progress`` to Progress signals."""
        # NOTE(review): the final argument is not visible in this excerpt;
        # path_keyword is inferred from progress() taking a ``dpath`` argument.
        bus.add_signal_receiver(progress,
                                dbus_interface="org.bluez.obex.Transfer",
                                signal_name="Progress",
                                path_keyword="dpath")

    def transfer_completed(dpath, success):
        """TransferCompleted handler: report the result and forget the transfer."""
        # The list is rebound below, so the name must be declared global.
        global transfers

        print("\ntransfer completed => %s"
              % ("Success" if success else "Fail"))
        transfers = [t for t in transfers if t.dpath != dpath]

    def progress(total, current, dpath):
        """Progress handler: update the matching transfer and redraw the status line."""
        # NOTE(review): the loop around update() is reconstructed from the
        # visible update/join lines -- confirm against the full file.
        s = []
        for t in transfers:
            if t.dpath == dpath:
                t.update(None, current, total)
            s.append("%s" % (t))
        # "\r" rewrites the same console line on every update.
        sys.stdout.write(" ".join(s) + "\r")
        sys.stdout.flush()

    # NOTE(review): the initialisation of ``transfers`` is not visible in
    # this excerpt; an empty list is assumed.
    transfers = []

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SessionBus()
    manager = dbus.Interface(bus.get_object("org.bluez.obex", "/"),
                             "org.bluez.obex.Manager")
    bus.add_signal_receiver(new_transfer,
                            dbus_interface="org.bluez.obex.Manager",
                            signal_name="TransferStarted")

    bus.add_signal_receiver(transfer_completed,
                            dbus_interface="org.bluez.obex.Manager",
                            signal_name="TransferCompleted")

    # NOTE(review): the assignment of ``path`` is not visible in this
    # excerpt; this value is a placeholder to confirm against the file.
    path = "/test/agent"
    agent = Agent(bus, path)

    mainloop = gobject.MainLoop()

    manager.RegisterAgent(path)
    print("Agent registered")

    # NOTE(review): the loop/try lines and the branch bodies below are not
    # visible in this excerpt and are reconstructed: Ctrl-C cancels a pending
    # prompt first, then running transfers, and only then leaves the loop.
    cont = True
    while cont:
        try:
            mainloop.run()
        except KeyboardInterrupt:
            if agent.pending_auth:
                agent.Cancel()
            elif transfers:
                for t in transfers:
                    t.cancel()
            else:
                cont = False

    # manager.UnregisterAgent(path)
    # print "Agent unregistered"