Coverage for src/couchers/jobs/worker.py: 82%
109 statements
coverage.py v7.11.0, created at 2025-12-20 11:53 +0000
1"""
2Background job workers
3"""
5import logging
6import traceback
7from collections.abc import Callable
8from datetime import timedelta
9from inspect import getmembers, isfunction
10from multiprocessing import Process
11from sched import scheduler
12from time import monotonic, perf_counter_ns, sleep
13from typing import Any
15import sentry_sdk
16import sqlalchemy.exc
17from google.protobuf import empty_pb2
18from opentelemetry import trace
20from couchers.config import config
21from couchers.db import db_post_fork, session_scope, worker_repeatable_read_session_scope
22from couchers.experimentation import setup_experimentation
23from couchers.jobs import handlers
24from couchers.jobs.enqueue import queue_job
25from couchers.metrics import (
26 background_jobs_got_job_counter,
27 background_jobs_no_jobs_counter,
28 background_jobs_serialization_errors_counter,
29 jobs_queued_histogram,
30 observe_in_jobs_duration_histogram,
31)
32from couchers.models import BackgroundJob, BackgroundJobState
33from couchers.sql import couchers_select as select
34from couchers.tracing import setup_tracing
35from couchers.utils import now
37logger = logging.getLogger(__name__)
38tracer = trace.get_tracer(__name__)
40JOBS: dict[str, tuple[Any, Callable[[Any], Any]]] = {}
41SCHEDULE: list[tuple[str, timedelta]] = []
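
# handler functions opt in by carrying a PAYLOAD attribute (the protobuf message type the job expects) and,
# for periodic jobs, a SCHEDULE attribute (a timedelta between runs); both are collected here at import time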
for name, func in getmembers(handlers, isfunction):
    if hasattr(func, "PAYLOAD"):
        JOBS[name] = (func.PAYLOAD, func)
    if hasattr(func, "SCHEDULE"):
        SCHEDULE.append((name, func.SCHEDULE))


def process_job() -> bool:
    """
    Attempt to process one job from the job queue. Returns False if no job was found; returns True if a job was
    processed, regardless of success or failure.
    """
    logger.debug("Looking for a job")

    with worker_repeatable_read_session_scope() as session:
        # a combination of REPEATABLE READ and SELECT ... FOR UPDATE SKIP LOCKED makes sure that only one transaction
        # will modify the job at a time. SKIP LOCKED means that if a job row is locked, we simply skip it. It's
        # easier to use SKIP LOCKED than NOWAIT in the ORM: with NOWAIT you get an ugly exception from deep inside
        # psycopg2 that's quite annoying to catch and deal with
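        # the ORM query below emits roughly the following SQL (identifiers illustrative, as mapped from the
        # BackgroundJob model):
        #   SELECT * FROM background_jobs
        #   WHERE <ready_for_retry>
        #   ORDER BY priority DESC, next_attempt_after ASC
        #   LIMIT 1
        #   FOR UPDATE SKIP LOCKED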
        try:
            job = (
                session.execute(
                    select(BackgroundJob)
                    .where(BackgroundJob.ready_for_retry)
                    .order_by(BackgroundJob.priority.desc(), BackgroundJob.next_attempt_after.asc())
                    .limit(1)
                    .with_for_update(skip_locked=True)
                )
                .scalars()
                .one_or_none()
            )
        except sqlalchemy.exc.OperationalError:
            background_jobs_serialization_errors_counter.inc()
            logger.debug("Serialization error")
            return False

        if not job:
            background_jobs_no_jobs_counter.inc()
            logger.debug("No pending jobs")
            return False

        background_jobs_got_job_counter.inc()

        # we've got a lock for a job now, it's "pending" until we commit or the lock is gone
        logger.info(f"Job #{job.id} of type {job.job_type} grabbed")
        job.try_count += 1

        message_type, func = JOBS[job.job_type]
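
        # record how long the job sat in the queue before a worker picked it up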
        jobs_queued_histogram.observe((now() - job.queued).total_seconds())
        try:
            with tracer.start_as_current_span(job.job_type) as rollspan:
                start = perf_counter_ns()
                ret = func(message_type.FromString(job.payload))
                finished = perf_counter_ns()
                job.state = BackgroundJobState.completed
                observe_in_jobs_duration_histogram(
                    job.job_type, job.state.name, job.try_count, "", (finished - start) / 1e9
                )
                logger.info(f"Job #{job.id} complete on try number {job.try_count}")
        except Exception as e:
            finished = perf_counter_ns()
            logger.exception(e)
            sentry_sdk.set_tag("context", "job")
            sentry_sdk.set_tag("job", job.job_type)
            sentry_sdk.capture_exception(e)

            if job.try_count >= job.max_tries:
                # if we already tried max_tries times, it's permanently failed
                job.state = BackgroundJobState.failed
                logger.info(f"Job #{job.id} failed on try number {job.try_count}")
            else:
                job.state = BackgroundJobState.error
                # exponential backoff
                job.next_attempt_after += timedelta(seconds=15 * (2**job.try_count))
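                # the added delay doubles each time: +30s after the 1st failed try, +60s after the 2nd,
                # +120s after the 3rd, and so on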
                logger.info(f"Job #{job.id} error on try number {job.try_count}, next try at {job.next_attempt_after}")
            observe_in_jobs_duration_histogram(
                job.job_type, job.state.name, job.try_count, type(e).__name__, (finished - start) / 1e9
            )
            # add some info for debugging
            job.failure_info = traceback.format_exc()

            if config["IN_TEST"]:
                raise e

        # exiting ctx manager commits and releases the row lock
        return True


def service_jobs() -> None:
    """
    Service jobs in an infinite loop
    """
    while True:
        # if no job was found, sleep for a second, otherwise query for another job straight away
        if not process_job():
            sleep(1)


def _run_job_and_schedule(sched: scheduler, schedule_id: int) -> None:
    job_type, frequency = SCHEDULE[schedule_id]
    logger.info(f"Processing job of type {job_type}")

    # wake ourselves up after frequency
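    # (re-arming before queueing means the next wake-up is `frequency` from now, not from when queue_job finishes)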
    sched.enter(
        frequency.total_seconds(),
        1,
        _run_job_and_schedule,
        argument=(
            sched,
            schedule_id,
        ),
    )

    # queue the job
    with session_scope() as session:
        queue_job(session, job_type, empty_pb2.Empty())


def run_scheduler() -> None:
    """
    Schedules jobs according to the SCHEDULE list built above
    """
    sched = scheduler(monotonic, sleep)

    for schedule_id, (job_type, frequency) in enumerate(SCHEDULE):
        sched.enter(
            0,
            1,
            _run_job_and_schedule,
            argument=(
                sched,
                schedule_id,
            ),
        )

    sched.run()


def _run_forever(func: Callable[[], None]) -> None:
    # Post-fork initialization: these services use threading/async internals that
    # don't survive fork() and must be initialized fresh in each child process
    db_post_fork()
    setup_tracing()
    setup_experimentation()  # Must be initialized after fork - see couchers/experimentation.py

    while True:
        try:
            logger.critical("Background worker starting")
            func()
        except Exception as e:
            logger.critical("Unhandled exception in background worker", exc_info=e)
            # cool off in case we have a programming error, so we don't hammer the database
            sleep(60)
def start_jobs_scheduler() -> Process:
    scheduler = Process(
        target=_run_forever,
        args=(run_scheduler,),
    )
    scheduler.start()
    return scheduler


def start_jobs_worker() -> Process:
    worker = Process(target=_run_forever, args=(service_jobs,))
    worker.start()
    return worker