Bulk

plateforme.core.database.bulk

This module provides utilities for managing bulk operations on database sessions within the Plateforme framework, built on SQLAlchemy features.

BulkData module-attribute

BulkData = dict['ResourceType', set['BulkEntry']]

A type alias for bulk data used in the bulk manager.

BulkHash module-attribute

BulkHash = int

A type alias for a bulk hash used in query conditions.

BulkSignature module-attribute

BulkSignature = frozenset[str]

A type alias for a bulk field signature used in query conditions.

BulkValue module-attribute

BulkValue = dict[str, Any]

A type alias for a bulk value used in query conditions.

BulkConditions module-attribute

BulkConditions = dict[BulkSignature, list[BulkValue]]

A type alias that stores bulk conditions for the resolution.
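To make the relationship between `BulkSignature`, `BulkValue`, and `BulkConditions` concrete, here is a minimal sketch that groups values by the set of field names they provide. The `group_by_signature` helper is hypothetical and not part of the module; it also assumes that a signature is the set of keys of a value, which the aliases suggest but do not state.

```python
from typing import Any

# Aliases copied from the module for a self-contained example
BulkSignature = frozenset[str]
BulkValue = dict[str, Any]
BulkConditions = dict[BulkSignature, list[BulkValue]]


def group_by_signature(values: list[BulkValue]) -> BulkConditions:
    """Group values by the field names they provide (hypothetical helper)."""
    conditions: BulkConditions = {}
    for value in values:
        # Assumption: the signature is the set of keys of the value
        signature: BulkSignature = frozenset(value)
        conditions.setdefault(signature, []).append(value)
    return conditions


conditions = group_by_signature([
    {'id': 1},
    {'code': 'a', 'owner': 2},
    {'id': 3},
])
```

Values that share the same field names end up in the same list, so each signature could be resolved with a single query.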

BulkMap module-attribute

BulkMap = dict[BulkHash, list['BulkEntry']]

A type alias that maps bulk entries to their hash values.

BulkQuery module-attribute

BulkQuery = tuple[BulkSignature, Select[Any]]

A type alias for a bulk query used for the resolution.

BulkProcess module-attribute

BulkProcess = list[BulkQuery]

A type alias for a bulk process used for the resolution.

BulkResult module-attribute

BulkResult = tuple[BulkSignature, Result[Any]]

A type alias for a bulk result used for the resolution.

BulkEntry dataclass

BulkEntry(
    instance: BaseResource,
    *,
    is_proxy: bool = False,
    is_reference: bool = False,
)

A metadata class for bulk resource entries.

instance class-attribute instance-attribute

instance: BaseResource = field(kw_only=False)

The bulk entry resource instance.

is_proxy class-attribute instance-attribute

is_proxy: bool = field(default=False)

Whether the bulk entry resource instance is a proxy or not.

is_reference class-attribute instance-attribute

is_reference: bool = field(default=False)

Whether the bulk entry resource instance is a reference or not.

Bulk

Bulk(session: _T, *, proxy_reference: bool = True)

Bases: ABC, Generic[_T]

A bulk registry and operation manager for resources.

It is used to register resources for bulk operations and to commit them or roll them back in a streamlined manner. The bulk manager can also resolve resource references and update the resource instances with the resolved identifiers.

Attributes:

Name Type Description
session _T

The async or sync session used for the bulk operations.

proxy_reference bool

Whether resource references should be encapsulated with a proxy or not.

resolved BulkData

The resolved resource entries in the bulk.

unresolved BulkData

The unresolved resource entries in the bulk.

Initialize the session bulk manager.

The proxy option indicates that the provided resource references should be encapsulated with a proxy. This is done when validating the resource using the Pydantic core schema. It is useful for resolving references that target the same resource into a single instance, so that modifying a resolved instance affects all references that target the same resource.

Parameters:

Name Type Description Default
session _T

The session to use for the bulk operations.

required
proxy_reference bool

Whether the registered resource references should be encapsulated with a proxy or not. Defaults to True.

True
Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/bulk.py
def __init__(self, session: _T, *, proxy_reference: bool = True):
    """Initialize the session bulk manager.

    The proxy option indicates that the provided resource references should
    be encapsulated with a proxy, this is done when validating the resource
    using the Pydantic core schema. This can be useful to resolve the
    references that target the same resource into a single instance. Thus,
    modifying a resolved instance will affect all references that target
    the same resource.

    Args:
        session: The session to use for the bulk operations.
        proxy_reference: Whether the registered resource references should
            be encapsulated with a proxy or not. Defaults to ``True``.
    """
    self._lock = Lock()

    self.session = session
    self.proxy_reference = proxy_reference

    self.resolved = {}
    self.unresolved = {}

add

add(
    instance: BaseResource | BaseSpec,
    *,
    is_reference: bool = False,
) -> None

Add a resource instance to the bulk manager.

Parameters:

Name Type Description Default
instance BaseResource | BaseSpec

The resource instance to add to the bulk manager.

required
is_reference bool

Whether the provided resource instance is a reference or not. Defaults to False.

False
Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/bulk.py
def add(
    self,
    instance: 'BaseResource | BaseSpec',
    *,
    is_reference: bool = False,
) -> None:
    """Add a resource instance to the bulk manager.

    Args:
        instance: The resource instance to add to the bulk manager.
        is_reference: Whether the provided resource instance is a reference
            or not. Defaults to ``False``.
    """
    # Resolve instance entity class
    if is_proxy(instance):
        proxy = instance.__proxy__()
        entity = proxy.__class__
    else:
        proxy = None
        entity = instance.__class__

    # Create instance entry
    entry = BulkEntry(
        instance,  # type: ignore
        is_proxy=proxy is not None,
        is_reference=is_reference,
    )

    # Register instance entry
    with self._lock:
        if entity not in self.resolved:
            self.resolved[entity] = set()
        if entity not in self.unresolved:
            self.unresolved[entity] = set()
        self.unresolved[entity].add(entry)
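
The registration step can be sketched in isolation. This is a minimal stand-in, not the Plateforme implementation: `Entry` and `Registry` are hypothetical names, proxies are left out, and `threading.Lock` is an assumption, since the source only shows `Lock()`.

```python
from dataclasses import dataclass
from threading import Lock


@dataclass(frozen=True, eq=False)  # eq=False keeps identity-based hashing
class Entry:
    """Hypothetical stand-in for BulkEntry."""
    instance: object
    is_reference: bool = False


class Registry:
    """Hypothetical stand-in for the registration part of Bulk."""

    def __init__(self) -> None:
        self._lock = Lock()
        self.resolved: dict[type, set[Entry]] = {}
        self.unresolved: dict[type, set[Entry]] = {}

    def add(self, instance: object, *, is_reference: bool = False) -> None:
        entity = type(instance)
        entry = Entry(instance, is_reference)
        # Guard both registries so concurrent callers see a consistent state
        with self._lock:
            self.resolved.setdefault(entity, set())
            self.unresolved.setdefault(entity, set()).add(entry)


registry = Registry()
registry.add(1)
registry.add(2, is_reference=True)
registry.add('x')
```

New entries always land in `unresolved`; the matching `resolved` bucket is only created empty, so later lookups by entity type do not need to check for missing keys.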

get

get(
    entity: ResourceType | SpecType,
    *,
    resolved: bool | None = None,
    scope: Literal["all", "references", "values"] = "all",
) -> list[BaseResource]

Get the resource instances registered in the bulk manager.

Parameters:

Name Type Description Default
entity ResourceType | SpecType

The resource type to get the resource instances for.

required
resolved bool | None

Whether to get only the resolved resources or not:
- True: Get only the resolved resources.
- False: Get only the unresolved resources.
- None: Get all the resources.
Defaults to None.

None
scope Literal['all', 'references', 'values']

The scope of the resource instances to get:
- 'all': Get all the resource instances.
- 'references': Get only the resource reference instances.
- 'values': Get only the resource value instances.
Defaults to 'all'.

'all'

Returns:

Type Description
list[BaseResource]

The list of resource instances registered in the bulk manager for the specified resource type and options.

Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/bulk.py
def get(
    self,
    entity: 'ResourceType | SpecType',
    *,
    resolved: bool | None = None,
    scope: Literal['all', 'references', 'values'] = 'all',
) -> list['BaseResource']:
    """Get the resource instances registered in the bulk manager.

    Args:
        entity: The resource type to get the resource instances for.
        resolved: Whether to get only the resolved resources or not:
            - ``True``: Get only the resolved resources.
            - ``False``: Get only the unresolved resources.
            - ``None``: Get all the resources.
            Defaults to ``None``.
        scope: The scope of the resource instances to get:
            - ``'all'``: Get all the resource instances.
            - ``'references'``: Get only the resource reference instances.
            - ``'values'``: Get only the resource value instances.
            Defaults to ``'all'``.

    Returns:
        The list of resource instances registered in the bulk manager for
        the specified resource type and options.
    """
    entries: set[BulkEntry] = set()

    # Retrieve entries
    if resolved is None or resolved:
        entries.update(self.resolved.get(entity, set()))  # type: ignore
    if resolved is None or not resolved:
        entries.update(self.unresolved.get(entity, set()))  # type: ignore

    # Retrieve instances by scope
    if scope == 'references':
        return [
            entry.instance for entry in entries
            if entry.is_reference
        ]
    elif scope == 'values':
        return [
            entry.instance for entry in entries
            if not entry.is_reference
        ]

    return [entry.instance for entry in entries]

resolve abstractmethod

resolve(
    *,
    raise_errors: bool = True,
    scope: Literal["all", "references", "values"] = "all",
    strategy: Literal["bind", "hydrate"] = "bind",
) -> None | Awaitable[None]

Resolve the specified scope of resource entries in the bulk.

It must be implemented by the subclass either as a synchronous or asynchronous method to resolve the registered resource entries in the bulk.

For resource references, if the proxy_reference option is enabled, the resolved instances replace the reference proxy targets. Otherwise, the resolved instances are used to update the reference proxy target.

For resource values, the resolved instances are used to update the resource instances with the fetched data; no proxy is used.

Parameters:

Name Type Description Default
raise_errors bool

Whether to raise errors when failing to resolve resource references or values. Defaults to True.

True
scope Literal['all', 'references', 'values']

The scope of the resources to resolve:
- 'all': Resolve both the resource references and values.
- 'references': Resolve only the resource references.
- 'values': Resolve only the resource values.
Defaults to 'all'.

'all'
strategy Literal['bind', 'hydrate']

The resolution strategy to use:
- 'bind': Bind the resolved resource references and values to the database session, i.e. only the resolved id and type_ fields are updated and the instances move from the transient state to the detached state.
- 'hydrate': Hydrate the resolved resource references and values with the data fetched from the database, i.e. the instances are updated with the fetched data.
Defaults to 'bind'.

'bind'
Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/bulk.py
@abstractmethod
def resolve(
    self,
    *,
    raise_errors: bool = True,
    scope: Literal['all', 'references', 'values'] = 'all',
    strategy: Literal['bind', 'hydrate'] = 'bind',
) -> None | Awaitable[None]:
    """Resolve the specified scope of resource entries in the bulk.

    It must be implemented by the subclass either as a synchronous or
    asynchronous method to resolve the registered resource entries in the
    bulk.

    For resource references, if the `proxy_reference` option is enabled,
    the resolved instances replace the reference proxy targets. Otherwise,
    the resolved instances are used to update the reference proxy target.

    For resource values, the resolved instances are used to update the
    resource instances with the fetched data, no proxy is used.

    Args:
        raise_errors: Whether to raise errors when failing to resolve
            resource references or values. Defaults to ``True``.
        scope: The scope of the resources to resolve, it can be either:
            - ``'all'``: Resolve both the resource references and values.
            - ``'references'``: Resolve only the resource references.
            - ``'values'``: Resolve only the resource values.
            Defaults to ``'all'``.
        strategy: The resolution strategy to use, it can be either:
            - ``'bind'``: Bind the resolved resource references and values
                to the database session, i.e. only the resolved `id` and
                `type_` fields are updated and the instances are made
                transient to detached.
            - ``'hydrate'``: Hydrate the resolved resource references and
                values with the data fetched from the database, i.e. the
                instances are updated with the fetched data.
            Defaults to ``'bind'``.
    """
    ...
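
The `None | Awaitable[None]` return type lets one abstract method cover both synchronous and asynchronous subclasses. The following sketch shows the pattern with hypothetical stand-in classes (`Resolver`, `SyncResolver`, `AsyncResolver`); it does not reproduce the concrete subclasses shipped with Plateforme, and the bodies only record that they were called.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from collections.abc import Awaitable


class Resolver(ABC):
    """Hypothetical stand-in for the abstract part of Bulk."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    @abstractmethod
    def resolve(self, *, raise_errors: bool = True) -> None | Awaitable[None]:
        ...


class SyncResolver(Resolver):
    def resolve(self, *, raise_errors: bool = True) -> None:
        # A synchronous subclass does its work and returns None directly
        self.calls.append('sync')


class AsyncResolver(Resolver):
    async def resolve(self, *, raise_errors: bool = True) -> None:
        # An asynchronous subclass returns a coroutine that must be awaited
        self.calls.append('async')


sync_resolver = SyncResolver()
sync_resolver.resolve()

async_resolver = AsyncResolver()
asyncio.run(async_resolver.resolve())
```

Callers need to know which flavor they hold: calling `resolve()` on the asynchronous variant without awaiting the result does nothing.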

generate_hash

generate_hash(value: BulkValue) -> BulkHash

Generate a hash for the provided value.

Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/bulk.py
def generate_hash(value: BulkValue) -> BulkHash:
    """Generate a hash for the provided value."""
    hash_dict = {**value}

    for hash_key, hash_value in hash_dict.items():
        if not hasattr(hash_value, 'id'):
            continue
        hash_dict[hash_key] = getattr(hash_value, 'id')

    return hash(frozenset(hash_dict.items()))
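
A short sketch of how this hash behaves, using a copy of the function above so that it runs without Plateforme installed. The `Ref` class is a hypothetical stand-in for any object that exposes an `id` attribute.

```python
from typing import Any


class Ref:
    """Hypothetical stand-in for a resource exposing an ``id`` attribute."""

    def __init__(self, id: int) -> None:
        self.id = id


def generate_hash(value: dict[str, Any]) -> int:
    """Copy of the function above, for a self-contained example."""
    hash_dict = {**value}
    for key, item in hash_dict.items():
        # Replace objects that carry an ``id`` with that identifier
        if hasattr(item, 'id'):
            hash_dict[key] = item.id
    return hash(frozenset(hash_dict.items()))


by_object = generate_hash({'owner': Ref(1), 'code': 'a'})
by_identifier = generate_hash({'code': 'a', 'owner': 1})
```

Key order does not affect the result, and an object with an `id` hashes the same as the bare identifier. Every remaining value must itself be hashable; a list value, for example, raises `TypeError`.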