Task Manager
The Task Manager is a component that manages the execution of tasks. It is the simplest way to run tasks and it is the base class for the TaskConsumer and the TaskScheduler.
It can be imported from fluid.scheduler.
The Task Manager is useful if you want to execute tasks in a synchronous or asynchronous way.
fluid.scheduler.TaskManager
The task manager is the main class for managing tasks
Source code in fluid/scheduler/consumer.py
deps
instance-attribute
Dependencies for the task manager.
Production applications require global dependencies to be available to all tasks. This can be achieved by setting the deps attribute of the task manager to an object with the required dependencies.
Each task can cast the dependencies to the required type.
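The casting pattern described above can be sketched as follows. This is a minimal, self-contained illustration rather than the library's code: `AppDeps`, `FakeManager` and `greet` are hypothetical names, and `FakeManager` only stands in for an object that exposes a `deps` attribute.

```python
from dataclasses import dataclass
from typing import Any, cast


@dataclass
class AppDeps:
    """Hypothetical container for application-wide dependencies."""

    greeting: str


class FakeManager:
    """Stand-in for a task manager: only the `deps` attribute matters here."""

    def __init__(self, deps: Any = None) -> None:
        self.deps: Any = deps


def greet(manager: FakeManager, name: str) -> str:
    # The attribute is loosely typed, so the task narrows it to the
    # concrete type it expects before using it.
    deps = cast(AppDeps, manager.deps)
    return f"{deps.greeting}, {name}"


manager = FakeManager(deps=AppDeps(greeting="hello"))
message = greet(manager, "world")
```

Note that `typing.cast` only informs the type checker; it performs no runtime check, so the object assigned to `deps` must really be of the expected type.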
config
instance-attribute
config = config or TaskManagerConfig(**kwargs)
Task manager configuration
dispatcher
instance-attribute
dispatcher = TaskDispatcher()
A dispatcher of TaskRun events.
Applications can register handlers to listen for events that occur during the lifecycle of a task run.
on_startup
async
on_shutdown
async
add_async_context_manager
Add an async context manager to the task manager
These context managers are entered when the task manager starts
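One way to picture context managers that are stored first and entered at startup is with `contextlib.AsyncExitStack`. The sketch below is an assumption about how such a lifecycle could be built, not the library's implementation: `LifecycleSketch` and `resource` are hypothetical, and exiting the context managers on shutdown is assumed rather than stated in this page.

```python
import asyncio
from contextlib import AbstractAsyncContextManager, AsyncExitStack, asynccontextmanager
from typing import AsyncIterator


class LifecycleSketch:
    """Hypothetical stand-in, not the library's TaskManager."""

    def __init__(self) -> None:
        self._stack = AsyncExitStack()
        self._pending: list[AbstractAsyncContextManager] = []

    def add_async_context_manager(self, cm: AbstractAsyncContextManager) -> None:
        # Only remember the context manager; it is entered on startup.
        self._pending.append(cm)

    async def on_startup(self) -> None:
        for cm in self._pending:
            await self._stack.enter_async_context(cm)

    async def on_shutdown(self) -> None:
        # Assumption: entered context managers are exited here, in reverse order.
        await self._stack.aclose()


events: list[str] = []


@asynccontextmanager
async def resource(name: str) -> AsyncIterator[None]:
    events.append(f"open {name}")
    try:
        yield
    finally:
        events.append(f"close {name}")


async def main() -> None:
    manager = LifecycleSketch()
    manager.add_async_context_manager(resource("db"))
    manager.add_async_context_manager(resource("cache"))
    await manager.on_startup()
    events.append("running")
    await manager.on_shutdown()


asyncio.run(main())
```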
register_task
Register a task with the task manager
PARAMETER | DESCRIPTION |
---|---|
task | Task to register |
execute
async
Execute a task and wait for it to finish
PARAMETER | DESCRIPTION |
---|---|
task | The task or task name. A task name must be registered with the task manager. |
run_id | Unique ID for the task run. If not provided, a new UUID is generated. |
priority | Override the default task priority if provided |
**params | The optional parameters for the task run. They must match the task params model |
Source code in fluid/scheduler/consumer.py
execute_sync
Execute a task synchronously
This method blocks until the task finishes and should be used in a synchronous context.
PARAMETER | DESCRIPTION |
---|---|
task | The task or task name. A task name must be registered with the task manager. |
run_id | Unique ID for the task run. If not provided, a new UUID is generated. |
priority | Override the default task priority if provided |
**params | The optional parameters for the task run. They must match the task params model |
Source code in fluid/scheduler/consumer.py
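A common way to offer a blocking entry point next to an awaitable one is to drive the coroutine with `asyncio.run`. The sketch below illustrates that general pattern only. The two functions are hypothetical stand-ins, and whether the library's execute_sync works this way is an assumption.

```python
import asyncio


async def execute(task_name: str, **params: int) -> dict:
    """Hypothetical async executor standing in for an awaitable `execute`."""
    await asyncio.sleep(0)  # pretend to do asynchronous work
    return {"task": task_name, "params": params, "state": "success"}


def execute_sync(task_name: str, **params: int) -> dict:
    # asyncio.run starts an event loop, runs the coroutine to completion
    # and closes the loop. It raises RuntimeError when called from a thread
    # that already has a running loop, hence "synchronous context" only.
    return asyncio.run(execute(task_name, **params))


result = execute_sync("add", a=1, b=2)
```

Inside a coroutine, the awaitable variant is the one to call; the blocking wrapper is meant for scripts, command-line entry points and similar synchronous code.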
queue
async
Queue a task for execution
This method fires two events:
- init: when the task run is created
- queued: after the task is queued
It returns the TaskRun object.
PARAMETER | DESCRIPTION |
---|---|
task | The task or task name. A task name must be registered with the task manager. |
run_id | Unique ID for the task run. If not provided, a new UUID is generated. |
priority | Override the default task priority if provided |
**params | The optional parameters for the task run. They must match the task params model |
Source code in fluid/scheduler/consumer.py
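The order of the two events can be illustrated with a small stand-in. Everything in this sketch is hypothetical (`RunSketch`, `QueueSketch`, the `record` callback, and a run state named "queued"); it only mirrors the sequence described for queue: create the run, fire init, enqueue, fire queued, return the run.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable
from uuid import uuid4


@dataclass
class RunSketch:
    """Hypothetical stand-in for a task run."""

    name: str
    id: str = field(default_factory=lambda: uuid4().hex)
    state: str = "init"


Handler = Callable[[str, RunSketch], Awaitable[None]]


class QueueSketch:
    def __init__(self, on_event: Handler) -> None:
        self._on_event = on_event
        self._queue: asyncio.Queue = asyncio.Queue()

    async def queue(self, name: str) -> RunSketch:
        run = RunSketch(name=name)
        await self._on_event("init", run)    # the run has been created
        await self._queue.put(run)
        run.state = "queued"                 # assumed state name
        await self._on_event("queued", run)  # the run is now in the queue
        return run


seen: list[tuple[str, str]] = []


async def record(event: str, run: RunSketch) -> None:
    # Remember each event together with the run state at that moment.
    seen.append((event, run.state))


async def main() -> RunSketch:
    return await QueueSketch(record).queue("add")


run = asyncio.run(main())
```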
create_task_run
Create a TaskRun in init state
PARAMETER | DESCRIPTION |
---|---|
task | The task or task name. A task name must be registered with the task manager. |
run_id | Unique ID for the task run. If not provided, a new UUID is generated. |
priority | Override the default task priority if provided |
**params | The optional parameters for the task run. They must match the task params model |
Source code in fluid/scheduler/consumer.py
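The parameter rules in the table above (name lookup, generated run ID, optional priority override) can be sketched with plain dataclasses. `TaskSketch`, `RunSketch`, `create_run`, the priority values and the use of an empty string for "no run ID" are all assumptions made for illustration; the library's own types and defaults may differ.

```python
from dataclasses import dataclass, field
from typing import Any, Optional, Union
from uuid import uuid4


@dataclass
class TaskSketch:
    name: str
    priority: str = "medium"  # hypothetical default priority


@dataclass
class RunSketch:
    task: TaskSketch
    id: str
    priority: str
    params: dict[str, Any] = field(default_factory=dict)
    state: str = "init"


def create_run(
    registry: dict[str, TaskSketch],
    task: Union[str, TaskSketch],
    run_id: str = "",
    priority: Optional[str] = None,
    **params: Any,
) -> RunSketch:
    # A task name is only accepted if it has been registered.
    if isinstance(task, str):
        task = registry[task]  # raises KeyError for unknown names
    return RunSketch(
        task=task,
        id=run_id or uuid4().hex,            # generate an ID when none is given
        priority=priority or task.priority,  # override only if provided
        params=params,
    )


registry = {"add": TaskSketch(name="add")}
default_run = create_run(registry, "add", a=1, b=2)
custom_run = create_run(registry, "add", run_id="run-1", priority="high")
```

The sketch stores the parameters as a plain dictionary; validating them against a params model is left out.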
register_from_module
PARAMETER | DESCRIPTION |
---|---|
module | Python module with task implementations. It can contain any object; only instances of Task are registered |
Source code in fluid/scheduler/consumer.py
register_from_dict
PARAMETER | DESCRIPTION |
---|---|
data | Python dictionary with task implementations. It can contain any object; only instances of Task are registered |
Source code in fluid/scheduler/consumer.py
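The filtering rule shared by register_from_module and register_from_dict (ignore everything that is not a Task instance) can be sketched as below. `TaskSketch` and `collect_tasks` are hypothetical stand-ins, and keying the result by task name is an assumption.

```python
import types
from typing import Any


class TaskSketch:
    """Hypothetical stand-in for the library's Task class."""

    def __init__(self, name: str) -> None:
        self.name = name


def collect_tasks(data: dict[str, Any]) -> dict[str, TaskSketch]:
    # Keep only TaskSketch instances, keyed by task name; skip everything else.
    return {
        obj.name: obj for obj in data.values() if isinstance(obj, TaskSketch)
    }


# A dictionary mixing tasks with unrelated objects.
mixed = {"add": TaskSketch("add"), "VERSION": "1.0", "helper": len}
from_dict = collect_tasks(mixed)

# A module can be treated the same way through its namespace dictionary.
module = types.ModuleType("my_tasks")  # hypothetical module built in memory
module.ping = TaskSketch("ping")
module.TIMEOUT = 30
from_module = collect_tasks(vars(module))
```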
register_async_handler
Register an async handler for a given event
This method is a no-op for a TaskManager that is not a worker
unregister_async_handler
Unregister an async handler for a given event
This method is a no-op for a TaskManager that is not a worker
fluid.scheduler.TaskManagerConfig
pydantic-model
Bases: BaseModel
Task manager configuration
Fields:
- schedule_tasks (bool)
- consume_tasks (bool)
- max_concurrent_tasks (int)
- sleep (float)
- broker_url (str)
max_concurrent_tasks
pydantic-field
The number of coroutine workers consuming tasks. Each worker consumes one task at a time, so this number is the maximum number of tasks that can run concurrently. It can be configured via the FLUID_MAX_CONCURRENT_TASKS environment variable and defaults to 5.
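Why a fixed number of coroutine workers caps concurrency can be seen in a small asyncio sketch. It is a generic worker-pool illustration, not the library's consumer: the queue, the counters and the simulated task body are hypothetical, and only the environment variable name and the default of 5 come from the description above.

```python
import asyncio
import os

# Read the limit from the environment, falling back to the documented default.
max_concurrent = int(os.environ.get("FLUID_MAX_CONCURRENT_TASKS", "5"))

running = 0
peak = 0
completed = 0


async def worker(queue: asyncio.Queue) -> None:
    global running, peak, completed
    while True:
        await queue.get()          # each worker takes one item at a time
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # simulated task body
        running -= 1
        completed += 1
        queue.task_done()


async def main(n_tasks: int) -> None:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(n_tasks):
        queue.put_nowait(i)
    workers = [asyncio.create_task(worker(queue)) for _ in range(max_concurrent)]
    await queue.join()             # wait until every item is processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)


asyncio.run(main(n_tasks=20))
```

However many items are waiting, `peak` never exceeds the number of workers, because a worker only picks up its next item after finishing the current one.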
fluid.scheduler.consumer.TaskDispatcher
Bases: Dispatcher[TaskRun]
The task dispatcher is responsible for dispatching task run messages
Source code in fluid/utils/dispatcher.py
register_handler
Register a handler for the given event
It is possible to register multiple handlers for the same event type by providing a different tag for each handler.
For example, to register two handlers for the event type foo:
dispatcher.register_handler("foo.first", handler1)
dispatcher.register_handler("foo.second", handler2)
PARAMETER | DESCRIPTION |
---|---|
event | The event to register the handler for |
handler | The handler to register |
Source code in fluid/utils/dispatcher.py
unregister_handler
Unregister a handler for the given event
It returns the handler that was unregistered, or None if no handler was registered for the given event.
PARAMETER | DESCRIPTION |
---|---|
event | The event to unregister the handler for |
Source code in fluid/utils/dispatcher.py
get_handlers
Get all event handlers for the given message
This method returns a dictionary of all handlers registered for the given message type. If no handlers are registered for the message type, it returns None.
PARAMETER | DESCRIPTION |
---|---|
message | The message to get the handlers for |
Source code in fluid/utils/dispatcher.py
dispatch
Dispatch the message to all handlers
It returns the number of handlers that were called
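The four dispatcher methods described above fit together as in the following sketch. It is a simplified model under stated assumptions, not the code in fluid/utils/dispatcher.py: `DispatcherSketch` is hypothetical, messages are plain strings that double as the event type (the real dispatcher handles TaskRun messages), handlers are synchronous, and the "type.tag" split is inferred from the foo.first / foo.second example.

```python
from typing import Callable, Optional

Handler = Callable[[str], None]


class DispatcherSketch:
    """Hypothetical tag-aware dispatcher; messages are plain event-type strings."""

    def __init__(self) -> None:
        # event type -> {tag -> handler}
        self._handlers: dict[str, dict[str, Handler]] = {}

    @staticmethod
    def _split(event: str) -> tuple[str, str]:
        # "foo.first" -> ("foo", "first"); "foo" -> ("foo", "")
        event_type, _, tag = event.partition(".")
        return event_type, tag

    def register_handler(self, event: str, handler: Handler) -> None:
        event_type, tag = self._split(event)
        self._handlers.setdefault(event_type, {})[tag] = handler

    def unregister_handler(self, event: str) -> Optional[Handler]:
        event_type, tag = self._split(event)
        handlers = self._handlers.get(event_type)
        if handlers is None:
            return None
        handler = handlers.pop(tag, None)
        if not handlers:
            del self._handlers[event_type]  # no handlers left for this type
        return handler

    def get_handlers(self, message: str) -> Optional[dict[str, Handler]]:
        return self._handlers.get(message)

    def dispatch(self, message: str) -> int:
        handlers = self.get_handlers(message) or {}
        for handler in handlers.values():
            handler(message)
        return len(handlers)


calls: list[str] = []
dispatcher = DispatcherSketch()
dispatcher.register_handler("foo.first", lambda msg: calls.append("first"))
dispatcher.register_handler("foo.second", lambda msg: calls.append("second"))
called = dispatcher.dispatch("foo")
removed = dispatcher.unregister_handler("foo.first")
missing = dispatcher.unregister_handler("bar")
remaining = dispatcher.dispatch("foo")
```

Keeping one handler per tag means that registering again under the same tag replaces the earlier handler instead of adding a duplicate.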