Skip to content

DB Migration

The migration object is accessed via Database.migration or CrudDB and is used to create and manage database migrations.

fluid.db.Migration dataclass

Migration(db)

A wrapper around Alembic commands to perform database migrations

db instance-attribute

db

cfg class-attribute instance-attribute

cfg = field(init=False, repr=False)

metadata property

metadata

sync_engine property

sync_engine

init

init()
Source code in fluid/db/migration.py
def init(self) -> str:
    """Run the Alembic ``init`` command and return the collected output.

    The directory passed to Alembic is the ``script_location`` main option
    of the config; an empty string is used when that option is missing or
    empty.
    """
    script_location = self.cfg.get_main_option("script_location")
    alembic_cmd.init(self.cfg, script_location or "")
    return self.message()

show

show(revision)
Source code in fluid/db/migration.py
def show(self, revision: str) -> str:
    """Run the Alembic ``show`` command for ``revision``.

    Returns the text collected by :meth:`message` once the command has run.
    """
    config = self.cfg
    alembic_cmd.show(config, revision)
    return self.message()

history

history()
Source code in fluid/db/migration.py
def history(self) -> str:
    """Run the Alembic ``history`` command.

    Returns the text collected by :meth:`message` once the command has run.
    """
    config = self.cfg
    alembic_cmd.history(config)
    return self.message()

revision

revision(message, autogenerate=False, branch_label=None)
Source code in fluid/db/migration.py
def revision(
    self,
    message: str,
    autogenerate: bool = False,
    branch_label: str | None = None,
) -> str:
    """Run the Alembic ``revision`` command.

    ``message``, ``autogenerate`` and ``branch_label`` are forwarded
    unchanged as keyword arguments. Returns the text collected by
    :meth:`message` once the command has run.
    """
    options = {
        "message": message,
        "autogenerate": autogenerate,
        "branch_label": branch_label,
    }
    alembic_cmd.revision(self.cfg, **options)
    return self.message()

upgrade

upgrade(revision)
Source code in fluid/db/migration.py
def upgrade(self, revision: str) -> str:
    """Run the Alembic ``upgrade`` command towards ``revision``.

    Returns the text collected by :meth:`message` once the command has run.
    """
    config = self.cfg
    alembic_cmd.upgrade(config, revision)
    return self.message()

downgrade

downgrade(revision)
Source code in fluid/db/migration.py
def downgrade(self, revision: str) -> str:
    """Run the Alembic ``downgrade`` command towards ``revision``.

    Returns the text collected by :meth:`message` once the command has run.
    """
    config = self.cfg
    alembic_cmd.downgrade(config, revision)
    return self.message()

current

current(verbose=False)
Source code in fluid/db/migration.py
def current(self, verbose: bool = False) -> str:
    """Run the Alembic ``current`` command.

    ``verbose`` is forwarded to Alembic as a keyword argument. Returns the
    text collected by :meth:`message` once the command has run.
    """
    config = self.cfg
    alembic_cmd.current(config, verbose=verbose)
    return self.message()

message

message()
Source code in fluid/db/migration.py
def message(self) -> str:
    """Return everything written to ``cfg.stdout`` and reset the buffer.

    The buffer is rewound and truncated after reading, so each call only
    returns text written since the previous call.
    """
    captured = cast(StringIO, self.cfg.stdout).getvalue()
    # Rewind first, then truncate at the current (zero) position.
    stream = self.cfg.stdout
    stream.seek(0)
    stream.truncate()
    return captured

db_exists

db_exists(dbname='')
Source code in fluid/db/migration.py
def db_exists(self, dbname: str = "") -> bool:
    """Report whether a database exists.

    The check uses the sync engine's URL. When ``dbname`` is given, the
    database part of that URL is replaced with it first.
    """
    target = self.sync_engine.url
    return database_exists(target.set(database=dbname) if dbname else target)

db_create

db_create(dbname='')

Creates a new database if it does not exist

Source code in fluid/db/migration.py
def db_create(self, dbname: str = "") -> bool:
    """Create the database unless it already exists.

    The target is the sync engine's URL, with its database part replaced by
    ``dbname`` when one is given. Returns ``True`` when the database was
    created and ``False`` when it was already present.
    """
    target = self.sync_engine.url
    if dbname:
        target = target.set(database=dbname)
    already_present = database_exists(target)
    if not already_present:
        create_database(target)
    return not already_present

db_drop

db_drop(dbname='')
Source code in fluid/db/migration.py
def db_drop(self, dbname: str = "") -> bool:
    """Drop the database if it exists.

    The target is the sync engine's URL, with its database part replaced by
    ``dbname`` when one is given. Returns ``True`` when a database was
    dropped and ``False`` when there was nothing to drop.
    """
    target = self.sync_engine.url
    if dbname:
        target = target.set(database=dbname)
    if not database_exists(target):
        return False
    drop_database(target)
    return True

truncate_all

truncate_all()

Truncate all tables from :attr:metadata in the database (rows are removed; the tables themselves are not dropped)

Source code in fluid/db/migration.py
def truncate_all(self) -> None:
    """Truncate every table registered in :attr:`metadata`.

    All table names are joined into a single ``truncate`` statement that is
    executed in one transaction on the sync engine. The tables themselves
    are not dropped.

    Nothing is executed when the metadata holds no tables, because a
    ``truncate`` statement with an empty table list is not valid SQL.
    """
    # Keys of ``metadata.tables`` are interpolated as-is (unquoted).
    table_names = ", ".join(self.metadata.tables)
    if not table_names:
        return
    with self.sync_engine.begin() as conn:
        conn.execute(sa.text(f"truncate {table_names}"))

drop_all_schemas

drop_all_schemas()

Drop the public schema (and everything in it) and recreate it empty

Source code in fluid/db/migration.py
def drop_all_schemas(self) -> None:
    """Drop the ``public`` schema with everything in it, then recreate it.

    Both statements run in order inside a single transaction opened on the
    sync engine. Only the ``public`` schema is touched.
    """
    statements = (
        "DROP SCHEMA IF EXISTS public CASCADE",
        "CREATE SCHEMA IF NOT EXISTS public",
    )
    with self.sync_engine.begin() as conn:
        for statement in statements:
            conn.execute(sa.text(statement))

create_ro_user

create_ro_user(
    username, password, role="", schema="public"
)

Creates a read-only user

Source code in fluid/db/migration.py
def create_ro_user(
    self,
    username: str,
    password: str,
    role: str = "",
    schema: str = "public",
) -> bool:
    """Create a read-only role and a user that is granted that role.

    The work is split over three transactions on the sync engine:

    1. ``CREATE ROLE`` for the read-only role.
    2. Grants on the current database and on ``schema`` (connect, usage,
       select on existing tables and sequences, and default select
       privileges for future tables).
    3. ``CREATE USER`` with the given password, followed by granting the
       role to the user.

    Args:
        username: Name of the user to create.
        password: Password for the new user. Embedded single quotes are
            doubled so the value remains a single SQL string literal.
        role: Name of the read-only role. When empty it defaults to the
            engine URL's username with an ``_ro`` suffix.
        schema: Schema the select privileges are granted on.

    Returns:
        ``True`` when neither the role creation nor the user creation
        raised ``ProgrammingError``; ``False`` if either of them did.

    NOTE: ``role``, ``username``, ``schema`` and the database name are
    interpolated into the SQL unquoted, so they must be trusted identifiers.
    """
    engine = self.sync_engine
    role = role or f"{engine.url.username}_ro"
    database = engine.url.database
    # The password is embedded in a single-quoted literal; a raw quote would
    # end the literal early and break (or alter) the statement.
    # NOTE(review): a ":name" sequence in the password may still be parsed
    # by sa.text() as a bind parameter - confirm whether that needs escaping.
    quoted_password = password.replace("'", "''")
    created = True
    with engine.begin() as conn:
        try:
            conn.execute(sa.text(f"CREATE ROLE {role};"))
        except sa.exc.ProgrammingError:
            # Best effort: a failure here is reported through the return
            # value and the grants below are still applied.
            created = False
    with engine.begin() as conn:
        conn.execute(
            sa.text(
                f"GRANT CONNECT ON DATABASE {database} TO {role};"
                f"GRANT USAGE ON SCHEMA {schema} TO {role};"
                f"GRANT SELECT ON ALL TABLES IN SCHEMA {schema} TO {role};"
                f"GRANT SELECT ON ALL SEQUENCES IN SCHEMA {schema} TO {role};",
            ),
        )
        conn.execute(
            sa.text(
                f"ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} "
                f"GRANT SELECT ON TABLES TO {role};",
            ),
        )
    with engine.begin() as conn:
        try:
            conn.execute(
                sa.text(
                    f"CREATE USER {username} WITH PASSWORD '{quoted_password}';"
                    f"GRANT {role} TO {username};",
                ),
            )
        except sa.exc.ProgrammingError:
            created = False
    return created

drop_role

drop_role(role)

Drop a role

Source code in fluid/db/migration.py
def drop_role(
    self,
    role: str,
) -> bool:
    """Drop a role together with the objects it owns.

    ``DROP OWNED BY`` and ``DROP ROLE IF EXISTS`` are executed in that
    order, each in its own transaction on the sync engine.

    Returns ``True`` when both statements ran without error, and ``False``
    when a ``ProgrammingError`` reports that the role does not exist. Any
    other ``ProgrammingError`` is re-raised.
    """
    missing_role_marker = f'role "{role}" does not exist'
    statements = (
        f"DROP OWNED BY {role};",
        f"DROP ROLE IF EXISTS {role};",
    )
    try:
        for statement in statements:
            with self.sync_engine.begin() as conn:
                conn.execute(sa.text(statement))
    except sa.exc.ProgrammingError as exc:
        if missing_role_marker in str(exc):
            return False
        raise
    return True