# Source code for triplemodel.codegen.emit

"""Emit TripleModel stub classes from OWL/RDFS graphs."""

from __future__ import annotations

import keyword
import re
import warnings
from urllib.parse import urlparse

from pyoxigraph import NamedNode
from triplemodel.config.constants import OWL, RDF, RDFS
from triplemodel.store import RdfGraph as Graph

# Vocabulary term identifiers used when querying a graph.  Each one is built by
# appending a term's local name to the OWL / RDF / RDFS values imported from
# ``triplemodel.config.constants`` (presumably the namespace IRI prefixes,
# including their trailing ``#`` -- confirm against that module).
_OWL_DATATYPE = f"{OWL}DatatypeProperty"
_OWL_OBJECT = f"{OWL}ObjectProperty"
_RDF_TYPE = f"{RDF}type"
_RDFS_CLASS = f"{RDFS}Class"
_RDFS_DOMAIN = f"{RDFS}domain"
_RDFS_SUBCLASS = f"{RDFS}subClassOf"


def _local_name(uri: str) -> str:
    parsed = urlparse(uri)
    fragment = parsed.fragment
    if fragment:
        return re.sub(r"[^\w]", "_", fragment)
    path = parsed.path.rstrip("/")
    if path:
        return re.sub(r"[^\w]", "_", path.split("/")[-1])
    return "Resource"


def _class_name(uri: str) -> str:
    """Return a Python class name derived from the local name of *uri*.

    The local name is prefixed with ``C_`` when it does not start with a
    letter, then its first character is upper-cased.  If the result is a
    reserved word (``None``, ``True`` or ``False`` after capitalisation) an
    underscore is appended so that it can be used in a ``class`` statement.
    """
    name = _local_name(uri)
    if not name or not name[0].isalpha():
        name = f"C_{name}"
    name = name[:1].upper() + name[1:]
    # e.g. a class whose local name is "none" would otherwise be emitted as
    # ``class None(...)``, which does not compile.
    if keyword.iskeyword(name):
        name = f"{name}_"
    return name


def _field_name(uri: str) -> str:
    """Return a Python attribute name derived from the local name of *uri*.

    The local name is prefixed with ``f_`` when it does not start with a
    letter.  If the result is a Python keyword (``from``, ``class``, ``in``,
    ...) an underscore is appended, because a keyword cannot be the target
    of an annotated assignment in the generated class body.
    """
    name = _local_name(uri)
    if not name or not name[0].isalpha():
        name = f"f_{name}"
    if keyword.iskeyword(name):
        name = f"{name}_"
    return name


def _namespace_for(uri: str) -> str:
    parsed = urlparse(uri)
    if parsed.fragment:
        base = uri.split("#", 1)[0]
        return base if base.endswith(("/", "#")) else base + "#"
    path = parsed.path
    if "/" in path:
        return uri.rsplit("/", 1)[0] + "/"
    return uri + "/"


def generate_models_from_graph(graph: Graph) -> str:
    """Return Python source for stub ``TripleModel`` subclasses.

    Every named node typed as an RDFS or OWL class becomes one generated
    class.  Every OWL datatype/object property becomes a ``str``-typed
    ``rdf_field`` on each named node listed as its ``rdfs:domain``; a domain
    that was not itself declared as a class still gets a class of its own.
    Subjects and domains that are not ``NamedNode`` instances are ignored.

    ``rdfs:subClassOf`` is not reflected in the output: every generated class
    derives directly from ``TripleModel``.

    Args:
        graph: Graph holding the OWL/RDFS vocabulary to translate.

    Returns:
        The generated module source, terminated by a newline.  Classes are
        emitted in ascending order of their class URI.

    Warns:
        A warning is issued (and the later item skipped) when two class URIs
        map to the same Python class name, or when a property maps to a
        field name already present on the class -- including the names the
        stub defines itself (``slug`` and ``Rdf``).
    """
    rdf_type = NamedNode(_RDF_TYPE)
    rdfs_domain = NamedNode(_RDFS_DOMAIN)
    # Stub generator: every property is typed as a plain string for now.
    default_type = "str"

    # class URI -> [(predicate URI, field name, Python type), ...]
    classes: dict[str, list[tuple[str, str, str]]] = {}
    for class_type in (_RDFS_CLASS, f"{OWL}Class"):
        for cls in graph.subjects(rdf_type, NamedNode(class_type)):
            if isinstance(cls, NamedNode):
                classes.setdefault(str(cls.value), [])

    for pred_type in (_OWL_DATATYPE, _OWL_OBJECT):
        for prop in graph.subjects(rdf_type, NamedNode(pred_type)):
            if not isinstance(prop, NamedNode):
                continue
            prop_s = str(prop.value)
            fname = _field_name(prop_s)
            for domain in graph.objects(prop, rdfs_domain):
                if not isinstance(domain, NamedNode):
                    continue
                classes.setdefault(str(domain.value), []).append(
                    (prop_s, fname, default_type)
                )

    lines = [
        '"""Generated by triplemodel-codegen (experimental)."""',
        "",
        "from __future__ import annotations",
        "",
        "from triplemodel import TripleModel, rdf_field",
        "",
    ]
    emitted: set[str] = set()
    for class_uri in sorted(classes):
        cname = _class_name(class_uri)
        if cname in emitted:
            warnings.warn(f"Skipping duplicate class name {cname!r}.", stacklevel=2)
            continue
        emitted.add(cname)
        ns = _namespace_for(class_uri)
        # The emitted text must carry real Python indentation: the ``Rdf``
        # body is nested one level deeper than the class body.
        lines.extend(
            [
                "",
                f"class {cname}(TripleModel):",
                "    class Rdf:",
                f'        namespace = "{ns}"',
                f'        type_uri = "{class_uri}"',
                '        id_field = "slug"',
                "",
                '    slug: str = "generated"',
            ]
        )
        # Names the stub already defines are reserved, so a property called
        # ``slug`` or ``Rdf`` cannot silently overwrite them.
        seen_fields: set[str] = {"slug", "Rdf"}
        for pred_uri, fname, py_type in classes[class_uri]:
            if fname in seen_fields:
                warnings.warn(
                    f"Skipping duplicate field name {fname!r}.",
                    stacklevel=2,
                )
                continue
            seen_fields.add(fname)
            lines.append(f'    {fname}: {py_type} = rdf_field("{pred_uri}")')
    return "\n".join(lines) + "\n"