3 # Copyright (c) 2020, The OpenThread Authors.
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions are met:
8 # 1. Redistributions of source code must retain the above copyright
9 # notice, this list of conditions and the following disclaimer.
10 # 2. Redistributions in binary form must reproduce the above copyright
11 # notice, this list of conditions and the following disclaimer in the
12 # documentation and/or other materials provided with the distribution.
13 # 3. Neither the name of the copyright holder nor the
14 # names of its contributors may be used to endorse or promote products
15 # derived from this software without specific prior written permission.
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
35 from threading import Thread
37 rest_api_addr = "http://0.0.0.0:8081"
40 def assert_is_ipv6_address(string):
41 assert (type(ipaddress.ip_address(string)) is ipaddress.IPv6Address)
43 def get_data_from_url(url, result, index):
44 response = urllib.request.urlopen(urllib.request.Request(url))
45 body = response.read()
46 data = json.loads(body)
50 def get_error_from_url(url, result, index):
52 urllib.request.urlopen(urllib.request.Request(url))
55 except urllib.error.HTTPError as e:
59 def create_multi_thread(func, url, thread_num, response_data):
60 threads = [None] * thread_num
62 for i in range(thread_num):
63 threads[i] = Thread(target=func, args=(url, response_data, i))
65 for thread in threads:
68 for thread in threads:
72 def error404_check(data):
73 assert data is not None
75 assert (data.code == 404)
80 def diagnostics_check(data):
81 assert data is not None
87 "ExtAddress", "Rloc16", "Mode", "Connectivity", "Route",
88 "LeaderData", "NetworkData", "IP6AddressList", "MACCounters",
89 "ChildTable", "ChannelPages"
91 expected_value_type = [
92 str, int, dict, dict, dict, dict, str, list, dict, list,
95 expected_check_dict = dict(zip(expected_keys, expected_value_type))
97 for key, value in expected_check_dict.items():
99 assert (type(diag[key]) == value)
101 assert (re.match(r'^[A-F0-9]{16}$', diag["ExtAddress"]) is not None)
104 mode_expected_keys = [
105 "RxOnWhenIdle", "DeviceType", "NetworkData"
107 for key in mode_expected_keys:
109 assert (type(mode[key]) == int)
111 connectivity = diag["Connectivity"]
112 connectivity_expected_keys = [
113 "ParentPriority", "LinkQuality3", "LinkQuality2", "LinkQuality1",
114 "LeaderCost", "IdSequence", "ActiveRouters", "SedBufferSize",
117 for key in connectivity_expected_keys:
118 assert (key in connectivity)
119 assert (type(connectivity[key]) == int)
121 route = diag["Route"]
122 assert ("IdSequence" in route)
123 assert (type(route["IdSequence"]) == int)
125 assert ("RouteData" in route)
126 route_routedata = route["RouteData"]
127 assert (type(route["RouteData"]) == list)
129 routedata_expected_keys = [
130 "RouteId", "LinkQualityOut", "LinkQualityIn", "RouteCost"
133 for item in route_routedata:
134 for key in routedata_expected_keys:
136 assert (type(item[key]) == int)
138 leaderdata = diag["LeaderData"]
139 leaderdata_expected_keys = [
140 "PartitionId", "Weighting", "DataVersion", "StableDataVersion",
144 for key in leaderdata_expected_keys:
145 assert (key in leaderdata)
146 assert (type(leaderdata[key]) == int)
148 assert (re.match(r'^[A-F0-9]{12}$', diag["NetworkData"]) is not None)
150 ip6_address_list = diag["IP6AddressList"]
151 assert (type(ip6_address_list) == list)
153 for ip6_address in ip6_address_list:
154 assert (type(ip6_address) == str)
155 assert_is_ipv6_address(ip6_address)
157 mac_counters = diag["MACCounters"]
158 assert (type(mac_counters) == dict)
159 mac_counters_expected_keys = [
160 "IfInUnknownProtos", "IfInErrors", "IfOutErrors", "IfInUcastPkts",
161 "IfInBroadcastPkts", "IfInDiscards", "IfOutUcastPkts",
162 "IfOutBroadcastPkts", "IfOutDiscards"
164 for key in mac_counters_expected_keys:
165 assert (key in mac_counters)
166 assert (type(mac_counters[key]) == int)
168 child_table = diag["ChildTable"]
169 assert (type(child_table) == list)
171 for child in child_table:
172 assert ("ChildId" in child)
173 assert (type(child["ChildId"]) == int)
174 assert ("Timeout" in child)
175 assert (type(child["Timeout"]) == int)
176 assert ("Mode" in child)
178 assert (type(mode) == dict)
179 for key in mode_expected_keys:
181 assert (type(mode[key]) == int)
183 assert (type(diag["ChannelPages"]) == str)
184 assert (re.match(r'^[A-F0-9]{2}$', diag["ChannelPages"]) is not None)
189 def node_check(data):
190 assert data is not None
193 "State", "NumOfRouter", "RlocAddress", "NetworkName", "ExtAddress",
194 "Rloc16", "LeaderData", "ExtPanId"
196 expected_value_type = [
197 int, int, str, str, str, int, dict, str
199 expected_check_dict = dict(zip(expected_keys, expected_value_type))
201 for key, value in expected_check_dict.items():
203 assert (type(data[key]) == value)
205 assert_is_ipv6_address(data["RlocAddress"])
207 assert (re.match(r'^[A-F0-9]{16}$', data["ExtAddress"]) is not None)
208 assert (re.match(r'[A-F0-9]{16}', data["ExtPanId"]) is not None)
210 leaderdata = data["LeaderData"]
211 leaderdata_expected_keys = [
212 "PartitionId", "Weighting", "DataVersion", "StableDataVersion",
216 for key in leaderdata_expected_keys:
217 assert (key in leaderdata)
218 assert (type(leaderdata[key]) == int)
223 def node_rloc_check(data):
224 assert data is not None
226 assert (type(data) == str)
228 assert_is_ipv6_address(data)
233 def node_rloc16_check(data):
234 assert data is not None
236 assert (type(data) == int)
241 def node_ext_address_check(data):
242 assert data is not None
244 assert (type(data) == str)
245 assert (re.match(r'^[A-F0-9]{16}$', data) is not None)
250 def node_state_check(data):
251 assert data is not None
253 assert (type(data) == int)
258 def node_network_name_check(data):
259 assert data is not None
261 assert (type(data) == str)
266 def node_leader_data_check(data):
267 assert data is not None
269 assert (type(data) == dict)
271 leaderdata_expected_keys = [
272 "PartitionId", "Weighting", "DataVersion", "StableDataVersion",
276 for key in leaderdata_expected_keys:
278 assert (type(data[key]) == int)
283 def node_num_of_router_check(data):
284 assert data is not None
286 assert (type(data) == int)
291 def node_ext_panid_check(data):
292 assert data is not None
294 assert (type(data) == str)
299 def node_test(thread_num):
300 url = rest_api_addr + "/node"
302 response_data = [None] * thread_num
304 create_multi_thread(get_data_from_url, url, thread_num, response_data)
306 valid = [node_check(data) for data in response_data].count(True)
308 print(" /node : all {}, valid {} ".format(thread_num, valid))
311 def node_rloc_test(thread_num):
312 url = rest_api_addr + "/node/rloc"
314 response_data = [None] * thread_num
316 create_multi_thread(get_data_from_url, url, thread_num, response_data)
318 valid = [node_rloc_check(data) for data in response_data].count(True)
320 print(" /node/rloc : all {}, valid {} ".format(thread_num, valid))
323 def node_rloc16_test(thread_num):
324 url = rest_api_addr + "/node/rloc16"
326 response_data = [None] * thread_num
328 create_multi_thread(get_data_from_url, url, thread_num, response_data)
330 valid = [node_rloc16_check(data) for data in response_data].count(True)
332 print(" /node/rloc16 : all {}, valid {} ".format(thread_num, valid))
335 def node_ext_address_test(thread_num):
336 url = rest_api_addr + "/node/ext-address"
338 response_data = [None] * thread_num
340 create_multi_thread(get_data_from_url, url, thread_num, response_data)
342 valid = [node_ext_address_check(data) for data in response_data].count(True)
344 print(" /node/ext-address : all {}, valid {} ".format(thread_num, valid))
347 def node_state_test(thread_num):
348 url = rest_api_addr + "/node/state"
350 response_data = [None] * thread_num
352 create_multi_thread(get_data_from_url, url, thread_num, response_data)
354 valid = [node_state_check(data) for data in response_data].count(True)
356 print(" /node/state : all {}, valid {} ".format(thread_num, valid))
359 def node_network_name_test(thread_num):
360 url = rest_api_addr + "/node/network-name"
362 response_data = [None] * thread_num
364 create_multi_thread(get_data_from_url, url, thread_num, response_data)
366 valid = [node_network_name_check(data) for data in response_data
369 print(" /node/network-name : all {}, valid {} ".format(thread_num, valid))
372 def node_leader_data_test(thread_num):
373 url = rest_api_addr + "/node/leader-data"
375 response_data = [None] * thread_num
377 create_multi_thread(get_data_from_url, url, thread_num, response_data)
379 valid = [node_leader_data_check(data) for data in response_data].count(True)
381 print(" /node/leader-data : all {}, valid {} ".format(thread_num, valid))
384 def node_num_of_router_test(thread_num):
385 url = rest_api_addr + "/node/num-of-router"
387 response_data = [None] * thread_num
389 create_multi_thread(get_data_from_url, url, thread_num, response_data)
391 valid = [node_num_of_router_check(data) for data in response_data
394 print(" /v1/node/num-of-router : all {}, valid {} ".format(thread_num, valid))
397 def node_ext_panid_test(thread_num):
398 url = rest_api_addr + "/node/ext-panid"
400 response_data = [None] * thread_num
402 create_multi_thread(get_data_from_url, url, thread_num, response_data)
404 valid = [node_ext_panid_check(data) for data in response_data].count(True)
406 print(" /node/ext-panid : all {}, valid {} ".format(thread_num, valid))
409 def diagnostics_test(thread_num):
410 url = rest_api_addr + "/diagnostics"
412 response_data = [None] * thread_num
414 create_multi_thread(get_data_from_url, url, thread_num, response_data)
418 for data in response_data:
420 ret = diagnostics_check(data)
427 print(" /diagnostics : all {}, has content {}, valid {} ".format(
428 thread_num, has_content, valid))
431 def error_test(thread_num):
432 url = rest_api_addr + "/hello"
434 response_data = [None] * thread_num
436 create_multi_thread(get_error_from_url, url, thread_num, response_data)
438 valid = [error404_check(data) for data in response_data].count(True)
440 print(" /v1/hello : all {}, valid {} ".format(thread_num, valid))
446 node_rloc16_test(200)
447 node_ext_address_test(200)
449 node_network_name_test(200)
450 node_leader_data_test(200)
451 node_num_of_router_test(200)
452 node_ext_panid_test(200)
459 if __name__ == '__main__':