Source code for test_a_ble.test_runner

"""
Test Runner.

Discovers and executes BLE tests
"""

import asyncio
import fnmatch
import importlib
import importlib.util
import inspect
import logging
import os
import re
import sys
import traceback
from typing import Any, Callable, Coroutine, Dict, List, Optional, Tuple, Union

from .ble_manager import BLEManager
from .test_context import TestContext, TestException, TestFailure, TestSkip, TestStatus

logger = logging.getLogger(__name__)

# Type for test function
TestFunction = Callable[[BLEManager, TestContext], Coroutine[Any, Any, None]]

# Type for test item: a test function or (class_name, class_obj, method) tuple
TestItem = Union[Callable, Tuple[str, Any, Callable]]
# Type for test: (test_name, test_item)
TestNameItem = Tuple[str, TestItem]


[docs] class TestRunner: """Discovers and runs tests against BLE devices.""" def __init__(self, ble_manager: BLEManager): """Initialize the test runner.""" self.ble_manager = ble_manager self.test_context = TestContext(ble_manager) def _is_package(self, path: str) -> bool: """ Check if a directory is a Python package (has __init__.py file). Args: path: Path to check Returns: True if the path is a Python package, False otherwise """ return os.path.isdir(path) and os.path.exists(os.path.join(path, "__init__.py")) def _import_package(self, package_path: str, base_package: str = "") -> str: """ Import a Python package and all its parent packages. Args: package_path: Absolute path to the package base_package: Base package name Returns: The imported package name """ logger.debug(f"Importing package: {package_path}") # Get the package name from the path package_name = os.path.basename(package_path) # Construct the full package name if base_package: full_package_name = f"{base_package}.{package_name}" else: full_package_name = package_name # Check if package is already imported if full_package_name in sys.modules: logger.debug(f"Package {full_package_name} already imported") return full_package_name # Find the __init__.py file init_path = os.path.join(package_path, "__init__.py") if not os.path.exists(init_path): raise ImportError(f"No __init__.py found in {package_path}") try: # Import the package spec = importlib.util.spec_from_file_location(full_package_name, init_path) if not spec or not spec.loader: raise ImportError(f"Failed to load module spec for {init_path}") module = importlib.util.module_from_spec(spec) sys.modules[full_package_name] = module # Execute the module spec.loader.exec_module(module) logger.debug(f"Successfully imported package: {full_package_name}") return full_package_name except Exception as e: raise ImportError(f"Error importing package {full_package_name}: {str(e)}") from e def _find_and_import_nearest_package(self, path: str) -> Optional[Tuple[str, str]]: """ Find the nearest package in the given path and import it. Args: path: Path to search for a package Returns: Tuple of (package_name, package_dir) if a package is found, None otherwise """ current_dir = path parent_count = 0 # Check up to 2 parent directories for __init__.py while parent_count < 2: if self._is_package(current_dir): # Found a module - use this as our base package_dir = current_dir package_name = os.path.basename(current_dir) logger.debug(f"Found package: {package_name} at {package_dir}") try: self._import_package(current_dir) return package_name, package_dir except ImportError as e: logger.error(f"Error importing package {current_dir}: {str(e)}") raise # Move up to the parent directory parent_dir = os.path.dirname(current_dir) if parent_dir == current_dir: # We've reached the root return None current_dir = parent_dir parent_count += 1 return None def _discover_tests_from_specifier(self, test_specifier: str) -> List[Tuple[str, List[TestNameItem]]]: """ Parse a test specifier. Args: test_specifier: Test specifier Returns: List of tuples (module_name, test_items) where test_items is a list of tuples (test_name, test_item) where test_item is a test function or (class, method) tuple """ def check_if_file_exists(test_dir: str, test_file: str) -> Optional[Tuple[str, str]]: """ Check if a file exists in the given directory. Returns: Tuple of (test_dir, test_file) if the file exists, None otherwise """ if test_file is None: return None if not os.path.isdir(test_dir): return None if not test_file.endswith(".py"): test_file = test_file + ".py" if os.path.isfile(os.path.join(test_dir, test_file)): return (test_dir, test_file) if os.path.isfile(os.path.join(test_dir, "tests", test_file)): return (os.path.join(test_dir, "tests"), test_file) return None def check_wildcard_match(test_wildcard: Optional[str], test_string: str) -> bool: """ Check if the test string matches the test wildcard. Args: test_wildcard: Wildcard to match against test_string: String to match Returns: True if the test string matches the test wildcard, False otherwise """ return test_wildcard is None or fnmatch.fnmatch(test_string, test_wildcard) def find_files_matching_wildcard(test_dir: str, test_file_wildcard: Optional[str] = None) -> List[str]: """ Find files matching the wildcard (or any file if test_file_wildcard is None) in the given directory. Args: test_dir: Directory to search in test_file_wildcard: Wildcard to match against, or None to match any file Returns: List of files matching the wildcard """ if not os.path.isdir(test_dir): return None # list files in test_dir that match the wildcard files = [] for file in os.listdir(test_dir): if file.endswith(".py") and check_wildcard_match(test_file_wildcard, file): files.append(file) return files def find_tests_in_module( package_dir: str, package_path: str, import_name: str, test_dir: str, test_file: str, method_or_wildcard: Optional[str] = None, ) -> List[TestNameItem]: """ Find tests in the given module. Args: package_dir: Directory of the package package_path: Path to the package (in dot notation) import_name: Import name of the module method_or_wildcard: Method name or wildcard of the tests to find, or None to find all tests in the module Returns: List of tuples (test_name, test_item) where test_item is a test function or (class, method) tuple """ file_path = os.path.join(test_dir, test_file) try: # Try to import the module using importlib.import_module first try: if package_dir is not None: # If we have a module, try to use standard import module = importlib.import_module(import_name) logger.debug(f"Imported {import_name} using import_module") else: # No module structure, use direct file import raise ImportError("Not in a package, using spec_from_file_location") except ImportError: # Fallback to the file-based import method spec = importlib.util.spec_from_file_location(import_name, file_path) if not spec or not spec.loader: raise ImportError(f"Failed to load module spec for {file_path}") module = importlib.util.module_from_spec(spec) # Add the module to sys.modules to allow relative imports sys.modules[import_name] = module # Execute the module spec.loader.exec_module(module) logger.debug(f"Imported {import_name} using spec_from_file_location") # Use the relative path from test_dir as the module prefix for test names rel_path = os.path.relpath(file_path, test_dir) rel_module = os.path.splitext(rel_path)[0].replace(os.path.sep, ".") # First, discover test classes class_tests = [] for class_name, class_obj in module.__dict__.items(): # Check if it's a class and follows naming convention if inspect.isclass(class_obj) and ( class_name.startswith("Test") or (hasattr(class_obj, "_is_test_class") and class_obj._is_test_class) ): # Store class for later use class_full_name = f"{rel_module}.{class_name}" logger.debug(f"Discovered test class: {class_full_name}") # Discover test methods in the class and collect with source line numbers class_method_tests = [] for method_name, method_obj in inspect.getmembers(class_obj, predicate=inspect.isfunction): if not check_wildcard_match(method_or_wildcard, method_name): continue # Check if the method is a test method is_test = ( hasattr(method_obj, "_is_ble_test") and method_obj._is_ble_test ) or method_name.startswith("test_") if is_test: # Check if the method is a coroutine function if asyncio.iscoroutinefunction(method_obj) or inspect.iscoroutinefunction(method_obj): test_name = f"{class_full_name}.{method_name}" # Get line number for sorting line_number = inspect.getsourcelines(method_obj)[1] # Store tuple of (test_name, class_name, class_obj, method, line_number) class_method_tests.append( ( test_name, class_full_name, class_obj, method_obj, line_number, ) ) logger.debug(f"Discovered class test method: {test_name} at line {line_number}") else: logger.warning( f"Method {method_name} in class {class_full_name} is not a coroutine function, " "skipping" ) # Sort class methods by line number to preserve definition order class_method_tests.sort(key=lambda x: x[4]) # Add sorted methods to class_tests class_tests.extend(class_method_tests) # Then, discover standalone test functions function_tests = [] for name, obj in module.__dict__.items(): if not check_wildcard_match(method_or_wildcard, name): continue # Check if the function is decorated with @ble_test or starts with test_ is_test = (hasattr(obj, "_is_ble_test") and obj._is_ble_test) or name.startswith("test_") if is_test and callable(obj) and not inspect.isclass(obj): # Don't process methods that belong to test classes (already handled) if any(t[2] == obj for t in class_tests): continue # Check if the function is a coroutine function if asyncio.iscoroutinefunction(obj) or inspect.iscoroutinefunction(obj): test_name = f"{rel_module}.{name}" # Get line number for sorting line_number = inspect.getsourcelines(obj)[1] # Store tuple of (test_name, function, line_number) function_tests.append((test_name, obj, line_number)) logger.debug(f"Discovered standalone test: {test_name} at line {line_number}") else: logger.warning(f"Function {name} in {file_path} is not a coroutine function, skipping") # Sort standalone functions by line number function_tests.sort(key=lambda x: x[2]) tests = [] # Add class tests to the order list first for test_name, class_name, class_obj, method_obj, _ in class_tests: tests.append((test_name, (class_name, class_obj, method_obj))) # Then add standalone function tests to maintain file definition order for test_name, obj, _ in function_tests: tests.append((test_name, obj)) return tests except ImportError as e: logger.error(f"Import error loading module {import_name}: {str(e)}") logger.info(f"File path: {file_path}") logger.info(f"Current sys.path: {sys.path}") raise except Exception as e: logger.error(f"Error loading module {import_name}: {str(e)}") logger.debug(f"Exception details: {traceback.format_exc()}") raise def find_tests_in_file( package_dir: Optional[str], test_dir: str, test_file: str, method_or_wildcard: Optional[str] = None, ) -> List[TestNameItem]: """ Find tests in the given file. Args: package_dir: Directory of the package test_dir: Directory of the test test_file: File to find tests in method_or_wildcard: Method name or wildcard of the tests to find, or None to find all tests in the file Returns: List of tuples (test_name, test_item) where test_item is a test function or (class, method) tuple """ # first we need to import the test file. If we are in a package, we need to import the file from the # package, otherwise we need to import the file from the test directory if package_dir is not None: # find additional path beyond package_dir to the file rel_path = os.path.relpath(test_dir, package_dir) package_name = os.path.basename(package_dir) if rel_path == ".": # File is directly in the module directory package_path = None import_name = f"{package_name}.{test_file}" else: # File is in a subdirectory package_path = rel_path.replace(os.path.sep, ".") import_name = f"{package_name}.{package_path}.{test_file}" else: # No module structure, just import the file directly package_path = None import_name = os.path.basename(test_file) # Add the test directory to sys.path to allow importing modules from it if test_dir not in sys.path: sys.path.insert(0, test_dir) logger.debug(f"Added {test_dir} to sys.path") return find_tests_in_module( package_dir, package_path, import_name, test_dir, test_file, method_or_wildcard, ) tests = {} # Split the specifier by both '.' and '/' or '\' to handle different path formats path_parts = re.split(r"[./\\]", test_specifier) starts_with_slash = ( test_specifier[0] if test_specifier.startswith("/") or test_specifier.startswith("\\") else "" ) # If the specifier is empty after splitting, skip it if not path_parts or all(not part for part in path_parts): logger.warning(f"Warning: Empty specifier after splitting: '{test_specifier}'") return tests # Check if the last path part contains a wildcard wildcard = None if path_parts and "*" in path_parts[-1]: wildcard = path_parts[-1] path_parts = path_parts[:-1] logger.debug(f"Extracted wildcard '{wildcard}' from path parts") test_dir = None test_file = None test_method = None for i in range(min(3, len(path_parts))): # create a possible path from the path_parts possible_path = os.path.join(*path_parts[:-i]) if i > 0 else os.path.join(*path_parts) logger.debug(f"possible_path {i}: {possible_path}") if starts_with_slash: possible_path = starts_with_slash + possible_path if os.path.isdir(possible_path): test_dir = possible_path if i > 1: test_file = path_parts[-i] test_method = path_parts[-i + 1] elif i > 0: test_file = path_parts[-i] test_method = None else: test_file = None test_method = None break tmp_dir = os.path.dirname(possible_path) tmp_file = os.path.basename(possible_path) if result := check_if_file_exists(tmp_dir, tmp_file): test_dir, test_file = result logger.debug(f"Found test_dir: {test_dir}, test_file: {test_file}") if i > 0: test_method = path_parts[-i] else: test_method = None break if test_dir is None: # Not found a dir yet, so specifier is not dir or file in current directory test_dir = os.getcwd() test_file = None if test_specifier == "all": logger.debug(f"Finding all tests in {test_dir}") elif len(path_parts) > 0 and (result := check_if_file_exists(test_dir, path_parts[-1])): test_dir, test_file = result logger.debug(f"Found test_dir: {test_dir}, test_file: {test_file}") test_dir = os.path.abspath(test_dir) logger.debug(f"test_dir: {test_dir}, test_file: {test_file}, test_method: {test_method}") if test_file is None: # find all files in test_dir test_file_wildcard = wildcard if test_method is None else None test_files = find_files_matching_wildcard(test_dir, test_file_wildcard or "test_*") if not test_files: if os.path.isdir(os.path.join(test_dir, "tests")): test_dir = os.path.join(test_dir, "tests") test_files = find_files_matching_wildcard(test_dir, test_file_wildcard or "test_*") if not test_files: test_files = find_files_matching_wildcard(test_dir, "test_*") if not test_files: raise ValueError(f"No test files found in {test_dir}") test_method = wildcard test_file_wildcard = None if test_file_wildcard is not None: # do not reuse wildcard for method search wildcard = None else: test_files = [test_file] if test_method is None and wildcard is not None: test_method = wildcard logger.debug(f"Discovering tests in test_dir: {test_dir}, test_file: {test_file}, test_method: {test_method}") package_name, package_dir = self._find_and_import_nearest_package(test_dir) tests = [] for test_file in test_files: module_name = os.path.splitext(os.path.basename(test_file))[0] module_tests = find_tests_in_file(package_dir, test_dir, test_file, test_method) tests.append((module_name, module_tests)) tests.sort(key=lambda x: x[0]) return tests
[docs] def discover_tests(self, test_specifiers: List[str]) -> List[Tuple[str, List[TestNameItem]]]: """ Discover test modules with the given specifiers. Args: test_specifiers: List of test specifiers Returns: Dictionary mapping test names to test functions or (class, method) tuples """ tests = [] for test_specifier in test_specifiers: tests.extend(self._discover_tests_from_specifier(test_specifier)) return tests
[docs] async def run_test(self, test_name: str, test_item: TestItem) -> Dict[str, Any]: """ Run a single test by name. Args: test_name: Name of the test to run Returns: Test result dictionary """ # Check if test is already in results (might have been directly started by another test) if ( test_name in self.test_context.test_results and self.test_context.test_results[test_name]["status"] != TestStatus.RUNNING.value ): logger.debug(f"Test {test_name} already has results, skipping") return self.test_context.test_results[test_name] # Get the test description test_description = None # Handle class method tests test_class_instance = None if isinstance(test_item, tuple): class_name, class_obj, method = test_item # Check if method has description if hasattr(method, "_test_description") and method._test_description: test_description = method._test_description else: # Use the method name test_description = method.__name__ # Create an instance of the test class test_class_instance = class_obj() # Handle standalone test functions else: test_func = test_item # Get the test description - either from the decorated function or use the function name if hasattr(test_func, "_test_description") and test_func._test_description: test_description = test_func._test_description else: # Use the base name without the module part test_description = test_name.split(".")[-1] # Display a clear message showing which test is running with visual enhancements print("\n") # Add space before test for separation self.test_context.print(f"\033[1m\033[4mRunning test: {test_description}\033[0m") print("") # Add space after header # Automatically start the test - don't rely on test function to do this self.test_context.start_test(test_description) result = None try: # If this is a class method test, call setUp if it exists if test_class_instance: class_name, class_obj, method = test_item # Call setUp if it exists if hasattr(test_class_instance, "setUp") and callable(test_class_instance.setUp): if asyncio.iscoroutinefunction(test_class_instance.setUp): logger.debug(f"Calling async setUp for {class_name}") await test_class_instance.setUp(self.ble_manager, self.test_context) else: logger.debug(f"Calling sync setUp for {class_name}") test_class_instance.setUp(self.ble_manager, self.test_context) # Run the test method logger.debug(f"Executing class test method: {test_name}") await method(test_class_instance, self.ble_manager, self.test_context) else: # Run standalone test function logger.debug(f"Executing standalone test: {test_name}") await test_func(self.ble_manager, self.test_context) # Test completed without exceptions - mark as pass result = self.test_context.end_test(TestStatus.PASS) except TestFailure as e: logger.error(f"Test {test_name} failed: {str(e)}") result = self.test_context.end_test(TestStatus.FAIL, str(e)) except TestSkip as e: logger.info(f"Test {test_name} skipped: {str(e)}") result = self.test_context.end_test(TestStatus.SKIP, str(e)) except TestException as e: logger.error(f"Test {test_name} error: {str(e)}") result = self.test_context.end_test(e.status, str(e)) except AssertionError as e: logger.error(f"Test {test_name} failed: {str(e)}") result = self.test_context.end_test(TestStatus.FAIL, str(e)) except TimeoutError as e: # Handle timeout errors gracefully without showing traceback logger.error(f"Test {test_name} error: {str(e)}") result = self.test_context.end_test(TestStatus.ERROR, str(e)) except Exception as e: logger.error(f"Error running test {test_name}: {str(e)}") traceback.print_exc() result = self.test_context.end_test(TestStatus.ERROR, str(e)) finally: # If this is a class method test, call tearDown if it exists if test_class_instance: try: if hasattr(test_class_instance, "tearDown") and callable(test_class_instance.tearDown): if asyncio.iscoroutinefunction(test_class_instance.tearDown): logger.debug(f"Calling async tearDown for {class_name}") await test_class_instance.tearDown(self.ble_manager, self.test_context) else: logger.debug(f"Calling sync tearDown for {class_name}") test_class_instance.tearDown(self.ble_manager, self.test_context) except Exception as e: logger.error(f"Error in tearDown for {test_name}: {str(e)}") # Don't override test result if tearDown fails # Clean up subscriptions after test is complete await self.test_context.unsubscribe_all() return result
[docs] async def run_tests(self, tests: List[TestNameItem]) -> Dict[str, Any]: """ Run multiple tests in the order they were defined in the source code. Args: tests: List of tests to run Returns: Summary of test results """ try: # Run each test in the order they were defined for test_name, test_item in tests: await self.run_test(test_name, test_item) # Return summary return self.test_context.get_test_summary() finally: # Ensure all tasks are cleaned up await self.test_context.cleanup_tasks()