Coverage for src/couchers/notifications/background.py: 22%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import logging
2from datetime import timedelta
3from typing import List
5from sqlalchemy.sql import and_, func
7from couchers.db import session_scope
8from couchers.models import (
9 Notification,
10 NotificationDelivery,
11 NotificationDeliveryType,
12 NotificationPreference,
13 NotificationTopicAction,
14 User,
15)
16from couchers.sql import couchers_select as select
17from couchers.tasks import send_digest_email, send_notification_email
19logger = logging.getLogger(__name__)
def get_notification_preference(
    session, user_id: int, topic_action: NotificationTopicAction
) -> List[NotificationDeliveryType]:
    """
    Gets the user's preference from the DB or otherwise falls back to defaults

    Explicit per-user rows in NotificationPreference override the defaults carried by the topic_action: a delivery
    type is included if the user's row says `deliver`, or, when there is no row for that delivery type, if it is
    listed in `topic_action.defaults`.

    Must be done in session scope

    Returns list of delivery types
    """
    overrides = {
        res.delivery_type: res.deliver
        for res in session.execute(
            select(NotificationPreference)
            # filter on the owning user rather than on the preference row's own primary key
            # NOTE(review): assumes NotificationPreference has a user_id column like Notification does -- confirm
            .where(NotificationPreference.user_id == user_id)
            .where(NotificationPreference.topic_action == topic_action)
        )
        .scalars()
        .all()
    }
    # an explicit override (True or False) wins; otherwise fall back to whether the type is a default
    return [dt for dt in NotificationDeliveryType if overrides.get(dt, dt in topic_action.defaults)]
def handle_notification(notification_id):
    """
    Records how a single notification should be delivered

    Loads the notification and its recipient. If the recipient has not enabled new notifications nothing is
    recorded. Otherwise one NotificationDelivery row is added per delivery type selected by the user's preferences:
    email and digest deliveries are queued undelivered for their background workers, push deliveries are marked as
    delivered right away.
    """
    with session_scope() as session:
        notification = session.execute(select(Notification).where(Notification.id == notification_id)).scalar_one()

        # ignore this notification if the user hasn't enabled new notifications
        user = session.execute(select(User).where(User.id == notification.user_id)).scalar_one()
        if not user.new_notifications_enabled:
            logger.info(f"Skipping notification for {user} due to new notifications disabled")
            return

        # the recipient is already loaded above, no need to go through the notification.user relationship
        delivery_types = get_notification_preference(session, user.id, notification.topic_action)
        for delivery_type in delivery_types:
            logger.info(f"Should notify by {delivery_type}")
            if delivery_type == NotificationDeliveryType.email:
                # for emails we don't deliver straight up, wait until the email background worker gets around to it and handles deduplication
                session.add(
                    NotificationDelivery(
                        notification_id=notification.id,
                        delivered=None,
                        delivery_type=NotificationDeliveryType.email,
                    )
                )
            elif delivery_type == NotificationDeliveryType.digest:
                # for digests, queue an undelivered delivery; the digest worker picks up digest deliveries
                # that have no delivered timestamp and batches them per user
                session.add(
                    NotificationDelivery(
                        notification_id=notification.id,
                        delivered=None,
                        delivery_type=NotificationDeliveryType.digest,
                    )
                )
            elif delivery_type == NotificationDeliveryType.push:
                # for push notifications, we send them straight away
                session.add(
                    NotificationDelivery(
                        notification_id=notification.id,
                        delivered=func.now(),
                        delivery_type=NotificationDeliveryType.push,
                    )
                )
                # todo
                logger.info("Supposed to send push notification")
def handle_email_notifications():
    """
    Sends out emails for notifications

    Notifications are grouped by (user, topic_action, key). For every group that has an email delivery on a
    notification created within the last hour, and that has not already had an email delivered for a notification
    created within the last 24 hours, one email is sent and the chosen delivery is stamped as delivered.
    """
    logger.info("Sending out email notifications")

    with session_scope() as session:
        # groups that already got an email recently: we don't want to send emails for these
        recently_emailed = (
            select(Notification.user_id, Notification.topic_action, Notification.key)
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.email)
            .where(NotificationDelivery.delivered != None)  # noqa: E711 (renders as IS NOT NULL)
            .where(Notification.created > func.now() - timedelta(hours=24))
            .group_by(Notification.user_id, Notification.topic_action, Notification.key)
            .subquery()
        )

        # a notification belongs to an already-emailed group when all three parts of the group key match
        in_emailed_group = and_(
            recently_emailed.c.user_id == Notification.user_id,
            recently_emailed.c.topic_action == Notification.topic_action,
            recently_emailed.c.key == Notification.key,
        )

        # NOTE(review): the two min() aggregates are computed independently of each other; the asserts in the
        # loop below check that the chosen delivery really belongs to the chosen notification
        pending_statement = (
            select(
                User,
                Notification.topic_action,
                Notification.key,
                func.min(Notification.id).label("notification_id"),
                func.min(NotificationDelivery.id).label("notification_delivery_id"),
            )
            .join(User, User.id == Notification.user_id)
            .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
            # anti-join: keep only the groups with no match among the recently emailed ones
            .outerjoin(recently_emailed, in_emailed_group)
            .where(NotificationDelivery.delivery_type == NotificationDeliveryType.email)
            .where(Notification.created > func.now() - timedelta(hours=1))
            .where(recently_emailed.c.key == None)  # noqa: E711 (renders as IS NULL)
            .group_by(User, Notification.user_id, Notification.topic_action, Notification.key)
        )
        pending_rows = session.execute(pending_statement).all()

        for row in pending_rows:
            user, topic_action, key, notification_id, notification_delivery_id = row
            topic, action = topic_action.unpack()
            logger.info(f"Sending notification id {notification_id} to {user.id} ({topic}/{action}/{key})")

            delivery_statement = select(NotificationDelivery).where(
                NotificationDelivery.id == notification_delivery_id
            )
            notification_delivery = session.execute(delivery_statement).scalar_one()

            # sanity checks on the row picked by the aggregate query
            assert notification_delivery.delivery_type == NotificationDeliveryType.email
            assert not notification_delivery.delivered
            assert notification_delivery.notification_id == notification_id

            send_notification_email(notification_delivery.notification)
            notification_delivery.delivered = func.now()
            # commit after each email so deliveries already stamped are persisted before the next send
            session.commit()
def handle_email_digests():
    """
    Sends out email digests

    Picks the users whose last digest was sent more than 24 hours ago and who have at least one undelivered digest
    delivery. For each of them, collects the digest deliveries of notifications that have not been delivered through
    any delivery type yet, sends them as one digest email, stamps those deliveries as delivered and records the time
    on the user.
    """
    logger.info("Sending out email digests")

    with session_scope() as session:
        # .scalars() unwraps the single-column result rows into User instances, so that attributes can be read
        # and assigned on them below
        users_to_send_digests_to = (
            session.execute(
                (
                    select(User)
                    # NOTE(review): a NULL last_digest_sent never satisfies this comparison -- confirm the column
                    # always has a value
                    .where(User.last_digest_sent < func.now() - timedelta(hours=24))
                    # todo: tz
                    .join(Notification, Notification.user_id == User.id)
                    .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
                    .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
                    .where(NotificationDelivery.delivered == None)  # noqa: E711 (renders as IS NULL)
                    .group_by(User)
                )
            )
            .scalars()
            .all()
        )

        for user in users_to_send_digests_to:
            # already sent notifications
            subquery = (
                select(
                    Notification.id.label("notification_id"),
                    func.min(NotificationDelivery.id).label("notification_delivery_id"),
                )
                .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
                # `delivered` is set to func.now() on delivery, so "sent" means it is not NULL; comparing it
                # to True would compare a timestamp with a boolean
                .where(NotificationDelivery.delivered != None)  # noqa: E711 (renders as IS NOT NULL)
                .where(Notification.user_id == user.id)
                .group_by(Notification)
                .subquery()
            )

            # notifications that haven't been delivered in any way yet
            notifications_and_deliveries = session.execute(
                (
                    select(Notification, NotificationDelivery)
                    .join(NotificationDelivery, NotificationDelivery.notification_id == Notification.id)
                    .where(NotificationDelivery.delivery_type == NotificationDeliveryType.digest)
                    .outerjoin(subquery, subquery.c.notification_id == Notification.id)
                    .where(subquery.c.notification_delivery_id == None)  # noqa: E711 (renders as IS NULL)
                    .where(Notification.user_id == user.id)
                    .order_by(Notification.created)
                )
            ).all()

            if notifications_and_deliveries:
                # split the (notification, delivery) pairs into two parallel tuples
                notifications, deliveries = zip(*notifications_and_deliveries)
                logger.info(f"Sending {user.id=} a digest with {len(notifications)} notifications")
                send_digest_email(notifications)
                for delivery in deliveries:
                    delivery.delivered = func.now()
                user.last_digest_sent = func.now()
                session.commit()