Mirror of https://github.com/fastapi/sqlmodel.git, synced 2026-01-01 07:02:06 -06:00

✨ Add support for Pydantic v2 (while keeping support for v1 if v2 is not available), including initial work by AntonDeMeester (#722)

Co-authored-by: Mohamed Farahat <farahats9@yahoo.com>
Co-authored-by: Stefan Borer <stefan.borer@gmail.com>
Co-authored-by: Peter Landry <peter.landry@gmail.com>
Co-authored-by: Anton De Meester <antondemeester+github@gmail.com>

Committed via GitHub. Parent commit: 5b733b348d. This commit: fa2f178b8a.
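The change hinges on a single compatibility module: everything is gated on the installed Pydantic major version, and each branch exposes one implementation under a common name. A minimal sketch of that pattern, using only names that appear in the diff below (illustrative, not part of the commit):

    from pydantic import VERSION as PYDANTIC_VERSION

    IS_PYDANTIC_V2 = PYDANTIC_VERSION.startswith("2.")

    if IS_PYDANTIC_V2:
        from pydantic_core import PydanticUndefined as Undefined
    else:
        from pydantic.fields import Undefined  # Pydantic v1 fallback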
sqlmodel/_compat.py (new file, 554 lines)
@@ -0,0 +1,554 @@
import types
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass
from typing import (
    TYPE_CHECKING,
    AbstractSet,
    Any,
    Dict,
    ForwardRef,
    Generator,
    Mapping,
    Optional,
    Set,
    Type,
    TypeVar,
    Union,
)

from pydantic import VERSION as PYDANTIC_VERSION
from pydantic.fields import FieldInfo
from typing_extensions import get_args, get_origin

IS_PYDANTIC_V2 = PYDANTIC_VERSION.startswith("2.")
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .main import RelationshipInfo, SQLModel
|
||||
|
||||
UnionType = getattr(types, "UnionType", Union)
|
||||
NoneType = type(None)
|
||||
T = TypeVar("T")
|
||||
InstanceOrType = Union[T, Type[T]]
|
||||
_TSQLModel = TypeVar("_TSQLModel", bound="SQLModel")
|
||||
|
||||
|
||||
class FakeMetadata:
|
||||
max_length: Optional[int] = None
|
||||
max_digits: Optional[int] = None
|
||||
decimal_places: Optional[int] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class ObjectWithUpdateWrapper:
|
||||
obj: Any
|
||||
update: Dict[str, Any]
|
||||
|
||||
def __getattribute__(self, __name: str) -> Any:
    # Read the dataclass attributes via object.__getattribute__ so this hook
    # does not recurse into itself when looking up "obj" and "update".
    update = object.__getattribute__(self, "update")
    obj = object.__getattribute__(self, "obj")
    if __name in update:
        return update[__name]
    return getattr(obj, __name)
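# Illustrative sketch (not part of the commit): the wrapper overlays the
# ``update`` values on top of an arbitrary attribute-bearing object, which is
# how ``model_validate(obj, update=...)`` is supported for non-dict inputs.
def _example_object_with_update_wrapper() -> None:
    base = types.SimpleNamespace(name="Rusty", age=1)
    wrapped = ObjectWithUpdateWrapper(obj=base, update={"age": 2})
    assert wrapped.age == 2  # taken from the update dict
    assert wrapped.name == "Rusty"  # falls through to the wrapped object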
|
||||
|
||||
|
||||
def _is_union_type(t: Any) -> bool:
|
||||
return t is UnionType or t is Union
|
||||
|
||||
|
||||
finish_init: ContextVar[bool] = ContextVar("finish_init", default=True)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def partial_init() -> Generator[None, None, None]:
|
||||
token = finish_init.set(False)
|
||||
yield
|
||||
finish_init.reset(token)
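# Illustrative sketch (not part of the commit): callers that need to build a
# table-model instance without running the full __init__ logic wrap the
# construction in partial_init(); SQLModel.__init__ checks finish_init.get()
# and skips the validation work while the flag is False.
def _example_partial_init() -> None:
    assert finish_init.get() is True
    with partial_init():
        assert finish_init.get() is False  # callees skip the heavy init work
    assert finish_init.get() is True  # previous value is restored on exit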
|
||||
|
||||
|
||||
if IS_PYDANTIC_V2:
|
||||
from pydantic import ConfigDict as BaseConfig
|
||||
from pydantic._internal._fields import PydanticMetadata
|
||||
from pydantic._internal._model_construction import ModelMetaclass
|
||||
from pydantic._internal._repr import Representation as Representation
|
||||
from pydantic_core import PydanticUndefined as Undefined
|
||||
from pydantic_core import PydanticUndefinedType as UndefinedType
|
||||
|
||||
# Dummy for types, to make it importable
|
||||
class ModelField:
|
||||
pass
|
||||
|
||||
class SQLModelConfig(BaseConfig, total=False):
|
||||
table: Optional[bool]
|
||||
registry: Optional[Any]
|
||||
|
||||
def get_config_value(
|
||||
*, model: InstanceOrType["SQLModel"], parameter: str, default: Any = None
|
||||
) -> Any:
|
||||
return model.model_config.get(parameter, default)
|
||||
|
||||
def set_config_value(
|
||||
*,
|
||||
model: InstanceOrType["SQLModel"],
|
||||
parameter: str,
|
||||
value: Any,
|
||||
) -> None:
|
||||
model.model_config[parameter] = value # type: ignore[literal-required]
|
||||
|
||||
def get_model_fields(model: InstanceOrType["SQLModel"]) -> Dict[str, "FieldInfo"]:
|
||||
return model.model_fields
|
||||
|
||||
def set_fields_set(
|
||||
new_object: InstanceOrType["SQLModel"], fields: Set["FieldInfo"]
|
||||
) -> None:
|
||||
object.__setattr__(new_object, "__pydantic_fields_set__", fields)
|
||||
|
||||
def get_annotations(class_dict: Dict[str, Any]) -> Dict[str, Any]:
|
||||
return class_dict.get("__annotations__", {})
|
||||
|
||||
def is_table_model_class(cls: Type[Any]) -> bool:
|
||||
config = getattr(cls, "model_config", {})
|
||||
if config:
|
||||
return config.get("table", False) or False
|
||||
return False
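# Illustrative sketch (not part of the commit): any class whose model_config
# mapping sets table=True is treated as a table model on this branch.
def _example_is_table_model_class() -> None:
    class _Plain:
        model_config = {"table": False}

    class _Table:
        model_config = {"table": True}

    assert not is_table_model_class(_Plain)
    assert is_table_model_class(_Table)
    assert not is_table_model_class(object)  # no model_config at all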
|
||||
|
||||
def get_relationship_to(
|
||||
name: str,
|
||||
rel_info: "RelationshipInfo",
|
||||
annotation: Any,
|
||||
) -> Any:
|
||||
origin = get_origin(annotation)
|
||||
use_annotation = annotation
|
||||
# Direct relationships (e.g. 'Team' or Team) have None as an origin
|
||||
if origin is None:
|
||||
if isinstance(use_annotation, ForwardRef):
|
||||
use_annotation = use_annotation.__forward_arg__
|
||||
else:
|
||||
return use_annotation
|
||||
# If Union (e.g. Optional), get the real field
|
||||
elif _is_union_type(origin):
|
||||
use_annotation = get_args(annotation)
|
||||
if len(use_annotation) > 2:
|
||||
raise ValueError(
|
||||
"Cannot have a (non-optional) union as a SQLAlchemy field"
|
||||
)
|
||||
arg1, arg2 = use_annotation
|
||||
if arg1 is NoneType and arg2 is not NoneType:
|
||||
use_annotation = arg2
|
||||
elif arg2 is NoneType and arg1 is not NoneType:
|
||||
use_annotation = arg1
|
||||
else:
|
||||
raise ValueError(
|
||||
"Cannot have a Union of None and None as a SQLAlchemy field"
|
||||
)
|
||||
|
||||
# If a list, then also get the real field
|
||||
elif origin is list:
|
||||
use_annotation = get_args(annotation)[0]
|
||||
|
||||
return get_relationship_to(
|
||||
name=name, rel_info=rel_info, annotation=use_annotation
|
||||
)
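# Illustrative sketch (not part of the commit): Optional[...] wrappers and
# forward references are unwrapped down to the related class (or its name).
# rel_info is never inspected on this path, so None is passed for brevity.
def _example_get_relationship_to() -> None:
    rel = get_relationship_to(name="team", rel_info=None, annotation=Optional["Team"])
    assert rel == "Team"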
|
||||
|
||||
def is_field_noneable(field: "FieldInfo") -> bool:
|
||||
if getattr(field, "nullable", Undefined) is not Undefined:
|
||||
return field.nullable # type: ignore
|
||||
origin = get_origin(field.annotation)
|
||||
if origin is not None and _is_union_type(origin):
|
||||
args = get_args(field.annotation)
|
||||
if any(arg is NoneType for arg in args):
|
||||
return True
|
||||
if not field.is_required():
|
||||
if field.default is Undefined:
|
||||
return False
|
||||
if field.annotation is None or field.annotation is NoneType: # type: ignore[comparison-overlap]
|
||||
return True
|
||||
return False
|
||||
return False
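# Illustrative sketch (not part of the commit, assumes Pydantic v2 is the
# installed version on this branch): Optional fields report as noneable,
# plain required fields do not.
def _example_is_field_noneable() -> None:
    from pydantic import BaseModel

    class _M(BaseModel):
        maybe: Optional[int] = None
        definitely: int

    assert is_field_noneable(_M.model_fields["maybe"])
    assert not is_field_noneable(_M.model_fields["definitely"])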
|
||||
|
||||
def get_type_from_field(field: Any) -> Any:
|
||||
type_: Any = field.annotation
|
||||
# Resolve Optional fields
|
||||
if type_ is None:
|
||||
raise ValueError("Missing field type")
|
||||
origin = get_origin(type_)
|
||||
if origin is None:
|
||||
return type_
|
||||
if _is_union_type(origin):
|
||||
bases = get_args(type_)
|
||||
if len(bases) > 2:
|
||||
raise ValueError(
|
||||
"Cannot have a (non-optional) union as a SQLAlchemy field"
|
||||
)
|
||||
# Non optional unions are not allowed
|
||||
if bases[0] is not NoneType and bases[1] is not NoneType:
|
||||
raise ValueError(
|
||||
"Cannot have a (non-optional) union as a SQLlchemy field"
|
||||
)
|
||||
# Optional unions are allowed
|
||||
return bases[0] if bases[0] is not NoneType else bases[1]
|
||||
return origin
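# Illustrative sketch (not part of the commit, assumes Pydantic v2): Optional[X]
# resolves to X so a single SQLAlchemy column type can be derived later; bare
# annotations pass through unchanged.
def _example_get_type_from_field() -> None:
    from pydantic import BaseModel

    class _M(BaseModel):
        maybe: Optional[int] = None
        name: str

    assert get_type_from_field(_M.model_fields["maybe"]) is int
    assert get_type_from_field(_M.model_fields["name"]) is str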
|
||||
|
||||
def get_field_metadata(field: Any) -> Any:
|
||||
for meta in field.metadata:
|
||||
if isinstance(meta, PydanticMetadata):
|
||||
return meta
|
||||
return FakeMetadata()
|
||||
|
||||
def post_init_field_info(field_info: FieldInfo) -> None:
|
||||
return None
|
||||
|
||||
# Dummy to make it importable
|
||||
def _calculate_keys(
|
||||
self: "SQLModel",
|
||||
include: Optional[Mapping[Union[int, str], Any]],
|
||||
exclude: Optional[Mapping[Union[int, str], Any]],
|
||||
exclude_unset: bool,
|
||||
update: Optional[Dict[str, Any]] = None,
|
||||
) -> Optional[AbstractSet[str]]: # pragma: no cover
|
||||
return None
|
||||
|
||||
def sqlmodel_table_construct(
|
||||
*,
|
||||
self_instance: _TSQLModel,
|
||||
values: Dict[str, Any],
|
||||
_fields_set: Union[Set[str], None] = None,
|
||||
) -> _TSQLModel:
|
||||
# Copy from Pydantic's BaseModel.construct()
|
||||
# Ref: https://github.com/pydantic/pydantic/blob/v2.5.2/pydantic/main.py#L198
|
||||
# Modified to not include everything, only the model fields, and to
|
||||
# set relationships
|
||||
# SQLModel override to get class SQLAlchemy __dict__ attributes and
|
||||
# set them back in after creating the object
|
||||
# new_obj = cls.__new__(cls)
|
||||
cls = type(self_instance)
|
||||
old_dict = self_instance.__dict__.copy()
|
||||
# End SQLModel override
|
||||
|
||||
fields_values: Dict[str, Any] = {}
|
||||
defaults: Dict[
|
||||
str, Any
|
||||
] = {} # keeping this separate from `fields_values` helps us compute `_fields_set`
|
||||
for name, field in cls.model_fields.items():
|
||||
if field.alias and field.alias in values:
|
||||
fields_values[name] = values.pop(field.alias)
|
||||
elif name in values:
|
||||
fields_values[name] = values.pop(name)
|
||||
elif not field.is_required():
|
||||
defaults[name] = field.get_default(call_default_factory=True)
|
||||
if _fields_set is None:
|
||||
_fields_set = set(fields_values.keys())
|
||||
fields_values.update(defaults)
|
||||
|
||||
_extra: Union[Dict[str, Any], None] = None
|
||||
if cls.model_config.get("extra") == "allow":
|
||||
_extra = {}
|
||||
for k, v in values.items():
|
||||
_extra[k] = v
|
||||
# SQLModel override, do not include everything, only the model fields
|
||||
# else:
|
||||
# fields_values.update(values)
|
||||
# End SQLModel override
|
||||
# SQLModel override
|
||||
# Do not set __dict__, instead use setattr to trigger SQLAlchemy
|
||||
# object.__setattr__(new_obj, "__dict__", fields_values)
|
||||
# instrumentation
|
||||
for key, value in {**old_dict, **fields_values}.items():
|
||||
setattr(self_instance, key, value)
|
||||
# End SQLModel override
|
||||
object.__setattr__(self_instance, "__pydantic_fields_set__", _fields_set)
|
||||
if not cls.__pydantic_root_model__:
|
||||
object.__setattr__(self_instance, "__pydantic_extra__", _extra)
|
||||
|
||||
if cls.__pydantic_post_init__:
|
||||
self_instance.model_post_init(None)
|
||||
elif not cls.__pydantic_root_model__:
|
||||
# Note: if there are any private attributes, cls.__pydantic_post_init__ would exist
|
||||
# Since it doesn't, that means that `__pydantic_private__` should be set to None
|
||||
object.__setattr__(self_instance, "__pydantic_private__", None)
|
||||
# SQLModel override, set relationships
|
||||
# Get and set any relationship objects
|
||||
for key in self_instance.__sqlmodel_relationships__:
|
||||
value = values.get(key, Undefined)
|
||||
if value is not Undefined:
|
||||
setattr(self_instance, key, value)
|
||||
# End SQLModel override
|
||||
return self_instance
|
||||
|
||||
def sqlmodel_validate(
|
||||
cls: Type[_TSQLModel],
|
||||
obj: Any,
|
||||
*,
|
||||
strict: Union[bool, None] = None,
|
||||
from_attributes: Union[bool, None] = None,
|
||||
context: Union[Dict[str, Any], None] = None,
|
||||
update: Union[Dict[str, Any], None] = None,
|
||||
) -> _TSQLModel:
|
||||
if not is_table_model_class(cls):
|
||||
new_obj: _TSQLModel = cls.__new__(cls)
|
||||
else:
|
||||
# If table, create the new instance normally to make SQLAlchemy create
|
||||
# the _sa_instance_state attribute
|
||||
# The wrapper of this function should use `with partial_init()`
|
||||
with partial_init():
|
||||
new_obj = cls()
|
||||
# SQLModel Override to get class SQLAlchemy __dict__ attributes and
|
||||
# set them back in after creating the object
|
||||
old_dict = new_obj.__dict__.copy()
|
||||
use_obj = obj
|
||||
if isinstance(obj, dict) and update:
|
||||
use_obj = {**obj, **update}
|
||||
elif update:
|
||||
use_obj = ObjectWithUpdateWrapper(obj=obj, update=update)
|
||||
cls.__pydantic_validator__.validate_python(
|
||||
use_obj,
|
||||
strict=strict,
|
||||
from_attributes=from_attributes,
|
||||
context=context,
|
||||
self_instance=new_obj,
|
||||
)
|
||||
# Capture fields set to restore it later
|
||||
fields_set = new_obj.__pydantic_fields_set__.copy()
|
||||
if not is_table_model_class(cls):
|
||||
# If not table, normal Pydantic code, set __dict__
|
||||
new_obj.__dict__ = {**old_dict, **new_obj.__dict__}
|
||||
else:
|
||||
# Do not set __dict__, instead use setattr to trigger SQLAlchemy
|
||||
# instrumentation
|
||||
for key, value in {**old_dict, **new_obj.__dict__}.items():
|
||||
setattr(new_obj, key, value)
|
||||
# Restore fields set
|
||||
object.__setattr__(new_obj, "__pydantic_fields_set__", fields_set)
|
||||
# Get and set any relationship objects
|
||||
if is_table_model_class(cls):
|
||||
for key in new_obj.__sqlmodel_relationships__:
|
||||
value = getattr(use_obj, key, Undefined)
|
||||
if value is not Undefined:
|
||||
setattr(new_obj, key, value)
|
||||
return new_obj
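# Illustrative call sites (with a hypothetical Hero table model defined in user
# code); SQLModel.model_validate() in main.py delegates here:
#
#     hero = sqlmodel_validate(Hero, {"name": "Rusty"})
#     hero = sqlmodel_validate(Hero, some_row_object, update={"age": 3})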
|
||||
|
||||
def sqlmodel_init(*, self: "SQLModel", data: Dict[str, Any]) -> None:
|
||||
old_dict = self.__dict__.copy()
|
||||
if not is_table_model_class(self.__class__):
|
||||
self.__pydantic_validator__.validate_python(
|
||||
data,
|
||||
self_instance=self,
|
||||
)
|
||||
else:
|
||||
sqlmodel_table_construct(
|
||||
self_instance=self,
|
||||
values=data,
|
||||
)
|
||||
object.__setattr__(
|
||||
self,
|
||||
"__dict__",
|
||||
{**old_dict, **self.__dict__},
|
||||
)
|
||||
|
||||
else:
|
||||
from pydantic import BaseConfig as BaseConfig # type: ignore[assignment]
|
||||
from pydantic.errors import ConfigError
|
||||
from pydantic.fields import ( # type: ignore[attr-defined, no-redef]
|
||||
SHAPE_SINGLETON,
|
||||
ModelField,
|
||||
)
|
||||
from pydantic.fields import ( # type: ignore[attr-defined, no-redef]
|
||||
Undefined as Undefined, # noqa
|
||||
)
|
||||
from pydantic.fields import ( # type: ignore[attr-defined, no-redef]
|
||||
UndefinedType as UndefinedType,
|
||||
)
|
||||
from pydantic.main import ( # type: ignore[no-redef]
|
||||
ModelMetaclass as ModelMetaclass,
|
||||
)
|
||||
from pydantic.main import validate_model
|
||||
from pydantic.typing import resolve_annotations
|
||||
from pydantic.utils import ROOT_KEY, ValueItems
|
||||
from pydantic.utils import ( # type: ignore[no-redef]
|
||||
Representation as Representation,
|
||||
)
|
||||
|
||||
class SQLModelConfig(BaseConfig): # type: ignore[no-redef]
|
||||
table: Optional[bool] = None # type: ignore[misc]
|
||||
registry: Optional[Any] = None # type: ignore[misc]
|
||||
|
||||
def get_config_value(
|
||||
*, model: InstanceOrType["SQLModel"], parameter: str, default: Any = None
|
||||
) -> Any:
|
||||
return getattr(model.__config__, parameter, default) # type: ignore[union-attr]
|
||||
|
||||
def set_config_value(
|
||||
*,
|
||||
model: InstanceOrType["SQLModel"],
|
||||
parameter: str,
|
||||
value: Any,
|
||||
) -> None:
|
||||
setattr(model.__config__, parameter, value) # type: ignore
|
||||
|
||||
def get_model_fields(model: InstanceOrType["SQLModel"]) -> Dict[str, "FieldInfo"]:
|
||||
return model.__fields__ # type: ignore
|
||||
|
||||
def set_fields_set(
|
||||
new_object: InstanceOrType["SQLModel"], fields: Set["FieldInfo"]
|
||||
) -> None:
|
||||
object.__setattr__(new_object, "__fields_set__", fields)
|
||||
|
||||
def get_annotations(class_dict: Dict[str, Any]) -> Dict[str, Any]:
|
||||
return resolve_annotations( # type: ignore[no-any-return]
|
||||
class_dict.get("__annotations__", {}),
|
||||
class_dict.get("__module__", None),
|
||||
)
|
||||
|
||||
def is_table_model_class(cls: Type[Any]) -> bool:
|
||||
config = getattr(cls, "__config__", None)
|
||||
if config:
|
||||
return getattr(config, "table", False)
|
||||
return False
|
||||
|
||||
def get_relationship_to(
|
||||
name: str,
|
||||
rel_info: "RelationshipInfo",
|
||||
annotation: Any,
|
||||
) -> Any:
|
||||
temp_field = ModelField.infer( # type: ignore[attr-defined]
|
||||
name=name,
|
||||
value=rel_info,
|
||||
annotation=annotation,
|
||||
class_validators=None,
|
||||
config=SQLModelConfig,
|
||||
)
|
||||
relationship_to = temp_field.type_
|
||||
if isinstance(temp_field.type_, ForwardRef):
|
||||
relationship_to = temp_field.type_.__forward_arg__
|
||||
return relationship_to
|
||||
|
||||
def is_field_noneable(field: "FieldInfo") -> bool:
|
||||
if not field.required: # type: ignore[attr-defined]
|
||||
# Taken from [Pydantic](https://github.com/samuelcolvin/pydantic/blob/v1.8.2/pydantic/fields.py#L946-L947)
|
||||
return field.allow_none and ( # type: ignore[attr-defined]
|
||||
field.shape != SHAPE_SINGLETON or not field.sub_fields # type: ignore[attr-defined]
|
||||
)
|
||||
return field.allow_none # type: ignore[no-any-return, attr-defined]
|
||||
|
||||
def get_type_from_field(field: Any) -> Any:
|
||||
if isinstance(field.type_, type) and field.shape == SHAPE_SINGLETON:
|
||||
return field.type_
|
||||
raise ValueError(f"The field {field.name} has no matching SQLAlchemy type")
|
||||
|
||||
def get_field_metadata(field: Any) -> Any:
|
||||
metadata = FakeMetadata()
|
||||
metadata.max_length = field.field_info.max_length
|
||||
metadata.max_digits = getattr(field.type_, "max_digits", None)
|
||||
metadata.decimal_places = getattr(field.type_, "decimal_places", None)
|
||||
return metadata
|
||||
|
||||
def post_init_field_info(field_info: FieldInfo) -> None:
|
||||
field_info._validate() # type: ignore[attr-defined]
|
||||
|
||||
def _calculate_keys(
|
||||
self: "SQLModel",
|
||||
include: Optional[Mapping[Union[int, str], Any]],
|
||||
exclude: Optional[Mapping[Union[int, str], Any]],
|
||||
exclude_unset: bool,
|
||||
update: Optional[Dict[str, Any]] = None,
|
||||
) -> Optional[AbstractSet[str]]:
|
||||
if include is None and exclude is None and not exclude_unset:
|
||||
# Original in Pydantic:
|
||||
# return None
|
||||
# Updated to not return SQLAlchemy attributes
|
||||
# Do not include relationships as that would easily lead to infinite
|
||||
# recursion, or traversing the whole database
|
||||
return (
|
||||
self.__fields__.keys() # noqa
|
||||
) # | self.__sqlmodel_relationships__.keys()
|
||||
|
||||
keys: AbstractSet[str]
|
||||
if exclude_unset:
|
||||
keys = self.__fields_set__.copy() # noqa
|
||||
else:
|
||||
# Original in Pydantic:
|
||||
# keys = self.__dict__.keys()
|
||||
# Updated to not return SQLAlchemy attributes
|
||||
# Do not include relationships as that would easily lead to infinite
|
||||
# recursion, or traversing the whole database
|
||||
keys = (
|
||||
self.__fields__.keys() # noqa
|
||||
) # | self.__sqlmodel_relationships__.keys()
|
||||
if include is not None:
|
||||
keys &= include.keys()
|
||||
|
||||
if update:
|
||||
keys -= update.keys()
|
||||
|
||||
if exclude:
|
||||
keys -= {k for k, v in exclude.items() if ValueItems.is_true(v)}
|
||||
|
||||
return keys
|
||||
|
||||
def sqlmodel_validate(
|
||||
cls: Type[_TSQLModel],
|
||||
obj: Any,
|
||||
*,
|
||||
strict: Union[bool, None] = None,
|
||||
from_attributes: Union[bool, None] = None,
|
||||
context: Union[Dict[str, Any], None] = None,
|
||||
update: Union[Dict[str, Any], None] = None,
|
||||
) -> _TSQLModel:
|
||||
# This was SQLModel's original from_orm() for Pydantic v1
|
||||
# Duplicated from Pydantic
|
||||
if not cls.__config__.orm_mode: # type: ignore[attr-defined] # noqa
|
||||
raise ConfigError(
|
||||
"You must have the config attribute orm_mode=True to use from_orm"
|
||||
)
|
||||
if not isinstance(obj, Mapping):
|
||||
obj = (
|
||||
{ROOT_KEY: obj}
|
||||
if cls.__custom_root_type__ # type: ignore[attr-defined] # noqa
|
||||
else cls._decompose_class(obj) # type: ignore[attr-defined] # noqa
|
||||
)
|
||||
# SQLModel, support update dict
|
||||
if update is not None:
|
||||
obj = {**obj, **update}
|
||||
# End SQLModel support dict
|
||||
if not getattr(cls.__config__, "table", False): # noqa
|
||||
# If not table, normal Pydantic code
|
||||
m: _TSQLModel = cls.__new__(cls)
|
||||
else:
|
||||
# If table, create the new instance normally to make SQLAlchemy create
|
||||
# the _sa_instance_state attribute
|
||||
m = cls()
|
||||
values, fields_set, validation_error = validate_model(cls, obj)
|
||||
if validation_error:
|
||||
raise validation_error
|
||||
# Updated to trigger SQLAlchemy internal handling
|
||||
if not getattr(cls.__config__, "table", False): # noqa
|
||||
object.__setattr__(m, "__dict__", values)
|
||||
else:
|
||||
for key, value in values.items():
|
||||
setattr(m, key, value)
|
||||
# Continue with standard Pydantic logic
|
||||
object.__setattr__(m, "__fields_set__", fields_set)
|
||||
m._init_private_attributes() # type: ignore[attr-defined] # noqa
|
||||
return m
|
||||
|
||||
def sqlmodel_init(*, self: "SQLModel", data: Dict[str, Any]) -> None:
|
||||
values, fields_set, validation_error = validate_model(self.__class__, data)
|
||||
# Only raise errors if not a SQLModel model
|
||||
if (
|
||||
not is_table_model_class(self.__class__) # noqa
|
||||
and validation_error
|
||||
):
|
||||
raise validation_error
|
||||
if not is_table_model_class(self.__class__):
|
||||
object.__setattr__(self, "__dict__", values)
|
||||
else:
|
||||
# Do not set values as in Pydantic, pass them through setattr, so
|
||||
# SQLAlchemy can handle them
|
||||
for key, value in values.items():
|
||||
setattr(self, key, value)
|
||||
object.__setattr__(self, "__fields_set__", fields_set)
|
||||
non_pydantic_keys = data.keys() - values.keys()
|
||||
|
||||
if is_table_model_class(self.__class__):
|
||||
for key in non_pydantic_keys:
|
||||
if key in self.__sqlmodel_relationships__:
|
||||
setattr(self, key, data[key])
|
||||
sqlmodel/main.py (517 lines changed)
@@ -11,7 +11,6 @@ from typing import (
|
||||
Callable,
|
||||
ClassVar,
|
||||
Dict,
|
||||
ForwardRef,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
@@ -25,13 +24,8 @@ from typing import (
|
||||
overload,
|
||||
)
|
||||
|
||||
from pydantic import BaseConfig, BaseModel
|
||||
from pydantic.errors import ConfigError, DictError
|
||||
from pydantic.fields import SHAPE_SINGLETON, ModelField, Undefined, UndefinedType
|
||||
from pydantic import BaseModel
|
||||
from pydantic.fields import FieldInfo as PydanticFieldInfo
|
||||
from pydantic.main import ModelMetaclass, validate_model
|
||||
from pydantic.typing import NoArgAnyCallable, resolve_annotations
|
||||
from pydantic.utils import ROOT_KEY, Representation
|
||||
from sqlalchemy import (
|
||||
Boolean,
|
||||
Column,
|
||||
@@ -57,11 +51,38 @@ from sqlalchemy.orm.decl_api import DeclarativeMeta
|
||||
from sqlalchemy.orm.instrumentation import is_instrumented
|
||||
from sqlalchemy.sql.schema import MetaData
|
||||
from sqlalchemy.sql.sqltypes import LargeBinary, Time
|
||||
from typing_extensions import get_origin
|
||||
from typing_extensions import Literal, deprecated, get_origin
|
||||
|
||||
from ._compat import ( # type: ignore[attr-defined]
|
||||
IS_PYDANTIC_V2,
|
||||
BaseConfig,
|
||||
ModelField,
|
||||
ModelMetaclass,
|
||||
Representation,
|
||||
SQLModelConfig,
|
||||
Undefined,
|
||||
UndefinedType,
|
||||
_calculate_keys,
|
||||
finish_init,
|
||||
get_annotations,
|
||||
get_config_value,
|
||||
get_field_metadata,
|
||||
get_model_fields,
|
||||
get_relationship_to,
|
||||
get_type_from_field,
|
||||
is_field_noneable,
|
||||
is_table_model_class,
|
||||
post_init_field_info,
|
||||
set_config_value,
|
||||
set_fields_set,
|
||||
sqlmodel_init,
|
||||
sqlmodel_validate,
|
||||
)
|
||||
from .sql.sqltypes import GUID, AutoString
|
||||
|
||||
_T = TypeVar("_T")
|
||||
NoArgAnyCallable = Callable[[], Any]
|
||||
IncEx = Union[Set[int], Set[str], Dict[int, Any], Dict[str, Any], None]
|
||||
|
||||
|
||||
def __dataclass_transform__(
|
||||
@@ -321,7 +342,7 @@ def Field(
|
||||
sa_column_kwargs=sa_column_kwargs,
|
||||
**current_schema_extra,
|
||||
)
|
||||
field_info._validate()
|
||||
post_init_field_info(field_info)
|
||||
return field_info
|
||||
|
||||
|
||||
@@ -341,7 +362,7 @@ def Relationship(
|
||||
*,
|
||||
back_populates: Optional[str] = None,
|
||||
link_model: Optional[Any] = None,
|
||||
sa_relationship: Optional[RelationshipProperty] = None, # type: ignore
|
||||
sa_relationship: Optional[RelationshipProperty[Any]] = None,
|
||||
) -> Any:
|
||||
...
|
||||
|
||||
@@ -350,7 +371,7 @@ def Relationship(
|
||||
*,
|
||||
back_populates: Optional[str] = None,
|
||||
link_model: Optional[Any] = None,
|
||||
sa_relationship: Optional[RelationshipProperty] = None, # type: ignore
|
||||
sa_relationship: Optional[RelationshipProperty[Any]] = None,
|
||||
sa_relationship_args: Optional[Sequence[Any]] = None,
|
||||
sa_relationship_kwargs: Optional[Mapping[str, Any]] = None,
|
||||
) -> Any:
|
||||
@@ -367,18 +388,20 @@ def Relationship(
|
||||
@__dataclass_transform__(kw_only_default=True, field_descriptors=(Field, FieldInfo))
|
||||
class SQLModelMetaclass(ModelMetaclass, DeclarativeMeta):
|
||||
__sqlmodel_relationships__: Dict[str, RelationshipInfo]
|
||||
__config__: Type[BaseConfig]
|
||||
__fields__: Dict[str, ModelField]
|
||||
model_config: SQLModelConfig
|
||||
model_fields: Dict[str, FieldInfo]
|
||||
__config__: Type[SQLModelConfig]
|
||||
__fields__: Dict[str, ModelField] # type: ignore[assignment]
|
||||
|
||||
# Replicate SQLAlchemy
|
||||
def __setattr__(cls, name: str, value: Any) -> None:
|
||||
if getattr(cls.__config__, "table", False):
|
||||
if is_table_model_class(cls):
|
||||
DeclarativeMeta.__setattr__(cls, name, value)
|
||||
else:
|
||||
super().__setattr__(name, value)
|
||||
|
||||
def __delattr__(cls, name: str) -> None:
|
||||
if getattr(cls.__config__, "table", False):
|
||||
if is_table_model_class(cls):
|
||||
DeclarativeMeta.__delattr__(cls, name)
|
||||
else:
|
||||
super().__delattr__(name)
|
||||
@@ -393,9 +416,7 @@ class SQLModelMetaclass(ModelMetaclass, DeclarativeMeta):
|
||||
) -> Any:
|
||||
relationships: Dict[str, RelationshipInfo] = {}
|
||||
dict_for_pydantic = {}
|
||||
original_annotations = resolve_annotations(
|
||||
class_dict.get("__annotations__", {}), class_dict.get("__module__", None)
|
||||
)
|
||||
original_annotations = get_annotations(class_dict)
|
||||
pydantic_annotations = {}
|
||||
relationship_annotations = {}
|
||||
for k, v in class_dict.items():
|
||||
@@ -424,10 +445,8 @@ class SQLModelMetaclass(ModelMetaclass, DeclarativeMeta):
|
||||
key.startswith("__") and key.endswith("__")
|
||||
) # skip dunder methods and attributes
|
||||
}
|
||||
pydantic_kwargs = kwargs.copy()
|
||||
config_kwargs = {
|
||||
key: pydantic_kwargs.pop(key)
|
||||
for key in pydantic_kwargs.keys() & allowed_config_kwargs
|
||||
key: kwargs[key] for key in kwargs.keys() & allowed_config_kwargs
|
||||
}
|
||||
new_cls = super().__new__(cls, name, bases, dict_used, **config_kwargs)
|
||||
new_cls.__annotations__ = {
|
||||
@@ -437,7 +456,9 @@ class SQLModelMetaclass(ModelMetaclass, DeclarativeMeta):
|
||||
}
|
||||
|
||||
def get_config(name: str) -> Any:
|
||||
config_class_value = getattr(new_cls.__config__, name, Undefined)
|
||||
config_class_value = get_config_value(
|
||||
model=new_cls, parameter=name, default=Undefined
|
||||
)
|
||||
if config_class_value is not Undefined:
|
||||
return config_class_value
|
||||
kwarg_value = kwargs.get(name, Undefined)
|
||||
@@ -448,22 +469,27 @@ class SQLModelMetaclass(ModelMetaclass, DeclarativeMeta):
|
||||
config_table = get_config("table")
|
||||
if config_table is True:
|
||||
# If it was passed by kwargs, ensure it's also set in config
|
||||
new_cls.__config__.table = config_table
|
||||
for k, v in new_cls.__fields__.items():
|
||||
set_config_value(model=new_cls, parameter="table", value=config_table)
|
||||
for k, v in get_model_fields(new_cls).items():
|
||||
col = get_column_from_field(v)
|
||||
setattr(new_cls, k, col)
|
||||
# Set a config flag to tell FastAPI that this should be read with a field
|
||||
# in orm_mode instead of preemptively converting it to a dict.
|
||||
# This could be done by reading new_cls.__config__.table in FastAPI, but
|
||||
# This could be done by reading new_cls.model_config['table'] in FastAPI, but
|
||||
# that's very specific about SQLModel, so let's have another config that
|
||||
# other future tools based on Pydantic can use.
|
||||
new_cls.__config__.read_with_orm_mode = True
|
||||
set_config_value(
|
||||
model=new_cls, parameter="read_from_attributes", value=True
|
||||
)
|
||||
# For compatibility with older versions
|
||||
# TODO: remove this in the future
|
||||
set_config_value(model=new_cls, parameter="read_with_orm_mode", value=True)
|
||||
|
||||
config_registry = get_config("registry")
|
||||
if config_registry is not Undefined:
|
||||
config_registry = cast(registry, config_registry)
|
||||
# If it was passed by kwargs, ensure it's also set in config
|
||||
new_cls.__config__.registry = config_table
|
||||
set_config_value(model=new_cls, parameter="registry", value=config_table)
|
||||
setattr(new_cls, "_sa_registry", config_registry) # noqa: B010
|
||||
setattr(new_cls, "metadata", config_registry.metadata) # noqa: B010
|
||||
setattr(new_cls, "__abstract__", True) # noqa: B010
|
||||
@@ -477,13 +503,8 @@ class SQLModelMetaclass(ModelMetaclass, DeclarativeMeta):
|
||||
# this allows FastAPI to clone a SQLModel for the response_model without
# trying to create a new SQLAlchemy model, for a new table with the same
# name, which would trigger an error
|
||||
base_is_table = False
|
||||
for base in bases:
|
||||
config = getattr(base, "__config__") # noqa: B009
|
||||
if config and getattr(config, "table", False):
|
||||
base_is_table = True
|
||||
break
|
||||
if getattr(cls.__config__, "table", False) and not base_is_table:
|
||||
base_is_table = any(is_table_model_class(base) for base in bases)
|
||||
if is_table_model_class(cls) and not base_is_table:
|
||||
for rel_name, rel_info in cls.__sqlmodel_relationships__.items():
|
||||
if rel_info.sa_relationship:
|
||||
# There's a SQLAlchemy relationship declared, that takes precedence
|
||||
@@ -500,16 +521,9 @@ class SQLModelMetaclass(ModelMetaclass, DeclarativeMeta):
|
||||
# handled well by SQLAlchemy without Mapped, so, wrap the
|
||||
# annotations in Mapped here
|
||||
cls.__annotations__[rel_name] = Mapped[ann] # type: ignore[valid-type]
|
||||
temp_field = ModelField.infer(
|
||||
name=rel_name,
|
||||
value=rel_info,
|
||||
annotation=ann,
|
||||
class_validators=None,
|
||||
config=BaseConfig,
|
||||
relationship_to = get_relationship_to(
|
||||
name=rel_name, rel_info=rel_info, annotation=ann
|
||||
)
|
||||
relationship_to = temp_field.type_
|
||||
if isinstance(temp_field.type_, ForwardRef):
|
||||
relationship_to = temp_field.type_.__forward_arg__
|
||||
rel_kwargs: Dict[str, Any] = {}
|
||||
if rel_info.back_populates:
|
||||
rel_kwargs["back_populates"] = rel_info.back_populates
|
||||
@@ -537,77 +551,89 @@ class SQLModelMetaclass(ModelMetaclass, DeclarativeMeta):
|
||||
ModelMetaclass.__init__(cls, classname, bases, dict_, **kw)
|
||||
|
||||
|
||||
def get_sqlalchemy_type(field: ModelField) -> Any:
|
||||
sa_type = getattr(field.field_info, "sa_type", Undefined) # noqa: B009
|
||||
def get_sqlalchemy_type(field: Any) -> Any:
|
||||
if IS_PYDANTIC_V2:
|
||||
field_info = field
|
||||
else:
|
||||
field_info = field.field_info
|
||||
sa_type = getattr(field_info, "sa_type", Undefined) # noqa: B009
|
||||
if sa_type is not Undefined:
|
||||
return sa_type
|
||||
if isinstance(field.type_, type) and field.shape == SHAPE_SINGLETON:
|
||||
# Check enums first as an enum can also be a str, needed by Pydantic/FastAPI
|
||||
if issubclass(field.type_, Enum):
|
||||
return sa_Enum(field.type_)
|
||||
if issubclass(field.type_, str):
|
||||
if field.field_info.max_length:
|
||||
return AutoString(length=field.field_info.max_length)
|
||||
return AutoString
|
||||
if issubclass(field.type_, float):
|
||||
return Float
|
||||
if issubclass(field.type_, bool):
|
||||
return Boolean
|
||||
if issubclass(field.type_, int):
|
||||
return Integer
|
||||
if issubclass(field.type_, datetime):
|
||||
return DateTime
|
||||
if issubclass(field.type_, date):
|
||||
return Date
|
||||
if issubclass(field.type_, timedelta):
|
||||
return Interval
|
||||
if issubclass(field.type_, time):
|
||||
return Time
|
||||
if issubclass(field.type_, bytes):
|
||||
return LargeBinary
|
||||
if issubclass(field.type_, Decimal):
|
||||
return Numeric(
|
||||
precision=getattr(field.type_, "max_digits", None),
|
||||
scale=getattr(field.type_, "decimal_places", None),
|
||||
)
|
||||
if issubclass(field.type_, ipaddress.IPv4Address):
|
||||
return AutoString
|
||||
if issubclass(field.type_, ipaddress.IPv4Network):
|
||||
return AutoString
|
||||
if issubclass(field.type_, ipaddress.IPv6Address):
|
||||
return AutoString
|
||||
if issubclass(field.type_, ipaddress.IPv6Network):
|
||||
return AutoString
|
||||
if issubclass(field.type_, Path):
|
||||
return AutoString
|
||||
if issubclass(field.type_, uuid.UUID):
|
||||
return GUID
|
||||
raise ValueError(f"The field {field.name} has no matching SQLAlchemy type")
|
||||
|
||||
type_ = get_type_from_field(field)
|
||||
metadata = get_field_metadata(field)
|
||||
|
||||
# Check enums first as an enum can also be a str, needed by Pydantic/FastAPI
|
||||
if issubclass(type_, Enum):
|
||||
return sa_Enum(type_)
|
||||
if issubclass(type_, str):
|
||||
max_length = getattr(metadata, "max_length", None)
|
||||
if max_length:
|
||||
return AutoString(length=max_length)
|
||||
return AutoString
|
||||
if issubclass(type_, float):
|
||||
return Float
|
||||
if issubclass(type_, bool):
|
||||
return Boolean
|
||||
if issubclass(type_, int):
|
||||
return Integer
|
||||
if issubclass(type_, datetime):
|
||||
return DateTime
|
||||
if issubclass(type_, date):
|
||||
return Date
|
||||
if issubclass(type_, timedelta):
|
||||
return Interval
|
||||
if issubclass(type_, time):
|
||||
return Time
|
||||
if issubclass(type_, bytes):
|
||||
return LargeBinary
|
||||
if issubclass(type_, Decimal):
|
||||
return Numeric(
|
||||
precision=getattr(metadata, "max_digits", None),
|
||||
scale=getattr(metadata, "decimal_places", None),
|
||||
)
|
||||
if issubclass(type_, ipaddress.IPv4Address):
|
||||
return AutoString
|
||||
if issubclass(type_, ipaddress.IPv4Network):
|
||||
return AutoString
|
||||
if issubclass(type_, ipaddress.IPv6Address):
|
||||
return AutoString
|
||||
if issubclass(type_, ipaddress.IPv6Network):
|
||||
return AutoString
|
||||
if issubclass(type_, Path):
|
||||
return AutoString
|
||||
if issubclass(type_, uuid.UUID):
|
||||
return GUID
|
||||
raise ValueError(f"{type_} has no matching SQLAlchemy type")
|
||||
|
||||
|
||||
def get_column_from_field(field: ModelField) -> Column: # type: ignore
|
||||
sa_column = getattr(field.field_info, "sa_column", Undefined)
|
||||
def get_column_from_field(field: Any) -> Column: # type: ignore
|
||||
if IS_PYDANTIC_V2:
|
||||
field_info = field
|
||||
else:
|
||||
field_info = field.field_info
|
||||
sa_column = getattr(field_info, "sa_column", Undefined)
|
||||
if isinstance(sa_column, Column):
|
||||
return sa_column
|
||||
sa_type = get_sqlalchemy_type(field)
|
||||
primary_key = getattr(field.field_info, "primary_key", Undefined)
|
||||
primary_key = getattr(field_info, "primary_key", Undefined)
|
||||
if primary_key is Undefined:
|
||||
primary_key = False
|
||||
index = getattr(field.field_info, "index", Undefined)
|
||||
index = getattr(field_info, "index", Undefined)
|
||||
if index is Undefined:
|
||||
index = False
|
||||
nullable = not primary_key and _is_field_noneable(field)
|
||||
nullable = not primary_key and is_field_noneable(field)
|
||||
# Override derived nullability if the nullable property is set explicitly
|
||||
# on the field
|
||||
field_nullable = getattr(field.field_info, "nullable", Undefined) # noqa: B009
|
||||
if field_nullable != Undefined:
|
||||
field_nullable = getattr(field_info, "nullable", Undefined) # noqa: B009
|
||||
if field_nullable is not Undefined:
|
||||
assert not isinstance(field_nullable, UndefinedType)
|
||||
nullable = field_nullable
|
||||
args = []
|
||||
foreign_key = getattr(field.field_info, "foreign_key", Undefined)
|
||||
foreign_key = getattr(field_info, "foreign_key", Undefined)
|
||||
if foreign_key is Undefined:
|
||||
foreign_key = None
|
||||
unique = getattr(field.field_info, "unique", Undefined)
|
||||
unique = getattr(field_info, "unique", Undefined)
|
||||
if unique is Undefined:
|
||||
unique = False
|
||||
if foreign_key:
|
||||
@@ -620,16 +646,16 @@ def get_column_from_field(field: ModelField) -> Column: # type: ignore
|
||||
"unique": unique,
|
||||
}
|
||||
sa_default = Undefined
|
||||
if field.field_info.default_factory:
|
||||
sa_default = field.field_info.default_factory
|
||||
elif field.field_info.default is not Undefined:
|
||||
sa_default = field.field_info.default
|
||||
if field_info.default_factory:
|
||||
sa_default = field_info.default_factory
|
||||
elif field_info.default is not Undefined:
|
||||
sa_default = field_info.default
|
||||
if sa_default is not Undefined:
|
||||
kwargs["default"] = sa_default
|
||||
sa_column_args = getattr(field.field_info, "sa_column_args", Undefined)
|
||||
sa_column_args = getattr(field_info, "sa_column_args", Undefined)
|
||||
if sa_column_args is not Undefined:
|
||||
args.extend(list(cast(Sequence[Any], sa_column_args)))
|
||||
sa_column_kwargs = getattr(field.field_info, "sa_column_kwargs", Undefined)
|
||||
sa_column_kwargs = getattr(field_info, "sa_column_kwargs", Undefined)
|
||||
if sa_column_kwargs is not Undefined:
|
||||
kwargs.update(cast(Dict[Any, Any], sa_column_kwargs))
|
||||
return Column(sa_type, *args, **kwargs) # type: ignore
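# Illustrative sketch (not part of the commit, assumes the Pydantic v2 branch):
# a primary-key field on a SQLModel class becomes a non-nullable Integer column.
def _example_get_column_from_field() -> None:
    class _Hero(SQLModel):
        id: Optional[int] = Field(default=None, primary_key=True)
        name: str

    id_col = get_column_from_field(_Hero.model_fields["id"])
    assert id_col.primary_key and not id_col.nullable
    assert isinstance(id_col.type, Integer)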
|
||||
@@ -639,13 +665,6 @@ class_registry = weakref.WeakValueDictionary() # type: ignore
|
||||
|
||||
default_registry = registry()
|
||||
|
||||
|
||||
def _value_items_is_true(v: Any) -> bool:
|
||||
# Re-implement Pydantic's ValueItems.is_true() as it hasn't been released as of
|
||||
# the current latest, Pydantic 1.8.2
|
||||
return v is True or v is ...
|
||||
|
||||
|
||||
_TSQLModel = TypeVar("_TSQLModel", bound="SQLModel")
|
||||
|
||||
|
||||
@@ -653,13 +672,17 @@ class SQLModel(BaseModel, metaclass=SQLModelMetaclass, registry=default_registry
|
||||
# SQLAlchemy needs to set weakref(s), Pydantic will set the other slots values
|
||||
__slots__ = ("__weakref__",)
|
||||
__tablename__: ClassVar[Union[str, Callable[..., str]]]
|
||||
__sqlmodel_relationships__: ClassVar[Dict[str, RelationshipProperty]] # type: ignore
|
||||
__sqlmodel_relationships__: ClassVar[Dict[str, RelationshipProperty[Any]]]
|
||||
__name__: ClassVar[str]
|
||||
metadata: ClassVar[MetaData]
|
||||
__allow_unmapped__ = True # https://docs.sqlalchemy.org/en/20/changelog/migration_20.html#migration-20-step-six
|
||||
|
||||
class Config:
|
||||
orm_mode = True
|
||||
if IS_PYDANTIC_V2:
|
||||
model_config = SQLModelConfig(from_attributes=True)
|
||||
else:
|
||||
|
||||
class Config:
|
||||
orm_mode = True
|
||||
|
||||
def __new__(cls, *args: Any, **kwargs: Any) -> Any:
|
||||
new_object = super().__new__(cls)
|
||||
@@ -668,31 +691,28 @@ class SQLModel(BaseModel, metaclass=SQLModelMetaclass, registry=default_registry
|
||||
# Set __fields_set__ here, that would have been set when calling __init__
|
||||
# in the Pydantic model so that when SQLAlchemy sets attributes that are
|
||||
# added (e.g. when querying from DB) to the __fields_set__, this already exists
|
||||
object.__setattr__(new_object, "__fields_set__", set())
|
||||
set_fields_set(new_object, set())
|
||||
return new_object
|
||||
|
||||
def __init__(__pydantic_self__, **data: Any) -> None:
|
||||
# Uses something other than `self` the first arg to allow "self" as a
|
||||
# settable attribute
|
||||
values, fields_set, validation_error = validate_model(
|
||||
__pydantic_self__.__class__, data
|
||||
)
|
||||
# Only raise errors if not a SQLModel model
|
||||
if (
|
||||
not getattr(__pydantic_self__.__config__, "table", False)
|
||||
and validation_error
|
||||
):
|
||||
raise validation_error
|
||||
# Do not set values as in Pydantic, pass them through setattr, so SQLAlchemy
|
||||
# can handle them
|
||||
# object.__setattr__(__pydantic_self__, '__dict__', values)
|
||||
for key, value in values.items():
|
||||
setattr(__pydantic_self__, key, value)
|
||||
object.__setattr__(__pydantic_self__, "__fields_set__", fields_set)
|
||||
non_pydantic_keys = data.keys() - values.keys()
|
||||
for key in non_pydantic_keys:
|
||||
if key in __pydantic_self__.__sqlmodel_relationships__:
|
||||
setattr(__pydantic_self__, key, data[key])
|
||||
|
||||
# SQLAlchemy does very dark black magic and modifies the __init__ method in
|
||||
# sqlalchemy.orm.instrumentation._generate_init()
|
||||
# so, to make SQLAlchemy work, it's needed to explicitly call __init__ to
|
||||
# trigger all the SQLAlchemy logic, it doesn't work using cls.__new__, setting
|
||||
# attributes obj.__dict__, etc. The __init__ method has to be called. But
|
||||
# there are cases where calling all the default logic is not ideal, e.g.
|
||||
# when calling Model.model_validate(), as the validation is done outside
|
||||
# of instance creation.
|
||||
# At the same time, __init__ is what users would normally call, by creating
|
||||
# a new instance, which should have validation and all the default logic.
|
||||
# So, to be able to set up the internal SQLAlchemy logic alone without
|
||||
# executing the rest, and support things like Model.model_validate(), we
|
||||
# use a contextvar to know if we should execute everything.
|
||||
if finish_init.get():
|
||||
sqlmodel_init(self=__pydantic_self__, data=data)
|
||||
|
||||
def __setattr__(self, name: str, value: Any) -> None:
|
||||
if name in {"_sa_instance_state"}:
|
||||
@@ -700,59 +720,13 @@ class SQLModel(BaseModel, metaclass=SQLModelMetaclass, registry=default_registry
|
||||
return
|
||||
else:
|
||||
# Set in SQLAlchemy, before Pydantic to trigger events and updates
|
||||
if getattr(self.__config__, "table", False) and is_instrumented(self, name): # type: ignore
|
||||
if is_table_model_class(self.__class__) and is_instrumented(self, name): # type: ignore[no-untyped-call]
|
||||
set_attribute(self, name, value)
|
||||
# Set in Pydantic model to trigger possible validation changes, only for
|
||||
# non relationship values
|
||||
if name not in self.__sqlmodel_relationships__:
|
||||
super().__setattr__(name, value)
|
||||
|
||||
@classmethod
|
||||
def from_orm(
|
||||
cls: Type[_TSQLModel], obj: Any, update: Optional[Dict[str, Any]] = None
|
||||
) -> _TSQLModel:
|
||||
# Duplicated from Pydantic
|
||||
if not cls.__config__.orm_mode:
|
||||
raise ConfigError(
|
||||
"You must have the config attribute orm_mode=True to use from_orm"
|
||||
)
|
||||
obj = {ROOT_KEY: obj} if cls.__custom_root_type__ else cls._decompose_class(obj)
|
||||
# SQLModel, support update dict
|
||||
if update is not None:
|
||||
obj = {**obj, **update}
|
||||
# End SQLModel support dict
|
||||
if not getattr(cls.__config__, "table", False):
|
||||
# If not table, normal Pydantic code
|
||||
m: _TSQLModel = cls.__new__(cls)
|
||||
else:
|
||||
# If table, create the new instance normally to make SQLAlchemy create
|
||||
# the _sa_instance_state attribute
|
||||
m = cls()
|
||||
values, fields_set, validation_error = validate_model(cls, obj)
|
||||
if validation_error:
|
||||
raise validation_error
|
||||
# Updated to trigger SQLAlchemy internal handling
|
||||
if not getattr(cls.__config__, "table", False):
|
||||
object.__setattr__(m, "__dict__", values)
|
||||
else:
|
||||
for key, value in values.items():
|
||||
setattr(m, key, value)
|
||||
# Continue with standard Pydantic logic
|
||||
object.__setattr__(m, "__fields_set__", fields_set)
|
||||
m._init_private_attributes()
|
||||
return m
|
||||
|
||||
@classmethod
|
||||
def parse_obj(
|
||||
cls: Type[_TSQLModel], obj: Any, update: Optional[Dict[str, Any]] = None
|
||||
) -> _TSQLModel:
|
||||
obj = cls._enforce_dict_if_root(obj)
|
||||
# SQLModel, support update dict
|
||||
if update is not None:
|
||||
obj = {**obj, **update}
|
||||
# End SQLModel support dict
|
||||
return super().parse_obj(obj)
|
||||
|
||||
def __repr_args__(self) -> Sequence[Tuple[Optional[str], Any]]:
|
||||
# Don't show SQLAlchemy private attributes
|
||||
return [
|
||||
@@ -761,33 +735,126 @@ class SQLModel(BaseModel, metaclass=SQLModelMetaclass, registry=default_registry
|
||||
if not (isinstance(k, str) and k.startswith("_sa_"))
|
||||
]
|
||||
|
||||
# From Pydantic, override to enforce validation with dict
|
||||
@classmethod
|
||||
def validate(cls: Type[_TSQLModel], value: Any) -> _TSQLModel:
|
||||
if isinstance(value, cls):
|
||||
return value.copy() if cls.__config__.copy_on_model_validation else value
|
||||
@declared_attr # type: ignore
|
||||
def __tablename__(cls) -> str:
|
||||
return cls.__name__.lower()
|
||||
|
||||
value = cls._enforce_dict_if_root(value)
|
||||
if isinstance(value, dict):
|
||||
values, fields_set, validation_error = validate_model(cls, value)
|
||||
if validation_error:
|
||||
raise validation_error
|
||||
model = cls(**value)
|
||||
# Reset fields set, this would have been done in Pydantic in __init__
|
||||
object.__setattr__(model, "__fields_set__", fields_set)
|
||||
return model
|
||||
elif cls.__config__.orm_mode:
|
||||
return cls.from_orm(value)
|
||||
elif cls.__custom_root_type__:
|
||||
return cls.parse_obj(value)
|
||||
@classmethod
|
||||
def model_validate(
|
||||
cls: Type[_TSQLModel],
|
||||
obj: Any,
|
||||
*,
|
||||
strict: Union[bool, None] = None,
|
||||
from_attributes: Union[bool, None] = None,
|
||||
context: Union[Dict[str, Any], None] = None,
|
||||
update: Union[Dict[str, Any], None] = None,
|
||||
) -> _TSQLModel:
|
||||
return sqlmodel_validate(
|
||||
cls=cls,
|
||||
obj=obj,
|
||||
strict=strict,
|
||||
from_attributes=from_attributes,
|
||||
context=context,
|
||||
update=update,
|
||||
)
|
||||
|
||||
# TODO: remove when deprecating Pydantic v1, only for compatibility
|
||||
def model_dump(
|
||||
self,
|
||||
*,
|
||||
mode: Union[Literal["json", "python"], str] = "python",
|
||||
include: IncEx = None,
|
||||
exclude: IncEx = None,
|
||||
by_alias: bool = False,
|
||||
exclude_unset: bool = False,
|
||||
exclude_defaults: bool = False,
|
||||
exclude_none: bool = False,
|
||||
round_trip: bool = False,
|
||||
warnings: bool = True,
|
||||
) -> Dict[str, Any]:
|
||||
if IS_PYDANTIC_V2:
|
||||
return super().model_dump(
|
||||
mode=mode,
|
||||
include=include,
|
||||
exclude=exclude,
|
||||
by_alias=by_alias,
|
||||
exclude_unset=exclude_unset,
|
||||
exclude_defaults=exclude_defaults,
|
||||
exclude_none=exclude_none,
|
||||
round_trip=round_trip,
|
||||
warnings=warnings,
|
||||
)
|
||||
else:
|
||||
try:
|
||||
value_as_dict = dict(value)
|
||||
except (TypeError, ValueError) as e:
|
||||
raise DictError() from e
|
||||
return cls(**value_as_dict)
|
||||
return super().dict(
|
||||
include=include,
|
||||
exclude=exclude,
|
||||
by_alias=by_alias,
|
||||
exclude_unset=exclude_unset,
|
||||
exclude_defaults=exclude_defaults,
|
||||
exclude_none=exclude_none,
|
||||
)
|
||||
|
||||
@deprecated(
|
||||
"""
|
||||
🚨 `obj.dict()` was deprecated in SQLModel 0.0.14, you should
|
||||
instead use `obj.model_dump()`.
|
||||
"""
|
||||
)
|
||||
def dict(
|
||||
self,
|
||||
*,
|
||||
include: IncEx = None,
|
||||
exclude: IncEx = None,
|
||||
by_alias: bool = False,
|
||||
exclude_unset: bool = False,
|
||||
exclude_defaults: bool = False,
|
||||
exclude_none: bool = False,
|
||||
) -> Dict[str, Any]:
|
||||
return self.model_dump(
|
||||
include=include,
|
||||
exclude=exclude,
|
||||
by_alias=by_alias,
|
||||
exclude_unset=exclude_unset,
|
||||
exclude_defaults=exclude_defaults,
|
||||
exclude_none=exclude_none,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
@deprecated(
|
||||
"""
|
||||
🚨 `obj.from_orm(data)` was deprecated in SQLModel 0.0.14, you should
|
||||
instead use `obj.model_validate(data)`.
|
||||
"""
|
||||
)
|
||||
def from_orm(
|
||||
cls: Type[_TSQLModel], obj: Any, update: Optional[Dict[str, Any]] = None
|
||||
) -> _TSQLModel:
|
||||
return cls.model_validate(obj, update=update)
|
||||
|
||||
@classmethod
|
||||
@deprecated(
|
||||
"""
|
||||
🚨 `obj.parse_obj(data)` was deprecated in SQLModel 0.0.14, you should
|
||||
instead use `obj.model_validate(data)`.
|
||||
"""
|
||||
)
|
||||
def parse_obj(
|
||||
cls: Type[_TSQLModel], obj: Any, update: Optional[Dict[str, Any]] = None
|
||||
) -> _TSQLModel:
|
||||
if not IS_PYDANTIC_V2:
|
||||
obj = cls._enforce_dict_if_root(obj) # type: ignore[attr-defined] # noqa
|
||||
return cls.model_validate(obj, update=update)
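# Illustrative migration sketch (hypothetical Hero model): the deprecated
# classmethods simply forward to model_validate(), so
#
#     hero = Hero.from_orm(db_obj)              # deprecated in 0.0.14
#     hero = Hero.parse_obj({"name": "Rusty"})  # deprecated as well
#
# are equivalent to
#
#     hero = Hero.model_validate(db_obj)
#     hero = Hero.model_validate({"name": "Rusty"})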
|
||||
|
||||
# From Pydantic, override to only show keys from fields, omit SQLAlchemy attributes
|
||||
@deprecated(
|
||||
"""
|
||||
🚨 You should not access `obj._calculate_keys()` directly.
|
||||
|
||||
It is only useful for Pydantic v1.X, you should probably upgrade to
|
||||
Pydantic v2.X.
|
||||
""",
|
||||
category=None,
|
||||
)
|
||||
def _calculate_keys(
|
||||
self,
|
||||
include: Optional[Mapping[Union[int, str], Any]],
|
||||
@@ -795,44 +862,10 @@ class SQLModel(BaseModel, metaclass=SQLModelMetaclass, registry=default_registry
|
||||
exclude_unset: bool,
|
||||
update: Optional[Dict[str, Any]] = None,
|
||||
) -> Optional[AbstractSet[str]]:
|
||||
if include is None and exclude is None and not exclude_unset:
|
||||
# Original in Pydantic:
|
||||
# return None
|
||||
# Updated to not return SQLAlchemy attributes
|
||||
# Do not include relationships as that would easily lead to infinite
|
||||
# recursion, or traversing the whole database
|
||||
return self.__fields__.keys() # | self.__sqlmodel_relationships__.keys()
|
||||
|
||||
keys: AbstractSet[str]
|
||||
if exclude_unset:
|
||||
keys = self.__fields_set__.copy()
|
||||
else:
|
||||
# Original in Pydantic:
|
||||
# keys = self.__dict__.keys()
|
||||
# Updated to not return SQLAlchemy attributes
|
||||
# Do not include relationships as that would easily lead to infinite
|
||||
# recursion, or traversing the whole database
|
||||
keys = self.__fields__.keys() # | self.__sqlmodel_relationships__.keys()
|
||||
if include is not None:
|
||||
keys &= include.keys()
|
||||
|
||||
if update:
|
||||
keys -= update.keys()
|
||||
|
||||
if exclude:
|
||||
keys -= {k for k, v in exclude.items() if _value_items_is_true(v)}
|
||||
|
||||
return keys
|
||||
|
||||
@declared_attr # type: ignore
|
||||
def __tablename__(cls) -> str:
|
||||
return cls.__name__.lower()
|
||||
|
||||
|
||||
def _is_field_noneable(field: ModelField) -> bool:
|
||||
if not field.required:
|
||||
# Taken from [Pydantic](https://github.com/samuelcolvin/pydantic/blob/v1.8.2/pydantic/fields.py#L946-L947)
|
||||
return field.allow_none and (
|
||||
field.shape != SHAPE_SINGLETON or not field.sub_fields
|
||||
return _calculate_keys(
|
||||
self,
|
||||
include=include,
|
||||
exclude=exclude,
|
||||
exclude_unset=exclude_unset,
|
||||
update=update,
|
||||
)
|
||||
return False