Fix for x86_64 build fail
[platform/upstream/connectedhomeip.git] / src / test_driver / linux-cirque / helper / CHIPTestBase.py
1 # Lint as: python3
2 """
3 Copyright (c) 2020 Project CHIP Authors
4
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
8
9 http://www.apache.org/licenses/LICENSE-2.0
10
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16 """
17
18 import ipaddress
19 import json
20 import logging
21 import os
22 import re
23 import requests
24 import sys
25 import time
26 import traceback
27 from enum import IntEnum
28 from urllib.parse import urljoin
29
30
31 class TestResult(IntEnum):
32     OK = 0
33     TEST_FAILURE = 1
34     SYSTEM_FAILURE = 2
35
36
37 '''
38 CHIPVirtualHome is a base class for single home tests
39 child classes should implement:
40 - setup()
41 - test_routine()
42 - tear_down()
43 '''
44
45
46 class CHIPVirtualHome:
47     def __init__(self, cirque_url, device_config):
48         self.home_id = None
49         self.logger = None
50         self.cirque_url = cirque_url
51         self.device_config = device_config
52         self.device_ids = []
53         self.devices = []
54         self.non_ap_devices = []
55         self.ap_devices = []
56
57     # The entrance of the whole test
58     def run_test(self, save_logs=True):
59         test_ret = TestResult.OK
60         try:
61             self.setup()
62             self.test_routine()
63         except AssertionError:
64             # AssertionError is thrown in self.assertXxx function
65             test_ret = TestResult.TEST_FAILURE
66             traceback.print_exc(file=sys.stderr)
67         except Exception:
68             # Other errors indicate a failure in system.
69             test_ret = TestResult.SYSTEM_FAILURE
70             traceback.print_exc(file=sys.stderr)
71         if save_logs:
72             try:
73                 self.save_device_logs()
74             except:
75                 test_ret = TestResult.SYSTEM_FAILURE
76                 traceback.print_exc(file=sys.stderr)
77         try:
78             self.destroy_home()
79         except:
80             test_ret = TestResult.SYSTEM_FAILURE
81             traceback.print_exc(file=sys.stderr)
82         return test_ret
83
84     def query_api(self, end_point, args=[], binary=False):
85         ret = requests.get(self._build_request_url(end_point, args))
86         if binary:
87             return ret.content
88         return ret.json()
89
90     def execute_device_cmd(self, device_id, cmd, stream=False):
91         self.logger.info(
92             "device: {} exec: {}".format(self.get_device_pretty_id(device_id), cmd))
93         ret = requests.get(self._build_request_url('device_cmd', [self.home_id, device_id, cmd]),
94                            params={'stream': stream},
95                            stream=stream)
96         if stream:
97             return ret
98
99         ret_struct = ret.json()
100         command_ret_code = ret_struct.get('return_code', None)
101         if command_ret_code == None:
102             # could be 0
103             self.logger.error("cannot get command return code")
104             raise Exception("cannot get command return code")
105         self.logger.info(
106             "command return code: {}".format(
107                 ret_struct.get('return_code', 'Unknown'))
108         )
109         command_output = ret_struct.get('output', None)
110         if command_output == None:
111             # could be empty string
112             self.logger.error("cannot get command output")
113             raise Exception("cannot get command output")
114         self.logger.info(
115             "command output: \n{}".format(ret_struct.get('output', ''))
116         )
117         return ret_struct
118
119     def sequenceMatch(self, string, patterns):
120         last_find = 0
121         for s in patterns:
122             self.logger.info('Finding string: "{}"'.format(s))
123             this_find = string.find(s, last_find)
124             if this_find < 0:
125                 self.logger.info('Not found')
126                 return False
127             self.logger.info("Found at index={}".format(this_find))
128             last_find = this_find + len(s)
129         return True
130
131     def connect_to_thread_network(self):
132         self.logger.info("Running commands to form Thread network")
133         time.sleep(3)  # Avoid sending commands at very beginning.
134         otInitCommands = [
135             "ot-ctl thread stop",
136             "ot-ctl ifconfig down",
137             "ot-ctl dataset init new",
138             "ot-ctl dataset panid 0x1234",
139             "ot-ctl dataset networkname OpenThread",
140             "ot-ctl dataset channel 13",
141             "ot-ctl dataset extpanid dead00beef00cafe",
142             "ot-ctl dataset meshlocalprefix \"fd01:2345:6789:0abc::\"",
143             "ot-ctl dataset masterkey 00112233445566778899aabbccddeeff",
144             "ot-ctl dataset commit active",
145             "ot-ctl dataset active",  # This will emit an output of dataset in flask.log
146             "ot-ctl ifconfig up",
147             "ot-ctl thread start",
148         ]
149         for device in self.non_ap_devices:
150             # Set default openthread provisioning
151             for cmd in otInitCommands:
152                 self.execute_device_cmd(device['id'], cmd)
153         self.logger.info("Waiting for Thread network to be formed...")
154         time.sleep(15)
155         roles = set()
156         for device in self.non_ap_devices:
157             reply = self.execute_device_cmd(device['id'], 'ot-ctl state')
158             roles.add(reply['output'].split()[0])
159         self.assertTrue('leader' in roles)
160         self.assertTrue('router' in roles or 'child' in roles)
161         self.logger.info("Thread network formed")
162
163     def enable_wifi_on_device(self):
164         ssid, psk = self.query_api('wifi_ssid_psk', [self.home_id])
165
166         self.logger.info("wifi ap ssid: {}, psk: {}".format(ssid, psk))
167
168         for device in self.non_ap_devices:
169             self.logger.info(
170                 "device: {} connecting to desired ssid: {}".format(
171                     self.get_device_pretty_id(device['id']), ssid))
172             self.write_psk_to_wpa_supplicant_config(device['id'], ssid, psk)
173             self.kill_existing_wpa_supplicant(device['id'])
174             self.start_wpa_supplicant(device['id'])
175         time.sleep(5)
176
177     def get_device_thread_ip(self, device_id):
178         ret = self.execute_device_cmd(device_id, 'ot-ctl ipaddr')
179         ipaddr_list = ret["output"].splitlines()
180         for ipstr in ipaddr_list:
181             try:
182                 self.logger.info(
183                     "device: {} thread ip: {}".format(self.get_device_pretty_id(device_id), ipstr))
184                 ipaddr = ipaddress.ip_address(ipstr)
185                 if ipaddr.is_link_local:
186                     continue
187                 if not ipaddr.is_private:
188                     continue
189                 if re.match("fd[0-9a-f]{2}:[0-9a-f]{4}:[0-9a-f]{4}:[0-9a-f]{4}:0000:00ff:fe00:[0-9a-f]{4}", ipaddr.exploded) != None:
190                     continue
191                 self.logger.info("Get Mesh-Local EID: {}".format(ipstr))
192                 return str(ipaddr)
193             except ValueError:
194                 # Since we are using ot-ctl, which is a command line interface and it will append 'Done' to end of output
195                 pass
196         return None
197
198     def get_device_log(self, device_id):
199         return self.query_api('device_log', [self.home_id, device_id], binary=True)
200
201     def assertTrue(self, exp, note=None):
202         '''
203         assert{True|False}
204         assert(Not)Equal
205         python unittest style functions that raise exceptions when condition not met
206         '''
207         if not exp == True:
208             if note:
209                 self.logger.error(note)
210             raise AssertionError
211
212     def assertFalse(self, exp, note=None):
213         if not exp == False:
214             if note:
215                 self.logger.error(note)
216             raise AssertionError
217
218     def assertEqual(self, val1, val2, note=None):
219         if not val1 == val2:
220             if note:
221                 self.logger.error(note)
222             raise AssertionError
223
224     def assertNotEqual(self, val1, val2, note=None):
225         if val1 == val2:
226             if note:
227                 self.logger.error(note)
228             raise AssertionError
229
230     def _build_request_url(self, end_point, args=[]):
231         if len(args) == 0:
232             return urljoin(self.cirque_url, end_point)
233         return urljoin(self.cirque_url, "{}/{}".format(end_point, '/'.join([str(argv) for argv in args])))
234
235     def destroy_home(self):
236         self.logger.info("destroying home: {}".format(self.home_id))
237         self.query_api('destroy_home', [self.home_id])
238
239     def initialize_home(self):
240         home_id = requests.post(
241             self._build_request_url('create_home'), json=self.device_config).json()
242
243         self.logger.info("home id: {} created!".format(home_id))
244
245         self.assertTrue(home_id in
246                         list(self.query_api('get_homes')),
247                         "created home_id did not match id from get_homes!!")
248
249         self.home_id = home_id
250
251         device_types = set()
252         created_devices = self.query_api('home_devices', [home_id])
253
254         self.logger.info("home id: {} devices: {}".format(
255             home_id, json.dumps(created_devices, indent=4, sort_keys=True)))
256
257         for device in created_devices.values():
258             device_types.add(device['type'])
259
260         wanted_device_types = set()
261         for device in self.device_config.values():
262             wanted_device_types.add(device['type'])
263
264         self.assertEqual(device_types, wanted_device_types,
265                          "created device does not match to device config!!")
266
267         self.device_config = created_devices
268
269         self.device_ids = [device_id for device_id in self.device_config]
270         self.non_ap_devices = [device for device in self.device_config.values()
271                                if device['type'] != 'wifi_ap']
272         self.ap_devices = [device for device in self.device_config.values()
273                            if device['type'] == 'wifi_ap']
274
275     def save_device_logs(self):
276         timestamp = int(time.time())
277         log_dir = os.environ.get("DEVICE_LOG_DIR", None)
278         if log_dir != None and not os.path.exists(log_dir):
279             os.makedirs("logs")
280
281         for device in self.non_ap_devices:
282             ret_log = self.get_device_log(device['id'])
283             # Use this format for easier sort
284             f_name = '{}-{}-{}.log'.format(device['type'],
285                                            timestamp, device['id'][:8])
286             self.logger.debug("device log name: \n{}".format(f_name))
287             with open(os.path.join(log_dir, f_name), 'wb') as fp:
288                 fp.write(ret_log)
289
290     def start_wpa_supplicant(self, device_id):
291         self.logger.info("device: {}: starting wpa_supplicant on device"
292                          .format(self.get_device_pretty_id(device_id)))
293
294         start_wpa_supplicant_command = "".join(
295             ["wpa_supplicant -B -i wlan0 ",
296              "-c /etc/wpa_supplicant/wpa_supplicant.conf ",
297              "-f /var/log/wpa_supplicant.log -t -dd"])
298
299         return self.execute_device_cmd(device_id, start_wpa_supplicant_command)
300
301     def write_psk_to_wpa_supplicant_config(self, device_id, ssid, psk):
302         self.logger.info("device: {}: writing ssid, psk to wpa_supplicant config"
303                          .format(self.get_device_pretty_id(device_id)))
304
305         write_psk_command = "".join(
306             ["sh -c 'wpa_passphrase {} {} >> ".format(ssid, psk),
307              "/etc/wpa_supplicant/wpa_supplicant.conf'"])
308
309         return self.execute_device_cmd(device_id, write_psk_command)
310
311     def kill_existing_wpa_supplicant(self, device_id):
312         self.logger.info("device: {}: kill existing wpa_supplicant"
313                          .format(self.get_device_pretty_id(device_id)))
314
315         kill_wpa_supplicant_command = 'killall wpa_supplicant'
316
317         return self.execute_device_cmd(device_id, kill_wpa_supplicant_command)
318
319     def get_device_pretty_name(self, device_id):
320         device_obj = self.device_config.get(device_id, None)
321         if device_obj != None:
322             return device_obj['type']
323         return "<unknown>"
324
325     def get_device_pretty_id(self, device_id):
326         return "{}({}...)".format(self.get_device_pretty_name(device_id), device_id[:8])