Coverage for app/backend/src/couchers/jobs/worker.py: 79%

104 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1""" 

2Background job workers 

3""" 

4 

5import logging 

6import traceback 

7from collections.abc import Callable 

8from datetime import timedelta 

9from multiprocessing import Process 

10from sched import scheduler 

11from time import monotonic, perf_counter_ns, sleep 

12from typing import Any 

13 

14import sentry_sdk 

15import sqlalchemy.exc 

16from google.protobuf import empty_pb2 

17from opentelemetry import trace 

18from sqlalchemy import select 

19 

20from couchers.config import config 

21from couchers.db import db_post_fork, session_scope, worker_repeatable_read_session_scope 

22from couchers.experimentation import setup_experimentation 

23from couchers.jobs.definitions import JOBS, Job 

24from couchers.jobs.enqueue import queue_job 

25from couchers.metrics import ( 

26 background_jobs_got_job_counter, 

27 background_jobs_no_jobs_counter, 

28 background_jobs_serialization_errors_counter, 

29 jobs_queued_histogram, 

30 observe_in_jobs_duration_histogram, 

31) 

32from couchers.models import BackgroundJob, BackgroundJobState 

33from couchers.profiling import setup_profiling 

34from couchers.tracing import setup_tracing 

35from couchers.utils import now 

36 

# module-level logger and OpenTelemetry tracer, both named after this module
logger: logging.Logger = logging.getLogger(__name__)
tracer: trace.Tracer = trace.get_tracer(__name__)

39 

40 

def process_job() -> bool:
    """
    Attempt to process one job from the job queue.

    Grabs the highest-priority job that is ready for retry (ties broken by earliest ``next_attempt_after``),
    runs its handler and records the outcome on the job row:

    * handler succeeded -> ``completed``
    * handler raised and tries remain -> ``error``, rescheduled with exponential backoff
    * handler raised on the last allowed try -> ``failed``

    Returns False if no job was found (or the lookup raised an OperationalError), True if a job was processed,
    regardless of failure/success. When ``config.IN_TEST`` is set, handler exceptions are re-raised.
    """
    logger.debug("Looking for a job")

    with worker_repeatable_read_session_scope() as session:
        # a combination of REPEATABLE READ and SELECT ... FOR UPDATE SKIP LOCKED makes sure that only one transaction
        # will modify the job at a time. SKIP LOCKED means that if the job is locked, then we ignore that row, it's
        # easier to use SKIP LOCKED vs NOWAIT in the ORM, with NOWAIT you get an ugly exception from deep inside
        # psycopg2 that's quite annoying to catch and deal with
        try:
            job = (
                session.execute(
                    select(BackgroundJob)
                    .where(BackgroundJob.ready_for_retry)
                    .order_by(BackgroundJob.priority.desc(), BackgroundJob.next_attempt_after.asc())
                    .limit(1)
                    .with_for_update(skip_locked=True)
                )
                .scalars()
                .one_or_none()
            )
        except sqlalchemy.exc.OperationalError:
            background_jobs_serialization_errors_counter.inc()
            logger.debug("Serialization error")
            return False

        if not job:
            background_jobs_no_jobs_counter.inc()
            logger.debug("No pending jobs")
            return False

        background_jobs_got_job_counter.inc()

        # we've got a lock for a job now, it's "pending" until we commit or the lock is gone
        logger.info(f"Job #{job.id} of type {job.job_type} grabbed")
        job.try_count += 1

        jobs_queued_histogram.labels(str(job.priority)).observe((now() - job.queued).total_seconds())

        # bind the start time before anything in the try block can raise, so the failure path can always
        # compute a duration
        start = perf_counter_ns()
        try:
            # looked up inside the try so an unknown job type is recorded as a job failure (backed off and
            # eventually permanently failed) rather than escaping, rolling back, and being grabbed again forever
            job_def = JOBS[job.job_type]
            with tracer.start_as_current_span(job.job_type):
                job_def.handler(job_def.payload_type.FromString(job.payload))
                finished = perf_counter_ns()
                job.state = BackgroundJobState.completed
                observe_in_jobs_duration_histogram(
                    job.job_type, job.state.name, job.try_count, "", (finished - start) / 1e9
                )
                logger.info(f"Job #{job.id} complete on try number {job.try_count}")
        except Exception as e:
            finished = perf_counter_ns()
            logger.exception(e)
            # NOTE(review): set_tag mutates the current Sentry scope, so these tags may also be attached to later
            # events captured by this worker process; consider an isolated scope
            sentry_sdk.set_tag("context", "job")
            sentry_sdk.set_tag("job", job.job_type)
            sentry_sdk.capture_exception(e)

            if job.try_count >= job.max_tries:
                # if we already tried max_tries times, it's permanently failed
                job.state = BackgroundJobState.failed
                logger.info(f"Job #{job.id} failed on try number {job.try_count}")
            else:
                job.state = BackgroundJobState.error
                # exponential backoff; count from now if the job's scheduled time is already in the past (e.g. it
                # waited in a backlog), otherwise the "backed off" retry could be due immediately
                backoff = timedelta(seconds=15 * (2**job.try_count))
                job.next_attempt_after = max(job.next_attempt_after, now()) + backoff
                logger.info(f"Job #{job.id} error on try number {job.try_count}, next try at {job.next_attempt_after}")
            observe_in_jobs_duration_histogram(
                job.job_type, job.state.name, job.try_count, type(e).__name__, (finished - start) / 1e9
            )
            # add some info for debugging
            job.failure_info = traceback.format_exc()

            if config.IN_TEST:
                raise

    # exiting ctx manager commits and releases the row lock
    return True

121 

122 

def service_jobs() -> None:
    """
    Run the job-processing loop forever.

    Jobs are pulled back-to-back while ``process_job`` keeps finding work. Once it reports that nothing was
    processed, the loop pauses for one second before polling again.
    """
    idle_pause_seconds = 1
    while True:
        found_job = process_job()
        if found_job:
            # more work may be waiting, ask again straight away
            continue
        sleep(idle_pause_seconds)

131 

132 

def _run_job_and_schedule(sched: scheduler, job_def: Job[Any], frequency: timedelta) -> None:
    """
    Scheduler action: re-arm itself on ``sched`` to fire again after ``frequency``, then enqueue one run of
    ``job_def`` with an empty payload.

    :param sched: the scheduler this action is registered on
    :param job_def: the job definition whose handler gets queued
    :param frequency: delay until this action fires again
    """
    logger.info(f"Processing job of type {job_def.name}")

    # wake ourselves up again after `frequency`, with the same arguments
    next_run_args = (sched, job_def, frequency)
    sched.enter(
        delay=frequency.total_seconds(),
        priority=1,
        action=_run_job_and_schedule,
        argument=next_run_args,
    )

    # queue this run of the job
    with session_scope() as session:
        queue_job(session, job=job_def.handler, payload=empty_pb2.Empty())

151 

152 

def run_scheduler() -> None:
    """
    Periodically enqueue every job in ``JOBS`` that declares a ``schedule``.

    Each such job is queued once as soon as the scheduler starts, and then re-queued every ``job_def.schedule``
    by ``_run_job_and_schedule``. Blocks in ``sched.run()`` while events remain. Because each action re-arms
    itself, that call does not return once at least one job is scheduled, unless an action raises.
    """
    sched = scheduler(monotonic, sleep)

    periodic_jobs = [job_def for job_def in JOBS.values() if job_def.schedule is not None]
    for job_def in periodic_jobs:
        # delay=0: fire the first run immediately
        sched.enter(
            delay=0,
            priority=1,
            action=_run_job_and_schedule,
            argument=(sched, job_def, job_def.schedule),
        )

    sched.run()

173 

174 

def _run_forever(func: Callable[[], None], profile_instance: str | None = None) -> None:
    """
    Child-process entry point: initialise per-process services, then call ``func`` in an endless loop.

    If ``func`` raises, the exception is logged at CRITICAL level and the loop waits 60 seconds before calling
    ``func`` again. If ``func`` returns normally, it is called again straight away.

    :param func: the long-running function to keep alive (e.g. ``service_jobs`` or ``run_scheduler``)
    :param profile_instance: if given, profiling is set up with role "worker" and this instance name
    """
    # Post-fork initialization: these services use threading/async internals that
    # don't survive fork() and must be initialized fresh in each child process
    db_post_fork()
    setup_tracing()
    setup_experimentation()
    if profile_instance is not None:
        setup_profiling(role="worker", instance=profile_instance)

    cooldown_seconds = 60
    while True:
        try:
            logger.info("Background worker starting")
            func()
        except Exception as exc:
            logger.critical("Unhandled exception in background worker", exc_info=exc)
            # back off so a persistent programming error doesn't hammer the database
            sleep(cooldown_seconds)

192 

193 

def start_jobs_scheduler() -> Process:
    """
    Spawn and start a child process that runs the periodic job scheduler (``run_scheduler``) forever.

    :return: the started Process handle
    """
    # named to avoid shadowing the module-level `sched.scheduler` import
    scheduler_process = Process(target=_run_forever, args=(run_scheduler,))
    scheduler_process.start()
    return scheduler_process

201 

202 

def start_jobs_worker(index: int) -> Process:
    """
    Spawn and start a child process that services the job queue (``service_jobs``) forever.

    :param index: worker number, used to name the profiling instance ("worker-<index>")
    :return: the started Process handle
    """
    profile_instance = f"worker-{index}"
    worker_process = Process(
        target=_run_forever,
        args=(service_jobs,),
        kwargs={"profile_instance": profile_instance},
    )
    worker_process.start()
    return worker_process