Coverage for app/backend/src/couchers/tasks.py: 94%

79 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import logging 

2from collections.abc import Sequence 

3 

4from sqlalchemy import RowMapping, insert, select 

5from sqlalchemy.orm import Session 

6from sqlalchemy.sql import func 

7 

8from couchers import urls 

9from couchers.config import config 

10from couchers.constants import SIGNUP_EMAIL_TOKEN_VALIDITY 

11from couchers.context import CouchersContext 

12from couchers.crypto import urlsafe_secure_token 

13from couchers.db import session_scope 

14from couchers.email.blocks import EmailBase 

15from couchers.email.emails import EmailChangeConfirmationEmail, SignupContinueEmail, SignupVerifyEmail 

16from couchers.email.queuing import queue_system_email, queue_userless_email 

17from couchers.models import ( 

18 AccountDeletionReason, 

19 Cluster, 

20 ClusterRole, 

21 ClusterSubscription, 

22 ContentReport, 

23 ContributorForm, 

24 EventCommunityInviteRequest, 

25 Node, 

26 RateLimitAction, 

27 RateLimitViolation, 

28 Reference, 

29 SignupFlow, 

30 StrongVerificationAttempt, 

31 User, 

32) 

33from couchers.rate_limits.definitions import RATE_LIMIT_HOURS 

34from couchers.utils import now 

35 

36logger = logging.getLogger(__name__) 

37 

38 

def send_signup_email(context: CouchersContext, session: Session, flow: SignupFlow) -> None:
    """
    Queue a signup email for the given signup flow.

    The link in the email carries the flow token when the address is already verified, the existing email
    token when one is set and still valid, and otherwise a freshly generated email token (which also resets
    the flow's verified flag and token expiry). The first email queued for a flow is a verification email;
    any later one is a "continue" email.
    """
    logger.info(f"Sending signup email to {flow.email=}:")

    # capture this before flipping the flag below: it decides which email variant goes out
    already_sent = flow.email_sent

    if flow.email_verified:
        # already verified, so the link only needs to resume the flow
        link_token = flow.flow_token
    elif flow.email_token and flow.token_is_valid:
        # a verification token was issued and has not expired yet, so reuse it
        link_token = flow.email_token
    else:
        # no usable token: issue a new one and restart verification
        fresh_token = urlsafe_secure_token()
        flow.email_verified = False
        flow.email_token = fresh_token
        flow.email_token_expiry = now() + SIGNUP_EMAIL_TOKEN_VALIDITY
        link_token = flow.email_token

    signup_link = urls.signup_link(token=link_token)
    flow.email_sent = True

    email: EmailBase = (
        SignupContinueEmail(user_name=flow.name, continue_url=signup_link)
        if already_sent
        else SignupVerifyEmail(user_name=flow.name, verify_url=signup_link)
    )

    queue_userless_email(
        context,
        session,
        flow.email,
        email,
        source_data_header=f"signup; initial={not already_sent}",
    )

73 

74 

def send_email_changed_confirmation_to_new_email(context: CouchersContext, session: Session, user: User) -> None:
    """
    Queue an email to the user's new email address asking them to confirm the email change

    Raises ValueError if the user has no new email token or no new email set.
    """
    logger.info(
        f"Sending email changed (confirmation) email to {user=}'s new email address, "
        f"(old email: {user.email}, new email: {user.new_email=})"
    )

    # both the token and the new address are required to build and address the email
    if not user.new_email_token:
        raise ValueError(f"No new email token for {user.id}")
    if not user.new_email:
        raise ValueError(f"No new email for {user.id}")

    confirm_url = urls.change_email_link(confirmation_token=user.new_email_token)
    email = EmailChangeConfirmationEmail(user_name=user.name, old_email=user.email, confirm_url=confirm_url)

    queue_userless_email(context, session, user.new_email, email, source_data_header="email_changed_confirmation")

96 

97 

def send_content_report_email(session: Session, content_report: ContentReport) -> None:
    """
    Queue a "content_report" system email about the given report to the configured reports recipient
    """
    logger.info("Sending content report email")
    template_args = {"report": content_report}
    queue_system_email(session, config.REPORTS_EMAIL_RECIPIENT, "content_report", template_args=template_args)

106 

107 

def maybe_send_reference_report_email(session: Session, reference: Reference) -> None:
    """
    Queue a "reference_report" system email to the configured reports recipient, but only when the
    reference is flagged with `should_report`
    """
    if not reference.should_report:
        return

    logger.info("Sending reference report email")
    template_args = {"reference": reference}
    queue_system_email(session, config.REPORTS_EMAIL_RECIPIENT, "reference_report", template_args=template_args)

117 

118 

def send_rate_limit_violation_report_email(
    session: Session,
    rate_limit_violation: RateLimitViolation,
    events: dict[RateLimitAction, Sequence[RowMapping]],
    threshold: int,
) -> None:
    """
    Queue a report email to the configured reports recipient about a user's rate limit violation.

    The template receives the violating user (loaded by id from the session), the action, the threshold,
    RATE_LIMIT_HOURS, whether the limit was a hard limit, and the supplied events.
    """
    logger.info(
        f"Sending rate limit moderation email for user '{rate_limit_violation.user_id}' ({rate_limit_violation.action})"
    )

    # get_one raises if the user row does not exist
    offender = session.get_one(User, rate_limit_violation.user_id)
    report_args = {
        "user": offender,
        "action": rate_limit_violation.action,
        "threshold": threshold,
        "hours": RATE_LIMIT_HOURS,
        "is_hard_limit": rate_limit_violation.is_hard_limit,
        "events": events,
    }

    queue_system_email(
        session,
        config.REPORTS_EMAIL_RECIPIENT,
        "rate_limit_violation_report",
        template_args=report_args,
    )

143 

144 

def send_duplicate_strong_verification_email(
    session: Session, old_attempt: StrongVerificationAttempt, new_attempt: StrongVerificationAttempt
) -> None:
    """
    Queue a "duplicate_strong_verification_report" system email to the configured reports recipient,
    passing the user and id of both the new and the old verification attempt
    """
    logger.info("Sending duplicate SV email")

    report_args = {
        "new_user": new_attempt.user,
        "new_attempt_id": new_attempt.id,
        "old_user": old_attempt.user,
        "old_attempt_id": old_attempt.id,
    }
    queue_system_email(
        session,
        config.REPORTS_EMAIL_RECIPIENT,
        "duplicate_strong_verification_report",
        template_args=report_args,
    )

160 

161 

def maybe_send_contributor_form_email(session: Session, form: ContributorForm) -> None:
    """
    Queue a "contributor_form" system email to the configured contributor form recipient, but only when
    the form is flagged with `should_notify`
    """
    if not form.should_notify:
        return

    template_args = {"form": form}
    queue_system_email(
        session,
        config.CONTRIBUTOR_FORM_EMAIL_RECIPIENT,
        "contributor_form",
        template_args=template_args,
    )

170 

171 

def send_event_community_invite_request_email(session: Session, request: EventCommunityInviteRequest) -> None:
    """
    Queue an "event_community_invite_request" system email to the configured mods recipient, with links to
    the event, the requesting user, and the community invites console page
    """
    occurrence = request.occurrence
    links = {
        "event_link": urls.event_link(occurrence_id=occurrence.id, slug=occurrence.event.slug),
        "user_link": urls.user_link(username=request.user.username),
        "view_link": urls.console_link(page="tools/community-invites"),
    }

    queue_system_email(
        session,
        config.MODS_EMAIL_RECIPIENT,
        "event_community_invite_request",
        template_args=links,
    )

183 

184 

def send_account_deletion_report_email(session: Session, reason: AccountDeletionReason) -> None:
    """
    Queue an "account_deletion_report" system email carrying the deletion reason to the configured
    reports recipient
    """
    logger.info("Sending account deletion report email")
    template_args = {"reason": reason}
    queue_system_email(
        session,
        config.REPORTS_EMAIL_RECIPIENT,
        "account_deletion_report",
        template_args=template_args,
    )

195 

196 

def enforce_community_memberships() -> None:
    """
    Go through all communities and make sure every user in the polygon is also a member

    For each node, selects the visible users whose geometry is contained in the node's geometry and who
    have no subscription to the node's official cluster yet, then bulk-inserts a member subscription for
    each of them.
    """
    with session_scope() as session:
        for node in session.execute(select(Node)).scalars().all():
            existing_users = select(ClusterSubscription.user_id).where(
                ClusterSubscription.cluster == node.official_cluster
            )
            # Make the scalar subquery explicit: a bare select() passed as a SQL function argument is
            # only implicitly coerced by SQLAlchemy, which emits a warning.
            node_geom = select(Node.geom).where(Node.id == node.id).scalar_subquery()
            user_ids_needing_adding = (
                session.execute(
                    select(User.id)
                    .where(User.is_visible)
                    .where(func.ST_Contains(node_geom, User.geom))
                    .where(~User.id.in_(existing_users))
                )
                .scalars()
                .all()
            )
            if user_ids_needing_adding:
                # resolve the cluster id once instead of once per inserted row
                cluster_id = node.official_cluster.id
                session.execute(
                    insert(ClusterSubscription),
                    [
                        {"user_id": user_id, "cluster_id": cluster_id, "role": ClusterRole.member}
                        for user_id in user_ids_needing_adding
                    ],
                )
            # NOTE(review): commit is placed once per node; the listing this was taken from had its
            # indentation stripped, so confirm this matches the intended transaction granularity.
            session.commit()

226 

227 

def enforce_community_memberships_for_user(session: Session, user: User) -> None:
    """
    Adds a given user to all the communities they belong in based on their location.

    Looks up the official clusters whose parent node geometry contains the user's geometry, adds a member
    subscription for each, and commits the session.
    """
    containing_clusters = (
        select(Cluster.id)
        .join(Node, Node.id == Cluster.parent_node_id)
        .where(Cluster.is_official_cluster)
        .where(func.ST_Contains(Node.geom, user.geom))
    )
    cluster_ids = session.scalars(containing_clusters).all()

    session.add_all(
        [
            ClusterSubscription(user_id=user.id, cluster_id=cluster_id, role=ClusterRole.member)
            for cluster_id in cluster_ids
        ]
    )
    session.commit()