Workers¶
Workers are the main building block for asynchronous programming with aio-fluid. They are responsible for running asynchronous tasks and managing their lifecycle.
There are several worker classes which can be imported from fluid.utils.worker, and they aall derive from the abstract fluid.utils.worker.Worker class.
fluid.utils.worker.WorkerState
¶
Bases: StrEnum
The lifecycle state of a Worker.
INIT
class-attribute
instance-attribute
¶
Worker has been created but not yet started.
STOPPING
class-attribute
instance-attribute
¶
Graceful stop requested; run should exit at its next safe point.
STOPPED
class-attribute
instance-attribute
¶
Worker exited cleanly after a graceful stop.
FORCE_STOPPED
class-attribute
instance-attribute
¶
Worker was cancelled because it did not exit within the grace period.
fluid.utils.worker.Worker
¶
Bases: ABC
Abstract base class for all workers.
A worker encapsulates a long-running async task with a managed lifecycle.
Subclasses implement run, which is called
once the worker is started and should loop until
is_running returns False.
Use startup to start the worker as an asyncio task, and shutdown (or gracefully_stop + wait_for_shutdown) to stop it.
Override on_startup and on_shutdown to initialise and clean up async resources that the worker owns.
| PARAMETER | DESCRIPTION |
|---|---|
name
|
Worker's name, if not provided it is evaluated from the class name
TYPE:
|
stopping_grace_period
|
Grace period in seconds to wait for workers to stop running when this worker is shutdown. It defaults to the
TYPE:
|
Source code in fluid/utils/worker.py
has_started
¶
is_running
¶
is_stopping
¶
is_stopped
¶
gracefully_stop
¶
after_shutdown
¶
Called after shutdown of worker
By default it does nothing, but can be overriden to do something such as exit the process.
status
async
¶
on_startup
async
¶
Called when the worker starts running
Use this function to initialize other async resources connected with the worker
on_shutdown
async
¶
called after the worker stopped running
Use this function to cleanup resources connected with the worker
startup
async
¶
start the worker
This method creates a task to run the worker.
Source code in fluid/utils/worker.py
shutdown
async
¶
Shutdown a running worker and wait for it to stop
This method will try to gracefully stop the worker and wait for it to stop. If the worker does not stop in the grace period, it will force shutdown by cancelling the task.
Source code in fluid/utils/worker.py
wait_for_shutdown
async
¶
Wait for the worker to stop
This method will wait for the worker to stop running, but doesn't try to gracefully stop it nor force shutdown.
Source code in fluid/utils/worker.py
workers
¶
run
abstractmethod
async
¶
run the worker
This is the only abstract method and that needs implementing. It is the coroutine that mantains the worker running.
fluid.utils.worker.WorkerFunction
¶
WorkerFunction(
run_function,
*,
heartbeat=0,
name="",
stopping_grace_period=STOPPING_GRACE_PERIOD
)
Bases: Worker
A Worker that calls a coroutine function in a loop.
On each iteration the supplied run_function is awaited, then the worker
sleeps for heartbeat seconds before repeating. The loop exits when
is_running returns False.
| PARAMETER | DESCRIPTION |
|---|---|
run_function
|
The coroutine function tuo run and await at each iteration of the worker loop
TYPE:
|
heartbeat
|
The time to wait between each coroutine function run
TYPE:
|
name
|
Worker's name, if not provided it is evaluated from the class name
TYPE:
|
stopping_grace_period
|
Grace period in seconds before force-cancelling this worker
TYPE:
|
Source code in fluid/utils/worker.py
run
async
¶
has_started
¶
is_running
¶
is_stopping
¶
is_stopped
¶
gracefully_stop
¶
after_shutdown
¶
Called after shutdown of worker
By default it does nothing, but can be overriden to do something such as exit the process.
status
async
¶
on_startup
async
¶
Called when the worker starts running
Use this function to initialize other async resources connected with the worker
on_shutdown
async
¶
called after the worker stopped running
Use this function to cleanup resources connected with the worker
startup
async
¶
start the worker
This method creates a task to run the worker.
Source code in fluid/utils/worker.py
shutdown
async
¶
Shutdown a running worker and wait for it to stop
This method will try to gracefully stop the worker and wait for it to stop. If the worker does not stop in the grace period, it will force shutdown by cancelling the task.
Source code in fluid/utils/worker.py
wait_for_shutdown
async
¶
Wait for the worker to stop
This method will wait for the worker to stop running, but doesn't try to gracefully stop it nor force shutdown.
Source code in fluid/utils/worker.py
fluid.utils.worker.QueueConsumer
¶
Bases: Worker, MessageProducer[MessageType]
Abstract Worker backed by an asyncio queue.
Provides send for thread-safe message delivery and get_message for retrieving the next message with a timeout. Subclasses implement run to consume messages from the queue.
| PARAMETER | DESCRIPTION |
|---|---|
name
|
Worker's name, if not provided it is evaluated from the class name
TYPE:
|
stopping_grace_period
|
Grace period in seconds to wait for workers to stop running when this worker is shutdown. It defaults to the
TYPE:
|
Source code in fluid/utils/worker.py
get_message
async
¶
Get the next message from the queue
Source code in fluid/utils/worker.py
queue_size
¶
status
async
¶
send
¶
has_started
¶
is_running
¶
is_stopping
¶
is_stopped
¶
gracefully_stop
¶
after_shutdown
¶
Called after shutdown of worker
By default it does nothing, but can be overriden to do something such as exit the process.
on_startup
async
¶
Called when the worker starts running
Use this function to initialize other async resources connected with the worker
on_shutdown
async
¶
called after the worker stopped running
Use this function to cleanup resources connected with the worker
startup
async
¶
start the worker
This method creates a task to run the worker.
Source code in fluid/utils/worker.py
shutdown
async
¶
Shutdown a running worker and wait for it to stop
This method will try to gracefully stop the worker and wait for it to stop. If the worker does not stop in the grace period, it will force shutdown by cancelling the task.
Source code in fluid/utils/worker.py
wait_for_shutdown
async
¶
Wait for the worker to stop
This method will wait for the worker to stop running, but doesn't try to gracefully stop it nor force shutdown.
Source code in fluid/utils/worker.py
workers
¶
run
abstractmethod
async
¶
run the worker
This is the only abstract method and that needs implementing. It is the coroutine that mantains the worker running.
fluid.utils.worker.QueueConsumerWorker
¶
Bases: QueueConsumer[MessageType]
A QueueConsumer that dispatches each message to a single async callback.
| PARAMETER | DESCRIPTION |
|---|---|
on_message
|
The async callback to call when a message is received
TYPE:
|
name
|
Worker's name, if not provided it is evaluated from the class name
TYPE:
|
stopping_grace_period
|
Grace period in seconds to wait for workers to stop running when this worker is shutdown. It defaults to the
TYPE:
|
Source code in fluid/utils/worker.py
run
async
¶
send
¶
has_started
¶
is_running
¶
is_stopping
¶
is_stopped
¶
gracefully_stop
¶
after_shutdown
¶
Called after shutdown of worker
By default it does nothing, but can be overriden to do something such as exit the process.
status
async
¶
on_startup
async
¶
Called when the worker starts running
Use this function to initialize other async resources connected with the worker
on_shutdown
async
¶
called after the worker stopped running
Use this function to cleanup resources connected with the worker
startup
async
¶
start the worker
This method creates a task to run the worker.
Source code in fluid/utils/worker.py
shutdown
async
¶
Shutdown a running worker and wait for it to stop
This method will try to gracefully stop the worker and wait for it to stop. If the worker does not stop in the grace period, it will force shutdown by cancelling the task.
Source code in fluid/utils/worker.py
wait_for_shutdown
async
¶
Wait for the worker to stop
This method will wait for the worker to stop running, but doesn't try to gracefully stop it nor force shutdown.
Source code in fluid/utils/worker.py
workers
¶
get_message
async
¶
Get the next message from the queue
Source code in fluid/utils/worker.py
fluid.utils.worker.AsyncConsumer
¶
Bases: QueueConsumer[MessageType]
A QueueConsumer that fans out each message to all registered async handlers via an AsyncDispatcher.
The run loop processes messages until
is_stopping returns True.
Any messages remaining in the queue when the worker stops are discarded;
callers that need guaranteed delivery should drain the queue before
requesting a stop.
| PARAMETER | DESCRIPTION |
|---|---|
dispatcher
|
Async message dispatcher to dispatch messages
TYPE:
|
name
|
Worker's name, if not provided it is evaluated from the class name
TYPE:
|
stopping_grace_period
|
Grace period in seconds to wait for workers to stop running when this worker is shutdown. It defaults to the
TYPE:
|
Source code in fluid/utils/worker.py
run
async
¶
send
¶
has_started
¶
is_running
¶
is_stopping
¶
is_stopped
¶
gracefully_stop
¶
after_shutdown
¶
Called after shutdown of worker
By default it does nothing, but can be overriden to do something such as exit the process.
status
async
¶
on_startup
async
¶
Called when the worker starts running
Use this function to initialize other async resources connected with the worker
on_shutdown
async
¶
called after the worker stopped running
Use this function to cleanup resources connected with the worker
startup
async
¶
start the worker
This method creates a task to run the worker.
Source code in fluid/utils/worker.py
shutdown
async
¶
Shutdown a running worker and wait for it to stop
This method will try to gracefully stop the worker and wait for it to stop. If the worker does not stop in the grace period, it will force shutdown by cancelling the task.
Source code in fluid/utils/worker.py
wait_for_shutdown
async
¶
Wait for the worker to stop
This method will wait for the worker to stop running, but doesn't try to gracefully stop it nor force shutdown.
Source code in fluid/utils/worker.py
workers
¶
get_message
async
¶
Get the next message from the queue
Source code in fluid/utils/worker.py
fluid.utils.worker.Workers
¶
Bases: Worker
A Worker that owns and manages a collection of child workers.
Child workers are registered with
add_workers.
When the Workers instance starts, its run loop
starts each child worker and monitors their health — if any child stops
unexpectedly the whole group is gracefully stopped.
On shutdown all child workers are stopped concurrently via
[_wait_for_workers][fluid.utils.worker.Workers._wait_for_workers].
Workers that do not exit within stopping_grace_period seconds are
force-cancelled.
!!! note "Shutdown ordering" By default all child workers are shut down concurrently. If a subclass needs a specific shutdown order (e.g. an event dispatcher that must outlive the workers that produce events), override [_wait_for_workers][fluid.utils.worker.Workers._wait_for_workers].
| PARAMETER | DESCRIPTION |
|---|---|
*workers
|
Workers to manage, they can also be added later via
TYPE:
|
name
|
Worker's name, if not provided it is evaluated from the class name
TYPE:
|
heartbeat
|
The time to wait between each workers status check
TYPE:
|
stopping_grace_period
|
Grace period in seconds to wait for workers to stop running when this worker is shutdown. It defaults to the
TYPE:
|
Source code in fluid/utils/worker.py
add_workers
¶
add workers to the workers
They can be added while the worker is running.
workers
¶
gracefully_stop
¶
status
async
¶
run
async
¶
Source code in fluid/utils/worker.py
has_started
¶
is_running
¶
is_stopping
¶
is_stopped
¶
after_shutdown
¶
Called after shutdown of worker
By default it does nothing, but can be overriden to do something such as exit the process.
on_startup
async
¶
Called when the worker starts running
Use this function to initialize other async resources connected with the worker
on_shutdown
async
¶
called after the worker stopped running
Use this function to cleanup resources connected with the worker
startup
async
¶
start the worker
This method creates a task to run the worker.
Source code in fluid/utils/worker.py
shutdown
async
¶
Shutdown a running worker and wait for it to stop
This method will try to gracefully stop the worker and wait for it to stop. If the worker does not stop in the grace period, it will force shutdown by cancelling the task.
Source code in fluid/utils/worker.py
wait_for_shutdown
async
¶
Wait for the worker to stop
This method will wait for the worker to stop running, but doesn't try to gracefully stop it nor force shutdown.