Coverage for app / backend / src / couchers / jobs / enqueue.py: 88%

13 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1""" 

2Background jobs 

3""" 

4 

5import logging 

6from collections.abc import Callable 

7 

8import google.protobuf.message 

9from sqlalchemy.orm import Session 

10 

11from couchers.models import BackgroundJob 

12 

13logger = logging.getLogger(__name__) 

14 

15 

16def queue_job[T: google.protobuf.message.Message]( 

17 session: Session, 

18 job: Callable[[T], None], 

19 payload: T, 

20 max_tries: int | None = None, 

21 priority: int | None = None, 

22) -> None: 

23 job_model = BackgroundJob(job_type=job.__name__, payload=payload.SerializeToString()) 

24 

25 if max_tries is not None: 25 ↛ 26line 25 didn't jump to line 26 because the condition on line 25 was never true

26 job_model.max_tries = max_tries 

27 

28 if priority is not None: 

29 job_model.priority = priority 

30 

31 session.add(job_model)