3 Copyright (c) 2020 Project CHIP Authors
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
27 from enum import IntEnum
28 from urllib.parse import urljoin
31 class TestResult(IntEnum):
38 CHIPVirtualHome is a base class for single home tests
39 child classes should implement:
46 class CHIPVirtualHome:
47 def __init__(self, cirque_url, device_config):
50 self.cirque_url = cirque_url
51 self.device_config = device_config
54 self.non_ap_devices = []
57 # The entrance of the whole test
58 def run_test(self, save_logs=True):
59 test_ret = TestResult.OK
63 except AssertionError:
64 # AssertionError is thrown in self.assertXxx function
65 test_ret = TestResult.TEST_FAILURE
66 traceback.print_exc(file=sys.stderr)
68 # Other errors indicate a failure in system.
69 test_ret = TestResult.SYSTEM_FAILURE
70 traceback.print_exc(file=sys.stderr)
73 self.save_device_logs()
75 test_ret = TestResult.SYSTEM_FAILURE
76 traceback.print_exc(file=sys.stderr)
80 test_ret = TestResult.SYSTEM_FAILURE
81 traceback.print_exc(file=sys.stderr)
84 def query_api(self, end_point, args=[], binary=False):
85 ret = requests.get(self._build_request_url(end_point, args))
90 def execute_device_cmd(self, device_id, cmd, stream=False):
92 "device: {} exec: {}".format(self.get_device_pretty_id(device_id), cmd))
93 ret = requests.get(self._build_request_url('device_cmd', [self.home_id, device_id, cmd]),
94 params={'stream': stream},
99 ret_struct = ret.json()
100 command_ret_code = ret_struct.get('return_code', None)
101 if command_ret_code == None:
103 self.logger.error("cannot get command return code")
104 raise Exception("cannot get command return code")
106 "command return code: {}".format(
107 ret_struct.get('return_code', 'Unknown'))
109 command_output = ret_struct.get('output', None)
110 if command_output == None:
111 # could be empty string
112 self.logger.error("cannot get command output")
113 raise Exception("cannot get command output")
115 "command output: \n{}".format(ret_struct.get('output', ''))
119 def sequenceMatch(self, string, patterns):
122 self.logger.info('Finding string: "{}"'.format(s))
123 this_find = string.find(s, last_find)
125 self.logger.info('Not found')
127 self.logger.info("Found at index={}".format(this_find))
128 last_find = this_find + len(s)
131 def connect_to_thread_network(self):
132 self.logger.info("Running commands to form Thread network")
133 time.sleep(3) # Avoid sending commands at very beginning.
135 "ot-ctl thread stop",
136 "ot-ctl ifconfig down",
137 "ot-ctl dataset init new",
138 "ot-ctl dataset panid 0x1234",
139 "ot-ctl dataset networkname OpenThread",
140 "ot-ctl dataset channel 13",
141 "ot-ctl dataset extpanid dead00beef00cafe",
142 "ot-ctl dataset meshlocalprefix \"fd01:2345:6789:0abc::\"",
143 "ot-ctl dataset masterkey 00112233445566778899aabbccddeeff",
144 "ot-ctl dataset commit active",
145 "ot-ctl dataset active", # This will emit an output of dataset in flask.log
146 "ot-ctl ifconfig up",
147 "ot-ctl thread start",
149 for device in self.non_ap_devices:
150 # Set default openthread provisioning
151 for cmd in otInitCommands:
152 self.execute_device_cmd(device['id'], cmd)
153 self.logger.info("Waiting for Thread network to be formed...")
156 for device in self.non_ap_devices:
157 reply = self.execute_device_cmd(device['id'], 'ot-ctl state')
158 roles.add(reply['output'].split()[0])
159 self.assertTrue('leader' in roles)
160 self.assertTrue('router' in roles or 'child' in roles)
161 self.logger.info("Thread network formed")
163 def enable_wifi_on_device(self):
164 ssid, psk = self.query_api('wifi_ssid_psk', [self.home_id])
166 self.logger.info("wifi ap ssid: {}, psk: {}".format(ssid, psk))
168 for device in self.non_ap_devices:
170 "device: {} connecting to desired ssid: {}".format(
171 self.get_device_pretty_id(device['id']), ssid))
172 self.write_psk_to_wpa_supplicant_config(device['id'], ssid, psk)
173 self.kill_existing_wpa_supplicant(device['id'])
174 self.start_wpa_supplicant(device['id'])
177 def get_device_thread_ip(self, device_id):
178 ret = self.execute_device_cmd(device_id, 'ot-ctl ipaddr')
179 ipaddr_list = ret["output"].splitlines()
180 for ipstr in ipaddr_list:
183 "device: {} thread ip: {}".format(self.get_device_pretty_id(device_id), ipstr))
184 ipaddr = ipaddress.ip_address(ipstr)
185 if ipaddr.is_link_local:
187 if not ipaddr.is_private:
189 if re.match("fd[0-9a-f]{2}:[0-9a-f]{4}:[0-9a-f]{4}:[0-9a-f]{4}:0000:00ff:fe00:[0-9a-f]{4}", ipaddr.exploded) != None:
191 self.logger.info("Get Mesh-Local EID: {}".format(ipstr))
194 # Since we are using ot-ctl, which is a command line interface and it will append 'Done' to end of output
198 def get_device_log(self, device_id):
199 return self.query_api('device_log', [self.home_id, device_id], binary=True)
201 def assertTrue(self, exp, note=None):
205 python unittest style functions that raise exceptions when condition not met
209 self.logger.error(note)
212 def assertFalse(self, exp, note=None):
215 self.logger.error(note)
218 def assertEqual(self, val1, val2, note=None):
221 self.logger.error(note)
224 def assertNotEqual(self, val1, val2, note=None):
227 self.logger.error(note)
230 def _build_request_url(self, end_point, args=[]):
232 return urljoin(self.cirque_url, end_point)
233 return urljoin(self.cirque_url, "{}/{}".format(end_point, '/'.join([str(argv) for argv in args])))
235 def destroy_home(self):
236 self.logger.info("destroying home: {}".format(self.home_id))
237 self.query_api('destroy_home', [self.home_id])
239 def initialize_home(self):
240 home_id = requests.post(
241 self._build_request_url('create_home'), json=self.device_config).json()
243 self.logger.info("home id: {} created!".format(home_id))
245 self.assertTrue(home_id in
246 list(self.query_api('get_homes')),
247 "created home_id did not match id from get_homes!!")
249 self.home_id = home_id
252 created_devices = self.query_api('home_devices', [home_id])
254 self.logger.info("home id: {} devices: {}".format(
255 home_id, json.dumps(created_devices, indent=4, sort_keys=True)))
257 for device in created_devices.values():
258 device_types.add(device['type'])
260 wanted_device_types = set()
261 for device in self.device_config.values():
262 wanted_device_types.add(device['type'])
264 self.assertEqual(device_types, wanted_device_types,
265 "created device does not match to device config!!")
267 self.device_config = created_devices
269 self.device_ids = [device_id for device_id in self.device_config]
270 self.non_ap_devices = [device for device in self.device_config.values()
271 if device['type'] != 'wifi_ap']
272 self.ap_devices = [device for device in self.device_config.values()
273 if device['type'] == 'wifi_ap']
275 def save_device_logs(self):
276 timestamp = int(time.time())
277 log_dir = os.environ.get("DEVICE_LOG_DIR", None)
278 if log_dir != None and not os.path.exists(log_dir):
281 for device in self.non_ap_devices:
282 ret_log = self.get_device_log(device['id'])
283 # Use this format for easier sort
284 f_name = '{}-{}-{}.log'.format(device['type'],
285 timestamp, device['id'][:8])
286 self.logger.debug("device log name: \n{}".format(f_name))
287 with open(os.path.join(log_dir, f_name), 'wb') as fp:
290 def start_wpa_supplicant(self, device_id):
291 self.logger.info("device: {}: starting wpa_supplicant on device"
292 .format(self.get_device_pretty_id(device_id)))
294 start_wpa_supplicant_command = "".join(
295 ["wpa_supplicant -B -i wlan0 ",
296 "-c /etc/wpa_supplicant/wpa_supplicant.conf ",
297 "-f /var/log/wpa_supplicant.log -t -dd"])
299 return self.execute_device_cmd(device_id, start_wpa_supplicant_command)
301 def write_psk_to_wpa_supplicant_config(self, device_id, ssid, psk):
302 self.logger.info("device: {}: writing ssid, psk to wpa_supplicant config"
303 .format(self.get_device_pretty_id(device_id)))
305 write_psk_command = "".join(
306 ["sh -c 'wpa_passphrase {} {} >> ".format(ssid, psk),
307 "/etc/wpa_supplicant/wpa_supplicant.conf'"])
309 return self.execute_device_cmd(device_id, write_psk_command)
311 def kill_existing_wpa_supplicant(self, device_id):
312 self.logger.info("device: {}: kill existing wpa_supplicant"
313 .format(self.get_device_pretty_id(device_id)))
315 kill_wpa_supplicant_command = 'killall wpa_supplicant'
317 return self.execute_device_cmd(device_id, kill_wpa_supplicant_command)
319 def get_device_pretty_name(self, device_id):
320 device_obj = self.device_config.get(device_id, None)
321 if device_obj != None:
322 return device_obj['type']
325 def get_device_pretty_id(self, device_id):
326 return "{}({}...)".format(self.get_device_pretty_name(device_id), device_id[:8])