Coverage for src/couchers/jobs/worker.py: 85%

99 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-22 06:42 +0000

1""" 

2Background job workers 

3""" 

4 

5import logging 

6import traceback 

7from datetime import timedelta 

8from inspect import getmembers, isfunction 

9from multiprocessing import Process 

10from sched import scheduler 

11from time import monotonic, perf_counter_ns, sleep 

12 

13import sentry_sdk 

14from google.protobuf import empty_pb2 

15from opentelemetry import trace 

16 

17from couchers.config import config 

18from couchers.db import db_post_fork, session_scope, worker_repeatable_read_session_scope 

19from couchers.jobs import handlers 

20from couchers.jobs.enqueue import queue_job 

21from couchers.metrics import create_prometheus_server, job_process_registry, observe_in_jobs_duration_histogram 

22from couchers.models import BackgroundJob, BackgroundJobState 

23from couchers.sql import couchers_select as select 

24from couchers.tracing import setup_tracing 

25 

26logger = logging.getLogger(__name__) 

27trace = trace.get_tracer(__name__) 

28 

29JOBS = {} 

30SCHEDULE = [] 

31 

32for name, func in getmembers(handlers, isfunction): 

33 if hasattr(func, "PAYLOAD"): 

34 JOBS[name] = (func.PAYLOAD, func) 

35 if hasattr(func, "SCHEDULE"): 

36 SCHEDULE.append((name, func.SCHEDULE)) 

37 

38 

39def process_job(): 

40 """ 

41 Attempt to process one job from the job queue. Returns False if no job was found, True if a job was processed, 

42 regardless of failure/success. 

43 """ 

44 logger.debug("Looking for a job") 

45 

46 with worker_repeatable_read_session_scope() as session: 

47 # a combination of REPEATABLE READ and SELECT ... FOR UPDATE SKIP LOCKED makes sure that only one transaction 

48 # will modify the job at a time. SKIP UPDATE means that if the job is locked, then we ignore that row, it's 

49 # easier to use SKIP LOCKED vs NOWAIT in the ORM, with NOWAIT you get an ugly exception from deep inside 

50 # psycopg2 that's quite annoying to catch and deal with 

51 job = ( 

52 session.execute( 

53 select(BackgroundJob).where(BackgroundJob.ready_for_retry).with_for_update(skip_locked=True) 

54 ) 

55 .scalars() 

56 .first() 

57 ) 

58 

59 if not job: 

60 logger.debug("No pending jobs") 

61 return False 

62 

63 # we've got a lock for a job now, it's "pending" until we commit or the lock is gone 

64 logger.info(f"Job #{job.id} of type {job.job_type} grabbed") 

65 job.try_count += 1 

66 

67 message_type, func = JOBS[job.job_type] 

68 

69 try: 

70 with trace.start_as_current_span(job.job_type) as rollspan: 

71 start = perf_counter_ns() 

72 ret = func(message_type.FromString(job.payload)) 

73 finished = perf_counter_ns() 

74 job.state = BackgroundJobState.completed 

75 observe_in_jobs_duration_histogram( 

76 job.job_type, job.state.name, job.try_count, "", (finished - start) / 1e9 

77 ) 

78 logger.info(f"Job #{job.id} complete on try number {job.try_count}") 

79 except Exception as e: 

80 finished = perf_counter_ns() 

81 logger.exception(e) 

82 sentry_sdk.set_tag("context", "job") 

83 sentry_sdk.set_tag("job", job.job_type) 

84 sentry_sdk.capture_exception(e) 

85 

86 if job.try_count >= job.max_tries: 

87 # if we already tried max_tries times, it's permanently failed 

88 job.state = BackgroundJobState.failed 

89 logger.info(f"Job #{job.id} failed on try number {job.try_count}") 

90 else: 

91 job.state = BackgroundJobState.error 

92 # exponential backoff 

93 job.next_attempt_after += timedelta(seconds=15 * (2**job.try_count)) 

94 logger.info(f"Job #{job.id} error on try number {job.try_count}, next try at {job.next_attempt_after}") 

95 observe_in_jobs_duration_histogram( 

96 job.job_type, job.state.name, job.try_count, type(e).__name__, (finished - start) / 1e9 

97 ) 

98 # add some info for debugging 

99 job.failure_info = traceback.format_exc() 

100 

101 if config["IN_TEST"]: 

102 raise e 

103 

104 # exiting ctx manager commits and releases the row lock 

105 return True 

106 

107 

108def service_jobs(): 

109 """ 

110 Service jobs in an infinite loop 

111 """ 

112 t = create_prometheus_server(job_process_registry, 8001) 

113 try: 

114 while True: 

115 # if no job was found, sleep for a second, otherwise query for another job straight away 

116 if not process_job(): 

117 sleep(1) 

118 finally: 

119 logger.info("Closing prometheus server") 

120 t.server_close() 

121 

122 

123def _run_job_and_schedule(sched, schedule_id): 

124 job_type, frequency = SCHEDULE[schedule_id] 

125 logger.info(f"Processing job of type {job_type}") 

126 

127 # wake ourselves up after frequency 

128 sched.enter( 

129 frequency.total_seconds(), 

130 1, 

131 _run_job_and_schedule, 

132 argument=( 

133 sched, 

134 schedule_id, 

135 ), 

136 ) 

137 

138 # queue the job 

139 with session_scope() as session: 

140 queue_job(session, job_type, empty_pb2.Empty()) 

141 

142 

143def run_scheduler(): 

144 """ 

145 Schedules jobs according to schedule in .definitions 

146 """ 

147 sched = scheduler(monotonic, sleep) 

148 

149 for schedule_id, (job_type, frequency) in enumerate(SCHEDULE): 

150 sched.enter( 

151 0, 

152 1, 

153 _run_job_and_schedule, 

154 argument=( 

155 sched, 

156 schedule_id, 

157 ), 

158 ) 

159 

160 sched.run() 

161 

162 

163def _run_forever(func): 

164 db_post_fork() 

165 setup_tracing() 

166 

167 while True: 

168 try: 

169 logger.critical("Background worker starting") 

170 func() 

171 except Exception as e: 

172 logger.critical("Unhandled exception in background worker", exc_info=e) 

173 # cool off in case we have some programming error to not hammer the database 

174 sleep(60) 

175 

176 

177def start_jobs_scheduler(): 

178 scheduler = Process( 

179 target=_run_forever, 

180 args=(run_scheduler,), 

181 ) 

182 scheduler.start() 

183 return scheduler 

184 

185 

186def start_jobs_worker(): 

187 worker = Process(target=_run_forever, args=(service_jobs,)) 

188 worker.start() 

189 return worker