aggregates

Aggregation functions

Accumulator

Accumulator(aggregation: CompValue)

Abstract base class for the different aggregation functions.

Source code in rdflib/plugins/sparql/aggregates.py
def __init__(self, aggregation: CompValue):
    self.get_value: Callable[[], Optional[Literal]]
    self.update: Callable[[FrozenBindings, Aggregator], None]
    self.var = aggregation.res
    self.expr = aggregation.vars
    if not aggregation.distinct:
        # type error: Cannot assign to a method
        self.use_row = self.dont_care  # type: ignore[method-assign]
        self.distinct = False
    else:
        self.distinct = aggregation.distinct
        self.seen: Set[Any] = set()

distinct instance-attribute

distinct = False

expr instance-attribute

expr = aggregation.vars

get_value instance-attribute

get_value: Callable[[], Optional[Literal]]

seen instance-attribute

seen: Set[Any] = set()

update instance-attribute

update: Callable[[FrozenBindings, Aggregator], None]

var instance-attribute

var = aggregation.res

dont_care

dont_care(row: FrozenBindings) -> bool

Skips the DISTINCT test; always returns True.

Source code in rdflib/plugins/sparql/aggregates.py
def dont_care(self, row: FrozenBindings) -> bool:
    """skips distinct test"""
    return True

set_value

set_value(bindings: MutableMapping[Variable, Identifier]) -> None

Sets the final value in bindings.

Source code in rdflib/plugins/sparql/aggregates.py
def set_value(self, bindings: MutableMapping[Variable, Identifier]) -> None:
    """sets final value in bindings"""
    # type error: Incompatible types in assignment (expression has type "Optional[Literal]", target has type "Identifier")
    bindings[self.var] = self.get_value()  # type: ignore[assignment]

use_row

use_row(row: FrozenBindings) -> bool

Tests DISTINCT membership against the seen set.

Source code in rdflib/plugins/sparql/aggregates.py
def use_row(self, row: FrozenBindings) -> bool:
    """tests distinct with set"""
    return _eval(self.expr, row) not in self.seen
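
When DISTINCT is absent, the constructor above replaces the bound method use_row with dont_care on the instance, so the per-row test reduces to a single attribute lookup. A minimal sketch of that method-swap pattern, using a hypothetical TinyAccumulator that is not part of rdflib:

class TinyAccumulator:
    def __init__(self, distinct: bool):
        self.distinct = distinct
        if not distinct:
            # shadow the class-level method with a bound no-op
            self.use_row = self.dont_care
        else:
            self.seen = set()

    def dont_care(self, value) -> bool:
        # non-DISTINCT: every row is used
        return True

    def use_row(self, value) -> bool:
        # DISTINCT: only unseen values are used
        return value not in self.seen

acc = TinyAccumulator(distinct=False)
assert acc.use_row("x") and acc.use_row("x")  # duplicates pass the test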

Aggregator

Aggregator(aggregations: List[CompValue])

Combines the Accumulator objects for all aggregate expressions in a group.

Source code in rdflib/plugins/sparql/aggregates.py
def __init__(self, aggregations: List[CompValue]):
    self.bindings: Dict[Variable, Identifier] = {}
    self.accumulators: Dict[str, Accumulator] = {}
    for a in aggregations:
        accumulator_class = self.accumulator_classes.get(a.name)
        if accumulator_class is None:
            raise Exception("Unknown aggregate function " + a.name)
        self.accumulators[a.res] = accumulator_class(a)

accumulator_classes class-attribute instance-attribute

accumulator_classes = {
    'Aggregate_Count': Counter,
    'Aggregate_Sample': Sample,
    'Aggregate_Sum': Sum,
    'Aggregate_Avg': Average,
    'Aggregate_Min': Minimum,
    'Aggregate_Max': Maximum,
    'Aggregate_GroupConcat': GroupConcat,
}

accumulators instance-attribute

accumulators: Dict[str, Accumulator] = {}

bindings instance-attribute

bindings: Dict[Variable, Identifier] = {}

get_bindings

get_bindings() -> Mapping[Variable, Identifier]

Calculates the final values and sets them in the bindings.

Source code in rdflib/plugins/sparql/aggregates.py
def get_bindings(self) -> Mapping[Variable, Identifier]:
    """calculate and set last values"""
    for acc in self.accumulators.values():
        acc.set_value(self.bindings)
    return self.bindings

update

update(row: FrozenBindings) -> None

Updates all of this aggregator's accumulators with the given row.

Source code in rdflib/plugins/sparql/aggregates.py
def update(self, row: FrozenBindings) -> None:
    """update all own accumulators"""
    # SAMPLE accumulators may delete themselves
    # => iterate over list not generator

    for acc in list(self.accumulators.values()):
        if acc.use_row(row):
            acc.update(row, self)
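
In normal use these classes are driven by the SPARQL evaluator rather than instantiated by hand. A small end-to-end example (the ex: namespace and data are made up for illustration):

from rdflib import Graph, Literal, Namespace

EX = Namespace("http://example.org/")
g = Graph()
for name, v in [("a", 1), ("b", 2), ("c", 2)]:
    g.add((EX[name], EX.value, Literal(v)))

# one implicit group; an Aggregator drives a Counter and a Sum
q = """
PREFIX ex: <http://example.org/>
SELECT (COUNT(?v) AS ?n) (SUM(?v) AS ?total)
WHERE { ?s ex:value ?v }
"""
for row in g.query(q):
    print(row.n, row.total)  # 3, 5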

Average

Average(aggregation: CompValue)

Bases: Accumulator

Source code in rdflib/plugins/sparql/aggregates.py
def __init__(self, aggregation: CompValue):
    super(Average, self).__init__(aggregation)
    self.counter = 0
    self.sum = 0
    self.datatype: Optional[str] = None

counter instance-attribute

counter = 0

datatype instance-attribute

datatype: Optional[str] = None

sum instance-attribute

sum = 0

get_value

get_value() -> Literal
Source code in rdflib/plugins/sparql/aggregates.py
def get_value(self) -> Literal:
    if self.counter == 0:
        return Literal(0)
    if self.datatype in (XSD.float, XSD.double):
        return Literal(self.sum / self.counter)
    else:
        return Literal(Decimal(self.sum) / Decimal(self.counter))

update

update(row: FrozenBindings, aggregator: Aggregator) -> None
Source code in rdflib/plugins/sparql/aggregates.py
def update(self, row: FrozenBindings, aggregator: Aggregator) -> None:
    try:
        value = _eval(self.expr, row)
        dt = self.datatype
        self.sum = sum(type_safe_numbers(self.sum, numeric(value)))
        if dt is None:
            dt = value.datatype
        else:
            # type error: Argument 1 to "type_promotion" has incompatible type "str"; expected "URIRef"
            dt = type_promotion(dt, value.datatype)  # type: ignore[arg-type]
        self.datatype = dt
        if self.distinct:
            self.seen.add(value)
        self.counter += 1
    # skip UNDEF or BNode => SPARQLTypeError
    except NotBoundError:
        pass
    except SPARQLTypeError:
        pass
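
Because sum and counter hold plain Python numbers, get_value divides with Decimal for integer and decimal inputs and only falls back to float division for xsd:float/xsd:double. Continuing the made-up graph g from the example above:

q = """
PREFIX ex: <http://example.org/>
SELECT (AVG(?v) AS ?avg) WHERE { ?s ex:value ?v }
"""
(row,) = list(g.query(q))
print(row.avg.datatype)  # xsd:decimal for the integer inputs 1, 2, 2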

Counter

Counter(aggregation: CompValue)

Bases: Accumulator

Source code in rdflib/plugins/sparql/aggregates.py
def __init__(self, aggregation: CompValue):
    super(Counter, self).__init__(aggregation)
    self.value = 0
    if self.expr == "*":
        # cannot eval "*" => always use the full row
        # type error: Cannot assign to a method
        self.eval_row = self.eval_full_row  # type: ignore[assignment]

value instance-attribute

value = 0

eval_full_row

eval_full_row(row: FrozenBindings) -> FrozenBindings
Source code in rdflib/plugins/sparql/aggregates.py
def eval_full_row(self, row: FrozenBindings) -> FrozenBindings:
    return row

eval_row

eval_row(row: FrozenBindings) -> Identifier
Source code in rdflib/plugins/sparql/aggregates.py
def eval_row(self, row: FrozenBindings) -> Identifier:
    return _eval(self.expr, row)

get_value

get_value() -> Literal
Source code in rdflib/plugins/sparql/aggregates.py
def get_value(self) -> Literal:
    return Literal(self.value)

update

update(row: FrozenBindings, aggregator: Aggregator) -> None
Source code in rdflib/plugins/sparql/aggregates.py
def update(self, row: FrozenBindings, aggregator: Aggregator) -> None:
    try:
        val = self.eval_row(row)
    except NotBoundError:
        # skip UNDEF
        return
    self.value += 1
    if self.distinct:
        self.seen.add(val)

use_row

use_row(row: FrozenBindings) -> bool
Source code in rdflib/plugins/sparql/aggregates.py
def use_row(self, row: FrozenBindings) -> bool:
    try:
        return self.eval_row(row) not in self.seen
    except NotBoundError:
        # happens when counting zero optional nodes. See issue #2229
        return False
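
COUNT(*) swaps eval_row for eval_full_row in the constructor, so every solution is counted, while COUNT(DISTINCT ?v) filters rows through use_row. Continuing the made-up graph g (values 1, 2, 2):

q = """
PREFIX ex: <http://example.org/>
SELECT (COUNT(*) AS ?rows) (COUNT(DISTINCT ?v) AS ?uniq)
WHERE { ?s ex:value ?v }
"""
(row,) = list(g.query(q))
print(row.rows, row.uniq)  # 3, 2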

Extremum

Extremum(aggregation: CompValue)

Bases: Accumulator

Abstract base class for Minimum and Maximum.

Source code in rdflib/plugins/sparql/aggregates.py
def __init__(self, aggregation: CompValue):
    self.compare: Callable[[Any, Any], Any]
    super(Extremum, self).__init__(aggregation)
    self.value: Any = None
    # DISTINCT would not change the value for MIN or MAX
    # type error: Cannot assign to a method
    self.use_row = self.dont_care  # type: ignore[method-assign]

compare instance-attribute

compare: Callable[[Any, Any], Any]

use_row instance-attribute

use_row = dont_care

value instance-attribute

value: Any = None

set_value

set_value(bindings: MutableMapping[Variable, Identifier]) -> None
Source code in rdflib/plugins/sparql/aggregates.py
def set_value(self, bindings: MutableMapping[Variable, Identifier]) -> None:
    if self.value is not None:
        # simply do not set if self.value is still None
        bindings[self.var] = Literal(self.value)

update

update(row: FrozenBindings, aggregator: Aggregator) -> None
Source code in rdflib/plugins/sparql/aggregates.py
def update(self, row: FrozenBindings, aggregator: Aggregator) -> None:
    try:
        if self.value is None:
            self.value = _eval(self.expr, row)
        else:
            # self.compare is implemented by Minimum/Maximum
            self.value = self.compare(self.value, _eval(self.expr, row))
    # skip UNDEF or BNode => SPARQLTypeError
    except NotBoundError:
        pass
    except SPARQLTypeError:
        pass

GroupConcat

GroupConcat(aggregation: CompValue)

Bases: Accumulator

Source code in rdflib/plugins/sparql/aggregates.py
def __init__(self, aggregation: CompValue):
    super(GroupConcat, self).__init__(aggregation)
    # only GROUPCONCAT needs to have a list as accumulator
    self.value = []
    if aggregation.separator is None:
        self.separator = " "
    else:
        self.separator = aggregation.separator

separator instance-attribute

separator = ' '

value instance-attribute

value: List[Literal] = []

get_value

get_value() -> Literal
Source code in rdflib/plugins/sparql/aggregates.py
def get_value(self) -> Literal:
    return Literal(self.separator.join(str(v) for v in self.value))

update

update(row: FrozenBindings, aggregator: Aggregator) -> None
Source code in rdflib/plugins/sparql/aggregates.py
def update(self, row: FrozenBindings, aggregator: Aggregator) -> None:
    try:
        value = _eval(self.expr, row)
        # skip UNDEF
        if isinstance(value, NotBoundError):
            return
        self.value.append(value)
        if self.distinct:
            self.seen.add(value)
    # skip UNDEF
    # NOTE: It seems like this is not the way undefined values occur, they
    # come through not as exceptions but as values. This is left here
    # however as it may occur in some cases.
    # TODO: Consider removing this.
    except NotBoundError:
        pass
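
The separator defaults to a single space and can be overridden with the SEPARATOR modifier. Continuing the made-up graph g:

q = """
PREFIX ex: <http://example.org/>
SELECT (GROUP_CONCAT(?v; SEPARATOR=", ") AS ?vals)
WHERE { ?s ex:value ?v }
"""
(row,) = list(g.query(q))
print(row.vals)  # e.g. "1, 2, 2" (solution order is not guaranteed)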

Maximum

Maximum(aggregation: CompValue)

Bases: Extremum

Source code in rdflib/plugins/sparql/aggregates.py
def __init__(self, aggregation: CompValue):
    self.compare: Callable[[Any, Any], Any]
    super(Extremum, self).__init__(aggregation)
    self.value: Any = None
    # DISTINCT would not change the value for MIN or MAX
    # type error: Cannot assign to a method
    self.use_row = self.dont_care  # type: ignore[method-assign]

compare

compare(val1: _ValueT, val2: _ValueT) -> _ValueT
Source code in rdflib/plugins/sparql/aggregates.py
def compare(self, val1: _ValueT, val2: _ValueT) -> _ValueT:
    return max(val1, val2, key=_val)

Minimum

Minimum(aggregation: CompValue)

Bases: Extremum

Source code in rdflib/plugins/sparql/aggregates.py
def __init__(self, aggregation: CompValue):
    self.compare: Callable[[Any, Any], Any]
    super(Extremum, self).__init__(aggregation)
    self.value: Any = None
    # DISTINCT would not change the value for MIN or MAX
    # type error: Cannot assign to a method
    self.use_row = self.dont_care  # type: ignore[method-assign]

compare

compare(val1: _ValueT, val2: _ValueT) -> _ValueT
Source code in rdflib/plugins/sparql/aggregates.py
def compare(self, val1: _ValueT, val2: _ValueT) -> _ValueT:
    return min(val1, val2, key=_val)
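
Minimum and Maximum only supply compare; the shared Extremum.update does the accumulation, and set_value leaves the result variable unbound if no eligible value was seen. Continuing the made-up graph g:

q = """
PREFIX ex: <http://example.org/>
SELECT (MIN(?v) AS ?lo) (MAX(?v) AS ?hi)
WHERE { ?s ex:value ?v }
"""
(row,) = list(g.query(q))
print(row.lo, row.hi)  # 1, 2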

Sample

Sample(aggregation)

Bases: Accumulator

Takes the first eligible value.

Source code in rdflib/plugins/sparql/aggregates.py
def __init__(self, aggregation):
    super(Sample, self).__init__(aggregation)
    # DISTINCT would not change the value
    # type error: Cannot assign to a method
    self.use_row = self.dont_care  # type: ignore[method-assign]

use_row instance-attribute

use_row = dont_care

get_value

get_value() -> None
Source code in rdflib/plugins/sparql/aggregates.py
def get_value(self) -> None:
    # set None if no value was set
    return None

update

update(row: FrozenBindings, aggregator: Aggregator) -> None
Source code in rdflib/plugins/sparql/aggregates.py
def update(self, row: FrozenBindings, aggregator: Aggregator) -> None:
    try:
        # set the value now
        aggregator.bindings[self.var] = _eval(self.expr, row)
        # and skip this accumulator for future rows
        del aggregator.accumulators[self.var]
    except NotBoundError:
        pass
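
update writes the first bound value straight into the aggregator's bindings and then deletes itself, which is why Aggregator.update iterates over a copied list of accumulators. Continuing the made-up graph g:

q = """
PREFIX ex: <http://example.org/>
SELECT (SAMPLE(?v) AS ?pick) WHERE { ?s ex:value ?v }
"""
(row,) = list(g.query(q))
print(row.pick)  # one of 1, 2, 2; which one is unspecified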

Sum

Sum(aggregation: CompValue)

Bases: Accumulator

Source code in rdflib/plugins/sparql/aggregates.py
def __init__(self, aggregation: CompValue):
    super(Sum, self).__init__(aggregation)
    self.value = 0
    self.datatype: Optional[str] = None

datatype instance-attribute

datatype: Optional[str] = None

value instance-attribute

value = 0

get_value

get_value() -> Literal
Source code in rdflib/plugins/sparql/aggregates.py
def get_value(self) -> Literal:
    return Literal(self.value, datatype=self.datatype)

update

update(row: FrozenBindings, aggregator: Aggregator) -> None
Source code in rdflib/plugins/sparql/aggregates.py
def update(self, row: FrozenBindings, aggregator: Aggregator) -> None:
    try:
        value = _eval(self.expr, row)
        dt = self.datatype
        if dt is None:
            dt = value.datatype
        else:
            # type error: Argument 1 to "type_promotion" has incompatible type "str"; expected "URIRef"
            dt = type_promotion(dt, value.datatype)  # type: ignore[arg-type]
        self.datatype = dt
        self.value = sum(type_safe_numbers(self.value, numeric(value)))
        if self.distinct:
            self.seen.add(value)
    except NotBoundError:
        # skip UNDEF
        pass
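
Mixing numeric datatypes exercises type_promotion, so the result carries the promoted datatype. A sketch reusing the imports from the earlier example, assuming xsd:integer and xsd:double promote to xsd:double:

g2 = Graph()
g2.add((EX.a, EX.value, Literal(1)))    # xsd:integer
g2.add((EX.b, EX.value, Literal(2.5)))  # xsd:double
(row,) = list(g2.query(
    "PREFIX ex: <http://example.org/> "
    "SELECT (SUM(?v) AS ?total) WHERE { ?s ex:value ?v }"
))
print(row.total, row.total.datatype)  # 3.5, promoted toward xsd:double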

type_safe_numbers

type_safe_numbers(*args: int) -> Tuple[int]
type_safe_numbers(*args: Union[Decimal, float, int]) -> Tuple[Union[float, int]]
type_safe_numbers(*args: Union[Decimal, float, int]) -> Iterable[Union[float, int]]
Source code in rdflib/plugins/sparql/aggregates.py
def type_safe_numbers(*args: Union[Decimal, float, int]) -> Iterable[Union[float, int]]:
    if any(isinstance(arg, float) for arg in args) and any(
        isinstance(arg, Decimal) for arg in args
    ):
        return map(float, args)
    # type error: Incompatible return value type (got "Tuple[Union[Decimal, float, int], ...]", expected "Iterable[Union[float, int]]")
    # NOTE on type error: if args contains a Decimal it will not get here.
    return args  # type: ignore[return-value]
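
Python raises TypeError when adding Decimal and float directly, so this helper coerces every argument to float only when both types are present; otherwise the arguments pass through unchanged. For example:

from decimal import Decimal
from rdflib.plugins.sparql.aggregates import type_safe_numbers

print(sum(type_safe_numbers(Decimal("1.5"), 2)))    # Decimal('3.5'), types untouched
print(sum(type_safe_numbers(Decimal("1.5"), 2.0)))  # 3.5, both coerced to float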