Coverage for app / backend / src / couchers / servicers / notifications.py: 90%
130 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
1import functools
2import json
3import logging
5import grpc
6from google.protobuf import empty_pb2
7from sqlalchemy import select, update
8from sqlalchemy.orm import Session
9from sqlalchemy.sql import or_
11from couchers.config import config
12from couchers.constants import DATETIME_INFINITY
13from couchers.context import CouchersContext
14from couchers.i18n import LocalizationContext
15from couchers.models import (
16 DeviceType,
17 HostingStatus,
18 MeetupStatus,
19 Notification,
20 NotificationDeliveryType,
21 PushNotificationPlatform,
22 PushNotificationSubscription,
23 User,
24)
25from couchers.notifications.push import PushNotificationContent, push_to_subscription, push_to_user
26from couchers.notifications.render_push import render_adhoc_push_notification, render_push_notification
27from couchers.notifications.send_raw_push_notification import is_known_invalid_endpoint
28from couchers.notifications.settings import (
29 PreferenceNotUserEditableError,
30 get_topic_actions_by_delivery_type,
31 get_user_setting_groups,
32 set_preference,
33)
34from couchers.notifications.utils import enum_from_topic_action
35from couchers.notifications.web_push_api import decode_key, get_vapid_public_key_from_private_key
36from couchers.proto import notifications_pb2, notifications_pb2_grpc
37from couchers.sql import moderation_state_column_visible, to_bool
38from couchers.utils import Timestamp_from_datetime, now
# Module-level logger, named after this module.
40logger = logging.getLogger(__name__)
# Upper bound on the page size served by ListNotifications; also the page size
# used when the request leaves page_size unset (zero).
41MAX_PAGINATION_LENGTH = 100
@functools.cache
def get_vapid_public_key() -> str:
    """Return the VAPID public key derived from the configured private key.

    The value is memoized by ``functools.cache``, so a successful derivation
    is not repeated on later calls.
    """
    private_key = config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"]
    return get_vapid_public_key_from_private_key(private_key)
def notification_to_pb(user: User, notification: Notification) -> notifications_pb2.Notification:
    """Build the protobuf message for a stored notification.

    The title, body, icon and URL come from rendering the notification as a
    push notification with a localization context derived from ``user``; the
    remaining fields are copied from the ``notification`` row.
    """
    loc_context = LocalizationContext.from_user(user)
    rendered = render_push_notification(notification, loc_context)
    topic_action = notification.topic_action
    return notifications_pb2.Notification(
        notification_id=notification.id,
        created=Timestamp_from_datetime(notification.created),
        topic=topic_action.topic,
        action=topic_action.action,
        key=notification.key,
        title=rendered.title,
        body=rendered.body,
        icon=rendered.icon_url,
        url=rendered.action_url,
        is_seen=notification.is_seen,
    )
65class Notifications(notifications_pb2_grpc.NotificationsServicer):
66 def GetNotificationSettings(
67 self, request: notifications_pb2.GetNotificationSettingsReq, context: CouchersContext, session: Session
68 ) -> notifications_pb2.GetNotificationSettingsRes:
69 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
70 return notifications_pb2.GetNotificationSettingsRes(
71 do_not_email_enabled=user.do_not_email,
72 groups=get_user_setting_groups(user.id, context.localization),
73 )
75 def SetNotificationSettings(
76 self, request: notifications_pb2.SetNotificationSettingsReq, context: CouchersContext, session: Session
77 ) -> notifications_pb2.GetNotificationSettingsRes:
78 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
79 user.do_not_email = request.enable_do_not_email
80 if request.enable_do_not_email:
81 user.hosting_status = HostingStatus.cant_host
82 user.meetup_status = MeetupStatus.does_not_want_to_meetup
83 for preference in request.preferences:
84 topic_action = enum_from_topic_action.get((preference.topic, preference.action), None)
85 if not topic_action: 85 ↛ 86line 85 didn't jump to line 86 because the condition on line 85 was never true
86 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "invalid_notification_preference")
87 delivery_types = {t.name for t in NotificationDeliveryType}
88 if preference.delivery_method not in delivery_types: 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true
89 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "invalid_delivery_method")
90 delivery_type = NotificationDeliveryType[preference.delivery_method]
91 try:
92 set_preference(session, user.id, topic_action, delivery_type, preference.enabled)
93 except PreferenceNotUserEditableError:
94 context.abort_with_error_code(
95 grpc.StatusCode.FAILED_PRECONDITION, "cannot_edit_that_notification_preference"
96 )
97 return notifications_pb2.GetNotificationSettingsRes(
98 do_not_email_enabled=user.do_not_email,
99 groups=get_user_setting_groups(user.id, context.localization),
100 )
102 def ListNotifications(
103 self, request: notifications_pb2.ListNotificationsReq, context: CouchersContext, session: Session
104 ) -> notifications_pb2.ListNotificationsRes:
105 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
106 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
107 next_notification_id = int(request.page_token) if request.page_token else 2**50
108 notifications = (
109 session.execute(
110 select(Notification)
111 .where(Notification.user_id == context.user_id)
112 .where(Notification.id <= next_notification_id)
113 .where(or_(to_bool(request.only_unread == False), Notification.is_seen == False))
114 .where(
115 Notification.topic_action.in_(
116 get_topic_actions_by_delivery_type(session, user.id, NotificationDeliveryType.push)
117 )
118 )
119 .where(moderation_state_column_visible(context, Notification.moderation_state_id))
120 .order_by(Notification.id.desc())
121 .limit(page_size + 1)
122 )
123 .scalars()
124 .all()
125 )
126 return notifications_pb2.ListNotificationsRes(
127 notifications=[notification_to_pb(user, notification) for notification in notifications[:page_size]],
128 next_page_token=str(notifications[-1].id) if len(notifications) > page_size else None,
129 )
131 def MarkNotificationSeen(
132 self, request: notifications_pb2.MarkNotificationSeenReq, context: CouchersContext, session: Session
133 ) -> empty_pb2.Empty:
134 notification = (
135 session.execute(
136 select(Notification)
137 .where(Notification.user_id == context.user_id)
138 .where(Notification.id == request.notification_id)
139 )
140 .scalars()
141 .one_or_none()
142 )
143 if not notification: 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true
144 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "notification_not_found")
145 notification.is_seen = request.set_seen
146 return empty_pb2.Empty()
148 def MarkAllNotificationsSeen(
149 self, request: notifications_pb2.MarkAllNotificationsSeenReq, context: CouchersContext, session: Session
150 ) -> empty_pb2.Empty:
151 session.execute(
152 update(Notification)
153 .values(is_seen=True)
154 .where(Notification.user_id == context.user_id)
155 .where(Notification.id <= request.latest_notification_id)
156 )
157 return empty_pb2.Empty()
159 def GetVapidPublicKey(
160 self, request: empty_pb2.Empty, context: CouchersContext, session: Session
161 ) -> notifications_pb2.GetVapidPublicKeyRes:
162 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 162 ↛ 163line 162 didn't jump to line 163 because the condition on line 162 was never true
163 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")
165 return notifications_pb2.GetVapidPublicKeyRes(vapid_public_key=get_vapid_public_key())
167 def RegisterPushNotificationSubscription(
168 self,
169 request: notifications_pb2.RegisterPushNotificationSubscriptionReq,
170 context: CouchersContext,
171 session: Session,
172 ) -> empty_pb2.Empty:
173 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 173 ↛ 174line 173 didn't jump to line 174 because the condition on line 173 was never true
174 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")
176 data = json.loads(request.full_subscription_json)
177 if is_known_invalid_endpoint(data["endpoint"]):
178 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_endpoint")
180 subscription = PushNotificationSubscription(
181 user_id=context.user_id,
182 platform=PushNotificationPlatform.web_push,
183 endpoint=data["endpoint"],
184 p256dh_key=decode_key(data["keys"]["p256dh"]),
185 auth_key=decode_key(data["keys"]["auth"]),
186 full_subscription_info=request.full_subscription_json,
187 user_agent=request.user_agent,
188 )
189 session.add(subscription)
190 session.flush()
191 push_to_subscription(
192 session,
193 push_notification_subscription_id=subscription.id,
194 user_id=context.user_id,
195 topic_action="adhoc:setup",
196 content=PushNotificationContent(
197 title="Push notifications test",
198 ios_title="Push Notifications Test",
199 body="Hi, thanks for enabling push notifications!",
200 ),
201 )
203 return empty_pb2.Empty()
205 def SendTestPushNotification(
206 self, request: empty_pb2.Empty, context: CouchersContext, session: Session
207 ) -> empty_pb2.Empty:
208 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 208 ↛ 209line 208 didn't jump to line 209 because the condition on line 208 was never true
209 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")
211 push_to_user(
212 session,
213 user_id=context.user_id,
214 topic_action="adhoc:testing",
215 content=PushNotificationContent(
216 title="Push notifications test",
217 ios_title="Push Notifications Test",
218 body="If you see this, then it's working :)",
219 ),
220 )
222 return empty_pb2.Empty()
224 def RegisterMobilePushNotificationSubscription(
225 self,
226 request: notifications_pb2.RegisterMobilePushNotificationSubscriptionReq,
227 context: CouchersContext,
228 session: Session,
229 ) -> empty_pb2.Empty:
230 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 230 ↛ 231line 230 didn't jump to line 231 because the condition on line 230 was never true
231 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")
233 # Check for existing subscription with this token
234 existing = session.execute(
235 select(PushNotificationSubscription).where(PushNotificationSubscription.token == request.token)
236 ).scalar_one_or_none()
238 if existing:
239 # Re-enable if disabled
240 if existing.disabled_at < now():
241 existing.disabled_at = DATETIME_INFINITY
242 existing.device_name = request.device_name or existing.device_name
243 if request.device_type: 243 ↛ 245line 243 didn't jump to line 245 because the condition on line 243 was always true
244 existing.device_type = DeviceType[request.device_type]
245 logger.info(f"Re-enabled mobile push sub {existing.id} for user {context.user_id}")
246 return empty_pb2.Empty()
248 # Parse device_type if provided
249 device_type = DeviceType[request.device_type] if request.device_type else None
251 subscription = PushNotificationSubscription(
252 user_id=context.user_id,
253 platform=PushNotificationPlatform.expo,
254 token=request.token,
255 device_name=request.device_name if request.device_name else None,
256 device_type=device_type,
257 )
258 session.add(subscription)
259 session.flush()
261 push_content = render_adhoc_push_notification("push_enabled", context.localization)
262 push_to_subscription(
263 session,
264 push_notification_subscription_id=subscription.id,
265 user_id=context.user_id,
266 topic_action="adhoc:push_enabled",
267 content=push_content,
268 )
270 return empty_pb2.Empty()
272 def SendTestMobilePushNotification(
273 self, request: empty_pb2.Empty, context: CouchersContext, session: Session
274 ) -> empty_pb2.Empty:
275 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 275 ↛ 276line 275 didn't jump to line 276 because the condition on line 275 was never true
276 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")
278 push_to_user(
279 session,
280 user_id=context.user_id,
281 topic_action="adhoc:testing",
282 content=PushNotificationContent(
283 title="Mobile notifications test",
284 ios_title="Mobile Notifications Test",
285 body="If you see this on your phone, everything is wired up correctly 🎉",
286 ),
287 )
289 return empty_pb2.Empty()
291 def SendDevPushNotification(
292 self, request: notifications_pb2.SendDevPushNotificationReq, context: CouchersContext, session: Session
293 ) -> empty_pb2.Empty:
294 if not config["ENABLE_DEV_APIS"]:
295 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "dev_apis_disabled")
297 if not config["PUSH_NOTIFICATIONS_ENABLED"]:
298 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")
300 push_to_user(
301 session,
302 user_id=context.user_id,
303 topic_action="adhoc:testing",
304 content=PushNotificationContent(
305 title=request.title,
306 ios_title=request.title,
307 body=request.body,
308 action_url=request.url or None,
309 icon_url=request.icon or None,
310 ),
311 key=request.key or None,
312 ttl=request.ttl,
313 )
315 return empty_pb2.Empty()
317 def DebugRedeliverPushNotification(
318 self,
319 request: notifications_pb2.DebugRedeliverPushNotificationReq,
320 context: CouchersContext,
321 session: Session,
322 ) -> empty_pb2.Empty:
323 if not config["ENABLE_DEV_APIS"]:
324 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "dev_apis_disabled")
326 if not config["PUSH_NOTIFICATIONS_ENABLED"]:
327 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")
329 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
331 notification = session.execute(
332 select(Notification)
333 .where(Notification.id == request.notification_id)
334 .where(Notification.user_id == context.user_id)
335 ).scalar_one_or_none()
337 if not notification:
338 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "notification_not_found")
340 push_to_user(
341 session,
342 user_id=context.user_id,
343 topic_action=notification.topic_action.display,
344 content=render_push_notification(notification, LocalizationContext.from_user(user)),
345 key=notification.key,
346 )
348 return empty_pb2.Empty()