Coverage for app/backend/src/couchers/jobs/enqueue.py: 88%
13 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
1"""
2Background jobs
3"""
5import logging
6from collections.abc import Callable
8import google.protobuf.message
9from sqlalchemy.orm import Session
11from couchers.models import BackgroundJob
13logger = logging.getLogger(__name__)
16def queue_job[T: google.protobuf.message.Message](
17 session: Session,
18 job: Callable[[T], None],
19 payload: T,
20 max_tries: int | None = None,
21 priority: int | None = None,
22) -> None:
23 job_model = BackgroundJob(job_type=job.__name__, payload=payload.SerializeToString())
25 if max_tries is not None: 25 ↛ 26line 25 didn't jump to line 26 because the condition on line 25 was never true
26 job_model.max_tries = max_tries
28 if priority is not None:
29 job_model.priority = priority
31 session.add(job_model)