Coverage for src/couchers/tasks.py: 99%

68 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-07-14 16:54 +0000

1import logging 

2 

3from sqlalchemy import insert 

4from sqlalchemy.sql import func 

5 

6from couchers import email, urls 

7from couchers.config import config 

8from couchers.constants import SIGNUP_EMAIL_TOKEN_VALIDITY 

9from couchers.crypto import urlsafe_secure_token 

10from couchers.db import session_scope 

11from couchers.models import ( 

12 Cluster, 

13 ClusterRole, 

14 ClusterSubscription, 

15 EventCommunityInviteRequest, 

16 Node, 

17 RateLimitViolation, 

18 User, 

19) 

20from couchers.rate_limits.definitions import RATE_LIMIT_INTERVAL_STRING 

21from couchers.sql import couchers_select as select 

22from couchers.templates.v2 import send_simple_pretty_email 

23from couchers.utils import now 

24 

25logger = logging.getLogger(__name__) 

26 

27 

28def send_signup_email(session, flow): 

29 logger.info(f"Sending signup email to {flow.email=}:") 

30 

31 # whether we've sent an email at all yet 

32 email_sent_before = flow.email_sent 

33 if flow.email_verified: 

34 # we just send a link to continue, not a verification link 

35 signup_link = urls.signup_link(token=flow.flow_token) 

36 elif flow.email_token and flow.token_is_valid: 

37 # if the verification email was sent and still is not expired, just resend the verification email 

38 signup_link = urls.signup_link(token=flow.email_token) 

39 else: 

40 # otherwise send a fresh email with new token 

41 token = urlsafe_secure_token() 

42 flow.email_verified = False 

43 flow.email_token = token 

44 flow.email_token_expiry = now() + SIGNUP_EMAIL_TOKEN_VALIDITY 

45 signup_link = urls.signup_link(token=flow.email_token) 

46 

47 flow.email_sent = True 

48 

49 send_simple_pretty_email( 

50 session, 

51 flow.email, 

52 "Finish signing up for Couchers.org", 

53 "signup_verify" if not email_sent_before else "signup_continue", 

54 template_args={"flow": flow, "signup_link": signup_link}, 

55 ) 

56 

57 

58def send_email_changed_confirmation_to_new_email(session, user): 

59 """ 

60 Send an email to user's new email address requesting confirmation of email change 

61 """ 

62 logger.info( 

63 f"Sending email changed (confirmation) email to {user=}'s new email address, (old email: {user.email}, new email: {user.new_email=})" 

64 ) 

65 

66 confirmation_link = urls.change_email_link(confirmation_token=user.new_email_token) 

67 send_simple_pretty_email( 

68 session, 

69 user.new_email, 

70 "Confirm your new email for Couchers.org", 

71 "email_changed_confirmation_new_email", 

72 template_args={"user": user, "confirmation_link": confirmation_link}, 

73 ) 

74 

75 

76def send_content_report_email(session, content_report): 

77 logger.info("Sending content report email") 

78 email.enqueue_system_email( 

79 session, 

80 config["REPORTS_EMAIL_RECIPIENT"], 

81 "content_report", 

82 template_args={"report": content_report}, 

83 ) 

84 

85 

86def maybe_send_reference_report_email(session, reference): 

87 if reference.should_report: 

88 logger.info("Sending reference report email") 

89 email.enqueue_system_email( 

90 session, 

91 config["REPORTS_EMAIL_RECIPIENT"], 

92 "reference_report", 

93 template_args={"reference": reference}, 

94 ) 

95 

96 

97def send_rate_limit_violation_report_email(session, rate_limit_violation: RateLimitViolation, events, threshold: int): 

98 """Send a report email to the moderation team if a user exceeds a rate limit within a given time frame.""" 

99 logger.info( 

100 f"Sending rate limit moderation email for user '{rate_limit_violation.user_id}' ({rate_limit_violation.action})" 

101 ) 

102 user = session.get(User, rate_limit_violation.user_id) 

103 email.enqueue_system_email( 

104 session, 

105 config["REPORTS_EMAIL_RECIPIENT"], 

106 "rate_limit_violation_report", 

107 template_args={ 

108 "user": user, 

109 "action": rate_limit_violation.action, 

110 "threshold": threshold, 

111 "time_interval_str": RATE_LIMIT_INTERVAL_STRING, 

112 "is_hard_limit": rate_limit_violation.is_hard_limit, 

113 "events": events, 

114 }, 

115 ) 

116 

117 

118def send_duplicate_strong_verification_email(session, old_attempt, new_attempt): 

119 logger.info("Sending duplicate SV email") 

120 email.enqueue_system_email( 

121 session, 

122 config["REPORTS_EMAIL_RECIPIENT"], 

123 "duplicate_strong_verification_report", 

124 template_args={ 

125 "new_user": new_attempt.user, 

126 "new_attempt_id": new_attempt.id, 

127 "old_user": old_attempt.user, 

128 "old_attempt_id": old_attempt.id, 

129 }, 

130 ) 

131 

132 

133def maybe_send_contributor_form_email(session, form): 

134 if form.should_notify: 

135 email.enqueue_system_email( 

136 session, 

137 config["CONTRIBUTOR_FORM_EMAIL_RECIPIENT"], 

138 "contributor_form", 

139 template_args={"form": form}, 

140 ) 

141 

142 

143def send_event_community_invite_request_email(session, request: EventCommunityInviteRequest): 

144 email.enqueue_system_email( 

145 session, 

146 config["MODS_EMAIL_RECIPIENT"], 

147 "event_community_invite_request", 

148 template_args={ 

149 "event_link": urls.event_link(occurrence_id=request.occurrence.id, slug=request.occurrence.event.slug), 

150 "user_link": urls.user_link(username=request.user.username), 

151 "view_link": urls.console_link(page="api/org.couchers.admin.Admin"), 

152 }, 

153 ) 

154 

155 

156def send_account_deletion_report_email(session, reason): 

157 logger.info("Sending account deletion report email") 

158 email.enqueue_system_email( 

159 session, 

160 config["REPORTS_EMAIL_RECIPIENT"], 

161 "account_deletion_report", 

162 template_args={ 

163 "reason": reason, 

164 }, 

165 ) 

166 

167 

168def enforce_community_memberships(): 

169 """ 

170 Go through all communities and make sure every user in the polygon is also a member 

171 """ 

172 with session_scope() as session: 

173 for node in session.execute(select(Node)).scalars().all(): 

174 existing_users = select(ClusterSubscription.user_id).where( 

175 ClusterSubscription.cluster == node.official_cluster 

176 ) 

177 node_geom = select(Node.geom).where(Node.id == node.id) 

178 user_ids_needing_adding = ( 

179 session.execute( 

180 select(User.id) 

181 .where(User.is_visible) 

182 .where(func.ST_Contains(node_geom, User.geom)) 

183 .where(~User.id.in_(existing_users)) 

184 ) 

185 .scalars() 

186 .all() 

187 ) 

188 if user_ids_needing_adding: 

189 session.execute( 

190 insert(ClusterSubscription), 

191 [ 

192 {"user_id": user_id, "cluster_id": node.official_cluster.id, "role": ClusterRole.member} 

193 for user_id in user_ids_needing_adding 

194 ], 

195 ) 

196 session.commit() 

197 

198 

199def enforce_community_memberships_for_user(session, user): 

200 """ 

201 Adds a given user to all the communities they belong in based on their location. 

202 """ 

203 cluster_ids = ( 

204 session.execute( 

205 select(Cluster.id) 

206 .join(Node, Node.id == Cluster.parent_node_id) 

207 .where(Cluster.is_official_cluster) 

208 .where(func.ST_Contains(Node.geom, user.geom)) 

209 ) 

210 .scalars() 

211 .all() 

212 ) 

213 

214 for cluster_id in cluster_ids: 

215 session.add( 

216 ClusterSubscription( 

217 user=user, 

218 cluster_id=cluster_id, 

219 role=ClusterRole.member, 

220 ) 

221 ) 

222 session.commit()