Skip to content

Task Run

It can be imported from fluid.scheduler:

from fluid.scheduler import TaskRun

fluid.scheduler.TaskRun pydantic-model

Bases: BaseModel, Generic[TP]

A TaskRun contains all the data generated by a Task run

This model is never initialized directly, it is created by the TaskManager

Fields:

id pydantic-field

id

Unique task run id

task pydantic-field

task

Task to be executed

priority pydantic-field

priority

Task priority

params pydantic-field

params

Task parameters

state pydantic-field

state = init

Task state

task_manager pydantic-field

task_manager

queued pydantic-field

queued = None

start pydantic-field

start = None

end pydantic-field

end = None

execute_after pydantic-field

execute_after = None

Do not execute before this UTC timestamp. Set by retry logic.

rate_limit_attempt pydantic-field

rate_limit_attempt = 0

Number of rate-limit retries already consumed.

retry_attempt pydantic-field

retry_attempt = 0

Number of failure retries already consumed.

logger property

logger

in_queue property

in_queue

duration property

duration

duration_ms property

duration_ms

total property

total

name property

name

name_id property

name_id

is_done property

is_done

is_failure property

is_failure

deps property

deps

abort

abort(reason='')

Abort the task run by raising TaskAbortedError.

Source code in fluid/scheduler/models.py
def abort(self, reason: str = "") -> None:
    """Abort the task run by raising
    [TaskAbortedError][fluid.scheduler.errors.TaskAbortedError].
    """
    raise TaskAbortedError(reason) from None

set_state

set_state(state, state_time=None)

Set the state of the task run, with proper handling of timestamps and state transitions.

This method is called by the task consumer and should not be called directly by the task executor.

Source code in fluid/scheduler/models.py
def set_state(
    self,
    state: TaskState,
    state_time: datetime | None = None,
) -> None:
    """Set the state of the task run, with proper handling of timestamps
    and state transitions.

    This method is called by the task consumer and should not be called directly
    by the task executor.
    """
    if self.state == state:
        return
    state_time = as_utc(state_time)
    match (self.state, state):
        case (TaskState.init, TaskState.queued):
            self.queued = state_time
            self.state = state
            self._dispatch()
        case (TaskState.init, _):
            self.set_state(TaskState.queued, state_time)
            self.set_state(state, state_time)
        case (TaskState.queued, TaskState.running):
            self.start = state_time
            self.state = state
            self._dispatch()
        case (
            TaskState.queued,
            TaskState.success
            | TaskState.aborted
            | TaskState.rate_limited
            | TaskState.failure,
        ):
            self.set_state(TaskState.running, state_time)
            self.set_state(state, state_time)
        case (
            TaskState.running,
            TaskState.success
            | TaskState.aborted
            | TaskState.rate_limited
            | TaskState.failure
            | TaskState.interrupted,
        ):
            self.end = state_time
            self.state = state
            self._dispatch()
        case _:
            raise TaskRunError(f"invalid state transition {self.state} -> {state}")

lock

lock(timeout)

Get a lock for this task run

Source code in fluid/scheduler/models.py
def lock(self, timeout: float | None) -> Lock:
    """Get a lock for this task run"""
    return self.task_manager.broker.lock(f"tasks:{self.name}", timeout=timeout)