Coverage for src/couchers/tasks.py: 100%
61 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
1import logging
3from sqlalchemy.sql import func
5from couchers import email, urls
6from couchers.config import config
7from couchers.constants import SIGNUP_EMAIL_TOKEN_VALIDITY
8from couchers.crypto import urlsafe_secure_token
9from couchers.db import session_scope
10from couchers.models import (
11 Cluster,
12 ClusterRole,
13 ClusterSubscription,
14 EventCommunityInviteRequest,
15 Node,
16 User,
17)
18from couchers.sql import couchers_select as select
19from couchers.templates.v2 import send_simple_pretty_email
20from couchers.utils import now
22logger = logging.getLogger(__name__)
def send_signup_email(session, flow):
    """
    Send the signup email for a signup flow, picking the link and template from the flow's state

    The link carries the flow token when the email is already verified, the existing email token
    when one is set and still valid, and a newly generated email token otherwise. The
    "signup_verify" template is used the first time an email goes out for the flow and
    "signup_continue" on every later send.
    """
    logger.info(f"Sending signup email to {flow.email=}:")

    # read the flag before it is flipped below, it decides which template goes out
    had_prior_email = flow.email_sent

    if flow.email_verified:
        # nothing left to verify, the link only lets the user continue the flow
        link_token = flow.flow_token
    elif flow.email_token and flow.token_is_valid:
        # a verification token exists and has not expired, so send the same one again
        link_token = flow.email_token
    else:
        # no usable token: issue a new one and restart its validity window
        fresh_token = urlsafe_secure_token()
        flow.email_verified = False
        flow.email_token = fresh_token
        flow.email_token_expiry = now() + SIGNUP_EMAIL_TOKEN_VALIDITY
        link_token = flow.email_token

    signup_link = urls.signup_link(token=link_token)
    flow.email_sent = True

    if had_prior_email:
        template_name = "signup_continue"
    else:
        template_name = "signup_verify"

    send_simple_pretty_email(
        session,
        flow.email,
        "Finish signing up for Couchers.org",
        template_name,
        template_args={"flow": flow, "signup_link": signup_link},
    )
def send_email_changed_confirmation_to_new_email(session, user):
    """
    Ask the user to confirm an email change by mailing a confirmation link to their new address

    The link is built from the user's new_email_token and the message is sent to user.new_email.
    """
    logger.info(
        f"Sending email changed (confirmation) email to {user=}'s new email address, (old email: {user.email}, new email: {user.new_email=})"
    )

    template_args = {
        "user": user,
        "confirmation_link": urls.change_email_link(confirmation_token=user.new_email_token),
    }
    send_simple_pretty_email(
        session,
        user.new_email,
        "Confirm your new email for Couchers.org",
        "email_changed_confirmation_new_email",
        template_args=template_args,
    )
def send_content_report_email(session, content_report):
    """
    Queue a "content_report" system email about the given content report

    The recipient is the address configured as REPORTS_EMAIL_RECIPIENT.
    """
    logger.info("Sending content report email")
    recipient = config["REPORTS_EMAIL_RECIPIENT"]
    email.enqueue_system_email(session, recipient, "content_report", template_args={"report": content_report})
def maybe_send_reference_report_email(session, reference):
    """
    Queue a "reference_report" system email for the reference, but only if it is flagged for reporting

    Does nothing when reference.should_report is falsy. The recipient is the address configured
    as REPORTS_EMAIL_RECIPIENT.
    """
    if not reference.should_report:
        return

    logger.info("Sending reference report email")
    recipient = config["REPORTS_EMAIL_RECIPIENT"]
    email.enqueue_system_email(session, recipient, "reference_report", template_args={"reference": reference})
def send_duplicate_strong_verification_email(session, old_attempt, new_attempt):
    """
    Queue a "duplicate_strong_verification_report" system email covering two verification attempts

    The template receives the user and id of both the new and the old attempt. The recipient is
    the address configured as REPORTS_EMAIL_RECIPIENT.
    """
    logger.info("Sending duplicate SV email")
    recipient = config["REPORTS_EMAIL_RECIPIENT"]
    report_details = {
        "new_user": new_attempt.user,
        "new_attempt_id": new_attempt.id,
        "old_user": old_attempt.user,
        "old_attempt_id": old_attempt.id,
    }
    email.enqueue_system_email(
        session,
        recipient,
        "duplicate_strong_verification_report",
        template_args=report_details,
    )
def maybe_send_contributor_form_email(session, form):
    """
    Queue a "contributor_form" system email for the form, but only if it asks for a notification

    Does nothing when form.should_notify is falsy. The recipient is the address configured as
    CONTRIBUTOR_FORM_EMAIL_RECIPIENT.
    """
    if not form.should_notify:
        return

    recipient = config["CONTRIBUTOR_FORM_EMAIL_RECIPIENT"]
    email.enqueue_system_email(session, recipient, "contributor_form", template_args={"form": form})
def send_event_community_invite_request_email(session, request: EventCommunityInviteRequest):
    """
    Queue an "event_community_invite_request" system email for the given request

    The template receives links for the request's event occurrence, for the request's user and
    for a console page. The recipient is the address configured as MODS_EMAIL_RECIPIENT.
    """
    recipient = config["MODS_EMAIL_RECIPIENT"]
    occurrence = request.occurrence
    links = {
        "event_link": urls.event_link(occurrence_id=occurrence.id, slug=occurrence.event.slug),
        "user_link": urls.user_link(username=request.user.username),
        "view_link": urls.console_link(page="api/org.couchers.admin.Admin"),
    }
    email.enqueue_system_email(
        session,
        recipient,
        "event_community_invite_request",
        template_args=links,
    )
def send_account_deletion_report_email(session, reason):
    """
    Queue an "account_deletion_report" system email carrying the given deletion reason

    The recipient is the address configured as REPORTS_EMAIL_RECIPIENT.
    """
    logger.info("Sending account deletion report email")
    recipient = config["REPORTS_EMAIL_RECIPIENT"]
    email.enqueue_system_email(session, recipient, "account_deletion_report", template_args={"reason": reason})
def enforce_community_memberships():
    """
    Walk every node and subscribe each visible user located inside it to the node's official cluster

    Users who already have a subscription to that cluster are left alone; everyone else found
    inside the node's geometry is added with the member role.
    """
    with session_scope() as session:
        all_nodes = session.execute(select(Node)).scalars().all()
        for node in all_nodes:
            # subquery of user ids that already hold a subscription to this node's official cluster
            subscribed_user_ids = select(ClusterSubscription.user_id).where(
                ClusterSubscription.cluster == node.official_cluster
            )
            missing_members_query = (
                select(User)
                .where(User.is_visible)
                .where(func.ST_Contains(node.geom, User.geom))
                .where(~User.id.in_(subscribed_user_ids))
            )
            missing_members = session.execute(missing_members_query).scalars().all()

            for user in missing_members:
                membership = ClusterSubscription(
                    user=user,
                    role=ClusterRole.member,
                )
                node.official_cluster.cluster_subscriptions.append(membership)
            # persist this node's new memberships before moving on to the next node
            session.commit()
def enforce_community_memberships_for_user(session, user):
    """
    Adds a given user to all the communities they belong in based on their location.

    Only official clusters whose parent node's geometry contains the user's location are
    considered. Clusters the user already has a subscription to are skipped, so calling this
    again for the same user does not add duplicate memberships. New subscriptions get the member
    role and the session is committed at the end.
    """
    # clusters this user is already subscribed to, kept as a subquery so the exclusion runs in the database
    already_subscribed = select(ClusterSubscription.cluster_id).where(ClusterSubscription.user_id == user.id)

    cluster_ids = (
        session.execute(
            select(Cluster.id)
            .join(Node, Node.id == Cluster.parent_node_id)
            .where(Cluster.is_official_cluster)
            .where(func.ST_Contains(Node.geom, user.geom))
            .where(~Cluster.id.in_(already_subscribed))
        )
        .scalars()
        .all()
    )

    for cluster_id in cluster_ids:
        session.add(
            ClusterSubscription(
                user=user,
                cluster_id=cluster_id,
                role=ClusterRole.member,
            )
        )
    session.commit()