Skip to content

Task Manager Plugins

Plugins extend the TaskManager with additional behaviour by hooking into task lifecycle events.

A plugin implements the TaskManagerPlugin interface and is registered via TaskManager.with_plugin.

from fluid.scheduler import TaskScheduler
from fluid.scheduler.db import TaskDbPlugin

task_manager = TaskScheduler(...)
task_manager.with_plugin(TaskDbPlugin(db))

fluid.scheduler.TaskManagerPlugin

Bases: ABC

Plugin for a task Manager

register abstractmethod

register(task_manager)

Register the plugin with the task manager

Source code in fluid/scheduler/plugin.py
@abc.abstractmethod
def register(self, task_manager: TaskManager) -> None:
    """Register the plugin with the task manager"""

fluid.scheduler.db.TaskDbPlugin

TaskDbPlugin(
    db,
    *,
    table_name="fluid_tasks",
    tag="db",
    skip_db_tag="skip_db"
)

Bases: TaskManagerPlugin

A plugin to store task runs in a database.

This plugin listens to task state changes and updates the database accordingly. It requires a CrudDB instance to perform database operations and allows customization of the table name and event tags.

You can use the skip_db tag to prevent database operations for specific tasks.

It can be used if the db extra is installed, and requires a compatible database backend supported by CrudDB.

PARAMETER DESCRIPTION
table_name

The name of the table to store task runs

TYPE: str DEFAULT: 'fluid_tasks'

tag

The tag for the plugin event registration

TYPE: str DEFAULT: 'db'

skip_db_tag

The tag to skip database operations

TYPE: str DEFAULT: 'skip_db'

Source code in fluid/scheduler/db.py
def __init__(
    self,
    db: CrudDB,
    *,
    table_name: Annotated[
        str,
        Doc("The name of the table to store task runs"),
    ] = "fluid_tasks",
    tag: Annotated[
        str,
        Doc("The tag for the plugin event registration"),
    ] = "db",
    skip_db_tag: Annotated[
        str,
        Doc("The tag to skip database operations"),
    ] = "skip_db",
) -> None:
    if table_name not in db.tables:
        task_meta(db.metadata, table_name=table_name)
    self.table_name = table_name
    self.db = db
    self.tag = tag
    self.skip_db_tag = skip_db_tag

table_name instance-attribute

table_name = table_name

db instance-attribute

db = db

tag instance-attribute

tag = tag

skip_db_tag instance-attribute

skip_db_tag = skip_db_tag

register

register(task_manager)
Source code in fluid/scheduler/db.py
def register(self, task_manager: TaskManager) -> None:
    if is_in_cpu_process():
        return
    task_manager.register_async_handler(
        Event(TaskState.queued, self.tag),
        self._handle_queued,
    )
    task_manager.register_async_handler(
        Event(TaskState.running, self.tag),
        self._handle_update,
    )
    task_manager.register_async_handler(
        Event(TaskState.success, self.tag),
        self._handle_update,
    )
    task_manager.register_async_handler(
        Event(TaskState.failure, self.tag),
        self._handle_update,
    )
    task_manager.register_async_handler(
        Event(TaskState.aborted, self.tag),
        self._handle_update,
    )
    task_manager.register_async_handler(
        Event(TaskState.rate_limited, self.tag),
        self._handle_update,
    )