Task
A Task defines the implementation of an operation, the inputs it requires, and its scheduling metadata. A Task is usually not created directly but through the @task decorator.
Example
A task function is decorated via the @task decorator and must accept the TaskRun object as its first and only argument.
from fluid.scheduler import task, TaskRun


@task
async def hello(ctx: TaskRun) -> None:
    print("Hello, world!")
For retry configuration (retry, rate_limit_retry) see Task Retry.
fluid.scheduler.task
task(executor: TaskExecutor) -> Task

task(
    *,
    name: str | None = None,
    schedule: Scheduler | None = None,
    short_description: str | None = None,
    description: str | None = None,
    randomize: RandomizeType | None = None,
    max_concurrency: int | None = None,
    priority: TaskPriority | None = None,
    cpu_bound: bool | None = None,
    k8s_config: K8sConfig | None = None,
    timeout_seconds: int | None = None,
    tags: Sequence[str] | None = None,
    retry: RetryPolicy | None = None,
    rate_limit_retry: RetryPolicy | None = None,
    env: dict[str, str] | None = None
) -> TaskConstructor

task(
    executor=None,
    *,
    name=None,
    schedule=None,
    short_description=None,
    description=None,
    randomize=None,
    max_concurrency=None,
    priority=None,
    cpu_bound=None,
    k8s_config=None,
    timeout_seconds=None,
    tags=None,
    retry=None,
    rate_limit_retry=None,
    env=None
)
Decorator to create a Task from a function and optional parameters.
This decorator can be used in two ways:
- As a simple decorator of the executor function
- As a function with keyword arguments for greater control over the task configuration
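The two usage forms can be illustrated with a small stand-in decorator. This is only a sketch of the general pattern, not fluid's implementation: `sketch_task` and `SketchTask` are hypothetical names, and only two of the documented keyword arguments are modelled.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional, Union

Executor = Callable[..., Any]


@dataclass(frozen=True)
class SketchTask:
    """Hypothetical stand-in for a task configuration; not fluid's Task."""

    name: str
    executor: Executor
    max_concurrency: int = 0


def sketch_task(
    executor: Optional[Executor] = None,
    *,
    name: Optional[str] = None,
    max_concurrency: int = 0,
) -> Union[SketchTask, Callable[[Executor], SketchTask]]:
    """Work both as a bare decorator and as a decorator factory."""

    def build(fn: Executor) -> SketchTask:
        # Assumption: fall back to the function name when no name is given.
        return SketchTask(
            name=name or fn.__name__,
            executor=fn,
            max_concurrency=max_concurrency,
        )

    if executor is not None:
        # Bare form: @sketch_task receives the function directly.
        return build(executor)
    # Keyword form: @sketch_task(...) returns the real decorator.
    return build


@sketch_task
async def hello(ctx: object) -> None:
    print("Hello, world!")


@sketch_task(name="greet", max_concurrency=1)
async def hello_again(ctx: object) -> None:
    print("Hello again!")
```

In both cases the decorated name is bound to a configuration object rather than to the original coroutine function, which mirrors the `Task` and `TaskConstructor` return types in the signatures above.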
| PARAMETER | DESCRIPTION |
|---|---|
| executor | The executor function for the task |
| name | The name of the task. If None, the name will be derived from the executor function |
| schedule | The schedule for the task. If None, the task will not be scheduled |
| short_description | A short description of the task. If not provided, it will be extracted from the first line of the task function docstring |
| description | A detailed description of the task. If not provided, it will be extracted from the task function docstring |
| randomize | Randomization settings for the task |
| max_concurrency | The maximum number of concurrent executions of the task |
| priority | The priority of the task such as high, medium, low |
| cpu_bound | Whether the task is CPU bound |
| k8s_config | Kubernetes configuration - None means use the default configuration |
| timeout_seconds | Task timeout in seconds - how long the task can run before being aborted |
| tags | Task tags - used for categorization and filtering of tasks |
| retry | Retry policy for execution failures |
| rate_limit_retry | Retry policy when the task is rate limited by max_concurrency |
| env | Extra environment variables injected into the subprocess or k8s job |
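The fallbacks described for name, short_description, and description can be sketched with the standard library alone. This is an illustration under stated assumptions, not fluid's code: `describe` is a hypothetical helper, and using `__name__` for the derived task name is a guess at what "derived from the executor function" means.

```python
import inspect
from typing import Any, Callable, Dict, Optional


def describe(
    fn: Callable[..., Any],
    name: Optional[str] = None,
    short_description: Optional[str] = None,
    description: Optional[str] = None,
) -> Dict[str, str]:
    """Fill missing metadata from the function name and docstring."""
    # inspect.getdoc strips the common indentation from the docstring.
    doc = inspect.getdoc(fn) or ""
    first_line = doc.splitlines()[0] if doc else ""
    return {
        # Assumption: the derived name is simply the function name.
        "name": name or fn.__name__,
        "short_description": short_description or first_line,
        "description": description or doc,
    }


async def cleanup(ctx: object) -> None:
    """Remove stale records.

    Runs nightly and deletes rows older than 30 days.
    """


info = describe(cleanup)
```

Explicit arguments always win over the docstring, so a task can keep a developer-facing docstring while exposing a different description to operators.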
Source code in fluid/scheduler/models.py
fluid.scheduler.Task
Bases: NamedTuple, Generic[TP]
A Task configuration.
This is not created directly, but rather through the use of the @task decorator.
Executes any time it is invoked
short_description (class-attribute, instance-attribute)
Short task description - one line

description (class-attribute, instance-attribute)
Task description - obtained from the executor docstring if not provided

schedule (class-attribute, instance-attribute)
Task schedule - None means the task is not scheduled

randomize (class-attribute, instance-attribute)
Randomize function for task schedule

max_concurrency (class-attribute, instance-attribute)
How many tasks can run concurrently - 0 means no limit

timeout_seconds (class-attribute, instance-attribute)
Task timeout in seconds - how long the task can run before being aborted

k8s_config (class-attribute, instance-attribute)
Kubernetes configuration for tasks run on a Kubernetes cluster.

tags (class-attribute, instance-attribute)
Task tags - used for categorization and filtering of tasks

retry (class-attribute, instance-attribute)
Retry policy for general execution failures.

rate_limit_retry (class-attribute, instance-attribute)
Retry policy when the executor raises RateLimitError.

env (class-attribute, instance-attribute)
Extra environment variables injected into the subprocess or k8s job.
get_k8s_config
info
Return task info object
Source code in fluid/scheduler/models.py
fluid.scheduler.TaskPriority
Bases: StrEnum
Priority level for task execution ordering.
fluid.scheduler.TaskState
Bases: StrEnum
Lifecycle state of a task run.
queued (class-attribute, instance-attribute)
Task is waiting in the queue to be picked up by a worker.

failure (class-attribute, instance-attribute)
Task raised an exception during execution.

rate_limited (class-attribute, instance-attribute)
Task execution was deferred due to rate limiting.

interrupted (class-attribute, instance-attribute)
Task was interrupted by a worker shutdown before it could complete.
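Because the state enum is string-based, its members can be compared with and parsed from plain strings. The sketch below shows that behaviour with a stand-in enum, under these assumptions: it lists only the four members documented here (the real enum may define more), the string values are assumed to equal the member names, and `(str, Enum)` is used as a portable substitute for `StrEnum`.

```python
import enum


class SketchTaskState(str, enum.Enum):
    """Hypothetical stand-in; values are assumed, not taken from fluid."""

    queued = "queued"
    failure = "failure"
    rate_limited = "rate_limited"
    interrupted = "interrupted"


# Parse a state from a serialized string, e.g. a value read from a message.
state = SketchTaskState("rate_limited")

# Members compare equal to their string values.
is_queued = SketchTaskState.queued == "queued"
```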
fluid.scheduler.K8sConfig (pydantic-model)
Bases: BaseModel
Kubernetes configuration for tasks run on a Kubernetes cluster. The task consumer uses this configuration to run tasks as Kubernetes Jobs.
This is used when the task consumer runs inside a Kubernetes cluster and the task is marked as CPU bound.
Fields:
- namespace (str)
- deployment (str)
- container (str)
- resources (K8sResourceRequirements | None)
- job_ttl (int)
- sleep (float)
resources (pydantic-field)
Kubernetes resource limits and requests for the container
fluid.scheduler.K8sResourceRequirements
fluid.scheduler.is_in_cpu_process
Check if the current process is a CPU process.
A CPU process is a process that is spawned by the task manager to run a cpu-bound task.
It is identified by the environment variable TASK_MANAGER_SPAWN being set to "true".
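The check described above amounts to reading one environment variable. A minimal sketch follows, assuming an exact, case-sensitive comparison with "true"; `sketch_is_in_cpu_process` is a hypothetical name, and the optional `environ` argument exists only to make the function easy to exercise, it is not part of the documented API.

```python
import os
from typing import Mapping, Optional


def sketch_is_in_cpu_process(environ: Optional[Mapping[str, str]] = None) -> bool:
    """Return True when TASK_MANAGER_SPAWN is set to "true"."""
    env = os.environ if environ is None else environ
    # Assumption: only the literal string "true" counts as a spawned process.
    return env.get("TASK_MANAGER_SPAWN") == "true"


spawned = sketch_is_in_cpu_process({"TASK_MANAGER_SPAWN": "true"})
not_spawned = sketch_is_in_cpu_process({})
```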