Coverage for src/couchers/servicers/notifications.py: 92%
104 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-06 23:17 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-06 23:17 +0000
1import functools
2import json
3import logging
5import grpc
6from google.protobuf import empty_pb2
7from sqlalchemy.sql import or_
9from couchers.config import config
10from couchers.constants import DATETIME_INFINITY
11from couchers.models import (
12 DeviceType,
13 HostingStatus,
14 MeetupStatus,
15 Notification,
16 NotificationDeliveryType,
17 PushNotificationPlatform,
18 PushNotificationSubscription,
19 User,
20)
21from couchers.notifications.push import push_to_subscription, push_to_user
22from couchers.notifications.render import render_notification
23from couchers.notifications.settings import (
24 PreferenceNotUserEditableError,
25 get_topic_actions_by_delivery_type,
26 get_user_setting_groups,
27 set_preference,
28)
29from couchers.notifications.utils import enum_from_topic_action
30from couchers.notifications.web_push_api import decode_key, get_vapid_public_key_from_private_key
31from couchers.proto import notifications_pb2, notifications_pb2_grpc
32from couchers.sql import couchers_select as select
33from couchers.utils import Timestamp_from_datetime, now
35logger = logging.getLogger(__name__)
36MAX_PAGINATION_LENGTH = 100
39@functools.cache
40def get_vapid_public_key() -> str:
41 return get_vapid_public_key_from_private_key(config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"])
44def notification_to_pb(user, notification: Notification):
45 rendered = render_notification(user, notification)
46 return notifications_pb2.Notification(
47 notification_id=notification.id,
48 created=Timestamp_from_datetime(notification.created),
49 topic=notification.topic_action.topic,
50 action=notification.topic_action.action,
51 key=notification.key,
52 title=rendered.push_title,
53 body=rendered.push_body,
54 icon=rendered.push_icon,
55 url=rendered.push_url,
56 is_seen=notification.is_seen,
57 )
60class Notifications(notifications_pb2_grpc.NotificationsServicer):
61 def GetNotificationSettings(self, request, context, session):
62 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
63 return notifications_pb2.GetNotificationSettingsRes(
64 do_not_email_enabled=user.do_not_email,
65 groups=get_user_setting_groups(user.id),
66 )
68 def SetNotificationSettings(self, request, context, session):
69 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
70 user.do_not_email = request.enable_do_not_email
71 if request.enable_do_not_email:
72 user.hosting_status = HostingStatus.cant_host
73 user.meetup_status = MeetupStatus.does_not_want_to_meetup
74 for preference in request.preferences:
75 topic_action = enum_from_topic_action.get((preference.topic, preference.action), None)
76 if not topic_action:
77 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "invalid_notification_preference")
78 delivery_types = {t.name for t in NotificationDeliveryType}
79 if preference.delivery_method not in delivery_types:
80 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "invalid_delivery_method")
81 delivery_type = NotificationDeliveryType[preference.delivery_method]
82 try:
83 set_preference(session, user.id, topic_action, delivery_type, preference.enabled)
84 except PreferenceNotUserEditableError:
85 context.abort_with_error_code(
86 grpc.StatusCode.FAILED_PRECONDITION, "cannot_edit_that_notification_preference"
87 )
88 return notifications_pb2.GetNotificationSettingsRes(
89 do_not_email_enabled=user.do_not_email,
90 groups=get_user_setting_groups(user.id),
91 )
93 def ListNotifications(self, request, context, session):
94 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
95 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
96 next_notification_id = int(request.page_token) if request.page_token else 2**50
97 notifications = (
98 session.execute(
99 select(Notification)
100 .where(Notification.user_id == context.user_id)
101 .where(Notification.id <= next_notification_id)
102 .where(or_(request.only_unread == False, Notification.is_seen == False))
103 .where(
104 Notification.topic_action.in_(
105 get_topic_actions_by_delivery_type(session, user.id, NotificationDeliveryType.push)
106 )
107 )
108 .order_by(Notification.id.desc())
109 .limit(page_size + 1)
110 )
111 .scalars()
112 .all()
113 )
114 return notifications_pb2.ListNotificationsRes(
115 notifications=[notification_to_pb(user, notification) for notification in notifications[:page_size]],
116 next_page_token=str(notifications[-1].id) if len(notifications) > page_size else None,
117 )
119 def MarkNotificationSeen(self, request, context, session):
120 notification = (
121 session.execute(
122 select(Notification)
123 .where(Notification.user_id == context.user_id)
124 .where(Notification.id == request.notification_id)
125 )
126 .scalars()
127 .one_or_none()
128 )
129 if not notification:
130 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "notification_not_found")
131 notification.is_seen = request.set_seen
132 return empty_pb2.Empty()
134 def MarkAllNotificationsSeen(self, request, context, session):
135 session.execute(
136 Notification.__table__.update()
137 .values(is_seen=True)
138 .where(Notification.user_id == context.user_id)
139 .where(Notification.id <= request.latest_notification_id)
140 )
141 return empty_pb2.Empty()
143 def GetVapidPublicKey(self, request, context, session):
144 if not config["PUSH_NOTIFICATIONS_ENABLED"]:
145 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")
147 return notifications_pb2.GetVapidPublicKeyRes(vapid_public_key=get_vapid_public_key())
149 def RegisterPushNotificationSubscription(self, request, context, session):
150 if not config["PUSH_NOTIFICATIONS_ENABLED"]:
151 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")
153 data = json.loads(request.full_subscription_json)
154 subscription = PushNotificationSubscription(
155 user_id=context.user_id,
156 platform=PushNotificationPlatform.web_push,
157 endpoint=data["endpoint"],
158 p256dh_key=decode_key(data["keys"]["p256dh"]),
159 auth_key=decode_key(data["keys"]["auth"]),
160 full_subscription_info=request.full_subscription_json,
161 user_agent=request.user_agent,
162 )
163 session.add(subscription)
164 session.flush()
165 push_to_subscription(
166 session,
167 push_notification_subscription_id=subscription.id,
168 user_id=context.user_id,
169 topic_action="adhoc:setup",
170 title="Checking push notifications work!",
171 body="Hi, thanks for enabling push notifications!",
172 )
174 return empty_pb2.Empty()
176 def SendTestPushNotification(self, request, context, session):
177 if not config["PUSH_NOTIFICATIONS_ENABLED"]:
178 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")
180 push_to_user(
181 session,
182 user_id=context.user_id,
183 topic_action="adhoc:testing",
184 title="Checking push notifications work!",
185 body="If you see this, then it's working :)",
186 )
188 return empty_pb2.Empty()
190 def RegisterMobilePushNotificationSubscription(self, request, context, session):
191 if not config["PUSH_NOTIFICATIONS_ENABLED"]:
192 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")
194 # Check for existing subscription with this token
195 existing = session.execute(
196 select(PushNotificationSubscription).where(PushNotificationSubscription.token == request.token)
197 ).scalar_one_or_none()
199 if existing:
200 # Re-enable if disabled
201 if existing.disabled_at < now():
202 existing.disabled_at = DATETIME_INFINITY
203 existing.device_name = request.device_name or existing.device_name
204 if request.device_type:
205 existing.device_type = DeviceType[request.device_type]
206 logger.info(f"Re-enabled mobile push sub {existing.id} for user {context.user_id}")
207 return empty_pb2.Empty()
209 # Parse device_type if provided
210 device_type = DeviceType[request.device_type] if request.device_type else None
212 subscription = PushNotificationSubscription(
213 user_id=context.user_id,
214 platform=PushNotificationPlatform.expo,
215 token=request.token,
216 device_name=request.device_name if request.device_name else None,
217 device_type=device_type,
218 )
219 session.add(subscription)
220 session.flush()
222 push_to_subscription(
223 session,
224 push_notification_subscription_id=subscription.id,
225 user_id=context.user_id,
226 topic_action="adhoc:setup",
227 title="Push notifications enabled!",
228 body="You'll now receive notifications on this device.",
229 )
231 return empty_pb2.Empty()
233 def SendTestMobilePushNotification(self, request, context, session):
234 if not config["PUSH_NOTIFICATIONS_ENABLED"]:
235 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")
237 push_to_user(
238 session,
239 user_id=context.user_id,
240 topic_action="adhoc:testing",
241 title="Checking mobile push notifications work!",
242 body="If you see this on your phone, everything is wired up correctly 🎉",
243 )
245 return empty_pb2.Empty()