Source code for triplemodel.io.rdfs

"""RDFS helpers: subclass dispatch, transitive graph walks."""

from __future__ import annotations

from typing import cast

from pydantic import BaseModel
from pyoxigraph import NamedNode
from triplemodel.store import RdfGraph as Graph
from triplemodel.store.terms import RdfTerm as Node, term_str

from triplemodel.config import RDFS, RDF_TYPE, get_rdf_config
from triplemodel.protocols import (
    _mro_depth,
    iter_registered_type_uris,
    model_class_for_type_uri,
)

RDFS_SUBCLASS = NamedNode(f"{RDFS}subClassOf")


[docs] def subject_type_closure(graph: Graph, subject: Node) -> frozenset[str]: """Return ``rdf:type`` IRIs for ``subject`` plus ``rdfs:subClassOf`` ancestors.""" closure: set[str] = set() for direct in graph.objects(subject, NamedNode(RDF_TYPE)): closure.add(term_str(direct)) for ancestor in graph.transitive_objects(direct, RDFS_SUBCLASS): closure.add(term_str(ancestor)) return frozenset(closure)
[docs] def subclass_uris(graph: Graph, type_uri: str) -> frozenset[str]: """Return ``type_uri`` and all superclasses via ``rdfs:subClassOf``.""" term = NamedNode(type_uri) return frozenset( { type_uri, *(term_str(o) for o in graph.transitive_objects(term, RDFS_SUBCLASS)), } )
[docs] def transitive_objects( graph: Graph, subject: str | Node, predicate: str, ) -> list[str]: """Return object IRIs reachable from ``subject`` along ``predicate`` (transitive).""" subj = subject if isinstance(subject, Node) else NamedNode(subject) pred = NamedNode(predicate) return [term_str(o) for o in graph.transitive_objects(subj, pred)]
[docs] def transitive_subjects( graph: Graph, predicate: str, obj: str | Node, ) -> list[str]: """Return subject IRIs that reach ``obj`` along ``predicate`` (transitive).""" object_node = obj if isinstance(obj, Node) else NamedNode(obj) pred = NamedNode(predicate) return [term_str(s) for s in graph.transitive_subjects(pred, object_node)]
def _is_more_specific(graph: Graph, sub_uri: str, super_uri: str) -> bool: """True when ``sub_uri`` is a subclass of ``super_uri`` in ``graph``.""" return NamedNode(super_uri) in graph.transitive_objects( NamedNode(sub_uri), RDFS_SUBCLASS ) def _pick_most_specific( graph: Graph, candidates: list[type[BaseModel]], ) -> type[BaseModel]: if len(candidates) == 1: return candidates[0] typed: list[tuple[str, type[BaseModel]]] = [] for cls in candidates: cfg = get_rdf_config(cls) if cfg.type_uri: typed.append((cfg.type_uri, cls)) if not typed: return cast(type[BaseModel], max(candidates, key=_mro_depth)) best: list[type[BaseModel]] = [] for uri_a, cls_a in typed: dominated = False for uri_b, cls_b in typed: if uri_a == uri_b: continue if _is_more_specific(graph, uri_b, uri_a): dominated = True break if not dominated: best.append(cls_a) if len(best) == 1: return best[0] return cast(type[BaseModel], max(best or candidates, key=_mro_depth))
[docs] def resolve_model_class_with_rdfs( graph: Graph, subject: Node, *, use_subclass: bool = True, ) -> type[BaseModel]: """Pick the most specific registered class for ``subject``'s types.""" type_nodes = list(graph.objects(subject, NamedNode(RDF_TYPE))) if not type_nodes: raise ValueError( f"No registered TripleModel class for subject {subject!r} (rdf:types: [])." ) if not use_subclass: candidates: list[type[BaseModel]] = [] for t in type_nodes: cls = model_class_for_type_uri(term_str(t)) if cls is not None: candidates.append(cls) if not candidates: raise ValueError( f"No registered TripleModel class for subject {subject!r} " f"(rdf:types: {[term_str(t) for t in type_nodes]})." ) return _pick_most_specific(graph, candidates) closure = subject_type_closure(graph, subject) candidates = [] for type_uri in iter_registered_type_uris(): if type_uri in closure: cls = model_class_for_type_uri(type_uri) if cls is not None: candidates.append(cls) if not candidates: raise ValueError( f"No registered TripleModel class for subject {subject!r} " f"(type closure: {sorted(closure)})." ) return _pick_most_specific(graph, candidates)
__all__ = [ "resolve_model_class_with_rdfs", "subject_type_closure", "subclass_uris", "transitive_objects", "transitive_subjects", ]