Task Manager Plugins¶
Plugins extend the TaskManager with additional behaviour by hooking into task lifecycle events.
A plugin implements the TaskManagerPlugin interface and is registered via TaskManager.with_plugin.
from fluid.scheduler import TaskScheduler, task_manager_fastapi
from fluid.scheduler.db import TaskDbPlugin, with_task_history_router

task_manager = TaskScheduler(...)
# db is a CrudDB instance created elsewhere
task_manager.with_plugin(TaskDbPlugin(db))

app = task_manager_fastapi(task_manager)
# add the task history endpoints to the FastAPI app
with_task_history_router(app)
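The registration pattern can be sketched without the library. This is only an illustration under stated assumptions: the names `with_plugin` and `register` appear on this page, but `MiniPlugin`, `MiniTaskManager`, `on_state_change`, `set_state`, and `StateLogPlugin` are hypothetical stand-ins and not fluid's API.

```python
from abc import ABC, abstractmethod
from typing import Callable

# Handler signature assumed for this sketch: (task_name, new_state) -> None
StateHandler = Callable[[str, str], None]


class MiniPlugin(ABC):
    """Hypothetical stand-in for a plugin interface (not TaskManagerPlugin)."""

    @abstractmethod
    def register(self, manager: "MiniTaskManager") -> None:
        """Attach the plugin's handlers to the manager."""


class MiniTaskManager:
    """Hypothetical stand-in manager that dispatches state changes."""

    def __init__(self) -> None:
        self._handlers: list[StateHandler] = []

    def with_plugin(self, plugin: MiniPlugin) -> None:
        # Registration hands control to the plugin, which subscribes itself.
        plugin.register(self)

    def on_state_change(self, handler: StateHandler) -> None:
        self._handlers.append(handler)

    def set_state(self, task_name: str, state: str) -> None:
        # Notify every subscribed handler, in registration order.
        for handler in self._handlers:
            handler(task_name, state)


class StateLogPlugin(MiniPlugin):
    """Keeps an in-memory log of every state change it observes."""

    def __init__(self) -> None:
        self.seen: list[tuple[str, str]] = []

    def register(self, manager: MiniTaskManager) -> None:
        manager.on_state_change(self._record)

    def _record(self, task_name: str, state: str) -> None:
        self.seen.append((task_name, state))


manager = MiniTaskManager()
plugin = StateLogPlugin()
manager.with_plugin(plugin)
manager.set_state("my-task", "running")
manager.set_state("my-task", "success")
```

In this sketch the manager knows nothing about what a plugin does; the plugin decides which events to subscribe to inside `register`, which keeps optional behaviour out of the manager itself.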
fluid.scheduler.TaskManagerPlugin¶
fluid.scheduler.db.TaskDbPlugin¶
Bases: TaskManagerPlugin
A plugin to store task runs in a database.
This plugin listens to task state changes and updates the database accordingly. It requires a CrudDB instance to perform database operations and allows customization of the table name and event tags.
You can use the skip_db tag to prevent database operations for specific tasks.
It is available only when the db extra is installed, and it requires a
database backend supported by CrudDB.
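The behaviour described above (persist each state change unless the task carries the skip tag) can be sketched with the standard library. This is a hedged illustration only: `SqliteRunRecorder`, its `on_state_change` signature, the column names, and the default table name `task_runs` are hypothetical; it uses `sqlite3` rather than CrudDB, and how tags are attached to tasks in fluid is not shown on this page.

```python
import sqlite3


class SqliteRunRecorder:
    """Hypothetical stand-in; not TaskDbPlugin and not backed by CrudDB."""

    def __init__(
        self,
        conn: sqlite3.Connection,
        table_name: str = "task_runs",  # assumed default, for illustration
        skip_db_tag: str = "skip_db",
    ) -> None:
        self.conn = conn
        self.table_name = table_name
        self.skip_db_tag = skip_db_tag
        # The table name is interpolated, so it must come from trusted config.
        conn.execute(
            f"CREATE TABLE IF NOT EXISTS {table_name} "
            "(run_id TEXT PRIMARY KEY, task TEXT NOT NULL, state TEXT NOT NULL)"
        )

    def on_state_change(
        self,
        run_id: str,
        task: str,
        state: str,
        tags: frozenset[str] = frozenset(),
    ) -> bool:
        """Upsert the latest state; return False when the run is skipped."""
        if self.skip_db_tag in tags:
            return False
        # One row per run: insert on first sight, then update the state.
        self.conn.execute(
            f"INSERT INTO {self.table_name} (run_id, task, state) "
            "VALUES (?, ?, ?) "
            "ON CONFLICT(run_id) DO UPDATE SET state = excluded.state",
            (run_id, task, state),
        )
        return True


conn = sqlite3.connect(":memory:")
recorder = SqliteRunRecorder(conn)
recorder.on_state_change("r1", "my-task", "running")
recorder.on_state_change("r1", "my-task", "success")
recorder.on_state_change("r2", "noisy-task", "running", frozenset({"skip_db"}))
rows = conn.execute("SELECT run_id, task, state FROM task_runs").fetchall()
```

Only the run without the skip tag reaches the table, and repeated state changes for the same run update a single row instead of appending new ones.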
| PARAMETER | DESCRIPTION |
|---|---|
| table_name | The name of the table to store task runs |
| tag | The tag for the plugin event registration |
| skip_db_tag | The tag to skip database operations |
Source code in fluid/scheduler/db.py
register¶
get_history (async)¶
Get task run history based on the provided query parameters.
| PARAMETER | DESCRIPTION |
|---|---|
| q | Query parameters for fetching task run history |
fluid.scheduler.db.with_task_history_router¶
Add task history endpoints to a FastAPI app.
| PARAMETER | DESCRIPTION |
|---|---|
| app | FastAPI app instance. |
Accessing the plugin from a task¶
get_db_plugin retrieves the registered
TaskDbPlugin from the task manager state.
It is designed as a FastAPI dependency for route handlers, but can also be
called directly from within a task by passing context.task_manager:
from fluid.scheduler import TaskRun, task
from fluid.scheduler.db import get_db_plugin, HistoryQuery


@task()
async def report(context: TaskRun) -> None:
    db_plugin = get_db_plugin(context.task_manager)
    page = await db_plugin.get_history(HistoryQuery(task="my-task", limit=10))
    for run in page.data:
        print(run.id, run.state)
fluid.scheduler.db.get_db_plugin¶
Retrieve the registered TaskDbPlugin.
Can be used as a FastAPI dependency in route handlers, or called directly
from within a task by passing context.task_manager.
History Models¶
The following models are used when querying task run history via TaskDbPlugin.get_history or the HTTP endpoints added by with_task_history_router.
They can be imported from fluid.scheduler.db:
fluid.scheduler.db.HistoryQuery (pydantic-model)¶
fluid.scheduler.db.TaskRunHistory (pydantic-model)¶
fluid.scheduler.db.TaskRunHistoryPage (pydantic-model)¶
Bases: BaseModel
A paginated response containing a list of task run history records.
Returned by TaskDbPlugin.get_history and the GET /task-history endpoint.
Fields:
- data (list[TaskRunHistory])
- cursor (str)
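A page with `data` and `cursor` fields is typically consumed in a loop that feeds each cursor back into the next request. The sketch below shows that loop against an in-memory stand-in. It rests on assumptions not confirmed by this page: that an empty cursor marks the last page, and that the cursor is passed back with the next query. `Run`, `Page`, `fetch_page`, `fetch_all`, and the offset-style cursor are hypothetical and are not fluid's models.

```python
from dataclasses import dataclass, field


@dataclass
class Run:
    """Hypothetical stand-in for a history record."""

    id: str
    state: str


@dataclass
class Page:
    """Hypothetical stand-in mirroring the data/cursor shape."""

    data: list[Run] = field(default_factory=list)
    cursor: str = ""


# Fake store standing in for the task run table.
RUNS = [Run(id=f"run-{i}", state="success") for i in range(5)]


def fetch_page(limit: int, cursor: str = "") -> Page:
    """Return one page; the cursor here is just an offset (an assumption)."""
    start = int(cursor) if cursor else 0
    end = start + limit
    # An empty cursor signals that no further pages exist (assumed convention).
    next_cursor = str(end) if end < len(RUNS) else ""
    return Page(data=RUNS[start:end], cursor=next_cursor)


def fetch_all(limit: int) -> list[Run]:
    """Follow cursors until the store reports no further pages."""
    runs: list[Run] = []
    cursor = ""
    while True:
        page = fetch_page(limit, cursor)
        runs.extend(page.data)
        if not page.cursor:
            return runs
        cursor = page.cursor


all_runs = fetch_all(limit=2)
```

Treating the cursor as an opaque string, as the loop does, means the caller does not need to change if the server switches from offsets to some other cursor encoding.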