Task
A Task defines the implementation of a given operation, the inputs required and the scheduling metadata. Usually, a Task is not created directly, but rather through the use of the @task decorator.
Example
A task function is decorated with the @task decorator and must accept a TaskRun object as its first and only argument.
from fluid.scheduler import task, TaskRun

@task
def hello(ctx: TaskRun) -> None:
    print("Hello, world!")
fluid.scheduler.task
task(executor: TaskExecutor) -> Task
task(
    *,
    name: str | None = None,
    schedule: Scheduler | None = None,
    short_description: str | None = None,
    description: str | None = None,
    randomize: RandomizeType | None = None,
    max_concurrency: int | None = None,
    priority: TaskPriority | None = None,
    cpu_bound: bool | None = None,
    k8s_config: K8sConfig | None = None,
    timeout_seconds: int | None = None,
    tags: Sequence[str] | None = None
) -> TaskConstructor
task(
    executor=None,
    *,
    name=None,
    schedule=None,
    short_description=None,
    description=None,
    randomize=None,
    max_concurrency=None,
    priority=None,
    cpu_bound=None,
    k8s_config=None,
    timeout_seconds=None,
    tags=None
)
Decorator to create a Task from a function and optional parameters.
This decorator can be used in two ways:
- As a simple decorator of the executor function
- As a function with keyword arguments for greater control over the task configuration
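The two usage forms can be illustrated with a small stand-in decorator. This is a minimal sketch of the general pattern only, not fluid's implementation: `SketchTask` and `sketch_task` are hypothetical names, only two of the keyword arguments are modelled, and taking the default name from the function's `__name__` is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class SketchTask:
    """Hypothetical stand-in for a task object; not fluid's Task class."""

    name: str
    executor: Callable[..., Any]
    max_concurrency: int | None = None


def sketch_task(
    executor: Callable[..., Any] | None = None,
    *,
    name: str | None = None,
    max_concurrency: int | None = None,
):
    """Usable bare (@sketch_task) or with keywords (@sketch_task(...))."""

    def build(fn: Callable[..., Any]) -> SketchTask:
        # Assumption: fall back to the function name when no name is given.
        return SketchTask(
            name=name or fn.__name__,
            executor=fn,
            max_concurrency=max_concurrency,
        )

    if executor is not None:
        # Bare form: Python passes the decorated function directly.
        return build(executor)
    # Keyword form: return a constructor that is then applied to the function.
    return build


@sketch_task
def hello(ctx: object) -> None:
    print("Hello, world!")


@sketch_task(name="greet", max_concurrency=1)
def greeting(ctx: object) -> None:
    print("Hello again!")
```

In the bare form the decorator receives the function itself and returns the task object at once; in the keyword form it first returns a constructor, which mirrors the `Task` and `TaskConstructor` return types in the signatures above.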
| PARAMETER | DESCRIPTION |
|---|---|
| executor | The executor function for the task |
| name | The name of the task. If None, the name is derived from the executor function |
| schedule | The schedule for the task. If None, the task is not scheduled |
| short_description | A short description of the task. If not provided, it is extracted from the first line of the task function docstring |
| description | A detailed description of the task. If not provided, it is extracted from the task function docstring |
| randomize | Randomization settings for the task |
| max_concurrency | The maximum number of concurrent executions of the task |
| priority | The priority of the task, such as high, medium or low |
| cpu_bound | Whether the task is CPU bound |
| k8s_config | Kubernetes configuration - None means use the default configuration |
| timeout_seconds | Task timeout in seconds - how long the task can run before being aborted |
| tags | Task tags - used for categorization and filtering of tasks |
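The docstring fallback described for short_description and description can be sketched with the standard library. This is an illustration under assumptions, not the library's code: `describe` and `cleanup` are hypothetical names, and the use of `inspect.getdoc` to normalise indentation is a choice made for this example.

```python
from __future__ import annotations

import inspect
from typing import Callable


def describe(fn: Callable[..., object]) -> tuple[str, str]:
    """Return (short_description, description) taken from fn's docstring."""
    # getdoc strips the common indentation; it returns None without a docstring.
    doc = inspect.getdoc(fn) or ""
    lines = doc.splitlines()
    # The short description is the first docstring line, if there is one.
    short = lines[0].strip() if lines else ""
    return short, doc


def cleanup(ctx: object) -> None:
    """Remove expired sessions.

    Runs through the session store and deletes stale entries.
    """


short_description, description = describe(cleanup)
```

Here `short_description` holds only the summary line, while `description` holds the full cleaned docstring; a function without a docstring yields two empty strings.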
Source code in fluid/scheduler/models.py
fluid.scheduler.Task
Bases: NamedTuple, Generic[TP]
A Task configuration.
This is not created directly, but rather through the use of the @task decorator.
Executes any time it is invoked
short_description (class-attribute, instance-attribute)
Short task description - one line
description (class-attribute, instance-attribute)
Task description - obtained from the executor docstring if not provided
schedule (class-attribute, instance-attribute)
Task schedule - None means the task is not scheduled
randomize (class-attribute, instance-attribute)
Randomize function for task schedule
max_concurrency (class-attribute, instance-attribute)
How many runs of the task can execute concurrently - 0 means no limit
timeout_seconds (class-attribute, instance-attribute)
Task timeout in seconds - how long the task can run before being aborted
k8s_config (class-attribute, instance-attribute)
Kubernetes configuration for tasks run on a Kubernetes cluster
tags (class-attribute, instance-attribute)
Task tags - used for categorization and filtering of tasks
get_k8s_config
info
Return task info object
Source code in fluid/scheduler/models.py
fluid.scheduler.TaskPriority
fluid.scheduler.TaskState
Bases: StrEnum
fluid.scheduler.K8sConfig (pydantic-model)
Bases: BaseModel
Kubernetes configuration for tasks run on Kubernetes cluster. This configuration is used by the task consumer to run tasks on Kubernetes Jobs.
This is used when the task consumer runs inside a Kubernetes cluster and the task is marked as CPU bound.
Fields:
- namespace (str)
- deployment (str)
- container (str)
- job_ttl (int)
- sleep (float)
fluid.scheduler.is_in_cpu_process
Check if the current process is a CPU process.
A CPU process is a process that is spawned by the task manager to run a cpu-bound task.
It is identified by the environment variable TASK_MANAGER_SPAWN being set to "true".
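The check described above can be approximated with the standard library. This is a minimal sketch, not the library's implementation: `sketch_is_in_cpu_process` is a hypothetical name, and the exact, case-sensitive comparison against "true" is an assumption.

```python
import os


def sketch_is_in_cpu_process() -> bool:
    """Return True when TASK_MANAGER_SPAWN is set to "true"."""
    # Assumption: the value is compared exactly, without case folding.
    return os.environ.get("TASK_MANAGER_SPAWN") == "true"


# Without the variable the process is treated as a regular process.
os.environ.pop("TASK_MANAGER_SPAWN", None)
unset_result = sketch_is_in_cpu_process()

# With the variable set to "true" the process counts as a CPU process.
os.environ["TASK_MANAGER_SPAWN"] = "true"
set_result = sketch_is_in_cpu_process()

# Restore the environment so the example leaves no trace.
os.environ.pop("TASK_MANAGER_SPAWN", None)
```

Because a spawned child process inherits its parent's environment by default, an environment variable is a simple way for a task manager to mark the processes it spawns for cpu-bound work.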