"""Field helpers and predicate metadata for RDF mapping."""
from __future__ import annotations
from dataclasses import dataclass
from types import EllipsisType
from typing import Annotated, Any, TypeVar, cast, get_args, get_origin, overload
from typing_extensions import Unpack
from pydantic import BaseModel, Field
from pydantic.fields import FieldInfo
from triplemodel._typing import AnnotationExpr, JsonSchemaExtra, RdfFieldKwargs
from triplemodel.metadata.cardinality import field_annotation
from triplemodel.terms.lang import Lang
_T = TypeVar("_T")
[docs]
@dataclass(frozen=True)
class Predicate:
"""Marks a model field with its RDF predicate IRI."""
uri: str
[docs]
@dataclass(frozen=True)
class InverseOf:
"""Inverse predicate IRI used on import when the forward predicate is absent."""
uri: str
[docs]
@dataclass(frozen=True)
class IriId:
"""Mark ``id_field`` as a full IRI string (not appended to ``namespace``)."""
[docs]
@dataclass(frozen=True)
class Transitive:
"""On import, follow ``predicate`` transitively for multi-valued fields."""
@overload
def rdf_field(
predicate: str,
*,
default: EllipsisType = ...,
**field_kwargs: Unpack[RdfFieldKwargs],
) -> Any: ...
@overload
def rdf_field(
predicate: str,
*,
default: _T,
**field_kwargs: Unpack[RdfFieldKwargs],
) -> _T: ...
[docs]
def rdf_field(
predicate: str,
*,
inverse: str | None = None,
back_populates: Any | None = None,
literal_datatype: str | None = None,
transitive: bool = False,
default: _T | EllipsisType = ...,
**field_kwargs: Unpack[RdfFieldKwargs],
) -> _T:
"""Create a Pydantic field bound to an RDF predicate."""
extra = field_kwargs.pop("json_schema_extra", None) or {}
if not isinstance(extra, dict):
extra = {}
merged_extra: JsonSchemaExtra = {
**cast(JsonSchemaExtra, extra),
"rdf_predicate": predicate,
}
if inverse is not None:
merged_extra["rdf_inverse"] = inverse
if back_populates is not None:
from triplemodel.fields.back_populates import (
normalize_back_populates,
store_back_populates_extra,
)
store_back_populates_extra(
cast(dict[str, object], merged_extra),
normalize_back_populates(back_populates),
)
if literal_datatype is not None:
merged_extra["rdf_literal_datatype"] = literal_datatype
if transitive:
merged_extra["rdf_transitive"] = True
return cast(
_T,
Field(
default=default, json_schema_extra=merged_extra, **cast(Any, field_kwargs)
),
)
[docs]
def predicate_for_field(field_info: FieldInfo) -> str | None:
"""Resolve the RDF predicate URI for a Pydantic field, if any."""
extra = field_info.json_schema_extra
if isinstance(extra, dict):
predicate = cast(JsonSchemaExtra, extra).get("rdf_predicate")
if predicate is not None:
return str(predicate)
for meta in field_info.metadata:
if isinstance(meta, Predicate):
return meta.uri
return None
[docs]
def inverse_for_field(field_info: FieldInfo) -> str | None:
"""Resolve inverse predicate URI for a field, if configured."""
extra = field_info.json_schema_extra
if isinstance(extra, dict):
inv = cast(JsonSchemaExtra, extra).get("rdf_inverse")
if inv is not None:
return str(inv)
for meta in field_info.metadata:
if isinstance(meta, InverseOf):
return meta.uri
ann = field_annotation(field_info)
if get_origin(ann) is Annotated:
for meta in get_args(ann)[1:]:
if isinstance(meta, InverseOf):
return meta.uri
return None
[docs]
def inverse_from_annotation(annotation: AnnotationExpr) -> str | None:
"""Read :class:`InverseOf` from ``Annotated[..., InverseOf(...)]``."""
if get_origin(annotation) is not Annotated:
return None
for meta in get_args(annotation)[1:]:
if isinstance(meta, InverseOf):
return meta.uri
return None
[docs]
def predicate_from_annotation(annotation: AnnotationExpr) -> str | None:
"""Read :class:`Predicate` from ``Annotated[..., Predicate(...)]``."""
if get_origin(annotation) is not Annotated:
return None
for meta in get_args(annotation)[1:]:
if isinstance(meta, Predicate):
return meta.uri
return None
[docs]
def annotation_has_iri_id(annotation: AnnotationExpr) -> bool:
"""True when ``annotation`` includes :class:`IriId` metadata."""
if get_origin(annotation) is not Annotated:
return False
return any(isinstance(meta, IriId) for meta in get_args(annotation)[1:])
[docs]
def lang_from_annotation(annotation: AnnotationExpr) -> str | None:
"""Read :class:`Lang` from ``Annotated[..., Lang(...)]``."""
if get_origin(annotation) is not Annotated:
return None
for meta in get_args(annotation)[1:]:
if isinstance(meta, Lang):
return meta.code
return None
[docs]
def literal_datatype_for_field(field_info: FieldInfo) -> str | None:
"""XSD datatype IRI or CURIE for literal export (e.g. ``xsd:gYear``)."""
extra = field_info.json_schema_extra
if isinstance(extra, dict):
dt = cast(JsonSchemaExtra, extra).get("rdf_literal_datatype")
if dt is not None:
return str(dt)
return None
[docs]
def lang_for_field(field_info: FieldInfo) -> str | None:
"""Language tag for a field, if configured."""
for meta in field_info.metadata:
if isinstance(meta, Lang):
return meta.code
return lang_from_annotation(field_annotation(field_info))
[docs]
def ref_field(
predicate: str,
*,
model: type[BaseModel],
inverse: str | None = None,
default: _T | EllipsisType = ...,
**field_kwargs: Unpack[RdfFieldKwargs],
) -> _T:
"""Foreign-key field: import hydrates ``model`` from the object URI in the graph."""
if inverse is not None:
raise ValueError("inverse= is not supported on ref_field")
extra = field_kwargs.pop("json_schema_extra", None) or {}
if not isinstance(extra, dict):
extra = {}
merged_extra: JsonSchemaExtra = {
**cast(JsonSchemaExtra, extra),
"rdf_predicate": predicate,
"rdf_ref_link": True,
}
return cast(
_T,
Field(
default=default,
json_schema_extra=merged_extra,
**cast(Any, field_kwargs),
),
)
[docs]
def transitive_for_field(field_info: FieldInfo) -> bool:
"""True when the field expands objects transitively on import."""
extra = field_info.json_schema_extra
if isinstance(extra, dict):
return bool(cast(JsonSchemaExtra, extra).get("rdf_transitive"))
return any(isinstance(meta, Transitive) for meta in field_info.metadata)
[docs]
def ref_link_for_field(field_info: FieldInfo) -> bool:
"""True when the field is a URI foreign-key link (``ref_field``), not full embed."""
extra = field_info.json_schema_extra
if isinstance(extra, dict):
return bool(cast(JsonSchemaExtra, extra).get("rdf_ref_link"))
return False
[docs]
def id_field_is_iri_id(model_cls: type[BaseModel], id_field: str) -> bool:
"""True when the configured ``id_field`` is marked with :class:`IriId`."""
model_fields = getattr(model_cls, "model_fields", {})
field_info = model_fields.get(id_field)
if field_info is None:
return False
return annotation_has_iri_id(field_annotation(field_info)) or any(
isinstance(meta, IriId) for meta in field_info.metadata
)