Coverage for app / backend / src / couchers / notifications / background.py: 76%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1import dataclasses 

2import logging 

3 

4from google.protobuf import empty_pb2 

5from sqlalchemy import select 

6from sqlalchemy.orm import Session 

7from sqlalchemy.sql import exists, func 

8 

9from couchers import urls 

10from couchers.config import config 

11from couchers.context import make_background_user_context 

12from couchers.db import session_scope 

13from couchers.email import queue_email 

14from couchers.i18n import LocalizationContext 

15from couchers.models import ( 

16 Notification, 

17 NotificationDelivery, 

18 NotificationDeliveryType, 

19 User, 

20) 

21from couchers.notifications.push import push_to_user 

22from couchers.notifications.quick_links import ( 

23 generate_do_not_email, 

24 generate_unsub_topic_action, 

25 generate_unsub_topic_key, 

26) 

27from couchers.notifications.render_email import render_email_notification 

28from couchers.notifications.render_push import render_push_notification 

29from couchers.notifications.settings import get_preference 

30from couchers.proto.internal import jobs_pb2 

31from couchers.sql import moderation_state_column_visible 

32from couchers.templating import Jinja2Template, template_folder 

33from couchers.utils import now 

34 

35logger = logging.getLogger(__name__) 

36 

37 

def _send_email_notification(session: Session, user: User, notification: Notification) -> None:
    """
    Renders ``notification`` as an email for ``user`` and queues it via ``queue_email``.

    Nothing is queued (and an info line is logged instead) when:

    * the user has ``do_not_email`` set and the rendered notification is not critical,
    * the user is banned, or
    * the user is deleted and the rendered notification does not set ``allow_deleted``.

    These checks run right after the notification is rendered and before any template file is read, so no
    plaintext/HTML body is built for an email that would be dropped anyway.
    """
    loc_context = LocalizationContext.from_user(user)
    if not config["ENABLE_NOTIFICATION_TRANSLATIONS"]:
        # translations are switched off: always render in English, keeping the rest of the user's context
        loc_context = dataclasses.replace(loc_context, locale="en")

    rendered = render_email_notification(notification, loc_context)

    # Bail out early; only `rendered` is needed to decide whether this email may be sent
    if user.do_not_email and not rendered.is_critical:
        logger.info(f"Not emailing {user} based on template {rendered.template_name} due to emails turned off")
        return

    if user.is_banned:
        logger.info(f"Tried emailing {user} based on template {rendered.template_name} but user is banned")
        return

    if user.is_deleted and not rendered.allow_deleted:
        logger.info(f"Tried emailing {user} based on template {rendered.template_name} but user is deleted")
        return

    # Variables shared by the plaintext and html templates (notification-specific ones plus header/footer ones)
    template_args = {
        **rendered.template_args,
        "header_subject": rendered.subject,
        "header_preview": rendered.preview,
        "user": user,
        "time": notification.created,
        "footer_timezone_name": loc_context.localized_timezone,
        "footer_copyright_year": now().year,
        "footer_email_is_critical": rendered.is_critical,
        "footer_manage_notifications_link": urls.notification_settings_link(),
        "footer_notification_topic_action": rendered.topic_action_unsubscribe_text,
        "footer_notification_topic_action_link": generate_unsub_topic_action(notification),
        "footer_notification_topic_key": rendered.topic_key_unsubscribe_text,
        "footer_notification_topic_key_link": generate_unsub_topic_key(notification),
        "footer_do_not_email_link": generate_do_not_email(user),
    }

    # Format plaintext template: the notification body followed by the shared footer
    plain_tmplt_body = (template_folder / f"{rendered.template_name}.txt").read_text()
    plain_tmplt_footer = (template_folder / "_footer.txt").read_text()
    plain_tmplt = Jinja2Template(source=plain_tmplt_body + plain_tmplt_footer, html=False)
    plain = plain_tmplt.render(template_args, loc_context)

    # Format html template
    html_tmplt = Jinja2Template(
        source=(template_folder / "generated_html" / f"{rendered.template_name}.html").read_text(), html=True
    )
    html = html_tmplt.render(template_args, loc_context)

    # The List-Unsubscribe header value is the URL wrapped in angle brackets; omitted when there is no URL
    list_unsubscribe_header = f"<{rendered.list_unsubscribe_url}>" if rendered.list_unsubscribe_url else None

    queue_email(
        session,
        sender_name=config["NOTIFICATION_EMAIL_SENDER"],
        sender_email=config["NOTIFICATION_EMAIL_ADDRESS"],
        recipient=user.email,
        subject=config["NOTIFICATION_PREFIX"] + rendered.subject,
        plain=plain,
        html=html,
        source_data=config["VERSION"] + f"/{rendered.template_name}",
        list_unsubscribe_header=list_unsubscribe_header,
    )

101 

102 

def _send_push_notification(session: Session, user: User, notification: Notification) -> None:
    """
    Renders ``notification`` as push content in the user's localization context and passes it to ``push_to_user``.
    """
    logger.debug(f"Formatting push notification for {user}")

    # NOTE(review): unlike the email path, the locale is not forced to "en" here when
    # ENABLE_NOTIFICATION_TRANSLATIONS is off -- confirm whether that is handled elsewhere
    loc_context = LocalizationContext.from_user(user)
    rendered = render_push_notification(notification, loc_context)

    # keep on server for at most an hour if the client is not around
    ttl_seconds = 3600

    push_to_user(
        session,
        user_id=user.id,
        topic_action=notification.topic_action.display,
        content=rendered,
        key=notification.key,
        ttl=ttl_seconds,
    )

116 

117 

def handle_notification(payload: jobs_pb2.HandleNotificationPayload) -> None:
    """
    Background job: delivers one notification through every delivery type the recipient has enabled.

    Steps:

    1. Load the notification named by ``payload.notification_id``.
    2. If it is linked to moderated content that is not visible to the recipient, log and return without
       recording any delivery.
    3. For each delivery type returned by ``get_preference``, skip it if a ``NotificationDelivery`` row already
       exists for this notification and type; otherwise record the delivery and, for email and push, send it.

    ``scalar_one()`` raises if the notification or its user row cannot be found.
    """
    with session_scope() as session:
        notification = session.execute(
            select(Notification).where(Notification.id == payload.notification_id)
        ).scalar_one()

        # Check moderation visibility if this notification is linked to moderated content
        if notification.moderation_state_id:
            context = make_background_user_context(notification.user_id)
            content_visible = session.execute(
                select(
                    exists(
                        select(Notification)
                        .where(Notification.id == notification.id)
                        .where(moderation_state_column_visible(context, Notification.moderation_state_id))
                    )
                )
            ).scalar_one()

            if not content_visible:
                # Content is not visible to recipient, leave notification for later processing
                logger.info(
                    f"Deferring notification {notification.id}: content not visible to user {notification.user_id}"
                )
                return

        # the recipient; needed both for preference lookup and for rendering/sending
        user = session.execute(select(User).where(User.id == notification.user_id)).scalar_one()

        # use the user we just loaded rather than going through the notification.user relationship
        delivery_types = get_preference(session, user.id, notification.topic_action)
        for delivery_type in delivery_types:
            # Check if delivery already exists for this notification and delivery type
            # (this can happen if the job was queued multiple times). An EXISTS query is used so that the check
            # still works, instead of raising, should more than one matching row already be present.
            already_delivered = session.execute(
                select(
                    exists(
                        select(NotificationDelivery.id)
                        .where(NotificationDelivery.notification_id == notification.id)
                        .where(NotificationDelivery.delivery_type == delivery_type)
                    )
                )
            ).scalar_one()
            if already_delivered:
                logger.info(f"Skipping {delivery_type} delivery for notification {notification.id}: already delivered")
                continue

            logger.info(f"Should notify by {delivery_type}")
            if delivery_type == NotificationDeliveryType.email:
                # for emails we don't deliver straight up, wait until the email background worker gets around to it and handles deduplication
                session.add(
                    NotificationDelivery(
                        notification_id=notification.id,
                        delivered=func.now(),
                        delivery_type=NotificationDeliveryType.email,
                    )
                )
                _send_email_notification(session, user, notification)
            elif delivery_type == NotificationDeliveryType.digest:
                # for digest notifications, add to digest queue: delivered stays unset until handle_email_digests
                # picks the row up
                session.add(
                    NotificationDelivery(
                        notification_id=notification.id,
                        delivered=None,
                        delivery_type=NotificationDeliveryType.digest,
                    )
                )
            elif delivery_type == NotificationDeliveryType.push:
                # for push notifications, we send them straight away (web + mobile)
                session.add(
                    NotificationDelivery(
                        notification_id=notification.id,
                        delivered=func.now(),
                        delivery_type=NotificationDeliveryType.push,
                    )
                )
                _send_push_notification(session, user, notification)

190 

191 

def handle_email_digests(payload: empty_pb2.Empty) -> None:
    """
    Background job that processes pending email digests.

    A user is selected when they have a digest frequency set, their last digest is older than that frequency, and
    they have at least one undelivered "digest" delivery whose notification has not already been delivered by
    individual email.

    For every selected user, all of their undelivered digest deliveries that pass the moderation visibility filter
    are marked as delivered (whether or not another delivery type already covered them) and the user's
    ``last_digest_sent`` is updated. In other words: nothing happens unless there is something new, but once
    triggered, both new and old items are included.

    Composing and sending the actual digest email is still a TODO; only the bookkeeping is done here.
    """
    logger.info("Sending out email digests")

    with session_scope() as session:
        # notifications that already had an individual email delivered
        delivered_email_notifications = (
            select(
                Notification.id.label("notification_id"),
                # the aggregate carries no meaning, it is only there to satisfy group_by
                func.min(NotificationDelivery.id).label("notification_delivery_id"),
            )
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.email)
            .where(NotificationDelivery.delivered.is_not(None))
            .group_by(Notification.id)
            .subquery()
        )

        # users who are due a digest and have an undelivered "digest" delivery without a delivered email
        due_users_query = (
            select(User)
            .where(User.digest_frequency.is_not(None))
            .where(User.last_digest_sent < func.now() - User.digest_frequency)
            # todo: tz
            .join(Notification, Notification.user_id == User.id)
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
            .where(NotificationDelivery.delivered.is_(None))
            # anti-join: keep only notifications with no matching row in the delivered-email subquery
            .outerjoin(
                delivered_email_notifications,
                delivered_email_notifications.c.notification_id == Notification.id,
            )
            .where(delivered_email_notifications.c.notification_delivery_id.is_(None))
            .group_by(User.id)
        )
        users_to_send_digests_to = session.execute(due_users_query).scalars().all()

        logger.info(f"{users_to_send_digests_to=}")

        for user in users_to_send_digests_to:
            # undelivered digest deliveries for this user, oldest notification first,
            # leaving out notifications linked to moderated content the user cannot see
            context = make_background_user_context(user.id)
            pending_query = (
                select(Notification, NotificationDelivery)
                .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
                .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
                .where(NotificationDelivery.delivered.is_(None))
                .where(Notification.user_id == user.id)
                .where(moderation_state_column_visible(context, Notification.moderation_state_id))
                .order_by(Notification.created)
            )
            pending = session.execute(pending_query).all()

            if not pending:
                # everything pending for this user is currently hidden; nothing to do
                continue

            logger.info(f"Sending {user.id=} a digest with {len(pending)} notifications")
            logger.info("TODO: supposed to send digest email")

            # mark every included delivery as done and remember when this user last got a digest
            for _notification, delivery in pending:
                delivery.delivered = func.now()
            user.last_digest_sent = func.now()
            session.commit()

264 session.commit()