Coverage for src/couchers/jobs/worker.py: 85%
1"""
2Background job workers
3"""
4import logging
5import traceback
6from datetime import timedelta
7from multiprocessing import Process
8from sched import scheduler
9from time import monotonic, sleep
11import sentry_sdk
12from google.protobuf import empty_pb2
14from couchers.db import get_engine, session_scope
15from couchers.jobs.definitions import JOBS, SCHEDULE
16from couchers.jobs.enqueue import queue_job
17from couchers.metrics import create_prometheus_server, job_process_registry, jobs_counter
18from couchers.models import BackgroundJob, BackgroundJobState
19from couchers.sql import couchers_select as select
21logger = logging.getLogger(__name__)
def process_job():
    """
    Attempt to process one job from the job queue. Returns False if no job was found, True if a job was processed,
    regardless of failure/success.
    """
    logger.debug("Looking for a job")

    with session_scope(isolation_level="REPEATABLE READ") as session:
        # a combination of REPEATABLE READ and SELECT ... FOR UPDATE SKIP LOCKED makes sure that only one transaction
        # will modify the job at a time. SKIP LOCKED means that if the job is locked, we ignore that row. It's
        # easier to use SKIP LOCKED than NOWAIT in the ORM: with NOWAIT you get an ugly exception from deep inside
        # psycopg2 that's quite annoying to catch and deal with
        job = (
            session.execute(
                select(BackgroundJob).where(BackgroundJob.ready_for_retry).with_for_update(skip_locked=True)
            )
            .scalars()
            .first()
        )
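        # note: BackgroundJob.ready_for_retry is assumed here to be a SQL-level filter on the model (roughly: the job
        # is pending or errored, its next_attempt_after has passed, and it hasn't exhausted max_tries), so the query
        # only ever returns jobs that are actually due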

        if not job:
            logger.debug("No pending jobs")
            return False

        # we've got a lock for a job now, it's "pending" until we commit or the lock is gone
        logger.info(f"Job #{job.id} grabbed")
        job.try_count += 1

        message_type, func = JOBS[job.job_type]

        try:
            ret = func(message_type.FromString(job.payload))
            job.state = BackgroundJobState.completed
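            # the last label value is the exception-type slot; it's empty here because the job succeeded (the failure
            # path below fills it with type(e).__name__)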
            jobs_counter.labels(job.job_type.name, job.state.name, str(job.try_count), "").inc()
            logger.info(f"Job #{job.id} complete on try number {job.try_count}")
        except Exception as e:
            logger.exception(e)
            sentry_sdk.set_tag("context", "job")
            sentry_sdk.set_tag("job", job.job_type.name)
            sentry_sdk.capture_exception(e)

            if job.try_count >= job.max_tries:
                # if we already tried max_tries times, it's permanently failed
                job.state = BackgroundJobState.failed
                logger.info(f"Job #{job.id} failed on try number {job.try_count}")
            else:
                job.state = BackgroundJobState.error
                # exponential backoff
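                # (15 * 2**try_count seconds: 30 s after the first failed try, then 60 s, 120 s, and so on)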
                job.next_attempt_after += timedelta(seconds=15 * (2**job.try_count))
                logger.info(f"Job #{job.id} error on try number {job.try_count}, next try at {job.next_attempt_after}")
            # add some info for debugging
            jobs_counter.labels(job.job_type.name, job.state.name, str(job.try_count), type(e).__name__).inc()
            job.failure_info = traceback.format_exc()

        # exiting ctx manager commits and releases the row lock
        return True


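# service_jobs and run_scheduler below each run in their own child process (see start_jobs_worker and
# start_jobs_scheduler at the bottom of this file)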
def service_jobs():
    """
    Service jobs in an infinite loop
    """
    get_engine().dispose()
    t = create_prometheus_server(job_process_registry, 8001)
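    # (create_prometheus_server presumably exposes the job_process_registry metrics over HTTP on port 8001; the
    # returned server is shut down in the finally block below)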
    try:
        while True:
            # if no job was found, sleep for a second, otherwise query for another job straight away
            if not process_job():
                sleep(1)
    finally:
        logger.info("Closing prometheus server")
        t.server_close()


def _run_job_and_schedule(sched, schedule_id):
    job_type, frequency = SCHEDULE[schedule_id]
    logger.info(f"Processing job of type {job_type}")

    # wake ourselves up after frequency
    sched.enter(
        frequency.total_seconds(),
        1,
        _run_job_and_schedule,
        argument=(
            sched,
            schedule_id,
        ),
    )
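    # note: the next run is scheduled before the job is queued, so the interval is measured from this wake-up rather
    # than from when the enqueue below finishes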

    # queue the job
    queue_job(job_type, empty_pb2.Empty())


def run_scheduler():
    """
    Schedules jobs according to the schedule in .definitions
    """
    # multiprocessing uses fork() which in turn copies file descriptors, so the engine may have connections in its pool
    # that we don't want to reuse. This is the SQLAlchemy-recommended way of clearing the connection pool in this thread
    get_engine().dispose()

    sched = scheduler(monotonic, sleep)
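    # sched.scheduler with timefunc=monotonic (immune to wall-clock changes) and delayfunc=sleep runs all events
    # sequentially in this single thread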

    for schedule_id, (job_type, frequency) in enumerate(SCHEDULE):
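        # delay 0 so each scheduled job type fires once immediately at startup; 1 is sched's priority argument,
        # which only breaks ties between events scheduled for the same time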
        sched.enter(
            0,
            1,
            _run_job_and_schedule,
            argument=(
                sched,
                schedule_id,
            ),
        )

    sched.run()


def _run_forever(func):
    while True:
        try:
            logger.critical("Background worker starting")
            func()
        except Exception as e:
            logger.critical("Unhandled exception in background worker", exc_info=e)
            # cool off so that a persistent programming error doesn't hammer the database
            sleep(60)


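# the two helpers below hand back the started Process objects; the caller (assumed to be the main server process) is
# expected to hold on to them, e.g. to monitor or join them on shutdown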
def start_jobs_scheduler():
    scheduler = Process(
        target=_run_forever,
        args=(run_scheduler,),
    )
    scheduler.start()
    return scheduler


def start_jobs_worker():
    worker = Process(target=_run_forever, args=(service_jobs,))
    worker.start()
    return worker