2 # Copyright (c) 2021 Project CHIP Authors
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 from typing import Generator
19 from dataclasses import dataclass
20 from chip.ble.library_handle import _GetBleLibraryHandle
21 from queue import Queue
22 from chip.ble.types import DeviceScannedCallback, ScanDoneCallback
24 @DeviceScannedCallback
25 def ScanFoundCallback(closure, address: str, discriminator: int, vendor: int,
27 closure.DeviceFound(address, discriminator, vendor, product)
31 def ScanDoneCallback(closure):
32 closure.ScanCompleted()
35 def DiscoverAsync(timeoutMs: int, scanCallback, doneCallback, adapter=None):
36 """Initiate a BLE discovery of devices with the given timeout.
38 NOTE: devices are not guaranteed to be unique. New entries are returned
39 as soon as the underlying BLE manager detects changes.
42 timeoutMs: scan will complete after this time
43 scanCallback: callback when a device is found
44 doneCallback: callback when the scan is complete
45 adapter: what adapter to choose. Either an AdapterInfo object or
46 a string with the adapter address. If None, the first
47 adapter on the system is used.
49 if adapter and not isinstance(adapter, str):
50 adapter = adapter.address
52 handle = _GetBleLibraryHandle()
54 nativeList = handle.pychip_ble_adapter_list_new()
56 raise Exception('Failed to list available adapters')
59 while handle.pychip_ble_adapter_list_next(nativeList):
60 if adapter and (adapter != handle.pychip_ble_adapter_list_get_address(
61 nativeList).decode('utf8')):
66 def DeviceFound(self, *args):
69 def ScanCompleted(self, *args):
71 ctypes.pythonapi.Py_DecRef(ctypes.py_object(self))
73 closure = ScannerClosure()
74 ctypes.pythonapi.Py_IncRef(ctypes.py_object(closure))
76 scanner = handle.pychip_ble_start_scanning(
77 ctypes.py_object(closure),
78 handle.pychip_ble_adapter_list_get_raw_adapter(nativeList), timeoutMs,
79 ScanFoundCallback, ScanDoneCallback)
82 raise Exception('Failed to initiate scan')
85 handle.pychip_ble_adapter_list_delete(nativeList)
95 class _DeviceInfoReceiver:
96 """Uses a queue to notify of objects received asynchronously
99 Internal queue gets filled on DeviceFound and ends with None when
105 def DeviceFound(self, address, discriminator, vendor, product):
106 self.queue.put(DeviceInfo(address, discriminator, vendor, product))
108 def ScanCompleted(self):
113 def DiscoverSync(timeoutMs: int, adapter = None) -> Generator[DeviceInfo, None, None]:
114 """Discover BLE devices over the specified period of time.
116 NOTE: devices are not guaranteed to be unique. New entries are returned
117 as soon as the underlying BLE manager detects changes.
120 timeoutMs: scan will complete after this time
121 scanCallback: callback when a device is found
122 doneCallback: callback when the scan is complete
123 adapter: what adapter to choose. Either an AdapterInfo object or
124 a string with the adapter address. If None, the first
125 adapter on the system is used.
128 receiver = _DeviceInfoReceiver()
129 DiscoverAsync(timeoutMs, receiver.DeviceFound, receiver.ScanCompleted, adapter)
132 data = receiver.queue.get()