8 import dbus.mainloop.glib
9 from optparse import OptionParser
11 from pprint import pformat
14 """Hack to unwrap D-Bus values, so that they're easier to read when
15 printed. Taken from d-feet """
17 if isinstance(x, list):
20 if isinstance(x, tuple):
21 return tuple(map(unwrap, x))
23 if isinstance(x, dict):
24 return dict([(unwrap(k), unwrap(v)) for k, v in x.iteritems()])
26 for t in [unicode, str, long, int, float, bool]:
33 parser.add_option("-d", "--device", dest="device",
34 help="Device to connect", metavar="DEVICE")
35 parser.add_option("-c", "--chdir", dest="new_dir",
36 help="Change current directory to DIR", metavar="DIR")
37 parser.add_option("-l", "--lsdir", action="store_true", dest="ls_dir",
38 help="List folders in current directory")
39 parser.add_option("-v", "--verbose", action="store_true", dest="verbose")
40 parser.add_option("-L", "--lsmsg", action="store", dest="ls_msg",
41 help="List messages in supplied CWD subdir")
42 parser.add_option("-g", "--get", action="store", dest="get_msg",
43 help="Get message contents")
45 return parser.parse_args()
47 def set_folder(session, new_dir):
48 session.SetFolder(new_dir)
51 def __init__(self, session_path, verbose=False):
53 self.transfer_path = None
55 self.verbose = verbose
56 self.path = session_path
57 bus = dbus.SessionBus()
58 obj = bus.get_object("org.bluez.obex.client", session_path)
59 self.session = dbus.Interface(obj, "org.bluez.obex.Session")
60 self.map = dbus.Interface(obj, "org.bluez.obex.MessageAccess")
61 bus.add_signal_receiver(self.transfer_complete,
62 dbus_interface="org.bluez.obex.Transfer",
63 signal_name="Complete",
65 bus.add_signal_receiver(self.transfer_error,
66 dbus_interface="org.bluez.obex.Transfer",
70 def create_transfer_reply(self, reply):
71 (path, properties) = reply
72 self.transfer_path = path
73 self.props[path] = properties
75 print "Transfer created: %s (file %s)" % (path,
76 properties["Filename"])
78 def generic_reply(self):
80 print "Operation succeeded"
86 def transfer_complete(self, path):
87 if path != self.transfer_path:
90 print "Transfer finished"
91 properties = self.props.get(path)
92 if properties == None:
94 f = open(properties["Filename"], "r")
95 os.remove(properties["Filename"])
98 def transfer_error(self, code, message, path):
99 if path != self.transfer_path:
101 print "Transfer finished with error %s: %s" % (code, message)
104 def set_folder(self, new_dir):
105 self.map.SetFolder(new_dir)
107 def list_folders(self):
108 for i in self.map.GetFolderListing(dict()):
109 print "%s/" % (i["Name"])
111 def list_messages(self, folder):
112 ret = self.map.GetMessageListing(folder, dict())
113 print pformat(unwrap(ret))
115 def get_message(self, handle):
116 self.map.GetMessageListing("", dict())
117 path = self.path + "/message" + handle
118 obj = bus.get_object("org.bluez.obex.client", path)
119 msg = dbus.Interface(obj, "org.bluez.obex.Message")
120 msg.Get("",reply_handler=self.create_transfer_reply,
121 error_handler=self.error)
123 if __name__ == '__main__':
125 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
127 parser = OptionParser()
129 (options, args) = parse_options()
131 if not options.device:
135 bus = dbus.SessionBus()
136 mainloop = gobject.MainLoop()
138 client = dbus.Interface(bus.get_object("org.bluez.obex.client", "/"),
139 "org.bluez.obex.Client")
141 print "Creating Session"
142 path = client.CreateSession(options.device, { "Target": "map" })
144 map_client = MapClient(path, options.verbose)
147 map_client.set_folder(options.new_dir)
150 map_client.list_folders()
152 if options.ls_msg is not None:
153 map_client.list_messages(options.ls_msg)
155 if options.get_msg is not None:
156 map_client.get_message(options.get_msg)