Skip to content

CrudDB

The CrudDB class inherits from Database to provide standard CRUD operations for a database table.

It can be imported from fluid.db:

from fluid.db import CrudDB

fluid.db.CrudDB dataclass

CrudDB(
    dsn,
    echo=DBECHO,
    pool_size=DBPOOL_MAX_SIZE,
    max_overflow=DBPOOL_MAX_OVERFLOW,
    metadata=MetaData(),
    migration_path="",
    app_name=APP_NAME,
    _engine=None,
)

Bases: Database

A `Database` with additional methods for CRUD operations.

dsn instance-attribute

dsn

data source name, aka connection string

Example: "postgresql+asyncpg://user:password@localhost/dbname"

echo class-attribute instance-attribute

echo = DBECHO

pool_size class-attribute instance-attribute

pool_size = DBPOOL_MAX_SIZE

max_overflow class-attribute instance-attribute

max_overflow = DBPOOL_MAX_OVERFLOW

metadata class-attribute instance-attribute

metadata = field(default_factory=MetaData)

migration_path class-attribute instance-attribute

migration_path = ''

app_name class-attribute instance-attribute

app_name = APP_NAME

tables property

tables

A dictionary of tables in the database

engine property

engine

The `sqlalchemy.ext.asyncio.AsyncEngine` used to create connections and transactions

sync_engine property

sync_engine

The `sqlalchemy.engine.Engine` for synchronous operations

from_env classmethod

from_env(*, dsn=DATABASE, schema=DATABASE_SCHEMA, **kwargs)

Create a new database container from environment variables as defaults

Source code in fluid/db/container.py
@classmethod
def from_env(
    cls,
    *,
    dsn: str = settings.DATABASE,
    schema: str | None = settings.DATABASE_SCHEMA,
    **kwargs: Any,
) -> Self:
    """Create a database container whose defaults come from the settings.

    :param dsn: connection string, ``settings.DATABASE`` unless overridden
    :param schema: schema handed to the new ``sa.MetaData``,
        ``settings.DATABASE_SCHEMA`` unless overridden
    :param kwargs: forwarded unchanged to the class constructor
    :return: a new instance of ``cls``
    """
    metadata = sa.MetaData(schema=schema)
    return cls(dsn=dsn, metadata=metadata, **kwargs)

cli

cli(**kwargs)

Create a new click group for database commands

Source code in fluid/db/container.py
def cli(self, **kwargs: Any) -> DbGroup:
    """Build the click command group for this database.

    :param kwargs: extra keyword arguments passed through to ``DbGroup``
    :return: a ``DbGroup`` constructed with this database as first argument
    """
    group = DbGroup(self, **kwargs)
    return group

connection async

connection()

Context manager for obtaining an asynchronous connection

Source code in fluid/db/container.py
@asynccontextmanager
async def connection(self) -> AsyncIterator[AsyncConnection]:
    """Open a connection on the asynchronous engine and yield it.

    The context returned by ``self.engine.connect()`` is entered for the
    duration of the caller's ``async with`` block and exited afterwards.
    """
    connect_ctx = self.engine.connect()
    async with connect_ctx as db_conn:
        yield db_conn

ensure_connection async

ensure_connection(conn=None)

Context manager that yields the given connection when one is supplied, and otherwise opens a new asynchronous connection

Source code in fluid/db/container.py
@asynccontextmanager
async def ensure_connection(
    self,
    conn: AsyncConnection | None = None,
) -> AsyncIterator[AsyncConnection]:
    """Yield ``conn`` when one is supplied, otherwise a newly opened connection.

    :param conn: optional existing connection; when given it is yielded as-is
        and nothing is opened on the engine
    """
    if not conn:
        # Nothing to reuse: open a connection for the duration of the block.
        async with self.engine.connect() as fresh_conn:
            yield fresh_conn
    else:
        yield conn

transaction async

transaction()

Context manager for initializing an asynchronous database transaction

Source code in fluid/db/container.py
@asynccontextmanager
async def transaction(self) -> AsyncIterator[AsyncConnection]:
    """Begin a transaction on the asynchronous engine and yield its connection.

    The context returned by ``self.engine.begin()`` is entered for the
    duration of the caller's ``async with`` block and exited afterwards.
    """
    begin_ctx = self.engine.begin()
    async with begin_ctx as tx_conn:
        yield tx_conn

ensure_transaction async

ensure_transaction(conn=None)

Context manager ensuring that a connection has an active database transaction

Source code in fluid/db/container.py
@asynccontextmanager
async def ensure_transaction(
    self,
    conn: AsyncConnection | None = None,
) -> AsyncIterator[AsyncConnection]:
    """Yield a connection that has an active database transaction.

    :param conn: optional existing connection. When omitted, a new
        transaction is opened through :meth:`transaction`. When given, it is
        yielded as-is if it is already in a transaction, otherwise a
        transaction is begun on it for the duration of the block.
    """
    if not conn:
        async with self.transaction() as tx_conn:
            yield tx_conn
    elif conn.in_transaction():
        # The caller already controls the transaction: do not begin another.
        yield conn
    else:
        async with conn.begin():
            yield conn

close async

close()

Close the asynchronous db engine if opened

Source code in fluid/db/container.py
async def close(self) -> None:
    """Dispose of the asynchronous engine, if one has been created.

    ``self._engine`` is reset to ``None`` before the engine is disposed, so
    calling this again afterwards does nothing.
    """
    engine = self._engine
    if engine is None:
        return
    self._engine = None
    await engine.dispose()

ping async

ping()

Ping the database

Source code in fluid/db/container.py
async def ping(self) -> str:
    """Check that the database answers by running ``SELECT 1``.

    :return: the string ``"ok"`` once the query has executed
    """
    # TODO: we need a custom ping query
    probe = sa.text("SELECT 1")
    async with self.connection() as db_conn:
        await db_conn.execute(probe)
    return "ok"

migration

migration()

Create a new migration manager for this database

Source code in fluid/db/container.py
def migration(self) -> Migration:
    """Build a migration manager for this database.

    :return: a ``Migration`` constructed with this database
    """
    manager = Migration(self)
    return manager

db_select async

db_select(table, filters, *, order_by=None, conn=None)

Select rows from a given table. :param table: sqlalchemy Table :param filters: key-value pairs for filtering rows (see :meth:.get_query) :param order_by: optional column names to order the rows by :param conn: optional db connection

Source code in fluid/db/crud.py
async def db_select(
    self,
    table: FromClause,
    filters: dict,
    *,
    order_by: tuple[str, ...] | None = None,
    conn: AsyncConnection | None = None,
) -> CursorResult:
    """Select rows from a given table

    :param table: sqlalchemy Table
    :param filters: key-value pairs for filtering rows (see :meth:`.get_query`)
    :param order_by: optional column names to order by
        (see :meth:`.order_by_query`)
    :param conn: optional db connection
    :return: the result of executing the select statement
    """
    statement = self.get_query(table, Select(table), params=filters)
    if order_by:
        statement = self.order_by_query(table, cast(Select, statement), order_by)
    async with self.ensure_transaction(conn) as active_conn:
        return await active_conn.execute(statement)

db_insert async

db_insert(table, data, *, conn=None)

Perform an insert into a table :param table: sqlalchemy Table :param data: key-value pairs for columns values :param conn: optional db connection

Source code in fluid/db/crud.py
async def db_insert(
    self,
    table: Table,
    data: list[dict] | dict,
    *,
    conn: AsyncConnection | None = None,
) -> CursorResult:
    """Insert one or more records into a table

    :param table: sqlalchemy Table
    :param data: key-value pairs of column values, or a list of them
    :param conn: optional db connection
    :return: the result of executing the statement built by
        :meth:`.insert_query`
    """
    async with self.ensure_transaction(conn) as active_conn:
        # The statement is built once the transaction scope has been entered.
        statement = self.insert_query(table, data)
        cursor = await active_conn.execute(statement)
        return cursor

db_update async

db_update(table, filters, data, *, conn=None)

Perform an update of rows

:param table: sqlalchemy Table :param filters: key-value pairs for filtering rows to update :param data: key-value pairs for updating columns values of selected rows :param conn: optional db connection :param consumer: optional consumer (see :meth:.get_query)

Source code in fluid/db/crud.py
async def db_update(
    self,
    table: Table,
    filters: dict,
    data: dict,
    *,
    conn: AsyncConnection | None = None,
) -> CursorResult:
    """Update the rows matching ``filters`` and return them

    :param table: sqlalchemy Table
    :param filters: key-value pairs for filtering rows to update
        (see :meth:`.get_query`)
    :param data: key-value pairs for updating columns values of selected rows
    :param conn: optional db connection
    :return: the result of the update, which returns all table columns
    """
    filtered = self.get_query(table, table.update(), params=filters)
    statement = cast(Update, filtered).values(**data).returning(*table.columns)
    async with self.ensure_transaction(conn) as active_conn:
        return await active_conn.execute(statement)

db_upsert async

db_upsert(table, filters, data=None, *, conn=None)

Perform an upsert for a single record

:param table: sqlalchemy Table :param filters: key-value pairs for filtering rows to update :param data: key-value pairs for updating columns values of selected rows :param conn: optional db connection :param consumer: optional consumer (see :meth:.get_query)

Source code in fluid/db/crud.py
async def db_upsert(
    self,
    table: Table,
    filters: dict,
    data: dict | None = None,
    *,
    conn: AsyncConnection | None = None,
) -> Row:
    """Perform an upsert for a single record

    The rows matching ``filters`` are updated with ``data`` (or only selected
    when ``data`` is empty). When no row matches, a new one is inserted from
    ``data`` merged with ``filters``, where ``filters`` wins on shared keys.

    All steps run on one connection inside one transaction, so they are
    committed or rolled back together. This does not by itself stop two
    concurrent upserts from both reaching the insert.

    :param table: sqlalchemy Table
    :param filters: key-value pairs for filtering rows to update
    :param data: key-value pairs for updating columns values of selected rows
    :param conn: optional db connection; reused when given, and a transaction
        is begun on it only if it does not already have one
    :return: the updated, selected or newly inserted row
    :raises: whatever ``one_or_none()`` raises when more than one row matches
    """
    # Without this scope each helper would open its own transaction when no
    # connection is supplied, leaving the update/select and the insert
    # non-atomic and using several pooled connections.
    async with self.ensure_transaction(conn) as conn:
        if data:
            result = await self.db_update(table, filters, data, conn=conn)
        else:
            result = await self.db_select(table, filters, conn=conn)
        record = result.one_or_none()
        if record is None:
            # Build a fresh dict so the caller's ``data`` is never mutated.
            insert_data = {**(data or {}), **filters}
            result = await self.db_insert(table, insert_data, conn=conn)
            record = result.one()
        return record

db_delete async

db_delete(table, filters, *, conn=None)

Delete rows from a given table :param table: sqlalchemy Table :param filters: key-value pairs for filtering rows :param conn: optional db connection :param consumer: optional consumer (see :meth:.get_query)

Source code in fluid/db/crud.py
async def db_delete(
    self,
    table: Table,
    filters: dict,
    *,
    conn: AsyncConnection | None = None,
) -> CursorResult:
    """Delete the rows matching ``filters`` and return them

    :param table: sqlalchemy Table
    :param filters: key-value pairs for filtering rows (see :meth:`.get_query`)
    :param conn: optional db connection
    :return: the result of the delete, which returns all table columns
    """
    returning_delete = table.delete().returning(*table.columns)
    statement = self.get_query(table, returning_delete, params=filters)
    async with self.ensure_transaction(conn) as active_conn:
        return await active_conn.execute(statement)

db_count async

db_count(table, filters, *, conn=None)

Count rows in a table :param table: sqlalchemy Table :param filters: key-value pairs for filtering rows :param conn: optional db connection :param consumer: optional consumer (see :meth:.get_query)

Source code in fluid/db/crud.py
async def db_count(
    self,
    table: FromClause,
    filters: dict,
    *,
    conn: AsyncConnection | None = None,
) -> int:
    """Count the rows of a table matching ``filters``

    :param table: sqlalchemy Table
    :param filters: key-value pairs for filtering rows (see :meth:`.get_query`)
    :param conn: optional db connection
    :return: the scalar value produced by the count query
    """
    filtered = self.get_query(table, table.select(), params=filters)
    count_query = self.db_count_query(cast(Select, filtered))
    async with self.ensure_connection(conn) as active_conn:
        result: CursorResult = await active_conn.execute(count_query)
        return cast(int, result.scalar())

insert_query

insert_query(table, records)
Source code in fluid/db/crud.py
def insert_query(self, table: Table, records: list[dict] | dict) -> Insert:
    """Build an insert statement, returning all columns, for one or many records.

    A single dict is used as-is. For a list of dicts, any record that lacks a
    key found in another record of the list is copied and padded with
    ``None`` for the missing keys, so that every row carries the same keys.
    The dicts passed in are not modified.

    :param table: sqlalchemy Table to insert into
    :param records: one record, or a list of records, as column/value mappings
    :return: the insert statement with a returning clause on all table columns
    """
    if isinstance(records, dict):
        rows = [records]
    else:
        # Union of the keys used by any record in the batch.
        all_keys: set[str] = set().union(*records)
        rows = [
            record
            if len(record) >= len(all_keys)
            else {**record, **dict.fromkeys(all_keys.difference(record))}
            for record in records
        ]
    return insert(table).values(rows).returning(*table.columns)

get_query

get_query(table, sql_query, *, params=None)

Build an SqlAlchemy query :param table: sqlalchemy Table :param sql_query: sqlalchemy query type :param params: key-value pairs for the query :param consumer: optional consumer for manipulating parameters

Source code in fluid/db/crud.py
def get_query(
    self,
    table: FromClause,
    sql_query: QueryType,
    *,
    params: dict | None = None,
) -> QueryType:
    """Add a where clause built from ``params`` to an SqlAlchemy query

    Each key is either ``"<column>"`` (operator ``eq``) or
    ``"<column>:<op>"``. The column is looked up on ``table.c`` and the
    condition is produced by :meth:`.default_filter_column`. All conditions
    are combined with AND.

    :param table: sqlalchemy Table
    :param sql_query: sqlalchemy query type
    :param params: key-value pairs for the query
    :return: ``sql_query`` unchanged when no condition is produced, otherwise
        the query with the where clause applied
    """
    conditions: list = []
    columns = table.c

    for key, value in (params or {}).items():
        parts = key.split(":")
        # Only an exact "<column>:<op>" form selects an operator.
        op = parts[1] if len(parts) == 2 else "eq"
        column = getattr(columns, parts[0])
        clause = self.default_filter_column(column, op, value)
        if clause is None:
            continue
        if isinstance(clause, (list, tuple)):
            conditions.extend(clause)
        else:
            conditions.append(clause)

    if not conditions:
        return sql_query
    whereclause = conditions[0] if len(conditions) == 1 else and_(*conditions)
    return cast(Select, sql_query).where(whereclause)

db_count_query

db_count_query(sql_query)
Source code in fluid/db/crud.py
def db_count_query(self, sql_query: Select) -> Select:
    """Build a query counting the rows produced by ``sql_query``.

    :param sql_query: select statement, used as a subquery aliased ``inner``
    :return: a ``select(count())`` over that subquery
    """
    inner = sql_query.alias("inner")
    return select(func.count()).select_from(inner)

order_by_query

order_by_query(table, sql_query, order_by)

Apply ordering to a sql_query

Source code in fluid/db/crud.py
def order_by_query(
    self,
    table: FromClause,
    sql_query: Select,
    order_by: tuple[str, ...],
) -> Select:
    """Order ``sql_query`` by the columns named in ``order_by``

    :param table: table whose columns are looked up
    :param sql_query: select statement to order
    :param order_by: column names, resolved by :meth:`.order_by_columns`
    :return: the ordered select statement
    """
    ordering = self.order_by_columns(table, order_by)
    return sql_query.order_by(*ordering)

order_by_columns

order_by_columns(table, order_by)

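Build the list of ordering columns for a sql_query: a name prefixed with "-" sorts in descending order, and names that are not columns of the table are ignored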

Source code in fluid/db/crud.py
def order_by_columns(
    self,
    table: FromClause,
    order_by: tuple[str, ...],
) -> list[Column]:
    """Resolve ordering names into column expressions

    A name prefixed with ``-`` yields the column's ``desc()`` expression.
    Names that are not attributes of ``table.c`` are skipped.

    :param table: table whose columns are looked up
    :param order_by: column names, optionally prefixed with ``-``
    :return: the ordering expressions, in the order given
    """
    ordering = []
    for name in order_by:
        descending = name.startswith("-")
        column = getattr(table.c, name[1:] if descending else name, None)
        if column is None:
            continue
        ordering.append(column.desc() if descending else column)
    return ordering

search_query

search_query(table, sql_query, search_fields, search)

Apply search to a sql_query

Source code in fluid/db/crud.py
def search_query(
    self,
    table: FromClause,
    sql_query: Select,
    search_fields: tuple[str, ...],
    search: str,
) -> Select:
    """Restrict ``sql_query`` to rows where a search field contains ``search``

    The match is a case-insensitive ``ilike`` on ``%<search>%``, OR-ed
    across all ``search_fields``.

    :param table: table whose columns are looked up
    :param sql_query: select statement to filter
    :param search_fields: names of the columns to search in
    :param search: text to look for
    :return: ``sql_query`` unchanged when ``search`` or ``search_fields`` is
        empty, otherwise the filtered statement
    """
    if not (search and search_fields):
        return sql_query
    columns = [getattr(table.c, name) for name in search_fields]
    conditions = (column.ilike(f"%{search}%") for column in columns)
    return sql_query.where(or_(*conditions))

default_filter_column

default_filter_column(column, op, value)

Applies a filter on a column. Notes on the 'ne' op: given the example data [None, 'john', 'roger'], ne:john would return only roger (i.e. nulls are excluded), while ne: (empty value) would return john and roger. Notes on the 'search' op: for some reason, SQLAlchemy uses to_tsquery rather than plainto_tsquery for the match operator. to_tsquery uses operators (&, |, ! etc.), while plainto_tsquery tokenises the input string and uses AND between tokens, hence plainto_tsquery is what we want here. For other database back ends, the behaviour of the match operator is completely different - see: http://docs.sqlalchemy.org/en/rel_1_0/core/sqlelement.html :param column: the column to filter on :param op: 'eq', 'ne', 'gt', 'lt', 'ge', 'le' or 'search' :param value: comparison value, string or list/tuple :return: the filter expression

Source code in fluid/db/crud.py
def default_filter_column(self, column: Column, op: str, value: Any) -> Any:
    """
    Build a filter expression comparing ``column`` with ``value``.

    A list/tuple ``value`` with op 'eq' or 'ne' becomes ``column.in_(value)``
    (negated for 'ne'); with any other op only its first element is compared.

    Notes on 'ne' op (carried over from the original notes):
    Example data: [None, 'john', 'roger']
    ne:john would return only roger (i.e. nulls excluded)
    ne:     would return john and roger

    Notes on  'search' op (carried over from the original notes).
    NOTE(review): no 'search' branch is visible in this block - confirm
    whether it is handled elsewhere.
    For some reason, SQLAlchemy uses to_tsquery rather than
    plainto_tsquery for the match operator
    to_tsquery uses operators (&, |, ! etc.) while
    plainto_tsquery tokenises the input string and uses AND between
    tokens, hence plainto_tsquery is what we want here
    For other database back ends, the behaviour of the match
    operator is completely different - see:
    http://docs.sqlalchemy.org/en/rel_1_0/core/sqlelement.html

    :param column: column to filter on
    :param op: 'eq', 'ne', 'gt', 'lt', 'ge' or 'le'
    :param value: comparison value, a scalar or a list/tuple
    :return: the comparison expression; as far as the visible branches go,
        any other ``op`` falls through and yields ``None``
    :raises AssertionError: for an empty list/tuple with an op other than
        'eq'/'ne' (only while assertions are enabled)
    """
    # Convert the value(s) with column_value_to_python, remembering whether a
    # sequence was given. Presumably this coerces raw values to the column's
    # Python type - confirm against the helper's definition.
    if multiple := isinstance(value, (list, tuple)):
        value = tuple(column_value_to_python(column, v) for v in value)
    else:
        value = column_value_to_python(column, value)

    if multiple and op in ("eq", "ne"):
        # Sequence equality/inequality maps to (negated) membership.
        if op == "eq":
            return column.in_(value)
        elif op == "ne":
            return ~column.in_(value)
    else:
        if multiple:
            # Ordering operators take a single operand: use the first element.
            assert len(value) > 0
            value = value[0]

        if op == "eq":
            return column == value
        elif op == "ne":
            return column != value
        elif op == "gt":
            return column > value
        elif op == "ge":
            return column >= value
        elif op == "lt":
            return column < value
        elif op == "le":
            return column <= value