Skip to content

tsvresults

This implements the Tab Separated SPARQL Result Format

It is implemented with pyparsing, reusing the elements from the SPARQL Parser

Classes:

Attributes:

EMPTY module-attribute

EMPTY = FollowedBy(LineEnd()) | FollowedBy('\t')

HEADER module-attribute

HEADER = Var + ZeroOrMore(Suppress('\t') + Var)

NONE_VALUE module-attribute

NONE_VALUE = object()

RDFLITERAL module-attribute

RDFLITERAL = Comp('literal', Param('string', String) + Optional(Param('lang', leaveWhitespace()) | leaveWhitespace() + leaveWhitespace()))

ROW module-attribute

ROW = (EMPTY | TERM) + ZeroOrMore(Suppress('\t') + (EMPTY | TERM))

String module-attribute

TERM module-attribute

TSVResultParser

TSVResultParser()

Bases: ResultParser

Parses SPARQL TSV results into a Result object.

Methods:

Source code in rdflib/query.py
def __init__(self):
    pass

convertTerm

convertTerm(t: Union[object, Literal, BNode, CompValue, URIRef]) -> Optional[Union[object, BNode, URIRef, Literal]]
Source code in rdflib/plugins/sparql/results/tsvresults.py
def convertTerm(
    self, t: Union[object, RDFLiteral, BNode, CompValue, URIRef]
) -> typing.Optional[Union[object, BNode, URIRef, RDFLiteral]]:
    if t is NONE_VALUE:
        return None
    if isinstance(t, CompValue):
        if t.name == "literal":
            return RDFLiteral(t.string, lang=t.lang, datatype=t.datatype)
        else:
            raise Exception("I dont know how to handle this: %s" % (t,))
    else:
        return t

parse

parse(source: IO, content_type: Optional[str] = None) -> Result
Source code in rdflib/plugins/sparql/results/tsvresults.py
def parse(self, source: IO, content_type: typing.Optional[str] = None) -> Result:  # type: ignore[override]
    if isinstance(source.read(0), bytes):
        # if reading from source returns bytes do utf-8 decoding
        # type error: Incompatible types in assignment (expression has type "StreamReader", variable has type "IO[Any]")
        source = codecs.getreader("utf-8")(source)  # type: ignore[assignment]

    r = Result("SELECT")

    header = source.readline()

    r.vars = list(HEADER.parseString(header.strip(), parseAll=True))
    r.bindings = []
    while True:
        line = source.readline()
        if not line:
            break
        line = line.strip("\n")
        if line == "":
            continue

        row = ROW.parseString(line, parseAll=True)
        # type error: Generator has incompatible item type "object"; expected "Identifier"
        r.bindings.append(dict(zip(r.vars, (self.convertTerm(x) for x in row))))  # type: ignore[misc]

    return r