Coverage for app / backend / src / couchers / servicers / notifications.py: 90%

129 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1import functools 

2import json 

3import logging 

4from zoneinfo import ZoneInfo 

5 

6import grpc 

7from google.protobuf import empty_pb2 

8from sqlalchemy import select, update 

9from sqlalchemy.orm import Session 

10from sqlalchemy.sql import or_ 

11 

12from couchers.config import config 

13from couchers.constants import DATETIME_INFINITY 

14from couchers.context import CouchersContext 

15from couchers.i18n import LocalizationContext 

16from couchers.models import ( 

17 DeviceType, 

18 HostingStatus, 

19 MeetupStatus, 

20 Notification, 

21 NotificationDeliveryType, 

22 PushNotificationPlatform, 

23 PushNotificationSubscription, 

24 User, 

25) 

26from couchers.notifications.push import PushNotificationContent, push_to_subscription, push_to_user 

27from couchers.notifications.render_push import render_adhoc_push_notification, render_push_notification 

28from couchers.notifications.settings import ( 

29 PreferenceNotUserEditableError, 

30 get_topic_actions_by_delivery_type, 

31 get_user_setting_groups, 

32 set_preference, 

33) 

34from couchers.notifications.utils import enum_from_topic_action 

35from couchers.notifications.web_push_api import decode_key, get_vapid_public_key_from_private_key 

36from couchers.proto import notifications_pb2, notifications_pb2_grpc 

37from couchers.sql import moderation_state_column_visible, to_bool 

38from couchers.utils import Timestamp_from_datetime, now 

39 

40logger = logging.getLogger(__name__) 

41MAX_PAGINATION_LENGTH = 100 

42 

43 

44@functools.cache 

45def get_vapid_public_key() -> str: 

46 return get_vapid_public_key_from_private_key(config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"]) 

47 

48 

49def notification_to_pb(user: User, notification: Notification) -> notifications_pb2.Notification: 

50 content = render_push_notification(notification, LocalizationContext.from_user(user)) 

51 return notifications_pb2.Notification( 

52 notification_id=notification.id, 

53 created=Timestamp_from_datetime(notification.created), 

54 topic=notification.topic_action.topic, 

55 action=notification.topic_action.action, 

56 key=notification.key, 

57 title=content.title, 

58 body=content.body, 

59 icon=content.icon_url, 

60 url=content.action_url, 

61 is_seen=notification.is_seen, 

62 ) 

63 

64 

65class Notifications(notifications_pb2_grpc.NotificationsServicer): 

66 def GetNotificationSettings( 

67 self, request: notifications_pb2.GetNotificationSettingsReq, context: CouchersContext, session: Session 

68 ) -> notifications_pb2.GetNotificationSettingsRes: 

69 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

70 return notifications_pb2.GetNotificationSettingsRes( 

71 do_not_email_enabled=user.do_not_email, 

72 groups=get_user_setting_groups(user.id), 

73 ) 

74 

75 def SetNotificationSettings( 

76 self, request: notifications_pb2.SetNotificationSettingsReq, context: CouchersContext, session: Session 

77 ) -> notifications_pb2.GetNotificationSettingsRes: 

78 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

79 user.do_not_email = request.enable_do_not_email 

80 if request.enable_do_not_email: 

81 user.hosting_status = HostingStatus.cant_host 

82 user.meetup_status = MeetupStatus.does_not_want_to_meetup 

83 for preference in request.preferences: 

84 topic_action = enum_from_topic_action.get((preference.topic, preference.action), None) 

85 if not topic_action: 85 ↛ 86line 85 didn't jump to line 86 because the condition on line 85 was never true

86 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "invalid_notification_preference") 

87 delivery_types = {t.name for t in NotificationDeliveryType} 

88 if preference.delivery_method not in delivery_types: 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true

89 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "invalid_delivery_method") 

90 delivery_type = NotificationDeliveryType[preference.delivery_method] 

91 try: 

92 set_preference(session, user.id, topic_action, delivery_type, preference.enabled) 

93 except PreferenceNotUserEditableError: 

94 context.abort_with_error_code( 

95 grpc.StatusCode.FAILED_PRECONDITION, "cannot_edit_that_notification_preference" 

96 ) 

97 return notifications_pb2.GetNotificationSettingsRes( 

98 do_not_email_enabled=user.do_not_email, 

99 groups=get_user_setting_groups(user.id), 

100 ) 

101 

102 def ListNotifications( 

103 self, request: notifications_pb2.ListNotificationsReq, context: CouchersContext, session: Session 

104 ) -> notifications_pb2.ListNotificationsRes: 

105 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

106 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

107 next_notification_id = int(request.page_token) if request.page_token else 2**50 

108 notifications = ( 

109 session.execute( 

110 select(Notification) 

111 .where(Notification.user_id == context.user_id) 

112 .where(Notification.id <= next_notification_id) 

113 .where(or_(to_bool(request.only_unread == False), Notification.is_seen == False)) 

114 .where( 

115 Notification.topic_action.in_( 

116 get_topic_actions_by_delivery_type(session, user.id, NotificationDeliveryType.push) 

117 ) 

118 ) 

119 .where(moderation_state_column_visible(context, Notification.moderation_state_id)) 

120 .order_by(Notification.id.desc()) 

121 .limit(page_size + 1) 

122 ) 

123 .scalars() 

124 .all() 

125 ) 

126 return notifications_pb2.ListNotificationsRes( 

127 notifications=[notification_to_pb(user, notification) for notification in notifications[:page_size]], 

128 next_page_token=str(notifications[-1].id) if len(notifications) > page_size else None, 

129 ) 

130 

131 def MarkNotificationSeen( 

132 self, request: notifications_pb2.MarkNotificationSeenReq, context: CouchersContext, session: Session 

133 ) -> empty_pb2.Empty: 

134 notification = ( 

135 session.execute( 

136 select(Notification) 

137 .where(Notification.user_id == context.user_id) 

138 .where(Notification.id == request.notification_id) 

139 ) 

140 .scalars() 

141 .one_or_none() 

142 ) 

143 if not notification: 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true

144 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "notification_not_found") 

145 notification.is_seen = request.set_seen 

146 return empty_pb2.Empty() 

147 

148 def MarkAllNotificationsSeen( 

149 self, request: notifications_pb2.MarkAllNotificationsSeenReq, context: CouchersContext, session: Session 

150 ) -> empty_pb2.Empty: 

151 session.execute( 

152 update(Notification) 

153 .values(is_seen=True) 

154 .where(Notification.user_id == context.user_id) 

155 .where(Notification.id <= request.latest_notification_id) 

156 ) 

157 return empty_pb2.Empty() 

158 

159 def GetVapidPublicKey( 

160 self, request: empty_pb2.Empty, context: CouchersContext, session: Session 

161 ) -> notifications_pb2.GetVapidPublicKeyRes: 

162 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 162 ↛ 163line 162 didn't jump to line 163 because the condition on line 162 was never true

163 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled") 

164 

165 return notifications_pb2.GetVapidPublicKeyRes(vapid_public_key=get_vapid_public_key()) 

166 

167 def RegisterPushNotificationSubscription( 

168 self, 

169 request: notifications_pb2.RegisterPushNotificationSubscriptionReq, 

170 context: CouchersContext, 

171 session: Session, 

172 ) -> empty_pb2.Empty: 

173 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 173 ↛ 174line 173 didn't jump to line 174 because the condition on line 173 was never true

174 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled") 

175 

176 data = json.loads(request.full_subscription_json) 

177 subscription = PushNotificationSubscription( 

178 user_id=context.user_id, 

179 platform=PushNotificationPlatform.web_push, 

180 endpoint=data["endpoint"], 

181 p256dh_key=decode_key(data["keys"]["p256dh"]), 

182 auth_key=decode_key(data["keys"]["auth"]), 

183 full_subscription_info=request.full_subscription_json, 

184 user_agent=request.user_agent, 

185 ) 

186 session.add(subscription) 

187 session.flush() 

188 push_to_subscription( 

189 session, 

190 push_notification_subscription_id=subscription.id, 

191 user_id=context.user_id, 

192 topic_action="adhoc:setup", 

193 content=PushNotificationContent( 

194 title="Push notifications test", 

195 ios_title="Push Notifications Test", 

196 body="Hi, thanks for enabling push notifications!", 

197 ), 

198 ) 

199 

200 return empty_pb2.Empty() 

201 

202 def SendTestPushNotification( 

203 self, request: empty_pb2.Empty, context: CouchersContext, session: Session 

204 ) -> empty_pb2.Empty: 

205 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 205 ↛ 206line 205 didn't jump to line 206 because the condition on line 205 was never true

206 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled") 

207 

208 push_to_user( 

209 session, 

210 user_id=context.user_id, 

211 topic_action="adhoc:testing", 

212 content=PushNotificationContent( 

213 title="Push notifications test", 

214 ios_title="Push Notifications Test", 

215 body="If you see this, then it's working :)", 

216 ), 

217 ) 

218 

219 return empty_pb2.Empty() 

220 

221 def RegisterMobilePushNotificationSubscription( 

222 self, 

223 request: notifications_pb2.RegisterMobilePushNotificationSubscriptionReq, 

224 context: CouchersContext, 

225 session: Session, 

226 ) -> empty_pb2.Empty: 

227 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 227 ↛ 228line 227 didn't jump to line 228 because the condition on line 227 was never true

228 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled") 

229 

230 # Check for existing subscription with this token 

231 existing = session.execute( 

232 select(PushNotificationSubscription).where(PushNotificationSubscription.token == request.token) 

233 ).scalar_one_or_none() 

234 

235 if existing: 

236 # Re-enable if disabled 

237 if existing.disabled_at < now(): 

238 existing.disabled_at = DATETIME_INFINITY 

239 existing.device_name = request.device_name or existing.device_name 

240 if request.device_type: 240 ↛ 242line 240 didn't jump to line 242 because the condition on line 240 was always true

241 existing.device_type = DeviceType[request.device_type] 

242 logger.info(f"Re-enabled mobile push sub {existing.id} for user {context.user_id}") 

243 return empty_pb2.Empty() 

244 

245 # Parse device_type if provided 

246 device_type = DeviceType[request.device_type] if request.device_type else None 

247 

248 subscription = PushNotificationSubscription( 

249 user_id=context.user_id, 

250 platform=PushNotificationPlatform.expo, 

251 token=request.token, 

252 device_name=request.device_name if request.device_name else None, 

253 device_type=device_type, 

254 ) 

255 session.add(subscription) 

256 session.flush() 

257 

258 loc_context = LocalizationContext(locale=context.ui_language_preference or "en", timezone=ZoneInfo("Etc/UTC")) 

259 push_content = render_adhoc_push_notification("push_enabled", loc_context) 

260 push_to_subscription( 

261 session, 

262 push_notification_subscription_id=subscription.id, 

263 user_id=context.user_id, 

264 topic_action="adhoc:push_enabled", 

265 content=push_content, 

266 ) 

267 

268 return empty_pb2.Empty() 

269 

270 def SendTestMobilePushNotification( 

271 self, request: empty_pb2.Empty, context: CouchersContext, session: Session 

272 ) -> empty_pb2.Empty: 

273 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 273 ↛ 274line 273 didn't jump to line 274 because the condition on line 273 was never true

274 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled") 

275 

276 push_to_user( 

277 session, 

278 user_id=context.user_id, 

279 topic_action="adhoc:testing", 

280 content=PushNotificationContent( 

281 title="Mobile notifications test", 

282 ios_title="Mobile Notifications Test", 

283 body="If you see this on your phone, everything is wired up correctly 🎉", 

284 ), 

285 ) 

286 

287 return empty_pb2.Empty() 

288 

289 def SendDevPushNotification( 

290 self, request: notifications_pb2.SendDevPushNotificationReq, context: CouchersContext, session: Session 

291 ) -> empty_pb2.Empty: 

292 if not config["ENABLE_DEV_APIS"]: 

293 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "dev_apis_disabled") 

294 

295 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 

296 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled") 

297 

298 push_to_user( 

299 session, 

300 user_id=context.user_id, 

301 topic_action="adhoc:testing", 

302 content=PushNotificationContent( 

303 title=request.title, 

304 ios_title=request.title, 

305 body=request.body, 

306 action_url=request.url or None, 

307 icon_url=request.icon or None, 

308 ), 

309 key=request.key or None, 

310 ttl=request.ttl, 

311 ) 

312 

313 return empty_pb2.Empty() 

314 

315 def DebugRedeliverPushNotification( 

316 self, 

317 request: notifications_pb2.DebugRedeliverPushNotificationReq, 

318 context: CouchersContext, 

319 session: Session, 

320 ) -> empty_pb2.Empty: 

321 if not config["ENABLE_DEV_APIS"]: 

322 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "dev_apis_disabled") 

323 

324 if not config["PUSH_NOTIFICATIONS_ENABLED"]: 

325 context.abort_with_error_code(grpc.StatusCode.UNAVAILABLE, "push_notifications_disabled") 

326 

327 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

328 

329 notification = session.execute( 

330 select(Notification) 

331 .where(Notification.id == request.notification_id) 

332 .where(Notification.user_id == context.user_id) 

333 ).scalar_one_or_none() 

334 

335 if not notification: 

336 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "notification_not_found") 

337 

338 push_to_user( 

339 session, 

340 user_id=context.user_id, 

341 topic_action=notification.topic_action.display, 

342 content=render_push_notification(notification, LocalizationContext.from_user(user)), 

343 key=notification.key, 

344 ) 

345 

346 return empty_pb2.Empty()