Coverage for src/couchers/servicers/notifications.py: 92%

77 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-06-01 15:07 +0000

1import json 

2import logging 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.sql import or_ 

7 

8from couchers import errors 

9from couchers.config import config 

10from couchers.models import ( 

11 HostingStatus, 

12 MeetupStatus, 

13 Notification, 

14 NotificationDeliveryType, 

15 PushNotificationSubscription, 

16 User, 

17) 

18from couchers.notifications.push import get_vapid_public_key, push_to_subscription, push_to_user 

19from couchers.notifications.push_api import decode_key 

20from couchers.notifications.render import render_notification 

21from couchers.notifications.settings import ( 

22 PreferenceNotUserEditableError, 

23 get_topic_actions_by_delivery_type, 

24 get_user_setting_groups, 

25 set_preference, 

26) 

27from couchers.notifications.utils import enum_from_topic_action 

28from couchers.sql import couchers_select as select 

29from couchers.utils import Timestamp_from_datetime 

30from proto import notifications_pb2, notifications_pb2_grpc 

31 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Largest page size ListNotifications will serve; also used as the page size
# when the request does not specify one.
MAX_PAGINATION_LENGTH = 100

34 

35 

def notification_to_pb(user, notification: Notification):
    """Build the protobuf ``Notification`` message for a stored notification.

    The notification is first rendered for ``user`` with
    ``render_notification``; the message's title, body, icon and url are taken
    from the ``push_*`` attributes of that rendering, while the id, creation
    time, topic/action, key and seen flag come from ``notification`` itself.
    """
    push_content = render_notification(user, notification)
    topic_action = notification.topic_action
    fields = {
        "notification_id": notification.id,
        "created": Timestamp_from_datetime(notification.created),
        "topic": topic_action.topic,
        "action": topic_action.action,
        "key": notification.key,
        "title": push_content.push_title,
        "body": push_content.push_body,
        "icon": push_content.push_icon,
        "url": push_content.push_url,
        "is_seen": notification.is_seen,
    }
    return notifications_pb2.Notification(**fields)

50 

51 

def _notification_settings_res(user):
    """Build the settings response returned by Get/SetNotificationSettings."""
    return notifications_pb2.GetNotificationSettingsRes(
        do_not_email_enabled=user.do_not_email,
        groups=get_user_setting_groups(user.id),
    )


def _abort_if_push_disabled(context):
    """Abort the RPC with UNAVAILABLE unless push notifications are enabled in config."""
    if not config["PUSH_NOTIFICATIONS_ENABLED"]:
        context.abort(grpc.StatusCode.UNAVAILABLE, errors.PUSH_NOTIFICATIONS_DISABLED)


class Notifications(notifications_pb2_grpc.NotificationsServicer):
    """Servicer implementing the Notifications RPCs.

    Every method receives the request message, the call ``context`` (whose
    ``user_id`` identifies the calling user) and a database ``session``.
    """

    def GetNotificationSettings(self, request, context, session):
        """Return the calling user's do-not-email flag and notification setting groups."""
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        return _notification_settings_res(user)

    def SetNotificationSettings(self, request, context, session):
        """Update the do-not-email flag and the given preferences, then return the new settings.

        Enabling do-not-email also sets the user's hosting status to
        ``cant_host`` and meetup status to ``does_not_want_to_meetup``.

        Aborts with:
            NOT_FOUND: a preference names an unknown topic/action pair or an
                unknown delivery method.
            FAILED_PRECONDITION: a preference is not user editable.
        """
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        user.do_not_email = request.enable_do_not_email
        if request.enable_do_not_email:
            user.hosting_status = HostingStatus.cant_host
            user.meetup_status = MeetupStatus.does_not_want_to_meetup

        # The set of valid delivery method names does not depend on the
        # preference being processed, so build it once rather than per item.
        delivery_types = {t.name for t in NotificationDeliveryType}
        for preference in request.preferences:
            topic_action = enum_from_topic_action.get((preference.topic, preference.action), None)
            if not topic_action:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.INVALID_NOTIFICATION_PREFERENCE)
            if preference.delivery_method not in delivery_types:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.INVALID_DELIVERY_METHOD)
            delivery_type = NotificationDeliveryType[preference.delivery_method]
            try:
                set_preference(session, user.id, topic_action, delivery_type, preference.enabled)
            except PreferenceNotUserEditableError:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_EDIT_THAT_NOTIFICATION_PREFERENCE)
        return _notification_settings_res(user)

    def ListNotifications(self, request, context, session):
        """Return one page of the calling user's notifications, highest id first.

        Only notifications whose topic-action is among those returned by
        ``get_topic_actions_by_delivery_type`` for push delivery are listed.
        When ``request.only_unread`` is set, seen notifications are excluded.
        The page size is capped at ``MAX_PAGINATION_LENGTH`` and defaults to
        it when the request gives none.
        """
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # Without a page token, start from a very large sentinel id so that the
        # first page is in practice not bounded from above.
        # NOTE(review): a non-numeric page_token raises ValueError here and is
        # not turned into a gRPC status by this method.
        next_notification_id = int(request.page_token) if request.page_token else 2**50
        notifications = (
            session.execute(
                select(Notification)
                .where(Notification.user_id == context.user_id)
                .where(Notification.id <= next_notification_id)
                # `== False` is intentional: on a column it builds a SQL
                # comparison rather than evaluating to a Python bool.
                .where(or_(request.only_unread == False, Notification.is_seen == False))
                .where(
                    Notification.topic_action.in_(
                        get_topic_actions_by_delivery_type(session, user.id, NotificationDeliveryType.push)
                    )
                )
                .order_by(Notification.id.desc())
                # Fetch one row beyond the page to learn whether another page exists.
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )
        # The extra row (if any) is not returned; its id becomes the next page
        # token, and the `<=` filter above makes it the first row of that page.
        return notifications_pb2.ListNotificationsRes(
            notifications=[notification_to_pb(user, notification) for notification in notifications[:page_size]],
            next_page_token=str(notifications[-1].id) if len(notifications) > page_size else None,
        )

    def MarkNotificationSeen(self, request, context, session):
        """Set the seen flag of one of the calling user's notifications to ``request.set_seen``.

        Aborts with NOT_FOUND when the user has no notification with the given id.
        """
        notification = (
            session.execute(
                select(Notification)
                .where(Notification.user_id == context.user_id)
                .where(Notification.id == request.notification_id)
            )
            .scalars()
            .one_or_none()
        )
        if not notification:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.NOTIFICATION_NOT_FOUND)
        notification.is_seen = request.set_seen
        return empty_pb2.Empty()

    def MarkAllNotificationsSeen(self, request, context, session):
        """Mark as seen all of the calling user's notifications with id up to ``request.latest_notification_id``."""
        session.execute(
            Notification.__table__.update()
            .values(is_seen=True)
            .where(Notification.user_id == context.user_id)
            .where(Notification.id <= request.latest_notification_id)
        )
        return empty_pb2.Empty()

    def GetVapidPublicKey(self, request, context, session):
        """Return the VAPID public key; aborts with UNAVAILABLE when push notifications are disabled."""
        _abort_if_push_disabled(context)

        return notifications_pb2.GetVapidPublicKeyRes(vapid_public_key=get_vapid_public_key())

    def RegisterPushNotificationSubscription(self, request, context, session):
        """Store a push subscription for the calling user and send it a setup notification.

        ``request.full_subscription_json`` is parsed as JSON and must contain
        ``endpoint`` and ``keys.p256dh`` / ``keys.auth``. Aborts with
        UNAVAILABLE when push notifications are disabled.
        """
        _abort_if_push_disabled(context)

        # NOTE(review): malformed JSON or missing keys raise here
        # (JSONDecodeError / KeyError) and are not mapped to a gRPC status.
        data = json.loads(request.full_subscription_json)
        subscription = PushNotificationSubscription(
            user_id=context.user_id,
            endpoint=data["endpoint"],
            p256dh_key=decode_key(data["keys"]["p256dh"]),
            auth_key=decode_key(data["keys"]["auth"]),
            full_subscription_info=request.full_subscription_json,
            user_agent=request.user_agent,
        )
        session.add(subscription)
        # Flush before pushing so that subscription.id is available below.
        session.flush()
        push_to_subscription(
            session,
            push_notification_subscription_id=subscription.id,
            user_id=context.user_id,
            topic_action="adhoc:setup",
            title="Checking push notifications work!",
            body="Hi, thanks for enabling push notifications!",
        )

        return empty_pb2.Empty()

    def SendTestPushNotification(self, request, context, session):
        """Send a test push notification to the calling user; aborts with UNAVAILABLE when push is disabled."""
        _abort_if_push_disabled(context)

        push_to_user(
            session,
            user_id=context.user_id,
            topic_action="adhoc:testing",
            title="Checking push notifications work!",
            body="If you see this, then it's working :)",
        )

        return empty_pb2.Empty()