Skip to content

Detectors

Factory Functions

get_detector(name, **kwargs)

Get a detector instance by name.

Parameters:

Name Type Description Default
name str

Detector name (e.g., "sliding_window", "pelt", "bottom_up").

required
**kwargs

Parameters to pass to the detector constructor.

{}

Returns:

Type Description
BaseDetector

Configured detector instance.

Raises:

Type Description
ValueError

If detector name is not recognized.

ImportError

If detector requires unavailable dependencies.

Example

detector = get_detector("pelt", penalty=5)
result = detector.fit_detect(x, y)

Source code in src/trend_classifier/detectors/__init__.py
def get_detector(name: str, **kwargs) -> BaseDetector:
    """Look up a detector class by name and instantiate it.

    Args:
        name: Detector name (e.g., "sliding_window", "pelt", "bottom_up").
        **kwargs: Parameters to pass to the detector constructor.

    Returns:
        Configured detector instance.

    Raises:
        ValueError: If detector name is not recognized.
        ImportError: If detector requires unavailable dependencies.

    Example:
        >>> detector = get_detector("pelt", penalty=5)
        >>> result = detector.fit_detect(x, y)
    """
    if name in DETECTOR_REGISTRY:
        return DETECTOR_REGISTRY[name](**kwargs)

    available = ", ".join(DETECTOR_REGISTRY.keys())
    raise ValueError(f"Unknown detector '{name}'. Available detectors: {available}")

list_detectors()

List available detector names.

Returns:

Type Description
list[str]

List of registered detector names.

Source code in src/trend_classifier/detectors/__init__.py
def list_detectors() -> list[str]:
    """Return the names of all registered detectors.

    Returns:
        List of registered detector names.
    """
    return [detector_name for detector_name in DETECTOR_REGISTRY]

Base Classes

BaseDetector

Bases: ABC

Abstract base class for trend detection algorithms.

All detection algorithms should inherit from this class and implement the required methods. This enables the Strategy pattern for swapping algorithms at runtime.

Example

class MyDetector(BaseDetector):
    name = "my_detector"

    def fit(self, x, y):
        self._x, self._y = x, y
        return self

    def detect(self):
        # Detection logic here
        return DetectionResult(segments=SegmentList())

Source code in src/trend_classifier/detectors/base.py
class BaseDetector(ABC):
    """Abstract interface shared by all trend detection algorithms.

    Concrete detectors subclass this and provide ``name``, ``fit`` and
    ``detect``.  Because every algorithm exposes the same interface, they
    can be swapped at runtime (Strategy pattern).

    Example:
        >>> class MyDetector(BaseDetector):
        ...     name = "my_detector"
        ...     def fit(self, x, y):
        ...         self._x, self._y = x, y
        ...         return self
        ...     def detect(self):
        ...         # Detection logic here
        ...         return DetectionResult(segments=SegmentList())
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Return the algorithm name for identification."""

    @abstractmethod
    def fit(self, x: np.ndarray, y: np.ndarray) -> BaseDetector:
        """Fit the detector to the data.

        Args:
            x: Array of x values (indices or timestamps).
            y: Array of y values (signal values).

        Returns:
            Self for method chaining.
        """

    @abstractmethod
    def detect(self) -> DetectionResult:
        """Detect trend changes and return segments.

        Must be called after fit().

        Returns:
            DetectionResult containing segments and metadata.

        Raises:
            RuntimeError: If called before fit().
        """

    def fit_detect(self, x: np.ndarray, y: np.ndarray) -> DetectionResult:
        """Fit to the data and immediately run detection.

        Args:
            x: Array of x values.
            y: Array of y values.

        Returns:
            DetectionResult containing segments and metadata.
        """
        fitted = self.fit(x, y)
        return fitted.detect()

    def _validate_fitted(self) -> None:
        """Raise RuntimeError unless fit() has stored data on this instance.

        Raises:
            RuntimeError: If detector hasn't been fitted.
        """
        is_fitted = hasattr(self, "_x") and self._x is not None
        if not is_fitted:
            raise RuntimeError(
                f"{self.name} detector must be fitted before calling detect(). "
                "Call fit(x, y) first."
            )

    def _create_segment(
        self,
        start: int,
        stop: int,
        x: np.ndarray,
        y: np.ndarray,
        reason: str = "",
    ) -> Segment:
        """Build a Segment over x[start:stop] with a linear fit and statistics.

        Args:
            start: Start index of segment.
            stop: Stop index of segment (exclusive).
            x: Full x array.
            y: Full y array.
            reason: Reason for segment boundary.

        Returns:
            Segment with computed slope, offset, and statistics.
        """
        window_x = x[start:stop]
        window_y = y[start:stop]

        # Degenerate window: no line can be fitted through fewer than 2 points.
        if len(window_x) < 2:
            fallback_offset = float(window_y[0]) if len(window_y) > 0 else 0.0
            return Segment(
                start=start,
                stop=stop,
                slope=0.0,
                offset=fallback_offset,
                reason_for_new_segment=reason,
            )

        # Degree-1 fit: coeffs[0] is the slope, coeffs[1] the offset.
        coeffs = np.polyfit(window_x, window_y, deg=1)
        trend = np.poly1d(coeffs)(window_x)
        residuals = window_y - trend

        std = float(np.std(residuals, ddof=0)) if len(residuals) > 0 else 0.0

        # Span: residual peak-to-peak range scaled by 1000 and normalized by
        # the absolute window mean; defined as 0 when the mean is exactly zero.
        mean_value = np.mean(window_y)
        span = 0.0
        if mean_value != 0:
            span = float(
                1000 * (np.max(residuals) - np.min(residuals)) / abs(mean_value)
            )

        return Segment(
            start=start,
            stop=stop,
            slope=float(coeffs[0]),
            offset=float(coeffs[1]),
            std=std,
            span=span,
            reason_for_new_segment=reason,
        )

name abstractmethod property

Return the algorithm name for identification.

fit(x, y) abstractmethod

Fit the detector to the data.

Parameters:

Name Type Description Default
x ndarray

Array of x values (indices or timestamps).

required
y ndarray

Array of y values (signal values).

required

Returns:

Type Description
BaseDetector

Self for method chaining.

Source code in src/trend_classifier/detectors/base.py
@abstractmethod
def fit(self, x: np.ndarray, y: np.ndarray) -> BaseDetector:
    """Fit the detector to the data.

    Implementations are expected to store the data on the instance
    (e.g. as ``self._x`` / ``self._y``); the base class's
    ``_validate_fitted()`` checks for ``self._x``.

    Args:
        x: Array of x values (indices or timestamps).
        y: Array of y values (signal values).

    Returns:
        Self for method chaining.
    """

detect() abstractmethod

Detect trend changes and return segments.

Must be called after fit().

Returns:

Type Description
DetectionResult

DetectionResult containing segments and metadata.

Raises:

Type Description
RuntimeError

If called before fit().

Source code in src/trend_classifier/detectors/base.py
@abstractmethod
def detect(self) -> DetectionResult:
    """Detect trend changes and return segments.

    Must be called after fit().  Implementations can call the base
    class's ``_validate_fitted()`` to enforce that contract.

    Returns:
        DetectionResult containing segments and metadata.

    Raises:
        RuntimeError: If called before fit().
    """

fit_detect(x, y)

Convenience method to fit and detect in one call.

Parameters:

Name Type Description Default
x ndarray

Array of x values.

required
y ndarray

Array of y values.

required

Returns:

Type Description
DetectionResult

DetectionResult containing segments and metadata.

Source code in src/trend_classifier/detectors/base.py
def fit_detect(self, x: np.ndarray, y: np.ndarray) -> DetectionResult:
    """Fit to the data and run detection in a single call.

    Equivalent to ``self.fit(x, y).detect()``.

    Args:
        x: Array of x values.
        y: Array of y values.

    Returns:
        DetectionResult containing segments and metadata.
    """
    fitted = self.fit(x, y)
    return fitted.detect()

DetectionResult dataclass

Result from a trend detection algorithm.

Attributes:

Name Type Description
segments SegmentList

List of detected segments with trend information.

breakpoints list[int]

List of indices where trend changes occur.

metadata dict

Algorithm-specific metadata (e.g., cost values, statistics).

Source code in src/trend_classifier/detectors/base.py
@dataclass
class DetectionResult:
    """Result from a trend detection algorithm.

    Attributes:
        segments: List of detected segments with trend information.
        breakpoints: List of indices where trend changes occur.
        metadata: Algorithm-specific metadata (e.g., cost values, statistics).
    """

    segments: SegmentList
    # Mutable defaults must be created per instance, hence default_factory.
    breakpoints: list[int] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)

    def __len__(self) -> int:
        """Return number of segments."""
        return len(self.segments)

    def to_dataframe(self):
        """Convert segments to DataFrame."""
        # Delegates to SegmentList.to_dataframe().
        return self.segments.to_dataframe()

__len__()

Return number of segments.

Source code in src/trend_classifier/detectors/base.py
def __len__(self) -> int:
    """Return number of segments (makes len(result) work)."""
    return len(self.segments)

to_dataframe()

Convert segments to DataFrame.

Source code in src/trend_classifier/detectors/base.py
def to_dataframe(self):
    """Convert segments to DataFrame.

    Delegates to ``self.segments.to_dataframe()``.
    """
    segment_list = self.segments
    return segment_list.to_dataframe()

Detector Implementations

SlidingWindowDetector

SlidingWindowDetector

Bases: BaseDetector

Sliding window trend detector using linear regression.

This is the original algorithm from trend_classifier. It slides a window across the time series, fits a linear trend in each window, and detects segment boundaries when slope or offset changes exceed thresholds.

Parameters:

Name Type Description Default
n int

Window size (number of samples per window).

60
overlap_ratio float

Overlap between adjacent windows (0-1).

0.33
alpha float | None

Threshold for slope change detection. None to disable.

2.0
beta float | None

Threshold for offset change detection. None to disable.

2.0
metrics_for_alpha Metrics

Error metric for slope comparison.

RELATIVE_ABSOLUTE_ERROR
metrics_for_beta Metrics

Error metric for offset comparison.

RELATIVE_ABSOLUTE_ERROR
Example

detector = SlidingWindowDetector(n=40, alpha=2.0)
result = detector.fit_detect(x, y)
print(f"Found {len(result.segments)} segments")

Source code in src/trend_classifier/detectors/sliding_window.py
class SlidingWindowDetector(BaseDetector):
    """Sliding window trend detector using linear regression.

    This is the original algorithm from trend_classifier. It slides a window
    across the time series, fits a linear trend in each window, and detects
    segment boundaries when slope or offset changes exceed thresholds.

    Args:
        n: Window size (number of samples per window).
        overlap_ratio: Overlap between adjacent windows (0-1).
        alpha: Threshold for slope change detection. None to disable.
        beta: Threshold for offset change detection. None to disable.
        metrics_for_alpha: Error metric for slope comparison.
        metrics_for_beta: Error metric for offset comparison.

    Example:
        >>> detector = SlidingWindowDetector(n=40, alpha=2.0)
        >>> result = detector.fit_detect(x, y)
        >>> print(f"Found {len(result.segments)} segments")
    """

    name = "sliding_window"

    def __init__(
        self,
        n: int = 60,
        overlap_ratio: float = 0.33,
        alpha: float | None = 2.0,
        beta: float | None = 2.0,
        metrics_for_alpha: Metrics = Metrics.RELATIVE_ABSOLUTE_ERROR,
        metrics_for_beta: Metrics = Metrics.RELATIVE_ABSOLUTE_ERROR,
    ):
        self.n = n
        self.overlap_ratio = overlap_ratio
        self.alpha = alpha
        self.beta = beta
        self.metrics_for_alpha = metrics_for_alpha
        self.metrics_for_beta = metrics_for_beta

        # Populated by fit(); None signals "not fitted yet"
        # (checked by BaseDetector._validate_fitted()).
        self._x: np.ndarray | None = None
        self._y: np.ndarray | None = None

    @classmethod
    def from_config(cls, config: Config) -> SlidingWindowDetector:
        """Create detector from a Config object.

        Args:
            config: Configuration object with detector parameters.

        Returns:
            Configured SlidingWindowDetector instance.
        """
        # Config stores the window size as uppercase N; the constructor
        # uses a lowercase ``n`` keyword.
        return cls(
            n=config.N,
            overlap_ratio=config.overlap_ratio,
            alpha=config.alpha,
            beta=config.beta,
            metrics_for_alpha=config.metrics_for_alpha,
            metrics_for_beta=config.metrics_for_beta,
        )

    def fit(self, x: np.ndarray, y: np.ndarray) -> SlidingWindowDetector:
        """Fit the detector to data.

        Args:
            x: Array of x values (indices).
            y: Array of y values (signal).

        Returns:
            Self for method chaining.

        Raises:
            ValueError: If data is too short for window size.
        """
        # Coerce to float64 so polyfit behaves consistently for int inputs.
        self._x = np.asarray(x, dtype=np.float64)
        self._y = np.asarray(y, dtype=np.float64)

        data_len = len(self._x)
        if data_len < self.n:
            raise ValueError(
                f"Data length ({data_len}) must be at least window size N ({self.n}). "
                f"Reduce N or provide more data."
            )
        if data_len < 2 * self.n:
            logger.warning(
                f"Data length ({data_len}) is less than 2*N ({2 * self.n}). "
                "Results may be limited to a single segment."
            )

        return self

    def detect(self, progress_callback=None) -> DetectionResult:
        """Detect trend segments using sliding window analysis.

        Args:
            progress_callback: Optional callback(current, total) for progress.

        Returns:
            DetectionResult with segments and breakpoints.

        Raises:
            RuntimeError: If called before fit().
        """
        self._validate_fitted()

        segments = SegmentList()
        breakpoints = []
        # Accumulator for the segment being grown: per-window fit results
        # are collected here until a boundary is detected.
        new_segment = {"s_start": 0, "slopes": [], "offsets": [], "starts": []}

        offset = self._calculate_offset()
        prev_fit = None

        # Approximate window count, used for progress reporting only
        # (floor division may undercount the actual iterations by one).
        total_iterations = max(1, (len(self._x) - self.n) // offset)

        for iteration, start in enumerate(range(0, len(self._x) - self.n, offset)):
            # Throttle progress reporting to every 100th window.
            if progress_callback is not None and iteration % 100 == 0:
                progress_callback(iteration, total_iterations)

            end = start + self.n
            # Linear fit in the current window: fit[0]=slope, fit[1]=offset.
            fit = np.polyfit(self._x[start:end], self._y[start:end], deg=1)
            new_segment["slopes"].append(fit[0])
            new_segment["offsets"].append(fit[1])
            new_segment["starts"].append(start)

            if prev_fit is not None:
                is_slope_different = self._check_slope_change(prev_fit, fit)
                is_offset_different = self._check_offset_change(prev_fit, fit)

                if is_slope_different or is_offset_different:
                    s_stop = self._determine_boundary(start, offset)
                    breakpoints.append(s_stop)
                    reason = self._describe_reason(
                        is_slope_different, is_offset_different
                    )

                    segment = self._finalize_segment(new_segment, s_stop, reason)
                    segments.append(segment)

                    # Next segment starts just past the chosen boundary.
                    new_segment = {
                        "s_start": s_stop + 1,
                        "slopes": [],
                        "offsets": [],
                        "starts": [],
                    }

            prev_fit = fit

        # Add final segment
        last_segment = self._finalize_segment(new_segment, len(self._x))
        segments.append(last_segment)

        # Compute detailed statistics for all segments
        self._compute_segment_statistics(segments)

        return DetectionResult(
            segments=segments,
            breakpoints=breakpoints,
            metadata={
                "algorithm": self.name,
                "n": self.n,
                "overlap_ratio": self.overlap_ratio,
                "alpha": self.alpha,
                "beta": self.beta,
            },
        )

    def _calculate_offset(self) -> int:
        """Calculate step size between windows."""
        offset = int(self.n * self.overlap_ratio)
        # Guard against a zero step, which would make the window loop
        # in detect() never advance.
        if offset == 0:
            logger.warning(
                f"Overlap ratio {self.overlap_ratio} too small for N={self.n}, using offset=1"
            )
            offset = 1
        return offset

    def _check_slope_change(self, prev_fit, curr_fit) -> bool:
        """Check if slope changed significantly."""
        # alpha=None disables slope-based boundary detection entirely.
        if self.alpha is None:
            return False
        prev_slope = float(prev_fit[0])
        curr_slope = float(curr_fit[0])
        error = calculate_error(prev_slope, curr_slope, self.metrics_for_alpha)
        return error >= self.alpha

    def _check_offset_change(self, prev_fit, curr_fit) -> bool:
        """Check if offset changed significantly."""
        # beta=None disables offset-based boundary detection entirely.
        if self.beta is None:
            return False
        prev_offset = float(prev_fit[1])
        curr_offset = float(curr_fit[1])
        error = calculate_error(prev_offset, curr_offset, self.metrics_for_beta)
        return error >= self.beta

    def _determine_boundary(self, start: int, offset: int) -> int:
        """Determine segment boundary point."""
        # Midpoint of the step between the two windows' start positions.
        return int(start + offset / 2)

    def _describe_reason(
        self, is_slope_different: bool, is_offset_different: bool
    ) -> str:
        """Describe reason for creating a new segment."""
        if is_slope_different and is_offset_different:
            return "slope and offset"
        return "slope" if is_slope_different else "offset"

    def _finalize_segment(
        self, segment_data: dict, stop: int, reason: str = ""
    ) -> Segment:
        """Create a Segment from accumulated window data."""
        segment = Segment(
            start=int(segment_data["s_start"]),
            stop=int(stop),
            slopes=segment_data["slopes"],
            offsets=segment_data["offsets"],
            starts=segment_data["starts"],
            reason_for_new_segment=reason,
        )
        # NOTE(review): presumably drops per-window entries whose window
        # extends past the segment end — confirm against Segment internals.
        segment.remove_outstanding_windows(self.n)
        return segment

    def _compute_segment_statistics(self, segments: SegmentList) -> None:
        """Compute detailed statistics for each segment."""
        for segment in segments:
            start, stop = segment.start, segment.stop
            # Inclusive slice (stop + 1), unlike numpy's usual half-open ranges.
            xx = self._x[start : stop + 1]
            yy = self._y[start : stop + 1]

            # Too few points to fit a line: zero out all statistics.
            if len(xx) < 2:
                segment.std = 0.0
                segment.span = 0.0
                segment.slope = 0.0
                segment.offset = 0.0
                segment.slopes_std = 0.0
                segment.offsets_std = 0.0
                continue

            fit = np.polyfit(xx, yy, deg=1)
            fit_fn = np.poly1d(fit)
            y_trend = fit_fn(xx)
            y_detrended = yy - y_trend

            segment.slope = float(fit[0])
            segment.offset = float(fit[1])
            segment.std = (
                float(np.std(y_detrended, ddof=0)) if len(y_detrended) > 0 else 0.0
            )

            # Span: residual peak-to-peak range, scaled by 1000 and
            # normalized by the absolute segment mean (0 for zero mean).
            mean_yy = np.mean(yy)
            if mean_yy != 0:
                segment.span = float(
                    1000 * (np.max(y_detrended) - np.min(y_detrended)) / abs(mean_yy)
                )
            else:
                segment.span = 0.0

            if segment.slopes:
                segment.slopes_std = float(np.std(segment.slopes, ddof=0))
            if segment.offsets:
                segment.offsets_std = float(np.std(segment.offsets, ddof=0))

__init__(n=60, overlap_ratio=0.33, alpha=2.0, beta=2.0, metrics_for_alpha=Metrics.RELATIVE_ABSOLUTE_ERROR, metrics_for_beta=Metrics.RELATIVE_ABSOLUTE_ERROR)

Source code in src/trend_classifier/detectors/sliding_window.py
def __init__(
    self,
    n: int = 60,
    overlap_ratio: float = 0.33,
    alpha: float | None = 2.0,
    beta: float | None = 2.0,
    metrics_for_alpha: Metrics = Metrics.RELATIVE_ABSOLUTE_ERROR,
    metrics_for_beta: Metrics = Metrics.RELATIVE_ABSOLUTE_ERROR,
):
    """Initialize the sliding-window detector.

    Args:
        n: Window size (number of samples per window).
        overlap_ratio: Overlap between adjacent windows (0-1).
        alpha: Threshold for slope change detection. None to disable.
        beta: Threshold for offset change detection. None to disable.
        metrics_for_alpha: Error metric for slope comparison.
        metrics_for_beta: Error metric for offset comparison.
    """
    self.n = n
    self.overlap_ratio = overlap_ratio
    self.alpha = alpha
    self.beta = beta
    self.metrics_for_alpha = metrics_for_alpha
    self.metrics_for_beta = metrics_for_beta

    # Populated by fit(); None signals "not fitted yet".
    self._x: np.ndarray | None = None
    self._y: np.ndarray | None = None

from_config(config) classmethod

Create detector from a Config object.

Parameters:

Name Type Description Default
config Config

Configuration object with detector parameters.

required

Returns:

Type Description
SlidingWindowDetector

Configured SlidingWindowDetector instance.

Source code in src/trend_classifier/detectors/sliding_window.py
@classmethod
def from_config(cls, config: Config) -> SlidingWindowDetector:
    """Create detector from a Config object.

    Args:
        config: Configuration object with detector parameters.

    Returns:
        Configured SlidingWindowDetector instance.
    """
    # Config stores the window size as uppercase N; the constructor
    # uses a lowercase ``n`` keyword.
    return cls(
        n=config.N,
        overlap_ratio=config.overlap_ratio,
        alpha=config.alpha,
        beta=config.beta,
        metrics_for_alpha=config.metrics_for_alpha,
        metrics_for_beta=config.metrics_for_beta,
    )

fit(x, y)

Fit the detector to data.

Parameters:

Name Type Description Default
x ndarray

Array of x values (indices).

required
y ndarray

Array of y values (signal).

required

Returns:

Type Description
SlidingWindowDetector

Self for method chaining.

Raises:

Type Description
ValueError

If data is too short for window size.

Source code in src/trend_classifier/detectors/sliding_window.py
def fit(self, x: np.ndarray, y: np.ndarray) -> SlidingWindowDetector:
    """Store the series and validate it against the window size.

    Args:
        x: Array of x values (indices).
        y: Array of y values (signal).

    Returns:
        Self for method chaining.

    Raises:
        ValueError: If data is too short for window size.
    """
    # Coerce to float64 so downstream polyfit calls behave consistently.
    self._x = np.asarray(x, dtype=np.float64)
    self._y = np.asarray(y, dtype=np.float64)

    n_samples = self._x.shape[0]
    if n_samples < self.n:
        raise ValueError(
            f"Data length ({n_samples}) must be at least window size N ({self.n}). "
            f"Reduce N or provide more data."
        )
    if n_samples < 2 * self.n:
        # Not fatal, but with fewer than two windows' worth of data the
        # algorithm cannot compare enough windows to split the series.
        logger.warning(
            f"Data length ({n_samples}) is less than 2*N ({2 * self.n}). "
            "Results may be limited to a single segment."
        )

    return self

detect(progress_callback=None)

Detect trend segments using sliding window analysis.

Parameters:

Name Type Description Default
progress_callback

Optional callback(current, total) for progress.

None

Returns:

Type Description
DetectionResult

DetectionResult with segments and breakpoints.

Raises:

Type Description
RuntimeError

If called before fit().

Source code in src/trend_classifier/detectors/sliding_window.py
def detect(self, progress_callback=None) -> DetectionResult:
    """Detect trend segments using sliding window analysis.

    Args:
        progress_callback: Optional callback(current, total) for progress.

    Returns:
        DetectionResult with segments and breakpoints.

    Raises:
        RuntimeError: If called before fit().
    """
    self._validate_fitted()

    segments = SegmentList()
    breakpoints = []
    # Accumulator for the segment being grown: per-window fit results
    # are collected here until a boundary is detected.
    new_segment = {"s_start": 0, "slopes": [], "offsets": [], "starts": []}

    offset = self._calculate_offset()
    prev_fit = None

    # Approximate window count, used for progress reporting only
    # (floor division may undercount the actual iterations by one).
    total_iterations = max(1, (len(self._x) - self.n) // offset)

    for iteration, start in enumerate(range(0, len(self._x) - self.n, offset)):
        # Throttle progress reporting to every 100th window.
        if progress_callback is not None and iteration % 100 == 0:
            progress_callback(iteration, total_iterations)

        end = start + self.n
        # Linear fit in the current window: fit[0]=slope, fit[1]=offset.
        fit = np.polyfit(self._x[start:end], self._y[start:end], deg=1)
        new_segment["slopes"].append(fit[0])
        new_segment["offsets"].append(fit[1])
        new_segment["starts"].append(start)

        if prev_fit is not None:
            is_slope_different = self._check_slope_change(prev_fit, fit)
            is_offset_different = self._check_offset_change(prev_fit, fit)

            if is_slope_different or is_offset_different:
                s_stop = self._determine_boundary(start, offset)
                breakpoints.append(s_stop)
                reason = self._describe_reason(
                    is_slope_different, is_offset_different
                )

                segment = self._finalize_segment(new_segment, s_stop, reason)
                segments.append(segment)

                # Next segment starts just past the chosen boundary.
                new_segment = {
                    "s_start": s_stop + 1,
                    "slopes": [],
                    "offsets": [],
                    "starts": [],
                }

        prev_fit = fit

    # Add final segment
    last_segment = self._finalize_segment(new_segment, len(self._x))
    segments.append(last_segment)

    # Compute detailed statistics for all segments
    self._compute_segment_statistics(segments)

    return DetectionResult(
        segments=segments,
        breakpoints=breakpoints,
        metadata={
            "algorithm": self.name,
            "n": self.n,
            "overlap_ratio": self.overlap_ratio,
            "alpha": self.alpha,
            "beta": self.beta,
        },
    )

BottomUpDetector

BottomUpDetector

Bases: BaseDetector

Bottom-up merge segmentation detector.

This algorithm starts with many small segments and iteratively merges adjacent segments with the smallest merge cost until reaching the desired number of segments or a cost threshold.

This approach is good for noisy data as it considers the full signal before making decisions, unlike sliding window methods.

Parameters:

Name Type Description Default
max_segments int

Maximum number of segments to produce.

10
merge_cost_threshold float | None

Stop merging when cost exceeds this value. If None, uses max_segments to determine stopping point.

None
initial_segment_size int

Size of initial segments before merging.

5
Example

detector = BottomUpDetector(max_segments=10)
result = detector.fit_detect(x, y)
print(f"Found {len(result.segments)} segments")

Source code in src/trend_classifier/detectors/bottom_up.py
class BottomUpDetector(BaseDetector):
    """Bottom-up merge segmentation detector.

    This algorithm starts with many small segments and iteratively merges
    adjacent segments with the smallest merge cost until reaching the
    desired number of segments or a cost threshold.

    This approach is good for noisy data as it considers the full signal
    before making decisions, unlike sliding window methods.

    Args:
        max_segments: Maximum number of segments to produce.
        merge_cost_threshold: Stop merging when cost exceeds this value.
            If None, uses max_segments to determine stopping point.
        initial_segment_size: Size of initial segments before merging.

    Example:
        >>> detector = BottomUpDetector(max_segments=10)
        >>> result = detector.fit_detect(x, y)
        >>> print(f"Found {len(result.segments)} segments")
    """

    name = "bottom_up"

    def __init__(
        self,
        max_segments: int = 10,
        merge_cost_threshold: float | None = None,
        initial_segment_size: int = 5,
    ):
        self.max_segments = max_segments
        self.merge_cost_threshold = merge_cost_threshold
        self.initial_segment_size = initial_segment_size

        # Populated by fit(); None signals "not fitted yet"
        # (checked by BaseDetector._validate_fitted()).
        self._x: np.ndarray | None = None
        self._y: np.ndarray | None = None

    def fit(self, x: np.ndarray, y: np.ndarray) -> BottomUpDetector:
        """Fit the detector to data.

        Args:
            x: Array of x values (indices).
            y: Array of y values (signal).

        Returns:
            Self for method chaining.
        """
        self._x = np.asarray(x, dtype=np.float64)
        self._y = np.asarray(y, dtype=np.float64)
        return self

    def detect(self) -> DetectionResult:
        """Detect segments using bottom-up merging.

        Returns:
            DetectionResult with segments and breakpoints.

        Raises:
            RuntimeError: If called before fit().
        """
        self._validate_fitted()

        n = len(self._y)

        # Create initial fine-grained segments
        breakpoints = list(
            range(self.initial_segment_size, n, self.initial_segment_size)
        )

        # Iteratively merge segments with lowest cost.
        # The number of segments is len(breakpoints) + 1, so this loop
        # stops once at most max_segments segments remain.
        while len(breakpoints) >= self.max_segments:
            # Nothing left to merge (reachable only when max_segments <= 0).
            if len(breakpoints) == 0:
                break

            # Find the merge with minimum cost
            # (full O(len(breakpoints)) sweep per merge).
            min_cost = float("inf")
            min_idx = 0

            for i in range(len(breakpoints)):
                cost = self._merge_cost(breakpoints, i)
                if cost < min_cost:
                    min_cost = cost
                    min_idx = i

            # Check threshold
            if (
                self.merge_cost_threshold is not None
                and min_cost > self.merge_cost_threshold
            ):
                break

            # Perform merge (remove the breakpoint)
            breakpoints.pop(min_idx)

        # Create segments from final breakpoints
        segments = self._create_segments_from_breakpoints(breakpoints)

        return DetectionResult(
            segments=segments,
            breakpoints=breakpoints,
            metadata={
                "algorithm": self.name,
                "max_segments": self.max_segments,
                "initial_segment_size": self.initial_segment_size,
            },
        )

    def _merge_cost(self, breakpoints: list[int], idx: int) -> float:
        """Calculate cost of merging segments around breakpoint at idx.

        The cost is the increase in total squared error when merging
        two adjacent segments into one.
        """
        # Get segment boundaries (first/last breakpoints border the
        # start/end of the whole series).
        start = 0 if idx == 0 else breakpoints[idx - 1]
        middle = breakpoints[idx]
        end = len(self._y) if idx == len(breakpoints) - 1 else breakpoints[idx + 1]

        # Calculate error for separate segments
        error_left = self._segment_error(start, middle)
        error_right = self._segment_error(middle, end)

        # Calculate error for merged segment
        error_merged = self._segment_error(start, end)

        # Cost is the increase in error
        return error_merged - (error_left + error_right)

    def _segment_error(self, start: int, stop: int) -> float:
        """Calculate squared error of linear fit for segment."""
        # Empty or inverted range carries no error.
        if stop <= start:
            return 0.0

        xx = self._x[start:stop]
        yy = self._y[start:stop]

        # A line through fewer than 2 points fits exactly: zero error.
        if len(xx) < 2:
            return 0.0

        # Fit linear trend
        fit = np.polyfit(xx, yy, deg=1)
        fit_fn = np.poly1d(fit)
        y_pred = fit_fn(xx)

        # Sum of squared errors
        return float(np.sum((yy - y_pred) ** 2))

    def _create_segments_from_breakpoints(self, breakpoints: list[int]) -> SegmentList:
        """Convert breakpoints to Segment objects."""
        segments = SegmentList()

        # Bracket the breakpoints with the series start and end so each
        # adjacent pair forms one segment.
        all_points = [0, *sorted(breakpoints), len(self._y)]

        for i in range(len(all_points) - 1):
            start = all_points[i]
            stop = all_points[i + 1]

            segment = self._create_segment_with_stats(start, stop)
            segments.append(segment)

        return segments

    def _create_segment_with_stats(self, start: int, stop: int) -> Segment:
        """Create a segment with computed statistics."""
        xx = self._x[start:stop]
        yy = self._y[start:stop]

        # Degenerate segment: no line fits fewer than 2 points.
        if len(xx) < 2:
            return Segment(
                start=start,
                stop=stop,
                slope=0.0,
                offset=float(yy[0]) if len(yy) > 0 else 0.0,
            )

        # Fit linear trend
        fit = np.polyfit(xx, yy, deg=1)
        slope, offset = float(fit[0]), float(fit[1])

        # Calculate detrended statistics
        fit_fn = np.poly1d(fit)
        y_trend = fit_fn(xx)
        y_detrended = yy - y_trend

        std = float(np.std(y_detrended, ddof=0)) if len(y_detrended) > 0 else 0.0

        # Span: residual peak-to-peak range scaled by 1000 and normalized
        # by the absolute segment mean (0 when the mean is exactly zero).
        mean_yy = np.mean(yy)
        if mean_yy != 0:
            span = float(
                1000 * (np.max(y_detrended) - np.min(y_detrended)) / abs(mean_yy)
            )
        else:
            span = 0.0

        return Segment(
            start=start,
            stop=stop,
            slope=slope,
            offset=offset,
            std=std,
            span=span,
            reason_for_new_segment="bottom_up",
        )

__init__(max_segments=10, merge_cost_threshold=None, initial_segment_size=5)

Source code in src/trend_classifier/detectors/bottom_up.py
def __init__(
    self,
    max_segments: int = 10,
    merge_cost_threshold: float | None = None,
    initial_segment_size: int = 5,
):
    self.max_segments = max_segments
    self.merge_cost_threshold = merge_cost_threshold
    self.initial_segment_size = initial_segment_size

    self._x: np.ndarray | None = None
    self._y: np.ndarray | None = None

fit(x, y)

Fit the detector to data.

Parameters:

Name Type Description Default
x ndarray

Array of x values (indices).

required
y ndarray

Array of y values (signal).

required

Returns:

Type Description
BottomUpDetector

Self for method chaining.

Source code in src/trend_classifier/detectors/bottom_up.py
def fit(self, x: np.ndarray, y: np.ndarray) -> BottomUpDetector:
    """Store the input series as float64 arrays.

    Args:
        x: Array of x values (indices).
        y: Array of y values (signal).

    Returns:
        Self, so calls can be chained.
    """
    self._x, self._y = (np.asarray(arr, dtype=np.float64) for arr in (x, y))
    return self

detect()

Detect segments using bottom-up merging.

Returns:

Type Description
DetectionResult

DetectionResult with segments and breakpoints.

Raises:

Type Description
RuntimeError

If called before fit().

Source code in src/trend_classifier/detectors/bottom_up.py
def detect(self) -> DetectionResult:
    """Detect segments by bottom-up merging of fine-grained pieces.

    Returns:
        DetectionResult with segments and breakpoints.

    Raises:
        RuntimeError: If called before fit().
    """
    self._validate_fitted()

    n = len(self._y)

    # Start from uniform pieces of `initial_segment_size` samples each.
    breakpoints = list(
        range(self.initial_segment_size, n, self.initial_segment_size)
    )

    # Greedily drop the cheapest breakpoint until few enough segments remain.
    while len(breakpoints) >= self.max_segments:
        if not breakpoints:
            break

        # Cost of merging across each candidate breakpoint; keep the first minimum.
        costs = [self._merge_cost(breakpoints, i) for i in range(len(breakpoints))]
        best_cost = min(costs)
        best_idx = costs.index(best_cost)

        # An optional cost ceiling stops merging early.
        if (
            self.merge_cost_threshold is not None
            and best_cost > self.merge_cost_threshold
        ):
            break

        # Merging two neighbors == removing the breakpoint between them.
        del breakpoints[best_idx]

    return DetectionResult(
        segments=self._create_segments_from_breakpoints(breakpoints),
        breakpoints=breakpoints,
        metadata={
            "algorithm": self.name,
            "max_segments": self.max_segments,
            "initial_segment_size": self.initial_segment_size,
        },
    )

PELTDetector

PELTDetector

Bases: BaseDetector

PELT change point detector using the ruptures library.

PELT (Pruned Exact Linear Time) is an efficient algorithm for detecting multiple change points in a signal. It finds the optimal segmentation by minimizing a cost function with a penalty for each change point.

This detector requires the ruptures library to be installed: pip install ruptures

Parameters:

Name Type Description Default
model str

Cost model for segment evaluation. Options: - "l2": Least squares (default, good for mean shifts) - "l1": Least absolute deviation (robust to outliers) - "rbf": Kernel-based (detects distribution changes) - "linear": Linear regression model

'l2'
penalty float | None

Penalty value for adding change points. Higher values result in fewer segments. If None, uses BIC-derived penalty.

None
min_size int

Minimum segment length.

2
jump int

Subsample factor for change point candidates.

1
Example

detector = PELTDetector(model="l2", penalty=3)
result = detector.fit_detect(x, y)
print(f"Found {len(result.segments)} segments")

Note

For financial time series, try:

- model="linear" with penalty=1-5 for trend detection
- model="l2" with penalty=5-20 for level shifts

Source code in src/trend_classifier/detectors/pelt.py
class PELTDetector(BaseDetector):
    """PELT change point detector backed by the `ruptures` library.

    PELT (Pruned Exact Linear Time) finds an optimal multi-change-point
    segmentation of a signal by minimizing a per-segment cost function plus
    a penalty for each change point it introduces.

    Requires the `ruptures` package:
        pip install ruptures

    Args:
        model: Cost model for segment evaluation. Options:
            - "l2": Least squares (default, good for mean shifts)
            - "l1": Least absolute deviation (robust to outliers)
            - "rbf": Kernel-based (detects distribution changes)
            - "linear": Linear regression model
        penalty: Penalty for adding a change point; larger values yield
            fewer segments. When None, a BIC-derived penalty is used.
        min_size: Minimum segment length.
        jump: Subsample factor for change point candidates.

    Example:
        >>> detector = PELTDetector(model="l2", penalty=3)
        >>> result = detector.fit_detect(x, y)
        >>> print(f"Found {len(result.segments)} segments")

    Note:
        For financial time series, try:
        - model="linear" with penalty=1-5 for trend detection
        - model="l2" with penalty=5-20 for level shifts
    """

    name = "pelt"

    def __init__(
        self,
        model: str = "l2",
        penalty: float | None = None,
        min_size: int = 2,
        jump: int = 1,
    ):
        # Fail fast when the optional dependency is missing.
        self._check_ruptures_available()

        self.model = model
        self.penalty = penalty
        self.min_size = min_size
        self.jump = jump

        # Filled in by fit(); None until then.
        self._x: np.ndarray | None = None
        self._y: np.ndarray | None = None
        self._algo = None

    @staticmethod
    def _check_ruptures_available() -> None:
        """Raise ImportError with install instructions when ruptures is absent."""
        try:
            import ruptures  # noqa: F401
        except ImportError as e:
            raise ImportError(
                "PELTDetector requires the 'ruptures' library. "
                "Install it with: pip install ruptures"
            ) from e

    def fit(self, x: np.ndarray, y: np.ndarray) -> PELTDetector:
        """Fit the detector to data.

        Args:
            x: Array of x values (indices).
            y: Array of y values (signal).

        Returns:
            Self for method chaining.
        """
        import ruptures as rpt

        self._x = np.asarray(x, dtype=np.float64)
        self._y = np.asarray(y, dtype=np.float64)

        # ruptures expects a 2D (n_samples, n_features) signal.
        algo = rpt.Pelt(model=self.model, min_size=self.min_size, jump=self.jump)
        algo.fit(self._y.reshape(-1, 1))
        self._algo = algo

        return self

    def detect(self) -> DetectionResult:
        """Detect change points and create segments.

        Returns:
            DetectionResult with segments and breakpoints.

        Raises:
            RuntimeError: If called before fit().
        """
        self._validate_fitted()

        # Fall back to a BIC-like penalty derived from the series length.
        pen = self.penalty
        if pen is None:
            pen = np.log(len(self._y)) * 2

        change_points = self._algo.predict(pen=pen)

        # ruptures reports the series end as the final breakpoint; drop it.
        if change_points and change_points[-1] == len(self._y):
            change_points = change_points[:-1]

        return DetectionResult(
            segments=self._create_segments_from_breakpoints(change_points),
            breakpoints=change_points,
            metadata={
                "algorithm": self.name,
                "model": self.model,
                "penalty": pen,
                "min_size": self.min_size,
            },
        )

    def _create_segments_from_breakpoints(self, breakpoints: list[int]) -> SegmentList:
        """Convert breakpoints to Segment objects with statistics."""
        # Boundary indices: series start, the breakpoints, series end.
        boundaries = [0] + list(breakpoints) + [len(self._y)]

        segments = SegmentList()
        for lo, hi in zip(boundaries[:-1], boundaries[1:]):
            segments.append(self._create_segment_with_stats(lo, hi))
        return segments

    def _create_segment_with_stats(self, start: int, stop: int) -> Segment:
        """Create a segment for [start, stop) with fitted trend and residual stats."""
        xs = self._x[start:stop]
        ys = self._y[start:stop]

        # Degenerate window: too few points to fit a line.
        if len(xs) < 2:
            return Segment(
                start=start,
                stop=stop,
                slope=0.0,
                offset=float(ys[0]) if len(ys) > 0 else 0.0,
            )

        # Least-squares straight line through the window.
        coeffs = np.polyfit(xs, ys, deg=1)
        slope, offset = float(coeffs[0]), float(coeffs[1])

        # Residuals around the fitted trend line.
        residuals = ys - np.poly1d(coeffs)(xs)
        std = float(np.std(residuals, ddof=0)) if len(residuals) > 0 else 0.0

        # Peak-to-peak of the residuals, scaled per-mille of the mean level.
        mean_level = np.mean(ys)
        if mean_level != 0:
            span = float(
                1000 * (np.max(residuals) - np.min(residuals)) / abs(mean_level)
            )
        else:
            span = 0.0

        return Segment(
            start=start,
            stop=stop,
            slope=slope,
            offset=offset,
            std=std,
            span=span,
            reason_for_new_segment="pelt",
        )

__init__(model='l2', penalty=None, min_size=2, jump=1)

Source code in src/trend_classifier/detectors/pelt.py
def __init__(
    self,
    model: str = "l2",
    penalty: float | None = None,
    min_size: int = 2,
    jump: int = 1,
):
    self._check_ruptures_available()

    self.model = model
    self.penalty = penalty
    self.min_size = min_size
    self.jump = jump

    self._x: np.ndarray | None = None
    self._y: np.ndarray | None = None
    self._algo = None

fit(x, y)

Fit the detector to data.

Parameters:

Name Type Description Default
x ndarray

Array of x values (indices).

required
y ndarray

Array of y values (signal).

required

Returns:

Type Description
PELTDetector

Self for method chaining.

Source code in src/trend_classifier/detectors/pelt.py
def fit(self, x: np.ndarray, y: np.ndarray) -> PELTDetector:
    """Fit the detector to data.

    Args:
        x: Array of x values (indices).
        y: Array of y values (signal).

    Returns:
        Self for method chaining.
    """
    import ruptures as rpt

    self._x = np.asarray(x, dtype=np.float64)
    self._y = np.asarray(y, dtype=np.float64)

    # ruptures expects a 2D (n_samples, n_features) signal.
    algo = rpt.Pelt(model=self.model, min_size=self.min_size, jump=self.jump)
    algo.fit(self._y.reshape(-1, 1))
    self._algo = algo

    return self

detect()

Detect change points and create segments.

Returns:

Type Description
DetectionResult

DetectionResult with segments and breakpoints.

Raises:

Type Description
RuntimeError

If called before fit().

Source code in src/trend_classifier/detectors/pelt.py
def detect(self) -> DetectionResult:
    """Detect change points and create segments.

    Returns:
        DetectionResult with segments and breakpoints.

    Raises:
        RuntimeError: If called before fit().
    """
    self._validate_fitted()

    # Fall back to a BIC-like penalty derived from the series length.
    pen = self.penalty
    if pen is None:
        pen = np.log(len(self._y)) * 2

    change_points = self._algo.predict(pen=pen)

    # ruptures reports the series end as the final breakpoint; drop it.
    if change_points and change_points[-1] == len(self._y):
        change_points = change_points[:-1]

    return DetectionResult(
        segments=self._create_segments_from_breakpoints(change_points),
        breakpoints=change_points,
        metadata={
            "algorithm": self.name,
            "model": self.model,
            "penalty": pen,
            "min_size": self.min_size,
        },
    )