Coverage for src/couchers/servicers/notifications.py: 92%

104 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-06 23:17 +0000

1import functools 

2import json 

3import logging 

4 

5import grpc 

6from google.protobuf import empty_pb2 

7from sqlalchemy.sql import or_ 

8 

9from couchers.config import config 

10from couchers.constants import DATETIME_INFINITY 

11from couchers.models import ( 

12 DeviceType, 

13 HostingStatus, 

14 MeetupStatus, 

15 Notification, 

16 NotificationDeliveryType, 

17 PushNotificationPlatform, 

18 PushNotificationSubscription, 

19 User, 

20) 

21from couchers.notifications.push import push_to_subscription, push_to_user 

22from couchers.notifications.render import render_notification 

23from couchers.notifications.settings import ( 

24 PreferenceNotUserEditableError, 

25 get_topic_actions_by_delivery_type, 

26 get_user_setting_groups, 

27 set_preference, 

28) 

29from couchers.notifications.utils import enum_from_topic_action 

30from couchers.notifications.web_push_api import decode_key, get_vapid_public_key_from_private_key 

31from couchers.proto import notifications_pb2, notifications_pb2_grpc 

32from couchers.sql import couchers_select as select 

33from couchers.utils import Timestamp_from_datetime, now 

34 

35logger = logging.getLogger(__name__) 

36MAX_PAGINATION_LENGTH = 100 

37 

38 

39@functools.cache 

40def get_vapid_public_key() -> str: 

41 return get_vapid_public_key_from_private_key(config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"]) 

42 

43 

44def notification_to_pb(user, notification: Notification): 

45 rendered = render_notification(user, notification) 

46 return notifications_pb2.Notification( 

47 notification_id=notification.id, 

48 created=Timestamp_from_datetime(notification.created), 

49 topic=notification.topic_action.topic, 

50 action=notification.topic_action.action, 

51 key=notification.key, 

52 title=rendered.push_title, 

53 body=rendered.push_body, 

54 icon=rendered.push_icon, 

55 url=rendered.push_url, 

56 is_seen=notification.is_seen, 

57 ) 

58 

59 

60class Notifications(notifications_pb2_grpc.NotificationsServicer): 

61 def GetNotificationSettings(self, request, context, session): 

62 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

63 return notifications_pb2.GetNotificationSettingsRes( 

64 do_not_email_enabled=user.do_not_email, 

65 groups=get_user_setting_groups(user.id), 

66 ) 

67 

68 def SetNotificationSettings(self, request, context, session): 

69 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

70 user.do_not_email = request.enable_do_not_email 

71 if request.enable_do_not_email: 

72 user.hosting_status = HostingStatus.cant_host 

73 user.meetup_status = MeetupStatus.does_not_want_to_meetup 

74 for preference in request.preferences: 

75 topic_action = enum_from_topic_action.get((preference.topic, preference.action), None) 

76 if not topic_action: 

77 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "invalid_notification_preference") 

78 delivery_types = {t.name for t in NotificationDeliveryType} 

79 if preference.delivery_method not in delivery_types: 

80 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "invalid_delivery_method") 

81 delivery_type = NotificationDeliveryType[preference.delivery_method] 

82 try: 

83 set_preference(session, user.id, topic_action, delivery_type, preference.enabled) 

84 except PreferenceNotUserEditableError: 

85 context.abort_with_error_code( 

86 grpc.StatusCode.FAILED_PRECONDITION, "cannot_edit_that_notification_preference" 

87 ) 

88 return notifications_pb2.GetNotificationSettingsRes( 

89 do_not_email_enabled=user.do_not_email, 

90 groups=get_user_setting_groups(user.id), 

91 ) 

92 

93 def ListNotifications(self, request, context, session): 

94 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

95 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

96 next_notification_id = int(request.page_token) if request.page_token else 2**50 

97 notifications = ( 

98 session.execute( 

99 select(Notification) 

100 .where(Notification.user_id == context.user_id) 

101 .where(Notification.id <= next_notification_id) 

102 .where(or_(request.only_unread == False, Notification.is_seen == False)) 

103 .where( 

104 Notification.topic_action.in_( 

105 get_topic_actions_by_delivery_type(session, user.id, NotificationDeliveryType.push) 

106 ) 

107 ) 

108 .order_by(Notification.id.desc()) 

109 .limit(page_size + 1) 

110 ) 

111 .scalars() 

112 .all() 

113 ) 

114 return notifications_pb2.ListNotificationsRes( 

115 notifications=[notification_to_pb(user, notification) for notification in notifications[:page_size]], 

116 next_page_token=str(notifications[-1].id) if len(notifications) > page_size else None, 

117 ) 

118 

119 def MarkNotificationSeen(self, request, context, session): 

120 notification = ( 

121 session.execute( 

122 select(Notification) 

123 .where(Notification.user_id == context.user_id) 

124 .where(Notification.id == request.notification_id) 

125 ) 

126 .scalars() 

127 .one_or_none() 

128 ) 

129 if not notification: 

130 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "notification_not_found") 

131 notification.is_seen = request.set_seen 

132 return empty_pb2.Empty() 

133 

134 def MarkAllNotificationsSeen(self, request, context, session): 

135 session.execute( 

136 Notification.__table__.update() 

137 .values(is_seen=True) 

138 .where(Notification.user_id == context.user_id) 

139 .where(Notification.id <= request.latest_notification_id) 

140 ) 

141 return empty_pb2.Empty() 

142 

143 def GetVapidPublicKey(self, request, context, session): 

144 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 

145 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled") 

146 

147 return notifications_pb2.GetVapidPublicKeyRes(vapid_public_key=get_vapid_public_key()) 

148 

149 def RegisterPushNotificationSubscription(self, request, context, session): 

150 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 

151 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled") 

152 

153 data = json.loads(request.full_subscription_json) 

154 subscription = PushNotificationSubscription( 

155 user_id=context.user_id, 

156 platform=PushNotificationPlatform.web_push, 

157 endpoint=data["endpoint"], 

158 p256dh_key=decode_key(data["keys"]["p256dh"]), 

159 auth_key=decode_key(data["keys"]["auth"]), 

160 full_subscription_info=request.full_subscription_json, 

161 user_agent=request.user_agent, 

162 ) 

163 session.add(subscription) 

164 session.flush() 

165 push_to_subscription( 

166 session, 

167 push_notification_subscription_id=subscription.id, 

168 user_id=context.user_id, 

169 topic_action="adhoc:setup", 

170 title="Checking push notifications work!", 

171 body="Hi, thanks for enabling push notifications!", 

172 ) 

173 

174 return empty_pb2.Empty() 

175 

176 def SendTestPushNotification(self, request, context, session): 

177 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 

178 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled") 

179 

180 push_to_user( 

181 session, 

182 user_id=context.user_id, 

183 topic_action="adhoc:testing", 

184 title="Checking push notifications work!", 

185 body="If you see this, then it's working :)", 

186 ) 

187 

188 return empty_pb2.Empty() 

189 

190 def RegisterMobilePushNotificationSubscription(self, request, context, session): 

191 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 

192 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled") 

193 

194 # Check for existing subscription with this token 

195 existing = session.execute( 

196 select(PushNotificationSubscription).where(PushNotificationSubscription.token == request.token) 

197 ).scalar_one_or_none() 

198 

199 if existing: 

200 # Re-enable if disabled 

201 if existing.disabled_at < now(): 

202 existing.disabled_at = DATETIME_INFINITY 

203 existing.device_name = request.device_name or existing.device_name 

204 if request.device_type: 

205 existing.device_type = DeviceType[request.device_type] 

206 logger.info(f"Re-enabled mobile push sub {existing.id} for user {context.user_id}") 

207 return empty_pb2.Empty() 

208 

209 # Parse device_type if provided 

210 device_type = DeviceType[request.device_type] if request.device_type else None 

211 

212 subscription = PushNotificationSubscription( 

213 user_id=context.user_id, 

214 platform=PushNotificationPlatform.expo, 

215 token=request.token, 

216 device_name=request.device_name if request.device_name else None, 

217 device_type=device_type, 

218 ) 

219 session.add(subscription) 

220 session.flush() 

221 

222 push_to_subscription( 

223 session, 

224 push_notification_subscription_id=subscription.id, 

225 user_id=context.user_id, 

226 topic_action="adhoc:setup", 

227 title="Push notifications enabled!", 

228 body="You'll now receive notifications on this device.", 

229 ) 

230 

231 return empty_pb2.Empty() 

232 

233 def SendTestMobilePushNotification(self, request, context, session): 

234 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 

235 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled") 

236 

237 push_to_user( 

238 session, 

239 user_id=context.user_id, 

240 topic_action="adhoc:testing", 

241 title="Checking mobile push notifications work!", 

242 body="If you see this on your phone, everything is wired up correctly 🎉", 

243 ) 

244 

245 return empty_pb2.Empty()