Task Queue App

The fluid.scheduler module is a simple yet powerful distributed task producer (TaskScheduler) and consumer (TaskConsumer) system for executing tasks. The middleware for distributing tasks can be configured via the TaskBroker interface.

A redis task broker is provided for convenience.
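
To make the three roles concrete, here is a minimal in-memory sketch of the producer, broker, and consumer pattern. It does not use fluid.scheduler: `InMemoryBroker`, `SketchConsumer`, `publish`, and `next_task` are hypothetical names chosen for illustration, and an `asyncio.Queue` stands in for a real broker such as redis.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

# Hypothetical stand-ins: none of these names belong to fluid.scheduler.
TaskFn = Callable[..., Awaitable[Any]]


class InMemoryBroker:
    """Plays the broker role: stores queued (task name, params) pairs."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def publish(self, name: str, params: Dict[str, Any]) -> None:
        # Producer side: enqueue a task request.
        await self._queue.put((name, params))

    async def next_task(self) -> Tuple[str, Dict[str, Any]]:
        # Consumer side: wait for the next task request.
        return await self._queue.get()


class SketchConsumer:
    """Plays the consumer role: looks up registered tasks and runs them."""

    def __init__(self, broker: InMemoryBroker) -> None:
        self.broker = broker
        self.registry: Dict[str, TaskFn] = {}

    def register(self, fn: TaskFn) -> TaskFn:
        # Tasks are looked up by function name when a request arrives.
        self.registry[fn.__name__] = fn
        return fn

    async def run(self, max_tasks: int) -> List[Any]:
        # Process a fixed number of tasks so the demo terminates.
        results = []
        for _ in range(max_tasks):
            name, params = await self.broker.next_task()
            results.append(await self.registry[name](**params))
        return results


async def add(a: int, b: int) -> int:
    return a + b


async def demo() -> List[Any]:
    broker = InMemoryBroker()
    consumer = SketchConsumer(broker)
    consumer.register(add)
    # The producer only talks to the broker, never to the consumer.
    await broker.publish("add", {"a": 1, "b": 2})
    await broker.publish("add", {"a": 10, "b": 5})
    return await consumer.run(max_tasks=2)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the producer and the consumer share only the broker, swapping the in-memory queue for a networked one is what would let them run in separate processes.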

Tasks Consumer

Create a task consumer, register tasks from modules, and run the consumer.

import asyncio
from typing import Any
from fluid.scheduler import TaskConsumer
import task_module_a, task_module_b


def task_consumer(**kwargs: Any) -> TaskConsumer:
    consumer = TaskConsumer(**kwargs)
    consumer.register_from_module(task_module_a)
    consumer.register_from_module(task_module_b)
    return consumer


if __name__ == "__main__":
    consumer = task_consumer()
    asyncio.run(consumer.run())

FastAPI Integration

A TaskManager can be integrated with FastAPI so that tasks can be queued via HTTP requests.

To set up the FastAPI app, use the task_manager_fastapi function:

import uvicorn
from fluid.scheduler import task_manager_fastapi

if __name__ == "__main__":
    consumer = task_consumer()
    app = task_manager_fastapi(consumer)
    uvicorn.run(app)

You can test this with the provided example:

$ python -m examples.simple_fastapi

Then check the OpenAPI UI at http://127.0.0.1:8000/docs.

Task App Command Line

The TaskConsumer or TaskScheduler can be run with the command line tool to allow for an even richer API.

from fluid.scheduler.cli import TaskManagerCLI
from fluid.scheduler import task_manager_fastapi

if __name__ == "__main__":
    consumer = task_consumer()
    TaskManagerCLI(task_manager_fastapi(consumer))()

This feature requires installing the package with the cli extra:

$ pip install aio-fluid[cli]
$ python -m examples.simple_cli
Usage: python -m examples.simple_cli [OPTIONS] COMMAND [ARGS]...

Options:
  --help  Show this message and exit.

Commands:
  enable  Enable or disable a task
  exec    Execute a registered task
  ls      List all tasks with their schedules
  serve   Start app server.

The command line tool provides a powerful interface to execute tasks. Parameters are passed as optional arguments using the standard click interface.

Plugins

Plugins extend the task manager with additional behaviour by hooking into task lifecycle events. A plugin implements the TaskManagerPlugin interface and is registered via TaskManager.with_plugin.

Database Plugin

The TaskDbPlugin stores every task run in a database table so you can query task history, audit outcomes, and build dashboards on top of the data.

It requires a CrudDB instance and the db extra:

pip install aio-fluid[db]

Register the plugin when building your task manager:

from fluid.scheduler import TaskScheduler, task_manager_fastapi
from fluid.scheduler.db import TaskDbPlugin, with_task_history_router
from fluid.db import CrudDB

db = CrudDB.from_env()
task_manager = TaskScheduler(...)
task_manager.with_plugin(TaskDbPlugin(db))
app = task_manager_fastapi(task_manager)
with_task_history_router(app)

The plugin creates a fluid_tasks table (configurable via table_name) and persists a row for each task run as it moves through its lifecycle states. Tasks tagged with skip_db are excluded from persistence.

with_task_history_router mounts a /history router on the app with two endpoints:

| Method | Path | Description |
|--------|------|-------------|
| GET | /history | List task run history with optional filters |
| GET | /history/{run_id} | Fetch a single task run by ID |

The list endpoint accepts the following query parameters:

| Parameter | Type | Description |
|-----------|------|-------------|
| name | string | Filter by task name |
| state | TaskState | Filter by task state (e.g. success, failure) |
| start | datetime | Only runs queued at or after this time |
| end | datetime | Only runs queued at or before this time |

Example requests:

# All history, most recent first
GET /history

# Only successful runs of the "add" task
GET /history?name=add&state=success

# Runs queued in a specific time window
GET /history?start=2024-01-01T00:00:00Z&end=2024-01-02T00:00:00Z

# Fetch a specific run by ID
GET /history/abc123
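
The same requests can be built programmatically. The sketch below assembles list-endpoint URLs from the four documented query parameters using only the standard library. The `history_url` helper is hypothetical, and the base URL assumes the app is served at the address used in the FastAPI example above.

```python
from datetime import datetime, timezone
from typing import Dict, Optional
from urllib.parse import urlencode


def history_url(
    base_url: str,
    name: Optional[str] = None,
    state: Optional[str] = None,
    start: Optional[datetime] = None,
    end: Optional[datetime] = None,
) -> str:
    """Build a /history URL, including only the filters that were given.

    Hypothetical helper, not part of aio-fluid.
    """
    params: Dict[str, str] = {}
    if name is not None:
        params["name"] = name
    if state is not None:
        params["state"] = state
    if start is not None:
        # ISO 8601 timestamps; urlencode percent-escapes ":" and "+".
        params["start"] = start.isoformat()
    if end is not None:
        params["end"] = end.isoformat()
    query = urlencode(params)
    url = f"{base_url}/history"
    return f"{url}?{query}" if query else url


if __name__ == "__main__":
    base = "http://127.0.0.1:8000"  # assumed server address
    print(history_url(base))
    print(history_url(base, name="add", state="success"))
    print(
        history_url(
            base,
            start=datetime(2024, 1, 1, tzinfo=timezone.utc),
            end=datetime(2024, 1, 2, tzinfo=timezone.utc),
        )
    )
```

Using timezone-aware datetimes keeps the time window unambiguous, since the offset travels with the timestamp.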

Custom Plugins

To create your own plugin, subclass TaskManagerPlugin and implement the register method. Use TaskManager.register_async_handler to subscribe to task lifecycle events:

from fluid.scheduler import TaskManagerPlugin, TaskManager, TaskState
from fluid.utils.dispatcher import Event


class MyPlugin(TaskManagerPlugin):
    def register(self, task_manager: TaskManager) -> None:
        task_manager.register_async_handler(
            Event(TaskState.success, "my_plugin"),
            self._on_success,
        )

    async def _on_success(self, task_run) -> None:
        print(f"Task {task_run.name} succeeded")
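
To try out a plugin's wiring without a broker, the registration pattern can be exercised against a stub. The sketch below is a stand-in, not fluid's dispatcher: `StubTaskManager`, its `dispatch` method, the local `Event` tuple, and `CountingPlugin` are all hypothetical, and it assumes that the second argument of `Event` acts as a tag that keeps handlers registered by different plugins apart.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List, NamedTuple

Handler = Callable[[Any], Awaitable[None]]


class Event(NamedTuple):
    """Local stand-in for an (event type, tag) pair."""

    type: str
    tag: str


class StubTaskManager:
    """Hypothetical stub that only records and dispatches handlers."""

    def __init__(self) -> None:
        # event type -> tag -> handler
        self._handlers: Dict[str, Dict[str, Handler]] = defaultdict(dict)

    def register_async_handler(self, event: Event, handler: Handler) -> None:
        # Registering the same type and tag again replaces the handler.
        self._handlers[event.type][event.tag] = handler

    async def dispatch(self, event_type: str, payload: Any) -> None:
        # Call every handler registered for this event type.
        for handler in list(self._handlers[event_type].values()):
            await handler(payload)


class CountingPlugin:
    """Records the names of tasks that succeeded."""

    def __init__(self) -> None:
        self.successes: List[str] = []

    def register(self, task_manager: StubTaskManager) -> None:
        task_manager.register_async_handler(
            Event("success", "counting_plugin"),
            self._on_success,
        )

    async def _on_success(self, task_run: Dict[str, Any]) -> None:
        self.successes.append(task_run["name"])


async def demo() -> List[str]:
    manager = StubTaskManager()
    plugin = CountingPlugin()
    plugin.register(manager)
    await manager.dispatch("success", {"name": "add"})
    await manager.dispatch("failure", {"name": "add"})  # no handler, ignored
    return plugin.successes


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keeping the handler logic in a small method like `_on_success` makes it easy to test in isolation before registering the plugin on a real task manager.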