Coverage for src/couchers/servicers/notifications.py: 93%

67 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-22 06:42 +0000

1import json 

2import logging 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6 

7from couchers import errors 

8from couchers.config import config 

9from couchers.models import ( 

10 HostingStatus, 

11 MeetupStatus, 

12 Notification, 

13 NotificationDeliveryType, 

14 PushNotificationSubscription, 

15 User, 

16) 

17from couchers.notifications.push import get_vapid_public_key, push_to_subscription, push_to_user 

18from couchers.notifications.push_api import decode_key 

19from couchers.notifications.render import render_notification 

20from couchers.notifications.settings import PreferenceNotUserEditableError, get_user_setting_groups, set_preference 

21from couchers.notifications.utils import enum_from_topic_action 

22from couchers.sql import couchers_select as select 

23from couchers.utils import Timestamp_from_datetime 

24from proto import notifications_pb2, notifications_pb2_grpc 

25 

26logger = logging.getLogger(__name__) 

27MAX_PAGINATION_LENGTH = 100 

28 

29 

30def notification_to_pb(user, notification: Notification): 

31 rendered = render_notification(user, notification) 

32 return notifications_pb2.Notification( 

33 notification_id=notification.id, 

34 created=Timestamp_from_datetime(notification.created), 

35 topic=notification.topic_action.topic, 

36 action=notification.topic_action.action, 

37 key=notification.key, 

38 title=rendered.push_title, 

39 body=rendered.push_body, 

40 icon=rendered.push_icon, 

41 url=rendered.push_url, 

42 ) 

43 

44 

45class Notifications(notifications_pb2_grpc.NotificationsServicer): 

46 def GetNotificationSettings(self, request, context, session): 

47 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

48 return notifications_pb2.GetNotificationSettingsRes( 

49 do_not_email_enabled=user.do_not_email, 

50 groups=get_user_setting_groups(user.id), 

51 ) 

52 

53 def SetNotificationSettings(self, request, context, session): 

54 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

55 user.do_not_email = request.enable_do_not_email 

56 if request.enable_do_not_email: 

57 user.hosting_status = HostingStatus.cant_host 

58 user.meetup_status = MeetupStatus.does_not_want_to_meetup 

59 for preference in request.preferences: 

60 topic_action = enum_from_topic_action.get((preference.topic, preference.action), None) 

61 if not topic_action: 

62 context.abort(grpc.StatusCode.NOT_FOUND, errors.INVALID_NOTIFICATION_PREFERENCE) 

63 delivery_types = {t.name for t in NotificationDeliveryType} 

64 if preference.delivery_method not in delivery_types: 

65 context.abort(grpc.StatusCode.NOT_FOUND, errors.INVALID_DELIVERY_METHOD) 

66 delivery_type = NotificationDeliveryType[preference.delivery_method] 

67 try: 

68 set_preference(session, user.id, topic_action, delivery_type, preference.enabled) 

69 except PreferenceNotUserEditableError: 

70 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_EDIT_THAT_NOTIFICATION_PREFERENCE) 

71 return notifications_pb2.GetNotificationSettingsRes( 

72 do_not_email_enabled=user.do_not_email, 

73 groups=get_user_setting_groups(user.id), 

74 ) 

75 

76 def ListNotifications(self, request, context, session): 

77 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

78 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

79 next_notification_id = int(request.page_token) if request.page_token else 2**50 

80 notifications = ( 

81 session.execute( 

82 select(Notification) 

83 .where(Notification.user_id == context.user_id) 

84 .where(Notification.id <= next_notification_id) 

85 .order_by(Notification.id.desc()) 

86 .limit(page_size + 1) 

87 ) 

88 .scalars() 

89 .all() 

90 ) 

91 return notifications_pb2.ListNotificationsRes( 

92 notifications=[notification_to_pb(user, notification) for notification in notifications[:page_size]], 

93 next_page_token=str(notifications[-1].id) if len(notifications) > page_size else None, 

94 ) 

95 

96 def GetVapidPublicKey(self, request, context, session): 

97 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 

98 context.abort(grpc.StatusCode.UNAVAILABLE, errors.PUSH_NOTIFICATIONS_DISABLED) 

99 

100 return notifications_pb2.GetVapidPublicKeyRes(vapid_public_key=get_vapid_public_key()) 

101 

102 def RegisterPushNotificationSubscription(self, request, context, session): 

103 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 

104 context.abort(grpc.StatusCode.UNAVAILABLE, errors.PUSH_NOTIFICATIONS_DISABLED) 

105 

106 data = json.loads(request.full_subscription_json) 

107 subscription = PushNotificationSubscription( 

108 user_id=context.user_id, 

109 endpoint=data["endpoint"], 

110 p256dh_key=decode_key(data["keys"]["p256dh"]), 

111 auth_key=decode_key(data["keys"]["auth"]), 

112 full_subscription_info=request.full_subscription_json, 

113 user_agent=request.user_agent, 

114 ) 

115 session.add(subscription) 

116 session.flush() 

117 push_to_subscription( 

118 session, 

119 push_notification_subscription_id=subscription.id, 

120 user_id=context.user_id, 

121 topic_action="adhoc:setup", 

122 title="Checking push notifications work!", 

123 body="Hi, thanks for enabling push notifications!", 

124 ) 

125 

126 return empty_pb2.Empty() 

127 

128 def SendTestPushNotification(self, request, context, session): 

129 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 

130 context.abort(grpc.StatusCode.UNAVAILABLE, errors.PUSH_NOTIFICATIONS_DISABLED) 

131 

132 push_to_user( 

133 session, 

134 user_id=context.user_id, 

135 topic_action="adhoc:testing", 

136 title="Checking push notifications work!", 

137 body="If you see this, then it's working :)", 

138 ) 

139 

140 return empty_pb2.Empty()