Coverage for src/couchers/jobs/ 85%
99 statements
« prev ^ index » next v7.6.10, created at 2025-03-11 15:27 +0000
« prev ^ index » next v7.6.10, created at 2025-03-11 15:27 +0000
"""
Background job workers
"""
5import logging
6import traceback
7from datetime import timedelta
8from inspect import getmembers, isfunction
9from multiprocessing import Process
10from sched import scheduler
11from time import monotonic, perf_counter_ns, sleep
13import sentry_sdk
14from google.protobuf import empty_pb2
15from opentelemetry import trace
17from couchers.config import config
18from couchers.db import db_post_fork, session_scope, worker_repeatable_read_session_scope
19from import handlers
20from import queue_job
21from couchers.metrics import create_prometheus_server, job_process_registry, observe_in_jobs_duration_histogram
22from couchers.models import BackgroundJob, BackgroundJobState
23from couchers.sql import couchers_select as select
24from couchers.tracing import setup_tracing
logger = logging.getLogger(__name__)
# NOTE: this rebinds the module-level name `trace` from the imported opentelemetry module to a tracer object;
# process_job() relies on the rebound name (trace.start_as_current_span).
trace = trace.get_tracer(__name__)

# job name -> (payload message type, handler function); looked up by job_type in process_job()
JOBS = {}
# list of (job name, frequency) pairs, indexed by position in _run_job_and_schedule() / run_scheduler();
# the frequency object must expose total_seconds()
SCHEDULE = []

# register every function in `handlers` that carries a PAYLOAD attribute as a job, and additionally as a
# scheduled job if it also carries a SCHEDULE attribute
for name, func in getmembers(handlers, isfunction):
    if hasattr(func, "PAYLOAD"):
        JOBS[name] = (func.PAYLOAD, func)
        # NOTE(review): nested under the PAYLOAD check so that every scheduled job is also present in JOBS
        # (scheduled jobs are queued by name and later resolved through JOBS) -- confirm against the original
        if hasattr(func, "SCHEDULE"):
            SCHEDULE.append((name, func.SCHEDULE))
def process_job():
    """
    Attempt to process one job from the job queue.

    Returns False if no job was found, True if a job was processed, regardless of failure/success.

    Raises:
        KeyError: if the grabbed job's job_type has no entry in JOBS.
        Exception: the handler's own exception is re-raised, but only when config["IN_TEST"] is truthy.
    """
    logger.debug("Looking for a job")

    with worker_repeatable_read_session_scope() as session:
        # a combination of REPEATABLE READ and SELECT ... FOR UPDATE SKIP LOCKED makes sure that only one transaction
        # will modify the job at a time. SKIP LOCKED means that if the job is locked, then we ignore that row, it's
        # easier to use SKIP LOCKED vs NOWAIT in the ORM, with NOWAIT you get an ugly exception from deep inside
        # psycopg2 that's quite annoying to catch and deal with
        job = (
            session.execute(
                select(BackgroundJob).where(BackgroundJob.ready_for_retry).with_for_update(skip_locked=True)
            )
            .scalars()
            .first()
        )

        if not job:
            logger.debug("No pending jobs")
            return False

        # we've got a lock for a job now, it's "pending" until we commit or the lock is gone
        # NOTE(review): the "#{...}" placeholder in the log messages is assumed to be job.id -- confirm
        logger.info(f"Job #{job.id} of type {job.job_type} grabbed")
        job.try_count += 1

        message_type, func = JOBS[job.job_type]

        # taken before the try block so the except branch can always compute a duration, even if the failure
        # happens before the handler itself starts running (e.g. while opening the span)
        start = perf_counter_ns()
        try:
            with trace.start_as_current_span(job.job_type):
                func(message_type.FromString(job.payload))
                finished = perf_counter_ns()
            job.state = BackgroundJobState.completed
            # NOTE(review): second argument assumed to be the job state name -- confirm against the metrics helper
            observe_in_jobs_duration_histogram(
                job.job_type, job.state.name, job.try_count, "", (finished - start) / 1e9
            )
            logger.info(f"Job #{job.id} complete on try number {job.try_count}")
        except Exception as e:
            finished = perf_counter_ns()
            logger.exception(e)
            sentry_sdk.set_tag("context", "job")
            sentry_sdk.set_tag("job", job.job_type)
            sentry_sdk.capture_exception(e)

            if job.try_count >= job.max_tries:
                # if we already tried max_tries times, it's permanently failed
                job.state = BackgroundJobState.failed
                logger.info(f"Job #{job.id} failed on try number {job.try_count}")
            else:
                job.state = BackgroundJobState.error
                # exponential backoff
                job.next_attempt_after += timedelta(seconds=15 * (2**job.try_count))
                logger.info(f"Job #{job.id} error on try number {job.try_count}, next try at {job.next_attempt_after}")
            observe_in_jobs_duration_histogram(
                job.job_type, job.state.name, job.try_count, type(e).__name__, (finished - start) / 1e9
            )
            # add some info for debugging
            job.failure_info = traceback.format_exc()

            if config["IN_TEST"]:
                # surface handler failures directly when running under test
                raise

        # exiting ctx manager commits and releases the row lock
    return True
def service_jobs():
    """
    Service jobs in an infinite loop

    Creates a server via create_prometheus_server(job_process_registry, 8001) before entering the loop and
    closes it again when the loop is left (i.e. when process_job() raises).
    """
    t = create_prometheus_server(job_process_registry, 8001)
    try:
        while True:
            # if no job was found, sleep for a second, otherwise query for another job straight away
            if not process_job():
                sleep(1)
    finally:
        logger.info("Closing prometheus server")
        t.server_close()
def _run_job_and_schedule(sched, schedule_id):
    """
    Queue one scheduled job and re-arm the scheduler for its next run.

    sched is the scheduler the follow-up event is entered into; schedule_id is an index into SCHEDULE
    selecting the (job_type, frequency) pair to act on.
    """
    job_type, frequency = SCHEDULE[schedule_id]
    logger.info(f"Processing job of type {job_type}")

    # wake ourselves up after frequency; this is done before queueing, so the next run is already
    # registered by the time queue_job() is called
    sched.enter(
        frequency.total_seconds(),
        1,
        _run_job_and_schedule,
        argument=(
            sched,
            schedule_id,
        ),
    )

    # queue the job
    with session_scope() as session:
        queue_job(session, job_type, empty_pb2.Empty())
def run_scheduler():
    """
    Schedules jobs according to schedule in .definitions

    Enters one immediate event per SCHEDULE entry and then runs the scheduler. Each event re-enters itself
    (see _run_job_and_schedule), so with a non-empty SCHEDULE this call does not return on its own.
    """
    sched = scheduler(monotonic, sleep)

    # only the index is needed here; _run_job_and_schedule looks the entry up in SCHEDULE itself
    for schedule_id, _ in enumerate(SCHEDULE):
        sched.enter(
            0,
            1,
            _run_job_and_schedule,
            argument=(
                sched,
                schedule_id,
            ),
        )

    # without this the queued events would never fire
    sched.run()
def _run_forever(func):
    """
    Process entry point: run func over and over, restarting it after failures.

    Calls db_post_fork() and setup_tracing() once, then loops forever calling func(). Any Exception escaping
    func() is logged and followed by a 60 second pause before the next attempt.
    """
    db_post_fork()
    setup_tracing()

    while True:
        try:
            logger.critical("Background worker starting")
            func()
        except Exception as exc:
            logger.critical("Unhandled exception in background worker", exc_info=exc)
            # back off for a minute so a programming error doesn't make us hammer the database
            sleep(60)
def start_jobs_scheduler():
    """
    Start run_scheduler in a separate process (wrapped in _run_forever) and return the started Process.
    """
    proc = Process(target=_run_forever, args=(run_scheduler,))
    proc.start()
    return proc
def start_jobs_worker():
    """
    Start service_jobs in a separate process (wrapped in _run_forever) and return the started Process.
    """
    proc = Process(
        target=_run_forever,
        args=(service_jobs,),
    )
    proc.start()
    return proc