Skip to content

Task Scheduler

The task scheduler TaskScheduler inherits from the TaskConsumer to add scheduling of periodic tasks.

It can be imported from fluid.scheduler:

from fluid.scheduler import TaskScheduler

fluid.scheduler.TaskScheduler

TaskScheduler(**kwargs)

Bases: TaskConsumer

A task manager for scheduling tasks

Source code in fluid/scheduler/scheduler.py
def __init__(self, **kwargs: Any) -> None:
    """Set up the parent class and add a ``ScheduleTasks`` worker.

    Every keyword argument is passed through unchanged to
    ``super().__init__``.
    """
    super().__init__(**kwargs)
    # ScheduleTasks is constructed with a reference to this scheduler
    self.add_workers(ScheduleTasks(self))

worker_name property

worker_name

The name of the worker

num_workers property

num_workers

running property

running

deps instance-attribute

deps = deps if deps is not None else State()

Dependencies for the task manager.

Production applications require global dependencies to be available to all tasks. This can be achieved by setting the deps attribute of the task manager to an object with the required dependencies.

Each task can cast the dependencies to the required type.

config instance-attribute

config = config or TaskManagerConfig(**kwargs)

Task manager configuration

dispatcher instance-attribute

dispatcher = TaskDispatcher()

A dispatcher of TaskRun events.

Application can register handlers to listen for events happening during the lifecycle of a task run.

broker instance-attribute

broker = from_url(broker_url)

registry property

registry

The task registry

type property

type

num_concurrent_tasks property

num_concurrent_tasks

The number of concurrent tasks running in the consumer

status async

status()
Source code in fluid/utils/worker.py
async def status(self) -> dict:
    """Return the result of awaiting ``self._workers.status()``."""
    return await self._workers.status()

gracefully_stop

gracefully_stop()
Source code in fluid/utils/worker.py
def gracefully_stop(self) -> None:
    """Forward the graceful-stop request to ``self._workers``."""
    self._workers.gracefully_stop()

is_running

is_running()
Source code in fluid/utils/worker.py
def is_running(self) -> bool:
    """Return the current value of the ``_running`` flag."""
    return self._running

is_stopping

is_stopping()
Source code in fluid/utils/worker.py
def is_stopping(self) -> bool:
    """Return whatever ``self._workers.is_stopping()`` reports."""
    return self._workers.is_stopping()

run async

run()

run the workers

Source code in fluid/utils/worker.py
async def run(self) -> None:
    """run the workers"""
    with self.start_running():
        async with self.safe_run():
            workers, _ = self._workers.workers_tasks()
            self._workers.workers = tuple(workers)
            self._workers.tasks = tuple(
                self.create_task(worker) for worker in workers
            )
            await asyncio.gather(*self._workers.tasks)
        await self.shutdown()

start_running

start_running()
Source code in fluid/utils/worker.py
@contextmanager
def start_running(self) -> Generator:
    """Mark the worker as running for the duration of the ``with`` block.

    Raises:
        RuntimeError: if the ``_running`` flag is already set on entry.
    """
    already_running = self._running
    if already_running:
        raise RuntimeError("Worker is already running")
    self._running = True
    try:
        logger.info("%s started running", self.worker_name)
        yield
    finally:
        # always clear the flag, even when the wrapped block raised
        self._running = False
        logger.warning("%s stopped running", self.worker_name)

add_workers

add_workers(*workers)

add workers to the workers

Source code in fluid/utils/worker.py
def add_workers(self, *workers: Worker) -> None:
    """Append each given worker to the registered workers, skipping duplicates.

    A worker that is already present in the list returned by
    ``self._workers.workers_tasks()`` is left untouched.
    """
    registered, _ = self._workers.workers_tasks()
    for candidate in workers:
        if candidate in registered:
            continue
        registered.append(candidate)

wait_for_exit async

wait_for_exit()
Source code in fluid/utils/worker.py
async def wait_for_exit(self) -> None:
    """Await ``self._workers_task`` if one has been set; otherwise return at once."""
    workers_task = self._workers_task
    if workers_task is None:
        return
    await workers_task

create_task

create_task(worker)
Source code in fluid/utils/worker.py
def create_task(self, worker: Worker) -> asyncio.Task:
    """Wrap ``self._run_worker(worker)`` in an asyncio task.

    The task is named ``"<this worker's name>-<given worker's name>"``.
    """
    worker_coro = self._run_worker(worker)
    task_name = f"{self.worker_name}-{worker.worker_name}"
    return asyncio.create_task(worker_coro, name=task_name)

on_shutdown async

on_shutdown()
Source code in fluid/scheduler/consumer.py
async def on_shutdown(self) -> None:
    """Close the broker by awaiting ``self.broker.close()``."""
    await self.broker.close()

shutdown async

shutdown()

shutdown the workers

Source code in fluid/utils/worker.py
async def shutdown(self) -> None:
    """shutdown the workers

    Runs at most once per instance: once ``_has_shutdown`` is set, later
    calls return immediately.

    A graceful stop is requested first and the workers get
    ``self._stopping_grace_period`` seconds to exit. When they exit in time,
    ``on_shutdown`` is awaited and the method returns. When the grace period
    expires, or an ``asyncio.CancelledError`` is caught during that phase,
    the workers are cancelled, awaited once more, and ``on_shutdown`` is
    awaited afterwards.
    """
    if self._has_shutdown:
        return
    # set the flag before doing any work so re-entrant calls are no-ops
    self._has_shutdown = True
    logger.warning(
        "gracefully stopping %d workers: %s",
        self.num_workers,
        ", ".join(w.worker_name for w in self._workers.workers),
    )
    self.gracefully_stop()
    try:
        # graceful phase: bounded wait for the workers to exit on their own
        async with asyncio.timeout(self._stopping_grace_period):
            await self.wait_for_exit()
        await self.on_shutdown()
        return
    except asyncio.TimeoutError:
        # only tasks that are still pending are listed in the message
        logger.warning(
            "could not stop workers %s gracefully after %s"
            " seconds - force shutdown",
            ", ".join(
                task.get_name() for task in self._workers.tasks if not task.done()
            ),
            self._stopping_grace_period,
        )
    except asyncio.CancelledError:
        # fall through to the forced shutdown below
        pass
    # forced phase: flag it first, then cancel the workers
    self._force_shutdown = True
    self._workers.cancel()
    try:
        await self.wait_for_exit()
    except asyncio.CancelledError:
        # a cancellation surfacing here is ignored so on_shutdown still runs
        pass
    await self.on_shutdown()

bail_out

bail_out(reason, code=1)
Source code in fluid/utils/worker.py
def bail_out(self, reason: str, code: int = 1) -> None:
    """Request a graceful stop of the workers.

    ``reason`` and ``code`` are accepted but not used by this implementation.
    """
    self.gracefully_stop()

safe_run async

safe_run()

Context manager to run a worker safely

Source code in fluid/utils/worker.py
@asynccontextmanager
async def safe_run(self) -> AsyncGenerator:
    """Context manager to run a worker safely

    Wraps the block that runs the workers and schedules ``bail_out`` on the
    running event loop whenever that block ends for a reason other than
    cancellation:

    * ``asyncio.CancelledError`` is always propagated to the caller.
    * Any other ``Exception`` is logged and swallowed, and ``bail_out`` is
      scheduled with the failure reason and code ``2``.
    * A clean exit schedules ``bail_out`` with ``"worker exit"`` and code ``1``.
    """
    try:
        yield
    except asyncio.CancelledError:
        # Cancellation is expected during a forced shutdown and has to reach
        # the caller in every other case as well, so it is always re-raised.
        raise
    except Exception as e:
        reason = f"unhandled exception while running workers: {e}"
        logger.exception(reason)
        # this code always runs inside a coroutine, where get_running_loop()
        # is the recommended way to reach the loop
        asyncio.get_running_loop().call_soon(self.bail_out, reason, 2)
    else:
        # worker finished without error
        # make sure we are shutting down
        asyncio.get_running_loop().call_soon(self.bail_out, "worker exit", 1)

remove_workers

remove_workers(*workers)

remove workers from the workers

Source code in fluid/utils/worker.py
def remove_workers(self, *workers: Worker) -> None:
    """Drop each given worker from the registered workers.

    Workers that are not currently registered are silently ignored.
    """
    registered, _ = self._workers.workers_tasks()
    for unwanted in workers:
        if unwanted in registered:
            registered.remove(unwanted)

startup async

startup()

start the workers

Source code in fluid/utils/worker.py
async def startup(self) -> None:
    """Start the workers in a background task, unless already started.

    After the task running :meth:`run` has been created, every callback
    queued in ``_delayed_callbacks`` is handed to ``_delayed_callback`` and
    the queue is reset to an empty list.
    """
    if self._workers_task is not None:
        # a workers task already exists - nothing to do
        return
    self._workers_task = asyncio.create_task(self.run(), name=self.worker_name)
    for callback_args in self._delayed_callbacks:
        self._delayed_callback(*callback_args)
    self._delayed_callbacks = []

register_callback

register_callback(
    callback, seconds, jitter=0.0, periodic=False
)

Register a callback

The callback can be periodic or not.

Source code in fluid/utils/worker.py
def register_callback(
    self,
    callback: Callable[[], None],
    seconds: float,
    jitter: float = 0.0,
    periodic: bool | float = False,
) -> None:
    """Register a callback

    The callback can be periodic or not.
    """
    if periodic is True:
        periodic_float = seconds
    elif periodic is False:
        periodic_float = 0.0
    else:
        periodic_float = periodic
    if not self.running:
        self._delayed_callbacks.append((callback, seconds, jitter, periodic_float))
    else:
        self._delayed_callback(callback, seconds, jitter, periodic_float)

enter_async_context async

enter_async_context(cm)
Source code in fluid/scheduler/consumer.py
async def enter_async_context(self, cm: Any) -> Any:
    """Enter ``cm`` through ``self._stack`` and return the awaited result.

    NOTE(review): ``_stack`` is presumably a ``contextlib.AsyncExitStack``
    that is unwound when the consumer shuts down - confirm.
    """
    return await self._stack.enter_async_context(cm)

execute async

execute(task, **params)

Execute a task and wait for it to finish

Source code in fluid/scheduler/consumer.py
async def execute(self, task: Task | str, **params: Any) -> TaskRun:
    """Run a task in place and return its task run once it has finished.

    The task run is built with :meth:`create_task_run` from ``task`` and
    ``params`` and its ``execute`` coroutine is awaited before returning.
    """
    finished_run = self.create_task_run(task, **params)
    await finished_run.execute()
    return finished_run

execute_sync

execute_sync(task, **params)
Source code in fluid/scheduler/consumer.py
def execute_sync(self, task: Task | str, **params: Any) -> TaskRun:
    """Run ``self._execute_and_exit(task, **params)`` to completion and return its result.

    ``asyncio.run`` starts a new event loop, so this cannot be called while
    another event loop is already running in the same thread.
    """
    return asyncio.run(self._execute_and_exit(task, **params))

register_task

register_task(task)

Register a task with the task manager

Only tasks registered can be executed by a task manager

Source code in fluid/scheduler/consumer.py
def register_task(self, task: Task) -> None:
    """Register a task with the task manager

    Only tasks registered can be executed by a task manager

    The registration is delegated to ``self.broker.register_task``.
    """
    self.broker.register_task(task)

queue async

queue(task, priority=None, **params)

Queue a task for execution

This method fires two events:

  • init: when the task run is created
  • queued: after the task is queued
Source code in fluid/scheduler/consumer.py
async def queue(
    self,
    task: str | Task,
    priority: TaskPriority | None = None,
    **params: Any,
) -> TaskRun:
    """Queue a task for execution and return its task run.

    This method fires two events:

    - init: when the task run is created
    - queued: after the task is queued
    """
    queued_run = self.create_task_run(task, priority=priority, **params)
    # dispatch the freshly created run before its state is changed
    self.dispatcher.dispatch(queued_run)
    queued_run.set_state(TaskState.queued)
    await self.broker.queue_task(queued_run)
    return queued_run

create_task_run

create_task_run(task, run_id='', priority=None, **params)

Create a TaskRun in init state

Source code in fluid/scheduler/consumer.py
def create_task_run(
    self,
    task: str | Task,
    run_id: str = "",
    priority: TaskPriority | None = None,
    **params: Any,
) -> TaskRun:
    """Build a ``TaskRun`` for ``task`` (described upstream as the `init` state).

    The task is resolved through the broker registry. An empty ``run_id`` is
    replaced by a new id from the broker, a falsy ``priority`` falls back to
    the task's own priority, and ``params`` are passed to the task's
    ``params_model``.
    """
    resolved_task = self.broker.task_from_registry(task)
    identifier = run_id if run_id else self.broker.new_uuid()
    effective_priority = priority or resolved_task.priority
    run_params = resolved_task.params_model(**params)
    return TaskRun(
        id=identifier,
        task=resolved_task,
        priority=effective_priority,
        params=run_params,
        task_manager=self,
    )

register_from_module

register_from_module(module)
Source code in fluid/scheduler/consumer.py
def register_from_module(self, module: Any) -> None:
    """Register every public ``Task`` instance found as an attribute of ``module``.

    Attribute names starting with an underscore are skipped.
    """
    public_names = (name for name in dir(module) if not name.startswith("_"))
    for attr_name in public_names:
        candidate = getattr(module, attr_name)
        if isinstance(candidate, Task):
            self.register_task(candidate)

register_from_dict

register_from_dict(data)
Source code in fluid/scheduler/consumer.py
def register_from_dict(self, data: dict) -> None:
    """Register every ``Task`` instance stored in ``data`` under a public key.

    Keys starting with an underscore are skipped.
    """
    for key, candidate in data.items():
        is_public = not key.startswith("_")
        if is_public and isinstance(candidate, Task):
            self.register_task(candidate)

register_async_handler

register_async_handler(event, handler)
Source code in fluid/scheduler/consumer.py
def register_async_handler(self, event: Event | str, handler: AsyncHandler) -> None:
    """Register ``handler`` for ``event`` on the async dispatcher worker.

    Two registrations are made: the main dispatcher gets the worker's
    ``send`` under the ``"<event type>.async_dispatch"`` key, and the
    worker's own dispatcher gets ``handler`` for the resolved event.
    """
    resolved_event = Event.from_string_or_event(event)
    bridge_key = f"{resolved_event.type}.async_dispatch"
    self.dispatcher.register_handler(bridge_key, self._async_dispatcher_worker.send)
    self._async_dispatcher_worker.dispatcher.register_handler(resolved_event, handler)

unregister_async_handler

unregister_async_handler(event)
Source code in fluid/scheduler/consumer.py
def unregister_async_handler(self, event: Event | str) -> AsyncHandler | None:
    """Unregister the handler for ``event`` from the async dispatcher worker.

    Returns whatever the worker dispatcher's ``unregister_handler`` returns.
    NOTE(review): the ``"<event type>.async_dispatch"`` entry added to
    ``self.dispatcher`` by ``register_async_handler`` is not removed here -
    confirm that this is intended.
    """
    return self._async_dispatcher_worker.dispatcher.unregister_handler(event)

sync_queue

sync_queue(task)
Source code in fluid/scheduler/consumer.py
def sync_queue(self, task: str | Task) -> None:
    """Push ``task`` onto the left end of ``self._task_to_queue``.

    NOTE(review): presumably a worker drains this container and queues the
    tasks asynchronously - confirm against the consumer implementation.
    """
    self._task_to_queue.appendleft(task)

sync_priority_queue

sync_priority_queue(task)
Source code in fluid/scheduler/consumer.py
def sync_priority_queue(self, task: str | Task) -> None:
    """Create a task run for ``task`` and push it onto the left end of
    ``self._priority_task_run_queue``.
    """
    self._priority_task_run_queue.appendleft(self.create_task_run(task))

num_concurrent_tasks_for

num_concurrent_tasks_for(task_name)

The number of concurrent tasks for a given task_name

Source code in fluid/scheduler/consumer.py
def num_concurrent_tasks_for(self, task_name: str) -> int:
    """The number of concurrent tasks for a given task_name

    Returns the length of the ``self._concurrent_tasks[task_name]`` entry.
    NOTE(review): what happens for an unknown ``task_name`` depends on the
    mapping type of ``_concurrent_tasks`` - confirm.
    """
    return len(self._concurrent_tasks[task_name])

queue_and_wait async

queue_and_wait(task, *, timeout=2, **params)

Queue a task and wait for it to finish

Source code in fluid/scheduler/consumer.py
async def queue_and_wait(
    self, task: str | Task, *, timeout: int = 2, **params: Any
) -> TaskRun:
    """Queue a task, wait for it to finish and return what the waiter returns.

    A ``TaskRunWaiter`` is opened before the task is queued, and ``timeout``
    is forwarded to its ``wait`` call.
    """
    with TaskRunWaiter(self) as waiter:
        queued_run = await self.queue(task, **params)
        finished_run = await waiter.wait(queued_run, timeout=timeout)
        return finished_run