Coverage for src/couchers/jobs/worker.py: 85%

78 statements  

1""" 

2Background job workers 

3""" 

4import logging 

5import traceback 

6from datetime import timedelta 

7from multiprocessing import Process 

8from sched import scheduler 

9from time import monotonic, sleep 

10 

11import sentry_sdk 

12from google.protobuf import empty_pb2 

13 

14from couchers.db import get_engine, session_scope 

15from couchers.jobs.definitions import JOBS, SCHEDULE 

16from couchers.jobs.enqueue import queue_job 

17from couchers.metrics import create_prometheus_server, job_process_registry, jobs_counter 

18from couchers.models import BackgroundJob, BackgroundJobState 

19from couchers.sql import couchers_select as select 

20 

21logger = logging.getLogger(__name__) 


def process_job():
    """
    Attempt to process one job from the job queue. Returns False if no job was found, True if a job was processed,
    regardless of failure/success.
    """
    logger.debug("Looking for a job")

    with session_scope(isolation_level="REPEATABLE READ") as session:
        # a combination of REPEATABLE READ and SELECT ... FOR UPDATE SKIP LOCKED makes sure that only one transaction
        # will modify the job at a time. SKIP LOCKED means that if the job is locked, we ignore that row; it's
        # easier to use SKIP LOCKED than NOWAIT in the ORM, since with NOWAIT you get an ugly exception from deep
        # inside psycopg2 that's quite annoying to catch and deal with
        job = (
            session.execute(
                select(BackgroundJob).where(BackgroundJob.ready_for_retry).with_for_update(skip_locked=True)
            )
            .scalars()
            .first()
        )
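        # A rough sketch of the SQL this emits (table and column names illustrative; the
        # ready_for_retry condition is defined on the BackgroundJob model):
        #   SELECT * FROM background_jobs WHERE <ready_for_retry> FOR UPDATE SKIP LOCKED
        # Locked rows are skipped rather than waited on, so concurrent workers never block each other here.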

        if not job:
            logger.debug("No pending jobs")
            return False

        # we've got a lock for a job now, it's "pending" until we commit or the lock is gone
        logger.info(f"Job #{job.id} grabbed")
        job.try_count += 1

        message_type, func = JOBS[job.job_type]

        try:
            ret = func(message_type.FromString(job.payload))
            job.state = BackgroundJobState.completed
            jobs_counter.labels(job.job_type.name, job.state.name, str(job.try_count), "").inc()
            logger.info(f"Job #{job.id} complete on try number {job.try_count}")
        except Exception as e:
            logger.exception(e)
            sentry_sdk.set_tag("context", "job")
            sentry_sdk.set_tag("job", job.job_type.name)
            sentry_sdk.capture_exception(e)

            if job.try_count >= job.max_tries:
                # if we already tried max_tries times, it's permanently failed
                job.state = BackgroundJobState.failed
                logger.info(f"Job #{job.id} failed on try number {job.try_count}")
            else:
                job.state = BackgroundJobState.error
                # exponential backoff
                job.next_attempt_after += timedelta(seconds=15 * (2**job.try_count))
                logger.info(f"Job #{job.id} error on try number {job.try_count}, next try at {job.next_attempt_after}")
            # add some info for debugging
            jobs_counter.labels(job.job_type.name, job.state.name, str(job.try_count), type(e).__name__).inc()
            job.failure_info = traceback.format_exc()

        # exiting ctx manager commits and releases the row lock
        return True

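# A worked example of the retry behaviour in process_job, for a hypothetical job with
# max_tries = 5 that keeps failing: after the 1st failure the next attempt is pushed back
# ~30s (15 * 2**1), after the 2nd ~60s, the 3rd ~120s, the 4th ~240s, and on the 5th
# failure the job is marked failed and never retried again.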

def service_jobs():
    """
    Service jobs in an infinite loop
    """
    get_engine().dispose()
    t = create_prometheus_server(job_process_registry, 8001)
    try:
        while True:
            # if no job was found, sleep for a second, otherwise query for another job straight away
            if not process_job():
                sleep(1)
    finally:
        logger.info("Closing prometheus server")
        t.server_close()

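# SCHEDULE (imported from couchers.jobs.definitions) is a sequence of (job_type, frequency)
# pairs; a hypothetical entry of (<some periodic job type>, timedelta(hours=24)) would get
# that job queued roughly once a day via the chain below.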

def _run_job_and_schedule(sched, schedule_id):
    job_type, frequency = SCHEDULE[schedule_id]
    logger.info(f"Processing job of type {job_type}")

    # wake ourselves up after frequency
    sched.enter(
        frequency.total_seconds(),
        1,
        _run_job_and_schedule,
        argument=(
            sched,
            schedule_id,
        ),
    )

    # queue the job
    queue_job(job_type, empty_pb2.Empty())


def run_scheduler():
    """
    Schedules jobs according to the schedule in .definitions
    """
    # multiprocessing uses fork() which in turn copies file descriptors, so the engine may have connections in its pool
    # that we don't want to reuse. This is the SQLAlchemy-recommended way of clearing the connection pool in this thread
    get_engine().dispose()

    sched = scheduler(monotonic, sleep)

    for schedule_id, (job_type, frequency) in enumerate(SCHEDULE):
        sched.enter(
            0,
            1,
            _run_job_and_schedule,
            argument=(
                sched,
                schedule_id,
            ),
        )

    sched.run()
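    # sched.run() blocks indefinitely in normal operation: each _run_job_and_schedule call
    # re-registers itself before queueing its job, so the event queue never empties.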


def _run_forever(func):
    while True:
        try:
            logger.critical("Background worker starting")
            func()
        except Exception as e:
            logger.critical("Unhandled exception in background worker", exc_info=e)
        # cool off in case we have some programming error, so we don't hammer the database
        sleep(60)


def start_jobs_scheduler():
    scheduler = Process(
        target=_run_forever,
        args=(run_scheduler,),
    )
    scheduler.start()
    return scheduler


def start_jobs_worker():
    worker = Process(target=_run_forever, args=(service_jobs,))
    worker.start()
    return worker
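# A minimal usage sketch (hypothetical wiring; the real backend entrypoint lives elsewhere):
#   scheduler_process = start_jobs_scheduler()
#   worker_process = start_jobs_worker()
#   scheduler_process.join()
#   worker_process.join()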