Source code for stupidb.functions.ranking.core
from __future__ import annotations
import abc
from typing import Sequence
from ...aggregator import Aggregate, Aggregator
from ...protocols import Comparable
from ...row import AbstractRow
from ...typehints import Getter, Output, Result, T
[docs]class RankingAggregator(Aggregator["RankingAggregate", Result]):
"""Custom aggregator for ranking window functions.
This aggregator is required for ranking functions because ranking functions
take no arguments, but need access to the rows produced by the ordering
key.
See Also
--------
stupidb.navigation.NavigationAggregator
stupidb.associative.SegmentTree
"""
__slots__ = ("aggregate",)
def __init__(
self,
order_by_values: Sequence[tuple[Comparable | None, ...]],
aggregate_type: type[RankingAggregate],
) -> None:
self.aggregate = aggregate_type(order_by_values)
[docs] def query(self, begin: int, end: int) -> Result | None:
"""Compute the aggregation over the range of rows between `begin` and `end`."""
return self.aggregate.execute(begin, end)
[docs]class RankingAggregate(Aggregate[Output]):
"""Base ranking aggregation class."""
__slots__ = ("order_by_values",)
def __init__(
self, order_by_values: Sequence[tuple[Comparable | None, ...]]
) -> None:
super().__init__()
self.order_by_values = order_by_values
[docs] @abc.abstractmethod
def execute(self, begin: int, end: int) -> Output | None:
"""Compute an abstract row rank value for rows between `begin` and `end`."""
[docs] @classmethod
def prepare(
cls,
possible_peers: Sequence[AbstractRow],
getters: tuple[Getter, ...],
order_by_columns: Sequence[str],
) -> RankingAggregator[Output]:
"""Construct the aggregator for ranking."""
order_by_values = [
tuple(peer[column] for column in order_by_columns)
for peer in possible_peers
]
return cls.aggregator_class(order_by_values)
@classmethod
def aggregator_class(
cls, values: Sequence[tuple[T | None, ...]]
) -> RankingAggregator[Output]:
return RankingAggregator(values, cls)