"""SPARQL query passthrough and remote endpoint helpers."""
from __future__ import annotations
import re
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any, Literal, TypeVar, cast, overload
from pydantic import BaseModel
from pyoxigraph import BlankNode, DefaultGraph, Literal as OxLiteral, NamedNode
from triplemodel.store import RdfGraph as Graph
from triplemodel.store.sparql_result import (
SparqlResult,
Variable,
bindings_to_substitutions,
)
from triplemodel.store.terms import RdfTerm as Node, term_str
from triplemodel.config import get_rdf_config, id_from_subject_uri
from triplemodel.fields.metadata import id_field_is_iri_id
from triplemodel.io.import_ import OnDuplicate, graph_to_model
from triplemodel.metadata.cardinality import scalar_python_type, union_member_types
from triplemodel.namespaces import bind_namespaces
from triplemodel.protocols import PredicateResolver as PredicateResolverProtocol
from triplemodel.terms.convert import python_to_term, term_to_python
from triplemodel.terms.opaque import OpaqueLiteral
from triplemodel.terms.iri import looks_like_iri
from triplemodel.terms.registry import LiteralRegistry, default_registry
# Generic model type returned by the model-loading helpers in this module.
T = TypeVar("T", bound=BaseModel)

# Every ``SparqlResult.type`` spelling recognised here: lowercase result kinds
# followed by the uppercase query-form names.
SparqlResultKind = Literal[
    "bindings", "boolean", "graph", "json",
    "ASK", "SELECT", "CONSTRUCT", "DESCRIBE",
]

# Query forms used by load_sparql(); "unknown" is presumably what form
# detection reports for unrecognised text (detector not visible here).
SparqlQueryForm = Literal[
    "select",
    "construct",
    "describe",
    "ask",
    "unknown",
]
# Accepted ``SparqlResult.type`` spellings per result family: each set pairs the
# uppercase query-form name(s) with the lowercase result-kind name.
_BOOLEAN_RESULT_TYPES = frozenset(("ASK", "boolean"))
_BINDINGS_RESULT_TYPES = frozenset(("SELECT", "bindings"))
_GRAPH_RESULT_TYPES = frozenset(("CONSTRUCT", "DESCRIBE", "graph"))

# Dataset options run_sparql() accepts and forwards to the store query call.
_SPARQL_QUERY_KWARGS = frozenset(
    ("use_default_graph_as_union", "default_graph", "named_graphs", "base_iri")
)


def _is_boolean_result(result: SparqlResult) -> bool:
    """Return True when ``result`` is an ASK (boolean) result."""
    kind = result.type
    return kind in _BOOLEAN_RESULT_TYPES


def _is_bindings_result(result: SparqlResult) -> bool:
    """Return True when ``result`` is a SELECT (bindings) result."""
    kind = result.type
    return kind in _BINDINGS_RESULT_TYPES


def _is_graph_result(result: SparqlResult) -> bool:
    """Return True when ``result`` is a CONSTRUCT/DESCRIBE (graph) result."""
    kind = result.type
    return kind in _GRAPH_RESULT_TYPES
# A query-form keyword as a whole word, in any letter case.
_QUERY_FORM_RE = re.compile(
    r"\b(SELECT|CONSTRUCT|DESCRIBE|ASK)\b", flags=re.IGNORECASE
)
# ``#`` line comments up to end of line (MULTILINE lets ``$`` match per line).
# NOTE(review): ``#`` also occurs inside IRIs such as <http://ex.org/#x>.
_COMMENT_LINE_RE = re.compile(r"#.*$", flags=re.MULTILINE)
# ``/* ... */`` spans, matched non-greedily across newlines (DOTALL).
_COMMENT_BLOCK_RE = re.compile(r"/\*.*?\*/", flags=re.DOTALL)
[docs]
def init_ns_from_model(model_cls: type[BaseModel]) -> dict[str, str]:
    """Return the SPARQL prefix map declared by ``model_cls``'s ``Rdf.prefixes``.

    A new dict is returned, so callers may modify it freely.
    """
    declared = get_rdf_config(model_cls).prefixes_dict
    return dict(declared)
# Characters that may not appear raw inside a double-quoted SPARQL string
# literal, mapped to their ECHAR escapes (applied in one translate() pass).
_SPARQL_STRING_ESCAPES = str.maketrans(
    {
        "\\": "\\\\",
        '"': '\\"',
        "\n": "\\n",
        "\r": "\\r",
        "\t": "\\t",
    }
)


def _escape_sparql_string(value: str) -> str:
    """Escape ``value`` for use between double quotes in SPARQL text."""
    return value.translate(_SPARQL_STRING_ESCAPES)


def _sparql_term_for_inline(term: Node) -> str:
    """Render ``term`` as SPARQL term syntax for textual inlining into a query.

    Named nodes become ``<iri>``, blank nodes ``_:id``, and literals a quoted,
    escaped lexical form with ``@lang`` or ``^^<datatype>``. Any other object
    falls back to ``str(term)``.
    """
    if isinstance(term, NamedNode):
        return f"<{term.value}>"
    if isinstance(term, BlankNode):
        # Use the bare identifier: str(BlankNode) already carries the "_:"
        # prefix, which would otherwise be doubled ("_:_:id").
        return f"_:{term.value}"
    if isinstance(term, OxLiteral):
        # Escape the lexical form so quotes, backslashes or line breaks in
        # bound values cannot break (or inject into) the query text.
        quoted = f'"{_escape_sparql_string(term.value)}"'
        if term.language:
            return f"{quoted}@{term.language}"
        if term.datatype is not None:
            # str(NamedNode) renders as "<iri>".
            return f"{quoted}^^{term.datatype}"
        return quoted  # pragma: no cover
    return str(term)
def _inline_init_bindings(
    query: str,
    bindings: Mapping[Variable, Node],
) -> str:
    """Replace ``?var`` / ``$var`` with bound RDF terms (pyoxigraph ASK lacks substitutions).

    Args:
        query: SPARQL text.
        bindings: variable -> RDF term; each variable's occurrences (either
            sigil) are replaced by the term rendered as SPARQL syntax.

    Returns:
        The query text with all bound variables written in place.

    NOTE(review): this is purely textual, so matching text inside string
    literals or IRIs is replaced as well.
    """
    out = query
    for var, term in bindings.items():
        name = str(var).lstrip("?$")
        rendered = _sparql_term_for_inline(term)
        # SPARQL accepts both ``?x`` and ``$x``; ``\b`` stops ``?x`` matching
        # the prefix of ``?xy``. A callable replacement is required because a
        # replacement *string* would have its backslashes (e.g. escaped
        # literal text) interpreted as regex escapes / group references.
        out = re.sub(
            rf"[?$]{re.escape(name)}\b",
            lambda _match, text=rendered: text,
            out,
        )
    return out
[docs]
def init_bindings_from_model(
    instance: BaseModel,
    mapping: Mapping[str, str],
) -> dict[Variable, Node]:
    """Build SPARQL ``initBindings`` from field values of ``instance``.

    Args:
        instance: model instance supplying the values.
        mapping: SPARQL variable name (leading ``?`` optional) -> model field name.

    Returns:
        ``Variable`` -> RDF term. The configured ``Rdf.id_field`` is bound to
        ``NamedNode(instance.subject_uri())`` when ``subject_uri`` is callable;
        every other field value goes through ``python_to_term``.

    Raises:
        ValueError: if a mapped field is not declared on the model.
    """
    model_type = type(instance)
    result: dict[Variable, Node] = {}
    for sparql_name, attr in mapping.items():
        if attr not in model_type.model_fields:
            raise ValueError(
                f"Unknown model field {attr!r} in init_bindings mapping."
            )
        variable = Variable(sparql_name.lstrip("?"))
        rdf_config = get_rdf_config(model_type)
        if rdf_config.id_field and attr == rdf_config.id_field:
            make_subject = getattr(instance, "subject_uri", None)
            if callable(make_subject):
                result[variable] = NamedNode(make_subject())
                continue
        result[variable] = python_to_term(getattr(instance, attr))
    return result
[docs]
def graph_from_construct_result(
    result: SparqlResult,
    graph_out: Graph | None = None,
) -> Graph:
    """Merge a CONSTRUCT/DESCRIBE ``Result`` graph into ``graph_out`` or a new graph.

    Args:
        result: a graph-typed SPARQL result (CONSTRUCT or DESCRIBE).
        graph_out: graph to add the result triples to; a new ``Graph`` is
            created only when this is ``None``.

    Returns:
        The graph the triples were added to.

    Raises:
        TypeError: if ``result`` is not a graph result.
    """
    if not _is_graph_result(result):
        raise TypeError(
            f"Expected a graph SPARQL result, got {result.type!r}. "
            "Use CONSTRUCT or DESCRIBE, or call select_models for SELECT."
        )
    # Compare with None explicitly: a caller-supplied graph that is empty may be
    # falsy (if it defines __len__), and ``graph_out or Graph()`` would then
    # silently write into a throwaway graph instead of the caller's.
    target = graph_out if graph_out is not None else Graph()
    if result.graph is not None:
        for s, p, o in result.graph:
            target.add((s, p, o))
    return target
def _coerce_init_bindings(
    bindings: Mapping[Variable, Node] | Mapping[str, Node] | None,
) -> dict[Variable, Node] | None:
    """Normalise ``initBindings`` keys to ``Variable`` objects.

    String keys may carry a leading ``?``. ``None`` is passed through as ``None``.
    """
    if bindings is None:
        return None

    def as_variable(key: object) -> Variable:
        if isinstance(key, Variable):
            return key
        return Variable(str(key).lstrip("?"))

    return {as_variable(key): term for key, term in bindings.items()}
def _coerce_query_dataset_kwargs(kwargs: dict[str, Any]) -> dict[str, Any]:
    """Validate dataset options and convert graph names to pyoxigraph terms.

    Only names listed in ``_SPARQL_QUERY_KWARGS`` are accepted. String graph
    names become ``NamedNode``; explicit ``None`` values are kept as ``None``.

    Raises:
        TypeError: on an unknown option, or a graph value of an unsupported type.
    """
    unexpected = sorted(set(kwargs) - _SPARQL_QUERY_KWARGS)
    if unexpected:
        raise TypeError(
            f"run_sparql() got unexpected keyword arguments: {unexpected}. "
            f"Supported dataset options: {sorted(_SPARQL_QUERY_KWARGS)}."
        )

    # Pass-through options first, keeping the original key order.
    coerced: dict[str, Any] = {
        option: kwargs[option]
        for option in ("use_default_graph_as_union", "base_iri")
        if option in kwargs
    }

    if "default_graph" in kwargs:
        chosen = kwargs["default_graph"]
        if isinstance(chosen, str):
            chosen = NamedNode(chosen)
        elif chosen is not None and not isinstance(chosen, (DefaultGraph, NamedNode)):
            raise TypeError(
                f"default_graph must be str, NamedNode, or DefaultGraph, got {type(chosen)!r}"
            )
        coerced["default_graph"] = chosen

    if "named_graphs" in kwargs:
        entries = kwargs["named_graphs"]
        if entries is None:
            coerced["named_graphs"] = None
        else:
            names: list[NamedNode] = []
            for entry in entries:
                if isinstance(entry, str):
                    names.append(NamedNode(entry))
                elif isinstance(entry, NamedNode):
                    names.append(entry)
                else:
                    raise TypeError(
                        f"named_graphs items must be str or NamedNode, got {type(entry)!r}"
                    )
            coerced["named_graphs"] = names

    return coerced
[docs]
def _resolve_query_prefixes(
    graph: Graph,
    model_cls: type[BaseModel] | None,
    init_ns: Mapping[str, Any] | None,
) -> dict[str, str] | None:
    """Combine explicit or model prefixes with the prefixes bound on ``graph``.

    ``init_ns`` wins over ``model_cls``; either overrides same-named prefixes
    already bound on ``graph``. Returns ``None`` only when no explicit source
    is given and the graph has no prefixes bound.
    """
    explicit: dict[str, str] | None
    if init_ns is not None:
        explicit = {str(key): str(value) for key, value in init_ns.items()}
    elif model_cls is not None:
        explicit = dict(init_ns_from_model(model_cls))
    else:
        explicit = None
    inherited = graph._prefixes
    if not inherited:
        return explicit
    if explicit is None:
        return dict(inherited)
    return {**inherited, **explicit}


def run_sparql(
    graph: Graph,
    query: str,
    *,
    model_cls: type[BaseModel] | None = None,
    initNs: Mapping[str, Any] | None = None,  # noqa: N803
    initBindings: Mapping[Variable, Node] | Mapping[str, Node] | None = None,  # noqa: N803
    use_store_provided: bool = True,
    use_default_graph_as_union: bool = False,
    default_graph: str | NamedNode | DefaultGraph | None = None,
    named_graphs: list[str | NamedNode] | None = None,
    base_iri: str | None = None,
    **kwargs: Any,
) -> SparqlResult:
    """Run SPARQL on ``graph.store`` with optional prefixes and bindings.

    Args:
        graph: graph whose ``store`` executes the query.
        query: SPARQL query text.
        model_cls: supplies default prefixes from ``Rdf.prefixes``; when given
            and the resolved prefixes are non-empty they are also bound on ``graph``.
        initNs: explicit prefix map; takes precedence over ``model_cls``.
        initBindings: variable -> term. Passed as store substitutions, except for
            ASK queries, where the terms are written into the query text.
        use_store_provided: accepted but not used.
        use_default_graph_as_union, default_graph, named_graphs, base_iri:
            dataset options forwarded to the store query call.
        **kwargs: further dataset options; unknown names raise ``TypeError``.

    Returns:
        ``SparqlResult`` built from the raw store result and the detected form.

    Raises:
        TypeError: on unknown options, bad graph values, or a non-string query.
    """
    _ = use_store_provided
    options: dict[str, Any] = dict(kwargs)
    if use_default_graph_as_union:
        options["use_default_graph_as_union"] = True
    for option, value in (
        ("default_graph", default_graph),
        ("named_graphs", named_graphs),
        ("base_iri", base_iri),
    ):
        if value is not None:
            options[option] = value
    dataset_kwargs = _coerce_query_dataset_kwargs(options)

    if not isinstance(query, str):
        raise TypeError("run_sparql expects a SPARQL query string in TripleModel 0.10.")

    bindings = _coerce_init_bindings(initBindings)
    prefixes = _resolve_query_prefixes(graph, model_cls, initNs)
    if model_cls is not None and prefixes:
        bind_namespaces(graph, prefixes)

    form = detect_query_form(query)
    substitutions = bindings_to_substitutions(bindings)
    query_text = query
    if bindings and form == "ask":
        # pyoxigraph ASK lacks substitutions: write the bound terms into the text.
        query_text = _inline_init_bindings(query, bindings)
        substitutions = None

    raw = graph.store.query(
        query_text,
        prefixes=prefixes,
        substitutions=substitutions,
        **dataset_kwargs,
    )
    return SparqlResult.from_pyoxigraph(raw, form=form)
[docs]
def ask(
    graph: Graph,
    query: str,
    *,
    model_cls: type[BaseModel] | None = None,
    initNs: Mapping[str, Any] | None = None,  # noqa: N803
    initBindings: Mapping[str, Node] | None = None,  # noqa: N803
    use_store_provided: bool = True,
    **kwargs: Any,
) -> bool:
    """Run an ASK query on ``graph`` and return its boolean answer.

    All arguments are forwarded to :func:`run_sparql`.

    Raises:
        TypeError: if the query does not produce a boolean (ASK) result.
    """
    outcome = run_sparql(
        graph,
        query,
        model_cls=model_cls,
        initNs=initNs,
        initBindings=initBindings,
        use_store_provided=use_store_provided,
        **kwargs,
    )
    if _is_boolean_result(outcome):
        return bool(outcome.askAnswer)
    raise TypeError(f"Expected ASK (boolean) result, got {outcome.type!r}.")
[docs]
def construct_models(
    model_cls: type[T],
    graph: Graph,
    query: str,
    *,
    dispatch: bool = False,
    graph_out: Graph | None = None,
    type_uri: str | None = None,
    validate_type: bool = True,
    on_duplicate: OnDuplicate = "warn",
    resolver: PredicateResolverProtocol | None = None,
    registry: LiteralRegistry = default_registry,
    de_skolemize: bool | None = None,
    initNs: Mapping[str, Any] | None = None,  # noqa: N803
    initBindings: Mapping[str, Node] | None = None,  # noqa: N803
    use_store_provided: bool = True,
    **kwargs: Any,
) -> list[T]:
    """Run CONSTRUCT/DESCRIBE on ``graph`` and load models from the result triples.

    The result triples are merged into ``graph_out`` (or a new graph), the
    model's prefixes are bound on it, and models are loaded from it either via
    ``model_cls.all_from_graph`` or, with ``dispatch=True``, via
    ``all_from_graph_dispatch`` (which does not receive ``type_uri``).

    Raises:
        TypeError: if the query does not return a graph result.
    """
    result = run_sparql(
        graph,
        query,
        model_cls=model_cls,
        initNs=initNs,
        initBindings=initBindings,
        use_store_provided=use_store_provided,
        **kwargs,
    )
    merged = graph_from_construct_result(result, graph_out)
    bind_namespaces(merged, get_rdf_config(model_cls).prefixes_dict)

    # Options shared by both loading paths.
    load_options: dict[str, Any] = {
        "validate_type": validate_type,
        "on_duplicate": on_duplicate,
        "resolver": resolver,
        "registry": registry,
        "de_skolemize": de_skolemize,
    }
    if not dispatch:
        return model_cls.all_from_graph(  # ty: ignore[unresolved-attribute]
            merged, type_uri=type_uri, **load_options
        )

    # Imported lazily (presumably to avoid an import cycle).
    from triplemodel.io.dispatch import all_from_graph_dispatch

    return cast(list[T], all_from_graph_dispatch(merged, **load_options))
def _normalize_var_name(name: str) -> str:
    """Return ``name`` without its SPARQL variable sigil.

    SPARQL 1.1 allows both ``?x`` and ``$x`` for the same variable, so both
    leading characters are stripped (neither can occur in a variable name).
    """
    return name.lstrip("?$")
def _binding_value(row: Mapping[Variable, Node], var_name: str) -> Node | None:
    """Return the term ``row`` binds to ``var_name``, or ``None`` when unbound."""
    return row.get(Variable(_normalize_var_name(var_name)))
def _term_for_field(
    term: Node,
    field_info: Any,
    *,
    registry: LiteralRegistry,
) -> object:
    """Convert ``term`` to a Python value suited to ``field_info``'s annotation.

    Resolution order:

    1. a single scalar target type — converted directly, errors propagate;
    2. each union member type in turn — members raising ``TypeError`` or
       ``ValueError`` are skipped;
    3. untyped conversion, unwrapping an ``OpaqueLiteral`` to its ``.value``.
    """
    scalar = scalar_python_type(field_info)
    if scalar is not None:
        return term_to_python(term, scalar, registry=registry)

    for candidate in union_member_types(field_info) or ():
        try:
            return term_to_python(term, candidate, registry=registry)
        except (TypeError, ValueError):
            pass

    fallback = term_to_python(term, registry=registry)
    return fallback.value if isinstance(fallback, OpaqueLiteral) else fallback
def _projection_value_for_field(
    model_cls: type[BaseModel],
    field_name: str,
    term: Node,
    field_info: Any,
    *,
    registry: LiteralRegistry,
) -> object:
    """Map a SPARQL binding to a model field value for SELECT projection.

    For the configured id field, an IRI-like term (a ``NamedNode`` or a value
    whose string form looks like an IRI) is reduced to the id value via
    ``_subject_id_from_uri``; all other values go through ``_term_for_field``.
    """
    id_field = get_rdf_config(model_cls).id_field
    if id_field and field_name == id_field:
        iri_like = isinstance(term, NamedNode) or looks_like_iri(term_str(term))
        if iri_like:
            return _subject_id_from_uri(model_cls, term_str(term))[1]
    return _term_for_field(term, field_info, registry=registry)
def _subject_id_from_uri(model_cls: type[BaseModel], uri: str) -> tuple[str, object]:
    """Return ``(id_field_name, id_value)`` for a subject IRI of ``model_cls``.

    IRI-typed id fields keep the full IRI. Otherwise the id is the segment that
    ``id_from_subject_uri`` extracts relative to ``Rdf.namespace``, falling back
    to the full IRI when no segment is found.

    Raises:
        ValueError: if the model has no ``Rdf.id_field`` configured.
    """
    cfg = get_rdf_config(model_cls)
    field = cfg.id_field
    if not field:
        raise ValueError(
            f"{model_cls.__name__} has no Rdf.id_field; pass subject_var only when "
            "id_field is configured, or omit subject_var."
        )
    if not id_field_is_iri_id(model_cls, field):
        segment = id_from_subject_uri(cfg.namespace, uri)
        if segment is not None:
            return field, segment
    return field, uri
def _default_field_map(
    result: SparqlResult,
    *,
    exclude: str | None = None,
) -> dict[str, str]:
    """Map every projected variable to the model field with the same name.

    Names are normalised with ``_normalize_var_name``; the name equal to
    ``exclude`` (if given) is left out.
    """
    names = (_normalize_var_name(str(var)) for var in (result.vars or []))
    return {name: name for name in names if exclude is None or name != exclude}
[docs]
def select_models(
    model_cls: type[T],
    graph: Graph,
    query: str,
    *,
    field_map: Mapping[str, str] | None = None,
    subject_var: str | None = None,
    hydrate: bool = False,
    type_uri: str | None = None,
    validate_type: bool = True,
    on_duplicate: OnDuplicate = "warn",
    resolver: PredicateResolverProtocol | None = None,
    registry: LiteralRegistry = default_registry,
    de_skolemize: bool | None = None,
    initNs: Mapping[str, Any] | None = None,  # noqa: N803
    initBindings: Mapping[str, Node] | None = None,  # noqa: N803
    use_store_provided: bool = True,
    **kwargs: Any,
) -> list[T]:
    """Run SELECT and return model instances from result bindings or hydration.

    * ``hydrate=True``: each distinct ``subject_var`` binding is loaded from
      ``graph`` with ``graph_to_model``; ``field_map`` is not used.
    * otherwise: each row is projected onto model fields through ``field_map``
      (default: variable name == field name); ``validate_type``,
      ``on_duplicate``, ``resolver`` and ``de_skolemize`` are not used.

    ``type_uri`` is not used on either path.

    Raises:
        ValueError: if ``hydrate=True`` is requested without ``subject_var``.
    """
    if not hydrate:
        return _select_models_projection(
            model_cls,
            graph,
            query,
            field_map=field_map,
            subject_var=subject_var,
            initNs=initNs,
            initBindings=initBindings,
            use_store_provided=use_store_provided,
            registry=registry,
            **kwargs,
        )
    if not subject_var:
        raise ValueError("select_models(hydrate=True) requires subject_var=.")
    return _select_models_hydrate(
        model_cls,
        graph,
        query,
        subject_var=subject_var,
        validate_type=validate_type,
        on_duplicate=on_duplicate,
        resolver=resolver,
        registry=registry,
        de_skolemize=de_skolemize,
        initNs=initNs,
        initBindings=initBindings,
        use_store_provided=use_store_provided,
        **kwargs,
    )
def _select_models_hydrate(
    model_cls: type[T],
    graph: Graph,
    query: str,
    *,
    subject_var: str,
    validate_type: bool,
    on_duplicate: OnDuplicate,
    resolver: PredicateResolverProtocol | None,
    registry: LiteralRegistry,
    de_skolemize: bool | None,
    initNs: Mapping[str, Any] | None,
    initBindings: Mapping[str, Node] | None,
    use_store_provided: bool,
    **kwargs: Any,
) -> list[T]:
    """Load one model per distinct ``subject_var`` binding of a SELECT query.

    Rows where the subject is unbound are skipped. Subjects are de-duplicated by
    their ``term_str`` form and loaded in first-seen order.

    Raises:
        TypeError: if the query does not return a bindings result.
    """
    result = run_sparql(
        graph,
        query,
        model_cls=model_cls,
        initNs=initNs,
        initBindings=initBindings,
        use_store_provided=use_store_provided,
        **kwargs,
    )
    if not _is_bindings_result(result):
        raise TypeError(f"Expected SELECT (bindings) result, got {result.type!r}.")

    loaded: list[T] = []
    visited: set[str] = set()
    for row in result:
        bound = _binding_value(cast("Mapping[Variable, Node]", row), subject_var)
        if bound is None:
            continue
        subject_iri = term_str(bound)
        if subject_iri in visited:
            continue
        visited.add(subject_iri)
        loaded.append(
            graph_to_model(
                graph,
                model_cls,
                subject_iri,
                validate_type=validate_type,
                on_duplicate=on_duplicate,
                resolver=resolver,
                registry=registry,
                de_skolemize=de_skolemize,
            )
        )
    return loaded
def _select_models_projection(
    model_cls: type[T],
    graph: Graph,
    query: str,
    *,
    field_map: Mapping[str, str] | None,
    subject_var: str | None,
    initNs: Mapping[str, Any] | None,
    initBindings: Mapping[str, Node] | None,
    use_store_provided: bool,
    registry: LiteralRegistry,
    **kwargs: Any,
) -> list[T]:
    """Build one ``model_cls`` instance per SELECT row from projected bindings.

    ``field_map`` maps SPARQL variable -> model field. Its default maps every
    projected variable except ``subject_var`` to the same-named field. When
    ``subject_var`` is set and not itself a key of the mapping, its binding
    fills the model's id field via ``_subject_id_from_uri``. Unbound
    variables are left out of the data passed to ``model_validate``.

    Raises:
        TypeError: if the query does not return a bindings result.
        ValueError: if the mapping names a field the model does not declare.
    """
    result = run_sparql(
        graph,
        query,
        model_cls=model_cls,
        initNs=initNs,
        initBindings=initBindings,
        use_store_provided=use_store_provided,
        **kwargs,
    )
    if not _is_bindings_result(result):
        raise TypeError(f"Expected SELECT (bindings) result, got {result.type!r}.")

    subject_key = _normalize_var_name(subject_var) if subject_var else None
    if field_map is None:
        mapping = _default_field_map(result, exclude=subject_key)
    else:
        mapping = dict(field_map)

    fields = model_cls.model_fields
    for var_name, field_name in mapping.items():
        if field_name not in fields:
            raise ValueError(
                f"Unknown model field {field_name!r} in field_map "
                f"(SPARQL variable {var_name!r})."
            )

    # The subject only fills the id field when it is not mapped explicitly.
    subject_to_fill = subject_key if subject_key and subject_key not in mapping else None

    instances: list[T] = []
    for row in result:
        bindings = cast("Mapping[Variable, Node]", row)
        data: dict[str, object] = {}
        if subject_to_fill is not None:
            subject_term = _binding_value(bindings, subject_to_fill)
            if subject_term is not None:
                id_field, id_value = _subject_id_from_uri(
                    model_cls, term_str(subject_term)
                )
                data[id_field] = id_value
        for var_name, field_name in mapping.items():
            bound = _binding_value(bindings, var_name)
            if bound is None:
                continue
            data[field_name] = _projection_value_for_field(
                model_cls,
                field_name,
                bound,
                fields[field_name],
                registry=registry,
            )
        instances.append(model_cls.model_validate(data))
    return instances
[docs]
def apply_update(
    graph: Graph,
    update: str,
    *,
    model_cls: type[BaseModel] | None = None,
    initNs: Mapping[str, Any] | None = None,  # noqa: N803
    initBindings: Mapping[str, Node] | None = None,  # noqa: N803
    use_store_provided: bool = True,
    **kwargs: Any,
) -> None:
    """Apply a SPARQL UPDATE to ``graph`` (in-memory models may be stale afterward).

    Args:
        graph: target graph; the update runs on ``graph.store``.
        update: SPARQL UPDATE text.
        model_cls: when given, its ``Rdf.prefixes`` are bound on ``graph`` and
            used as update prefixes unless ``initNs`` is supplied.
        initNs: explicit prefix map; takes precedence over ``model_cls``.
        initBindings: variable -> RDF term. The store update call is not given
            substitutions, so the bound terms are written into the update text
            (as :func:`run_sparql` does for ASK queries).
        use_store_provided: accepted for signature parity; not used.
        **kwargs: accepted for signature parity; not used.
    """
    resolved_init_ns = initNs
    if resolved_init_ns is None and model_cls is not None:
        resolved_init_ns = init_ns_from_model(model_cls)
    if model_cls is not None:
        bind_namespaces(graph, get_rdf_config(model_cls).prefixes_dict)

    # Honour initBindings instead of silently dropping them, which would leave
    # the variables unbound and widen the update's effect.
    bindings = _coerce_init_bindings(initBindings)
    if bindings:
        update = _inline_init_bindings(update, bindings)

    prefixes = (
        {str(k): str(v) for k, v in resolved_init_ns.items()}
        if resolved_init_ns
        else None
    )
    graph.store.update(update, prefixes=prefixes)
[docs]
@dataclass(frozen=True)
class PreparedModelQuery:
    """Prepared SPARQL query with namespaces from a model's ``Rdf.prefixes``.

    The query text is stored unchanged; prefixes are resolved when
    :meth:`execute` passes ``model_cls`` on to :func:`run_sparql`.
    """

    # Model whose ``Rdf.prefixes`` are applied when the query runs.
    model_cls: type[BaseModel]
    # SPARQL text, stored verbatim.
    query_str: str

    @property
    def prepared(self) -> str:
        """Return the stored query text (identical to ``query_str``)."""
        return self.query_str

    def execute(
        self,
        graph: Graph,
        *,
        initBindings: Mapping[Variable, Node] | Mapping[str, Node] | None = None,  # noqa: N803
        use_store_provided: bool = True,
        **kwargs: Any,
    ) -> SparqlResult:
        """Run the prepared query on ``graph`` via :func:`run_sparql`.

        Extra keyword arguments (e.g. ``initNs`` or dataset options) are
        forwarded to :func:`run_sparql` unchanged.
        """
        return run_sparql(
            graph,
            self.query_str,
            model_cls=self.model_cls,
            initBindings=initBindings,
            use_store_provided=use_store_provided,
            **kwargs,
        )

    def as_result(self, graph: Graph, **kwargs: Any) -> SparqlResult:
        """Alias for :meth:`execute`."""
        return self.execute(graph, **kwargs)
[docs]
def prepare_model_query(model_cls: type[BaseModel], query: str) -> PreparedModelQuery:
    """Wrap ``query`` for repeated execution with ``model_cls``'s prefixes.

    Nothing is parsed or expanded here; prefixes are applied when the returned
    object is executed.
    """
    prepared = PreparedModelQuery(model_cls=model_cls, query_str=query)
    return prepared
[docs]
def open_sparql_graph(endpoint: str, *, read_only: bool = True) -> Graph:
    """Open a remote SPARQL graph (removed in 0.10.0).

    Always raises :exc:`NotImplementedError`; ``endpoint`` and ``read_only``
    are not used. Load remote data into a local :class:`~triplemodel.Store`
    instead, or use SparqlModel for session-level remote stores.
    """
    del endpoint, read_only  # unused
    message = (
        "open_sparql_graph is not available with the pyoxigraph engine in 0.10.0. "
        "Query a remote endpoint with your HTTP/SPARQL client and load quads into a "
        "local pyoxigraph.Store, or use SparqlModel for session-level remote stores."
    )
    raise NotImplementedError(message)
# Typing-only overload: the caller passes ``query_form`` explicitly.
@overload
def load_sparql(
    model_cls: type[T],
    endpoint: str,
    query: str,
    *,
    query_form: SparqlQueryForm,
    read_only: bool = True,
    dispatch: bool = False,
    **kwargs: Any,
) -> list[T]: ...
# Typing-only overload: ``query_form`` omitted (the implementation then detects it).
# NOTE(review): both overloads return ``list[T]``, so they currently differ only
# in whether ``query_form`` is accepted as a required keyword.
@overload
def load_sparql(
    model_cls: type[T],
    endpoint: str,
    query: str,
    *,
    read_only: bool = True,
    dispatch: bool = False,
    **kwargs: Any,
) -> list[T]: ...
[docs]
def load_sparql(
    model_cls: type[T],
    endpoint: str,
    query: str,
    *,
    query_form: SparqlQueryForm | None = None,
    read_only: bool = True,
    dispatch: bool = False,
    type_uri: str | None = None,
    validate_type: bool = True,
    on_duplicate: OnDuplicate = "warn",
    resolver: PredicateResolverProtocol | None = None,
    registry: LiteralRegistry = default_registry,
    de_skolemize: bool | None = None,
    initNs: Mapping[str, Any] | None = None,  # noqa: N803
    initBindings: Mapping[str, Node] | None = None,  # noqa: N803
    use_store_provided: bool = True,
    **kwargs: Any,
) -> list[T]:
    """Query a remote SPARQL endpoint and return model instances.

    Uses :func:`open_sparql_graph`, which raises :exc:`NotImplementedError` in 0.10.0.
    Query a local :class:`~triplemodel.Store` with :func:`construct_models` / :func:`select_models`
    after fetching data, or use SparqlModel for remote endpoints.

    The query is validated before the endpoint is opened, so malformed input
    is reported precisely rather than as :exc:`NotImplementedError`.

    Raises:
        ValueError: if ``query`` is not a string or its form is not
            CONSTRUCT, DESCRIBE or SELECT.
        TypeError: for ASK queries, which do not produce models.
        NotImplementedError: from :func:`open_sparql_graph` for valid queries.
    """
    if not isinstance(query, str):
        raise ValueError(
            f"Cannot load models from SPARQL query {query!r}; "
            "use CONSTRUCT, DESCRIBE, or SELECT."
        )
    form = query_form if query_form is not None else detect_query_form(query)
    if form == "ask":
        raise TypeError(
            "ASK queries do not return models; use ask(open_sparql_graph(endpoint), query)."
        )
    if form not in ("construct", "describe", "select"):
        raise ValueError(
            f"Cannot load models from SPARQL query form {form!r}; "
            "use CONSTRUCT, DESCRIBE, or SELECT."
        )

    # Opened exactly once, and only after the query has been validated.
    graph = open_sparql_graph(endpoint, read_only=read_only)
    bind_namespaces(graph, get_rdf_config(model_cls).prefixes_dict)

    if form == "select":
        return select_models(
            model_cls,
            graph,
            query,
            validate_type=validate_type,
            on_duplicate=on_duplicate,
            resolver=resolver,
            registry=registry,
            de_skolemize=de_skolemize,
            initNs=initNs,
            initBindings=initBindings,
            use_store_provided=use_store_provided,
            **kwargs,
        )
    return construct_models(
        model_cls,
        graph,
        query,
        dispatch=dispatch,
        type_uri=type_uri,
        validate_type=validate_type,
        on_duplicate=on_duplicate,
        resolver=resolver,
        registry=registry,
        de_skolemize=de_skolemize,
        initNs=initNs,
        initBindings=initBindings,
        use_store_provided=use_store_provided,
        **kwargs,
    )
# Public API of this module; the entries are kept in sorted order.
# NOTE(review): "detect_query_form" is exported and used above, but its
# definition is not visible in this view — confirm it is defined in the module.
__all__ = [
    "PreparedModelQuery",
    "SparqlQueryForm",
    "SparqlResultKind",
    "apply_update",
    "ask",
    "construct_models",
    "detect_query_form",
    "graph_from_construct_result",
    "init_bindings_from_model",
    "init_ns_from_model",
    "load_sparql",
    "open_sparql_graph",
    "prepare_model_query",
    "run_sparql",
    "select_models",
]