2 # Copyright (c) 2021 Project CHIP Authors
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
import ctypes
from ctypes import c_uint64, c_uint32, c_uint16, c_char_p
from enum import Enum
from typing import Optional

from chip.configuration import GetLocalNodeId
from chip.native import NativeLibraryHandleMethodArguments, GetLibraryHandle
from chip.internal.types import NetworkCredentialsRequested, OperationalCredentialsRequested, PairingComplete
# Not using c_void_p directly is IMPORTANT. Python auto-casts c_void_p
# to integers and this can cause 32/64 bit issues.
class Commissioner_p(ctypes.c_void_p):
    """Opaque pointer type for the native commissioner object.

    A dedicated subclass is used instead of plain ``c_void_p`` so that ctypes
    keeps the value wrapped rather than auto-casting it to a Python integer.
    """
class ThreadBlob_p(ctypes.c_void_p):
    """Opaque pointer type for a thread credentials blob passed to native code.

    A dedicated subclass is used instead of plain ``c_void_p`` so that ctypes
    keeps the value wrapped rather than auto-casting it to a Python integer.
    """
@NetworkCredentialsRequested
def OnNetworkCredentialsRequested():
    """Native callback: relay a network credentials request to the commissioner singleton."""
    commissioner = GetCommissioner()
    commissioner._OnNetworkCredentialsRequested()
@OperationalCredentialsRequested
def OnOperationalCredentialsRequested(csr, csr_length):
    """Native callback: relay an operational credentials request to the commissioner singleton.

    Args:
      csr: address of the CSR buffer handed over by native code.
      csr_length: number of bytes to read from ``csr``.
    """
    notify = GetCommissioner()._OnOperationalCredentialsRequested
    # Copy the native buffer into a Python bytes object before handing it on.
    notify(ctypes.string_at(csr, csr_length))
# Wrapped in the PairingComplete callback type, like the other native
# callbacks in this file: the function is handed to a native setter whose
# declared argument type is PairingComplete.
@PairingComplete
def OnPairingComplete(err: int):
    """Native callback: relay pairing completion to the commissioner singleton.

    Args:
      err: error code reported by the native layer for the pairing attempt.
    """
    GetCommissioner()._OnPairingComplete(err)
class PairingState(Enum):
    """States throughout a pairing flow.

    Devices generally go through:
      initialized -> pairing -> netcreds -> opcreds -> done (initialized)

    where network credentials may be skipped if device is already on the network.
    """
    # Members are only compared by identity in this file; the integer values
    # simply follow the order of the flow described above.
    INITIALIZED = 0
    PAIRING = 1
    NEEDS_NETCREDS = 2
    NEEDS_OPCREDS = 3
class Commissioner:
    """Commissioner wraps the DeviceCommissioner native class.

    The commissioner is a DeviceController that supports pairing. Since the device
    controller supports multiple devices, this class is expected to be used
    as a singleton.

    Callers may assign callables to ``on_network_credentials_requested``,
    ``on_operational_credentials_requested`` and ``on_pairing_complete`` to be
    notified when the corresponding event is reported.
    """

    def __init__(self, handle: ctypes.CDLL, native: "Commissioner_p"):
        """Wrap an already-created native commissioner.

        Args:
          handle: library handle whose ``pychip_internal_*`` functions are called.
          native: native commissioner pointer passed back to those functions.
        """
        # Every method below goes through these two attributes.
        self._handle = handle
        self._native = native
        self.pairing_state = PairingState.INITIALIZED
        # Optional user callbacks; left as None until the caller sets them.
        self.on_network_credentials_requested = None
        self.on_operational_credentials_requested = None
        self.on_pairing_complete = None

    def BlePair(self, remoteDeviceId: int, pinCode: int, discriminator: int):
        """Start a BLE pairing with the given device.

        Raises:
          Exception: if the native call returns a non-zero error code.
        """
        result = self._handle.pychip_internal_Commissioner_BleConnectForPairing(
            self._native, remoteDeviceId, pinCode, discriminator)
        if result != 0:
            raise Exception("Failed to pair. CHIP Error code %d" % result)

        # Only move to PAIRING once the native call reported success.
        self.pairing_state = PairingState.PAIRING

    def PairSendWifiCredentials(self, ssid: str, password: str):
        """Send wifi credentials to the actively connected device.

        Raises:
          Exception: if network credentials are not currently being requested.
        """
        if self.pairing_state != PairingState.NEEDS_NETCREDS:
            raise Exception("Not in a state requiring network credentials")

        # The native side takes C strings, so both values are UTF-8 encoded.
        self._handle.pychip_internal_PairingDelegate_SetWifiCredentials(
            c_char_p(ssid.encode('utf8')), c_char_p(password.encode('utf8')))

    def PairSendThreadCredentials(self, threadBlob: bytes):
        """Send thread credentials. Thread credentials is an opaque blob from the API perspective.

        Raises:
          Exception: if network credentials are not currently being requested,
            if ``threadBlob`` is not bytes, or if the native call returns a
            non-zero error code.
        """
        if self.pairing_state != PairingState.NEEDS_NETCREDS:
            raise Exception("Not in a state requiring network credentials")

        if not isinstance(threadBlob, bytes):
            raise Exception("Thread credentials MUST be of type bytes")

        result = self._handle.pychip_internal_PairingDelegate_SetThreadCredentials(
            threadBlob, len(threadBlob))
        if result != 0:
            raise Exception("Failed to send thread credentials. CHIP Error code %d" % result)

    def Unpair(self, remoteDeviceId: int):
        """Unpair the given device.

        Raises:
          Exception: if the native call returns a non-zero error code.
        """
        result = self._handle.pychip_internal_Commissioner_Unpair(self._native, remoteDeviceId)
        if result != 0:
            raise Exception("Failed to unpair. CHIP Error code %d" % result)

    def _OnNetworkCredentialsRequested(self):
        """Record that network credentials are needed and notify the user callback, if set."""
        self.pairing_state = PairingState.NEEDS_NETCREDS
        if self.on_network_credentials_requested:
            self.on_network_credentials_requested()

    def _OnOperationalCredentialsRequested(self, csr: bytes):
        """Record that operational credentials are needed and pass the CSR to the user callback, if set."""
        self.pairing_state = PairingState.NEEDS_OPCREDS
        if self.on_operational_credentials_requested:
            self.on_operational_credentials_requested(csr)

    def _OnPairingComplete(self, err: int):
        """Reset to the initial state and report the pairing result to the user callback, if set."""
        self.pairing_state = PairingState.INITIALIZED
        if self.on_pairing_complete:
            self.on_pairing_complete(err)
def _SetNativeCallSignatues(handle: ctypes.CDLL):
    """Sets up the FFI types for the cdll handle.

    Each entry is (native function name, return type, argument types) and is
    registered in the order listed.
    """
    signatures = (
        # Commissioner object lifecycle and pairing entry points.
        ('pychip_internal_Commissioner_New', Commissioner_p, [c_uint64]),
        ('pychip_internal_Commissioner_Unpair', c_uint32, [Commissioner_p, c_uint64]),
        ('pychip_internal_Commissioner_BleConnectForPairing', c_uint32,
         [Commissioner_p, c_uint64, c_uint32, c_uint16]),
        # Pairing delegate callback registration and credential setters.
        ('pychip_internal_PairingDelegate_SetNetworkCredentialsRequestedCallback', None,
         [NetworkCredentialsRequested]),
        ('pychip_internal_PairingDelegate_SetOperationalCredentialsRequestedCallback', None,
         [OperationalCredentialsRequested]),
        ('pychip_internal_PairingDelegate_SetPairingCompleteCallback', None, [PairingComplete]),
        ('pychip_internal_PairingDelegate_SetWifiCredentials', None, [c_char_p, c_char_p]),
        ('pychip_internal_PairingDelegate_SetThreadCredentials', c_uint32, [ThreadBlob_p, c_uint32]),
    )

    registrar = NativeLibraryHandleMethodArguments(handle)
    for function_name, return_type, argument_types in signatures:
        registrar.Set(function_name, return_type, argument_types)
# Lazily created by GetCommissioner(); stays None until the first call.
commissionerSingleton: Optional[Commissioner] = None


def GetCommissioner() -> Commissioner:
    """Gets a reference to the global commissioner singleton.

    Uses the configuration GetLocalNodeId().

    On first use this sets up the native call signatures, creates the native
    commissioner and registers the module-level pairing callbacks. Later calls
    return the same object.

    Raises:
      Exception: if the native commissioner object could not be created.
    """
    global commissionerSingleton

    if commissionerSingleton is None:
        handle = GetLibraryHandle()
        _SetNativeCallSignatues(handle)

        native = handle.pychip_internal_Commissioner_New(GetLocalNodeId())
        # A null pointer from the native constructor means creation failed.
        if not native:
            raise Exception('Failed to create commissioner object.')

        handle.pychip_internal_PairingDelegate_SetNetworkCredentialsRequestedCallback(OnNetworkCredentialsRequested)
        handle.pychip_internal_PairingDelegate_SetOperationalCredentialsRequestedCallback(OnOperationalCredentialsRequested)
        handle.pychip_internal_PairingDelegate_SetPairingCompleteCallback(OnPairingComplete)

        commissionerSingleton = Commissioner(handle, native)

    return commissionerSingleton