Coverage for src/couchers/jobs/worker.py: 82%
105 statements
coverage.py v7.6.10, created at 2025-03-24 14:08 +0000
1"""
2Background job workers
3"""
5import logging
6import traceback
7from datetime import timedelta
8from inspect import getmembers, isfunction
9from multiprocessing import Process
10from sched import scheduler
11from time import monotonic, perf_counter_ns, sleep
13import sentry_sdk
14import sqlalchemy.exc
15from google.protobuf import empty_pb2
16from opentelemetry import trace
18from couchers.config import config
19from couchers.db import db_post_fork, session_scope, worker_repeatable_read_session_scope
20from couchers.jobs import handlers
21from couchers.jobs.enqueue import queue_job
22from couchers.metrics import (
23 background_jobs_got_job_counter,
24 background_jobs_no_jobs_counter,
25 background_jobs_serialization_errors_counter,
26 jobs_queued_histogram,
27 observe_in_jobs_duration_histogram,
28)
29from couchers.models import BackgroundJob, BackgroundJobState
30from couchers.sql import couchers_select as select
31from couchers.tracing import setup_tracing
32from couchers.utils import now

logger = logging.getLogger(__name__)
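# note: this rebinds the name `trace` from the OpenTelemetry module to this module's tracer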
trace = trace.get_tracer(__name__)

JOBS = {}
SCHEDULE = []
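
# auto-discover job handlers: every function in couchers.jobs.handlers that defines a PAYLOAD is a queueable job,
# and every function that defines a SCHEDULE is also run periodically by the scheduler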
for name, func in getmembers(handlers, isfunction):
    if hasattr(func, "PAYLOAD"):
        JOBS[name] = (func.PAYLOAD, func)
    if hasattr(func, "SCHEDULE"):
        SCHEDULE.append((name, func.SCHEDULE))


def process_job():
    """
    Attempt to process one job from the job queue. Returns True if a job was processed (regardless of whether it
    succeeded or failed), and False if no job was found.
    """
    logger.debug("Looking for a job")

    with worker_repeatable_read_session_scope() as session:
        # the combination of REPEATABLE READ and SELECT ... FOR UPDATE SKIP LOCKED makes sure that only one
        # transaction will modify the job at a time. SKIP LOCKED means that if a job row is locked by another
        # transaction, we simply ignore that row. SKIP LOCKED is also easier to use than NOWAIT in the ORM: with
        # NOWAIT you get an ugly exception from deep inside psycopg2 that's annoying to catch and deal with.
        try:
            job = (
                session.execute(
                    select(BackgroundJob)
                    .where(BackgroundJob.ready_for_retry)
                    .order_by(BackgroundJob.priority.desc(), BackgroundJob.next_attempt_after.asc())
                    .with_for_update(skip_locked=True)
                )
                .scalars()
                .first()
            )
        except sqlalchemy.exc.OperationalError:
            background_jobs_serialization_errors_counter.inc()
            logger.debug("Serialization error")
            return False

        if not job:
            background_jobs_no_jobs_counter.inc()
            logger.debug("No pending jobs")
            return False

        background_jobs_got_job_counter.inc()

        # we've got a lock for a job now, it's "pending" until we commit or the lock is gone
        logger.info(f"Job #{job.id} of type {job.job_type} grabbed")
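        # count this attempt up front so a failure below still counts toward max_tries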
        job.try_count += 1

        message_type, func = JOBS[job.job_type]

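        # record how long the job sat in the queue before being picked up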
        jobs_queued_histogram.observe((now() - job.queued).total_seconds())
        try:
            with trace.start_as_current_span(job.job_type) as rollspan:
                start = perf_counter_ns()
                ret = func(message_type.FromString(job.payload))
                finished = perf_counter_ns()
            job.state = BackgroundJobState.completed
            observe_in_jobs_duration_histogram(
                job.job_type, job.state.name, job.try_count, "", (finished - start) / 1e9
            )
            logger.info(f"Job #{job.id} complete on try number {job.try_count}")
        except Exception as e:
            finished = perf_counter_ns()
            logger.exception(e)
            sentry_sdk.set_tag("context", "job")
            sentry_sdk.set_tag("job", job.job_type)
            sentry_sdk.capture_exception(e)

            if job.try_count >= job.max_tries:
                # if we already tried max_tries times, it's permanently failed
                job.state = BackgroundJobState.failed
                logger.info(f"Job #{job.id} failed on try number {job.try_count}")
            else:
                job.state = BackgroundJobState.error
                # exponential backoff
                job.next_attempt_after += timedelta(seconds=15 * (2**job.try_count))
                logger.info(f"Job #{job.id} error on try number {job.try_count}, next try at {job.next_attempt_after}")
            observe_in_jobs_duration_histogram(
                job.job_type, job.state.name, job.try_count, type(e).__name__, (finished - start) / 1e9
            )
            # add some info for debugging
            job.failure_info = traceback.format_exc()

            if config["IN_TEST"]:
                raise e

        # exiting ctx manager commits and releases the row lock
        return True


def service_jobs():
    """
    Service jobs in an infinite loop
    """
    while True:
        # if no job was found, sleep for a second, otherwise query for another job straight away
        if not process_job():
            sleep(1)


def _run_job_and_schedule(sched, schedule_id):
    job_type, frequency = SCHEDULE[schedule_id]
    logger.info(f"Processing job of type {job_type}")

    # wake ourselves up after frequency
    sched.enter(
        frequency.total_seconds(),
        1,
        _run_job_and_schedule,
        argument=(
            sched,
            schedule_id,
        ),
    )

    # queue the job
    with session_scope() as session:
        queue_job(session, job_type, empty_pb2.Empty())


def run_scheduler():
    """
    Schedules jobs according to the SCHEDULE built above from the handlers module
    """
    sched = scheduler(monotonic, sleep)

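    # kick off every periodic job immediately; each run then re-schedules itself via _run_job_and_schedule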
    for schedule_id, (job_type, frequency) in enumerate(SCHEDULE):
        sched.enter(
            0,
            1,
            _run_job_and_schedule,
            argument=(
                sched,
                schedule_id,
            ),
        )

    sched.run()


def _run_forever(func):
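    # this runs in a freshly forked process: set up the database connection and tracing for this process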
    db_post_fork()
    setup_tracing()

    while True:
        try:
            logger.critical("Background worker starting")
            func()
        except Exception as e:
            logger.critical("Unhandled exception in background worker", exc_info=e)
            # cool off for a minute so a programming error doesn't hammer the database
            sleep(60)


def start_jobs_scheduler():
    scheduler = Process(
        target=_run_forever,
        args=(run_scheduler,),
    )
    scheduler.start()
    return scheduler


def start_jobs_worker():
    worker = Process(target=_run_forever, args=(service_jobs,))
    worker.start()
    return worker
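

# A typical startup sequence (a sketch; the actual entry point lives elsewhere in the codebase) would start both
# processes and keep the returned Process handles so they can be joined or monitored later:
#
#     scheduler_process = start_jobs_scheduler()
#     worker_process = start_jobs_worker()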