Coverage for app / backend / src / couchers / notifications / background.py: 83%
106 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
1import dataclasses
2import logging
4from google.protobuf import empty_pb2
5from sqlalchemy import select
6from sqlalchemy.orm import Session
7from sqlalchemy.sql import exists, func
9from couchers import urls
10from couchers.config import config
11from couchers.context import make_background_user_context
12from couchers.db import session_scope
13from couchers.email.queuing import queue_email
14from couchers.i18n import LocalizationContext
15from couchers.models import (
16 Notification,
17 NotificationDelivery,
18 NotificationDeliveryType,
19 User,
20)
21from couchers.notifications.push import push_to_user
22from couchers.notifications.quick_links import (
23 generate_do_not_email,
24 generate_unsub_topic_action,
25 generate_unsub_topic_key,
26)
27from couchers.notifications.render_email import get_list_unsubscribe_header, render_email_notification
28from couchers.notifications.render_push import render_push_notification
29from couchers.notifications.settings import get_preference
30from couchers.notifications.utils import can_notify_deleted_user
31from couchers.proto.internal import jobs_pb2
32from couchers.sql import moderation_state_column_visible
33from couchers.templating import Jinja2Template, template_folder
34from couchers.utils import now
36logger = logging.getLogger(__name__)
def _send_email_notification(session: Session, user: User, notification: Notification) -> None:
    """
    Renders the given notification as an email (plaintext + html) and queues it for the user

    Nothing is queued when:
    * the user has emails turned off and the notification's topic-action is not critical,
    * the user is banned, or
    * the user is deleted and `can_notify_deleted_user` rejects the topic-action.

    The email itself is not sent here, it is only handed over to `queue_email`.
    """
    topic_action = notification.topic_action

    # guard clauses: the cases where we must not email the user at all
    if user.do_not_email and not topic_action.is_critical:
        logger.info(f"Not emailing {user} based on notification {topic_action} due to emails turned off")
        return

    if user.banned_at is not None:
        logger.info(f"Tried emailing {user} based on notification {topic_action} but user is banned")
        return

    if user.deleted_at is not None:
        if not can_notify_deleted_user(topic_action):
            logger.info(f"Tried emailing {user} based on notification {topic_action} but user is deleted")
            return

    # fall back to english when notification translations are switched off in the config
    user_loc_context = LocalizationContext.from_user(user)
    loc_context = (
        user_loc_context
        if config["ENABLE_NOTIFICATION_TRANSLATIONS"]
        else dataclasses.replace(user_loc_context, locale="en")
    )

    rendered = render_email_notification(notification, loc_context)
    template_name = rendered.template_name

    # the explicit keys below take precedence over same-named keys coming from the renderer
    template_args = {
        **rendered.template_args,
        "header_subject": rendered.subject,
        "header_preview": rendered.preview,
        "user": user,
        "time": notification.created,
        "footer_timezone_name": loc_context.localized_timezone,
        "footer_copyright_year": now().year,
    }

    # footer: critical emails only carry the "critical" flag, everything else gets management/unsubscribe links
    # NOTE(review): the listing this was reconstructed from lost its indentation; the two per-topic unsubscribe
    # entries are assumed to belong to the non-critical branch -- confirm against the original file
    if topic_action.is_critical:
        footer_args = {"footer_email_is_critical": True}
    else:
        footer_args = {
            "footer_email_is_critical": False,
            "footer_manage_notifications_link": urls.notification_settings_link(),
            "footer_do_not_email_link": generate_do_not_email(user),
        }

        if rendered.topic_action_unsubscribe_text:
            footer_args["footer_notification_topic_action"] = rendered.topic_action_unsubscribe_text
            footer_args["footer_notification_topic_action_link"] = generate_unsub_topic_action(notification)

        if rendered.topic_key_unsubscribe_text:
            footer_args["footer_notification_topic_key"] = rendered.topic_key_unsubscribe_text
            footer_args["footer_notification_topic_key_link"] = generate_unsub_topic_key(notification)
    template_args.update(footer_args)

    # plaintext version: the template body followed by the shared footer
    plain_body_source = (template_folder / f"{template_name}.txt").read_text()
    plain_footer_source = (template_folder / "_footer.txt").read_text()
    plain = Jinja2Template(source=plain_body_source + plain_footer_source, html=False).render(
        template_args, loc_context
    )

    # html version: read from the "generated_html" subfolder of the template folder
    html_source = (template_folder / "generated_html" / f"{template_name}.html").read_text()
    html = Jinja2Template(source=html_source, html=True).render(template_args, loc_context)

    list_unsubscribe_header = get_list_unsubscribe_header(notification)
    queue_email(
        session,
        sender_name=config["NOTIFICATION_EMAIL_SENDER"],
        sender_email=config["NOTIFICATION_EMAIL_ADDRESS"],
        recipient=user.email,
        subject=config["NOTIFICATION_PREFIX"] + rendered.subject,
        plain=plain,
        html=html,
        source_data=config["VERSION"] + f"/{template_name}",
        list_unsubscribe_header=list_unsubscribe_header,
    )
def _send_push_notification(session: Session, user: User, notification: Notification) -> None:
    """
    Renders the notification as push content for the user's localization and passes it on to `push_to_user`
    """
    logger.debug(f"Formatting push notification for {user}")

    # keep on server for at most an hour if the client is not around
    ttl_seconds = 3600

    loc_context = LocalizationContext.from_user(user)
    content = render_push_notification(notification, loc_context)

    push_to_user(
        session,
        user_id=user.id,
        topic_action=notification.topic_action.display,
        content=content,
        key=notification.key,
        ttl=ttl_seconds,
    )
def handle_notification(payload: jobs_pb2.HandleNotificationPayload) -> None:
    """
    Background job: delivers the notification with id `payload.notification_id`

    The notification row is selected with a row lock. If the notification is linked to moderated content that is not
    visible to the recipient, the job returns without recording any delivery. Otherwise, for each delivery type
    returned by `get_preference` that has no `NotificationDelivery` row yet:

    * email: a delivered row is recorded and the email is queued,
    * digest: an undelivered row is recorded (picked up later by the digest job),
    * push: a delivered row is recorded and the push notification is sent.
    """
    with session_scope() as session:
        # select the notification and lock its row for the rest of the transaction
        locked_notification_query = (
            select(Notification).where(Notification.id == payload.notification_id).with_for_update()
        )
        notification = session.execute(locked_notification_query).scalar_one()

        # notifications linked to moderated content are only processed once the recipient may see that content
        if notification.moderation_state_id:
            context = make_background_user_context(notification.user_id)
            visibility_query = select(
                exists(
                    select(Notification)
                    .where(Notification.id == notification.id)
                    .where(moderation_state_column_visible(context, Notification.moderation_state_id))
                )
            )

            if not session.execute(visibility_query).scalar_one():
                # content is not visible to the recipient, leave the notification for later processing
                logger.info(
                    f"Deferring notification {notification.id}: content not visible to user {notification.user_id}"
                )
                return

        # the recipient of the notification
        user = session.execute(select(User).where(User.id == notification.user_id)).scalar_one()

        def record_delivery(kind, delivered) -> None:
            # adds the NotificationDelivery row for this notification to the session
            session.add(
                NotificationDelivery(
                    notification_id=notification.id,
                    delivered=delivered,
                    delivery_type=kind,
                )
            )

        for delivery_type in get_preference(session, notification.user.id, notification.topic_action):
            # a delivery row may already exist for this notification and delivery type
            # (this can happen if the job was queued multiple times)
            existing_delivery_query = (
                select(NotificationDelivery)
                .where(NotificationDelivery.notification_id == notification.id)
                .where(NotificationDelivery.delivery_type == delivery_type)
            )
            if session.execute(existing_delivery_query).scalar_one_or_none():
                logger.info(f"Skipping {delivery_type} delivery for notification {notification.id}: already delivered")
                continue

            logger.info(f"Should notify by {delivery_type}")

            if delivery_type == NotificationDeliveryType.email:
                # emails are not delivered straight away: the email is only queued here, and (per the original
                # note) the email background worker gets around to it later and handles deduplication
                record_delivery(NotificationDeliveryType.email, func.now())
                session.flush()
                _send_email_notification(session, user, notification)
            elif delivery_type == NotificationDeliveryType.digest:
                # digest notifications are only queued: the row stays undelivered until a digest goes out
                record_delivery(NotificationDeliveryType.digest, None)
            elif delivery_type == NotificationDeliveryType.push:
                # push notifications are sent straight away (web + mobile)
                record_delivery(NotificationDeliveryType.push, func.now())
                session.flush()
                _send_push_notification(session, user, notification)
            # any other delivery type is silently ignored
def handle_email_digests(payload: empty_pb2.Empty) -> None:
    """
    Sends out email digests

    A user gets a digest if they have "digest" type notifications that have not yet been delivered and that have not
    had an individual email sent about them already.

    When a digest goes out, it covers every undelivered "digest" type notification of that user (that is visible to
    them moderation-wise), regardless of whether another type of notification was already sent about it.

    In other words: no digest unless there's something new, but if there is one, it contains new and old stuff.

    Note that no digest email is actually produced yet (a TODO is logged instead); the deliveries are marked as
    delivered and the user's `last_digest_sent` is updated.
    """
    logger.info("Sending out email digests")

    with session_scope() as session:
        # notifications that already had an individual email sent about them
        delivered_email_notifications = (
            select(
                Notification.id.label("notification_id"),
                # min is superfluous but needed for group_by
                func.min(NotificationDelivery.id).label("notification_delivery_id"),
            )
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.email)
            .where(NotificationDelivery.delivered.is_not(None))
            .group_by(Notification.id)
            .subquery()
        )

        # users who are due a digest and have undelivered "digest" type notifications without a sent email;
        # the outer join + "is null" filter is an anti-join against the subquery above
        due_users_query = (
            select(User)
            .join(Notification, Notification.user_id == User.id)
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            .outerjoin(
                delivered_email_notifications,
                delivered_email_notifications.c.notification_id == Notification.id,
            )
            .where(User.digest_frequency.is_not(None))
            # todo: tz
            .where(User.last_digest_sent < func.now() - User.digest_frequency)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
            .where(NotificationDelivery.delivered.is_(None))
            .where(delivered_email_notifications.c.notification_delivery_id.is_(None))
            .group_by(User.id)
        )
        users_to_send_digests_to = session.execute(due_users_query).scalars().all()

        logger.info(f"{users_to_send_digests_to=}")

        for user in users_to_send_digests_to:
            # undelivered digest notifications of this user, oldest first,
            # excluding notifications linked to non-visible moderated content
            context = make_background_user_context(user.id)
            pending_query = (
                select(Notification, NotificationDelivery)
                .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
                .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
                .where(NotificationDelivery.delivered.is_(None))
                .where(Notification.user_id == user.id)
                .where(moderation_state_column_visible(context, Notification.moderation_state_id))
                .order_by(Notification.created)
            )
            pending_rows = session.execute(pending_query).all()

            if not pending_rows:
                continue

            logger.info(f"Sending {user.id=} a digest with {len(pending_rows)} notifications")
            logger.info("TODO: supposed to send digest email")

            for _notification, delivery in pending_rows:
                delivery.delivered = func.now()

            # NOTE(review): the listing this was reconstructed from lost its indentation; the two statements below
            # are assumed to run only for users that had something to send -- confirm against the original file
            user.last_digest_sent = func.now()
            session.commit()