Coverage for app / backend / src / couchers / servicers / notifications.py: 90%

130 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 09:44 +0000

1import functools 

2import json 

3import logging 

4 

5import grpc 

6from google.protobuf import empty_pb2 

7from sqlalchemy import select, update 

8from sqlalchemy.orm import Session 

9from sqlalchemy.sql import or_ 

10 

11from couchers.config import config 

12from couchers.constants import DATETIME_INFINITY 

13from couchers.context import CouchersContext 

14from couchers.i18n import LocalizationContext 

15from couchers.models import ( 

16 DeviceType, 

17 HostingStatus, 

18 MeetupStatus, 

19 Notification, 

20 NotificationDeliveryType, 

21 PushNotificationPlatform, 

22 PushNotificationSubscription, 

23 User, 

24) 

25from couchers.notifications.push import PushNotificationContent, push_to_subscription, push_to_user 

26from couchers.notifications.render_push import render_adhoc_push_notification, render_push_notification 

27from couchers.notifications.send_raw_push_notification import is_known_invalid_endpoint 

28from couchers.notifications.settings import ( 

29 PreferenceNotUserEditableError, 

30 get_topic_actions_by_delivery_type, 

31 get_user_setting_groups, 

32 set_preference, 

33) 

34from couchers.notifications.utils import enum_from_topic_action 

35from couchers.notifications.web_push_api import decode_key, get_vapid_public_key_from_private_key 

36from couchers.proto import notifications_pb2, notifications_pb2_grpc 

37from couchers.sql import moderation_state_column_visible, to_bool 

38from couchers.utils import Timestamp_from_datetime, now 

39 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Upper bound (and default) for the number of notifications returned per page.
MAX_PAGINATION_LENGTH = 100

42 

43 

@functools.cache
def get_vapid_public_key() -> str:
    """Return the VAPID public key derived from the configured private key.

    The result is memoized by ``functools.cache``, so the derivation from
    ``config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"]`` runs at most once per
    process.
    """
    private_key = config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"]
    return get_vapid_public_key_from_private_key(private_key)

47 

48 

def notification_to_pb(user: User, notification: Notification) -> notifications_pb2.Notification:
    """Convert a stored notification into its protobuf representation.

    The human-readable fields (title, body, icon, url) come from rendering the
    notification as a push notification in the localization context built
    from ``user``.

    Args:
        user: The user the notification is rendered for.
        notification: The notification row to convert.

    Returns:
        The populated ``notifications_pb2.Notification`` message.
    """
    loc_context = LocalizationContext.from_user(user)
    rendered = render_push_notification(notification, loc_context)
    topic_action = notification.topic_action
    return notifications_pb2.Notification(
        notification_id=notification.id,
        created=Timestamp_from_datetime(notification.created),
        topic=topic_action.topic,
        action=topic_action.action,
        key=notification.key,
        title=rendered.title,
        body=rendered.body,
        icon=rendered.icon_url,
        url=rendered.action_url,
        is_seen=notification.is_seen,
    )

63 

64 

class Notifications(notifications_pb2_grpc.NotificationsServicer):
    """gRPC servicer for notification settings, the notification list, and push subscriptions.

    Every handler receives the request message, the ``CouchersContext`` of the
    calling user, and a SQLAlchemy ``Session``. Handlers signal errors through
    ``context.abort_with_error_code``; the code below relies on that call not
    returning normally (presumably it raises — confirm against
    ``CouchersContext``).
    """

    def GetNotificationSettings(
        self, request: notifications_pb2.GetNotificationSettingsReq, context: CouchersContext, session: Session
    ) -> notifications_pb2.GetNotificationSettingsRes:
        """Return the caller's do-not-email flag and their notification setting groups."""
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        return notifications_pb2.GetNotificationSettingsRes(
            do_not_email_enabled=user.do_not_email,
            groups=get_user_setting_groups(user.id, context.localization),
        )

    def SetNotificationSettings(
        self, request: notifications_pb2.SetNotificationSettingsReq, context: CouchersContext, session: Session
    ) -> notifications_pb2.GetNotificationSettingsRes:
        """Update the caller's do-not-email flag and individual notification preferences.

        Enabling do-not-email also sets the user's hosting status to
        ``cant_host`` and meetup status to ``does_not_want_to_meetup``.

        Aborts with:
            NOT_FOUND: unknown (topic, action) pair or unknown delivery method.
            FAILED_PRECONDITION: the preference is not user-editable.

        Returns:
            The settings as they stand after the update.
        """
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        user.do_not_email = request.enable_do_not_email
        if request.enable_do_not_email:
            user.hosting_status = HostingStatus.cant_host
            user.meetup_status = MeetupStatus.does_not_want_to_meetup

        # The set of valid delivery method names does not depend on the
        # preference being processed, so build it once rather than per item.
        delivery_types = {t.name for t in NotificationDeliveryType}

        for preference in request.preferences:
            topic_action = enum_from_topic_action.get((preference.topic, preference.action), None)
            if not topic_action:
                context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "invalid_notification_preference")
            if preference.delivery_method not in delivery_types:
                context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "invalid_delivery_method")
            delivery_type = NotificationDeliveryType[preference.delivery_method]
            try:
                set_preference(session, user.id, topic_action, delivery_type, preference.enabled)
            except PreferenceNotUserEditableError:
                context.abort_with_error_code(
                    grpc.StatusCode.FAILED_PRECONDITION, "cannot_edit_that_notification_preference"
                )
        return notifications_pb2.GetNotificationSettingsRes(
            do_not_email_enabled=user.do_not_email,
            groups=get_user_setting_groups(user.id, context.localization),
        )

    def ListNotifications(
        self, request: notifications_pb2.ListNotificationsReq, context: CouchersContext, session: Session
    ) -> notifications_pb2.ListNotificationsRes:
        """Return one page of the caller's notifications, newest (highest id) first.

        Only notifications whose topic/action is among those returned by
        ``get_topic_actions_by_delivery_type`` for push delivery, and that pass
        the moderation visibility filter, are listed. When
        ``request.only_unread`` is set, seen notifications are excluded.

        Pagination is keyed on notification id: ``page_token`` is the id of the
        first notification of the requested page.
        """
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        # A page size of 0 (unset) falls back to the maximum; larger values are capped.
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # Without a page token, start from an id bound of 2**50 so the filter
        # below effectively begins at the newest notification.
        # NOTE(review): a non-numeric page_token raises ValueError here and is
        # not translated into a gRPC error — consider aborting with INVALID_ARGUMENT.
        next_notification_id = int(request.page_token) if request.page_token else 2**50
        notifications = (
            session.execute(
                select(Notification)
                .where(Notification.user_id == context.user_id)
                .where(Notification.id <= next_notification_id)
                .where(or_(to_bool(request.only_unread == False), Notification.is_seen == False))
                .where(
                    Notification.topic_action.in_(
                        get_topic_actions_by_delivery_type(session, user.id, NotificationDeliveryType.push)
                    )
                )
                .where(moderation_state_column_visible(context, Notification.moderation_state_id))
                .order_by(Notification.id.desc())
                # Fetch one row beyond the page to learn whether another page exists.
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )
        return notifications_pb2.ListNotificationsRes(
            notifications=[notification_to_pb(user, notification) for notification in notifications[:page_size]],
            # The extra row (if any) is the first item of the next page; its id
            # is the token, which pairs with the inclusive `<=` filter above.
            next_page_token=str(notifications[-1].id) if len(notifications) > page_size else None,
        )

    def MarkNotificationSeen(
        self, request: notifications_pb2.MarkNotificationSeenReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Set the seen flag of one of the caller's notifications to ``request.set_seen``.

        Aborts with NOT_FOUND when the caller has no notification with the given id.
        """
        notification = (
            session.execute(
                select(Notification)
                .where(Notification.user_id == context.user_id)
                .where(Notification.id == request.notification_id)
            )
            .scalars()
            .one_or_none()
        )
        if not notification:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "notification_not_found")
        notification.is_seen = request.set_seen
        return empty_pb2.Empty()

    def MarkAllNotificationsSeen(
        self, request: notifications_pb2.MarkAllNotificationsSeenReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Mark as seen every notification of the caller with id <= ``request.latest_notification_id``."""
        session.execute(
            update(Notification)
            .values(is_seen=True)
            .where(Notification.user_id == context.user_id)
            .where(Notification.id <= request.latest_notification_id)
        )
        return empty_pb2.Empty()

    def GetVapidPublicKey(
        self, request: empty_pb2.Empty, context: CouchersContext, session: Session
    ) -> notifications_pb2.GetVapidPublicKeyRes:
        """Return the VAPID public key; aborts with UNAVAILABLE when push notifications are disabled."""
        if not config["PUSH_NOTIFICATIONS_ENABLED"]:
            context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")

        return notifications_pb2.GetVapidPublicKeyRes(vapid_public_key=get_vapid_public_key())

    def RegisterPushNotificationSubscription(
        self,
        request: notifications_pb2.RegisterPushNotificationSubscriptionReq,
        context: CouchersContext,
        session: Session,
    ) -> empty_pb2.Empty:
        """Store a web push subscription for the caller and send it a setup notification.

        ``request.full_subscription_json`` is parsed as JSON and must provide
        ``endpoint`` and ``keys.p256dh`` / ``keys.auth``.

        Aborts with:
            UNAVAILABLE: push notifications are disabled.
            INVALID_ARGUMENT: the endpoint is a known-invalid one.
        """
        if not config["PUSH_NOTIFICATIONS_ENABLED"]:
            context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")

        # NOTE(review): malformed JSON or a missing "endpoint"/"keys" entry
        # raises (json.JSONDecodeError / KeyError) instead of producing a gRPC
        # error — consider validating this client-supplied payload.
        data = json.loads(request.full_subscription_json)
        if is_known_invalid_endpoint(data["endpoint"]):
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_endpoint")

        subscription = PushNotificationSubscription(
            user_id=context.user_id,
            platform=PushNotificationPlatform.web_push,
            endpoint=data["endpoint"],
            p256dh_key=decode_key(data["keys"]["p256dh"]),
            auth_key=decode_key(data["keys"]["auth"]),
            full_subscription_info=request.full_subscription_json,
            user_agent=request.user_agent,
        )
        session.add(subscription)
        # Flush so that subscription.id is populated before it is used below.
        session.flush()
        push_to_subscription(
            session,
            push_notification_subscription_id=subscription.id,
            user_id=context.user_id,
            topic_action="adhoc:setup",
            content=PushNotificationContent(
                title="Push notifications test",
                ios_title="Push Notifications Test",
                body="Hi, thanks for enabling push notifications!",
            ),
        )

        return empty_pb2.Empty()

    def SendTestPushNotification(
        self, request: empty_pb2.Empty, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Send a fixed test push notification to the caller; aborts with UNAVAILABLE when push is disabled."""
        if not config["PUSH_NOTIFICATIONS_ENABLED"]:
            context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")

        push_to_user(
            session,
            user_id=context.user_id,
            topic_action="adhoc:testing",
            content=PushNotificationContent(
                title="Push notifications test",
                ios_title="Push Notifications Test",
                body="If you see this, then it's working :)",
            ),
        )

        return empty_pb2.Empty()

    def RegisterMobilePushNotificationSubscription(
        self,
        request: notifications_pb2.RegisterMobilePushNotificationSubscriptionReq,
        context: CouchersContext,
        session: Session,
    ) -> empty_pb2.Empty:
        """Register (or re-enable) an Expo push subscription identified by ``request.token``.

        If a subscription with the token already exists, it is re-enabled when
        its ``disabled_at`` lies in the past, and no notification is sent.
        Otherwise a new subscription is created for the caller and a
        "push_enabled" notification is pushed to it.

        Aborts with UNAVAILABLE when push notifications are disabled.
        """
        if not config["PUSH_NOTIFICATIONS_ENABLED"]:
            context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")

        # Check for existing subscription with this token
        # NOTE(review): the lookup is by token only, so the existing row may
        # belong to a different user than the caller — confirm this is intended.
        existing = session.execute(
            select(PushNotificationSubscription).where(PushNotificationSubscription.token == request.token)
        ).scalar_one_or_none()

        if existing:
            # Re-enable if disabled
            # NOTE(review): the nesting of the statements below was reconstructed
            # from a listing without indentation — confirm against the original file.
            if existing.disabled_at < now():
                existing.disabled_at = DATETIME_INFINITY
                existing.device_name = request.device_name or existing.device_name
                if request.device_type:
                    existing.device_type = DeviceType[request.device_type]
                logger.info(f"Re-enabled mobile push sub {existing.id} for user {context.user_id}")
            return empty_pb2.Empty()

        # Parse device_type if provided
        # NOTE(review): an unknown device_type name raises KeyError here rather
        # than a gRPC error.
        device_type = DeviceType[request.device_type] if request.device_type else None

        subscription = PushNotificationSubscription(
            user_id=context.user_id,
            platform=PushNotificationPlatform.expo,
            token=request.token,
            device_name=request.device_name if request.device_name else None,
            device_type=device_type,
        )
        session.add(subscription)
        # Flush so that subscription.id is populated before it is used below.
        session.flush()

        push_content = render_adhoc_push_notification("push_enabled", context.localization)
        push_to_subscription(
            session,
            push_notification_subscription_id=subscription.id,
            user_id=context.user_id,
            topic_action="adhoc:push_enabled",
            content=push_content,
        )

        return empty_pb2.Empty()

    def SendTestMobilePushNotification(
        self, request: empty_pb2.Empty, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Send a fixed mobile test push notification to the caller; aborts with UNAVAILABLE when push is disabled."""
        if not config["PUSH_NOTIFICATIONS_ENABLED"]:
            context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")

        push_to_user(
            session,
            user_id=context.user_id,
            topic_action="adhoc:testing",
            content=PushNotificationContent(
                title="Mobile notifications test",
                ios_title="Mobile Notifications Test",
                body="If you see this on your phone, everything is wired up correctly 🎉",
            ),
        )

        return empty_pb2.Empty()

    def SendDevPushNotification(
        self, request: notifications_pb2.SendDevPushNotificationReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Push a notification with caller-supplied content to the caller (dev API).

        Empty ``url``, ``icon`` and ``key`` request fields are passed on as ``None``.

        Aborts with UNAVAILABLE when dev APIs or push notifications are disabled.
        """
        if not config["ENABLE_DEV_APIS"]:
            context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "dev_apis_disabled")

        if not config["PUSH_NOTIFICATIONS_ENABLED"]:
            context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")

        push_to_user(
            session,
            user_id=context.user_id,
            topic_action="adhoc:testing",
            content=PushNotificationContent(
                title=request.title,
                ios_title=request.title,
                body=request.body,
                action_url=request.url or None,
                icon_url=request.icon or None,
            ),
            key=request.key or None,
            ttl=request.ttl,
        )

        return empty_pb2.Empty()

    def DebugRedeliverPushNotification(
        self,
        request: notifications_pb2.DebugRedeliverPushNotificationReq,
        context: CouchersContext,
        session: Session,
    ) -> empty_pb2.Empty:
        """Re-render one of the caller's stored notifications and push it again (dev API).

        Aborts with:
            UNAVAILABLE: dev APIs or push notifications are disabled.
            NOT_FOUND: the caller has no notification with the given id.
        """
        if not config["ENABLE_DEV_APIS"]:
            context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "dev_apis_disabled")

        if not config["PUSH_NOTIFICATIONS_ENABLED"]:
            context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled")

        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

        notification = session.execute(
            select(Notification)
            .where(Notification.id == request.notification_id)
            .where(Notification.user_id == context.user_id)
        ).scalar_one_or_none()

        if not notification:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "notification_not_found")

        push_to_user(
            session,
            user_id=context.user_id,
            topic_action=notification.topic_action.display,
            content=render_push_notification(notification, LocalizationContext.from_user(user)),
            key=notification.key,
        )

        return empty_pb2.Empty()