Coverage for src/couchers/jobs/enqueue.py: 100%

7 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-20 11:53 +0000

1""" 

2Background jobs 

3""" 

4 

5import logging 

6 

7from google.protobuf.message import Message 

8from sqlalchemy.orm import Session 

9 

10from couchers.models import BackgroundJob 

11 

12logger = logging.getLogger(__name__) 

13 

14 

def queue_job(
    session: Session,
    job_type: str,
    payload: Message,
    max_tries: int | None = None,
    priority: int | None = None,
) -> None:
    """Create a ``BackgroundJob`` from the arguments and add it to ``session``.

    Args:
        session: Session the new job object is added to. This function only
            calls ``session.add``; it does not flush or commit.
        job_type: Value stored as the job's ``job_type``.
        payload: Protobuf message; stored on the job as the bytes returned by
            ``payload.SerializeToString()``.
        max_tries: Passed through to the job unchanged (``None`` if omitted).
        priority: Passed through to the job unchanged (``None`` if omitted).
    """
    # The job stores the payload as serialized bytes, not as the message object.
    serialized_payload = payload.SerializeToString()
    job = BackgroundJob(
        job_type=job_type,
        payload=serialized_payload,
        max_tries=max_tries,
        priority=priority,
    )
    session.add(job)