Skip to content

Task Manager

It can be imported from fluid.scheduler:

from fluid.scheduler import TaskManager

fluid.scheduler.TaskManager

TaskManager(*, deps=None, config=None, **kwargs)

The task manager is the main class for managing tasks

Source code in fluid/scheduler/consumer.py
def __init__(
    self,
    *,
    deps: Any = None,
    config: TaskManagerConfig | None = None,
    **kwargs: Any,
) -> None:
    # Set up the manager's dependencies, configuration, event dispatcher,
    # broker and async exit stack. All arguments are keyword-only.
    #
    # An explicit `is not None` check is used for `deps`, so a falsy object
    # supplied by the caller is kept; State() is only the fallback.
    self.deps: Annotated[
        Any,
        Doc(
            """
            Dependencies for the task manager.

            Production applications requires global dependencies to be
            available to all tasks. This can be achieved by setting
            the `deps` attribute of the task manager to an object
            with the required dependencies.

            Each task can cast the dependencies to the required type.
            """
        ),
    ] = (
        deps if deps is not None else State()
    )
    # A supplied `config` takes precedence: the extra keyword arguments are
    # only used to build a TaskManagerConfig when `config` is falsy (e.g. None).
    self.config: Annotated[
        TaskManagerConfig, Doc("""Task manager configuration""")
    ] = config or TaskManagerConfig(**kwargs)
    self.dispatcher: Annotated[
        TaskDispatcher,
        Doc(
            """
            A dispatcher of [TaskRun][fluid.scheduler.TaskRun] events.

            Application can register handlers to listen for events
            happening during the lifecycle of a task run.
            """
        ),
    ] = TaskDispatcher()
    # The broker is created from the broker URL held by the configuration.
    self.broker = TaskBroker.from_url(self.config.broker_url)
    # Async exit stack owned by this manager instance.
    self._stack = AsyncExitStack()

deps instance-attribute

deps = deps if deps is not None else State()

Dependencies for the task manager.

Production applications require global dependencies to be available to all tasks. This can be achieved by setting the deps attribute of the task manager to an object with the required dependencies.

Each task can cast the dependencies to the required type.

config instance-attribute

config = config or TaskManagerConfig(**kwargs)

Task manager configuration

dispatcher instance-attribute

dispatcher = TaskDispatcher()

A dispatcher of TaskRun events.

Applications can register handlers to listen for events happening during the lifecycle of a task run.

broker instance-attribute

broker = from_url(broker_url)

registry property

registry

The task registry

type property

type

enter_async_context async

enter_async_context(cm)
Source code in fluid/scheduler/consumer.py
async def enter_async_context(self, cm: Any) -> Any:
    return await self._stack.enter_async_context(cm)

execute async

execute(task, **params)

Execute a task and wait for it to finish

Source code in fluid/scheduler/consumer.py
async def execute(self, task: Task | str, **params: Any) -> TaskRun:
    """Run a task to completion and return its task run

    A task run is built with ``create_task_run`` (``params`` are forwarded as
    keyword arguments), its ``execute`` coroutine is awaited, and the same
    task run object is handed back to the caller.
    """
    run = self.create_task_run(task, **params)
    await run.execute()
    return run

on_shutdown async

on_shutdown()
Source code in fluid/scheduler/consumer.py
async def on_shutdown(self) -> None:
    """Shutdown hook: await the broker's ``close`` coroutine."""
    broker = self.broker
    await broker.close()

execute_sync

execute_sync(task, **params)
Source code in fluid/scheduler/consumer.py
def execute_sync(self, task: Task | str, **params: Any) -> TaskRun:
    """Blocking entry point that executes a task from synchronous code

    The ``_execute_and_exit`` coroutine is driven to completion with
    ``asyncio.run`` and its result is returned. Because ``asyncio.run`` is
    used, this cannot be called while an event loop is already running in
    the current thread.
    """
    coroutine = self._execute_and_exit(task, **params)
    return asyncio.run(coroutine)

register_task

register_task(task)

Register a task with the task manager

Only registered tasks can be executed by a task manager

Source code in fluid/scheduler/consumer.py
def register_task(self, task: Task) -> None:
    """Make a task known to this task manager

    Registration is delegated to the broker. Only registered tasks can be
    executed by a task manager
    """
    broker = self.broker
    broker.register_task(task)

queue async

queue(task, priority=None, **params)

Queue a task for execution

This method fires two events:

  • init: when the task run is created
  • queued: after the task is queued
Source code in fluid/scheduler/consumer.py
async def queue(
    self,
    task: str | Task,
    priority: TaskPriority | None = None,
    **params: Any,
) -> TaskRun:
    """Create a task run and hand it to the broker for execution

    This method fires two events:

    - init: when the task run is created
    - queued: after the task is queued

    The task run handed to the broker is also the return value.
    """
    run = self.create_task_run(task, priority=priority, **params)
    # Dispatch first, while the run is still in the state it was created in.
    self.dispatcher.dispatch(run)
    # The state is switched to queued before the broker receives the run.
    run.set_state(TaskState.queued)
    await self.broker.queue_task(run)
    return run

create_task_run

create_task_run(task, run_id='', priority=None, **params)

Create a TaskRun in init state

Source code in fluid/scheduler/consumer.py
def create_task_run(
    self,
    task: str | Task,
    run_id: str = "",
    priority: TaskPriority | None = None,
    **params: Any,
) -> TaskRun:
    """Build a TaskRun in `init` state for the given task

    The task is resolved through the broker's registry. A falsy ``run_id``
    is replaced by a new id from the broker, a falsy ``priority`` falls back
    to the task's own priority, and ``params`` are passed to the task's
    ``params_model``.
    """
    broker = self.broker
    resolved_task = broker.task_from_registry(task)
    if not run_id:
        run_id = broker.new_uuid()
    if not priority:
        priority = resolved_task.priority
    run_params = resolved_task.params_model(**params)
    return TaskRun(
        id=run_id,
        task=resolved_task,
        priority=priority,
        params=run_params,
        task_manager=self,
    )

register_from_module

register_from_module(module)
Source code in fluid/scheduler/consumer.py
def register_from_module(self, module: Any) -> None:
    """Register every public Task instance found on ``module``

    Names listed by ``dir(module)`` that start with an underscore are
    skipped without being looked up; every remaining attribute that is a
    ``Task`` instance is passed to ``register_task``.
    """
    public_names = (attr for attr in dir(module) if not attr.startswith("_"))
    for attr in public_names:
        candidate = getattr(module, attr)
        if isinstance(candidate, Task):
            self.register_task(candidate)

register_from_dict

register_from_dict(data)
Source code in fluid/scheduler/consumer.py
def register_from_dict(self, data: dict) -> None:
    """Register every Task instance stored under a public key of ``data``

    Entries whose key starts with an underscore are ignored; every other
    value that is a ``Task`` instance is passed to ``register_task``.
    """
    for key, value in data.items():
        if not key.startswith("_") and isinstance(value, Task):
            self.register_task(value)

register_async_handler

register_async_handler(event, handler)

Register an async handler for a given event

This method is a no op for a TaskManager that is not a worker

Source code in fluid/scheduler/consumer.py
def register_async_handler(self, event: str, handler: AsyncHandler) -> None:
    """Hook for attaching an async handler to an event

    Nothing is stored here: this method is a no-op for a TaskManager that
    is not a worker
    """
    return None

unregister_async_handler

unregister_async_handler(event)

Unregister an async handler for a given event

This method is a no op for a TaskManager that is not a worker

Source code in fluid/scheduler/consumer.py
def unregister_async_handler(self, event: Event | str) -> AsyncHandler | None:
    """Hook for detaching the async handler of an event

    Always returns ``None``: this method is a no-op for a TaskManager that
    is not a worker
    """
    removed: AsyncHandler | None = None
    return removed

fluid.scheduler.TaskManagerConfig pydantic-model

Bases: BaseModel

Task manager configuration

Fields:

schedule_tasks pydantic-field

schedule_tasks = True

consume_tasks pydantic-field

consume_tasks = True

Consume tasks or sleep

max_concurrent_tasks pydantic-field

max_concurrent_tasks = MAX_CONCURRENT_TASKS

The number of coroutine workers consuming tasks. Each worker consumes one task at a time; therefore, this number is the maximum number of tasks that can run concurrently. It can be configured via the FLUID_MAX_CONCURRENT_TASKS environment variable, and by default is set to 5.

sleep pydantic-field

sleep = 0.1

Amount of time to sleep asynchronously after completion of a task

broker_url pydantic-field

broker_url = ''

fluid.scheduler.consumer.TaskDispatcher

TaskDispatcher()

Bases: Dispatcher[TaskRun]

The task dispatcher is responsible for dispatching task run messages

Source code in fluid/utils/dispatcher.py
def __init__(self) -> None:
    """Start with an empty handler table

    Handlers live in a two-level mapping; the outer ``defaultdict`` creates
    an empty inner dict the first time a key is indexed.
    """
    handlers: defaultdict[str, dict[str, MessageHandlerType]] = defaultdict(dict)
    self._msg_handlers = handlers

register_handler

register_handler(event, handler)
Source code in fluid/utils/dispatcher.py
def register_handler(
    self,
    event: Event | str,
    handler: MessageHandlerType,
) -> MessageHandlerType | None:
    """Store ``handler`` under the type and tag of ``event``

    ``event`` is normalised with ``Event.from_string_or_event``. A handler
    already stored under the same type and tag is replaced; the replaced
    handler is returned, or ``None`` if there was none.
    """
    parsed = Event.from_string_or_event(event)
    # Indexing the defaultdict creates the per-type bucket on first use.
    bucket = self._msg_handlers[parsed.type]
    replaced = bucket.get(parsed.tag)
    bucket[parsed.tag] = handler
    return replaced

unregister_handler

unregister_handler(event)
Source code in fluid/utils/dispatcher.py
def unregister_handler(self, event: Event | str) -> MessageHandlerType | None:
    """Remove the handler stored under the type and tag of ``event``

    ``event`` is normalised with ``Event.from_string_or_event``.

    Returns the removed handler, or ``None`` when no handler was stored for
    that type and tag.
    """
    parsed = Event.from_string_or_event(event)
    # Look up with .get() rather than indexing: indexing the defaultdict
    # would insert an empty bucket for every unknown event type, so a pure
    # removal would grow the handler table.
    bucket = self._msg_handlers.get(parsed.type)
    if bucket is None:
        return None
    return bucket.pop(parsed.tag, None)

get_handlers

get_handlers(message)
Source code in fluid/utils/dispatcher.py
def get_handlers(
    self,
    message: MessageType,
) -> dict[str, MessageHandlerType] | None:
    """Look up the handlers stored for the type of ``message``

    The lookup key is ``str(self.message_type(message))``. ``None`` is
    returned when nothing is stored under that key.
    """
    # .get() is used so that a lookup never adds a key to the handler table.
    return self._msg_handlers.get(str(self.message_type(message)))

dispatch

dispatch(message)

dispatch the message

Source code in fluid/utils/dispatcher.py
def dispatch(self, message: MessageType) -> int:
    """Dispatch the message to every handler registered for its type

    Handlers are obtained from ``get_handlers`` and called synchronously,
    one after the other, with ``message`` as their only argument.

    Returns the number of handlers that were called (0 when none are
    registered).
    """
    handlers = self.get_handlers(message)
    if not handlers:
        return 0
    # Iterate over a snapshot: a handler may register or unregister handlers
    # for this message type, and mutating the dict while iterating its live
    # view raises RuntimeError.
    snapshot = tuple(handlers.values())
    for handler in snapshot:
        handler(message)
    return len(snapshot)

message_type

message_type(message)
Source code in fluid/scheduler/consumer.py
def message_type(self, message: TaskRun) -> str:
    """Use the ``state`` of the task run as its message type"""
    state = message.state
    return state