Coverage for src/couchers/tasks.py: 99%
68 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-14 16:54 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-14 16:54 +0000
1import logging
3from sqlalchemy import insert
4from sqlalchemy.sql import func
6from couchers import email, urls
7from couchers.config import config
8from couchers.constants import SIGNUP_EMAIL_TOKEN_VALIDITY
9from couchers.crypto import urlsafe_secure_token
10from couchers.db import session_scope
11from couchers.models import (
12 Cluster,
13 ClusterRole,
14 ClusterSubscription,
15 EventCommunityInviteRequest,
16 Node,
17 RateLimitViolation,
18 User,
19)
20from couchers.rate_limits.definitions import RATE_LIMIT_INTERVAL_STRING
21from couchers.sql import couchers_select as select
22from couchers.templates.v2 import send_simple_pretty_email
23from couchers.utils import now
25logger = logging.getLogger(__name__)
def send_signup_email(session, flow):
    """
    Email the signup link for the given signup flow and mark the flow as having had an email sent

    The link carries the flow token if the email is already verified, the existing email token if it is
    still valid, and otherwise a newly generated email token (which is stored on the flow with a new expiry).
    """
    logger.info(f"Sending signup email to {flow.email=}:")

    # captured before the flag is set below; it decides which template gets used
    is_first_email = not flow.email_sent

    if flow.email_verified:
        # already verified: the link only lets the user continue the flow
        link_token = flow.flow_token
    else:
        if not (flow.email_token and flow.token_is_valid):
            # no usable verification token, issue a new one
            fresh_token = urlsafe_secure_token()
            flow.email_verified = False
            flow.email_token = fresh_token
            flow.email_token_expiry = now() + SIGNUP_EMAIL_TOKEN_VALIDITY
        # either the still-valid token being resent or the one just generated
        link_token = flow.email_token
    signup_link = urls.signup_link(token=link_token)

    flow.email_sent = True

    template_name = "signup_verify" if is_first_email else "signup_continue"
    send_simple_pretty_email(
        session,
        flow.email,
        "Finish signing up for Couchers.org",
        template_name,
        template_args={"flow": flow, "signup_link": signup_link},
    )
def send_email_changed_confirmation_to_new_email(session, user):
    """
    Ask the user to confirm an email change by mailing a confirmation link to their new address
    """
    logger.info(
        f"Sending email changed (confirmation) email to {user=}'s new email address, (old email: {user.email}, new email: {user.new_email=})"
    )

    # the link embeds the token stored on the user for the pending change
    template_args = {
        "user": user,
        "confirmation_link": urls.change_email_link(confirmation_token=user.new_email_token),
    }
    send_simple_pretty_email(
        session,
        user.new_email,
        "Confirm your new email for Couchers.org",
        "email_changed_confirmation_new_email",
        template_args=template_args,
    )
def send_content_report_email(session, content_report):
    """
    Queue a system email about the given content report to the configured reports recipient
    """
    logger.info("Sending content report email")
    recipient = config["REPORTS_EMAIL_RECIPIENT"]
    email.enqueue_system_email(session, recipient, "content_report", template_args={"report": content_report})
def maybe_send_reference_report_email(session, reference):
    """
    Queue a system email about the given reference to the reports recipient, but only if the reference is flagged
    with `should_report`
    """
    if not reference.should_report:
        return

    logger.info("Sending reference report email")
    recipient = config["REPORTS_EMAIL_RECIPIENT"]
    email.enqueue_system_email(session, recipient, "reference_report", template_args={"reference": reference})
def send_rate_limit_violation_report_email(session, rate_limit_violation: RateLimitViolation, events, threshold: int):
    """Queue a report email to the reports recipient describing a user's rate limit violation."""
    logger.info(
        f"Sending rate limit moderation email for user '{rate_limit_violation.user_id}' ({rate_limit_violation.action})"
    )

    # the template gets the full user object, not just the id stored on the violation
    offending_user = session.get(User, rate_limit_violation.user_id)
    report_args = {
        "user": offending_user,
        "action": rate_limit_violation.action,
        "threshold": threshold,
        "time_interval_str": RATE_LIMIT_INTERVAL_STRING,
        "is_hard_limit": rate_limit_violation.is_hard_limit,
        "events": events,
    }
    email.enqueue_system_email(
        session,
        config["REPORTS_EMAIL_RECIPIENT"],
        "rate_limit_violation_report",
        template_args=report_args,
    )
def send_duplicate_strong_verification_email(session, old_attempt, new_attempt):
    """
    Queue a system email to the reports recipient with the users and ids of the old and new strong verification
    attempts
    """
    logger.info("Sending duplicate SV email")
    report_args = {
        "new_user": new_attempt.user,
        "new_attempt_id": new_attempt.id,
        "old_user": old_attempt.user,
        "old_attempt_id": old_attempt.id,
    }
    email.enqueue_system_email(
        session,
        config["REPORTS_EMAIL_RECIPIENT"],
        "duplicate_strong_verification_report",
        template_args=report_args,
    )
def maybe_send_contributor_form_email(session, form):
    """
    Queue a system email with the contributor form to the contributor form recipient, but only if the form is
    flagged with `should_notify`
    """
    if not form.should_notify:
        return

    recipient = config["CONTRIBUTOR_FORM_EMAIL_RECIPIENT"]
    email.enqueue_system_email(session, recipient, "contributor_form", template_args={"form": form})
def send_event_community_invite_request_email(session, request: EventCommunityInviteRequest):
    """
    Queue a system email to the mods recipient with links to the event, the requesting user, and the admin console
    """
    occurrence = request.occurrence
    event_link = urls.event_link(occurrence_id=occurrence.id, slug=occurrence.event.slug)
    user_link = urls.user_link(username=request.user.username)
    view_link = urls.console_link(page="api/org.couchers.admin.Admin")

    email.enqueue_system_email(
        session,
        config["MODS_EMAIL_RECIPIENT"],
        "event_community_invite_request",
        template_args={"event_link": event_link, "user_link": user_link, "view_link": view_link},
    )
def send_account_deletion_report_email(session, reason):
    """
    Queue a system email to the reports recipient carrying the given account deletion reason
    """
    logger.info("Sending account deletion report email")
    recipient = config["REPORTS_EMAIL_RECIPIENT"]
    email.enqueue_system_email(session, recipient, "account_deletion_report", template_args={"reason": reason})
def enforce_community_memberships():
    """
    Subscribe every visible user located inside a community's polygon to that community's official cluster

    Users that already have a subscription to the official cluster are skipped.
    """
    with session_scope() as session:
        nodes = session.execute(select(Node)).scalars().all()
        for node in nodes:
            # ids of users already subscribed to this node's official cluster
            already_subscribed = select(ClusterSubscription.user_id).where(
                ClusterSubscription.cluster == node.official_cluster
            )
            # the polygon is referenced through a subquery on the node id rather than via the loaded node object
            geom_subquery = select(Node.geom).where(Node.id == node.id)
            missing_users_query = (
                select(User.id)
                .where(User.is_visible)
                .where(func.ST_Contains(geom_subquery, User.geom))
                .where(~User.id.in_(already_subscribed))
            )
            missing_user_ids = session.execute(missing_users_query).scalars().all()
            if not missing_user_ids:
                continue

            # a single execute call with one parameter set per missing user
            new_rows = [
                {"user_id": user_id, "cluster_id": node.official_cluster.id, "role": ClusterRole.member}
                for user_id in missing_user_ids
            ]
            session.execute(insert(ClusterSubscription), new_rows)
            # NOTE(review): the source rendering lost indentation; the commit is assumed to belong to the
            # insert branch (once per node that needed inserts) -- confirm against the real file
            session.commit()
def enforce_community_memberships_for_user(session, user):
    """
    Subscribes the given user, as a member, to every official cluster whose parent node's geometry contains the
    user's location.
    """
    containing_clusters_query = (
        select(Cluster.id)
        .join(Node, Node.id == Cluster.parent_node_id)
        .where(Cluster.is_official_cluster)
        .where(func.ST_Contains(Node.geom, user.geom))
    )
    matching_cluster_ids = session.execute(containing_clusters_query).scalars().all()

    for matching_id in matching_cluster_ids:
        subscription = ClusterSubscription(
            user=user,
            cluster_id=matching_id,
            role=ClusterRole.member,
        )
        session.add(subscription)
    # NOTE(review): the source rendering lost indentation; the commit is assumed to run once after the loop --
    # confirm against the real file
    session.commit()