Coverage for app/backend/src/couchers/jobs/worker.py: 79%
104 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1"""
2Background job workers
3"""
5import logging
6import traceback
7from collections.abc import Callable
8from datetime import timedelta
9from multiprocessing import Process
10from sched import scheduler
11from time import monotonic, perf_counter_ns, sleep
12from typing import Any
14import sentry_sdk
15import sqlalchemy.exc
16from google.protobuf import empty_pb2
17from opentelemetry import trace
18from sqlalchemy import select
20from couchers.config import config
21from couchers.db import db_post_fork, session_scope, worker_repeatable_read_session_scope
22from couchers.experimentation import setup_experimentation
23from couchers.jobs.definitions import JOBS, Job
24from couchers.jobs.enqueue import queue_job
25from couchers.metrics import (
26 background_jobs_got_job_counter,
27 background_jobs_no_jobs_counter,
28 background_jobs_serialization_errors_counter,
29 jobs_queued_histogram,
30 observe_in_jobs_duration_histogram,
31)
32from couchers.models import BackgroundJob, BackgroundJobState
33from couchers.profiling import setup_profiling
34from couchers.tracing import setup_tracing
35from couchers.utils import now
# Module-scoped logging and tracing handles, both named after this module.
logger = logging.getLogger(name=__name__)
tracer = trace.get_tracer(instrumenting_module_name=__name__)
def process_job() -> bool:
    """
    Attempt to process one job from the job queue. Returns False if no job was found, True if a job was processed,
    regardless of failure/success.

    Also returns False, without touching any job, when the queue query raises an OperationalError (counted as a
    serialization error).

    When a job raises (including when its ``job_type`` is not present in ``JOBS``), the traceback is stored in
    ``failure_info``. The job is then marked ``failed`` once ``try_count`` reaches ``max_tries``. Otherwise it is
    marked ``error`` and its ``next_attempt_after`` is pushed back by ``15 * 2**try_count`` seconds. If
    ``config.IN_TEST`` is set, the exception is re-raised after this bookkeeping.
    """
    logger.debug("Looking for a job")

    with worker_repeatable_read_session_scope() as session:
        # a combination of REPEATABLE READ and SELECT ... FOR UPDATE SKIP LOCKED makes sure that only one transaction
        # will modify the job at a time. SKIP LOCKED means that if the job is locked, then we ignore that row, it's
        # easier to use SKIP LOCKED vs NOWAIT in the ORM, with NOWAIT you get an ugly exception from deep inside
        # psycopg2 that's quite annoying to catch and deal with
        try:
            job = (
                session.execute(
                    select(BackgroundJob)
                    .where(BackgroundJob.ready_for_retry)
                    .order_by(BackgroundJob.priority.desc(), BackgroundJob.next_attempt_after.asc())
                    .limit(1)
                    .with_for_update(skip_locked=True)
                )
                .scalars()
                .one_or_none()
            )
        except sqlalchemy.exc.OperationalError:
            background_jobs_serialization_errors_counter.inc()
            logger.debug("Serialization error")
            return False

        if not job:
            background_jobs_no_jobs_counter.inc()
            logger.debug("No pending jobs")
            return False

        background_jobs_got_job_counter.inc()

        # we've got a lock for a job now, it's "pending" until we commit or the lock is gone
        logger.info(f"Job #{job.id} of type {job.job_type} grabbed")
        job.try_count += 1

        jobs_queued_histogram.labels(str(job.priority)).observe((now() - job.queued).total_seconds())

        # Start the clock before the try so the failure branch can always compute a duration, even when the
        # exception is raised before the handler runs (unknown job type, or while opening the span).
        start = perf_counter_ns()
        try:
            # Looked up inside the try so that a job_type missing from JOBS is recorded as a failed attempt
            # (try_count, state, failure_info) instead of escaping this function with the attempt unrecorded.
            # NOTE(review): previously the KeyError escaped the session scope; presumably that rolled back and the
            # same row was grabbed again on the next poll.
            job_def = JOBS[job.job_type]
            with tracer.start_as_current_span(job.job_type):
                job_def.handler(job_def.payload_type.FromString(job.payload))
                finished = perf_counter_ns()
                job.state = BackgroundJobState.completed
                observe_in_jobs_duration_histogram(
                    job.job_type, job.state.name, job.try_count, "", (finished - start) / 1e9
                )
                logger.info(f"Job #{job.id} complete on try number {job.try_count}")
        except Exception as e:
            finished = perf_counter_ns()
            logger.exception(e)
            sentry_sdk.set_tag("context", "job")
            sentry_sdk.set_tag("job", job.job_type)
            sentry_sdk.capture_exception(e)

            if job.try_count >= job.max_tries:
                # if we already tried max_tries times, it's permanently failed
                job.state = BackgroundJobState.failed
                logger.info(f"Job #{job.id} failed on try number {job.try_count}")
            else:
                job.state = BackgroundJobState.error
                # exponential backoff
                # NOTE(review): the backoff is added to the previous next_attempt_after rather than to now().
                # If the job sat in the queue longer than the backoff, the retry may be eligible immediately.
                # Confirm this is intended.
                job.next_attempt_after += timedelta(seconds=15 * (2**job.try_count))
                logger.info(f"Job #{job.id} error on try number {job.try_count}, next try at {job.next_attempt_after}")
            observe_in_jobs_duration_histogram(
                job.job_type, job.state.name, job.try_count, type(e).__name__, (finished - start) / 1e9
            )
            # add some info for debugging
            job.failure_info = traceback.format_exc()

            if config.IN_TEST:
                raise

        # exiting ctx manager commits and releases the row lock
        return True
def service_jobs() -> None:
    """
    Run the job-processing loop forever.

    Each iteration asks ``process_job`` to handle one job. If it reports that nothing was processed,
    pause for one second before polling again. Otherwise poll again immediately.
    """
    while True:
        handled_one = process_job()
        if handled_one:
            continue
        sleep(1)
def _run_job_and_schedule(sched: scheduler, job_def: Job[Any], frequency: timedelta) -> None:
    """
    Scheduler callback: re-arm this job on ``sched`` and then queue one run of it.

    The follow-up event (same callback, priority 1, firing ``frequency`` from now) is registered
    before the job is queued. The queued job carries an empty protobuf payload.
    """
    logger.info(f"Processing job of type {job_def.name}")

    # wake ourselves up after frequency
    follow_up_args = (sched, job_def, frequency)
    sched.enter(frequency.total_seconds(), 1, _run_job_and_schedule, follow_up_args)

    # queue the job
    with session_scope() as session:
        queue_job(session, job=job_def.handler, payload=empty_pb2.Empty())
def run_scheduler() -> None:
    """
    Drive the periodic jobs declared in JOBS.

    Every job definition with a non-None ``schedule`` is armed to fire immediately. Each firing then
    re-arms itself via ``_run_job_and_schedule``. Blocks inside ``sched.run()`` until no events remain.
    """
    sched = scheduler(monotonic, sleep)

    periodic_defs = [job_def for job_def in JOBS.values() if job_def.schedule is not None]
    for job_def in periodic_defs:
        sched.enter(0, 1, _run_job_and_schedule, (sched, job_def, job_def.schedule))

    sched.run()
def _run_forever(func: Callable[[], None], profile_instance: str | None = None) -> None:
    """
    Entry point for a forked background process: initialise per-process services, then run ``func`` forever.

    ``func`` is restarted whenever it raises or returns. In both cases there is a 60-second pause first,
    so neither a persistent failure nor a ``func`` that returns straight away can spin in a tight loop
    against the database.

    Args:
        func: the long-running body of the process (e.g. the job worker loop or the scheduler).
        profile_instance: if not None, profiling is set up with role "worker" and this instance name.
    """
    # Post-fork initialization: these services use threading/async internals that
    # don't survive fork() and must be initialized fresh in each child process
    db_post_fork()
    setup_tracing()
    setup_experimentation()
    if profile_instance is not None:
        setup_profiling(role="worker", instance=profile_instance)

    while True:
        try:
            logger.info("Background worker starting")
            func()
        except Exception as e:
            logger.critical("Unhandled exception in background worker", exc_info=e)
        else:
            # func is meant to loop forever. A normal return is unexpected: for example, sched.run() returns
            # as soon as its event queue is empty. Previously this restarted func with no delay at all.
            logger.warning("Background worker returned unexpectedly, restarting")
        # cool off in case we have some programming error to not hammer the database
        sleep(60)
def start_jobs_scheduler() -> Process:
    """
    Fork and start the background process that runs ``run_scheduler`` under ``_run_forever``.

    Returns the started Process handle. No ``profile_instance`` is passed, so ``_run_forever``
    does not set up profiling for this process.
    """
    # Named ``process`` rather than ``scheduler`` so it does not shadow the module-level
    # ``sched.scheduler`` import that run_scheduler relies on.
    process = Process(
        target=_run_forever,
        args=(run_scheduler,),
    )
    process.start()
    return process
def start_jobs_worker(index: int) -> Process:
    """
    Fork and start one background process that services the job queue.

    ``index`` distinguishes this worker and is used to build its profiling instance name
    (``"worker-<index>"``). Returns the started Process handle.
    """
    proc = Process(
        target=_run_forever,
        args=(service_jobs,),
        kwargs=dict(profile_instance=f"worker-{index}"),
    )
    proc.start()
    return proc