Coverage for src/couchers/tasks.py: 96%
74 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-06 23:17 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-06 23:17 +0000
1import logging
2from collections.abc import Sequence
4from sqlalchemy import RowMapping, insert
5from sqlalchemy.orm import Session
6from sqlalchemy.sql import func
8from couchers import email, urls
9from couchers.config import config
10from couchers.constants import SIGNUP_EMAIL_TOKEN_VALIDITY
11from couchers.crypto import urlsafe_secure_token
12from couchers.db import session_scope
13from couchers.models import (
14 Cluster,
15 ClusterRole,
16 ClusterSubscription,
17 ContentReport,
18 ContributorForm,
19 EventCommunityInviteRequest,
20 Node,
21 RateLimitAction,
22 RateLimitViolation,
23 Reference,
24 SignupFlow,
25 StrongVerificationAttempt,
26 User,
27)
28from couchers.rate_limits.definitions import RATE_LIMIT_HOURS
29from couchers.sql import couchers_select as select
30from couchers.templates.v2 import send_simple_pretty_email
31from couchers.utils import now
33logger = logging.getLogger(__name__)
36def send_signup_email(session: Session, flow: SignupFlow) -> None:
37 logger.info(f"Sending signup email to {flow.email=}:")
39 # whether we've sent an email at all yet
40 email_sent_before = flow.email_sent
41 if flow.email_verified:
42 # we just send a link to continue, not a verification link
43 signup_link = urls.signup_link(token=flow.flow_token)
44 elif flow.email_token and flow.token_is_valid:
45 # if the verification email was sent and still is not expired, just resend the verification email
46 signup_link = urls.signup_link(token=flow.email_token)
47 else:
48 # otherwise send a fresh email with a new token
49 token = urlsafe_secure_token()
50 flow.email_verified = False
51 flow.email_token = token
52 flow.email_token_expiry = now() + SIGNUP_EMAIL_TOKEN_VALIDITY
53 signup_link = urls.signup_link(token=flow.email_token)
55 flow.email_sent = True
57 send_simple_pretty_email(
58 session,
59 flow.email,
60 "Finish signing up for Couchers.org",
61 "signup_verify" if not email_sent_before else "signup_continue",
62 template_args={"flow": flow, "signup_link": signup_link},
63 )
66def send_email_changed_confirmation_to_new_email(session: Session, user: User) -> None:
67 """
68 Send an email to the user's new email address requesting confirmation of email change
69 """
70 logger.info(
71 f"Sending email changed (confirmation) email to {user=}'s new email address, "
72 f"(old email: {user.email}, new email: {user.new_email=})"
73 )
75 if not user.new_email_token:
76 raise ValueError(f"No new email token for {user.id}")
77 elif not user.new_email:
78 raise ValueError(f"No new email for {user.id}")
80 confirmation_link = urls.change_email_link(confirmation_token=user.new_email_token)
81 send_simple_pretty_email(
82 session,
83 user.new_email,
84 "Confirm your new email for Couchers.org",
85 "email_changed_confirmation_new_email",
86 template_args={"user": user, "confirmation_link": confirmation_link},
87 )
90def send_content_report_email(session: Session, content_report: ContentReport) -> None:
91 logger.info("Sending content report email")
92 email.enqueue_system_email(
93 session,
94 config["REPORTS_EMAIL_RECIPIENT"],
95 "content_report",
96 template_args={"report": content_report},
97 )
100def maybe_send_reference_report_email(session: Session, reference: Reference) -> None:
101 if reference.should_report:
102 logger.info("Sending reference report email")
103 email.enqueue_system_email(
104 session,
105 config["REPORTS_EMAIL_RECIPIENT"],
106 "reference_report",
107 template_args={"reference": reference},
108 )
111def send_rate_limit_violation_report_email(
112 session: Session,
113 rate_limit_violation: RateLimitViolation,
114 events: dict[RateLimitAction, Sequence[RowMapping]],
115 threshold: int,
116) -> None:
117 """Send a report email to the moderation team if a user exceeds a rate limit within a given time frame."""
118 logger.info(
119 f"Sending rate limit moderation email for user '{rate_limit_violation.user_id}' ({rate_limit_violation.action})"
120 )
121 user = session.get(User, rate_limit_violation.user_id)
122 email.enqueue_system_email(
123 session,
124 config["REPORTS_EMAIL_RECIPIENT"],
125 "rate_limit_violation_report",
126 template_args={
127 "user": user,
128 "action": rate_limit_violation.action,
129 "threshold": threshold,
130 "hours": RATE_LIMIT_HOURS,
131 "is_hard_limit": rate_limit_violation.is_hard_limit,
132 "events": events,
133 },
134 )
137def send_duplicate_strong_verification_email(
138 session: Session, old_attempt: StrongVerificationAttempt, new_attempt: StrongVerificationAttempt
139) -> None:
140 logger.info("Sending duplicate SV email")
141 email.enqueue_system_email(
142 session,
143 config["REPORTS_EMAIL_RECIPIENT"],
144 "duplicate_strong_verification_report",
145 template_args={
146 "new_user": new_attempt.user,
147 "new_attempt_id": new_attempt.id,
148 "old_user": old_attempt.user,
149 "old_attempt_id": old_attempt.id,
150 },
151 )
154def maybe_send_contributor_form_email(session: Session, form: ContributorForm) -> None:
155 if form.should_notify:
156 email.enqueue_system_email(
157 session,
158 config["CONTRIBUTOR_FORM_EMAIL_RECIPIENT"],
159 "contributor_form",
160 template_args={"form": form},
161 )
164def send_event_community_invite_request_email(session: Session, request: EventCommunityInviteRequest) -> None:
165 email.enqueue_system_email(
166 session,
167 config["MODS_EMAIL_RECIPIENT"],
168 "event_community_invite_request",
169 template_args={
170 "event_link": urls.event_link(occurrence_id=request.occurrence.id, slug=request.occurrence.event.slug),
171 "user_link": urls.user_link(username=request.user.username),
172 "view_link": urls.console_link(page="admin/community-invites"),
173 },
174 )
177def send_account_deletion_report_email(session: Session, reason: str) -> None:
178 logger.info("Sending account deletion report email")
179 email.enqueue_system_email(
180 session,
181 config["REPORTS_EMAIL_RECIPIENT"],
182 "account_deletion_report",
183 template_args={
184 "reason": reason,
185 },
186 )
189def enforce_community_memberships() -> None:
190 """
191 Go through all communities and make sure every user in the polygon is also a member
192 """
193 with session_scope() as session:
194 for node in session.execute(select(Node)).scalars().all():
195 existing_users = select(ClusterSubscription.user_id).where(
196 ClusterSubscription.cluster == node.official_cluster
197 )
198 node_geom = select(Node.geom).where(Node.id == node.id)
199 user_ids_needing_adding = (
200 session.execute(
201 select(User.id)
202 .where(User.is_visible)
203 .where(func.ST_Contains(node_geom, User.geom))
204 .where(~User.id.in_(existing_users))
205 )
206 .scalars()
207 .all()
208 )
209 if user_ids_needing_adding:
210 session.execute(
211 insert(ClusterSubscription),
212 [
213 {"user_id": user_id, "cluster_id": node.official_cluster.id, "role": ClusterRole.member}
214 for user_id in user_ids_needing_adding
215 ],
216 )
217 session.commit()
220def enforce_community_memberships_for_user(session: Session, user: User) -> None:
221 """
222 Adds a given user to all the communities they belong in based on their location.
223 """
224 cluster_ids = (
225 session.execute(
226 select(Cluster.id)
227 .join(Node, Node.id == Cluster.parent_node_id)
228 .where(Cluster.is_official_cluster)
229 .where(func.ST_Contains(Node.geom, user.geom))
230 )
231 .scalars()
232 .all()
233 )
235 for cluster_id in cluster_ids:
236 session.add(
237 ClusterSubscription(
238 user=user,
239 cluster_id=cluster_id,
240 role=ClusterRole.member,
241 )
242 )
243 session.commit()