Coverage for app/backend/src/couchers/notifications/background.py: 79%

82 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import logging 

2 

3from google.protobuf import empty_pb2 

4from sqlalchemy import select 

5from sqlalchemy.orm import Session 

6from sqlalchemy.sql import exists, func 

7 

8from couchers.context import make_background_user_context 

9from couchers.db import session_scope 

10from couchers.email.queuing import queue_email 

11from couchers.i18n import LocalizationContext 

12from couchers.models import ( 

13 Notification, 

14 NotificationDelivery, 

15 NotificationDeliveryType, 

16 User, 

17) 

18from couchers.notifications.push import push_to_user 

19from couchers.notifications.render_email import get_send_email_payload 

20from couchers.notifications.render_push import render_push_notification 

21from couchers.notifications.settings import get_preference 

22from couchers.notifications.utils import can_notify_deleted_user 

23from couchers.proto.internal import jobs_pb2 

24from couchers.sql import moderation_state_column_visible 

25 

26logger = logging.getLogger(__name__) 

27 

28 

def _send_email_notification(session: Session, user: User, notification: Notification) -> None:
    """
    Builds the email for a notification and puts it on the email queue.

    Nothing is queued (and an info line is logged instead) when:
    - the user has emails turned off and the notification's topic is not critical,
    - the user is banned, or
    - the user is deleted and the topic is not one that deleted users may be notified about.

    Feature flags read from the user's background context decide whether the email is localized for the user
    (otherwise it is rendered with the "en" locale, keeping the user's timezone) and whether ICS attachments
    are included.
    """
    # Opt-out is honoured unless the topic is flagged as critical.
    if user.do_not_email and not notification.topic_action.is_critical:
        logger.info(f"Not emailing {user} based on notification {notification.topic_action} due to emails turned off")
        return

    # Banned users never get emails.
    if user.banned_at is not None:
        logger.info(f"Tried emailing {user} based on notification {notification.topic_action} but user is banned")
        return

    # Deleted users only get the topics explicitly allowed for them.
    if user.deleted_at is not None and not can_notify_deleted_user(notification.topic_action):
        logger.info(f"Tried emailing {user} based on notification {notification.topic_action} but user is deleted")
        return

    flags = make_background_user_context(user.id)

    user_localization = LocalizationContext.from_user(user)
    translations_enabled = flags.get_boolean_value("notification_translations_enabled", default=False)
    # Without the translations flag, fall back to English but keep the user's timezone.
    localization = (
        user_localization
        if translations_enabled
        else LocalizationContext(locale="en", timezone=user_localization.timezone)
    )

    attach_ics = flags.get_boolean_value("email_ics_attachments_enabled", default=False)
    email_payload = get_send_email_payload(
        user,
        notification,
        localization,
        include_ics_attachments=attach_ics,
    )

    queue_email(session, email_payload)

56 

57 

def _send_push_notification(session: Session, user: User, notification: Notification) -> None:
    """
    Renders a notification as push content and hands it to push_to_user for the given user.

    The content is rendered with a localization context built from the user.
    """
    logger.debug(f"Formatting push notification for {user}")

    localization = LocalizationContext.from_user(user)
    rendered = render_push_notification(notification, localization)

    # keep on server for at most an hour if the client is not around
    max_hold_seconds = 3600

    push_to_user(
        session,
        user_id=user.id,
        topic_action=notification.topic_action.display,
        content=rendered,
        key=notification.key,
        ttl=max_hold_seconds,
    )

71 

72 

def handle_notification(payload: jobs_pb2.HandleNotificationPayload) -> None:
    """
    Background job: delivers one notification through each delivery type the recipient prefers.

    Steps:
    1. Select the notification row with a row lock (SELECT ... FOR UPDATE).
    2. If the notification is linked to a moderation state, check that the content is visible to the recipient;
       if it is not, log and return without recording any delivery.
    3. For every delivery type returned by get_preference, skip it if a NotificationDelivery row already exists
       for this notification and type (the job may have been queued more than once); otherwise record a
       NotificationDelivery row and:
       - email: mark delivered now and queue the email,
       - digest: leave `delivered` unset so the digest job picks it up,
       - push: mark delivered now and send the push notification.

    scalar_one() is used for both the notification and the user lookup, so the job fails if either row is missing.
    """
    with session_scope() as session:
        # Select and lock the row.
        notification = session.execute(
            select(Notification).where(Notification.id == payload.notification_id).with_for_update()
        ).scalar_one()

        # Check moderation visibility if this notification is linked to moderated content
        if notification.moderation_state_id:
            context = make_background_user_context(notification.user_id)
            content_visible = session.execute(
                select(
                    exists(
                        select(Notification)
                        .where(Notification.id == notification.id)
                        .where(moderation_state_column_visible(context, Notification.moderation_state_id))
                    )
                )
            ).scalar_one()

            if not content_visible:
                # Content is not visible to recipient, leave notification for later processing
                logger.info(
                    f"Deferring notification {notification.id}: content not visible to user {notification.user_id}"
                )
                return

        # load the recipient; the senders below need the full User row
        user = session.execute(select(User).where(User.id == notification.user_id)).scalar_one()

        # use the user we just loaded rather than going through the notification.user relationship
        delivery_types = get_preference(session, user.id, notification.topic_action)
        for delivery_type in delivery_types:
            # Check if delivery already exists for this notification and delivery type
            # (this can happen if the job was queued multiple times)
            existing_delivery = session.execute(
                select(NotificationDelivery)
                .where(NotificationDelivery.notification_id == notification.id)
                .where(NotificationDelivery.delivery_type == delivery_type)
            ).scalar_one_or_none()
            if existing_delivery:
                logger.info(f"Skipping {delivery_type} delivery for notification {notification.id}: already delivered")
                continue

            logger.info(f"Should notify by {delivery_type}")
            if delivery_type == NotificationDeliveryType.email:
                # for emails we don't deliver straight up, wait until the email background worker gets around to it and handles deduplication
                session.add(
                    NotificationDelivery(
                        notification_id=notification.id,
                        delivered=func.now(),
                        delivery_type=NotificationDeliveryType.email,
                    )
                )
                # flush so the delivery row is written before the email is queued
                session.flush()
                _send_email_notification(session, user, notification)
            elif delivery_type == NotificationDeliveryType.digest:
                # for digest notifications, add to digest queue
                session.add(
                    NotificationDelivery(
                        notification_id=notification.id,
                        delivered=None,
                        delivery_type=NotificationDeliveryType.digest,
                    )
                )
            elif delivery_type == NotificationDeliveryType.push:
                # for push notifications, we send them straight away (web + mobile)
                session.add(
                    NotificationDelivery(
                        notification_id=notification.id,
                        delivered=func.now(),
                        delivery_type=NotificationDeliveryType.push,
                    )
                )
                # flush so the delivery row is written before the push is sent
                session.flush()
                _send_push_notification(session, user, notification)
            else:
                # no handler for this type: make the drop visible instead of skipping it silently
                logger.warning(
                    f"Unhandled delivery type {delivery_type} for notification {notification.id}, not delivering"
                )

148 

149 

def handle_email_digests(payload: empty_pb2.Empty) -> None:
    """
    Background job that processes pending email digests.

    A user is due a digest when they have a digest frequency set, their last digest is older than that frequency,
    and they have at least one undelivered "digest" delivery whose notification has not already been delivered as
    an individual email.

    For each such user, every undelivered digest delivery (whose moderated content, if any, is visible to them) is
    marked as delivered and the user's last-digest timestamp is updated. In other words: a digest is only triggered
    by something new, but once triggered it covers everything pending.

    NOTE: composing and sending the digest email itself is still a TODO; only the bookkeeping happens here.
    """
    logger.info("Sending out email digests")

    with session_scope() as session:
        # notifications that already had an individual email delivered
        emailed_notifications = (
            select(
                Notification.id.label("notification_id"),
                # min is superfluous but needed for group_by
                func.min(NotificationDelivery.id).label("notification_delivery_id"),
            )
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.email)
            .where(NotificationDelivery.delivered.is_not(None))
            .group_by(Notification.id)
            .subquery()
        )

        # users with pending "digest" deliveries for notifications that were never emailed individually
        due_users_query = (
            select(User)
            .join(Notification, Notification.user_id == User.id)
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            .outerjoin(
                emailed_notifications,
                emailed_notifications.c.notification_id == Notification.id,
            )
            .where(User.digest_frequency.is_not(None))
            # todo: tz
            .where(User.last_digest_sent < func.now() - User.digest_frequency)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
            .where(NotificationDelivery.delivered.is_(None))
            # anti-join: keep only notifications with no delivered email
            .where(emailed_notifications.c.notification_delivery_id.is_(None))
            .group_by(User.id)
        )
        users_to_send_digests_to = session.execute(due_users_query).scalars().all()

        logger.info(f"{users_to_send_digests_to=}")

        for user in users_to_send_digests_to:
            # pending digest deliveries for this user, oldest notification first;
            # notifications linked to non-visible moderated content are left out
            context = make_background_user_context(user.id)
            pending_query = (
                select(Notification, NotificationDelivery)
                .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
                .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
                .where(NotificationDelivery.delivered.is_(None))
                .where(Notification.user_id == user.id)
                .where(moderation_state_column_visible(context, Notification.moderation_state_id))
                .order_by(Notification.created)
            )
            pending_rows = session.execute(pending_query).all()

            if not pending_rows:
                continue

            notifications = tuple(notification for notification, _ in pending_rows)
            deliveries = tuple(delivery for _, delivery in pending_rows)

            logger.info(f"Sending {user.id=} a digest with {len(notifications)} notifications")
            logger.info("TODO: supposed to send digest email")

            for delivery in deliveries:
                delivery.delivered = func.now()
            user.last_digest_sent = func.now()
            session.commit()