Fix for x86_64 build fail
[platform/upstream/connectedhomeip.git] / integrations / mobly / chip_mobly / pigweed_device.py
1 # Copyright (c) 2020 Project CHIP Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import os
16 from pathlib import Path
17 import serial # type: ignore
18 import importlib
19
20 from pw_hdlc.rpc import HdlcRpcClient, default_channels
21
22 # Point the script to the .proto file with our RPC services.
23 PROTO = Path(os.environ["PW_ROOT"], "pw_rpc/pw_rpc_protos/echo.proto")
24
25 MOBLY_CONTROLLER_CONFIG_NAME = "PigweedDevice"
26
27
28 class Error(Exception):
29     """This is the Exception class defined for all errors."""
30
31
32 class PigweedDevice:
33     def __init__(self, device_tty, baud, platform_module=None, platform_args=None):
34         ser = serial.Serial(device_tty, baud, timeout=0.01)
35         self.pw_rpc_client = HdlcRpcClient(lambda: ser.read(4096),
36                                            [PROTO], default_channels(ser.write))
37         self._platform = None
38         print("Platform args: %s" % platform_args)
39         print("Platform module: %s" % platform_module)
40         if platform_module:
41             m = importlib.import_module(platform_module)
42             create_platform_method = getattr(m, "create_platform")
43             self._platform = create_platform_method(platform_args)
44
45     def rpcs(self):
46         return self.pw_rpc_client.rpcs().pw.rpc
47
48     @property
49     def platform(self):
50         return self._platform
51
52
53 def create(configs):
54     """Initializes the CHIP devices based on the testbed configuration.
55
56     Args:
57       configs: a list of testbed configs.
58
59     Returns:
60       a list of device objects
61     """
62     objs = []
63     for config in configs:
64         _validate_config(config)
65         device = PigweedDevice(**config)
66         objs.append(device)
67     return objs
68
69
70 def destroy(unused_objs):
71     """Destroys the wearable objects.
72
73     Args:
74       unused_objs: a list of device objects.
75     """
76     pass
77
78
79 def _validate_config(config):
80     """Verifies that a config dict for a CHIP device is valid.
81
82     Args:
83       config: A dict that is the configuration for a CHIP device.
84
85     Raises:
86       chip_device.Error: Config file is not valid.
87     """
88     required_keys = ["device_tty", "baud"]  # A placeholder.
89     for key in required_keys:
90         if key not in config:
91             raise Error("Required key %s missing from config %s" % (key, config))