Coverage for src/couchers/servicers/notifications.py: 93%
67 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-22 06:42 +0000
1import json
2import logging
4import grpc
5from google.protobuf import empty_pb2
7from couchers import errors
8from couchers.config import config
9from couchers.models import (
10 HostingStatus,
11 MeetupStatus,
12 Notification,
13 NotificationDeliveryType,
14 PushNotificationSubscription,
15 User,
16)
17from couchers.notifications.push import get_vapid_public_key, push_to_subscription, push_to_user
18from couchers.notifications.push_api import decode_key
19from couchers.notifications.render import render_notification
20from couchers.notifications.settings import PreferenceNotUserEditableError, get_user_setting_groups, set_preference
21from couchers.notifications.utils import enum_from_topic_action
22from couchers.sql import couchers_select as select
23from couchers.utils import Timestamp_from_datetime
24from proto import notifications_pb2, notifications_pb2_grpc
26logger = logging.getLogger(__name__)
27MAX_PAGINATION_LENGTH = 100
30def notification_to_pb(user, notification: Notification):
31 rendered = render_notification(user, notification)
32 return notifications_pb2.Notification(
33 notification_id=notification.id,
34 created=Timestamp_from_datetime(notification.created),
35 topic=notification.topic_action.topic,
36 action=notification.topic_action.action,
37 key=notification.key,
38 title=rendered.push_title,
39 body=rendered.push_body,
40 icon=rendered.push_icon,
41 url=rendered.push_url,
42 )
45class Notifications(notifications_pb2_grpc.NotificationsServicer):
46 def GetNotificationSettings(self, request, context, session):
47 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
48 return notifications_pb2.GetNotificationSettingsRes(
49 do_not_email_enabled=user.do_not_email,
50 groups=get_user_setting_groups(user.id),
51 )
53 def SetNotificationSettings(self, request, context, session):
54 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
55 user.do_not_email = request.enable_do_not_email
56 if request.enable_do_not_email:
57 user.hosting_status = HostingStatus.cant_host
58 user.meetup_status = MeetupStatus.does_not_want_to_meetup
59 for preference in request.preferences:
60 topic_action = enum_from_topic_action.get((preference.topic, preference.action), None)
61 if not topic_action:
62 context.abort(grpc.StatusCode.NOT_FOUND, errors.INVALID_NOTIFICATION_PREFERENCE)
63 delivery_types = {t.name for t in NotificationDeliveryType}
64 if preference.delivery_method not in delivery_types:
65 context.abort(grpc.StatusCode.NOT_FOUND, errors.INVALID_DELIVERY_METHOD)
66 delivery_type = NotificationDeliveryType[preference.delivery_method]
67 try:
68 set_preference(session, user.id, topic_action, delivery_type, preference.enabled)
69 except PreferenceNotUserEditableError:
70 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_EDIT_THAT_NOTIFICATION_PREFERENCE)
71 return notifications_pb2.GetNotificationSettingsRes(
72 do_not_email_enabled=user.do_not_email,
73 groups=get_user_setting_groups(user.id),
74 )
76 def ListNotifications(self, request, context, session):
77 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
78 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
79 next_notification_id = int(request.page_token) if request.page_token else 2**50
80 notifications = (
81 session.execute(
82 select(Notification)
83 .where(Notification.user_id == context.user_id)
84 .where(Notification.id <= next_notification_id)
85 .order_by(Notification.id.desc())
86 .limit(page_size + 1)
87 )
88 .scalars()
89 .all()
90 )
91 return notifications_pb2.ListNotificationsRes(
92 notifications=[notification_to_pb(user, notification) for notification in notifications[:page_size]],
93 next_page_token=str(notifications[-1].id) if len(notifications) > page_size else None,
94 )
96 def GetVapidPublicKey(self, request, context, session):
97 if not config["PUSH_NOTIFICATIONS_ENABLED"]:
98 context.abort(grpc.StatusCode.UNAVAILABLE, errors.PUSH_NOTIFICATIONS_DISABLED)
100 return notifications_pb2.GetVapidPublicKeyRes(vapid_public_key=get_vapid_public_key())
102 def RegisterPushNotificationSubscription(self, request, context, session):
103 if not config["PUSH_NOTIFICATIONS_ENABLED"]:
104 context.abort(grpc.StatusCode.UNAVAILABLE, errors.PUSH_NOTIFICATIONS_DISABLED)
106 data = json.loads(request.full_subscription_json)
107 subscription = PushNotificationSubscription(
108 user_id=context.user_id,
109 endpoint=data["endpoint"],
110 p256dh_key=decode_key(data["keys"]["p256dh"]),
111 auth_key=decode_key(data["keys"]["auth"]),
112 full_subscription_info=request.full_subscription_json,
113 user_agent=request.user_agent,
114 )
115 session.add(subscription)
116 session.flush()
117 push_to_subscription(
118 session,
119 push_notification_subscription_id=subscription.id,
120 user_id=context.user_id,
121 topic_action="adhoc:setup",
122 title="Checking push notifications work!",
123 body="Hi, thanks for enabling push notifications!",
124 )
126 return empty_pb2.Empty()
128 def SendTestPushNotification(self, request, context, session):
129 if not config["PUSH_NOTIFICATIONS_ENABLED"]:
130 context.abort(grpc.StatusCode.UNAVAILABLE, errors.PUSH_NOTIFICATIONS_DISABLED)
132 push_to_user(
133 session,
134 user_id=context.user_id,
135 topic_action="adhoc:testing",
136 title="Checking push notifications work!",
137 body="If you see this, then it's working :)",
138 )
140 return empty_pb2.Empty()