Skip to content

Task Consumer

The task consumer is a TaskManager that is also a Workers collection: it consumes tasks from the task queue and executes them. It can be imported from fluid.scheduler:

from fluid.scheduler import TaskConsumer

fluid.scheduler.TaskConsumer

TaskConsumer(**config)

Bases: TaskManager, Workers

The Task Consumer is a Task Manager responsible for consuming tasks from a task queue

Source code in fluid/scheduler/consumer.py
def __init__(self, **config: Any) -> None:
    """Build the consumer and register all of its child workers.

    Keyword arguments are forwarded unchanged to the parent initialiser.
    Child workers are registered in this order: the queue-task worker, the
    async dispatcher worker, then one ``task-worker-<n>`` (numbered from 1)
    for each slot allowed by ``config.max_concurrent_tasks``.
    """
    super().__init__(**config)
    Workers.__init__(self)

    # Internal state: the async event bridge, the running task runs grouped
    # by name, and the tasks waiting to be queued by ``sync_queue``.
    self._async_dispatcher_worker = AsyncConsumer(AsyncTaskDispatcher())
    self._concurrent_tasks: dict[str, dict[str, TaskRun]] = defaultdict(dict)
    self._task_to_queue: deque[str | Task] = deque()
    self._queue_tasks_worker = WorkerFunction(
        self._queue_task, name="queue-task-worker"
    )

    self.add_workers(self._queue_tasks_worker, self._async_dispatcher_worker)
    for slot in range(1, self.config.max_concurrent_tasks + 1):
        worker_name = f"task-worker-{slot}"
        consumer = WorkerFunction(
            partial(self._consume_tasks, worker_name), name=worker_name
        )
        self.add_workers(consumer)

worker_state property

worker_state

The running state of the worker

worker_name property

worker_name

The name of the worker

num_workers property

num_workers

deps instance-attribute

deps = deps if deps is not None else State()

Dependencies for the task manager.

Production applications require global dependencies to be available to all tasks. This can be achieved by setting the deps attribute of the task manager to an object with the required dependencies.

Each task can cast the dependencies to the required type.

config instance-attribute

config = config or TaskManagerConfig(**kwargs)

Task manager configuration

dispatcher instance-attribute

dispatcher = TaskDispatcher()

A dispatcher of TaskRun events.

Application can register handlers to listen for events happening during the lifecycle of a task run.

broker instance-attribute

broker = from_url(broker_url)

registry property

registry

The task registry

type property

type

The type of the task manager

num_concurrent_tasks property

num_concurrent_tasks

The number of concurrent tasks running in the consumer

has_started

has_started()
Source code in fluid/utils/worker.py
def has_started(self) -> bool:
    """Return ``True`` once the worker state is no longer ``INIT``."""
    still_initial = self._worker_state == WorkerState.INIT
    return not still_initial

is_running

is_running()
Source code in fluid/utils/worker.py
def is_running(self) -> bool:
    """Return ``True`` while the worker state is ``RUNNING``."""
    current = self._worker_state
    return current == WorkerState.RUNNING

is_stopping

is_stopping()
Source code in fluid/utils/worker.py
def is_stopping(self) -> bool:
    """Return ``True`` while the worker state is ``STOPPING``."""
    current = self._worker_state
    return current == WorkerState.STOPPING

is_stopped

is_stopped()
Source code in fluid/utils/worker.py
def is_stopped(self) -> bool:
    """Return ``True`` when the state is ``STOPPED`` or ``FORCE_STOPPED``."""
    terminal_states = (WorkerState.STOPPED, WorkerState.FORCE_STOPPED)
    return self._worker_state in terminal_states

gracefully_stop

gracefully_stop()
Source code in fluid/utils/worker.py
def gracefully_stop(self) -> None:
    """Request a graceful stop of this worker and of every child worker.

    The parent implementation is invoked first, then the request is
    forwarded to each worker in ``self._workers``.
    """
    super().gracefully_stop()
    for child in self._workers:
        child.gracefully_stop()

after_shutdown

after_shutdown(reason, code)

Called after shutdown of worker

By default it does nothing, but can be overridden to do something such as exit the process.

Source code in fluid/utils/worker.py
def after_shutdown(self, reason: str, code: int) -> None:  # noqa: B027
    """Hook called after the worker has shut down.

    The default implementation is intentionally empty. Subclasses can
    override it, for example to exit the process.
    """

status async

status()
Source code in fluid/utils/worker.py
async def status(self) -> dict:
    """Collect the status of every child worker, keyed by worker name.

    All child ``status()`` coroutines are awaited concurrently.
    """
    children = self._workers
    reports = await asyncio.gather(*(child.status() for child in children))
    names = (child.worker_name for child in children)
    return dict(zip(names, reports))

on_startup async

on_startup()
Source code in fluid/scheduler/consumer.py
async def on_startup(self) -> None:
    """Startup hook: enter this object's own async context."""
    entering = self.__aenter__()
    await entering

on_shutdown async

on_shutdown()
Source code in fluid/scheduler/consumer.py
async def on_shutdown(self) -> None:
    """Shutdown hook: exit this object's own async context.

    ``__aexit__`` is called with no exception information.
    """
    no_exception = (None, None, None)
    await self.__aexit__(*no_exception)

startup async

startup()

start the worker

This method creates a task to run the worker.

Source code in fluid/utils/worker.py
async def startup(self) -> None:
    """Start the worker.

    This method creates a task to run the worker via
    ``WorkerTaskRunner.start`` and stores the resulting runner.

    Raises ``WorkerStartError`` if the worker has already been started.
    """
    if self.has_started():
        # Exceptions do not interpolate logging-style ``%s`` arguments, so
        # the message is built eagerly. ``!s`` keeps the ``%s`` rendering.
        raise WorkerStartError(
            f"worker {self.worker_name!s} already started: "
            f"{self._worker_state!s}"
        )
    self._worker_task_runner = await WorkerTaskRunner.start(self)

shutdown async

shutdown()

Shutdown a running worker and wait for it to stop

This method will try to gracefully stop the worker and wait for it to stop. If the worker does not stop in the grace period, it will force shutdown by cancelling the task.

Source code in fluid/utils/worker.py
async def shutdown(self) -> None:
    """Shut down a running worker and wait for it to stop.

    The call is delegated to the task runner, which tries to stop the
    worker gracefully and, if it does not stop within the grace period,
    forces the shutdown by cancelling the task. Nothing happens when there
    is no task runner.
    """
    runner = self._worker_task_runner
    if runner is None:
        return
    await runner.shutdown()

wait_for_shutdown async

wait_for_shutdown()

Wait for the worker to stop

This method will wait for the worker to stop running, but doesn't try to gracefully stop it nor force shutdown.

Source code in fluid/utils/worker.py
async def wait_for_shutdown(self) -> None:
    """Wait for the worker to stop.

    This only waits: it neither asks the worker to stop gracefully nor
    forces a shutdown. Nothing happens when there is no task runner.
    """
    runner = self._worker_task_runner
    if runner is None:
        return
    await runner.wait_for_shutdown()

workers

workers()
Source code in fluid/utils/worker.py
def workers(self) -> Iterator[Worker]:
    """Return an iterator over the child workers held in ``_workers``."""
    children = self._workers
    return iter(children)

run async

run()
Source code in fluid/utils/worker.py
async def run(self) -> None:
    """Supervise the child workers until this worker stops running.

    On every heartbeat each child that has not started yet is started. As
    soon as a child is found not running, a graceful stop is requested and
    the scan of the remaining children is abandoned. Once the loop ends,
    the method waits for the workers via ``_wait_for_workers``.
    """
    while self.is_running():
        for child in self._workers:
            if not child.has_started():
                await child.startup()
            if child.is_running():
                continue
            # A child that is not running takes the whole group down.
            self.gracefully_stop()
            break
        await asyncio.sleep(self._heartbeat)
    await self._wait_for_workers()

add_workers

add_workers(*workers)

add workers to the workers

They can be added while the worker is running.

Source code in fluid/utils/worker.py
def add_workers(self, *workers: Worker) -> None:
    """Register one or more workers, skipping any already registered.

    Workers can be added while this worker is running.
    """
    registered = self._workers
    for candidate in workers:
        if candidate in registered:
            continue
        registered.append(candidate)

add_async_context_manager

add_async_context_manager(cm)

Add an async context manager to the task manager

These context managers are entered when the task manager starts

Source code in fluid/scheduler/consumer.py
def add_async_context_manager(self, cm: Any) -> None:
    """Append an async context manager to the task manager's list.

    These context managers are entered when the task manager starts.
    """
    contexts = self._async_contexts
    contexts.append(cm)

register_task

register_task(task)

Register a task with the task manager

PARAMETER DESCRIPTION
task

Task to register

TYPE: Task

Source code in fluid/scheduler/consumer.py
def register_task(self, task: Annotated[Task, Doc("Task to register")]) -> None:
    """Register a task by delegating to the broker's ``register_task``."""
    broker = self.broker
    broker.register_task(task)

execute async

execute(task, *, run_id='', priority=None, **params)

Execute a task and wait for it to finish

PARAMETER DESCRIPTION
task

The task or task name, if a task name it must be registered with the task manager.

TYPE: str | Task

run_id

Unique ID for the task run. If not provided a new UUID is generated.

TYPE: str DEFAULT: ''

priority

Override the default task priority if provided

TYPE: TaskPriority | None DEFAULT: None

**params

The optional parameters for the task run. They must match the task params model

TYPE: Any DEFAULT: {}

Source code in fluid/scheduler/consumer.py
async def execute(
    self,
    task: Annotated[
        str | Task,
        Doc(
            "The task or task name,"
            " if a task name it must be registered with the task manager."
        ),
    ],
    *,
    run_id: Annotated[
        str,
        Doc("Unique ID for the task run. If not provided a new UUID is generated."),
    ] = "",
    priority: Annotated[
        TaskPriority | None, Doc("Override the default task priority if provided")
    ] = None,
    **params: Annotated[
        Any,
        Doc(
            "The optional parameters for the task run. "
            "They must match the task params model"
        ),
    ],
) -> TaskRun:
    """Create a task run, execute it to completion and return it.

    The run is built with ``create_task_run`` and awaited directly; this
    method does not hand it to the broker.
    """
    run = self.create_task_run(task, run_id=run_id, priority=priority, **params)
    await run.execute()
    return run

execute_sync

execute_sync(task, *, run_id='', priority=None, **params)

Execute a task synchronously

This method is a blocking method that should be used in a synchronous context.

PARAMETER DESCRIPTION
task

The task or task name, if a task name it must be registered with the task manager.

TYPE: str | Task

run_id

Unique ID for the task run. If not provided a new UUID is generated.

TYPE: str DEFAULT: ''

priority

Override the default task priority if provided

TYPE: TaskPriority | None DEFAULT: None

**params

The optional parameters for the task run. They must match the task params model

TYPE: Any DEFAULT: {}

Source code in fluid/scheduler/consumer.py
def execute_sync(
    self,
    task: Annotated[
        str | Task,
        Doc(
            "The task or task name,"
            " if a task name it must be registered with the task manager."
        ),
    ],
    *,
    run_id: Annotated[
        str,
        Doc("Unique ID for the task run. If not provided a new UUID is generated."),
    ] = "",
    priority: Annotated[
        TaskPriority | None, Doc("Override the default task priority if provided")
    ] = None,
    **params: Annotated[
        Any,
        Doc(
            "The optional parameters for the task run. "
            "They must match the task params model"
        ),
    ],
) -> TaskRun:
    """Execute a task from synchronous code, blocking until it returns.

    The ``_execute_and_exit`` coroutine is driven with ``asyncio.run``, so
    this must be called from a synchronous context, not from inside a
    running event loop.
    """
    coroutine = self._execute_and_exit(
        task, run_id=run_id, priority=priority, **params
    )
    return asyncio.run(coroutine)

queue async

queue(task, *, run_id='', priority=None, **params)

Queue a task for execution

This method fires two events:

  • init: when the task run is created
  • queued: after the task is queued

It returns the TaskRun object

PARAMETER DESCRIPTION
task

The task or task name, if a task name it must be registered with the task manager.

TYPE: str | Task

run_id

Unique ID for the task run. If not provided a new UUID is generated.

TYPE: str DEFAULT: ''

priority

Override the default task priority if provided

TYPE: TaskPriority | None DEFAULT: None

**params

The optional parameters for the task run. They must match the task params model

TYPE: Any DEFAULT: {}

Source code in fluid/scheduler/consumer.py
async def queue(
    self,
    task: Annotated[
        str | Task,
        Doc(
            "The task or task name,"
            " if a task name it must be registered with the task manager."
        ),
    ],
    *,
    run_id: Annotated[
        str,
        Doc("Unique ID for the task run. If not provided a new UUID is generated."),
    ] = "",
    priority: Annotated[
        TaskPriority | None, Doc("Override the default task priority if provided")
    ] = None,
    **params: Annotated[
        Any,
        Doc(
            "The optional parameters for the task run. "
            "They must match the task params model"
        ),
    ],
) -> TaskRun:
    """Queue a task for execution

    This method fires two events:

    - `init`: when the task run is created
    - `queued`: after the task is queued

    It returns the [TaskRun][fluid.scheduler.TaskRun] object
    """
    run = self.create_task_run(task, run_id=run_id, priority=priority, **params)
    # Dispatch the freshly created run before moving it to ``queued``.
    self.dispatcher.dispatch(run)
    run.set_state(TaskState.queued)
    await self.broker.queue_task(run)
    return run

create_task_run

create_task_run(
    task, *, run_id="", priority=None, **params
)

Create a TaskRun in init state

PARAMETER DESCRIPTION
task

The task or task name, if a task name it must be registered with the task manager.

TYPE: str | Task

run_id

Unique ID for the task run. If not provided a new UUID is generated.

TYPE: str DEFAULT: ''

priority

Override the default task priority if provided

TYPE: TaskPriority | None DEFAULT: None

**params

The optional parameters for the task run. They must match the task params model

TYPE: Any DEFAULT: {}

Source code in fluid/scheduler/consumer.py
def create_task_run(
    self,
    task: Annotated[
        str | Task,
        Doc(
            "The task or task name,"
            " if a task name it must be registered with the task manager."
        ),
    ],
    *,
    run_id: Annotated[
        str,
        Doc("Unique ID for the task run. If not provided a new UUID is generated."),
    ] = "",
    priority: Annotated[
        TaskPriority | None, Doc("Override the default task priority if provided")
    ] = None,
    **params: Annotated[
        Any,
        Doc(
            "The optional parameters for the task run. "
            "They must match the task params model"
        ),
    ],
) -> TaskRun:
    """Create a [TaskRun][fluid.scheduler.TaskRun] in `init` state

    The task is resolved through the broker registry. A falsy `run_id` is
    replaced by a new broker UUID, a falsy `priority` by the task's own
    priority, and `params` are passed to the task's `params_model`.
    """
    resolved = self.broker.task_from_registry(task)
    return TaskRun(
        id=run_id or self.broker.new_uuid(),
        task=resolved,
        priority=priority or resolved.priority,
        params=resolved.params_model(**params),
        task_manager=self,
    )

register_from_module

register_from_module(module)
PARAMETER DESCRIPTION
module

Python module with tasks implementations - can contain any object, only instances of Task are registered

TYPE: ModuleType

Source code in fluid/scheduler/consumer.py
def register_from_module(
    self,
    module: Annotated[
        ModuleType,
        Doc(
            "Python module with tasks implementations "
            "- can contain any object, only instances of Task are registered"
        ),
    ],
) -> None:
    """Register every public ``Task`` instance found in ``module``.

    Names starting with an underscore are ignored, as are attributes that
    are not ``Task`` instances.
    """
    public_names = (name for name in dir(module) if not name.startswith("_"))
    for name in public_names:
        candidate = getattr(module, name)
        if isinstance(candidate, Task):
            self.register_task(candidate)

register_from_dict

register_from_dict(data)
PARAMETER DESCRIPTION
data

Python dictionary with tasks implementations - can contain any object, only instances of Task are registered

TYPE: dict[str, Any]

Source code in fluid/scheduler/consumer.py
def register_from_dict(
    self,
    data: Annotated[
        dict[str, Any],
        Doc(
            "Python dictionary with tasks implementations "
            "- can contain any object, only instances of Task are registered"
        ),
    ],
) -> None:
    """Register every ``Task`` instance stored under a public key of ``data``.

    Keys starting with an underscore are ignored, as are values that are not
    ``Task`` instances.
    """
    for name, candidate in data.items():
        if not name.startswith("_") and isinstance(candidate, Task):
            self.register_task(candidate)

sync_queue

sync_queue(task)

Queue a task synchronously

Source code in fluid/scheduler/consumer.py
def sync_queue(self, task: str | Task) -> None:
    """Queue a task synchronously.

    The task (or task name) is pushed onto the left end of the internal
    ``_task_to_queue`` deque; nothing is sent to the broker here.
    """
    pending = self._task_to_queue
    pending.appendleft(task)

num_concurrent_tasks_for

num_concurrent_tasks_for(task_name)

The number of concurrent tasks for a given task_name

Source code in fluid/scheduler/consumer.py
def num_concurrent_tasks_for(self, task_name: str) -> int:
    """The number of concurrent tasks for a given task_name

    Returns 0 for a task name with no tracked runs.
    """
    # ``_concurrent_tasks`` is a defaultdict: indexing it would insert an
    # empty entry for every name queried, so read through ``get`` to keep
    # this query free of side effects.
    return len(self._concurrent_tasks.get(task_name, ()))

queue_and_wait async

queue_and_wait(task, *, timeout=None, **params)

Queue a task and wait for it to finish

Source code in fluid/scheduler/consumer.py
async def queue_and_wait(
    self, task: str | Task, *, timeout: int | None = None, **params: Any
) -> TaskRun:
    """Queue a task and wait for it to finish.

    A ``TaskRunWaiter`` is opened before the task is queued, and the value
    returned is whatever the waiter's ``wait`` yields for the queued run.
    ``timeout`` is forwarded to the waiter, ``params`` to ``queue``.
    """
    with TaskRunWaiter(self) as waiter:
        queued_run = await self.queue(task, **params)
        finished_run = await waiter.wait(queued_run, timeout=timeout)
        return finished_run

register_async_handler

register_async_handler(event, handler)
Source code in fluid/scheduler/consumer.py
def register_async_handler(self, event: Event | str, handler: AsyncHandler) -> None:
    """Register an async handler for ``event``.

    Two registrations are made: the main dispatcher gets the async worker's
    ``send`` under the ``<event.type>.async_dispatch`` tag, and the async
    worker's own dispatcher gets ``handler`` for the event itself.
    """
    event = Event.from_string_or_event(event)
    bridge_tag = f"{event.type}.async_dispatch"
    async_worker = self._async_dispatcher_worker
    self.dispatcher.register_handler(bridge_tag, async_worker.send)
    async_worker.dispatcher.register_handler(event, handler)

unregister_async_handler

unregister_async_handler(event)
Source code in fluid/scheduler/consumer.py
def unregister_async_handler(self, event: Event | str) -> AsyncHandler | None:
    """Unregister the async handler for ``event`` from the async dispatcher.

    Returns whatever the async dispatcher's ``unregister_handler`` returns.
    """
    async_dispatcher = self._async_dispatcher_worker.dispatcher
    return async_dispatcher.unregister_handler(event)