Task Queue App

The fluid.scheduler module is a simple yet powerful distributed task producer (TaskScheduler) and consumer (TaskConsumer) system for executing tasks. The middleware for distributing tasks can be configured via the TaskBroker interface.

A Redis task broker is provided for convenience.
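To make the producer/broker/consumer split concrete, here is a minimal sketch of the pattern using only asyncio. The names `Broker`, `InMemoryBroker`, `produce` and `consume` are hypothetical and are not part of fluid.scheduler; the in-memory queue merely stands in for a real broker such as Redis.

```python
import asyncio
from typing import Protocol


class Broker(Protocol):
    """Hypothetical broker interface: the only link between producer and consumer."""

    async def put(self, task_name: str) -> None: ...

    async def get(self) -> str: ...


class InMemoryBroker:
    """Stand-in for a real broker such as Redis, backed by an asyncio.Queue."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue[str] = asyncio.Queue()

    async def put(self, task_name: str) -> None:
        await self._queue.put(task_name)

    async def get(self) -> str:
        return await self._queue.get()


async def produce(broker: Broker, names: list[str]) -> None:
    # The producer only queues task names; it never runs them.
    for name in names:
        await broker.put(name)


async def consume(broker: Broker, count: int) -> list[str]:
    # The consumer pulls queued names in order and "executes" each one.
    done = []
    for _ in range(count):
        name = await broker.get()
        done.append(f"ran {name}")
    return done


async def main() -> list[str]:
    broker = InMemoryBroker()
    await produce(broker, ["ping", "cleanup"])
    return await consume(broker, 2)


results = asyncio.run(main())
```

Because the producer and the consumer only talk to the broker interface, swapping the in-memory queue for a networked backend would not change either side, which is the idea behind a configurable broker.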

Tasks Consumer

Create a task consumer, register tasks from modules, and run the consumer.

import asyncio
from typing import Any
from fluid.scheduler import TaskConsumer
import task_module_a, task_module_b

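def task_consumer(**kwargs: Any) -> TaskConsumer:
    consumer = TaskConsumer(**kwargs)
    # Register the tasks defined in each module
    # (assumes TaskConsumer exposes a register_from_module method)
    consumer.register_from_module(task_module_a)
    consumer.register_from_module(task_module_b)
    return consumer

if __name__ == "__main__":
    consumer = task_consumer()
    # Run the consumer until it is stopped
    # (assumes TaskConsumer.run() is a coroutine)
    asyncio.run(consumer.run())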

FastAPI Integration

The TaskConsumer can be integrated with FastAPI so that tasks can be queued via HTTP requests.

import uvicorn
from fluid.scheduler.endpoints import setup_fastapi

if __name__ == "__main__":
    consumer = task_consumer()
    app = setup_fastapi(consumer)
    # Serve the FastAPI app with uvicorn
    uvicorn.run(app)

You can test this with the provided example:

$ python -m examples.simple_fastapi

Then open the OpenAPI UI served by the running app.

Task App Command Line

The TaskConsumer or TaskScheduler can be run with the command line tool to allow for an even richer API.

from fluid.scheduler.cli import TaskManagerCLI

if __name__ == "__main__":
    consumer = task_consumer()

This feature requires installing the package with the cli extra.

$ pip install aio-fluid[cli]
$ python -m examples.simple_cli
Usage: python -m examples.simple_cli [OPTIONS] COMMAND [ARGS]...

Options:
  --help  Show this message and exit.

Commands:
  enable  Enable or disable a task
  exec    Execute a registered task
  ls      List all tasks with their schedules
  serve   Start app server.

The command line tool provides an interface for executing tasks from the shell. Task parameters are passed as optional arguments using the standard click interface.
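As an illustration of how task parameters can map to click options, here is a small self-contained sketch. It does not use fluid's TaskManagerCLI: the `greet` task and its `--name` and `--times` options are hypothetical, and only the `exec` command name is borrowed from the help output above.

```python
import click
from click.testing import CliRunner


@click.group()
def cli() -> None:
    """Hypothetical task CLI (not fluid's TaskManagerCLI)."""


@cli.group("exec")
def exec_group() -> None:
    """Execute a registered task."""


@exec_group.command("greet")
@click.option("--name", default="world", help="Who to greet.")
@click.option("--times", type=int, default=1, help="How many greetings to print.")
def greet(name: str, times: int) -> None:
    """Hypothetical task whose parameters arrive as optional arguments."""
    for _ in range(times):
        click.echo(f"hello {name}")


# Invoke the CLI in-process, as if typing:
#   exec greet --name fluid --times 2
result = CliRunner().invoke(cli, ["exec", "greet", "--name", "fluid", "--times", "2"])
```

Each option has a default, so a task can be run with no arguments at all, and click generates the `--help` text for every task from the option declarations.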