Skip to content

Workers

Workers are the main building block for asynchronous programming with aio-fluid. They are responsible for running tasks and managing their lifecycle. All workers implemented derive from the base abstract class Worker where the main method to implement is the Worker.run method.

Worker Lifecycle

The lifecycle of a worker is managed by the WorkerState class which provides a set of states that a worker can be in. The worker starts in an inital state and than it can be started and stopped.

Startup

To start a worker one uses the async method Worker.startup which create the task running the worker. The task will transition the worker from WorkerState.INIT to the WorkerState.RUNNING state. The worker will then run the Worker.on_startup coroutine method (which by default is a no-op) follow by the main worker coroutine method Worker.run method until it is stopped.

This is a very simple example of a worker that prints a message every second until it is stopped:

import asyncio

from fluid.utils.worker import Worker


class SimpleWorker(Worker):
    async def run(self):
        while self.is_running():
            self.print_message()
            await asyncio.sleep(1)

    def print_message(self):
        print(f"Hello from {self.worker_name} in state {self.worker_state}")


async def main():
    worker = SimpleWorker()
    worker.print_message()
    await worker.startup()
    asyncio.get_event_loop().call_later(5, worker.gracefully_stop)
    await worker.wait_for_shutdown()
    worker.print_message()


if __name__ == "__main__":
    asyncio.run(main())

Shutdown

To shut down a worker there are few possibilities.

Async Context Manager

Worker implements the async context manager protocol. Entering the context calls Worker.startup and exiting it calls Worker.shutdown, so the async with pattern is the most concise way to manage the full lifecycle:

async with MyWorker() as worker:
    # worker is running here
    ...
# worker is fully shut down here

Resources that the worker needs for its entire lifetime can be opened and closed inside Worker.run using normal async with statements — no subclassing of lifecycle hooks is required. The example below subclasses QueueConsumer to build a worker that accepts text items via send, opens an AsyncAnthropic client for the duration of run, and streams a one-sentence summary from Claude for each item:

import asyncio

import anthropic

from fluid.utils.worker import QueueConsumer

ARTICLES = [
    (
        "Async programming in Python allows multiple tasks to run concurrently "
        "within a single thread by yielding control at await points, which is "
        "particularly efficient for I/O-bound workloads such as HTTP requests "
        "and database queries."
    ),
    (
        "Large language models are trained on vast corpora of text using "
        "self-supervised objectives, enabling them to learn grammar, facts, and "
        "reasoning patterns that can be adapted to a wide range of downstream "
        "tasks through prompting or fine-tuning."
    ),
    (
        "The observer pattern decouples event producers from consumers by "
        "introducing an intermediary that maintains a list of subscribers and "
        "notifies them when state changes, making it straightforward to add new "
        "listeners without modifying the source."
    ),
]


class AnthropicWorker(QueueConsumer[str]):

    def __init__(self):
        super().__init__()
        self.results = []

    async def run(self) -> None:
        async with anthropic.AsyncAnthropic() as client:
            while not self.is_stopping():
                item = await self.get_message()
                if item is None:
                    continue
                if item == "":
                    break
                async with client.messages.stream(
                    model="claude-opus-4-7",
                    max_tokens=128,
                    messages=[
                        {
                            "role": "user",
                            "content": f"Summarise in one sentence: {item}",
                        }
                    ],
                ) as stream:
                    self.results.append(await stream.get_final_text())


async def main() -> None:
    async with AnthropicWorker() as worker:
        for article in ARTICLES:
            worker.send(article)
        worker.send("")  # signal end of input
    print(worker.results)


if __name__ == "__main__":
    asyncio.run(main())