Source code for triplemodel.io.sync.nested_cleanup

"""Remove stale nested IRI child subgraphs during sync."""

from __future__ import annotations

from typing import cast

from pydantic import BaseModel
from pyoxigraph import BlankNode as BNode, NamedNode
from triplemodel.store import RdfGraph as Graph

from triplemodel.config import RdfConfig, get_rdf_config
from triplemodel.fields.resolver import default_resolver
from triplemodel.io.sync.inverse_ops import clear_inverse_links_to_subject
from triplemodel.io.sync.predicate_ops import remove_owned_triples
from triplemodel.metadata.cardinality import field_cardinality, nested_model_type
from triplemodel.protocols import PredicateResolver as PredicateResolverProtocol
from triplemodel.terms.bnode import (
    nested_bnode_key,
    remove_bnode_subgraph,
    stable_bnode,
)
from triplemodel.terms.iri import subject_ref


[docs] def clear_stale_nested_iri_children( model: BaseModel, graph: Graph, parent_uri: str, *, config: RdfConfig, resolver: PredicateResolverProtocol | None = None, ) -> None: """Remove owned triples for nested IRI children no longer linked from the parent.""" if config.embed != "iri": return r = resolver or default_resolver parent_ref = subject_ref(parent_uri) cls = type(model) prefixes = config.prefixes_dict for name, field_info in cls.model_fields.items(): if config.id_field and name == config.id_field: continue if field_cardinality(field_info) != "nested": continue pred = r.resolve_field_predicate(field_info, prefixes) if pred is None: continue nested_cls = nested_model_type(field_info) if nested_cls is None: continue nested_cfg = get_rdf_config(nested_cls) pred_ref = NamedNode(pred) in_graph = { str(obj) for obj in graph.objects(parent_ref, pred_ref) if isinstance(obj, NamedNode) } value = getattr(model, name) keep = {nested_cfg.subject_uri(value)} if value is not None else set() for stale_uri in in_graph - keep: clear_inverse_links_to_subject( graph, stale_uri, cast(type[BaseModel], nested_cls), config=nested_cfg, resolver=r, ) remove_owned_triples( graph, stale_uri, cast(type[BaseModel], nested_cls), config=nested_cfg, resolver=r, )
[docs] def clear_nested_iri_children( model: BaseModel, graph: Graph, *, config: RdfConfig, resolver: PredicateResolverProtocol | None = None, ) -> None: """Remove owned triples for nested IRI-embedded children before parent replace.""" if config.embed != "iri": return r = resolver or default_resolver cls = type(model) for name, field_info in cls.model_fields.items(): if config.id_field and name == config.id_field: continue if field_cardinality(field_info) != "nested": continue nested_cls = nested_model_type(field_info) if nested_cls is None: continue value = getattr(model, name) if value is None: continue nested_cfg = get_rdf_config(nested_cls) child_uri = nested_cfg.subject_uri(value) remove_owned_triples( graph, child_uri, cast(type[BaseModel], nested_cls), config=nested_cfg, resolver=r, )
def clear_stale_nested_bnode_children( model: BaseModel, graph: Graph, parent_uri: str, *, config: RdfConfig, resolver: PredicateResolverProtocol | None = None, ) -> None: """Remove blank-node subgraphs for nested children no longer linked.""" if config.embed != "bnode": return r = resolver or default_resolver parent_ref = subject_ref(parent_uri) cls = type(model) prefixes = config.prefixes_dict for name, field_info in cls.model_fields.items(): if config.id_field and name == config.id_field: continue if field_cardinality(field_info) != "nested": continue pred = r.resolve_field_predicate(field_info, prefixes) if pred is None: continue nested_cls = nested_model_type(field_info) if nested_cls is None: continue nested_cfg = get_rdf_config(nested_cls) pred_ref = NamedNode(pred) in_graph = { obj for obj in graph.objects(parent_ref, pred_ref) if isinstance(obj, BNode) } value = getattr(model, name) if value is None: keep: set[BNode] = set() elif config.blank_node_policy == "stable": keep = {stable_bnode(nested_bnode_key(parent_uri, pred, value))} else: keep = set() for stale in in_graph - keep: clear_inverse_links_to_subject( graph, stale, cast(type[BaseModel], nested_cls), config=nested_cfg, resolver=r, ) remove_bnode_subgraph(graph, stale) def clear_nested_bnode_children( model: BaseModel, graph: Graph, parent_uri: str, *, config: RdfConfig, resolver: PredicateResolverProtocol | None = None, ) -> None: """Remove current nested blank-node subgraphs before parent replace.""" if config.embed != "bnode": return r = resolver or default_resolver parent_ref = subject_ref(parent_uri) cls = type(model) prefixes = config.prefixes_dict for name, field_info in cls.model_fields.items(): if config.id_field and name == config.id_field: continue if field_cardinality(field_info) != "nested": continue pred = r.resolve_field_predicate(field_info, prefixes) if pred is None: continue pred_ref = NamedNode(pred) for bnode in list(graph.objects(parent_ref, pred_ref)): if isinstance(bnode, BNode): remove_bnode_subgraph(graph, bnode)