Coverage for src/couchers/notifications/background.py: 68%
130 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
1import logging
2from pathlib import Path
4from google.protobuf import empty_pb2
5from jinja2 import Environment, FileSystemLoader
6from sqlalchemy.sql import func
8from couchers import urls
9from couchers.config import config
10from couchers.db import session_scope
11from couchers.email import queue_email
12from couchers.metrics import push_notification_counter, push_notification_disabled_counter
13from couchers.models import (
14 Notification,
15 NotificationDelivery,
16 NotificationDeliveryType,
17 PushNotificationDeliveryAttempt,
18 PushNotificationSubscription,
19 User,
20)
21from couchers.notifications.push import push_to_user
22from couchers.notifications.push_api import send_push
23from couchers.notifications.render import render_notification
24from couchers.notifications.settings import get_preference
25from couchers.notifications.unsubscribe import (
26 generate_do_not_email,
27 generate_unsub_topic_action,
28 generate_unsub_topic_key,
29)
30from couchers.sql import couchers_select as select
31from couchers.templates.v2 import add_filters
32from couchers.utils import get_tz_as_text, now
33from proto.internal import jobs_pb2
# Logger named after this module.
logger = logging.getLogger(__name__)

# Template directory: "templates/v2", resolved three directories up from the directory containing this file.
template_folder = Path(__file__).parent.joinpath("..", "..", "..", "templates", "v2")

# Jinja environment that loads templates from template_folder.
loader = FileSystemLoader(template_folder)
env = Environment(trim_blocks=True, loader=loader)

# Hand the environment to add_filters (presumably registers the project's custom Jinja filters; confirm in
# couchers.templates.v2).
add_filters(env)
def _build_unsub_sections(user: User, notification: Notification, rendered):
    """
    Builds the unsubscribe footers for a notification email.

    Returns a ``(plain, html)`` pair: the plain text footer (which starts with a "---" separator) and the HTML
    footer. Critical emails only get a fixed notice; all other emails get a link to the notification settings,
    optional per-topic unsubscribe links and a "do not email" link.
    """
    plain_unsub_section = "\n\n---\n\n"
    if rendered.is_critical:
        notice = "This is a security email, you cannot unsubscribe from it."
        return plain_unsub_section + notice, notice

    manage_link = urls.notification_settings_link()
    plain_unsub_section += f"Edit your notification settings at <{manage_link}>"
    html_unsub_section = f'<a href="{manage_link}">Manage notification preferences</a>.'

    unsub_options = []
    ta = rendered.email_topic_action_unsubscribe_text
    tk = rendered.email_topic_key_unsubscribe_text
    ta_link = generate_unsub_topic_action(notification)
    tk_link = generate_unsub_topic_key(notification)
    # each unsubscribe option is only offered when the rendered notification supplies a text for it
    if ta:
        plain_unsub_section += f"\n\nTurn off emails for {ta}: <{ta_link}>"
        unsub_options.append(f'<a href="{ta_link}">{ta}</a>')
    if tk:
        plain_unsub_section += f"\n\nTurn off emails for {tk}: <{tk_link}>"
        unsub_options.append(f'<a href="{tk_link}">{tk}</a>')
    if unsub_options:
        html_unsub_section += f"<br />Turn off emails for: {' / '.join(unsub_options)}."

    dne_link = generate_do_not_email(user)
    plain_unsub_section += f"\n\nDo not email me (disables hosting): <{dne_link}>"
    html_unsub_section += f'<br /><a href="{dne_link}">Do not email me (disables hosting)</a>.'

    return plain_unsub_section, html_unsub_section


def _send_email_notification(session, user: User, notification: Notification):
    """
    Renders a notification as an email (plain text and HTML) and queues it with queue_email.

    Nothing is queued, and an info message is logged instead, when:
    - the user has turned emails off and the notification is not critical,
    - the user is banned, or
    - the user is deleted and the rendered notification does not allow deleted recipients.
    """
    rendered = render_notification(user, notification)

    # decide whether this user may be emailed at all before reading and rendering any templates
    if user.do_not_email and not rendered.is_critical:
        logger.info(f"Not emailing {user} based on template {rendered.email_template_name} due to emails turned off")
        return

    if user.is_banned:
        logger.info(f"Tried emailing {user} based on template {rendered.email_template_name} but user is banned")
        return

    if user.is_deleted and not rendered.allow_deleted:
        logger.info(f"Tried emailing {user} based on template {rendered.email_template_name} but user is deleted")
        return

    template_args = {
        "user": user,
        "time": notification.created,
        **rendered.email_template_args,
        # set last so that they take precedence over any same-named keys from the rendered args
        "_year": now().year,
        "_timezone_display": get_tz_as_text(user.timezone or "Etc/UTC"),
    }

    plain_unsub_section, html_unsub_section = _build_unsub_sections(user, notification, rendered)

    # the plain footer is appended to the template source, the html footer replaces a placeholder in it; both are
    # therefore rendered by jinja together with the rest of the template
    plain_tmplt = (template_folder / f"{rendered.email_template_name}.txt").read_text()
    plain = env.from_string(plain_tmplt + plain_unsub_section).render(template_args)
    html_tmplt = (template_folder / "generated_html" / f"{rendered.email_template_name}.html").read_text()
    html = env.from_string(html_tmplt.replace("___UNSUB_SECTION___", html_unsub_section)).render(template_args)

    list_unsubscribe_header = None
    if rendered.email_list_unsubscribe_url:
        list_unsubscribe_header = f"<{rendered.email_list_unsubscribe_url}>"

    queue_email(
        session,
        sender_name=config["NOTIFICATION_EMAIL_SENDER"],
        sender_email=config["NOTIFICATION_EMAIL_ADDRESS"],
        recipient=user.email,
        subject=config["NOTIFICATION_PREFIX"] + rendered.email_subject,
        plain=plain,
        html=html,
        source_data=config["VERSION"] + f"/{rendered.email_template_name}",
        list_unsubscribe_header=list_unsubscribe_header,
    )
def _send_push_notification(session, user: User, notification: Notification):
    """
    Renders the notification for the user and passes the push fields on to push_to_user.

    Raises an Exception if the rendered notification has no push title.
    """
    logger.debug(f"Formatting push notification for {user}")

    content = render_notification(user, notification)
    if not content.push_title:
        raise Exception(f"Tried to send push notification to {user} but didn't have push info")

    push_fields = dict(
        user_id=user.id,
        title=content.push_title,
        body=content.push_body,
        icon=content.push_icon,
        url=content.push_url,
        topic_action=notification.topic_action.display,
        key=notification.key,
        # keep on server for at most an hour if the client is not around
        ttl=3600,
    )
    push_to_user(session, **push_fields)
def handle_notification(payload: jobs_pb2.HandleNotificationPayload):
    """
    Fans a single notification out to every delivery type returned by get_preference.

    A NotificationDelivery row is added for each email, digest and push delivery type. Email and push rows are
    stamped as delivered and the notification is handed to _send_email_notification / _send_push_notification
    right away; digest rows are added with no delivered time.
    """
    with session_scope() as session:
        notification_query = select(Notification).where(Notification.id == payload.notification_id)
        notification = session.execute(notification_query).scalar_one()

        user_query = select(User).where(User.id == notification.user_id)
        user = session.execute(user_query).scalar_one()

        # the unpacked values are not used below; the call is kept so that behaviour stays exactly the same
        _topic, _action = notification.topic_action.unpack()

        def record_delivery(kind, delivered):
            # adds the delivery row for this notification to the session
            session.add(
                NotificationDelivery(
                    notification_id=notification.id,
                    delivered=delivered,
                    delivery_type=kind,
                )
            )

        for delivery_type in get_preference(session, notification.user.id, notification.topic_action):
            logger.info(f"Should notify by {delivery_type}")
            if delivery_type == NotificationDeliveryType.email:
                # record the delivery as done and hand the notification to the email sender
                record_delivery(NotificationDeliveryType.email, func.now())
                _send_email_notification(session, user, notification)
            elif delivery_type == NotificationDeliveryType.digest:
                # left undelivered here; handle_email_digests looks for digest rows with no delivered time
                record_delivery(NotificationDeliveryType.digest, None)
            elif delivery_type == NotificationDeliveryType.push:
                # record the delivery as done and hand the notification to the push sender
                record_delivery(NotificationDeliveryType.push, func.now())
                _send_push_notification(session, user, notification)
def send_raw_push_notification(payload: jobs_pb2.SendRawPushNotificationPayload):
    """
    Sends one raw push message to a single push notification subscription and records the attempt.

    Does nothing (besides logging) if push notifications are disabled in the config or if the subscription's
    disabled_at time has already passed.

    A PushNotificationDeliveryAttempt is stored for every request that is made. If the push service answers
    404 or 410, the subscription is disabled.

    Raises an Exception if the payload data is longer than 3072 bytes, or if the push service responds with a
    status code other than 200, 201, 202, 404 or 410.
    """
    if not config["PUSH_NOTIFICATIONS_ENABLED"]:
        logger.info("Not sending push notification due to push notifications disabled")
        # actually stop here: without this return the push was sent despite the log message above
        return

    with session_scope() as session:
        if len(payload.data) > 3072:
            raise Exception(f"Data too long for push notification to sub {payload.push_notification_subscription_id}")
        sub = session.execute(
            select(PushNotificationSubscription).where(
                PushNotificationSubscription.id == payload.push_notification_subscription_id
            )
        ).scalar_one()
        # NOTE(review): assumes disabled_at is never None for active subscriptions -- confirm against the model
        if sub.disabled_at < now():
            logger.error(f"Tried to send push to disabled subscription: {sub.id}. Disabled at {sub.disabled_at}.")
            return
        # resp is presumably a requests.Response -- TODO confirm against send_push
        resp = send_push(
            payload.data,
            sub.endpoint,
            sub.auth_key,
            sub.p256dh_key,
            config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"],
            config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"],
            ttl=payload.ttl,
        )
        success = resp.status_code in (200, 201, 202)
        session.add(
            PushNotificationDeliveryAttempt(
                push_notification_subscription_id=sub.id,
                success=success,
                status_code=resp.status_code,
                response=resp.text,
            )
        )
        # commit the attempt before a possible raise below
        session.commit()
        if success:
            logger.debug(f"Successfully sent push to sub {sub.id} for user {sub.user}")
            push_notification_counter.inc()
        elif resp.status_code in (404, 410):
            # gone
            logger.info(f"Push sub {sub.id} for user {sub.user} is gone! Disabling.")
            # presumably persisted when session_scope exits -- verify
            sub.disabled_at = func.now()
            push_notification_disabled_counter.inc()
        else:
            raise Exception(f"Failed to deliver push to {sub.id}, code: {resp.status_code}. Response: {resp.text}")
def handle_email_digests(payload: empty_pb2.Empty):
    """
    Processes pending email digests

    A user is picked if they have a digest frequency set, their last digest is older than that frequency, and they
    have at least one undelivered "digest" delivery whose notification has not had an individual email delivered.

    For every picked user, all of their undelivered "digest" deliveries are marked as delivered, including those
    whose notification was already emailed individually, and the user's last_digest_sent is updated.

    Note: the digest email itself is not sent yet, a TODO message is logged instead.
    """
    logger.info("Sending out email digests")

    with session_scope() as session:
        # notifications that already had an individual email delivered
        # (SQLAlchemy needs `!= None` / `== None` in the queries below, not `is`, to build the NULL checks)
        already_emailed = (
            select(
                Notification.id.label("notification_id"),
                # min is superfluous but needed for group_by
                func.min(NotificationDelivery.id).label("notification_delivery_id"),
            )
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.email)
            .where(NotificationDelivery.delivered != None)
            .group_by(Notification)
            .subquery()
        )

        # users who are due a digest and have an undelivered digest notification that was not emailed individually
        due_users_query = (
            select(User)
            .where(User.digest_frequency != None)
            .where(User.last_digest_sent < func.now() - User.digest_frequency)
            # todo: tz
            .join(Notification, Notification.user_id == User.id)
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
            .where(NotificationDelivery.delivered == None)
            .outerjoin(
                already_emailed,
                already_emailed.c.notification_id == Notification.id,
            )
            # no matching row in the outer join means no individual email was delivered
            .where(already_emailed.c.notification_delivery_id == None)
            .group_by(User)
        )
        users_to_send_digests_to = session.execute(due_users_query).scalars().all()

        logger.info(f"{users_to_send_digests_to=}")

        for user in users_to_send_digests_to:
            # all of this user's undelivered digest deliveries, oldest notification first
            pending_query = (
                select(Notification, NotificationDelivery)
                .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
                .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
                .where(NotificationDelivery.delivered == None)
                .where(Notification.user_id == user.id)
                .order_by(Notification.created)
            )
            pending_rows = session.execute(pending_query).all()
            if not pending_rows:
                continue

            logger.info(f"Sending {user.id=} a digest with {len(pending_rows)} notifications")
            logger.info("TODO: supposed to send digest email")
            for _notification, delivery in pending_rows:
                delivery.delivered = func.now()
            user.last_digest_sent = func.now()
            # commit per user
            session.commit()