Async Database¶
The fluid.db module provides a simple asynchronous interface for working with PostgreSQL databases. It is built on top of the SQLAlchemy and asyncpg libraries.
Installation¶
To use the database module, you need to install the db extra, and optionally the cli extra if you want to use the command line interface for managing database migrations:
Database¶
There are two database classes:
- Database: provides connection management, transactions, and migrations.
- CrudDB: extends Database with CRUD helpers for common query patterns.
Most applications should use CrudDB directly:
from fluid.db import CrudDB
db = CrudDB("postgresql+asyncpg://postgres:postgres@localhost:5432/mydb")
Note the use of the postgresql+asyncpg driver in the connection string. This is required for the async engine.
You can also load the DSN from an environment variable (defaults to DATABASE):
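As a rough illustration of what loading a DSN from the environment involves, here is a manual equivalent that uses only the standard library. The helper name `dsn_from_env` is hypothetical and is not part of fluid.db; the sketch only reads the variable and checks that the DSN uses the `postgresql+asyncpg` driver required by the async engine.

```python
import os
from urllib.parse import urlsplit

REQUIRED_SCHEME = "postgresql+asyncpg"


def dsn_from_env(name: str = "DATABASE") -> str:
    """Read a DSN from the environment and check it targets the async driver.

    Hypothetical helper for illustration; not part of fluid.db.
    """
    dsn = os.environ.get(name, "")
    if not dsn:
        raise RuntimeError(f"environment variable {name} is not set")
    scheme = urlsplit(dsn).scheme
    if scheme != REQUIRED_SCHEME:
        raise ValueError(f"expected a {REQUIRED_SCHEME}:// DSN, got {scheme!r}")
    return dsn


# Set the variable here only so the example is self-contained.
os.environ["DATABASE"] = "postgresql+asyncpg://postgres:postgres@localhost:5432/mydb"
dsn = dsn_from_env()
```

The returned string could then be passed to the CrudDB constructor shown earlier.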
Register a Table¶
Register tables against the database's metadata so that migrations and CRUD helpers can discover them:
import sqlalchemy as sa
articles = sa.Table(
    "articles",
    db.metadata,
    sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
    sa.Column("title", sa.String(200), nullable=False),
    sa.Column("author", sa.String(100)),
    sa.Column("score", sa.Integer),
    sa.Column("published_at", sa.DateTime(timezone=True)),
)
Migrations¶
The Migration object wraps Alembic and is obtained from the database:
Create the database¶
Apply migrations¶
For quick setup in tests or development, you can create all tables directly from metadata without Alembic revision files:
Generate a new revision¶
# auto-generate a revision by diffing metadata against the current schema
mig.revision("add score column", autogenerate=True)
Connections and Transactions¶
Use connection() when you only need to read data:
Use transaction() when you need to write data — it commits on exit and rolls back on exception:
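The commit-on-exit, rollback-on-exception behaviour can be illustrated with a stand-in context manager. This is a minimal sketch, not fluid.db's implementation: `FakeConnection` and this `transaction` function are hypothetical and only record what would happen to a real connection.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConnection:
    """Hypothetical stand-in that records what happened to it."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def execute(self, statement: str) -> None:
        self.events.append(f"execute:{statement}")


@asynccontextmanager
async def transaction(conn: FakeConnection):
    """Commit when the block exits cleanly, roll back when it raises."""
    conn.events.append("begin")
    try:
        yield conn
    except Exception:
        conn.events.append("rollback")
        raise  # the caller still sees the original error
    else:
        conn.events.append("commit")


async def demo() -> tuple[list[str], list[str]]:
    ok, failed = FakeConnection(), FakeConnection()
    async with transaction(ok) as conn:
        await conn.execute("INSERT 1")
    try:
        async with transaction(failed) as conn:
            await conn.execute("INSERT 2")
            raise ValueError("boom")
    except ValueError:
        pass
    return ok.events, failed.events


ok_events, failed_events = asyncio.run(demo())
```

The first block ends with a commit; the second ends with a rollback and the exception still propagates to the caller.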
The ensure_connection and ensure_transaction variants are useful in functions that may receive an existing connection from the caller, avoiding nested connections:
async def insert_article(data: dict, conn=None):
    async with db.ensure_transaction(conn) as conn:
        await conn.execute(articles.insert().values(**data))
CRUD Operations¶
CrudDB provides async helpers that cover the most common patterns.
Insert¶
# single row
row = (await db.db_insert(articles, {"title": "Hello", "score": 10})).one()
# multiple rows — missing columns are filled with None automatically
rows = (await db.db_insert(articles, [
    {"title": "Hello", "score": 10},
    {"title": "World"},
])).fetchall()
All insert operations return the inserted rows via RETURNING *.
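A multi-row INSERT needs every row to supply the same set of columns, which is why missing keys are filled with None. A minimal sketch of that normalization step follows; `normalize_rows` is a hypothetical name and this is not necessarily how the library does it.

```python
def normalize_rows(rows: list[dict]) -> list[dict]:
    """Give every row the same keys, filling gaps with None."""
    columns: list[str] = []
    for row in rows:
        for key in row:
            if key not in columns:
                columns.append(key)  # keep first-seen column order
    return [{col: row.get(col) for col in columns} for row in rows]


normalized = normalize_rows([
    {"title": "Hello", "score": 10},
    {"title": "World"},
])
```

After normalization the second row carries an explicit `"score": None`, so both rows can share one column list.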
Select¶
Pass order_by to sort results. Prefix a field name with - for descending order:
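The `-` prefix convention can be sketched as a small parser. The function names below are hypothetical, the support for several fields at once is an assumption, and the SQL string is built only for illustration; real code would build SQLAlchemy expressions rather than concatenate strings.

```python
def parse_order_by(spec: str) -> tuple[str, bool]:
    """Split an order_by entry into (field, descending)."""
    if spec.startswith("-"):
        return spec[1:], True
    return spec, False


def order_clause(*specs: str) -> str:
    """Render the parsed entries as an ORDER BY clause (illustration only)."""
    parts = []
    for spec in specs:
        field, descending = parse_order_by(spec)
        parts.append(f"{field} {'DESC' if descending else 'ASC'}")
    return "ORDER BY " + ", ".join(parts)


clause = order_clause("-score", "title")
```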
Filter operators¶
Filters use a "field:op" key syntax. The default operator is eq:
| Key | Meaning |
|---|---|
| "score" or "score:eq" | score = value |
| "score:ne" | score != value |
| "score:gt" | score > value |
| "score:ge" | score >= value |
| "score:lt" | score < value |
| "score:le" | score <= value |
Pass a list as the value to use IN / NOT IN:
# WHERE score IN (5, 10, 15)
rows = (await db.db_select(articles, {"score": [5, 10, 15]})).fetchall()
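The "field:op" syntax can be illustrated with a small translator from filter keys to SQL fragments. This is a sketch under assumptions, not the library's implementation: `parse_filter` is a hypothetical name, the `?` placeholders are illustrative, and mapping a list with the ne operator to NOT IN is an assumption based on the description above.

```python
OPERATORS = {"eq": "=", "ne": "!=", "gt": ">", "ge": ">=", "lt": "<", "le": "<="}


def parse_filter(key: str, value) -> tuple[str, list]:
    """Translate one "field:op" filter into a SQL fragment and its parameters."""
    field, _, op = key.partition(":")
    op = op or "eq"  # the default operator is eq
    if op not in OPERATORS:
        raise ValueError(f"unknown operator {op!r}")
    if isinstance(value, (list, tuple)):
        if op not in ("eq", "ne"):
            raise ValueError("lists are only meaningful with eq or ne")
        # assumption: eq with a list means IN, ne with a list means NOT IN
        keyword = "IN" if op == "eq" else "NOT IN"
        placeholders = ", ".join("?" for _ in value)
        return f"{field} {keyword} ({placeholders})", list(value)
    return f"{field} {OPERATORS[op]} ?", [value]


in_fragment = parse_filter("score", [5, 10, 15])
ge_fragment = parse_filter("score:ge", 5)
```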
Update¶
Returns all updated rows via RETURNING *.
Upsert¶
db_upsert updates a single record if it exists, or inserts it if it does not:
# update score if the row exists, otherwise insert it
row = await db.db_upsert(
    articles,
    {"title": "Hello"},  # lookup key
    {"score": 42},  # data to set
)
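The update-or-insert semantics can be demonstrated with the standard library's sqlite3 module. This sketch is not how db_upsert is implemented (the library targets PostgreSQL and may use a different strategy); the `upsert` function is hypothetical, and column names are interpolated into SQL only because the example controls them.

```python
import sqlite3


def upsert(conn: sqlite3.Connection, lookup: dict, data: dict) -> tuple:
    """Update the row matching lookup, or insert lookup + data if none matches."""
    where = " AND ".join(f"{col} = ?" for col in lookup)
    assignments = ", ".join(f"{col} = ?" for col in data)
    cur = conn.execute(
        f"UPDATE articles SET {assignments} WHERE {where}",
        [*data.values(), *lookup.values()],
    )
    if cur.rowcount == 0:  # nothing matched, so insert a new row
        merged = {**lookup, **data}
        columns = ", ".join(merged)
        marks = ", ".join("?" for _ in merged)
        conn.execute(
            f"INSERT INTO articles ({columns}) VALUES ({marks})",
            list(merged.values()),
        )
    return conn.execute(
        f"SELECT title, score FROM articles WHERE {where}",
        list(lookup.values()),
    ).fetchone()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE articles (title TEXT, score INTEGER)")
first = upsert(conn, {"title": "Hello"}, {"score": 42})  # no row yet: inserts
second = upsert(conn, {"title": "Hello"}, {"score": 7})  # row exists: updates
total = conn.execute("SELECT COUNT(*) FROM articles").fetchone()[0]
```

An update followed by a conditional insert is open to races between concurrent writers unless it runs inside a transaction with suitable locking or a unique constraint.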
Delete¶
Returns all deleted rows via RETURNING *.
Count¶
Pagination¶
Pagination implements cursor-based pagination on top of CrudDB.
It fetches one extra row beyond the requested limit to determine whether a next page exists, then encodes a cursor that the client returns with the next request.
from fluid.db import Pagination
# first page
rows, cursor = await Pagination.create(
    "published_at",
    limit=20,
    filters={"author": "alice"},
    desc=True,
).execute(db, articles)
# next page: filters and limit are embedded in the cursor
if cursor:
    rows, cursor = await Pagination.create(
        "published_at",
        cursor=cursor,
        desc=True,
    ).execute(db, articles)
When cursor is provided, the filters and limit arguments are ignored — they are decoded from the cursor itself, ensuring consistent pages even if the caller changes them between requests.
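The fetch-one-extra-row technique and the self-describing cursor can be sketched over an in-memory list. This is a simplified illustration, not the library's implementation: `paginate`, `encode_cursor`, and `decode_cursor` are hypothetical names, the cursor format (base64-encoded JSON) is an assumption, only ascending order is handled, and the pagination field is assumed to hold unique values.

```python
import base64
import json


def encode_cursor(payload: dict) -> str:
    raw = json.dumps(payload, sort_keys=True).encode()
    return base64.urlsafe_b64encode(raw).decode()


def decode_cursor(cursor: str) -> dict:
    return json.loads(base64.urlsafe_b64decode(cursor.encode()))


def paginate(rows, field, limit=20, filters=None, cursor=None):
    """Return (page, next_cursor) over rows sorted ascending by field."""
    start = None
    if cursor:
        state = decode_cursor(cursor)
        # limit and filters stored in the cursor win over the arguments
        limit, filters, start = state["limit"], state["filters"], state["start"]
    filters = filters or {}
    matching = [
        row
        for row in sorted(rows, key=lambda r: r[field])
        if all(row[k] == v for k, v in filters.items())
        and (start is None or row[field] >= start)
    ]
    window = matching[: limit + 1]  # one extra row reveals whether a next page exists
    if len(window) <= limit:
        return window, None
    extra = window[limit]  # first row of the next page
    state = {"limit": limit, "filters": filters, "start": extra[field]}
    return window[:limit], encode_cursor(state)


data = [{"id": i, "author": "alice" if i % 2 else "bob"} for i in range(1, 11)]
page1, cur1 = paginate(data, "id", limit=2, filters={"author": "alice"})
# changed arguments are ignored because the cursor carries the original ones
page2, cur2 = paginate(data, "id", limit=99, filters={"author": "bob"}, cursor=cur1)
```

Because the cursor carries the original limit and filters, the second call still returns two of alice's rows even though the caller passed different arguments.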
Full-text search¶
Combine pagination with a full-text search across multiple columns: