Task

A Task defines the implementation of an operation, the inputs it requires, and its scheduling metadata. A Task is usually not created directly but through the @task decorator.

Example

A task function is decorated via the @task decorator and must accept the TaskRun object as its first and only argument.

from fluid.scheduler import task, TaskRun

@task
def hello(ctx: TaskRun) -> None:
    print("Hello, world!")

fluid.scheduler.task

task(executor: TaskExecutor) -> Task
task(
    *,
    name: str | None = None,
    schedule: Scheduler | None = None,
    short_description: str | None = None,
    description: str | None = None,
    randomize: RandomizeType | None = None,
    max_concurrency: int = 0,
    priority: TaskPriority = medium,
    cpu_bound: bool = False,
    k8s_config: K8sConfig | None = None,
    timeout_seconds: int = 60
) -> TaskConstructor
task(executor=None, **kwargs)

Decorator to create a Task

This decorator can be used in two ways:

  • As a simple decorator of the executor function
  • As a function with keyword arguments

Source code in fluid/scheduler/models.py
def task(executor: TaskExecutor | None = None, **kwargs: Any) -> Task | TaskConstructor:
    """Decorator to create a Task

    This decorator can be used in two ways:

    - As a simple decorator of the executor function
    - As a function with keyword arguments
    """
    if kwargs and executor:
        raise TaskDecoratorError("cannot use positional parameters")
    elif kwargs:
        return TaskConstructor(**kwargs)
    elif not executor:
        raise TaskDecoratorError("this is a decorator cannot be invoked in this way")
    else:
        return TaskConstructor()(executor)
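
The dispatch logic in the source above can be tried in isolation with a minimal sketch. The names SketchTask, SketchTaskConstructor and sketch_task are hypothetical stand-ins, not part of fluid, and the fallback to the function name when no name is given is an assumption made for illustration only.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable

# Hypothetical stand-ins for illustration only; these are NOT the fluid classes.
Executor = Callable[[Any], None]


@dataclass
class SketchTask:
    name: str
    executor: Executor
    timeout_seconds: int = 60


@dataclass
class SketchTaskConstructor:
    name: str | None = None
    timeout_seconds: int = 60

    def __call__(self, executor: Executor) -> SketchTask:
        # Assumption: fall back to the function name when no name is given.
        return SketchTask(
            name=self.name or executor.__name__,
            executor=executor,
            timeout_seconds=self.timeout_seconds,
        )


def sketch_task(
    executor: Executor | None = None, **kwargs: Any
) -> SketchTask | SketchTaskConstructor:
    if kwargs and executor:
        raise TypeError("cannot mix a positional executor with keyword arguments")
    if kwargs:
        # Used as @sketch_task(name=...): return a constructor that wraps the function.
        return SketchTaskConstructor(**kwargs)
    if not executor:
        raise TypeError("apply to a function or call with keyword arguments")
    # Used as a bare @sketch_task: wrap the function immediately with defaults.
    return SketchTaskConstructor()(executor)


@sketch_task
def hello(ctx: Any) -> None:
    print("Hello, world!")


@sketch_task(name="slow-hello", timeout_seconds=120)
def slow_hello(ctx: Any) -> None:
    print("Hello, eventually!")
```

Both forms produce a task object in this sketch. As in the source above, calling the decorator with empty parentheses and no keyword arguments falls into the error branch rather than returning a constructor.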

fluid.scheduler.Task

Bases: NamedTuple, Generic[TP]

A Task executes any time it is invoked

name instance-attribute

name

Task name - unique identifier

executor instance-attribute

executor

Task executor function

params_model instance-attribute

params_model

Pydantic model for task parameters

logger instance-attribute

logger

Task logger

module class-attribute instance-attribute

module = ''

Task python module

short_description class-attribute instance-attribute

short_description = ''

Short task description - one line

description class-attribute instance-attribute

description = ''

Task description - obtained from the executor docstring if not provided

schedule class-attribute instance-attribute

schedule = None

Task schedule - None means the task is not scheduled

randomize class-attribute instance-attribute

randomize = None

Randomize function for task schedule

max_concurrency class-attribute instance-attribute

max_concurrency = 0

How many tasks can run concurrently in each consumer - 0 means no limit

timeout_seconds class-attribute instance-attribute

timeout_seconds = 60

Task timeout in seconds - how long the task can run before being aborted

priority class-attribute instance-attribute

priority = medium

Task priority - high, medium, low

k8s_config class-attribute instance-attribute

k8s_config = from_env()

cpu_bound property

cpu_bound

True if the task is CPU bound

info

info(**params)

Source code in fluid/scheduler/models.py
def info(self, **params: Any) -> TaskInfo:
    params.update(
        name=self.name,
        description=self.description,
        module=self.module,
        priority=self.priority,
        schedule=str(self.schedule) if self.schedule else None,
    )
    return TaskInfo(**compact_dict(params))
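
One consequence of the source above is that the task's own fields are written after the caller's params, so a caller-supplied key such as name is overwritten. The sketch below reproduces that ordering with plain dictionaries. The helpers compact and sketch_info are hypothetical, and the assumption that compact_dict drops None values is not confirmed by this page.

```python
from typing import Any


def compact(data: dict[str, Any]) -> dict[str, Any]:
    # Assumption: mimic compact_dict by dropping keys whose value is None.
    return {key: value for key, value in data.items() if value is not None}


def sketch_info(task_fields: dict[str, Any], **params: Any) -> dict[str, Any]:
    # Task fields are applied last, so they win over caller-supplied keys.
    params.update(task_fields)
    return compact(params)


# Hypothetical task fields; schedule is None for an unscheduled task.
fields = {
    "name": "hello",
    "description": "",
    "module": "demo",
    "priority": "medium",
    "schedule": None,
}

info = sketch_info(fields, name="ignored", queued=True)
```

Under these assumptions, info keeps name as "hello", retains the extra queued key, and has no schedule entry.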

fluid.scheduler.TaskPriority

Bases: StrEnum

high class-attribute instance-attribute

high = auto()

medium class-attribute instance-attribute

medium = auto()

low class-attribute instance-attribute

low = auto()

fluid.scheduler.TaskState

Bases: StrEnum

init class-attribute instance-attribute

init = auto()

queued class-attribute instance-attribute

queued = auto()

running class-attribute instance-attribute

running = auto()

success class-attribute instance-attribute

success = auto()

failure class-attribute instance-attribute

failure = auto()

aborted class-attribute instance-attribute

aborted = auto()

rate_limited class-attribute instance-attribute

rate_limited = auto()

is_failure property

is_failure

is_done property

is_done