Skip to content

Task Manager Plugins

Plugins extend the TaskManager with additional behaviour by hooking into task lifecycle events.

A plugin implements the TaskManagerPlugin interface and is registered via TaskManager.with_plugin.

from fluid.scheduler import TaskScheduler, task_manager_fastapi
from fluid.scheduler.db import TaskDbPlugin, with_task_history_router

task_manager = TaskScheduler(...)
task_manager.with_plugin(TaskDbPlugin(db))
app = task_manager_fastapi(task_manager)
with_task_history_router(app)

fluid.scheduler.TaskManagerPlugin

Bases: ABC

Plugin for a task Manager

register abstractmethod

register(task_manager)

Register the plugin with the task manager

Source code in fluid/scheduler/plugin.py
@abc.abstractmethod
def register(self, task_manager: TaskManager) -> None:
    """Register the plugin with the task manager"""

fluid.scheduler.db.TaskDbPlugin

TaskDbPlugin(
    db,
    *,
    table_name="fluid_tasks",
    tag="db",
    skip_db_tag="skip_db"
)

Bases: TaskManagerPlugin

A plugin to store task runs in a database.

This plugin listens to task state changes and updates the database accordingly. It requires a CrudDB instance to perform database operations and allows customization of the table name and event tags.

You can use the skip_db tag to prevent database operations for specific tasks.

It can be used if the db extra is installed, and requires a compatible database backend supported by CrudDB.

PARAMETER DESCRIPTION
table_name

The name of the table to store task runs

TYPE: str DEFAULT: 'fluid_tasks'

tag

The tag for the plugin event registration

TYPE: str DEFAULT: 'db'

skip_db_tag

The tag to skip database operations

TYPE: str DEFAULT: 'skip_db'

Source code in fluid/scheduler/db.py
def __init__(
    self,
    db: CrudDB,
    *,
    table_name: Annotated[
        str,
        Doc("The name of the table to store task runs"),
    ] = "fluid_tasks",
    tag: Annotated[
        str,
        Doc("The tag for the plugin event registration"),
    ] = "db",
    skip_db_tag: Annotated[
        str,
        Doc("The tag to skip database operations"),
    ] = "skip_db",
) -> None:
    if table_name not in db.tables:
        task_meta(db.metadata, table_name=table_name)
    self.table_name = table_name
    self.db = db
    self.tag = tag
    self.skip_db_tag = skip_db_tag

table_name instance-attribute

table_name = table_name

db instance-attribute

db = db

tag instance-attribute

tag = tag

skip_db_tag instance-attribute

skip_db_tag = skip_db_tag

register

register(task_manager)
Source code in fluid/scheduler/db.py
def register(self, task_manager: TaskManager) -> None:
    if is_in_cpu_process():
        return

    task_manager.state.task_db_plugin = self
    task_manager.register_async_handler(
        Event(TaskState.queued, self.tag),
        self._handle_update,
    )
    task_manager.register_async_handler(
        Event(TaskState.success, self.tag),
        self._handle_update,
    )
    task_manager.register_async_handler(
        Event(TaskState.failure, self.tag),
        self._handle_update,
    )
    task_manager.register_async_handler(
        Event(TaskState.aborted, self.tag),
        self._handle_update,
    )
    task_manager.register_async_handler(
        Event(TaskState.rate_limited, self.tag),
        self._handle_update,
    )

get_history async

get_history(q)

Get task run history based on the provided query parameters.

PARAMETER DESCRIPTION
q

Query parameters for fetching task run history

TYPE: HistoryQuery

Source code in fluid/scheduler/db.py
async def get_history(
    self,
    q: Annotated[
        HistoryQuery, Doc("Query parameters for fetching task run history")
    ],
) -> TaskRunHistoryPage:
    """Get task run history based on the provided query parameters."""
    table = self.db.tables[self.table_name]
    pagination = Pagination.create(
        "queued",
        filters=q.filters(),
        limit=q.limit,
        cursor=q.cursor,
        desc=True,
    )
    rows, cursor = await pagination.execute(self.db, table)
    return TaskRunHistoryPage(
        data=[_row_to_task_run(row) for row in rows],
        cursor=cursor,
    )

fluid.scheduler.db.with_task_history_router

with_task_history_router(app, prefix='/task-history')

Add task history endpoints to a FastAPI app.

PARAMETER DESCRIPTION
app

FastAPI app instance.

TYPE: FastAPI

Source code in fluid/scheduler/db.py
def with_task_history_router(
    app: Annotated[
        FastAPI,
        Doc("FastAPI app instance."),
    ],
    prefix: str = "/task-history",
) -> FastAPI:
    """Add task history endpoints to a FastAPI app."""
    app.include_router(router, prefix=prefix)
    return app

Accessing the plugin from a task

get_db_plugin retrieves the registered TaskDbPlugin from the task manager state. It is designed as a FastAPI dependency for route handlers, but can also be called directly from within a task by passing context.task_manager:

from fluid.scheduler import TaskRun, task
from fluid.scheduler.db import get_db_plugin, HistoryQuery


@task()
async def report(context: TaskRun) -> None:
    db_plugin = get_db_plugin(context.task_manager)
    page = await db_plugin.get_history(HistoryQuery(task="my-task", limit=10))
    for run in page.data:
        print(run.id, run.state)

fluid.scheduler.db.get_db_plugin

get_db_plugin(task_manager)

Retrieve the registered TaskDbPlugin.

Can be used as a FastAPI dependency in route handlers, or called directly from within a task by passing context.task_manager.

Source code in fluid/scheduler/db.py
def get_db_plugin(task_manager: TaskManagerDep) -> TaskDbPlugin:
    """Retrieve the registered [TaskDbPlugin][fluid.scheduler.db.TaskDbPlugin].

    Can be used as a FastAPI dependency in route handlers, or called directly
    from within a task by passing `context.task_manager`.
    """
    return task_manager.state.task_db_plugin

History Models

The following models are used when querying task run history via TaskDbPlugin.get_history or the HTTP endpoints added by with_task_history_router.

They can be imported from fluid.scheduler.db:

from fluid.scheduler.db import HistoryQuery, TaskRunHistory, TaskRunHistoryPage

fluid.scheduler.db.HistoryQuery pydantic-model

Bases: BaseModel

Query parameters for fetching task run history.

Fields:

task pydantic-field

task = None

start pydantic-field

start = None

end pydantic-field

end = None

state pydantic-field

state = None

limit pydantic-field

limit = None

cursor pydantic-field

cursor = ''

filters

filters()
Source code in fluid/scheduler/db.py
def filters(self) -> dict:
    return {
        self._filter_map.get(k, k): v
        for k, v in self.model_dump(
            exclude_none=True, exclude={"limit", "cursor"}
        ).items()
    }

fluid.scheduler.db.TaskRunHistory pydantic-model

Bases: BaseModel

A model representing the history of a task run, including its parameters and timing information.

Fields:

id pydantic-field

id

The unique ID of the task run

task pydantic-field

task

The name of the task

priority pydantic-field

priority

The priority of the task

state pydantic-field

state

The state of the task

queued pydantic-field

queued

The time the task was queued

start pydantic-field

start = None

The start time of the task

end pydantic-field

end = None

The end time of the task

params pydantic-field

params

The parameters of the task run

fluid.scheduler.db.TaskRunHistoryPage pydantic-model

Bases: BaseModel

A paginated response containing a list of task run history records.

Returned by TaskDbPlugin.get_history and the GET /task-history endpoint.

Fields:

data pydantic-field

data

The task run history records

cursor pydantic-field

cursor

Pagination cursor to fetch the next page