Upgrade bluez5_37 :Merge the code from private
[platform/upstream/bluez.git] / test / test-hfp
1 #!/usr/bin/python
2
3 from __future__ import absolute_import, print_function, unicode_literals
4
5 from optparse import OptionParser, make_option
6 import os
7 from socket import SOCK_SEQPACKET, socket
8 import sys
9 import dbus
10 import dbus.service
11 import dbus.mainloop.glib
12 import glib
13 try:
14   from gi.repository import GObject
15 except ImportError:
16   import gobject as GObject
17
18 mainloop = None
19 audio_supported = True
20
21 try:
22         from socket import AF_BLUETOOTH, BTPROTO_SCO
23 except:
24         print("WARNING: python compiled without Bluetooth support"
25                                         " - audio will not be available")
26         audio_supported = False
27
28 BUF_SIZE = 1024
29
30 BDADDR_ANY = '00:00:00:00:00:00'
31
32 HF_NREC                 = 0x0001
33 HF_3WAY                 = 0x0002
34 HF_CLI                  = 0x0004
35 HF_VOICE_RECOGNITION    = 0x0008
36 HF_REMOTE_VOL           = 0x0010
37 HF_ENHANCED_STATUS      = 0x0020
38 HF_ENHANCED_CONTROL     = 0x0040
39 HF_CODEC_NEGOTIATION    = 0x0080
40
41 AG_3WAY                 = 0x0001
42 AG_NREC                 = 0x0002
43 AG_VOICE_RECOGNITION    = 0x0004
44 AG_INBAND_RING          = 0x0008
45 AG_VOICE_TAG            = 0x0010
46 AG_REJECT_CALL          = 0x0020
47 AG_ENHANCED_STATUS      = 0x0040
48 AG_ENHANCED_CONTROL     = 0x0080
49 AG_EXTENDED_RESULT      = 0x0100
50 AG_CODEC_NEGOTIATION    = 0x0200
51
52 HF_FEATURES = (HF_3WAY | HF_CLI | HF_VOICE_RECOGNITION |
53                         HF_REMOTE_VOL | HF_ENHANCED_STATUS |
54                         HF_ENHANCED_CONTROL | HF_CODEC_NEGOTIATION)
55
56 AVAIL_CODECS = "1,2"
57
58 class HfpConnection:
59         slc_complete = False
60         fd = None
61         io_id = 0
62         version = 0
63         features = 0
64         pending = None
65
66         def disconnect(self):
67                 if (self.fd >= 0):
68                         os.close(self.fd)
69                         self.fd = -1
70                         glib.source_remove(self.io_id)
71                         self.io_id = 0
72
73         def slc_completed(self):
74                 print("SLC establisment complete")
75                 self.slc_complete = True
76
77         def slc_next_cmd(self, cmd):
78                 if not cmd:
79                         self.send_cmd("AT+BRSF=%u" % (HF_FEATURES))
80                 elif (cmd.startswith("AT+BRSF")):
81                         if (self.features & AG_CODEC_NEGOTIATION and
82                                         HF_FEATURES & HF_CODEC_NEGOTIATION):
83                                 self.send_cmd("AT+BAC=%s" % (AVAIL_CODECS))
84                         else:
85                                 self.send_cmd("AT+CIND=?")
86                 elif (cmd.startswith("AT+BAC")):
87                         self.send_cmd("AT+CIND=?")
88                 elif (cmd.startswith("AT+CIND=?")):
89                         self.send_cmd("AT+CIND?")
90                 elif (cmd.startswith("AT+CIND?")):
91                         self.send_cmd("AT+CMER=3,0,0,1")
92                 elif (cmd.startswith("AT+CMER=")):
93                         if (HF_FEATURES & HF_3WAY and self.features & AG_3WAY):
94                                 self.send_cmd("AT+CHLD=?")
95                         else:
96                                 self.slc_completed()
97                 elif (cmd.startswith("AT+CHLD=?")):
98                         self.slc_completed()
99                 else:
100                         print("Unknown SLC command completed: %s" % (cmd))
101
102         def io_cb(self, fd, cond):
103                 buf = os.read(fd, BUF_SIZE)
104                 buf = buf.strip()
105
106                 print("Received: %s" % (buf))
107
108                 if (buf == "OK" or buf == "ERROR"):
109                         cmd = self.pending
110                         self.pending = None
111
112                         if (not self.slc_complete):
113                                 self.slc_next_cmd(cmd)
114
115                         return True
116
117                 parts = buf.split(':')
118
119                 if (parts[0] == "+BRSF"):
120                         self.features = int(parts[1])
121
122                 return True
123
124         def send_cmd(self, cmd):
125                 if (self.pending):
126                         print("ERROR: Another command is pending")
127                         return
128
129                 print("Sending: %s" % (cmd))
130
131                 os.write(self.fd, cmd + "\r\n")
132                 self.pending = cmd
133
134         def __init__(self, fd, version, features):
135                 self.fd = fd
136                 self.version = version
137                 self.features = features
138
139                 print("Version 0x%04x Features 0x%04x" % (version, features))
140
141                 self.io_id = glib.io_add_watch(fd, glib.IO_IN, self.io_cb)
142
143                 self.slc_next_cmd(None)
144
145 class HfpProfile(dbus.service.Object):
146         sco_socket = None
147         io_id = 0
148         conns = {}
149
150         def sco_cb(self, sock, cond):
151                 (sco, peer) = sock.accept()
152                 print("New SCO connection from %s" % (peer))
153
154         def init_sco(self, sock):
155                 self.sco_socket = sock
156                 self.io_id = glib.io_add_watch(sock, glib.IO_IN, self.sco_cb)
157
158         def __init__(self, bus, path, sco):
159                 dbus.service.Object.__init__(self, bus, path)
160
161                 if sco:
162                         self.init_sco(sco)
163
164         @dbus.service.method("org.bluez.Profile1",
165                                         in_signature="", out_signature="")
166         def Release(self):
167                 print("Release")
168                 mainloop.quit()
169
170         @dbus.service.method("org.bluez.Profile1",
171                                         in_signature="", out_signature="")
172         def Cancel(self):
173                 print("Cancel")
174
175         @dbus.service.method("org.bluez.Profile1",
176                                 in_signature="o", out_signature="")
177         def RequestDisconnection(self, path):
178                 conn = self.conns.pop(path)
179                 conn.disconnect()
180
181         @dbus.service.method("org.bluez.Profile1",
182                                 in_signature="oha{sv}", out_signature="")
183         def NewConnection(self, path, fd, properties):
184                 fd = fd.take()
185                 version = 0x0105
186                 features = 0
187                 print("NewConnection(%s, %d)" % (path, fd))
188                 for key in properties.keys():
189                         if key == "Version":
190                                 version = properties[key]
191                         elif key == "Features":
192                                 features = properties[key]
193
194                 conn = HfpConnection(fd, version, features)
195
196                 self.conns[path] = conn
197
198 if __name__ == '__main__':
199         dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
200
201         bus = dbus.SystemBus()
202
203         manager = dbus.Interface(bus.get_object("org.bluez",
204                                 "/org/bluez"), "org.bluez.ProfileManager1")
205
206         option_list = [
207                         make_option("-p", "--path", action="store",
208                                         type="string", dest="path",
209                                         default="/bluez/test/hfp"),
210                         make_option("-n", "--name", action="store",
211                                         type="string", dest="name",
212                                         default=None),
213                         make_option("-C", "--channel", action="store",
214                                         type="int", dest="channel",
215                                         default=None),
216                         ]
217
218         parser = OptionParser(option_list=option_list)
219
220         (options, args) = parser.parse_args()
221
222         mainloop = GObject.MainLoop()
223
224         opts = {
225                         "Version" : dbus.UInt16(0x0106),
226                         "Features" : dbus.UInt16(HF_FEATURES),
227                 }
228
229         if (options.name):
230                 opts["Name"] = options.name
231
232         if (options.channel is not None):
233                 opts["Channel"] = dbus.UInt16(options.channel)
234
235         if audio_supported:
236                 sco = socket(AF_BLUETOOTH, SOCK_SEQPACKET, BTPROTO_SCO)
237                 sco.bind(BDADDR_ANY)
238                 sco.listen(1)
239         else:
240                 sco = None
241
242         profile = HfpProfile(bus, options.path, sco)
243
244         manager.RegisterProfile(options.path, "hfp-hf", opts)
245
246         print("Profile registered - waiting for connections")
247
248         mainloop.run()