Coverage for src/couchers/jobs/worker.py: 82%

105 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-03-24 14:08 +0000

1""" 

2Background job workers 

3""" 

4 

5import logging 

6import traceback 

7from datetime import timedelta 

8from inspect import getmembers, isfunction 

9from multiprocessing import Process 

10from sched import scheduler 

11from time import monotonic, perf_counter_ns, sleep 

12 

13import sentry_sdk 

14import sqlalchemy.exc 

15from google.protobuf import empty_pb2 

16from opentelemetry import trace 

17 

18from couchers.config import config 

19from couchers.db import db_post_fork, session_scope, worker_repeatable_read_session_scope 

20from couchers.jobs import handlers 

21from couchers.jobs.enqueue import queue_job 

22from couchers.metrics import ( 

23 background_jobs_got_job_counter, 

24 background_jobs_no_jobs_counter, 

25 background_jobs_serialization_errors_counter, 

26 jobs_queued_histogram, 

27 observe_in_jobs_duration_histogram, 

28) 

29from couchers.models import BackgroundJob, BackgroundJobState 

30from couchers.sql import couchers_select as select 

31from couchers.tracing import setup_tracing 

32from couchers.utils import now 

33 

logger = logging.getLogger(__name__)
# NOTE(review): this rebinds the imported opentelemetry `trace` module name to a Tracer object; from here on,
# `trace.start_as_current_span(...)` in this module refers to the tracer, not the module
trace = trace.get_tracer(__name__)

# Job registry, built once at import time from the functions defined in couchers.jobs.handlers:
#   JOBS:     handler name -> (func.PAYLOAD, func) for every handler function that has a PAYLOAD attribute;
#             process_job looks up job.job_type here and decodes the payload with PAYLOAD.FromString
#   SCHEDULE: list of (handler name, func.SCHEDULE) for every handler function that has a SCHEDULE attribute;
#             the second item is used as the re-queue interval (its .total_seconds() is read by the scheduler)
JOBS = {}
SCHEDULE = []

# getmembers returns members sorted by name, so SCHEDULE order (and thus schedule_id) follows handler names
for name, func in getmembers(handlers, isfunction):
    if hasattr(func, "PAYLOAD"):
        JOBS[name] = (func.PAYLOAD, func)
    if hasattr(func, "SCHEDULE"):
        SCHEDULE.append((name, func.SCHEDULE))

45 

46 

def process_job():
    """
    Attempt to process one job from the job queue. Returns False if no job was found, True if a job was processed,
    regardless of failure/success.

    The highest-priority job that is ready for retry (ties broken by earliest next_attempt_after) is row-locked,
    its try_count is incremented, and the handler registered for its job_type in JOBS is called with the decoded
    payload. On success the job is marked completed. On an exception it is marked failed once try_count reaches
    max_tries, and otherwise marked error with an exponential backoff added to next_attempt_after. In both failure
    cases the traceback is stored in failure_info.

    Any sqlalchemy OperationalError raised while selecting the job is counted as a serialization error and
    reported as "no job found" (False).

    Raises:
        KeyError: if job.job_type has no entry in JOBS.
        Exception: when config["IN_TEST"] is set, the handler's exception is re-raised after bookkeeping.
    """
    logger.debug("Looking for a job")

    with worker_repeatable_read_session_scope() as session:
        # a combination of REPEATABLE READ and SELECT ... FOR UPDATE SKIP LOCKED makes sure that only one transaction
        # will modify the job at a time. SKIP LOCKED means that if the job is locked, then we ignore that row, it's
        # easier to use SKIP LOCKED vs NOWAIT in the ORM, with NOWAIT you get an ugly exception from deep inside
        # psycopg2 that's quite annoying to catch and deal with
        try:
            job = (
                session.execute(
                    select(BackgroundJob)
                    .where(BackgroundJob.ready_for_retry)
                    .order_by(BackgroundJob.priority.desc(), BackgroundJob.next_attempt_after.asc())
                    .with_for_update(skip_locked=True)
                )
                .scalars()
                .first()
            )
        except sqlalchemy.exc.OperationalError:
            # every OperationalError here is counted as a serialization failure and reported to the caller the
            # same way as an empty queue
            background_jobs_serialization_errors_counter.inc()
            logger.debug("Serialization error")
            return False

        if not job:
            background_jobs_no_jobs_counter.inc()
            logger.debug("No pending jobs")
            return False

        background_jobs_got_job_counter.inc()

        # we've got a lock for a job now, it's "pending" until we commit or the lock is gone
        logger.info(f"Job #{job.id} of type {job.job_type} grabbed")
        job.try_count += 1

        # NOTE(review): an unregistered job_type raises KeyError here, outside the try below, so that failure is
        # not recorded on the job row itself
        message_type, func = JOBS[job.job_type]

        # time since the job was originally queued; observed on every attempt, including retries
        jobs_queued_histogram.observe((now() - job.queued).total_seconds())

        # take the start timestamp before the try so the failure path can always compute a duration,
        # even if opening the tracing span itself raises
        start = perf_counter_ns()
        try:
            with trace.start_as_current_span(job.job_type):
                func(message_type.FromString(job.payload))
                finished = perf_counter_ns()
                job.state = BackgroundJobState.completed
                observe_in_jobs_duration_histogram(
                    job.job_type, job.state.name, job.try_count, "", (finished - start) / 1e9
                )
                logger.info(f"Job #{job.id} complete on try number {job.try_count}")
        except Exception as e:
            finished = perf_counter_ns()
            logger.exception(e)
            # attach the tags to this event only: sentry_sdk.set_tag() would write them to the process-wide scope,
            # where they would stick to every later, unrelated event reported from this worker process
            sentry_sdk.capture_exception(e, tags={"context": "job", "job": job.job_type})

            if job.try_count >= job.max_tries:
                # if we already tried max_tries times, it's permanently failed
                job.state = BackgroundJobState.failed
                logger.info(f"Job #{job.id} failed on try number {job.try_count}")
            else:
                job.state = BackgroundJobState.error
                # exponential backoff: 15s * 2**try_count
                # NOTE(review): the delay is added to the previous next_attempt_after rather than to now(), so the
                # new value may already be in the past for a job that waited long in the queue -- confirm intended
                job.next_attempt_after += timedelta(seconds=15 * (2**job.try_count))
                logger.info(f"Job #{job.id} error on try number {job.try_count}, next try at {job.next_attempt_after}")
            observe_in_jobs_duration_histogram(
                job.job_type, job.state.name, job.try_count, type(e).__name__, (finished - start) / 1e9
            )
            # add some info for debugging
            job.failure_info = traceback.format_exc()

            if config["IN_TEST"]:
                # bare raise re-raises the handler's exception with its original traceback
                raise

        # exiting ctx manager commits and releases the row lock
        return True

126 

127 

def service_jobs():
    """
    Process background jobs forever.

    Jobs are taken back-to-back while process_job() keeps reporting that it handled one; as soon as it reports
    that nothing was processed, pause for one second before polling the queue again. Never returns.
    """
    while True:
        handled_one = process_job()
        if handled_one:
            # there may be more work waiting, so poll again straight away
            continue
        sleep(1)

136 

137 

def _run_job_and_schedule(sched, schedule_id):
    """
    Scheduler callback: queue one run of the scheduled job SCHEDULE[schedule_id].

    Before queuing, the callback re-registers itself on `sched` (same arguments, priority 1), delayed by the
    entry's frequency, so that the job keeps being queued periodically.
    """
    job_type, frequency = SCHEDULE[schedule_id]
    logger.info(f"Processing job of type {job_type}")

    # re-arm first: the next wake-up is registered before this run's job is queued
    next_run_delay = frequency.total_seconds()
    callback_args = (sched, schedule_id)
    sched.enter(next_run_delay, 1, _run_job_and_schedule, argument=callback_args)

    # scheduled jobs are queued with an empty protobuf payload
    with session_scope() as session:
        queue_job(session, job_type, empty_pb2.Empty())

156 

157 

def run_scheduler():
    """
    Run the periodic job scheduler.

    Every (job_type, frequency) entry in SCHEDULE is queued once immediately and then re-queued every `frequency`
    by _run_job_and_schedule. SCHEDULE is collected at import time from the functions in couchers.jobs.handlers
    that carry a SCHEDULE attribute.

    Blocks inside sched.run() while events remain scheduled. Because each callback re-arms itself, this only
    returns if SCHEDULE is empty or a callback raises.
    """
    sched = scheduler(monotonic, sleep)

    # the callback receives the entry's index and looks the entry up itself, so only the index is needed here
    for schedule_id, _ in enumerate(SCHEDULE):
        sched.enter(0, 1, _run_job_and_schedule, argument=(sched, schedule_id))

    sched.run()

176 

177 

def _run_forever(func):
    """
    Child-process entry point: initialise the process, then call `func` over and over.

    db_post_fork() and setup_tracing() run once, up front. After that, `func` is invoked in an endless loop. When
    an exception escapes it, the exception is logged at CRITICAL level and the loop pauses for 60 seconds before
    trying again. When `func` returns normally, it is called again immediately.
    """
    db_post_fork()
    setup_tracing()

    while True:
        try:
            logger.critical("Background worker starting")
            func()
        except Exception as exc:
            logger.critical("Unhandled exception in background worker", exc_info=exc)
        else:
            continue
        # cool off in case we have some programming error to not hammer the database
        sleep(60)

190 

191 

def start_jobs_scheduler():
    """
    Launch the periodic job scheduler in a child process.

    The child runs run_scheduler() under _run_forever(). Returns the already-started multiprocessing.Process.
    """
    proc = Process(target=_run_forever, args=(run_scheduler,))
    proc.start()
    return proc

199 

200 

def start_jobs_worker():
    """
    Launch a background job worker in a child process.

    The child runs service_jobs() under _run_forever(). Returns the already-started multiprocessing.Process.
    """
    proc = Process(
        target=_run_forever,
        args=(service_jobs,),
    )
    proc.start()
    return proc