Coverage for src/couchers/models/background_jobs.py: 100%

28 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-08 00:20 +0000

1import enum 

2 

3from sqlalchemy import BigInteger, Column, DateTime, Enum, Index, Integer, String, func, text 

4from sqlalchemy import LargeBinary as Binary 

5from sqlalchemy.ext.hybrid import hybrid_property 

6 

7from couchers.models.base import Base 

8 

9 

class BackgroundJobState(enum.Enum):
    """
    Lifecycle states of a background job.
    """

    # newly queued; nothing has picked it off the queue yet
    pending = enum.auto()
    # finished
    completed = enum.auto()
    # an attempt raised an error; the job will be retried
    error = enum.auto()
    # gave up after too many failed attempts; no more retries
    failed = enum.auto()

19 

20 

class BackgroundJob(Base):
    """
    One row per queued background job; the table as a whole acts as the job queue.
    """

    __tablename__ = "background_jobs"

    id = Column(BigInteger, primary_key=True)

    # tells us which function should be triggered to service this job
    job_type = Column(String, nullable=False)
    state = Column(Enum(BackgroundJobState), nullable=False, default=BackgroundJobState.pending)

    # when the job was queued
    queued = Column(DateTime(timezone=True), nullable=False, server_default=func.now())

    # earliest time at which the next attempt may happen (for exponential backoff)
    next_attempt_after = Column(DateTime(timezone=True), nullable=False, server_default=func.now())

    # retry counter for failed jobs, compared against max_tries in ready_for_retry
    try_count = Column(Integer, nullable=False, default=0)

    max_tries = Column(Integer, nullable=False, default=5)

    # a higher number means a more important job
    priority = Column(Integer, nullable=False, server_default=text("10"))

    # the job's payload, protobuf encoded
    payload = Column(Binary, nullable=False)

    # filled in with failure details if the job failed
    failure_info = Column(String, nullable=True)

    __table_args__ = (
        # partial index used when looking up background jobs to attempt; equivalent to
        # create index on background_jobs(priority desc, next_attempt_after, (max_tries - try_count)) where state = 'pending' OR state = 'error';
        Index(
            "ix_background_jobs_lookup",
            priority.desc(),
            next_attempt_after,
            (max_tries - try_count),
            postgresql_where=(
                (state == BackgroundJobState.pending) | (state == BackgroundJobState.error)
            ),
        ),
    )

    @hybrid_property
    def ready_for_retry(self):
        """
        Condition combining three checks: ``next_attempt_after`` is not after ``now()``,
        ``try_count`` is below ``max_tries``, and ``state`` is ``pending`` or ``error``.
        """
        is_due = self.next_attempt_after <= func.now()
        has_tries_left = self.try_count < self.max_tries
        is_unfinished = (self.state == BackgroundJobState.pending) | (self.state == BackgroundJobState.error)
        return is_due & has_tries_left & is_unfinished

    def __repr__(self):
        # adjacent f-strings are concatenated into a single string
        return (
            f"BackgroundJob(id={self.id}, job_type={self.job_type}, state={self.state}, "
            f"next_attempt_after={self.next_attempt_after}, try_count={self.try_count}, "
            f"failure_info={self.failure_info})"
        )