Coverage for src/couchers/models/background_jobs.py: 100%
31 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-25 10:58 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-25 10:58 +0000
1import enum
2from datetime import datetime
3from typing import Any
5from sqlalchemy import BigInteger, DateTime, Enum, Index, Integer, String, func, text
6from sqlalchemy import LargeBinary as Binary
7from sqlalchemy.ext.hybrid import hybrid_property
8from sqlalchemy.orm import Mapped, mapped_column
10from couchers.models.base import Base
13class BackgroundJobState(enum.Enum):
14 # job is fresh, waiting to be picked off the queue
15 pending = enum.auto()
16 # job complete
17 completed = enum.auto()
18 # error occurred, will be retried
19 error = enum.auto()
20 # failed too many times, not retrying anymore
21 failed = enum.auto()
24class BackgroundJob(Base):
25 """
26 This table implements a queue of background jobs.
27 """
29 __tablename__ = "background_jobs"
31 id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
33 # used to discern which function should be triggered to service it
34 job_type: Mapped[str] = mapped_column(String)
35 state: Mapped[BackgroundJobState] = mapped_column(Enum(BackgroundJobState), default=BackgroundJobState.pending)
37 # time queued
38 queued: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
40 # time at which we may next attempt it, for implementing exponential backoff
41 next_attempt_after: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
43 # used to count number of retries for failed jobs
44 try_count: Mapped[int] = mapped_column(Integer, default=0)
46 max_tries: Mapped[int] = mapped_column(Integer, default=5)
48 # higher is more important
49 priority: Mapped[int] = mapped_column(Integer, server_default=text("10"))
51 # protobuf encoded job payload
52 payload: Mapped[bytes] = mapped_column(Binary)
54 # if the job failed, we write that info here
55 failure_info: Mapped[str | None] = mapped_column(String, nullable=True)
57 __table_args__ = (
58 # used in looking up background jobs to attempt
59 # create index on background_jobs(priority desc, next_attempt_after, (max_tries - try_count)) where state = 'pending' OR state = 'error';
60 Index(
61 "ix_background_jobs_lookup",
62 priority.desc(),
63 next_attempt_after,
64 (max_tries - try_count),
65 postgresql_where=((state == BackgroundJobState.pending) | (state == BackgroundJobState.error)),
66 ),
67 )
69 @hybrid_property
70 def ready_for_retry(self) -> Any:
71 return (
72 (self.next_attempt_after <= func.now())
73 & (self.try_count < self.max_tries)
74 & ((self.state == BackgroundJobState.pending) | (self.state == BackgroundJobState.error))
75 )
77 def __repr__(self) -> str:
78 return f"BackgroundJob(id={self.id}, job_type={self.job_type}, state={self.state}, next_attempt_after={self.next_attempt_after}, try_count={self.try_count}, failure_info={self.failure_info})"