Coverage for app/backend/src/couchers/notifications/background.py: 79%
82 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1import logging
3from google.protobuf import empty_pb2
4from sqlalchemy import select
5from sqlalchemy.orm import Session
6from sqlalchemy.sql import exists, func
8from couchers.context import make_background_user_context
9from couchers.db import session_scope
10from couchers.email.queuing import queue_email
11from couchers.i18n import LocalizationContext
12from couchers.models import (
13 Notification,
14 NotificationDelivery,
15 NotificationDeliveryType,
16 User,
17)
18from couchers.notifications.push import push_to_user
19from couchers.notifications.render_email import get_send_email_payload
20from couchers.notifications.render_push import render_push_notification
21from couchers.notifications.settings import get_preference
22from couchers.notifications.utils import can_notify_deleted_user
23from couchers.proto.internal import jobs_pb2
24from couchers.sql import moderation_state_column_visible
26logger = logging.getLogger(__name__)
def _send_email_notification(session: Session, user: User, notification: Notification) -> None:
    """
    Queues an email for a single notification, unless the recipient must not be emailed

    Nothing is queued (and the reason is logged) when:
    * the user has emails turned off and the notification's topic-action is not critical,
    * the user is banned, or
    * the user is deleted and `can_notify_deleted_user` rejects the notification's topic-action.

    Otherwise the email payload is rendered and handed to `queue_email` on the given session.
    """
    emails_turned_off = user.do_not_email and not notification.topic_action.is_critical
    if emails_turned_off:
        logger.info(f"Not emailing {user} based on notification {notification.topic_action} due to emails turned off")
        return

    is_banned = user.banned_at is not None
    if is_banned:
        logger.info(f"Tried emailing {user} based on notification {notification.topic_action} but user is banned")
        return

    is_deleted = user.deleted_at is not None
    if is_deleted and not can_notify_deleted_user(notification.topic_action):
        logger.info(f"Tried emailing {user} based on notification {notification.topic_action} but user is deleted")
        return

    user_context = make_background_user_context(user.id)

    # Render in English (keeping the user's timezone) unless translated notifications are switched on for this user
    user_localization = LocalizationContext.from_user(user)
    if user_context.get_boolean_value("notification_translations_enabled", default=False):
        localization = user_localization
    else:
        localization = LocalizationContext(locale="en", timezone=user_localization.timezone)

    attach_ics = user_context.get_boolean_value("email_ics_attachments_enabled", default=False)
    email_payload = get_send_email_payload(
        user,
        notification,
        localization,
        include_ics_attachments=attach_ics,
    )

    queue_email(session, email_payload)
def _send_push_notification(session: Session, user: User, notification: Notification) -> None:
    """
    Renders a notification using the user's own localization and pushes it to that user
    """
    logger.debug(f"Formatting push notification for {user}")

    localization = LocalizationContext.from_user(user)
    rendered_content = render_push_notification(notification, localization)

    # if the client is not around, the server should hold on to the push for at most an hour
    one_hour_in_seconds = 3600

    push_to_user(
        session,
        user_id=user.id,
        topic_action=notification.topic_action.display,
        content=rendered_content,
        key=notification.key,
        ttl=one_hour_in_seconds,
    )
def handle_notification(payload: jobs_pb2.HandleNotificationPayload) -> None:
    """
    Delivers one notification through each delivery type returned by the recipient's preferences

    The notification row is selected with FOR UPDATE. If the notification is linked to moderated content that is not
    visible to the recipient, nothing is delivered and the function returns early.

    For every delivery type from `get_preference`, a NotificationDelivery row is recorded unless one already exists
    for that notification and delivery type (which can happen if the job was queued multiple times):
    * email: recorded as delivered now, then handed to `_send_email_notification`
    * digest: recorded as not yet delivered, to be picked up by the digest job
    * push: recorded as delivered now, then handed to `_send_push_notification`
    Any other delivery type is logged as a warning and skipped.
    """
    with session_scope() as session:
        # Select and lock the row.
        notification = session.execute(
            select(Notification).where(Notification.id == payload.notification_id).with_for_update()
        ).scalar_one()

        # Check moderation visibility if this notification is linked to moderated content
        if notification.moderation_state_id:
            context = make_background_user_context(notification.user_id)
            content_visible = session.execute(
                select(
                    exists(
                        select(Notification)
                        .where(Notification.id == notification.id)
                        .where(moderation_state_column_visible(context, Notification.moderation_state_id))
                    )
                )
            ).scalar_one()

            if not content_visible:
                # Content is not visible to recipient, leave notification for later processing
                logger.info(
                    f"Deferring notification {notification.id}: content not visible to user {notification.user_id}"
                )
                return

        # load the recipient once; it is needed both for the preference lookup and by the senders below
        user = session.execute(select(User).where(User.id == notification.user_id)).scalar_one()

        # use the user loaded above rather than going through the notification.user relationship
        delivery_types = get_preference(session, user.id, notification.topic_action)
        for delivery_type in delivery_types:
            # Check if delivery already exists for this notification and delivery type
            # (this can happen if the job was queued multiple times)
            existing_delivery = session.execute(
                select(NotificationDelivery)
                .where(NotificationDelivery.notification_id == notification.id)
                .where(NotificationDelivery.delivery_type == delivery_type)
            ).scalar_one_or_none()
            if existing_delivery is not None:
                logger.info(f"Skipping {delivery_type} delivery for notification {notification.id}: already delivered")
                continue

            logger.info(f"Should notify by {delivery_type}")
            if delivery_type == NotificationDeliveryType.email:
                # for emails we don't deliver straight up, wait until the email background worker gets around to it
                # and handles deduplication
                session.add(
                    NotificationDelivery(
                        notification_id=notification.id,
                        delivered=func.now(),
                        delivery_type=NotificationDeliveryType.email,
                    )
                )
                # flush so the delivery row is written before the email is queued
                session.flush()
                _send_email_notification(session, user, notification)
            elif delivery_type == NotificationDeliveryType.digest:
                # for digest notifications, add to digest queue (delivered stays unset until a digest goes out)
                session.add(
                    NotificationDelivery(
                        notification_id=notification.id,
                        delivered=None,
                        delivery_type=NotificationDeliveryType.digest,
                    )
                )
            elif delivery_type == NotificationDeliveryType.push:
                # for push notifications, we send them straight away (web + mobile)
                session.add(
                    NotificationDelivery(
                        notification_id=notification.id,
                        delivered=func.now(),
                        delivery_type=NotificationDeliveryType.push,
                    )
                )
                # flush so the delivery row is written before the push is sent
                session.flush()
                _send_push_notification(session, user, notification)
            else:
                # don't drop an unexpected delivery type silently; nothing is recorded, so it stays visible in logs
                logger.warning(
                    f"Unhandled delivery type {delivery_type} for notification {notification.id}: not delivering"
                )
def handle_email_digests(payload: empty_pb2.Empty) -> None:
    """
    Processes pending email digests

    A user gets a digest when they have undelivered "digest" type notifications for which no individual email has
    been delivered, they have a digest frequency set, and their last digest is older than that frequency.

    Once a user qualifies, all of their undelivered digest notifications go into the digest, including ones they were
    already notified about some other way. Notifications linked to moderated content that is not visible to the user
    are left out. In other words: no digest without something new, but a digest carries both new and old items.

    Composing and sending the digest email itself is still a TODO; for now the deliveries are only marked as
    delivered and the user's last digest time is updated, with a commit after each user.
    """
    logger.info("Sending out email digests")

    with session_scope() as session:
        # notifications for which an individual email has already been delivered
        delivered_email_notifications = (
            select(
                Notification.id.label("notification_id"),
                # the aggregate carries no meaning, it is only there to satisfy the group_by
                func.min(NotificationDelivery.id).label("notification_delivery_id"),
            )
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.email)
            .where(NotificationDelivery.delivered != None)  # noqa: E711
            .group_by(Notification.id)
            .subquery()
        )

        # users that are due a digest and have an undelivered "digest" notification without a delivered email
        due_users_query = (
            select(User)
            .join(Notification, Notification.user_id == User.id)
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            .outerjoin(
                delivered_email_notifications,
                delivered_email_notifications.c.notification_id == Notification.id,
            )
            .where(User.digest_frequency != None)  # noqa: E711
            # todo: tz
            .where(User.last_digest_sent < func.now() - User.digest_frequency)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
            .where(NotificationDelivery.delivered == None)  # noqa: E711
            # anti-join: keep only notifications with no matching row in the delivered-email subquery
            .where(delivered_email_notifications.c.notification_delivery_id == None)  # noqa: E711
            .group_by(User.id)
        )
        users_to_send_digests_to = session.execute(due_users_query).scalars().all()

        logger.info(f"{users_to_send_digests_to=}")

        for user in users_to_send_digests_to:
            context = make_background_user_context(user.id)

            # this user's undelivered digest notifications, oldest first, skipping non-visible moderated content
            pending_query = (
                select(Notification, NotificationDelivery)
                .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
                .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
                .where(NotificationDelivery.delivered == None)  # noqa: E711
                .where(Notification.user_id == user.id)
                .where(moderation_state_column_visible(context, Notification.moderation_state_id))
                .order_by(Notification.created)
            )
            pending_rows = session.execute(pending_query).all()

            if not pending_rows:
                continue

            pending_deliveries = [delivery for _, delivery in pending_rows]

            logger.info(f"Sending {user.id=} a digest with {len(pending_rows)} notifications")
            logger.info("TODO: supposed to send digest email")

            for pending_delivery in pending_deliveries:
                pending_delivery.delivered = func.now()
            user.last_digest_sent = func.now()
            session.commit()