Bases: BaseModel
, Generic[TP]
A TaskRun contains all the data generated by a Task run
This model is never initialized directly, it is created by the TaskManager
Fields:
task_manager
pydantic-field
logger
property
pydantic-field
in_queue
property
pydantic-field
duration
property
pydantic-field
duration_ms
property
pydantic-field
total
property
pydantic-field
name
property
pydantic-field
name_id
property
pydantic-field
is_done
property
pydantic-field
is_failure
property
pydantic-field
deps
property
pydantic-field
execute
async
Source code in fluid/scheduler/models.py
| async def execute(self) -> None:
try:
self.set_state(TaskState.running)
await self.task.executor(self) # type: ignore [arg-type]
except Exception:
self.set_state(TaskState.failure)
raise
else:
self.set_state(TaskState.success)
|
serialize_task
serialize_task(task, _info)
Source code in fluid/scheduler/models.py
| @field_serializer("task")
def serialize_task(self, task: Task, _info: Any) -> str:
return task.name
|
set_state
set_state(state, state_time=None)
Source code in fluid/scheduler/models.py
| def set_state(
self,
state: TaskState,
state_time: datetime | None = None,
) -> None:
if self.state == state:
return
state_time = as_utc(state_time)
match (self.state, state):
case (TaskState.init, TaskState.queued):
self.queued = state_time
self.state = state
self._dispatch()
case (TaskState.init, _):
self.set_state(TaskState.queued, state_time)
self.set_state(state, state_time)
case (TaskState.queued, TaskState.running):
self.start = state_time
self.state = state
self._dispatch()
case (
TaskState.queued,
TaskState.success
| TaskState.aborted
| TaskState.rate_limited
| TaskState.failure,
):
self.set_state(TaskState.running, state_time)
self.set_state(state, state_time)
case (
TaskState.running,
TaskState.success
| TaskState.aborted
| TaskState.rate_limited
| TaskState.failure,
):
self.end = state_time
self.state = state
self._dispatch()
case _:
raise TaskRunError(f"invalid state transition {self.state} -> {state}")
|
lock
Source code in fluid/scheduler/models.py
| def lock(self, timeout: float | None) -> Lock:
return self.task_manager.broker.lock(self.name, timeout=timeout)
|