Fix for x86_64 build fail
[platform/upstream/connectedhomeip.git] / src / controller / python / chip / ble / scan_devices.py
1 #
2 #    Copyright (c) 2021 Project CHIP Authors
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License");
5 #    you may not use this file except in compliance with the License.
6 #    You may obtain a copy of the License at
7 #
8 #        http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS,
12 #    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #    See the License for the specific language governing permissions and
14 #    limitations under the License.
15 #
16
17 import ctypes
18 from typing import Generator
19 from dataclasses import dataclass
20 from chip.ble.library_handle import _GetBleLibraryHandle
21 from queue import Queue
22 from chip.ble.types import DeviceScannedCallback, ScanDoneCallback
23
@DeviceScannedCallback
def ScanFoundCallback(
    closure,
    address: str,
    discriminator: int,
    vendor: int,
    product: int,
):
  """Trampoline handed to the native scanner for each device it reports.

  Forwards the reported fields, unchanged and in the same order, to
  ``closure.DeviceFound``.

  NOTE(review): the concrete argument types delivered by the native layer
  are determined by DeviceScannedCallback (chip.ble.types), which is not
  visible here - e.g. confirm whether `address` arrives as str or bytes.
  """
  closure.DeviceFound(address, discriminator, vendor, product)
28
29
# NOTE: the decorator below is the ScanDoneCallback type imported from
# chip.ble.types. Once this definition has executed, the module-level name
# ScanDoneCallback refers to the wrapped callback object instead, which is
# what DiscoverAsync passes to the native scanner.
@ScanDoneCallback
def ScanDoneCallback(closure):
  """Trampoline handed to the native scanner for end-of-scan notification.

  Calls ``closure.ScanCompleted()`` with no arguments.
  """
  closure.ScanCompleted()
33
34
def DiscoverAsync(timeoutMs: int, scanCallback, doneCallback, adapter=None):
  """Initiate a BLE discovery of devices with the given timeout.

  NOTE: devices are not guaranteed to be unique. New entries are returned
  as soon as the underlying BLE manager detects changes.

  Args:
    timeoutMs:    scan will complete after this time
    scanCallback: callback when a device is found; it receives the
                  (address, discriminator, vendor, product) arguments
                  forwarded by ScanFoundCallback
    doneCallback: callback when the scan is complete
    adapter:      what adapter to choose. Either an AdapterInfo object or
                  a string with the adapter address. If None, the first
                  adapter on the system is used.

  Raises:
    Exception: if the adapter list cannot be created, if no adapter is
               available (or none matches `adapter`), or if the scan
               cannot be started.
  """
  if adapter and not isinstance(adapter, str):
    adapter = adapter.address

  class ScannerClosure:
    """Object handed to the native scanner; relays events to the callbacks."""

    def DeviceFound(self, *args):
      scanCallback(*args)

    def ScanCompleted(self, *args):
      doneCallback(*args)
      # Drop the extra reference taken right before the scan was started.
      ctypes.pythonapi.Py_DecRef(ctypes.py_object(self))

  handle = _GetBleLibraryHandle()

  # A NULL pointer result surfaces from ctypes as None (c_void_p restype) or
  # as 0 (integer restype); a truthiness test covers both.
  nativeList = handle.pychip_ble_adapter_list_new()
  if not nativeList:
    raise Exception('Failed to list available adapters')

  try:
    while handle.pychip_ble_adapter_list_next(nativeList):
      if adapter:
        currentAddress = handle.pychip_ble_adapter_list_get_address(
            nativeList).decode('utf8')
        if adapter != currentAddress:
          continue

      closure = ScannerClosure()
      # Take an extra reference so the closure stays alive after this
      # function returns; ScanCompleted releases it.
      ctypes.pythonapi.Py_IncRef(ctypes.py_object(closure))

      scanner = handle.pychip_ble_start_scanning(
          ctypes.py_object(closure),
          handle.pychip_ble_adapter_list_get_raw_adapter(nativeList), timeoutMs,
          ScanFoundCallback, ScanDoneCallback)

      if not scanner:
        # NOTE(review): the extra reference taken above is not released on
        # this path. Whether the native layer still invokes the done
        # callback after a failed start is not visible from this file, so
        # confirm that before adding a Py_DecRef here.
        raise Exception('Failed to initiate scan')
      break
    else:
      # The loop ended without starting a scan. Returning silently would
      # leave callers waiting forever for a doneCallback that never fires.
      raise Exception('No matching BLE adapter found')
  finally:
    handle.pychip_ble_adapter_list_delete(nativeList)
86
87
@dataclass
class DeviceInfo:
    """Record describing one device reported by a BLE scan.

    The fields mirror, in order, the arguments that
    _DeviceInfoReceiver.DeviceFound receives from the scan callback.
    """

    # Address of the device as reported by the scan.
    address: str
    # Discriminator value reported for the device.
    discriminator: int
    # Presumably the vendor identifier - TODO confirm against the native layer.
    vendor: int
    # Presumably the product identifier - TODO confirm against the native layer.
    product: int
94
95 class _DeviceInfoReceiver:
96     """Uses a queue to notify of objects received asynchronously
97        from a ble scan.
98
99        Internal queue gets filled on DeviceFound and ends with None when
100        ScanCompleted.
101     """
102     def __init__(self):
103         self.queue = Queue()
104
105     def DeviceFound(self, address, discriminator, vendor, product):
106         self.queue.put(DeviceInfo(address, discriminator, vendor, product))
107
108     def ScanCompleted(self):
109         self.queue.put(None)
110
111
112
def DiscoverSync(timeoutMs: int, adapter = None) -> Generator[DeviceInfo, None, None]:
  """Discover BLE devices over the specified period of time.

  NOTE: devices are not guaranteed to be unique. New entries are returned
  as soon as the underlying BLE manager detects changes.

  This is a generator: the scan is started when iteration begins, and each
  step blocks until another device is reported or the scan completes.

  Args:
    timeoutMs:    scan will complete after this time
    adapter:      what adapter to choose. Either an AdapterInfo object or
                  a string with the adapter address. If None, the first
                  adapter on the system is used.

  Yields:
    One DeviceInfo per device reported by the scan, in the order received.
  """

  receiver = _DeviceInfoReceiver()
  DiscoverAsync(timeoutMs, receiver.DeviceFound, receiver.ScanCompleted, adapter)

  while True:
    data = receiver.queue.get()
    # The receiver enqueues None from ScanCompleted as the end-of-scan
    # marker; compare by identity so only that sentinel stops the iteration.
    if data is None:
      break
    yield data