Skip to content

concurrent

Classes:

ConcurrentStore

ConcurrentStore(store)

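A store wrapper that allows concurrent reads and writes: writes made while a read is in progress are queued and applied to the wrapped store once all reads have finished.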

Methods:

Attributes:

Source code in rdflib/plugins/stores/concurrent.py
def __init__(self, store):
    """Wrap ``store`` and set up the bookkeeping used to defer writes.

    Args:
        store: The underlying store object that reads and writes are
            delegated to.
    """
    # Triples whose add/remove was requested while a read was in progress;
    # they are held here until the writes can be applied.
    self.__pending_adds = []
    self.__pending_removes = []

    # Lock protecting the visit counter.
    self.__lock = Lock()

    # How many reads are currently in progress.
    self.__visit_count = 0

    self.store = store

__lock instance-attribute

__lock = Lock()

__pending_adds instance-attribute

__pending_adds = []

__pending_removes instance-attribute

__pending_removes = []

__visit_count instance-attribute

__visit_count = 0

store instance-attribute

store = store

__begin_read

__begin_read()
Source code in rdflib/plugins/stores/concurrent.py
def __begin_read(self):
    """Register the start of a read by incrementing the visit counter.

    The counter is updated while holding ``self.__lock``.  The lock is taken
    with a ``with`` statement so that it is released even if the update
    raises, rather than being left locked forever.
    """
    with self.__lock:
        self.__visit_count += 1

__end_read

__end_read()
Source code in rdflib/plugins/stores/concurrent.py
def __end_read(self):
    """Register the end of a read and flush deferred writes once idle.

    Decrements the visit counter while holding ``self.__lock``.  When the
    counter reaches zero, every queued removal is applied to ``self.store``
    and then every queued addition; each queue is drained from its end
    (last queued, first applied).

    A removal that fails with an ordinary exception is reported on stdout
    and skipped (best effort).  A failing addition propagates to the caller.
    In every case the lock is released on the way out.
    """
    with self.__lock:
        self.__visit_count -= 1
        if self.__visit_count != 0:
            # Other reads are still in progress; keep the writes queued.
            return

        pending_removes = self.__pending_removes
        while pending_removes:
            (s, p, o) = pending_removes.pop()
            try:
                self.store.remove((s, p, o))
            except Exception:
                # Only ordinary errors are treated as "nothing to remove";
                # SystemExit / KeyboardInterrupt are allowed to propagate.
                print(s, p, o, "Not in store to remove")

        pending_adds = self.__pending_adds
        while pending_adds:
            (s, p, o) = pending_adds.pop()
            self.store.add((s, p, o))

__len__

__len__()
Source code in rdflib/plugins/stores/concurrent.py
def __len__(self):
    """Return whatever the wrapped store's ``__len__`` reports.

    Only the wrapped store is consulted; writes still sitting in the
    pending queues are not taken into account.
    """
    wrapped = self.store
    return wrapped.__len__()

add

add(triple)
Source code in rdflib/plugins/stores/concurrent.py
def add(self, triple):
    """Add ``triple`` to the store, or queue it while reads are in progress.

    Args:
        triple: An iterable of exactly three items
            ``(subject, predicate, object)``.
    """
    subject, predicate, obj = triple
    statement = (subject, predicate, obj)
    if self.__visit_count != 0:
        # A read is in progress: hold the write back in the pending queue.
        self.__pending_adds.append(statement)
        return
    self.store.add(statement)

remove

remove(triple)
Source code in rdflib/plugins/stores/concurrent.py
def remove(self, triple):
    """Remove ``triple`` from the store, or queue the removal during reads.

    Args:
        triple: An iterable of exactly three items
            ``(subject, predicate, object)``.
    """
    subject, predicate, obj = triple
    statement = (subject, predicate, obj)
    if self.__visit_count != 0:
        # A read is in progress: hold the removal back in the pending queue.
        self.__pending_removes.append(statement)
        return
    self.store.remove(statement)

triples

triples(triple)
Source code in rdflib/plugins/stores/concurrent.py
def triples(self, triple):
    """Yield triples for the ``triple`` pattern, accounting for queued writes.

    Results from the wrapped store's ``triples`` are yielded first, skipping
    any triple currently queued for removal.  Afterwards, queued additions
    that match the pattern are yielded; in that comparison a ``None``
    component of the pattern matches anything.

    Args:
        triple: An iterable of exactly three items
            ``(subject, predicate, object)`` forming the pattern.

    Yields:
        ``(subject, predicate, object)`` tuples.
    """
    subject_pat, predicate_pat, object_pat = triple
    store_results = self.store.triples((subject_pat, predicate_pat, object_pat))
    queued_removes = self.__pending_removes
    self.__begin_read()
    # The wrapper is created inline on purpose: nothing else holds a
    # reference to it, and it runs ``self.__end_read`` from its ``__del__``.
    # NOTE(review): exactly when that happens depends on the interpreter's
    # object finalization - confirm before relying on the timing.
    for subject, predicate, obj in ResponsibleGenerator(store_results, self.__end_read):
        if (subject, predicate, obj) not in queued_removes:
            yield subject, predicate, obj

    for subject, predicate, obj in self.__pending_adds:
        if subject_pat is not None and not (subject_pat == subject):
            continue
        if predicate_pat is not None and not (predicate_pat == predicate):
            continue
        if object_pat is not None and not (object_pat == obj):
            continue
        yield subject, predicate, obj

ResponsibleGenerator

ResponsibleGenerator(gen, cleanup)

An iterator wrapper around a generator that calls a cleanup callback when the wrapper is finalized (its `__del__` runs), i.e. once it is done being used.

Methods:

Attributes:

Source code in rdflib/plugins/stores/concurrent.py
def __init__(self, gen, cleanup):
    """Remember the wrapped iterator and the callback to run afterwards.

    Args:
        gen: The iterator whose items this object hands out.
        cleanup: A callable taking no arguments.
    """
    self.gen = gen
    self.cleanup = cleanup

__slots__ class-attribute instance-attribute

__slots__ = ['cleanup', 'gen']

cleanup instance-attribute

cleanup = cleanup

gen instance-attribute

gen = gen

__del__

__del__()
Source code in rdflib/plugins/stores/concurrent.py
def __del__(self):
    """Invoke the stored cleanup callback when this object is finalized."""
    on_finalize = self.cleanup
    on_finalize()

__iter__

__iter__()
Source code in rdflib/plugins/stores/concurrent.py
def __iter__(self):
    """Return ``self``, so the object can be used directly in a ``for`` loop."""
    return self

__next__

__next__()
Source code in rdflib/plugins/stores/concurrent.py
def __next__(self):
    """Return the next item from the wrapped iterator ``self.gen``.

    Any exception raised by the wrapped iterator, including
    ``StopIteration`` at exhaustion, propagates unchanged.
    """
    upcoming = next(self.gen)
    return upcoming