Skip to content

Task Run

It can be imported from fluid.scheduler:

from fluid.scheduler import TaskRun

fluid.scheduler.TaskRun pydantic-model

Bases: BaseModel, Generic[TP]

A TaskRun contains all the data generated by a Task run

This model is never initialized directly; it is created by the TaskManager.

Fields:

id pydantic-field

id

Unique task run id

task pydantic-field

task

Task to be executed

priority pydantic-field

priority

Task priority

params pydantic-field

params

Task parameters

state pydantic-field

state = init

Task state

task_manager pydantic-field

task_manager

queued pydantic-field

queued = None

start pydantic-field

start = None

end pydantic-field

end = None

logger property pydantic-field

logger

in_queue property pydantic-field

in_queue

duration property pydantic-field

duration

duration_ms property pydantic-field

duration_ms

total property pydantic-field

total

name property pydantic-field

name

name_id property pydantic-field

name_id

is_done property pydantic-field

is_done

is_failure property pydantic-field

is_failure

deps property pydantic-field

deps

execute async

execute()
Source code in fluid/scheduler/models.py
async def execute(self) -> None:
    """Run the task's executor against this run and record the outcome.

    The run is moved to ``TaskState.running`` and the task's executor is
    awaited with this run as its argument. If an ``Exception`` is raised
    along the way, the run is flagged as ``TaskState.failure`` and the
    exception is re-raised; otherwise the run is flagged as
    ``TaskState.success``.

    Exceptions that do not derive from ``Exception`` (for example
    ``asyncio.CancelledError``) are not intercepted here, so no state
    change is recorded for them by this method.
    """
    try:
        self.set_state(TaskState.running)
        run_task = self.task.executor
        await run_task(self)  # type: ignore [arg-type]
    except Exception:
        self.set_state(TaskState.failure)
        raise
    # Only reached when nothing was raised above (the handler always re-raises).
    self.set_state(TaskState.success)

serialize_task

serialize_task(task, _info)
Source code in fluid/scheduler/models.py
@field_serializer("task")
def serialize_task(self, task: Task, _info: Any) -> str:
    """Serialize the ``task`` field as the task's ``name``.

    ``_info`` is accepted to satisfy the serializer signature and is unused.
    """
    task_name = task.name
    return task_name

set_state

set_state(state, state_time=None)
Source code in fluid/scheduler/models.py
def set_state(
    self,
    state: TaskState,
    state_time: datetime | None = None,
) -> None:
    """Move the run to ``state``, recording a timestamp for the transition.

    Requesting the state the run is already in is a no-op. Otherwise the
    allowed direct transitions are:

    * ``init -> queued``: stores the timestamp in ``queued``
    * ``queued -> running``: stores the timestamp in ``start``
    * ``running -> success | aborted | rate_limited | failure``: stores the
      timestamp in ``end``

    When the target is further along (e.g. ``init -> running`` or
    ``queued -> failure``) the intermediate transitions are applied first,
    all with the same timestamp. ``_dispatch()`` is called after every
    applied transition.

    ``state_time`` is passed through ``as_utc`` before use.
    NOTE(review): presumably ``as_utc`` supplies the current time when
    ``state_time`` is ``None`` - confirm against its definition.

    Raises:
        TaskRunError: if the requested transition is not one of the above.
    """
    current = self.state
    if current == state:
        return
    when = as_utc(state_time)
    closing_states = (
        TaskState.success,
        TaskState.aborted,
        TaskState.rate_limited,
        TaskState.failure,
    )

    if current == TaskState.init:
        if state == TaskState.queued:
            self.queued = when
            self.state = state
            self._dispatch()
        else:
            # Pass through ``queued`` first, then continue to the target.
            self.set_state(TaskState.queued, when)
            self.set_state(state, when)
        return

    if current == TaskState.queued:
        if state == TaskState.running:
            self.start = when
            self.state = state
            self._dispatch()
            return
        if state in closing_states:
            # Pass through ``running`` first, then continue to the target.
            self.set_state(TaskState.running, when)
            self.set_state(state, when)
            return

    if current == TaskState.running and state in closing_states:
        self.end = when
        self.state = state
        self._dispatch()
        return

    raise TaskRunError(f"invalid state transition {self.state} -> {state}")

lock

lock(timeout)
Source code in fluid/scheduler/models.py
def lock(self, timeout: float | None) -> Lock:
    """Ask the task manager's broker for a lock keyed on this run's ``name``.

    ``timeout`` is forwarded unchanged to the broker as a keyword argument,
    and whatever the broker returns is returned to the caller.
    """
    broker = self.task_manager.broker
    return broker.lock(self.name, timeout=timeout)