Managers

plateforme.core.database.base

This module defines the DatabaseManager class for controlling database operations. It provides methods for managing database engine connections and for routing operations through DatabaseRouter interfaces in multi-database environments within the Plateforme framework.

DatabaseManager

DatabaseManager(
    urls: EngineMap,
    routers: list[DatabaseRouter] | None = None,
)

Bases: Iterable[str]

A utility class for managing database engine connections.

Initialize the engine manager.

Parameters:

Name Type Description Default
urls EngineMap

A dictionary of database engine URLs to be managed.

required
routers list[DatabaseRouter] | None

A list of database routers to be used for routing operations. Defaults to an empty list.

None
Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/base.py
def __init__(
    self,
    urls: EngineMap,
    routers: list[DatabaseRouter] | None = None,
) -> None:
    """Initialize the engine manager.

    Args:
        urls: A dictionary of database engine URLs to be managed.
        routers: A list of database routers to be used for routing
            operations. Defaults to an empty list.
    """
    # Initialize urls
    self.urls = urls
    # Initialize engines
    self.engines = {}
    self.async_engines = {}
    for alias, url in urls.items():
        # Unpack URL
        conn = str(url)
        scheme, address = conn.split('://')
        match = re.match(RegexPattern.ENGINE_SCHEME, scheme)
        assert match is not None
        _, dialect, driver = match.groups()
        assert isinstance(dialect, str) and isinstance(driver, str)
        # Create engines
        self.async_engines[alias] = create_async_engine(conn)
        self.engines[alias] = create_engine(f'{dialect}://{address}')

    # Initialize routers
    self.routers = routers or []
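
The URL handling in the constructor can be illustrated in isolation. The sketch below is only an approximation: `SCHEME_PATTERN` is a hypothetical stand-in for `RegexPattern.ENGINE_SCHEME`, whose actual definition is not shown on this page, and `split_engine_url` is an illustrative helper rather than part of Plateforme. It assumes every URL uses a `dialect+driver` scheme, as the assertions in the constructor suggest.

```python
import re

# Hypothetical stand-in for RegexPattern.ENGINE_SCHEME (assumption): the
# whole scheme, then the dialect and the driver on either side of the "+".
SCHEME_PATTERN = r'^(([a-z0-9_]+)\+([a-z0-9_]+))$'


def split_engine_url(url: str) -> tuple[str, str]:
    """Return the async URL (unchanged) and the driver-less sync URL."""
    scheme, address = url.split('://', 1)
    match = re.match(SCHEME_PATTERN, scheme)
    if match is None:
        raise ValueError(f"Expected a 'dialect+driver' scheme, got {scheme!r}")
    _, dialect, _driver = match.groups()
    # The async engine keeps the full URL; the sync engine drops the driver
    # so that the dialect's default synchronous driver is used.
    return url, f'{dialect}://{address}'


async_url, sync_url = split_engine_url('sqlite+aiosqlite:///app.db')
print(async_url)  # sqlite+aiosqlite:///app.db
print(sync_url)   # sqlite:///app.db
```

Each alias therefore ends up with two engines built from a single configured URL: one asynchronous and one synchronous.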

get_engine

get_engine(
    resource: type[Any] | None = None,
    async_mode: bool = False,
    operation_mode: Literal["read", "write"] | None = None,
    **kwargs: Any,
) -> AsyncEngine | Engine

Give a suggestion for the standard engine to use.

Suggest the standard engine that should be used for read and write operations on a given resource class.

Parameters:

Name Type Description Default
resource type[Any] | None

The resource class. Defaults to None.

None
async_mode bool

Whether to return an async engine or a sync engine. Defaults to False.

False
operation_mode Literal['read', 'write'] | None

The mode of operation either read, or write. If None is provided, the standard engine will be suggested. Defaults to None.

None
**kwargs Any

Additional information to assist in selecting an engine.

{}

Returns:

Type Description
AsyncEngine | Engine

Either an async engine or a sync engine depending on the chosen async mode. Defaults to sync engine.

Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/base.py
def get_engine(
    self,
    resource: type[Any] | None = None,
    async_mode: bool = False,
    operation_mode: Literal['read', 'write'] | None = None,
    **kwargs: Any
) -> AsyncEngine | Engine:
    """Give a suggestion for the standard engine to use.

    Suggest the standard engine that should be used for read and write
    operations on a given resource class.

    Args:
        resource: The resource class. Defaults to ``None``.
        async_mode: Whether to return an async engine or a sync engine.
            Defaults to ``False``.
        operation_mode: The mode of operation either ``read``, or
            ``write``. If ``None`` is provided, the standard engine will be
            suggested. Defaults to ``None``.
        **kwargs: Additional information to assist in selecting an engine.

    Returns:
        Either an async engine or a sync engine depending on the chosen
        async mode. Defaults to sync engine.
    """
    # Retrieve the engines
    engines: dict[str, Engine | AsyncEngine] = \
        self.async_engines if async_mode else self.engines  # type: ignore
    # Check if any of the routers have a suggestion
    def suggest(
        mode: Literal['read', 'write'] | None,
    ) -> AsyncEngine | Engine | None:
        for router in self.routers:
            # Retrieve the router method
            method = getattr(
                router,
                f'get_{mode}_engine' if mode else 'get_engine',
                lambda *args, **kwargs: None
            )
            if not callable(method):
                raise PlateformeError(
                    f"Invalid router method {method!r}. Router methods "
                    f"must be callable.",
                    code='plateforme-invalid-engine',
                )
            # Retrieve the router suggestion
            if name := method(resource, **kwargs):
                if not isinstance(name, str) or name not in engines:
                    raise PlateformeError(
                        f"Invalid engine alias {name!r}. The "
                        f"{'async' if async_mode else 'sync'} engine "
                        f"alias is not defined.",
                        code='plateforme-invalid-engine',
                    )
                return engines[name]
        return None

    # Try to return the suggested engine
    if engine := suggest(operation_mode):
        return engine
    elif operation_mode and (engine := suggest(None)):
        return engine
    elif 'default' in engines:
        return engines['default']

    # Raise an exception if no suggestion was made
    raise PlateformeError(
        "No standard engine suggestion was made by any of the routers and "
        "no `default` engine was specified.",
        code='plateforme-invalid-engine',
    )
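
The lookup order implemented above (mode-specific suggestion, then generic suggestion, then the `default` alias) can be sketched without the framework. Everything in this example is hypothetical: `ReplicaRouter` and `resolve_alias` are illustrative names, and the function returns an alias rather than an engine object to keep the sketch free of database dependencies.

```python
from typing import Any, Literal, Optional

Mode = Literal['read', 'write']


class ReplicaRouter:
    """Hypothetical router: sends reads to a replica, nothing else."""

    def get_read_engine(self, resource: Any = None, **kwargs: Any) -> Optional[str]:
        return 'replica'


def resolve_alias(
    routers: list[Any],
    aliases: set[str],
    mode: Optional[Mode] = None,
) -> str:
    """Mimic the lookup order of get_engine, returning an alias."""

    def suggest(m: Optional[Mode]) -> Optional[str]:
        name = f'get_{m}_engine' if m else 'get_engine'
        for router in routers:
            # Routers lacking the method are treated as having no opinion.
            method = getattr(router, name, lambda *a, **kw: None)
            alias = method(None)
            if alias:
                if alias not in aliases:
                    raise LookupError(f'Unknown engine alias {alias!r}')
                return alias
        return None

    # 1. mode-specific suggestion, 2. generic suggestion, 3. 'default'.
    alias = suggest(mode)
    if alias is None and mode is not None:
        alias = suggest(None)
    if alias is None and 'default' in aliases:
        alias = 'default'
    if alias is None:
        raise LookupError('No suggestion and no default engine')
    return alias


aliases = {'default', 'replica'}
print(resolve_alias([ReplicaRouter()], aliases, 'read'))   # replica
print(resolve_alias([ReplicaRouter()], aliases, 'write'))  # default
```

Because routers are consulted in list order and the first truthy suggestion wins, more specific routers would normally be placed before more general ones.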

get_read_engine

get_read_engine(
    resource: type[Any] | None = None,
    async_mode: bool = False,
    **kwargs: Any,
) -> AsyncEngine | Engine

Give a suggestion for the engine to use for read operations.

Suggest the engine that should be used for read operations on a given resource class.

Parameters:

Name Type Description Default
resource type[Any] | None

The resource class.

None
async_mode bool

Whether to return an async engine or a sync engine. Defaults to False.

False
**kwargs Any

Additional information to assist in selecting an engine.

{}

Returns:

Type Description
AsyncEngine | Engine

Either an async engine or a sync engine, depending on async_mode. A PlateformeError is raised if no engine can be resolved.

Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/base.py
def get_read_engine(
    self,
    resource: type[Any] | None = None,
    async_mode: bool = False,
    **kwargs: Any
) -> AsyncEngine | Engine:
    """Give a suggestion for the engine to use for read operations.

    Suggest the engine that should be used for read operations on a
    given resource class.

    Args:
        resource: The resource class.
        async_mode: Whether to return an async engine or a sync engine.
            Defaults to ``False``.
        **kwargs: Additional information to assist in selecting an engine.

    Returns:
        Either an async engine or a sync engine, depending on
        ``async_mode``.
    """
    return self.get_engine(
        resource, async_mode, operation_mode='read', **kwargs
    )

get_write_engine

get_write_engine(
    resource: type[Any] | None = None,
    async_mode: bool = False,
    **kwargs: Any,
) -> AsyncEngine | Engine

Give a suggestion for the engine to use for write operations.

Suggest the engine that should be used for write operations on a given resource class.

Parameters:

Name Type Description Default
resource type[Any] | None

The resource class.

None
async_mode bool

Whether to return an async engine or a sync engine. Defaults to False.

False
**kwargs Any

Additional information to assist in selecting an engine.

{}

Returns:

Type Description
AsyncEngine | Engine

Either an async engine or a sync engine, depending on async_mode. A PlateformeError is raised if no engine can be resolved.

Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/base.py
def get_write_engine(
    self,
    resource: type[Any] | None = None,
    async_mode: bool = False,
    **kwargs: Any,
) -> AsyncEngine | Engine:
    """Give a suggestion for the engine to use for write operations.

    Suggest the engine that should be used for write operations on a
    given resource class.

    Args:
        resource: The resource class.
        async_mode: Whether to return an async engine or a sync engine.
            Defaults to ``False``.
        **kwargs: Additional information to assist in selecting an engine.

    Returns:
        Either an async engine or a sync engine, depending on
        ``async_mode``.
    """
    return self.get_engine(
        resource, async_mode, operation_mode='write', **kwargs
    )

is_migration_allowed

is_migration_allowed(
    engine: str, package: str, **kwargs: Any
) -> bool

Flag whether a migration operation is allowed to run.

Determine if the migration operation is allowed to run on the specified database engine alias.

Parameters:

Name Type Description Default
engine str

The database engine alias.

required
package str

The name of the package being migrated.

required
**kwargs Any

Additional information to assist in making a decision.

{}

Returns:

Type Description
bool

True if operation should run, False if not. Defaults to True.

Source code in .venv/lib/python3.12/site-packages/plateforme/core/database/base.py
def is_migration_allowed(
    self,
    engine: str,
    package: str,
    **kwargs: Any,
) -> bool:
    """Flag whether a migration operation is allowed to run.

    Determine if the migration operation is allowed to run on the specified
    database engine alias.

    Args:
        engine: The database engine alias.
        package: The name of the package being migrated.
        **kwargs: Additional information to assist in making a decision.

    Returns:
        ``True`` if operation should run, ``False`` if not. Defaults to
        ``True``.
    """
    # Check if any of the routers have a suggestion
    for router in self.routers:
        is_allowed = router.is_migration_allowed(engine, package, **kwargs)
        if is_allowed is not None:
            return is_allowed
    # Return default
    return True
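
The decision rule above is "the first router with a non-None answer decides; otherwise the migration is allowed". A minimal sketch, assuming a hypothetical `AnalyticsRouter` and hypothetical alias and package names (`warehouse`, `analytics`, `billing`) that do not come from Plateforme:

```python
from typing import Any, Optional


class AnalyticsRouter:
    """Hypothetical router: keeps the 'analytics' package off other engines."""

    def is_migration_allowed(
        self, engine: str, package: str, **kwargs: Any
    ) -> Optional[bool]:
        if package == 'analytics':
            return engine == 'warehouse'
        return None  # No opinion: defer to the next router.


def migration_allowed(routers: list[Any], engine: str, package: str) -> bool:
    """First router with a non-None answer decides; otherwise allow."""
    for router in routers:
        answer = router.is_migration_allowed(engine, package)
        if answer is not None:
            return answer
    return True


routers = [AnalyticsRouter()]
print(migration_allowed(routers, 'default', 'analytics'))    # False
print(migration_allowed(routers, 'warehouse', 'analytics'))  # True
print(migration_allowed(routers, 'default', 'billing'))      # True
```

Returning `None` rather than `False` matters here: `False` vetoes the migration outright, while `None` leaves the decision to later routers or to the default.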

MissingGreenlet

MissingGreenlet(*arg: Any, **kw: Any)

Bases: InvalidRequestError

Error raised by the async greenlet await_ if called while not inside the greenlet spawn context.

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/exc.py
def __init__(self, *arg: Any, **kw: Any):
    code = kw.pop("code", None)
    if code is not None:
        self.code = code
    super().__init__(*arg, **kw)

inspect

inspect(
    subject: Type[_InspectableTypeProtocol[_IN]],
    raiseerr: bool = True,
) -> _IN
inspect(
    subject: _InspectableProtocol[_IN],
    raiseerr: bool = True,
) -> _IN
inspect(
    subject: Inspectable[_IN], raiseerr: bool = True
) -> _IN
inspect(
    subject: Any, raiseerr: Literal[False] = ...
) -> Optional[Any]
inspect(subject: Any, raiseerr: bool = True) -> Any

Produce an inspection object for the given target.

The returned value in some cases may be the same object as the one given, such as if a Mapper object is passed. In other cases, it will be an instance of the registered inspection type for the given object, such as if an Engine is passed, an Inspector object is returned.

Parameters:

Name Type Description Default
subject Any

The subject to be inspected.

required
raiseerr bool

When True, if the given subject does not correspond to a known SQLAlchemy inspected type, sqlalchemy.exc.NoInspectionAvailable is raised. If False, None is returned.

True

Source code in .venv/lib/python3.12/site-packages/sqlalchemy/inspection.py
def inspect(subject: Any, raiseerr: bool = True) -> Any:
    """Produce an inspection object for the given target.

    The returned value in some cases may be the
    same object as the one given, such as if a
    :class:`_orm.Mapper` object is passed.   In other
    cases, it will be an instance of the registered
    inspection type for the given object, such as
    if an :class:`_engine.Engine` is passed, an
    :class:`_reflection.Inspector` object is returned.

    :param subject: the subject to be inspected.
    :param raiseerr: When ``True``, if the given subject
     does not
     correspond to a known SQLAlchemy inspected type,
     :class:`sqlalchemy.exc.NoInspectionAvailable`
     is raised.  If ``False``, ``None`` is returned.

    """
    type_ = type(subject)
    for cls in type_.__mro__:
        if cls in _registrars:
            reg = _registrars.get(cls, None)
            if reg is None:
                continue
            elif reg is True:
                return subject
            ret = reg(subject)
            if ret is not None:
                return ret
    else:
        reg = ret = None

    if raiseerr and (reg is None or ret is None):
        raise exc.NoInspectionAvailable(
            "No inspection system is "
            "available for object of type %s" % type_
        )
    return ret
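
The core idea of the function above is a registry lookup along the subject's method resolution order. The toy version below illustrates only that idea; `_registry`, `register`, and `toy_inspect` are hypothetical names, and the sketch deliberately omits details of SQLAlchemy's implementation (such as the `True` marker that returns the subject itself).

```python
from typing import Any, Callable, Optional

# Toy registry (assumption): maps a class to a factory that builds its
# inspection object. SQLAlchemy's real registry is internal and richer.
_registry: dict[type, Callable[[Any], Any]] = {}


def register(cls: type, factory: Callable[[Any], Any]) -> None:
    """Associate an inspection factory with a class."""
    _registry[cls] = factory


def toy_inspect(subject: Any, raiseerr: bool = True) -> Optional[Any]:
    """Look up an inspector by walking the subject's class hierarchy."""
    for cls in type(subject).__mro__:
        factory = _registry.get(cls)
        if factory is not None:
            result = factory(subject)
            if result is not None:
                return result
    if raiseerr:
        raise TypeError(f'No inspection available for {type(subject)!r}')
    return None


class Base:
    pass


class Child(Base):
    pass


# Registering the base class also covers its subclasses via the MRO walk.
register(Base, lambda obj: f'inspector for {type(obj).__name__}')

print(toy_inspect(Child()))             # inspector for Child
print(toy_inspect(42, raiseerr=False))  # None
```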