1"""
2Background job workers
3"""
5import logging
6import traceback
7from collections.abc import Callable
8from datetime import timedelta
9from multiprocessing import Process
10from sched import scheduler
11from time import monotonic, perf_counter_ns, sleep
12from typing import Any
14import sentry_sdk
15import sqlalchemy.exc
16from google.protobuf import empty_pb2
17from opentelemetry import trace
18from sqlalchemy import select
20from couchers.config import config
21from couchers.db import db_post_fork, session_scope, worker_repeatable_read_session_scope
22from couchers.experimentation import setup_experimentation
23from couchers.jobs.definitions import JOBS, Job
24from couchers.jobs.enqueue import queue_job
25from couchers.metrics import (
26 background_jobs_got_job_counter,
27 background_jobs_no_jobs_counter,
28 background_jobs_serialization_errors_counter,
29 jobs_queued_histogram,
30 observe_in_jobs_duration_histogram,
31)
32from couchers.models import BackgroundJob, BackgroundJobState
33from couchers.tracing import setup_tracing
34from couchers.utils import now
36logger = logging.getLogger(__name__)
37tracer = trace.get_tracer(__name__)


def process_job() -> bool:
    """
    Attempt to process one job from the job queue. Returns False if no job was found, True if a job was processed,
    regardless of failure/success.
    """
    logger.debug("Looking for a job")

    with worker_repeatable_read_session_scope() as session:
        # a combination of REPEATABLE READ and SELECT ... FOR UPDATE SKIP LOCKED makes sure that only one transaction
        # will modify the job at a time. SKIP LOCKED means that if the job is locked, we ignore that row; it's
        # easier to use SKIP LOCKED than NOWAIT in the ORM, since with NOWAIT you get an ugly exception from deep
        # inside psycopg2 that's quite annoying to catch and deal with
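        #
        # roughly the SQL the query below compiles to (a sketch, assuming SQLAlchemy's usual
        # PostgreSQL rendering; the exact text may differ, and <ready_for_retry> stands for
        # whatever expression that attribute expands to):
        #
        #   SELECT * FROM background_jobs
        #   WHERE <ready_for_retry>
        #   ORDER BY priority DESC, next_attempt_after ASC
        #   LIMIT 1
        #   FOR UPDATE SKIP LOCKED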
        try:
            job = (
                session.execute(
                    select(BackgroundJob)
                    .where(BackgroundJob.ready_for_retry)
                    .order_by(BackgroundJob.priority.desc(), BackgroundJob.next_attempt_after.asc())
                    .limit(1)
                    .with_for_update(skip_locked=True)
                )
                .scalars()
                .one_or_none()
            )
        except sqlalchemy.exc.OperationalError:
            background_jobs_serialization_errors_counter.inc()
            logger.debug("Serialization error")
            return False

        if not job:
            background_jobs_no_jobs_counter.inc()
            logger.debug("No pending jobs")
            return False

        background_jobs_got_job_counter.inc()

        # we've got a lock for a job now, it's "pending" until we commit or the lock is gone
        logger.info(f"Job #{job.id} of type {job.job_type} grabbed")
        job.try_count += 1

        job_def = JOBS[job.job_type]

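        # record how long the job sat in the queue before a worker picked it up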
        jobs_queued_histogram.observe((now() - job.queued).total_seconds())
        try:
            with tracer.start_as_current_span(job.job_type) as rollspan:
                start = perf_counter_ns()
                job_def.handler(job_def.payload_type.FromString(job.payload))
                finished = perf_counter_ns()
                job.state = BackgroundJobState.completed
                observe_in_jobs_duration_histogram(
                    job.job_type, job.state.name, job.try_count, "", (finished - start) / 1e9
                )
                logger.info(f"Job #{job.id} complete on try number {job.try_count}")
        except Exception as e:
            finished = perf_counter_ns()
            logger.exception(e)
            sentry_sdk.set_tag("context", "job")
            sentry_sdk.set_tag("job", job.job_type)
            sentry_sdk.capture_exception(e)

            if job.try_count >= job.max_tries:
                # if we already tried max_tries times, it's permanently failed
                job.state = BackgroundJobState.failed
                logger.info(f"Job #{job.id} failed on try number {job.try_count}")
            else:
                job.state = BackgroundJobState.error
                # exponential backoff
                job.next_attempt_after += timedelta(seconds=15 * (2**job.try_count))
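                # with try_count already incremented above, the added delay doubles each
                # time: 30s after the 1st failure, 60s after the 2nd, 120s after the 3rd, ...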
                logger.info(f"Job #{job.id} error on try number {job.try_count}, next try at {job.next_attempt_after}")
            observe_in_jobs_duration_histogram(
                job.job_type, job.state.name, job.try_count, type(e).__name__, (finished - start) / 1e9
            )
            # add some info for debugging
            job.failure_info = traceback.format_exc()

            if config["IN_TEST"]:
                raise e

        # exiting ctx manager commits and releases the row lock
        return True


def service_jobs() -> None:
    """
    Service jobs in an infinite loop
    """
    while True:
        # if no job was found, sleep for a second; otherwise query for another job straight away
        if not process_job():
            sleep(1)


def _run_job_and_schedule(sched: scheduler, job_def: Job[Any], frequency: timedelta) -> None:
    logger.info(f"Processing job of type {job_def.name}")

    # wake ourselves up after frequency
    sched.enter(
        delay=frequency.total_seconds(),
        priority=1,
        action=_run_job_and_schedule,
        argument=(
            sched,
            job_def,
            frequency,
        ),
    )

    # queue the job
    with session_scope() as session:
        queue_job(session, job=job_def.handler, payload=empty_pb2.Empty())
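    # note: this only enqueues the job; the actual work happens in the worker process,
    # which picks it up from the queue in process_job()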


def run_scheduler() -> None:
    """
    Schedules jobs according to the schedule in JOBS
    """
    sched = scheduler(monotonic, sleep)

    for job_type, job_def in JOBS.items():
        if job_def.schedule is not None:
            sched.enter(
                delay=0,
                priority=1,
                action=_run_job_and_schedule,
                argument=(
                    sched,
                    job_def,
                    job_def.schedule,
                ),
            )
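
    # run() blocks forever: each scheduled job re-enters itself in _run_job_and_schedule,
    # so the scheduler's event queue never empties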
    sched.run()


def _run_forever(func: Callable[[], None]) -> None:
    # Post-fork initialization: these services use threading/async internals that
    # don't survive fork() and must be initialized fresh in each child process
    db_post_fork()
    setup_tracing()
    setup_experimentation()  # Must be initialized after fork - see couchers/experimentation.py

    while True:
        try:
            logger.critical("Background worker starting")
            func()
        except Exception as e:
            logger.critical("Unhandled exception in background worker", exc_info=e)
            # cool off in case of a programming error, so we don't hammer the database
            sleep(60)


def start_jobs_scheduler() -> Process:
    scheduler = Process(
        target=_run_forever,
        args=(run_scheduler,),
    )
    scheduler.start()
    return scheduler


def start_jobs_worker() -> Process:
    worker = Process(target=_run_forever, args=(service_jobs,))
    worker.start()
    return worker
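

# A sketch of how these pieces fit together, assuming a hypothetical entrypoint (the
# real server wiring lives elsewhere in the codebase):
#
#     from couchers.jobs.worker import start_jobs_scheduler, start_jobs_worker
#
#     scheduler_process = start_jobs_scheduler()  # enqueues scheduled jobs on their timers
#     worker_process = start_jobs_worker()        # processes the queue in an infinite loop
#     scheduler_process.join()
#     worker_process.join()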