Fix for x86_64 build fail
[platform/upstream/connectedhomeip.git] / src / controller / python / chip-device-ctrl.py
1 #!/usr/bin/env python
2
3 #
4 #    Copyright (c) 2020-2021 Project CHIP Authors
5 #    Copyright (c) 2013-2018 Nest Labs, Inc.
6 #    All rights reserved.
7 #
8 #    Licensed under the Apache License, Version 2.0 (the "License");
9 #    you may not use this file except in compliance with the License.
10 #    You may obtain a copy of the License at
11 #
12 #        http://www.apache.org/licenses/LICENSE-2.0
13 #
14 #    Unless required by applicable law or agreed to in writing, software
15 #    distributed under the License is distributed on an "AS IS" BASIS,
16 #    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 #    See the License for the specific language governing permissions and
18 #    limitations under the License.
19 #
20
21 #
22 #    @file
23 #      This file implements the Python-based Chip Device Controller Shell.
24 #
25
26 from __future__ import absolute_import
27 from __future__ import print_function
28 from chip import ChipDeviceCtrl
29 from chip import exceptions
30 import sys
31 import os
32 import platform
33 import random
34 from optparse import OptionParser, OptionValueError
35 import shlex
36 import base64
37 import textwrap
38 import string
39 import re
40 from cmd import Cmd
41 from chip.ChipBleUtility import FAKE_CONN_OBJ_VALUE
42
43 # Extend sys.path with one or more directories, relative to the location of the
44 # running script, in which the chip package might be found .  This makes it
45 # possible to run the device manager shell from a non-standard install location,
46 # as well as directly from its location the CHIP source tree.
47 #
48 # Note that relative package locations are prepended to sys.path so as to give
49 # the local version of the package higher priority over any version installed in
50 # a standard location.
51 #
52 scriptDir = os.path.dirname(os.path.abspath(__file__))
53 relChipPackageInstallDirs = [
54     ".",
55     "../lib/python",
56     "../lib/python%s.%s" % (sys.version_info.major, sys.version_info.minor),
57     "../lib/Python%s%s" % (sys.version_info.major, sys.version_info.minor),
58 ]
59 for relInstallDir in relChipPackageInstallDirs:
60     absInstallDir = os.path.realpath(os.path.join(scriptDir, relInstallDir))
61     if os.path.isdir(os.path.join(absInstallDir, "chip")):
62         sys.path.insert(0, absInstallDir)
63
64
65 if platform.system() == 'Darwin':
66     from chip.ChipCoreBluetoothMgr import CoreBluetoothManager as BleManager
67 elif sys.platform.startswith('linux'):
68     from chip.ChipBluezMgr import BluezManager as BleManager
69
70 # The exceptions for CHIP Device Controller CLI
71
72
73 class ChipDevCtrlException(exceptions.ChipStackException):
74     pass
75
76
77 class ParsingError(ChipDevCtrlException):
78     def __init__(self, msg=None):
79         self.msg = "Parsing Error: " + msg
80
81     def __str__(self):
82         return self.msg
83
84
85 def DecodeBase64Option(option, opt, value):
86     try:
87         return base64.standard_b64decode(value)
88     except TypeError:
89         raise OptionValueError(
90             "option %s: invalid base64 value: %r" % (opt, value))
91
92
93 def DecodeHexIntOption(option, opt, value):
94     try:
95         return int(value, 16)
96     except ValueError:
97         raise OptionValueError("option %s: invalid value: %r" % (opt, value))
98
99
100 def ParseEncodedString(value):
101     if value.find(":") < 0:
102         raise ParsingError(
103             "value should be encoded in encoding:encodedvalue format")
104     enc, encValue = value.split(":", 1)
105     if enc == "str":
106         return encValue.encode("utf-8") + b'\x00'
107     elif enc == "hex":
108         return bytes.fromhex(encValue)
109     raise ParsingError("only str and hex encoding is supported")
110
111
112 def FormatZCLArguments(args, command):
113     commandArgs = {}
114     for kvPair in args:
115         if kvPair.find("=") < 0:
116             raise ParsingError("Argument should in key=value format")
117         key, value = kvPair.split("=", 1)
118         valueType = command.get(key, None)
119         if valueType == 'int':
120             commandArgs[key] = int(value)
121         elif valueType == 'str':
122             commandArgs[key] = value
123         elif valueType == 'bytes':
124             commandArgs[key] = ParseEncodedString(value)
125     return commandArgs
126
127
128 class DeviceMgrCmd(Cmd):
129     def __init__(self, rendezvousAddr=None, controllerNodeId=0, bluetoothAdapter=0):
130         self.lastNetworkId = None
131
132         Cmd.__init__(self)
133
134         Cmd.identchars = string.ascii_letters + string.digits + "-"
135
136         if sys.stdin.isatty():
137             self.prompt = "chip-device-ctrl > "
138         else:
139             self.use_rawinput = 0
140             self.prompt = ""
141
142         DeviceMgrCmd.command_names.sort()
143
144         self.bleMgr = None
145
146         self.devCtrl = ChipDeviceCtrl.ChipDeviceController(controllerNodeId=controllerNodeId, bluetoothAdapter=bluetoothAdapter)
147
148         # If we are on Linux and user selects non-default bluetooth adapter.
149         if sys.platform.startswith("linux") and bluetoothAdapter != 0:
150             self.bleMgr = BleManager(self.devCtrl)
151             self.bleMgr.ble_adapter_select("hci{}".format(bluetoothAdapter))
152
153         self.historyFileName = os.path.expanduser(
154             "~/.chip-device-ctrl-history")
155
156         try:
157             import readline
158
159             if "libedit" in readline.__doc__:
160                 readline.parse_and_bind("bind ^I rl_complete")
161             readline.set_completer_delims(" ")
162             try:
163                 readline.read_history_file(self.historyFileName)
164             except IOError:
165                 pass
166         except ImportError:
167             pass
168
169     command_names = [
170         "ble-scan",
171         "ble-adapter-select",
172         "ble-adapter-print",
173         "ble-debug-log",
174
175         "connect",
176         "resolve",
177         "zcl",
178         "zclread",
179
180         "set-pairing-wifi-credential",
181         "set-pairing-thread-credential",
182     ]
183
184     def parseline(self, line):
185         cmd, arg, line = Cmd.parseline(self, line)
186         if cmd:
187             cmd = self.shortCommandName(cmd)
188             line = cmd + " " + arg
189         return cmd, arg, line
190
191     def completenames(self, text, *ignored):
192         return [
193             name + " "
194             for name in DeviceMgrCmd.command_names
195             if name.startswith(text) or self.shortCommandName(name).startswith(text)
196         ]
197
198     def shortCommandName(self, cmd):
199         return cmd.replace("-", "")
200
201     def precmd(self, line):
202         if not self.use_rawinput and line != "EOF" and line != "":
203             print(">>> " + line)
204         return line
205
206     def postcmd(self, stop, line):
207         if not stop and self.use_rawinput:
208             self.prompt = "chip-device-ctrl > "
209         return stop
210
211     def postloop(self):
212         try:
213             import readline
214
215             try:
216                 readline.write_history_file(self.historyFileName)
217             except IOError:
218                 pass
219         except ImportError:
220             pass
221
222     def do_help(self, line):
223         if line:
224             cmd, arg, unused = self.parseline(line)
225             try:
226                 doc = getattr(self, "do_" + cmd).__doc__
227             except AttributeError:
228                 doc = None
229             if doc:
230                 self.stdout.write("%s\n" % textwrap.dedent(doc))
231             else:
232                 self.stdout.write("No help on %s\n" % (line))
233         else:
234             self.print_topics(
235                 "\nAvailable commands (type help <name> for more information):",
236                 DeviceMgrCmd.command_names,
237                 15,
238                 80,
239             )
240
241     def do_close(self, line):
242         """
243         close
244
245         Close the connection to the device.
246         """
247
248         args = shlex.split(line)
249
250         if len(args) != 0:
251             print("Usage:")
252             self.do_help("close")
253             return
254
255         try:
256             self.devCtrl.Close()
257         except exceptions.ChipStackException as ex:
258             print(str(ex))
259
260     def do_setlogoutput(self, line):
261         """
262         set-log-output [ none | error | progress | detail ]
263
264         Set the level of Chip logging output.
265         """
266
267         args = shlex.split(line)
268
269         if len(args) == 0:
270             print("Usage:")
271             self.do_help("set-log-output")
272             return
273         if len(args) > 1:
274             print("Unexpected argument: " + args[1])
275             return
276
277         category = args[0].lower()
278         if category == "none":
279             category = 0
280         elif category == "error":
281             category = 1
282         elif category == "progress":
283             category = 2
284         elif category == "detail":
285             category = 3
286         else:
287             print("Invalid argument: " + args[0])
288             return
289
290         try:
291             self.devCtrl.SetLogFilter(category)
292         except exceptions.ChipStackException as ex:
293             print(str(ex))
294             return
295
296         print("Done.")
297
298     def do_bleadapterselect(self, line):
299         """
300         ble-adapter-select
301
302         Start BLE adapter select, deprecated, you can select adapter by command line arguments.
303         """
304         if sys.platform.startswith("linux"):
305             if not self.bleMgr:
306                 self.bleMgr = BleManager(self.devCtrl)
307
308             self.bleMgr.ble_adapter_select(line)
309             print(
310                 "This change only applies to ble-scan\n"
311                 "Please run device controller with --bluetooth-adapter=<adapter-name> to select adapter\n" +
312                 "e.g. chip-device-ctrl --bluetooth-adapter hci0"
313             )
314         else:
315             print(
316                 "ble-adapter-select only works in Linux, ble-adapter-select mac_address"
317             )
318
319         return
320
321     def do_bleadapterprint(self, line):
322         """
323         ble-adapter-print
324
325         Print attached BLE adapter.
326         """
327         if sys.platform.startswith("linux"):
328             if not self.bleMgr:
329                 self.bleMgr = BleManager(self.devCtrl)
330
331             self.bleMgr.ble_adapter_print()
332         else:
333             print("ble-adapter-print only works in Linux")
334
335         return
336
337     def do_bledebuglog(self, line):
338         """
339         ble-debug-log 0:1
340           0: disable BLE debug log
341           1: enable BLE debug log
342         """
343         if not self.bleMgr:
344             self.bleMgr = BleManager(self.devCtrl)
345
346         self.bleMgr.ble_debug_log(line)
347
348         return
349
350     def do_blescan(self, line):
351         """
352         ble-scan
353
354         Start BLE scanning operations.
355         """
356
357         if not self.bleMgr:
358             self.bleMgr = BleManager(self.devCtrl)
359
360         self.bleMgr.scan(line)
361
362         return
363
364     def do_connect(self, line):
365         """
366         connect -ip <ip address> <setup pin code> [<nodeid>]
367         connect -ble <discriminator> <setup pin code> [<nodeid>]
368
369         connect command is used for establishing a rendezvous session to the device.
370         currently, only connect using setupPinCode is supported.
371
372         TODO: Add more methods to connect to device (like cert for auth, and IP
373               for connection)
374         """
375
376         try:
377             args = shlex.split(line)
378             if len(args) <= 1:
379                 print("Usage:")
380                 self.do_help("connect SetupPinCode")
381                 return
382
383             nodeid = random.randint(1, 1000000)  # Just a random number
384             if len(args) == 4:
385                 nodeid = int(args[3])
386             print("Device is assigned with nodeid = {}".format(nodeid))
387
388             if args[0] == "-ip" and len(args) >= 3:
389                 self.devCtrl.ConnectIP(args[1].encode("utf-8"), int(args[2]), nodeid)
390             elif args[0] == "-ble" and len(args) >= 3:
391                 self.devCtrl.ConnectBLE(int(args[1]), int(args[2]), nodeid)
392             else:
393                 print("Usage:")
394                 self.do_help("connect SetupPinCode")
395                 return
396             print("Device temporary node id (**this does not match spec**): {}".format(nodeid))
397         except exceptions.ChipStackException as ex:
398             print(str(ex))
399             return
400
401     def do_resolve(self, line):
402         """
403         resolve <fabricid> <nodeid>
404
405         Resolve DNS-SD name corresponding with the given fabric and node IDs and
406         update address of the node in the device controller.
407         """
408         try:
409             args = shlex.split(line)
410             if len(args) == 2:
411                 err = self.devCtrl.ResolveNode(int(args[0]), int(args[1]))
412                 if err == 0:
413                     address = self.devCtrl.GetAddressAndPort(int(args[1]))
414                     address = "{}:{}".format(*address) if address else "unknown"
415                     print("Current address: " + address)
416             else:
417                 self.do_help("resolve")
418         except exceptions.ChipStackException as ex:
419             print(str(ex))
420             return
421
422     def do_zcl(self, line):
423         """
424         To send ZCL message to device:
425         zcl <cluster> <command> <nodeid> <endpoint> <groupid> [key=value]...
426         To get a list of clusters:
427         zcl ?
428         To get a list of commands in cluster:
429         zcl ? <cluster>
430
431         Send ZCL command to device nodeid
432         """
433         try:
434             args = shlex.split(line)
435             all_commands = self.devCtrl.ZCLCommandList()
436             if len(args) == 1 and args[0] == '?':
437                 print('\n'.join(all_commands.keys()))
438             elif len(args) == 2 and args[0] == '?':
439                 if args[1] not in all_commands:
440                     raise exceptions.UnknownCluster(args[1])
441                 for commands in all_commands.get(args[1]).items():
442                     args = ", ".join(["{}: {}".format(argName, argType)
443                                       for argName, argType in commands[1].items()])
444                     print(commands[0])
445                     if commands[1]:
446                         print("  ", args)
447                     else:
448                         print("  <no arguments>")
449             elif len(args) > 4:
450                 if args[0] not in all_commands:
451                     raise exceptions.UnknownCluster(args[0])
452                 command = all_commands.get(args[0]).get(args[1], None)
453                 # When command takes no arguments, (not command) is True
454                 if command == None:
455                     raise exceptions.UnknownCommand(args[0], args[1])
456                 self.devCtrl.ZCLSend(args[0], args[1], int(
457                     args[2]), int(args[3]), int(args[4]), FormatZCLArguments(args[5:], command))
458             else:
459                 self.do_help("zcl")
460         except exceptions.ChipStackException as ex:
461             print("An exception occurred during process ZCL command:")
462             print(str(ex))
463         except Exception as ex:
464             print("An exception occurred during processing input:")
465             print(str(ex))
466
467     def do_zclread(self, line):
468         """
469         To read ZCL attribute:
470         zclread <cluster> <attribute> <nodeid> <endpoint> <groupid>
471         """
472         try:
473             args = shlex.split(line)
474             all_attrs = self.devCtrl.ZCLAttributeList()
475             if len(args) == 1 and args[0] == '?':
476                 print('\n'.join(all_attrs.keys()))
477             elif len(args) == 2 and args[0] == '?':
478                 if args[1] not in all_attrs:
479                     raise exceptions.UnknownCluster(args[1])
480                 print('\n'.join(all_attrs.get(args[1])))
481             elif len(args) == 5:
482                 if args[0] not in all_attrs:
483                     raise exceptions.UnknownCluster(args[0])
484                 self.devCtrl.ZCLReadAttribute(args[0], args[1], int(args[2]), int(args[3]), int(args[4]))
485             else:
486                 self.do_help("zclread")
487         except exceptions.ChipStackException as ex:
488             print("An exception occurred during reading ZCL attribute:")
489             print(str(ex))
490         except Exception as ex:
491             print("An exception occurred during processing input:")
492             print(str(ex))
493
494     def do_setpairingwificredential(self, line):
495         """
496         set-pairing-wifi-credential <ssid> <password>
497
498         Set WiFi credential to be used while pairing a Wi-Fi device
499         """
500         try:
501             args = shlex.split(line)
502             if len(args) == 2:
503                 self.devCtrl.SetWifiCredential(args[0], args[1])
504                 print("WiFi credential set")
505             else:
506                 self.do_help("set-pairing-wifi-credential")
507         except exceptions.ChipStackException as ex:
508             print(str(ex))
509             return
510
511     def do_setpairingthreadcredential(self, line):
512         """
513         set-pairing-thread-credential <channel> <panid> <masterkey>
514
515         Set Thread credential to be used while pairing a Thread device
516         """
517         try:
518             args = shlex.split(line)
519             if len(args) == 3:
520                 self.devCtrl.SetThreadCredential(int(args[0]), int(args[1], 16), args[2])
521                 print("Thread credential set")
522             else:
523                 self.do_help("set-pairing-thread-credential")
524         except exceptions.ChipStackException as ex:
525             print(str(ex))
526             return
527
528     def do_history(self, line):
529         """
530         history
531
532         Show previously executed commands.
533         """
534
535         try:
536             import readline
537
538             h = readline.get_current_history_length()
539             for n in range(1, h + 1):
540                 print(readline.get_history_item(n))
541         except ImportError:
542             pass
543
544     def do_h(self, line):
545         self.do_history(line)
546
547     def do_exit(self, line):
548         return True
549
550     def do_quit(self, line):
551         return True
552
553     def do_q(self, line):
554         return True
555
556     def do_EOF(self, line):
557         print()
558         return True
559
560     def emptyline(self):
561         pass
562
563
564 def main():
565     optParser = OptionParser()
566     optParser.add_option(
567         "-r",
568         "--rendezvous-addr",
569         action="store",
570         dest="rendezvousAddr",
571         help="Device rendezvous address",
572         metavar="<ip-address>",
573     )
574     optParser.add_option(
575         "-n",
576         "--controller-nodeid",
577         action="store",
578         dest="controllerNodeId",
579         default=0,
580         type='int',
581         help="Controller node ID",
582         metavar="<nodeid>",
583     )
584
585     if sys.platform.startswith("linux"):
586         optParser.add_option(
587             "-b",
588             "--bluetooth-adapter",
589             action="store",
590             dest="bluetoothAdapter",
591             default="hci0",
592             type="str",
593             help="Controller bluetooth adapter ID",
594             metavar="<bluetooth-adapter>",
595         )
596     (options, remainingArgs) = optParser.parse_args(sys.argv[1:])
597
598     if len(remainingArgs) != 0:
599         print("Unexpected argument: %s" % remainingArgs[0])
600         sys.exit(-1)
601
602     adapterId = 0
603     if sys.platform.startswith("linux"):
604         if not options.bluetoothAdapter.startswith("hci"):
605             print("Invalid bluetooth adapter: {}, adapter name looks like hci0, hci1 etc.")
606             sys.exit(-1)
607         else:
608             try:
609                 adapterId = int(options.bluetoothAdapter[3:])
610             except:
611                 print("Invalid bluetooth adapter: {}, adapter name looks like hci0, hci1 etc.")
612                 sys.exit(-1)
613
614     devMgrCmd = DeviceMgrCmd(rendezvousAddr=options.rendezvousAddr, controllerNodeId=options.controllerNodeId, bluetoothAdapter=adapterId)
615     print("Chip Device Controller Shell")
616     if options.rendezvousAddr:
617         print("Rendezvous address set to %s" % options.rendezvousAddr)
618
619     # Adapter ID will always be 0
620     if adapterId != 0:
621         print("Bluetooth adapter set to hci{}".format(adapterId))
622     print()
623
624     try:
625         devMgrCmd.cmdloop()
626     except KeyboardInterrupt:
627         print("\nQuitting")
628
629     sys.exit(0)
630
631
632 if __name__ == "__main__":
633     main()