Coverage for app/backend/src/couchers/notifications/background.py: 78%
83 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
1import dataclasses
2import logging
4from google.protobuf import empty_pb2
5from sqlalchemy import select
6from sqlalchemy.orm import Session
7from sqlalchemy.sql import exists, func
9from couchers.config import config
10from couchers.context import make_background_user_context
11from couchers.db import session_scope
12from couchers.email.queuing import queue_email
13from couchers.i18n import LocalizationContext
14from couchers.models import (
15 Notification,
16 NotificationDelivery,
17 NotificationDeliveryType,
18 User,
19)
20from couchers.notifications.push import push_to_user
21from couchers.notifications.render_email import render_email_notification
22from couchers.notifications.render_push import render_push_notification
23from couchers.notifications.settings import get_preference
24from couchers.notifications.utils import can_notify_deleted_user
25from couchers.proto.internal import jobs_pb2
26from couchers.sql import moderation_state_column_visible
28logger = logging.getLogger(__name__)
def _send_email_notification(session: Session, user: User, notification: Notification) -> None:
    """
    Renders a notification as an email and queues it for the user

    Nothing is queued, and an info line is logged instead, when:
    * the user has emails turned off and the notification's topic-action is not critical,
    * the user is banned, or
    * the user is deleted and can_notify_deleted_user() rejects the topic-action.
    """
    # work out whether (and why) this email has to be dropped; checks run in order, first hit wins
    skip_message = None
    if user.do_not_email and not notification.topic_action.is_critical:
        skip_message = f"Not emailing {user} based on notification {notification.topic_action} due to emails turned off"
    elif user.banned_at is not None:
        skip_message = f"Tried emailing {user} based on notification {notification.topic_action} but user is banned"
    elif user.deleted_at is not None and not can_notify_deleted_user(notification.topic_action):
        skip_message = f"Tried emailing {user} based on notification {notification.topic_action} but user is deleted"

    if skip_message is not None:
        logger.info(skip_message)
        return

    localization = LocalizationContext.from_user(user)
    if not config["ENABLE_NOTIFICATION_TRANSLATIONS"]:
        # notification translations are switched off in config: render with the "en" locale
        localization = dataclasses.replace(localization, locale="en")

    email = render_email_notification(user, notification, localization)
    prefixed_subject = config["NOTIFICATION_PREFIX"] + email.subject

    queue_email(
        session,
        sender_name=config["NOTIFICATION_EMAIL_SENDER"],
        sender_email=config["NOTIFICATION_EMAIL_ADDRESS"],
        recipient=user.email,
        subject=prefixed_subject,
        plain=email.body_plaintext,
        html=email.body_html,
        source_data=email.source_data,
        list_unsubscribe_header=email.list_unsubscribe_header,
    )
def _send_push_notification(session: Session, user: User, notification: Notification) -> None:
    """
    Renders a notification as push content and hands it to push_to_user() for the user

    NOTE(review): unlike the email path, there is no locale override and no banned/deleted check
    here; presumably that is handled elsewhere - confirm.
    """
    logger.debug(f"Formatting push notification for {user}")

    localization = LocalizationContext.from_user(user)
    rendered = render_push_notification(notification, localization)

    # keep on server for at most an hour if the client is not around
    max_ttl = 3600

    push_to_user(
        session,
        user_id=user.id,
        topic_action=notification.topic_action.display,
        content=rendered,
        key=notification.key,
        ttl=max_ttl,
    )
def _notification_content_visible(session: Session, notification: Notification) -> bool:
    """
    Returns whether the moderated content linked to the notification is visible to its recipient

    The check runs in the database via moderation_state_column_visible(), using a background user
    context built for the notification's recipient.
    """
    context = make_background_user_context(notification.user_id)
    visible_notification = (
        select(Notification)
        .where(Notification.id == notification.id)
        .where(moderation_state_column_visible(context, Notification.moderation_state_id))
    )
    return session.execute(select(exists(visible_notification))).scalar_one()


def _delivery_exists(session: Session, notification: Notification, delivery_type: NotificationDeliveryType) -> bool:
    """
    Returns whether a NotificationDelivery row already exists for this notification and delivery type
    """
    existing_delivery = session.execute(
        select(NotificationDelivery)
        .where(NotificationDelivery.notification_id == notification.id)
        .where(NotificationDelivery.delivery_type == delivery_type)
    ).scalar_one_or_none()
    return existing_delivery is not None


def _add_delivery(
    session: Session, notification: Notification, delivery_type: NotificationDeliveryType, *, delivered_now: bool
) -> None:
    """
    Adds a NotificationDelivery row to the session, stamped with the DB's now() or left undelivered
    """
    session.add(
        NotificationDelivery(
            notification_id=notification.id,
            delivered=func.now() if delivered_now else None,
            delivery_type=delivery_type,
        )
    )


def handle_notification(payload: jobs_pb2.HandleNotificationPayload) -> None:
    """
    Background job: delivers one notification through each delivery type returned by get_preference()

    The notification row is selected with a row lock. If the notification is linked to moderated
    content that is not visible to the recipient, the job logs that and returns without delivering
    anything. Otherwise, for each delivery type that has no delivery row yet:

    * email: a delivery row stamped with now() is added and flushed, then the email is queued
    * digest: an undelivered row is added, for handle_email_digests to pick up
    * push: a delivery row stamped with now() is added and flushed, then the push is sent
    """
    with session_scope() as session:
        # Select and lock the row.
        notification = session.execute(
            select(Notification).where(Notification.id == payload.notification_id).with_for_update()
        ).scalar_one()

        # Check moderation visibility if this notification is linked to moderated content
        if notification.moderation_state_id and not _notification_content_visible(session, notification):
            # Content is not visible to recipient, leave notification for later processing
            logger.info(f"Deferring notification {notification.id}: content not visible to user {notification.user_id}")
            return

        # the recipient; used both to look up delivery preferences and to do the actual sending
        user = session.execute(select(User).where(User.id == notification.user_id)).scalar_one()

        # use the user we just loaded rather than going through the notification.user relationship
        delivery_types = get_preference(session, user.id, notification.topic_action)
        for delivery_type in delivery_types:
            # a delivery row may already exist (this can happen if the job was queued multiple times)
            if _delivery_exists(session, notification, delivery_type):
                logger.info(f"Skipping {delivery_type} delivery for notification {notification.id}: already delivered")
                continue

            logger.info(f"Should notify by {delivery_type}")
            if delivery_type == NotificationDeliveryType.email:
                # for emails we don't deliver straight up, wait until the email background worker gets around to it and handles deduplication
                _add_delivery(session, notification, NotificationDeliveryType.email, delivered_now=True)
                # write the delivery row out before queuing the email
                session.flush()
                _send_email_notification(session, user, notification)
            elif delivery_type == NotificationDeliveryType.digest:
                # for digest notifications, add to digest queue
                _add_delivery(session, notification, NotificationDeliveryType.digest, delivered_now=False)
            elif delivery_type == NotificationDeliveryType.push:
                # for push notifications, we send them straight away (web + mobile)
                _add_delivery(session, notification, NotificationDeliveryType.push, delivered_now=True)
                # write the delivery row out before sending the push
                session.flush()
                _send_push_notification(session, user, notification)
def handle_email_digests(payload: empty_pb2.Empty) -> None:
    """
    Background job for email digests

    A user is picked for a digest when they have "digest" type notifications for which no individual email has been sent yet.

    Once a user is picked, the digest covers every undelivered "digest" type notification of theirs, no matter whether another kind of notification already went out for it.

    In short: no digest without something new, but a digest carries new and old items alike.
    """
    logger.info("Sending out email digests")

    with session_scope() as session:
        # notifications that already went out as an individual email
        delivered_email_notifications = (
            select(
                Notification.id.label("notification_id"),
                # the min() only exists to satisfy the group_by
                func.min(NotificationDelivery.id).label("notification_delivery_id"),
            )
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.email)
            .where(NotificationDelivery.delivered.is_not(None))
            .group_by(Notification.id)
            .subquery()
        )

        # users whose digest is due and who have an undelivered "digest" notification with no sent email
        digest_due_users = (
            select(User)
            .where(User.digest_frequency.is_not(None))
            .where(User.last_digest_sent < func.now() - User.digest_frequency)
            # todo: tz
            .join(Notification, Notification.user_id == User.id)
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
            .where(NotificationDelivery.delivered.is_(None))
            .outerjoin(
                delivered_email_notifications,
                delivered_email_notifications.c.notification_id == Notification.id,
            )
            # anti-join: keep only notifications without a matching sent email
            .where(delivered_email_notifications.c.notification_delivery_id.is_(None))
            .group_by(User.id)
        )
        users_to_send_digests_to = session.execute(digest_due_users).scalars().all()

        logger.info(f"{users_to_send_digests_to=}")

        for user in users_to_send_digests_to:
            # undelivered digest notifications of this user, oldest first;
            # notifications linked to non-visible moderated content are left out
            context = make_background_user_context(user.id)
            pending = session.execute(
                select(Notification, NotificationDelivery)
                .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
                .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
                .where(NotificationDelivery.delivered.is_(None))
                .where(Notification.user_id == user.id)
                .where(moderation_state_column_visible(context, Notification.moderation_state_id))
                .order_by(Notification.created)
            ).all()

            if not pending:
                continue

            logger.info(f"Sending {user.id=} a digest with {len(pending)} notifications")
            logger.info("TODO: supposed to send digest email")
            for _notification, delivery in pending:
                delivery.delivered = func.now()
            user.last_digest_sent = func.now()
            # committed per user
            session.commit()