Coverage for app/backend/src/couchers/jobs/worker.py: 81%

101 statements  

coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

"""
Background job workers
"""

import logging
import traceback
from collections.abc import Callable
from datetime import timedelta
from multiprocessing import Process
from sched import scheduler
from time import monotonic, perf_counter_ns, sleep
from typing import Any

import sentry_sdk
import sqlalchemy.exc
from google.protobuf import empty_pb2
from opentelemetry import trace
from sqlalchemy import select

from couchers.config import config
from couchers.db import db_post_fork, session_scope, worker_repeatable_read_session_scope
from couchers.experimentation import setup_experimentation
from couchers.jobs.definitions import JOBS, Job
from couchers.jobs.enqueue import queue_job
from couchers.metrics import (
    background_jobs_got_job_counter,
    background_jobs_no_jobs_counter,
    background_jobs_serialization_errors_counter,
    jobs_queued_histogram,
    observe_in_jobs_duration_histogram,
)
from couchers.models import BackgroundJob, BackgroundJobState
from couchers.tracing import setup_tracing
from couchers.utils import now

logger = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__)
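# This module runs as two separate forked processes (see start_jobs_scheduler() and start_jobs_worker()
# at the bottom of this file): the scheduler periodically enqueues the jobs in JOBS that have a schedule,
# and the worker services the queue one job at a time. A minimal sketch of how a caller might start both
# (assumed usage, not taken from this file):
#
#   scheduler_process = start_jobs_scheduler()
#   worker_process = start_jobs_worker()
#   scheduler_process.join()
#   worker_process.join()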

def process_job() -> bool:
    """
    Attempt to process one job from the job queue. Returns False if no job was found, True if a job was processed,
    regardless of failure/success.
    """
    logger.debug("Looking for a job")

    with worker_repeatable_read_session_scope() as session:
        # the combination of REPEATABLE READ and SELECT ... FOR UPDATE SKIP LOCKED makes sure that only one
        # transaction will modify the job at a time. SKIP LOCKED means that if the job row is already locked,
        # we simply skip that row. It's easier to use SKIP LOCKED than NOWAIT in the ORM: with NOWAIT you get
        # an ugly exception from deep inside psycopg2 that's quite annoying to catch and deal with
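        # roughly, the statement below corresponds to SQL along these lines (a sketch only; the actual table
        # and column names come from the BackgroundJob model):
        #   SELECT * FROM background_jobs
        #   WHERE <ready_for_retry>
        #   ORDER BY priority DESC, next_attempt_after ASC
        #   LIMIT 1
        #   FOR UPDATE SKIP LOCKED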

        try:
            job = (
                session.execute(
                    select(BackgroundJob)
                    .where(BackgroundJob.ready_for_retry)
                    .order_by(BackgroundJob.priority.desc(), BackgroundJob.next_attempt_after.asc())
                    .limit(1)
                    .with_for_update(skip_locked=True)
                )
                .scalars()
                .one_or_none()
            )
        except sqlalchemy.exc.OperationalError:
            background_jobs_serialization_errors_counter.inc()
            logger.debug("Serialization error")
            return False

        if not job:
            background_jobs_no_jobs_counter.inc()
            logger.debug("No pending jobs")
            return False

        background_jobs_got_job_counter.inc()

        # we've got a lock for a job now, it's "pending" until we commit or the lock is gone
        logger.info(f"Job #{job.id} of type {job.job_type} grabbed")
        job.try_count += 1

        job_def = JOBS[job.job_type]
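        # record how long the job waited since it was queued (on retries this includes time spent on earlier
        # attempts and backoff), then run the handler inside a tracing span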

        jobs_queued_histogram.observe((now() - job.queued).total_seconds())
        try:
            with tracer.start_as_current_span(job.job_type) as rollspan:
                start = perf_counter_ns()
                job_def.handler(job_def.payload_type.FromString(job.payload))
                finished = perf_counter_ns()
            job.state = BackgroundJobState.completed
            observe_in_jobs_duration_histogram(
                job.job_type, job.state.name, job.try_count, "", (finished - start) / 1e9
            )
            logger.info(f"Job #{job.id} complete on try number {job.try_count}")
        except Exception as e:
            finished = perf_counter_ns()
            logger.exception(e)
            sentry_sdk.set_tag("context", "job")
            sentry_sdk.set_tag("job", job.job_type)
            sentry_sdk.capture_exception(e)

            if job.try_count >= job.max_tries:
                # if we already tried max_tries times, it's permanently failed
                job.state = BackgroundJobState.failed
                logger.info(f"Job #{job.id} failed on try number {job.try_count}")
            else:
                job.state = BackgroundJobState.error
                # exponential backoff
                job.next_attempt_after += timedelta(seconds=15 * (2**job.try_count))
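                # (try_count was already incremented above, so the delay is 30 s after the first failure,
                # then 60 s, 120 s, ..., doubling on each retry)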

                logger.info(f"Job #{job.id} error on try number {job.try_count}, next try at {job.next_attempt_after}")
            observe_in_jobs_duration_histogram(
                job.job_type, job.state.name, job.try_count, type(e).__name__, (finished - start) / 1e9
            )
            # add some info for debugging
            job.failure_info = traceback.format_exc()
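            # re-raise in tests so the failure surfaces in the test run instead of being swallowed
            # into the retry machinery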

            if config["IN_TEST"]:
                raise e

        # exiting ctx manager commits and releases the row lock
        return True

def service_jobs() -> None:
    """
    Service jobs in an infinite loop
    """
    while True:
        # if no job was found, sleep for a second, otherwise query for another job straight away
        if not process_job():
            sleep(1)

def _run_job_and_schedule(sched: scheduler, job_def: Job[Any], frequency: timedelta) -> None:
    logger.info(f"Processing job of type {job_def.name}")

    # wake ourselves up after frequency
    sched.enter(
        delay=frequency.total_seconds(),
        priority=1,
        action=_run_job_and_schedule,
        argument=(
            sched,
            job_def,
            frequency,
        ),
    )

    # queue the job
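    # (this only enqueues it; the actual work happens in the worker process via process_job())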

    with session_scope() as session:
        queue_job(session, job=job_def.handler, payload=empty_pb2.Empty())

def run_scheduler() -> None:
    """
    Schedules jobs according to schedule in JOBS
    """
    sched = scheduler(monotonic, sleep)

    for job_type, job_def in JOBS.items():
        if job_def.schedule is not None:  # coverage: this condition was always true in the run, the skip branch was never taken
            sched.enter(
                delay=0,
                priority=1,
                action=_run_job_and_schedule,
                argument=(
                    sched,
                    job_def,
                    job_def.schedule,
                ),
            )

    sched.run()

def _run_forever(func: Callable[[], None]) -> None:
    # Post-fork initialization: these services use threading/async internals that
    # don't survive fork() and must be initialized fresh in each child process
    db_post_fork()
    setup_tracing()
    setup_experimentation()  # Must be initialized after fork - see couchers/experimentation.py

    while True:
        try:
            logger.critical("Background worker starting")
            func()
        except Exception as e:
            logger.critical("Unhandled exception in background worker", exc_info=e)
            # cool off so a persistent programming error doesn't hammer the database
            sleep(60)
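# The two entry points below each spawn their loop in a separate process; _run_forever restarts the loop
# (after a cooldown) if it ever crashes.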

def start_jobs_scheduler() -> Process:
    scheduler = Process(
        target=_run_forever,
        args=(run_scheduler,),
    )
    scheduler.start()
    return scheduler


def start_jobs_worker() -> Process:
    worker = Process(target=_run_forever, args=(service_jobs,))
    worker.start()
    return worker