Coverage for src/couchers/models/background_jobs.py: 100%
28 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 00:20 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 00:20 +0000
1import enum
3from sqlalchemy import BigInteger, Column, DateTime, Enum, Index, Integer, String, func, text
4from sqlalchemy import LargeBinary as Binary
5from sqlalchemy.ext.hybrid import hybrid_property
7from couchers.models.base import Base
10class BackgroundJobState(enum.Enum):
11 # job is fresh, waiting to be picked off the queue
12 pending = enum.auto()
13 # job complete
14 completed = enum.auto()
15 # error occurred, will be retried
16 error = enum.auto()
17 # failed too many times, not retrying anymore
18 failed = enum.auto()
21class BackgroundJob(Base):
22 """
23 This table implements a queue of background jobs.
24 """
26 __tablename__ = "background_jobs"
28 id = Column(BigInteger, primary_key=True)
30 # used to discern which function should be triggered to service it
31 job_type = Column(String, nullable=False)
32 state = Column(Enum(BackgroundJobState), nullable=False, default=BackgroundJobState.pending)
34 # time queued
35 queued = Column(DateTime(timezone=True), nullable=False, server_default=func.now())
37 # time at which we may next attempt it, for implementing exponential backoff
38 next_attempt_after = Column(DateTime(timezone=True), nullable=False, server_default=func.now())
40 # used to count number of retries for failed jobs
41 try_count = Column(Integer, nullable=False, default=0)
43 max_tries = Column(Integer, nullable=False, default=5)
45 # higher is more important
46 priority = Column(Integer, nullable=False, server_default=text("10"))
48 # protobuf encoded job payload
49 payload = Column(Binary, nullable=False)
51 # if the job failed, we write that info here
52 failure_info = Column(String, nullable=True)
54 __table_args__ = (
55 # used in looking up background jobs to attempt
56 # create index on background_jobs(priority desc, next_attempt_after, (max_tries - try_count)) where state = 'pending' OR state = 'error';
57 Index(
58 "ix_background_jobs_lookup",
59 priority.desc(),
60 next_attempt_after,
61 (max_tries - try_count),
62 postgresql_where=((state == BackgroundJobState.pending) | (state == BackgroundJobState.error)),
63 ),
64 )
66 @hybrid_property
67 def ready_for_retry(self):
68 return (
69 (self.next_attempt_after <= func.now())
70 & (self.try_count < self.max_tries)
71 & ((self.state == BackgroundJobState.pending) | (self.state == BackgroundJobState.error))
72 )
74 def __repr__(self):
75 return f"BackgroundJob(id={self.id}, job_type={self.job_type}, state={self.state}, next_attempt_after={self.next_attempt_after}, try_count={self.try_count}, failure_info={self.failure_info})"