Coverage for app / backend / src / couchers / servicers / notifications.py: 90%
127 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
1import functools
2import json
3import logging
5import grpc
6from google.protobuf import empty_pb2
7from sqlalchemy import select, update
8from sqlalchemy.orm import Session
9from sqlalchemy.sql import or_
11from couchers.config import config
12from couchers.constants import DATETIME_INFINITY
13from couchers.context import CouchersContext
14from couchers.i18n import LocalizationContext
15from couchers.models import (
16 DeviceType,
17 HostingStatus,
18 MeetupStatus,
19 Notification,
20 NotificationDeliveryType,
21 PushNotificationPlatform,
22 PushNotificationSubscription,
23 User,
24)
25from couchers.notifications.push import PushNotificationContent, push_to_subscription, push_to_user
26from couchers.notifications.render_push import render_adhoc_push_notification, render_push_notification
27from couchers.notifications.settings import (
28 PreferenceNotUserEditableError,
29 get_topic_actions_by_delivery_type,
30 get_user_setting_groups,
31 set_preference,
32)
33from couchers.notifications.utils import enum_from_topic_action
34from couchers.notifications.web_push_api import decode_key, get_vapid_public_key_from_private_key
35from couchers.proto import notifications_pb2, notifications_pb2_grpc
36from couchers.sql import moderation_state_column_visible, to_bool
37from couchers.utils import Timestamp_from_datetime, now
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Largest page size ListNotifications will serve; also the default when the
# request does not specify one.
MAX_PAGINATION_LENGTH = 100
@functools.cache
def get_vapid_public_key() -> str:
    """Return the VAPID public key derived from the configured private key.

    The private key is read from ``config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"]``.
    Because of ``functools.cache`` a successful result is memoised, so later
    calls return the same value without re-reading the config.
    """
    private_key = config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"]
    return get_vapid_public_key_from_private_key(private_key)
def notification_to_pb(user: User, notification: Notification) -> notifications_pb2.Notification:
    """Convert a ``Notification`` row into its protobuf message.

    The title, body, icon and URL are taken from rendering the notification
    as a push notification in the localization context built from ``user``;
    the remaining fields are copied from the row itself.
    """
    rendered = render_push_notification(notification, LocalizationContext.from_user(user))
    topic_action = notification.topic_action
    return notifications_pb2.Notification(
        notification_id=notification.id,
        created=Timestamp_from_datetime(notification.created),
        topic=topic_action.topic,
        action=topic_action.action,
        key=notification.key,
        title=rendered.title,
        body=rendered.body,
        icon=rendered.icon_url,
        url=rendered.action_url,
        is_seen=notification.is_seen,
    )
64class Notifications(notifications_pb2_grpc.NotificationsServicer):
def GetNotificationSettings(
    self, request: notifications_pb2.GetNotificationSettingsReq, context: CouchersContext, session: Session
) -> notifications_pb2.GetNotificationSettingsRes:
    """Return the calling user's do-not-email flag and notification setting groups."""
    user_query = select(User).where(User.id == context.user_id)
    user = session.execute(user_query).scalar_one()
    response = notifications_pb2.GetNotificationSettingsRes(
        do_not_email_enabled=user.do_not_email,
        groups=get_user_setting_groups(user.id, context.localization),
    )
    return response
def SetNotificationSettings(
    self, request: notifications_pb2.SetNotificationSettingsReq, context: CouchersContext, session: Session
) -> notifications_pb2.GetNotificationSettingsRes:
    """Update the calling user's notification settings and return the new state.

    Sets the user's do-not-email flag from the request, then applies each
    entry of ``request.preferences`` via ``set_preference``.

    Aborts (through ``context.abort_with_error_code``) with:
      * ``NOT_FOUND`` / ``invalid_notification_preference`` when the
        (topic, action) pair is not in ``enum_from_topic_action``;
      * ``NOT_FOUND`` / ``invalid_delivery_method`` when the delivery method
        is not the name of a ``NotificationDeliveryType`` member;
      * ``FAILED_PRECONDITION`` / ``cannot_edit_that_notification_preference``
        when ``set_preference`` raises ``PreferenceNotUserEditableError``.

    NOTE(review): the code after each abort call relies on
    ``abort_with_error_code`` not returning — confirm against its definition.
    """
    user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

    user.do_not_email = request.enable_do_not_email
    if request.enable_do_not_email:
        # Turning on do-not-email also switches the user to "can't host" and
        # "does not want to meet up".
        user.hosting_status = HostingStatus.cant_host
        user.meetup_status = MeetupStatus.does_not_want_to_meetup

    # The set of valid delivery-type names does not depend on the preference
    # being processed, so build it once instead of once per iteration.
    delivery_types = {t.name for t in NotificationDeliveryType}

    for preference in request.preferences:
        topic_action = enum_from_topic_action.get((preference.topic, preference.action), None)
        if not topic_action:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "invalid_notification_preference")

        if preference.delivery_method not in delivery_types:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "invalid_delivery_method")
        delivery_type = NotificationDeliveryType[preference.delivery_method]

        try:
            set_preference(session, user.id, topic_action, delivery_type, preference.enabled)
        except PreferenceNotUserEditableError:
            context.abort_with_error_code(
                grpc.StatusCode.FAILED_PRECONDITION, "cannot_edit_that_notification_preference"
            )

    return notifications_pb2.GetNotificationSettingsRes(
        do_not_email_enabled=user.do_not_email,
        groups=get_user_setting_groups(user.id, context.localization),
    )
def ListNotifications(
    self, request: notifications_pb2.ListNotificationsReq, context: CouchersContext, session: Session
) -> notifications_pb2.ListNotificationsRes:
    """Return one page of the calling user's notifications, newest first.

    Only notifications whose topic/action is returned by
    ``get_topic_actions_by_delivery_type`` for the push delivery type, and
    which pass ``moderation_state_column_visible``, are listed. When
    ``request.only_unread`` is set, seen notifications are filtered out.

    Pagination is by descending notification id: ``page_token`` is the id of
    the first notification of the requested page, and ``next_page_token`` is
    set only when more rows exist beyond this page.

    NOTE(review): a non-numeric ``page_token`` makes ``int()`` raise
    ``ValueError``, which is not handled here.
    """
    user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

    requested_size = request.page_size or MAX_PAGINATION_LENGTH
    page_size = min(MAX_PAGINATION_LENGTH, requested_size)

    if request.page_token:
        next_notification_id = int(request.page_token)
    else:
        # No token: start from an id bound of 2**50.
        next_notification_id = 2**50

    push_topic_actions = get_topic_actions_by_delivery_type(session, user.id, NotificationDeliveryType.push)

    query = (
        select(Notification)
        .where(Notification.user_id == context.user_id)
        .where(Notification.id <= next_notification_id)
        .where(or_(to_bool(request.only_unread == False), Notification.is_seen == False))
        .where(Notification.topic_action.in_(push_topic_actions))
        .where(moderation_state_column_visible(context, Notification.moderation_state_id))
        .order_by(Notification.id.desc())
        # One extra row is fetched so we can tell whether another page exists.
        .limit(page_size + 1)
    )
    notifications = session.execute(query).scalars().all()

    has_more = len(notifications) > page_size
    # The extra row, when present, is the first item of the next page.
    next_page_token = str(notifications[-1].id) if has_more else None
    page = [notification_to_pb(user, notification) for notification in notifications[:page_size]]

    return notifications_pb2.ListNotificationsRes(
        notifications=page,
        next_page_token=next_page_token,
    )
def MarkNotificationSeen(
    self, request: notifications_pb2.MarkNotificationSeenReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Set the seen flag of one of the calling user's notifications.

    The flag is set to ``request.set_seen``. Aborts with ``NOT_FOUND`` /
    ``notification_not_found`` when the calling user has no notification
    with the requested id.
    """
    lookup = (
        select(Notification)
        .where(Notification.user_id == context.user_id)
        .where(Notification.id == request.notification_id)
    )
    notification = session.execute(lookup).scalar_one_or_none()

    if not notification:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "notification_not_found")

    notification.is_seen = request.set_seen
    return empty_pb2.Empty()
def MarkAllNotificationsSeen(
    self, request: notifications_pb2.MarkAllNotificationsSeenReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Mark the calling user's notifications up to a given id as seen.

    Every notification of the user whose id is less than or equal to
    ``request.latest_notification_id`` gets ``is_seen`` set to true in a
    single UPDATE statement.
    """
    mark_seen = (
        update(Notification)
        .where(Notification.user_id == context.user_id)
        .where(Notification.id <= request.latest_notification_id)
        .values(is_seen=True)
    )
    session.execute(mark_seen)
    return empty_pb2.Empty()
def GetVapidPublicKey(
    self, request: empty_pb2.Empty, context: CouchersContext, session: Session
) -> notifications_pb2.GetVapidPublicKeyRes:
    """Return the server's VAPID public key.

    Aborts with ``UNAVAILABLE`` / ``push_notifications_disabled`` when
    ``config["PUSH_NOTIFICATIONS_ENABLED"]`` is falsy.
    """
    push_enabled = config["PUSH_NOTIFICATIONS_ENABLED"]
    if not push_enabled:
        context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")

    public_key = get_vapid_public_key()
    return notifications_pb2.GetVapidPublicKeyRes(vapid_public_key=public_key)
def RegisterPushNotificationSubscription(
    self,
    request: notifications_pb2.RegisterPushNotificationSubscriptionReq,
    context: CouchersContext,
    session: Session,
) -> empty_pb2.Empty:
    """Store a web-push subscription for the calling user and send a welcome push.

    ``request.full_subscription_json`` is parsed as JSON; its ``endpoint``,
    ``keys.p256dh`` and ``keys.auth`` entries are stored on the new
    subscription together with the raw JSON and the user agent. After the
    row is flushed, a push with topic/action ``adhoc:setup`` is sent to that
    subscription.

    Aborts with ``UNAVAILABLE`` / ``push_notifications_disabled`` when push
    notifications are disabled in the config.

    NOTE(review): malformed JSON or a missing key raises from
    ``json.loads`` / the dict lookups and is not handled here.
    """
    if not config["PUSH_NOTIFICATIONS_ENABLED"]:
        context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")

    subscription_info = json.loads(request.full_subscription_json)
    endpoint = subscription_info["endpoint"]
    p256dh_key = decode_key(subscription_info["keys"]["p256dh"])
    auth_key = decode_key(subscription_info["keys"]["auth"])

    subscription = PushNotificationSubscription(
        user_id=context.user_id,
        platform=PushNotificationPlatform.web_push,
        endpoint=endpoint,
        p256dh_key=p256dh_key,
        auth_key=auth_key,
        full_subscription_info=request.full_subscription_json,
        user_agent=request.user_agent,
    )
    session.add(subscription)
    # Flush before pushing: subscription.id is read immediately below.
    session.flush()

    welcome = PushNotificationContent(
        title="Push notifications test",
        ios_title="Push Notifications Test",
        body="Hi, thanks for enabling push notifications!",
    )
    push_to_subscription(
        session,
        push_notification_subscription_id=subscription.id,
        user_id=context.user_id,
        topic_action="adhoc:setup",
        content=welcome,
    )

    return empty_pb2.Empty()
def SendTestPushNotification(
    self, request: empty_pb2.Empty, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Send a fixed test push notification to the calling user.

    Aborts with ``UNAVAILABLE`` / ``push_notifications_disabled`` when push
    notifications are disabled in the config.
    """
    push_enabled = config["PUSH_NOTIFICATIONS_ENABLED"]
    if not push_enabled:
        context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")

    test_content = PushNotificationContent(
        title="Push notifications test",
        ios_title="Push Notifications Test",
        body="If you see this, then it's working :)",
    )
    push_to_user(
        session,
        user_id=context.user_id,
        topic_action="adhoc:testing",
        content=test_content,
    )

    return empty_pb2.Empty()
def RegisterMobilePushNotificationSubscription(
    self,
    request: notifications_pb2.RegisterMobilePushNotificationSubscriptionReq,
    context: CouchersContext,
    session: Session,
) -> empty_pb2.Empty:
    """Register (or re-enable) an Expo push subscription for a device token.

    If a subscription with ``request.token`` already exists it is reused:
    when its ``disabled_at`` lies in the past it is re-enabled and its device
    name/type are refreshed from the request. Otherwise a new subscription
    is created for the calling user and a localized ``push_enabled`` push is
    sent to it.

    Aborts with ``UNAVAILABLE`` / ``push_notifications_disabled`` when push
    notifications are disabled in the config.
    """
    if not config["PUSH_NOTIFICATIONS_ENABLED"]:
        context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")

    # NOTE(review): the lookup matches on the token only, not on the calling
    # user — confirm that reusing another user's row is intended.
    token_query = select(PushNotificationSubscription).where(PushNotificationSubscription.token == request.token)
    existing = session.execute(token_query).scalar_one_or_none()

    if existing:
        # NOTE(review): nesting of this branch was reconstructed from a listing
        # with flattened indentation — verify against the real file.
        was_disabled = existing.disabled_at < now()
        if was_disabled:
            existing.disabled_at = DATETIME_INFINITY
            existing.device_name = request.device_name or existing.device_name
            if request.device_type:
                existing.device_type = DeviceType[request.device_type]
            logger.info(f"Re-enabled mobile push sub {existing.id} for user {context.user_id}")
        return empty_pb2.Empty()

    # An empty device_type / device_name in the request is stored as None.
    if request.device_type:
        device_type = DeviceType[request.device_type]
    else:
        device_type = None

    subscription = PushNotificationSubscription(
        user_id=context.user_id,
        platform=PushNotificationPlatform.expo,
        token=request.token,
        device_name=request.device_name or None,
        device_type=device_type,
    )
    session.add(subscription)
    # Flush before pushing: subscription.id is read immediately below.
    session.flush()

    welcome = render_adhoc_push_notification("push_enabled", context.localization)
    push_to_subscription(
        session,
        push_notification_subscription_id=subscription.id,
        user_id=context.user_id,
        topic_action="adhoc:push_enabled",
        content=welcome,
    )

    return empty_pb2.Empty()
def SendTestMobilePushNotification(
    self, request: empty_pb2.Empty, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Send a fixed mobile-flavoured test push notification to the calling user.

    Aborts with ``UNAVAILABLE`` / ``push_notifications_disabled`` when push
    notifications are disabled in the config.
    """
    push_enabled = config["PUSH_NOTIFICATIONS_ENABLED"]
    if not push_enabled:
        context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")

    test_content = PushNotificationContent(
        title="Mobile notifications test",
        ios_title="Mobile Notifications Test",
        body="If you see this on your phone, everything is wired up correctly 🎉",
    )
    push_to_user(
        session,
        user_id=context.user_id,
        topic_action="adhoc:testing",
        content=test_content,
    )

    return empty_pb2.Empty()
def SendDevPushNotification(
    self, request: notifications_pb2.SendDevPushNotificationReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Send a caller-defined push notification to the calling user (dev API).

    Title, body, URL, icon, key and TTL are taken from the request; empty
    URL, icon and key values are passed on as ``None``.

    Aborts with ``UNAVAILABLE`` / ``dev_apis_disabled`` when
    ``config["ENABLE_DEV_APIS"]`` is falsy, and with ``UNAVAILABLE`` /
    ``push_notifications_disabled`` when push notifications are disabled.
    """
    dev_apis_enabled = config["ENABLE_DEV_APIS"]
    if not dev_apis_enabled:
        context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "dev_apis_disabled")

    push_enabled = config["PUSH_NOTIFICATIONS_ENABLED"]
    if not push_enabled:
        context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")

    custom_content = PushNotificationContent(
        title=request.title,
        ios_title=request.title,
        body=request.body,
        action_url=request.url or None,
        icon_url=request.icon or None,
    )
    push_to_user(
        session,
        user_id=context.user_id,
        topic_action="adhoc:testing",
        content=custom_content,
        key=request.key or None,
        ttl=request.ttl,
    )

    return empty_pb2.Empty()
def DebugRedeliverPushNotification(
    self,
    request: notifications_pb2.DebugRedeliverPushNotificationReq,
    context: CouchersContext,
    session: Session,
) -> empty_pb2.Empty:
    """Re-send one of the calling user's stored notifications as a push (dev API).

    The notification is rendered again in the localization context built
    from the user and pushed with its original topic/action and key.

    Aborts with ``UNAVAILABLE`` / ``dev_apis_disabled`` or ``UNAVAILABLE`` /
    ``push_notifications_disabled`` when the respective config flag is
    falsy, and with ``NOT_FOUND`` / ``notification_not_found`` when the
    calling user has no notification with the requested id.
    """
    dev_apis_enabled = config["ENABLE_DEV_APIS"]
    if not dev_apis_enabled:
        context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "dev_apis_disabled")

    push_enabled = config["PUSH_NOTIFICATIONS_ENABLED"]
    if not push_enabled:
        context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")

    user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

    lookup = (
        select(Notification)
        .where(Notification.id == request.notification_id)
        .where(Notification.user_id == context.user_id)
    )
    notification = session.execute(lookup).scalar_one_or_none()
    if not notification:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "notification_not_found")

    rendered = render_push_notification(notification, LocalizationContext.from_user(user))
    push_to_user(
        session,
        user_id=context.user_id,
        topic_action=notification.topic_action.display,
        content=rendered,
        key=notification.key,
    )

    return empty_pb2.Empty()