Task

A Task defines the implementation of a given operation, the inputs required and the scheduling metadata. Usually, a Task is not created directly, but rather through the use of the @task decorator.

Example

A task function is decorated with the @task decorator and must accept a TaskRun object as its first and only argument.

from fluid.scheduler import task, TaskRun

@task
def hello(ctx: TaskRun) -> None:
    print("Hello, world!")

fluid.scheduler.task

task(executor: TaskExecutor) -> Task
task(
    *,
    name: str | None = None,
    schedule: Scheduler | None = None,
    short_description: str | None = None,
    description: str | None = None,
    randomize: RandomizeType | None = None,
    max_concurrency: int | None = None,
    priority: TaskPriority | None = None,
    cpu_bound: bool | None = None,
    k8s_config: K8sConfig | None = None,
    timeout_seconds: int | None = None,
    tags: Sequence[str] | None = None
) -> TaskConstructor
task(
    executor=None,
    *,
    name=None,
    schedule=None,
    short_description=None,
    description=None,
    randomize=None,
    max_concurrency=None,
    priority=None,
    cpu_bound=None,
    k8s_config=None,
    timeout_seconds=None,
    tags=None
)

Decorator to create a Task from a function and optional parameters.

This decorator can be used in two ways:

  • As a simple decorator of the executor function
  • As a function with keyword arguments for greater control over the task configuration

Parameters:

executor

The executor function for the task

TYPE: TaskExecutor | None DEFAULT: None

name

The name of the task. If None, the name will be derived from the executor function

TYPE: str | None DEFAULT: None

schedule

The schedule for the task. If None, the task will not be scheduled

TYPE: Scheduler | None DEFAULT: None

short_description

A short description of the task. If not provided, it is taken from the first line of the task function's docstring

TYPE: str | None DEFAULT: None

description

A detailed description of the task. If not provided, it is taken from the task function's docstring

TYPE: str | None DEFAULT: None

randomize

Randomization settings for the task

TYPE: RandomizeType | None DEFAULT: None

max_concurrency

The maximum number of concurrent executions of the task

TYPE: int | None DEFAULT: None

priority

The priority of the task: high, medium, or low

TYPE: TaskPriority | None DEFAULT: None

cpu_bound

Whether the task is CPU bound

TYPE: bool | None DEFAULT: None

k8s_config

Kubernetes configuration - None means use the default configuration

TYPE: K8sConfig | None DEFAULT: None

timeout_seconds

Task timeout in seconds - how long the task can run before being aborted

TYPE: int | None DEFAULT: None

tags

Task tags - used for categorization and filtering of tasks

TYPE: Sequence[str] | None DEFAULT: None

Source code in fluid/scheduler/models.py
def task(
    executor: Annotated[
        TaskExecutor | None,
        Doc("The executor function for the task"),
    ] = None,
    *,
    name: Annotated[
        str | None,
        Doc(
            (
                "The name of the task. If None, the name will be derived "
                "from the executor function"
            )
        ),
    ] = None,
    schedule: Annotated[
        Scheduler | None,
        Doc("The schedule for the task. If None, the task will not be scheduled"),
    ] = None,
    short_description: Annotated[
        str | None,
        Doc(
            (
                "A short description of the task. "
                "If not provided it will be extracted from the task function docstring "
                "first line"
            )
        ),
    ] = None,
    description: Annotated[
        str | None,
        Doc(
            (
                "A detailed description of the task. "
                "If not provided it will be extracted from the task function docstring"
            )
        ),
    ] = None,
    randomize: Annotated[
        RandomizeType | None,
        Doc("Randomization settings for the task"),
    ] = None,
    max_concurrency: Annotated[
        int | None,
        Doc(("The maximum number of concurrent executions of the task")),
    ] = None,
    priority: Annotated[
        TaskPriority | None,
        Doc("The priority of the task such as high, medium, low"),
    ] = None,
    cpu_bound: Annotated[
        bool | None,
        Doc("Whether the task is CPU bound"),
    ] = None,
    k8s_config: Annotated[
        K8sConfig | None,
        Doc("Kubernetes configuration - None means use the default configuration"),
    ] = None,
    timeout_seconds: Annotated[
        int | None,
        Doc("Task timeout in seconds - how long the task can run before being aborted"),
    ] = None,
    tags: Annotated[
        Sequence[str] | None,
        Doc("Task tags - used for categorization and filtering of tasks"),
    ] = None,
) -> Task | TaskConstructor:
    """Decorator to create a [Task][fluid.scheduler.Task] from a function
    and optional parameters.

    This decorator can be used in two ways:

    - As a simple decorator of the executor function
    - As a function with keyword arguments for greater control
        over the task configuration
    """
    kwargs = compact_dict(
        name=name,
        schedule=schedule,
        short_description=short_description,
        description=description,
        randomize=randomize,
        max_concurrency=max_concurrency,
        priority=priority,
        cpu_bound=cpu_bound,
        k8s_config=k8s_config,
        timeout_seconds=timeout_seconds,
        tags=frozenset(tags) if tags is not None else None,
    )
    if kwargs and executor:
        raise TaskDecoratorError("cannot use positional parameters")
    elif kwargs:
        return TaskConstructor(**kwargs)
    elif not executor:
        raise TaskDecoratorError("this is a decorator cannot be invoked in this way")
    else:
        return TaskConstructor()(executor)
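
The dispatch at the end of the function above can be tried out in isolation. The following is a minimal stdlib-only sketch, not the library's implementation: `mini_task`, `MiniTask`, `MiniConstructor` and `DecoratorError` are hypothetical stand-ins for `task`, `Task`, `TaskConstructor` and `TaskDecoratorError`, only two options are modelled, and `compact_dict` is assumed to drop options left as None.

```python
from typing import Any, Callable, NamedTuple


class DecoratorError(RuntimeError):
    """Hypothetical stand-in for TaskDecoratorError."""


class MiniTask(NamedTuple):
    """Hypothetical stand-in for Task, with only three fields."""

    name: str
    executor: Callable[..., Any]
    timeout_seconds: int = 60


class MiniConstructor:
    """Hypothetical stand-in for TaskConstructor: stores options, then wraps a function."""

    def __init__(self, **kwargs: Any) -> None:
        self.kwargs = kwargs

    def __call__(self, executor: Callable[..., Any]) -> MiniTask:
        # The name falls back to the function name unless one was given.
        options = {"name": executor.__name__, **self.kwargs}
        return MiniTask(executor=executor, **options)


def mini_task(executor=None, *, name=None, timeout_seconds=None):
    # Keep only the options that were explicitly set
    # (assumed to be what compact_dict does).
    options = {"name": name, "timeout_seconds": timeout_seconds}
    kwargs = {key: value for key, value in options.items() if value is not None}
    if kwargs and executor:
        raise DecoratorError("cannot use positional parameters")
    if kwargs:
        # Keyword form: return a constructor that will wrap the function.
        return MiniConstructor(**kwargs)
    if not executor:
        raise DecoratorError("this is a decorator cannot be invoked in this way")
    # Bare form: wrap the function straight away with default options.
    return MiniConstructor()(executor)


@mini_task
def hello(ctx):
    return "hello"


@mini_task(name="greet", timeout_seconds=5)
def greet_executor(ctx):
    return "greet"
```

Both `hello` and `greet_executor` end up bound to task objects rather than plain functions. As in the source listing, calling the decorator with empty parentheses falls into the error branch, because no keyword options and no executor are present.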

fluid.scheduler.Task

Bases: NamedTuple, Generic[TP]

A Task configuration.

This is not created directly, but rather through the use of the @task decorator.

Executes any time it is invoked

name instance-attribute

name

Task name - unique identifier

executor instance-attribute

executor

Task executor function

params_model instance-attribute

params_model

Pydantic model for task parameters

logger instance-attribute

logger

Task logger

module class-attribute instance-attribute

module = ''

Task python module

short_description class-attribute instance-attribute

short_description = ''

Short task description - one line

description class-attribute instance-attribute

description = ''

Task description - obtained from the executor docstring if not provided

schedule class-attribute instance-attribute

schedule = None

Task schedule - None means the task is not scheduled

randomize class-attribute instance-attribute

randomize = None

Randomize function for task schedule

max_concurrency class-attribute instance-attribute

max_concurrency = 0

How many runs of the task can execute concurrently - 0 means no limit

timeout_seconds class-attribute instance-attribute

timeout_seconds = 60

Task timeout in seconds - how long the task can run before being aborted

priority class-attribute instance-attribute

priority = medium

Task priority - high, medium, low

k8s_config class-attribute instance-attribute

k8s_config = None

Kubernetes configuration for tasks run on Kubernetes cluster.

tags class-attribute instance-attribute

tags = frozenset()

Task tags - used for categorization and filtering of tasks

cpu_bound property

cpu_bound

True if the task is CPU bound

get_k8s_config

get_k8s_config()

Get Kubernetes configuration for this task

Source code in fluid/scheduler/models.py
def get_k8s_config(self) -> K8sConfig:
    """Get Kubernetes configuration for this task"""
    return self.k8s_config or K8sConfig()

info

info(**params)

Return task info object

Source code in fluid/scheduler/models.py
def info(self, **params: Any) -> TaskInfo:
    """Return task info object"""
    params.update(
        name=self.name,
        description=self.description,
        module=self.module,
        priority=self.priority,
        schedule=str(self.schedule) if self.schedule else None,
    )
    return TaskInfo(**compact_dict(params))
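
The merge order in `info` means the task's own fields overwrite any caller-supplied keys of the same name. A small sketch of that behaviour with plain dictionaries follows. It assumes `compact_dict` simply drops keys whose value is None. `build_info` and the `last_run_state` key are hypothetical and exist only for illustration, and a dict stands in for `TaskInfo`.

```python
from typing import Any


def compact_dict(**kwargs: Any) -> dict[str, Any]:
    # Assumed behaviour: drop keys whose value is None.
    return {key: value for key, value in kwargs.items() if value is not None}


def build_info(task_fields: dict[str, Any], **params: Any) -> dict[str, Any]:
    # Task fields are applied last, so they overwrite caller-supplied keys.
    schedule = task_fields.get("schedule")
    params.update(
        name=task_fields["name"],
        description=task_fields["description"],
        module=task_fields["module"],
        priority=task_fields["priority"],
        schedule=str(schedule) if schedule else None,
    )
    return compact_dict(**params)


task_fields = {
    "name": "hello",
    "description": "",
    "module": "app.tasks",
    "priority": "medium",
    "schedule": None,
}
# "name" is overwritten by the task's own name; "schedule" is dropped
# because the task is unscheduled.
info = build_info(task_fields, name="ignored", last_run_state="success")
```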

fluid.scheduler.TaskPriority

Bases: StrEnum

high class-attribute instance-attribute

high = auto()

medium class-attribute instance-attribute

medium = auto()

low class-attribute instance-attribute

low = auto()

fluid.scheduler.TaskState

Bases: StrEnum

init class-attribute instance-attribute

init = auto()

queued class-attribute instance-attribute

queued = auto()

running class-attribute instance-attribute

running = auto()

success class-attribute instance-attribute

success = auto()

failure class-attribute instance-attribute

failure = auto()

aborted class-attribute instance-attribute

aborted = auto()

rate_limited class-attribute instance-attribute

rate_limited = auto()

is_failure property

is_failure

is_done property

is_done

fluid.scheduler.K8sConfig pydantic-model

Bases: BaseModel

Kubernetes configuration for tasks run on Kubernetes cluster. This configuration is used by the task consumer to run tasks on Kubernetes Jobs.

from fluid.scheduler import K8sConfig

This is used when the task consumer runs inside a Kubernetes cluster and the task is marked as CPU bound.

Fields:

namespace pydantic-field

namespace

Kubernetes namespace where the task consumer deployment runs

deployment pydantic-field

deployment

Kubernetes deployment of the task consumer

container pydantic-field

container

Kubernetes container

job_ttl pydantic-field

job_ttl

Time to live for k8s Job after completion

sleep pydantic-field

sleep

Amount of time to async sleep while waiting for completion of the k8s Job

fluid.scheduler.is_in_cpu_process

is_in_cpu_process()

Check if the current process is a CPU process.

A CPU process is a process that is spawned by the task manager to run a cpu-bound task.

It is identified by the environment variable TASK_MANAGER_SPAWN being set to "true".

Source code in fluid/scheduler/common.py
def is_in_cpu_process() -> bool:
    """Check if the current process is a CPU process.

    A CPU process is a process that is spawned by the task manager to run
    a cpu-bound task.

    It is identified by the environment variable `TASK_MANAGER_SPAWN`
    being set to "true".
    """
    return os.getenv("TASK_MANAGER_SPAWN") == "true"
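
The comparison above is an exact, case-sensitive string match, which is easy to check with a few values. In this sketch the function body is copied from the listing, while `spawn_flag` and `probe` are hypothetical helpers that temporarily set the variable and restore it afterwards.

```python
import os
from contextlib import contextmanager
from typing import Iterator, Optional

ENV_NAME = "TASK_MANAGER_SPAWN"


def is_in_cpu_process() -> bool:
    # Same check as in the source listing above.
    return os.getenv(ENV_NAME) == "true"


@contextmanager
def spawn_flag(value: Optional[str]) -> Iterator[None]:
    """Hypothetical helper: temporarily set (or unset) the variable."""
    previous = os.environ.get(ENV_NAME)
    try:
        if value is None:
            os.environ.pop(ENV_NAME, None)
        else:
            os.environ[ENV_NAME] = value
        yield
    finally:
        # Restore whatever was there before.
        if previous is None:
            os.environ.pop(ENV_NAME, None)
        else:
            os.environ[ENV_NAME] = previous


def probe(value: Optional[str]) -> bool:
    """Hypothetical helper: evaluate the check with the variable set to value."""
    with spawn_flag(value):
        return is_in_cpu_process()


# Only the exact lower-case string "true" counts.
results = {value: probe(value) for value in ("true", "True", "1", None)}
```

Under this check, values such as "True" or "1" do not mark the process as a CPU process, and neither does an unset variable.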