# Source code for doatools.performance.evaluator

import inspect
import multiprocessing
import time
import numpy as np
from tqdm import tqdm
from .crb import crb_det_farfield_1d, crb_sto_farfield_1d, crb_stouc_farfield_1d
from ..model.arrays import ArrayDesign
from ..model.signals import ComplexStochasticSignal
from ..model.sources import FarField1DSourcePlacement
from ..model import get_narrowband_snapshots


class PerformanceResult:
    """Encapsulates performance evaluation results.

    Attributes:
        snr (float): Signal-to-noise ratio in dB.
        n_snapshots (int): Number of snapshots.
        n_monte_carlo (int): Number of Monte Carlo simulations.
        crb_values (dict): Dictionary containing different types of CRB
            values, where keys are CRB types and values are CRB values.
        estimator_results (dict): Results for different estimators, where
            keys are estimator names and values are dictionaries containing
            different metrics.
        estimator_times (dict): Computation time for each estimator in
            seconds.
        sample_estimates (dict): Raw per-run estimates for each estimator,
            where keys are estimator names and values are arrays of shape
            (n_runs, n_sources). Populated only when sample saving is
            enabled by the evaluator.
    """

    def __init__(self, snr, n_snapshots, n_monte_carlo):
        self.snr = snr
        self.n_snapshots = n_snapshots
        self.n_monte_carlo = n_monte_carlo
        self.crb_values = {}
        self.estimator_results = {}
        self.estimator_times = {}
        self.sample_estimates = {}

    def add_crb(self, crb_type, value):
        """Adds a CRB value.

        Args:
            crb_type (str): Type of CRB.
            value (float): CRB value.
        """
        self.crb_values[crb_type] = value

    def add_estimator_result(self, estimator_name, metric_results,
                             sample_estimates=None, computation_time=0.0):
        """Adds estimator results.

        Args:
            estimator_name (str): Name of the estimator.
            metric_results (dict): Metric results, where keys are metric
                names and values are metric values.
            sample_estimates (array, optional): Array of sample estimates
                with shape (n_runs, n_sources).
            computation_time (float, optional): Computation time for this
                estimator in seconds.
        """
        self.estimator_results[estimator_name] = metric_results
        self.estimator_times[estimator_name] = computation_time
        if sample_estimates is not None:
            self.sample_estimates[estimator_name] = sample_estimates

    def __str__(self):
        """Returns a string representation of the results."""
        s = f"Performance Result (SNR: {self.snr} dB, Snapshots: {self.n_snapshots}, Monte Carlo: {self.n_monte_carlo})\n"
        s += "=" * 70 + "\n"
        s += "CRB Values:\n"
        for crb_type, value in self.crb_values.items():
            s += f" {crb_type.upper():<15}: {value:12.6e} rad²\n"
        s += "\n"
        s += "Estimator Results:\n"
        for estimator_name, metric_results in self.estimator_results.items():
            s += f" {estimator_name}:\n"
            for metric, value in metric_results.items():
                # Angular metrics are reported in radians; squared errors
                # in radians squared; unknown (custom) metrics unitless.
                if metric in ['bias', 'mae', 'rmse']:
                    unit = "rad"
                elif metric == 'mse':
                    unit = "rad²"
                else:
                    unit = ""
                s += f" {metric.upper():<25}: {value:12.6e} {unit}\n"
            # Add estimator computation time; default to 0.0 so a result
            # added without timing information cannot break printing.
            s += f" {'AVERAGE COMPUTATION TIME':<25}: {self.estimator_times.get(estimator_name, 0.0):12.6e} seconds\n"
            s += "\n"
        return s

    def __repr__(self):
        """Returns a repr representation of the results."""
        return self.__str__()
def _single_monte_carlo_run(estimator, array, sources, n_snapshots,
                            power_source, power_noise, run_id):
    """Performs a single Monte Carlo run for a given estimator.

    Args:
        estimator: The DOA estimator instance.
        array: The array design.
        sources: The source locations.
        n_snapshots: Number of snapshots.
        power_source: Source power.
        power_noise: Noise power.
        run_id: The run identifier (unused, but required for parallel
            processing).

    Returns:
        tuple: (estimated_locations, run_time) where estimated_locations is
        numpy.ndarray if resolved, otherwise None, and run_time is the time
        taken for this run in seconds.
    """
    source_signal = ComplexStochasticSignal(sources.size, power_source)
    noise_signal = ComplexStochasticSignal(array.size, power_noise)
    # Only the covariance matrix is needed; the raw snapshots are discarded.
    _, Ry = get_narrowband_snapshots(
        array, sources, estimator._wavelength, source_signal, noise_signal,
        n_snapshots, return_covariance=True
    )
    # Time only the estimation step, with a monotonic clock so the measured
    # duration cannot be skewed by system clock adjustments.
    run_start_time = time.perf_counter()
    # Some estimators additionally take the base inter-element spacing d0;
    # detect that from the estimate() signature before calling.
    sig = inspect.signature(estimator.estimate)
    params = list(sig.parameters.keys())
    if len(params) >= 4 or 'd0' in params:
        resolved, estimates = estimator.estimate(Ry, sources.size, array.d0[0])
    else:
        resolved, estimates = estimator.estimate(Ry, sources.size)
    run_time = time.perf_counter() - run_start_time
    if resolved:
        return estimates.locations, run_time
    return None, run_time
class DOAPerformanceEvaluator:
    """DOA performance evaluator for assessing DOA estimation algorithms
    under different conditions.

    This evaluator accepts user-specified parameters, performs Monte Carlo
    simulations, computes evaluation metrics (MSE, RMSE) and theoretical
    performance bounds (CRLB), and returns the results along with
    computation time.
    """

    def __init__(self, array, sources, snr, n_snapshots, n_monte_carlo,
                 estimators, crb_types=None, metrics=None,
                 save_sample_estimates=False, n_jobs=1):
        """Initializes the performance evaluator.

        Args:
            array (~doatools.model.arrays.ArrayDesign): Array design.
            sources (~doatools.model.sources.FarField1DSourcePlacement):
                Source locations.
            snr (float): Signal-to-noise ratio in dB.
            n_snapshots (int): Number of snapshots.
            n_monte_carlo (int): Number of Monte Carlo simulations.
            estimators (dict or list or object): DOA estimator instance,
                list of instances, or dictionary of estimators. If a
                dictionary, keys are estimator names and values are
                estimator instances. If a list, estimator class names are
                used as names. If a single instance, its class name is used
                as the name.
            crb_types (list or str, optional): List of CRLB types or a
                single type. Possible values: 'sto' (stochastic CRB),
                'det' (deterministic CRB), 'stouc' (stochastic uncorrelated
                CRB). Default value is ['sto'].
            metrics (list or str, optional): List of evaluation metrics or
                a single metric. Possible values: 'bias', 'mae', 'mse',
                'rmse'. Default value is ['mse'].
            save_sample_estimates (bool, optional): Whether to save sample
                estimates. Default value is False.
            n_jobs (int, optional): Number of parallel jobs to run. Default
                is 1 (no parallelism). If -1, use all available CPUs.
        """
        self.array = array
        self.sources = sources
        self.snr = snr
        self.n_snapshots = n_snapshots
        self.n_monte_carlo = n_monte_carlo
        self.n_jobs = n_jobs
        self.estimators = self._process_estimators(estimators)
        # Normalize single-string options to lists.
        if crb_types is None:
            crb_types = ['sto']
        elif isinstance(crb_types, str):
            crb_types = [crb_types]
        self.crb_types = crb_types
        if metrics is None:
            metrics = ['mse']
        elif isinstance(metrics, str):
            metrics = [metrics]
        self.metrics = metrics
        self._validate_inputs()
        self.save_sample_estimates = save_sample_estimates
        self._precompute_parameters()

    @staticmethod
    def _process_estimators(estimators):
        """Processes the estimators parameter and converts it to a dictionary.

        Args:
            estimators (dict or list or object): Estimator instance, list of
                instances, or dictionary.

        Returns:
            dict: Dictionary of estimators, where keys are estimator names
            and values are instances.
        """
        if isinstance(estimators, dict):
            return estimators
        elif isinstance(estimators, list):
            return {estimator.__class__.__name__: estimator
                    for estimator in estimators}
        else:
            return {estimators.__class__.__name__: estimators}

    def _validate_inputs(self):
        """Validates the input parameters.

        Raises:
            ValueError: If any parameter is of the wrong type, names an
                unknown CRB type or metric, or is non-positive.
        """
        if not isinstance(self.array, ArrayDesign):
            raise ValueError('array must be an instance of ArrayDesign')
        if not isinstance(self.sources, FarField1DSourcePlacement):
            raise ValueError('sources must be an instance of FarField1DSourcePlacement')
        # validate each estimator
        for name, estimator in self.estimators.items():
            if not hasattr(estimator, 'estimate') or not callable(estimator.estimate):
                raise ValueError(f'estimator {name} must have a callable estimate method')
            if not hasattr(estimator, '_wavelength'):
                raise ValueError(f'estimator {name} must have a _wavelength attribute')
        # validate CRB types
        valid_crb_types = ['sto', 'det', 'stouc']
        for crb_type in self.crb_types:
            if crb_type not in valid_crb_types:
                raise ValueError(f'crb_type {crb_type} must be one of: {valid_crb_types}')
        # validate metrics
        valid_metrics = ['bias', 'mae', 'mse', 'rmse']
        for metric in self.metrics:
            if metric not in valid_metrics:
                raise ValueError(f'metric {metric} must be one of: {valid_metrics}')
        if self.n_monte_carlo <= 0:
            raise ValueError('n_monte_carlo must be positive')
        if self.n_snapshots <= 0:
            raise ValueError('n_snapshots must be positive')

    def _precompute_parameters(self):
        """Precomputes some parameters.

        Fixes the source power at 1.0 and derives the noise power from the
        SNR (in dB). The signal objects created here are kept as public-ish
        attributes for backward compatibility; each Monte Carlo run builds
        its own fresh signal instances.
        """
        self.power_source = 1.0
        self.power_noise = self.power_source / (10 ** (self.snr / 10))
        self.source_signal = ComplexStochasticSignal(self.sources.size, self.power_source)
        self.noise_signal = ComplexStochasticSignal(self.array.size, self.power_noise)

    def _compute_crb(self, crb_type, wavelength):
        """Computes the theoretical performance bound (CRLB).

        Args:
            crb_type (str): CRB type ('sto', 'det' or 'stouc').
            wavelength (float): Wavelength.

        Returns:
            float: CRB value (mean of the diagonal of the CRB matrix).
        """
        crb = 0.0
        if crb_type == 'sto':
            crb = crb_sto_farfield_1d(self.array, self.sources, wavelength,
                                      self.power_source, self.power_noise,
                                      self.n_snapshots, return_mode='mean_diag')
        elif crb_type == 'det':
            # for deterministic CRB, we need the source covariance matrix
            # use identity matrix as approximation
            Rs = np.eye(self.sources.size) * self.power_source
            crb = crb_det_farfield_1d(self.array, self.sources, wavelength,
                                      Rs, self.power_noise,
                                      self.n_snapshots, return_mode='mean_diag')
        elif crb_type == 'stouc':
            crb = crb_stouc_farfield_1d(self.array, self.sources, wavelength,
                                        self.power_source, self.power_noise,
                                        self.n_snapshots, return_mode='mean_diag')
        return crb

    def _run_monte_carlo(self, estimator, n_processes, tbar):
        """Runs all Monte Carlo trials for a single estimator.

        Args:
            estimator: The DOA estimator instance.
            n_processes (int): Number of worker processes (1 = serial).
            tbar: tqdm progress bar to advance once per trial.

        Returns:
            tuple: (all_estimates, all_run_times) where all_estimates is a
            list of location arrays from resolved runs only, and
            all_run_times is a list of per-run estimation times in seconds
            (one entry per trial, resolved or not).
        """
        all_estimates = []
        all_run_times = []
        common_args = (estimator, self.array, self.sources, self.n_snapshots,
                       self.power_source, self.power_noise)
        if n_processes > 1:
            # Use parallel processing
            with multiprocessing.Pool(processes=n_processes) as pool:
                args_list = [common_args + (i,)
                             for i in range(self.n_monte_carlo)]
                results = pool.starmap(_single_monte_carlo_run, args_list)
            # The bar can only advance after the pool finishes, so progress
            # jumps in one batch in parallel mode.
            for res, run_time in results:
                all_run_times.append(run_time)
                if res is not None:
                    all_estimates.append(res)
                tbar.update(1)
        else:
            for _ in range(self.n_monte_carlo):
                res, run_time = _single_monte_carlo_run(*common_args, 0)
                all_run_times.append(run_time)
                if res is not None:
                    all_estimates.append(res)
                tbar.update(1)
        return all_estimates, all_run_times

    def _compute_builtin_metrics(self, all_estimates):
        """Computes the built-in bias/mae/mse/rmse statistics.

        Args:
            all_estimates (list): Location arrays from resolved runs.

        Returns:
            dict: Values for 'bias', 'mae', 'mse' and 'rmse'. When no run
            resolved, worst-case pi-based fallback values are returned so
            downstream plots remain well-defined.
        """
        if len(all_estimates) > 0:
            estimates_arr = np.array(all_estimates)
            true_locations = self.sources.locations
            errors = estimates_arr - true_locations
            biases = np.mean(errors, axis=0)
            avg_bias = np.mean(np.abs(biases))
            mse_per_source = np.mean(errors ** 2, axis=0)
            avg_mse = np.mean(mse_per_source)
            avg_rmse = np.sqrt(avg_mse)
            mae_per_source = np.mean(np.abs(errors), axis=0)
            avg_mae = np.mean(mae_per_source)
        else:
            avg_bias = np.pi
            avg_mse = np.pi ** 2
            avg_rmse = np.pi
            avg_mae = np.pi
        return {'bias': avg_bias, 'mae': avg_mae,
                'mse': avg_mse, 'rmse': avg_rmse}

    def evaluate(self, custom_metrics=None, verbose=0):
        """Performs performance evaluation.

        Args:
            custom_metrics (dict, optional): Dictionary of custom evaluation
                metrics, where keys are metric names and values are
                functions that accept two parameters (estimated locations
                and true locations) and return a metric value. For example:
                {'mae': lambda est, true: np.mean(np.abs(est - true))}
            verbose (int, optional): Verbosity level. 0 for silent, 1 for
                progress bar.

        Returns:
            PerformanceResult: Evaluation result object.
        """
        start_time = time.perf_counter()
        result = PerformanceResult(
            snr=self.snr,
            n_snapshots=self.n_snapshots,
            n_monte_carlo=self.n_monte_carlo
        )
        # compute all CRB values; the wavelength of the first estimator is
        # taken as the reference for the bounds
        reference_wavelength = next(iter(self.estimators.values()))._wavelength
        for crb_type in self.crb_types:
            crb_value = self._compute_crb(crb_type, reference_wavelength)
            result.add_crb(crb_type, crb_value)
        if custom_metrics is None:
            custom_metrics = {}
        tbar = tqdm(total=len(self.estimators) * self.n_monte_carlo,
                    desc='Evaluating estimators', disable=verbose < 1)
        # Determine the number of processes to use
        if self.n_jobs == -1:
            n_processes = multiprocessing.cpu_count()
        else:
            n_processes = max(1, self.n_jobs)
        for estimator_name, estimator in self.estimators.items():
            all_estimates, all_run_times = self._run_monte_carlo(
                estimator, n_processes, tbar)
            metric_values = self._compute_builtin_metrics(all_estimates)
            # keep only the metrics the user requested
            metric_results = {metric: metric_values[metric]
                              for metric in self.metrics
                              if metric in metric_values}
            if len(all_estimates) > 0:
                estimates_arr = np.array(all_estimates)
                true_locations = self.sources.locations
                # custom metrics are applied per source column, then averaged
                for custom_name, custom_func in custom_metrics.items():
                    custom_per_source = [
                        custom_func(estimates_arr[:, i], true_locations[i])
                        for i in range(true_locations.size)
                    ]
                    metric_results[custom_name] = np.mean(custom_per_source)
            else:
                estimates_arr = None
                for custom_name in custom_metrics:
                    metric_results[custom_name] = np.pi
            # Average per-run estimation time (not the total), matching the
            # "AVERAGE COMPUTATION TIME" label printed by PerformanceResult.
            estimator_time = np.mean(all_run_times)
            sample_estimates = (estimates_arr
                                if (self.save_sample_estimates
                                    and estimates_arr is not None)
                                else None)
            result.add_estimator_result(estimator_name, metric_results,
                                        sample_estimates, estimator_time)
        tbar.close()
        result.computation_time = time.perf_counter() - start_time
        return result
def evaluate_performance(array, sources, snr, n_snapshots, n_monte_carlo,
                         estimators, crb_types=None, metrics=None,
                         custom_metrics=None, save_sample_estimates=False,
                         verbose=0, n_jobs=1):
    """Performance evaluation function for quickly assessing DOA algorithm
    performance.

    This function is a simplified interface for the DOAPerformanceEvaluator
    class, designed for easy direct use by users.

    Args:
        array (~doatools.model.arrays.ArrayDesign): Array design.
        sources (~doatools.model.sources.FarField1DSourcePlacement): Source
            locations.
        snr (float): Signal-to-noise ratio in dB.
        n_snapshots (int): Number of snapshots.
        n_monte_carlo (int): Number of Monte Carlo simulations.
        estimators (dict or list or object): DOA estimator instance, list
            of instances, or dictionary.
        crb_types (list or str, optional): List of CRLB types or a single
            type. Possible values: 'sto' (stochastic CRB), 'det'
            (deterministic CRB), 'stouc' (stochastic uncorrelated CRB).
            Default value is ['sto'].
        metrics (list or str, optional): List of evaluation metrics or a
            single metric. Possible values: 'bias' (bias), 'mae' (mean
            absolute error), 'mse' (mean squared error), 'rmse' (root mean
            squared error). Default value is ['mse'].
        custom_metrics (dict, optional): Dictionary of custom evaluation
            metrics, where keys are metric names and values are functions
            of (estimated locations, true locations).
        save_sample_estimates (bool, optional): Whether to save sample
            estimates. Default value is False.
        verbose (int, optional): Verbosity level. 0 for silent, 1 for
            progress bar.
        n_jobs (int, optional): Number of parallel jobs to run. Default is
            1 (no parallelism). If -1, use all available CPUs.

    Returns:
        PerformanceResult: Evaluation result object.
    """
    evaluator = DOAPerformanceEvaluator(
        array=array,
        sources=sources,
        snr=snr,
        n_snapshots=n_snapshots,
        n_monte_carlo=n_monte_carlo,
        estimators=estimators,
        crb_types=crb_types,
        metrics=metrics,
        save_sample_estimates=save_sample_estimates,
        n_jobs=n_jobs
    )
    return evaluator.evaluate(custom_metrics, verbose=verbose)