Coverage for src/couchers/jobs/worker.py: 82%
105 statements
coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
1"""
2Background job workers
3"""
5import logging
6import traceback
7from datetime import timedelta
8from inspect import getmembers, isfunction
9from multiprocessing import Process
10from sched import scheduler
11from time import monotonic, perf_counter_ns, sleep
13import sentry_sdk
14import sqlalchemy.exc
15from google.protobuf import empty_pb2
16from opentelemetry import trace
18from couchers.config import config
19from couchers.db import db_post_fork, session_scope, worker_repeatable_read_session_scope
20from couchers.jobs import handlers
21from couchers.jobs.enqueue import queue_job
22from couchers.metrics import (
23 background_jobs_got_job_counter,
24 background_jobs_no_jobs_counter,
25 background_jobs_serialization_errors_counter,
26 jobs_queued_histogram,
27 observe_in_jobs_duration_histogram,
28)
29from couchers.models import BackgroundJob, BackgroundJobState
30from couchers.sql import couchers_select as select
31from couchers.tracing import setup_tracing
32from couchers.utils import now
34logger = logging.getLogger(__name__)
35trace = trace.get_tracer(__name__)
37JOBS = {}
38SCHEDULE = []
40for name, func in getmembers(handlers, isfunction):
41 if hasattr(func, "PAYLOAD"):
42 JOBS[name] = (func.PAYLOAD, func)
43 if hasattr(func, "SCHEDULE"):
44 SCHEDULE.append((name, func.SCHEDULE))
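
# Illustration with a hypothetical handler name (not an actual handler in this codebase): a function in
# couchers.jobs.handlers that carries a PAYLOAD attribute (its protobuf message type) is registered roughly as
#   JOBS["some_job"] = (jobs_pb2.SomeJobPayload, some_job)
# and, if it additionally carries a SCHEDULE attribute (a timedelta), as
#   SCHEDULE.append(("some_job", timedelta(hours=1)))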


def process_job():
    """
    Attempt to process one job from the job queue. Returns False if no job was found, and True if a job was
    processed, regardless of whether it succeeded or failed.
    """
    logger.debug("Looking for a job")

    with worker_repeatable_read_session_scope() as session:
        # a combination of REPEATABLE READ and SELECT ... FOR UPDATE SKIP LOCKED makes sure that only one
        # transaction will modify the job at a time. SKIP LOCKED means that if a job row is already locked, we
        # skip it. It's easier to use SKIP LOCKED than NOWAIT in the ORM: with NOWAIT you get an ugly exception
        # from deep inside psycopg2 that's quite annoying to catch and deal with
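        # the ORM query below corresponds roughly to the following SQL (ready_for_retry is an SQL-level
        # expression defined on BackgroundJob, shown here only as a placeholder):
        #   SELECT * FROM background_jobs
        #   WHERE <ready_for_retry>
        #   ORDER BY priority DESC, next_attempt_after ASC
        #   LIMIT 1
        #   FOR UPDATE SKIP LOCKED;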
        try:
            job = (
                session.execute(
                    select(BackgroundJob)
                    .where(BackgroundJob.ready_for_retry)
                    .order_by(BackgroundJob.priority.desc(), BackgroundJob.next_attempt_after.asc())
                    .limit(1)
                    .with_for_update(skip_locked=True)
                )
                .scalars()
                .one_or_none()
            )
        except sqlalchemy.exc.OperationalError:
            background_jobs_serialization_errors_counter.inc()
            logger.debug("Serialization error")
            return False

        if not job:
            background_jobs_no_jobs_counter.inc()
            logger.debug("No pending jobs")
            return False

        background_jobs_got_job_counter.inc()

        # we've got a lock for a job now, it's "pending" until we commit or the lock is gone
        logger.info(f"Job #{job.id} of type {job.job_type} grabbed")
        job.try_count += 1

        message_type, func = JOBS[job.job_type]

        jobs_queued_histogram.observe((now() - job.queued).total_seconds())
        try:
            with trace.start_as_current_span(job.job_type) as rollspan:
                start = perf_counter_ns()
                ret = func(message_type.FromString(job.payload))
                finished = perf_counter_ns()
            job.state = BackgroundJobState.completed
            observe_in_jobs_duration_histogram(
                job.job_type, job.state.name, job.try_count, "", (finished - start) / 1e9
            )
            logger.info(f"Job #{job.id} complete on try number {job.try_count}")
        except Exception as e:
            finished = perf_counter_ns()
            logger.exception(e)
            sentry_sdk.set_tag("context", "job")
            sentry_sdk.set_tag("job", job.job_type)
            sentry_sdk.capture_exception(e)

            if job.try_count >= job.max_tries:
                # if we already tried max_tries times, it's permanently failed
                job.state = BackgroundJobState.failed
                logger.info(f"Job #{job.id} failed on try number {job.try_count}")
            else:
                job.state = BackgroundJobState.error
                # exponential backoff
                job.next_attempt_after += timedelta(seconds=15 * (2**job.try_count))
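                # with try_count already incremented above, successive retries end up spaced roughly
                # 30s, 60s, 120s, 240s, ... apart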
                logger.info(f"Job #{job.id} error on try number {job.try_count}, next try at {job.next_attempt_after}")
            observe_in_jobs_duration_histogram(
                job.job_type, job.state.name, job.try_count, type(e).__name__, (finished - start) / 1e9
            )
            # add some info for debugging
            job.failure_info = traceback.format_exc()

            if config["IN_TEST"]:
                raise e

        # exiting ctx manager commits and releases the row lock
        return True


def service_jobs():
    """
    Service jobs in an infinite loop
    """
    while True:
        # if no job was found, sleep for a second, otherwise query for another job straight away
        if not process_job():
            sleep(1)


def _run_job_and_schedule(sched, schedule_id):
    job_type, frequency = SCHEDULE[schedule_id]
    logger.info(f"Processing job of type {job_type}")

    # wake ourselves up after frequency
    sched.enter(
        frequency.total_seconds(),
        1,
        _run_job_and_schedule,
        argument=(
            sched,
            schedule_id,
        ),
    )

    # queue the job
    with session_scope() as session:
        queue_job(session, job_type, empty_pb2.Empty())


def run_scheduler():
    """
    Schedules jobs according to the SCHEDULE list built above from couchers.jobs.handlers
    """
    sched = scheduler(monotonic, sleep)

    for schedule_id, (job_type, frequency) in enumerate(SCHEDULE):
        sched.enter(
            0,
            1,
            _run_job_and_schedule,
            argument=(
                sched,
                schedule_id,
            ),
        )
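    # each job fires once immediately (delay 0), and _run_job_and_schedule then re-enters itself so the job
    # fires again every `frequency`; sched.run() below therefore never returns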

    sched.run()


def _run_forever(func):
    db_post_fork()
    setup_tracing()

    while True:
        try:
            logger.critical("Background worker starting")
            func()
        except Exception as e:
            logger.critical("Unhandled exception in background worker", exc_info=e)
        # cool off in case we have some programming error, so we don't hammer the database
        sleep(60)


def start_jobs_scheduler():
    scheduler = Process(
        target=_run_forever,
        args=(run_scheduler,),
    )
    scheduler.start()
    return scheduler


def start_jobs_worker():
    worker = Process(target=_run_forever, args=(service_jobs,))
    worker.start()
    return worker
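
# Hedged usage sketch (hypothetical; the real entrypoint lives elsewhere in the backend): a deployment would
# typically start one scheduler process plus one or more worker processes, e.g.
#
#   processes = [start_jobs_scheduler()] + [start_jobs_worker() for _ in range(2)]
#   for p in processes:
#       p.join()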