# Source code for triplemodel.io.sparql

"""SPARQL query passthrough and remote endpoint helpers."""

from __future__ import annotations

import re
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any, Literal, TypeVar, cast, overload

from pydantic import BaseModel
from pyoxigraph import BlankNode, DefaultGraph, Literal as OxLiteral, NamedNode
from triplemodel.store import RdfGraph as Graph
from triplemodel.store.sparql_result import (
    SparqlResult,
    Variable,
    bindings_to_substitutions,
)
from triplemodel.store.terms import RdfTerm as Node, term_str

from triplemodel.config import get_rdf_config, id_from_subject_uri
from triplemodel.fields.metadata import id_field_is_iri_id
from triplemodel.io.import_ import OnDuplicate, graph_to_model
from triplemodel.metadata.cardinality import scalar_python_type, union_member_types
from triplemodel.namespaces import bind_namespaces
from triplemodel.protocols import PredicateResolver as PredicateResolverProtocol
from triplemodel.terms.convert import python_to_term, term_to_python
from triplemodel.terms.opaque import OpaqueLiteral
from triplemodel.terms.iri import looks_like_iri
from triplemodel.terms.registry import LiteralRegistry, default_registry

# Generic pydantic model type produced by the model-loading helpers.
T = TypeVar("T", bound=BaseModel)

# Result-kind labels: lowercase shape-style names followed by the uppercase
# SPARQL query-form keywords.
SparqlResultKind = Literal[
    "bindings", "boolean", "graph", "json",
    "ASK", "SELECT", "CONSTRUCT", "DESCRIBE",
]
# Lowercase query form reported by ``detect_query_form``.
SparqlQueryForm = Literal["select", "construct", "describe", "ask", "unknown"]

# ``SparqlResult.type`` values grouped by result shape.
_BOOLEAN_RESULT_TYPES = frozenset(("ASK", "boolean"))
_BINDINGS_RESULT_TYPES = frozenset(("SELECT", "bindings"))
_GRAPH_RESULT_TYPES = frozenset(("CONSTRUCT", "DESCRIBE", "graph"))

# The only extra keyword arguments ``run_sparql`` accepts; they are forwarded
# (after coercion) to the store's ``query`` call as dataset options.
_SPARQL_QUERY_KWARGS = frozenset(
    (
        "use_default_graph_as_union",
        "default_graph",
        "named_graphs",
        "base_iri",
    )
)


def _is_boolean_result(result: SparqlResult) -> bool:
    """Return True when ``result.type`` marks an ASK / boolean result."""
    kind = result.type
    return kind in _BOOLEAN_RESULT_TYPES


def _is_bindings_result(result: SparqlResult) -> bool:
    """Return True when ``result.type`` marks a SELECT / bindings result."""
    kind = result.type
    return kind in _BINDINGS_RESULT_TYPES


def _is_graph_result(result: SparqlResult) -> bool:
    """Return True when ``result.type`` marks a CONSTRUCT / DESCRIBE graph result."""
    kind = result.type
    return kind in _GRAPH_RESULT_TYPES


# A query-form keyword as a standalone token: not part of a longer name, not a
# prefix name (``ask:``), not a variable (``?select`` / ``$select``).
_QUERY_FORM_RE = re.compile(
    r"(?<![\w:?$-])(SELECT|CONSTRUCT|DESCRIBE|ASK)(?![\w:-])",
    re.IGNORECASE,
)
# Kept for any callers that may import them; detect_query_form no longer uses
# them (see _NON_KEYWORD_SPAN_RE).
_COMMENT_LINE_RE = re.compile(r"#.*$", re.MULTILINE)
_COMMENT_BLOCK_RE = re.compile(r"/\*.*?\*/", re.DOTALL)
# Spans whose text must not be scanned for keywords: IRI references,
# single-line string literals, ``/* */`` blocks and ``#`` line comments.
# One alternation lets the leftmost construct win, so a ``#`` inside
# ``<...#>`` is consumed as part of the IRI instead of starting a comment.
_NON_KEYWORD_SPAN_RE = re.compile(
    r"<[^<>\"{}|^`\\\s]*>"  # IRI reference
    r'|"(?:[^"\\\n]|\\.)*"'  # double-quoted string literal
    r"|'(?:[^'\\\n]|\\.)*'"  # single-quoted string literal
    r"|/\*.*?\*/"  # block comment (stripped as before)
    r"|#[^\n]*",  # line comment
    re.DOTALL,
)


def detect_query_form(query: str) -> SparqlQueryForm:
    """Return the first SPARQL query form keyword in ``query``.

    IRIs, string literals and comments are masked out before searching, so a
    keyword that only occurs inside them (for example
    ``<http://example.org/ask/>``) is ignored, and a ``#`` inside an IRI does not
    hide the rest of its line. Prefix names and variables spelled like keywords
    (``ask:``, ``?select``) are not treated as keywords either.

    Args:
        query: SPARQL query text.

    Returns:
        ``"select"``, ``"construct"``, ``"describe"`` or ``"ask"``; ``"unknown"``
        when no query form keyword is found.

    Raises:
        TypeError: If ``query`` is not a ``str``.
    """
    if not isinstance(query, str):
        raise TypeError("detect_query_form expects a SPARQL query string.")
    # Replace with a space rather than "" so masking never glues neighboring
    # tokens together (e.g. ``x:<iri>SELECT`` must not become ``x:SELECT``).
    text = _NON_KEYWORD_SPAN_RE.sub(" ", query)
    match = _QUERY_FORM_RE.search(text)
    if match is None:
        return "unknown"
    return cast("SparqlQueryForm", match.group(1).lower())
def init_ns_from_model(model_cls: type[BaseModel]) -> dict[str, str]:
    """Return a new prefix -> namespace dict from ``model_cls``'s ``Rdf.prefixes``.

    The result is a fresh (shallow) copy of the configured mapping.
    """
    prefixes = get_rdf_config(model_cls).prefixes_dict
    return dict(prefixes)
def _sparql_term_for_inline(term: Node) -> str: if isinstance(term, NamedNode): return f"<{term.value}>" if isinstance(term, BlankNode): return f"_:{term}" if isinstance(term, OxLiteral): if term.language: return f'"{term.value}"@{term.language}' if term.datatype is not None: return f'"{term.value}"^^{term.datatype}' return f'"{term.value}"' # pragma: no cover return str(term) def _inline_init_bindings( query: str, bindings: Mapping[Variable, Node], ) -> str: """Replace ``?var`` with bound RDF terms (pyoxigraph ASK lacks substitutions).""" out = query for var, term in bindings.items(): name = str(var).lstrip("?") out = re.sub( rf"\?{re.escape(name)}\b", _sparql_term_for_inline(term), out, ) return out
def init_bindings_from_model(
    instance: BaseModel,
    mapping: Mapping[str, str],
) -> dict[Variable, Node]:
    """Map SPARQL variable names to RDF terms taken from model field values.

    Args:
        instance: Model whose field values are bound.
        mapping: SPARQL variable name (with or without ``?``) -> field name.

    Returns:
        Variable -> RDF term. The configured ``Rdf.id_field`` is bound to
        ``NamedNode(instance.subject_uri())`` when the instance has a callable
        ``subject_uri``; every other field goes through ``python_to_term``.

    Raises:
        ValueError: If a mapped field name is not a model field.
    """
    bindings: dict[Variable, Node] = {}
    for raw_name, attr in mapping.items():
        model_cls = type(instance)
        if attr not in model_cls.model_fields:
            raise ValueError(
                f"Unknown model field {attr!r} in init_bindings mapping."
            )
        variable = Variable(raw_name.lstrip("?"))
        rdf_cfg = get_rdf_config(model_cls)
        subject_uri = None
        if attr == rdf_cfg.id_field and rdf_cfg.id_field:
            subject_uri = getattr(instance, "subject_uri", None)
        if callable(subject_uri):
            bindings[variable] = NamedNode(subject_uri())
        else:
            bindings[variable] = python_to_term(getattr(instance, attr))
    return bindings
def graph_from_construct_result(
    result: SparqlResult,
    graph_out: Graph | None = None,
) -> Graph:
    """Merge a CONSTRUCT/DESCRIBE ``Result`` graph into ``graph_out`` or a new graph.

    Args:
        result: A graph-shaped SPARQL result.
        graph_out: Graph to add the result triples to. When given it is always
            the merge target, even if it is currently empty.

    Returns:
        ``graph_out`` when provided, otherwise a newly created graph.

    Raises:
        TypeError: If ``result`` is not a graph result.
    """
    if not _is_graph_result(result):
        raise TypeError(
            f"Expected a graph SPARQL result, got {result.type!r}. "
            "Use CONSTRUCT or DESCRIBE, or call select_models for SELECT."
        )
    # Compare against None explicitly: an empty caller-supplied graph may be
    # falsy (length 0), and ``graph_out or Graph()`` would silently discard it.
    target = graph_out if graph_out is not None else Graph()
    if result.graph is not None:
        for s, p, o in result.graph:
            target.add((s, p, o))
    return target
def _coerce_init_bindings( bindings: Mapping[Variable, Node] | Mapping[str, Node] | None, ) -> dict[Variable, Node] | None: if bindings is None: return None out: dict[Variable, Node] = {} for key, value in bindings.items(): var = key if isinstance(key, Variable) else Variable(str(key).lstrip("?")) out[var] = value return out def _coerce_query_dataset_kwargs(kwargs: dict[str, Any]) -> dict[str, Any]: unknown = sorted(set(kwargs) - _SPARQL_QUERY_KWARGS) if unknown: raise TypeError( f"run_sparql() got unexpected keyword arguments: {unknown}. " f"Supported dataset options: {sorted(_SPARQL_QUERY_KWARGS)}." ) out: dict[str, Any] = {} if "use_default_graph_as_union" in kwargs: out["use_default_graph_as_union"] = kwargs["use_default_graph_as_union"] if "base_iri" in kwargs: out["base_iri"] = kwargs["base_iri"] if "default_graph" in kwargs: dg = kwargs["default_graph"] if dg is None: out["default_graph"] = None elif isinstance(dg, str): out["default_graph"] = NamedNode(dg) elif isinstance(dg, (DefaultGraph, NamedNode)): out["default_graph"] = dg else: raise TypeError( f"default_graph must be str, NamedNode, or DefaultGraph, got {type(dg)!r}" ) if "named_graphs" in kwargs: graphs = kwargs["named_graphs"] if graphs is None: out["named_graphs"] = None else: coerced: list[NamedNode] = [] for item in graphs: if isinstance(item, str): coerced.append(NamedNode(item)) elif isinstance(item, NamedNode): coerced.append(item) else: raise TypeError( f"named_graphs items must be str or NamedNode, got {type(item)!r}" ) out["named_graphs"] = coerced return out
def run_sparql(
    graph: Graph,
    query: str,
    *,
    model_cls: type[BaseModel] | None = None,
    initNs: Mapping[str, Any] | None = None,  # noqa: N803
    initBindings: Mapping[Variable, Node] | Mapping[str, Node] | None = None,  # noqa: N803
    use_store_provided: bool = True,
    use_default_graph_as_union: bool = False,
    default_graph: str | NamedNode | DefaultGraph | None = None,
    named_graphs: list[str | NamedNode] | None = None,
    base_iri: str | None = None,
    **kwargs: Any,
) -> SparqlResult:
    """Run SPARQL on ``graph.store`` with optional prefixes and bindings.

    Prefixes: ``initNs`` if given, else the prefixes of ``model_cls`` if given.
    Either is layered over the graph's own bound prefixes, and the graph's
    prefixes are used alone when neither is given. When ``model_cls`` is given
    and prefixes were resolved, they are also bound on ``graph``.

    Bindings are passed as store substitutions. For ASK queries they are
    inlined into the query text instead.

    Raises:
        TypeError: For unsupported dataset keyword arguments or a non-string
            ``query``.
    """
    del use_store_provided  # accepted for API compatibility; not used

    requested: dict[str, Any] = dict(kwargs)
    if use_default_graph_as_union:
        requested["use_default_graph_as_union"] = True
    optional_options = (
        ("default_graph", default_graph),
        ("named_graphs", named_graphs),
        ("base_iri", base_iri),
    )
    requested.update(
        (name, value) for name, value in optional_options if value is not None
    )
    dataset_kwargs = _coerce_query_dataset_kwargs(requested)

    if not isinstance(query, str):
        raise TypeError("run_sparql expects a SPARQL query string in TripleModel 0.10.")

    bindings = _coerce_init_bindings(initBindings)

    caller_prefixes: dict[str, str] | None
    if initNs is not None:
        caller_prefixes = {str(key): str(value) for key, value in initNs.items()}
    elif model_cls is not None:
        caller_prefixes = dict(init_ns_from_model(model_cls))
    else:
        caller_prefixes = None

    graph_prefixes = graph._prefixes
    if not graph_prefixes:
        prefixes = caller_prefixes
    elif caller_prefixes is None:
        prefixes = dict(graph_prefixes)
    else:
        # Caller-supplied prefixes win over the graph's bound prefixes.
        prefixes = {**graph_prefixes, **caller_prefixes}

    if prefixes and model_cls is not None:
        bind_namespaces(graph, prefixes)

    form = detect_query_form(query)
    query_text = query
    substitutions = bindings_to_substitutions(bindings)
    if bindings and form == "ask":
        # pyoxigraph ASK lacks substitutions: inline the bound terms instead.
        query_text = _inline_init_bindings(query, bindings)
        substitutions = None

    raw = graph.store.query(
        query_text,
        prefixes=prefixes,
        substitutions=substitutions,
        **dataset_kwargs,
    )
    return SparqlResult.from_pyoxigraph(raw, form=form)
def ask(
    graph: Graph,
    query: str,
    *,
    model_cls: type[BaseModel] | None = None,
    initNs: Mapping[str, Any] | None = None,  # noqa: N803
    initBindings: Mapping[str, Node] | None = None,  # noqa: N803
    use_store_provided: bool = True,
    **kwargs: Any,
) -> bool:
    """Execute an ASK query on ``graph`` and return its boolean answer.

    All keyword arguments are forwarded to :func:`run_sparql`.

    Raises:
        TypeError: If the query does not produce a boolean (ASK) result.
    """
    outcome = run_sparql(
        graph,
        query,
        model_cls=model_cls,
        initNs=initNs,
        initBindings=initBindings,
        use_store_provided=use_store_provided,
        **kwargs,
    )
    if _is_boolean_result(outcome):
        return bool(outcome.askAnswer)
    raise TypeError(f"Expected ASK (boolean) result, got {outcome.type!r}.")
def construct_models(
    model_cls: type[T],
    graph: Graph,
    query: str,
    *,
    dispatch: bool = False,
    graph_out: Graph | None = None,
    type_uri: str | None = None,
    validate_type: bool = True,
    on_duplicate: OnDuplicate = "warn",
    resolver: PredicateResolverProtocol | None = None,
    registry: LiteralRegistry = default_registry,
    de_skolemize: bool | None = None,
    initNs: Mapping[str, Any] | None = None,  # noqa: N803
    initBindings: Mapping[str, Node] | None = None,  # noqa: N803
    use_store_provided: bool = True,
    **kwargs: Any,
) -> list[T]:
    """Run CONSTRUCT/DESCRIBE and load models from the result graph.

    The result triples are merged into ``graph_out`` (or a new graph), and
    ``model_cls``'s prefixes are bound on that graph. Models are then loaded
    with ``model_cls.all_from_graph``. With ``dispatch=True`` they are loaded
    with ``all_from_graph_dispatch`` instead, and ``type_uri`` is not used.
    """
    outcome = run_sparql(
        graph,
        query,
        model_cls=model_cls,
        initNs=initNs,
        initBindings=initBindings,
        use_store_provided=use_store_provided,
        **kwargs,
    )
    result_graph = graph_from_construct_result(outcome, graph_out)
    bind_namespaces(result_graph, get_rdf_config(model_cls).prefixes_dict)

    load_options: dict[str, Any] = {
        "validate_type": validate_type,
        "on_duplicate": on_duplicate,
        "resolver": resolver,
        "registry": registry,
        "de_skolemize": de_skolemize,
    }
    if not dispatch:
        return model_cls.all_from_graph(  # ty: ignore[unresolved-attribute]
            result_graph,
            type_uri=type_uri,
            **load_options,
        )

    from triplemodel.io.dispatch import all_from_graph_dispatch

    return cast(list[T], all_from_graph_dispatch(result_graph, **load_options))
def _normalize_var_name(name: str) -> str: return name.lstrip("?") def _binding_value(row: Mapping[Variable, Node], var_name: str) -> Node | None: key = Variable(_normalize_var_name(var_name)) return row.get(key) def _term_for_field( term: Node, field_info: Any, *, registry: LiteralRegistry, ) -> object: target = scalar_python_type(field_info) if target is not None: return term_to_python(term, target, registry=registry) members = union_member_types(field_info) if members: for member in members: try: return term_to_python(term, member, registry=registry) except (TypeError, ValueError): continue value = term_to_python(term, registry=registry) if isinstance(value, OpaqueLiteral): return value.value return value def _projection_value_for_field( model_cls: type[BaseModel], field_name: str, term: Node, field_info: Any, *, registry: LiteralRegistry, ) -> object: """Map a SPARQL binding to a model field value for SELECT projection.""" cfg = get_rdf_config(model_cls) if ( field_name == cfg.id_field and cfg.id_field and (isinstance(term, NamedNode) or looks_like_iri(term_str(term))) ): _, id_value = _subject_id_from_uri(model_cls, term_str(term)) return id_value return _term_for_field(term, field_info, registry=registry) def _subject_id_from_uri(model_cls: type[BaseModel], uri: str) -> tuple[str, object]: cfg = get_rdf_config(model_cls) id_field = cfg.id_field if not id_field: raise ValueError( f"{model_cls.__name__} has no Rdf.id_field; pass subject_var only when " "id_field is configured, or omit subject_var." ) if id_field_is_iri_id(model_cls, id_field): return id_field, uri segment = id_from_subject_uri(cfg.namespace, uri) if segment is not None: return id_field, segment return id_field, uri def _default_field_map( result: SparqlResult, *, exclude: str | None = None, ) -> dict[str, str]: vars_ = result.vars or [] out: dict[str, str] = {} for var in vars_: name = _normalize_var_name(str(var)) if exclude is not None and name == exclude: continue out[name] = name return out
def select_models(
    model_cls: type[T],
    graph: Graph,
    query: str,
    *,
    field_map: Mapping[str, str] | None = None,
    subject_var: str | None = None,
    hydrate: bool = False,
    type_uri: str | None = None,
    validate_type: bool = True,
    on_duplicate: OnDuplicate = "warn",
    resolver: PredicateResolverProtocol | None = None,
    registry: LiteralRegistry = default_registry,
    de_skolemize: bool | None = None,
    initNs: Mapping[str, Any] | None = None,  # noqa: N803
    initBindings: Mapping[str, Node] | None = None,  # noqa: N803
    use_store_provided: bool = True,
    **kwargs: Any,
) -> list[T]:
    """Run SELECT and return model instances from result bindings or hydration.

    By default each result row is projected onto model fields (``field_map``,
    or variable names matching field names). With ``hydrate=True`` each
    distinct ``subject_var`` binding is loaded from ``graph`` as a full model.

    Raises:
        ValueError: If ``hydrate=True`` is requested without ``subject_var``.
    """
    if not hydrate:
        return _select_models_projection(
            model_cls,
            graph,
            query,
            field_map=field_map,
            subject_var=subject_var,
            initNs=initNs,
            initBindings=initBindings,
            use_store_provided=use_store_provided,
            registry=registry,
            **kwargs,
        )
    if not subject_var:
        raise ValueError("select_models(hydrate=True) requires subject_var=.")
    return _select_models_hydrate(
        model_cls,
        graph,
        query,
        subject_var=subject_var,
        validate_type=validate_type,
        on_duplicate=on_duplicate,
        resolver=resolver,
        registry=registry,
        de_skolemize=de_skolemize,
        initNs=initNs,
        initBindings=initBindings,
        use_store_provided=use_store_provided,
        **kwargs,
    )
def _select_models_hydrate(
    model_cls: type[T],
    graph: Graph,
    query: str,
    *,
    subject_var: str,
    validate_type: bool,
    on_duplicate: OnDuplicate,
    resolver: PredicateResolverProtocol | None,
    registry: LiteralRegistry,
    de_skolemize: bool | None,
    initNs: Mapping[str, Any] | None,
    initBindings: Mapping[str, Node] | None,
    use_store_provided: bool,
    **kwargs: Any,
) -> list[T]:
    """Load one model from ``graph`` per distinct ``subject_var`` binding, in row order."""
    result = run_sparql(
        graph,
        query,
        model_cls=model_cls,
        initNs=initNs,
        initBindings=initBindings,
        use_store_provided=use_store_provided,
        **kwargs,
    )
    if not _is_bindings_result(result):
        raise TypeError(f"Expected SELECT (bindings) result, got {result.type!r}.")

    models: list[T] = []
    visited: set[str] = set()
    for row in result:
        subject = _binding_value(cast("Mapping[Variable, Node]", row), subject_var)
        if subject is None:
            continue
        uri = term_str(subject)
        if uri in visited:
            continue
        visited.add(uri)
        models.append(
            graph_to_model(
                graph,
                model_cls,
                uri,
                validate_type=validate_type,
                on_duplicate=on_duplicate,
                resolver=resolver,
                registry=registry,
                de_skolemize=de_skolemize,
            )
        )
    return models


def _select_models_projection(
    model_cls: type[T],
    graph: Graph,
    query: str,
    *,
    field_map: Mapping[str, str] | None,
    subject_var: str | None,
    initNs: Mapping[str, Any] | None,
    initBindings: Mapping[str, Node] | None,
    use_store_provided: bool,
    registry: LiteralRegistry,
    **kwargs: Any,
) -> list[T]:
    """Build one model per SELECT row by projecting bound variables onto fields.

    When ``subject_var`` is given and not itself mapped, its binding supplies
    the model id. Unbound variables are simply left out of the row's data.
    """
    result = run_sparql(
        graph,
        query,
        model_cls=model_cls,
        initNs=initNs,
        initBindings=initBindings,
        use_store_provided=use_store_provided,
        **kwargs,
    )
    if not _is_bindings_result(result):
        raise TypeError(f"Expected SELECT (bindings) result, got {result.type!r}.")

    subject_key = _normalize_var_name(subject_var) if subject_var else None
    if field_map is None:
        mapping = _default_field_map(result, exclude=subject_key)
    else:
        mapping = dict(field_map)
    for var_name, field_name in mapping.items():
        if field_name not in model_cls.model_fields:
            raise ValueError(
                f"Unknown model field {field_name!r} in field_map "
                f"(SPARQL variable {var_name!r})."
            )

    # Variable that supplies the id when the subject is not explicitly mapped.
    id_var = subject_key if subject_key and subject_key not in mapping else None

    instances: list[T] = []
    for row in result:
        bound = cast("Mapping[Variable, Node]", row)
        data: dict[str, object] = {}
        if id_var is not None:
            subject = _binding_value(bound, id_var)
            if subject is not None:
                id_field, id_value = _subject_id_from_uri(model_cls, term_str(subject))
                data[id_field] = id_value
        for var_name, field_name in mapping.items():
            term = _binding_value(bound, var_name)
            if term is not None:
                data[field_name] = _projection_value_for_field(
                    model_cls,
                    field_name,
                    term,
                    model_cls.model_fields[field_name],
                    registry=registry,
                )
        instances.append(model_cls.model_validate(data))
    return instances
def apply_update(
    graph: Graph,
    update: str,
    *,
    model_cls: type[BaseModel] | None = None,
    initNs: Mapping[str, Any] | None = None,  # noqa: N803
    initBindings: Mapping[str, Node] | None = None,  # noqa: N803
    use_store_provided: bool = True,
    **kwargs: Any,
) -> None:
    """Apply a SPARQL UPDATE to ``graph`` (in-memory models may be stale afterward).

    Args:
        graph: Target graph; the update runs on ``graph.store``.
        update: SPARQL UPDATE text.
        model_cls: Optional model. Its prefixes are bound on ``graph`` and are
            used as query prefixes when ``initNs`` is not given.
        initNs: Explicit prefix -> namespace map for the update.
        initBindings: Variable -> RDF term map. Keys may be given with or
            without ``?``. Bound variables are inlined textually into the
            update before it runs.
        use_store_provided: Accepted for API compatibility; not used.
        **kwargs: Accepted for API compatibility; not used.
    """
    _ = use_store_provided, kwargs
    resolved_init_ns = initNs
    if resolved_init_ns is None and model_cls is not None:
        resolved_init_ns = init_ns_from_model(model_cls)
    if model_cls is not None:
        bind_namespaces(graph, get_rdf_config(model_cls).prefixes_dict)

    # Bindings used to be dropped silently, which turned e.g.
    # ``DELETE WHERE { ?s ?p ?o }`` scoped to one subject into a delete over
    # every subject. Inline them, as run_sparql does for ASK queries.
    resolved_bindings = _coerce_init_bindings(initBindings)
    update_text = (
        _inline_init_bindings(update, resolved_bindings)
        if resolved_bindings
        else update
    )

    graph.store.update(
        update_text,
        prefixes={str(k): str(v) for k, v in resolved_init_ns.items()}
        if resolved_init_ns
        else None,
    )
@dataclass(frozen=True)
class PreparedModelQuery:
    """A SPARQL query string paired with the model whose prefixes it uses.

    Nothing is parsed up front. The query runs through :func:`run_sparql` with
    ``model_cls``, so the model's ``Rdf.prefixes`` are applied at execution time
    unless ``initNs`` is passed explicitly.
    """

    model_cls: type[BaseModel]
    query_str: str

    @property
    def prepared(self) -> str:
        """The stored query text (same as ``query_str``)."""
        return self.query_str

    def execute(
        self,
        graph: Graph,
        *,
        initBindings: Mapping[Variable, Node] | None = None,  # noqa: N803
        use_store_provided: bool = True,
        **kwargs: Any,
    ) -> SparqlResult:
        """Run the stored query against ``graph``; extra kwargs go to :func:`run_sparql`."""
        return run_sparql(
            graph,
            self.query_str,
            model_cls=self.model_cls,
            initBindings=initBindings,
            use_store_provided=use_store_provided,
            **kwargs,
        )

    def as_result(self, graph: Graph, **kwargs: Any) -> SparqlResult:
        """Alias for :meth:`execute`."""
        return self.execute(graph, **kwargs)
def prepare_model_query(model_cls: type[BaseModel], query: str) -> PreparedModelQuery:
    """Wrap ``query`` for later execution; ``model_cls`` prefixes apply when it runs."""
    return PreparedModelQuery(query_str=query, model_cls=model_cls)
def open_sparql_graph(endpoint: str, *, read_only: bool = True) -> Graph:
    """Open a remote SPARQL graph (removed in 0.10.0).

    Always raises :exc:`NotImplementedError`. Load remote data into a local
    :class:`~triplemodel.Store` or use SparqlModel for session-level remote
    stores.
    """
    del endpoint, read_only  # kept only for signature compatibility
    message = (
        "open_sparql_graph is not available with the pyoxigraph engine in 0.10.0. "
        "Query a remote endpoint with your HTTP/SPARQL client and load quads into a "
        "local pyoxigraph.Store, or use SparqlModel for session-level remote stores."
    )
    raise NotImplementedError(message)
@overload
def load_sparql(
    model_cls: type[T],
    endpoint: str,
    query: str,
    *,
    query_form: SparqlQueryForm,
    read_only: bool = True,
    dispatch: bool = False,
    **kwargs: Any,
) -> list[T]:
    """Overload: an explicit ``query_form`` is used instead of keyword detection."""


@overload
def load_sparql(
    model_cls: type[T],
    endpoint: str,
    query: str,
    *,
    read_only: bool = True,
    dispatch: bool = False,
    **kwargs: Any,
) -> list[T]:
    """Overload: the query form is detected from ``query``."""
def load_sparql(
    model_cls: type[T],
    endpoint: str,
    query: str,
    *,
    query_form: SparqlQueryForm | None = None,
    read_only: bool = True,
    dispatch: bool = False,
    type_uri: str | None = None,
    validate_type: bool = True,
    on_duplicate: OnDuplicate = "warn",
    resolver: PredicateResolverProtocol | None = None,
    registry: LiteralRegistry = default_registry,
    de_skolemize: bool | None = None,
    initNs: Mapping[str, Any] | None = None,  # noqa: N803
    initBindings: Mapping[str, Node] | None = None,  # noqa: N803
    use_store_provided: bool = True,
    **kwargs: Any,
) -> list[T]:
    """Query a remote SPARQL endpoint and return model instances.

    Uses :func:`open_sparql_graph`, which raises :exc:`NotImplementedError` in
    0.10.0. Query a local :class:`~triplemodel.Store` with
    :func:`construct_models` / :func:`select_models` after fetching data, or use
    SparqlModel for remote endpoints.

    The query is validated before the endpoint is opened, and the endpoint is
    opened exactly once.

    Raises:
        ValueError: If ``query`` is not a string, or its form is not
            CONSTRUCT, DESCRIBE or SELECT.
        TypeError: For ASK queries, which do not produce models.
        NotImplementedError: From :func:`open_sparql_graph` in 0.10.0.
    """
    # Validate cheaply first, so input errors surface before any endpoint I/O.
    if not isinstance(query, str):
        raise ValueError(
            f"Cannot load models from SPARQL query {query!r}; "
            "use CONSTRUCT, DESCRIBE, or SELECT."
        )
    form = query_form if query_form is not None else detect_query_form(query)
    if form == "ask":
        raise TypeError(
            "ASK queries do not return models; use ask(open_sparql_graph(endpoint), query)."
        )
    if form not in ("construct", "describe", "select"):
        raise ValueError(
            f"Cannot load models from SPARQL query form {form!r}; "
            "use CONSTRUCT, DESCRIBE, or SELECT."
        )

    graph = open_sparql_graph(endpoint, read_only=read_only)
    bind_namespaces(graph, get_rdf_config(model_cls).prefixes_dict)
    if form == "select":
        return select_models(
            model_cls,
            graph,
            query,
            validate_type=validate_type,
            on_duplicate=on_duplicate,
            resolver=resolver,
            registry=registry,
            de_skolemize=de_skolemize,
            initNs=initNs,
            initBindings=initBindings,
            use_store_provided=use_store_provided,
            **kwargs,
        )
    return construct_models(
        model_cls,
        graph,
        query,
        dispatch=dispatch,
        type_uri=type_uri,
        validate_type=validate_type,
        on_duplicate=on_duplicate,
        resolver=resolver,
        registry=registry,
        de_skolemize=de_skolemize,
        initNs=initNs,
        initBindings=initBindings,
        use_store_provided=use_store_provided,
        **kwargs,
    )
__all__ = [ "PreparedModelQuery", "SparqlQueryForm", "SparqlResultKind", "apply_update", "ask", "construct_models", "detect_query_form", "graph_from_construct_result", "init_bindings_from_model", "init_ns_from_model", "load_sparql", "open_sparql_graph", "prepare_model_query", "run_sparql", "select_models", ]