Source code for stupidb.aggregator

"""Base aggregator interface."""

from __future__ import annotations

import abc
from typing import Generic, Sequence, TypeVar

from .row import AbstractRow
from .typehints import Getter, Output, Result, T

AggClass = TypeVar("AggClass", covariant=True)


[docs]class Aggregator(Generic[AggClass, Result], abc.ABC): """Interface for aggregators. Aggregators must implement the :meth:`~stupidb.aggregator.Aggregator.query` method. Aggregators are tied to a specific kind of aggregation. See the :meth:`~stupidb.aggregator.Aggregate.prepare` method for how to provide a custom aggregator. See Also -------- stupidb.associative.segmenttree.SegmentTree stupidb.functions.navigation.core.NavigationAggregator """ @abc.abstractmethod def __init__(self, arguments: Sequence[T], cls: AggClass) -> None: """Initialize an aggregator from `arguments` and `cls`."""
[docs] @abc.abstractmethod def query(self, begin: int, end: int) -> Result | None: """Query the aggregator over the range from `begin` to `end`."""
[docs]class Aggregate(Generic[Output], abc.ABC): """An aggregate or window function.""" __slots__ = ()
[docs] @classmethod def prepare( cls, possible_peers: Sequence[AbstractRow], getters: tuple[Getter, ...], order_by_columns: Sequence[str], ) -> Aggregator[Aggregate[Output], Output]: """Prepare an aggregation of this type for computation.""" arguments = [ tuple(getter(peer) for getter in getters) for peer in possible_peers ] return cls.aggregator_class(arguments)
@classmethod @abc.abstractmethod def aggregator_class( cls, inputs: Sequence[tuple[T | None, ...]] ) -> Aggregator[Aggregate[Output], Output]: # pragma: no cover ...