Coverage for app / backend / src / couchers / notifications / background.py: 78%

83 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 09:44 +0000

1import dataclasses 

2import logging 

3 

4from google.protobuf import empty_pb2 

5from sqlalchemy import select 

6from sqlalchemy.orm import Session 

7from sqlalchemy.sql import exists, func 

8 

9from couchers.config import config 

10from couchers.context import make_background_user_context 

11from couchers.db import session_scope 

12from couchers.email.queuing import queue_email 

13from couchers.i18n import LocalizationContext 

14from couchers.models import ( 

15 Notification, 

16 NotificationDelivery, 

17 NotificationDeliveryType, 

18 User, 

19) 

20from couchers.notifications.push import push_to_user 

21from couchers.notifications.render_email import render_email_notification 

22from couchers.notifications.render_push import render_push_notification 

23from couchers.notifications.settings import get_preference 

24from couchers.notifications.utils import can_notify_deleted_user 

25from couchers.proto.internal import jobs_pb2 

26from couchers.sql import moderation_state_column_visible 

27 

28logger = logging.getLogger(__name__) 

29 

30 

def _send_email_notification(session: Session, user: User, notification: Notification) -> None:
    """
    Renders the notification as an email and queues it for the user.

    Nothing is queued (and an info line is logged instead) when:
    - the user has emails turned off and the notification's topic-action is not critical,
    - the user is banned, or
    - the user is deleted and `can_notify_deleted_user` rejects the topic-action.
    """
    emails_turned_off = user.do_not_email and not notification.topic_action.is_critical
    if emails_turned_off:
        logger.info(f"Not emailing {user} based on notification {notification.topic_action} due to emails turned off")
        return

    is_banned = user.banned_at is not None
    if is_banned:
        logger.info(f"Tried emailing {user} based on notification {notification.topic_action} but user is banned")
        return

    is_deleted = user.deleted_at is not None
    if is_deleted and not can_notify_deleted_user(notification.topic_action):
        logger.info(f"Tried emailing {user} based on notification {notification.topic_action} but user is deleted")
        return

    # render in the user's own localization, unless notification translations are disabled,
    # in which case the locale is forced to "en"
    user_localization = LocalizationContext.from_user(user)
    localization = (
        user_localization
        if config["ENABLE_NOTIFICATION_TRANSLATIONS"]
        else dataclasses.replace(user_localization, locale="en")
    )

    email = render_email_notification(user, notification, localization)

    email_fields = {
        "sender_name": config["NOTIFICATION_EMAIL_SENDER"],
        "sender_email": config["NOTIFICATION_EMAIL_ADDRESS"],
        "recipient": user.email,
        "subject": config["NOTIFICATION_PREFIX"] + email.subject,
        "plain": email.body_plaintext,
        "html": email.body_html,
        "source_data": email.source_data,
        "list_unsubscribe_header": email.list_unsubscribe_header,
    }
    queue_email(session, **email_fields)

61 

62 

def _send_push_notification(session: Session, user: User, notification: Notification) -> None:
    """
    Renders the notification in the user's localization and hands it to `push_to_user`.
    """
    logger.debug(f"Formatting push notification for {user}")

    localization = LocalizationContext.from_user(user)
    rendered = render_push_notification(notification, localization)

    push_fields = {
        "user_id": user.id,
        "topic_action": notification.topic_action.display,
        "content": rendered,
        "key": notification.key,
        # keep on server for at most an hour if the client is not around
        "ttl": 3600,
    }
    push_to_user(session, **push_fields)

76 

77 

def handle_notification(payload: jobs_pb2.HandleNotificationPayload) -> None:
    """
    Processes one notification: for each delivery type returned by the recipient's preferences, records a
    NotificationDelivery row and, for email and push, hands the notification on for sending.

    If the notification is linked to moderated content that is not visible to the recipient, nothing is
    recorded or sent and the function returns early.

    Delivery types that already have a NotificationDelivery row for this notification are skipped, so running
    the job more than once for the same notification does not record or send the same delivery twice.
    """
    with session_scope() as session:
        # Select and lock the row.
        notification = session.execute(
            select(Notification).where(Notification.id == payload.notification_id).with_for_update()
        ).scalar_one()

        # Check moderation visibility if this notification is linked to moderated content
        if notification.moderation_state_id:
            context = make_background_user_context(notification.user_id)
            content_visible = session.execute(
                select(
                    exists(
                        select(Notification)
                        .where(Notification.id == notification.id)
                        .where(moderation_state_column_visible(context, Notification.moderation_state_id))
                    )
                )
            ).scalar_one()

            if not content_visible:
                # Content is not visible to recipient, leave notification for later processing
                logger.info(
                    f"Deferring notification {notification.id}: content not visible to user {notification.user_id}"
                )
                return

        # load the recipient; the send helpers need the full user row
        user = session.execute(select(User).where(User.id == notification.user_id)).scalar_one()

        delivery_types = get_preference(session, user.id, notification.topic_action)
        for delivery_type in delivery_types:
            # Check if delivery already exists for this notification and delivery type
            # (this can happen if the job was queued multiple times)
            existing_delivery = session.execute(
                select(NotificationDelivery)
                .where(NotificationDelivery.notification_id == notification.id)
                .where(NotificationDelivery.delivery_type == delivery_type)
            ).scalar_one_or_none()
            if existing_delivery:
                logger.info(f"Skipping {delivery_type} delivery for notification {notification.id}: already delivered")
                continue

            logger.info(f"Should notify by {delivery_type}")
            if delivery_type == NotificationDeliveryType.email:
                # for emails we don't deliver straight up, wait until the email background worker gets around to it and handles deduplication
                # the delivery row is recorded (and flushed) before the email is queued; note that
                # _send_email_notification may still decide not to queue anything (emails off, banned, deleted)
                session.add(
                    NotificationDelivery(
                        notification_id=notification.id,
                        delivered=func.now(),
                        delivery_type=NotificationDeliveryType.email,
                    )
                )
                session.flush()
                _send_email_notification(session, user, notification)
            elif delivery_type == NotificationDeliveryType.digest:
                # for digest notifications, add to digest queue
                # delivered stays NULL until handle_email_digests marks it
                session.add(
                    NotificationDelivery(
                        notification_id=notification.id,
                        delivered=None,
                        delivery_type=NotificationDeliveryType.digest,
                    )
                )
            elif delivery_type == NotificationDeliveryType.push:
                # for push notifications, we send them straight away (web + mobile)
                session.add(
                    NotificationDelivery(
                        notification_id=notification.id,
                        delivered=func.now(),
                        delivery_type=NotificationDeliveryType.push,
                    )
                )
                session.flush()
                _send_push_notification(session, user, notification)
            else:
                # don't drop an unhandled delivery type silently; nothing is recorded for it
                logger.warning(f"Unhandled delivery type {delivery_type} for notification {notification.id}")

153 

154 

def handle_email_digests(payload: empty_pb2.Empty) -> None:
    """
    Sends out email digests

    The email digest is sent if the user has "digest" type notifications that have not had an individual email sent about them already.

    If a digest is sent, then we send out every notification that has type digest, regardless of if they already got another type of notification about it.

    That is, we don't send out an email unless there's something new, but if we do send one out, we send new and old stuff.

    NOTE: the digest email itself is not sent yet (only a TODO line is logged); the pending digest deliveries are
    nevertheless marked as delivered and the user's `last_digest_sent` is updated.
    """
    logger.info("Sending out email digests")

    with session_scope() as session:
        # already sent email notifications
        delivered_email_notifications = (
            select(
                Notification.id.label("notification_id"),
                # min is superfluous but needed for group_by
                func.min(NotificationDelivery.id).label("notification_delivery_id"),
            )
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.email)
            .where(NotificationDelivery.delivered.is_not(None))
            .group_by(Notification.id)
            .subquery()
        )

        # users who have unsent "digest" type notifications but not sent email notifications
        users_to_send_digests_to = (
            session.execute(
                select(User)
                .where(User.digest_frequency.is_not(None))
                .where(User.last_digest_sent < func.now() - User.digest_frequency)
                # todo: tz
                .join(Notification, Notification.user_id == User.id)
                .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
                .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
                .where(NotificationDelivery.delivered.is_(None))
                # anti-join: keep only notifications with no delivered email
                .outerjoin(
                    delivered_email_notifications,
                    delivered_email_notifications.c.notification_id == Notification.id,
                )
                .where(delivered_email_notifications.c.notification_delivery_id.is_(None))
                .group_by(User.id)
            )
            .scalars()
            .all()
        )

        logger.info(f"{users_to_send_digests_to=}")

        for user in users_to_send_digests_to:
            # digest notifications that haven't been delivered yet
            # Exclude notifications linked to non-visible moderated content
            context = make_background_user_context(user.id)
            notifications_and_deliveries = session.execute(
                select(Notification, NotificationDelivery)
                .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
                .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
                .where(NotificationDelivery.delivered.is_(None))
                .where(Notification.user_id == user.id)
                .where(moderation_state_column_visible(context, Notification.moderation_state_id))
                .order_by(Notification.created)
            ).all()

            if not notifications_and_deliveries:
                # everything pending for this user is hidden by moderation; leave the user untouched
                continue

            logger.info(f"Sending {user.id=} a digest with {len(notifications_and_deliveries)} notifications")
            logger.info("TODO: supposed to send digest email")
            for _notification, delivery in notifications_and_deliveries:
                delivery.delivered = func.now()
            user.last_digest_sent = func.now()
            # commit per user so each user's digest is persisted on its own
            session.commit()