Coverage for src/couchers/notifications/background.py: 70%

125 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-22 06:42 +0000

1import logging 

2from pathlib import Path 

3 

4from google.protobuf import empty_pb2 

5from jinja2 import Environment, FileSystemLoader 

6from sqlalchemy.sql import func 

7 

8from couchers import urls 

9from couchers.config import config 

10from couchers.db import session_scope 

11from couchers.email import queue_email 

12from couchers.models import ( 

13 Notification, 

14 NotificationDelivery, 

15 NotificationDeliveryType, 

16 PushNotificationDeliveryAttempt, 

17 PushNotificationSubscription, 

18 User, 

19) 

20from couchers.notifications.push import push_to_user 

21from couchers.notifications.push_api import send_push 

22from couchers.notifications.render import render_notification 

23from couchers.notifications.settings import get_preference 

24from couchers.notifications.unsubscribe import ( 

25 generate_do_not_email, 

26 generate_unsub_topic_action, 

27 generate_unsub_topic_key, 

28) 

29from couchers.sql import couchers_select as select 

30from couchers.templates.v2 import add_filters 

31from couchers.utils import get_tz_as_text, now 

32from proto.internal import jobs_pb2 

33 

34logger = logging.getLogger(__name__) 

35 

36template_folder = Path(__file__).parent / ".." / ".." / ".." / "templates" / "v2" 

37 

38loader = FileSystemLoader(template_folder) 

39env = Environment(loader=loader, trim_blocks=True) 

40 

41add_filters(env) 

42 

43 

44def _send_email_notification(session, user: User, notification: Notification): 

45 rendered = render_notification(user, notification) 

46 template_args = { 

47 "user": user, 

48 "time": notification.created, 

49 **rendered.email_template_args, 

50 } 

51 

52 template_args["_year"] = now().year 

53 template_args["_timezone_display"] = get_tz_as_text(user.timezone or "Etc/UTC") 

54 

55 plain_unsub_section = "\n\n---\n\n" 

56 if rendered.is_critical: 

57 plain_unsub_section += "This is a security email, you cannot unsubscribe from it." 

58 html_unsub_section = "This is a security email, you cannot unsubscribe from it." 

59 else: 

60 manage_link = urls.notification_settings_link() 

61 plain_unsub_section += f"Edit your notification settings at <{manage_link}>" 

62 html_unsub_section = f'<a href="{manage_link}">Manage notification preferences</a>.' 

63 unsub_options = [] 

64 ta = rendered.email_topic_action_unsubscribe_text 

65 tk = rendered.email_topic_key_unsubscribe_text 

66 ta_link = generate_unsub_topic_action(notification) 

67 tk_link = generate_unsub_topic_key(notification) 

68 if ta: 

69 plain_unsub_section += f"\n\nTurn off emails for {ta}: <{ta_link}>" 

70 unsub_options.append(f'<a href="{ta_link}">{ta}</a>') 

71 if tk: 

72 plain_unsub_section += f"\n\nTurn off emails for {tk}: <{tk_link}>" 

73 unsub_options.append(f'<a href="{tk_link}">{tk}</a>') 

74 if unsub_options: 

75 html_unsub_section += f"<br />Turn off emails for: {' / '.join(unsub_options)}." 

76 dne_link = generate_do_not_email(user) 

77 plain_unsub_section += f"\n\nDo not email me (disables hosting): <{dne_link}>" 

78 html_unsub_section += f'<br /><a href="{dne_link}">Do not email me (disables hosting)</a>.' 

79 

80 plain_tmplt = (template_folder / f"{rendered.email_template_name}.txt").read_text() 

81 plain = env.from_string(plain_tmplt + plain_unsub_section).render(template_args) 

82 html_tmplt = (template_folder / "generated_html" / f"{rendered.email_template_name}.html").read_text() 

83 html = env.from_string(html_tmplt.replace("___UNSUB_SECTION___", html_unsub_section)).render(template_args) 

84 

85 if not rendered.is_critical: 

86 if user.do_not_email: 

87 logger.info( 

88 f"Not emailing {user} based on template {rendered.email_template_name} due to emails turned off" 

89 ) 

90 return 

91 if not user.is_visible: 

92 logger.error( 

93 f"Tried emailing {user} based on template {rendered.email_template_name} but user not visible" 

94 ) 

95 return 

96 

97 list_unsubscribe_header = None 

98 if rendered.email_list_unsubscribe_url: 

99 list_unsubscribe_header = f"<{rendered.email_list_unsubscribe_url}>" 

100 

101 queue_email( 

102 session, 

103 sender_name=config["NOTIFICATION_EMAIL_SENDER"], 

104 sender_email=config["NOTIFICATION_EMAIL_ADDRESS"], 

105 recipient=user.email, 

106 subject=config["NOTIFICATION_PREFIX"] + rendered.email_subject, 

107 plain=plain, 

108 html=html, 

109 source_data=config["VERSION"] + f"/{rendered.email_template_name}", 

110 list_unsubscribe_header=list_unsubscribe_header, 

111 ) 

112 

113 

114def _send_push_notification(session, user: User, notification: Notification): 

115 logger.debug(f"Formatting push notification for {user}") 

116 

117 rendered = render_notification(user, notification) 

118 

119 if not rendered.push_title: 

120 raise Exception(f"Tried to send push notification to {user} but didn't have push info") 

121 

122 push_to_user( 

123 session, 

124 user_id=user.id, 

125 title=rendered.push_title, 

126 body=rendered.push_body, 

127 icon=rendered.push_icon, 

128 url=rendered.push_url, 

129 topic_action=notification.topic_action.display, 

130 key=notification.key, 

131 # keep on server for at most an hour if the client is not around 

132 ttl=3600, 

133 ) 

134 

135 

136def handle_notification(payload: jobs_pb2.HandleNotificationPayload): 

137 with session_scope() as session: 

138 notification = session.execute( 

139 select(Notification).where(Notification.id == payload.notification_id) 

140 ).scalar_one() 

141 

142 # ignore this notification if the user hasn't enabled new notifications 

143 user = session.execute(select(User).where(User.id == notification.user_id)).scalar_one() 

144 

145 topic, action = notification.topic_action.unpack() 

146 delivery_types = get_preference(session, notification.user.id, notification.topic_action) 

147 for delivery_type in delivery_types: 

148 logger.info(f"Should notify by {delivery_type}") 

149 if delivery_type == NotificationDeliveryType.email: 

150 # for emails we don't deliver straight up, wait until the email background worker gets around to it and handles deduplication 

151 session.add( 

152 NotificationDelivery( 

153 notification_id=notification.id, 

154 delivered=func.now(), 

155 delivery_type=NotificationDeliveryType.email, 

156 ) 

157 ) 

158 _send_email_notification(session, user, notification) 

159 elif delivery_type == NotificationDeliveryType.digest: 

160 # for digest notifications, add to digest queue 

161 session.add( 

162 NotificationDelivery( 

163 notification_id=notification.id, 

164 delivered=None, 

165 delivery_type=NotificationDeliveryType.digest, 

166 ) 

167 ) 

168 elif delivery_type == NotificationDeliveryType.push: 

169 # for push notifications, we send them straight away 

170 session.add( 

171 NotificationDelivery( 

172 notification_id=notification.id, 

173 delivered=func.now(), 

174 delivery_type=NotificationDeliveryType.push, 

175 ) 

176 ) 

177 _send_push_notification(session, user, notification) 

178 

179 

180def send_raw_push_notification(payload: jobs_pb2.SendRawPushNotificationPayload): 

181 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 

182 logger.info("Not sending push notification due to push notifications disabled") 

183 

184 with session_scope() as session: 

185 if len(payload.data) > 3072: 

186 raise Exception(f"Data too long for push notification to sub {payload.push_notification_subscription_id}") 

187 sub = session.execute( 

188 select(PushNotificationSubscription).where( 

189 PushNotificationSubscription.id == payload.push_notification_subscription_id 

190 ) 

191 ).scalar_one() 

192 if sub.disabled_at < now(): 

193 logger.error(f"Tried to send push to disabled subscription: {sub.id}. Disabled at {sub.disabled_at}.") 

194 return 

195 # this of requests.response 

196 resp = send_push( 

197 payload.data, 

198 sub.endpoint, 

199 sub.auth_key, 

200 sub.p256dh_key, 

201 config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"], 

202 config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"], 

203 ttl=payload.ttl, 

204 ) 

205 success = resp.status_code in [200, 201, 202] 

206 session.add( 

207 PushNotificationDeliveryAttempt( 

208 push_notification_subscription_id=sub.id, 

209 success=success, 

210 status_code=resp.status_code, 

211 response=resp.text, 

212 ) 

213 ) 

214 session.commit() 

215 if success: 

216 logger.debug(f"Successfully sent push to sub {sub.id} for user {sub.user}") 

217 elif resp.status_code == 410: 

218 # gone 

219 logger.info(f"Push sub {sub.id} for user {sub.user} is gone! Disabling.") 

220 sub.disabled_at = func.now() 

221 else: 

222 raise Exception(f"Failed to deliver push to {sub.id}, code: {resp.status_code}. Response: {resp.text}") 

223 

224 

225def handle_email_digests(payload: empty_pb2.Empty): 

226 """ 

227 Sends out email digests 

228 

229 The email digest is sent if the user has "digest" type notifications that have not had an individual email sent about them already. 

230 

231 If a digest is sent, then we send out every notification that has type digest, regardless of if they already got another type of notification about it. 

232 

233 That is, we don't send out an email unless there's something new, but if we do send one out, we send new and old stuff. 

234 """ 

235 logger.info("Sending out email digests") 

236 

237 with session_scope() as session: 

238 # already sent email notifications 

239 delivered_email_notifications = ( 

240 select( 

241 Notification.id.label("notification_id"), 

242 # min is superfluous but needed for group_by 

243 func.min(NotificationDelivery.id).label("notification_delivery_id"), 

244 ) 

245 .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id) 

246 .where(NotificationDelivery.delivery_type == NotificationDeliveryType.email) 

247 .where(NotificationDelivery.delivered != None) 

248 .group_by(Notification) 

249 .subquery() 

250 ) 

251 

252 # users who have unsent "digest" type notifications but not sent email notifications 

253 users_to_send_digests_to = ( 

254 session.execute( 

255 select(User) 

256 .where(User.digest_frequency != None) 

257 .where(User.last_digest_sent < func.now() - User.digest_frequency) 

258 # todo: tz 

259 .join(Notification, Notification.user_id == User.id) 

260 .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id) 

261 .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest) 

262 .where(NotificationDelivery.delivered == None) 

263 .outerjoin( 

264 delivered_email_notifications, 

265 delivered_email_notifications.c.notification_id == Notification.id, 

266 ) 

267 .where(delivered_email_notifications.c.notification_delivery_id == None) 

268 .group_by(User) 

269 ) 

270 .scalars() 

271 .all() 

272 ) 

273 

274 logger.info(f"{users_to_send_digests_to=}") 

275 

276 for user in users_to_send_digests_to: 

277 # digest notifications that haven't been delivered yet 

278 notifications_and_deliveries = session.execute( 

279 select(Notification, NotificationDelivery) 

280 .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id) 

281 .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest) 

282 .where(NotificationDelivery.delivered == None) 

283 .where(Notification.user_id == user.id) 

284 .order_by(Notification.created) 

285 ).all() 

286 

287 if notifications_and_deliveries: 

288 notifications, deliveries = zip(*notifications_and_deliveries) 

289 logger.info(f"Sending {user.id=} a digest with {len(notifications)} notifications") 

290 logger.info("TODO: supposed to send digest email") 

291 for delivery in deliveries: 

292 delivery.delivered = func.now() 

293 user.last_digest_sent = func.now() 

294 session.commit()