"""
Markov sampling: Capturing temporal transitions through state-based resampling.
This module implements Markov-based bootstrap methods that explicitly model
the transition dynamics in time series data. Unlike block methods that preserve
local structure wholesale, Markov methods learn the probabilistic transitions
between states, enabling more flexible resampling that respects the underlying
stochastic process.
The key insight is dimensionality reduction: high-dimensional time series blocks
are compressed into representative states, and transitions between these states
are modeled as a Markov chain. This approach bridges the gap between simple
resampling (which ignores dependencies) and full model-based methods (which
may be too restrictive).
Our implementation supports multiple compression strategies, from simple summary
statistics to sophisticated PCA-based representations. The Markov transition
matrix is then estimated from the observed state sequences, enabling generation
of new sample paths that maintain the essential dynamics of the original series.
"""
import logging
import warnings
from numbers import Integral
from typing import Optional
import numpy as np
import scipy.stats
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.exceptions import NotFittedError
from sklearn.utils.validation import check_is_fitted
from tsbootstrap.utils.types import BlockCompressorTypes
from tsbootstrap.utils.validate import (
validate_blocks,
validate_integers,
validate_literal_type,
)
logger = logging.getLogger(__name__)
try:
from dtaidistance import dtw_ndim # type: ignore
# Note: dtaidistance may not compile for all Python versions
dtaidistance_installed = True
except ImportError:
dtaidistance_installed = False
[docs]
class BlockCompressor:
"""
Intelligent dimensionality reduction for temporal block representation.
This class implements various strategies for compressing time series blocks
into low-dimensional representations suitable for Markov chain modeling.
The challenge is to preserve the essential temporal characteristics while
achieving sufficient dimension reduction for tractable state space modeling.
We support multiple compression strategies, each with different tradeoffs:
- Middle: Uses central observations as representatives (simple, preserves local structure)
- Mean: Averages across time (smooth, may lose dynamics)
- Median: Robust averaging (handles outliers)
- Mode: Captures most frequent patterns (discrete data)
- First/Last: Boundary-based representation
Advanced options include PCA compression for multivariate series, which
learns optimal linear projections that maximize variance preservation.
"""
def __init__(
self,
method: BlockCompressorTypes = "middle",
apply_pca_flag: bool = False,
pca: Optional[PCA] = None,
random_seed: Optional[int] = None, # Changed from Integral to int
):
"""
Initialize the BlockCompressor with the selected method, PCA flag, PCA instance, and random seed.
Parameters
----------
method : BlockCompressorTypes, optional
The method to use for summarizing the blocks. Default is "middle".
apply_pca_flag : bool, optional
Whether to apply Principal Component Analysis (PCA) for dimensionality reduction. Default is False.
pca : sklearn.decomposition.PCA, optional
PCA instance, with `n_components` set to 1. If not provided, a default PCA instance is used. Default is None.
random_seed : Integral, optional
The seed for the random number generator. Default is None.
"""
self.method = method
self.apply_pca_flag = apply_pca_flag
self.pca = pca
self.random_seed = random_seed
if self.method in ["mean", "median"] and self.apply_pca_flag:
warnings.warn(
"PCA compression is not recommended for 'mean' or 'median' methods.",
stacklevel=2,
)
# once scikit-base object:
# set python_dependencies tag depending on method
# if method is "kmedoids"
# "scikit-learn-extra" (due to MKedoids)
# import name is sklearn_extra
# if method is "kmedians"
# "pyclustering" (due to KMedians)
@property
def method(self) -> str:
"""Getter for method."""
return self._method
@method.setter
def method(self, value: str) -> None:
"""
Setter for method. Performs validation on assignment.
Parameters
----------
value : str
The method to use for summarizing the blocks.
"""
self._method = self._validate_method(value)
def _validate_method(self, method: str) -> str:
"""
Validate and correct the method.
Parameters
----------
method : str
The method to use for summarizing the blocks.
Returns
-------
str
The validated method.
Raises
------
ValueError
If the method is not one of the BlockCompressorTypes.
"""
validate_literal_type(method, BlockCompressorTypes)
return method.lower()
@property
def apply_pca_flag(self) -> bool:
"""Getter for apply_pca_flag."""
return self._apply_pca_flag
@apply_pca_flag.setter
def apply_pca_flag(self, value: bool) -> None:
"""
Setter for apply_pca_flag. Performs validation on assignment.
Parameters
----------
value : bool
Whether to apply PCA or not.
"""
if not isinstance(value, bool):
raise TypeError(
f"PCA application flag must be a boolean value (True/False). "
f"Received type: {type(value).__name__}. This flag determines whether "
f"PCA dimensionality reduction is applied to compressed blocks."
)
self._apply_pca_flag = value
@property
def pca(self) -> PCA:
"""Getter for pca."""
return self._pca
@pca.setter
def pca(self, value: Optional[PCA]) -> None:
"""
Setter for pca. Performs validation on assignment.
Parameters
----------
value : Optional[PCA]
The PCA instance to use.
"""
if value is not None:
if not isinstance(value, PCA):
raise TypeError(
f"PCA parameter must be a scikit-learn PCA instance. "
f"Received type: {type(value).__name__}. Please provide a "
f"sklearn.decomposition.PCA object configured for compression."
)
elif value.n_components != 1: # type: ignore
raise ValueError(
f"PCA compression requires exactly 1 component for state representation. "
f"The provided PCA object has n_components={value.n_components}. "
f"Please configure PCA with n_components=1 for Markov state compression."
)
self._pca = value
else:
self._pca = PCA(n_components=1)
@property
def random_seed(self):
return self._random_seed
@random_seed.setter
def random_seed(self, value: Optional[int]) -> None: # Changed from Integral to int
"""
Setter for rng. Performs validation on assignment.
Parameters
----------
value : Generator
The random number generator to use.
"""
if value is not None:
if not isinstance(value, Integral):
raise TypeError(
f"Random seed must be an integer value. Received type: {type(value).__name__}. "
f"Provide an integer seed for reproducible random number generation."
)
else:
if value < 0 or int(value) >= 2**32:
raise ValueError(
f"Random seed must be between 0 and 2^32-1 (inclusive). "
f"Received: {value}. This constraint ensures compatibility "
f"with numpy's random number generator implementation."
)
else:
self._random_seed = value
else:
self._random_seed = None
def _pca_compression(self, block: np.ndarray, summary: np.ndarray) -> np.ndarray:
"""Compress the block using PCA.
The method fits a PCA instance to the block and transforms it to a lower dimension.
If the PCA instance has already been fitted, only the transformation is performed.
Parameters
----------
block : np.ndarray
The block to compress.
Returns
-------
np.ndarray
The compressed block.
"""
# Check if the PCA instance has already been fitted
try:
check_is_fitted(self.pca)
except NotFittedError:
self.pca.fit(block)
transformed_summary = self.pca.transform(summary)
return transformed_summary
def _summarize_block(self, block: np.ndarray) -> np.ndarray:
"""
Helper method to summarize a block using a specified method.
The available methods are 'first', 'middle', 'last', 'mean', 'median',
'mode', 'kmeans', 'kmedians', 'kmedoids'.
Parameters
----------
block : np.ndarray
A 2D numpy array representing a block of data.
Returns
-------
np.ndarray
A 1D numpy array representing the summarized block.
Raises
------
ValueError
If the specified method is not recognized.
"""
# Mapping of methods to corresponding functions
summarization_methods = {
"first": lambda x: x[0],
"middle": lambda x: x[len(x) // 2],
"last": lambda x: x[-1],
"mean": lambda x: x.mean(axis=0),
"median": lambda x: np.median(x, axis=0),
"mode": lambda x: scipy.stats.mode(x, axis=0, keepdims=True)[0][0],
"kmeans": self._kmeans_compression,
"kmedians": self._kmedians_compression,
"kmedoids": self._kmedoids_compression,
}
method = summarization_methods.get(self.method)
if method is None:
raise ValueError(
f"Method '{self.method}' is not recognized. Please select one of {list(summarization_methods.keys())}."
)
summary = method(block)
summary = np.array(summary).reshape(1, -1)
summary = self._pca_compression(block, summary) if self.apply_pca_flag else summary
return summary
# Additional private methods to handle kmeans, kmedians, and kmedoids
def _kmeans_compression(self, block: np.ndarray) -> np.ndarray:
"""
Helper method to compress a block using k-means clustering.
Parameters
----------
block : np.ndarray
A 2D numpy array representing a block of data.
Returns
-------
np.ndarray
A 1D numpy array representing the compressed block.
Notes
-----
This method uses the scikit-learn implementation of k-means clustering.
"""
return (
KMeans(n_clusters=1, random_state=self.random_seed, n_init="auto") # type: ignore
.fit(block)
.cluster_centers_[0]
)
def _kmedians_compression(self, block: np.ndarray) -> np.ndarray:
"""
Helper method to compress a block using k-medians clustering.
Parameters
----------
block : np.ndarray
A 2D numpy array representing a block of data.
Returns
-------
np.ndarray
A 1D numpy array representing the compressed block.
Notes
-----
This method uses the scipy implementation of k-medians clustering.
"""
from pyclustering.cluster.kmedians import kmedians # type: ignore
rng = np.random.default_rng(self.random_seed) # type: ignore
initial_centers = rng.choice(block.flatten(), size=(1, block.shape[1]))
kmedians_instance = kmedians(block, initial_centers)
kmedians_instance.process()
return kmedians_instance.get_medians()[0] # type: ignore
def _kmedoids_compression(self, block: np.ndarray) -> np.ndarray:
"""
Helper method to compress a block using k-medoids clustering.
Parameters
----------
block : np.ndarray
A 2D numpy array representing a block of data.
Returns
-------
np.ndarray
A 1D numpy array representing the compressed block.
Notes
-----
This method uses the scikit-learn-extra implementation of k-medoids clustering.
"""
from sklearn_extra.cluster import KMedoids
return (
KMedoids(n_clusters=1, random_state=self.random_seed) # type: ignore
.fit(block)
.cluster_centers_[0]
)
[docs]
def summarize_blocks(self, blocks) -> np.ndarray:
"""
Summarize each block in the input list of blocks using the specified method.
Parameters
----------
blocks : List[np.ndarray]
List of numpy arrays representing the blocks to be summarized.
Returns
-------
np.ndarray
Numpy array containing the summarized blocks.
Example
-------
>>> compressor = BlockCompressor(method='middle')
>>> blocks = [np.array([1, 2, 3]), np.array([4, 5, 6])]
>>> summarized_blocks = compressor.summarize_blocks(blocks)
>>> summarized_blocks
array([2, 5])
"""
"""
Summarize each block in the input list of blocks using the specified method.
Parameters
----------
blocks : List[np.ndarray]
A list of 2D NumPy arrays, each representing a block of data.
Returns
-------
np.ndarray
A 2D NumPy array of shape (len(blocks), num_features==blocks[0].shape[1]) with each row containing the summarized element for the corresponding input block.
"""
# Validate input blocks
validate_blocks(blocks)
# Preallocate an empty array of the correct size
num_blocks = len(blocks)
num_features = blocks[0].shape[1]
summaries = np.empty((num_blocks, num_features))
# Fill the array in a loop
for i, block in enumerate(blocks):
summaries[i] = self._summarize_block(block)
return summaries
[docs]
@classmethod
def get_test_params(cls, parameter_set="default"):
"""Return testing parameter settings for the estimator.
Parameters
----------
parameter_set : str, default="default"
Name of the set of test parameters to return, for use in tests. If no
special parameters are defined for a value, will return `"default"` set.
Returns
-------
params : dict or list of dict, default = {}
Parameters to create testing instances of the class
Each dict are parameters to construct an "interesting" test instance, i.e.,
`MyClass(**params)` or `MyClass(**params[i])` creates a valid test instance.
`create_test_instance` uses the first (or only) dictionary in `params`
"""
from tsbootstrap.utils.skbase_compat import (
safe_check_soft_dependencies as _check_soft_dependencies,
)
methods = [
"first",
"middle",
"last",
"mean",
"mode",
"median",
"kmeans",
]
if _check_soft_dependencies("scikit-learn-extra", severity="none"):
methods.append("kmedoids")
if _check_soft_dependencies("pyclustering", severity="none"):
methods.append("kmedians")
return [{"method": method} for method in methods]
[docs]
class MarkovTransitionMatrixCalculator:
"""
MarkovTransitionMatrixCalculator class provides the functionality to calculate the transition matrix for a set of data blocks based on their DTW distances between consecutive blocks.
The transition matrix is normalized to obtain transition probabilities.
The underlying assumption is that the data blocks are generated from a Markov chain.
In other words, the next block is generated based on the current block and not on any previous blocks.
Methods
-------
__init__() -> None
Initialize the MarkovTransitionMatrixCalculator instance.
_calculate_dtw_distances(blocks, eps: float = 1e-5) -> np.ndarray
Calculate the DTW distances between all pairs of blocks.
calculate_transition_probabilities(blocks) -> np.ndarray
Calculate the transition probability matrix based on DTW distances between all pairs of blocks.
Examples
--------
>>> calculator = MarkovTransitionMatrixCalculator()
>>> blocks = [np.random.rand(10, 5) for _ in range(50)]
>>> transition_matrix = calculator.calculate_transition_probabilities(blocks)
"""
_tags = {"python_dependencies": "hmmlearn>=0.3.0"}
[docs]
@staticmethod
def _calculate_dtw_distances(blocks, eps: float = 1e-5) -> np.ndarray:
"""
Calculate the DTW distances between all pairs of blocks. A small constant epsilon is added to every distance to ensure that there is always a non-zero probability of remaining in the same state.
Parameters
----------
blocks : List[np.ndarray]
A list of numpy arrays, each of shape (num_timestamps, num_features), representing the time series data blocks.
eps : float
A small constant to be added to the DTW distances to ensure non-zero probabilities.
Returns
-------
np.ndarray
A matrix of DTW distances of shape (len(blocks), len(blocks)).
"""
validate_blocks(blocks)
num_blocks = len(blocks)
# Check if dtaidistance is available
if not dtaidistance_installed:
raise ImportError(
"The dtaidistance package is required for Dynamic Time Warping calculations. "
"This package enables computation of similarity between time series blocks "
"with different alignments. Install it using: pip install dtaidistance"
)
# Compute pairwise DTW distances between all pairs of blocks
distances = np.zeros((num_blocks, num_blocks))
for i in range(num_blocks):
for j in range(i, num_blocks):
dist = dtw_ndim.distance(blocks[i], blocks[j]) + eps # type: ignore
distances[i, j] = dist
distances[j, i] = dist
# Add a small constant to the diagonal to allow remaining in the same state
np.fill_diagonal(distances, eps)
return distances
[docs]
@staticmethod
def calculate_transition_probabilities(
blocks,
) -> np.ndarray:
"""
Calculate the transition probability matrix based on DTW distances between all pairs of blocks.
Parameters
----------
blocks : List[np.ndarray]
A list of numpy arrays, each of shape (num_timestamps, num_features), representing the time series data blocks.
Returns
-------
np.ndarray
A transition probability matrix of shape (len(blocks), len(blocks)).
"""
distances = MarkovTransitionMatrixCalculator._calculate_dtw_distances(blocks)
num_blocks = len(blocks)
# Normalize the distances to obtain transition probabilities
transition_probabilities = np.zeros((num_blocks, num_blocks))
for i in range(num_blocks):
total_distance = np.sum(distances[i, :])
if total_distance > 0:
transition_probabilities[i, :] = distances[i, :] / total_distance
else:
# Case when all blocks are identical, assign uniform probabilities
transition_probabilities[i, :] = 1 / num_blocks
return transition_probabilities
[docs]
class MarkovSampler:
"""
Advanced Markov chain sampler for temporal state transition modeling.
This class implements sophisticated bootstrap methods that combine block-based
resampling with Hidden Markov Model (HMM) techniques. The key innovation is
treating time series blocks as states in a Markov chain, enabling generation
of new sequences that maintain the original transition dynamics.
The sampler supports two primary modes of operation:
1. Direct block transitions: Uses DTW distances to model transitions between
observed blocks, preserving exact temporal patterns
2. HMM-based abstraction: Learns latent states and their dynamics, providing
more flexible generation at the cost of some fidelity
Our implementation leverages state-of-the-art algorithms for both compression
(reducing blocks to manageable representations) and transition modeling
(learning the probabilistic structure). This enables bootstrap methods that
respect complex temporal dependencies while maintaining computational efficiency.
Attributes
----------
transition_matrix_calculator : MarkovTransitionMatrixCalculator
Computes transition probabilities between states using DTW distances.
block_compressor : BlockCompressor
Reduces high-dimensional blocks to representative states.
Examples
--------
>>> # Direct block transition mode
>>> sampler = MarkovSampler(blocks_as_hidden_states_flag=True)
>>> blocks = [np.random.rand(10, 5) for _ in range(50)]
>>> results = sampler.sample(blocks)
>>>
>>> # HMM abstraction mode
>>> sampler = MarkovSampler(n_iter_hmm=200, n_fits_hmm=20)
>>> results = sampler.sample(blocks, n_states=5)
"""
def __init__(
self,
method: BlockCompressorTypes = "middle",
apply_pca_flag: bool = False,
pca: Optional[PCA] = None,
n_iter_hmm: int = 100, # Changed from Integral to int
n_fits_hmm: int = 10, # Changed from Integral to int
blocks_as_hidden_states_flag: bool = False,
random_seed: Optional[int] = None, # Changed from Integral to int
):
"""
Initialize the MarkovSampler instance.
Parameters
----------
method : str, optional
The method to use for summarizing the blocks. Default is "middle".
apply_pca_flag : bool, optional
Whether to apply Principal Component Analysis (PCA) for dimensionality reduction. Default is False.
pca : sklearn.decomposition.PCA, optional
An instance of sklearn's PCA class, with `n_components` set to 1. If not provided, a default PCA instance will be used.
n_iter_hmm : Integral, optional
The number of iterations to run the HMM for. Default is 100.
n_fits_hmm : Integral, optional
The number of times to fit the HMM. Default is 10.
blocks_as_hidden_states_flag : bool, optional
If True, each block will be used as a hidden state for the HMM (i.e., n_states = len(blocks)).
If False, the blocks are interpreted as separate sequences of data and the HMM is initialized with uniform transition probabilities. Default is False.
random_seed : Integral, optional
The seed for the random number generator. Default is None (no fixed seed).
Notes
-----
The MarkovSampler class uses the dtaidistance package for calculating DTW distances between blocks. This package is not available for Python 3.10 and 3.11. If you are using Python 3.10 or 3.11, the MarkovSampler class will automatically set the blocks_as_hidden_states_flag to False.
"""
self.method = method
self.apply_pca_flag = apply_pca_flag
self.pca = pca
self.n_iter_hmm = n_iter_hmm
self.n_fits_hmm = n_fits_hmm
self.blocks_as_hidden_states_flag = blocks_as_hidden_states_flag
self.random_seed = random_seed
if self.blocks_as_hidden_states_flag and not dtaidistance_installed:
warnings.warn(
"Direct block transition mode requires the 'dtaidistance' package for "
"Dynamic Time Warping calculations. This package may have compatibility "
"issues with some Python versions. Automatically switching to HMM-based "
"mode (blocks_as_hidden_states_flag=False) for this session.",
stacklevel=2,
)
self.blocks_as_hidden_states_flag = False
self.transition_matrix_calculator = MarkovTransitionMatrixCalculator()
self.block_compressor = BlockCompressor(
apply_pca_flag=self.apply_pca_flag,
pca=self.pca,
random_seed=self.random_seed,
method=self.method,
)
self.model = None
self.X = None
@property
def n_iter_hmm(self) -> int: # Changed from Integral to int
"""Getter for n_iter_hmm."""
return self._n_iter_hmm
@n_iter_hmm.setter
def n_iter_hmm(self, value: int) -> None: # Changed from Integral to int
"""
Setter for n_iter_hmm. Performs validation on assignment.
Parameters
----------
value : int
The number of iterations to run the HMM for.
"""
validate_integers(value, min_value=1) # type: ignore
self._n_iter_hmm = value
@property
def n_fits_hmm(self) -> int: # Changed from Integral to int
"""Getter for n_fits_hmm."""
return self._n_fits_hmm
@n_fits_hmm.setter
def n_fits_hmm(self, value: int) -> None: # Changed from Integral to int
"""
Setter for n_fits_hmm. Performs validation on assignment.
Parameters
----------
value : int
The number of times to fit the HMM.
"""
validate_integers(value, min_value=1) # type: ignore
self._n_fits_hmm = value
@property
def blocks_as_hidden_states_flag(self) -> bool:
"""Getter for blocks_as_hidden_states_flag."""
return self._blocks_as_hidden_states_flag
@blocks_as_hidden_states_flag.setter
def blocks_as_hidden_states_flag(self, value: bool) -> None:
"""
Setter for blocks_as_hidden_states_flag. Performs validation on assignment.
Parameters
----------
value : bool
Whether to use the blocks as hidden states for the HMM.
"""
if not isinstance(value, bool):
raise TypeError(
f"Hidden states flag must be a boolean value (True/False). "
f"Received type: {type(value).__name__}. This flag determines whether "
f"to use observed blocks directly as Markov states (True) or learn "
f"latent states via HMM (False)."
)
self._blocks_as_hidden_states_flag = value
@property
def random_seed(self):
"""Getter for random_seed."""
return self._random_seed
@random_seed.setter
def random_seed(self, value: Optional[int]) -> None: # Changed from Integral to int
"""
Setter for rng. Performs validation on assignment.
Parameters
----------
value : Generator
The random number generator to use.
"""
if value is not None:
if not isinstance(value, Integral):
raise TypeError(
f"Random seed must be an integer value. Received type: {type(value).__name__}. "
f"Provide an integer seed for reproducible random number generation."
)
else:
if value < 0 or int(value) >= 2**32:
raise ValueError(
f"Random seed must be between 0 and 2^32-1 (inclusive). "
f"Received: {value}. This constraint ensures compatibility "
f"with numpy's random number generator implementation."
)
else:
self._random_seed = value
else:
self._random_seed = None
[docs]
def fit_hidden_markov_model(
self,
X: np.ndarray,
n_states: int = 5, # Changed from Integral to int
transmat_init: Optional[np.ndarray] = None,
means_init: Optional[np.ndarray] = None,
lengths: Optional[np.ndarray] = None,
):
"""
Fit a Gaussian Hidden Markov Model on the input data.
Parameters
----------
X : np.ndarray
A 2D NumPy array, where each row represents a summarized block of data.
n_states : Integral, optional
The number of states in the hidden Markov model. By default 5.
Returns
-------
hmm.GaussianHMM
The trained Gaussian Hidden Markov Model.
"""
self._validate_fit_hidden_markov_model_inputs(X, n_states, transmat_init, means_init)
best_score = -np.inf
best_hmm_model = None
for idx in range(self.n_fits_hmm):
hmm_model = self._initialize_hmm_model(
n_states, transmat_init, means_init, idx # type: ignore
)
try:
hmm_model.fit(X, lengths=lengths)
score = hmm_model.score(X, lengths=lengths)
if score > best_score:
best_hmm_model = hmm_model
best_score = score
except ValueError as e:
logger.debug(f"HMM fitting or scoring failed for attempt {idx}: {e}")
continue # Continue to the next fit attempt
if best_hmm_model is None:
raise RuntimeError(
f"Failed to fit Hidden Markov Model after {self.n_fits_hmm} attempts. "
f"This typically indicates: (1) insufficient data for {n_states} states, "
f"(2) poor initialization values, or (3) numerical instability. Consider "
f"reducing n_states, increasing n_fits_hmm, or checking data quality."
)
return best_hmm_model
def _validate_fit_hidden_markov_model_inputs(
self,
X: np.ndarray,
n_states: int, # Changed from Integral to int
transmat_init: Optional[np.ndarray],
means_init: Optional[np.ndarray],
) -> None:
"""
Validate the inputs to fit_hidden_markov_model.
Parameters
----------
X : np.ndarray
A 2D NumPy array, where each row represents a summarized block of data.
n_states : Integral
The number of states in the hidden Markov model.
transmat_init : Optional[np.ndarray]
The initial transition matrix for the HMM.
means_init : Optional[np.ndarray]
The initial means for the HMM.
Raises
------
TypeError
If X is not a NumPy array.
ValueError
If X is not a two-dimensional array.
If n_states is not an integer >= 1.
If the shape of transmat_init is invalid.
If the shape of means_init is invalid.
Returns
-------
None
Notes
-----
This method is called by fit_hidden_markov_model. It is not intended to be called directly.
"""
if X.ndim != 2:
raise ValueError(
f"HMM input data must be a 2D array with shape (n_samples, n_features). "
f"Received array with {X.ndim} dimensions. Each row should represent "
f"a compressed block, and each column a feature dimension."
)
if not isinstance(n_states, Integral) or n_states < 1:
raise ValueError(
f"Number of HMM states must be a positive integer. Received: {n_states}. "
f"Choose n_states based on the complexity of your time series dynamics - "
f"typically 3-10 states work well for most applications."
)
if transmat_init is not None:
transmat_init = np.array(transmat_init)
if not isinstance(transmat_init, np.ndarray):
raise TypeError(
f"Initial transition matrix must be a NumPy array. "
f"Received type: {type(transmat_init).__name__}."
)
if transmat_init.shape != (n_states, n_states):
raise ValueError(
f"Initial transition matrix shape mismatch. Expected: ({n_states}, {n_states}) "
f"for {n_states} states, but received: {transmat_init.shape}. The matrix must "
f"be square with dimensions matching the number of HMM states."
)
if means_init is not None:
means_init = np.array(means_init)
if not isinstance(means_init, np.ndarray):
raise TypeError(
f"Initial means must be a NumPy array. "
f"Received type: {type(means_init).__name__}."
)
if means_init.shape != (n_states, X.shape[1]):
raise ValueError(
f"Initial means shape mismatch. Expected: ({n_states}, {X.shape[1]}) "
f"for {n_states} states and {X.shape[1]} features, but received: "
f"{means_init.shape}. Each row should represent the mean vector for one state."
)
def _initialize_hmm_model(
self,
n_states: int, # Changed from Integral to int
transmat_init: Optional[np.ndarray],
means_init: Optional[np.ndarray],
idx: int, # Changed from Integral to int
):
"""
Initialize a Gaussian Hidden Markov Model.
Parameters
----------
n_states : Integral
The number of states in the hidden Markov model.
transmat_init : Optional[np.ndarray]
The initial transition matrix for the HMM.
means_init : Optional[np.ndarray]
The initial means for the HMM.
idx : Integral
The index of the current fit.
Returns
-------
hmm.GaussianHMM
The initialized Gaussian Hidden Markov Model.
Notes
-----
This method is called by fit_hidden_markov_model. It is not intended to be called directly.
"""
try:
from hmmlearn import hmm
except ImportError as e:
raise ImportError(
"The 'hmmlearn' package is required for Hidden Markov Model functionality. "
"This package provides the Gaussian HMM implementation used for learning "
"latent states in time series. Install it using: pip install hmmlearn"
) from e
hmm_model = hmm.GaussianHMM(
n_components=n_states, # type: ignore
covariance_type="full",
n_iter=self.n_iter_hmm, # type: ignore
init_params="stmc",
params="stmc",
random_state=(self.random_seed + idx if self.random_seed is not None else idx),
)
if transmat_init is not None:
hmm_model.transmat_ = transmat_init
if means_init is not None:
hmm_model.means_ = means_init
return hmm_model
[docs]
def fit(
self,
blocks,
n_states: int = 5, # Changed from Integral to int
) -> "MarkovSampler":
"""
Sample from a Markov chain with given transition probabilities.
Parameters
----------
blocks : List[np.ndarray] or np.ndarray
A list of 2D NumPy arrays, each representing a block of data, or a 2D NumPy array, where each row represents a row of raw data.
n_states : Integral, optional
The number of states in the hidden Markov model. Default is 5.
Returns
-------
MarkovSampler
Current instance of the MarkovSampler class, with the model trained.
Examples
--------
>>> blocks = [np.random.rand(10, 5) for _ in range(50)]
>>> sampler.fit(blocks, n_states=5)
"""
X, lengths, n_states = self._prepare_fit_inputs(blocks, n_states)
transmat_init = (
self.transition_matrix_calculator.calculate_transition_probabilities(blocks)
if self.blocks_as_hidden_states_flag
else None
)
means_init = (
self.block_compressor.summarize_blocks(blocks)
if self.blocks_as_hidden_states_flag
else None
)
hmm_model = self.fit_hidden_markov_model(X, n_states, transmat_init, means_init, lengths)
self.model = hmm_model
self.X = X
return self
# utility functions for fit
def _prepare_fit_inputs(self, blocks, n_states: int): # Changed from no type to int
"""
Validate the inputs to fit.
Parameters
----------
blocks : List[np.ndarray] or np.ndarray
A list of 2D NumPy arrays, each representing a block of data, or a 2D NumPy array, where each row represents a row of raw data.
n_states : Integral
The number of states in the hidden Markov model.
Raises
------
TypeError
If blocks is not a list of NumPy arrays or a NumPy array.
ValueError
If blocks is a list of NumPy arrays and any of the arrays are not two-dimensional.
If blocks is a list of NumPy arrays and any of the arrays are empty.
If blocks is a list of NumPy arrays and any of the arrays have zero columns.
If blocks is a list of NumPy arrays and any of the arrays have zero rows.
If blocks is a list of NumPy arrays and any of the arrays have different numbers of columns.
If blocks is a list of NumPy arrays and any of the arrays have different numbers of rows.
If blocks is a NumPy array and it is not two-dimensional.
If blocks is a NumPy array and it is empty.
If blocks is a NumPy array and it has zero columns.
If blocks is a NumPy array and it has zero rows.
If blocks is a NumPy array and it has different numbers of columns.
If blocks is a NumPy array and it has different numbers of rows.
If n_states is not an integer >= 1.
If n_states is gooder than the number of rows in blocks.
Returns
-------
Tuple[np.ndarray, Optional[np.ndarray], Integral]
A tuple containing the input data, the lengths of the blocks (if applicable), and the number of states.
"""
if isinstance(blocks, list):
validate_blocks(blocks)
X = np.concatenate(blocks, axis=0)
lengths = np.array([len(block) for block in blocks])
if self.blocks_as_hidden_states_flag:
n_states = len(blocks)
if min(lengths) < 10:
raise ValueError(
f"Input 'X' must have at least {n_states * 10} points to fit a {n_states}-state HMM."
)
logger.debug(
f"Using {len(blocks)} blocks as 'n_states', since 'blocks_as_hidden_states_flag' is True. Ignoring user-provided 'n_states' parameter."
)
lengths = None
else:
self._validate_single_block_input(blocks)
X = blocks
lengths = None
if not isinstance(n_states, Integral) or n_states < 1:
raise ValueError("Input 'n_states' must be an integer >= 1.")
if n_states > X.shape[0]: # type: ignore
raise ValueError(
f"Input 'X' must have at least {n_states} points to fit a {n_states}-state HMM."
)
return X, lengths, n_states
def _validate_single_block_input(self, blocks: np.ndarray):
"""
Validate the input to fit when a single block is provided.
Parameters
----------
blocks : np.ndarray
A 2D NumPy array, where each row represents a row of raw data.
Raises
------
TypeError
If blocks is not a NumPy array.
ValueError
If blocks is not a two-dimensional array.
If blocks is empty.
If blocks has zero columns.
If blocks has zero rows.
Returns
-------
None
"""
if not isinstance(blocks, np.ndarray):
raise TypeError("Input 'blocks' must be a list of NumPy arrays or a NumPy array.")
if blocks.ndim != 2 or blocks.shape[0] == 0 or blocks.shape[1] == 0:
raise ValueError("Input 'blocks' must be a non-empty two-dimensional array.")
[docs]
def sample(
self,
n_to_sample: int, # New argument for number of samples
random_seed: Optional[int] = None,
):
"""
Sample from the fitted Markov model.
Parameters
----------
n_to_sample : int
The number of samples (observations) to generate from the model.
random_seed : Optional[int]
The seed for the random number generator. If not provided, the random_seed
attribute of the MarkovSampler instance will be used if set, otherwise
hmmlearn will use its own default seeding.
Returns
-------
Tuple[np.ndarray, np.ndarray]
A tuple containing the generated samples and the sequence of hidden states.
Typically (X_generated, Z_states).
"""
# Check if the model is already fitted
check_is_fitted(self, ["model"]) # type: ignore
effective_random_seed = random_seed if random_seed is not None else self.random_seed
# self.model is an hmmlearn.hmm.GaussianHMM instance
# Its sample method is sample(self, n_samples=1, random_state=None, currstate=None)
return self.model.sample(n_samples=n_to_sample, random_state=effective_random_seed) # type: ignore
def __repr__(self) -> str:
return (
f"MarkovSampler(method='{self.block_compressor.method}', "
f"apply_pca_flag={self.block_compressor.apply_pca_flag}, "
f"pca={self.block_compressor.pca}, "
f"n_iter_hmm={self.n_iter_hmm}, "
f"n_fits_hmm={self.n_fits_hmm}, "
f"blocks_as_hidden_states_flag={self.blocks_as_hidden_states_flag}, "
f"random_seed={self.random_seed}, "
f"model_fitted={'Fitted' if self.model is not None else 'NotFitted'})"
)
def __str__(self) -> str:
return f"MarkovSampler using method '{self.block_compressor.method}' with PCA flag {self.block_compressor.apply_pca_flag} and random seed {self.random_seed}"
def __eq__(self, other: object) -> bool:
if not isinstance(other, MarkovSampler):
return False
# Compare configuration parameters
if not (
self.block_compressor == other.block_compressor # Relies on BlockCompressor.__eq__
and self.n_iter_hmm == other.n_iter_hmm
and self.n_fits_hmm == other.n_fits_hmm
and self.blocks_as_hidden_states_flag == other.blocks_as_hidden_states_flag
and self.random_seed == other.random_seed
):
return False
# Compare fitted model status (fitted vs. not-fitted)
if (self.model is None) != (other.model is None):
return False
# Return the negated condition directly
return not (
self.model is not None
and other.model is not None
and (
not isinstance(self.model, type(other.model))
or self.model.n_components != other.model.n_components # type: ignore
)
)
# For a more robust check, one might need to compare:
# self.model.startprob_ == other.model.startprob_
# self.model.transmat_ == other.model.transmat_
# self.model.means_ == other.model.means_
# self.model.covars_ == other.model.covars_
# This requires np.allclose for float arrays.
# For simplicity now, we'll skip deep model parameter comparison.
# Add more detailed checks if necessary for test correctness.