Fix for x86_64 build fail
[platform/upstream/connectedhomeip.git] / src / controller / python / chip / internal / commissioner.py
1 #
2 #    Copyright (c) 2021 Project CHIP Authors
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License");
5 #    you may not use this file except in compliance with the License.
6 #    You may obtain a copy of the License at
7 #
8 #        http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS,
12 #    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #    See the License for the specific language governing permissions and
14 #    limitations under the License.
15 #
16 from chip.configuration import GetLocalNodeId
17 from chip.native import NativeLibraryHandleMethodArguments, GetLibraryHandle
18 from ctypes import c_uint64, c_uint32, c_uint16, c_char_p
19 from enum import Enum
20 from typing import Optional
21 from chip.internal.types import NetworkCredentialsRequested, OperationalCredentialsRequested, PairingComplete
22 import ctypes
23
24 # Not using c_void_p directly is IMPORTANT. Python auto-casts c_void_p
25 # to intergers and this can cause 32/64 bit issues.
26 class Commissioner_p(ctypes.c_void_p):
27     pass
28
29 class ThreadBlob_p(ctypes.c_void_p):
30     pass
31
32 @NetworkCredentialsRequested
33 def OnNetworkCredentialsRequested():
34     GetCommissioner()._OnNetworkCredentialsRequested()
35
36 @OperationalCredentialsRequested
37 def OnOperationalCredentialsRequested(csr, csr_length):
38     GetCommissioner()._OnOperationalCredentialsRequested(ctypes.string_at(csr, csr_length))
39
40 @PairingComplete
41 def OnPairingComplete(err: int):
42     GetCommissioner()._OnPairingComplete(err)
43
44 class PairingState(Enum):
45     """States throughout a pairing flow. 
46     
47     Devices generally go through:
48       initialized -> pairing -> netcreds -> opcreds -> done (initialized)
49
50     where network credentials may be skipped if device is already on the network.
51     """
52     INITIALIZED = 0
53     PAIRING = 1
54     NEEDS_NETCREDS = 2
55     NEEDS_OPCREDS = 3
56
57
58 class Commissioner:
59     """Commissioner wraps the DeviceCommissioner native class.
60     
61
62     The commissioner is a DeviceController that supports pairing. Since the device
63     controller supports multiple devices, this class is expected to be used 
64     as a singleton
65     
66     """
67
68     def __init__(self, handle: ctypes.CDLL, native: Commissioner_p):
69         self._handle = handle
70         self._native = native
71         self.pairing_state = PairingState.INITIALIZED
72         self.on_network_credentials_requested = None
73         self.on_operational_credentials_requested = None
74         self.on_pairing_complete = None
75
76     
77     def BlePair(self, remoteDeviceId: int, pinCode: int, discriminator: int):
78         result = self._handle.pychip_internal_Commissioner_BleConnectForPairing(self._native, remoteDeviceId, pinCode, discriminator)
79         if result != 0: 
80             raise Exception("Failed to pair. CHIP Error code %d" % result)
81
82         self.pairing_state = PairingState.PAIRING
83     
84     def PairSendWifiCredentials(self, ssid: str, password: str):
85         """Send wifi credentials to the actively connected device."""
86
87         if self.pairing_state != PairingState.NEEDS_NETCREDS:
88             raise Exception("Not in a state requiring network credentials")
89
90         self._handle.pychip_internal_PairingDelegate_SetWifiCredentials(c_char_p(ssid.encode('utf8')), c_char_p(password.encode('utf8')))
91
92     def PairSendThreadCredentials(self, threadBlob: bytes):
93         """Send thread credentials. Thread credentials is an opaque blob from the API perspective."""
94
95         if self.pairing_state != PairingState.NEEDS_NETCREDS:
96             raise Exception("Not in a state requiring network credentials")
97
98         if type(threadBlob) != bytes:
99             raise Exception("Thread credentials MUST be of type bytes")
100
101         result = self._handle.pychip_internal_PairingDelegate_SetThreadCredentials(threadBlob, len(threadBlob))
102
103         if result != 0: 
104             raise Exception("Failed to send thread credentials. CHIP Error code %d" % result)
105
106
107     def Unpair(self, remoteDeviceId: int):
108         result = self._handle.pychip_internal_Commissioner_Unpair(self._native, remoteDeviceId)
109         if result != 0: 
110             raise Exception("Failed to unpair. CHIP Error code %d" % result)
111
112     
113     def _OnNetworkCredentialsRequested(self):
114         self.pairing_state = PairingState.NEEDS_NETCREDS
115         if self.on_network_credentials_requested:
116           self.on_network_credentials_requested()
117
118     def _OnOperationalCredentialsRequested(self, csr: bytes):
119         self.pairing_state = PairingState.NEEDS_OPCREDS
120         if self.on_operational_credentials_requested:
121           self.on_operational_credentials_requested(csr)
122
123     def _OnPairingComplete(self, err: int):
124         self.pairing_state = PairingState.INITIALIZED
125         if self.on_pairing_complete:
126           self.on_pairing_complete(err)
127
128
129 def _SetNativeCallSignatues(handle: ctypes.CDLL):
130     """Sets up the FFI types for the cdll handle."""
131     setter = NativeLibraryHandleMethodArguments(handle)
132
133     setter.Set('pychip_internal_Commissioner_New', Commissioner_p, [c_uint64])
134     setter.Set('pychip_internal_Commissioner_Unpair', c_uint32, [Commissioner_p, c_uint64])
135     setter.Set('pychip_internal_Commissioner_BleConnectForPairing', c_uint32, [Commissioner_p, c_uint64, c_uint32, c_uint16])
136
137     setter.Set('pychip_internal_PairingDelegate_SetNetworkCredentialsRequestedCallback', None, [NetworkCredentialsRequested])
138     setter.Set('pychip_internal_PairingDelegate_SetOperationalCredentialsRequestedCallback', None, [OperationalCredentialsRequested])
139     setter.Set('pychip_internal_PairingDelegate_SetPairingCompleteCallback', None, [PairingComplete])
140     setter.Set('pychip_internal_PairingDelegate_SetWifiCredentials', None, [c_char_p, c_char_p])
141     setter.Set('pychip_internal_PairingDelegate_SetThreadCredentials', c_uint32, [ThreadBlob_p, c_uint32])
142
143
144 commissionerSingleton: Optional[Commissioner] = None
145
146 def GetCommissioner() -> Commissioner:
147     """Gets a reference to the global commissioner singleton.
148
149     Uses the configuration GetLocalNodeId().
150     """
151
152     global commissionerSingleton
153
154     if commissionerSingleton is None:
155         handle = GetLibraryHandle()
156         _SetNativeCallSignatues(handle)
157
158         native = handle.pychip_internal_Commissioner_New(GetLocalNodeId())
159         if not native:
160             raise Exception('Failed to create commissioner object.') 
161
162         handle.pychip_internal_PairingDelegate_SetNetworkCredentialsRequestedCallback(OnNetworkCredentialsRequested)
163         handle.pychip_internal_PairingDelegate_SetOperationalCredentialsRequestedCallback(OnOperationalCredentialsRequested)
164         handle.pychip_internal_PairingDelegate_SetPairingCompleteCallback(OnPairingComplete)
165
166         commissionerSingleton = Commissioner(handle, native)
167
168
169     return commissionerSingleton