"""Import dispatch by rdf:type across registered model classes."""
from __future__ import annotations
from collections.abc import Sequence
from typing import TypeVar
from pydantic import BaseModel
from pyoxigraph import NamedNode
from triplemodel.store import RdfDataset as Dataset, RdfGraph as Graph
from triplemodel.store.terms import RdfTerm as Node, term_str
from triplemodel.config import get_graph_context, get_rdf_config
from triplemodel.io.import_ import OnDuplicate, graph_to_model
from triplemodel.protocols import (
PredicateResolver as PredicateResolverProtocol,
iter_registered_model_classes,
iter_registered_type_uris,
model_class_for_type_uri,
resolve_model_class,
)
from triplemodel.terms.iri import normalize_iri
from triplemodel.terms.registry import LiteralRegistry, default_registry
T = TypeVar("T", bound=BaseModel)
[docs]
def graph_to_model_dispatch(
    graph: Graph,
    uri: str | Node,
    *,
    validate_type: bool = True,
    on_duplicate: OnDuplicate = "warn",
    resolver: PredicateResolverProtocol | None = None,
    registry: LiteralRegistry = default_registry,
    de_skolemize: bool | None = None,
) -> BaseModel:
    """Load one subject from ``graph`` as whichever registered class matches it.

    The model class is not supplied by the caller; it is chosen by
    ``resolve_model_class`` from the subject's data in ``graph``.

    Args:
        graph: Graph holding the subject's triples.
        uri: Subject to load. A term is used as-is; a string is normalized
            with ``normalize_iri`` and wrapped in a ``NamedNode``.
        validate_type: Forwarded unchanged to ``graph_to_model``.
        on_duplicate: Forwarded unchanged to ``graph_to_model``.
        resolver: Forwarded unchanged to ``graph_to_model``.
        registry: Forwarded unchanged to ``graph_to_model``.
        de_skolemize: Forwarded unchanged to ``graph_to_model``.

    Returns:
        The model instance produced by ``graph_to_model``.

    Note:
        Errors from ``resolve_model_class`` propagate to the caller. The bulk
        loaders in this module catch ``ValueError`` from that call, so that is
        presumably what an unresolvable subject raises -- confirm against
        ``triplemodel.protocols``.
    """
    if isinstance(uri, Node):
        subject: Node = uri
    else:
        subject = NamedNode(normalize_iri(uri))

    target_cls = resolve_model_class(graph, subject)
    load_options = {
        "validate_type": validate_type,
        "on_duplicate": on_duplicate,
        "resolver": resolver,
        "registry": registry,
        "de_skolemize": de_skolemize,
    }
    return graph_to_model(graph, target_cls, subject, **load_options)
[docs]
def all_from_graph_dispatch(
    graph: Graph,
    *,
    validate_type: bool = True,
    on_duplicate: OnDuplicate = "warn",
    resolver: PredicateResolverProtocol | None = None,
    registry: LiteralRegistry = default_registry,
    de_skolemize: bool | None = None,
) -> list[BaseModel]:
    """Load every IRI subject in ``graph`` that resolves to a registered model class.

    Args:
        graph: Graph to scan. It is passed through ``apply_de_skolemize`` once
            up front and the returned graph is used for everything after.
        validate_type: Forwarded unchanged to ``graph_to_model``.
        on_duplicate: Forwarded unchanged to ``graph_to_model``.
        resolver: Forwarded unchanged to ``graph_to_model``.
        registry: Forwarded unchanged to ``graph_to_model``.
        de_skolemize: Explicit on/off switch for the up-front de-skolemize
            pass. When ``None``, the pass is enabled if any registered model
            class has a truthy ``skolemize_import`` in its RDF config.

    Returns:
        One instance per distinct ``NamedNode`` subject that
        ``resolve_model_class`` accepts, sorted by ``subject_uri()``.
        Subjects that are not ``NamedNode`` instances, and subjects for which
        ``resolve_model_class`` raises ``ValueError``, are skipped.
    """
    # ``get_rdf_config`` is already imported at module level, so it is not
    # re-imported here; only the skolem helper stays function-local.
    from triplemodel.io.skolem import apply_de_skolemize

    if de_skolemize is None:
        do_de = any(
            get_rdf_config(cls).skolemize_import
            for cls in iter_registered_model_classes()
        )
    else:
        do_de = de_skolemize
    graph = apply_de_skolemize(graph, de_skolemize=do_de)

    seen: set[str] = set()
    instances: list[BaseModel] = []
    for subject in sorted(graph.subjects(None, None), key=str):
        if not isinstance(subject, NamedNode):
            continue
        key = term_str(subject)
        if key in seen:
            continue
        # Record the subject before resolving it: if ``subjects()`` yields the
        # same subject more than once, an unresolvable subject is then checked
        # a single time instead of once per occurrence. The outcome of
        # resolution is the same either way.
        seen.add(key)
        try:
            model_cls = resolve_model_class(graph, subject)
        except ValueError:
            # No registered class for this subject; leave it out.
            continue
        instances.append(
            graph_to_model(
                graph,
                model_cls,
                subject,
                validate_type=validate_type,
                on_duplicate=on_duplicate,
                resolver=resolver,
                registry=registry,
                # The whole graph was already handled above.
                de_skolemize=False,
            )
        )
    instances.sort(key=lambda m: m.subject_uri())
    return instances
def _contexts_for_subject(dataset: Dataset, subject: Node) -> list[Graph]:
    """Return the graphs in ``dataset.graphs`` that hold a triple with ``subject``.

    Graphs are returned in the order ``dataset.graphs`` yields them.
    """
    pattern = (subject, None, None)
    holders: list[Graph] = []
    for candidate in dataset.graphs:
        if any(candidate.triples(pattern)):
            holders.append(candidate)
    return holders
def _union_subject_view(contexts: list[Graph], subject: Node) -> Graph:
    """Build a fresh graph holding every ``(subject, ?, ?)`` triple from ``contexts``.

    The result is used for type resolution when a subject's triples are
    spread over several graphs. The input graphs are only read.
    """
    pattern = (subject, None, None)
    view = Graph()
    for source in contexts:
        for row in source.triples(pattern):
            view.add(row)
    return view
def _resolve_dataset_context(
    dataset: Dataset,
    subject: Node,
    matching: list[Graph],
) -> tuple[Graph, type[BaseModel]]:
    """Choose the graph to load ``subject`` from, and the class to load it as.

    The class is resolved from the union of the subject's triples across
    ``matching``. With exactly one matching graph, that graph is returned.
    Otherwise the graph configured for the resolved class (``graph_iri`` in
    its RDF config) is returned, provided it is one of ``matching``.

    Args:
        dataset: Dataset the contexts belong to.
        subject: Subject being loaded.
        matching: Graphs known to contain triples about ``subject``.

    Returns:
        ``(graph, model_class)``.

    Raises:
        ValueError: If the subject is not in exactly one graph and the
            class's configured graph is not among ``matching``.
    """
    resolved_cls = resolve_model_class(
        _union_subject_view(matching, subject), subject
    )
    if len(matching) == 1:
        (only_context,) = matching
        return only_context, resolved_cls

    class_config = get_rdf_config(resolved_cls)
    wanted = get_graph_context(dataset, class_config.graph_iri)
    candidates = {ctx.identifier: ctx for ctx in matching}
    wanted_id = wanted.identifier
    if wanted_id in candidates:
        return candidates[wanted_id], resolved_cls

    listed_ids = sorted(str(gid) for gid in candidates)
    raise ValueError(
        f"Subject {subject!r} appears in multiple dataset graphs {listed_ids!r}; "
        f"cannot resolve context for {resolved_cls.__name__} "
        f"(expected graph {wanted_id!r})."
    )
[docs]
def graph_to_model_dispatch_from_dataset(
    dataset: Dataset,
    uri: str | Node,
    *,
    validate_type: bool = True,
    on_duplicate: OnDuplicate = "warn",
    resolver: PredicateResolverProtocol | None = None,
    registry: LiteralRegistry = default_registry,
    de_skolemize: bool | None = None,
) -> BaseModel:
    """Load one subject from a dataset, choosing both its graph and its class.

    Args:
        dataset: Dataset whose graphs are searched for the subject.
        uri: Subject to load. A term is used as-is; a string is normalized
            with ``normalize_iri`` and wrapped in a ``NamedNode``.
        validate_type: Forwarded unchanged to ``graph_to_model``.
        on_duplicate: Forwarded unchanged to ``graph_to_model``.
        resolver: Forwarded unchanged to ``graph_to_model``.
        registry: Forwarded unchanged to ``graph_to_model``.
        de_skolemize: Forwarded unchanged to ``graph_to_model``.

    Returns:
        The model instance produced by ``graph_to_model``, loaded from the
        graph picked by ``_resolve_dataset_context`` with the resolved
        class's RDF config.

    Raises:
        ValueError: If no graph in the dataset contains the subject, or if
            ``_resolve_dataset_context`` cannot settle on a single graph.
    """
    if isinstance(uri, Node):
        subject: Node = uri
    else:
        subject = NamedNode(normalize_iri(uri))

    candidate_contexts = _contexts_for_subject(dataset, subject)
    if not candidate_contexts:
        raise ValueError(
            f"No registered TripleModel subject {subject!r} found in dataset contexts."
        )

    source_graph, target_cls = _resolve_dataset_context(
        dataset, subject, candidate_contexts
    )
    class_config = get_rdf_config(target_cls)
    return graph_to_model(
        source_graph,
        target_cls,
        subject,
        config=class_config,
        validate_type=validate_type,
        on_duplicate=on_duplicate,
        resolver=resolver,
        registry=registry,
        de_skolemize=de_skolemize,
    )
def _type_uris_for_dispatch(
    model_classes: Sequence[type[BaseModel]] | None,
) -> list[str]:
    """Return the sorted type URIs to dispatch over.

    With ``None``, every registered type URI is returned. Otherwise the
    result is the distinct, truthy ``type_uri`` values from the RDF configs
    of ``model_classes``.
    """
    if model_classes is not None:
        configured = (get_rdf_config(cls).type_uri for cls in model_classes)
        return sorted({uri for uri in configured if uri})
    return sorted(iter_registered_type_uris())
[docs]
def all_from_dataset_dispatch(
    dataset: Dataset,
    *,
    model_classes: Sequence[type[BaseModel]] | None = None,
    validate_type: bool = True,
    on_duplicate: OnDuplicate = "warn",
    resolver: PredicateResolverProtocol | None = None,
    registry: LiteralRegistry = default_registry,
    de_skolemize: bool | None = None,
) -> list[BaseModel]:
    """Load every subject in the dataset that resolves to a registered class.

    For each dispatch type URI, the graph configured for its class is scanned
    for ``NamedNode`` subjects. Each distinct subject is loaded once, from the
    graph and as the class chosen by ``_resolve_dataset_context``.

    Args:
        dataset: Dataset to scan.
        model_classes: Optional restriction. When given, only subjects that
            resolve to one of these classes are loaded (recommended when the
            process has registered models from unrelated modules).
        validate_type: Forwarded unchanged to ``graph_to_model``.
        on_duplicate: Forwarded unchanged to ``graph_to_model``.
        resolver: Forwarded unchanged to ``graph_to_model``.
        registry: Forwarded unchanged to ``graph_to_model``.
        de_skolemize: Explicit on/off switch for the per-graph de-skolemize
            pass. When ``None``, it is enabled if any considered class
            (``model_classes`` if given, else all registered classes) has a
            truthy ``skolemize_import`` in its RDF config.

    Returns:
        The loaded instances, sorted by ``subject_uri()``.

    Raises:
        TypeError: If an entry of ``model_classes`` is not a ``TripleModel``
            subclass.
        ValueError: Propagated from ``_resolve_dataset_context`` when a
            subject spans several graphs and none is the class's own graph.
    """
    from triplemodel.model import TripleModel

    allowed: set[type[BaseModel]] | None
    if model_classes is not None:
        for requested in model_classes:
            if not issubclass(requested, TripleModel):
                raise TypeError(f"{requested!r} is not a TripleModel subclass.")
        allowed = set(model_classes)
        type_uris = _type_uris_for_dispatch(model_classes)
    else:
        allowed = None
        type_uris = _type_uris_for_dispatch(None)

    from triplemodel.io.skolem import apply_de_skolemize

    if de_skolemize is not None:
        do_de = de_skolemize
    else:
        if allowed is None:
            skolem_candidates = list(iter_registered_model_classes())
        else:
            skolem_candidates = list(allowed)
        do_de = any(
            get_rdf_config(cls).skolemize_import for cls in skolem_candidates
        )

    loaded_keys: set[str] = set()
    results: list[BaseModel] = []
    processed_graph_ids: set[object] = set()

    for type_uri in type_uris:
        owner_cls = model_class_for_type_uri(type_uri)
        if owner_cls is None:
            continue
        scan_graph = get_graph_context(dataset, get_rdf_config(owner_cls).graph_iri)
        graph_id = scan_graph.identifier

        # De-skolemize each graph at most once, even if several classes share it.
        # NOTE(review): the return value is discarded here, whereas
        # all_from_graph_dispatch rebinds its graph to the result -- confirm
        # that apply_de_skolemize works in place for dataset contexts.
        if graph_id not in processed_graph_ids:
            apply_de_skolemize(scan_graph, de_skolemize=do_de)
            processed_graph_ids.add(graph_id)

        for subject in sorted(scan_graph.subjects(None, None), key=str):
            if not isinstance(subject, NamedNode):
                continue
            key = term_str(subject)
            if key in loaded_keys:
                continue
            holders = _contexts_for_subject(dataset, subject)
            if not holders:
                continue

            # Resolve the class first so that unresolvable subjects and
            # classes outside ``model_classes`` are skipped quietly.
            try:
                if len(holders) > 1:
                    type_view = _union_subject_view(holders, subject)
                else:
                    type_view = scan_graph
                resolved_cls = resolve_model_class(type_view, subject)
            except ValueError:
                continue
            if allowed is not None and resolved_cls not in allowed:
                continue

            loaded_keys.add(key)
            # Not wrapped in try: an ambiguous multi-graph subject raises.
            # NOTE(review): the chosen graph may differ from ``scan_graph``
            # and may not have been through the de-skolemize step above yet,
            # while the load below passes de_skolemize=False -- confirm this
            # ordering is intended.
            load_graph, load_cls = _resolve_dataset_context(
                dataset, subject, holders
            )
            results.append(
                graph_to_model(
                    load_graph,
                    load_cls,
                    subject,
                    config=get_rdf_config(load_cls),
                    validate_type=validate_type,
                    on_duplicate=on_duplicate,
                    resolver=resolver,
                    registry=registry,
                    de_skolemize=False,
                )
            )

    return sorted(results, key=lambda m: m.subject_uri())