Source code for tsbootstrap.time_series_simulator

from numbers import Integral
from typing import Any, List, Optional, Union

import numpy as np
from numpy.random import Generator

from tsbootstrap.tsfit import TSFit
from tsbootstrap.utils.types import ModelTypes
from tsbootstrap.utils.validate import (
    validate_fitted_model,
    validate_integers,
    validate_literal_type,
    validate_rng,
    validate_X_and_y,
)


[docs] class TimeSeriesSimulator: """ Class to simulate various types of time series models. Attributes ---------- n_samples: int Number of samples in the fitted time series model. n_features: int Number of features in the fitted time series model. burnin: int Number of burn-in samples to discard for certain models. Methods ------- _validate_ar_simulation_params(params) Validate the parameters necessary for the simulation. _simulate_ar_residuals(lags, coefs, init, max_lag) Simulates an Autoregressive (AR) process with given lags, coefficients, initial values, and random errors. simulate_ar_process(resids_lags, resids_coefs, resids) Simulate AR process from the fitted model. _simulate_non_ar_residuals() Simulate residuals according to the model type. simulate_non_ar_process() Simulate a time series from the fitted model. generate_samples_sieve(model_type, resids_lags, resids_coefs, resids) Generate a bootstrap sample using the sieve bootstrap. """ _tags = {"python_dependencies": ["arch", "statsmodels"]} def __init__( self, fitted_model, X_fitted: np.ndarray, rng=None, ) -> None: """ Initialize the TimeSeriesSimulator class. Parameters ---------- fitted_model: FittedModelTypes A fitted model object. X_fitted: np.ndarray Array of fitted values. rng: Optional[Union[Integral, Generator]], optional Random number generator instance. Defaults to None. """ self.fitted_model = fitted_model self.X_fitted = X_fitted self.rng = rng self.n_samples, self.n_features = self.X_fitted.shape self.burnin = min(100, self.n_samples // 3) @property def fitted_model(self): """Get the fitted model.""" return self._fitted_model @fitted_model.setter def fitted_model(self, fitted_model) -> None: """Set the fitted model, ensuring it's validated first.""" validate_fitted_model(fitted_model) self._fitted_model = fitted_model @property def X_fitted(self) -> np.ndarray: """Get the array of fitted values.""" return self._X_fitted @X_fitted.setter def X_fitted(self, value: np.ndarray) -> None: """ Set the array of fitted values. Parameters ---------- value: np.ndarray Array of fitted values to set. """ from arch.univariate.base import ARCHModelResult from statsmodels.tsa.vector_ar.var_model import VARResultsWrapper model_is_var = isinstance(self.fitted_model, VARResultsWrapper) model_is_arch = isinstance(self.fitted_model, ARCHModelResult) self._X_fitted, _ = validate_X_and_y( value, None, model_is_var=model_is_var, model_is_arch=model_is_arch ) @property def rng(self): """Get the random number generator instance.""" return self._rng @rng.setter def rng(self, rng) -> None: """ Set the random number generator instance. Parameters ---------- rng: Optional[Union[Integral, Generator]] Random number generator instance. """ self._rng = validate_rng(rng, allow_seed=True)
[docs] def _validate_ar_simulation_params(self, params: dict) -> None: """ Validate the parameters necessary for the simulation. """ required_params = ["resids_lags", "resids_coefs", "resids"] for param in required_params: if params.get(param) is None: # logger.error(f"{param} is not provided.") raise ValueError(f"{param} must be provided for the AR model.")
[docs] def _simulate_ar_residuals( self, lags: np.ndarray, coefs: np.ndarray, init: np.ndarray, max_lag: Integral, ) -> np.ndarray: """ Simulates an Autoregressive (AR) process with given lags, coefficients, initial values, and random errors. Parameters ---------- lags: np.ndarray The lags to be used in the AR process. Can be non-consecutive, but when called from `generate_samples_sieve`, it will be sorted. coefs: np.ndarray The coefficients corresponding to each lag. Of shape (1, len(lags)). Sorted by `generate_samples_sieve` corresponding to the sorted `lags`. init: np.ndarray The initial values for the simulation. Should be at least as long as the maximum lag. Returns ------- np.ndarray The simulated AR process as a 1D NumPy array. Raises ------ ValueError If `lags` or `coefs` are not provided. If `coefs` is not a 1D NumPy array. If `coefs` is not the same length as `lags`. If `init` is not the same length as `max_lag`. TypeError If `lags` is not an integer or a list of integers. """ random_errors = self.rng.normal(size=self.n_samples) if len(init) < max_lag: # type: ignore raise ValueError( "Length of 'init' must be at least as long as the maximum lag in 'lags'" ) # In case init is 2d with shape (X, 1), convert it to 1d init = init.ravel() series = np.zeros(self.n_samples, dtype=init.dtype) series[:max_lag] = init trend_terms = TSFit._calculate_trend_terms( model_type="ar", model=self.fitted_model ) intercepts = self.fitted_model.params[:trend_terms].reshape( 1, trend_terms ) # Loop through the series, calculating each value based on the lagged values, coefficients, random error, and trend term for t in range(max_lag, self.n_samples): ar_term = 0 for i in range(len(lags)): ar_term_iter = coefs[0, i] * series[t - lags[i]] ar_term += ar_term_iter trend_term = 0 # If the trend is 'c' or 'ct', add a constant term if self.fitted_model.model.trend in ["c", "ct"]: trend_term += intercepts[0, 0] # If the trend is 't' or 'ct', add a linear time trend if self.fitted_model.model.trend in ["t", "ct"]: trend_term += intercepts[0, -1] * t series[t] = ar_term + trend_term + random_errors[t] return series
[docs] def simulate_ar_process( self, resids_lags: Union[Integral, List[Integral]], resids_coefs: np.ndarray, resids: np.ndarray, ) -> np.ndarray: """ Simulate AR process from the fitted model. Parameters ---------- resids_lags: Union[Integral, List[Integral]] The lags to be used in the AR process. Can be non-consecutive, but when called from `generate_samples_sieve`, it will be sorted. resids_coefs: np.ndarray The coefficients corresponding to each lag. Of shape (1, len(lags)). Sorted by `generate_samples_sieve` corresponding to the sorted `lags`. resids: np.ndarray The initial values for the simulation. Should be at least as long as the maximum lag. Returns ------- np.ndarray The simulated AR process as a 1D NumPy array. Raises ------ ValueError If `resids_lags`, `resids_coefs`, or `resids` are not provided. If `resids_coefs` is not a 1D NumPy array. If `resids_coefs` is not the same length as `resids_lags`. If `resids` is not the same length as `X_fitted`. TypeError If `fitted_model` is not an instance of `AutoRegResultsWrapper`. If `resids_lags` is not an integer or a list of integers. """ from statsmodels.tsa.ar_model import AutoRegResultsWrapper validate_integers(resids_lags, min_value=1) # type: ignore if not isinstance(self.fitted_model, AutoRegResultsWrapper): # logger.error("fitted_model must be an instance of AutoRegResultsWrapper.") raise TypeError( f"fitted_model must be an instance of AutoRegResultsWrapper. Got {type(self.fitted_model)}." ) if self.n_features > 1: raise ValueError( "Only univariate time series are supported for the AR model." ) if self.n_samples != len(resids): raise ValueError( "Length of 'resids' must be the same as the number of samples in 'X_fitted'." ) # In case resids is 2d with shape (X, 1), convert it to 1d resids = resids.ravel() # In case X_fitted is 2d with shape (X, 1), convert it to 1d X_fitted = self.X_fitted.ravel() # Generate the bootstrap series bootstrap_series = np.zeros(self.n_samples, dtype=X_fitted.dtype) # Convert resids_lags to a NumPy array if it is not already. When called from `generate_samples_sieve`, it will be sorted. resids_lags = ( np.arange(1, resids_lags + 1) if isinstance(resids_lags, Integral) else np.array(sorted(resids_lags)) ) # type: ignore # resids_lags.shape: (n_lags,) max_lag = np.max(resids_lags) # type: ignore if resids_coefs.shape[0] != 1: raise ValueError( "AR coefficients must be a 1D NumPy array of shape (1, X)" ) if resids_coefs.shape[1] != len(resids_lags): # type: ignore raise ValueError( "Length of 'resids_coefs' must be the same as the length of 'lags'" ) # Simulate residuals using the AR model simulated_residuals = self._simulate_ar_residuals( lags=resids_lags, # type: ignore coefs=resids_coefs, init=resids[:max_lag], max_lag=max_lag, ) # simulated_residuals.shape: (n_samples,) bootstrap_series[:max_lag] = X_fitted[:max_lag] # Loop through the series, calculating each value based on the lagged values, coefficients, and random error for t in range(max_lag, self.n_samples): lagged_values = bootstrap_series[t - resids_lags] # type: ignore # lagged_values.shape: (n_lags,) lagged_values = lagged_values.reshape(-1, 1) # lagged_values.shape: (n_lags, 1) bootstrap_series[t] = ( resids_coefs @ lagged_values + simulated_residuals[t] ) return bootstrap_series.reshape(-1, 1)
[docs] def _simulate_non_ar_residuals(self) -> np.ndarray: """ Simulate residuals according to the model type. Returns ------- np.ndarray The simulated residuals. """ from arch.univariate.base import ARCHModelResult from statsmodels.tsa.arima.model import ARIMAResultsWrapper from statsmodels.tsa.statespace.sarimax import SARIMAXResultsWrapper from statsmodels.tsa.vector_ar.var_model import VARResultsWrapper rng_seed = ( self.rng.integers(0, 2**32 - 1) if not isinstance(self.rng, Integral) else self.rng ) if isinstance( self.fitted_model, (ARIMAResultsWrapper, SARIMAXResultsWrapper) ): return self.fitted_model.simulate( nsimulations=self.n_samples + self.burnin, random_state=self.rng, ) elif isinstance(self.fitted_model, VARResultsWrapper): return self.fitted_model.simulate_var( steps=self.n_samples + self.burnin, seed=rng_seed ) elif isinstance(self.fitted_model, ARCHModelResult): return self.fitted_model.model.simulate( # type: ignore params=self.fitted_model.params, nobs=self.n_samples, burn=self.burnin, )["data"].values raise ValueError(f"Unsupported fitted model type {self.fitted_model}.")
[docs] def simulate_non_ar_process(self) -> np.ndarray: """ Simulate a time series from the fitted model. Returns ------- np.ndarray: The simulated time series. """ from statsmodels.tsa.arima.model import ARIMAResultsWrapper from statsmodels.tsa.statespace.sarimax import SARIMAXResultsWrapper from statsmodels.tsa.vector_ar.var_model import VARResultsWrapper simulated_residuals = self._simulate_non_ar_residuals() simulated_residuals = np.reshape( simulated_residuals, (-1, self.n_features) ) # Discard the burn-in samples for certain models if isinstance( self.fitted_model, (VARResultsWrapper, ARIMAResultsWrapper, SARIMAXResultsWrapper), ): simulated_residuals = simulated_residuals[self.burnin :] return self.X_fitted + simulated_residuals
[docs] def generate_samples_sieve( self, model_type: ModelTypes, resids_lags: Optional[Union[Integral, List[Integral]]] = None, resids_coefs: Optional[np.ndarray] = None, resids: Optional[np.ndarray] = None, ) -> np.ndarray: """ Generate a bootstrap sample using the sieve bootstrap. Parameters ---------- model_type: ModelTypes The model type used for the simulation. resids_lags: Optional[Union[Integral, List[Integral]]], optional The lags to be used in the AR process. Can be non-consecutive. resids_coefs: Optional[np.ndarray], optional The coefficients corresponding to each lag. Of shape (1, len(lags)). resids: Optional[np.ndarray], optional The initial values for the simulation. Should be at least as long as the maximum lag. Returns ------- np.ndarray The bootstrap sample. Raises ------ ValueError If `resids_lags`, `resids_coefs`, or `resids` are not provided. """ validate_literal_type(model_type, ModelTypes) if model_type == "ar": self._validate_ar_simulation_params( { "resids_lags": resids_lags, "resids_coefs": resids_coefs, "resids": resids, } ) return self.simulate_ar_process(resids_lags, resids_coefs, resids) # type: ignore else: return self.simulate_non_ar_process()
def __repr__(self) -> str: return f"TimeSeriesSimulator(fitted_model={self.fitted_model}, n_samples={self.n_samples}, n_features={self.n_features})" def __str__(self) -> str: return f"TimeSeriesSimulator with {self.n_samples} samples and {self.n_features} features using fitted model {self.fitted_model}" def __eq__(self, other: object) -> bool: if isinstance(other, TimeSeriesSimulator): return ( self.fitted_model == other.fitted_model and np.array_equal(self.X_fitted, other.X_fitted) and self.rng == other.rng ) return False