Skip to content

Workers

Workers are the main building block for asynchronous programming with aio-fluid. They are responsible for running asynchronous tasks and managing their lifecycle. There are several worker classes which can be imported from fluid.utils.worker, and they all derive from the abstract fluid.utils.worker.Worker class.

from fluid.utils.worker import Worker

fluid.utils.worker.WorkerState

Bases: StrEnum

The state of a worker

INIT class-attribute instance-attribute

INIT = auto()

RUNNING class-attribute instance-attribute

RUNNING = auto()

STOPPING class-attribute instance-attribute

STOPPING = auto()

STOPPED class-attribute instance-attribute

STOPPED = auto()

FORCE_STOPPED class-attribute instance-attribute

FORCE_STOPPED = auto()

fluid.utils.worker.Worker

Worker(
    name="", stopping_grace_period=STOPPING_GRACE_PERIOD
)

Bases: ABC

An Abstract Worker that can be started and stopped

All other workers derive from this class.

PARAMETER DESCRIPTION
name

Worker's name, if not provided it is evaluated from the class name

TYPE: str DEFAULT: ''

stopping_grace_period

Grace period in seconds to wait for workers to stop running when this worker is shutdown. It defaults to the FLUID_STOPPING_GRACE_PERIOD environment variable or 10 seconds.

TYPE: int DEFAULT: STOPPING_GRACE_PERIOD

Source code in fluid/utils/worker.py
def __init__(
    self,
    name: Annotated[
        str,
        Doc("Worker's name, if not provided it is evaluated from the class name"),
    ] = "",
    stopping_grace_period: Annotated[
        int,
        Doc(
            "Grace period in seconds to wait for workers to stop running "
            "when this worker is shutdown. "
            "It defaults to the `FLUID_STOPPING_GRACE_PERIOD` "
            "environment variable or 10 seconds."
        ),
    ] = settings.STOPPING_GRACE_PERIOD,
) -> None:
    # Without an explicit name, derive one from the class name via `underscore`.
    if not name:
        name = underscore(type(self).__name__)
    self._worker_name: str = name
    # A new worker begins in the INIT state with no task runner attached.
    self._worker_state: WorkerState = WorkerState.INIT
    self._worker_task_runner: WorkerTaskRunner | None = None
    self._stopping_grace_period = stopping_grace_period

worker_state property

worker_state

The running state of the worker

worker_name property

worker_name

The name of the worker

num_workers property

num_workers

The number of workers in this worker

has_started

has_started()
Source code in fluid/utils/worker.py
def has_started(self) -> bool:
    """Return True once the worker state is anything other than INIT"""
    current = self._worker_state
    return current != WorkerState.INIT

is_running

is_running()
Source code in fluid/utils/worker.py
def is_running(self) -> bool:
    """Return True while the worker state is RUNNING"""
    current = self._worker_state
    return current == WorkerState.RUNNING

is_stopping

is_stopping()
Source code in fluid/utils/worker.py
def is_stopping(self) -> bool:
    """Return True while the worker state is STOPPING"""
    current = self._worker_state
    return current == WorkerState.STOPPING

is_stopped

is_stopped()
Source code in fluid/utils/worker.py
def is_stopped(self) -> bool:
    """Return True when the worker state is STOPPED or FORCE_STOPPED"""
    terminal_states = (WorkerState.STOPPED, WorkerState.FORCE_STOPPED)
    return self._worker_state in terminal_states

gracefully_stop

gracefully_stop()
Source code in fluid/utils/worker.py
def gracefully_stop(self) -> None:
    """Move a running worker to the STOPPING state

    A worker that is not running is left untouched.
    """
    if not self.is_running():
        return
    self._worker_state = WorkerState.STOPPING

after_shutdown

after_shutdown(reason, code)

Called after shutdown of worker

By default it does nothing, but can be overridden to do something such as exit the process.

Source code in fluid/utils/worker.py
def after_shutdown(self, reason: str, code: int) -> None:  # noqa: B027
    """Hook called after the worker has shut down

    The default implementation does nothing; override it to react to the
    shutdown, for example by exiting the process.
    """
    return None

status async

status()
Source code in fluid/utils/worker.py
async def status(self) -> dict:
    """Report whether the worker is stopping and whether it is running"""
    stopping = self.is_stopping()
    running = self.is_running()
    return dict(stopping=stopping, running=running)

on_startup async

on_startup()

Called when the worker starts running

Use this function to initialize other async resources connected with the worker

Source code in fluid/utils/worker.py
async def on_startup(self) -> None:  # noqa: B027
    """Hook called when the worker starts running

    Override it to initialize other async resources connected with the
    worker; the default implementation does nothing.
    """
    return None

on_shutdown async

on_shutdown()

called after the worker stopped running

Use this function to cleanup resources connected with the worker

Source code in fluid/utils/worker.py
async def on_shutdown(self) -> None:  # noqa: B027
    """Hook called after the worker stopped running

    Override it to clean up resources connected with the worker;
    the default implementation does nothing.
    """
    return None

startup async

startup()

start the worker

This method creates a task to run the worker.

Source code in fluid/utils/worker.py
async def startup(self) -> None:
    """Start the worker

    This method creates a task to run the worker by awaiting
    ``WorkerTaskRunner.start`` and keeps a reference to the returned runner.

    A ``WorkerStartError`` is raised when ``has_started()`` is already true.
    """
    if self.has_started():
        # Build the message here: unlike logging calls, exception constructors
        # do not interpolate %-style arguments into their first argument.
        raise WorkerStartError(
            f"worker {self.worker_name} already started: {self._worker_state}"
        )
    self._worker_task_runner = await WorkerTaskRunner.start(self)

shutdown async

shutdown()

Shutdown a running worker and wait for it to stop

This method will try to gracefully stop the worker and wait for it to stop. If the worker does not stop in the grace period, it will force shutdown by cancelling the task.

Source code in fluid/utils/worker.py
async def shutdown(self) -> None:
    """Shut down a running worker and wait for it to stop

    The work is delegated to the task runner, which tries a graceful stop
    first and, if the worker is still running after the grace period,
    forces the shutdown by cancelling the task.
    Nothing happens when no task runner has been created.
    """
    runner = self._worker_task_runner
    if runner is None:
        return
    await runner.shutdown()

wait_for_shutdown async

wait_for_shutdown()

Wait for the worker to stop

This method will wait for the worker to stop running, but doesn't try to gracefully stop it nor force shutdown.

Source code in fluid/utils/worker.py
async def wait_for_shutdown(self) -> None:
    """Wait for the worker to stop

    This only waits on the task runner: it neither asks the worker to stop
    gracefully nor forces a shutdown.
    Nothing happens when no task runner has been created.
    """
    runner = self._worker_task_runner
    if runner is None:
        return
    await runner.wait_for_shutdown()

workers

workers()

An iterator of workers in this worker

Source code in fluid/utils/worker.py
def workers(self) -> Iterator[Worker]:
    """Iterate over the workers in this worker, which here is just itself"""
    yield from (self,)

run abstractmethod async

run()

run the worker

This is the only abstract method, and it must be implemented. It is the coroutine that keeps the worker running.

Source code in fluid/utils/worker.py
@abstractmethod
async def run(self) -> None:
    """Run the worker

    This is the only abstract method, so it must be implemented.
    It is the coroutine that keeps the worker running.
    """
    return None

fluid.utils.worker.WorkerFunction

WorkerFunction(run_function, heartbeat=0, name='')

Bases: Worker

A Worker that runs and awaits a coroutine function in a loop

PARAMETER DESCRIPTION
run_function

The coroutine function to run and await at each iteration of the worker loop

TYPE: Callable[[], Awaitable[None]]

heartbeat

The time to wait between each coroutine function run

TYPE: float | int DEFAULT: 0

name

Worker's name, if not provided it is evaluated from the class name

TYPE: str DEFAULT: ''

Source code in fluid/utils/worker.py
def __init__(
    self,
    run_function: Annotated[
        Callable[[], Awaitable[None]],
        Doc(
            "The coroutine function to run and await at each iteration "
            "of the worker loop"
        ),
    ],
    heartbeat: Annotated[
        float | int, Doc("The time to wait between each coroutine function run")
    ] = 0,
    name: Annotated[
        str,
        Doc("Worker's name, if not provided it is evaluated from the class name"),
    ] = "",
) -> None:
    # Name handling is left to the base class initializer.
    super().__init__(name=name)
    # Coroutine function awaited on every loop iteration.
    self._run_function = run_function
    # Delay passed to ``asyncio.sleep`` between two consecutive runs.
    self._heartbeat = heartbeat

worker_state property

worker_state

The running state of the worker

worker_name property

worker_name

The name of the worker

num_workers property

num_workers

The number of workers in this worker

has_started

has_started()
Source code in fluid/utils/worker.py
def has_started(self) -> bool:
    """Return True once the worker state is anything other than INIT"""
    current = self._worker_state
    return current != WorkerState.INIT

is_running

is_running()
Source code in fluid/utils/worker.py
def is_running(self) -> bool:
    """Return True while the worker state is RUNNING"""
    current = self._worker_state
    return current == WorkerState.RUNNING

is_stopping

is_stopping()
Source code in fluid/utils/worker.py
def is_stopping(self) -> bool:
    """Return True while the worker state is STOPPING"""
    current = self._worker_state
    return current == WorkerState.STOPPING

is_stopped

is_stopped()
Source code in fluid/utils/worker.py
def is_stopped(self) -> bool:
    """Return True when the worker state is STOPPED or FORCE_STOPPED"""
    terminal_states = (WorkerState.STOPPED, WorkerState.FORCE_STOPPED)
    return self._worker_state in terminal_states

gracefully_stop

gracefully_stop()
Source code in fluid/utils/worker.py
def gracefully_stop(self) -> None:
    """Move a running worker to the STOPPING state

    A worker that is not running is left untouched.
    """
    if not self.is_running():
        return
    self._worker_state = WorkerState.STOPPING

after_shutdown

after_shutdown(reason, code)

Called after shutdown of worker

By default it does nothing, but can be overridden to do something such as exit the process.

Source code in fluid/utils/worker.py
def after_shutdown(self, reason: str, code: int) -> None:  # noqa: B027
    """Hook called after the worker has shut down

    The default implementation does nothing; override it to react to the
    shutdown, for example by exiting the process.
    """
    return None

status async

status()
Source code in fluid/utils/worker.py
async def status(self) -> dict:
    """Report whether the worker is stopping and whether it is running"""
    stopping = self.is_stopping()
    running = self.is_running()
    return dict(stopping=stopping, running=running)

on_startup async

on_startup()

Called when the worker starts running

Use this function to initialize other async resources connected with the worker

Source code in fluid/utils/worker.py
async def on_startup(self) -> None:  # noqa: B027
    """Hook called when the worker starts running

    Override it to initialize other async resources connected with the
    worker; the default implementation does nothing.
    """
    return None

on_shutdown async

on_shutdown()

called after the worker stopped running

Use this function to cleanup resources connected with the worker

Source code in fluid/utils/worker.py
async def on_shutdown(self) -> None:  # noqa: B027
    """Hook called after the worker stopped running

    Override it to clean up resources connected with the worker;
    the default implementation does nothing.
    """
    return None

startup async

startup()

start the worker

This method creates a task to run the worker.

Source code in fluid/utils/worker.py
async def startup(self) -> None:
    """Start the worker

    This method creates a task to run the worker by awaiting
    ``WorkerTaskRunner.start`` and keeps a reference to the returned runner.

    A ``WorkerStartError`` is raised when ``has_started()`` is already true.
    """
    if self.has_started():
        # Build the message here: unlike logging calls, exception constructors
        # do not interpolate %-style arguments into their first argument.
        raise WorkerStartError(
            f"worker {self.worker_name} already started: {self._worker_state}"
        )
    self._worker_task_runner = await WorkerTaskRunner.start(self)

shutdown async

shutdown()

Shutdown a running worker and wait for it to stop

This method will try to gracefully stop the worker and wait for it to stop. If the worker does not stop in the grace period, it will force shutdown by cancelling the task.

Source code in fluid/utils/worker.py
async def shutdown(self) -> None:
    """Shut down a running worker and wait for it to stop

    The work is delegated to the task runner, which tries a graceful stop
    first and, if the worker is still running after the grace period,
    forces the shutdown by cancelling the task.
    Nothing happens when no task runner has been created.
    """
    runner = self._worker_task_runner
    if runner is None:
        return
    await runner.shutdown()

wait_for_shutdown async

wait_for_shutdown()

Wait for the worker to stop

This method will wait for the worker to stop running, but doesn't try to gracefully stop it nor force shutdown.

Source code in fluid/utils/worker.py
async def wait_for_shutdown(self) -> None:
    """Wait for the worker to stop

    This only waits on the task runner: it neither asks the worker to stop
    gracefully nor forces a shutdown.
    Nothing happens when no task runner has been created.
    """
    runner = self._worker_task_runner
    if runner is None:
        return
    await runner.wait_for_shutdown()

workers

workers()

An iterator of workers in this worker

Source code in fluid/utils/worker.py
def workers(self) -> Iterator[Worker]:
    """Iterate over the workers in this worker, which here is just itself"""
    yield from (self,)

run async

run()
Source code in fluid/utils/worker.py
async def run(self) -> None:
    """Await the run function and then sleep, repeating while the worker runs

    The running state is checked before every iteration; each iteration
    awaits the run function and then sleeps for the heartbeat.
    """
    while True:
        if not self.is_running():
            break
        await self._run_function()
        await asyncio.sleep(self._heartbeat)

fluid.utils.worker.QueueConsumer

QueueConsumer(name='')

Bases: Worker, MessageProducer[MessageType]

An Abstract Worker that can receive messages

This worker can receive messages but not consume them.

PARAMETER DESCRIPTION
name

Worker's name, if not provided it is evaluated from the class name

TYPE: str DEFAULT: ''

Source code in fluid/utils/worker.py
def __init__(
    self,
    name: Annotated[
        str,
        Doc("Worker's name, if not provided it is evaluated from the class name"),
    ] = "",
) -> None:
    super().__init__(name=name)
    # Queue of pending messages, created with asyncio's default (unbounded)
    # size; ``None`` is an accepted entry.
    message_queue: asyncio.Queue[MessageType | None] = asyncio.Queue()
    self._queue = message_queue

worker_state property

worker_state

The running state of the worker

worker_name property

worker_name

The name of the worker

num_workers property

num_workers

The number of workers in this worker

has_started

has_started()
Source code in fluid/utils/worker.py
def has_started(self) -> bool:
    """Return True once the worker state is anything other than INIT"""
    current = self._worker_state
    return current != WorkerState.INIT

is_running

is_running()
Source code in fluid/utils/worker.py
def is_running(self) -> bool:
    """Return True while the worker state is RUNNING"""
    current = self._worker_state
    return current == WorkerState.RUNNING

is_stopping

is_stopping()
Source code in fluid/utils/worker.py
def is_stopping(self) -> bool:
    """Return True while the worker state is STOPPING"""
    current = self._worker_state
    return current == WorkerState.STOPPING

is_stopped

is_stopped()
Source code in fluid/utils/worker.py
def is_stopped(self) -> bool:
    """Return True when the worker state is STOPPED or FORCE_STOPPED"""
    terminal_states = (WorkerState.STOPPED, WorkerState.FORCE_STOPPED)
    return self._worker_state in terminal_states

gracefully_stop

gracefully_stop()
Source code in fluid/utils/worker.py
def gracefully_stop(self) -> None:
    """Move a running worker to the STOPPING state

    A worker that is not running is left untouched.
    """
    if not self.is_running():
        return
    self._worker_state = WorkerState.STOPPING

after_shutdown

after_shutdown(reason, code)

Called after shutdown of worker

By default it does nothing, but can be overridden to do something such as exit the process.

Source code in fluid/utils/worker.py
def after_shutdown(self, reason: str, code: int) -> None:  # noqa: B027
    """Hook called after the worker has shut down

    The default implementation does nothing; override it to react to the
    shutdown, for example by exiting the process.
    """
    return None

on_startup async

on_startup()

Called when the worker starts running

Use this function to initialize other async resources connected with the worker

Source code in fluid/utils/worker.py
async def on_startup(self) -> None:  # noqa: B027
    """Hook called when the worker starts running

    Override it to initialize other async resources connected with the
    worker; the default implementation does nothing.
    """
    return None

on_shutdown async

on_shutdown()

called after the worker stopped running

Use this function to cleanup resources connected with the worker

Source code in fluid/utils/worker.py
async def on_shutdown(self) -> None:  # noqa: B027
    """Hook called after the worker stopped running

    Override it to clean up resources connected with the worker;
    the default implementation does nothing.
    """
    return None

startup async

startup()

start the worker

This method creates a task to run the worker.

Source code in fluid/utils/worker.py
async def startup(self) -> None:
    """Start the worker

    This method creates a task to run the worker by awaiting
    ``WorkerTaskRunner.start`` and keeps a reference to the returned runner.

    A ``WorkerStartError`` is raised when ``has_started()`` is already true.
    """
    if self.has_started():
        # Build the message here: unlike logging calls, exception constructors
        # do not interpolate %-style arguments into their first argument.
        raise WorkerStartError(
            f"worker {self.worker_name} already started: {self._worker_state}"
        )
    self._worker_task_runner = await WorkerTaskRunner.start(self)

shutdown async

shutdown()

Shutdown a running worker and wait for it to stop

This method will try to gracefully stop the worker and wait for it to stop. If the worker does not stop in the grace period, it will force shutdown by cancelling the task.

Source code in fluid/utils/worker.py
async def shutdown(self) -> None:
    """Shut down a running worker and wait for it to stop

    The work is delegated to the task runner, which tries a graceful stop
    first and, if the worker is still running after the grace period,
    forces the shutdown by cancelling the task.
    Nothing happens when no task runner has been created.
    """
    runner = self._worker_task_runner
    if runner is None:
        return
    await runner.shutdown()

wait_for_shutdown async

wait_for_shutdown()

Wait for the worker to stop

This method will wait for the worker to stop running, but doesn't try to gracefully stop it nor force shutdown.

Source code in fluid/utils/worker.py
async def wait_for_shutdown(self) -> None:
    """Wait for the worker to stop

    This only waits on the task runner: it neither asks the worker to stop
    gracefully nor forces a shutdown.
    Nothing happens when no task runner has been created.
    """
    runner = self._worker_task_runner
    if runner is None:
        return
    await runner.wait_for_shutdown()

workers

workers()

An iterator of workers in this worker

Source code in fluid/utils/worker.py
def workers(self) -> Iterator[Worker]:
    """Iterate over the workers in this worker, which here is just itself"""
    yield from (self,)

run abstractmethod async

run()

run the worker

This is the only abstract method, and it must be implemented. It is the coroutine that keeps the worker running.

Source code in fluid/utils/worker.py
@abstractmethod
async def run(self) -> None:
    """Run the worker

    This is the only abstract method, so it must be implemented.
    It is the coroutine that keeps the worker running.
    """
    return None

get_message async

get_message(timeout=0.5)

Get the next message from the queue

Source code in fluid/utils/worker.py
async def get_message(self, timeout: float = 0.5) -> MessageType | None:
    """Get the next message from the queue"""
    try:
        async with asyncio.timeout(timeout):
            return await self._queue.get()
    except asyncio.TimeoutError:
        return None
    except (asyncio.CancelledError, RuntimeError):
        if not self.is_stopping():
            raise
    return None

queue_size

queue_size()

Get the size of the queue

Source code in fluid/utils/worker.py
def queue_size(self) -> int:
    """Return how many messages are currently in the queue"""
    pending = self._queue.qsize()
    return pending

status async

status()
Source code in fluid/utils/worker.py
async def status(self) -> dict:
    """Extend the parent status with the current ``queue_size``"""
    info = await super().status()
    info["queue_size"] = self.queue_size()
    return info

send

send(message)

Send a message into the worker

Source code in fluid/utils/worker.py
def send(self, message: MessageType | None) -> None:
    """Put a message on the worker's queue without waiting"""
    inbox = self._queue
    inbox.put_nowait(message)

fluid.utils.worker.QueueConsumerWorker

QueueConsumerWorker(on_message, name='')

Bases: QueueConsumer[MessageType]

A Worker that can receive and consume messages

PARAMETER DESCRIPTION
on_message

The async callback to call when a message is received

TYPE: Callable[[MessageType], Awaitable[None]]

name

Worker's name, if not provided it is evaluated from the class name

TYPE: str DEFAULT: ''

Source code in fluid/utils/worker.py
def __init__(
    self,
    on_message: Annotated[
        Callable[[MessageType], Awaitable[None]],
        Doc("The async callback to call when a message is received"),
    ],
    name: Annotated[
        str,
        Doc("Worker's name, if not provided it is evaluated from the class name"),
    ] = "",
) -> None:
    super().__init__(name=name)
    # Async callback stored as a public attribute on the worker.
    callback = on_message
    self.on_message = callback

worker_state property

worker_state

The running state of the worker

worker_name property

worker_name

The name of the worker

num_workers property

num_workers

The number of workers in this worker

on_message instance-attribute

on_message = on_message

send

send(message)

Send a message into the worker

Source code in fluid/utils/worker.py
def send(self, message: MessageType | None) -> None:
    """Put a message on the worker's queue without waiting"""
    inbox = self._queue
    inbox.put_nowait(message)

has_started

has_started()
Source code in fluid/utils/worker.py
def has_started(self) -> bool:
    """Return True once the worker state is anything other than INIT"""
    current = self._worker_state
    return current != WorkerState.INIT

is_running

is_running()
Source code in fluid/utils/worker.py
def is_running(self) -> bool:
    """Return True while the worker state is RUNNING"""
    current = self._worker_state
    return current == WorkerState.RUNNING

is_stopping

is_stopping()
Source code in fluid/utils/worker.py
def is_stopping(self) -> bool:
    """Return True while the worker state is STOPPING"""
    current = self._worker_state
    return current == WorkerState.STOPPING

is_stopped

is_stopped()
Source code in fluid/utils/worker.py
def is_stopped(self) -> bool:
    """Return True when the worker state is STOPPED or FORCE_STOPPED"""
    terminal_states = (WorkerState.STOPPED, WorkerState.FORCE_STOPPED)
    return self._worker_state in terminal_states

gracefully_stop

gracefully_stop()
Source code in fluid/utils/worker.py
def gracefully_stop(self) -> None:
    """Move a running worker to the STOPPING state

    A worker that is not running is left untouched.
    """
    if not self.is_running():
        return
    self._worker_state = WorkerState.STOPPING

after_shutdown

after_shutdown(reason, code)

Called after shutdown of worker

By default it does nothing, but can be overridden to do something such as exit the process.

Source code in fluid/utils/worker.py
def after_shutdown(self, reason: str, code: int) -> None:  # noqa: B027
    """Hook called after the worker has shut down

    The default implementation does nothing; override it to react to the
    shutdown, for example by exiting the process.
    """
    return None

status async

status()
Source code in fluid/utils/worker.py
async def status(self) -> dict:
    """Extend the parent status with the current ``queue_size``"""
    info = await super().status()
    info["queue_size"] = self.queue_size()
    return info

on_startup async

on_startup()

Called when the worker starts running

Use this function to initialize other async resources connected with the worker

Source code in fluid/utils/worker.py
async def on_startup(self) -> None:  # noqa: B027
    """Hook called when the worker starts running

    Override it to initialize other async resources connected with the
    worker; the default implementation does nothing.
    """
    return None

on_shutdown async

on_shutdown()

called after the worker stopped running

Use this function to cleanup resources connected with the worker

Source code in fluid/utils/worker.py
async def on_shutdown(self) -> None:  # noqa: B027
    """Hook called after the worker stopped running

    Override it to clean up resources connected with the worker;
    the default implementation does nothing.
    """
    return None

startup async

startup()

start the worker

This method creates a task to run the worker.

Source code in fluid/utils/worker.py
async def startup(self) -> None:
    """Start the worker

    This method creates a task to run the worker by awaiting
    ``WorkerTaskRunner.start`` and keeps a reference to the returned runner.

    A ``WorkerStartError`` is raised when ``has_started()`` is already true.
    """
    if self.has_started():
        # Build the message here: unlike logging calls, exception constructors
        # do not interpolate %-style arguments into their first argument.
        raise WorkerStartError(
            f"worker {self.worker_name} already started: {self._worker_state}"
        )
    self._worker_task_runner = await WorkerTaskRunner.start(self)

shutdown async

shutdown()

Shutdown a running worker and wait for it to stop

This method will try to gracefully stop the worker and wait for it to stop. If the worker does not stop in the grace period, it will force shutdown by cancelling the task.

Source code in fluid/utils/worker.py
async def shutdown(self) -> None:
    """Shut down a running worker and wait for it to stop

    The work is delegated to the task runner, which tries a graceful stop
    first and, if the worker is still running after the grace period,
    forces the shutdown by cancelling the task.
    Nothing happens when no task runner has been created.
    """
    runner = self._worker_task_runner
    if runner is None:
        return
    await runner.shutdown()

wait_for_shutdown async

wait_for_shutdown()

Wait for the worker to stop

This method will wait for the worker to stop running, but doesn't try to gracefully stop it nor force shutdown.

Source code in fluid/utils/worker.py
async def wait_for_shutdown(self) -> None:
    """Wait for the worker to stop

    This only waits on the task runner: it neither asks the worker to stop
    gracefully nor forces a shutdown.
    Nothing happens when no task runner has been created.
    """
    runner = self._worker_task_runner
    if runner is None:
        return
    await runner.wait_for_shutdown()

workers

workers()

An iterator of workers in this worker

Source code in fluid/utils/worker.py
def workers(self) -> Iterator[Worker]:
    """Iterate over the workers in this worker, which here is just itself"""
    yield from (self,)

get_message async

get_message(timeout=0.5)

Get the next message from the queue

Source code in fluid/utils/worker.py
async def get_message(self, timeout: float = 0.5) -> MessageType | None:
    """Get the next message from the queue"""
    try:
        async with asyncio.timeout(timeout):
            return await self._queue.get()
    except asyncio.TimeoutError:
        return None
    except (asyncio.CancelledError, RuntimeError):
        if not self.is_stopping():
            raise
    return None

queue_size

queue_size()

Get the size of the queue

Source code in fluid/utils/worker.py
def queue_size(self) -> int:
    """Return how many messages are currently in the queue"""
    pending = self._queue.qsize()
    return pending

run async

run()
Source code in fluid/utils/worker.py
async def run(self) -> None:
    """Consume messages until the worker is stopping

    Each iteration waits for the next message with ``get_message`` and,
    unless it is ``None``, awaits ``on_message`` with it.
    """
    while True:
        if self.is_stopping():
            return
        incoming = await self.get_message()
        if incoming is None:
            continue
        await self.on_message(incoming)

fluid.utils.worker.AsyncConsumer

AsyncConsumer(dispatcher, name='')

Bases: QueueConsumer[MessageType]

A Worker that can dispatch to async callbacks

PARAMETER DESCRIPTION
dispatcher

Async message dispatcher to dispatch messages

TYPE: AsyncDispatcher[MessageType]

name

Worker's name, if not provided it is evaluated from the class name

TYPE: str DEFAULT: ''

Source code in fluid/utils/worker.py
def __init__(
    self,
    dispatcher: Annotated[
        AsyncDispatcher[MessageType],
        Doc("Async message dispatcher to dispatch messages"),
    ],
    name: Annotated[
        str,
        Doc("Worker's name, if not provided it is evaluated from the class name"),
    ] = "",
) -> None:
    super().__init__(name=name)
    # Dispatcher stored as a public attribute on the worker.
    target: AsyncDispatcher[MessageType] = dispatcher
    self.dispatcher = target

worker_state property

worker_state

The running state of the worker

worker_name property

worker_name

The name of the worker

num_workers property

num_workers

The number of workers in this worker

dispatcher instance-attribute

dispatcher = dispatcher

send

send(message)

Send a message into the worker

Source code in fluid/utils/worker.py
def send(self, message: MessageType | None) -> None:
    """Put a message on the worker's queue without waiting"""
    inbox = self._queue
    inbox.put_nowait(message)

has_started

has_started()
Source code in fluid/utils/worker.py
def has_started(self) -> bool:
    """Return True once the worker state is anything other than INIT"""
    current = self._worker_state
    return current != WorkerState.INIT

is_running

is_running()
Source code in fluid/utils/worker.py
def is_running(self) -> bool:
    """Return True while the worker state is RUNNING"""
    current = self._worker_state
    return current == WorkerState.RUNNING

is_stopping

is_stopping()
Source code in fluid/utils/worker.py
def is_stopping(self) -> bool:
    """Return True while the worker state is STOPPING"""
    current = self._worker_state
    return current == WorkerState.STOPPING

is_stopped

is_stopped()
Source code in fluid/utils/worker.py
def is_stopped(self) -> bool:
    """Return True when the worker state is STOPPED or FORCE_STOPPED"""
    terminal_states = (WorkerState.STOPPED, WorkerState.FORCE_STOPPED)
    return self._worker_state in terminal_states

gracefully_stop

gracefully_stop()
Source code in fluid/utils/worker.py
def gracefully_stop(self) -> None:
    """Move a running worker to the STOPPING state

    A worker that is not running is left untouched.
    """
    if not self.is_running():
        return
    self._worker_state = WorkerState.STOPPING

after_shutdown

after_shutdown(reason, code)

Called after shutdown of worker

By default it does nothing, but can be overridden to do something such as exit the process.

Source code in fluid/utils/worker.py
def after_shutdown(self, reason: str, code: int) -> None:  # noqa: B027
    """Hook called after the worker has shut down

    The default implementation does nothing; override it to react to the
    shutdown, for example by exiting the process.
    """
    return None

status async

status()
Source code in fluid/utils/worker.py
async def status(self) -> dict:
    """Extend the parent status with the current ``queue_size``"""
    info = await super().status()
    info["queue_size"] = self.queue_size()
    return info

on_startup async

on_startup()

Called when the worker starts running

Use this function to initialize other async resources connected with the worker

Source code in fluid/utils/worker.py
async def on_startup(self) -> None:  # noqa: B027
    """Hook called when the worker starts running

    Override it to initialize other async resources connected with the
    worker; the default implementation does nothing.
    """
    return None

on_shutdown async

on_shutdown()

called after the worker stopped running

Use this function to cleanup resources connected with the worker

Source code in fluid/utils/worker.py
async def on_shutdown(self) -> None:  # noqa: B027
    """Hook called after the worker stopped running

    Override it to clean up resources connected with the worker;
    the default implementation does nothing.
    """
    return None

startup async

startup()

start the worker

This method creates a task to run the worker.

Source code in fluid/utils/worker.py
async def startup(self) -> None:
    """Start the worker

    This method creates a task to run the worker by awaiting
    ``WorkerTaskRunner.start`` and keeps a reference to the returned runner.

    A ``WorkerStartError`` is raised when ``has_started()`` is already true.
    """
    if self.has_started():
        # Build the message here: unlike logging calls, exception constructors
        # do not interpolate %-style arguments into their first argument.
        raise WorkerStartError(
            f"worker {self.worker_name} already started: {self._worker_state}"
        )
    self._worker_task_runner = await WorkerTaskRunner.start(self)

shutdown async

shutdown()

Shutdown a running worker and wait for it to stop

This method will try to gracefully stop the worker and wait for it to stop. If the worker does not stop in the grace period, it will force shutdown by cancelling the task.

Source code in fluid/utils/worker.py
async def shutdown(self) -> None:
    """Shut down a running worker and wait for it to stop

    The work is delegated to the task runner, which tries a graceful stop
    first and, if the worker is still running after the grace period,
    forces the shutdown by cancelling the task.
    Nothing happens when no task runner has been created.
    """
    runner = self._worker_task_runner
    if runner is None:
        return
    await runner.shutdown()

wait_for_shutdown async

wait_for_shutdown()

Wait for the worker to stop

This method will wait for the worker to stop running, but doesn't try to gracefully stop it nor force shutdown.

Source code in fluid/utils/worker.py
async def wait_for_shutdown(self) -> None:
    """Wait for the worker to stop

    This only waits on the task runner: it neither asks the worker to stop
    gracefully nor forces a shutdown.
    Nothing happens when no task runner has been created.
    """
    runner = self._worker_task_runner
    if runner is None:
        return
    await runner.wait_for_shutdown()

workers

workers()

An iterator of workers in this worker

Source code in fluid/utils/worker.py
def workers(self) -> Iterator[Worker]:
    """Iterate over the workers in this worker, which here is just itself"""
    yield from (self,)

get_message async

get_message(timeout=0.5)

Get the next message from the queue

Source code in fluid/utils/worker.py
async def get_message(self, timeout: float = 0.5) -> MessageType | None:
    """Get the next message from the queue"""
    try:
        async with asyncio.timeout(timeout):
            return await self._queue.get()
    except asyncio.TimeoutError:
        return None
    except (asyncio.CancelledError, RuntimeError):
        if not self.is_stopping():
            raise
    return None

queue_size

queue_size()

Get the size of the queue

Source code in fluid/utils/worker.py
def queue_size(self) -> int:
    """Return how many messages are currently in the queue"""
    pending = self._queue.qsize()
    return pending

run async

run()
Source code in fluid/utils/worker.py
async def run(self) -> None:
    """Dispatch messages until the worker is stopping

    Each iteration waits for the next message with ``get_message`` and,
    unless it is ``None``, awaits ``dispatcher.dispatch`` with it.
    """
    while True:
        if self.is_stopping():
            return
        incoming = await self.get_message()
        if incoming is None:
            continue
        await self.dispatcher.dispatch(incoming)

fluid.utils.worker.Workers

Workers(
    *workers,
    name="",
    heartbeat=0.1,
    stopping_grace_period=STOPPING_GRACE_PERIOD
)

Bases: Worker

A worker managing several workers

PARAMETER DESCRIPTION
*workers

Workers to manage, they can also be added later via add_workers method

TYPE: Worker DEFAULT: ()

name

Worker's name, if not provided it is evaluated from the class name

TYPE: str DEFAULT: ''

heartbeat

The time to wait between each workers status check

TYPE: float | int DEFAULT: 0.1

stopping_grace_period

Grace period in seconds to wait for workers to stop running when this worker is shutdown. It defaults to the FLUID_STOPPING_GRACE_PERIOD environment variable or 10 seconds.

TYPE: int DEFAULT: STOPPING_GRACE_PERIOD

Source code in fluid/utils/worker.py
def __init__(
    self,
    *workers: Annotated[
        Worker,
        Doc(
            "Workers to manage, they can also be added later "
            "via `add_workers` method"
        ),
    ],
    name: Annotated[
        str,
        Doc("Worker's name, if not provided it is evaluated from the class name"),
    ] = "",
    heartbeat: Annotated[
        float | int,
        Doc("The time to wait between each workers status check"),
    ] = 0.1,
    stopping_grace_period: Annotated[
        int,
        Doc(
            "Grace period in seconds to wait for workers to stop running "
            "when this worker is shutdown. "
            "It defaults to the `FLUID_STOPPING_GRACE_PERIOD` "
            "environment variable or 10 seconds."
        ),
    ] = settings.STOPPING_GRACE_PERIOD,
) -> None:
    # The base class receives the name and the stopping grace period.
    super().__init__(name, stopping_grace_period=stopping_grace_period)
    # Interval used between status checks of the managed workers.
    self._heartbeat = heartbeat
    # Start with an empty list and register the initial workers through
    # add_workers, the same method used to add workers later.
    self._workers: list[Worker] = []
    self.add_workers(*workers)

worker_state property

worker_state

The running state of the worker

worker_name property

worker_name

The name of the worker

num_workers property

num_workers

has_started

has_started()
Source code in fluid/utils/worker.py
def has_started(self) -> bool:
    """Tell whether the worker has left its ``INIT`` state."""
    current = self._worker_state
    return current != WorkerState.INIT

is_running

is_running()
Source code in fluid/utils/worker.py
def is_running(self) -> bool:
    """Tell whether the worker is currently in the ``RUNNING`` state."""
    current = self._worker_state
    return current == WorkerState.RUNNING

is_stopping

is_stopping()
Source code in fluid/utils/worker.py
def is_stopping(self) -> bool:
    """Tell whether the worker is currently in the ``STOPPING`` state."""
    current = self._worker_state
    return current == WorkerState.STOPPING

is_stopped

is_stopped()
Source code in fluid/utils/worker.py
def is_stopped(self) -> bool:
    """Tell whether the worker is in the ``STOPPED`` or ``FORCE_STOPPED`` state."""
    stopped_states = (WorkerState.STOPPED, WorkerState.FORCE_STOPPED)
    return self._worker_state in stopped_states

after_shutdown

after_shutdown(reason, code)

Called after shutdown of worker

By default it does nothing, but can be overridden to do something such as exit the process.

Source code in fluid/utils/worker.py
def after_shutdown(self, reason: str, code: int) -> None:  # noqa: B027
    """Hook called after the worker has shut down.

    The default implementation does nothing. Override it to react to the
    shutdown, for example by exiting the process.
    """

on_startup async

on_startup()

Called when the worker starts running

Use this function to initialize other async resources connected with the worker

Source code in fluid/utils/worker.py
async def on_startup(self) -> None:  # noqa: B027
    """Hook called when the worker starts running.

    The default implementation does nothing. Override it to initialize
    other async resources connected with the worker.
    """

on_shutdown async

on_shutdown()

called after the worker stopped running

Use this function to cleanup resources connected with the worker

Source code in fluid/utils/worker.py
async def on_shutdown(self) -> None:  # noqa: B027
    """Hook called after the worker has stopped running.

    The default implementation does nothing. Override it to clean up
    resources connected with the worker.
    """

startup async

startup()

start the worker

This method creates a task to run the worker.

Source code in fluid/utils/worker.py
async def startup(self) -> None:
    """Start the worker.

    This method creates a task to run the worker by starting a
    ``WorkerTaskRunner`` for it and keeping a reference to the runner.

    Raises ``WorkerStartError`` if the worker has already been started.
    """
    if self.has_started():
        # Build the message eagerly: unlike logging calls, exception
        # constructors never interpolate %-style arguments, so passing them
        # separately would leave the placeholders unformatted.
        raise WorkerStartError(
            f"worker {self.worker_name} already started: {self._worker_state}"
        )
    self._worker_task_runner = await WorkerTaskRunner.start(self)

shutdown async

shutdown()

Shutdown a running worker and wait for it to stop

This method will try to gracefully stop the worker and wait for it to stop. If the worker does not stop in the grace period, it will force shutdown by cancelling the task.

Source code in fluid/utils/worker.py
async def shutdown(self) -> None:
    """Shut down a running worker and wait for it to stop.

    The request is delegated to the worker's task runner, which is expected
    to stop the worker gracefully and, if it has not stopped within the
    grace period, force the shutdown by cancelling the task.
    Nothing happens when no task runner is set.
    """
    runner = self._worker_task_runner
    if runner is None:
        return
    await runner.shutdown()

wait_for_shutdown async

wait_for_shutdown()

Wait for the worker to stop

This method will wait for the worker to stop running, but doesn't try to gracefully stop it nor force shutdown.

Source code in fluid/utils/worker.py
async def wait_for_shutdown(self) -> None:
    """Wait for the worker to stop.

    This only waits, by delegating to the worker's task runner; it neither
    asks the worker to stop gracefully nor forces a shutdown.
    Nothing happens when no task runner is set.
    """
    runner = self._worker_task_runner
    if runner is None:
        return
    await runner.wait_for_shutdown()

add_workers

add_workers(*workers)

add workers to the workers

They can be added while the worker is running.

Source code in fluid/utils/worker.py
def add_workers(self, *workers: Worker) -> None:
    """Register workers with this group of workers.

    Workers that are already registered are ignored, so each worker appears
    at most once. Workers can be added while this worker is running.
    """
    registered = self._workers
    for candidate in workers:
        if candidate in registered:
            # Already managed; keep a single entry per worker.
            continue
        registered.append(candidate)

workers

workers()
Source code in fluid/utils/worker.py
def workers(self) -> Iterator[Worker]:
    """Return an iterator over the managed workers, in list order."""
    managed = self._workers
    return iter(managed)

gracefully_stop

gracefully_stop()
Source code in fluid/utils/worker.py
def gracefully_stop(self) -> None:
    """Request a graceful stop of this worker and of every managed worker."""
    super().gracefully_stop()
    # Forward the stop request to each managed worker.
    for child in self._workers:
        child.gracefully_stop()

status async

status()
Source code in fluid/utils/worker.py
async def status(self) -> dict:
    """Collect the status of every managed worker.

    The statuses are gathered concurrently and returned as a dictionary
    keyed by each worker's ``worker_name``.
    """
    pending = (child.status() for child in self._workers)
    results = await asyncio.gather(*pending)
    report: dict = {}
    for child, child_status in zip(self._workers, results):
        report[child.worker_name] = child_status
    return report

run async

run()
Source code in fluid/utils/worker.py
async def run(self) -> None:
    """Supervise the managed workers while this worker is running.

    On every pass each worker that has not started yet is started. The first
    worker found not running triggers ``gracefully_stop`` and ends the pass.
    Passes are separated by a sleep of ``_heartbeat`` seconds; once this
    worker is no longer running, it waits for the managed workers.
    """
    while self.is_running():
        for child in self._workers:
            started = child.has_started()
            if not started:
                await child.startup()
            if child.is_running():
                continue
            # A managed worker is not running: stop the whole group.
            self.gracefully_stop()
            break
        await asyncio.sleep(self._heartbeat)
    await self._wait_for_workers()