Skip to content

auditable

This wrapper intercepts calls through the store interface and implements thread-safe logging of destructive operations (adds / removes) in reverse. This log is persisted on the store instance, and the reverse operations are executed in order to return the store to the state it was in when the transaction began. Since the reverse operations are persisted on the store, the store itself acts as a transaction.

Calls to commit or rollback flush the list of reverse operations. This provides thread-safe atomicity and isolation (assuming concurrent operations occur with different store instances), but no durability (transactions are persisted in memory and won’t be available to reverse operations after the system fails): A and I out of ACID.

Classes:

  • AuditableStore

    A store that logs destructive operations (add/remove) in reverse order.

Attributes:

destructiveOpLocks module-attribute

destructiveOpLocks = {'add': None, 'remove': None}

AuditableStore

AuditableStore(store: Store)

Bases: Store

A store that logs destructive operations (add/remove) in reverse order.

Methods:

Attributes:

Source code in rdflib/plugins/stores/auditable.py
def __init__(self, store: Store):
    """Wrap ``store`` and start with an empty log of reverse operations.

    Args:
        store: The store that every call is delegated to.
    """
    self.store = store
    self.context_aware = store.context_aware
    # Held by rollback() while the logged operations are replayed.
    self.rollbackLock = threading.RLock()
    # One entry per pending undo step:
    # (subject, predicate, object, context identifier, operation name).
    self.reverseOps: List[
        Tuple[
            Optional[_SubjectType],
            Optional[_PredicateType],
            Optional[_ObjectType],
            Optional[_ContextIdentifierType],
            str,
        ]
    ] = []
    self.transaction_aware = True  # This is only half true
    # NOTE: this store can't be formula_aware as it doesn't have enough
    # info to reverse the removal of a quoted statement
    self.formula_aware = False  # store.formula_aware

context_aware instance-attribute

context_aware = context_aware

formula_aware instance-attribute

formula_aware = False

reverseOps instance-attribute

reverseOps: List[Tuple[Optional[_SubjectType], Optional[_PredicateType], Optional[_ObjectType], Optional[_ContextIdentifierType], str]] = []

rollbackLock instance-attribute

rollbackLock = RLock()

store instance-attribute

store = store

transaction_aware instance-attribute

transaction_aware = True

__len__

__len__(context: Optional[_ContextType] = None)
Source code in rdflib/plugins/stores/auditable.py
def __len__(self, context: Optional[_ContextType] = None):
    """Return the length the wrapped store reports for ``context``.

    Args:
        context: Optional graph to measure; ``None`` is passed through
            to the wrapped store unchanged.
    """
    if context is None:
        return self.store.__len__(None)
    # Rebuild the graph object around the wrapped store so the inner store
    # is queried with a context bound to itself rather than to this wrapper.
    rebound = context.__class__(self.store, context.identifier)
    return self.store.__len__(rebound)

add

add(triple: _TripleType, context: _ContextType, quoted: bool = False) -> None
Source code in rdflib/plugins/stores/auditable.py
def add(
    self, triple: _TripleType, context: _ContextType, quoted: bool = False
) -> None:
    """Add ``triple`` to the wrapped store and log how to undo the addition.

    If the wrapped store already reports the triple in ``context`` the call
    is a no-op and nothing is logged.

    Args:
        triple: The ``(subject, predicate, object)`` to add.
        context: Graph the triple is added to; it is re-created around the
            wrapped store before being passed on. May be ``None``.
        quoted: Forwarded unchanged to the wrapped store's ``add``.
    """
    (s, p, o) = triple
    # NOTE(review): when no shared lock is registered under
    # destructiveOpLocks["add"], a brand-new RLock is created for this call
    # only, so it cannot exclude other threads -- confirm this is intended.
    lock = destructiveOpLocks["add"]
    lock = lock if lock else threading.RLock()
    with lock:
        context = (
            context.__class__(self.store, context.identifier)
            if context is not None
            else None
        )
        ctxId = context.identifier if context is not None else None  # noqa: N806
        if list(self.store.triples(triple, context)):
            return  # triple already in store, do nothing
        try:
            # The triple was removed earlier in this transaction: re-adding
            # it restores the original state, so the pending "add" undo step
            # is simply cancelled.
            self.reverseOps.remove((s, p, o, ctxId, "add"))
        except ValueError:
            # Genuinely new triple: rolling back must remove it again.
            # Logging "remove" only in this case keeps a remove-then-add
            # sequence from deleting a pre-existing triple on rollback.
            self.reverseOps.append((s, p, o, ctxId, "remove"))
        self.store.add((s, p, o), context, quoted)

bind

bind(prefix: str, namespace: URIRef, override: bool = True) -> None
Source code in rdflib/plugins/stores/auditable.py
def bind(self, prefix: str, namespace: URIRef, override: bool = True) -> None:
    """Delegate the prefix/namespace binding to the wrapped store.

    Args:
        prefix: Prefix to bind.
        namespace: Namespace the prefix is bound to.
        override: Forwarded to the wrapped store as the ``override`` keyword.
    """
    wrapped = self.store
    wrapped.bind(prefix, namespace, override=override)

close

close(commit_pending_transaction: bool = False) -> None
Source code in rdflib/plugins/stores/auditable.py
def close(self, commit_pending_transaction: bool = False) -> None:
    """Close the wrapped store.

    Args:
        commit_pending_transaction: Forwarded to the wrapped store's
            ``close`` so the caller's choice is honoured instead of being
            silently dropped.
    """
    self.store.close(commit_pending_transaction)

commit

commit() -> None
Source code in rdflib/plugins/stores/auditable.py
def commit(self) -> None:
    """Discard all logged reverse operations.

    The log is replaced by a new empty list; the previous list object is
    left untouched.
    """
    cleared: list = []
    self.reverseOps = cleared

contexts

contexts(triple: Optional[_TripleType] = None) -> Generator[_ContextType, None, None]
Source code in rdflib/plugins/stores/auditable.py
def contexts(
    self, triple: Optional[_TripleType] = None
) -> Generator[_ContextType, None, None]:
    """Yield every context the wrapped store reports for ``triple``.

    Args:
        triple: Optional triple forwarded unchanged to the wrapped store.
    """
    yield from self.store.contexts(triple)

destroy

destroy(configuration: str) -> None
Source code in rdflib/plugins/stores/auditable.py
def destroy(self, configuration: str) -> None:
    """Delegate destruction to the wrapped store.

    Args:
        configuration: Forwarded unchanged to the wrapped store's ``destroy``.
    """
    wrapped = self.store
    wrapped.destroy(configuration)

namespace

namespace(prefix: str) -> Optional[URIRef]
Source code in rdflib/plugins/stores/auditable.py
def namespace(self, prefix: str) -> Optional[URIRef]:
    """Return whatever the wrapped store reports as the namespace of ``prefix``."""
    bound = self.store.namespace(prefix)
    return bound

namespaces

namespaces() -> Iterator[Tuple[str, URIRef]]
Source code in rdflib/plugins/stores/auditable.py
def namespaces(self) -> Iterator[Tuple[str, URIRef]]:
    """Return the wrapped store's ``namespaces()`` result unchanged."""
    bindings = self.store.namespaces()
    return bindings

open

open(configuration: Union[str, tuple[str, str]], create: bool = True) -> Optional[int]
Source code in rdflib/plugins/stores/auditable.py
def open(
    self, configuration: Union[str, tuple[str, str]], create: bool = True
) -> Optional[int]:
    """Open the wrapped store and return whatever its ``open`` returns.

    Args:
        configuration: Forwarded unchanged to the wrapped store.
        create: Forwarded unchanged to the wrapped store.
    """
    status = self.store.open(configuration, create)
    return status

prefix

prefix(namespace: URIRef) -> Optional[str]
Source code in rdflib/plugins/stores/auditable.py
def prefix(self, namespace: URIRef) -> Optional[str]:
    """Return whatever the wrapped store reports as the prefix of ``namespace``."""
    found = self.store.prefix(namespace)
    return found

query

query(*args: Any, **kw: Any) -> Result
Source code in rdflib/plugins/stores/auditable.py
def query(self, *args: Any, **kw: Any) -> Result:
    """Run a query on the wrapped store.

    All positional and keyword arguments are forwarded unchanged, and the
    wrapped store's result is returned as-is.
    """
    outcome = self.store.query(*args, **kw)
    return outcome

remove

remove(spo: _TriplePatternType, context: Optional[_ContextType] = None) -> None
Source code in rdflib/plugins/stores/auditable.py
def remove(
    self, spo: _TriplePatternType, context: Optional[_ContextType] = None
) -> None:
    """Remove everything matching ``spo`` and log how to undo the removal.

    Args:
        spo: ``(subject, predicate, object)`` pattern; ``None`` in any
            position acts as a wildcard.
        context: Optional graph to remove from; it is re-created around the
            wrapped store before use.
    """
    subject, predicate, object_ = spo
    pattern = (subject, predicate, object_)

    def log_undo(s, p, o, ctx_id) -> None:
        # A pending "remove" undo step means the quad was added during this
        # transaction, so the two cancel out; otherwise record that rollback
        # must add the quad back.
        try:
            self.reverseOps.remove((s, p, o, ctx_id, "remove"))
        except ValueError:
            self.reverseOps.append((s, p, o, ctx_id, "add"))

    lock = destructiveOpLocks["remove"] or threading.RLock()
    with lock:
        if context is not None:
            context = context.__class__(self.store, context.identifier)
        ctx_id = None if context is None else context.identifier

        fully_specified = None not in [subject, predicate, object_, context]
        if fully_specified:
            if not list(self.triples(pattern, context)):
                return  # triple not present in store, do nothing
            log_undo(subject, predicate, object_, ctx_id)
        elif ctx_id:
            # Wildcard inside a known context: enumerate the concrete
            # triples that are about to disappear.
            # type error: Item "None" of "Optional[Graph]" has no attribute "triples"
            for s, p, o in context.triples(pattern):  # type: ignore[union-attr]
                log_undo(s, p, o, ctx_id)
        else:
            # No usable context identifier: enumerate matching quads across
            # the whole wrapped store.
            for s, p, o, ctx in ConjunctiveGraph(self.store).quads(pattern):
                # type error: Item "None" of "Optional[Graph]" has no attribute "identifier"
                log_undo(s, p, o, ctx.identifier)  # type: ignore[union-attr]
        self.store.remove(pattern, context)

rollback

rollback() -> None
Source code in rdflib/plugins/stores/auditable.py
def rollback(self) -> None:
    """Replay the logged reverse operations on the wrapped store, then clear the log.

    Entries are applied in the order they were logged, while holding
    ``rollbackLock``. An entry whose operation is ``"add"`` is added to the
    wrapped store; any other entry is removed from it.
    """
    with self.rollbackLock:
        for entry in self.reverseOps:
            subject, predicate, obj, context, op = entry
            replay = self.store.add if op == "add" else self.store.remove
            # type error: Argument 2 to "Graph" has incompatible type "Optional[Node]"; expected "Union[IdentifiedNode, str, None]"
            replay((subject, predicate, obj), Graph(self.store, context))  # type: ignore[arg-type]

    # The log is reset only after the lock has been released.
    self.reverseOps = []

triples

triples(triple: _TriplePatternType, context: Optional[_ContextType] = None) -> Iterator[Tuple[_TripleType, Iterator[Optional[_ContextType]]]]
Source code in rdflib/plugins/stores/auditable.py
def triples(
    self, triple: _TriplePatternType, context: Optional[_ContextType] = None
) -> Iterator[Tuple[_TripleType, Iterator[Optional[_ContextType]]]]:
    """Yield the wrapped store's matches for ``triple``.

    Args:
        triple: ``(subject, predicate, object)`` pattern forwarded to the
            wrapped store.
        context: Optional graph; when given it is re-created around the
            wrapped store before the lookup.

    Yields:
        ``((s, p, o), contexts)`` pairs as produced by the wrapped store.
    """
    (su, pr, ob) = triple
    if context is not None:
        context = context.__class__(self.store, context.identifier)
    pattern = (su, pr, ob)
    for found, cg in self.store.triples(pattern, context):
        s, p, o = found
        yield (s, p, o), cg