Skip to content

Task Scheduling

Scheduling functions for tasks. They can be imported from fluid.scheduler:

from fastapi.scheduler import every, crontab

fluid.scheduler.Scheduler

Bases: ABC

Base class for all schedulers.

info abstractmethod

info()

Return a string representation of the schedule.

Source code in fluid/scheduler/scheduler_crontab.py
@abstractmethod
def info(self) -> str:
    """Return a string representation of the schedule."""

fluid.scheduler.every

every(delta, delay=timedelta(), jitter=timedelta())

Bases: Scheduler

Source code in fluid/scheduler/scheduler_every.py
def __init__(
    self,
    delta: timedelta,
    delay: timedelta = timedelta(),
    jitter: timedelta = timedelta(),
) -> None:
    self.delta: timedelta = delta
    self.delay: timedelta = delay
    self.jitter: timedelta = jitter
    self._delta: timedelta = self.next_delta()
    self._started: datetime | None = None

delta instance-attribute

delta = delta

delay instance-attribute

delay = delay

jitter instance-attribute

jitter = jitter

info

info()
Source code in fluid/scheduler/scheduler_every.py
def info(self) -> str:
    return str(self.delta)

next_delta

next_delta()
Source code in fluid/scheduler/scheduler_every.py
def next_delta(self) -> timedelta:
    return self.delta + random.uniform(0, 1) * self.jitter

fluid.scheduler.crontab

crontab(
    minute="*",
    hour="*",
    day="*",
    month="*",
    day_of_week="*",
    tz=UTC,
)

Bases: Scheduler

Convert a "crontab"-style set of parameters into a test function that will return True when the given datetime matches the parameters set forth in the crontab. For day-of-week, 0=Sunday and 6=Saturday. Acceptable inputs: * = every distinct value /n = run every "n" times, i.e. hours='/4' == 0, 4, 8, 12, 16, 20 m-n = run every time m..n m,n = run on m and n

Source code in fluid/scheduler/scheduler_crontab.py
def __init__(
    self,
    minute: CI = "*",
    hour: CI = "*",
    day: CI = "*",
    month: CI = "*",
    day_of_week: CI = "*",
    tz: ZoneInfo = UTC,
) -> None:
    self.tz: ZoneInfo = tz
    self._info = (
        f"minute {minute}; hour {hour}; day {day}; month {month}; "
        f"day_of_week {day_of_week}"
    )
    validation = (
        ("m", month, range(1, 13)),
        ("d", day, range(1, 32)),
        ("w", day_of_week, range(8)),  # 0-6, but also 7 for Sunday.
        ("H", hour, range(24)),
        ("M", minute, range(60)),
    )
    cron_settings = []

    for date_str, value, acceptable in validation:
        settings: Set[int] = set()

        if isinstance(value, int):
            value = str(value)

        for piece in value.split(","):
            if piece == "*":
                settings.update(acceptable)
                continue

            if piece.isdigit():
                digit = int(piece)
                if digit not in acceptable:
                    raise ValueError("%d is not a valid input" % digit)
                elif date_str == "w":
                    digit %= 7
                settings.add(digit)

            else:
                dash_match = dash_re.match(piece)
                if dash_match:
                    lhs, rhs = map(int, dash_match.groups())
                    if lhs not in acceptable or rhs not in acceptable:
                        raise ValueError("%s is not a valid input" % piece)
                    elif date_str == "w":
                        lhs %= 7
                        rhs %= 7
                    settings.update(range(lhs, rhs + 1))
                    continue

                # Handle stuff like */3, */6.
                every_match = every_re.match(piece)
                if every_match:
                    if date_str == "w":
                        raise ValueError(
                            "Cannot perform this kind of matching"
                            " on day-of-week."
                        )
                    interval = int(every_match.groups()[0])
                    settings.update(acceptable[::interval])

        cron_settings.append(sorted(list(settings)))
    self.cron_settings = tuple(cron_settings)

tz instance-attribute

tz = tz

cron_settings instance-attribute

cron_settings = tuple(cron_settings)

info

info()
Source code in fluid/scheduler/scheduler_crontab.py
def info(self) -> str:
    return self._info