Skip to content

parser

Parser plugin interface.

This module defines the parser plugin interface and contains other related parser support code.

The module is mainly useful for those who want to write a parser that can plug in to rdflib. If you want to invoke a parser, you most likely want to do so through the Graph class's parse method.

Classes:

__all__ module-attribute

__all__ = ['Parser', 'InputSource', 'StringInputSource', 'URLInputSource', 'FileInputSource', 'PythonInputSource']

headers module-attribute

headers = {'User-agent': 'rdflib-%s (https://rdflib.github.io/; eikeon@eikeon.com)' % __version__}

BytesIOWrapper

BytesIOWrapper(wrapped: Union[str, StringIO, TextIOBase], encoding='utf-8')

Bases: BufferedIOBase

Methods:

Attributes:

Source code in rdflib/parser.py
def __init__(self, wrapped: Union[str, StringIO, TextIOBase], encoding="utf-8"):
    """
    Record the wrapped text source and the encoding used to turn it into bytes.

    No stream is opened or encoded here: the stream handles and the
    capability flags all start out as None.
    """
    super(BytesIOWrapper, self).__init__()
    self.wrapped = wrapped
    self.encoding = encoding
    self.encoder = codecs.getencoder(self.encoding)

    # Stream handles and cached capability probes, all unset for now.
    self.enc_str: Optional[Union[BytesIO, BufferedIOBase]] = None
    self.text_str: Optional[Union[StringIO, TextIOBase]] = None
    self.has_read1: Optional[bool] = None
    self.has_seek: Optional[bool] = None
    self._name: Optional[str] = None
    self._fileno: Optional[Union[int, BaseException]] = None
    self._isatty: Optional[Union[bool, BaseException]] = None
    self._leftover: bytes = b""
    self._text_bytes_offset: int = 0

    # Estimate how many bytes one character takes in this encoding.
    codec = encoding.lower().replace("_", "-")
    one_byte_codecs = (
        # utf-8 is variable width (1-4 bytes); assume an average of 1
        "utf-8",
        "utf8",
        "u8",
        "cp65001",
        # genuinely one byte per character
        "latin1",
        "latin-1",
        "iso-8859-1",
        "iso8859-1",
        "ascii",
        "us-ascii",
    )
    if codec in one_byte_codecs:
        width = 1
    elif codec.startswith(("utf-16", "utf16")):
        # utf-16 is variable width (2 or 4 bytes); assume an average of 2
        width = 2
    elif codec.startswith(("utf-32", "utf32")):
        # utf-32 is fixed width
        width = 4
    else:
        # unknown encoding: guess 2 bytes per character
        width = 2
    self._bytes_per_char: int = width

__slots__ class-attribute instance-attribute

__slots__ = ('wrapped', 'enc_str', 'text_str', 'encoding', 'encoder', 'has_read1', 'has_seek', '_name', '_fileno', '_isatty', '_leftover', '_bytes_per_char', '_text_bytes_offset')

closed property

closed: bool

enc_str instance-attribute

enc_str: Optional[Union[BytesIO, BufferedIOBase]] = None

encoder instance-attribute

encoder = getencoder(encoding)

encoding instance-attribute

encoding = encoding

has_read1 instance-attribute

has_read1: Optional[bool] = None

has_seek instance-attribute

has_seek: Optional[bool] = None

name property

name: Any

text_str instance-attribute

text_str: Optional[Union[StringIO, TextIOBase]] = None

wrapped instance-attribute

wrapped = wrapped

close

close()
Source code in rdflib/parser.py
def close(self):
    """
    Close the underlying stream, if one has been set.

    The byte stream takes precedence: when it is set, the text stream is
    left untouched. A stream object without a ``close`` method is ignored.
    """
    stream = self.enc_str if self.enc_str is not None else self.text_str
    if stream is None:
        return
    try:
        stream.close()
    except AttributeError:
        pass

fileno

fileno() -> int
Source code in rdflib/parser.py
def fileno(self) -> int:
    """
    Return the cached file descriptor, probing for it when none is cached.

    A cached exception is re-raised; -1 is returned when the probe leaves
    the cache empty.
    """
    if self._fileno is None:
        self._check_fileno()
    cached = self._fileno
    if isinstance(cached, BaseException):
        raise cached
    if cached is None:
        return -1
    return cached

flush

flush()
Source code in rdflib/parser.py
def flush(self):
    """Do nothing: a read-only stream has nothing to flush."""
    return None

isatty

isatty() -> bool
Source code in rdflib/parser.py
def isatty(self) -> bool:
    """
    Return the cached tty flag as a bool, probing for it when none is cached.

    A cached exception is re-raised instead of being converted.
    """
    if self._isatty is None:
        self._check_isatty()
    cached = self._isatty
    if isinstance(cached, BaseException):
        raise cached
    return bool(cached)

read

read(size: Optional[int] = -1) -> bytes

Read at most size bytes, returned as a bytes object.

If the size argument is negative or omitted read until EOF is reached. Return an empty bytes object if already at EOF.

Source code in rdflib/parser.py
def read(self, size: Optional[int] = -1, /) -> bytes:
    """
    Return up to ``size`` bytes from the wrapped source.

    A negative or omitted size reads through to EOF. An empty bytes
    object is returned when ``size`` is 0 or the source is already at EOF.
    """
    if size == 0:
        return b""
    if self.enc_str is None and self.text_str is None:
        self._init()
    if self.enc_str is None:
        # No byte stream available: encode from the text stream instead.
        return self._read_bytes_from_text_stream(size)
    return self.enc_str.read(size)

read1

read1(size: Optional[int] = -1) -> bytes

Read at most size bytes, with at most one call to the underlying raw stream’s read() or readinto() method. Returned as a bytes object.

If the size argument is negative or omitted, read until EOF is reached. Return an empty bytes object at EOF.

Source code in rdflib/parser.py
def read1(self, size: Optional[int] = -1, /) -> bytes:
    """
    Return up to ``size`` bytes by delegating to the byte stream's ``read1``.

    A size of None or a negative size calls ``read1()`` with no argument.
    Raises NotImplementedError when ``read1`` is unavailable or when only
    a text stream is set.
    """
    no_stream_yet = self.enc_str is None and self.text_str is None
    if no_stream_yet or self.has_read1 is None:
        self._init()
    if not self.has_read1:
        raise NotImplementedError()
    if self.enc_str is None:
        raise NotImplementedError(
            "read1() not supported for TextIO in BytesIOWrapper"
        )
    if size is None or size < 0:
        return self.enc_str.read1()
    return self.enc_str.read1(size)

readable

readable() -> bool
Source code in rdflib/parser.py
def readable(self) -> bool:
    """Report that this wrapper can always be read from."""
    return True

readinto

readinto(b: Buffer) -> int

Read len(b) bytes into buffer b.

Returns number of bytes read (0 for EOF), or error if the object is set not to block and has no data to read.

Source code in rdflib/parser.py
def readinto(self, b: Buffer, /) -> int:
    """
    Fill buffer ``b`` with up to ``len(b)`` bytes and return the count.

    Returns 0 for an empty buffer or when no bytes were produced. With a
    byte stream set, the call is delegated to its ``readinto``; otherwise
    bytes are produced from the text stream and copied into ``b``.
    """
    if TYPE_CHECKING:
        assert isinstance(b, (memoryview, bytearray))
    capacity = len(b)
    if capacity == 0:
        return 0
    if self.enc_str is None and self.text_str is None:
        self._init()
    if self.enc_str is not None:
        return self.enc_str.readinto(b)
    chunk: bytes = self._read_bytes_from_text_stream(capacity)
    count = len(chunk)
    if count:
        b[:count] = chunk
    return count

readinto1

readinto1(b: Buffer) -> int

Read len(b) bytes into buffer b, with at most one call to the underlying raw stream’s read() or readinto() method.

Returns number of bytes read (0 for EOF), or error if the object is set not to block and has no data to read.

Source code in rdflib/parser.py
def readinto1(self, b: Buffer, /) -> int:
    """
    Fill buffer ``b`` by delegating to the byte stream's ``readinto1``.

    Returns whatever the byte stream's ``readinto1`` returns. Raises
    NotImplementedError when ``read1`` support is unavailable or when only
    a text stream is set.
    """
    if TYPE_CHECKING:
        assert isinstance(b, (memoryview, bytearray))
    no_stream_yet = self.enc_str is None and self.text_str is None
    if no_stream_yet or self.has_read1 is None:
        self._init()
    if not self.has_read1:
        raise NotImplementedError()
    if self.enc_str is None:
        raise NotImplementedError(
            "readinto1() not supported for TextIO in BytesIOWrapper"
        )
    return self.enc_str.readinto1(b)

seek

seek(offset: int, whence: int = 0) -> int
Source code in rdflib/parser.py
def seek(self, offset: int, whence: int = 0, /) -> int:
    """
    Rewind the wrapper to the start of the underlying stream.

    Only ``seek(0)`` with ``whence == 0`` (SEEK_SET) is supported. On
    success the cached byte offset and leftover bytes are reset.

    Returns:
        Always 0, the new position.

    Raises:
        NotImplementedError: if the stream is known not to be seekable,
            if ``whence`` is not 0, or if ``offset`` is not 0.
    """
    if self.has_seek is not None and not self.has_seek:
        raise NotImplementedError()
    if (self.enc_str is None and self.text_str is None) or self.has_seek is None:
        self._init()
        # Seekability may only become known during the lazy init, so
        # re-check here. Otherwise the first call on a non-seekable stream
        # would skip the guard above and call seek() on the stream anyway.
        if self.has_seek is not None and not self.has_seek:
            raise NotImplementedError()

    if not whence == 0:
        raise NotImplementedError("Only SEEK_SET is supported on BytesIOWrapper")
    if offset != 0:
        raise NotImplementedError(
            "Only seeking to zero is supported on BytesIOWrapper"
        )
    if self.enc_str is not None:
        self.enc_str.seek(offset, whence)
    elif self.text_str is not None:
        self.text_str.seek(offset, whence)
    # Back at the start: forget the offset and any partially consumed bytes.
    self._text_bytes_offset = 0
    self._leftover = b""
    return 0

seekable

seekable()
Source code in rdflib/parser.py
def seekable(self):
    """Return the seek-capability flag, initialising the wrapper first when needed."""
    no_stream_yet = self.enc_str is None and self.text_str is None
    if no_stream_yet or self.has_seek is None:
        self._init()
    return self.has_seek

tell

tell() -> int
Source code in rdflib/parser.py
def tell(self) -> int:
    """
    Return the current byte offset.

    With a byte stream set, the offset is refreshed from its ``tell()``;
    otherwise the cached offset is returned as is. Raises
    NotImplementedError when the stream is known not to be seekable.
    """
    can_seek = self.has_seek
    if not (can_seek is None or can_seek):
        raise NotImplementedError("Cannot tell() pos because file is not seekable.")
    byte_stream = self.enc_str
    if byte_stream is not None:
        try:
            self._text_bytes_offset = byte_stream.tell()
        except AttributeError:
            pass
    return self._text_bytes_offset

truncate

truncate(size: Optional[int] = None) -> int
Source code in rdflib/parser.py
def truncate(self, size: Optional[int] = None) -> int:
    """Always raise NotImplementedError: truncation is not supported."""
    reason = "Cannot truncate on BytesIOWrapper"
    raise NotImplementedError(reason)

writable

writable() -> bool
Source code in rdflib/parser.py
def writable(self) -> bool:
    """Report that this wrapper can never be written to."""
    return False

write

write(b)
Source code in rdflib/parser.py
def write(self, b, /):
    """Always raise NotImplementedError: writing is not supported."""
    reason = "Cannot write to a BytesIOWrapper"
    raise NotImplementedError(reason)

FileInputSource

FileInputSource(file: Union[BinaryIO, TextIO, TextIOBase, RawIOBase, BufferedIOBase], /, encoding: Optional[str] = None)

Bases: InputSource

Methods:

Attributes:

Source code in rdflib/parser.py
def __init__(
    self,
    file: Union[BinaryIO, TextIO, TextIOBase, RawIOBase, BufferedIOBase],
    /,
    encoding: Optional[str] = None,
):
    """
    Build an input source around an already opened file object.

    The system id is the ``file://`` URI of ``file.name`` made absolute.
    A text file supplies the character stream, with its ``buffer`` (or a
    BytesIOWrapper when it has none) as the byte stream. Any other file
    supplies the byte stream, and a character stream is added only when
    ``encoding`` is given.
    """
    cwd_uri = pathlib.Path.cwd().as_uri()
    file_uri = pathlib.Path(file.name).absolute().as_uri()  # type: ignore[union-attr]
    super(FileInputSource, self).__init__(URIRef(file_uri, base=cwd_uri))
    self.file = file

    if not isinstance(file, TextIOBase):
        if TYPE_CHECKING:
            assert isinstance(file, BufferedReader)
        self.setByteStream(file)
        if encoding is not None:
            self.setEncoding(encoding)
            self.setCharacterStream(TextIOWrapper(file, encoding=encoding))
        # With no encoding given, the encoding of the raw bytes is
        # unknown, so no character stream can be set.
        return

    # Text file: it is the character stream itself.
    self.setCharacterStream(file)
    self.setEncoding(file.encoding)
    try:
        self.setByteStream(file.buffer)  # type: ignore[attr-defined]
    except (AttributeError, LookupError):
        self.setByteStream(BytesIOWrapper(file, encoding=file.encoding))

file instance-attribute

file = file

__repr__

__repr__() -> str
Source code in rdflib/parser.py
def __repr__(self) -> str:
    """Use the repr of the wrapped file object."""
    wrapped_file = self.file
    return repr(wrapped_file)

InputSource

InputSource(system_id: Optional[str] = None)

Bases: InputSource

TODO:

Methods:

Attributes:

Source code in rdflib/parser.py
def __init__(self, system_id: Optional[str] = None):
    """Initialise the SAX input source and add the content type and auto-close flag."""
    xmlreader.InputSource.__init__(self, system_id=system_id)
    self.auto_close = False  # see Graph.parse(), true if opened by us
    self.content_type: Optional[str] = None

auto_close instance-attribute

auto_close = False

content_type instance-attribute

content_type: Optional[str] = None

close

close() -> None
Source code in rdflib/parser.py
def close(self) -> None:
    """
    Close the character stream and then the byte stream, best effort.

    A stream that is falsy or has no ``close`` attribute is skipped, and
    any exception raised while closing is swallowed.
    """
    for get_stream in (self.getCharacterStream, self.getByteStream):
        stream = get_stream()
        if stream and hasattr(stream, "close"):
            try:
                stream.close()
            except Exception:
                pass

Parser

Parser()

Methods:

Attributes:

Source code in rdflib/parser.py
def __init__(self):
    """Nothing to set up."""
    return None

__slots__ class-attribute instance-attribute

__slots__ = ()

parse

parse(source: InputSource, sink: Graph) -> None
Source code in rdflib/parser.py
def parse(self, source: InputSource, sink: Graph) -> None:
    """
    Do nothing and return None.

    Presumably concrete parser plugins override this to read ``source``
    into ``sink`` (confirm against the plugin implementations).
    """
    return None

PythonInputSource

PythonInputSource(data: Any, system_id: Optional[str] = None)

Bases: InputSource

Constructs an RDFLib Parser InputSource from a Python data structure, for example, loaded from JSON with json.load or json.loads:

>>> import json
>>> as_string = """{
...   "@context" : {"ex" : "http://example.com/ns#"},
...   "@graph": [{"@type": "ex:item", "@id": "#example"}]
... }"""
>>> as_python = json.loads(as_string)
>>> source = create_input_source(data=as_python)
>>> isinstance(source, PythonInputSource)
True

Methods:

Attributes:

Source code in rdflib/parser.py
def __init__(self, data: Any, system_id: Optional[str] = None):
    """Store the Python data structure and its system id; other fields start unset."""
    self.data = data
    self.system_id: Optional[str] = system_id
    self.public_id: Optional[str] = None
    self.auto_close = False  # see Graph.parse(), true if opened by us
    self.content_type = None

auto_close instance-attribute

auto_close = False

content_type instance-attribute

content_type = None

data instance-attribute

data = data

public_id instance-attribute

public_id: Optional[str] = None

system_id instance-attribute

system_id: Optional[str] = system_id

close

close() -> None
Source code in rdflib/parser.py
def close(self) -> None:
    """Drop the reference to the held data."""
    self.data = None

getPublicId

getPublicId() -> Optional[str]
Source code in rdflib/parser.py
def getPublicId(self) -> Optional[str]:  # noqa: N802
    """Return the stored public id."""
    return self.public_id

getSystemId

getSystemId() -> Optional[str]
Source code in rdflib/parser.py
def getSystemId(self) -> Optional[str]:  # noqa: N802
    """Return the stored system id."""
    return self.system_id

setPublicId

setPublicId(public_id: Optional[str]) -> None
Source code in rdflib/parser.py
def setPublicId(self, public_id: Optional[str]) -> None:  # noqa: N802
    """Replace the stored public id."""
    self.public_id = public_id

setSystemId

setSystemId(system_id: Optional[str]) -> None
Source code in rdflib/parser.py
def setSystemId(self, system_id: Optional[str]) -> None:  # noqa: N802
    """Replace the stored system id."""
    self.system_id = system_id

StringInputSource

StringInputSource(value: Union[str, bytes], encoding: str = 'utf-8', system_id: Optional[str] = None)

Bases: InputSource

Constructs an RDFLib Parser InputSource from a Python String or Bytes

Source code in rdflib/parser.py
def __init__(
    self,
    value: Union[str, bytes],
    encoding: str = "utf-8",
    system_id: Optional[str] = None,
):
    """
    Build an input source from an in-memory string or bytes value.

    Both a character stream and a byte stream are set either way. For a
    ``str`` the byte stream is a BytesIOWrapper over the value; for bytes
    the character stream is a TextIOWrapper over the byte stream.
    """
    super(StringInputSource, self).__init__(system_id)
    if not isinstance(value, str):
        byte_stream = BytesIO(value)
        self.setByteStream(byte_stream)
        text_view = TextIOWrapper(byte_stream, encoding)
        self.setCharacterStream(text_view)
        self.setEncoding(text_view.encoding)
        return
    self.setCharacterStream(StringIO(value))
    self.setEncoding(encoding)
    self.setByteStream(BytesIOWrapper(value, encoding))

URLInputSource

URLInputSource(system_id: Optional[str] = None, format: Optional[str] = None)

Bases: InputSource

Constructs an RDFLib Parser InputSource from a URL to read it from the Web.

Methods:

Attributes:

Source code in rdflib/parser.py
def __init__(self, system_id: Optional[str] = None, format: Optional[str] = None):
    """
    Open ``system_id`` as a URL and expose the response as the byte stream.

    The ``Accept`` header is chosen from ``format``; for any other value
    (including None) it lists every registered parser name containing a
    ``/``. When ``format`` asks for JSON-LD, the first ``rel="alternate"``
    link of type ``application/ld+json`` that points elsewhere is fetched
    instead. The final URL becomes the public id.
    """
    super(URLInputSource, self).__init__(system_id)
    self.url = system_id

    # Accept header sent for each explicitly recognised format.
    turtle_accept = "text/turtle, application/x-turtle, */*;q=0.1"
    accept_by_format = {
        "xml": "application/rdf+xml, */*;q=0.1",
        "n3": "text/n3, */*;q=0.1",
        "turtle": turtle_accept,
        "ttl": turtle_accept,
        "nt": "text/plain, */*;q=0.1",
        "trig": "application/trig, */*;q=0.1",
        "trix": "application/trix, */*;q=0.1",
        "json-ld": "application/ld+json, application/json;q=0.9, */*;q=0.1",
    }

    # work on a copy so the shared headers are not modified
    request_headers = dict(headers)
    if format in accept_by_format:
        request_headers["Accept"] = accept_by_format[format]
    else:
        # no recognised format: build the Accept header from all
        # registered parser Media Types
        from rdflib.parser import Parser
        from rdflib.plugin import plugins

        # all Media Types known have a / in them
        media_types = [p.name for p in plugins(kind=Parser) if "/" in p.name]
        request_headers["Accept"] = ", ".join(media_types)

    request = Request(system_id, None, request_headers)  # type: ignore[arg-type]

    response: addinfourl = _urlopen(request)
    self.url = response.geturl()  # in case redirections took place
    self.links = self.get_links(response)
    if format in ("json-ld", "application/ld+json"):
        for link in self.get_alternates(type_="application/ld+json"):
            candidate = urljoin(self.url, link)
            if candidate == self.url or candidate == system_id:
                continue
            response = _urlopen(Request(candidate))
            self.url = response.geturl()  # in case redirections took place
            break

    self.setPublicId(self.url)
    content_types = self.getallmatchingheaders(response.headers, "content-type")
    first_type = content_types[0] if content_types else None
    # keep only the media type, dropping parameters such as charset
    self.content_type = None if first_type is None else first_type.split(";", 1)[0]
    self.setByteStream(response)
    # TODO: self.setEncoding(encoding)
    self.response_info = response.info()  # a mimetools.Message instance

content_type instance-attribute

content_type = content_types[0] if content_types else None

links instance-attribute

links: List[str] = get_links(response)

response_info instance-attribute

response_info = info()

url instance-attribute

url = geturl()

__repr__

__repr__() -> str
Source code in rdflib/parser.py
def __repr__(self) -> str:
    """Use the URL this source ended up reading from as its repr."""
    # type error: Incompatible return value type (got "Optional[str]", expected "str")
    final_url = self.url
    return final_url  # type: ignore[return-value]

get_alternates

get_alternates(type_: Optional[str] = None) -> List[str]
Source code in rdflib/parser.py
def get_alternates(self, type_: Optional[str] = None) -> List[str]:
    """
    Return the targets of the stored links marked ``rel="alternate"``.

    When ``type_`` is given and non-empty, only links that also carry a
    matching ``type="..."`` parameter are returned. Each target is the
    first ``;``-separated field with surrounding angle brackets removed.
    """
    wanted_type: Optional[str] = f'type="{type_}"' if type_ else None
    targets = []
    for raw_link in self.links:
        fields = [field.strip() for field in raw_link.split(";")]
        if 'rel="alternate"' not in fields:
            continue
        if wanted_type is None or wanted_type in fields:
            targets.append(fields[0].strip("<>"))
    return targets

get_links classmethod

get_links(response: addinfourl) -> List[str]
Source code in rdflib/parser.py
@classmethod
def get_links(cls, response: addinfourl) -> List[str]:
    """
    Collect the individual link-values from every ``Link`` header.

    One header line may carry several comma-separated link-values. A line
    is split only at a comma that is followed by the ``<`` opening the
    next link target, so a comma inside a target URI or inside a quoted
    parameter (e.g. ``title="a, b"``) does not break a link-value apart.

    Args:
        response: the HTTP response whose ``headers`` are inspected.

    Returns:
        The whitespace-stripped link-values, in header order.
    """
    # Imported locally so this method does not rely on a module-level import.
    import re

    separator = re.compile(r",(?=\s*<)")
    return [
        link.strip()
        for line in cls.getallmatchingheaders(response.headers, "Link")
        for link in separator.split(line)
    ]

getallmatchingheaders classmethod

getallmatchingheaders(message: Message, name) -> List[str]
Source code in rdflib/parser.py
@classmethod
def getallmatchingheaders(cls, message: Message, name) -> List[str]:
    """Return every value in ``message`` whose header name equals ``name``, ignoring case."""
    # Reimplemented here because HTTPMessage.getallmatchingheaders has
    # been broken since Python 3.0.
    wanted = name.lower()
    matches: List[str] = []
    for header, value in message.items():
        if header.lower() == wanted:
            matches.append(value)
    return matches

create_input_source

create_input_source(source: Optional[Union[IO[bytes], TextIO, InputSource, str, bytes, PurePath]] = None, publicID: Optional[str] = None, location: Optional[str] = None, file: Optional[Union[BinaryIO, TextIO]] = None, data: Optional[Union[str, bytes, dict]] = None, format: Optional[str] = None) -> InputSource

Return an appropriate InputSource instance for the given parameters.

Source code in rdflib/parser.py
def create_input_source(
    source: Optional[
        Union[IO[bytes], TextIO, InputSource, str, bytes, pathlib.PurePath]
    ] = None,
    publicID: Optional[str] = None,  # noqa: N803
    location: Optional[str] = None,
    file: Optional[Union[BinaryIO, TextIO]] = None,
    data: Optional[Union[str, bytes, dict]] = None,
    format: Optional[str] = None,
) -> InputSource:
    """
    Return an appropriate InputSource instance for the given
    parameters.

    Exactly one of ``source``, ``location``, ``file`` and ``data`` must be
    given. ``source`` is dispatched on its type: an InputSource is used as
    is, a ``str`` or path is treated as ``location``, ``bytes`` as ``data``,
    and an object with a ``read`` attribute is wrapped in a new InputSource.

    Args:
        source: an InputSource, location string, path, bytes or readable
            object.
        publicID: public id to set on the result; when omitted and the
            result has none, the absolute location (or ``""``) is used.
        location: handed to ``_create_input_source_from_location``.
        file: an open file object, wrapped in a FileInputSource.
        data: a dict (PythonInputSource) or str/bytes/bytearray
            (StringInputSource).
        format: passed through to ``_create_input_source_from_location``.

    Raises:
        ValueError: if not exactly one of the four inputs is given.
        RuntimeError: if ``data`` is of an unsupported type.
        Exception: if ``source`` is of an unexpected type, or no input
            source could be created.
    """

    # test that exactly one of source, location, file, and data is not None.
    non_empty_arguments = list(
        filter(
            lambda v: v is not None,
            [source, location, file, data],
        )
    )

    if len(non_empty_arguments) != 1:
        raise ValueError(
            "exactly one of source, location, file or data must be given",
        )

    input_source = None

    if source is not None:
        if TYPE_CHECKING:
            assert file is None
            assert data is None
            assert location is None
        if isinstance(source, InputSource):
            input_source = source
        else:
            # Re-route the generic ``source`` to the specific argument it
            # stands for; the branches further down then handle it.
            if isinstance(source, str):
                location = source
            elif isinstance(source, pathlib.PurePath):
                location = str(source)
            elif isinstance(source, bytes):
                data = source
            elif hasattr(source, "read") and not isinstance(source, Namespace):
                # Readable object: wrap it directly in a plain InputSource.
                f = source
                input_source = InputSource()
                if hasattr(source, "encoding"):
                    # An ``encoding`` attribute marks it as a text stream.
                    input_source.setCharacterStream(source)
                    input_source.setEncoding(source.encoding)
                    try:
                        b = source.buffer  # type: ignore[union-attr]
                        input_source.setByteStream(b)
                    except (AttributeError, LookupError):
                        # No underlying buffer: fall back to the stream itself.
                        input_source.setByteStream(source)
                else:
                    input_source.setByteStream(f)
                if f is sys.stdin:
                    input_source.setSystemId("file:///dev/stdin")
                elif hasattr(f, "name"):
                    input_source.setSystemId(f.name)
            else:
                raise Exception(
                    "Unexpected type '%s' for source '%s'" % (type(source), source)
                )

    absolute_location = None  # Further to fix for issue 130

    auto_close = False  # make sure we close all file handles we open

    if location is not None:
        if TYPE_CHECKING:
            assert file is None
            assert data is None
            assert source is None
        # The helper returns replacements for ``file`` and ``input_source``;
        # a non-None ``file`` is then wrapped by the next branch.
        (
            absolute_location,
            auto_close,
            file,
            input_source,
        ) = _create_input_source_from_location(
            file=file,
            format=format,
            input_source=input_source,
            location=location,
        )

    if file is not None:
        if TYPE_CHECKING:
            assert location is None
            assert data is None
            assert source is None
        input_source = FileInputSource(file)

    if data is not None:
        if TYPE_CHECKING:
            assert location is None
            assert file is None
            assert source is None
        # In-memory sources are created here, so they are marked auto-close.
        if isinstance(data, dict):
            input_source = PythonInputSource(data)
            auto_close = True
        elif isinstance(data, (str, bytes, bytearray)):
            input_source = StringInputSource(data)
            auto_close = True
        else:
            raise RuntimeError(f"parse data can only str, or bytes. not: {type(data)}")

    if input_source is None:
        raise Exception("could not create InputSource")
    else:
        # OR-ed in, so an auto_close flag already set on the source is kept.
        input_source.auto_close |= auto_close
        if publicID is not None:  # Further to fix for issue 130
            input_source.setPublicId(publicID)
        # Further to fix for issue 130
        elif input_source.getPublicId() is None:
            input_source.setPublicId(absolute_location or "")
        return input_source