Coverage for src/couchers/jobs/worker.py: 86% (91 statements)
coverage.py v7.5.0, created at 2024-07-22 16:44 +0000

"""
Background job workers
"""

import logging
import traceback
from datetime import timedelta
from inspect import getmembers, isfunction
from multiprocessing import Process
from sched import scheduler
from time import monotonic, sleep

import sentry_sdk
from google.protobuf import empty_pb2

from couchers.config import config
from couchers.db import get_engine, session_scope
from couchers.jobs import handlers
from couchers.jobs.enqueue import queue_job
from couchers.metrics import create_prometheus_server, job_process_registry, jobs_counter
from couchers.models import BackgroundJob, BackgroundJobState
from couchers.sql import couchers_select as select

logger = logging.getLogger(__name__)

JOBS = {}
SCHEDULE = []

for name, func in getmembers(handlers, isfunction):
    if hasattr(func, "PAYLOAD"):
        JOBS[name] = (func.PAYLOAD, func)
    if hasattr(func, "SCHEDULE"):
        SCHEDULE.append((name, func.SCHEDULE))

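
# Illustrative sketch only: the registration loop above assumes each handler in couchers.jobs.handlers
# carries a PAYLOAD attribute (the protobuf message class for its payload) and, for periodic jobs, a
# SCHEDULE attribute (a timedelta). The handler below is hypothetical and only shows that expected shape:
#
#     def example_periodic_job(payload: empty_pb2.Empty):
#         ...
#     example_periodic_job.PAYLOAD = empty_pb2.Empty
#     example_periodic_job.SCHEDULE = timedelta(minutes=15)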

def process_job():
    """
    Attempt to process one job from the job queue. Returns False if no job was found, True if a job was processed,
    regardless of failure/success.
    """
    logger.debug("Looking for a job")

    with session_scope(isolation_level="REPEATABLE READ") as session:
        # a combination of REPEATABLE READ and SELECT ... FOR UPDATE SKIP LOCKED makes sure that only one transaction
        # will modify the job at a time. SKIP LOCKED means that if a job is locked, we ignore that row. It's easier
        # to use SKIP LOCKED than NOWAIT in the ORM: with NOWAIT you get an ugly exception from deep inside psycopg2
        # that's quite annoying to catch and deal with.
        job = (
            session.execute(
                select(BackgroundJob).where(BackgroundJob.ready_for_retry).with_for_update(skip_locked=True)
            )
            .scalars()
            .first()
        )
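        # (for reference: on PostgreSQL the statement above compiles to roughly
        #  "SELECT ... FROM the background jobs table WHERE <ready_for_retry> FOR UPDATE SKIP LOCKED",
        #  so concurrent workers each lock a different pending row rather than blocking on the same one)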


        if not job:
            logger.debug("No pending jobs")
            return False

        # we've got a lock for a job now, it's "pending" until we commit or the lock is gone
        logger.info(f"Job #{job.id} of type {job.job_type} grabbed")
        job.try_count += 1

        message_type, func = JOBS[job.job_type]

        try:
            ret = func(message_type.FromString(job.payload))
            job.state = BackgroundJobState.completed
            jobs_counter.labels(job.job_type, job.state.name, str(job.try_count), "").inc()
            logger.info(f"Job #{job.id} complete on try number {job.try_count}")
        except Exception as e:
            logger.exception(e)
            sentry_sdk.set_tag("context", "job")
            sentry_sdk.set_tag("job", job.job_type)
            sentry_sdk.capture_exception(e)

            if job.try_count >= job.max_tries:
                # if we already tried max_tries times, it's permanently failed
                job.state = BackgroundJobState.failed
                logger.info(f"Job #{job.id} failed on try number {job.try_count}")
            else:
                job.state = BackgroundJobState.error
                # exponential backoff
                job.next_attempt_after += timedelta(seconds=15 * (2**job.try_count))
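                # (concretely: with try_count of 1, 2, 3, ... this pushes the next attempt out by a
                #  further 30 s, 60 s, 120 s, ... on top of the existing next_attempt_after)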

                logger.info(f"Job #{job.id} error on try number {job.try_count}, next try at {job.next_attempt_after}")
            # add some info for debugging
            jobs_counter.labels(job.job_type, job.state.name, str(job.try_count), type(e).__name__).inc()
            job.failure_info = traceback.format_exc()

            if config["IN_TEST"]:
                raise e

        # exiting ctx manager commits and releases the row lock
        return True



def service_jobs():
    """
    Service jobs in an infinite loop
    """
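    # as explained in run_scheduler below: this runs in a fork()ed process, so clear out any database
    # connections that were copied from the parent's pool before doing anything else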

    get_engine().dispose()
    t = create_prometheus_server(job_process_registry, 8001)
    try:
        while True:
            # if no job was found, sleep for a second, otherwise query for another job straight away
            if not process_job():
                sleep(1)
    finally:
        logger.info("Closing prometheus server")
        t.server_close()



def _run_job_and_schedule(sched, schedule_id):
    job_type, frequency = SCHEDULE[schedule_id]
    logger.info(f"Processing job of type {job_type}")

    # wake ourselves up after frequency
    sched.enter(
        frequency.total_seconds(),
        1,
        _run_job_and_schedule,
        argument=(
            sched,
            schedule_id,
        ),
    )

    # queue the job
    queue_job(job_type, empty_pb2.Empty())

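
# Explanatory note (not from the original source): each scheduled callback above re-enters itself into
# the sched queue before enqueueing its job, so the queue never empties and sched.run() in run_scheduler
# below effectively blocks forever, firing each job type at its configured interval.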

def run_scheduler():
    """
    Schedules jobs according to the schedule defined on the job handlers
    """
    # multiprocessing uses fork() which in turn copies file descriptors, so the engine may have connections in its pool
    # that we don't want to reuse. This is the SQLAlchemy-recommended way of clearing the connection pool in this thread
    get_engine().dispose()


    sched = scheduler(monotonic, sleep)

    for schedule_id, (job_type, frequency) in enumerate(SCHEDULE):
        sched.enter(
            0,
            1,
            _run_job_and_schedule,
            argument=(
                sched,
                schedule_id,
            ),
        )

    sched.run()


def _run_forever(func):
    while True:
        try:
            logger.critical("Background worker starting")
            func()
        except Exception as e:
            logger.critical("Unhandled exception in background worker", exc_info=e)
        # cool off in case we have a programming error, so we don't hammer the database
        sleep(60)



def start_jobs_scheduler():
    scheduler = Process(
        target=_run_forever,
        args=(run_scheduler,),
    )
    scheduler.start()
    return scheduler


def start_jobs_worker():
    worker = Process(target=_run_forever, args=(service_jobs,))
    worker.start()
    return worker
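
# Illustrative usage (hypothetical; the real entry points live in the app's startup code):
#
#     from couchers.jobs.worker import start_jobs_scheduler, start_jobs_worker
#
#     scheduler_process = start_jobs_scheduler()  # periodically enqueues scheduled job types
#     worker_process = start_jobs_worker()        # processes jobs off the queue
#     ...
#     scheduler_process.join()
#     worker_process.join()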