# Source code for triplemodel.store.ops

"""pyoxigraph Store operations (bulk load, backup, named graphs, patterns)."""

from __future__ import annotations

from collections.abc import Iterator
from pathlib import Path
from typing import Any

from pyoxigraph import BlankNode, DefaultGraph, NamedNode, Quad, Store as OxigraphStore

from triplemodel.store import RdfGraph as Graph
from triplemodel.store.formats import to_rdf_format
from triplemodel.store.graph import GraphName, _graph_name


def _resolve_store(graph: Graph | None, store_path: str | Path | None) -> OxigraphStore:
    """Return the pyoxigraph store an operation should act on.

    ``graph.store`` is returned whenever ``graph`` is given (``store_path`` is
    then ignored); otherwise a store is opened at ``store_path``.

    Raises:
        ValueError: If both ``graph`` and ``store_path`` are ``None``.
    """
    if graph is None and store_path is None:
        raise ValueError("Pass graph= or store_path= for this operation.")
    return graph.store if graph is not None else OxigraphStore(str(store_path))


def _resolve_disk_path(graph: Graph | None, store_path: str | Path | None) -> Path:
    """Return the directory of the on-disk store as a :class:`~pathlib.Path`.

    An explicit ``store_path`` takes precedence. Otherwise the path is read
    from ``graph.disk_store_path``, falling back to
    ``graph.ephemeral_store_path``.

    Raises:
        ValueError: If neither argument is given, or if ``graph`` records no
            store path.
    """
    if store_path is None:
        if graph is None:
            raise ValueError("Pass graph= or store_path= for disk store operations.")
        located = graph.disk_store_path or graph.ephemeral_store_path
        if located is None:
            raise ValueError(
                "Disk store operations require open_graph('disk', path) or parse_into_store_graph; "
                "pass store_path= explicitly for a pyoxigraph on-disk directory."
            )
        return Path(located)
    return Path(store_path)


def _coerce_graph_name(
    graph: Graph | None,
    to_graph: str | NamedNode | DefaultGraph | None,
) -> NamedNode | DefaultGraph | None:
    """Pick the graph name an operation should target.

    An explicit ``to_graph`` is normalised through ``_graph_name``. Without
    one, the result is ``graph.graph_name`` when ``graph`` is given and
    ``None`` otherwise.
    """
    if to_graph is not None:
        return _graph_name(to_graph)
    return None if graph is None else graph.graph_name


def bulk_load_into_graph(
    graph: Graph,
    source: str | Path | None = None,
    *,
    data: str | bytes | None = None,
    format: str | None = None,
    base_iri: str | None = None,
    to_graph: str | NamedNode | DefaultGraph | None = None,
    lenient: bool = False,
) -> None:
    """Load RDF from a file or bytes into ``graph.store`` via :meth:`pyoxigraph.Store.bulk_load`.

    Exactly one of ``source`` (a file path) or ``data`` (in-memory content)
    must be given. ``str`` data is encoded as UTF-8 before loading. The
    serialization is resolved with ``infer_format`` from ``source`` (when
    loading from a file) and the ``format`` argument. Quads go to
    ``to_graph`` when given, otherwise to ``graph.graph_name``.

    Raises:
        ValueError: If both or neither of ``source`` and ``data`` are passed.
    """
    from triplemodel.io.files import infer_format

    has_data = data is not None
    has_source = source is not None
    if has_data and has_source:
        raise ValueError("Pass source= or data=, not both.")
    if not has_data and not has_source:
        raise ValueError("bulk_load_into_graph requires source= or data=.")

    # Only a file path can hint at the format; in-memory data relies on ``format``.
    hint: str | Path | None = None if has_data else source
    rdf_format = to_rdf_format(infer_format(hint, format))
    target = _coerce_graph_name(graph, to_graph)

    options = {
        "format": rdf_format,
        "base_iri": base_iri,
        "to_graph": target,
        "lenient": lenient,
    }
    if has_data:
        payload = data.encode("utf-8") if isinstance(data, str) else data
        graph.store.bulk_load(payload, **options)
    else:
        graph.store.bulk_load(path=str(source), **options)
def dump_store(
    output: str | Path,
    *,
    graph: Graph | None = None,
    store_path: str | Path | None = None,
    format: str = "nquads",
    from_graph: str | NamedNode | DefaultGraph | None = None,
    prefixes: dict[str, str] | None = None,
    base_iri: str | None = None,
) -> None:
    """Dump a store to the file ``output`` (default format: N-Quads).

    The store is ``graph.store`` when ``graph`` is given, otherwise one opened
    at ``store_path``. ``from_graph`` selects the graph to dump; when omitted
    it defaults to ``graph.graph_name`` (or ``None`` without a ``graph``).
    ``prefixes`` defaults to a copy of ``graph._prefixes`` when a graph is
    given; an empty prefix mapping is passed on as ``None``.

    Raises:
        ValueError: If neither ``graph`` nor ``store_path`` is given.
    """
    store = _resolve_store(graph, store_path)
    rdf_format = to_rdf_format(format)

    # The helper already applies the fallback: explicit name, else the graph's
    # own name, else None.
    graph_name = _coerce_graph_name(graph, from_graph)

    if prefixes is None and graph is not None:
        prefix_map = dict(graph._prefixes) or None
    else:
        prefix_map = prefixes

    store.dump(
        str(output),
        format=rdf_format,
        from_graph=graph_name,
        prefixes=prefix_map,
        base_iri=base_iri,
    )
def load_store(
    graph: Graph,
    source: str | Path | None = None,
    *,
    data: str | bytes | None = None,
    format: str = "nquads",
    base_iri: str | None = None,
    to_graph: str | NamedNode | DefaultGraph | None = None,
    lenient: bool = False,
) -> None:
    """Load a dump file (or in-memory dump) into the store backing ``graph``.

    Exactly one of ``source`` (a file path) or ``data`` (in-memory content)
    must be given. ``str`` data is encoded as UTF-8 before loading. The
    serialization is resolved with ``infer_format`` from ``source`` (when
    loading from a file) and ``format`` (default ``"nquads"``). Quads go to
    ``to_graph`` when given, otherwise to ``graph.graph_name``.

    Raises:
        ValueError: If both or neither of ``source`` and ``data`` are passed.
    """
    # Validate the inputs before resolving the format or touching the graph,
    # so a missing/ambiguous input is never masked by a later error (this
    # matches the ordering used by bulk_load_into_graph).
    if data is not None and source is not None:
        raise ValueError("Pass source= or data=, not both.")
    if data is None and source is None:
        raise ValueError("load_store requires source= or data=.")

    from triplemodel.io.files import infer_format

    # Only a file path can hint at the format; in-memory data relies on ``format``.
    hint: str | Path | None = source if data is None else None
    rdf_format = to_rdf_format(infer_format(hint, format))
    target = _coerce_graph_name(graph, to_graph)

    if data is not None:
        payload = data.encode("utf-8") if isinstance(data, str) else data
        graph.store.load(
            payload,
            format=rdf_format,
            base_iri=base_iri,
            to_graph=target,
            lenient=lenient,
        )
    else:
        graph.store.load(
            format=rdf_format,
            path=str(source),
            base_iri=base_iri,
            to_graph=target,
            lenient=lenient,
        )
def backup_store(
    target_directory: str | Path,
    *,
    graph: Graph | None = None,
    store_path: str | Path | None = None,
) -> None:
    """Back up an on-disk pyoxigraph store into ``target_directory``.

    ``graph.store`` is backed up when ``graph`` is given (``store_path`` is
    then ignored); otherwise the store directory at ``store_path`` is opened
    and backed up.

    Raises:
        ValueError: If neither ``graph`` nor ``store_path`` is given.
    """
    if graph is None:
        store = OxigraphStore(str(_resolve_disk_path(None, store_path)))
    else:
        store = graph.store
    store.backup(str(target_directory))
def optimize_store(
    *,
    graph: Graph | None = None,
    store_path: str | Path | None = None,
) -> None:
    """Optimize an on-disk store, e.g. after a bulk import or heavy updates.

    ``graph.store`` is optimized when ``graph`` is given (``store_path`` is
    then ignored); otherwise the store directory at ``store_path`` is opened
    and optimized.

    Raises:
        ValueError: If neither ``graph`` nor ``store_path`` is given.
    """
    if graph is None:
        store = OxigraphStore(str(_resolve_disk_path(None, store_path)))
    else:
        store = graph.store
    store.optimize()
def store_flush(graph: Graph) -> None:
    """Flush pending writes on ``graph.store`` (no-op if unsupported).

    Nothing happens when the store has no callable ``flush`` attribute. A
    ``RuntimeError`` whose message contains "read-write" (case-insensitive)
    is swallowed; any other ``RuntimeError`` propagates.
    """
    flush_fn = getattr(graph.store, "flush", None)
    if not callable(flush_fn):
        return
    try:
        flush_fn()
    except RuntimeError as err:
        # NOTE(review): presumably the "read-write" error comes from stores
        # that were not opened read-write -- confirm against pyoxigraph.
        if "read-write" not in str(err).lower():
            raise
def list_named_graphs(graph: Graph) -> list[str]:
    """Return the names of all named graphs in ``graph.store`` as strings.

    A :class:`NamedNode` contributes its IRI (``name.value``); a
    :class:`BlankNode` contributes ``str(name)``. Names of any other type are
    skipped.
    """
    return [
        str(name.value) if isinstance(name, NamedNode) else str(name)
        for name in graph.store.named_graphs()
        if isinstance(name, (NamedNode, BlankNode))
    ]
def ensure_named_graph(graph: Graph, graph_iri: str) -> None:
    """Ensure the named graph ``graph_iri`` exists in ``graph.store``.

    Delegates to the store's ``add_graph`` with ``graph_iri`` wrapped in a
    :class:`NamedNode`.
    """
    node = NamedNode(graph_iri)
    graph.store.add_graph(node)
def clear_named_graph(graph: Graph, graph_iri: str) -> None:
    """Remove all quads from the named graph ``graph_iri``.

    Delegates to the store's ``clear_graph`` with ``graph_iri`` wrapped in a
    :class:`NamedNode`.
    """
    node = NamedNode(graph_iri)
    graph.store.clear_graph(node)
def remove_named_graph(graph: Graph, graph_iri: str) -> None:
    """Remove the named graph ``graph_iri`` and its quads from the store.

    Delegates to the store's ``remove_graph`` with ``graph_iri`` wrapped in a
    :class:`NamedNode`.
    """
    node = NamedNode(graph_iri)
    graph.store.remove_graph(node)
def iter_quads_for_pattern(
    graph: Graph,
    subject: Any = None,
    predicate: Any = None,
    obj: Any = None,
    *,
    graph_iri: str | NamedNode | DefaultGraph | None = None,
) -> Iterator[Quad]:
    """Yield quads from ``graph.store`` matching an optional ``(s, p, o, graph)`` pattern.

    ``subject``, ``predicate`` and ``obj`` are converted with the
    ``pattern_*`` helpers from ``triplemodel.store.terms``. The graph
    component is ``graph_iri`` normalised through ``_graph_name`` when given,
    otherwise ``graph.graph_name``.
    """
    from triplemodel.store.terms import (
        pattern_object,
        pattern_predicate,
        pattern_subject,
    )

    terms = (
        pattern_subject(subject),
        pattern_predicate(predicate),
        pattern_object(obj),
    )
    scope: GraphName = graph.graph_name if graph_iri is None else _graph_name(graph_iri)
    yield from graph.store.quads_for_pattern(*terms, scope)
# Public API of this module, in alphabetical order.
__all__ = [
    "backup_store",
    "bulk_load_into_graph",
    "clear_named_graph",
    "dump_store",
    "ensure_named_graph",
    "iter_quads_for_pattern",
    "list_named_graphs",
    "load_store",
    "optimize_store",
    "remove_named_graph",
    "store_flush",
]