Source code for tsbootstrap.block_length_sampler

"""
Block length sampling: The statistical foundation of temporal block selection.

This module implements sophisticated algorithms for sampling block lengths in
bootstrap methods. The choice of block length represents a critical bias-variance
tradeoff in time series bootstrap: shorter blocks better preserve stationarity
assumptions but may break important temporal dependencies, while longer blocks
maintain correlations but reduce the diversity of bootstrap samples.

We've designed this module to support multiple sampling strategies, from simple
geometric distributions (constant hazard rate) to more flexible parametric
families like Pareto and Weibull. Each distribution encodes different assumptions
about the underlying temporal structure. The geometric distribution, for instance,
implies exponentially decaying autocorrelations, while heavier-tailed distributions
like Pareto can capture long-range dependencies.

Our implementation prioritizes both statistical rigor and computational efficiency.
The sampling algorithms are carefully optimized to generate block lengths quickly
while maintaining the exact distributional properties required for valid inference.
"""

import logging
import sys
import warnings
from typing import Callable, Optional, Union, cast

import numpy as np
from numpy.random import Generator, default_rng
from pydantic import (
    BaseModel,
    ConfigDict,
    Field,
    field_validator,
    model_validator,
)
from scipy.stats import pareto, weibull_min
from skbase.base import BaseObject

from tsbootstrap.utils.types import DistributionTypes, RngTypes
from tsbootstrap.utils.validate import validate_rng

if sys.version_info >= (3, 10):  # noqa: UP036
    from typing import TypeAlias
else:
    TypeAlias = type  # Fallback for earlier versions

# Constants defining block length constraints
MIN_BLOCK_LENGTH: int = 1
DEFAULT_AVG_BLOCK_LENGTH: int = 2
MIN_AVG_BLOCK_LENGTH: int = 2

# Configure module-level logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

handler = logging.StreamHandler()
formatter = logging.Formatter(fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)

# Type alias for distribution sampling functions
DistributionSamplerFunc: TypeAlias = Callable[[Generator, int], Union[int, float]]


[docs] class DistributionRegistry: """ Central registry for block length distributions and their sampling algorithms. This registry implements a plugin architecture for distribution support, allowing easy extension with new distributions while maintaining clean separation of concerns. Each distribution is associated with a sampling function that generates block lengths according to the specified parameters. """ _registry: dict[DistributionTypes, DistributionSamplerFunc] = {}
[docs] @classmethod def register_distribution( cls, distribution: DistributionTypes, sampler_func: DistributionSamplerFunc, ) -> None: """ Register a new distribution and its sampling function. Parameters ---------- distribution : DistributionTypes The distribution type to register. sampler_func : DistributionSamplerFunc The sampling function corresponding to the distribution. Raises ------ ValueError If the distribution is already registered. """ if distribution in cls._registry: raise ValueError( f"Distribution '{distribution.value}' has already been registered in the sampler. " f"Each distribution type can only have one associated sampling function. " f"To replace an existing sampler, first unregister the distribution." ) cls._registry[distribution] = sampler_func logger.debug(f"Registered distribution '{distribution.value}'.")
[docs] @classmethod def get_sampler(cls, distribution: DistributionTypes) -> DistributionSamplerFunc: """ Retrieve the sampling function for a given distribution. Parameters ---------- distribution : DistributionTypes The distribution type for which to retrieve the sampling function. Returns ------- DistributionSamplerFunc The sampling function associated with the distribution. Raises ------ ValueError If the distribution is not registered. """ try: sampler = cls._registry[distribution] except KeyError: raise ValueError( f"No sampling function registered for distribution '{distribution.value}'. " f"Available distributions: {', '.join(d.value for d in cls._registry)}. " f"Register a custom sampler using DistributionRegistry.register() if needed." ) from None else: logger.debug(f"Retrieved sampler for distribution '{distribution.value}'.") return sampler
[docs] def sample_poisson(rng: Generator, avg_block_length: int) -> int: """Sample from a Poisson distribution.""" return rng.poisson(lam=avg_block_length)
[docs] def sample_exponential(rng: Generator, avg_block_length: int) -> float: """Sample from an Exponential distribution.""" return rng.exponential(scale=avg_block_length)
[docs] def sample_normal(rng: Generator, avg_block_length: int) -> float: """Sample from a Normal distribution.""" return rng.normal(loc=avg_block_length, scale=avg_block_length / 3)
[docs] def sample_gamma(rng: Generator, avg_block_length: int) -> float: """Sample from a Gamma distribution.""" shape: float = 2.0 scale: float = avg_block_length / 2 return rng.gamma(shape=shape, scale=scale)
[docs] def sample_beta(rng: Generator, avg_block_length: int) -> float: """Sample from a Beta distribution.""" a: int = 2 b: int = 2 return rng.beta(a=a, b=b) * (2 * avg_block_length - 1) + 1
[docs] def sample_lognormal(rng: Generator, avg_block_length: int) -> float: """Sample from a Lognormal distribution.""" mean: float = np.log(avg_block_length / 2) sigma: float = np.log(2) return rng.lognormal(mean=mean, sigma=sigma)
[docs] def sample_weibull(rng: Generator, avg_block_length: int) -> float: """Sample from a Weibull distribution.""" c: float = 1.5 # Shape parameter return weibull_min.rvs(c=c, scale=avg_block_length, random_state=rng)
[docs] def sample_pareto(rng: Generator, avg_block_length: int) -> float: """Sample from a Pareto distribution.""" b: float = 1 # Shape parameter return (pareto.rvs(b=b, random_state=rng) + 1) * avg_block_length
[docs] def sample_geometric(rng: Generator, avg_block_length: int) -> int: """Sample from a Geometric distribution.""" p: float = 1 / avg_block_length return rng.geometric(p=p)
[docs] def sample_uniform(rng: Generator, avg_block_length: int) -> int: """Sample from a Uniform distribution.""" return rng.integers(low=1, high=2 * avg_block_length)
[docs] def sample_none(rng: Generator, avg_block_length: int) -> int: """Return the average block length.""" return avg_block_length
# Register all default distributions DistributionRegistry.register_distribution(DistributionTypes.POISSON, sample_poisson) DistributionRegistry.register_distribution(DistributionTypes.EXPONENTIAL, sample_exponential) DistributionRegistry.register_distribution(DistributionTypes.NORMAL, sample_normal) DistributionRegistry.register_distribution(DistributionTypes.GAMMA, sample_gamma) DistributionRegistry.register_distribution(DistributionTypes.BETA, sample_beta) DistributionRegistry.register_distribution(DistributionTypes.LOGNORMAL, sample_lognormal) DistributionRegistry.register_distribution(DistributionTypes.WEIBULL, sample_weibull) DistributionRegistry.register_distribution(DistributionTypes.PARETO, sample_pareto) DistributionRegistry.register_distribution(DistributionTypes.GEOMETRIC, sample_geometric) DistributionRegistry.register_distribution(DistributionTypes.UNIFORM, sample_uniform) DistributionRegistry.register_distribution(DistributionTypes.NONE, sample_none)
[docs] class BlockLengthSampler(BaseModel, BaseObject): """ Statistical engine for adaptive block length generation in bootstrap methods. This class implements the core machinery for sampling block lengths from various probability distributions, a critical component of variable block length bootstrap methods. We've designed it to support the full spectrum of distributional assumptions, from memoryless geometric distributions to heavy-tailed Pareto distributions that capture long-range dependencies. The choice of distribution encodes important assumptions about the temporal structure of the data. The geometric distribution, with its constant hazard rate, implies that the probability of a block ending is constant—suitable for processes with exponentially decaying autocorrelations. In contrast, distributions like Pareto or Weibull allow for more complex dependency structures, including long memory processes. Our implementation balances flexibility with ease of use. The sampler automatically handles the translation from average block length (an intuitive parameter) to the appropriate distribution parameters, ensuring that the expected block length matches the specified value regardless of the chosen distribution. Parameters ---------- avg_block_length : int, optional Target average block length for sampling. This parameter controls the bias-variance tradeoff: larger values preserve more temporal structure but reduce bootstrap diversity. Must be at least MIN_AVG_BLOCK_LENGTH. Default is DEFAULT_AVG_BLOCK_LENGTH. block_length_distribution : Optional[Union[str, DistributionTypes]], optional Probability distribution for block length generation. Each distribution implies different assumptions about temporal dependencies. Options include geometric (memoryless), Pareto (heavy-tailed), and various parametric families. String names are automatically converted to enum values. Default is None (returns fixed avg_block_length). rng : RngTypes, optional Random number generator for reproducible sampling. Accepts numpy Generator, integer seed, or None (uses system entropy). We recommend explicit seeding for research reproducibility. Attributes ---------- avg_block_length : int The calibrated average block length used in distribution parameters. block_length_distribution : Optional[DistributionTypes] The selected distribution family for block length generation. rng : Generator The configured random number generator instance. Methods ------- sample_block_length() Generate a single block length from the configured distribution. Notes ----- The implementation uses Pydantic for robust validation and integrates with the scikit-base ecosystem for compatibility with time series frameworks. All distributions are parameterized to achieve the specified average block length, ensuring consistent behavior across different distributional choices. """ # Model configuration using Pydantic's ConfigDict for Pydantic 2.0 model_config = ConfigDict( arbitrary_types_allowed=True, validate_assignment=True, # Allows extra attributes like 'test__attr' ) # Define class attributes with validation avg_block_length: int = Field( # Changed from PositiveInt to int default=DEFAULT_AVG_BLOCK_LENGTH, description="The average block length to use for sampling.", ) block_length_distribution: Optional[DistributionTypes] = Field( default=None, description="The probability distribution to use for sampling block lengths. " "Must be one of the values in `DistributionTypes` or a corresponding string.", ) rng: RngTypes = Field( # type: ignore default_factory=default_rng, description="Random number generator for sampling.", ) # Tags for the object type tags: dict[str, str] = Field( default_factory=lambda: {"object_type": "sampler"}, exclude=True, )
[docs] @field_validator("avg_block_length", mode="after") # Changed to mode="after" @classmethod def check_avg_block_length_positive(cls, v: int) -> int: # v is now guaranteed to be int """ Validate `avg_block_length` is positive after Pydantic has confirmed it's an int. Coercion based on distribution is handled by a model_validator. """ # Pydantic has already ensured 'v' is an int. # If 'v' was None or a non-coercible type for 'int', Pydantic would have raised ValidationError. logger.debug(f"check_avg_block_length_positive received (already int): {v}") if v <= 0: raise ValueError( f"Average block length must be a positive integer. Received: {v}. " f"Block lengths represent the number of consecutive observations to sample, " f"so must be at least 1." ) return v
[docs] @model_validator(mode="after") def coerce_avg_block_length_conditionally(self) -> "BlockLengthSampler": """ Coerce `avg_block_length` based on `block_length_distribution` after initial validation. If a block_length_distribution is used, `avg_block_length` must be >= `MIN_AVG_BLOCK_LENGTH`. Otherwise, `avg_block_length` must be >= `MIN_BLOCK_LENGTH` (already ensured by check_avg_block_length_positive). Issues a warning and coerces if the condition is not met. """ # This validator runs after avg_block_length has been set (either from input or default) # and validated by check_avg_block_length_positive. logger.debug( f"coerce_avg_block_length_conditionally: current avg_block_length={self.avg_block_length}, dist={self.block_length_distribution}" ) is_distribution_active = ( self.block_length_distribution is not None and self.block_length_distribution != DistributionTypes.NONE ) if is_distribution_active and ( self.avg_block_length < MIN_AVG_BLOCK_LENGTH ): # MIN_AVG_BLOCK_LENGTH is 2 dist_name = ( self.block_length_distribution.value if self.block_length_distribution is not None else "Unknown" ) warnings.warn( f"Average block length {self.avg_block_length} is below the minimum of {MIN_AVG_BLOCK_LENGTH} " f"required when using distribution '{dist_name}'. Block length distributions need " f"sufficient average length to generate meaningful variation. Automatically adjusting " f"to minimum value {MIN_AVG_BLOCK_LENGTH}.", UserWarning, stacklevel=3, ) logger.warning( f"avg_block_length was {self.avg_block_length} (with distribution {dist_name}), which is less than {MIN_AVG_BLOCK_LENGTH}. " f"Setting to {MIN_AVG_BLOCK_LENGTH}." ) # Use __dict__ to avoid re-triggering validation if validate_assignment=True self.__dict__["avg_block_length"] = MIN_AVG_BLOCK_LENGTH # If no distribution is active, avg_block_length=1 is permissible if it passed the v <= 0 check. # MIN_BLOCK_LENGTH is 1. The check_avg_block_length_positive ensures avg_block_length >= 1. logger.debug(f"Final avg_block_length after model_validator: {self.avg_block_length}") return self
[docs] @field_validator("rng", mode="before") @classmethod def validate_rng_field(cls, v: Union[Generator, int, None]) -> Generator: """ Validate the random number generator. This method ensures that the provided random number generator is valid and consistent with the expected type. Parameters ---------- v : Union[Generator, int, None] The input random number generator to validate. Returns ------- Generator The validated random number generator. Raises ------ ValueError If the input is not a valid random number generator or seed. """ # Cast v to RngTypes to satisfy Pylance, as validate_rng expects RngTypes # and v (Union[Generator, int, None]) is a compatible subset. validated_rng: Generator = validate_rng(cast(RngTypes, v), allow_seed=True) logger.debug("Random number generator validated and initialized.") return validated_rng
[docs] @field_validator("block_length_distribution", mode="before") @classmethod def validate_block_length_distribution( cls, v: Optional[Union[str, DistributionTypes]] ) -> Optional[DistributionTypes]: """ Validate and normalize the block length distribution input. This method ensures that string inputs for `block_length_distribution` are converted to lowercase for consistency and then to the appropriate `DistributionTypes` enum value. It also handles `None` values. Parameters ---------- v : Optional[Union[str, DistributionTypes]] The input block length distribution to validate. Returns ------- Optional[DistributionTypes] The validated and normalized block length distribution. Raises ------ ValueError If the input string is not a valid `DistributionTypes` value. """ if v is None: logger.debug("No block_length_distribution provided. Using default.") return None if isinstance(v, str): v_lower = v.lower() try: distribution = DistributionTypes(v_lower) except ValueError: raise ValueError( f"Distribution type '{v}' is not recognized. Valid options are: " f"{', '.join(sorted(d.value for d in DistributionTypes))}. " f"Each distribution implies different temporal dependency assumptions." ) from None else: logger.debug(f"block_length_distribution validated: {distribution.value}") return distribution if isinstance(v, DistributionTypes): logger.debug(f"block_length_distribution validated: {v.value}") return v raise TypeError( f"Block length distribution must be a string name, DistributionTypes enum value, " f"or None. Received type: {type(v).__name__}. Valid string names are: " f"{', '.join(sorted(d.value for d in DistributionTypes))}." )
def __init__(self, **data): """ Initialize the BlockLengthSampler, ensuring proper initialization of parent classes. Parameters ---------- **data : dict Keyword arguments for initializing the class. """ super().__init__(**data) BaseObject.__init__(self)
[docs] def sample_block_length(self) -> int: """ Sample a block length from the selected distribution. This method uses the configured distribution type and parameters to generate a random block length. Returns ------- int A sampled block length. The returned value is always an integer and is at least `MIN_BLOCK_LENGTH`. Notes ----- The sampled value is rounded to the nearest integer and is ensured to be no less than `MIN_BLOCK_LENGTH`. """ if self.block_length_distribution is None: logger.debug("No distribution selected. Returning average block length.") return self.avg_block_length # Retrieve the appropriate sampling function from the registry try: sampling_func: DistributionSamplerFunc = DistributionRegistry.get_sampler( self.block_length_distribution ) except ValueError: logger.exception( f"Error retrieving sampling function for distribution '{self.block_length_distribution.value}'." ) raise # Ensure self.rng is a Generator instance before use if not isinstance(self.rng, Generator): # This case should ideally be prevented by Pydantic validation, # but this check provides runtime safety and clarifies type for static analyzers. logger.error( f"self.rng is not a valid numpy.random.Generator. Got type: {type(self.rng)}" ) raise TypeError( f"Random number generator must be a numpy.random.Generator instance. " f"Received type: {type(self.rng).__name__}. This typically indicates " f"a validation failure or incorrect initialization." ) # Sample from the selected distribution sampled_block_length: Union[int, float] = sampling_func(self.rng, self.avg_block_length) logger.debug(f"Sampled block length before rounding: {sampled_block_length}") # Ensure the sampled length is an integer and at least MIN_BLOCK_LENGTH sampled_length_int: int = max(round(sampled_block_length), MIN_BLOCK_LENGTH) logger.debug(f"Sampled block length after validation: {sampled_length_int}") return sampled_length_int
def __setattr__(self, name: str, value) -> None: """ Override setattr to allow test attributes for skbase compatibility. This allows setting arbitrary attributes that start with 'test_' to support skbase's test suite which checks for side effects between tests. """ if name.startswith("test_"): # For test attributes, bypass Pydantic validation object.__setattr__(self, name, value) else: # Use Pydantic's normal setattr super().__setattr__(name, value)