Skip to content

sparql

Classes:

AlreadyBound

AlreadyBound()

Bases: SPARQLError

Raised when trying to bind a variable that is already bound!

Source code in rdflib/plugins/sparql/sparql.py
def __init__(self):
    # No message needed: the class itself ("Raised when trying to bind a
    # variable that is already bound") conveys the error.
    SPARQLError.__init__(self)

Bindings

Bindings(outer: Optional[Bindings] = None, d=[])

Bases: MutableMapping

A single level of a stack of variable-value bindings. Each dict keeps a reference to the dict below it; any failed lookup is propagated back

In python 3.3 this could be a collections.ChainMap

Methods:

Attributes:

Source code in rdflib/plugins/sparql/sparql.py
def __init__(self, outer: Optional[Bindings] = None, d=()):
    """One level in a chain of binding scopes.

    Args:
        outer: the enclosing scope, or None for the outermost level.
        d: initial key/value pairs — anything accepted by ``dict()``.
    """
    # Default changed from the mutable `[]` to an immutable tuple; the
    # argument is only ever copied via dict(), so behavior is unchanged,
    # but the shared-mutable-default hazard is gone.
    self._d: Dict[str, str] = dict(d)
    self.outer = outer

outer instance-attribute

outer = outer

__contains__

__contains__(key: Any) -> bool
Source code in rdflib/plugins/sparql/sparql.py
def __contains__(self, key: Any) -> bool:
    """True if `key` is bound at this level or in any outer scope."""
    try:
        self[key]
    except KeyError:
        return False
    else:
        return True

__delitem__

__delitem__(key: str) -> None
Source code in rdflib/plugins/sparql/sparql.py
def __delitem__(self, key: str) -> None:
    """Deletion is unsupported on a Bindings scope.

    Raises:
        NotImplementedError: always.
    """
    # NotImplementedError is more precise than a bare Exception and, being
    # an Exception subclass, stays compatible with existing handlers.
    raise NotImplementedError("DelItem is not implemented!")

__getitem__

__getitem__(key: str) -> str
Source code in rdflib/plugins/sparql/sparql.py
def __getitem__(self, key: str) -> str:
    """Look `key` up in this scope first, then in each outer scope.

    Raises:
        KeyError: if `key` is bound at no level.
    """
    if key in self._d:
        return self._d[key]

    if not self.outer:
        # Include the key in the exception (the old bare KeyError() made
        # failed lookups undebuggable).
        raise KeyError(key)
    return self.outer[key]

__iter__

__iter__() -> Generator[str, None, None]
Source code in rdflib/plugins/sparql/sparql.py
def __iter__(self) -> Generator[str, None, None]:
    """Yield keys from this scope first, then from each outer scope.

    Keys shadowed in inner scopes are yielded more than once.
    """
    scope: Optional[Bindings] = self
    while scope is not None:
        for key in scope._d:
            yield key
        scope = scope.outer

__len__

__len__() -> int
Source code in rdflib/plugins/sparql/sparql.py
def __len__(self) -> int:
    """Total number of entries across all scopes (shadowed keys counted twice)."""
    total = 0
    scope: Optional[Bindings] = self
    while scope is not None:
        total += len(scope._d)
        scope = scope.outer
    return total

__repr__

__repr__() -> str
Source code in rdflib/plugins/sparql/sparql.py
def __repr__(self) -> str:
    """Debug representation mirrors the human-readable form."""
    return self.__str__()

__setitem__

__setitem__(key: str, value: Any) -> None
Source code in rdflib/plugins/sparql/sparql.py
def __setitem__(self, key: str, value: Any) -> None:
    # Writes always land in the innermost (this) scope; outer scopes are
    # never modified, so the same key may shadow an outer binding.
    self._d[key] = value

__str__

__str__() -> str
Source code in rdflib/plugins/sparql/sparql.py
def __str__(self) -> str:
    """Render all visible bindings as ``Bindings({k: v, ...})``."""
    # BUG FIX: the previous code passed (key, value) tuples straight to
    # str.join, which raises TypeError at runtime (the old
    # `type: ignore[misc]` marked exactly this). Format each pair first.
    return "Bindings({" + ", ".join("%s: %s" % (k, self[k]) for k in self) + "})"

FrozenBindings

FrozenBindings(ctx: QueryContext, *args, **kwargs)

Bases: FrozenDict

Methods:

Attributes:

Source code in rdflib/plugins/sparql/sparql.py
def __init__(self, ctx: QueryContext, *args, **kwargs):
    # An immutable set of bindings plus the QueryContext they were
    # produced in; remaining args are forwarded to FrozenDict (i.e. dict()).
    FrozenDict.__init__(self, *args, **kwargs)
    self.ctx = ctx

bnodes property

bnodes: Mapping[Identifier, BNode]

ctx instance-attribute

ctx = ctx

now property

now: datetime

prologue property

prologue: Optional[Prologue]

__getitem__

__getitem__(key: Union[Identifier, str]) -> Identifier
Source code in rdflib/plugins/sparql/sparql.py
def __getitem__(self, key: Union[Identifier, str]) -> Identifier:
    """Resolve `key`, falling back to the context's initial bindings.

    Plain strings are promoted to Variables; terms that are neither
    BNodes nor Variables evaluate to themselves.
    """
    if not isinstance(key, Node):
        key = Variable(key)

    if not isinstance(key, (BNode, Variable)):
        return key

    if key in self._d:
        return self._d[key]
    # type error: Value of type "Optional[Dict[Variable, Identifier]]" is not indexable
    return self.ctx.initBindings[key]  # type: ignore[index]

forget

forget(before: QueryContext, _except: Optional[Container[Variable]] = None) -> FrozenBindings

return a frozen dict only of bindings made in self since before

Source code in rdflib/plugins/sparql/sparql.py
def forget(
    self, before: QueryContext, _except: Optional[Container[Variable]] = None
) -> FrozenBindings:
    """Return a frozen dict of only the bindings made in self since `before`."""
    _except = _except or []

    # bindings that came from initBindings are never forgotten
    kept = (
        item
        for item in self.items()
        if (
            item[0] in _except
            # type error: Unsupported right operand type for in ("Optional[Dict[Variable, Identifier]]")
            or item[0] in self.ctx.initBindings  # type: ignore[operator]
            or before[item[0]] is None
        )
    )
    return FrozenBindings(self.ctx, kept)

merge

merge(other: Mapping[Identifier, Identifier]) -> FrozenBindings
Source code in rdflib/plugins/sparql/sparql.py
def merge(self, other: t.Mapping[Identifier, Identifier]) -> FrozenBindings:
    """Combine self with `other` into a new FrozenBindings.

    Entries from `other` win on shared keys (dict() keeps the last value).
    """
    return FrozenBindings(self.ctx, itertools.chain(self.items(), other.items()))

project

project(vars: Container[Variable]) -> FrozenBindings
Source code in rdflib/plugins/sparql/sparql.py
def project(self, vars: Container[Variable]) -> FrozenBindings:
    """Restrict the bindings to the given variables."""
    kept = ((var, val) for var, val in self.items() if var in vars)
    return FrozenBindings(self.ctx, kept)

remember

remember(these) -> FrozenBindings

return a frozen dict only of bindings in these

Source code in rdflib/plugins/sparql/sparql.py
def remember(self, these) -> FrozenBindings:
    """Return a frozen dict containing only the bindings whose key is in `these`."""
    kept = ((key, value) for key, value in self.items() if key in these)
    return FrozenBindings(self.ctx, kept)

FrozenDict

FrozenDict(*args: Any, **kwargs: Any)

Bases: Mapping

An immutable hashable dict

Taken from http://stackoverflow.com/a/2704866/81121

Methods:

Source code in rdflib/plugins/sparql/sparql.py
def __init__(self, *args: Any, **kwargs: Any):
    # Accepts the same arguments as dict(); contents are fixed afterwards.
    self._d: Dict[Identifier, Identifier] = dict(*args, **kwargs)
    # Hash is computed lazily by __hash__ and cached here.
    self._hash: Optional[int] = None

__getitem__

__getitem__(key: Identifier) -> Identifier
Source code in rdflib/plugins/sparql/sparql.py
def __getitem__(self, key: Identifier) -> Identifier:
    # Plain delegation to the wrapped dict.
    return self._d[key]

__hash__

__hash__() -> int
Source code in rdflib/plugins/sparql/sparql.py
def __hash__(self) -> int:
    """XOR-fold of all key and value hashes, computed once and cached.

    Unlike hash(tuple(sorted(self._d.items()))), XOR is
    order-independent without sorting, so this stays O(n).
    """
    if self._hash is None:
        digest = 0
        for key, value in self.items():
            digest ^= hash(key) ^ hash(value)
        self._hash = digest
    return self._hash

__iter__

__iter__()
Source code in rdflib/plugins/sparql/sparql.py
def __iter__(self):
    # Iterate the wrapped dict's keys.
    return iter(self._d)

__len__

__len__() -> int
Source code in rdflib/plugins/sparql/sparql.py
def __len__(self) -> int:
    # Size of the wrapped dict.
    return len(self._d)

__repr__

__repr__() -> str
Source code in rdflib/plugins/sparql/sparql.py
def __repr__(self) -> str:
    # Show as the underlying dict for debugging.
    return repr(self._d)

__str__

__str__() -> str
Source code in rdflib/plugins/sparql/sparql.py
def __str__(self) -> str:
    # Display as the underlying dict.
    return str(self._d)

compatible

compatible(other: Mapping[Identifier, Identifier]) -> bool
Source code in rdflib/plugins/sparql/sparql.py
def compatible(self, other: t.Mapping[Identifier, Identifier]) -> bool:
    """True iff no key present in both mappings has conflicting values.

    Keys missing from `other` never conflict.
    """
    for key in self:
        try:
            mine, theirs = self[key], other[key]
        except KeyError:
            continue
        if mine != theirs:
            return False
    return True

disjointDomain

disjointDomain(other: Mapping[Identifier, Identifier]) -> bool
Source code in rdflib/plugins/sparql/sparql.py
def disjointDomain(self, other: t.Mapping[Identifier, Identifier]) -> bool:
    """True iff self and `other` bind no common keys."""
    return set(self).isdisjoint(other)

merge

merge(other: Mapping[Identifier, Identifier]) -> FrozenDict
Source code in rdflib/plugins/sparql/sparql.py
def merge(self, other: t.Mapping[Identifier, Identifier]) -> FrozenDict:
    """Combine self with `other` into a new FrozenDict.

    Entries from `other` win on shared keys (dict() keeps the last value).
    """
    return FrozenDict(itertools.chain(self.items(), other.items()))

project

project(vars: Container[Variable]) -> FrozenDict
Source code in rdflib/plugins/sparql/sparql.py
def project(self, vars: Container[Variable]) -> FrozenDict:
    """Keep only the entries whose key is in `vars`."""
    kept = ((key, value) for key, value in self.items() if key in vars)
    return FrozenDict(kept)

NotBoundError

NotBoundError(msg: Optional[str] = None)

Bases: SPARQLError

Source code in rdflib/plugins/sparql/sparql.py
def __init__(self, msg: Optional[str] = None):
    # Forward the optional message to the SPARQLError base.
    SPARQLError.__init__(self, msg)

Prologue

Prologue()

A class for holding prefixing bindings and base URI information

Methods:

Attributes:

Source code in rdflib/plugins/sparql/sparql.py
def __init__(self) -> None:
    # BASE IRI, if one is declared; consumed by absolutize().
    self.base: Optional[str] = None
    self.namespace_manager = NamespaceManager(Graph())  # ns man needs a store

base instance-attribute

base: Optional[str] = None

namespace_manager instance-attribute

namespace_manager = NamespaceManager(Graph())

absolutize

absolutize(iri: Optional[Union[CompValue, str]]) -> Optional[Union[CompValue, str]]

Apply BASE / PREFIXes to URIs (and to datatypes in Literals)

TODO: Move resolving URIs to pre-processing

Source code in rdflib/plugins/sparql/sparql.py
def absolutize(
    self, iri: Optional[Union[CompValue, str]]
) -> Optional[Union[CompValue, str]]:
    """
    Apply BASE / PREFIXes to URIs
    (and to datatypes in Literals)

    TODO: Move resolving URIs to pre-processing
    """
    if isinstance(iri, CompValue):
        if iri.name == "pname":
            return self.resolvePName(iri.prefix, iri.localname)
        if iri.name == "literal":
            # type error: Argument "datatype" to "Literal" has incompatible type "Union[CompValue, Identifier, None]"; expected "Optional[str]"
            return Literal(
                iri.string, lang=iri.lang, datatype=self.absolutize(iri.datatype)  # type: ignore[arg-type]
            )
        # any other CompValue passes through untouched
        return iri

    # Relative URIRefs (no colon anywhere) are resolved against BASE.
    if isinstance(iri, URIRef) and ":" not in iri:
        return URIRef(iri, base=self.base)

    return iri

bind

bind(prefix: Optional[str], uri: Any) -> None
Source code in rdflib/plugins/sparql/sparql.py
def bind(self, prefix: Optional[str], uri: Any) -> None:
    # Register prefix -> uri, replacing any existing binding for prefix.
    self.namespace_manager.bind(prefix, uri, replace=True)

resolvePName

resolvePName(prefix: Optional[str], localname: Optional[str]) -> URIRef
Source code in rdflib/plugins/sparql/sparql.py
def resolvePName(self, prefix: Optional[str], localname: Optional[str]) -> URIRef:
    """Expand a prefixed name into a full URIRef.

    A None/empty prefix resolves against the default namespace.

    Raises:
        Exception: if the prefix is not bound.
    """
    namespace = self.namespace_manager.store.namespace(prefix or "")
    if namespace is not None:
        return URIRef(namespace + (localname or ""))
    raise Exception("Unknown namespace prefix : %s" % prefix)

Query

Query(prologue: Prologue, algebra: CompValue)

A parsed and translated query

Attributes:

Source code in rdflib/plugins/sparql/sparql.py
def __init__(self, prologue: Prologue, algebra: CompValue):
    # Prefix/BASE information for the query.
    self.prologue = prologue
    # The translated algebra tree to evaluate.
    self.algebra = algebra
    # Declared for type checking only; assigned elsewhere
    # (query string, initial namespace bindings, base).
    self._original_args: Tuple[str, Mapping[str, str], Optional[str]]

algebra instance-attribute

algebra = algebra

prologue instance-attribute

prologue = prologue

QueryContext

QueryContext(graph: Optional[Graph] = None, bindings: Optional[Union[Bindings, FrozenBindings, List[Any]]] = None, initBindings: Optional[Mapping[str, Identifier]] = None, datasetClause=None)

Query context - passed along when evaluating the query

Methods:

Attributes:

Source code in rdflib/plugins/sparql/sparql.py
def __init__(
    self,
    graph: Optional[Graph] = None,
    bindings: Optional[Union[Bindings, FrozenBindings, List[Any]]] = None,
    initBindings: Optional[Mapping[str, Identifier]] = None,
    datasetClause=None,
):
    """Set up the evaluation context: variable bindings, active graph and dataset."""
    self.initBindings = initBindings
    self.bindings = Bindings(d=bindings or [])
    # Initial bindings are merged into the innermost scope up front.
    if initBindings:
        self.bindings.update(initBindings)

    self.graph: Optional[Graph]
    self._dataset: Optional[Union[Dataset, ConjunctiveGraph]]
    if isinstance(graph, (Dataset, ConjunctiveGraph)):
        if datasetClause:
            # FROM / FROM NAMED clauses: build a fresh dataset and default
            # graph populated only from the listed graphs.
            self._dataset = Dataset()
            self.graph = Graph()
            for d in datasetClause:
                if d.default:
                    from_graph = graph.get_context(d.default)
                    self.graph += from_graph
                    # An empty graph is falsy: fall back to loading the IRI.
                    if not from_graph:
                        self.load(d.default, default=True)
                elif d.named:
                    # Adding triples to this graph writes them into the new
                    # dataset's store under the named-graph identifier.
                    namedGraphs = Graph(
                        store=self.dataset.store, identifier=d.named
                    )
                    from_named_graphs = graph.get_context(d.named)
                    namedGraphs += from_named_graphs
                    if not from_named_graphs:
                        self.load(d.named, default=False)
        else:
            self._dataset = graph
            if rdflib.plugins.sparql.SPARQL_DEFAULT_GRAPH_UNION:
                # Use the whole dataset as the active default graph.
                self.graph = self.dataset
            else:
                self.graph = self.dataset.default_context
    else:
        # Plain Graph (or None): no dataset semantics apply.
        self._dataset = None
        self.graph = graph

    self.prologue: Optional[Prologue] = None
    # NOTE(review): presumably the query timestamp, filled in lazily via
    # the `now` property — confirm against the property implementation.
    self._now: Optional[datetime.datetime] = None

    # One stable BNode per label, created on first access.
    self.bnodes: t.MutableMapping[Identifier, BNode] = collections.defaultdict(
        BNode
    )

bindings instance-attribute

bindings = Bindings(d=bindings or [])

bnodes instance-attribute

bnodes: MutableMapping[Identifier, BNode] = defaultdict(BNode)

dataset property

The current dataset.

graph instance-attribute

graph: Optional[Graph]

initBindings instance-attribute

initBindings = initBindings

now property

now: datetime

prologue instance-attribute

prologue: Optional[Prologue] = None

__getitem__

__getitem__(key: Union[str, Path]) -> Optional[Union[str, Path]]
Source code in rdflib/plugins/sparql/sparql.py
def __getitem__(self, key: Union[str, Path]) -> Optional[Union[str, Path]]:
    """Evaluate `key` against the current bindings.

    In SPARQL, BNodes are just labels, so they are looked up like
    Variables; anything else evaluates to itself. Unbound keys yield None.
    """
    if isinstance(key, (BNode, Variable)):
        # Mapping.get turns a missing binding into None.
        return self.bindings.get(key)
    return key

__setitem__

__setitem__(key: str, value: str) -> None
Source code in rdflib/plugins/sparql/sparql.py
def __setitem__(self, key: str, value: str) -> None:
    """Bind `key` to `value`.

    Raises:
        AlreadyBound: if `key` is already bound to a different value.
    """
    try:
        existing = self.bindings[key]
    except KeyError:
        pass
    else:
        if existing != value:
            raise AlreadyBound()

    self.bindings[key] = value

clean

clean() -> QueryContext
Source code in rdflib/plugins/sparql/sparql.py
def clean(self) -> QueryContext:
    """Return a clone of this context built with an empty bindings list.

    NOTE(review): `[]` is falsy, so clone() falls back to self.bindings —
    confirm whether a truly empty context was intended here.
    """
    return self.clone(bindings=[])

clone

clone(bindings: Optional[Union[FrozenBindings, Bindings, List[Any]]] = None) -> QueryContext
Source code in rdflib/plugins/sparql/sparql.py
def clone(
    self, bindings: Optional[Union[FrozenBindings, Bindings, List[Any]]] = None
) -> QueryContext:
    """Copy this context, optionally substituting the variable bindings.

    The prologue, active graph and bnode-label map are shared with the copy.
    """
    base = self.graph if self._dataset is None else self._dataset
    # NOTE: a falsy `bindings` (e.g. []) falls back to the current bindings.
    copied = QueryContext(
        base,
        bindings or self.bindings,
        initBindings=self.initBindings,
    )
    copied.prologue = self.prologue
    copied.graph = self.graph
    copied.bnodes = self.bnodes
    return copied

get

get(key: str, default: Optional[Any] = None) -> Any
Source code in rdflib/plugins/sparql/sparql.py
def get(self, key: str, default: Optional[Any] = None) -> Any:
    """Like __getitem__, but return `default` instead of raising KeyError."""
    try:
        value = self[key]
    except KeyError:
        return default
    return value

load

load(source: URIRef, default: bool = False, into: Optional[Identifier] = None, **kwargs: Any) -> None

Load data from the source into the query context's dataset.

Parameters:

  • source

    (URIRef) –

    The source to load from.

  • default

    (bool, default: False ) –

    If True, triples from the source will be added to the default graph, otherwise it will be loaded into a graph with source URI as its name.

  • into

    (Optional[Identifier], default: None ) –

    The name of the graph to load the data into. If None, the source URI will be used as the name of the graph.

  • **kwargs

    (Any, default: {} ) –

    Keyword arguments to pass to parse.

Source code in rdflib/plugins/sparql/sparql.py
def load(
    self,
    source: URIRef,
    default: bool = False,
    into: Optional[Identifier] = None,
    **kwargs: Any,
) -> None:
    """
    Load data from `source` into the query context's dataset.

    Args:
        source: The source to load from.
        default: If `True`, triples from the source are added to the
            default graph; otherwise they are loaded into a graph named
            after the `source` URI.
        into: The name of the graph to load the data into. If `None`,
            the source URI is used as the graph name.
        **kwargs: Keyword arguments passed on to
            [`parse`][rdflib.graph.Graph.parse].
    """

    def _load(graph, source):
        # Try each serialization in turn; fail only once all have failed.
        for fmt in ("turtle", "xml", "n3", "nt"):
            try:
                return graph.parse(source, format=fmt, **kwargs)
            except Exception:
                continue
        raise Exception(
            "Could not load %s as either RDF/XML, N3 or NTriples" % source
        )

    if not rdflib.plugins.sparql.SPARQL_LOAD_GRAPHS:
        # Loading is disabled: if the graph being "loaded" is already
        # known, just merge it into the default graph.
        if default:
            # Unsupported left operand type for + ("None")
            self.graph += self.dataset.get_context(source)  # type: ignore[operator]
    else:
        if default:
            _load(self.graph, source)
        else:
            target = source if into is None else into
            _load(self.dataset.get_context(target), source)

push

push() -> QueryContext
Source code in rdflib/plugins/sparql/sparql.py
def push(self) -> QueryContext:
    """Open a new, empty binding scope stacked on top of the current one."""
    return self.clone(Bindings(self.bindings))

pushGraph

pushGraph(graph: Optional[Graph]) -> QueryContext
Source code in rdflib/plugins/sparql/sparql.py
def pushGraph(self, graph: Optional[Graph]) -> QueryContext:
    """Return a copy of this context with `graph` as the active graph."""
    child = self.clone()
    child.graph = graph
    return child

solution

solution(vars: Optional[Iterable[Variable]] = None) -> FrozenBindings

Return a static copy of the current variable bindings as dict

Source code in rdflib/plugins/sparql/sparql.py
def solution(self, vars: Optional[Iterable[Variable]] = None) -> FrozenBindings:
    """Return an immutable snapshot of the current variable bindings.

    If `vars` is given (and non-empty), only those variables are kept.
    """
    pairs = self.bindings.items()
    if vars:
        pairs = ((var, val) for var, val in self.bindings.items() if var in vars)
    return FrozenBindings(self, pairs)

thaw

thaw(frozenbindings: FrozenBindings) -> QueryContext

Create a new read/write query context from the given solution

Source code in rdflib/plugins/sparql/sparql.py
def thaw(self, frozenbindings: FrozenBindings) -> QueryContext:
    """Create a new read/write query context from the given solution."""
    return self.clone(frozenbindings)

SPARQLError

SPARQLError(msg: Optional[str] = None)

Bases: Exception

Source code in rdflib/plugins/sparql/sparql.py
def __init__(self, msg: Optional[str] = None):
    # Root of the SPARQL exception hierarchy; just forwards the message.
    Exception.__init__(self, msg)

SPARQLTypeError

SPARQLTypeError(msg: Optional[str])

Bases: SPARQLError

Source code in rdflib/plugins/sparql/sparql.py
def __init__(self, msg: Optional[str]):
    # Forward the message to the SPARQLError base.
    SPARQLError.__init__(self, msg)

Update

Update(prologue: Prologue, algebra: List[CompValue])

A parsed and translated update

Attributes:

Source code in rdflib/plugins/sparql/sparql.py
def __init__(self, prologue: Prologue, algebra: List[CompValue]):
    # Prefix/BASE information for the update request.
    self.prologue = prologue
    # One algebra tree per update operation in the request.
    self.algebra = algebra
    # Declared for type checking only; assigned elsewhere
    # (update string, initial namespace bindings, base).
    self._original_args: Tuple[str, Mapping[str, str], Optional[str]]

algebra instance-attribute

algebra = algebra

prologue instance-attribute

prologue = prologue