Task Queue App¶
The fluid.scheduler module is a simple yet powerful distributed task producer (TaskScheduler) and consumer (TaskConsumer) system for executing tasks.
The middleware for distributing tasks can be configured via the TaskBroker interface.
A redis task broker is provided for convenience.
Tasks Consumer¶
Create a task consumer, register tasks from modules, and run the consumer.
import asyncio
from typing import Any
from fluid.scheduler import TaskConsumer
import task_module_a, task_module_b
def task_consumer(**kwargs: Any) -> TaskConsumer:
consumer = TaskConsumer(**kwargs)
consumer.register_from_module(task_module_a)
consumer.register_from_module(task_module_b)
return consumer
if __name__ == "__main__":
consumer = task_consumer()
asyncio.run(consumer.run())
FastAPI Integration¶
A TaskManager can be integrated with FastAPI so that tasks can be queued via HTTP requests.
To setup the FastAPI app, use the task_manager_fastapi function:
import uvicorn
from fluid.scheduler import task_manager_fastapi
if __name__ == "__main__":
consumer = task_consumer()
app = task_manager_fastapi(consumer)
uvicorn.run(app)
You can test via the example provided
and check the openapi UI at http://127.0.0.1:8000/docs.
Task App Command Line¶
The TaskConsumer or TaskScheduler can be run with the command line tool to allow for an even richer API.
from fluid.scheduler.cli import TaskManagerCLI
from fluid.scheduler import task_manager_fastapi
if __name__ == "__main__":
consumer = task_consumer()
TaskManagerCLI(task_manager_fastapi(consumer))()
This features requires to install the package with the cli extra.
$ python -m examples.simple_cli
Usage: python -m examples.simple_cli [OPTIONS] COMMAND [ARGS]...
Options:
--help Show this message and exit.
Commands:
enable Enable or disable a task
exec Execute a registered task
ls List all tasks with their schedules
serve Start app server.
The command line tool provides a powerful interface to execute tasks, parameters are passed as optional arguments using the standard click interface.
Plugins¶
Plugins extend the task manager with additional behaviour by hooking into task lifecycle events. A plugin implements the TaskManagerPlugin interface and is registered via TaskManager.with_plugin.
Database Plugin¶
The TaskDbPlugin stores every task run in a database table so you can query task history, audit outcomes, and build dashboards on top of the data.
It requires a CrudDB instance and the db extra:
Register the plugin when building your task manager:
from fluid.scheduler import TaskScheduler, task_manager_fastapi
from fluid.scheduler.db import TaskDbPlugin, with_task_history_router
from fluid.db import CrudDB
db = CrudDB.from_env()
task_manager = TaskScheduler(...)
task_manager.with_plugin(TaskDbPlugin(db))
app = task_manager_fastapi(task_manager)
with_task_history_router(app)
The plugin creates a fluid_tasks table (configurable via table_name) and
persists a row for each task run as it moves through its lifecycle states.
Tasks tagged with skip_db are excluded from persistence.
with_task_history_router mounts a /history router on the app with two endpoints:
| Method | Path | Description |
|---|---|---|
GET |
/history |
List task run history with optional filters |
GET |
/history/{run_id} |
Fetch a single task run by ID |
The list endpoint accepts the following query parameters:
| Parameter | Type | Description |
|---|---|---|
name |
string |
Filter by task name |
state |
TaskState |
Filter by task state (e.g. success, failure) |
start |
datetime |
Only runs queued at or after this time |
end |
datetime |
Only runs queued at or before this time |
Example requests:
# All history, most recent first
GET /history
# Only successful runs of the "add" task
GET /history?name=add&state=success
# Runs queued in a specific time window
GET /history?start=2024-01-01T00:00:00Z&end=2024-01-02T00:00:00Z
# Fetch a specific run by ID
GET /history/abc123
Custom Plugins¶
To create your own plugin, subclass TaskManagerPlugin
and implement the register method. Use
TaskManager.register_async_handler
to subscribe to task lifecycle events:
from fluid.scheduler import TaskManagerPlugin, TaskManager, TaskState
from fluid.utils.dispatcher import Event
class MyPlugin(TaskManagerPlugin):
def register(self, task_manager: TaskManager) -> None:
task_manager.register_async_handler(
Event(TaskState.success, "my_plugin"),
self._on_success,
)
async def _on_success(self, task_run) -> None:
print(f"Task {task_run.name} succeeded")