Task Retries¶
Tasks can be configured to retry automatically when they fail or when they cannot run due to concurrency limits. Both behaviours are controlled by a RetryPolicy attached to the task via the @task decorator.
Retrying on failure¶
Use the retry parameter to re-queue a task after an execution error.
```python
from fluid.scheduler import RetryPolicy, task, TaskRun

@task(retry=RetryPolicy(max_attempts=3, wait=2.0, backoff=2.0))
async def fetch_data(ctx: TaskRun) -> None:
    """Fetch data from an external API."""
    response = await ctx.deps.http_client.get("https://api.example.com/data")
    ctx.logger.info("fetched %d bytes", len(response))
```
If fetch_data raises, the TaskConsumer re-queues it with a delay and moves on.
With backoff=2.0 the wait times grow exponentially: 2s → 4s → 8s.
After 3 failed retries the TaskRun ends in the failure state.
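The delay progression can be sketched with a small helper (illustrative only; the actual computation is internal to RetryPolicy):

```python
def retry_delay(wait: float, backoff: float, attempt: int) -> float:
    """Delay before retry `attempt` (1-based): wait grows by backoff each time."""
    return wait * backoff ** (attempt - 1)

# wait=2.0, backoff=2.0 -> the delay doubles on every retry
print([retry_delay(2.0, 2.0, n) for n in (1, 2, 3)])  # [2.0, 4.0, 8.0]
```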
Limiting which exceptions trigger a retry¶
By default all exceptions trigger a retry. Pass exceptions to be more selective:
```python
@task(retry=RetryPolicy(max_attempts=5, wait=1.0, exceptions=(IOError, TimeoutError)))
async def fetch_data(ctx: TaskRun) -> None:
    ...
```
A ValueError or any other exception type not listed will not trigger a retry; the task fails immediately.
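The gating amounts to an isinstance check against the exceptions tuple; a minimal sketch (should_retry and RETRYABLE are illustrative names, not fluid's internals):

```python
RETRYABLE = (IOError, TimeoutError)

def should_retry(exc: BaseException, attempts_left: int) -> bool:
    """Retry only while attempts remain and the exception type is listed."""
    return attempts_left > 0 and isinstance(exc, RETRYABLE)

print(should_retry(TimeoutError("slow upstream"), 2))  # True
print(should_retry(ValueError("bad input"), 2))        # False
```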
Retrying when rate limited¶
When max_concurrency is set, a task that cannot start because the limit is already reached ends in the rate_limited state by default.
Set rate_limit_retry to re-queue it instead:
```python
@task(
    max_concurrency=1,
    rate_limit_retry=RetryPolicy(max_attempts=10, wait=5.0),
)
async def exclusive_job(ctx: TaskRun) -> None:
    """Only one instance of this task can run at a time."""
    ...
```
The task will be re-queued up to 10 times, waiting 5 seconds between each attempt. If the slot is still occupied after all attempts, the run ends in rate_limited.
Combining both policies¶
A task can have both policies simultaneously:
```python
@task(
    max_concurrency=2,
    retry=RetryPolicy(max_attempts=3, wait=2.0, backoff=2.0, max_wait=30.0),
    rate_limit_retry=RetryPolicy(max_attempts=5, wait=10.0),
)
async def resilient_task(ctx: TaskRun) -> None:
    ...
```
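Assuming max_wait caps each computed delay, the failure-retry waits for a policy like the one above stop growing once they hit the ceiling; a sketch:

```python
def capped_delay(wait: float, backoff: float, max_wait: float, attempt: int) -> float:
    """Exponential delay capped at max_wait (assumed semantics of max_wait)."""
    return min(wait * backoff ** (attempt - 1), max_wait)

# wait=2.0, backoff=2.0, max_wait=30.0: growth stops once the cap is reached
print([capped_delay(2.0, 2.0, 30.0, n) for n in range(1, 7)])
# [2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```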
How retries work under the hood¶
Both retry paths use the same mechanism, and no worker is ever blocked waiting:
- The TaskConsumer detects the failure or concurrency limit.
- It creates a fresh copy of the TaskRun with execute_after set to now + delay.
- The copy is pushed back onto the Redis queue via the TaskBroker and the worker moves on immediately.
- When a worker dequeues the copy and execute_after is still in the future, it schedules re-queuing via call_later and returns, still without blocking.
- Once the delay has elapsed the task enters the queue normally and is executed.