Coverage for src/couchers/notifications/background.py: 70%
125 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-11-21 04:21 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-11-21 04:21 +0000
1import logging
2from pathlib import Path
4from google.protobuf import empty_pb2
5from jinja2 import Environment, FileSystemLoader
6from sqlalchemy.sql import func
8from couchers import urls
9from couchers.config import config
10from couchers.db import session_scope
11from couchers.email import queue_email
12from couchers.models import (
13 Notification,
14 NotificationDelivery,
15 NotificationDeliveryType,
16 PushNotificationDeliveryAttempt,
17 PushNotificationSubscription,
18 User,
19)
20from couchers.notifications.push import push_to_user
21from couchers.notifications.push_api import send_push
22from couchers.notifications.render import render_notification
23from couchers.notifications.settings import get_preference
24from couchers.notifications.unsubscribe import (
25 generate_do_not_email,
26 generate_unsub_topic_action,
27 generate_unsub_topic_key,
28)
29from couchers.sql import couchers_select as select
30from couchers.templates.v2 import add_filters
31from couchers.utils import get_tz_as_text, now
32from proto.internal import jobs_pb2
34logger = logging.getLogger(__name__)
36template_folder = Path(__file__).parent / ".." / ".." / ".." / "templates" / "v2"
38loader = FileSystemLoader(template_folder)
39env = Environment(loader=loader, trim_blocks=True)
41add_filters(env)
44def _send_email_notification(session, user: User, notification: Notification):
45 rendered = render_notification(user, notification)
46 template_args = {
47 "user": user,
48 "time": notification.created,
49 **rendered.email_template_args,
50 }
52 template_args["_year"] = now().year
53 template_args["_timezone_display"] = get_tz_as_text(user.timezone or "Etc/UTC")
55 plain_unsub_section = "\n\n---\n\n"
56 if rendered.is_critical:
57 plain_unsub_section += "This is a security email, you cannot unsubscribe from it."
58 html_unsub_section = "This is a security email, you cannot unsubscribe from it."
59 else:
60 manage_link = urls.notification_settings_link()
61 plain_unsub_section += f"Edit your notification settings at <{manage_link}>"
62 html_unsub_section = f'<a href="{manage_link}">Manage notification preferences</a>.'
63 unsub_options = []
64 ta = rendered.email_topic_action_unsubscribe_text
65 tk = rendered.email_topic_key_unsubscribe_text
66 ta_link = generate_unsub_topic_action(notification)
67 tk_link = generate_unsub_topic_key(notification)
68 if ta:
69 plain_unsub_section += f"\n\nTurn off emails for {ta}: <{ta_link}>"
70 unsub_options.append(f'<a href="{ta_link}">{ta}</a>')
71 if tk:
72 plain_unsub_section += f"\n\nTurn off emails for {tk}: <{tk_link}>"
73 unsub_options.append(f'<a href="{tk_link}">{tk}</a>')
74 if unsub_options:
75 html_unsub_section += f'<br />Turn off emails for: {" / ".join(unsub_options)}.'
76 dne_link = generate_do_not_email(user)
77 plain_unsub_section += f"\n\nDo not email me (disables hosting): <{dne_link}>"
78 html_unsub_section += f'<br /><a href="{dne_link}">Do not email me (disables hosting)</a>.'
80 plain_tmplt = (template_folder / f"{rendered.email_template_name}.txt").read_text()
81 plain = env.from_string(plain_tmplt + plain_unsub_section).render(template_args)
82 html_tmplt = (template_folder / "generated_html" / f"{rendered.email_template_name}.html").read_text()
83 html = env.from_string(html_tmplt.replace("___UNSUB_SECTION___", html_unsub_section)).render(template_args)
85 if not rendered.is_critical:
86 if user.do_not_email:
87 logger.info(
88 f"Not emailing {user} based on template {rendered.email_template_name} due to emails turned off"
89 )
90 return
91 if not user.is_visible:
92 logger.error(
93 f"Tried emailing {user} based on template {rendered.email_template_name} but user not visible"
94 )
95 return
97 list_unsubscribe_header = None
98 if rendered.email_list_unsubscribe_url:
99 list_unsubscribe_header = f"<{rendered.email_list_unsubscribe_url}>"
101 queue_email(
102 session,
103 sender_name=config["NOTIFICATION_EMAIL_SENDER"],
104 sender_email=config["NOTIFICATION_EMAIL_ADDRESS"],
105 recipient=user.email,
106 subject=config["NOTIFICATION_PREFIX"] + rendered.email_subject,
107 plain=plain,
108 html=html,
109 source_data=config["VERSION"] + f"/{rendered.email_template_name}",
110 list_unsubscribe_header=list_unsubscribe_header,
111 )
114def _send_push_notification(session, user: User, notification: Notification):
115 logger.debug(f"Formatting push notification for {user}")
117 rendered = render_notification(user, notification)
119 if not rendered.push_title:
120 raise Exception(f"Tried to send push notification to {user} but didn't have push info")
122 push_to_user(
123 session,
124 user_id=user.id,
125 title=rendered.push_title,
126 body=rendered.push_body,
127 icon=rendered.push_icon,
128 url=rendered.push_url,
129 topic_action=notification.topic_action.display,
130 key=notification.key,
131 # keep on server for at most an hour if the client is not around
132 ttl=3600,
133 )
136def handle_notification(payload: jobs_pb2.HandleNotificationPayload):
137 with session_scope() as session:
138 notification = session.execute(
139 select(Notification).where(Notification.id == payload.notification_id)
140 ).scalar_one()
142 # ignore this notification if the user hasn't enabled new notifications
143 user = session.execute(select(User).where(User.id == notification.user_id)).scalar_one()
145 topic, action = notification.topic_action.unpack()
146 delivery_types = get_preference(session, notification.user.id, notification.topic_action)
147 for delivery_type in delivery_types:
148 logger.info(f"Should notify by {delivery_type}")
149 if delivery_type == NotificationDeliveryType.email:
150 # for emails we don't deliver straight up, wait until the email background worker gets around to it and handles deduplication
151 session.add(
152 NotificationDelivery(
153 notification_id=notification.id,
154 delivered=func.now(),
155 delivery_type=NotificationDeliveryType.email,
156 )
157 )
158 _send_email_notification(session, user, notification)
159 elif delivery_type == NotificationDeliveryType.digest:
160 # for digest notifications, add to digest queue
161 session.add(
162 NotificationDelivery(
163 notification_id=notification.id,
164 delivered=None,
165 delivery_type=NotificationDeliveryType.digest,
166 )
167 )
168 elif delivery_type == NotificationDeliveryType.push:
169 # for push notifications, we send them straight away
170 session.add(
171 NotificationDelivery(
172 notification_id=notification.id,
173 delivered=func.now(),
174 delivery_type=NotificationDeliveryType.push,
175 )
176 )
177 _send_push_notification(session, user, notification)
180def send_raw_push_notification(payload: jobs_pb2.SendRawPushNotificationPayload):
181 if not config["PUSH_NOTIFICATIONS_ENABLED"]:
182 logger.info("Not sending push notification due to push notifications disabled")
184 with session_scope() as session:
185 if len(payload.data) > 3072:
186 raise Exception(f"Data too long for push notification to sub {payload.push_notification_subscription_id}")
187 sub = session.execute(
188 select(PushNotificationSubscription).where(
189 PushNotificationSubscription.id == payload.push_notification_subscription_id
190 )
191 ).scalar_one()
192 if sub.disabled_at < now():
193 logger.error(f"Tried to send push to disabled subscription: {sub.id}. Disabled at {sub.disabled_at}.")
194 return
195 # this of requests.response
196 resp = send_push(
197 payload.data,
198 sub.endpoint,
199 sub.auth_key,
200 sub.p256dh_key,
201 config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"],
202 config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"],
203 ttl=payload.ttl,
204 )
205 success = resp.status_code in [200, 201, 202]
206 session.add(
207 PushNotificationDeliveryAttempt(
208 push_notification_subscription_id=sub.id,
209 success=success,
210 status_code=resp.status_code,
211 response=resp.text,
212 )
213 )
214 session.commit()
215 if success:
216 logger.debug(f"Successfully sent push to sub {sub.id} for user {sub.user}")
217 elif resp.status_code == 410:
218 # gone
219 logger.info(f"Push sub {sub.id} for user {sub.user} is gone! Disabling.")
220 sub.disabled_at = func.now()
221 else:
222 raise Exception(f"Failed to deliver push to {sub.id}, code: {resp.status_code}. Response: {resp.text}")
225def handle_email_digests(payload: empty_pb2.Empty):
226 """
227 Sends out email digests
229 The email digest is sent if the user has "digest" type notifications that have not had an individual email sent about them already.
231 If a digest is sent, then we send out every notification that has type digest, regardless of if they already got another type of notification about it.
233 That is, we don't send out an email unless there's something new, but if we do send one out, we send new and old stuff.
234 """
235 logger.info("Sending out email digests")
237 with session_scope() as session:
238 # already sent email notifications
239 delivered_email_notifications = (
240 select(
241 Notification.id.label("notification_id"),
242 # min is superfluous but needed for group_by
243 func.min(NotificationDelivery.id).label("notification_delivery_id"),
244 )
245 .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
246 .where(NotificationDelivery.delivery_type == NotificationDeliveryType.email)
247 .where(NotificationDelivery.delivered != None)
248 .group_by(Notification)
249 .subquery()
250 )
252 # users who have unsent "digest" type notifications but not sent email notifications
253 users_to_send_digests_to = (
254 session.execute(
255 select(User)
256 .where(User.digest_frequency != None)
257 .where(User.last_digest_sent < func.now() - User.digest_frequency)
258 # todo: tz
259 .join(Notification, Notification.user_id == User.id)
260 .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
261 .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
262 .where(NotificationDelivery.delivered == None)
263 .outerjoin(
264 delivered_email_notifications,
265 delivered_email_notifications.c.notification_id == Notification.id,
266 )
267 .where(delivered_email_notifications.c.notification_delivery_id == None)
268 .group_by(User)
269 )
270 .scalars()
271 .all()
272 )
274 logger.info(f"{users_to_send_digests_to=}")
276 for user in users_to_send_digests_to:
277 # digest notifications that haven't been delivered yet
278 notifications_and_deliveries = session.execute(
279 select(Notification, NotificationDelivery)
280 .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
281 .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
282 .where(NotificationDelivery.delivered == None)
283 .where(Notification.user_id == user.id)
284 .order_by(Notification.created)
285 ).all()
287 if notifications_and_deliveries:
288 notifications, deliveries = zip(*notifications_and_deliveries)
289 logger.info(f"Sending {user.id=} a digest with {len(notifications)} notifications")
290 logger.info("TODO: supposed to send digest email")
291 for delivery in deliveries:
292 delivery.delivered = func.now()
293 user.last_digest_sent = func.now()
294 session.commit()