Skip to content

Sessions

plateforme.core.database.sessions

This module provides utilities for managing database sessions within the Plateforme framework using SQLAlchemy features.

Async

This module provides utilities for managing database sessions within the Plateforme framework using SQLAlchemy features.

AsyncSessionFactory module-attribute

AsyncSessionFactory = (
    async_sessionmaker["AsyncSession"]
    | async_scoped_session["AsyncSession"]
)

A type alias for an async session factory for async session objects.

AsyncSession

AsyncSession(
    bind: AsyncConnection | AsyncEngine | None = None,
    routing: DatabaseManager | None = None,
    *args: Any,
    **kwargs: Any,
)

Bases: AsyncSession

Manages persistence operations asynchronously for ORM-mapped objects.

Asyncio version of the Session. It is a proxy for a traditional class Session instance.

Initialize the async session.

See also the async_session_factory function which is used to generate an AsyncSession-producing callable with a given set of arguments.

Parameters:

Name Type Description Default
bind AsyncConnection | AsyncEngine | None

An optional AsyncEngine or AsyncConnection to which this AsyncSession should be bound. When specified, all SQL operations performed by this async session will execute via this connectable.

None
routing DatabaseManager | None

An optional DatabaseManager instance to use for the database routing.

None
Note

All other keyword arguments are passed to the constructor of the SQLAlchemy parent session class.

Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/sessions.py
def __init__(
    self,
    bind: AsyncConnection | AsyncEngine | None = None,
    routing: DatabaseManager | None = None,
    *args: Any,
    **kwargs: Any,
):
    """Initialize the async session.

    See also the `async_session_factory` function which is used to generate
    an `AsyncSession`-producing callable with a given set of arguments.

    Args:
        bind: An optional `AsyncEngine` or `AsyncConnection` to which this
            `AsyncSession` should be bound. When specified, all SQL
            operations performed by this async session will execute via
            this connectable.
        routing: An optional `DatabaseManager` instance to use for the
            database routing.

    Note:
        All other keyword arguments are passed to the constructor of the
        SQLAlchemy parent session class.
    """
    super().__init__(
        bind,
        *args,
        routing=routing,
        sync_session_class=Session,
        **kwargs,
        proxy=self,
    )
    self.routing = routing

sync_session_class class-attribute instance-attribute

sync_session_class: Type[Session] = Session

The class or callable that provides the underlying :class:_orm.Session instance for a particular :class:_asyncio.AsyncSession.

At the class level, this attribute is the default value for the :paramref:_asyncio.AsyncSession.sync_session_class parameter. Custom subclasses of :class:_asyncio.AsyncSession can override this.

At the instance level, this attribute indicates the current class or callable that was used to provide the :class:_orm.Session instance for this :class:_asyncio.AsyncSession instance.

.. versionadded:: 1.4.24

sync_session instance-attribute

sync_session: Session

Reference to the underlying :class:_orm.Session this :class:_asyncio.AsyncSession proxies requests towards.

This instance can be used as an event target.

.. seealso::

:ref:`asyncio_events`

dirty property

dirty: Any

The set of all persistent instances considered dirty.

.. container:: class_bases

Proxied for the :class:`_orm.Session` class
on behalf of the :class:`_asyncio.AsyncSession` class.

E.g.::

some_mapped_object in session.dirty

Instances are considered dirty when they were modified but not deleted.

Note that this 'dirty' calculation is 'optimistic'; most attribute-setting or collection modification operations will mark an instance as 'dirty' and place it in this set, even if there is no net change to the attribute's value. At flush time, the value of each attribute is compared to its previously saved value, and if there's no net change, no SQL operation will occur (this is a more expensive operation so it's only done at flush time).

To check if an instance has actionable net changes to its attributes, use the :meth:.Session.is_modified method.

deleted property

deleted: Any

The set of all instances marked as 'deleted' within this Session

.. container:: class_bases

Proxied for the :class:`_orm.Session` class
on behalf of the :class:`_asyncio.AsyncSession` class.

new property

new: Any

The set of all instances marked as 'new' within this Session.

.. container:: class_bases

Proxied for the :class:`_orm.Session` class
on behalf of the :class:`_asyncio.AsyncSession` class.

identity_map property writable

identity_map: IdentityMap

Proxy for the :attr:_orm.Session.identity_map attribute on behalf of the :class:_asyncio.AsyncSession class.

is_active property

is_active: Any

True if this :class:.Session not in "partial rollback" state.

.. container:: class_bases

Proxied for the :class:`_orm.Session` class
on behalf of the :class:`_asyncio.AsyncSession` class.

.. versionchanged:: 1.4 The :class:_orm.Session no longer begins a new transaction immediately, so this attribute will be False when the :class:_orm.Session is first instantiated.

"partial rollback" state typically indicates that the flush process of the :class:_orm.Session has failed, and that the :meth:_orm.Session.rollback method must be emitted in order to fully roll back the transaction.

If this :class:_orm.Session is not in a transaction at all, the :class:_orm.Session will autobegin when it is first used, so in this case :attr:_orm.Session.is_active will return True.

Otherwise, if this :class:_orm.Session is within a transaction, and that transaction has not been rolled back internally, the :attr:_orm.Session.is_active will also return True.

.. seealso::

:ref:`faq_session_rollback`

:meth:`_orm.Session.in_transaction`

autoflush property writable

autoflush: bool

Proxy for the :attr:_orm.Session.autoflush attribute on behalf of the :class:_asyncio.AsyncSession class.

no_autoflush property

no_autoflush: Any

Return a context manager that disables autoflush.

.. container:: class_bases

Proxied for the :class:`_orm.Session` class
on behalf of the :class:`_asyncio.AsyncSession` class.

e.g.::

with session.no_autoflush:

    some_object = SomeClass()
    session.add(some_object)
    # won't autoflush
    some_object.related_thing = session.query(SomeRelated).first()

Operations that proceed within the with: block will not be subject to flushes occurring upon query access. This is useful when initializing a series of objects which involve existing database queries, where the uncompleted object should not yet be flushed.

info property

info: Any

A user-modifiable dictionary.

.. container:: class_bases

Proxied for the :class:`_orm.Session` class
on behalf of the :class:`_asyncio.AsyncSession` class.

The initial value of this dictionary can be populated using the info argument to the :class:.Session constructor or :class:.sessionmaker constructor or factory methods. The dictionary here is always local to this :class:.Session and can be modified independently of all other :class:.Session objects.

refresh async

refresh(
    instance: object,
    attribute_names: Optional[Iterable[str]] = None,
    with_for_update: ForUpdateParameter = None,
) -> None

Expire and refresh the attributes on the given instance.

A query will be issued to the database and all attributes will be refreshed with their current database value.

This is the async version of the :meth:_orm.Session.refresh method. See that method for a complete description of all options.

.. seealso::

:meth:`_orm.Session.refresh` - main documentation for refresh
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def refresh(
    self,
    instance: object,
    attribute_names: Optional[Iterable[str]] = None,
    with_for_update: ForUpdateParameter = None,
) -> None:
    """Expire and refresh the attributes on the given instance.

    A query will be issued to the database and all attributes will be
    refreshed with their current database value.

    This is the async version of the :meth:`_orm.Session.refresh` method.
    See that method for a complete description of all options.

    .. seealso::

        :meth:`_orm.Session.refresh` - main documentation for refresh

    """

    await greenlet_spawn(
        self.sync_session.refresh,
        instance,
        attribute_names=attribute_names,
        with_for_update=with_for_update,
    )

run_sync async

run_sync(fn: Callable[..., _T], *arg: Any, **kw: Any) -> _T

Invoke the given synchronous (i.e. not async) callable, passing a synchronous-style :class:_orm.Session as the first argument.

This method allows traditional synchronous SQLAlchemy functions to run within the context of an asyncio application.

E.g.::

def some_business_method(session: Session, param: str) -> str:
    '''A synchronous function that does not require awaiting

    :param session: a SQLAlchemy Session, used synchronously

    :return: an optional return value is supported

    '''
    session.add(MyObject(param=param))
    session.flush()
    return "success"


async def do_something_async(async_engine: AsyncEngine) -> None:
    '''an async function that uses awaiting'''

    with AsyncSession(async_engine) as async_session:
        # run some_business_method() with a sync-style
        # Session, proxied into an awaitable
        return_code = await async_session.run_sync(some_business_method, param="param1")
        print(return_code)

This method maintains the asyncio event loop all the way through to the database connection by running the given callable in a specially instrumented greenlet.

.. tip::

The provided callable is invoked inline within the asyncio event
loop, and will block on traditional IO calls.  IO within this
callable should only call into SQLAlchemy's asyncio database
APIs which will be properly adapted to the greenlet context.

.. seealso::

:class:`.AsyncAttrs`  - a mixin for ORM mapped classes that provides
a similar feature more succinctly on a per-attribute basis

:meth:`.AsyncConnection.run_sync`

:ref:`session_run_sync`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def run_sync(
    self, fn: Callable[..., _T], *arg: Any, **kw: Any
) -> _T:
    """Invoke the given synchronous (i.e. not async) callable,
    passing a synchronous-style :class:`_orm.Session` as the first
    argument.

    This method allows traditional synchronous SQLAlchemy functions to
    run within the context of an asyncio application.

    E.g.::

        def some_business_method(session: Session, param: str) -> str:
            '''A synchronous function that does not require awaiting

            :param session: a SQLAlchemy Session, used synchronously

            :return: an optional return value is supported

            '''
            session.add(MyObject(param=param))
            session.flush()
            return "success"


        async def do_something_async(async_engine: AsyncEngine) -> None:
            '''an async function that uses awaiting'''

            with AsyncSession(async_engine) as async_session:
                # run some_business_method() with a sync-style
                # Session, proxied into an awaitable
                return_code = await async_session.run_sync(some_business_method, param="param1")
                print(return_code)

    This method maintains the asyncio event loop all the way through
    to the database connection by running the given callable in a
    specially instrumented greenlet.

    .. tip::

        The provided callable is invoked inline within the asyncio event
        loop, and will block on traditional IO calls.  IO within this
        callable should only call into SQLAlchemy's asyncio database
        APIs which will be properly adapted to the greenlet context.

    .. seealso::

        :class:`.AsyncAttrs`  - a mixin for ORM mapped classes that provides
        a similar feature more succinctly on a per-attribute basis

        :meth:`.AsyncConnection.run_sync`

        :ref:`session_run_sync`
    """  # noqa: E501

    return await greenlet_spawn(fn, self.sync_session, *arg, **kw)

execute async

execute(
    statement: TypedReturnsRows[_T],
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    _parent_execute_state: Optional[Any] = None,
    _add_event: Optional[Any] = None,
) -> Result[_T]
execute(
    statement: UpdateBase,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    _parent_execute_state: Optional[Any] = None,
    _add_event: Optional[Any] = None,
) -> CursorResult[Any]
execute(
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    _parent_execute_state: Optional[Any] = None,
    _add_event: Optional[Any] = None,
) -> Result[Any]
execute(
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> Result[Any]

Execute a statement and return a buffered :class:_engine.Result object.

.. seealso::

:meth:`_orm.Session.execute` - main documentation for execute
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def execute(
    self,
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> Result[Any]:
    """Execute a statement and return a buffered
    :class:`_engine.Result` object.

    .. seealso::

        :meth:`_orm.Session.execute` - main documentation for execute

    """

    if execution_options:
        execution_options = util.immutabledict(execution_options).union(
            _EXECUTE_OPTIONS
        )
    else:
        execution_options = _EXECUTE_OPTIONS

    result = await greenlet_spawn(
        self.sync_session.execute,
        statement,
        params=params,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
        **kw,
    )
    return await _ensure_sync_result(result, self.execute)

scalar async

scalar(
    statement: TypedReturnsRows[Tuple[_T]],
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> Optional[_T]
scalar(
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> Any
scalar(
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> Any

Execute a statement and return a scalar result.

.. seealso::

:meth:`_orm.Session.scalar` - main documentation for scalar
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def scalar(
    self,
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> Any:
    """Execute a statement and return a scalar result.

    .. seealso::

        :meth:`_orm.Session.scalar` - main documentation for scalar

    """

    if execution_options:
        execution_options = util.immutabledict(execution_options).union(
            _EXECUTE_OPTIONS
        )
    else:
        execution_options = _EXECUTE_OPTIONS

    return await greenlet_spawn(
        self.sync_session.scalar,
        statement,
        params=params,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
        **kw,
    )

scalars async

scalars(
    statement: TypedReturnsRows[Tuple[_T]],
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> ScalarResult[_T]
scalars(
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> ScalarResult[Any]
scalars(
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> ScalarResult[Any]

Execute a statement and return scalar results.

:return: a :class:_result.ScalarResult object

.. versionadded:: 1.4.24 Added :meth:_asyncio.AsyncSession.scalars

.. versionadded:: 1.4.26 Added :meth:_asyncio.async_scoped_session.scalars

.. seealso::

:meth:`_orm.Session.scalars` - main documentation for scalars

:meth:`_asyncio.AsyncSession.stream_scalars` - streaming version
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def scalars(
    self,
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> ScalarResult[Any]:
    """Execute a statement and return scalar results.

    :return: a :class:`_result.ScalarResult` object

    .. versionadded:: 1.4.24 Added :meth:`_asyncio.AsyncSession.scalars`

    .. versionadded:: 1.4.26 Added
       :meth:`_asyncio.async_scoped_session.scalars`

    .. seealso::

        :meth:`_orm.Session.scalars` - main documentation for scalars

        :meth:`_asyncio.AsyncSession.stream_scalars` - streaming version

    """

    result = await self.execute(
        statement,
        params=params,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
        **kw,
    )
    return result.scalars()

get async

get(
    entity: _EntityBindKey[_O],
    ident: _PKIdentityArgument,
    *,
    options: Optional[Sequence[ORMOption]] = None,
    populate_existing: bool = False,
    with_for_update: ForUpdateParameter = None,
    identity_token: Optional[Any] = None,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
) -> Union[_O, None]

Return an instance based on the given primary key identifier, or None if not found.

.. seealso::

:meth:`_orm.Session.get` - main documentation for get
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def get(
    self,
    entity: _EntityBindKey[_O],
    ident: _PKIdentityArgument,
    *,
    options: Optional[Sequence[ORMOption]] = None,
    populate_existing: bool = False,
    with_for_update: ForUpdateParameter = None,
    identity_token: Optional[Any] = None,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
) -> Union[_O, None]:
    """Return an instance based on the given primary key identifier,
    or ``None`` if not found.

    .. seealso::

        :meth:`_orm.Session.get` - main documentation for get


    """

    return await greenlet_spawn(
        cast("Callable[..., _O]", self.sync_session.get),
        entity,
        ident,
        options=options,
        populate_existing=populate_existing,
        with_for_update=with_for_update,
        identity_token=identity_token,
        execution_options=execution_options,
    )

get_one async

get_one(
    entity: _EntityBindKey[_O],
    ident: _PKIdentityArgument,
    *,
    options: Optional[Sequence[ORMOption]] = None,
    populate_existing: bool = False,
    with_for_update: ForUpdateParameter = None,
    identity_token: Optional[Any] = None,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
) -> _O

Return an instance based on the given primary key identifier, or raise an exception if not found.

Raises sqlalchemy.orm.exc.NoResultFound if the query selects no rows.

..versionadded: 2.0.22

.. seealso::

:meth:`_orm.Session.get_one` - main documentation for get_one
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def get_one(
    self,
    entity: _EntityBindKey[_O],
    ident: _PKIdentityArgument,
    *,
    options: Optional[Sequence[ORMOption]] = None,
    populate_existing: bool = False,
    with_for_update: ForUpdateParameter = None,
    identity_token: Optional[Any] = None,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
) -> _O:
    """Return an instance based on the given primary key identifier,
    or raise an exception if not found.

    Raises ``sqlalchemy.orm.exc.NoResultFound`` if the query selects
    no rows.

    ..versionadded: 2.0.22

    .. seealso::

        :meth:`_orm.Session.get_one` - main documentation for get_one

    """

    return await greenlet_spawn(
        cast("Callable[..., _O]", self.sync_session.get_one),
        entity,
        ident,
        options=options,
        populate_existing=populate_existing,
        with_for_update=with_for_update,
        identity_token=identity_token,
        execution_options=execution_options,
    )

stream async

stream(
    statement: TypedReturnsRows[_T],
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> AsyncResult[_T]
stream(
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> AsyncResult[Any]
stream(
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> AsyncResult[Any]

Execute a statement and return a streaming :class:_asyncio.AsyncResult object.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def stream(
    self,
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> AsyncResult[Any]:
    """Execute a statement and return a streaming
    :class:`_asyncio.AsyncResult` object.

    """

    if execution_options:
        execution_options = util.immutabledict(execution_options).union(
            _STREAM_OPTIONS
        )
    else:
        execution_options = _STREAM_OPTIONS

    result = await greenlet_spawn(
        self.sync_session.execute,
        statement,
        params=params,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
        **kw,
    )
    return AsyncResult(result)

stream_scalars async

stream_scalars(
    statement: TypedReturnsRows[Tuple[_T]],
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> AsyncScalarResult[_T]
stream_scalars(
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> AsyncScalarResult[Any]
stream_scalars(
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> AsyncScalarResult[Any]

Execute a statement and return a stream of scalar results.

:return: an :class:_asyncio.AsyncScalarResult object

.. versionadded:: 1.4.24

.. seealso::

:meth:`_orm.Session.scalars` - main documentation for scalars

:meth:`_asyncio.AsyncSession.scalars` - non streaming version
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def stream_scalars(
    self,
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> AsyncScalarResult[Any]:
    """Execute a statement and return a stream of scalar results.

    :return: an :class:`_asyncio.AsyncScalarResult` object

    .. versionadded:: 1.4.24

    .. seealso::

        :meth:`_orm.Session.scalars` - main documentation for scalars

        :meth:`_asyncio.AsyncSession.scalars` - non streaming version

    """

    result = await self.stream(
        statement,
        params=params,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
        **kw,
    )
    return result.scalars()

delete async

delete(instance: object) -> None

Mark an instance as deleted.

The database delete operation occurs upon flush().

As this operation may need to cascade along unloaded relationships, it is awaitable to allow for those queries to take place.

.. seealso::

:meth:`_orm.Session.delete` - main documentation for delete
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def delete(self, instance: object) -> None:
    """Mark an instance as deleted.

    The database delete operation occurs upon ``flush()``.

    As this operation may need to cascade along unloaded relationships,
    it is awaitable to allow for those queries to take place.

    .. seealso::

        :meth:`_orm.Session.delete` - main documentation for delete

    """
    await greenlet_spawn(self.sync_session.delete, instance)

merge async

merge(
    instance: _O,
    *,
    load: bool = True,
    options: Optional[Sequence[ORMOption]] = None,
) -> _O

Copy the state of a given instance into a corresponding instance within this :class:_asyncio.AsyncSession.

.. seealso::

:meth:`_orm.Session.merge` - main documentation for merge
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def merge(
    self,
    instance: _O,
    *,
    load: bool = True,
    options: Optional[Sequence[ORMOption]] = None,
) -> _O:
    """Copy the state of a given instance into a corresponding instance
    within this :class:`_asyncio.AsyncSession`.

    .. seealso::

        :meth:`_orm.Session.merge` - main documentation for merge

    """
    return await greenlet_spawn(
        self.sync_session.merge, instance, load=load, options=options
    )

flush async

flush(objects: Optional[Sequence[Any]] = None) -> None

Flush all the object changes to the database.

.. seealso::

:meth:`_orm.Session.flush` - main documentation for flush
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def flush(self, objects: Optional[Sequence[Any]] = None) -> None:
    """Flush all the object changes to the database.

    .. seealso::

        :meth:`_orm.Session.flush` - main documentation for flush

    """
    await greenlet_spawn(self.sync_session.flush, objects=objects)

get_transaction

get_transaction() -> Optional[AsyncSessionTransaction]

Return the current root transaction in progress, if any.

:return: an :class:_asyncio.AsyncSessionTransaction object, or None.

.. versionadded:: 1.4.18

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
def get_transaction(self) -> Optional[AsyncSessionTransaction]:
    """Return the current root transaction in progress, if any.

    :return: an :class:`_asyncio.AsyncSessionTransaction` object, or
     ``None``.

    .. versionadded:: 1.4.18

    """
    trans = self.sync_session.get_transaction()
    if trans is not None:
        return AsyncSessionTransaction._retrieve_proxy_for_target(trans)
    else:
        return None

get_nested_transaction

get_nested_transaction() -> Optional[
    AsyncSessionTransaction
]

Return the current nested transaction in progress, if any.

:return: an :class:_asyncio.AsyncSessionTransaction object, or None.

.. versionadded:: 1.4.18

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
def get_nested_transaction(self) -> Optional[AsyncSessionTransaction]:
    """Return the current nested transaction in progress, if any.

    :return: an :class:`_asyncio.AsyncSessionTransaction` object, or
     ``None``.

    .. versionadded:: 1.4.18

    """

    trans = self.sync_session.get_nested_transaction()
    if trans is not None:
        return AsyncSessionTransaction._retrieve_proxy_for_target(trans)
    else:
        return None

get_bind

get_bind(
    mapper: Optional[_EntityBindKey[_O]] = None,
    clause: Optional[ClauseElement] = None,
    bind: Optional[_SessionBind] = None,
    **kw: Any,
) -> Union[Engine, Connection]

Return a "bind" to which the synchronous proxied :class:_orm.Session is bound.

Unlike the :meth:_orm.Session.get_bind method, this method is currently not used by this :class:.AsyncSession in any way in order to resolve engines for requests.

.. note::

This method proxies directly to the :meth:`_orm.Session.get_bind`
method, however is currently **not** useful as an override target,
in contrast to that of the :meth:`_orm.Session.get_bind` method.
The example below illustrates how to implement custom
:meth:`_orm.Session.get_bind` schemes that work with
:class:`.AsyncSession` and :class:`.AsyncEngine`.

The pattern introduced at :ref:session_custom_partitioning illustrates how to apply a custom bind-lookup scheme to a :class:_orm.Session given a set of :class:_engine.Engine objects. To apply a corresponding :meth:_orm.Session.get_bind implementation for use with a :class:.AsyncSession and :class:.AsyncEngine objects, continue to subclass :class:_orm.Session and apply it to :class:.AsyncSession using :paramref:.AsyncSession.sync_session_class. The inner method must continue to return :class:_engine.Engine instances, which can be acquired from a :class:_asyncio.AsyncEngine using the :attr:_asyncio.AsyncEngine.sync_engine attribute::

# using example from "Custom Vertical Partitioning"


import random

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import Session

# construct async engines w/ async drivers
engines = {
    'leader':create_async_engine("sqlite+aiosqlite:///leader.db"),
    'other':create_async_engine("sqlite+aiosqlite:///other.db"),
    'follower1':create_async_engine("sqlite+aiosqlite:///follower1.db"),
    'follower2':create_async_engine("sqlite+aiosqlite:///follower2.db"),
}

class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kw):
        # within get_bind(), return sync engines
        if mapper and issubclass(mapper.class_, MyOtherClass):
            return engines['other'].sync_engine
        elif self._flushing or isinstance(clause, (Update, Delete)):
            return engines['leader'].sync_engine
        else:
            return engines[
                random.choice(['follower1','follower2'])
            ].sync_engine

# apply to AsyncSession using sync_session_class
AsyncSessionMaker = async_sessionmaker(
    sync_session_class=RoutingSession
)

The :meth:_orm.Session.get_bind method is called in a non-asyncio, implicitly non-blocking context in the same manner as ORM event hooks and functions that are invoked via :meth:.AsyncSession.run_sync, so routines that wish to run SQL commands inside of :meth:_orm.Session.get_bind can continue to do so using blocking-style code, which will be translated to implicitly async calls at the point of invoking IO on the database drivers.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
def get_bind(
    self,
    mapper: Optional[_EntityBindKey[_O]] = None,
    clause: Optional[ClauseElement] = None,
    bind: Optional[_SessionBind] = None,
    **kw: Any,
) -> Union[Engine, Connection]:
    """Return a "bind" to which the synchronous proxied :class:`_orm.Session`
    is bound.

    Unlike the :meth:`_orm.Session.get_bind` method, this method is
    currently **not** used by this :class:`.AsyncSession` in any way
    in order to resolve engines for requests.

    .. note::

        This method proxies directly to the :meth:`_orm.Session.get_bind`
        method, however is currently **not** useful as an override target,
        in contrast to that of the :meth:`_orm.Session.get_bind` method.
        The example below illustrates how to implement custom
        :meth:`_orm.Session.get_bind` schemes that work with
        :class:`.AsyncSession` and :class:`.AsyncEngine`.

    The pattern introduced at :ref:`session_custom_partitioning`
    illustrates how to apply a custom bind-lookup scheme to a
    :class:`_orm.Session` given a set of :class:`_engine.Engine` objects.
    To apply a corresponding :meth:`_orm.Session.get_bind` implementation
    for use with a :class:`.AsyncSession` and :class:`.AsyncEngine`
    objects, continue to subclass :class:`_orm.Session` and apply it to
    :class:`.AsyncSession` using
    :paramref:`.AsyncSession.sync_session_class`. The inner method must
    continue to return :class:`_engine.Engine` instances, which can be
    acquired from a :class:`_asyncio.AsyncEngine` using the
    :attr:`_asyncio.AsyncEngine.sync_engine` attribute::

        # using example from "Custom Vertical Partitioning"


        import random

        from sqlalchemy.ext.asyncio import AsyncSession
        from sqlalchemy.ext.asyncio import create_async_engine
        from sqlalchemy.ext.asyncio import async_sessionmaker
        from sqlalchemy.orm import Session

        # construct async engines w/ async drivers
        engines = {
            'leader':create_async_engine("sqlite+aiosqlite:///leader.db"),
            'other':create_async_engine("sqlite+aiosqlite:///other.db"),
            'follower1':create_async_engine("sqlite+aiosqlite:///follower1.db"),
            'follower2':create_async_engine("sqlite+aiosqlite:///follower2.db"),
        }

        class RoutingSession(Session):
            def get_bind(self, mapper=None, clause=None, **kw):
                # within get_bind(), return sync engines
                if mapper and issubclass(mapper.class_, MyOtherClass):
                    return engines['other'].sync_engine
                elif self._flushing or isinstance(clause, (Update, Delete)):
                    return engines['leader'].sync_engine
                else:
                    return engines[
                        random.choice(['follower1','follower2'])
                    ].sync_engine

        # apply to AsyncSession using sync_session_class
        AsyncSessionMaker = async_sessionmaker(
            sync_session_class=RoutingSession
        )

    The :meth:`_orm.Session.get_bind` method is called in a non-asyncio,
    implicitly non-blocking context in the same manner as ORM event hooks
    and functions that are invoked via :meth:`.AsyncSession.run_sync`, so
    routines that wish to run SQL commands inside of
    :meth:`_orm.Session.get_bind` can continue to do so using
    blocking-style code, which will be translated to implicitly async calls
    at the point of invoking IO on the database drivers.

    """  # noqa: E501

    return self.sync_session.get_bind(
        mapper=mapper, clause=clause, bind=bind, **kw
    )

connection async

connection(
    bind_arguments: Optional[_BindArguments] = None,
    execution_options: Optional[
        CoreExecuteOptionsParameter
    ] = None,
    **kw: Any,
) -> AsyncConnection

Return a :class:_asyncio.AsyncConnection object corresponding to this :class:.Session object's transactional state.

This method may also be used to establish execution options for the database connection used by the current transaction.

.. versionadded:: 1.4.24 Added **kw arguments which are passed through to the underlying :meth:_orm.Session.connection method.

.. seealso::

:meth:`_orm.Session.connection` - main documentation for
"connection"
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def connection(
    self,
    bind_arguments: Optional[_BindArguments] = None,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
    **kw: Any,
) -> AsyncConnection:
    r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to
    this :class:`.Session` object's transactional state.

    This method may also be used to establish execution options for the
    database connection used by the current transaction.

    .. versionadded:: 1.4.24  Added \**kw arguments which are passed
       through to the underlying :meth:`_orm.Session.connection` method.

    .. seealso::

        :meth:`_orm.Session.connection` - main documentation for
        "connection"

    """

    sync_connection = await greenlet_spawn(
        self.sync_session.connection,
        bind_arguments=bind_arguments,
        execution_options=execution_options,
        **kw,
    )
    return engine.AsyncConnection._retrieve_proxy_for_target(
        sync_connection
    )

begin

Return an :class:_asyncio.AsyncSessionTransaction object.

The underlying :class:_orm.Session will perform the "begin" action when the :class:_asyncio.AsyncSessionTransaction object is entered::

async with async_session.begin():
    # .. ORM transaction is begun

Note that database IO will not normally occur when the session-level transaction is begun, as database transactions begin on an on-demand basis. However, the begin block is async to accommodate for a :meth:_orm.SessionEvents.after_transaction_create event hook that may perform IO.

For a general description of ORM begin, see :meth:_orm.Session.begin.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
def begin(self) -> AsyncSessionTransaction:
    """Return an :class:`_asyncio.AsyncSessionTransaction` object.

    The underlying :class:`_orm.Session` will perform the
    "begin" action when the :class:`_asyncio.AsyncSessionTransaction`
    object is entered::

        async with async_session.begin():
            # .. ORM transaction is begun

    Note that database IO will not normally occur when the session-level
    transaction is begun, as database transactions begin on an
    on-demand basis.  However, the begin block is async to accommodate
    for a :meth:`_orm.SessionEvents.after_transaction_create`
    event hook that may perform IO.

    For a general description of ORM begin, see
    :meth:`_orm.Session.begin`.

    """

    return AsyncSessionTransaction(self)

begin_nested

begin_nested() -> AsyncSessionTransaction

Return an :class:_asyncio.AsyncSessionTransaction object which will begin a "nested" transaction, e.g. SAVEPOINT.

Behavior is the same as that of :meth:_asyncio.AsyncSession.begin.

For a general description of ORM begin nested, see :meth:_orm.Session.begin_nested.

.. seealso::

:ref:`aiosqlite_serializable` - special workarounds required
with the SQLite asyncio driver in order for SAVEPOINT to work
correctly.
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
def begin_nested(self) -> AsyncSessionTransaction:
    """Return an :class:`_asyncio.AsyncSessionTransaction` object
    which will begin a "nested" transaction, e.g. SAVEPOINT.

    Behavior is the same as that of :meth:`_asyncio.AsyncSession.begin`.

    For a general description of ORM begin nested, see
    :meth:`_orm.Session.begin_nested`.

    .. seealso::

        :ref:`aiosqlite_serializable` - special workarounds required
        with the SQLite asyncio driver in order for SAVEPOINT to work
        correctly.

    """

    return AsyncSessionTransaction(self, nested=True)

rollback async

rollback() -> None

Rollback the current transaction in progress.

.. seealso::

:meth:`_orm.Session.rollback` - main documentation for
"rollback"
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def rollback(self) -> None:
    """Rollback the current transaction in progress.

    .. seealso::

        :meth:`_orm.Session.rollback` - main documentation for
        "rollback"
    """
    await greenlet_spawn(self.sync_session.rollback)

commit async

commit() -> None

Commit the current transaction in progress.

.. seealso::

:meth:`_orm.Session.commit` - main documentation for
"commit"
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def commit(self) -> None:
    """Commit the current transaction in progress.

    .. seealso::

        :meth:`_orm.Session.commit` - main documentation for
        "commit"
    """
    await greenlet_spawn(self.sync_session.commit)

close async

close() -> None

Close out the transactional resources and ORM objects used by this :class:_asyncio.AsyncSession.

.. seealso::

:meth:`_orm.Session.close` - main documentation for
"close"

:ref:`session_closing` - detail on the semantics of
:meth:`_asyncio.AsyncSession.close` and
:meth:`_asyncio.AsyncSession.reset`.
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def close(self) -> None:
    """Close out the transactional resources and ORM objects used by this
    :class:`_asyncio.AsyncSession`.

    .. seealso::

        :meth:`_orm.Session.close` - main documentation for
        "close"

        :ref:`session_closing` - detail on the semantics of
        :meth:`_asyncio.AsyncSession.close` and
        :meth:`_asyncio.AsyncSession.reset`.

    """
    await greenlet_spawn(self.sync_session.close)

reset async

reset() -> None

Close out the transactional resources and ORM objects used by this :class:_orm.Session, resetting the session to its initial state.

.. versionadded:: 2.0.22

.. seealso::

:meth:`_orm.Session.reset` - main documentation for
"reset"

:ref:`session_closing` - detail on the semantics of
:meth:`_asyncio.AsyncSession.close` and
:meth:`_asyncio.AsyncSession.reset`.
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def reset(self) -> None:
    """Close out the transactional resources and ORM objects used by this
    :class:`_orm.Session`, resetting the session to its initial state.

    .. versionadded:: 2.0.22

    .. seealso::

        :meth:`_orm.Session.reset` - main documentation for
        "reset"

        :ref:`session_closing` - detail on the semantics of
        :meth:`_asyncio.AsyncSession.close` and
        :meth:`_asyncio.AsyncSession.reset`.

    """
    await greenlet_spawn(self.sync_session.reset)

aclose async

aclose() -> None

A synonym for :meth:_asyncio.AsyncSession.close.

The :meth:_asyncio.AsyncSession.aclose name is specifically to support the Python standard library @contextlib.aclosing context manager function.

.. versionadded:: 2.0.20

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def aclose(self) -> None:
    """A synonym for :meth:`_asyncio.AsyncSession.close`.

    The :meth:`_asyncio.AsyncSession.aclose` name is specifically
    to support the Python standard library ``@contextlib.aclosing``
    context manager function.

    .. versionadded:: 2.0.20

    """
    await self.close()

invalidate async

invalidate() -> None

Close this Session, using connection invalidation.

For a complete description, see :meth:_orm.Session.invalidate.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
async def invalidate(self) -> None:
    """Close this Session, using connection invalidation.

    For a complete description, see :meth:`_orm.Session.invalidate`.
    """
    await greenlet_spawn(self.sync_session.invalidate)

close_all async classmethod

close_all() -> None

Close all :class:_asyncio.AsyncSession sessions.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
@classmethod
@util.deprecated(
    "2.0",
    "The :meth:`.AsyncSession.close_all` method is deprecated and will be "
    "removed in a future release.  Please refer to "
    ":func:`_asyncio.close_all_sessions`.",
)
async def close_all(cls) -> None:
    """Close all :class:`_asyncio.AsyncSession` sessions."""
    await close_all_sessions()

add

add(instance: object, _warn: bool = True) -> None

Place an object into this :class:_orm.Session.

.. container:: class_bases

Proxied for the :class:`_orm.Session` class on
behalf of the :class:`_asyncio.AsyncSession` class.

Objects that are in the :term:transient state when passed to the :meth:_orm.Session.add method will move to the :term:pending state, until the next flush, at which point they will move to the :term:persistent state.

Objects that are in the :term:detached state when passed to the :meth:_orm.Session.add method will move to the :term:persistent state directly.

If the transaction used by the :class:_orm.Session is rolled back, objects which were transient when they were passed to :meth:_orm.Session.add will be moved back to the :term:transient state, and will no longer be present within this :class:_orm.Session.

.. seealso::

:meth:`_orm.Session.add_all`

:ref:`session_adding` - at :ref:`session_basics`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
def add(self, instance: object, _warn: bool = True) -> None:
    r"""Place an object into this :class:`_orm.Session`.

    .. container:: class_bases

        Proxied for the :class:`_orm.Session` class on
        behalf of the :class:`_asyncio.AsyncSession` class.

    Objects that are in the :term:`transient` state when passed to the
    :meth:`_orm.Session.add` method will move to the
    :term:`pending` state, until the next flush, at which point they
    will move to the :term:`persistent` state.

    Objects that are in the :term:`detached` state when passed to the
    :meth:`_orm.Session.add` method will move to the :term:`persistent`
    state directly.

    If the transaction used by the :class:`_orm.Session` is rolled back,
    objects which were transient when they were passed to
    :meth:`_orm.Session.add` will be moved back to the
    :term:`transient` state, and will no longer be present within this
    :class:`_orm.Session`.

    .. seealso::

        :meth:`_orm.Session.add_all`

        :ref:`session_adding` - at :ref:`session_basics`


    """  # noqa: E501

    return self._proxied.add(instance, _warn=_warn)

add_all

add_all(instances: Iterable[object]) -> None

Add the given collection of instances to this :class:_orm.Session.

.. container:: class_bases

Proxied for the :class:`_orm.Session` class on
behalf of the :class:`_asyncio.AsyncSession` class.

See the documentation for :meth:_orm.Session.add for a general behavioral description.

.. seealso::

:meth:`_orm.Session.add`

:ref:`session_adding` - at :ref:`session_basics`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
def add_all(self, instances: Iterable[object]) -> None:
    r"""Add the given collection of instances to this :class:`_orm.Session`.

    .. container:: class_bases

        Proxied for the :class:`_orm.Session` class on
        behalf of the :class:`_asyncio.AsyncSession` class.

    See the documentation for :meth:`_orm.Session.add` for a general
    behavioral description.

    .. seealso::

        :meth:`_orm.Session.add`

        :ref:`session_adding` - at :ref:`session_basics`


    """  # noqa: E501

    return self._proxied.add_all(instances)

expire

expire(
    instance: object,
    attribute_names: Optional[Iterable[str]] = None,
) -> None

Expire the attributes on an instance.

.. container:: class_bases

Proxied for the :class:`_orm.Session` class on
behalf of the :class:`_asyncio.AsyncSession` class.

Marks the attributes of an instance as out of date. When an expired attribute is next accessed, a query will be issued to the :class:.Session object's current transactional context in order to load all expired attributes for the given instance. Note that a highly isolated transaction will return the same values as were previously read in that same transaction, regardless of changes in database state outside of that transaction.

To expire all objects in the :class:.Session simultaneously, use :meth:Session.expire_all.

The :class:.Session object's default behavior is to expire all state whenever the :meth:Session.rollback or :meth:Session.commit methods are called, so that new state can be loaded for the new transaction. For this reason, calling :meth:Session.expire only makes sense for the specific case that a non-ORM SQL statement was emitted in the current transaction.

:param instance: The instance to be refreshed. :param attribute_names: optional list of string attribute names indicating a subset of attributes to be expired.

.. seealso::

:ref:`session_expire` - introductory material

:meth:`.Session.expire`

:meth:`.Session.refresh`

:meth:`_orm.Query.populate_existing`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
def expire(
    self, instance: object, attribute_names: Optional[Iterable[str]] = None
) -> None:
    r"""Expire the attributes on an instance.

    .. container:: class_bases

        Proxied for the :class:`_orm.Session` class on
        behalf of the :class:`_asyncio.AsyncSession` class.

    Marks the attributes of an instance as out of date. When an expired
    attribute is next accessed, a query will be issued to the
    :class:`.Session` object's current transactional context in order to
    load all expired attributes for the given instance.   Note that
    a highly isolated transaction will return the same values as were
    previously read in that same transaction, regardless of changes
    in database state outside of that transaction.

    To expire all objects in the :class:`.Session` simultaneously,
    use :meth:`Session.expire_all`.

    The :class:`.Session` object's default behavior is to
    expire all state whenever the :meth:`Session.rollback`
    or :meth:`Session.commit` methods are called, so that new
    state can be loaded for the new transaction.   For this reason,
    calling :meth:`Session.expire` only makes sense for the specific
    case that a non-ORM SQL statement was emitted in the current
    transaction.

    :param instance: The instance to be refreshed.
    :param attribute_names: optional list of string attribute names
      indicating a subset of attributes to be expired.

    .. seealso::

        :ref:`session_expire` - introductory material

        :meth:`.Session.expire`

        :meth:`.Session.refresh`

        :meth:`_orm.Query.populate_existing`


    """  # noqa: E501

    return self._proxied.expire(instance, attribute_names=attribute_names)

expire_all

expire_all() -> None

Expires all persistent instances within this Session.

.. container:: class_bases

Proxied for the :class:`_orm.Session` class on
behalf of the :class:`_asyncio.AsyncSession` class.

When any attributes on a persistent instance is next accessed, a query will be issued using the :class:.Session object's current transactional context in order to load all expired attributes for the given instance. Note that a highly isolated transaction will return the same values as were previously read in that same transaction, regardless of changes in database state outside of that transaction.

To expire individual objects and individual attributes on those objects, use :meth:Session.expire.

The :class:.Session object's default behavior is to expire all state whenever the :meth:Session.rollback or :meth:Session.commit methods are called, so that new state can be loaded for the new transaction. For this reason, calling :meth:Session.expire_all is not usually needed, assuming the transaction is isolated.

.. seealso::

:ref:`session_expire` - introductory material

:meth:`.Session.expire`

:meth:`.Session.refresh`

:meth:`_orm.Query.populate_existing`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
def expire_all(self) -> None:
    r"""Expires all persistent instances within this Session.

    .. container:: class_bases

        Proxied for the :class:`_orm.Session` class on
        behalf of the :class:`_asyncio.AsyncSession` class.

    When any attributes on a persistent instance is next accessed,
    a query will be issued using the
    :class:`.Session` object's current transactional context in order to
    load all expired attributes for the given instance.   Note that
    a highly isolated transaction will return the same values as were
    previously read in that same transaction, regardless of changes
    in database state outside of that transaction.

    To expire individual objects and individual attributes
    on those objects, use :meth:`Session.expire`.

    The :class:`.Session` object's default behavior is to
    expire all state whenever the :meth:`Session.rollback`
    or :meth:`Session.commit` methods are called, so that new
    state can be loaded for the new transaction.   For this reason,
    calling :meth:`Session.expire_all` is not usually needed,
    assuming the transaction is isolated.

    .. seealso::

        :ref:`session_expire` - introductory material

        :meth:`.Session.expire`

        :meth:`.Session.refresh`

        :meth:`_orm.Query.populate_existing`


    """  # noqa: E501

    return self._proxied.expire_all()

expunge

expunge(instance: object) -> None

Remove the instance from this Session.

.. container:: class_bases

Proxied for the :class:`_orm.Session` class on
behalf of the :class:`_asyncio.AsyncSession` class.

This will free all internal references to the instance. Cascading will be applied according to the expunge cascade rule.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
def expunge(self, instance: object) -> None:
    r"""Remove the `instance` from this ``Session``.

    .. container:: class_bases

        Proxied for the :class:`_orm.Session` class on
        behalf of the :class:`_asyncio.AsyncSession` class.

    This will free all internal references to the instance.  Cascading
    will be applied according to the *expunge* cascade rule.


    """  # noqa: E501

    return self._proxied.expunge(instance)

expunge_all

expunge_all() -> None

Remove all object instances from this Session.

.. container:: class_bases

Proxied for the :class:`_orm.Session` class on
behalf of the :class:`_asyncio.AsyncSession` class.

This is equivalent to calling expunge(obj) on all objects in this Session.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
def expunge_all(self) -> None:
    r"""Remove all object instances from this ``Session``.

    .. container:: class_bases

        Proxied for the :class:`_orm.Session` class on
        behalf of the :class:`_asyncio.AsyncSession` class.

    This is equivalent to calling ``expunge(obj)`` on all objects in this
    ``Session``.


    """  # noqa: E501

    return self._proxied.expunge_all()

is_modified

is_modified(
    instance: object, include_collections: bool = True
) -> bool

Return True if the given instance has locally modified attributes.

.. container:: class_bases

Proxied for the :class:`_orm.Session` class on
behalf of the :class:`_asyncio.AsyncSession` class.

This method retrieves the history for each instrumented attribute on the instance and performs a comparison of the current value to its previously committed value, if any.

It is in effect a more expensive and accurate version of checking for the given instance in the :attr:.Session.dirty collection; a full test for each attribute's net "dirty" status is performed.

E.g.::

return session.is_modified(someobject)

A few caveats to this method apply:

  • Instances present in the :attr:.Session.dirty collection may report False when tested with this method. This is because the object may have received change events via attribute mutation, thus placing it in :attr:.Session.dirty, but ultimately the state is the same as that loaded from the database, resulting in no net change here.
  • Scalar attributes may not have recorded the previously set value when a new value was applied, if the attribute was not loaded, or was expired, at the time the new value was received - in these cases, the attribute is assumed to have a change, even if there is ultimately no net change against its database value. SQLAlchemy in most cases does not need the "old" value when a set event occurs, so it skips the expense of a SQL call if the old value isn't present, based on the assumption that an UPDATE of the scalar value is usually needed, and in those few cases where it isn't, is less expensive on average than issuing a defensive SELECT.

The "old" value is fetched unconditionally upon set only if the attribute container has the active_history flag set to True. This flag is set typically for primary key attributes and scalar object references that are not a simple many-to-one. To set this flag for any arbitrary mapped column, use the active_history argument with :func:.column_property.

:param instance: mapped instance to be tested for pending changes. :param include_collections: Indicates if multivalued collections should be included in the operation. Setting this to False is a way to detect only local-column based properties (i.e. scalar columns or many-to-one foreign keys) that would result in an UPDATE for this instance upon flush.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
def is_modified(
    self, instance: object, include_collections: bool = True
) -> bool:
    r"""Return ``True`` if the given instance has locally
    modified attributes.

    .. container:: class_bases

        Proxied for the :class:`_orm.Session` class on
        behalf of the :class:`_asyncio.AsyncSession` class.

    This method retrieves the history for each instrumented
    attribute on the instance and performs a comparison of the current
    value to its previously committed value, if any.

    It is in effect a more expensive and accurate
    version of checking for the given instance in the
    :attr:`.Session.dirty` collection; a full test for
    each attribute's net "dirty" status is performed.

    E.g.::

        return session.is_modified(someobject)

    A few caveats to this method apply:

    * Instances present in the :attr:`.Session.dirty` collection may
      report ``False`` when tested with this method.  This is because
      the object may have received change events via attribute mutation,
      thus placing it in :attr:`.Session.dirty`, but ultimately the state
      is the same as that loaded from the database, resulting in no net
      change here.
    * Scalar attributes may not have recorded the previously set
      value when a new value was applied, if the attribute was not loaded,
      or was expired, at the time the new value was received - in these
      cases, the attribute is assumed to have a change, even if there is
      ultimately no net change against its database value. SQLAlchemy in
      most cases does not need the "old" value when a set event occurs, so
      it skips the expense of a SQL call if the old value isn't present,
      based on the assumption that an UPDATE of the scalar value is
      usually needed, and in those few cases where it isn't, is less
      expensive on average than issuing a defensive SELECT.

      The "old" value is fetched unconditionally upon set only if the
      attribute container has the ``active_history`` flag set to ``True``.
      This flag is set typically for primary key attributes and scalar
      object references that are not a simple many-to-one.  To set this
      flag for any arbitrary mapped column, use the ``active_history``
      argument with :func:`.column_property`.

    :param instance: mapped instance to be tested for pending changes.
    :param include_collections: Indicates if multivalued collections
     should be included in the operation.  Setting this to ``False`` is a
     way to detect only local-column based properties (i.e. scalar columns
     or many-to-one foreign keys) that would result in an UPDATE for this
     instance upon flush.


    """  # noqa: E501

    return self._proxied.is_modified(
        instance, include_collections=include_collections
    )

in_transaction

in_transaction() -> bool

Return True if this :class:_orm.Session has begun a transaction.

.. container:: class_bases

Proxied for the :class:`_orm.Session` class on
behalf of the :class:`_asyncio.AsyncSession` class.

.. versionadded:: 1.4

.. seealso::

:attr:`_orm.Session.is_active`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
def in_transaction(self) -> bool:
    r"""Return True if this :class:`_orm.Session` has begun a transaction.

    .. container:: class_bases

        Proxied for the :class:`_orm.Session` class on
        behalf of the :class:`_asyncio.AsyncSession` class.

    .. versionadded:: 1.4

    .. seealso::

        :attr:`_orm.Session.is_active`



    """  # noqa: E501

    return self._proxied.in_transaction()

in_nested_transaction

in_nested_transaction() -> bool

Return True if this :class:_orm.Session has begun a nested transaction, e.g. SAVEPOINT.

.. container:: class_bases

Proxied for the :class:`_orm.Session` class on
behalf of the :class:`_asyncio.AsyncSession` class.

.. versionadded:: 1.4

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
def in_nested_transaction(self) -> bool:
    r"""Return True if this :class:`_orm.Session` has begun a nested
    transaction, e.g. SAVEPOINT.

    .. container:: class_bases

        Proxied for the :class:`_orm.Session` class on
        behalf of the :class:`_asyncio.AsyncSession` class.

    .. versionadded:: 1.4


    """  # noqa: E501

    return self._proxied.in_nested_transaction()

object_session classmethod

object_session(instance: object) -> Optional[Session]

Return the :class:.Session to which an object belongs.

.. container:: class_bases

Proxied for the :class:`_orm.Session` class on
behalf of the :class:`_asyncio.AsyncSession` class.

This is an alias of :func:.object_session.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
@classmethod
def object_session(cls, instance: object) -> Optional[Session]:
    r"""Return the :class:`.Session` to which an object belongs.

    .. container:: class_bases

        Proxied for the :class:`_orm.Session` class on
        behalf of the :class:`_asyncio.AsyncSession` class.

    This is an alias of :func:`.object_session`.


    """  # noqa: E501

    return Session.object_session(instance)

identity_key classmethod

identity_key(
    class_: Optional[Type[Any]] = None,
    ident: Union[Any, Tuple[Any, ...]] = None,
    *,
    instance: Optional[Any] = None,
    row: Optional[Union[Row[Any], RowMapping]] = None,
    identity_token: Optional[Any] = None,
) -> _IdentityKeyType[Any]

Return an identity key.

.. container:: class_bases

Proxied for the :class:`_orm.Session` class on
behalf of the :class:`_asyncio.AsyncSession` class.

This is an alias of :func:.util.identity_key.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py
@classmethod
def identity_key(
    cls,
    class_: Optional[Type[Any]] = None,
    ident: Union[Any, Tuple[Any, ...]] = None,
    *,
    instance: Optional[Any] = None,
    row: Optional[Union[Row[Any], RowMapping]] = None,
    identity_token: Optional[Any] = None,
) -> _IdentityKeyType[Any]:
    r"""Return an identity key.

    .. container:: class_bases

        Proxied for the :class:`_orm.Session` class on
        behalf of the :class:`_asyncio.AsyncSession` class.

    This is an alias of :func:`.util.identity_key`.


    """  # noqa: E501

    return Session.identity_key(
        class_=class_,
        ident=ident,
        instance=instance,
        row=row,
        identity_token=identity_token,
    )

bulk async

bulk(
    *, proxy_reference: bool = True
) -> AsyncGenerator[AsyncSessionBulk, None]

An async session bulk context manager for AsyncSession objects.

The proxy option indicates that the provided resource references should be encapsulated with a proxy, this is done when validating the resource using the Pydantic core schema. This can be useful to resolve the references that target the same resource into a single instance. Thus, modifying a resolved instance will affect all references that target the same resource.

Parameters:

Name Type Description Default
proxy_reference bool

Whether the registered resource references should be encapsulated with a proxy or not. Defaults to True.

True

Returns:

Type Description
AsyncGenerator[AsyncSessionBulk, None]

An AsyncSessionBulk instance.

Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/sessions.py
@asynccontextmanager
async def bulk(
    self, *, proxy_reference: bool = True,
) -> AsyncGenerator['AsyncSessionBulk', None]:
    """An async session bulk context manager for `AsyncSession` objects.

    The proxy option indicates that the provided resource references should
    be encapsulated with a proxy, this is done when validating the resource
    using the Pydantic core schema. This can be useful to resolve the
    references that target the same resource into a single instance. Thus,
    modifying a resolved instance will affect all references that target
    the same resource.

    Args:
        proxy_reference: Whether the registered resource references should
            be encapsulated with a proxy or not. Defaults to ``True``.

    Returns:
        An `AsyncSessionBulk` instance.
    """
    bulk = AsyncSessionBulk(self, proxy_reference=proxy_reference)
    token = SESSION_BULK_CONTEXT.set(bulk)
    try:
        yield bulk
    finally:
        SESSION_BULK_CONTEXT.reset(token)

AsyncSessionBulk

AsyncSessionBulk(
    session: _T, *, proxy_reference: bool = True
)

Bases: Bulk[AsyncSession]

An async bulk operation manager for resources.

It is used to register resources for bulk operations and commit or rollback them in a single operation within an async session.

Initialize the session bulk manager.

The proxy option indicates that the provided resource references should be encapsulated with a proxy, this is done when validating the resource using the Pydantic core schema. This can be useful to resolve the references that target the same resource into a single instance. Thus, modifying a resolved instance will affect all references that target the same resource.

Parameters:

Name Type Description Default
session _T

The session to use for the bulk operations.

required
proxy_reference bool

Whether the registered resource references should be encapsulated with a proxy or not. Defaults to True.

True
Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/bulk.py
def __init__(self, session: _T, *, proxy_reference: bool = True):
    """Initialize the session bulk manager.

    The proxy option indicates that the provided resource references should
    be encapsulated with a proxy, this is done when validating the resource
    using the Pydantic core schema. This can be useful to resolve the
    references that target the same resource into a single instance. Thus,
    modifying a resolved instance will affect all references that target
    the same resource.

    Args:
        session: The session to use for the bulk operations.
        proxy_reference: Whether the registered resource references should
            be encapsulated with a proxy or not. Defaults to ``True``.
    """
    self._lock = Lock()

    self.session = session
    self.proxy_reference = proxy_reference

    self.resolved = {}
    self.unresolved = {}

add

add(
    instance: BaseResource | BaseSpec,
    *,
    is_reference: bool = False,
) -> None

Add a resource instance to the bulk manager.

Parameters:

Name Type Description Default
instance BaseResource | BaseSpec

The resource instance to add to the bulk manager.

required
is_reference bool

Whether the provided resource instance is a reference or not. Defaults to False.

False
Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/bulk.py
def add(
    self,
    instance: 'BaseResource | BaseSpec',
    *,
    is_reference: bool = False,
) -> None:
    """Add a resource instance to the bulk manager.

    Args:
        instance: The resource instance to add to the bulk manager.
        is_reference: Whether the provided resource instance is a reference
            or not. Defaults to ``False``.
    """
    # Resolve instance entity class
    if is_proxy(instance):
        proxy = instance.__proxy__()
        entity = proxy.__class__
    else:
        proxy = None
        entity = instance.__class__

    # Create instance entry
    entry = BulkEntry(
        instance,  # type: ignore
        is_proxy=proxy is not None,
        is_reference=is_reference,
    )

    # Register instance entry
    with self._lock:
        if entity not in self.resolved:
            self.resolved[entity] = set()
        if entity not in self.unresolved:
            self.unresolved[entity] = set()
        self.unresolved[entity].add(entry)

get

get(
    entity: ResourceType | SpecType,
    *,
    resolved: bool | None = None,
    scope: Literal["all", "references", "values"] = "all",
) -> list[BaseResource]

Get the resource instances registered in the bulk manager.

Parameters:

Name Type Description Default
entity ResourceType | SpecType

The resource type to get the resource instances for.

required
resolved bool | None

Whether to get only the resolved resources or not: - True: Get only the resolved resources. - False: Get only the unresolved resources. - None: Get all the resources. Defaults to None.

None
scope Literal['all', 'references', 'values']

The scope of the resource instances to get: - 'all': Get all the resource instances. - 'references': Get only the resource reference instances. - 'values': Get only the resource value instances. Defaults to 'all'.

'all'

Returns:

Type Description
list[BaseResource]

The list of resource instances registered in the bulk manager for

list[BaseResource]

the specified resource type and options.

Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/bulk.py
def get(
    self,
    entity: 'ResourceType | SpecType',
    *,
    resolved: bool | None = None,
    scope: Literal['all', 'references', 'values'] = 'all',
) -> list['BaseResource']:
    """Get the resource instances registered in the bulk manager.

    Args:
        entity: The resource type to get the resource instances for.
        resolved: Whether to get only the resolved resources or not:
            - ``True``: Get only the resolved resources.
            - ``False``: Get only the unresolved resources.
            - ``None``: Get all the resources.
            Defaults to ``None``.
        scope: The scope of the resource instances to get:
            - ``'all'``: Get all the resource instances.
            - ``'references'``: Get only the resource reference instances.
            - ``'values'``: Get only the resource value instances.
            Defaults to ``'all'``.

    Returns:
        The list of resource instances registered in the bulk manager for
        the specified resource type and options.
    """
    entries: set[BulkEntry] = set()

    # Retrieve entries
    if resolved is None or resolved:
        entries.update(self.resolved.get(entity, set()))  # type: ignore
    if resolved is None or not resolved:
        entries.update(self.unresolved.get(entity, set()))  # type: ignore

    # Retrieve instances by scope
    if scope == 'references':
        return [
            entry.instance for entry in entries
            if entry.is_reference
        ]
    elif scope == 'values':
        return [
            entry.instance for entry in entries
            if not entry.is_reference
        ]

    return [entry.instance for entry in entries]

resolve async

resolve(
    *,
    raise_errors: bool = True,
    scope: Literal["all", "references", "values"] = "all",
    strategy: Literal["bind", "hydrate"] = "bind",
) -> None

Resolve the specified scope of resource entries in the bulk.

For resource references, if the proxy_reference option is enabled, the resolved instances replace the reference proxy targets. Otherwise, the resolved instances are used to update the reference proxy target.

For resource values, the resolved instances are used to update the resource instances with the fetched data, no proxy is used.

Parameters:

Name Type Description Default
raise_errors bool

Whether to raise errors when failing to resolve resource references or values. Defaults to True.

True
scope Literal['all', 'references', 'values']

The scope of the resources to resolve, it can be either: - 'all': Resolve both the resource references and values. - 'references': Resolve only the resource references. - 'values': Resolve only the resource values. Defaults to 'all'.

'all'
strategy Literal['bind', 'hydrate']

The resolution strategy to use, it can be either: - 'bind': Bind the resolved resource references and values to the database session, i.e. only the resolved id and type_ fields are updated and the instances are made transient to detached. - 'hydrate': Hydrate the resolved resource references and values with the data fetched from the database, i.e. the instances are updated with the fetched data. Defaults to 'bind'.

'bind'
Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/sessions.py
async def resolve(
    self,
    *,
    raise_errors: bool = True,
    scope: Literal['all', 'references', 'values'] = 'all',
    strategy: Literal['bind', 'hydrate'] = 'bind',
) -> None:
    """Resolve the specified scope of resource entries in the bulk.

    For resource references, if the `proxy_reference` option is enabled,
    the resolved instances replace the reference proxy targets. Otherwise,
    the resolved instances are used to update the reference proxy target.

    For resource values, the resolved instances are used to update the
    resource instances with the fetched data, no proxy is used.

    Args:
        raise_errors: Whether to raise errors when failing to resolve
            resource references or values. Defaults to ``True``.
        scope: The scope of the resources to resolve, it can be either:
            - ``'all'``: Resolve both the resource references and values.
            - ``'references'``: Resolve only the resource references.
            - ``'values'``: Resolve only the resource values.
            Defaults to ``'all'``.
        strategy: The resolution strategy to use, it can be either:
            - ``'bind'``: Bind the resolved resource references and values
                to the database session, i.e. only the resolved `id` and
                `type_` fields are updated and the instances are made
                transient to detached.
            - ``'hydrate'``: Hydrate the resolved resource references and
                values with the data fetched from the database, i.e. the
                instances are updated with the fetched data.
            Defaults to ``'bind'``.
    """
    resolver = self._resolver(
        raise_errors=raise_errors, scope=scope, strategy=strategy
    )

    try:
        context = next(resolver)
        while True:
            fields, statement = context
            result = await self.session.execute(statement)
            context = resolver.send((fields, result))
    except StopIteration:
        pass

async_session_factory

async_session_factory(
    bind: AsyncConnection | AsyncEngine | None = None,
    routing: DatabaseManager | None = None,
    scoped: bool = False,
    scopefunc: Callable[[], Any] = current_task,
    *args: Any,
    **kwargs: Any,
) -> AsyncSessionFactory

Create an async session factory for Session objects.

Parameters:

Name Type Description Default
bind AsyncConnection | AsyncEngine | None

An optional AsyncEngine or AsyncConnection to which this AsyncSession should be bound. When specified, all SQL operations performed by this session will execute via this connectable. Defaults to None.

None
routing DatabaseManager | None

An optional DatabaseManager instance to use for the database routing. Defaults to None.

None
scoped bool

Whether to use a scoped session or not. The scoped session factory ensures that the session is thread-safe. Defaults to False.

False
scopefunc Callable[[], Any]

An optional callable that returns a hashable token that identifies the current scope. Defaults to asyncio.current_task.

current_task

Returns:

Type Description
AsyncSessionFactory

An AsyncSessionFactory instance for creating AsyncSession objects.

All other keyword arguments are passed to the constructor of the

parent SQLAlchemy async_sessionmaker class.

Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/sessions.py
def async_session_factory(
    bind: AsyncConnection | AsyncEngine | None = None,
    routing: DatabaseManager | None = None,
    scoped: bool = False,
    scopefunc: Callable[[], Any] = asyncio.current_task,
    *args: Any,
    **kwargs: Any,
) -> AsyncSessionFactory:
    """Create an async session factory for `Session` objects.

    Args:
        bind: An optional `AsyncEngine` or `AsyncConnection` to which this
            `AsyncSession` should be bound. When specified, all SQL operations
            performed by this session will execute via this connectable.
            Defaults to ``None``.
        routing: An optional `DatabaseManager` instance to use for the database
            routing. Defaults to ``None``.
        scoped: Whether to use a scoped session or not. The scoped session
            factory ensures that the session is thread-safe.
            Defaults to ``False``.
        scopefunc: An optional callable that returns a hashable token that
            identifies the current scope. Defaults to `asyncio.current_task`.

    Returns:
        An `AsyncSessionFactory` instance for creating `AsyncSession` objects.

    Note: All other keyword arguments are passed to the constructor of the
        parent SQLAlchemy `async_sessionmaker` class.
    """
    # Build session factory
    factory = _async_sessionmaker(
        bind,
        *args,
        class_=AsyncSession,
        routing=routing,
        **kwargs,
    )
    # Wrap the factory in a scoped async session manager if requested. The
    # scoped async session factory ensures that the async session is
    # thread-safe.
    return _async_scoped_session(factory, scopefunc) if scoped else factory

async_session_manager async

async_session_manager(
    *,
    using: AsyncSessionFactory | None = None,
    auto_commit: bool = False,
    new: bool = False,
    on_missing: Literal["create", "raise"] = "create",
) -> AsyncGenerator[AsyncSession, None]

An async session context manager for AsyncSession objects.

Provide a transactional async session context around a series of operations. It manages the async session lifecycle and commits or rollbacks the async session automatically based on the context.

Parameters:

Name Type Description Default
using AsyncSessionFactory | None

An optional AsyncSessionFactory instance to use instead of the application context factory. Defaults to None.

None
auto_commit bool

Whether to automatically commit on success or rollback on failure after the operation completes. Defaults to False.

False
new bool

Whether to create a new session or not. If set to True, a new session is created. If set to False, the current session is used if available, otherwise it follows the on_missing behavior. Defaults to False.

False
on_missing Literal['create', 'raise']

The behavior to follow when no current session is available, either to create a new session or raise an error. Defaults to 'create'.

'create'

Returns:

Type Description
AsyncGenerator[AsyncSession, None]

An AsyncSession instance.

Note

If neither an application nor a factory are provided and no current session is available, the manager will look for a session factory in the current context.

Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/sessions.py
@asynccontextmanager
async def async_session_manager(
    *,
    using: AsyncSessionFactory | None = None,
    auto_commit: bool = False,
    new: bool = False,
    on_missing: Literal['create', 'raise'] = 'create',
) -> AsyncGenerator[AsyncSession, None]:
    """An async session context manager for `AsyncSession` objects.

    Provide a transactional async session context around a series of
    operations. It manages the async session lifecycle and commits or rollbacks
    the async session automatically based on the context.

    Args:
        using: An optional `AsyncSessionFactory` instance to use instead of
            the application context factory. Defaults to ``None``.
        auto_commit: Whether to automatically commit on success or rollback on
            failure after the operation completes. Defaults to ``False``.
        new: Whether to create a new session or not. If set to ``True``, a new
            session is created. If set to ``False``, the current session is
            used if available, otherwise it follows the `on_missing` behavior.
            Defaults to ``False``.
        on_missing: The behavior to follow when no current session is
            available, either to create a new session or raise an error.
            Defaults to ``'create'``.

    Returns:
        An `AsyncSession` instance.

    Note:
        If neither an application nor a factory are provided and no current
        session is available, the manager will look for a session factory in
        the current context.
    """
    session = SESSION_CONTEXT.get()

    # Retrieve session
    if new is False:
        if session is not None:
            if isinstance(session, AsyncSession):
                yield session
                return
            raise RuntimeError(
                f"Invalid session type found in the current context. Expected "
                f"an `AsyncSession` instance but found {type(session)!r}."
            )
        elif on_missing == 'raise':
            raise RuntimeError(
                "No async session available in the current context where "
                "creating a new session is not allowed `on_missing='raise'`."
            )

    # Retrieve factory
    factory: AsyncSessionFactory | None = None
    if using is not None:
        factory = using
    else:
        app = PLATEFORME_CONTEXT.get()
        if app is not None:
            factory = app.async_session
    # Check factory
    if factory is None:
        raise SessionError(
            "No session factory available in the current context."
        )

    # Retrieve async session
    session = factory()
    token = SESSION_CONTEXT.set(session)

    # Execute operation
    try:
        yield session
    except Exception as error:
        if auto_commit:
            await session.rollback()
        raise DatabaseError(
            "An error occurred while executing a database operation."
        ) from error
    else:
        if auto_commit:
            await session.commit()
    finally:
        SESSION_CONTEXT.reset(token)
        if isinstance(factory, _async_scoped_session):
            await factory.remove()
        else:
            await session.close()

Sync

This module provides utilities for managing database sessions within the Plateforme framework using SQLAlchemy features.

SessionFactory module-attribute

SessionFactory = (
    sessionmaker["Session"] | scoped_session["Session"]
)

A type alias for a sync session factory for sync session objects.

Session

Session(
    bind: Connection | Engine | None = None,
    routing: DatabaseManager | None = None,
    proxy: AsyncSession | None = None,
    *args: Any,
    **kwargs: Any,
)

Bases: Session

Manages persistence operations synchronously for ORM-mapped objects.

Initialize the session.

See also the session_factory function which is used to generate a Session-producing callable with a given set of arguments.

Parameters:

Name Type Description Default
bind Connection | Engine | None

An optional Engine or Connection to which this Session should be bound. When specified, all SQL operations performed by this session will execute via this connectable.

None
routing DatabaseManager | None

An optional DatabaseManager instance to use for the database routing.

None
proxy AsyncSession | None

An optional AsyncSession instance to use as a proxy for the async session. This is used within the get_bind method to determine the correct engine to use for the session.

None
Note

All other keyword arguments are passed to the constructor of the SQLAlchemy parent session class.

Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/sessions.py
def __init__(
    self,
    bind: Connection | Engine | None = None,
    routing: DatabaseManager | None = None,
    proxy: AsyncSession | None = None,
    *args: Any,
    **kwargs: Any,
):
    """Initialize the session.

    See also the `session_factory` function which is used to generate a
    `Session`-producing callable with a given set of arguments.

    Args:
        bind: An optional `Engine` or `Connection` to which this `Session`
            should be bound. When specified, all SQL operations performed
            by this session will execute via this connectable.
        routing: An optional `DatabaseManager` instance to use for the
            database routing.
        proxy: An optional `AsyncSession` instance to use as a proxy for
            the async session. This is used within the `get_bind` method to
            determine the correct engine to use for the session.

    Note:
        All other keyword arguments are passed to the constructor of the
        SQLAlchemy parent session class.
    """
    super().__init__(bind, *args, **kwargs)
    self.routing = routing
    self.proxy = proxy

identity_map instance-attribute

identity_map: IdentityMap = WeakInstanceDict()

A mapping of object identities to objects themselves.

Iterating through Session.identity_map.values() provides access to the full set of persistent objects (i.e., those that have row identity) currently in the session.

.. seealso::

:func:`.identity_key` - helper function to produce the keys used
in this dictionary.

is_active property

is_active: bool

True if this :class:.Session not in "partial rollback" state.

.. versionchanged:: 1.4 The :class:_orm.Session no longer begins a new transaction immediately, so this attribute will be False when the :class:_orm.Session is first instantiated.

"partial rollback" state typically indicates that the flush process of the :class:_orm.Session has failed, and that the :meth:_orm.Session.rollback method must be emitted in order to fully roll back the transaction.

If this :class:_orm.Session is not in a transaction at all, the :class:_orm.Session will autobegin when it is first used, so in this case :attr:_orm.Session.is_active will return True.

Otherwise, if this :class:_orm.Session is within a transaction, and that transaction has not been rolled back internally, the :attr:_orm.Session.is_active will also return True.

.. seealso::

:ref:`faq_session_rollback`

:meth:`_orm.Session.in_transaction`

dirty property

dirty: IdentitySet

The set of all persistent instances considered dirty.

E.g.::

some_mapped_object in session.dirty

Instances are considered dirty when they were modified but not deleted.

Note that this 'dirty' calculation is 'optimistic'; most attribute-setting or collection modification operations will mark an instance as 'dirty' and place it in this set, even if there is no net change to the attribute's value. At flush time, the value of each attribute is compared to its previously saved value, and if there's no net change, no SQL operation will occur (this is a more expensive operation so it's only done at flush time).

To check if an instance has actionable net changes to its attributes, use the :meth:.Session.is_modified method.

deleted property

deleted: IdentitySet

The set of all instances marked as 'deleted' within this Session

new property

new: IdentitySet

The set of all instances marked as 'new' within this Session.

async_mode property

async_mode: bool

Whether the session is in async mode or not.

close_all classmethod

close_all() -> None

Close all sessions in memory.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
@classmethod
@util.deprecated(
    "1.3",
    "The :meth:`.Session.close_all` method is deprecated and will be "
    "removed in a future release.  Please refer to "
    ":func:`.session.close_all_sessions`.",
)
def close_all(cls) -> None:
    """Close *all* sessions in memory."""

    close_all_sessions()

identity_key classmethod

identity_key(
    class_: Optional[Type[Any]] = None,
    ident: Union[Any, Tuple[Any, ...]] = None,
    *,
    instance: Optional[Any] = None,
    row: Optional[Union[Row[Any], RowMapping]] = None,
    identity_token: Optional[Any] = None,
) -> _IdentityKeyType[Any]

Return an identity key.

This is an alias of :func:.util.identity_key.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
@classmethod
@util.preload_module("sqlalchemy.orm.util")
def identity_key(
    cls,
    class_: Optional[Type[Any]] = None,
    ident: Union[Any, Tuple[Any, ...]] = None,
    *,
    instance: Optional[Any] = None,
    row: Optional[Union[Row[Any], RowMapping]] = None,
    identity_token: Optional[Any] = None,
) -> _IdentityKeyType[Any]:
    """Return an identity key.

    This is an alias of :func:`.util.identity_key`.

    """
    return util.preloaded.orm_util.identity_key(
        class_,
        ident,
        instance=instance,
        row=row,
        identity_token=identity_token,
    )

object_session classmethod

object_session(instance: object) -> Optional[Session]

Return the :class:.Session to which an object belongs.

This is an alias of :func:.object_session.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
@classmethod
def object_session(cls, instance: object) -> Optional[Session]:
    """Return the :class:`.Session` to which an object belongs.

    This is an alias of :func:`.object_session`.

    """

    return object_session(instance)

in_transaction

in_transaction() -> bool

Return True if this :class:_orm.Session has begun a transaction.

.. versionadded:: 1.4

.. seealso::

:attr:`_orm.Session.is_active`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def in_transaction(self) -> bool:
    """Return True if this :class:`_orm.Session` has begun a transaction.

    .. versionadded:: 1.4

    .. seealso::

        :attr:`_orm.Session.is_active`


    """
    return self._transaction is not None

in_nested_transaction

in_nested_transaction() -> bool

Return True if this :class:_orm.Session has begun a nested transaction, e.g. SAVEPOINT.

.. versionadded:: 1.4

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def in_nested_transaction(self) -> bool:
    """Return True if this :class:`_orm.Session` has begun a nested
    transaction, e.g. SAVEPOINT.

    .. versionadded:: 1.4

    """
    return self._nested_transaction is not None

get_transaction

get_transaction() -> Optional[SessionTransaction]

Return the current root transaction in progress, if any.

.. versionadded:: 1.4

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def get_transaction(self) -> Optional[SessionTransaction]:
    """Return the current root transaction in progress, if any.

    .. versionadded:: 1.4

    """
    trans = self._transaction
    while trans is not None and trans._parent is not None:
        trans = trans._parent
    return trans

get_nested_transaction

get_nested_transaction() -> Optional[SessionTransaction]

Return the current nested transaction in progress, if any.

.. versionadded:: 1.4

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def get_nested_transaction(self) -> Optional[SessionTransaction]:
    """Return the current nested transaction in progress, if any.

    .. versionadded:: 1.4

    """

    return self._nested_transaction

info

info() -> _InfoType

A user-modifiable dictionary.

The initial value of this dictionary can be populated using the info argument to the :class:.Session constructor or :class:.sessionmaker constructor or factory methods. The dictionary here is always local to this :class:.Session and can be modified independently of all other :class:.Session objects.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
@util.memoized_property
def info(self) -> _InfoType:
    """A user-modifiable dictionary.

    The initial value of this dictionary can be populated using the
    ``info`` argument to the :class:`.Session` constructor or
    :class:`.sessionmaker` constructor or factory methods.  The dictionary
    here is always local to this :class:`.Session` and can be modified
    independently of all other :class:`.Session` objects.

    """
    return {}

begin

begin(nested: bool = False) -> SessionTransaction

Begin a transaction, or nested transaction, on this :class:.Session, if one is not already begun.

The :class:_orm.Session object features autobegin behavior, so that normally it is not necessary to call the :meth:_orm.Session.begin method explicitly. However, it may be used in order to control the scope of when the transactional state is begun.

When used to begin the outermost transaction, an error is raised if this :class:.Session is already inside of a transaction.

:param nested: if True, begins a SAVEPOINT transaction and is equivalent to calling :meth:~.Session.begin_nested. For documentation on SAVEPOINT transactions, please see :ref:session_begin_nested.

:return: the :class:.SessionTransaction object. Note that :class:.SessionTransaction acts as a Python context manager, allowing :meth:.Session.begin to be used in a "with" block. See :ref:session_explicit_begin for an example.

.. seealso::

:ref:`session_autobegin`

:ref:`unitofwork_transaction`

:meth:`.Session.begin_nested`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def begin(self, nested: bool = False) -> SessionTransaction:
    """Begin a transaction, or nested transaction,
    on this :class:`.Session`, if one is not already begun.

    The :class:`_orm.Session` object features **autobegin** behavior,
    so that normally it is not necessary to call the
    :meth:`_orm.Session.begin`
    method explicitly. However, it may be used in order to control
    the scope of when the transactional state is begun.

    When used to begin the outermost transaction, an error is raised
    if this :class:`.Session` is already inside of a transaction.

    :param nested: if True, begins a SAVEPOINT transaction and is
     equivalent to calling :meth:`~.Session.begin_nested`. For
     documentation on SAVEPOINT transactions, please see
     :ref:`session_begin_nested`.

    :return: the :class:`.SessionTransaction` object.  Note that
     :class:`.SessionTransaction`
     acts as a Python context manager, allowing :meth:`.Session.begin`
     to be used in a "with" block.  See :ref:`session_explicit_begin` for
     an example.

    .. seealso::

        :ref:`session_autobegin`

        :ref:`unitofwork_transaction`

        :meth:`.Session.begin_nested`


    """

    trans = self._transaction
    if trans is None:
        trans = self._autobegin_t(begin=True)

        if not nested:
            return trans

    assert trans is not None

    if nested:
        trans = trans._begin(nested=nested)
        assert self._transaction is trans
        self._nested_transaction = trans
    else:
        raise sa_exc.InvalidRequestError(
            "A transaction is already begun on this Session."
        )

    return trans  # needed for __enter__/__exit__ hook

begin_nested

begin_nested() -> SessionTransaction

Begin a "nested" transaction on this Session, e.g. SAVEPOINT.

The target database(s) and associated drivers must support SQL SAVEPOINT for this method to function correctly.

For documentation on SAVEPOINT transactions, please see :ref:session_begin_nested.

:return: the :class:.SessionTransaction object. Note that :class:.SessionTransaction acts as a context manager, allowing :meth:.Session.begin_nested to be used in a "with" block. See :ref:session_begin_nested for a usage example.

.. seealso::

:ref:`session_begin_nested`

:ref:`pysqlite_serializable` - special workarounds required
with the SQLite driver in order for SAVEPOINT to work
correctly. For asyncio use cases, see the section
:ref:`aiosqlite_serializable`.
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def begin_nested(self) -> SessionTransaction:
    """Begin a "nested" transaction on this Session, e.g. SAVEPOINT.

    The target database(s) and associated drivers must support SQL
    SAVEPOINT for this method to function correctly.

    For documentation on SAVEPOINT
    transactions, please see :ref:`session_begin_nested`.

    :return: the :class:`.SessionTransaction` object.  Note that
     :class:`.SessionTransaction` acts as a context manager, allowing
     :meth:`.Session.begin_nested` to be used in a "with" block.
     See :ref:`session_begin_nested` for a usage example.

    .. seealso::

        :ref:`session_begin_nested`

        :ref:`pysqlite_serializable` - special workarounds required
        with the SQLite driver in order for SAVEPOINT to work
        correctly. For asyncio use cases, see the section
        :ref:`aiosqlite_serializable`.

    """
    return self.begin(nested=True)

rollback

rollback() -> None

Rollback the current transaction in progress.

If no transaction is in progress, this method is a pass-through.

The method always rolls back the topmost database transaction, discarding any nested transactions that may be in progress.

.. seealso::

:ref:`session_rollback`

:ref:`unitofwork_transaction`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def rollback(self) -> None:
    """Rollback the current transaction in progress.

    If no transaction is in progress, this method is a pass-through.

    The method always rolls back
    the topmost database transaction, discarding any nested
    transactions that may be in progress.

    .. seealso::

        :ref:`session_rollback`

        :ref:`unitofwork_transaction`

    """
    if self._transaction is None:
        pass
    else:
        self._transaction.rollback(_to_root=True)

commit

commit() -> None

Flush pending changes and commit the current transaction.

When the COMMIT operation is complete, all objects are fully :term:expired, erasing their internal contents, which will be automatically re-loaded when the objects are next accessed. In the interim, these objects are in an expired state and will not function if they are :term:detached from the :class:.Session. Additionally, this re-load operation is not supported when using asyncio-oriented APIs. The :paramref:.Session.expire_on_commit parameter may be used to disable this behavior.

When there is no transaction in place for the :class:.Session, indicating that no operations were invoked on this :class:.Session since the previous call to :meth:.Session.commit, the method will begin and commit an internal-only "logical" transaction, that does not normally affect the database unless pending flush changes were detected, but will still invoke event handlers and object expiration rules.

The outermost database transaction is committed unconditionally, automatically releasing any SAVEPOINTs in effect.

.. seealso::

:ref:`session_committing`

:ref:`unitofwork_transaction`

:ref:`asyncio_orm_avoid_lazyloads`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def commit(self) -> None:
    """Flush pending changes and commit the current transaction.

    When the COMMIT operation is complete, all objects are fully
    :term:`expired`, erasing their internal contents, which will be
    automatically re-loaded when the objects are next accessed. In the
    interim, these objects are in an expired state and will not function if
    they are :term:`detached` from the :class:`.Session`. Additionally,
    this re-load operation is not supported when using asyncio-oriented
    APIs. The :paramref:`.Session.expire_on_commit` parameter may be used
    to disable this behavior.

    When there is no transaction in place for the :class:`.Session`,
    indicating that no operations were invoked on this :class:`.Session`
    since the previous call to :meth:`.Session.commit`, the method will
    begin and commit an internal-only "logical" transaction, that does not
    normally affect the database unless pending flush changes were
    detected, but will still invoke event handlers and object expiration
    rules.

    The outermost database transaction is committed unconditionally,
    automatically releasing any SAVEPOINTs in effect.

    .. seealso::

        :ref:`session_committing`

        :ref:`unitofwork_transaction`

        :ref:`asyncio_orm_avoid_lazyloads`

    """
    trans = self._transaction
    if trans is None:
        trans = self._autobegin_t()

    trans.commit(_to_root=True)

prepare

prepare() -> None

Prepare the current transaction in progress for two phase commit.

If no transaction is in progress, this method raises an :exc:~sqlalchemy.exc.InvalidRequestError.

Only root transactions of two phase sessions can be prepared. If the current transaction is not such, an :exc:~sqlalchemy.exc.InvalidRequestError is raised.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def prepare(self) -> None:
    """Prepare the current transaction in progress for two phase commit.

    If no transaction is in progress, this method raises an
    :exc:`~sqlalchemy.exc.InvalidRequestError`.

    Only root transactions of two phase sessions can be prepared. If the
    current transaction is not such, an
    :exc:`~sqlalchemy.exc.InvalidRequestError` is raised.

    """
    trans = self._transaction
    if trans is None:
        trans = self._autobegin_t()

    trans.prepare()

connection

connection(
    bind_arguments: Optional[_BindArguments] = None,
    execution_options: Optional[
        CoreExecuteOptionsParameter
    ] = None,
) -> Connection

Return a :class:_engine.Connection object corresponding to this :class:.Session object's transactional state.

Either the :class:_engine.Connection corresponding to the current transaction is returned, or if no transaction is in progress, a new one is begun and the :class:_engine.Connection returned (note that no transactional state is established with the DBAPI until the first SQL statement is emitted).

Ambiguity in multi-bind or unbound :class:.Session objects can be resolved through any of the optional keyword arguments. This ultimately makes usage of the :meth:.get_bind method for resolution.

:param bind_arguments: dictionary of bind arguments. May include "mapper", "bind", "clause", other custom arguments that are passed to :meth:.Session.get_bind.

:param execution_options: a dictionary of execution options that will be passed to :meth:_engine.Connection.execution_options, when the connection is first procured only. If the connection is already present within the :class:.Session, a warning is emitted and the arguments are ignored.

.. seealso::

:ref:`session_transaction_isolation`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def connection(
    self,
    bind_arguments: Optional[_BindArguments] = None,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Connection:
    r"""Return a :class:`_engine.Connection` object corresponding to this
    :class:`.Session` object's transactional state.

    Either the :class:`_engine.Connection` corresponding to the current
    transaction is returned, or if no transaction is in progress, a new
    one is begun and the :class:`_engine.Connection`
    returned (note that no
    transactional state is established with the DBAPI until the first
    SQL statement is emitted).

    Ambiguity in multi-bind or unbound :class:`.Session` objects can be
    resolved through any of the optional keyword arguments.   This
    ultimately makes usage of the :meth:`.get_bind` method for resolution.

    :param bind_arguments: dictionary of bind arguments.  May include
     "mapper", "bind", "clause", other custom arguments that are passed
     to :meth:`.Session.get_bind`.

    :param execution_options: a dictionary of execution options that will
     be passed to :meth:`_engine.Connection.execution_options`, **when the
     connection is first procured only**.   If the connection is already
     present within the :class:`.Session`, a warning is emitted and
     the arguments are ignored.

     .. seealso::

        :ref:`session_transaction_isolation`

    """

    if bind_arguments:
        bind = bind_arguments.pop("bind", None)

        if bind is None:
            bind = self.get_bind(**bind_arguments)
    else:
        bind = self.get_bind()

    return self._connection_for_bind(
        bind,
        execution_options=execution_options,
    )

execute

execute(
    statement: TypedReturnsRows[_T],
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    _parent_execute_state: Optional[Any] = None,
    _add_event: Optional[Any] = None,
) -> Result[_T]
execute(
    statement: UpdateBase,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    _parent_execute_state: Optional[Any] = None,
    _add_event: Optional[Any] = None,
) -> CursorResult[Any]
execute(
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    _parent_execute_state: Optional[Any] = None,
    _add_event: Optional[Any] = None,
) -> Result[Any]
execute(
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    _parent_execute_state: Optional[Any] = None,
    _add_event: Optional[Any] = None,
) -> Result[Any]

Execute a SQL expression construct.

Returns a :class:_engine.Result object representing results of the statement execution.

E.g.::

from sqlalchemy import select
result = session.execute(
    select(User).where(User.id == 5)
)

The API contract of :meth:_orm.Session.execute is similar to that of :meth:_engine.Connection.execute, the :term:2.0 style version of :class:_engine.Connection.

.. versionchanged:: 1.4 the :meth:_orm.Session.execute method is now the primary point of ORM statement execution when using :term:2.0 style ORM usage.

:param statement: An executable statement (i.e. an :class:.Executable expression such as :func:_expression.select).

:param params: Optional dictionary, or list of dictionaries, containing bound parameter values. If a single dictionary, single-row execution occurs; if a list of dictionaries, an "executemany" will be invoked. The keys in each dictionary must correspond to parameter names present in the statement.

:param execution_options: optional dictionary of execution options, which will be associated with the statement execution. This dictionary can provide a subset of the options that are accepted by :meth:_engine.Connection.execution_options, and may also provide additional options understood only in an ORM context.

.. seealso::

:ref:`orm_queryguide_execution_options` - ORM-specific execution
options

:param bind_arguments: dictionary of additional arguments to determine the bind. May include "mapper", "bind", or other custom arguments. Contents of this dictionary are passed to the :meth:.Session.get_bind method.

:return: a :class:_engine.Result object.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def execute(
    self,
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    _parent_execute_state: Optional[Any] = None,
    _add_event: Optional[Any] = None,
) -> Result[Any]:
    r"""Execute a SQL expression construct.

    Returns a :class:`_engine.Result` object representing
    results of the statement execution.

    E.g.::

        from sqlalchemy import select
        result = session.execute(
            select(User).where(User.id == 5)
        )

    The API contract of :meth:`_orm.Session.execute` is similar to that
    of :meth:`_engine.Connection.execute`, the :term:`2.0 style` version
    of :class:`_engine.Connection`.

    .. versionchanged:: 1.4 the :meth:`_orm.Session.execute` method is
       now the primary point of ORM statement execution when using
       :term:`2.0 style` ORM usage.

    :param statement:
        An executable statement (i.e. an :class:`.Executable` expression
        such as :func:`_expression.select`).

    :param params:
        Optional dictionary, or list of dictionaries, containing
        bound parameter values.   If a single dictionary, single-row
        execution occurs; if a list of dictionaries, an
        "executemany" will be invoked.  The keys in each dictionary
        must correspond to parameter names present in the statement.

    :param execution_options: optional dictionary of execution options,
     which will be associated with the statement execution.  This
     dictionary can provide a subset of the options that are accepted
     by :meth:`_engine.Connection.execution_options`, and may also
     provide additional options understood only in an ORM context.

     .. seealso::

        :ref:`orm_queryguide_execution_options` - ORM-specific execution
        options

    :param bind_arguments: dictionary of additional arguments to determine
     the bind.  May include "mapper", "bind", or other custom arguments.
     Contents of this dictionary are passed to the
     :meth:`.Session.get_bind` method.

    :return: a :class:`_engine.Result` object.


    """
    return self._execute_internal(
        statement,
        params,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
        _parent_execute_state=_parent_execute_state,
        _add_event=_add_event,
    )

scalar

scalar(
    statement: TypedReturnsRows[Tuple[_T]],
    params: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> Optional[_T]
scalar(
    statement: Executable,
    params: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> Any
scalar(
    statement: Executable,
    params: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> Any

Execute a statement and return a scalar result.

Usage and parameters are the same as that of :meth:_orm.Session.execute; the return result is a scalar Python value.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def scalar(
    self,
    statement: Executable,
    params: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> Any:
    """Execute a statement and return a scalar result.

    Usage and parameters are the same as that of
    :meth:`_orm.Session.execute`; the return result is a scalar Python
    value.

    """

    return self._execute_internal(
        statement,
        params,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
        _scalar_result=True,
        **kw,
    )

scalars

scalars(
    statement: TypedReturnsRows[Tuple[_T]],
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> ScalarResult[_T]
scalars(
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> ScalarResult[Any]
scalars(
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> ScalarResult[Any]

Execute a statement and return the results as scalars.

Usage and parameters are the same as that of :meth:_orm.Session.execute; the return result is a :class:_result.ScalarResult filtering object which will return single elements rather than :class:_row.Row objects.

:return: a :class:_result.ScalarResult object

.. versionadded:: 1.4.24 Added :meth:_orm.Session.scalars

.. versionadded:: 1.4.26 Added :meth:_orm.scoped_session.scalars

.. seealso::

:ref:`orm_queryguide_select_orm_entities` - contrasts the behavior
of :meth:`_orm.Session.execute` to :meth:`_orm.Session.scalars`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def scalars(
    self,
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> ScalarResult[Any]:
    """Execute a statement and return the results as scalars.

    Usage and parameters are the same as that of
    :meth:`_orm.Session.execute`; the return result is a
    :class:`_result.ScalarResult` filtering object which
    will return single elements rather than :class:`_row.Row` objects.

    :return:  a :class:`_result.ScalarResult` object

    .. versionadded:: 1.4.24 Added :meth:`_orm.Session.scalars`

    .. versionadded:: 1.4.26 Added :meth:`_orm.scoped_session.scalars`

    .. seealso::

        :ref:`orm_queryguide_select_orm_entities` - contrasts the behavior
        of :meth:`_orm.Session.execute` to :meth:`_orm.Session.scalars`

    """

    return self._execute_internal(
        statement,
        params=params,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
        _scalar_result=False,  # mypy appreciates this
        **kw,
    ).scalars()

close

close() -> None

Close out the transactional resources and ORM objects used by this :class:_orm.Session.

This expunges all ORM objects associated with this :class:_orm.Session, ends any transaction in progress and :term:releases any :class:_engine.Connection objects which this :class:_orm.Session itself has checked out from associated :class:_engine.Engine objects. The operation then leaves the :class:_orm.Session in a state which it may be used again.

.. tip::

In the default running mode the :meth:`_orm.Session.close`
method **does not prevent the Session from being used again**.
The :class:`_orm.Session` itself does not actually have a
distinct "closed" state; it merely means
the :class:`_orm.Session` will release all database connections
and ORM objects.

Setting the parameter :paramref:`_orm.Session.close_resets_only`
to ``False`` will instead make the ``close`` final, meaning that
any further action on the session will be forbidden.

.. versionchanged:: 1.4 The :meth:.Session.close method does not immediately create a new :class:.SessionTransaction object; instead, the new :class:.SessionTransaction is created only if the :class:.Session is used again for a database operation.

.. seealso::

:ref:`session_closing` - detail on the semantics of
:meth:`_orm.Session.close` and :meth:`_orm.Session.reset`.

:meth:`_orm.Session.reset` - a similar method that behaves like
``close()`` with  the parameter
:paramref:`_orm.Session.close_resets_only` set to ``True``.
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def close(self) -> None:
    """Close out the transactional resources and ORM objects used by this
    :class:`_orm.Session`.

    This expunges all ORM objects associated with this
    :class:`_orm.Session`, ends any transaction in progress and
    :term:`releases` any :class:`_engine.Connection` objects which this
    :class:`_orm.Session` itself has checked out from associated
    :class:`_engine.Engine` objects. The operation then leaves the
    :class:`_orm.Session` in a state which it may be used again.

    .. tip::

        In the default running mode the :meth:`_orm.Session.close`
        method **does not prevent the Session from being used again**.
        The :class:`_orm.Session` itself does not actually have a
        distinct "closed" state; it merely means
        the :class:`_orm.Session` will release all database connections
        and ORM objects.

        Setting the parameter :paramref:`_orm.Session.close_resets_only`
        to ``False`` will instead make the ``close`` final, meaning that
        any further action on the session will be forbidden.

    .. versionchanged:: 1.4  The :meth:`.Session.close` method does not
       immediately create a new :class:`.SessionTransaction` object;
       instead, the new :class:`.SessionTransaction` is created only if
       the :class:`.Session` is used again for a database operation.

    .. seealso::

        :ref:`session_closing` - detail on the semantics of
        :meth:`_orm.Session.close` and :meth:`_orm.Session.reset`.

        :meth:`_orm.Session.reset` - a similar method that behaves like
        ``close()`` with  the parameter
        :paramref:`_orm.Session.close_resets_only` set to ``True``.

    """
    self._close_impl(invalidate=False)

reset

reset() -> None

Close out the transactional resources and ORM objects used by this :class:_orm.Session, resetting the session to its initial state.

This method provides for same "reset-only" behavior that the :meth:_orm.Session.close method has provided historically, where the state of the :class:_orm.Session is reset as though the object were brand new, and ready to be used again. This method may then be useful for :class:_orm.Session objects which set :paramref:_orm.Session.close_resets_only to False, so that "reset only" behavior is still available.

.. versionadded:: 2.0.22

.. seealso::

:ref:`session_closing` - detail on the semantics of
:meth:`_orm.Session.close` and :meth:`_orm.Session.reset`.

:meth:`_orm.Session.close` - a similar method will additionally
prevent re-use of the Session when the parameter
:paramref:`_orm.Session.close_resets_only` is set to ``False``.
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def reset(self) -> None:
    """Close out the transactional resources and ORM objects used by this
    :class:`_orm.Session`, resetting the session to its initial state.

    This method provides for same "reset-only" behavior that the
    :meth:`_orm.Session.close` method has provided historically, where the
    state of the :class:`_orm.Session` is reset as though the object were
    brand new, and ready to be used again.
    This method may then be useful for :class:`_orm.Session` objects
    which set :paramref:`_orm.Session.close_resets_only` to ``False``,
    so that "reset only" behavior is still available.

    .. versionadded:: 2.0.22

    .. seealso::

        :ref:`session_closing` - detail on the semantics of
        :meth:`_orm.Session.close` and :meth:`_orm.Session.reset`.

        :meth:`_orm.Session.close` - a similar method will additionally
        prevent re-use of the Session when the parameter
        :paramref:`_orm.Session.close_resets_only` is set to ``False``.
    """
    self._close_impl(invalidate=False, is_reset=True)

invalidate

invalidate() -> None

Close this Session, using connection invalidation.

This is a variant of :meth:.Session.close that will additionally ensure that the :meth:_engine.Connection.invalidate method will be called on each :class:_engine.Connection object that is currently in use for a transaction (typically there is only one connection unless the :class:_orm.Session is used with multiple engines).

This can be called when the database is known to be in a state where the connections are no longer safe to be used.

Below illustrates a scenario when using gevent <https://www.gevent.org/>_, which can produce Timeout exceptions that may mean the underlying connection should be discarded::

import gevent

try:
    sess = Session()
    sess.add(User())
    sess.commit()
except gevent.Timeout:
    sess.invalidate()
    raise
except:
    sess.rollback()
    raise

The method additionally does everything that :meth:_orm.Session.close does, including that all ORM objects are expunged.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def invalidate(self) -> None:
    """Close this Session, using connection invalidation.

    This is a variant of :meth:`.Session.close` that will additionally
    ensure that the :meth:`_engine.Connection.invalidate`
    method will be called on each :class:`_engine.Connection` object
    that is currently in use for a transaction (typically there is only
    one connection unless the :class:`_orm.Session` is used with
    multiple engines).

    This can be called when the database is known to be in a state where
    the connections are no longer safe to be used.

    Below illustrates a scenario when using `gevent
    <https://www.gevent.org/>`_, which can produce ``Timeout`` exceptions
    that may mean the underlying connection should be discarded::

        import gevent

        try:
            sess = Session()
            sess.add(User())
            sess.commit()
        except gevent.Timeout:
            sess.invalidate()
            raise
        except:
            sess.rollback()
            raise

    The method additionally does everything that :meth:`_orm.Session.close`
    does, including that all ORM objects are expunged.

    """
    self._close_impl(invalidate=True)

expunge_all

expunge_all() -> None

Remove all object instances from this Session.

This is equivalent to calling expunge(obj) on all objects in this Session.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def expunge_all(self) -> None:
    """Remove all object instances from this ``Session``.

    This is equivalent to calling ``expunge(obj)`` on all objects in this
    ``Session``.

    """

    all_states = self.identity_map.all_states() + list(self._new)
    self.identity_map._kill()
    self.identity_map = identity.WeakInstanceDict()
    self._new = {}
    self._deleted = {}

    statelib.InstanceState._detach_states(all_states, self)

bind_mapper

bind_mapper(
    mapper: _EntityBindKey[_O], bind: _SessionBind
) -> None

Associate a :class:_orm.Mapper or arbitrary Python class with a "bind", e.g. an :class:_engine.Engine or :class:_engine.Connection.

The given entity is added to a lookup used by the :meth:.Session.get_bind method.

:param mapper: a :class:_orm.Mapper object, or an instance of a mapped class, or any Python class that is the base of a set of mapped classes.

:param bind: an :class:_engine.Engine or :class:_engine.Connection object.

.. seealso::

:ref:`session_partitioning`

:paramref:`.Session.binds`

:meth:`.Session.bind_table`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def bind_mapper(
    self, mapper: _EntityBindKey[_O], bind: _SessionBind
) -> None:
    """Associate a :class:`_orm.Mapper` or arbitrary Python class with a
    "bind", e.g. an :class:`_engine.Engine` or
    :class:`_engine.Connection`.

    The given entity is added to a lookup used by the
    :meth:`.Session.get_bind` method.

    :param mapper: a :class:`_orm.Mapper` object,
     or an instance of a mapped
     class, or any Python class that is the base of a set of mapped
     classes.

    :param bind: an :class:`_engine.Engine` or :class:`_engine.Connection`
                object.

    .. seealso::

        :ref:`session_partitioning`

        :paramref:`.Session.binds`

        :meth:`.Session.bind_table`


    """
    self._add_bind(mapper, bind)

bind_table

bind_table(table: TableClause, bind: _SessionBind) -> None

Associate a :class:_schema.Table with a "bind", e.g. an :class:_engine.Engine or :class:_engine.Connection.

The given :class:_schema.Table is added to a lookup used by the :meth:.Session.get_bind method.

:param table: a :class:_schema.Table object, which is typically the target of an ORM mapping, or is present within a selectable that is mapped.

:param bind: an :class:_engine.Engine or :class:_engine.Connection object.

.. seealso::

:ref:`session_partitioning`

:paramref:`.Session.binds`

:meth:`.Session.bind_mapper`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def bind_table(self, table: TableClause, bind: _SessionBind) -> None:
    """Associate a :class:`_schema.Table` with a "bind", e.g. an
    :class:`_engine.Engine`
    or :class:`_engine.Connection`.

    The given :class:`_schema.Table` is added to a lookup used by the
    :meth:`.Session.get_bind` method.

    :param table: a :class:`_schema.Table` object,
     which is typically the target
     of an ORM mapping, or is present within a selectable that is
     mapped.

    :param bind: an :class:`_engine.Engine` or :class:`_engine.Connection`
     object.

    .. seealso::

        :ref:`session_partitioning`

        :paramref:`.Session.binds`

        :meth:`.Session.bind_mapper`


    """
    self._add_bind(table, bind)

query

query(_entity: _EntityType[_O]) -> Query[_O]
query(
    _colexpr: TypedColumnsClauseRole[_T],
) -> RowReturningQuery[Tuple[_T]]
query(
    __ent0: _TypedColumnClauseArgument[_T0],
    __ent1: _TypedColumnClauseArgument[_T1],
) -> RowReturningQuery[Tuple[_T0, _T1]]
query(
    __ent0: _TypedColumnClauseArgument[_T0],
    __ent1: _TypedColumnClauseArgument[_T1],
    __ent2: _TypedColumnClauseArgument[_T2],
) -> RowReturningQuery[Tuple[_T0, _T1, _T2]]
query(
    __ent0: _TypedColumnClauseArgument[_T0],
    __ent1: _TypedColumnClauseArgument[_T1],
    __ent2: _TypedColumnClauseArgument[_T2],
    __ent3: _TypedColumnClauseArgument[_T3],
) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3]]
query(
    __ent0: _TypedColumnClauseArgument[_T0],
    __ent1: _TypedColumnClauseArgument[_T1],
    __ent2: _TypedColumnClauseArgument[_T2],
    __ent3: _TypedColumnClauseArgument[_T3],
    __ent4: _TypedColumnClauseArgument[_T4],
) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4]]
query(
    __ent0: _TypedColumnClauseArgument[_T0],
    __ent1: _TypedColumnClauseArgument[_T1],
    __ent2: _TypedColumnClauseArgument[_T2],
    __ent3: _TypedColumnClauseArgument[_T3],
    __ent4: _TypedColumnClauseArgument[_T4],
    __ent5: _TypedColumnClauseArgument[_T5],
) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4, _T5]]
query(
    __ent0: _TypedColumnClauseArgument[_T0],
    __ent1: _TypedColumnClauseArgument[_T1],
    __ent2: _TypedColumnClauseArgument[_T2],
    __ent3: _TypedColumnClauseArgument[_T3],
    __ent4: _TypedColumnClauseArgument[_T4],
    __ent5: _TypedColumnClauseArgument[_T5],
    __ent6: _TypedColumnClauseArgument[_T6],
) -> RowReturningQuery[
    Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6]
]
query(
    __ent0: _TypedColumnClauseArgument[_T0],
    __ent1: _TypedColumnClauseArgument[_T1],
    __ent2: _TypedColumnClauseArgument[_T2],
    __ent3: _TypedColumnClauseArgument[_T3],
    __ent4: _TypedColumnClauseArgument[_T4],
    __ent5: _TypedColumnClauseArgument[_T5],
    __ent6: _TypedColumnClauseArgument[_T6],
    __ent7: _TypedColumnClauseArgument[_T7],
) -> RowReturningQuery[
    Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]
]
query(
    *entities: _ColumnsClauseArgument[Any], **kwargs: Any
) -> Query[Any]
query(
    *entities: _ColumnsClauseArgument[Any], **kwargs: Any
) -> Query[Any]

Return a new :class:_query.Query object corresponding to this :class:_orm.Session.

Note that the :class:_query.Query object is legacy as of SQLAlchemy 2.0; the :func:_sql.select construct is now used to construct ORM queries.

.. seealso::

:ref:`unified_tutorial`

:ref:`queryguide_toplevel`

:ref:`query_api_toplevel` - legacy API doc
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def query(
    self, *entities: _ColumnsClauseArgument[Any], **kwargs: Any
) -> Query[Any]:
    """Return a new :class:`_query.Query` object corresponding to this
    :class:`_orm.Session`.

    Note that the :class:`_query.Query` object is legacy as of
    SQLAlchemy 2.0; the :func:`_sql.select` construct is now used
    to construct ORM queries.

    .. seealso::

        :ref:`unified_tutorial`

        :ref:`queryguide_toplevel`

        :ref:`query_api_toplevel` - legacy API doc

    """

    return self._query_cls(entities, self, **kwargs)

no_autoflush

no_autoflush() -> Iterator[Session]

Return a context manager that disables autoflush.

e.g.::

with session.no_autoflush:

    some_object = SomeClass()
    session.add(some_object)
    # won't autoflush
    some_object.related_thing = session.query(SomeRelated).first()

Operations that proceed within the with: block will not be subject to flushes occurring upon query access. This is useful when initializing a series of objects which involve existing database queries, where the uncompleted object should not yet be flushed.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
@util.non_memoized_property
@contextlib.contextmanager
def no_autoflush(self) -> Iterator[Session]:
    """Return a context manager that disables autoflush.

    e.g.::

        with session.no_autoflush:

            some_object = SomeClass()
            session.add(some_object)
            # won't autoflush
            some_object.related_thing = session.query(SomeRelated).first()

    Operations that proceed within the ``with:`` block
    will not be subject to flushes occurring upon query
    access.  This is useful when initializing a series
    of objects which involve existing database queries,
    where the uncompleted object should not yet be flushed.

    """
    autoflush = self.autoflush
    self.autoflush = False
    try:
        yield self
    finally:
        self.autoflush = autoflush

refresh

refresh(
    instance: object,
    attribute_names: Optional[Iterable[str]] = None,
    with_for_update: ForUpdateParameter = None,
) -> None

Expire and refresh attributes on the given instance.

The selected attributes will first be expired as they would when using :meth:_orm.Session.expire; then a SELECT statement will be issued to the database to refresh column-oriented attributes with the current value available in the current transaction.

:func:_orm.relationship oriented attributes will also be immediately loaded if they were already eagerly loaded on the object, using the same eager loading strategy that they were loaded with originally.

.. versionadded:: 1.4 - the :meth:_orm.Session.refresh method can also refresh eagerly loaded attributes.

:func:_orm.relationship oriented attributes that would normally load using the select (or "lazy") loader strategy will also load if they are named explicitly in the attribute_names collection, emitting a SELECT statement for the attribute using the immediate loader strategy. If lazy-loaded relationships are not named in :paramref:_orm.Session.refresh.attribute_names, then they remain as "lazy loaded" attributes and are not implicitly refreshed.

.. versionchanged:: 2.0.4 The :meth:_orm.Session.refresh method will now refresh lazy-loaded :func:_orm.relationship oriented attributes for those which are named explicitly in the :paramref:_orm.Session.refresh.attribute_names collection.

.. tip::

While the :meth:`_orm.Session.refresh` method is capable of
refreshing both column and relationship oriented attributes, its
primary focus is on refreshing of local column-oriented attributes
on a single instance. For more open ended "refresh" functionality,
including the ability to refresh the attributes on many objects at
once while having explicit control over relationship loader
strategies, use the
:ref:`populate existing <orm_queryguide_populate_existing>` feature
instead.

Note that a highly isolated transaction will return the same values as were previously read in that same transaction, regardless of changes in database state outside of that transaction. Refreshing attributes usually only makes sense at the start of a transaction where database rows have not yet been accessed.

:param attribute_names: optional. An iterable collection of string attribute names indicating a subset of attributes to be refreshed.

:param with_for_update: optional boolean True indicating FOR UPDATE should be used, or may be a dictionary containing flags to indicate a more specific set of FOR UPDATE flags for the SELECT; flags should match the parameters of :meth:_query.Query.with_for_update. Supersedes the :paramref:.Session.refresh.lockmode parameter.

.. seealso::

:ref:`session_expire` - introductory material

:meth:`.Session.expire`

:meth:`.Session.expire_all`

:ref:`orm_queryguide_populate_existing` - allows any ORM query
to refresh objects as they would be loaded normally.
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def refresh(
    self,
    instance: object,
    attribute_names: Optional[Iterable[str]] = None,
    with_for_update: ForUpdateParameter = None,
) -> None:
    """Expire and refresh attributes on the given instance.

    The selected attributes will first be expired as they would when using
    :meth:`_orm.Session.expire`; then a SELECT statement will be issued to
    the database to refresh column-oriented attributes with the current
    value available in the current transaction.

    :func:`_orm.relationship` oriented attributes will also be immediately
    loaded if they were already eagerly loaded on the object, using the
    same eager loading strategy that they were loaded with originally.

    .. versionadded:: 1.4 - the :meth:`_orm.Session.refresh` method
       can also refresh eagerly loaded attributes.

    :func:`_orm.relationship` oriented attributes that would normally
    load using the ``select`` (or "lazy") loader strategy will also
    load **if they are named explicitly in the attribute_names
    collection**, emitting a SELECT statement for the attribute using the
    ``immediate`` loader strategy.  If lazy-loaded relationships are not
    named in :paramref:`_orm.Session.refresh.attribute_names`, then
    they remain as "lazy loaded" attributes and are not implicitly
    refreshed.

    .. versionchanged:: 2.0.4  The :meth:`_orm.Session.refresh` method
       will now refresh lazy-loaded :func:`_orm.relationship` oriented
       attributes for those which are named explicitly in the
       :paramref:`_orm.Session.refresh.attribute_names` collection.

    .. tip::

        While the :meth:`_orm.Session.refresh` method is capable of
        refreshing both column and relationship oriented attributes, its
        primary focus is on refreshing of local column-oriented attributes
        on a single instance. For more open ended "refresh" functionality,
        including the ability to refresh the attributes on many objects at
        once while having explicit control over relationship loader
        strategies, use the
        :ref:`populate existing <orm_queryguide_populate_existing>` feature
        instead.

    Note that a highly isolated transaction will return the same values as
    were previously read in that same transaction, regardless of changes
    in database state outside of that transaction.   Refreshing
    attributes usually only makes sense at the start of a transaction
    where database rows have not yet been accessed.

    :param attribute_names: optional.  An iterable collection of
      string attribute names indicating a subset of attributes to
      be refreshed.

    :param with_for_update: optional boolean ``True`` indicating FOR UPDATE
      should be used, or may be a dictionary containing flags to
      indicate a more specific set of FOR UPDATE flags for the SELECT;
      flags should match the parameters of
      :meth:`_query.Query.with_for_update`.
      Supersedes the :paramref:`.Session.refresh.lockmode` parameter.

    .. seealso::

        :ref:`session_expire` - introductory material

        :meth:`.Session.expire`

        :meth:`.Session.expire_all`

        :ref:`orm_queryguide_populate_existing` - allows any ORM query
        to refresh objects as they would be loaded normally.

    """
    try:
        state = attributes.instance_state(instance)
    except exc.NO_STATE as err:
        raise exc.UnmappedInstanceError(instance) from err

    self._expire_state(state, attribute_names)

    # this autoflush previously used to occur as a secondary effect
    # of the load_on_ident below.   Meaning we'd organize the SELECT
    # based on current DB pks, then flush, then if pks changed in that
    # flush, crash.  this was unticketed but discovered as part of
    # #8703.  So here, autoflush up front, dont autoflush inside
    # load_on_ident.
    self._autoflush()

    if with_for_update == {}:
        raise sa_exc.ArgumentError(
            "with_for_update should be the boolean value "
            "True, or a dictionary with options.  "
            "A blank dictionary is ambiguous."
        )

    with_for_update = ForUpdateArg._from_argument(with_for_update)

    stmt: Select[Any] = sql.select(object_mapper(instance))
    if (
        loading.load_on_ident(
            self,
            stmt,
            state.key,
            refresh_state=state,
            with_for_update=with_for_update,
            only_load_props=attribute_names,
            require_pk_cols=True,
            # technically unnecessary as we just did autoflush
            # above, however removes the additional unnecessary
            # call to _autoflush()
            no_autoflush=True,
            is_user_refresh=True,
        )
        is None
    ):
        raise sa_exc.InvalidRequestError(
            "Could not refresh instance '%s'" % instance_str(instance)
        )

expire_all

expire_all() -> None

Expires all persistent instances within this Session.

When any attributes on a persistent instance is next accessed, a query will be issued using the :class:.Session object's current transactional context in order to load all expired attributes for the given instance. Note that a highly isolated transaction will return the same values as were previously read in that same transaction, regardless of changes in database state outside of that transaction.

To expire individual objects and individual attributes on those objects, use :meth:Session.expire.

The :class:.Session object's default behavior is to expire all state whenever the :meth:Session.rollback or :meth:Session.commit methods are called, so that new state can be loaded for the new transaction. For this reason, calling :meth:Session.expire_all is not usually needed, assuming the transaction is isolated.

.. seealso::

:ref:`session_expire` - introductory material

:meth:`.Session.expire`

:meth:`.Session.refresh`

:meth:`_orm.Query.populate_existing`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def expire_all(self) -> None:
    """Expires all persistent instances within this Session.

    When any attributes on a persistent instance is next accessed,
    a query will be issued using the
    :class:`.Session` object's current transactional context in order to
    load all expired attributes for the given instance.   Note that
    a highly isolated transaction will return the same values as were
    previously read in that same transaction, regardless of changes
    in database state outside of that transaction.

    To expire individual objects and individual attributes
    on those objects, use :meth:`Session.expire`.

    The :class:`.Session` object's default behavior is to
    expire all state whenever the :meth:`Session.rollback`
    or :meth:`Session.commit` methods are called, so that new
    state can be loaded for the new transaction.   For this reason,
    calling :meth:`Session.expire_all` is not usually needed,
    assuming the transaction is isolated.

    .. seealso::

        :ref:`session_expire` - introductory material

        :meth:`.Session.expire`

        :meth:`.Session.refresh`

        :meth:`_orm.Query.populate_existing`

    """
    for state in self.identity_map.all_states():
        state._expire(state.dict, self.identity_map._modified)

expire

expire(
    instance: object,
    attribute_names: Optional[Iterable[str]] = None,
) -> None

Expire the attributes on an instance.

Marks the attributes of an instance as out of date. When an expired attribute is next accessed, a query will be issued to the :class:.Session object's current transactional context in order to load all expired attributes for the given instance. Note that a highly isolated transaction will return the same values as were previously read in that same transaction, regardless of changes in database state outside of that transaction.

To expire all objects in the :class:.Session simultaneously, use :meth:Session.expire_all.

The :class:.Session object's default behavior is to expire all state whenever the :meth:Session.rollback or :meth:Session.commit methods are called, so that new state can be loaded for the new transaction. For this reason, calling :meth:Session.expire only makes sense for the specific case that a non-ORM SQL statement was emitted in the current transaction.

:param instance: The instance to be refreshed. :param attribute_names: optional list of string attribute names indicating a subset of attributes to be expired.

.. seealso::

:ref:`session_expire` - introductory material

:meth:`.Session.expire`

:meth:`.Session.refresh`

:meth:`_orm.Query.populate_existing`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def expire(
    self, instance: object, attribute_names: Optional[Iterable[str]] = None
) -> None:
    """Expire the attributes on an instance.

    Marks the attributes of an instance as out of date. When an expired
    attribute is next accessed, a query will be issued to the
    :class:`.Session` object's current transactional context in order to
    load all expired attributes for the given instance.   Note that
    a highly isolated transaction will return the same values as were
    previously read in that same transaction, regardless of changes
    in database state outside of that transaction.

    To expire all objects in the :class:`.Session` simultaneously,
    use :meth:`Session.expire_all`.

    The :class:`.Session` object's default behavior is to
    expire all state whenever the :meth:`Session.rollback`
    or :meth:`Session.commit` methods are called, so that new
    state can be loaded for the new transaction.   For this reason,
    calling :meth:`Session.expire` only makes sense for the specific
    case that a non-ORM SQL statement was emitted in the current
    transaction.

    :param instance: The instance to be refreshed.
    :param attribute_names: optional list of string attribute names
      indicating a subset of attributes to be expired.

    .. seealso::

        :ref:`session_expire` - introductory material

        :meth:`.Session.expire`

        :meth:`.Session.refresh`

        :meth:`_orm.Query.populate_existing`

    """
    try:
        state = attributes.instance_state(instance)
    except exc.NO_STATE as err:
        raise exc.UnmappedInstanceError(instance) from err
    self._expire_state(state, attribute_names)

expunge

expunge(instance: object) -> None

Remove the instance from this Session.

This will free all internal references to the instance. Cascading will be applied according to the expunge cascade rule.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def expunge(self, instance: object) -> None:
    """Remove the `instance` from this ``Session``.

    This will free all internal references to the instance.  Cascading
    will be applied according to the *expunge* cascade rule.

    """
    try:
        state = attributes.instance_state(instance)
    except exc.NO_STATE as err:
        raise exc.UnmappedInstanceError(instance) from err
    if state.session_id is not self.hash_key:
        raise sa_exc.InvalidRequestError(
            "Instance %s is not present in this Session" % state_str(state)
        )

    cascaded = list(
        state.manager.mapper.cascade_iterator("expunge", state)
    )
    self._expunge_states([state] + [st_ for o, m, st_, dct_ in cascaded])

add

add(instance: object, _warn: bool = True) -> None

Place an object into this :class:_orm.Session.

Objects that are in the :term:transient state when passed to the :meth:_orm.Session.add method will move to the :term:pending state, until the next flush, at which point they will move to the :term:persistent state.

Objects that are in the :term:detached state when passed to the :meth:_orm.Session.add method will move to the :term:persistent state directly.

If the transaction used by the :class:_orm.Session is rolled back, objects which were transient when they were passed to :meth:_orm.Session.add will be moved back to the :term:transient state, and will no longer be present within this :class:_orm.Session.

.. seealso::

:meth:`_orm.Session.add_all`

:ref:`session_adding` - at :ref:`session_basics`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def add(self, instance: object, _warn: bool = True) -> None:
    """Place an object into this :class:`_orm.Session`.

    Objects that are in the :term:`transient` state when passed to the
    :meth:`_orm.Session.add` method will move to the
    :term:`pending` state, until the next flush, at which point they
    will move to the :term:`persistent` state.

    Objects that are in the :term:`detached` state when passed to the
    :meth:`_orm.Session.add` method will move to the :term:`persistent`
    state directly.

    If the transaction used by the :class:`_orm.Session` is rolled back,
    objects which were transient when they were passed to
    :meth:`_orm.Session.add` will be moved back to the
    :term:`transient` state, and will no longer be present within this
    :class:`_orm.Session`.

    .. seealso::

        :meth:`_orm.Session.add_all`

        :ref:`session_adding` - at :ref:`session_basics`

    """
    if _warn and self._warn_on_events:
        self._flush_warning("Session.add()")

    try:
        state = attributes.instance_state(instance)
    except exc.NO_STATE as err:
        raise exc.UnmappedInstanceError(instance) from err

    self._save_or_update_state(state)

add_all

add_all(instances: Iterable[object]) -> None

Add the given collection of instances to this :class:_orm.Session.

See the documentation for :meth:_orm.Session.add for a general behavioral description.

.. seealso::

:meth:`_orm.Session.add`

:ref:`session_adding` - at :ref:`session_basics`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def add_all(self, instances: Iterable[object]) -> None:
    """Add the given collection of instances to this :class:`_orm.Session`.

    See the documentation for :meth:`_orm.Session.add` for a general
    behavioral description.

    .. seealso::

        :meth:`_orm.Session.add`

        :ref:`session_adding` - at :ref:`session_basics`

    """

    if self._warn_on_events:
        self._flush_warning("Session.add_all()")

    for instance in instances:
        self.add(instance, _warn=False)

delete

delete(instance: object) -> None

Mark an instance as deleted.

The object is assumed to be either :term:persistent or :term:detached when passed; after the method is called, the object will remain in the :term:persistent state until the next flush proceeds. During this time, the object will also be a member of the :attr:_orm.Session.deleted collection.

When the next flush proceeds, the object will move to the :term:deleted state, indicating a DELETE statement was emitted for its row within the current transaction. When the transaction is successfully committed, the deleted object is moved to the :term:detached state and is no longer present within this :class:_orm.Session.

.. seealso::

:ref:`session_deleting` - at :ref:`session_basics`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def delete(self, instance: object) -> None:
    """Mark an instance as deleted.

    The object is assumed to be either :term:`persistent` or
    :term:`detached` when passed; after the method is called, the
    object will remain in the :term:`persistent` state until the next
    flush proceeds.  During this time, the object will also be a member
    of the :attr:`_orm.Session.deleted` collection.

    When the next flush proceeds, the object will move to the
    :term:`deleted` state, indicating a ``DELETE`` statement was emitted
    for its row within the current transaction.   When the transaction
    is successfully committed,
    the deleted object is moved to the :term:`detached` state and is
    no longer present within this :class:`_orm.Session`.

    .. seealso::

        :ref:`session_deleting` - at :ref:`session_basics`

    """
    if self._warn_on_events:
        self._flush_warning("Session.delete()")

    try:
        state = attributes.instance_state(instance)
    except exc.NO_STATE as err:
        raise exc.UnmappedInstanceError(instance) from err

    self._delete_impl(state, instance, head=True)

get

get(
    entity: _EntityBindKey[_O],
    ident: _PKIdentityArgument,
    *,
    options: Optional[Sequence[ORMOption]] = None,
    populate_existing: bool = False,
    with_for_update: ForUpdateParameter = None,
    identity_token: Optional[Any] = None,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
) -> Optional[_O]

Return an instance based on the given primary key identifier, or None if not found.

E.g.::

my_user = session.get(User, 5)

some_object = session.get(VersionedFoo, (5, 10))

some_object = session.get(
    VersionedFoo,
    {"id": 5, "version_id": 10}
)

.. versionadded:: 1.4 Added :meth:_orm.Session.get, which is moved from the now legacy :meth:_orm.Query.get method.

:meth:_orm.Session.get is special in that it provides direct access to the identity map of the :class:.Session. If the given primary key identifier is present in the local identity map, the object is returned directly from this collection and no SQL is emitted, unless the object has been marked fully expired. If not present, a SELECT is performed in order to locate the object.

:meth:_orm.Session.get also will perform a check if the object is present in the identity map and marked as expired - a SELECT is emitted to refresh the object as well as to ensure that the row is still present. If not, :class:~sqlalchemy.orm.exc.ObjectDeletedError is raised.

:param entity: a mapped class or :class:.Mapper indicating the type of entity to be loaded.

:param ident: A scalar, tuple, or dictionary representing the primary key. For a composite (e.g. multiple column) primary key, a tuple or dictionary should be passed.

For a single-column primary key, the scalar calling form is typically the most expedient. If the primary key of a row is the value "5", the call looks like::

my_object = session.get(SomeClass, 5)

The tuple form contains primary key values typically in the order in which they correspond to the mapped :class:_schema.Table object's primary key columns, or if the :paramref:_orm.Mapper.primary_key configuration parameter were used, in the order used for that parameter. For example, if the primary key of a row is represented by the integer digits "5, 10" the call would look like::

 my_object = session.get(SomeClass, (5, 10))

The dictionary form should include as keys the mapped attribute names corresponding to each element of the primary key. If the mapped class has the attributes id, version_id as the attributes which store the object's primary key value, the call would look like::

my_object = session.get(SomeClass, {"id": 5, "version_id": 10})

:param options: optional sequence of loader options which will be applied to the query, if one is emitted.

:param populate_existing: causes the method to unconditionally emit a SQL query and refresh the object with the newly loaded data, regardless of whether or not the object is already present.

:param with_for_update: optional boolean True indicating FOR UPDATE should be used, or may be a dictionary containing flags to indicate a more specific set of FOR UPDATE flags for the SELECT; flags should match the parameters of :meth:_query.Query.with_for_update. Supersedes the :paramref:.Session.refresh.lockmode parameter.

:param execution_options: optional dictionary of execution options, which will be associated with the query execution if one is emitted. This dictionary can provide a subset of the options that are accepted by :meth:_engine.Connection.execution_options, and may also provide additional options understood only in an ORM context.

.. versionadded:: 1.4.29

.. seealso::

:ref:`orm_queryguide_execution_options` - ORM-specific execution
options

:param bind_arguments: dictionary of additional arguments to determine the bind. May include "mapper", "bind", or other custom arguments. Contents of this dictionary are passed to the :meth:.Session.get_bind method.

.. versionadded: 2.0.0rc1

:return: The object instance, or None.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def get(
    self,
    entity: _EntityBindKey[_O],
    ident: _PKIdentityArgument,
    *,
    options: Optional[Sequence[ORMOption]] = None,
    populate_existing: bool = False,
    with_for_update: ForUpdateParameter = None,
    identity_token: Optional[Any] = None,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
) -> Optional[_O]:
    """Return an instance based on the given primary key identifier,
    or ``None`` if not found.

    E.g.::

        my_user = session.get(User, 5)

        some_object = session.get(VersionedFoo, (5, 10))

        some_object = session.get(
            VersionedFoo,
            {"id": 5, "version_id": 10}
        )

    .. versionadded:: 1.4 Added :meth:`_orm.Session.get`, which is moved
       from the now legacy :meth:`_orm.Query.get` method.

    :meth:`_orm.Session.get` is special in that it provides direct
    access to the identity map of the :class:`.Session`.
    If the given primary key identifier is present
    in the local identity map, the object is returned
    directly from this collection and no SQL is emitted,
    unless the object has been marked fully expired.
    If not present,
    a SELECT is performed in order to locate the object.

    :meth:`_orm.Session.get` also will perform a check if
    the object is present in the identity map and
    marked as expired - a SELECT
    is emitted to refresh the object as well as to
    ensure that the row is still present.
    If not, :class:`~sqlalchemy.orm.exc.ObjectDeletedError` is raised.

    :param entity: a mapped class or :class:`.Mapper` indicating the
     type of entity to be loaded.

    :param ident: A scalar, tuple, or dictionary representing the
     primary key.  For a composite (e.g. multiple column) primary key,
     a tuple or dictionary should be passed.

     For a single-column primary key, the scalar calling form is typically
     the most expedient.  If the primary key of a row is the value "5",
     the call looks like::

        my_object = session.get(SomeClass, 5)

     The tuple form contains primary key values typically in
     the order in which they correspond to the mapped
     :class:`_schema.Table`
     object's primary key columns, or if the
     :paramref:`_orm.Mapper.primary_key` configuration parameter were
     used, in
     the order used for that parameter. For example, if the primary key
     of a row is represented by the integer
     digits "5, 10" the call would look like::

         my_object = session.get(SomeClass, (5, 10))

     The dictionary form should include as keys the mapped attribute names
     corresponding to each element of the primary key.  If the mapped class
     has the attributes ``id``, ``version_id`` as the attributes which
     store the object's primary key value, the call would look like::

        my_object = session.get(SomeClass, {"id": 5, "version_id": 10})

    :param options: optional sequence of loader options which will be
     applied to the query, if one is emitted.

    :param populate_existing: causes the method to unconditionally emit
     a SQL query and refresh the object with the newly loaded data,
     regardless of whether or not the object is already present.

    :param with_for_update: optional boolean ``True`` indicating FOR UPDATE
      should be used, or may be a dictionary containing flags to
      indicate a more specific set of FOR UPDATE flags for the SELECT;
      flags should match the parameters of
      :meth:`_query.Query.with_for_update`.
      Supersedes the :paramref:`.Session.refresh.lockmode` parameter.

    :param execution_options: optional dictionary of execution options,
     which will be associated with the query execution if one is emitted.
     This dictionary can provide a subset of the options that are
     accepted by :meth:`_engine.Connection.execution_options`, and may
     also provide additional options understood only in an ORM context.

     .. versionadded:: 1.4.29

     .. seealso::

        :ref:`orm_queryguide_execution_options` - ORM-specific execution
        options

    :param bind_arguments: dictionary of additional arguments to determine
     the bind.  May include "mapper", "bind", or other custom arguments.
     Contents of this dictionary are passed to the
     :meth:`.Session.get_bind` method.

     .. versionadded: 2.0.0rc1

    :return: The object instance, or ``None``.

    """
    return self._get_impl(
        entity,
        ident,
        loading.load_on_pk_identity,
        options=options,
        populate_existing=populate_existing,
        with_for_update=with_for_update,
        identity_token=identity_token,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
    )

get_one

get_one(
    entity: _EntityBindKey[_O],
    ident: _PKIdentityArgument,
    *,
    options: Optional[Sequence[ORMOption]] = None,
    populate_existing: bool = False,
    with_for_update: ForUpdateParameter = None,
    identity_token: Optional[Any] = None,
    execution_options: OrmExecuteOptionsParameter = EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
) -> _O

Return exactly one instance based on the given primary key identifier, or raise an exception if not found.

Raises sqlalchemy.orm.exc.NoResultFound if the query selects no rows.

For a detailed documentation of the arguments see the method :meth:.Session.get.

.. versionadded:: 2.0.22

:return: The object instance.

.. seealso::

:meth:`.Session.get` - equivalent method that instead
  returns ``None`` if no row was found with the provided primary
  key
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def get_one(
    self,
    entity: _EntityBindKey[_O],
    ident: _PKIdentityArgument,
    *,
    options: Optional[Sequence[ORMOption]] = None,
    populate_existing: bool = False,
    with_for_update: ForUpdateParameter = None,
    identity_token: Optional[Any] = None,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
) -> _O:
    """Return exactly one instance based on the given primary key
    identifier, or raise an exception if not found.

    Raises ``sqlalchemy.orm.exc.NoResultFound`` if the query
    selects no rows.

    For a detailed documentation of the arguments see the
    method :meth:`.Session.get`.

    .. versionadded:: 2.0.22

    :return: The object instance.

    .. seealso::

        :meth:`.Session.get` - equivalent method that instead
          returns ``None`` if no row was found with the provided primary
          key

    """

    instance = self.get(
        entity,
        ident,
        options=options,
        populate_existing=populate_existing,
        with_for_update=with_for_update,
        identity_token=identity_token,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
    )

    if instance is None:
        raise sa_exc.NoResultFound(
            "No row was found when one was required"
        )

    return instance

merge

merge(
    instance: _O,
    *,
    load: bool = True,
    options: Optional[Sequence[ORMOption]] = None,
) -> _O

Copy the state of a given instance into a corresponding instance within this :class:.Session.

:meth:.Session.merge examines the primary key attributes of the source instance, and attempts to reconcile it with an instance of the same primary key in the session. If not found locally, it attempts to load the object from the database based on primary key, and if none can be located, creates a new instance. The state of each attribute on the source instance is then copied to the target instance. The resulting target instance is then returned by the method; the original source instance is left unmodified, and un-associated with the :class:.Session if not already.

This operation cascades to associated instances if the association is mapped with cascade="merge".

See :ref:unitofwork_merging for a detailed discussion of merging.

:param instance: Instance to be merged. :param load: Boolean, when False, :meth:.merge switches into a "high performance" mode which causes it to forego emitting history events as well as all database access. This flag is used for cases such as transferring graphs of objects into a :class:.Session from a second level cache, or to transfer just-loaded objects into the :class:.Session owned by a worker thread or process without re-querying the database.

The load=False use case adds the caveat that the given object has to be in a "clean" state, that is, has no pending changes to be flushed - even if the incoming object is detached from any :class:.Session. This is so that when the merge operation populates local attributes and cascades to related objects and collections, the values can be "stamped" onto the target object as is, without generating any history or attribute events, and without the need to reconcile the incoming data with any existing related objects or collections that might not be loaded. The resulting objects from load=False are always produced as "clean", so it is only appropriate that the given objects should be "clean" as well, else this suggests a mis-use of the method. :param options: optional sequence of loader options which will be applied to the :meth:_orm.Session.get method when the merge operation loads the existing version of the object from the database.

.. versionadded:: 1.4.24

.. seealso::

:func:`.make_transient_to_detached` - provides for an alternative
means of "merging" a single object into the :class:`.Session`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def merge(
    self,
    instance: _O,
    *,
    load: bool = True,
    options: Optional[Sequence[ORMOption]] = None,
) -> _O:
    """Copy the state of a given instance into a corresponding instance
    within this :class:`.Session`.

    :meth:`.Session.merge` examines the primary key attributes of the
    source instance, and attempts to reconcile it with an instance of the
    same primary key in the session.   If not found locally, it attempts
    to load the object from the database based on primary key, and if
    none can be located, creates a new instance.  The state of each
    attribute on the source instance is then copied to the target
    instance.  The resulting target instance is then returned by the
    method; the original source instance is left unmodified, and
    un-associated with the :class:`.Session` if not already.

    This operation cascades to associated instances if the association is
    mapped with ``cascade="merge"``.

    See :ref:`unitofwork_merging` for a detailed discussion of merging.

    :param instance: Instance to be merged.
    :param load: Boolean, when False, :meth:`.merge` switches into
     a "high performance" mode which causes it to forego emitting history
     events as well as all database access.  This flag is used for
     cases such as transferring graphs of objects into a :class:`.Session`
     from a second level cache, or to transfer just-loaded objects
     into the :class:`.Session` owned by a worker thread or process
     without re-querying the database.

     The ``load=False`` use case adds the caveat that the given
     object has to be in a "clean" state, that is, has no pending changes
     to be flushed - even if the incoming object is detached from any
     :class:`.Session`.   This is so that when
     the merge operation populates local attributes and
     cascades to related objects and
     collections, the values can be "stamped" onto the
     target object as is, without generating any history or attribute
     events, and without the need to reconcile the incoming data with
     any existing related objects or collections that might not
     be loaded.  The resulting objects from ``load=False`` are always
     produced as "clean", so it is only appropriate that the given objects
     should be "clean" as well, else this suggests a mis-use of the
     method.
    :param options: optional sequence of loader options which will be
     applied to the :meth:`_orm.Session.get` method when the merge
     operation loads the existing version of the object from the database.

     .. versionadded:: 1.4.24


    .. seealso::

        :func:`.make_transient_to_detached` - provides for an alternative
        means of "merging" a single object into the :class:`.Session`

    """

    if self._warn_on_events:
        self._flush_warning("Session.merge()")

    _recursive: Dict[InstanceState[Any], object] = {}
    _resolve_conflict_map: Dict[_IdentityKeyType[Any], object] = {}

    if load:
        # flush current contents if we expect to load data
        self._autoflush()

    object_mapper(instance)  # verify mapped
    autoflush = self.autoflush
    try:
        self.autoflush = False
        return self._merge(
            attributes.instance_state(instance),
            attributes.instance_dict(instance),
            load=load,
            options=options,
            _recursive=_recursive,
            _resolve_conflict_map=_resolve_conflict_map,
        )
    finally:
        self.autoflush = autoflush

enable_relationship_loading

enable_relationship_loading(obj: object) -> None

Associate an object with this :class:.Session for related object loading.

.. warning::

:meth:`.enable_relationship_loading` exists to serve special
use cases and is not recommended for general use.

Accesses of attributes mapped with :func:_orm.relationship will attempt to load a value from the database using this :class:.Session as the source of connectivity. The values will be loaded based on foreign key and primary key values present on this object - if not present, then those relationships will be unavailable.

The object will be attached to this session, but will not participate in any persistence operations; its state for almost all purposes will remain either "transient" or "detached", except for the case of relationship loading.

Also note that backrefs will often not work as expected. Altering a relationship-bound attribute on the target object may not fire off a backref event, if the effective value is what was already loaded from a foreign-key-holding value.

The :meth:.Session.enable_relationship_loading method is similar to the load_on_pending flag on :func:_orm.relationship. Unlike that flag, :meth:.Session.enable_relationship_loading allows an object to remain transient while still being able to load related items.

To make a transient object associated with a :class:.Session via :meth:.Session.enable_relationship_loading pending, add it to the :class:.Session using :meth:.Session.add normally. If the object instead represents an existing identity in the database, it should be merged using :meth:.Session.merge.

:meth:.Session.enable_relationship_loading does not improve behavior when the ORM is used normally - object references should be constructed at the object level, not at the foreign key level, so that they are present in an ordinary way before flush() proceeds. This method is not intended for general use.

.. seealso::

:paramref:`_orm.relationship.load_on_pending` - this flag
allows per-relationship loading of many-to-ones on items that
are pending.

:func:`.make_transient_to_detached` - allows for an object to
be added to a :class:`.Session` without SQL emitted, which then
will unexpire attributes on access.
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def enable_relationship_loading(self, obj: object) -> None:
    """Associate an object with this :class:`.Session` for related
    object loading.

    .. warning::

        :meth:`.enable_relationship_loading` exists to serve special
        use cases and is not recommended for general use.

    Accesses of attributes mapped with :func:`_orm.relationship`
    will attempt to load a value from the database using this
    :class:`.Session` as the source of connectivity.  The values
    will be loaded based on foreign key and primary key values
    present on this object - if not present, then those relationships
    will be unavailable.

    The object will be attached to this session, but will
    **not** participate in any persistence operations; its state
    for almost all purposes will remain either "transient" or
    "detached", except for the case of relationship loading.

    Also note that backrefs will often not work as expected.
    Altering a relationship-bound attribute on the target object
    may not fire off a backref event, if the effective value
    is what was already loaded from a foreign-key-holding value.

    The :meth:`.Session.enable_relationship_loading` method is
    similar to the ``load_on_pending`` flag on :func:`_orm.relationship`.
    Unlike that flag, :meth:`.Session.enable_relationship_loading` allows
    an object to remain transient while still being able to load
    related items.

    To make a transient object associated with a :class:`.Session`
    via :meth:`.Session.enable_relationship_loading` pending, add
    it to the :class:`.Session` using :meth:`.Session.add` normally.
    If the object instead represents an existing identity in the database,
    it should be merged using :meth:`.Session.merge`.

    :meth:`.Session.enable_relationship_loading` does not improve
    behavior when the ORM is used normally - object references should be
    constructed at the object level, not at the foreign key level, so
    that they are present in an ordinary way before flush()
    proceeds.  This method is not intended for general use.

    .. seealso::

        :paramref:`_orm.relationship.load_on_pending` - this flag
        allows per-relationship loading of many-to-ones on items that
        are pending.

        :func:`.make_transient_to_detached` - allows for an object to
        be added to a :class:`.Session` without SQL emitted, which then
        will unexpire attributes on access.

    """
    try:
        state = attributes.instance_state(obj)
    except exc.NO_STATE as err:
        raise exc.UnmappedInstanceError(obj) from err

    to_attach = self._before_attach(state, obj)
    state._load_pending = True
    if to_attach:
        self._after_attach(state, obj)

flush

flush(objects: Optional[Sequence[Any]] = None) -> None

Flush all the object changes to the database.

Writes out all pending object creations, deletions and modifications to the database as INSERTs, DELETEs, UPDATEs, etc. Operations are automatically ordered by the Session's unit of work dependency solver.

Database operations will be issued in the current transactional context and do not affect the state of the transaction, unless an error occurs, in which case the entire transaction is rolled back. You may flush() as often as you like within a transaction to move changes from Python to the database's transaction buffer.

:param objects: Optional; restricts the flush operation to operate only on elements that are in the given collection.

This feature is for an extremely narrow set of use cases where particular objects may need to be operated upon before the full flush() occurs. It is not intended for general use.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def flush(self, objects: Optional[Sequence[Any]] = None) -> None:
    """Flush all the object changes to the database.

    Writes out all pending object creations, deletions and modifications
    to the database as INSERTs, DELETEs, UPDATEs, etc.  Operations are
    automatically ordered by the Session's unit of work dependency
    solver.

    Database operations will be issued in the current transactional
    context and do not affect the state of the transaction, unless an
    error occurs, in which case the entire transaction is rolled back.
    You may flush() as often as you like within a transaction to move
    changes from Python to the database's transaction buffer.

    :param objects: Optional; restricts the flush operation to operate
      only on elements that are in the given collection.

      This feature is for an extremely narrow set of use cases where
      particular objects may need to be operated upon before the
      full flush() occurs.  It is not intended for general use.

    """

    if self._flushing:
        raise sa_exc.InvalidRequestError("Session is already flushing")

    if self._is_clean():
        return
    try:
        self._flushing = True
        self._flush(objects)
    finally:
        self._flushing = False

bulk_save_objects

bulk_save_objects(
    objects: Iterable[object],
    return_defaults: bool = False,
    update_changed_only: bool = True,
    preserve_order: bool = True,
) -> None

Perform a bulk save of the given list of objects.

.. legacy::

This method is a legacy feature as of the 2.0 series of
SQLAlchemy.   For modern bulk INSERT and UPDATE, see
the sections :ref:`orm_queryguide_bulk_insert` and
:ref:`orm_queryguide_bulk_update`.

For general INSERT and UPDATE of existing ORM mapped objects,
prefer standard :term:`unit of work` data management patterns,
introduced in the :ref:`unified_tutorial` at
:ref:`tutorial_orm_data_manipulation`.  SQLAlchemy 2.0
now uses :ref:`engine_insertmanyvalues` with modern dialects
which solves previous issues of bulk INSERT slowness.

:param objects: a sequence of mapped object instances. The mapped objects are persisted as is, and are not associated with the :class:.Session afterwards.

For each object, whether the object is sent as an INSERT or an UPDATE is dependent on the same rules used by the :class:.Session in traditional operation; if the object has the :attr:.InstanceState.key attribute set, then the object is assumed to be "detached" and will result in an UPDATE. Otherwise, an INSERT is used.

In the case of an UPDATE, statements are grouped based on which attributes have changed, and are thus to be the subject of each SET clause. If update_changed_only is False, then all attributes present within each object are applied to the UPDATE statement, which may help in allowing the statements to be grouped together into a larger executemany(), and will also reduce the overhead of checking history on attributes.

:param return_defaults: when True, rows that are missing values which generate defaults, namely integer primary key defaults and sequences, will be inserted one at a time, so that the primary key value is available. In particular this will allow joined-inheritance and other multi-table mappings to insert correctly without the need to provide primary key values ahead of time; however, :paramref:.Session.bulk_save_objects.return_defaults greatly reduces the performance gains of the method overall. It is strongly advised to please use the standard :meth:_orm.Session.add_all approach.

:param update_changed_only: when True, UPDATE statements are rendered based on those attributes in each state that have logged changes. When False, all attributes present are rendered into the SET clause with the exception of primary key attributes.

:param preserve_order: when True, the order of inserts and updates matches exactly the order in which the objects are given. When False, common types of objects are grouped into inserts and updates, to allow for more batching opportunities.

.. seealso::

:doc:`queryguide/dml`

:meth:`.Session.bulk_insert_mappings`

:meth:`.Session.bulk_update_mappings`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def bulk_save_objects(
    self,
    objects: Iterable[object],
    return_defaults: bool = False,
    update_changed_only: bool = True,
    preserve_order: bool = True,
) -> None:
    """Perform a bulk save of the given list of objects.

    .. legacy::

        This method is a legacy feature as of the 2.0 series of
        SQLAlchemy.   For modern bulk INSERT and UPDATE, see
        the sections :ref:`orm_queryguide_bulk_insert` and
        :ref:`orm_queryguide_bulk_update`.

        For general INSERT and UPDATE of existing ORM mapped objects,
        prefer standard :term:`unit of work` data management patterns,
        introduced in the :ref:`unified_tutorial` at
        :ref:`tutorial_orm_data_manipulation`.  SQLAlchemy 2.0
        now uses :ref:`engine_insertmanyvalues` with modern dialects
        which solves previous issues of bulk INSERT slowness.

    :param objects: a sequence of mapped object instances.  The mapped
     objects are persisted as is, and are **not** associated with the
     :class:`.Session` afterwards.

     For each object, whether the object is sent as an INSERT or an
     UPDATE is dependent on the same rules used by the :class:`.Session`
     in traditional operation; if the object has the
     :attr:`.InstanceState.key`
     attribute set, then the object is assumed to be "detached" and
     will result in an UPDATE.  Otherwise, an INSERT is used.

     In the case of an UPDATE, statements are grouped based on which
     attributes have changed, and are thus to be the subject of each
     SET clause.  If ``update_changed_only`` is False, then all
     attributes present within each object are applied to the UPDATE
     statement, which may help in allowing the statements to be grouped
     together into a larger executemany(), and will also reduce the
     overhead of checking history on attributes.

    :param return_defaults: when True, rows that are missing values which
     generate defaults, namely integer primary key defaults and sequences,
     will be inserted **one at a time**, so that the primary key value
     is available.  In particular this will allow joined-inheritance
     and other multi-table mappings to insert correctly without the need
     to provide primary key values ahead of time; however,
     :paramref:`.Session.bulk_save_objects.return_defaults` **greatly
     reduces the performance gains** of the method overall.  It is strongly
     advised to please use the standard :meth:`_orm.Session.add_all`
     approach.

    :param update_changed_only: when True, UPDATE statements are rendered
     based on those attributes in each state that have logged changes.
     When False, all attributes present are rendered into the SET clause
     with the exception of primary key attributes.

    :param preserve_order: when True, the order of inserts and updates
     matches exactly the order in which the objects are given.   When
     False, common types of objects are grouped into inserts
     and updates, to allow for more batching opportunities.

    .. seealso::

        :doc:`queryguide/dml`

        :meth:`.Session.bulk_insert_mappings`

        :meth:`.Session.bulk_update_mappings`

    """

    obj_states: Iterable[InstanceState[Any]]

    obj_states = (attributes.instance_state(obj) for obj in objects)

    if not preserve_order:
        # the purpose of this sort is just so that common mappers
        # and persistence states are grouped together, so that groupby
        # will return a single group for a particular type of mapper.
        # it's not trying to be deterministic beyond that.
        obj_states = sorted(
            obj_states,
            key=lambda state: (id(state.mapper), state.key is not None),
        )

    def grouping_key(
        state: InstanceState[_O],
    ) -> Tuple[Mapper[_O], bool]:
        return (state.mapper, state.key is not None)

    for (mapper, isupdate), states in itertools.groupby(
        obj_states, grouping_key
    ):
        self._bulk_save_mappings(
            mapper,
            states,
            isupdate,
            True,
            return_defaults,
            update_changed_only,
            False,
        )

bulk_insert_mappings

bulk_insert_mappings(
    mapper: Mapper[Any],
    mappings: Iterable[Dict[str, Any]],
    return_defaults: bool = False,
    render_nulls: bool = False,
) -> None

Perform a bulk insert of the given list of mapping dictionaries.

.. legacy::

This method is a legacy feature as of the 2.0 series of
SQLAlchemy.   For modern bulk INSERT and UPDATE, see
the sections :ref:`orm_queryguide_bulk_insert` and
:ref:`orm_queryguide_bulk_update`.  The 2.0 API shares
implementation details with this method and adds new features
as well.

:param mapper: a mapped class, or the actual :class:_orm.Mapper object, representing the single kind of object represented within the mapping list.

:param mappings: a sequence of dictionaries, each one containing the state of the mapped row to be inserted, in terms of the attribute names on the mapped class. If the mapping refers to multiple tables, such as a joined-inheritance mapping, each dictionary must contain all keys to be populated into all tables.

:param return_defaults: when True, the INSERT process will be altered to ensure that newly generated primary key values will be fetched. The rationale for this parameter is typically to enable :ref:Joined Table Inheritance <joined_inheritance> mappings to be bulk inserted.

.. note:: for backends that don't support RETURNING, the :paramref:_orm.Session.bulk_insert_mappings.return_defaults parameter can significantly decrease performance as INSERT statements can no longer be batched. See :ref:engine_insertmanyvalues for background on which backends are affected.

:param render_nulls: When True, a value of None will result in a NULL value being included in the INSERT statement, rather than the column being omitted from the INSERT. This allows all the rows being INSERTed to have the identical set of columns which allows the full set of rows to be batched to the DBAPI. Normally, each column-set that contains a different combination of NULL values than the previous row must omit a different series of columns from the rendered INSERT statement, which means it must be emitted as a separate statement. By passing this flag, the full set of rows are guaranteed to be batchable into one batch; the cost however is that server-side defaults which are invoked by an omitted column will be skipped, so care must be taken to ensure that these are not necessary.

.. warning::

When this flag is set, **server side default SQL values will
not be invoked** for those columns that are inserted as NULL;
the NULL value will be sent explicitly.   Care must be taken
to ensure that no server-side default functions need to be
invoked for the operation as a whole.

.. seealso::

:doc:`queryguide/dml`

:meth:`.Session.bulk_save_objects`

:meth:`.Session.bulk_update_mappings`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def bulk_insert_mappings(
    self,
    mapper: Mapper[Any],
    mappings: Iterable[Dict[str, Any]],
    return_defaults: bool = False,
    render_nulls: bool = False,
) -> None:
    """Perform a bulk insert of the given list of mapping dictionaries.

    .. legacy::

        This method is a legacy feature as of the 2.0 series of
        SQLAlchemy.   For modern bulk INSERT and UPDATE, see
        the sections :ref:`orm_queryguide_bulk_insert` and
        :ref:`orm_queryguide_bulk_update`.  The 2.0 API shares
        implementation details with this method and adds new features
        as well.

    :param mapper: a mapped class, or the actual :class:`_orm.Mapper`
     object,
     representing the single kind of object represented within the mapping
     list.

    :param mappings: a sequence of dictionaries, each one containing the
     state of the mapped row to be inserted, in terms of the attribute
     names on the mapped class.   If the mapping refers to multiple tables,
     such as a joined-inheritance mapping, each dictionary must contain all
     keys to be populated into all tables.

    :param return_defaults: when True, the INSERT process will be altered
     to ensure that newly generated primary key values will be fetched.
     The rationale for this parameter is typically to enable
     :ref:`Joined Table Inheritance <joined_inheritance>` mappings to
     be bulk inserted.

     .. note:: for backends that don't support RETURNING, the
        :paramref:`_orm.Session.bulk_insert_mappings.return_defaults`
        parameter can significantly decrease performance as INSERT
        statements can no longer be batched.   See
        :ref:`engine_insertmanyvalues`
        for background on which backends are affected.

    :param render_nulls: When True, a value of ``None`` will result
     in a NULL value being included in the INSERT statement, rather
     than the column being omitted from the INSERT.   This allows all
     the rows being INSERTed to have the identical set of columns which
     allows the full set of rows to be batched to the DBAPI.  Normally,
     each column-set that contains a different combination of NULL values
     than the previous row must omit a different series of columns from
     the rendered INSERT statement, which means it must be emitted as a
     separate statement.   By passing this flag, the full set of rows
     are guaranteed to be batchable into one batch; the cost however is
     that server-side defaults which are invoked by an omitted column will
     be skipped, so care must be taken to ensure that these are not
     necessary.

     .. warning::

        When this flag is set, **server side default SQL values will
        not be invoked** for those columns that are inserted as NULL;
        the NULL value will be sent explicitly.   Care must be taken
        to ensure that no server-side default functions need to be
        invoked for the operation as a whole.

    .. seealso::

        :doc:`queryguide/dml`

        :meth:`.Session.bulk_save_objects`

        :meth:`.Session.bulk_update_mappings`

    """
    self._bulk_save_mappings(
        mapper,
        mappings,
        False,
        False,
        return_defaults,
        False,
        render_nulls,
    )

bulk_update_mappings

bulk_update_mappings(
    mapper: Mapper[Any], mappings: Iterable[Dict[str, Any]]
) -> None

Perform a bulk update of the given list of mapping dictionaries.

.. legacy::

This method is a legacy feature as of the 2.0 series of
SQLAlchemy.   For modern bulk INSERT and UPDATE, see
the sections :ref:`orm_queryguide_bulk_insert` and
:ref:`orm_queryguide_bulk_update`.  The 2.0 API shares
implementation details with this method and adds new features
as well.

:param mapper: a mapped class, or the actual :class:_orm.Mapper object, representing the single kind of object represented within the mapping list.

:param mappings: a sequence of dictionaries, each one containing the state of the mapped row to be updated, in terms of the attribute names on the mapped class. If the mapping refers to multiple tables, such as a joined-inheritance mapping, each dictionary may contain keys corresponding to all tables. All those keys which are present and are not part of the primary key are applied to the SET clause of the UPDATE statement; the primary key values, which are required, are applied to the WHERE clause.

.. seealso::

:doc:`queryguide/dml`

:meth:`.Session.bulk_insert_mappings`

:meth:`.Session.bulk_save_objects`
Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def bulk_update_mappings(
    self, mapper: Mapper[Any], mappings: Iterable[Dict[str, Any]]
) -> None:
    """Perform a bulk update of the given list of mapping dictionaries.

    .. legacy::

        This method is a legacy feature as of the 2.0 series of
        SQLAlchemy.   For modern bulk INSERT and UPDATE, see
        the sections :ref:`orm_queryguide_bulk_insert` and
        :ref:`orm_queryguide_bulk_update`.  The 2.0 API shares
        implementation details with this method and adds new features
        as well.

    :param mapper: a mapped class, or the actual :class:`_orm.Mapper`
     object,
     representing the single kind of object represented within the mapping
     list.

    :param mappings: a sequence of dictionaries, each one containing the
     state of the mapped row to be updated, in terms of the attribute names
     on the mapped class.   If the mapping refers to multiple tables, such
     as a joined-inheritance mapping, each dictionary may contain keys
     corresponding to all tables.   All those keys which are present and
     are not part of the primary key are applied to the SET clause of the
     UPDATE statement; the primary key values, which are required, are
     applied to the WHERE clause.


    .. seealso::

        :doc:`queryguide/dml`

        :meth:`.Session.bulk_insert_mappings`

        :meth:`.Session.bulk_save_objects`

    """
    self._bulk_save_mappings(
        mapper, mappings, True, False, False, False, False
    )

is_modified

is_modified(
    instance: object, include_collections: bool = True
) -> bool

Return True if the given instance has locally modified attributes.

This method retrieves the history for each instrumented attribute on the instance and performs a comparison of the current value to its previously committed value, if any.

It is in effect a more expensive and accurate version of checking for the given instance in the :attr:.Session.dirty collection; a full test for each attribute's net "dirty" status is performed.

E.g.::

return session.is_modified(someobject)

A few caveats to this method apply:

  • Instances present in the :attr:.Session.dirty collection may report False when tested with this method. This is because the object may have received change events via attribute mutation, thus placing it in :attr:.Session.dirty, but ultimately the state is the same as that loaded from the database, resulting in no net change here.
  • Scalar attributes may not have recorded the previously set value when a new value was applied, if the attribute was not loaded, or was expired, at the time the new value was received - in these cases, the attribute is assumed to have a change, even if there is ultimately no net change against its database value. SQLAlchemy in most cases does not need the "old" value when a set event occurs, so it skips the expense of a SQL call if the old value isn't present, based on the assumption that an UPDATE of the scalar value is usually needed, and in those few cases where it isn't, is less expensive on average than issuing a defensive SELECT.

The "old" value is fetched unconditionally upon set only if the attribute container has the active_history flag set to True. This flag is set typically for primary key attributes and scalar object references that are not a simple many-to-one. To set this flag for any arbitrary mapped column, use the active_history argument with :func:.column_property.

:param instance: mapped instance to be tested for pending changes. :param include_collections: Indicates if multivalued collections should be included in the operation. Setting this to False is a way to detect only local-column based properties (i.e. scalar columns or many-to-one foreign keys) that would result in an UPDATE for this instance upon flush.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py
def is_modified(
    self, instance: object, include_collections: bool = True
) -> bool:
    r"""Return ``True`` if the given instance has locally
    modified attributes.

    This method retrieves the history for each instrumented
    attribute on the instance and performs a comparison of the current
    value to its previously committed value, if any.

    It is in effect a more expensive and accurate
    version of checking for the given instance in the
    :attr:`.Session.dirty` collection; a full test for
    each attribute's net "dirty" status is performed.

    E.g.::

        return session.is_modified(someobject)

    A few caveats to this method apply:

    * Instances present in the :attr:`.Session.dirty` collection may
      report ``False`` when tested with this method.  This is because
      the object may have received change events via attribute mutation,
      thus placing it in :attr:`.Session.dirty`, but ultimately the state
      is the same as that loaded from the database, resulting in no net
      change here.
    * Scalar attributes may not have recorded the previously set
      value when a new value was applied, if the attribute was not loaded,
      or was expired, at the time the new value was received - in these
      cases, the attribute is assumed to have a change, even if there is
      ultimately no net change against its database value. SQLAlchemy in
      most cases does not need the "old" value when a set event occurs, so
      it skips the expense of a SQL call if the old value isn't present,
      based on the assumption that an UPDATE of the scalar value is
      usually needed, and in those few cases where it isn't, is less
      expensive on average than issuing a defensive SELECT.

      The "old" value is fetched unconditionally upon set only if the
      attribute container has the ``active_history`` flag set to ``True``.
      This flag is set typically for primary key attributes and scalar
      object references that are not a simple many-to-one.  To set this
      flag for any arbitrary mapped column, use the ``active_history``
      argument with :func:`.column_property`.

    :param instance: mapped instance to be tested for pending changes.
    :param include_collections: Indicates if multivalued collections
     should be included in the operation.  Setting this to ``False`` is a
     way to detect only local-column based properties (i.e. scalar columns
     or many-to-one foreign keys) that would result in an UPDATE for this
     instance upon flush.

    """
    state = object_state(instance)

    if not state.modified:
        return False

    dict_ = state.dict

    for attr in state.manager.attributes:
        if (
            not include_collections
            and hasattr(attr.impl, "get_collection")
        ) or not hasattr(attr.impl, "get_history"):
            continue

        (added, unchanged, deleted) = attr.impl.get_history(
            state, dict_, passive=PassiveFlag.NO_CHANGE
        )

        if added or deleted:
            return True
    else:
        return False

get_bind

get_bind(
    mapper: Mapper[_T] | type[_T] | None = None,
    *args: Any,
    **kwargs: Any,
) -> Connection | Engine

Return a bind to which this Session is bound.

Parameters:

Name Type Description Default
mapper Mapper[_T] | type[_T] | None

Optional mapped class or corresponding Mapper instance. The bind can be derived from a Mapper first by consulting the binds map associated with this Session, and secondly by consulting the MetaData associated with the Table to which the Mapper is mapped for a bind.

None

Returns:

Type Description
Connection | Engine

The Engine or Connection to which this Session is bound.

Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/sessions.py
def get_bind(
    self,
    mapper: Mapper[_T] | type[_T] | None = None,
    *args: Any,
    **kwargs: Any,
) -> Connection | Engine:
    """Return a ``bind`` to which this `Session` is bound.

    Args:
        mapper: Optional mapped class or corresponding `Mapper` instance.
            The bind can be derived from a `Mapper` first by consulting the
            ``binds`` map associated with this `Session`, and secondly by
            consulting the `MetaData` associated with the `Table` to which
            the `Mapper` is mapped for a bind.

    Returns:
        The `Engine` or `Connection` to which this `Session` is bound.
    """
    # Handle custom routing logic based on the router attribute
    if self.bind is not None:
        return self.bind
    if self.routing is not None:
        # Retrieve resource type
        resource = mapper.class_ if isinstance(mapper, Mapper) else mapper
        # Check if the session is flushing (i.e. committing)
        if self._flushing:
            # Use the write engine for flushing operations
            engine = self.routing.get_write_engine(
                resource, async_mode=self.async_mode, **kwargs
            )
        else:
            # Use the read engine for all other operations
            engine = self.routing.get_read_engine(
                resource, async_mode=self.async_mode, **kwargs
            )
        # Try the default engine if no suggestion is made
        if engine is None:
            engine = self.routing.get_engine(
                resource, async_mode=self.async_mode, **kwargs
            )
        # Finally return the engine if a suggestion is made
        if engine is not None:
            if isinstance(engine, AsyncEngine):
                return engine.sync_engine
            return engine
    # Fallback to default behavior
    return super().get_bind(mapper, *args, **kwargs)

bulk

bulk(
    *, proxy_reference: bool = True
) -> Generator[SessionBulk, None, None]

A session bulk context manager for Session objects.

The proxy option indicates that the provided resource references should be encapsulated with a proxy, this is done when validating the resource using the Pydantic core schema. This can be useful to resolve the references that target the same resource into a single instance. Thus, modifying a resolved instance will affect all references that target the same resource.

Parameters:

Name Type Description Default
proxy_reference bool

Whether the registered resource references should be encapsulated with a proxy or not. Defaults to True.

True

Returns:

Type Description
None

A SessionBulk instance.

Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/sessions.py
@contextmanager
def bulk(
    self, *, proxy_reference: bool = True,
) -> Generator['SessionBulk', None, None]:
    """A session bulk context manager for `Session` objects.

    The proxy option indicates that the provided resource references should
    be encapsulated with a proxy, this is done when validating the resource
    using the Pydantic core schema. This can be useful to resolve the
    references that target the same resource into a single instance. Thus,
    modifying a resolved instance will affect all references that target
    the same resource.

    Args:
        proxy_reference: Whether the registered resource references should
            be encapsulated with a proxy or not. Defaults to ``True``.

    Returns:
        A `SessionBulk` instance.
    """
    bulk = SessionBulk(self, proxy_reference=proxy_reference)
    token = SESSION_BULK_CONTEXT.set(bulk)
    try:
        yield bulk
    finally:
        SESSION_BULK_CONTEXT.reset(token)

SessionBulk

SessionBulk(session: _T, *, proxy_reference: bool = True)

Bases: Bulk[Session]

A bulk operation manager for resources.

It is used to register resources for bulk operations and commit or rollback them in a single operation within a session.

Initialize the session bulk manager.

The proxy option indicates that the provided resource references should be encapsulated with a proxy, this is done when validating the resource using the Pydantic core schema. This can be useful to resolve the references that target the same resource into a single instance. Thus, modifying a resolved instance will affect all references that target the same resource.

Parameters:

Name Type Description Default
session _T

The session to use for the bulk operations.

required
proxy_reference bool

Whether the registered resource references should be encapsulated with a proxy or not. Defaults to True.

True
Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/bulk.py
def __init__(self, session: _T, *, proxy_reference: bool = True):
    """Initialize the session bulk manager.

    The proxy option indicates that the provided resource references should
    be encapsulated with a proxy, this is done when validating the resource
    using the Pydantic core schema. This can be useful to resolve the
    references that target the same resource into a single instance. Thus,
    modifying a resolved instance will affect all references that target
    the same resource.

    Args:
        session: The session to use for the bulk operations.
        proxy_reference: Whether the registered resource references should
            be encapsulated with a proxy or not. Defaults to ``True``.
    """
    self._lock = Lock()

    self.session = session
    self.proxy_reference = proxy_reference

    self.resolved = {}
    self.unresolved = {}

add

add(
    instance: BaseResource | BaseSpec,
    *,
    is_reference: bool = False,
) -> None

Add a resource instance to the bulk manager.

Parameters:

Name Type Description Default
instance BaseResource | BaseSpec

The resource instance to add to the bulk manager.

required
is_reference bool

Whether the provided resource instance is a reference or not. Defaults to False.

False
Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/bulk.py
def add(
    self,
    instance: 'BaseResource | BaseSpec',
    *,
    is_reference: bool = False,
) -> None:
    """Add a resource instance to the bulk manager.

    Args:
        instance: The resource instance to add to the bulk manager.
        is_reference: Whether the provided resource instance is a reference
            or not. Defaults to ``False``.
    """
    # Resolve instance entity class
    if is_proxy(instance):
        proxy = instance.__proxy__()
        entity = proxy.__class__
    else:
        proxy = None
        entity = instance.__class__

    # Create instance entry
    entry = BulkEntry(
        instance,  # type: ignore
        is_proxy=proxy is not None,
        is_reference=is_reference,
    )

    # Register instance entry
    with self._lock:
        if entity not in self.resolved:
            self.resolved[entity] = set()
        if entity not in self.unresolved:
            self.unresolved[entity] = set()
        self.unresolved[entity].add(entry)

get

get(
    entity: ResourceType | SpecType,
    *,
    resolved: bool | None = None,
    scope: Literal["all", "references", "values"] = "all",
) -> list[BaseResource]

Get the resource instances registered in the bulk manager.

Parameters:

Name Type Description Default
entity ResourceType | SpecType

The resource type to get the resource instances for.

required
resolved bool | None

Whether to get only the resolved resources or not: - True: Get only the resolved resources. - False: Get only the unresolved resources. - None: Get all the resources. Defaults to None.

None
scope Literal['all', 'references', 'values']

The scope of the resource instances to get: - 'all': Get all the resource instances. - 'references': Get only the resource reference instances. - 'values': Get only the resource value instances. Defaults to 'all'.

'all'

Returns:

Type Description
list[BaseResource]

The list of resource instances registered in the bulk manager for

list[BaseResource]

the specified resource type and options.

Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/bulk.py
def get(
    self,
    entity: 'ResourceType | SpecType',
    *,
    resolved: bool | None = None,
    scope: Literal['all', 'references', 'values'] = 'all',
) -> list['BaseResource']:
    """Get the resource instances registered in the bulk manager.

    Args:
        entity: The resource type to get the resource instances for.
        resolved: Whether to get only the resolved resources or not:
            - ``True``: Get only the resolved resources.
            - ``False``: Get only the unresolved resources.
            - ``None``: Get all the resources.
            Defaults to ``None``.
        scope: The scope of the resource instances to get:
            - ``'all'``: Get all the resource instances.
            - ``'references'``: Get only the resource reference instances.
            - ``'values'``: Get only the resource value instances.
            Defaults to ``'all'``.

    Returns:
        The list of resource instances registered in the bulk manager for
        the specified resource type and options.
    """
    entries: set[BulkEntry] = set()

    # Retrieve entries
    if resolved is None or resolved:
        entries.update(self.resolved.get(entity, set()))  # type: ignore
    if resolved is None or not resolved:
        entries.update(self.unresolved.get(entity, set()))  # type: ignore

    # Retrieve instances by scope
    if scope == 'references':
        return [
            entry.instance for entry in entries
            if entry.is_reference
        ]
    elif scope == 'values':
        return [
            entry.instance for entry in entries
            if not entry.is_reference
        ]

    return [entry.instance for entry in entries]

resolve

resolve(
    *,
    raise_errors: bool = True,
    scope: Literal["all", "references", "values"] = "all",
    strategy: Literal["bind", "hydrate"] = "bind",
) -> None

Resolve the specified scope of resource entries in the bulk.

For resource references, if the proxy_reference option is enabled, the resolved instances replace the reference proxy targets. Otherwise, the resolved instances are used to update the reference proxy target.

For resource values, the resolved instances are used to update the resource instances with the fetched data, no proxy is used.

Parameters:

Name Type Description Default
raise_errors bool

Whether to raise errors when failing to resolve resource references or values. Defaults to True.

True
scope Literal['all', 'references', 'values']

The scope of the resources to resolve, it can be either: - 'all': Resolve both the resource references and values. - 'references': Resolve only the resource references. - 'values': Resolve only the resource values. Defaults to 'all'.

'all'
strategy Literal['bind', 'hydrate']

The resolution strategy to use, it can be either: - 'bind': Bind the resolved resource references and values to the database session, i.e. only the resolved id and type_ fields are updated and the instances are made transient to detached. - 'hydrate': Hydrate the resolved resource references and values with the data fetched from the database, i.e. the instances are updated with the fetched data. Defaults to 'bind'.

'bind'
Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/sessions.py
def resolve(
    self,
    *,
    raise_errors: bool = True,
    scope: Literal['all', 'references', 'values'] = 'all',
    strategy: Literal['bind', 'hydrate'] = 'bind',
) -> None:
    """Resolve the specified scope of resource entries in the bulk.

    For resource references, if the `proxy_reference` option is enabled,
    the resolved instances replace the reference proxy targets. Otherwise,
    the resolved instances are used to update the reference proxy target.

    For resource values, the resolved instances are used to update the
    resource instances with the fetched data, no proxy is used.

    Args:
        raise_errors: Whether to raise errors when failing to resolve
            resource references or values. Defaults to ``True``.
        scope: The scope of the resources to resolve, it can be either:
            - ``'all'``: Resolve both the resource references and values.
            - ``'references'``: Resolve only the resource references.
            - ``'values'``: Resolve only the resource values.
            Defaults to ``'all'``.
        strategy: The resolution strategy to use, it can be either:
            - ``'bind'``: Bind the resolved resource references and values
                to the database session, i.e. only the resolved `id` and
                `type_` fields are updated and the instances are made
                transient to detached.
            - ``'hydrate'``: Hydrate the resolved resource references and
                values with the data fetched from the database, i.e. the
                instances are updated with the fetched data.
            Defaults to ``'bind'``.
    """
    resolver = self._resolver(
        raise_errors=raise_errors, scope=scope, strategy=strategy
    )

    try:
        context = next(resolver)
        while True:
            fields, statement = context
            result = self.session.execute(statement)
            context = resolver.send((fields, result))
    except StopIteration:
        pass

session_factory

session_factory(
    bind: Connection | Engine | None = None,
    routing: DatabaseManager | None = None,
    scoped: bool = False,
    *args: Any,
    **kwargs: Any,
) -> SessionFactory

Create a sync session factory for Session objects.

Parameters:

Name Type Description Default
bind Connection | Engine | None

An optional Engine or Connection to which this Session should be bound. When specified, all SQL operations performed by this session will execute via this connectable. Defaults to None.

None
routing DatabaseManager | None

An optional DatabaseManager instance to use for the database routing. Defaults to None.

None
scoped bool

Whether to use a scoped session or not. The scoped session factory ensures that the session is thread-safe. Defaults to False.

False

Returns:

Type Description
SessionFactory

A SessionFactory instance for creating Session objects.

Note

All other keyword arguments are passed to the constructor of the parent SQLAlchemy sessionmaker class.

Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/sessions.py
def session_factory(
    bind: Connection | Engine | None = None,
    routing: DatabaseManager | None = None,
    scoped: bool = False,
    *args: Any,
    **kwargs: Any,
) -> SessionFactory:
    """Create a sync session factory for `Session` objects.

    Args:
        bind: An optional `Engine` or `Connection` to which this `Session`
            should be bound. When specified, all SQL operations performed by
            this session will execute via this connectable.
            Defaults to ``None``.
        routing: An optional `DatabaseManager` instance to use for the database
            routing. Defaults to ``None``.
        scoped: Whether to use a scoped session or not. The scoped session
            factory ensures that the session is thread-safe.
            Defaults to ``False``.

    Returns:
        A `SessionFactory` instance for creating `Session` objects.

    Note:
        All other keyword arguments are passed to the constructor of the parent
        SQLAlchemy `sessionmaker` class.
    """
    # Build session factory
    factory = _sessionmaker(
        bind,
        *args,
        class_=Session,
        routing=routing,
        **kwargs,
    )
    # Wrap the factory in a scoped session manager if requested. The scoped
    # session factory ensures that the session is thread-safe.
    return _scoped_session(factory) if scoped else factory

session_manager

session_manager(
    *,
    using: SessionFactory | None = None,
    auto_commit: bool = False,
    new: bool = False,
    on_missing: Literal["create", "raise"] = "create",
) -> Generator[Session, None, None]

A sync session context manager for Session objects.

Provide a transactional session context around a series of operations. It manages the session lifecycle and commits or rollbacks the session automatically based on the context.

Parameters:

Name Type Description Default
using SessionFactory | None

An optional SessionFactory instance to use instead of the application context factory. Defaults to None.

None
auto_commit bool

Whether to automatically commit on success or rollback on failure after the operation completes. Defaults to False.

False
new bool

Whether to create a new session or not. If set to True, a new session is created. If set to False, the current session is used if available, otherwise it follows the on_missing behavior. Defaults to False.

False
on_missing Literal['create', 'raise']

The behavior to follow when no current session is available, either to create a new session or raise an error. Defaults to 'create'.

'create'

Returns:

Type Description
None

A Session instance.

Note

If neither an application nor a factory are provided and no current session is available, the manager will look for a session factory in the current context.

Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/sessions.py
@contextmanager
def session_manager(
    *,
    using: SessionFactory | None = None,
    auto_commit: bool = False,
    new: bool = False,
    on_missing: Literal['create', 'raise'] = 'create',
) -> Generator[Session, None, None]:
    """A sync session context manager for `Session` objects.

    Provide a transactional session context around a series of operations. It
    manages the session lifecycle and commits or rollbacks the session
    automatically based on the context.

    Args:
        using: An optional `SessionFactory` instance to use instead of the
            application context factory. Defaults to ``None``.
        auto_commit: Whether to automatically commit on success or rollback on
            failure after the operation completes. Defaults to ``False``.
        new: Whether to create a new session or not. If set to ``True``, a new
            session is created. If set to ``False``, the current session is
            used if available, otherwise it follows the `on_missing` behavior.
            Defaults to ``False``.
        on_missing: The behavior to follow when no current session is
            available, either to create a new session or raise an error.
            Defaults to ``'create'``.

    Returns:
        A `Session` instance.

    Note:
        If neither an application nor a factory are provided and no current
        session is available, the manager will look for a session factory in
        the current context.
    """
    session = SESSION_CONTEXT.get()

    # Retrieve session
    if new is False:
        if session is not None:
            if isinstance(session, Session):
                yield session
                return
            raise RuntimeError(
                f"Invalid session type found in the current context. Expected "
                f"a `Session` instance but found {type(session)!r}."
            )
        elif on_missing == 'raise':
            raise RuntimeError(
                f"No session available in the current context where creating "
                f"a new session is not allowed `on_missing='raise'`."
            )

    # Retrieve factory
    factory: SessionFactory | None
    if using is not None:
        factory = using
    else:
        app = PLATEFORME_CONTEXT.get()
        if app is not None:
            factory = app.session
    # Check factory
    if factory is None:
        raise SessionError(
            "No session factory available in the current context."
        )

    # Retrieve sync session
    session = factory()
    token = SESSION_CONTEXT.set(session)

    # Execute operation
    try:
        yield session
    except Exception as error:
        if auto_commit:
            session.rollback()
        raise DatabaseError(
            "An error occurred while executing a database operation."
        ) from error
    else:
        if auto_commit:
            session.commit()
    finally:
        SESSION_CONTEXT.reset(token)
        if isinstance(factory, _scoped_session):
            factory.remove()
        else:
            session.close()