Skip to content

Task Manager

The Task Manager is a component that manages the execution of tasks. It is the simplest way to run tasks and it is the base class for the TaskConsumer and the TaskScheduler.

It can be imported from fluid.scheduler:

from fluid.scheduler import TaskManager

The Task Manager is useful if you want to execute tasks in a synchronous or asynchronous way.

fluid.scheduler.TaskManager

TaskManager(*, deps=None, config=None, **kwargs)

The task manager is the main class for managing tasks

Source code in fluid/scheduler/consumer.py
def __init__(
    self,
    *,
    deps: Any = None,
    config: TaskManagerConfig | None = None,
    **kwargs: Any,
) -> None:
    """Initialise the task manager

    `deps` is stored as given, or replaced by a fresh `State()` when it is
    `None`. When `config` is provided (and truthy) it is used as-is and
    `**kwargs` are ignored; otherwise a `TaskManagerConfig` is built from
    `**kwargs`.
    """
    self.deps: Annotated[
        Any,
        Doc(
            """
            Dependencies for the task manager.

            Production applications requires global dependencies to be
            available to all tasks. This can be achieved by setting
            the `deps` attribute of the task manager to an object
            with the required dependencies.

            Each task can cast the dependencies to the required type.
            """
        ),
    ] = (
        deps if deps is not None else State()
    )
    self.config: Annotated[
        TaskManagerConfig, Doc("""Task manager configuration""")
    ] = config or TaskManagerConfig(**kwargs)
    self.dispatcher: Annotated[
        TaskDispatcher,
        Doc(
            """
            A dispatcher of [TaskRun][fluid.scheduler.TaskRun] events.

            Application can register handlers to listen for events
            happening during the lifecycle of a task run.
            """
        ),
    ] = TaskDispatcher()
    # The broker is built from the URL held in the configuration.
    self.broker = TaskBroker.from_url(self.config.broker_url)
    # Context managers collected by `add_async_context_manager`.
    self._async_contexts: list[Any] = []
    # NOTE(review): presumably the exit stack used to enter/exit the
    # registered context managers - its usage is not visible here.
    self._stack = AsyncExitStack()

deps instance-attribute

deps = deps if deps is not None else State()

Dependencies for the task manager.

Production applications require global dependencies to be available to all tasks. This can be achieved by setting the deps attribute of the task manager to an object with the required dependencies.

Each task can cast the dependencies to the required type.

config instance-attribute

config = config or TaskManagerConfig(**kwargs)

Task manager configuration

dispatcher instance-attribute

dispatcher = TaskDispatcher()

A dispatcher of TaskRun events.

Application can register handlers to listen for events happening during the lifecycle of a task run.

broker instance-attribute

broker = from_url(broker_url)

registry property

registry

The task registry

type property

type

The type of the task manager

on_startup async

on_startup()
Source code in fluid/scheduler/consumer.py
async def on_startup(self) -> None:
    """Start-up hook: awaits the task manager's `__aenter__`"""
    await self.__aenter__()

on_shutdown async

on_shutdown()
Source code in fluid/scheduler/consumer.py
async def on_shutdown(self) -> None:
    """Shut-down hook: awaits the task manager's `__aexit__`

    `__aexit__` is called with `(None, None, None)`, i.e. without any
    exception information.
    """
    await self.__aexit__(None, None, None)

add_async_context_manager

add_async_context_manager(cm)

Add an async context manager to the task manager

These context managers are entered when the task manager starts

Source code in fluid/scheduler/consumer.py
def add_async_context_manager(self, cm: Any) -> None:
    """Add an async context manager to the task manager

    These context managers are entered when the task manager starts

    This method only appends `cm` to the internal list of registered
    context managers; it does not enter it.
    """
    self._async_contexts.append(cm)

register_task

register_task(task)

Register a task with the task manager

PARAMETER DESCRIPTION
task

Task to register

TYPE: Task

Source code in fluid/scheduler/consumer.py
def register_task(self, task: Annotated[Task, Doc("Task to register")]) -> None:
    """Register a task with the task manager

    The registration is delegated to the broker's `register_task`.
    """
    self.broker.register_task(task)

execute async

execute(task, *, run_id='', priority=None, **params)

Execute a task and wait for it to finish

PARAMETER DESCRIPTION
task

The task or task name, if a task name it must be registered with the task manager.

TYPE: str | Task

run_id

Unique ID for the task run. If not provided a new UUID is generated.

TYPE: str DEFAULT: ''

priority

Override the default task priority if provided

TYPE: TaskPriority | None DEFAULT: None

**params

The optional parameters for the task run. They must match the task params model

TYPE: Any DEFAULT: {}

Source code in fluid/scheduler/consumer.py
async def execute(
    self,
    task: Annotated[
        str | Task,
        Doc(
            "The task or task name,"
            " if a task name it must be registered with the task manager."
        ),
    ],
    *,
    run_id: Annotated[
        str,
        Doc("Unique ID for the task run. If not provided a new UUID is generated."),
    ] = "",
    priority: Annotated[
        TaskPriority | None, Doc("Override the default task priority if provided")
    ] = None,
    **params: Annotated[
        Any,
        Doc(
            "The optional parameters for the task run. "
            "They must match the task params model"
        ),
    ],
) -> TaskRun:
    """Run a task to completion and return its task run

    The task run is built with `create_task_run` and awaited before it is
    handed back to the caller.
    """
    run = self.create_task_run(task, run_id=run_id, priority=priority, **params)
    await run.execute()
    return run

execute_sync

execute_sync(task, *, run_id='', priority=None, **params)

Execute a task synchronously

This method is a blocking method that should be used in a synchronous context.

PARAMETER DESCRIPTION
task

The task or task name, if a task name it must be registered with the task manager.

TYPE: str | Task

run_id

Unique ID for the task run. If not provided a new UUID is generated.

TYPE: str DEFAULT: ''

priority

Override the default task priority if provided

TYPE: TaskPriority | None DEFAULT: None

**params

The optional parameters for the task run. They must match the task params model

TYPE: Any DEFAULT: {}

Source code in fluid/scheduler/consumer.py
def execute_sync(
    self,
    task: Annotated[
        str | Task,
        Doc(
            "The task or task name,"
            " if a task name it must be registered with the task manager."
        ),
    ],
    *,
    run_id: Annotated[
        str,
        Doc("Unique ID for the task run. If not provided a new UUID is generated."),
    ] = "",
    priority: Annotated[
        TaskPriority | None, Doc("Override the default task priority if provided")
    ] = None,
    **params: Annotated[
        Any,
        Doc(
            "The optional parameters for the task run. "
            "They must match the task params model"
        ),
    ],
) -> TaskRun:
    """Run a task from synchronous code, blocking until it is done

    The coroutine returned by `_execute_and_exit` is driven by
    `asyncio.run`, so this method must be called from a synchronous
    context.
    """
    coroutine = self._execute_and_exit(
        task, run_id=run_id, priority=priority, **params
    )
    return asyncio.run(coroutine)

queue async

queue(task, *, run_id='', priority=None, **params)

Queue a task for execution

This method fires two events:

  • init: when the task run is created
  • queued: after the task is queued

It returns the TaskRun object

PARAMETER DESCRIPTION
task

The task or task name, if a task name it must be registered with the task manager.

TYPE: str | Task

run_id

Unique ID for the task run. If not provided a new UUID is generated.

TYPE: str DEFAULT: ''

priority

Override the default task priority if provided

TYPE: TaskPriority | None DEFAULT: None

**params

The optional parameters for the task run. They must match the task params model

TYPE: Any DEFAULT: {}

Source code in fluid/scheduler/consumer.py
async def queue(
    self,
    task: Annotated[
        str | Task,
        Doc(
            "The task or task name,"
            " if a task name it must be registered with the task manager."
        ),
    ],
    *,
    run_id: Annotated[
        str,
        Doc("Unique ID for the task run. If not provided a new UUID is generated."),
    ] = "",
    priority: Annotated[
        TaskPriority | None, Doc("Override the default task priority if provided")
    ] = None,
    **params: Annotated[
        Any,
        Doc(
            "The optional parameters for the task run. "
            "They must match the task params model"
        ),
    ],
) -> TaskRun:
    """Queue a task for execution

    This method fires two events:

    - `init`: when the task run is created
    - `queued`: after the task is queued

    It returns the [TaskRun][fluid.scheduler.TaskRun] object
    """
    queued_run = self.create_task_run(
        task, run_id=run_id, priority=priority, **params
    )
    # Dispatch first, while the run is still in its initial state.
    self.dispatcher.dispatch(queued_run)
    # The state is switched to `queued` before the broker receives the run.
    queued_run.set_state(TaskState.queued)
    await self.broker.queue_task(queued_run)
    return queued_run

create_task_run

create_task_run(
    task, *, run_id="", priority=None, **params
)

Create a TaskRun in init state

PARAMETER DESCRIPTION
task

The task or task name, if a task name it must be registered with the task manager.

TYPE: str | Task

run_id

Unique ID for the task run. If not provided a new UUID is generated.

TYPE: str DEFAULT: ''

priority

Override the default task priority if provided

TYPE: TaskPriority | None DEFAULT: None

**params

The optional parameters for the task run. They must match the task params model

TYPE: Any DEFAULT: {}

Source code in fluid/scheduler/consumer.py
def create_task_run(
    self,
    task: Annotated[
        str | Task,
        Doc(
            "The task or task name,"
            " if a task name it must be registered with the task manager."
        ),
    ],
    *,
    run_id: Annotated[
        str,
        Doc("Unique ID for the task run. If not provided a new UUID is generated."),
    ] = "",
    priority: Annotated[
        TaskPriority | None, Doc("Override the default task priority if provided")
    ] = None,
    **params: Annotated[
        Any,
        Doc(
            "The optional parameters for the task run. "
            "They must match the task params model"
        ),
    ],
) -> TaskRun:
    """Build a [TaskRun][fluid.scheduler.TaskRun] in `init` state

    The task is resolved through the broker's registry. A falsy `run_id`
    is replaced by a new UUID from the broker and a falsy `priority` by
    the task's own priority.
    """
    registered = self.broker.task_from_registry(task)
    return TaskRun(
        id=run_id if run_id else self.broker.new_uuid(),
        task=registered,
        priority=priority if priority else registered.priority,
        params=registered.params_model(**params),
        task_manager=self,
    )

register_from_module

register_from_module(module)
PARAMETER DESCRIPTION
module

Python module with tasks implementations - can contain any object, only instances of Task are registered

TYPE: ModuleType

Source code in fluid/scheduler/consumer.py
def register_from_module(
    self,
    module: Annotated[
        ModuleType,
        Doc(
            "Python module with tasks implementations "
            "- can contain any object, only instances of Task are registered"
        ),
    ],
) -> None:
    """Register every public `Task` instance found in a module

    Attributes whose name starts with an underscore are never looked up.
    """
    public_names = (attr for attr in dir(module) if not attr.startswith("_"))
    for attr in public_names:
        candidate = getattr(module, attr)
        if isinstance(candidate, Task):
            self.register_task(candidate)

register_from_dict

register_from_dict(data)
PARAMETER DESCRIPTION
data

Python dictionary with tasks implementations - can contain any object, only instances of Task are registered

TYPE: dict[str, Any]

Source code in fluid/scheduler/consumer.py
def register_from_dict(
    self,
    data: Annotated[
        dict[str, Any],
        Doc(
            "Python dictionary with tasks implementations "
            "- can contain any object, only instances of Task are registered"
        ),
    ],
) -> None:
    """Register every `Task` instance stored under a public key

    Entries whose key starts with an underscore are skipped without
    inspecting their value.
    """
    for key, candidate in data.items():
        if not key.startswith("_") and isinstance(candidate, Task):
            self.register_task(candidate)

register_async_handler

register_async_handler(event, handler)

Register an async handler for a given event

This method is a no op for a TaskManager that is not a worker

Source code in fluid/scheduler/consumer.py
def register_async_handler(self, event: str, handler: AsyncHandler) -> None:
    """Register an async handler for a given event

    This method is a no op for a TaskManager that is not a worker

    The implementation shown here has an empty body: `event` and `handler`
    are accepted and ignored.
    """

unregister_async_handler

unregister_async_handler(event)

Unregister an async handler for a given event

This method is a no op for a TaskManager that is not a worker

Source code in fluid/scheduler/consumer.py
def unregister_async_handler(self, event: Event | str) -> AsyncHandler | None:
    """Unregister an async handler for a given event

    This method is a no op for a TaskManager that is not a worker

    The implementation shown here has an empty body, so it always returns
    `None`.
    """

fluid.scheduler.TaskManagerConfig pydantic-model

Bases: BaseModel

Task manager configuration

Fields:

schedule_tasks pydantic-field

schedule_tasks = True

consume_tasks pydantic-field

consume_tasks = True

Consume tasks or sleep

max_concurrent_tasks pydantic-field

max_concurrent_tasks = MAX_CONCURRENT_TASKS

The number of coroutine workers consuming tasks. Each worker consumes one task at a time; therefore, this number is the maximum number of tasks that can run concurrently. It can be configured via the FLUID_MAX_CONCURRENT_TASKS environment variable, and by default is set to 5.

sleep pydantic-field

sleep = 0.1

Amount of time to async sleep after the completion of a task

broker_url pydantic-field

broker_url = ''

fluid.scheduler.consumer.TaskDispatcher

TaskDispatcher()

Bases: Dispatcher[TaskRun]

The task dispatcher is responsible for dispatching task run messages

Source code in fluid/utils/dispatcher.py
def __init__(self) -> None:
    """Create a dispatcher with no registered handlers"""
    # Two-level mapping: event type -> {event tag -> handler}.
    # The outer defaultdict creates the per-type dict on first `[]` access.
    self._msg_handlers: defaultdict[str, dict[str, MessageHandlerType]] = (
        defaultdict(
            dict,
        )
    )

register_handler

register_handler(event, handler)

Register a handler for the given event

It is possible to register multiple handlers for the same event type by providing a different tag for each handler.

For example, to register two handlers for the event type foo:

dispatcher.register_handler("foo.first", handler1)
dispatcher.register_handler("foo.second", handler2)
PARAMETER DESCRIPTION
event

The event to register the handler for

TYPE: Event | str

handler

The handler to register

TYPE: MessageHandlerType

Source code in fluid/utils/dispatcher.py
def register_handler(
    self,
    event: Annotated[Event | str, Doc("The event to register the handler for")],
    handler: Annotated[MessageHandlerType, Doc("The handler to register")],
) -> MessageHandlerType | None:
    """Register a handler for the given event

    Several handlers can listen to the same event type as long as each
    one is registered under its own tag.

    For example, to register two handlers for the event type `foo`:

    ```python
    dispatcher.register_handler("foo.first", handler1)
    dispatcher.register_handler("foo.second", handler2)
    ```

    It returns the handler previously registered for the same type and
    tag, or `None` if there was none.
    """
    parsed = Event.from_string_or_event(event)
    bucket = self._msg_handlers[parsed.type]
    replaced = bucket.get(parsed.tag)
    bucket[parsed.tag] = handler
    return replaced

unregister_handler

unregister_handler(event)

Unregister a handler for the given event

It returns the handler that was unregistered or None if no handler was registered for the given event.

PARAMETER DESCRIPTION
event

The event to unregister the handler

TYPE: Event | str

Source code in fluid/utils/dispatcher.py
def unregister_handler(
    self, event: Annotated[Event | str, Doc("The event to unregister the handler")]
) -> MessageHandlerType | None:
    """Unregister a handler for the given event

    It returns the handler that was unregistered or `None` if no handler was
    registered for the given event.

    Unregistering never leaves an empty entry behind for the event type,
    so `get_handlers` keeps returning `None` for event types that have no
    handlers.
    """
    event = Event.from_string_or_event(event)
    # Use `.get` rather than `[]`: indexing the defaultdict would insert an
    # empty dict for event types that were never registered.
    handlers = self._msg_handlers.get(event.type)
    if handlers is None:
        return None
    removed = handlers.pop(event.tag, None)
    if not handlers:
        # Last handler for this event type is gone: drop the empty bucket.
        del self._msg_handlers[event.type]
    return removed

get_handlers

get_handlers(message)

Get all event handlers for the given message

This method returns a dictionary of all handlers registered for the given message type. If no handlers are registered for the message type, it returns None.

PARAMETER DESCRIPTION
message

The message to get the handlers for

TYPE: MessageType

Source code in fluid/utils/dispatcher.py
def get_handlers(
    self,
    message: Annotated[MessageType, Doc("The message to get the handlers for")],
) -> dict[str, MessageHandlerType] | None:
    """Look up the handlers registered for the type of a message

    The message type is computed with `event_type`. The result is the
    dictionary of handlers stored for that type, or `None` when the type
    has no entry.
    """
    return self._msg_handlers.get(self.event_type(message))

dispatch

dispatch(message)

dispatch the message to all handlers

It returns the number of handlers that were called

Source code in fluid/utils/dispatcher.py
def dispatch(self, message: MessageType) -> int:
    """Call every handler registered for the message

    The return value is the number of handlers that were called, `0` when
    there are none.
    """
    handlers = self.get_handlers(message)
    if not handlers:
        return 0
    for callback in handlers.values():
        callback(message)
    return len(handlers)

event_type

event_type(message)
Source code in fluid/scheduler/consumer.py
def event_type(self, message: TaskRun) -> str:
    """Return the event type of a task run, which is its `state`"""
    return message.state