Source code for test_a_ble.cli

"""Command Line Interface for BLE Testing Framework."""

import argparse
import asyncio
import concurrent.futures
import logging
import sys
import time
from typing import Any, Dict, Optional, Tuple

import bleak
from rich import box
from rich.console import Console
from rich.table import Table

from . import setup_logging
from .ble_manager import BLEManager
from .test_context import TestStatus
from .test_runner import TestRunner

# Set up console for rich output
console = Console()
logger = logging.getLogger("ble_tester")


[docs] def get_console() -> Console: """Return the global console object for rich output.""" global console return console
[docs] async def dynamic_device_selection(ble_manager: BLEManager, timeout: float = 10.0) -> Tuple[bool, bool]: """ Interactive device discovery with real-time updates and concurrent user input. Args: ble_manager: BLE Manager instance timeout: Maximum scan duration in seconds Returns: Tuple of (connected successfully, user quit) """ console.print("[bold]Scanning for BLE devices...[/bold]") console.print(f"[dim]Scan will continue for up to {timeout} seconds[/dim]") console.print( "[bold yellow]Enter a device number to select it immediately, press Enter for options, or wait for scan to " "complete[/bold yellow]" ) # Keep track of discovered devices in order of discovery discovered_devices = [] # Event to signal when scanning should stop stop_event = asyncio.Event() # Flag to indicate UI needs updating ui_update_needed = asyncio.Event() # Function to be called when new devices are found (runs in BLE library thread) def device_found_callback(device, adv_data): # Skip devices we've already found if any(d.address == device.address for d in discovered_devices): return # Store the device and advertisement data (thread-safe operations) ble_manager.advertisement_data_map[device.address] = adv_data discovered_devices.append(device) # Signal that UI needs updating (thread-safe) ui_update_needed.set() # Log device discovery for debugging logger.debug(f"Device discovered: {device.name or 'Unknown'} ({device.address})") # Start scanning task async def scan_for_devices(): # Create a new scanner each time scanner = bleak.BleakScanner(detection_callback=device_found_callback) try: # Start scanning await scanner.start() logger.debug("Scanner started") # Keep scanning until timeout or stop_event scan_end_time = time.time() + timeout while time.time() < scan_end_time and not stop_event.is_set(): await asyncio.sleep(0.1) logger.debug(f"Scan finished: timeout={time.time() >= scan_end_time}, stopped={stop_event.is_set()}") finally: # Ensure scanner is stopped await scanner.stop() logger.debug("Scanner stopped") ble_manager.discovered_devices = discovered_devices.copy() # Task to update the UI when needed (runs in the main asyncio loop) async def update_ui(): last_update_time = 0 last_device_count = 0 force_update = False while not stop_event.is_set(): try: # Wait for signal with timeout try: await asyncio.wait_for(ui_update_needed.wait(), timeout=0.5) ui_update_needed.clear() force_update = True # Force update when signal is received except asyncio.TimeoutError: # Force update every 3 seconds regardless of signal if time.time() - last_update_time >= 3.0: force_update = True else: continue # No update needed # Check if we need to update the UI current_device_count = len(discovered_devices) # Skip update if no new devices and not forced if not force_update and current_device_count == last_device_count: continue # Track last update time and device count last_update_time = time.time() last_device_count = current_device_count force_update = False # Reset force flag # Create new table for each update table = Table(title="Discovered Devices") table.add_column("#", justify="right", style="cyan") table.add_column("Name", style="green") table.add_column("Address", style="blue") table.add_column("RSSI", justify="right") # Add devices to table for i, device in enumerate(discovered_devices): adv_data = ble_manager.advertisement_data_map.get(device.address) rssi = adv_data.rssi if adv_data else "N/A" table.add_row(str(i + 1), device.name or "Unknown", device.address, str(rssi)) # Clear console and redraw (in main thread) console.clear() console.print("[bold]Scanning for BLE devices...[/bold]") console.print(f"[dim]Scan will continue for up to {timeout} seconds[/dim]") if discovered_devices: console.print(table) console.print( "[bold yellow]Enter a device number to select it immediately, press Enter for options, or wait " "for scan to complete[/bold yellow]" ) else: console.print("[dim]No devices found yet...[/dim]") console.print( "[bold yellow]Press Enter for options or wait for devices to be discovered[/bold yellow]" ) except Exception as e: logger.error(f"Error updating UI: {e}") await asyncio.sleep(0.5) # Avoid tight loop on error # Create the tasks scan_task = asyncio.create_task(scan_for_devices()) ui_task = asyncio.create_task(update_ui()) # Set up input handling try: while not scan_task.done(): # Get user input with timeout try: # Wait for user input user_input = await asyncio.to_thread(console.input, "") # Check if the input is a device number try: # If input is a number and valid, connect to that device if user_input.strip(): device_index = int(user_input.strip()) - 1 # Stop scanning first stop_event.set() await asyncio.wait_for( asyncio.gather(scan_task, ui_task, return_exceptions=True), timeout=2.0, ) # Check if the device index is valid if 0 <= device_index < len(discovered_devices): device = discovered_devices[device_index] console.print( f"[bold]Connecting to {device.name or 'Unknown'} ({device.address})...[/bold]" ) connected = await ble_manager.connect_to_device(device) if connected: console.print(f"[bold green]Successfully connected to {device.address}![/bold green]") return True, False # Connected, not user quit else: console.print(f"[bold red]Failed to connect to {device.address}![/bold red]") # Return to selection menu rather than quitting break else: console.print(f"[bold red]Invalid device number: {user_input}![/bold red]") await asyncio.sleep(1) # Brief pause so user can see the error # Continue scanning stop_event.clear() scan_task = asyncio.create_task(scan_for_devices()) ui_task = asyncio.create_task(update_ui()) continue else: # Empty input (just Enter key) - stop scanning and show menu stop_event.set() await asyncio.wait_for( asyncio.gather(scan_task, ui_task, return_exceptions=True), timeout=2.0, ) break except ValueError: # Not a number, treat as Enter key console.print( f"[bold red]Invalid input: {user_input}. Press Enter or enter a device number.[/bold red]" ) await asyncio.sleep(1) # Brief pause so user can see the error # Continue scanning ui_update_needed.set() # Force UI refresh continue except asyncio.TimeoutError: # No input received, continue scanning continue except asyncio.CancelledError: # Task was cancelled, clean up stop_event.set() if not scan_task.done(): scan_task.cancel() if not ui_task.done(): ui_task.cancel() finally: # Make sure scanning is stopped and tasks are cleaned up stop_event.set() # Cancel any running tasks if not scan_task.done(): scan_task.cancel() try: await asyncio.wait_for(scan_task, timeout=1.0) except (asyncio.TimeoutError, asyncio.CancelledError): pass if not ui_task.done(): ui_task.cancel() try: await asyncio.wait_for(ui_task, timeout=1.0) except (asyncio.TimeoutError, asyncio.CancelledError): pass # Show selection menu after scan completes or user presses Enter if discovered_devices: # Build a final table for selection table = Table(title="Discovered Devices") table.add_column("#", justify="right", style="cyan") table.add_column("Name", style="green") table.add_column("Address", style="blue") table.add_column("RSSI", justify="right") for i, device in enumerate(discovered_devices): adv_data = ble_manager.advertisement_data_map.get(device.address) rssi = adv_data.rssi if adv_data else "N/A" table.add_row(str(i + 1), device.name or "Unknown", device.address, str(rssi)) console.clear() console.print("[bold]Device Selection[/bold]") console.print(table) while True: selection = console.input( "\n[bold yellow]Enter device number to connect, 'r' to rescan, or 'q' to quit: [/bold yellow]" ) if selection.lower() == "q": return False, True # Not connected, user quit if selection.lower() == "r": # Reset and restart scanning discovered_devices.clear() ble_manager.advertisement_data_map.clear() ble_manager.discovered_devices.clear() return await dynamic_device_selection(ble_manager, timeout) try: index = int(selection) - 1 if 0 <= index < len(discovered_devices): device = discovered_devices[index] # Connect to selected device console.print(f"[bold]Connecting to {device.name or 'Unknown'} ({device.address})...[/bold]") connected = await ble_manager.connect_to_device(device) if connected: console.print(f"[bold green]Successfully connected to {device.address}![/bold green]") return True, False # Connected, not user quit else: console.print(f"[bold red]Failed to connect to {device.address}![/bold red]") # Ask if user wants to try again retry = console.input("[bold yellow]Try again? (y/n): [/bold yellow]") if retry.lower() == "y": # Restart scanning discovered_devices.clear() ble_manager.advertisement_data_map.clear() ble_manager.discovered_devices.clear() return await dynamic_device_selection(ble_manager, timeout) else: return False, True # User quit else: console.print("[bold red]Invalid selection![/bold red]") except ValueError: console.print("[bold red]Please enter a number, 'r', or 'q'![/bold red]") else: console.print("[bold red]No devices found![/bold red]") rescan = console.input("[bold yellow]Press 'r' to rescan or any other key to quit: [/bold yellow]") if rescan.lower() == "r": # Clear previous state before rescanning discovered_devices.clear() ble_manager.advertisement_data_map.clear() ble_manager.discovered_devices.clear() return await dynamic_device_selection(ble_manager, timeout) return False, False # Not connected, not user quit
[docs] async def connect_to_device( ble_manager: BLEManager, address: Optional[str] = None, name: Optional[str] = None, interactive: bool = False, scan_timeout: float = 10.0, ) -> Tuple[bool, bool]: """ Connect to a BLE device by address, name, or interactively. Args: ble_manager: BLE Manager instance address: Optional device address to connect to name: Optional device name to connect to interactive: Whether to use interactive mode for device selection scan_timeout: Scan timeout in seconds Returns: Tuple of (connected successfully, user quit) """ # Interactive mode if interactive and not address and not name: # Use dynamic device selection instead of the old interactive selection return await dynamic_device_selection(ble_manager, scan_timeout) # Connect by address if address: console.print(f"[bold]Connecting to device with address {address}...[/bold]") connected = await ble_manager.connect_to_device(address) if connected: console.print(f"[bold green]Successfully connected to {address}![/bold green]") return True, False # Connected, not user quit else: console.print(f"[bold red]Failed to connect to {address}![/bold red]") return False, False # Not connected, not user quit # Connect by name if name: console.print(f"[bold]Searching for device with name '{name}'...[/bold]") devices = await ble_manager.discover_devices(timeout=scan_timeout, name_filter=name) if not devices: console.print(f"[bold red]No devices found with name '{name}'![/bold red]") return False, False # Not connected, not user quit # Connect to the first matching device device = devices[0] console.print(f"[bold]Connecting to {device.name} ({device.address})...[/bold]") connected = await ble_manager.connect_to_device(device) if connected: console.print(f"[bold green]Successfully connected to {device.address}![/bold green]") return True, False # Connected, not user quit else: console.print(f"[bold red]Failed to connect to {device.address}![/bold red]") return False, False # Not connected, not user quit # No connection method specified console.print("[bold red]No device specified for connection![/bold red]") return False, False # Not connected, not user quit
[docs] async def run_ble_tests(args): """Run BLE tests based on command line arguments.""" # Create BLE manager ble_manager = BLEManager() # Create console for rich output console = get_console() # Create a TestRunner instance test_runner = TestRunner(ble_manager) try: # IMPORTANT: First discover tests before attempting connection # This allows test packages to register their service UUIDs during initialization # Determine test directory and tests to run based on test specifiers all_tests = [] logger.debug(f"args.test_specifiers: {args.test_specifiers}") # Process each test specifier to determine its test directory all_tests = test_runner.discover_tests(args.test_specifiers) if not all_tests: console.print("[bold red]No tests were discovered in any specified directories![/bold red]") console.print("[dim]Check that your test files begin with 'test_' and are in the correct location.[/dim]") return # sum number of tests in all modules: total_tests = sum(len(tests) for _, tests in all_tests) console.print(f"[bold]Found {total_tests} test(s) in {len(all_tests)} module(s)[/bold]") # Now that tests have been discovered and service UUIDs registered, connect to device # Connect to device if args.address: # Connect directly to the specified address console.print(f"[bold]Connecting to device with address {args.address}...[/bold]") connected = await ble_manager.connect_to_device(args.address) if not connected: console.print(f"[bold red]Failed to connect to {args.address}![/bold red]") return elif args.name: # Search for a device with the specified name console.print(f"[bold]Searching for device with name '{args.name}'...[/bold]") devices = await ble_manager.discover_devices(timeout=args.scan_timeout) matching_devices = [d for d in devices if args.name.lower() in (d.name or "").lower()] if not matching_devices: console.print(f"[bold red]No devices found with name containing '{args.name}'![/bold red]") return device = matching_devices[0] console.print(f"[bold]Connecting to {device.name} ({device.address})...[/bold]") connected = await ble_manager.connect_to_device(device) if not connected: console.print(f"[bold red]Failed to connect to {device.address}![/bold red]") return else: # No address or name specified, use interactive device discovery console.print("[bold]No device address or name specified, starting interactive device discovery...[/bold]") connected, user_quit = await connect_to_device( ble_manager, interactive=True, scan_timeout=args.scan_timeout ) if not connected: if user_quit: console.print("[bold yellow]User quit device selection![/bold yellow]") else: console.print("[bold red]Failed to connect to device![/bold red]") return # Run tests from each discovered test directory all_results = { "results": {}, "passed_tests": 0, "failed_tests": 0, "total_tests": 0, } # Run tests for module_name, tests in all_tests: console.print(f"[bold]Running {len(tests)} tests in {module_name}...[/bold]") results = await test_runner.run_tests(tests) # Merge results if "results" in results: all_results["results"].update(results["results"]) all_results["passed_tests"] += results.get("passed_tests", 0) all_results["failed_tests"] += results.get("failed_tests", 0) all_results["total_tests"] += results.get("total_tests", 0) # Print consolidated results if all_results["total_tests"] > 0: print_test_results(all_results, args.verbose) else: console.print("[bold red]No tests were run![/bold red]") finally: # Clean up test context tasks if "test_runner" in locals(): try: await test_runner.test_context.cleanup_tasks() except Exception as e: logger.error(f"Error cleaning up test context: {e}") # Disconnect from device console.print("[bold]Disconnecting from device...[/bold]") try: await ble_manager.disconnect() except Exception as e: logger.error(f"Error during disconnect: {e}") # More aggressive task cancellation to ensure clean exit # Get all tasks except the current one remaining_tasks = [task for task in asyncio.all_tasks() if task is not asyncio.current_task()] if remaining_tasks: logger.debug(f"Cancelling {len(remaining_tasks)} remaining tasks") # First attempt to cancel all tasks for task in remaining_tasks: task.cancel() # Wait for tasks to acknowledge cancellation try: # Set a short timeout to avoid blocking indefinitely await asyncio.wait(remaining_tasks, timeout=2.0) logger.debug("Tasks acknowledged cancellation") except Exception as e: logger.debug(f"Error waiting for task cancellation: {e}") # Check for any tasks that didn't cancel properly still_running = [t for t in remaining_tasks if not t.done()] if still_running: logger.debug(f"{len(still_running)} tasks still running after cancellation") # Try gathering with exceptions to force completion try: await asyncio.gather(*still_running, return_exceptions=True) except Exception as e: logger.debug(f"Error during forced task completion: {e}") # Force shutdown of all executor threads loop = asyncio.get_running_loop() # Force shutdown of any thread pools executor = concurrent.futures.ThreadPoolExecutor() executor._threads.clear() # Close all running transports - this helps with hanging socket connections for transport in getattr(loop, "_transports", set()): if hasattr(transport, "close"): logger.debug(f"Closing transport: {transport}") try: transport.close() except Exception as e: logger.debug(f"Error closing transport: {e}") # Force event loop to close by returning from this coroutine logger.debug("Cleanup complete, exiting run_ble_tests")
[docs] def main(): """Execute the main function.""" parser = argparse.ArgumentParser( description="BLE IoT Device Testing Tool - Discovers and runs tests for BLE devices. " "If no device address or name is provided, interactive device discovery will be used." ) # Device selection options device_group = parser.add_argument_group("Device Selection") device_group.add_argument("--address", "-a", help="MAC address of the BLE device") device_group.add_argument("--name", help="Name of the BLE device") device_group.add_argument( "--scan-timeout", type=float, default=10.0, help="Timeout for device scanning in seconds (default: 10.0)", ) # Test options test_group = parser.add_argument_group("Test Options") # Remove test-dir argument and keep only positional arguments for test specifiers parser.add_argument( "test_specifiers", nargs="*", default=["all"], help="Test specifiers in unittest-style format. Examples:\n" " test_module # Run all tests in a module\n" " test_module.test_function # Run a specific test function\n" " path/to/test_file.py # Run all tests in a file\n" " path/to/directory # Run all tests in a directory\n" " all # Run all tests in current directory (default)", ) # Logging options log_group = parser.add_argument_group("Logging Options") log_group.add_argument( "--verbose", "-v", action="store_true", help="Enable verbose logging (includes logs for all tests)", ) log_group.add_argument("--log-file", help="Log file path (default: no file logging)") args = parser.parse_args() # Configure logging using our new setup function setup_logging(verbose=args.verbose, log_file=args.log_file) # Run tests try: logger.debug("Starting test execution") # Create a new event loop with custom executor shutdown timeout loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Run the main coroutine with our custom loop loop.run_until_complete(run_ble_tests(args)) # Perform manual cleanup after run completes pending = asyncio.all_tasks(loop) if pending: logger.debug(f"Cancelling {len(pending)} pending tasks") for task in pending: task.cancel() # Wait briefly for tasks to acknowledge cancellation loop.run_until_complete(asyncio.wait(pending, timeout=2.0, loop=loop)) # Close the loop try: loop.close() except Exception as e: logger.debug(f"Error closing loop: {e}") logger.debug("Test execution completed normally") except KeyboardInterrupt: logger.debug("Test execution interrupted by user") console.print("\n[bold yellow]Test execution interrupted![/bold yellow]") # Force shutdown of any running tasks and thread pools if "loop" in locals(): try: # Cancel all remaining tasks remaining = asyncio.all_tasks(loop) if remaining: logger.debug(f"Cancelling {len(remaining)} remaining tasks due to keyboard interrupt") for task in remaining: task.cancel() # Short wait for cancellation try: loop.run_until_complete(asyncio.wait(remaining, timeout=1.0, loop=loop)) except Exception: pass # nosec B110 # Close the loop try: loop.close() except Exception: pass # nosec B110 except Exception as e: logger.debug(f"Error during keyboard interrupt cleanup: {e}") except Exception as e: logger.error(f"Error during test execution: {e}") console.print(f"\n[bold red]Error: {str(e)}[/bold red]") if args.verbose: console.print_exception() finally: # Ensure all loggers have flushed their output for handler in logging.root.handlers: handler.flush() # Log clean exit logger.debug("Exiting program") return 0
if __name__ == "__main__": sys.exit(main())