Coverage for src/couchers/notifications/background.py: 68%

130 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-06-01 15:07 +0000

1import logging 

2from pathlib import Path 

3 

4from google.protobuf import empty_pb2 

5from jinja2 import Environment, FileSystemLoader 

6from sqlalchemy.sql import func 

7 

8from couchers import urls 

9from couchers.config import config 

10from couchers.db import session_scope 

11from couchers.email import queue_email 

12from couchers.metrics import push_notification_counter, push_notification_disabled_counter 

13from couchers.models import ( 

14 Notification, 

15 NotificationDelivery, 

16 NotificationDeliveryType, 

17 PushNotificationDeliveryAttempt, 

18 PushNotificationSubscription, 

19 User, 

20) 

21from couchers.notifications.push import push_to_user 

22from couchers.notifications.push_api import send_push 

23from couchers.notifications.render import render_notification 

24from couchers.notifications.settings import get_preference 

25from couchers.notifications.unsubscribe import ( 

26 generate_do_not_email, 

27 generate_unsub_topic_action, 

28 generate_unsub_topic_key, 

29) 

30from couchers.sql import couchers_select as select 

31from couchers.templates.v2 import add_filters 

32from couchers.utils import get_tz_as_text, now 

33from proto.internal import jobs_pb2 

34 

35logger = logging.getLogger(__name__) 

36 

37template_folder = Path(__file__).parent / ".." / ".." / ".." / "templates" / "v2" 

38 

39loader = FileSystemLoader(template_folder) 

40env = Environment(loader=loader, trim_blocks=True) 

41 

42add_filters(env) 

43 

44 

45def _send_email_notification(session, user: User, notification: Notification): 

46 rendered = render_notification(user, notification) 

47 template_args = { 

48 "user": user, 

49 "time": notification.created, 

50 **rendered.email_template_args, 

51 } 

52 

53 template_args["_year"] = now().year 

54 template_args["_timezone_display"] = get_tz_as_text(user.timezone or "Etc/UTC") 

55 

56 plain_unsub_section = "\n\n---\n\n" 

57 if rendered.is_critical: 

58 plain_unsub_section += "This is a security email, you cannot unsubscribe from it." 

59 html_unsub_section = "This is a security email, you cannot unsubscribe from it." 

60 else: 

61 manage_link = urls.notification_settings_link() 

62 plain_unsub_section += f"Edit your notification settings at <{manage_link}>" 

63 html_unsub_section = f'<a href="{manage_link}">Manage notification preferences</a>.' 

64 unsub_options = [] 

65 ta = rendered.email_topic_action_unsubscribe_text 

66 tk = rendered.email_topic_key_unsubscribe_text 

67 ta_link = generate_unsub_topic_action(notification) 

68 tk_link = generate_unsub_topic_key(notification) 

69 if ta: 

70 plain_unsub_section += f"\n\nTurn off emails for {ta}: <{ta_link}>" 

71 unsub_options.append(f'<a href="{ta_link}">{ta}</a>') 

72 if tk: 

73 plain_unsub_section += f"\n\nTurn off emails for {tk}: <{tk_link}>" 

74 unsub_options.append(f'<a href="{tk_link}">{tk}</a>') 

75 if unsub_options: 

76 html_unsub_section += f"<br />Turn off emails for: {' / '.join(unsub_options)}." 

77 dne_link = generate_do_not_email(user) 

78 plain_unsub_section += f"\n\nDo not email me (disables hosting): <{dne_link}>" 

79 html_unsub_section += f'<br /><a href="{dne_link}">Do not email me (disables hosting)</a>.' 

80 

81 plain_tmplt = (template_folder / f"{rendered.email_template_name}.txt").read_text() 

82 plain = env.from_string(plain_tmplt + plain_unsub_section).render(template_args) 

83 html_tmplt = (template_folder / "generated_html" / f"{rendered.email_template_name}.html").read_text() 

84 html = env.from_string(html_tmplt.replace("___UNSUB_SECTION___", html_unsub_section)).render(template_args) 

85 

86 if user.do_not_email and not rendered.is_critical: 

87 logger.info(f"Not emailing {user} based on template {rendered.email_template_name} due to emails turned off") 

88 return 

89 

90 if user.is_banned: 

91 logger.info(f"Tried emailing {user} based on template {rendered.email_template_name} but user is banned") 

92 return 

93 

94 if user.is_deleted and not rendered.allow_deleted: 

95 logger.info(f"Tried emailing {user} based on template {rendered.email_template_name} but user is deleted") 

96 return 

97 

98 list_unsubscribe_header = None 

99 if rendered.email_list_unsubscribe_url: 

100 list_unsubscribe_header = f"<{rendered.email_list_unsubscribe_url}>" 

101 

102 queue_email( 

103 session, 

104 sender_name=config["NOTIFICATION_EMAIL_SENDER"], 

105 sender_email=config["NOTIFICATION_EMAIL_ADDRESS"], 

106 recipient=user.email, 

107 subject=config["NOTIFICATION_PREFIX"] + rendered.email_subject, 

108 plain=plain, 

109 html=html, 

110 source_data=config["VERSION"] + f"/{rendered.email_template_name}", 

111 list_unsubscribe_header=list_unsubscribe_header, 

112 ) 

113 

114 

115def _send_push_notification(session, user: User, notification: Notification): 

116 logger.debug(f"Formatting push notification for {user}") 

117 

118 rendered = render_notification(user, notification) 

119 

120 if not rendered.push_title: 

121 raise Exception(f"Tried to send push notification to {user} but didn't have push info") 

122 

123 push_to_user( 

124 session, 

125 user_id=user.id, 

126 title=rendered.push_title, 

127 body=rendered.push_body, 

128 icon=rendered.push_icon, 

129 url=rendered.push_url, 

130 topic_action=notification.topic_action.display, 

131 key=notification.key, 

132 # keep on server for at most an hour if the client is not around 

133 ttl=3600, 

134 ) 

135 

136 

137def handle_notification(payload: jobs_pb2.HandleNotificationPayload): 

138 with session_scope() as session: 

139 notification = session.execute( 

140 select(Notification).where(Notification.id == payload.notification_id) 

141 ).scalar_one() 

142 

143 # ignore this notification if the user hasn't enabled new notifications 

144 user = session.execute(select(User).where(User.id == notification.user_id)).scalar_one() 

145 

146 topic, action = notification.topic_action.unpack() 

147 delivery_types = get_preference(session, notification.user.id, notification.topic_action) 

148 for delivery_type in delivery_types: 

149 logger.info(f"Should notify by {delivery_type}") 

150 if delivery_type == NotificationDeliveryType.email: 

151 # for emails we don't deliver straight up, wait until the email background worker gets around to it and handles deduplication 

152 session.add( 

153 NotificationDelivery( 

154 notification_id=notification.id, 

155 delivered=func.now(), 

156 delivery_type=NotificationDeliveryType.email, 

157 ) 

158 ) 

159 _send_email_notification(session, user, notification) 

160 elif delivery_type == NotificationDeliveryType.digest: 

161 # for digest notifications, add to digest queue 

162 session.add( 

163 NotificationDelivery( 

164 notification_id=notification.id, 

165 delivered=None, 

166 delivery_type=NotificationDeliveryType.digest, 

167 ) 

168 ) 

169 elif delivery_type == NotificationDeliveryType.push: 

170 # for push notifications, we send them straight away 

171 session.add( 

172 NotificationDelivery( 

173 notification_id=notification.id, 

174 delivered=func.now(), 

175 delivery_type=NotificationDeliveryType.push, 

176 ) 

177 ) 

178 _send_push_notification(session, user, notification) 

179 

180 

181def send_raw_push_notification(payload: jobs_pb2.SendRawPushNotificationPayload): 

182 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 

183 logger.info("Not sending push notification due to push notifications disabled") 

184 

185 with session_scope() as session: 

186 if len(payload.data) > 3072: 

187 raise Exception(f"Data too long for push notification to sub {payload.push_notification_subscription_id}") 

188 sub = session.execute( 

189 select(PushNotificationSubscription).where( 

190 PushNotificationSubscription.id == payload.push_notification_subscription_id 

191 ) 

192 ).scalar_one() 

193 if sub.disabled_at < now(): 

194 logger.error(f"Tried to send push to disabled subscription: {sub.id}. Disabled at {sub.disabled_at}.") 

195 return 

196 # this of requests.response 

197 resp = send_push( 

198 payload.data, 

199 sub.endpoint, 

200 sub.auth_key, 

201 sub.p256dh_key, 

202 config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"], 

203 config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"], 

204 ttl=payload.ttl, 

205 ) 

206 success = resp.status_code in [200, 201, 202] 

207 session.add( 

208 PushNotificationDeliveryAttempt( 

209 push_notification_subscription_id=sub.id, 

210 success=success, 

211 status_code=resp.status_code, 

212 response=resp.text, 

213 ) 

214 ) 

215 session.commit() 

216 if success: 

217 logger.debug(f"Successfully sent push to sub {sub.id} for user {sub.user}") 

218 push_notification_counter.inc() 

219 elif resp.status_code == 404 or resp.status_code == 410: 

220 # gone 

221 logger.info(f"Push sub {sub.id} for user {sub.user} is gone! Disabling.") 

222 sub.disabled_at = func.now() 

223 push_notification_disabled_counter.inc() 

224 else: 

225 raise Exception(f"Failed to deliver push to {sub.id}, code: {resp.status_code}. Response: {resp.text}") 

226 

227 

228def handle_email_digests(payload: empty_pb2.Empty): 

229 """ 

230 Sends out email digests 

231 

232 The email digest is sent if the user has "digest" type notifications that have not had an individual email sent about them already. 

233 

234 If a digest is sent, then we send out every notification that has type digest, regardless of if they already got another type of notification about it. 

235 

236 That is, we don't send out an email unless there's something new, but if we do send one out, we send new and old stuff. 

237 """ 

238 logger.info("Sending out email digests") 

239 

240 with session_scope() as session: 

241 # already sent email notifications 

242 delivered_email_notifications = ( 

243 select( 

244 Notification.id.label("notification_id"), 

245 # min is superfluous but needed for group_by 

246 func.min(NotificationDelivery.id).label("notification_delivery_id"), 

247 ) 

248 .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id) 

249 .where(NotificationDelivery.delivery_type == NotificationDeliveryType.email) 

250 .where(NotificationDelivery.delivered != None) 

251 .group_by(Notification) 

252 .subquery() 

253 ) 

254 

255 # users who have unsent "digest" type notifications but not sent email notifications 

256 users_to_send_digests_to = ( 

257 session.execute( 

258 select(User) 

259 .where(User.digest_frequency != None) 

260 .where(User.last_digest_sent < func.now() - User.digest_frequency) 

261 # todo: tz 

262 .join(Notification, Notification.user_id == User.id) 

263 .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id) 

264 .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest) 

265 .where(NotificationDelivery.delivered == None) 

266 .outerjoin( 

267 delivered_email_notifications, 

268 delivered_email_notifications.c.notification_id == Notification.id, 

269 ) 

270 .where(delivered_email_notifications.c.notification_delivery_id == None) 

271 .group_by(User) 

272 ) 

273 .scalars() 

274 .all() 

275 ) 

276 

277 logger.info(f"{users_to_send_digests_to=}") 

278 

279 for user in users_to_send_digests_to: 

280 # digest notifications that haven't been delivered yet 

281 notifications_and_deliveries = session.execute( 

282 select(Notification, NotificationDelivery) 

283 .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id) 

284 .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest) 

285 .where(NotificationDelivery.delivered == None) 

286 .where(Notification.user_id == user.id) 

287 .order_by(Notification.created) 

288 ).all() 

289 

290 if notifications_and_deliveries: 

291 notifications, deliveries = zip(*notifications_and_deliveries) 

292 logger.info(f"Sending {user.id=} a digest with {len(notifications)} notifications") 

293 logger.info("TODO: supposed to send digest email") 

294 for delivery in deliveries: 

295 delivery.delivered = func.now() 

296 user.last_digest_sent = func.now() 

297 session.commit()