Skip to content

Database

It can be imported from fluid.db:

from fluid.db import Database

fluid.db.Database dataclass

Database(
    dsn,
    echo=DBECHO,
    pool_size=DBPOOL_MAX_SIZE,
    max_overflow=DBPOOL_MAX_OVERFLOW,
    metadata=MetaData(),
    migration_path="",
    app_name=APP_NAME,
    _engine=None,
)

A container for the tables in a database and a manager of asynchronous connections to a PostgreSQL database.

dsn instance-attribute

dsn

data source name, aka connection string

Example: "postgresql+asyncpg://user:password@localhost/dbname"

echo class-attribute instance-attribute

echo = DBECHO

pool_size class-attribute instance-attribute

pool_size = DBPOOL_MAX_SIZE

max_overflow class-attribute instance-attribute

max_overflow = DBPOOL_MAX_OVERFLOW

metadata class-attribute instance-attribute

metadata = field(default_factory=MetaData)

migration_path class-attribute instance-attribute

migration_path = ''

app_name class-attribute instance-attribute

app_name = APP_NAME

tables property

tables

A dictionary of tables in the database

engine property

engine

The :class:`sqlalchemy.ext.asyncio.AsyncEngine` used to create connections and transactions

sync_engine property

sync_engine

The :class:`sqlalchemy.engine.Engine` for synchronous operations

from_env classmethod

from_env(*, dsn=DATABASE, schema=DATABASE_SCHEMA, **kwargs)

Create a new database container from environment variables as defaults

Source code in fluid/db/container.py
@classmethod
def from_env(
    cls,
    *,
    dsn: str = settings.DATABASE,
    schema: str | None = settings.DATABASE_SCHEMA,
    **kwargs: Any,
) -> Self:
    """Build a database container whose defaults come from the settings module.

    ``dsn`` defaults to ``settings.DATABASE`` and ``schema`` to
    ``settings.DATABASE_SCHEMA``. A new ``MetaData`` bound to ``schema`` is
    created for the container; every other keyword argument is forwarded to
    the class constructor unchanged.
    """
    schema_metadata = sa.MetaData(schema=schema)
    return cls(dsn=dsn, metadata=schema_metadata, **kwargs)

cli

cli(**kwargs)

Create a new click group for database commands

Source code in fluid/db/container.py
def cli(self, **kwargs: Any) -> DbGroup:
    """Build the click command group for this database.

    All keyword arguments are passed through to ``DbGroup`` as given.
    """
    command_group = DbGroup(self, **kwargs)
    return command_group

connection async

connection()

Context manager for obtaining an asynchronous connection

Source code in fluid/db/container.py
@asynccontextmanager
async def connection(self) -> AsyncIterator[AsyncConnection]:
    """Context manager for obtaining an asynchronous connection"""
    async with self.engine.connect() as conn:
        yield conn

ensure_connection async

ensure_connection(conn=None)

Context manager that yields the given connection if one is provided, otherwise obtains a new asynchronous connection

Source code in fluid/db/container.py
@asynccontextmanager
async def ensure_connection(
    self,
    conn: AsyncConnection | None = None,
) -> AsyncIterator[AsyncConnection]:
    """Context manager for obtaining an asynchronous connection"""
    if conn:
        yield conn
    else:
        async with self.engine.connect() as conn:
            yield conn

transaction async

transaction()

Context manager for initializing an asynchronous database transaction

Source code in fluid/db/container.py
@asynccontextmanager
async def transaction(self) -> AsyncIterator[AsyncConnection]:
    """Context manager for initializing an asynchronous database transaction"""
    async with self.engine.begin() as conn:
        yield conn

ensure_transaction async

ensure_transaction(conn=None)

Context manager for ensuring that a connection has initialized a database transaction

Source code in fluid/db/container.py
@asynccontextmanager
async def ensure_transaction(
    self,
    conn: AsyncConnection | None = None,
) -> AsyncIterator[AsyncConnection]:
    """Context manager for ensuring we a connection has initialized
    a database transaction"""
    if conn:
        if not conn.in_transaction():
            async with conn.begin():
                yield conn
        else:
            yield conn
    else:
        async with self.transaction() as conn:
            yield conn

close async

close()

Close the asynchronous db engine if opened

Source code in fluid/db/container.py
async def close(self) -> None:
    """Dispose of the asynchronous engine, if one is currently held.

    ``self._engine`` is reset to ``None`` before ``dispose()`` is awaited, so
    a later call finds nothing to close.
    """
    current = self._engine
    if current is None:
        return
    self._engine = None
    await current.dispose()

ping async

ping()

Ping the database

Source code in fluid/db/container.py
async def ping(self) -> str:
    """Check the database by running ``SELECT 1`` on a new connection.

    Returns the string ``"ok"`` once the statement has executed; any error
    raised while connecting or executing propagates to the caller.
    """
    # TODO: we need a custom ping query
    async with self.connection() as db_conn:
        probe = sa.text("SELECT 1")
        await db_conn.execute(probe)
    return "ok"

migration

migration()

Create a new migration manager for this database

Source code in fluid/db/container.py
def migration(self) -> Migration:
    """Build a ``Migration`` manager bound to this database."""
    manager = Migration(self)
    return manager