Coverage for src/couchers/jobs/worker.py: 82%

105 statements  

coverage.py v7.6.10, created at 2025-06-01 15:07 +0000

1""" 

2Background job workers 

3""" 

4 

5import logging 

6import traceback 

7from datetime import timedelta 

8from inspect import getmembers, isfunction 

9from multiprocessing import Process 

10from sched import scheduler 

11from time import monotonic, perf_counter_ns, sleep 

12 

13import sentry_sdk 

14import sqlalchemy.exc 

15from google.protobuf import empty_pb2 

16from opentelemetry import trace 

17 

18from couchers.config import config 

19from couchers.db import db_post_fork, session_scope, worker_repeatable_read_session_scope 

20from couchers.jobs import handlers 

21from couchers.jobs.enqueue import queue_job 

22from couchers.metrics import ( 

23 background_jobs_got_job_counter, 

24 background_jobs_no_jobs_counter, 

25 background_jobs_serialization_errors_counter, 

26 jobs_queued_histogram, 

27 observe_in_jobs_duration_histogram, 

28) 

29from couchers.models import BackgroundJob, BackgroundJobState 

30from couchers.sql import couchers_select as select 

31from couchers.tracing import setup_tracing 

32from couchers.utils import now 

logger = logging.getLogger(__name__)
trace = trace.get_tracer(__name__)

JOBS = {}
SCHEDULE = []

for name, func in getmembers(handlers, isfunction):
    if hasattr(func, "PAYLOAD"):
        JOBS[name] = (func.PAYLOAD, func)
        if hasattr(func, "SCHEDULE"):
            SCHEDULE.append((name, func.SCHEDULE))
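# Illustrative sketch only (the handler name below is made up, not taken from couchers.jobs.handlers):
# a handler qualifies for JOBS by carrying a PAYLOAD attribute, and is additionally placed on SCHEDULE
# if it carries a SCHEDULE attribute, roughly like
#
#   def hypothetical_cleanup_job(payload):  # payload: the decoded protobuf message
#       ...
#   hypothetical_cleanup_job.PAYLOAD = empty_pb2.Empty
#   hypothetical_cleanup_job.SCHEDULE = timedelta(hours=1)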

def process_job():
    """
    Attempt to process one job from the job queue. Returns False if no job was found, True if a job was processed,
    regardless of success or failure.
    """
    logger.debug("Looking for a job")

    with worker_repeatable_read_session_scope() as session:
        # a combination of REPEATABLE READ and SELECT ... FOR UPDATE SKIP LOCKED makes sure that only one transaction
        # will modify the job at a time. SKIP LOCKED means that if a job row is locked by another transaction, we
        # simply ignore that row. It's easier to use SKIP LOCKED than NOWAIT in the ORM: with NOWAIT you get an ugly
        # exception from deep inside psycopg2 that's quite annoying to catch and deal with.
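        # For orientation only: the query below corresponds roughly to the following SQL (a sketch, not
        # the exact statement SQLAlchemy emits):
        #
        #   SELECT * FROM background_jobs
        #   WHERE <ready_for_retry>
        #   ORDER BY priority DESC, next_attempt_after ASC
        #   LIMIT 1
        #   FOR UPDATE SKIP LOCKED;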

        try:
            job = (
                session.execute(
                    select(BackgroundJob)
                    .where(BackgroundJob.ready_for_retry)
                    .order_by(BackgroundJob.priority.desc(), BackgroundJob.next_attempt_after.asc())
                    .limit(1)
                    .with_for_update(skip_locked=True)
                )
                .scalars()
                .one_or_none()
            )
        except sqlalchemy.exc.OperationalError:
            background_jobs_serialization_errors_counter.inc()
            logger.debug("Serialization error")
            return False

        if not job:
            background_jobs_no_jobs_counter.inc()
            logger.debug("No pending jobs")
            return False

        background_jobs_got_job_counter.inc()

        # we've got a lock on a job now; it stays "pending" until we commit or the lock is released
        logger.info(f"Job #{job.id} of type {job.job_type} grabbed")
        job.try_count += 1

        message_type, func = JOBS[job.job_type]

        jobs_queued_histogram.observe((now() - job.queued).total_seconds())
        try:
            with trace.start_as_current_span(job.job_type) as rollspan:
                start = perf_counter_ns()
                ret = func(message_type.FromString(job.payload))
                finished = perf_counter_ns()
            job.state = BackgroundJobState.completed
            observe_in_jobs_duration_histogram(
                job.job_type, job.state.name, job.try_count, "", (finished - start) / 1e9
            )
            logger.info(f"Job #{job.id} complete on try number {job.try_count}")
        except Exception as e:
            finished = perf_counter_ns()
            logger.exception(e)
            sentry_sdk.set_tag("context", "job")
            sentry_sdk.set_tag("job", job.job_type)
            sentry_sdk.capture_exception(e)

            if job.try_count >= job.max_tries:
                # if we already tried max_tries times, it's permanently failed
                job.state = BackgroundJobState.failed
                logger.info(f"Job #{job.id} failed on try number {job.try_count}")
            else:
                job.state = BackgroundJobState.error
                # exponential backoff
                job.next_attempt_after += timedelta(seconds=15 * (2**job.try_count))
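                # with try_count = 1, 2, 3, ... this adds 30 s, 60 s, 120 s, ...
                # (15 * 2**try_count seconds; note try_count was already incremented above)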

                logger.info(f"Job #{job.id} error on try number {job.try_count}, next try at {job.next_attempt_after}")
            observe_in_jobs_duration_histogram(
                job.job_type, job.state.name, job.try_count, type(e).__name__, (finished - start) / 1e9
            )
            # add some info for debugging
            job.failure_info = traceback.format_exc()

            if config["IN_TEST"]:
                raise e

        # exiting ctx manager commits and releases the row lock
        return True

def service_jobs():
    """
    Service jobs in an infinite loop
    """
    while True:
        # if no job was found, sleep for a second, otherwise query for another job straight away
        if not process_job():
            sleep(1)

def _run_job_and_schedule(sched, schedule_id):
    job_type, frequency = SCHEDULE[schedule_id]
    logger.info(f"Processing job of type {job_type}")

    # wake ourselves up after frequency
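    # (scheduling note: because this re-entry happens at the start of each run, consecutive runs of a
    # job are spaced roughly `frequency` apart, measured from when the previous run began, plus any
    # time the scheduler spent busy with other entries)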

    sched.enter(
        frequency.total_seconds(),
        1,
        _run_job_and_schedule,
        argument=(
            sched,
            schedule_id,
        ),
    )

    # queue the job
    with session_scope() as session:
        queue_job(session, job_type, empty_pb2.Empty())

def run_scheduler():
    """
    Schedules jobs according to the SCHEDULE list built above from couchers.jobs.handlers
    """
    sched = scheduler(monotonic, sleep)

    for schedule_id, (job_type, frequency) in enumerate(SCHEDULE):
        sched.enter(
            0,
            1,
            _run_job_and_schedule,
            argument=(
                sched,
                schedule_id,
            ),
        )

    sched.run()

def _run_forever(func):
    db_post_fork()
    setup_tracing()

    while True:
        try:
            logger.critical("Background worker starting")
            func()
        except Exception as e:
            logger.critical("Unhandled exception in background worker", exc_info=e)
            # cool off for a minute in case we have some programming error, so we don't hammer the database
            sleep(60)

def start_jobs_scheduler():
    scheduler = Process(
        target=_run_forever,
        args=(run_scheduler,),
    )
    scheduler.start()
    return scheduler

def start_jobs_worker():
    worker = Process(target=_run_forever, args=(service_jobs,))
    worker.start()
    return worker
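# Illustrative usage sketch (a hypothetical entry point, not part of this module): the scheduler and
# the worker each run in their own Process, so a server start-up script might do something like
#
#   from couchers.jobs.worker import start_jobs_scheduler, start_jobs_worker
#
#   jobs_scheduler = start_jobs_scheduler()
#   jobs_worker = start_jobs_worker()
#   jobs_worker.join()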