Coverage for app / backend / src / couchers / jobs / definitions.py: 100%
24 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
1from dataclasses import dataclass
2from datetime import timedelta
3from typing import Any, Protocol, cast, get_type_hints
5from couchers.jobs.handlers import (
6 add_users_to_email_list,
7 auto_approve_moderation_queue,
8 check_database_consistency,
9 check_expo_push_receipts,
10 check_mypostcard_jobs,
11 enforce_community_membership,
12 finalize_strong_verification,
13 purge_account_deletion_tokens,
14 purge_login_tokens,
15 purge_password_reset_tokens,
16 send_activeness_probes,
17 send_email,
18 send_event_reminders,
19 send_host_request_reminders,
20 send_message_notifications,
21 send_onboarding_emails,
22 send_postal_verification_postcard,
23 send_reference_reminders,
24 send_request_notifications,
25 update_badges,
26 update_randomized_locations,
27 update_recommendation_scores,
28)
29from couchers.materialized_views import refresh_materialized_views, refresh_materialized_views_rapid
30from couchers.notifications.background import handle_email_digests, handle_notification
31from couchers.notifications.send_raw_push_notification import send_raw_push_notification_v2
32from couchers.servicers.conversations import generate_message_notifications
33from couchers.servicers.discussions import generate_create_discussion_notifications
34from couchers.servicers.editor import generate_new_blog_post_notifications
35from couchers.servicers.events import (
36 generate_event_cancel_notifications,
37 generate_event_create_notifications,
38 generate_event_delete_notifications,
39 generate_event_update_notifications,
40)
41from couchers.servicers.threads import generate_reply_notifications
44class JobHandler[T](Protocol):
45 def __call__(self, payload: T) -> None: ...
47 @property
48 def __name__(self) -> str: ...
51@dataclass(frozen=True, slots=True)
52class Job[T]:
53 """Definition of a background job."""
55 handler: JobHandler[T]
56 schedule: timedelta | None = None
58 @property
59 def name(self) -> str:
60 return self.handler.__name__
62 @property
63 def payload_type(self) -> type[T]:
64 """Extracts protobuf type from type hints."""
65 return cast(type[T], get_type_hints(self.handler)["payload"])
68# Job registry - first create a list of all jobs.
69_JOBS_LIST = [
70 Job(handle_notification),
71 Job(send_raw_push_notification_v2),
72 Job(handle_email_digests, schedule=timedelta(minutes=15)),
73 Job(generate_message_notifications),
74 Job(generate_reply_notifications),
75 Job(generate_create_discussion_notifications),
76 Job(generate_event_create_notifications),
77 Job(generate_event_update_notifications),
78 Job(generate_event_cancel_notifications),
79 Job(generate_event_delete_notifications),
80 Job(generate_new_blog_post_notifications),
81 Job(refresh_materialized_views, schedule=timedelta(minutes=5)),
82 Job(refresh_materialized_views_rapid, schedule=timedelta(seconds=30)),
83 Job(send_email),
84 Job(purge_login_tokens, schedule=timedelta(hours=24)),
85 Job(purge_password_reset_tokens, schedule=timedelta(hours=24)),
86 Job(purge_account_deletion_tokens, schedule=timedelta(hours=24)),
87 Job(send_message_notifications, schedule=timedelta(minutes=3)),
88 Job(send_request_notifications, schedule=timedelta(minutes=3)),
89 Job(send_onboarding_emails, schedule=timedelta(hours=1)),
90 Job(send_reference_reminders, schedule=timedelta(hours=1)),
91 Job(send_host_request_reminders, schedule=timedelta(minutes=15)),
92 Job(add_users_to_email_list, schedule=timedelta(hours=1)),
93 Job(enforce_community_membership, schedule=timedelta(minutes=15)),
94 Job(update_recommendation_scores, schedule=timedelta(hours=24)),
95 Job(update_badges, schedule=timedelta(minutes=15)),
96 Job(finalize_strong_verification),
97 Job(send_activeness_probes, schedule=timedelta(minutes=60)),
98 Job(update_randomized_locations, schedule=timedelta(hours=1)),
99 Job(send_event_reminders, schedule=timedelta(hours=1)),
100 Job(check_expo_push_receipts, schedule=timedelta(minutes=5)),
101 Job(send_postal_verification_postcard),
102 Job(check_database_consistency, schedule=timedelta(hours=24)),
103 Job(check_mypostcard_jobs, schedule=timedelta(hours=24)),
104 Job(auto_approve_moderation_queue, schedule=timedelta(seconds=15)),
105]
107# Map job names to job definitions
108JOBS: dict[str, Job[Any]] = {job.name: job for job in cast(list[Job[Any]], _JOBS_LIST)}