Coverage for src/couchers/jobs/worker.py: 85%
99 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
1"""
2Background job workers
3"""
5import logging
6import traceback
7from datetime import timedelta
8from inspect import getmembers, isfunction
9from multiprocessing import Process
10from sched import scheduler
11from time import monotonic, perf_counter_ns, sleep
13import sentry_sdk
14from google.protobuf import empty_pb2
15from opentelemetry import trace
17from couchers.config import config
18from couchers.db import db_post_fork, session_scope, worker_repeatable_read_session_scope
19from couchers.jobs import handlers
20from couchers.jobs.enqueue import queue_job
21from couchers.metrics import create_prometheus_server, job_process_registry, observe_in_jobs_duration_histogram
22from couchers.models import BackgroundJob, BackgroundJobState
23from couchers.sql import couchers_select as select
24from couchers.tracing import setup_tracing
logger = logging.getLogger(__name__)
# NOTE: from here on ``trace`` names the tracer instance, not the imported opentelemetry module
trace = trace.get_tracer(__name__)

# job type name -> (payload message class, handler function)
JOBS = {}
# (job type name, frequency) pairs, one per periodically scheduled handler
SCHEDULE = []

# NOTE(review): the SCHEDULE check is treated as nested under the PAYLOAD check, i.e. only handlers that are
# registered as jobs can also be scheduled -- confirm against the handler decorators
for name, func in getmembers(handlers, isfunction):
    if not hasattr(func, "PAYLOAD"):
        # plain helper function in the handlers module, not a job
        continue
    JOBS[name] = (func.PAYLOAD, func)
    if hasattr(func, "SCHEDULE"):
        SCHEDULE.append((name, func.SCHEDULE))
def process_job():
    """
    Attempt to process one job from the job queue.

    Returns False if no job was found, True if a job was processed, regardless of failure/success.

    If the job raises, the exception is logged and reported to sentry, and the job is marked as either ``error``
    (to be retried later) or ``failed`` (once ``max_tries`` is reached). When ``config["IN_TEST"]`` is truthy the
    exception is then re-raised to the caller.
    """
    logger.debug("Looking for a job")

    with worker_repeatable_read_session_scope() as session:
        # a combination of REPEATABLE READ and SELECT ... FOR UPDATE SKIP LOCKED makes sure that only one transaction
        # will modify the job at a time. SKIP LOCKED means that if the job is locked, then we ignore that row, it's
        # easier to use SKIP LOCKED vs NOWAIT in the ORM, with NOWAIT you get an ugly exception from deep inside
        # psycopg2 that's quite annoying to catch and deal with
        job = (
            session.execute(
                select(BackgroundJob).where(BackgroundJob.ready_for_retry).with_for_update(skip_locked=True)
            )
            .scalars()
            .first()
        )

        if not job:
            logger.debug("No pending jobs")
            return False

        # we've got a lock for a job now, it's "pending" until we commit or the lock is gone
        logger.info(f"Job #{job.id} of type {job.job_type} grabbed")
        job.try_count += 1

        message_type, func = JOBS[job.job_type]

        # taken before the try block so that the failure path below can always compute a duration, even when
        # opening the tracing span itself is what raised
        start = perf_counter_ns()
        try:
            with trace.start_as_current_span(job.job_type):
                func(message_type.FromString(job.payload))
                finished = perf_counter_ns()
            job.state = BackgroundJobState.completed
            observe_in_jobs_duration_histogram(
                job.job_type, job.state.name, job.try_count, "", (finished - start) / 1e9
            )
            logger.info(f"Job #{job.id} complete on try number {job.try_count}")
        except Exception as e:
            finished = perf_counter_ns()
            logger.exception(e)
            sentry_sdk.set_tag("context", "job")
            sentry_sdk.set_tag("job", job.job_type)
            sentry_sdk.capture_exception(e)

            if job.try_count >= job.max_tries:
                # if we already tried max_tries times, it's permanently failed
                job.state = BackgroundJobState.failed
                logger.info(f"Job #{job.id} failed on try number {job.try_count}")
            else:
                job.state = BackgroundJobState.error
                # exponential backoff
                job.next_attempt_after += timedelta(seconds=15 * (2**job.try_count))
                logger.info(f"Job #{job.id} error on try number {job.try_count}, next try at {job.next_attempt_after}")
            observe_in_jobs_duration_histogram(
                job.job_type, job.state.name, job.try_count, type(e).__name__, (finished - start) / 1e9
            )
            # add some info for debugging
            job.failure_info = traceback.format_exc()

            if config["IN_TEST"]:
                # surface job failures directly to the caller when running under test
                raise

        # exiting ctx manager commits and releases the row lock
    return True
def service_jobs():
    """
    Process jobs from the queue in an infinite loop.

    A prometheus server is created for ``job_process_registry`` before the loop starts and is closed when the
    loop is left (which only happens through an exception).
    """
    metrics_server = create_prometheus_server(job_process_registry, 8001)
    try:
        while True:
            if process_job():
                # a job was handled, so query for another one straight away
                continue
            # nothing was waiting in the queue: sleep for a second before polling again
            sleep(1)
    finally:
        logger.info("Closing prometheus server")
        metrics_server.server_close()
def _run_job_and_schedule(sched, schedule_id):
    """
    Queue the job described by ``SCHEDULE[schedule_id]`` and arrange for this function to run again.

    ``sched`` is the scheduler this function re-enters itself into; ``schedule_id`` is an index into ``SCHEDULE``.
    """
    job_type, frequency = SCHEDULE[schedule_id]
    logger.info(f"Processing job of type {job_type}")

    # wake ourselves up again once ``frequency`` has elapsed
    delay_seconds = frequency.total_seconds()
    sched.enter(delay_seconds, 1, _run_job_and_schedule, argument=(sched, schedule_id))

    # put the job on the queue with an empty payload
    with session_scope() as session:
        queue_job(session, job_type, empty_pb2.Empty())
def run_scheduler():
    """
    Enter every entry of ``SCHEDULE`` into a scheduler for immediate execution, then run the scheduler.

    Each entry keeps re-entering itself through ``_run_job_and_schedule``.
    """
    job_scheduler = scheduler(monotonic, sleep)

    # only the index is needed here; the entry itself is looked up again when the event fires
    for schedule_id, _entry in enumerate(SCHEDULE):
        job_scheduler.enter(0, 1, _run_job_and_schedule, argument=(job_scheduler, schedule_id))

    job_scheduler.run()
def _run_forever(func):
    """
    Call ``func`` over and over, never returning.

    ``db_post_fork()`` and ``setup_tracing()`` are run once up front. If ``func`` raises, the exception is logged
    and the loop sleeps for 60 seconds before calling it again; if ``func`` returns normally it is called again
    straight away.
    """
    db_post_fork()
    setup_tracing()

    while True:
        try:
            logger.critical("Background worker starting")
            func()
        except Exception as exc:
            logger.critical("Unhandled exception in background worker", exc_info=exc)
            # cool off in case we have some programming error to not hammer the database
            sleep(60)
def start_jobs_scheduler():
    """
    Start a child process that runs ``run_scheduler`` under ``_run_forever`` and return the started ``Process``.
    """
    process = Process(target=_run_forever, args=(run_scheduler,))
    process.start()
    return process
def start_jobs_worker():
    """
    Start a child process that runs ``service_jobs`` under ``_run_forever`` and return the started ``Process``.
    """
    process = Process(
        target=_run_forever,
        args=(service_jobs,),
    )
    process.start()
    return process