Coverage for app / backend / src / couchers / jobs / definitions.py: 100%
24 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
1from dataclasses import dataclass
2from datetime import timedelta
3from typing import Any, Protocol, cast, get_type_hints
5from couchers.jobs.handlers import (
6 add_users_to_email_list,
7 auto_approve_moderation_queue,
8 check_database_consistency,
9 check_expo_push_receipts,
10 enforce_community_membership,
11 finalize_strong_verification,
12 purge_account_deletion_tokens,
13 purge_login_tokens,
14 purge_password_reset_tokens,
15 send_activeness_probes,
16 send_email,
17 send_event_reminders,
18 send_host_request_reminders,
19 send_message_notifications,
20 send_onboarding_emails,
21 send_postal_verification_postcard,
22 send_reference_reminders,
23 send_request_notifications,
24 update_badges,
25 update_randomized_locations,
26 update_recommendation_scores,
27)
28from couchers.materialized_views import refresh_materialized_views, refresh_materialized_views_rapid
29from couchers.notifications.background import handle_email_digests, handle_notification
30from couchers.notifications.send_raw_push_notification import send_raw_push_notification_v2
31from couchers.servicers.conversations import generate_message_notifications
32from couchers.servicers.discussions import generate_create_discussion_notifications
33from couchers.servicers.editor import generate_new_blog_post_notifications
34from couchers.servicers.events import (
35 generate_event_cancel_notifications,
36 generate_event_create_notifications,
37 generate_event_delete_notifications,
38 generate_event_update_notifications,
39)
40from couchers.servicers.threads import generate_reply_notifications
43class JobHandler[T](Protocol):
44 def __call__(self, payload: T) -> None: ...
46 @property
47 def __name__(self) -> str: ...
50@dataclass(frozen=True, slots=True)
51class Job[T]:
52 """Definition of a background job."""
54 handler: JobHandler[T]
55 schedule: timedelta | None = None
57 @property
58 def name(self) -> str:
59 return self.handler.__name__
61 @property
62 def payload_type(self) -> type[T]:
63 """Extracts protobuf type from type hints."""
64 return cast(type[T], get_type_hints(self.handler)["payload"])
# Job registry, step one: every job definition, listed in registration order.
# Entries that omit `schedule` keep the dataclass default of `None`.
_JOBS_LIST = [
    Job(handler=handle_notification),
    Job(handler=send_raw_push_notification_v2),
    Job(handler=handle_email_digests, schedule=timedelta(minutes=15)),
    Job(handler=generate_message_notifications),
    Job(handler=generate_reply_notifications),
    Job(handler=generate_create_discussion_notifications),
    Job(handler=generate_event_create_notifications),
    Job(handler=generate_event_update_notifications),
    Job(handler=generate_event_cancel_notifications),
    Job(handler=generate_event_delete_notifications),
    Job(handler=generate_new_blog_post_notifications),
    Job(handler=refresh_materialized_views, schedule=timedelta(minutes=5)),
    Job(handler=refresh_materialized_views_rapid, schedule=timedelta(seconds=30)),
    Job(handler=send_email),
    Job(handler=purge_login_tokens, schedule=timedelta(hours=24)),
    Job(handler=purge_password_reset_tokens, schedule=timedelta(hours=24)),
    Job(handler=purge_account_deletion_tokens, schedule=timedelta(hours=24)),
    Job(handler=send_message_notifications, schedule=timedelta(minutes=3)),
    Job(handler=send_request_notifications, schedule=timedelta(minutes=3)),
    Job(handler=send_onboarding_emails, schedule=timedelta(hours=1)),
    Job(handler=send_reference_reminders, schedule=timedelta(hours=1)),
    Job(handler=send_host_request_reminders, schedule=timedelta(minutes=15)),
    Job(handler=add_users_to_email_list, schedule=timedelta(hours=1)),
    Job(handler=enforce_community_membership, schedule=timedelta(minutes=15)),
    Job(handler=update_recommendation_scores, schedule=timedelta(hours=24)),
    Job(handler=update_badges, schedule=timedelta(minutes=15)),
    Job(handler=finalize_strong_verification),
    Job(handler=send_activeness_probes, schedule=timedelta(minutes=60)),
    Job(handler=update_randomized_locations, schedule=timedelta(hours=1)),
    Job(handler=send_event_reminders, schedule=timedelta(hours=1)),
    Job(handler=check_expo_push_receipts, schedule=timedelta(minutes=5)),
    Job(handler=send_postal_verification_postcard),
    Job(handler=check_database_consistency, schedule=timedelta(hours=24)),
    Job(handler=auto_approve_moderation_queue, schedule=timedelta(seconds=15)),
]
# Job registry, step two: index the definitions by job name (the handler's
# `__name__`). The cast erases each entry's payload type so that all jobs fit
# a single `Job[Any]` value type.
JOBS: dict[str, Job[Any]] = {
    definition.name: definition for definition in cast(list[Job[Any]], _JOBS_LIST)
}