Coverage for app / backend / src / couchers / servicers / notifications.py: 90%
129 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
1import functools
2import json
3import logging
4from zoneinfo import ZoneInfo
6import grpc
7from google.protobuf import empty_pb2
8from sqlalchemy import select, update
9from sqlalchemy.orm import Session
10from sqlalchemy.sql import or_
12from couchers.config import config
13from couchers.constants import DATETIME_INFINITY
14from couchers.context import CouchersContext
15from couchers.i18n import LocalizationContext
16from couchers.models import (
17 DeviceType,
18 HostingStatus,
19 MeetupStatus,
20 Notification,
21 NotificationDeliveryType,
22 PushNotificationPlatform,
23 PushNotificationSubscription,
24 User,
25)
26from couchers.notifications.push import PushNotificationContent, push_to_subscription, push_to_user
27from couchers.notifications.render_push import render_adhoc_push_notification, render_push_notification
28from couchers.notifications.settings import (
29 PreferenceNotUserEditableError,
30 get_topic_actions_by_delivery_type,
31 get_user_setting_groups,
32 set_preference,
33)
34from couchers.notifications.utils import enum_from_topic_action
35from couchers.notifications.web_push_api import decode_key, get_vapid_public_key_from_private_key
36from couchers.proto import notifications_pb2, notifications_pb2_grpc
37from couchers.sql import moderation_state_column_visible, to_bool
38from couchers.utils import Timestamp_from_datetime, now
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Upper bound (and default) for the page size used by Notifications.ListNotifications.
MAX_PAGINATION_LENGTH = 100
@functools.cache
def get_vapid_public_key() -> str:
    """Return the VAPID public key derived from the configured private key.

    The private key is read from ``config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"]``.
    The function takes no arguments and is wrapped in ``functools.cache``, so a
    successfully computed key is reused by later calls.
    """
    private_key = config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"]
    return get_vapid_public_key_from_private_key(private_key)
def notification_to_pb(user: User, notification: Notification) -> notifications_pb2.Notification:
    """Build the protobuf representation of ``notification`` for ``user``.

    The title, body, icon and URL are taken from the push content rendered for
    the notification, using a localization context created from ``user``.
    """
    loc_context = LocalizationContext.from_user(user)
    rendered = render_push_notification(notification, loc_context)
    topic_action = notification.topic_action
    return notifications_pb2.Notification(
        notification_id=notification.id,
        created=Timestamp_from_datetime(notification.created),
        topic=topic_action.topic,
        action=topic_action.action,
        key=notification.key,
        title=rendered.title,
        body=rendered.body,
        icon=rendered.icon_url,
        url=rendered.action_url,
        is_seen=notification.is_seen,
    )
class Notifications(notifications_pb2_grpc.NotificationsServicer):
    """gRPC servicer for the Notifications service.

    Covers reading and updating notification settings, listing notifications
    and marking them seen, registering web and mobile push subscriptions, and
    sending test/dev push notifications to the calling user.
    """
66 def GetNotificationSettings(
67 self, request: notifications_pb2.GetNotificationSettingsReq, context: CouchersContext, session: Session
68 ) -> notifications_pb2.GetNotificationSettingsRes:
69 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
70 return notifications_pb2.GetNotificationSettingsRes(
71 do_not_email_enabled=user.do_not_email,
72 groups=get_user_setting_groups(user.id),
73 )
75 def SetNotificationSettings(
76 self, request: notifications_pb2.SetNotificationSettingsReq, context: CouchersContext, session: Session
77 ) -> notifications_pb2.GetNotificationSettingsRes:
78 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
79 user.do_not_email = request.enable_do_not_email
80 if request.enable_do_not_email:
81 user.hosting_status = HostingStatus.cant_host
82 user.meetup_status = MeetupStatus.does_not_want_to_meetup
83 for preference in request.preferences:
84 topic_action = enum_from_topic_action.get((preference.topic, preference.action), None)
85 if not topic_action: 85 ↛ 86line 85 didn't jump to line 86 because the condition on line 85 was never true
86 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "invalid_notification_preference")
87 delivery_types = {t.name for t in NotificationDeliveryType}
88 if preference.delivery_method not in delivery_types: 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true
89 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "invalid_delivery_method")
90 delivery_type = NotificationDeliveryType[preference.delivery_method]
91 try:
92 set_preference(session, user.id, topic_action, delivery_type, preference.enabled)
93 except PreferenceNotUserEditableError:
94 context.abort_with_error_code(
95 grpc.StatusCode.FAILED_PRECONDITION, "cannot_edit_that_notification_preference"
96 )
97 return notifications_pb2.GetNotificationSettingsRes(
98 do_not_email_enabled=user.do_not_email,
99 groups=get_user_setting_groups(user.id),
100 )
102 def ListNotifications(
103 self, request: notifications_pb2.ListNotificationsReq, context: CouchersContext, session: Session
104 ) -> notifications_pb2.ListNotificationsRes:
105 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
106 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
107 next_notification_id = int(request.page_token) if request.page_token else 2**50
108 notifications = (
109 session.execute(
110 select(Notification)
111 .where(Notification.user_id == context.user_id)
112 .where(Notification.id <= next_notification_id)
113 .where(or_(to_bool(request.only_unread == False), Notification.is_seen == False))
114 .where(
115 Notification.topic_action.in_(
116 get_topic_actions_by_delivery_type(session, user.id, NotificationDeliveryType.push)
117 )
118 )
119 .where(moderation_state_column_visible(context, Notification.moderation_state_id))
120 .order_by(Notification.id.desc())
121 .limit(page_size + 1)
122 )
123 .scalars()
124 .all()
125 )
126 return notifications_pb2.ListNotificationsRes(
127 notifications=[notification_to_pb(user, notification) for notification in notifications[:page_size]],
128 next_page_token=str(notifications[-1].id) if len(notifications) > page_size else None,
129 )
131 def MarkNotificationSeen(
132 self, request: notifications_pb2.MarkNotificationSeenReq, context: CouchersContext, session: Session
133 ) -> empty_pb2.Empty:
134 notification = (
135 session.execute(
136 select(Notification)
137 .where(Notification.user_id == context.user_id)
138 .where(Notification.id == request.notification_id)
139 )
140 .scalars()
141 .one_or_none()
142 )
143 if not notification: 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true
144 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "notification_not_found")
145 notification.is_seen = request.set_seen
146 return empty_pb2.Empty()
148 def MarkAllNotificationsSeen(
149 self, request: notifications_pb2.MarkAllNotificationsSeenReq, context: CouchersContext, session: Session
150 ) -> empty_pb2.Empty:
151 session.execute(
152 update(Notification)
153 .values(is_seen=True)
154 .where(Notification.user_id == context.user_id)
155 .where(Notification.id <= request.latest_notification_id)
156 )
157 return empty_pb2.Empty()
159 def GetVapidPublicKey(
160 self, request: empty_pb2.Empty, context: CouchersContext, session: Session
161 ) -> notifications_pb2.GetVapidPublicKeyRes:
162 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 162 ↛ 163line 162 didn't jump to line 163 because the condition on line 162 was never true
163 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")
165 return notifications_pb2.GetVapidPublicKeyRes(vapid_public_key=get_vapid_public_key())
167 def RegisterPushNotificationSubscription(
168 self,
169 request: notifications_pb2.RegisterPushNotificationSubscriptionReq,
170 context: CouchersContext,
171 session: Session,
172 ) -> empty_pb2.Empty:
173 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 173 ↛ 174line 173 didn't jump to line 174 because the condition on line 173 was never true
174 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")
176 data = json.loads(request.full_subscription_json)
177 subscription = PushNotificationSubscription(
178 user_id=context.user_id,
179 platform=PushNotificationPlatform.web_push,
180 endpoint=data["endpoint"],
181 p256dh_key=decode_key(data["keys"]["p256dh"]),
182 auth_key=decode_key(data["keys"]["auth"]),
183 full_subscription_info=request.full_subscription_json,
184 user_agent=request.user_agent,
185 )
186 session.add(subscription)
187 session.flush()
188 push_to_subscription(
189 session,
190 push_notification_subscription_id=subscription.id,
191 user_id=context.user_id,
192 topic_action="adhoc:setup",
193 content=PushNotificationContent(
194 title="Push notifications test",
195 ios_title="Push Notifications Test",
196 body="Hi, thanks for enabling push notifications!",
197 ),
198 )
200 return empty_pb2.Empty()
202 def SendTestPushNotification(
203 self, request: empty_pb2.Empty, context: CouchersContext, session: Session
204 ) -> empty_pb2.Empty:
205 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 205 ↛ 206line 205 didn't jump to line 206 because the condition on line 205 was never true
206 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")
208 push_to_user(
209 session,
210 user_id=context.user_id,
211 topic_action="adhoc:testing",
212 content=PushNotificationContent(
213 title="Push notifications test",
214 ios_title="Push Notifications Test",
215 body="If you see this, then it's working :)",
216 ),
217 )
219 return empty_pb2.Empty()
    def RegisterMobilePushNotificationSubscription(
        self,
        request: notifications_pb2.RegisterMobilePushNotificationSubscriptionReq,
        context: CouchersContext,
        session: Session,
    ) -> empty_pb2.Empty:
        """Register (or re-enable) an Expo mobile push subscription for the caller.

        If a subscription with ``request.token`` already exists, no new row is
        created; a disabled one is re-enabled. Otherwise a new subscription is
        stored and a "push_enabled" ad-hoc push is sent to it.

        Aborts with ``UNAVAILABLE`` / ``push_notifications_disabled`` when push
        notifications are turned off in the config.
        """
        if not config["PUSH_NOTIFICATIONS_ENABLED"]:
            context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")

        # Check for existing subscription with this token
        # NOTE(review): the lookup is by token only, not by user — confirm what should happen
        # when the token was previously registered by a different user.
        existing = session.execute(
            select(PushNotificationSubscription).where(PushNotificationSubscription.token == request.token)
        ).scalar_one_or_none()

        if existing:
            # Re-enable if disabled: a disabled_at in the past marks the subscription as
            # disabled, and DATETIME_INFINITY is written back to re-enable it.
            # NOTE(review): the device_name/device_type refresh and the log line are assumed
            # to belong to this re-enable branch — confirm.
            if existing.disabled_at < now():
                existing.disabled_at = DATETIME_INFINITY
                # Keep the stored device name when the request does not supply one.
                existing.device_name = request.device_name or existing.device_name
                if request.device_type:
                    existing.device_type = DeviceType[request.device_type]
                logger.info(f"Re-enabled mobile push sub {existing.id} for user {context.user_id}")
            return empty_pb2.Empty()

        # Parse device_type if provided (looked up in DeviceType by name)
        device_type = DeviceType[request.device_type] if request.device_type else None

        subscription = PushNotificationSubscription(
            user_id=context.user_id,
            platform=PushNotificationPlatform.expo,
            token=request.token,
            device_name=request.device_name if request.device_name else None,
            device_type=device_type,
        )
        session.add(subscription)
        # Flush before reading subscription.id, which is passed to the push below.
        session.flush()

        # The confirmation push is rendered in the caller's UI language (falling back to "en")
        # with a UTC timezone.
        loc_context = LocalizationContext(locale=context.ui_language_preference or "en", timezone=ZoneInfo("Etc/UTC"))
        push_content = render_adhoc_push_notification("push_enabled", loc_context)
        push_to_subscription(
            session,
            push_notification_subscription_id=subscription.id,
            user_id=context.user_id,
            topic_action="adhoc:push_enabled",
            content=push_content,
        )

        return empty_pb2.Empty()
270 def SendTestMobilePushNotification(
271 self, request: empty_pb2.Empty, context: CouchersContext, session: Session
272 ) -> empty_pb2.Empty:
273 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 273 ↛ 274line 273 didn't jump to line 274 because the condition on line 273 was never true
274 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")
276 push_to_user(
277 session,
278 user_id=context.user_id,
279 topic_action="adhoc:testing",
280 content=PushNotificationContent(
281 title="Mobile notifications test",
282 ios_title="Mobile Notifications Test",
283 body="If you see this on your phone, everything is wired up correctly 🎉",
284 ),
285 )
287 return empty_pb2.Empty()
289 def SendDevPushNotification(
290 self, request: notifications_pb2.SendDevPushNotificationReq, context: CouchersContext, session: Session
291 ) -> empty_pb2.Empty:
292 if not config["ENABLE_DEV_APIS"]:
293 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "dev_apis_disabled")
295 if not config["PUSH_NOTIFICATIONS_ENABLED"]:
296 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")
298 push_to_user(
299 session,
300 user_id=context.user_id,
301 topic_action="adhoc:testing",
302 content=PushNotificationContent(
303 title=request.title,
304 ios_title=request.title,
305 body=request.body,
306 action_url=request.url or None,
307 icon_url=request.icon or None,
308 ),
309 key=request.key or None,
310 ttl=request.ttl,
311 )
313 return empty_pb2.Empty()
315 def DebugRedeliverPushNotification(
316 self,
317 request: notifications_pb2.DebugRedeliverPushNotificationReq,
318 context: CouchersContext,
319 session: Session,
320 ) -> empty_pb2.Empty:
321 if not config["ENABLE_DEV_APIS"]:
322 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "dev_apis_disabled")
324 if not config["PUSH_NOTIFICATIONS_ENABLED"]:
325 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")
327 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
329 notification = session.execute(
330 select(Notification)
331 .where(Notification.id == request.notification_id)
332 .where(Notification.user_id == context.user_id)
333 ).scalar_one_or_none()
335 if not notification:
336 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "notification_not_found")
338 push_to_user(
339 session,
340 user_id=context.user_id,
341 topic_action=notification.topic_action.display,
342 content=render_push_notification(notification, LocalizationContext.from_user(user)),
343 key=notification.key,
344 )
346 return empty_pb2.Empty()