Skip to content

ntriples

N-Triples Parser. License: GPL 2, W3C, BSD, or MIT. Author: Sean B. Palmer, inamidst.com.

Classes:

Functions:

__all__ module-attribute

__all__ = ['unquote', 'uriquote', 'W3CNTriplesParser', 'NTGraphSink', 'NTParser', 'DummySink']

bufsiz module-attribute

bufsiz = 2048

literal module-attribute

literal = '"([^"\\\\]*(?:\\\\.[^"\\\\]*)*)"'

litinfo module-attribute

litinfo = '(?:@([a-zA-Z]+(?:-[a-zA-Z0-9]+)*)|\\^\\^' + uriref + ')?'

r_hibyte module-attribute

r_hibyte = compile('([\\x80-\\xFF])')

r_line module-attribute

r_line = compile('([^\\r\\n]*)(?:\\r\\n|\\r|\\n)')

r_literal module-attribute

r_literal = compile(literal + litinfo)

r_nodeid module-attribute

r_nodeid = compile('_:([A-Za-z0-9_:]([-A-Za-z0-9_:\\.]*[-A-Za-z0-9_:])?)')

r_quot module-attribute

r_quot = compile('\\\\([tbnrf"\'\\\\])')

r_safe module-attribute

r_safe = compile('([\\x20\\x21\\x23-\\x5B\\x5D-\\x7E]+)')

r_tail module-attribute

r_tail = compile('[ \\t]*\\.[ \\t]*(#.*)?')

r_uniquot module-attribute

r_uniquot = compile('\\\\u([0-9A-Fa-f]{4})|\\\\U([0-9A-Fa-f]{8})')

r_uriref module-attribute

r_uriref = compile(uriref)

r_wspace module-attribute

r_wspace = compile('[ \\t]*')

r_wspaces module-attribute

r_wspaces = compile('[ \\t]+')

uriref module-attribute

uriref = '<([^:]+:[^\\s"<>]*)>'

validate module-attribute

validate = False

DummySink

DummySink()

Methods:

Attributes:

Source code in rdflib/plugins/parsers/ntriples.py
def __init__(self):
    """Start the sink with a triple count of zero."""
    # `triple` increments this counter once per call.
    self.length: int = 0

length instance-attribute

length = 0

triple

triple(s, p, o)
Source code in rdflib/plugins/parsers/ntriples.py
def triple(self, s, p, o):
    """Count one more triple and echo its three terms to standard output.

    Args:
        s: The subject term.
        p: The predicate term.
        o: The object term.
    """
    self.length = self.length + 1
    print(s, p, o)

NTGraphSink

NTGraphSink(graph: Graph)

Methods:

Attributes:

Source code in rdflib/plugins/parsers/ntriples.py
def __init__(self, graph: Graph):
    """Remember the graph that `triple` adds statements to.

    Args:
        graph: The object whose ``add`` method receives each triple.
    """
    target = graph
    self.g = target

__slots__ class-attribute instance-attribute

__slots__ = ('g',)

g instance-attribute

g = graph

triple

triple(s: _SubjectType, p: _PredicateType, o: _ObjectType) -> None
Source code in rdflib/plugins/parsers/ntriples.py
def triple(self, s: _SubjectType, p: _PredicateType, o: _ObjectType) -> None:
    """Add the statement ``(s, p, o)`` to the wrapped graph.

    Args:
        s: The subject term.
        p: The predicate term.
        o: The object term.
    """
    statement = (s, p, o)
    self.g.add(statement)

NTParser

NTParser()

Bases: Parser

Parser for the N-Triples format, often stored with the .nt extension.

See http://www.w3.org/TR/rdf-testcases/#ntriples

Methods:

  • parse

    Parse the NT format.

Attributes:

Source code in rdflib/parser.py
def __init__(self):
    """Initialise the parser; there is no state to set up."""

__slots__ class-attribute instance-attribute

__slots__ = ()

parse classmethod

parse(source: InputSource, sink: Graph, **kwargs: Any) -> None

Parse the NT format.

Parameters:

  • source

    (InputSource) –

    The source of NT-formatted data

  • sink

    (Graph) –

    Where to send parsed triples

  • **kwargs

    (Any, default: {} ) –

    Additional arguments to pass to W3CNTriplesParser.parse

Source code in rdflib/plugins/parsers/ntriples.py
@classmethod
def parse(cls, source: InputSource, sink: Graph, **kwargs: Any) -> None:
    """Parse the NT format.

    The stream obtained from ``source`` is closed before returning, whether
    parsing succeeds or raises.

    Args:
        source: The source of NT-formatted data
        sink: Where to send parsed triples
        **kwargs: Additional arguments to pass to `W3CNTriplesParser.parse`
    """
    f: Union[TextIO, IO[bytes], codecs.StreamReader]
    f = source.getCharacterStream()
    if not f:
        b = source.getByteStream()
        # TextIOBase includes: StringIO and TextIOWrapper
        if isinstance(b, TextIOBase):
            # f is not really a ByteStream, but a CharacterStream
            f = b  # type: ignore[assignment]
        else:
            # since N-Triples 1.1 files can and should be utf-8 encoded
            f = codecs.getreader("utf-8")(b)
    parser = W3CNTriplesParser(NTGraphSink(sink))
    try:
        parser.parse(f, **kwargs)
    finally:
        # Close the stream even when parsing fails, so an invalid document
        # does not leave the underlying file handle open.
        f.close()

W3CNTriplesParser

W3CNTriplesParser(sink: Optional[Union[DummySink, NTGraphSink]] = None, bnode_context: Optional[_BNodeContextType] = None)

An N-Triples Parser.

This is a legacy-style Triples parser for NTriples provided by W3C.

Example
p = W3CNTriplesParser(sink=MySink())
sink = p.parse(f) # file; use parsestring for a string

To define a context in which blank node identifiers refer to the same blank node across instances of W3CNTriplesParser, pass the same dict as bnode_context to each instance. By default, a new blank node context is created for each instance of W3CNTriplesParser.

Methods:

Attributes:

Source code in rdflib/plugins/parsers/ntriples.py
def __init__(
    self,
    sink: Optional[Union[DummySink, NTGraphSink]] = None,
    bnode_context: Optional[_BNodeContextType] = None,
):
    """Set up the parser state.

    Args:
        sink: Receiver of parsed triples; a new `DummySink` is created when
            omitted.
        bnode_context: Mapping used to remember blank node identifiers. The
            given object is stored as-is (even when empty); a new dict is
            created when omitted.
    """
    self.skolemize = False

    # Only `None` selects a fresh mapping: an empty dict passed by the
    # caller must be kept so that it can be shared.
    self._bnode_ids = {} if bnode_context is None else bnode_context

    self.sink: Union[DummySink, NTGraphSink] = DummySink() if sink is None else sink

    # Input state; `parse` assigns the file and resets the buffer.
    self.buffer: Optional[str] = None
    self.file: Optional[Union[TextIO, codecs.StreamReader]] = None
    self.line: Optional[str] = ""

__slots__ class-attribute instance-attribute

__slots__ = ('_bnode_ids', 'sink', 'buffer', 'file', 'line', 'skolemize')

buffer instance-attribute

buffer: Optional[str] = None

file instance-attribute

file: Optional[Union[TextIO, StreamReader]] = None

line instance-attribute

line: Optional[str] = ''

sink instance-attribute

sink: Union[DummySink, NTGraphSink]

skolemize instance-attribute

skolemize = False

eat

eat(pattern: Pattern[str]) -> Match[str]
Source code in rdflib/plugins/parsers/ntriples.py
def eat(self, pattern: Pattern[str]) -> Match[str]:
    """Consume the text matched by ``pattern`` at the start of the current line.

    Args:
        pattern: Compiled regular expression to match against ``self.line``.

    Returns:
        The match object; ``self.line`` is left holding the unmatched rest.

    Raises:
        ParseError: If ``pattern`` does not match at the start of the line.
    """
    remaining = self.line
    found = pattern.match(remaining)  # type: ignore[arg-type]
    if found is None:
        raise ParseError("Failed to eat %s at %s" % (pattern.pattern, self.line))
    self.line = remaining[found.end() :]  # type: ignore[index]
    return found

literal

literal() -> Union[Literal[False], Literal]
Source code in rdflib/plugins/parsers/ntriples.py
def literal(self) -> Union[te.Literal[False], Literal]:
    """Parse a quoted literal at the start of the current line.

    Returns:
        ``False`` when the line does not start with a double quote,
        otherwise the parsed `Literal` with its language tag or datatype.

    Raises:
        ParseError: If the literal text cannot be consumed, or if both a
            language and a datatype are present.
    """
    if not self.peek('"'):
        return False

    lit, lang, dtype = self.eat(r_literal).groups()
    # Normalise an absent (or empty) language tag to None.
    lang = lang or None
    if dtype:
        dtype = unquote(dtype)
        dtype = uriquote(dtype)
        dtype = URI(dtype)
    else:
        dtype = None
    if lang and dtype:
        raise ParseError("Can't have both a language and a datatype")
    return Literal(unquote(lit), lang, dtype)

nodeid

nodeid(bnode_context: Optional[_BNodeContextType] = None) -> Union[Literal[False], BNode, URIRef]
Source code in rdflib/plugins/parsers/ntriples.py
def nodeid(
    self, bnode_context: Optional[_BNodeContextType] = None
) -> Union[te.Literal[False], bNode, URI]:
    """Parse a blank node label (``_:name``) at the start of the current line.

    Args:
        bnode_context: Mapping from labels to blank nodes to consult and
            update; the parser's own mapping is used when omitted.

    Returns:
        ``False`` when the line does not start with ``_``. Otherwise the
        skolemized node when ``self.skolemize`` is set, or a blank node that
        is remembered under the label in the mapping.
    """
    if not self.peek("_"):
        return False

    if self.skolemize:
        label = self.eat(r_nodeid).group(1)
        return bNode(label).skolemize()

    # Fix for https://github.com/RDFLib/rdflib/issues/204
    mapping = self._bnode_ids if bnode_context is None else bnode_context
    label = self.eat(r_nodeid).group(1)
    known = mapping.get(label, None)
    if known is not None:
        # Re-map to id specific to this doc
        return bNode(known)

    # Replace with freshly-generated document-specific BNode id and
    # remember it for later occurrences of the same label.
    fresh = bNode()
    mapping[label] = fresh
    return fresh

object

object(bnode_context: Optional[_BNodeContextType] = None) -> Union[URIRef, BNode, Literal]
Source code in rdflib/plugins/parsers/ntriples.py
def object(
    self, bnode_context: Optional[_BNodeContextType] = None
) -> Union[URI, bNode, Literal]:
    """Parse the object term of a triple.

    The alternatives are tried in order: URI reference, blank node, literal.
    Each later alternative is tried only when the previous result is falsy.

    Args:
        bnode_context: Blank node mapping forwarded to `nodeid`.

    Raises:
        ParseError: If the final result is exactly ``False``.
    """
    term = self.uriref()
    if not term:
        term = self.nodeid(bnode_context)
    if not term:
        term = self.literal()
    # Compare by identity: a falsy value other than False returned by the
    # last alternative is still passed through.
    if term is False:
        raise ParseError("Unrecognised object type")
    return term

parse

parse(f: Union[TextIO, IO[bytes], StreamReader], bnode_context: Optional[_BNodeContextType] = None, skolemize: bool = False) -> Union[DummySink, NTGraphSink]

Parse f as an N-Triples file.

Parameters:

  • f

    (Union[TextIO, IO[bytes], StreamReader]) –

    The N-Triples source

  • bnode_context

    (Optional[_BNodeContextType], default: None ) –

    A dict mapping blank node identifiers (e.g., a in _:a) to BNode instances. An empty dict can be passed in to define a distinct context for a given call to parse.

  • skolemize

    (bool, default: False ) –

    Whether to skolemize blank nodes

Returns:

  • (Union[DummySink, NTGraphSink]) –

    The sink containing the parsed triples

Source code in rdflib/plugins/parsers/ntriples.py
def parse(
    self,
    f: Union[TextIO, IO[bytes], codecs.StreamReader],
    bnode_context: Optional[_BNodeContextType] = None,
    skolemize: bool = False,
) -> Union[DummySink, NTGraphSink]:
    """Parse f as an N-Triples file.

    Args:
        f: The N-Triples source
        bnode_context: A dict mapping blank node identifiers (e.g., `a` in `_:a`)
            to [`BNode`][rdflib.term.BNode] instances. An empty dict can be
            passed in to define a distinct context for a given call to
            `parse`.
        skolemize: Whether to skolemize blank nodes

    Returns:
        The sink containing the parsed triples

    Raises:
        ParseError: If ``f`` has no ``read`` attribute, or if a line cannot
            be parsed. In the latter case the error raised while parsing the
            line is attached as the cause.
    """

    if not hasattr(f, "read"):
        raise ParseError("Item to parse must be a file-like object.")

    if not hasattr(f, "encoding") and not hasattr(f, "charbuffer"):
        # someone still using a bytestream here?
        f = codecs.getreader("utf-8")(f)

    self.skolemize = skolemize
    self.file = f  # type: ignore[assignment]
    self.buffer = ""
    while True:
        self.line = self.readline()
        if self.line is None:
            # readline signals end of input with None
            break
        try:
            self.parseline(bnode_context=bnode_context)
        except ParseError as err:
            # Chain explicitly so the specific reason for the failure is
            # reported as the direct cause of the "Invalid line" error.
            raise ParseError("Invalid line: {}".format(self.line)) from err
    return self.sink

parseline

parseline(bnode_context: Optional[_BNodeContextType] = None) -> None
Source code in rdflib/plugins/parsers/ntriples.py
def parseline(self, bnode_context: Optional[_BNodeContextType] = None) -> None:
    """Parse the current line and hand the resulting triple to the sink.

    Lines that are empty or start with ``#`` after leading whitespace has
    been removed produce no triple.

    Args:
        bnode_context: Blank node mapping forwarded to the subject and
            object parsers.

    Raises:
        ParseError: If a term or separator cannot be consumed, or if text
            remains after the terminating dot and optional comment.
    """
    self.eat(r_wspace)
    if not self.line or self.line[0] == "#":
        return  # The line is empty or a comment

    # subject <ws> predicate <ws> object <tail>
    subj = self.subject(bnode_context)
    self.eat(r_wspaces)
    pred = self.predicate()
    self.eat(r_wspaces)
    obj = self.object(bnode_context)
    self.eat(r_tail)

    leftover = self.line
    if leftover:
        raise ParseError("Trailing garbage: {}".format(leftover))
    self.sink.triple(subj, pred, obj)

parsestring

parsestring(s: Union[bytes, bytearray, str], **kwargs) -> None

Parse s as an N-Triples string.

Source code in rdflib/plugins/parsers/ntriples.py
def parsestring(self, s: Union[bytes, bytearray, str], **kwargs) -> None:
    """Parse s as an N-Triples string.

    Args:
        s: The N-Triples text; bytes and bytearrays are decoded as UTF-8.
        **kwargs: Additional arguments passed on to `parse`.

    Raises:
        ParseError: If ``s`` is not a str, bytes or bytearray instance.
    """
    f: Union[codecs.StreamReader, StringIO]
    if isinstance(s, str):
        f = StringIO(s)
    elif isinstance(s, (bytes, bytearray)):
        f = codecs.getreader("utf-8")(BytesIO(s))
    else:
        raise ParseError("Item to parse must be a string instance.")
    self.parse(f, **kwargs)

peek

peek(token: str) -> bool
Source code in rdflib/plugins/parsers/ntriples.py
def peek(self, token: str) -> bool:
    """Report whether the current line starts with ``token``, consuming nothing."""
    current = self.line
    return current.startswith(token)  # type: ignore[union-attr]

predicate

predicate() -> Union[BNode, URIRef]
Source code in rdflib/plugins/parsers/ntriples.py
def predicate(self) -> Union[bNode, URIRef]:
    """Parse the predicate term, which has to be a URI reference.

    Raises:
        ParseError: If `uriref` yields a falsy result.
    """
    term = self.uriref()
    if term:
        return term
    raise ParseError("Predicate must be uriref")

readline

readline() -> Optional[str]

Read an N-Triples line from buffered input.

Source code in rdflib/plugins/parsers/ntriples.py
def readline(self) -> Optional[str]:
    """Read an N-Triples line from buffered input.

    Returns:
        The next line without its terminator, or ``None`` once the input is
        exhausted (including when only whitespace is left at end of input).
    """
    # N-Triples lines end in either CRLF, CR, or LF
    # Therefore, we can't just use f.readline()
    if not self.buffer:
        # type error: Item "None" of "Union[TextIO, StreamReader, None]" has no attribute "read"
        chunk = self.file.read(bufsiz)  # type: ignore[union-attr]
        if not chunk:
            return None
        self.buffer = chunk

    while True:
        found = r_line.match(self.buffer)
        if found:  # the more likely prospect
            self.buffer = self.buffer[found.end() :]
            return found.group(1)

        # No complete line buffered yet: pull in more input.
        # type error: Item "None" of "Union[TextIO, StreamReader, None]" has no attribute "read"
        chunk = self.file.read(bufsiz)  # type: ignore[union-attr]
        if not chunk:
            if self.buffer.isspace():
                return None
            # Last line does not need to be terminated with a newline
            chunk += "\n"
        self.buffer += chunk

subject

subject(bnode_context=None) -> Union[BNode, URIRef]
Source code in rdflib/plugins/parsers/ntriples.py
def subject(self, bnode_context=None) -> Union[bNode, URIRef]:
    """Parse the subject term: a URI reference or, failing that, a blank node.

    Args:
        bnode_context: Blank node mapping forwarded to `nodeid`.

    Raises:
        ParseError: If neither alternative yields a truthy result.
    """
    # @@ Consider using dictionary cases
    term = self.uriref()
    if not term:
        term = self.nodeid(bnode_context)
    if term:
        return term
    raise ParseError("Subject must be uriref or nodeID")

uriref

uriref() -> Union[Literal[False], URIRef]
Source code in rdflib/plugins/parsers/ntriples.py
def uriref(self) -> Union[te.Literal[False], URI]:
    """Parse a ``<...>`` URI reference at the start of the current line.

    Returns:
        ``False`` when the line does not start with ``<``, otherwise the URI
        built from the unquoted and then URI-quoted reference text.
    """
    if not self.peek("<"):
        return False
    raw = self.eat(r_uriref).group(1)
    decoded = unquote(raw)
    return URI(uriquote(decoded))

unquote

unquote(s: str) -> str

Unquote an N-Triples string.

Source code in rdflib/plugins/parsers/ntriples.py
def unquote(s: str) -> str:
    """Unquote an N-Triples string.

    With the module-level ``validate`` flag off, the string is handed to
    `decodeUnicodeEscape` (or decoded with the ``unicode-escape`` codec when
    it is not a ``str``). With the flag on, the string is scanned piece by
    piece and anything that is neither a run of safe characters nor a
    recognised escape is rejected.

    Raises:
        ParseError: In validating mode, on a code point above U+10FFFF, an
            unrecognised backslash escape, or a disallowed character.
    """
    if not validate:
        if isinstance(s, str):  # nquads
            return decodeUnicodeEscape(s)
        return s.decode("unicode-escape")  # type: ignore[unreachable]

    pieces = []
    rest = s
    while rest:
        # A run of characters that need no unescaping.
        safe = r_safe.match(rest)
        if safe:
            rest = rest[safe.end() :]
            pieces.append(safe.group(1))
            continue

        # A two-character escape such as \n or \".
        quoted = r_quot.match(rest)
        if quoted:
            rest = rest[2:]
            pieces.append(_string_escape_map[quoted.group(1)])
            continue

        # A \uXXXX or \UXXXXXXXX escape; anything else is an error.
        uni = r_uniquot.match(rest)
        if not uni:
            if rest.startswith("\\"):
                raise ParseError("Illegal escape at: %s..." % rest[:10])
            raise ParseError("Illegal literal character: %r" % rest[0])

        rest = rest[uni.end() :]
        short, long_ = uni.groups()
        codepoint = int(short or long_, 16)
        if codepoint > 0x10FFFF:
            raise ParseError("Disallowed codepoint: %08X" % codepoint)
        pieces.append(chr(codepoint))
    return "".join(pieces)

uriquote

uriquote(uri: str) -> str
Source code in rdflib/plugins/parsers/ntriples.py
def uriquote(uri: str) -> str:
    """Percent-encode high characters in ``uri`` when validation is enabled.

    With the module-level ``validate`` flag off, ``uri`` is returned
    unchanged. Otherwise every character matched by ``r_hibyte`` is replaced
    by ``%`` followed by its code point as at least two uppercase hex digits.
    """

    def _percent_encode(hit):
        return "%%%02X" % ord(hit.group(1))

    if validate:
        return r_hibyte.sub(_percent_encode, uri)
    return uri