Coverage for src/couchers/notifications/background.py: 70%

125 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-07-22 16:34 +0000

1import logging 

2from pathlib import Path 

3 

4from google.protobuf import empty_pb2 

5from jinja2 import Environment, FileSystemLoader 

6from sqlalchemy.sql import func 

7 

8from couchers import urls 

9from couchers.config import config 

10from couchers.db import session_scope 

11from couchers.email import queue_email 

12from couchers.models import ( 

13 Notification, 

14 NotificationDelivery, 

15 NotificationDeliveryType, 

16 PushNotificationDeliveryAttempt, 

17 PushNotificationSubscription, 

18 User, 

19) 

20from couchers.notifications.push import push_to_user 

21from couchers.notifications.push_api import send_push 

22from couchers.notifications.render import render_notification 

23from couchers.notifications.settings import get_preference 

24from couchers.notifications.unsubscribe import ( 

25 generate_do_not_email, 

26 generate_unsub_topic_action, 

27 generate_unsub_topic_key, 

28) 

29from couchers.sql import couchers_select as select 

30from couchers.templates.v2 import add_filters 

31from couchers.utils import get_tz_as_text, now 

32from proto.internal import jobs_pb2 

33 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Folder holding the v2 email templates, located relative to this file: three
# levels up, then templates/v2. The ".." segments stay in the path as-is; they
# are not resolved here.
template_folder = Path(__file__).parent.joinpath("..", "..", "..", "templates", "v2")

# Jinja environment used to render the email templates in this module.
# add_filters (from couchers.templates.v2) is applied to it once at import time.
loader = FileSystemLoader(template_folder)
env = Environment(trim_blocks=True, loader=loader)
add_filters(env)

43 

44 

def _send_email_notification(user: User, notification: Notification):
    """
    Renders a notification as a plain-text and an HTML email and queues it for sending

    An unsubscribe footer is appended to the plain-text template and substituted for the
    ___UNSUB_SECTION___ placeholder in the HTML template. Critical notifications get a fixed
    "cannot unsubscribe" footer and are always queued. For non-critical notifications nothing is
    queued if the user has turned emails off or is not visible; the templates are still rendered
    before that check is made.
    """
    rendered = render_notification(user, notification)
    template_name = rendered.email_template_name

    # later keys win: the rendered args may override "user"/"time", and the underscore-prefixed
    # keys override anything of the same name coming from the rendered args
    template_args = {
        "user": user,
        "time": notification.created,
        **rendered.email_template_args,
        "_year": now().year,
        "_timezone_display": get_tz_as_text(user.timezone or "Etc/UTC"),
    }

    # collect the footer piece by piece; pieces are joined with blank lines (plain) or <br /> (html)
    if rendered.is_critical:
        security_notice = "This is a security email, you cannot unsubscribe from it."
        plain_footer_parts = [security_notice]
        html_footer_parts = [security_notice]
    else:
        manage_link = urls.account_settings_link()
        plain_footer_parts = [f"Edit your notification settings at <{manage_link}>"]
        html_footer_parts = [f'<a href="{manage_link}">Manage notification preferences</a>.']

        topic_action_text = rendered.email_topic_action_unsubscribe_text
        topic_key_text = rendered.email_topic_key_unsubscribe_text
        # both links are always generated, but each one is only offered if it has display text
        unsub_targets = [
            (topic_action_text, generate_unsub_topic_action(notification)),
            (topic_key_text, generate_unsub_topic_key(notification)),
        ]
        offered = [(text, link) for text, link in unsub_targets if text]

        plain_footer_parts.extend(f"Turn off emails for {text}: <{link}>" for text, link in offered)
        if offered:
            html_links = " / ".join(f'<a href="{link}">{text}</a>' for text, link in offered)
            html_footer_parts.append(f"Turn off emails for: {html_links}.")

        dne_link = generate_do_not_email(user)
        plain_footer_parts.append(f"Do not email me (disables hosting): <{dne_link}>")
        html_footer_parts.append(f'<a href="{dne_link}">Do not email me (disables hosting)</a>.')

    plain_unsub_section = "\n\n---\n\n" + "\n\n".join(plain_footer_parts)
    html_unsub_section = "<br />".join(html_footer_parts)

    # the footer becomes part of the template source, so it is rendered together with the body
    plain_path = template_folder / f"{template_name}.txt"
    plain = env.from_string(plain_path.read_text() + plain_unsub_section).render(template_args)

    html_path = template_folder / "generated_html" / f"{template_name}.html"
    html_source = html_path.read_text().replace("___UNSUB_SECTION___", html_unsub_section)
    html = env.from_string(html_source).render(template_args)

    # critical emails skip both of these checks
    if not rendered.is_critical:
        if user.do_not_email:
            logger.info(f"Not emailing {user} based on template {template_name} due to emails turned off")
            return
        if not user.is_visible:
            logger.error(f"Tried emailing {user} based on template {template_name} but user not visible")
            return

    unsubscribe_url = rendered.email_list_unsubscribe_url
    list_unsubscribe_header = f"<{unsubscribe_url}>" if unsubscribe_url else None

    queue_email(
        sender_name=config["NOTIFICATION_EMAIL_SENDER"],
        sender_email=config["NOTIFICATION_EMAIL_ADDRESS"],
        recipient=user.email,
        subject=config["NOTIFICATION_EMAIL_PREFIX"] + rendered.email_subject,
        plain=plain,
        html=html,
        source_data=config["VERSION"] + f"/{template_name}",
        list_unsubscribe_header=list_unsubscribe_header,
    )

112 

113 

def _send_push_notification(user: User, notification: Notification):
    """
    Renders a notification and hands its push content to push_to_user for the given user

    Raises an Exception if the rendered notification has no push title.
    """
    logger.debug(f"Formatting push notification for {user}")

    content = render_notification(user, notification)
    if not content.push_title:
        raise Exception(f"Tried to send push notification to {user} but didn't have push info")

    # keep on server for at most an hour if the client is not around
    ttl_seconds = 3600

    push_to_user(
        user.id,
        title=content.push_title,
        body=content.push_body,
        icon=content.push_icon,
        url=content.push_url,
        ttl=ttl_seconds,
    )

131 

132 

def handle_notification(payload: jobs_pb2.HandleNotificationPayload):
    """
    Background job that fans one notification out to the delivery types chosen for it

    Loads the notification and its user, asks get_preference which delivery types apply to the
    notification's topic/action, and records one NotificationDelivery per type. Email and push
    deliveries are recorded as delivered now and sent immediately; digest deliveries are recorded
    with no delivery time and nothing is sent here. Any other delivery type is skipped.
    """
    with session_scope() as session:
        notification_query = select(Notification).where(Notification.id == payload.notification_id)
        notification = session.execute(notification_query).scalar_one()
        user = session.execute(select(User).where(User.id == notification.user_id)).scalar_one()

        # the unpacked values are not used further down
        _topic, _action = notification.topic_action.unpack()

        for delivery_type in get_preference(session, notification.user.id, notification.topic_action):
            logger.info(f"Should notify by {delivery_type}")

            # per type: what to record, when it counts as delivered, and how to send right away
            if delivery_type == NotificationDeliveryType.email:
                recorded_type = NotificationDeliveryType.email
                delivered_at = func.now()
                send_now = _send_email_notification
            elif delivery_type == NotificationDeliveryType.digest:
                # stays undelivered (delivered=None) until the digest job picks it up
                recorded_type = NotificationDeliveryType.digest
                delivered_at = None
                send_now = None
            elif delivery_type == NotificationDeliveryType.push:
                recorded_type = NotificationDeliveryType.push
                delivered_at = func.now()
                send_now = _send_push_notification
            else:
                continue

            session.add(
                NotificationDelivery(
                    notification_id=notification.id,
                    delivered=delivered_at,
                    delivery_type=recorded_type,
                )
            )
            if send_now is not None:
                send_now(user, notification)

175 

176 

def send_raw_push_notification(payload: jobs_pb2.SendRawPushNotificationPayload):
    """
    Background job that delivers one already-formatted push message to one push subscription

    Does nothing (apart from logging) if push notifications are disabled in config, or if the
    subscription's disabled_at is in the past. Otherwise the message is sent with send_push and
    the outcome is stored as a PushNotificationDeliveryAttempt.

    Status codes 200, 201 and 202 count as success. A 410 response disables the subscription.

    Raises an Exception if payload.data is longer than 3072, or if the push endpoint answers with
    any status code other than the ones above. The delivery attempt has already been committed by
    the time the latter is raised.
    """
    if not config["PUSH_NOTIFICATIONS_ENABLED"]:
        logger.info("Not sending push notification due to push notifications disabled")
        # stop here: without this return the push would be sent despite the setting
        return

    with session_scope() as session:
        if len(payload.data) > 3072:
            raise Exception(f"Data too long for push notification to sub {payload.push_notification_subscription_id}")
        sub = session.execute(
            select(PushNotificationSubscription).where(
                PushNotificationSubscription.id == payload.push_notification_subscription_id
            )
        ).scalar_one()
        if sub.disabled_at < now():
            logger.error(f"Tried to send push to disabled subscription: {sub.id}. Disabled at {sub.disabled_at}.")
            return
        # resp is used like a requests response below (.status_code and .text)
        resp = send_push(
            payload.data,
            sub.endpoint,
            sub.auth_key,
            sub.p256dh_key,
            config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"],
            config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"],
            ttl=payload.ttl,
        )
        success = resp.status_code in [200, 201, 202]
        session.add(
            PushNotificationDeliveryAttempt(
                push_notification_subscription_id=sub.id,
                success=success,
                status_code=resp.status_code,
                response=resp.text,
            )
        )
        # commit the attempt now so that it is kept even if we raise further down
        session.commit()
        if success:
            logger.debug(f"Successfully sent push to sub {sub.id} for user {sub.user}")
        elif resp.status_code == 410:
            # gone
            logger.info(f"Push sub {sub.id} for user {sub.user} is gone! Disabling.")
            sub.disabled_at = func.now()
        else:
            raise Exception(f"Failed to deliver push to {sub.id}, code: {resp.status_code}. Response: {resp.text}")

220 

221 

def handle_email_digests(payload: empty_pb2.Empty):
    """
    Processes email digests for users who are due one

    A user is picked if they have a digest frequency set, their last digest is older than that
    frequency, and they have at least one undelivered "digest" type delivery whose notification
    has not already gone out as an individual email.

    For every picked user, all of their undelivered digest deliveries are marked as delivered
    (including ones whose notification was also emailed individually) and the user's last digest
    time is updated. Actually composing and sending the digest email is still a TODO; for now this
    only logs.
    """
    logger.info("Sending out email digests")

    # conditions shared by the queries below
    delivery_belongs_to_notification = NotificationDelivery.notification_id == Notification.id
    undelivered_digest = (
        NotificationDelivery.delivery_type == NotificationDeliveryType.digest,
        NotificationDelivery.delivered == None,  # noqa: E711
    )

    with session_scope() as session:
        # already sent email notifications: one row per notification
        delivered_email_notifications = (
            select(
                Notification.id.label("notification_id"),
                # min is superfluous but needed for group_by
                func.min(NotificationDelivery.id).label("notification_delivery_id"),
            )
            .join(NotificationDelivery, delivery_belongs_to_notification)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.email)
            .where(NotificationDelivery.delivered != None)  # noqa: E711
            .group_by(Notification)
            .subquery()
        )

        # users who are due a digest and have unsent "digest" type notifications that were not
        # already emailed: the outer join leaves the subquery columns NULL for those
        # todo: tz
        due_users_query = (
            select(User)
            .where(User.digest_frequency != None)  # noqa: E711
            .where(User.last_digest_sent < func.now() - User.digest_frequency)
            .join(Notification, Notification.user_id == User.id)
            .join(NotificationDelivery, delivery_belongs_to_notification)
            .where(*undelivered_digest)
            .outerjoin(
                delivered_email_notifications,
                delivered_email_notifications.c.notification_id == Notification.id,
            )
            .where(delivered_email_notifications.c.notification_delivery_id == None)  # noqa: E711
            .group_by(User)
        )
        users_to_send_digests_to = session.execute(due_users_query).scalars().all()

        logger.info(f"{users_to_send_digests_to=}")

        for user in users_to_send_digests_to:
            # every digest delivery of this user that hasn't been delivered yet, oldest first
            pending = session.execute(
                select(Notification, NotificationDelivery)
                .join(NotificationDelivery, delivery_belongs_to_notification)
                .where(*undelivered_digest)
                .where(Notification.user_id == user.id)
                .order_by(Notification.created)
            ).all()

            if pending:
                logger.info(f"Sending {user.id=} a digest with {len(pending)} notifications")
                logger.info("TODO: supposed to send digest email")
                for _notification, delivery in pending:
                    delivery.delivered = func.now()
                user.last_digest_sent = func.now()
                session.commit()