Coverage for src/couchers/jobs/enqueue.py: 100%
7 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-20 11:53 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-20 11:53 +0000
1"""
2Background jobs
3"""
5import logging
7from google.protobuf.message import Message
8from sqlalchemy.orm import Session
10from couchers.models import BackgroundJob
12logger = logging.getLogger(__name__)
15def queue_job(
16 session: Session,
17 job_type: str,
18 payload: Message,
19 max_tries: int | None = None,
20 priority: int | None = None,
21) -> None:
22 session.add(
23 BackgroundJob(
24 job_type=job_type,
25 payload=payload.SerializeToString(),
26 max_tries=max_tries,
27 priority=priority,
28 )
29 )