Coverage for src/couchers/servicers/notifications.py: 92%
77 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
1import json
2import logging
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy.sql import or_
8from couchers import errors
9from couchers.config import config
10from couchers.models import (
11 HostingStatus,
12 MeetupStatus,
13 Notification,
14 NotificationDeliveryType,
15 PushNotificationSubscription,
16 User,
17)
18from couchers.notifications.push import get_vapid_public_key, push_to_subscription, push_to_user
19from couchers.notifications.push_api import decode_key
20from couchers.notifications.render import render_notification
21from couchers.notifications.settings import (
22 PreferenceNotUserEditableError,
23 get_topic_actions_by_delivery_type,
24 get_user_setting_groups,
25 set_preference,
26)
27from couchers.notifications.utils import enum_from_topic_action
28from couchers.sql import couchers_select as select
29from couchers.utils import Timestamp_from_datetime
30from proto import notifications_pb2, notifications_pb2_grpc
# Module-level logger, named after this module via __name__.
32logger = logging.getLogger(__name__)
# Largest page size ListNotifications will serve; also used when the request gives no page size.
33MAX_PAGINATION_LENGTH = 100
def notification_to_pb(user, notification: Notification):
    """Build the ``notifications_pb2.Notification`` message for one notification row.

    The title, body, icon and url are taken from the ``push_*`` fields of
    ``render_notification(user, notification)``; all other fields are copied
    straight from ``notification``.
    """
    content = render_notification(user, notification)
    topic_action = notification.topic_action
    fields = {
        "notification_id": notification.id,
        "created": Timestamp_from_datetime(notification.created),
        "topic": topic_action.topic,
        "action": topic_action.action,
        "key": notification.key,
        "title": content.push_title,
        "body": content.push_body,
        "icon": content.push_icon,
        "url": content.push_url,
        "is_seen": notification.is_seen,
    }
    return notifications_pb2.Notification(**fields)
class Notifications(notifications_pb2_grpc.NotificationsServicer):
    """gRPC servicer for notification settings, the notification list and push subscriptions.

    Every handler receives the request message, the call ``context`` (``context.user_id``
    identifies the caller, ``context.abort`` ends the call with a status code) and a database
    ``session``.
    """

    def GetNotificationSettings(self, request, context, session):
        """Return the caller's do-not-email flag together with their notification setting groups."""
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        return notifications_pb2.GetNotificationSettingsRes(
            do_not_email_enabled=user.do_not_email,
            groups=get_user_setting_groups(user.id),
        )

    def SetNotificationSettings(self, request, context, session):
        """Update the caller's do-not-email flag and individual preferences, then return the settings.

        Aborts with ``NOT_FOUND`` when a preference names an unknown topic/action pair or an unknown
        delivery method, and with ``FAILED_PRECONDITION`` when ``set_preference`` raises
        ``PreferenceNotUserEditableError``.
        """
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        user.do_not_email = request.enable_do_not_email
        if request.enable_do_not_email:
            # Turning on do-not-email also switches the user to not hosting and not meeting up.
            user.hosting_status = HostingStatus.cant_host
            user.meetup_status = MeetupStatus.does_not_want_to_meetup

        # The valid delivery method names are the same for every preference: build the set once.
        delivery_types = {t.name for t in NotificationDeliveryType}
        for preference in request.preferences:
            topic_action = enum_from_topic_action.get((preference.topic, preference.action), None)
            if not topic_action:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.INVALID_NOTIFICATION_PREFERENCE)
            if preference.delivery_method not in delivery_types:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.INVALID_DELIVERY_METHOD)
            delivery_type = NotificationDeliveryType[preference.delivery_method]
            try:
                set_preference(session, user.id, topic_action, delivery_type, preference.enabled)
            except PreferenceNotUserEditableError:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_EDIT_THAT_NOTIFICATION_PREFERENCE)
        return notifications_pb2.GetNotificationSettingsRes(
            do_not_email_enabled=user.do_not_email,
            groups=get_user_setting_groups(user.id),
        )

    def ListNotifications(self, request, context, session):
        """Return one page of the caller's notifications, newest (highest id) first.

        ``request.page_size`` is capped at ``MAX_PAGINATION_LENGTH`` (which is also the default),
        ``request.page_token`` is the id of the first notification of the page, and
        ``request.only_unread`` restricts the result to notifications that are not yet seen.
        Only notifications whose topic action is returned by ``get_topic_actions_by_delivery_type``
        for push delivery are listed.

        Aborts with ``INVALID_ARGUMENT`` when the page token is not an integer.
        """
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        if request.page_token:
            try:
                next_notification_id = int(request.page_token)
            except ValueError:
                # The token comes from the client; reject garbage instead of failing with an
                # unhandled exception.
                # NOTE(review): message is inline because couchers.errors is not visible from
                # this block; consider moving it there.
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid page token.")
        else:
            # No token: start from the top. 2**50 is presumably above any real notification id.
            next_notification_id = 2**50
        notifications = (
            session.execute(
                select(Notification)
                .where(Notification.user_id == context.user_id)
                .where(Notification.id <= next_notification_id)
                # `== False` on the column is intentional: it builds an SQL expression.
                .where(or_(request.only_unread == False, Notification.is_seen == False))
                .where(
                    Notification.topic_action.in_(
                        get_topic_actions_by_delivery_type(session, user.id, NotificationDeliveryType.push)
                    )
                )
                .order_by(Notification.id.desc())
                # Fetch one extra row: its presence signals another page and its id is the next token.
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )
        return notifications_pb2.ListNotificationsRes(
            notifications=[notification_to_pb(user, notification) for notification in notifications[:page_size]],
            next_page_token=str(notifications[-1].id) if len(notifications) > page_size else None,
        )

    def MarkNotificationSeen(self, request, context, session):
        """Set ``is_seen`` of one of the caller's notifications to ``request.set_seen``.

        Aborts with ``NOT_FOUND`` when the caller has no notification with the requested id.
        """
        notification = (
            session.execute(
                select(Notification)
                .where(Notification.user_id == context.user_id)
                .where(Notification.id == request.notification_id)
            )
            .scalars()
            .one_or_none()
        )
        if not notification:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.NOTIFICATION_NOT_FOUND)
        notification.is_seen = request.set_seen
        return empty_pb2.Empty()

    def MarkAllNotificationsSeen(self, request, context, session):
        """Mark every notification of the caller with id <= ``request.latest_notification_id`` as seen."""
        session.execute(
            Notification.__table__.update()
            .values(is_seen=True)
            .where(Notification.user_id == context.user_id)
            .where(Notification.id <= request.latest_notification_id)
        )
        return empty_pb2.Empty()

    def GetVapidPublicKey(self, request, context, session):
        """Return the VAPID public key; aborts with ``UNAVAILABLE`` when push notifications are disabled."""
        if not config["PUSH_NOTIFICATIONS_ENABLED"]:
            context.abort(grpc.StatusCode.UNAVAILABLE, errors.PUSH_NOTIFICATIONS_DISABLED)

        return notifications_pb2.GetVapidPublicKeyRes(vapid_public_key=get_vapid_public_key())

    def RegisterPushNotificationSubscription(self, request, context, session):
        """Store a push subscription for the caller and send a first push notification to it.

        ``request.full_subscription_json`` must be a JSON object with an ``endpoint`` entry and a
        ``keys`` object holding ``p256dh`` and ``auth``.

        Aborts with ``UNAVAILABLE`` when push notifications are disabled and with
        ``INVALID_ARGUMENT`` when the subscription JSON is malformed or lacks a required entry.
        """
        if not config["PUSH_NOTIFICATIONS_ENABLED"]:
            context.abort(grpc.StatusCode.UNAVAILABLE, errors.PUSH_NOTIFICATIONS_DISABLED)

        try:
            # json.JSONDecodeError is a ValueError; KeyError/TypeError cover missing entries and
            # payloads that are not shaped like nested objects.
            data = json.loads(request.full_subscription_json)
            endpoint = data["endpoint"]
            raw_p256dh = data["keys"]["p256dh"]
            raw_auth = data["keys"]["auth"]
        except (ValueError, KeyError, TypeError):
            # NOTE(review): message is inline because couchers.errors is not visible from this
            # block; consider moving it there.
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid push notification subscription.")

        subscription = PushNotificationSubscription(
            user_id=context.user_id,
            endpoint=endpoint,
            p256dh_key=decode_key(raw_p256dh),
            auth_key=decode_key(raw_auth),
            full_subscription_info=request.full_subscription_json,
            user_agent=request.user_agent,
        )
        session.add(subscription)
        # Flush so that subscription.id is assigned before it is passed on below.
        session.flush()
        push_to_subscription(
            session,
            push_notification_subscription_id=subscription.id,
            user_id=context.user_id,
            topic_action="adhoc:setup",
            title="Checking push notifications work!",
            body="Hi, thanks for enabling push notifications!",
        )

        return empty_pb2.Empty()

    def SendTestPushNotification(self, request, context, session):
        """Send a test push notification to the caller via ``push_to_user``.

        Aborts with ``UNAVAILABLE`` when push notifications are disabled.
        """
        if not config["PUSH_NOTIFICATIONS_ENABLED"]:
            context.abort(grpc.StatusCode.UNAVAILABLE, errors.PUSH_NOTIFICATIONS_DISABLED)

        push_to_user(
            session,
            user_id=context.user_id,
            topic_action="adhoc:testing",
            title="Checking push notifications work!",
            body="If you see this, then it's working :)",
        )

        return empty_pb2.Empty()