Coverage for src/couchers/jobs/worker.py: 82%

109 statements  

coverage.py v7.11.0, created at 2025-12-20 11:53 +0000

1""" 

2Background job workers 

3""" 

4 

5import logging 

6import traceback 

7from collections.abc import Callable 

8from datetime import timedelta 

9from inspect import getmembers, isfunction 

10from multiprocessing import Process 

11from sched import scheduler 

12from time import monotonic, perf_counter_ns, sleep 

13from typing import Any 

14 

15import sentry_sdk 

16import sqlalchemy.exc 

17from google.protobuf import empty_pb2 

18from opentelemetry import trace 

19 

20from couchers.config import config 

21from couchers.db import db_post_fork, session_scope, worker_repeatable_read_session_scope 

22from couchers.experimentation import setup_experimentation 

23from couchers.jobs import handlers 

24from couchers.jobs.enqueue import queue_job 

25from couchers.metrics import ( 

26 background_jobs_got_job_counter, 

27 background_jobs_no_jobs_counter, 

28 background_jobs_serialization_errors_counter, 

29 jobs_queued_histogram, 

30 observe_in_jobs_duration_histogram, 

31) 

32from couchers.models import BackgroundJob, BackgroundJobState 

33from couchers.sql import couchers_select as select 

34from couchers.tracing import setup_tracing 

35from couchers.utils import now 

36 

37logger = logging.getLogger(__name__) 

38tracer = trace.get_tracer(__name__) 

39 

40JOBS: dict[str, tuple[Any, Callable[[Any], Any]]] = {} 

41SCHEDULE: list[tuple[str, timedelta]] = [] 

42 

43for name, func in getmembers(handlers, isfunction): 

44 if hasattr(func, "PAYLOAD"): 

45 JOBS[name] = (func.PAYLOAD, func) 

46 if hasattr(func, "SCHEDULE"): 

47 SCHEDULE.append((name, func.SCHEDULE)) 

48 

49 
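# Illustrative sketch only (an assumption, not part of this module): the loop above
# registers any function in couchers.jobs.handlers that carries a PAYLOAD attribute
# (the protobuf message class it accepts) and, for periodic jobs, a SCHEDULE
# timedelta. The handler name and schedule below are hypothetical:
#
#     def send_weekly_digest(payload: empty_pb2.Empty) -> None:
#         ...  # do the actual work here
#
#     send_weekly_digest.PAYLOAD = empty_pb2.Empty
#     send_weekly_digest.SCHEDULE = timedelta(days=7)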

def process_job() -> bool:
    """
    Attempt to process one job from the job queue. Returns False if no job was found, True if a job was processed,
    regardless of failure/success.
    """
    logger.debug("Looking for a job")

    with worker_repeatable_read_session_scope() as session:
        # the combination of REPEATABLE READ and SELECT ... FOR UPDATE SKIP LOCKED makes sure that only one
        # transaction will modify the job at a time. SKIP LOCKED means that if the job row is already locked,
        # we ignore it. It's easier to use SKIP LOCKED than NOWAIT in the ORM: with NOWAIT you get an ugly
        # exception from deep inside psycopg2 that's quite annoying to catch and deal with.
        try:
            job = (
                session.execute(
                    select(BackgroundJob)
                    .where(BackgroundJob.ready_for_retry)
                    .order_by(BackgroundJob.priority.desc(), BackgroundJob.next_attempt_after.asc())
                    .limit(1)
                    .with_for_update(skip_locked=True)
                )
                .scalars()
                .one_or_none()
            )
        except sqlalchemy.exc.OperationalError:
            background_jobs_serialization_errors_counter.inc()
            logger.debug("Serialization error")
            return False

        if not job:
            background_jobs_no_jobs_counter.inc()
            logger.debug("No pending jobs")
            return False

        background_jobs_got_job_counter.inc()

        # we've got a lock on a job now; it stays "pending" until we commit or the lock is gone
        logger.info(f"Job #{job.id} of type {job.job_type} grabbed")
        job.try_count += 1

        message_type, func = JOBS[job.job_type]

        jobs_queued_histogram.observe((now() - job.queued).total_seconds())
        try:
            with tracer.start_as_current_span(job.job_type) as rollspan:
                start = perf_counter_ns()
                ret = func(message_type.FromString(job.payload))
                finished = perf_counter_ns()
                job.state = BackgroundJobState.completed
                observe_in_jobs_duration_histogram(
                    job.job_type, job.state.name, job.try_count, "", (finished - start) / 1e9
                )
                logger.info(f"Job #{job.id} complete on try number {job.try_count}")
        except Exception as e:
            finished = perf_counter_ns()
            logger.exception(e)
            sentry_sdk.set_tag("context", "job")
            sentry_sdk.set_tag("job", job.job_type)
            sentry_sdk.capture_exception(e)

            if job.try_count >= job.max_tries:
                # if we already tried max_tries times, it's permanently failed
                job.state = BackgroundJobState.failed
                logger.info(f"Job #{job.id} failed on try number {job.try_count}")
            else:
                job.state = BackgroundJobState.error
                # exponential backoff
                job.next_attempt_after += timedelta(seconds=15 * (2**job.try_count))
                logger.info(f"Job #{job.id} error on try number {job.try_count}, next try at {job.next_attempt_after}")
            observe_in_jobs_duration_histogram(
                job.job_type, job.state.name, job.try_count, type(e).__name__, (finished - start) / 1e9
            )
            # add some info for debugging
            job.failure_info = traceback.format_exc()

            if config["IN_TEST"]:
                raise e

        # exiting the ctx manager commits and releases the row lock
        return True

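# For reference (illustrative, not executed anywhere): the locked query above
# corresponds roughly to the following PostgreSQL statement:
#
#     SELECT ... FROM <background jobs table>
#      WHERE <ready_for_retry condition>
#      ORDER BY priority DESC, next_attempt_after ASC
#      LIMIT 1
#      FOR UPDATE SKIP LOCKED
#
# The exponential backoff of 15 * 2**try_count seconds delays retries by roughly
# 30 s, 60 s, 120 s, 240 s, ... after the 1st, 2nd, 3rd, 4th failed attempt
# (try_count has already been incremented by the time the delay is computed).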

def service_jobs() -> None:
    """
    Service jobs in an infinite loop
    """
    while True:
        # if no job was found, sleep for a second, otherwise query for another job straight away
        if not process_job():
            sleep(1)


def _run_job_and_schedule(sched: scheduler, schedule_id: int) -> None:
    job_type, frequency = SCHEDULE[schedule_id]
    logger.info(f"Processing job of type {job_type}")

    # wake ourselves up after frequency
    sched.enter(
        frequency.total_seconds(),
        1,
        _run_job_and_schedule,
        argument=(
            sched,
            schedule_id,
        ),
    )

    # queue the job
    with session_scope() as session:
        queue_job(session, job_type, empty_pb2.Empty())


def run_scheduler() -> None:
    """
    Schedules the periodic jobs registered in SCHEDULE above
    """
    sched = scheduler(monotonic, sleep)

    for schedule_id, (job_type, frequency) in enumerate(SCHEDULE):
        sched.enter(
            0,
            1,
            _run_job_and_schedule,
            argument=(
                sched,
                schedule_id,
            ),
        )

    sched.run()

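# Illustrative timeline (an assumption about a concrete schedule, not code from this
# module): for a handler registered with SCHEDULE = timedelta(hours=1), run_scheduler()
# enters it with a delay of 0, so it is enqueued immediately; each invocation of
# _run_job_and_schedule then re-enters itself with a delay of 3600 seconds, so the job
# is queued roughly once per hour for as long as the scheduler process runs.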

def _run_forever(func: Callable[[], None]) -> None:
    # Post-fork initialization: these services use threading/async internals that
    # don't survive fork() and must be initialized fresh in each child process
    db_post_fork()
    setup_tracing()
    setup_experimentation()  # Must be initialized after fork - see couchers/experimentation.py

    while True:
        try:
            logger.critical("Background worker starting")
            func()
        except Exception as e:
            logger.critical("Unhandled exception in background worker", exc_info=e)
            # cool off in case we have some programming error to not hammer the database
            sleep(60)


def start_jobs_scheduler() -> Process:
    scheduler = Process(
        target=_run_forever,
        args=(run_scheduler,),
    )
    scheduler.start()
    return scheduler


def start_jobs_worker() -> Process:
    worker = Process(target=_run_forever, args=(service_jobs,))
    worker.start()
    return worker
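
# Minimal usage sketch (an assumption; the actual server wiring lives elsewhere in
# the codebase): a supervisor process would typically start one scheduler and one
# or more workers and then wait on them, e.g.
#
#     processes = [start_jobs_scheduler(), start_jobs_worker()]
#     for p in processes:
#         p.join()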