Coverage for app/backend/src/couchers/jobs/definitions.py: 100%

24 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1from dataclasses import dataclass 

2from datetime import timedelta 

3from typing import Any, Protocol, cast, get_type_hints 

4 

5from couchers.jobs.handlers import ( 

6 add_users_to_email_list, 

7 auto_approve_moderation_queue, 

8 check_database_consistency, 

9 check_expo_push_receipts, 

10 enforce_community_membership, 

11 finalize_strong_verification, 

12 purge_account_deletion_tokens, 

13 purge_login_tokens, 

14 purge_password_reset_tokens, 

15 send_activeness_probes, 

16 send_email, 

17 send_event_reminders, 

18 send_host_request_reminders, 

19 send_message_notifications, 

20 send_onboarding_emails, 

21 send_postal_verification_postcard, 

22 send_reference_reminders, 

23 send_request_notifications, 

24 update_badges, 

25 update_randomized_locations, 

26 update_recommendation_scores, 

27) 

28from couchers.materialized_views import refresh_materialized_views, refresh_materialized_views_rapid 

29from couchers.notifications.background import handle_email_digests, handle_notification 

30from couchers.notifications.send_raw_push_notification import send_raw_push_notification_v2 

31from couchers.servicers.conversations import generate_message_notifications 

32from couchers.servicers.discussions import generate_create_discussion_notifications 

33from couchers.servicers.editor import generate_new_blog_post_notifications 

34from couchers.servicers.events import ( 

35 generate_event_cancel_notifications, 

36 generate_event_create_notifications, 

37 generate_event_delete_notifications, 

38 generate_event_update_notifications, 

39) 

40from couchers.servicers.threads import generate_reply_notifications 

41 

42 

43class JobHandler[T](Protocol): 

44 def __call__(self, payload: T) -> None: ... 

45 

46 @property 

47 def __name__(self) -> str: ... 

48 

49 

50@dataclass(frozen=True, slots=True) 

51class Job[T]: 

52 """Definition of a background job.""" 

53 

54 handler: JobHandler[T] 

55 schedule: timedelta | None = None 

56 

57 @property 

58 def name(self) -> str: 

59 return self.handler.__name__ 

60 

61 @property 

62 def payload_type(self) -> type[T]: 

63 """Extracts protobuf type from type hints.""" 

64 return cast(type[T], get_type_hints(self.handler)["payload"]) 

65 

66 

# Job registry: every job definition, in registration order. Entries built
# with ``schedule=...`` carry that interval (presumably how often the
# scheduler enqueues them -- confirm against the worker); all other entries
# keep the default ``schedule=None``.
_JOBS_LIST: list[Job[Any]] = [
    Job(handle_notification),
    Job(send_raw_push_notification_v2),
    Job(handle_email_digests, schedule=timedelta(minutes=15)),
    Job(generate_message_notifications),
    Job(generate_reply_notifications),
    Job(generate_create_discussion_notifications),
    Job(generate_event_create_notifications),
    Job(generate_event_update_notifications),
    Job(generate_event_cancel_notifications),
    Job(generate_event_delete_notifications),
    Job(generate_new_blog_post_notifications),
    Job(refresh_materialized_views, schedule=timedelta(minutes=5)),
    Job(refresh_materialized_views_rapid, schedule=timedelta(seconds=30)),
    Job(send_email),
    Job(purge_login_tokens, schedule=timedelta(hours=24)),
    Job(purge_password_reset_tokens, schedule=timedelta(hours=24)),
    Job(purge_account_deletion_tokens, schedule=timedelta(hours=24)),
    Job(send_message_notifications, schedule=timedelta(minutes=3)),
    Job(send_request_notifications, schedule=timedelta(minutes=3)),
    Job(send_onboarding_emails, schedule=timedelta(hours=1)),
    Job(send_reference_reminders, schedule=timedelta(hours=1)),
    Job(send_host_request_reminders, schedule=timedelta(minutes=15)),
    Job(add_users_to_email_list, schedule=timedelta(hours=1)),
    Job(enforce_community_membership, schedule=timedelta(minutes=15)),
    Job(update_recommendation_scores, schedule=timedelta(hours=24)),
    Job(update_badges, schedule=timedelta(minutes=15)),
    Job(finalize_strong_verification),
    Job(send_activeness_probes, schedule=timedelta(minutes=60)),
    Job(update_randomized_locations, schedule=timedelta(hours=1)),
    Job(send_event_reminders, schedule=timedelta(hours=1)),
    Job(check_expo_push_receipts, schedule=timedelta(minutes=5)),
    Job(send_postal_verification_postcard),
    Job(check_database_consistency, schedule=timedelta(hours=24)),
    Job(auto_approve_moderation_queue, schedule=timedelta(seconds=15)),
]

# Lookup table keyed by each job's name (its handler's ``__name__``). If two
# handlers shared a name, the later list entry would replace the earlier one.
JOBS: dict[str, Job[Any]] = {job.name: job for job in _JOBS_LIST}