Coverage for app / backend / src / couchers / servicers / notifications.py: 90%

127 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 14:14 +0000

1import functools 

2import json 

3import logging 

4 

5import grpc 

6from google.protobuf import empty_pb2 

7from sqlalchemy import select, update 

8from sqlalchemy.orm import Session 

9from sqlalchemy.sql import or_ 

10 

11from couchers.config import config 

12from couchers.constants import DATETIME_INFINITY 

13from couchers.context import CouchersContext 

14from couchers.i18n import LocalizationContext 

15from couchers.models import ( 

16 DeviceType, 

17 HostingStatus, 

18 MeetupStatus, 

19 Notification, 

20 NotificationDeliveryType, 

21 PushNotificationPlatform, 

22 PushNotificationSubscription, 

23 User, 

24) 

25from couchers.notifications.push import PushNotificationContent, push_to_subscription, push_to_user 

26from couchers.notifications.render_push import render_adhoc_push_notification, render_push_notification 

27from couchers.notifications.settings import ( 

28 PreferenceNotUserEditableError, 

29 get_topic_actions_by_delivery_type, 

30 get_user_setting_groups, 

31 set_preference, 

32) 

33from couchers.notifications.utils import enum_from_topic_action 

34from couchers.notifications.web_push_api import decode_key, get_vapid_public_key_from_private_key 

35from couchers.proto import notifications_pb2, notifications_pb2_grpc 

36from couchers.sql import moderation_state_column_visible, to_bool 

37from couchers.utils import Timestamp_from_datetime, now 

38 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Largest page size ListNotifications will serve; also used as the page size when the request's page_size is falsy.
MAX_PAGINATION_LENGTH = 100

41 

42 

@functools.cache
def get_vapid_public_key() -> str:
    """Return the VAPID public key derived from the configured VAPID private key.

    The private key is read from ``config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"]``. Because of
    ``functools.cache`` the derived value is memoized after the first successful call.
    """
    private_key = config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"]
    return get_vapid_public_key_from_private_key(private_key)

46 

47 

def notification_to_pb(user: User, notification: Notification) -> notifications_pb2.Notification:
    """Convert a stored notification into its protobuf representation for ``user``.

    The title, body, icon and URL come from rendering the notification as a push notification
    using a localization context built from ``user``; the remaining fields are copied from the
    notification row itself.
    """
    loc_context = LocalizationContext.from_user(user)
    rendered = render_push_notification(notification, loc_context)
    topic_action = notification.topic_action
    return notifications_pb2.Notification(
        notification_id=notification.id,
        created=Timestamp_from_datetime(notification.created),
        topic=topic_action.topic,
        action=topic_action.action,
        key=notification.key,
        title=rendered.title,
        body=rendered.body,
        icon=rendered.icon_url,
        url=rendered.action_url,
        is_seen=notification.is_seen,
    )

62 

63 

def _abort_if_push_disabled(context: CouchersContext) -> None:
    """Abort the RPC with UNAVAILABLE / "push_notifications_disabled" unless push notifications are enabled."""
    if not config["PUSH_NOTIFICATIONS_ENABLED"]:
        context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")


class Notifications(notifications_pb2_grpc.NotificationsServicer):
    """Servicer for notification settings, the notification list, and push subscription management.

    Every RPC acts on the calling user, identified by ``context.user_id``.

    NOTE(review): the handlers keep executing after ``context.abort_with_error_code(...)`` as if that
    call never returns (e.g. ``notification`` is dereferenced right after a not-found abort) --
    presumably it raises; confirm against CouchersContext.
    """

    def GetNotificationSettings(
        self, request: notifications_pb2.GetNotificationSettingsReq, context: CouchersContext, session: Session
    ) -> notifications_pb2.GetNotificationSettingsRes:
        """Return the caller's do-not-email flag and their notification setting groups."""
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        return notifications_pb2.GetNotificationSettingsRes(
            do_not_email_enabled=user.do_not_email,
            groups=get_user_setting_groups(user.id, context.localization),
        )

    def SetNotificationSettings(
        self, request: notifications_pb2.SetNotificationSettingsReq, context: CouchersContext, session: Session
    ) -> notifications_pb2.GetNotificationSettingsRes:
        """Update the caller's do-not-email flag and individual notification preferences.

        Enabling do-not-email also sets the user's hosting status to ``cant_host`` and meetup status to
        ``does_not_want_to_meetup``. Each entry of ``request.preferences`` is validated and applied in order.

        Aborts with:
            NOT_FOUND "invalid_notification_preference": unknown (topic, action) pair.
            NOT_FOUND "invalid_delivery_method": delivery method is not a NotificationDeliveryType name.
            FAILED_PRECONDITION "cannot_edit_that_notification_preference": set_preference raised
                PreferenceNotUserEditableError.

        Returns the settings as they stand after the update.
        """
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        user.do_not_email = request.enable_do_not_email
        if request.enable_do_not_email:
            user.hosting_status = HostingStatus.cant_host
            user.meetup_status = MeetupStatus.does_not_want_to_meetup

        # The set of valid delivery method names does not depend on the preference, so build it once.
        delivery_types = {t.name for t in NotificationDeliveryType}
        for preference in request.preferences:
            topic_action = enum_from_topic_action.get((preference.topic, preference.action), None)
            if not topic_action:
                context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "invalid_notification_preference")
            if preference.delivery_method not in delivery_types:
                context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "invalid_delivery_method")
            delivery_type = NotificationDeliveryType[preference.delivery_method]
            try:
                set_preference(session, user.id, topic_action, delivery_type, preference.enabled)
            except PreferenceNotUserEditableError:
                context.abort_with_error_code(
                    grpc.StatusCode.FAILED_PRECONDITION, "cannot_edit_that_notification_preference"
                )
        return notifications_pb2.GetNotificationSettingsRes(
            do_not_email_enabled=user.do_not_email,
            groups=get_user_setting_groups(user.id, context.localization),
        )

    def ListNotifications(
        self, request: notifications_pb2.ListNotificationsReq, context: CouchersContext, session: Session
    ) -> notifications_pb2.ListNotificationsRes:
        """Return one page of the caller's notifications, newest (highest id) first.

        Only notifications whose topic_action is among those returned by
        ``get_topic_actions_by_delivery_type(..., NotificationDeliveryType.push)`` for the user, and which
        pass the moderation visibility filter, are listed. With ``request.only_unread`` set, only unseen
        notifications are returned.

        Pagination: ``request.page_token`` is the id of the first notification of the requested page. One
        row more than ``page_size`` is fetched; if it exists, its id becomes ``next_page_token``.

        NOTE(review): a non-numeric ``page_token`` makes ``int(...)`` raise ValueError, which is not
        handled here.
        """
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # Without a token, start from an id bound large enough to include every notification.
        next_notification_id = int(request.page_token) if request.page_token else 2**50
        notifications = (
            session.execute(
                select(Notification)
                .where(Notification.user_id == context.user_id)
                .where(Notification.id <= next_notification_id)
                .where(or_(to_bool(request.only_unread == False), Notification.is_seen == False))
                .where(
                    Notification.topic_action.in_(
                        get_topic_actions_by_delivery_type(session, user.id, NotificationDeliveryType.push)
                    )
                )
                .where(moderation_state_column_visible(context, Notification.moderation_state_id))
                .order_by(Notification.id.desc())
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )
        return notifications_pb2.ListNotificationsRes(
            notifications=[notification_to_pb(user, notification) for notification in notifications[:page_size]],
            next_page_token=str(notifications[-1].id) if len(notifications) > page_size else None,
        )

    def MarkNotificationSeen(
        self, request: notifications_pb2.MarkNotificationSeenReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Set the seen flag of one of the caller's notifications to ``request.set_seen``.

        Aborts with NOT_FOUND "notification_not_found" if the caller has no notification with that id.
        """
        notification = (
            session.execute(
                select(Notification)
                .where(Notification.user_id == context.user_id)
                .where(Notification.id == request.notification_id)
            )
            .scalars()
            .one_or_none()
        )
        if not notification:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "notification_not_found")
        notification.is_seen = request.set_seen
        return empty_pb2.Empty()

    def MarkAllNotificationsSeen(
        self, request: notifications_pb2.MarkAllNotificationsSeenReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Mark as seen all of the caller's notifications with id <= ``request.latest_notification_id``."""
        session.execute(
            update(Notification)
            .values(is_seen=True)
            .where(Notification.user_id == context.user_id)
            .where(Notification.id <= request.latest_notification_id)
        )
        return empty_pb2.Empty()

    def GetVapidPublicKey(
        self, request: empty_pb2.Empty, context: CouchersContext, session: Session
    ) -> notifications_pb2.GetVapidPublicKeyRes:
        """Return the server's VAPID public key.

        Aborts with UNAVAILABLE "push_notifications_disabled" when push notifications are disabled.
        """
        _abort_if_push_disabled(context)

        return notifications_pb2.GetVapidPublicKeyRes(vapid_public_key=get_vapid_public_key())

    def RegisterPushNotificationSubscription(
        self,
        request: notifications_pb2.RegisterPushNotificationSubscriptionReq,
        context: CouchersContext,
        session: Session,
    ) -> empty_pb2.Empty:
        """Store a web push subscription for the caller and send a setup notification to it.

        ``request.full_subscription_json`` is parsed as JSON and must contain ``endpoint``,
        ``keys.p256dh`` and ``keys.auth``.

        Aborts with UNAVAILABLE "push_notifications_disabled" when push notifications are disabled.

        NOTE(review): malformed JSON or a missing key raises from ``json.loads`` / the dict lookups;
        nothing here converts that into a gRPC error.
        """
        _abort_if_push_disabled(context)

        data = json.loads(request.full_subscription_json)
        subscription = PushNotificationSubscription(
            user_id=context.user_id,
            platform=PushNotificationPlatform.web_push,
            endpoint=data["endpoint"],
            p256dh_key=decode_key(data["keys"]["p256dh"]),
            auth_key=decode_key(data["keys"]["auth"]),
            full_subscription_info=request.full_subscription_json,
            user_agent=request.user_agent,
        )
        session.add(subscription)
        # Flush before pushing: subscription.id is passed to push_to_subscription just below.
        session.flush()
        push_to_subscription(
            session,
            push_notification_subscription_id=subscription.id,
            user_id=context.user_id,
            topic_action="adhoc:setup",
            content=PushNotificationContent(
                title="Push notifications test",
                ios_title="Push Notifications Test",
                body="Hi, thanks for enabling push notifications!",
            ),
        )

        return empty_pb2.Empty()

    def SendTestPushNotification(
        self, request: empty_pb2.Empty, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Send a fixed test push notification to the caller via ``push_to_user``.

        Aborts with UNAVAILABLE "push_notifications_disabled" when push notifications are disabled.
        """
        _abort_if_push_disabled(context)

        push_to_user(
            session,
            user_id=context.user_id,
            topic_action="adhoc:testing",
            content=PushNotificationContent(
                title="Push notifications test",
                ios_title="Push Notifications Test",
                body="If you see this, then it's working :)",
            ),
        )

        return empty_pb2.Empty()

    def RegisterMobilePushNotificationSubscription(
        self,
        request: notifications_pb2.RegisterMobilePushNotificationSubscriptionReq,
        context: CouchersContext,
        session: Session,
    ) -> empty_pb2.Empty:
        """Register (or re-enable) an Expo push subscription identified by ``request.token``.

        If a subscription with this token already exists, it is re-enabled when its ``disabled_at`` is
        in the past (updating device name/type from the request when given) and no new row is created.
        Otherwise a new subscription is stored for the caller and a "push_enabled" notification is sent
        to it.

        Aborts with UNAVAILABLE "push_notifications_disabled" when push notifications are disabled.

        NOTE(review): the lookup filters on the token only, not on ``context.user_id``; if the existing
        row belongs to a different user, the caller gets no subscription of their own -- confirm this is
        intended.
        NOTE(review): ``DeviceType[request.device_type]`` raises KeyError for an unknown name; that is
        not handled here.
        """
        _abort_if_push_disabled(context)

        # Check for existing subscription with this token
        existing = session.execute(
            select(PushNotificationSubscription).where(PushNotificationSubscription.token == request.token)
        ).scalar_one_or_none()

        if existing:
            # Re-enable if disabled
            if existing.disabled_at < now():
                existing.disabled_at = DATETIME_INFINITY
                existing.device_name = request.device_name or existing.device_name
                if request.device_type:
                    existing.device_type = DeviceType[request.device_type]
                logger.info(f"Re-enabled mobile push sub {existing.id} for user {context.user_id}")
            return empty_pb2.Empty()

        # Parse device_type if provided
        device_type = DeviceType[request.device_type] if request.device_type else None

        subscription = PushNotificationSubscription(
            user_id=context.user_id,
            platform=PushNotificationPlatform.expo,
            token=request.token,
            device_name=request.device_name if request.device_name else None,
            device_type=device_type,
        )
        session.add(subscription)
        # Flush before pushing: subscription.id is passed to push_to_subscription just below.
        session.flush()

        push_content = render_adhoc_push_notification("push_enabled", context.localization)
        push_to_subscription(
            session,
            push_notification_subscription_id=subscription.id,
            user_id=context.user_id,
            topic_action="adhoc:push_enabled",
            content=push_content,
        )

        return empty_pb2.Empty()

    def SendTestMobilePushNotification(
        self, request: empty_pb2.Empty, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Send a fixed mobile-flavoured test push notification to the caller via ``push_to_user``.

        Aborts with UNAVAILABLE "push_notifications_disabled" when push notifications are disabled.
        """
        _abort_if_push_disabled(context)

        push_to_user(
            session,
            user_id=context.user_id,
            topic_action="adhoc:testing",
            content=PushNotificationContent(
                title="Mobile notifications test",
                ios_title="Mobile Notifications Test",
                body="If you see this on your phone, everything is wired up correctly 🎉",
            ),
        )

        return empty_pb2.Empty()

    def SendDevPushNotification(
        self, request: notifications_pb2.SendDevPushNotificationReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Send the caller a push notification whose content is taken from the request (dev API).

        Empty ``url``, ``icon`` and ``key`` request fields are passed on as ``None``.

        Aborts with UNAVAILABLE "dev_apis_disabled" when dev APIs are disabled, then with UNAVAILABLE
        "push_notifications_disabled" when push notifications are disabled.
        """
        if not config["ENABLE_DEV_APIS"]:
            context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "dev_apis_disabled")

        _abort_if_push_disabled(context)

        push_to_user(
            session,
            user_id=context.user_id,
            topic_action="adhoc:testing",
            content=PushNotificationContent(
                title=request.title,
                ios_title=request.title,
                body=request.body,
                action_url=request.url or None,
                icon_url=request.icon or None,
            ),
            key=request.key or None,
            ttl=request.ttl,
        )

        return empty_pb2.Empty()

    def DebugRedeliverPushNotification(
        self,
        request: notifications_pb2.DebugRedeliverPushNotificationReq,
        context: CouchersContext,
        session: Session,
    ) -> empty_pb2.Empty:
        """Re-render one of the caller's stored notifications and push it to them again (dev API).

        Aborts with UNAVAILABLE "dev_apis_disabled" when dev APIs are disabled, UNAVAILABLE
        "push_notifications_disabled" when push notifications are disabled, and NOT_FOUND
        "notification_not_found" if the caller has no notification with ``request.notification_id``.
        """
        if not config["ENABLE_DEV_APIS"]:
            context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "dev_apis_disabled")

        _abort_if_push_disabled(context)

        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

        notification = session.execute(
            select(Notification)
            .where(Notification.id == request.notification_id)
            .where(Notification.user_id == context.user_id)
        ).scalar_one_or_none()

        if not notification:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "notification_not_found")

        push_to_user(
            session,
            user_id=context.user_id,
            topic_action=notification.topic_action.display,
            content=render_push_notification(notification, LocalizationContext.from_user(user)),
            key=notification.key,
        )

        return empty_pb2.Empty()

342 ) 

343 

344 return empty_pb2.Empty()