Coverage for app / backend / src / couchers / notifications / background.py: 83%

106 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 14:14 +0000

1import dataclasses 

2import logging 

3 

4from google.protobuf import empty_pb2 

5from sqlalchemy import select 

6from sqlalchemy.orm import Session 

7from sqlalchemy.sql import exists, func 

8 

9from couchers import urls 

10from couchers.config import config 

11from couchers.context import make_background_user_context 

12from couchers.db import session_scope 

13from couchers.email.queuing import queue_email 

14from couchers.i18n import LocalizationContext 

15from couchers.models import ( 

16 Notification, 

17 NotificationDelivery, 

18 NotificationDeliveryType, 

19 User, 

20) 

21from couchers.notifications.push import push_to_user 

22from couchers.notifications.quick_links import ( 

23 generate_do_not_email, 

24 generate_unsub_topic_action, 

25 generate_unsub_topic_key, 

26) 

27from couchers.notifications.render_email import get_list_unsubscribe_header, render_email_notification 

28from couchers.notifications.render_push import render_push_notification 

29from couchers.notifications.settings import get_preference 

30from couchers.notifications.utils import can_notify_deleted_user 

31from couchers.proto.internal import jobs_pb2 

32from couchers.sql import moderation_state_column_visible 

33from couchers.templating import Jinja2Template, template_folder 

34from couchers.utils import now 

35 

# Module-level logger, named after this module via __name__
logger = logging.getLogger(__name__)

37 

38 

def _send_email_notification(session: Session, user: User, notification: Notification) -> None:
    """
    Renders a notification as a plaintext + HTML email and queues it for the user.

    Nothing is queued when:
    * the user has emails turned off and the notification's topic-action is not critical,
    * the user is banned, or
    * the user is deleted and `can_notify_deleted_user` rejects the topic-action.
    """
    # Guard clauses: each one logs its own reason and bails out before any rendering happens
    if user.do_not_email and not notification.topic_action.is_critical:
        logger.info(f"Not emailing {user} based on notification {notification.topic_action} due to emails turned off")
        return

    if user.banned_at is not None:
        logger.info(f"Tried emailing {user} based on notification {notification.topic_action} but user is banned")
        return

    if user.deleted_at is not None and not can_notify_deleted_user(notification.topic_action):
        logger.info(f"Tried emailing {user} based on notification {notification.topic_action} but user is deleted")
        return

    localization = LocalizationContext.from_user(user)
    if not config["ENABLE_NOTIFICATION_TRANSLATIONS"]:
        # translations are switched off: keep the user's context but force the "en" locale
        localization = dataclasses.replace(localization, locale="en")

    rendered = render_email_notification(notification, localization)

    # Start from the notification-specific args, then layer the shared header/footer args on top
    template_args = dict(rendered.template_args)
    template_args.update(
        header_subject=rendered.subject,
        header_preview=rendered.preview,
        user=user,
        time=notification.created,
        footer_timezone_name=localization.localized_timezone,
        footer_copyright_year=now().year,
    )

    if not notification.topic_action.is_critical:
        # only non-critical emails get the settings / opt-out / unsubscribe links in the footer
        footer_args = {
            "footer_email_is_critical": False,
            "footer_manage_notifications_link": urls.notification_settings_link(),
            "footer_do_not_email_link": generate_do_not_email(user),
        }

        if rendered.topic_action_unsubscribe_text:
            footer_args["footer_notification_topic_action"] = rendered.topic_action_unsubscribe_text
            footer_args["footer_notification_topic_action_link"] = generate_unsub_topic_action(notification)

        if rendered.topic_key_unsubscribe_text:
            footer_args["footer_notification_topic_key"] = rendered.topic_key_unsubscribe_text
            footer_args["footer_notification_topic_key_link"] = generate_unsub_topic_key(notification)
    else:
        footer_args = {"footer_email_is_critical": True}
    template_args.update(footer_args)

    # Plaintext version: the notification's own .txt template followed by the shared footer
    plain_body_path = template_folder / f"{rendered.template_name}.txt"
    plain_footer_path = template_folder / "_footer.txt"
    plain_source = plain_body_path.read_text() + plain_footer_path.read_text()
    plain = Jinja2Template(source=plain_source, html=False).render(template_args, localization)

    # HTML version: read from the "generated_html" folder
    html_path = template_folder / "generated_html" / f"{rendered.template_name}.html"
    html = Jinja2Template(source=html_path.read_text(), html=True).render(template_args, localization)

    list_unsubscribe_header = get_list_unsubscribe_header(notification)
    email_fields = {
        "sender_name": config["NOTIFICATION_EMAIL_SENDER"],
        "sender_email": config["NOTIFICATION_EMAIL_ADDRESS"],
        "recipient": user.email,
        "subject": config["NOTIFICATION_PREFIX"] + rendered.subject,
        "plain": plain,
        "html": html,
        "source_data": config["VERSION"] + f"/{rendered.template_name}",
        "list_unsubscribe_header": list_unsubscribe_header,
    }
    queue_email(session, **email_fields)

107 

108 

def _send_push_notification(session: Session, user: User, notification: Notification) -> None:
    """
    Renders the notification as push content in the user's localization context and hands it to `push_to_user`.
    """
    logger.debug(f"Formatting push notification for {user}")

    localization = LocalizationContext.from_user(user)
    push_content = render_push_notification(notification, localization)

    # keep on server for at most an hour if the client is not around
    hour_ttl = 3600

    push_to_user(
        session,
        user_id=user.id,
        topic_action=notification.topic_action.display,
        content=push_content,
        key=notification.key,
        ttl=hour_ttl,
    )

122 

123 

def handle_notification(payload: jobs_pb2.HandleNotificationPayload) -> None:
    """
    Background job: delivers a single notification through each of the recipient's preferred delivery types.

    The notification row identified by `payload.notification_id` is selected with a row lock. If the notification is
    linked to moderated content that is not visible to the recipient, nothing is delivered and the job returns early.

    For every delivery type returned by `get_preference`:
    * if a `NotificationDelivery` of that type already exists for this notification, the type is skipped,
    * email: a delivery row is recorded as delivered and the email is rendered and queued,
    * digest: a delivery row with `delivered=None` is recorded, to be picked up by the digest job,
    * push: a delivery row is recorded as delivered and the push notification is sent,
    * anything else: a warning is logged and no delivery row is recorded.

    Raises whatever `scalar_one()` raises if the notification or its user does not exist.
    """
    with session_scope() as session:
        # Select and lock the row.
        notification = session.execute(
            select(Notification).where(Notification.id == payload.notification_id).with_for_update()
        ).scalar_one()

        # Check moderation visibility if this notification is linked to moderated content
        if notification.moderation_state_id:
            context = make_background_user_context(notification.user_id)
            content_visible = session.execute(
                select(
                    exists(
                        select(Notification)
                        .where(Notification.id == notification.id)
                        .where(moderation_state_column_visible(context, Notification.moderation_state_id))
                    )
                )
            ).scalar_one()

            if not content_visible:
                # Content is not visible to recipient, leave notification for later processing
                logger.info(
                    f"Deferring notification {notification.id}: content not visible to user {notification.user_id}"
                )
                return

        # load the recipient; the email/push senders need the full user row
        user = session.execute(select(User).where(User.id == notification.user_id)).scalar_one()

        # use the user we just loaded rather than going through the notification.user relationship
        delivery_types = get_preference(session, user.id, notification.topic_action)
        for delivery_type in delivery_types:
            # Check if delivery already exists for this notification and delivery type
            # (this can happen if the job was queued multiple times)
            existing_delivery = session.execute(
                select(NotificationDelivery)
                .where(NotificationDelivery.notification_id == notification.id)
                .where(NotificationDelivery.delivery_type == delivery_type)
            ).scalar_one_or_none()
            if existing_delivery:
                logger.info(f"Skipping {delivery_type} delivery for notification {notification.id}: already delivered")
                continue

            logger.info(f"Should notify by {delivery_type}")
            if delivery_type == NotificationDeliveryType.email:
                # for emails we don't deliver straight up, wait until the email background worker gets around to it and handles deduplication
                # note: the delivery row is written before _send_email_notification runs, and that function may
                # still decide not to queue anything (emails turned off, banned or deleted user)
                session.add(
                    NotificationDelivery(
                        notification_id=notification.id,
                        delivered=func.now(),
                        delivery_type=NotificationDeliveryType.email,
                    )
                )
                session.flush()
                _send_email_notification(session, user, notification)
            elif delivery_type == NotificationDeliveryType.digest:
                # for digest notifications, add to digest queue
                # (delivered stays None until the digest job marks it)
                session.add(
                    NotificationDelivery(
                        notification_id=notification.id,
                        delivered=None,
                        delivery_type=NotificationDeliveryType.digest,
                    )
                )
            elif delivery_type == NotificationDeliveryType.push:
                # for push notifications, we send them straight away (web + mobile)
                session.add(
                    NotificationDelivery(
                        notification_id=notification.id,
                        delivered=func.now(),
                        delivery_type=NotificationDeliveryType.push,
                    )
                )
                session.flush()
                _send_push_notification(session, user, notification)
            else:
                # previously an unrecognised delivery type was dropped without any trace; still nothing is
                # delivered or recorded for it, but make it visible in the logs
                logger.warning(
                    f"Unhandled delivery type {delivery_type} for notification {notification.id}: nothing delivered"
                )

199 

200 

def handle_email_digests(payload: empty_pb2.Empty) -> None:
    """
    Sends out email digests

    A user is picked for a digest when they have a digest frequency set, their last digest is older than that
    frequency, and they have at least one undelivered "digest" type delivery whose notification has not already had
    an individual email delivered.

    For each such user, all of their undelivered "digest" deliveries are collected (oldest notification first,
    skipping notifications linked to moderated content that is not visible to them), marked as delivered, and the
    user's `last_digest_sent` is updated.

    Note: actually sending the digest email is still a TODO; at the moment this only logs and updates the rows.
    """
    logger.info("Sending out email digests")

    with session_scope() as session:
        # notifications that already had an individual email delivered
        delivered_email_notifications = (
            select(
                Notification.id.label("notification_id"),
                # the aggregate is only here to satisfy group_by; which delivery id we get does not matter
                func.min(NotificationDelivery.id).label("notification_delivery_id"),
            )
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.email)
            .where(NotificationDelivery.delivered.is_not(None))
            .group_by(Notification.id)
            .subquery()
        )

        # users who are due a digest: they have unsent "digest" deliveries with no matching sent email
        # todo: tz
        due_users_query = (
            select(User)
            .join(Notification, Notification.user_id == User.id)
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            .outerjoin(
                delivered_email_notifications,
                delivered_email_notifications.c.notification_id == Notification.id,
            )
            .where(User.digest_frequency.is_not(None))
            .where(User.last_digest_sent < func.now() - User.digest_frequency)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
            .where(NotificationDelivery.delivered.is_(None))
            # anti-join: keep only rows where the outer join found no delivered email
            .where(delivered_email_notifications.c.notification_delivery_id.is_(None))
            .group_by(User.id)
        )
        users_to_send_digests_to = session.execute(due_users_query).scalars().all()

        logger.info(f"{users_to_send_digests_to=}")

        for user in users_to_send_digests_to:
            # the moderation visibility check is evaluated from this user's point of view
            context = make_background_user_context(user.id)

            # this user's pending digest deliveries, leaving out non-visible moderated content
            pending_query = (
                select(Notification, NotificationDelivery)
                .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
                .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
                .where(NotificationDelivery.delivered.is_(None))
                .where(Notification.user_id == user.id)
                .where(moderation_state_column_visible(context, Notification.moderation_state_id))
                .order_by(Notification.created)
            )
            pending_rows = session.execute(pending_query).all()

            if not pending_rows:
                # nothing visible to send to this user right now
                continue

            notifications = [notification for notification, _ in pending_rows]
            pending_deliveries = [delivery for _, delivery in pending_rows]

            logger.info(f"Sending {user.id=} a digest with {len(notifications)} notifications")
            logger.info("TODO: supposed to send digest email")

            for pending_delivery in pending_deliveries:
                pending_delivery.delivered = func.now()
            user.last_digest_sent = func.now()
            # commit per user, once their deliveries and last_digest_sent have been updated
            session.commit()