Coverage for src/couchers/models/background_jobs.py: 100%

31 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-25 10:58 +0000

1import enum 

2from datetime import datetime 

3from typing import Any 

4 

5from sqlalchemy import BigInteger, DateTime, Enum, Index, Integer, String, func, text 

6from sqlalchemy import LargeBinary as Binary 

7from sqlalchemy.ext.hybrid import hybrid_property 

8from sqlalchemy.orm import Mapped, mapped_column 

9 

10from couchers.models.base import Base 

11 

12 

class BackgroundJobState(enum.Enum):
    """States a background job can be in while it sits in the queue table."""

    # freshly queued; nobody has picked it off the queue yet
    pending = enum.auto()
    # the job ran to completion
    completed = enum.auto()
    # an attempt hit an error; the job is going to be retried
    error = enum.auto()
    # the job failed too many times and is no longer retried
    failed = enum.auto()

22 

23 

class BackgroundJob(Base):
    """
    A queue of background jobs, stored as rows of the ``background_jobs`` table.
    """

    __tablename__ = "background_jobs"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)

    # tells us which function should be triggered to service this job
    job_type: Mapped[str] = mapped_column(String)
    state: Mapped[BackgroundJobState] = mapped_column(Enum(BackgroundJobState), default=BackgroundJobState.pending)

    # when the job was queued
    queued: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())

    # earliest time at which the next attempt may happen (this is how exponential backoff is implemented)
    next_attempt_after: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())

    # counts the tries so far, so that failed jobs can be retried a limited number of times
    try_count: Mapped[int] = mapped_column(Integer, default=0)

    max_tries: Mapped[int] = mapped_column(Integer, default=5)

    # a higher number means a more important job
    priority: Mapped[int] = mapped_column(Integer, server_default=text("10"))

    # the job payload, protobuf encoded
    payload: Mapped[bytes] = mapped_column(Binary)

    # filled in with failure details if the job failed
    failure_info: Mapped[str | None] = mapped_column(String, nullable=True)

    __table_args__ = (
        # partial index used when looking up background jobs to attempt; equivalent to
        # create index on background_jobs(priority desc, next_attempt_after, (max_tries - try_count)) where state = 'pending' OR state = 'error';
        Index(
            "ix_background_jobs_lookup",
            priority.desc(),
            next_attempt_after,
            (max_tries - try_count),
            postgresql_where=((state == BackgroundJobState.pending) | (state == BackgroundJobState.error)),
        ),
    )

    @hybrid_property
    def ready_for_retry(self) -> Any:
        """
        Condition selecting jobs that may be attempted now.

        All three of the following must hold: ``next_attempt_after`` is not
        later than ``func.now()``, ``try_count`` is still below ``max_tries``,
        and ``state`` is either ``pending`` or ``error``.
        """
        backoff_elapsed = self.next_attempt_after <= func.now()
        tries_remaining = self.try_count < self.max_tries
        still_queued = (self.state == BackgroundJobState.pending) | (self.state == BackgroundJobState.error)
        return backoff_elapsed & tries_remaining & still_queued

    def __repr__(self) -> str:
        """Return a one-line debug representation with the job's key fields."""
        return (
            f"BackgroundJob(id={self.id}, job_type={self.job_type}, state={self.state}, "
            f"next_attempt_after={self.next_attempt_after}, try_count={self.try_count}, "
            f"failure_info={self.failure_info})"
        )

78 return f"BackgroundJob(id={self.id}, job_type={self.job_type}, state={self.state}, next_attempt_after={self.next_attempt_after}, try_count={self.try_count}, failure_info={self.failure_info})"