Source code for test_a_ble.ble_manager

"""
BLE Manager.

Manages BLE device discovery, connection, and communication.
"""

import asyncio
import logging
import sys
import uuid
from typing import Any, Callable, Dict, List, Optional, Union

from bleak import BleakClient, BleakScanner
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData

logger = logging.getLogger(__name__)


[docs] def retrieveConnectedPeripheralsWithServices( scanner: BleakScanner, services: Union[List[str], List[uuid.UUID]] ) -> list[BLEDevice]: """Retrieve connected peripherals with specified services.""" devices = [] if sys.platform == "darwin": from CoreBluetooth import CBUUID from Foundation import NSArray for p in scanner._backend._manager.central_manager.retrieveConnectedPeripheralsWithServices_( NSArray.alloc().initWithArray_(list(map(CBUUID.UUIDWithString_, services))) ): if scanner._backend._use_bdaddr: # HACK: retrieveAddressForPeripheral_ is undocumented but seems to do the # trick address_bytes: bytes = scanner._backend._manager.central_manager.retrieveAddressForPeripheral_(p) address = address_bytes.hex(":").upper() else: address = p.identifier().UUIDString() device = scanner._backend.create_or_update_device( address, p.name(), (p, scanner._backend._manager.central_manager.delegate()), AdvertisementData( local_name=p.name(), manufacturer_data=None, service_data=None, service_uuids=None, tx_power=None, rssi=None, platform_data=None, ), ) devices.append(device) logger.debug(f"Found {len(devices)} connected devices with services {services}") return devices
[docs] class BLEManager: """Manages BLE device discovery, connection, and communication.""" # Class variable to store services that the framework should look for when finding connected devices _expected_service_uuids = set()
[docs] @classmethod def register_expected_services(cls, service_uuids): """ Register service UUIDs that should be used when looking for connected devices. Args: service_uuids: List or set of service UUID strings in standard format """ if not service_uuids: return # Convert to set for deduplication if isinstance(service_uuids, (list, tuple, set)): cls._expected_service_uuids.update(service_uuids) else: # If a single UUID is provided cls._expected_service_uuids.add(service_uuids) logger.debug(f"Registered expected service UUIDs: {cls._expected_service_uuids}")
def __init__(self): """Initialize the BLEManager.""" self.device: Optional[BLEDevice] = None self.client: Optional[BleakClient] = None self.discovered_devices: List[BLEDevice] = [] self.services: Dict[str, Any] = {} self.characteristics: Dict[str, Any] = {} self.notification_callbacks: Dict[str, List[Callable]] = {} self.connected = False self.advertisement_data_map: Dict[str, AdvertisementData] = {} # Map device addresses to advertisement data self.active_subscriptions: Dict[str, Callable] = {} # Added for active subscriptions
[docs] async def discover_devices( self, timeout: float = 5.0, name_filter: Optional[str] = None, address_filter: Optional[str] = None, ) -> List[BLEDevice]: """ Scan for BLE devices and return filtered results. Args: timeout: Scan duration in seconds name_filter: Optional filter for device name (substring match) address_filter: Optional filter for device address Returns: List of discovered BLE devices matching filters """ logger.debug(f"Scanning for BLE devices (timeout: {timeout}s)") self.discovered_devices = [] self.advertisement_data_map = {} # Reset the map def _device_found(device: BLEDevice, adv_data: AdvertisementData): # Skip devices we've already found if any(d.address == device.address for d in self.discovered_devices): return # Apply filters if name_filter and name_filter.lower() not in (device.name or "").lower(): return if address_filter and address_filter != device.address: return # Store advertisement data in our map self.advertisement_data_map[device.address] = adv_data self.discovered_devices.append(device) logger.debug(f"Found device: {device.name or 'Unknown'} ({device.address})") # Perform scan scanner = BleakScanner(detection_callback=_device_found) devices = retrieveConnectedPeripheralsWithServices(scanner, self._expected_service_uuids) self.discovered_devices.extend(devices) if devices: logger.debug( f"Found {len(devices)} connected devices with services " f"{self._expected_service_uuids}, not scanning for more devices" ) else: await scanner.start() await asyncio.sleep(timeout) await scanner.stop() # Sort by signal strength (RSSI) def get_rssi(device): # Get advertisement data for device or use default RSSI adv_data = self.advertisement_data_map.get(device.address) return adv_data.rssi if adv_data else -100 self.discovered_devices.sort(key=get_rssi, reverse=True) logger.debug(f"Discovered {len(self.discovered_devices)} devices") return self.discovered_devices
[docs] async def connect_to_device( self, device_or_address: Union[BLEDevice, str], retry_count: int = 3, retry_delay: float = 1.0, ) -> bool: """ Connect to a BLE device. Args: device_or_address: BLEDevice or device address to connect to retry_count: Number of connection attempts before failing retry_delay: Delay between retries in seconds Returns: True if connection successful, False otherwise """ # Check if device_or_address is a string or a BLEDevice if isinstance(device_or_address, str): # Look up device by address in discovered devices device_address = device_or_address for device in self.discovered_devices: if device.address == device_address: self.device = device break # If not found in discovered devices, handle special cases if not self.device: logger.debug(f"Device with address {device_or_address} not in discovered devices") # on macos, we have to scan for the device first if sys.platform == "darwin": devices = await self.discover_devices(timeout=5.0, address_filter=device_or_address) if devices: self.device = devices[0] logger.debug(f"Found device: {self.device.name or 'Unknown'} ({self.device.address})") else: logger.error(f"Could not find device with address {device_or_address}") return False else: try: # For modern Bleak (0.19.0+), create a device with required parameters self.device = BLEDevice(address=device_or_address, name=None, details={}, rssi=0) except Exception as e: logger.error(f"Failed to create BLEDevice: {str(e)}") logger.debug("Attempting to discover the device first...") # Try to discover the device first devices = await self.discover_devices(timeout=5.0, address_filter=device_or_address) if devices: self.device = devices[0] logger.debug(f"Found device: {self.device.name or 'Unknown'} ({self.device.address})") else: logger.error(f"Could not find device with address {device_or_address}") self.device = None return False else: self.device = device_or_address logger.info(f"Connecting to {self.device.name or 'Unknown'} ({self.device.address})") # Attempt connection with retries for attempt in range(retry_count): try: logger.debug(f"Connection attempt {attempt + 1}/{retry_count}") # Create client with the device identifier self.client = BleakClient(self.device) # Connect to the device await self.client.connect() self.connected = True logger.info(f"Connected to {self.device.name or 'Unknown'} ({self.device.address})") return True except Exception as e: logger.warning(f"Connection attempt {attempt + 1} failed: {str(e)}") if attempt < retry_count - 1: await asyncio.sleep(retry_delay) logger.error(f"Failed to connect to device after {retry_count} attempts") self.device = None return False
[docs] async def disconnect(self): """Disconnect from the connected device and clean up resources.""" logger.debug("Starting BLE disconnect process") if not self.client: logger.debug("No client to disconnect") return # First, clean up all active subscriptions subscription_uuids = list(self.active_subscriptions.keys()) if subscription_uuids: logger.debug(f"Cleaning up {len(subscription_uuids)} active subscriptions") for sub_uuid in subscription_uuids: try: logger.debug(f"Unsubscribing from {sub_uuid}") await self.unsubscribe_from_characteristic(sub_uuid) except Exception as e: logger.debug(f"Error cleaning up subscription to {uuid}: {e}") # Now attempt to disconnect from the device try: if self.client.is_connected: logger.info(f"Disconnecting from {self.device.address if self.device else 'unknown device'}") await self.client.disconnect() logger.debug("Disconnected successfully") else: logger.debug("Client already disconnected") except Exception as e: logger.error(f"Error during disconnect: {e}") finally: # Ensure these are cleaned up regardless of disconnect success self.connected = False self.active_subscriptions.clear() self.notification_callbacks.clear() self.services.clear() self.characteristics.clear() # Clear the client reference self.client = None logger.debug("Disconnect cleanup completed")
[docs] async def discover_services(self, cache: bool = True) -> Dict[str, Any]: """ Discover services and characteristics of the connected device. Args: cache: Whether to cache results for future use Returns: Dictionary of services and their characteristics """ if not self.client or not self.client.is_connected: logger.error("Not connected to any device") return {} logger.debug("Discovering services and characteristics") # Return cached services if available if self.device.address in self.services and cache: return self.services[self.device.address] # Discover services services = {} for service in self.client.services: characteristics = {} for char in service.characteristics: properties = [] if "read" in char.properties: properties.append("read") if "write" in char.properties: properties.append("write") if "notify" in char.properties: properties.append("notify") characteristics[str(char.uuid)] = { "uuid": str(char.uuid), "properties": properties, "description": char.description or "", "handle": char.handle, } services[str(service.uuid)] = { "uuid": str(service.uuid), "characteristics": characteristics, } if cache: self.services[self.device.address] = services logger.debug(f"Discovered {len(services)} services") return services
[docs] async def read_characteristic(self, characteristic_uuid: str) -> bytearray: """ Read value from a characteristic. Args: characteristic_uuid: UUID of the characteristic to read Returns: Bytes read from the characteristic """ if not self.client or not self.client.is_connected: raise RuntimeError("Not connected to any device") logger.debug(f"Reading characteristic: {characteristic_uuid}") value = await self.client.read_gatt_char(characteristic_uuid) logger.debug(f"Read value: {value.hex()}") return value
[docs] async def write_characteristic( self, characteristic_uuid: str, data: Union[bytes, bytearray, memoryview], response: bool = True, ) -> None: """ Write value to a characteristic. Args: characteristic_uuid: UUID of the characteristic to write to data: Data to write response: Whether to wait for response """ if not self.client or not self.client.is_connected: raise RuntimeError("Not connected to any device") logger.debug(f"Writing to characteristic {characteristic_uuid}: {data.hex()}") # Check if the characteristic is readable before trying to read it is_readable = False try: # Get the services if not already cached if not self.services: await self.discover_services() # Look for the characteristic in all services for service_uuid, service_info in self.services.get(self.device.address, {}).items(): characteristics = service_info.get("characteristics", {}) if characteristic_uuid in characteristics: properties = characteristics[characteristic_uuid].get("properties", []) is_readable = "read" in properties break logger.debug(f"Characteristic {characteristic_uuid} is readable: {is_readable}") except Exception as e: logger.debug(f"Error checking if characteristic is readable: {str(e)}") is_readable = False try: # Try to get current value before writing (if characteristic supports reading) if is_readable: try: current_value = await self.client.read_gatt_char(characteristic_uuid) logger.debug(f"Current value before write: {current_value.hex()}") except Exception as e: logger.debug(f"Could not read characteristic before write despite being readable: {str(e)}") else: logger.debug("Skipping pre-write read - characteristic not readable") # Write the new value await self.client.write_gatt_char(characteristic_uuid, data, response) logger.debug(f"Write command sent for {characteristic_uuid}") # Verify the write was successful if response is True and characteristic is readable if response and is_readable: try: # Small delay to allow the device to process the write await asyncio.sleep(0.1) # Read back the value to verify new_value = await self.client.read_gatt_char(characteristic_uuid) # Check if the value matches what we wrote if new_value == data: logger.debug(f"Write verified: {new_value.hex()}") else: logger.warning(f"Write verification failed. Expected: {data.hex()}, Got: {new_value.hex()}") except Exception as e: logger.debug(f"Could not verify write: {str(e)}") elif not is_readable: logger.debug("Skipping write verification - characteristic not readable") logger.debug("Write operation completed") except Exception as e: logger.error(f"Error writing to characteristic {characteristic_uuid}: {str(e)}") raise
def _notification_handler(self, characteristic_uuid: str): """Create a notification handler for a specific characteristic.""" def _handle_notification(sender, data: bytearray): """ Handle BLE notifications in latest Bleak versions. The sender parameter can be of different types in different Bleak versions. """ # Check if we received actual data - sometimes error strings may be passed if isinstance(data, bytearray) or isinstance(data, bytes): logger.debug(f"Notification from {characteristic_uuid}: {data.hex()}") # Call all registered callbacks for this characteristic if characteristic_uuid in self.notification_callbacks: for callback in self.notification_callbacks[characteristic_uuid]: try: callback(data) except Exception as e: logger.error(f"Error in notification callback: {str(e)}") # If we get a non-data value (like an error string), log it but don't invoke callbacks elif data is not None: # Log but at debug level to avoid cluttering logs logger.debug(f"Received non-data notification from {characteristic_uuid}: {data}") return _handle_notification
[docs] async def subscribe_to_characteristic( self, characteristic_uuid: str, callback: Callable[[bytearray], None] ) -> None: """ Subscribe to notifications from a characteristic. Args: characteristic_uuid: UUID of the characteristic to subscribe to callback: Function to call when notification is received """ if not self.client or not self.client.is_connected: raise RuntimeError("Not connected to any device") # Register callback if characteristic_uuid not in self.notification_callbacks: self.notification_callbacks[characteristic_uuid] = [] # Start listening for notifications try: await self.client.start_notify(characteristic_uuid, self._notification_handler(characteristic_uuid)) # Track the active subscription self.active_subscriptions[characteristic_uuid] = True logger.debug(f"Subscribed to notifications from {characteristic_uuid}") except Exception as e: logger.error(f"Failed to subscribe to {characteristic_uuid}: {e}") raise self.notification_callbacks[characteristic_uuid].append(callback) logger.debug( f"Added callback for {characteristic_uuid}, total callbacks: " f"{len(self.notification_callbacks[characteristic_uuid])}" )
[docs] async def unsubscribe_from_characteristic(self, characteristic_uuid: str) -> None: """ Unsubscribe from notifications from a characteristic. Args: characteristic_uuid: UUID of the characteristic to unsubscribe from """ # Handle case where we're not connected anymore if not self.client or not self.client.is_connected: logger.debug(f"Not connected when unsubscribing from {characteristic_uuid}") # Clean up local tracking if characteristic_uuid in self.notification_callbacks: logger.debug(f"Clearing callbacks for {characteristic_uuid} (not connected)") del self.notification_callbacks[characteristic_uuid] if characteristic_uuid in self.active_subscriptions: logger.debug(f"Removing from active subscriptions: {characteristic_uuid} (not connected)") del self.active_subscriptions[characteristic_uuid] return # Otherwise handle normally with the connected client try: # Stop notifications from the device if we're subscribed if characteristic_uuid in self.active_subscriptions: try: await self.client.stop_notify(characteristic_uuid) logger.info(f"Unsubscribed from notifications from {characteristic_uuid}") except Exception as e: logger.error(f"Error stopping notifications for {characteristic_uuid}: {e}") finally: # Remove from active subscriptions even if there was an error del self.active_subscriptions[characteristic_uuid] else: logger.debug(f"No active subscription for {characteristic_uuid}") # Clear any registered callbacks if characteristic_uuid in self.notification_callbacks: logger.debug( f"Clearing {len(self.notification_callbacks[characteristic_uuid])} callbacks for " f"{characteristic_uuid}" ) del self.notification_callbacks[characteristic_uuid] except Exception as e: logger.error(f"Error during unsubscribe from {characteristic_uuid}: {e}") # Still clean up local state even if there was an error if characteristic_uuid in self.active_subscriptions: del self.active_subscriptions[characteristic_uuid] if characteristic_uuid in self.notification_callbacks: del self.notification_callbacks[characteristic_uuid]
[docs] def get_discovered_device_info(self) -> List[Dict[str, Any]]: """Return information about discovered devices in a structured format.""" result = [] for device in self.discovered_devices: # Get RSSI from our advertisement data map adv_data = self.advertisement_data_map.get(device.address) rssi = adv_data.rssi if adv_data else None result.append( { "name": device.name or "Unknown", "address": device.address, "rssi": rssi, } ) return result