Coverage for src/tests/test_notifications.py: 99%

509 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-07 19:51 +0000

1import json 

2import re 

3from urllib.parse import parse_qs, urlparse 

4 

5import grpc 

6import pytest 

7from google.protobuf import empty_pb2, timestamp_pb2 

8 

9from couchers.constants import DATETIME_INFINITY 

10from couchers.context import make_background_user_context 

11from couchers.crypto import b64decode 

12from couchers.jobs.worker import process_job 

13from couchers.models import ( 

14 DeviceType, 

15 HostingStatus, 

16 MeetupStatus, 

17 Notification, 

18 NotificationDelivery, 

19 NotificationDeliveryType, 

20 NotificationTopicAction, 

21 PushNotificationPlatform, 

22 PushNotificationSubscription, 

23 User, 

24) 

25from couchers.notifications.notify import notify 

26from couchers.notifications.settings import get_topic_actions_by_delivery_type 

27from couchers.proto import ( 

28 api_pb2, 

29 auth_pb2, 

30 conversations_pb2, 

31 editor_pb2, 

32 events_pb2, 

33 notification_data_pb2, 

34 notifications_pb2, 

35) 

36from couchers.proto.internal import unsubscribe_pb2 

37from couchers.servicers.api import user_model_to_pb 

38from couchers.sql import couchers_select as select 

39from couchers.templates.v2 import v2timestamp 

40from couchers.utils import now 

41from tests.test_fixtures import ( # noqa 

42 api_session, 

43 auth_api_session, 

44 conversations_session, 

45 db, 

46 email_fields, 

47 generate_user, 

48 mock_notification_email, 

49 notifications_session, 

50 process_jobs, 

51 push_collector, 

52 real_admin_session, 

53 real_editor_session, 

54 session_scope, 

55 testconfig, 

56) 

57 

58 

59@pytest.fixture(autouse=True) 

60def _(testconfig): 

61 pass 

62 

63 

64@pytest.mark.parametrize("enabled", [True, False]) 

65def test_SetNotificationSettings_preferences_respected_editable(db, enabled): 

66 user, token = generate_user() 

67 

68 # enable a notification type and check it gets delivered 

69 topic_action = NotificationTopicAction.badge__add 

70 

71 with notifications_session(token) as notifications: 

72 notifications.SetNotificationSettings( 

73 notifications_pb2.SetNotificationSettingsReq( 

74 preferences=[ 

75 notifications_pb2.SingleNotificationPreference( 

76 topic=topic_action.topic, 

77 action=topic_action.action, 

78 delivery_method="push", 

79 enabled=enabled, 

80 ) 

81 ], 

82 ) 

83 ) 

84 

85 with session_scope() as session: 

86 notify( 

87 session, 

88 user_id=user.id, 

89 topic_action=topic_action.display, 

90 data=notification_data_pb2.BadgeAdd( 

91 badge_id="volunteer", 

92 badge_name="Active Volunteer", 

93 badge_description="This user is an active volunteer for Couchers.org", 

94 ), 

95 ) 

96 

97 process_job() 

98 

99 with session_scope() as session: 

100 deliv = session.execute( 

101 select(NotificationDelivery) 

102 .join(Notification, Notification.id == NotificationDelivery.notification_id) 

103 .where(Notification.user_id == user.id) 

104 .where(Notification.topic_action == topic_action) 

105 .where(NotificationDelivery.delivery_type == NotificationDeliveryType.push) 

106 ).scalar_one_or_none() 

107 

108 if enabled: 

109 assert deliv is not None 

110 else: 

111 assert deliv is None 

112 

113 

114def test_SetNotificationSettings_preferences_not_editable(db): 

115 user, token = generate_user() 

116 

117 # enable a notification type and check it gets delivered 

118 topic_action = NotificationTopicAction.password_reset__start 

119 

120 with notifications_session(token) as notifications: 

121 with pytest.raises(grpc.RpcError) as e: 

122 notifications.SetNotificationSettings( 

123 notifications_pb2.SetNotificationSettingsReq( 

124 preferences=[ 

125 notifications_pb2.SingleNotificationPreference( 

126 topic=topic_action.topic, 

127 action=topic_action.action, 

128 delivery_method="push", 

129 enabled=False, 

130 ) 

131 ], 

132 ) 

133 ) 

134 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

135 assert e.value.details() == "That notification preference is not user editable." 

136 

137 

138def test_unsubscribe(db): 

139 # this is the ugliest test i've written 

140 

141 user, token = generate_user() 

142 

143 topic_action = NotificationTopicAction.badge__add 

144 

145 # first enable email notifs 

146 with notifications_session(token) as notifications: 

147 notifications.SetNotificationSettings( 

148 notifications_pb2.SetNotificationSettingsReq( 

149 preferences=[ 

150 notifications_pb2.SingleNotificationPreference( 

151 topic=topic_action.topic, 

152 action=topic_action.action, 

153 delivery_method=method, 

154 enabled=enabled, 

155 ) 

156 for method, enabled in [("email", True), ("digest", False), ("push", False)] 

157 ], 

158 ) 

159 ) 

160 

161 with mock_notification_email() as mock: 

162 with session_scope() as session: 

163 notify( 

164 session, 

165 user_id=user.id, 

166 topic_action=topic_action.display, 

167 data=notification_data_pb2.BadgeAdd( 

168 badge_id="volunteer", 

169 badge_name="Active Volunteer", 

170 badge_description="This user is an active volunteer for Couchers.org", 

171 ), 

172 ) 

173 

174 assert mock.call_count == 1 

175 assert email_fields(mock).recipient == user.email 

176 # very ugly 

177 # http://localhost:3000/quick-link?payload=CAEiGAoOZnJpZW5kX3JlcXVlc3QSBmFjY2VwdA==&sig=BQdk024NTATm8zlR0krSXTBhP5U9TlFv7VhJeIHZtUg= 

178 for link in re.findall(r'<a href="(.*?)"', email_fields(mock).html): 

179 if "payload" not in link: 

180 continue 

181 print(link) 

182 url_parts = urlparse(link) 

183 params = parse_qs(url_parts.query) 

184 print(params["payload"][0]) 

185 payload = unsubscribe_pb2.UnsubscribePayload.FromString(b64decode(params["payload"][0])) 

186 if payload.HasField("topic_action"): 

187 with auth_api_session() as (auth_api, metadata_interceptor): 

188 assert ( 

189 auth_api.Unsubscribe( 

190 auth_pb2.UnsubscribeReq( 

191 payload=b64decode(params["payload"][0]), 

192 sig=b64decode(params["sig"][0]), 

193 ) 

194 ).response 

195 == "You've been unsubscribed from email notifications of that type." 

196 ) 

197 break 

198 else: 

199 raise Exception("Didn't find link") 

200 

201 with notifications_session(token) as notifications: 

202 res = notifications.GetNotificationSettings(notifications_pb2.GetNotificationSettingsReq()) 

203 

204 for group in res.groups: 

205 for topic in group.topics: 

206 for item in topic.items: 

207 if topic == topic_action.topic and item == topic_action.action: 

208 assert not item.email 

209 

210 with mock_notification_email() as mock: 

211 with session_scope() as session: 

212 notify( 

213 session, 

214 user_id=user.id, 

215 topic_action=topic_action.display, 

216 data=notification_data_pb2.BadgeAdd( 

217 badge_id="volunteer", 

218 badge_name="Active Volunteer", 

219 badge_description="This user is an active volunteer for Couchers.org", 

220 ), 

221 ) 

222 

223 assert mock.call_count == 0 

224 

225 

226def test_unsubscribe_do_not_email(db): 

227 user, token = generate_user() 

228 

229 _, token2 = generate_user(complete_profile=True) 

230 with mock_notification_email() as mock: 

231 with api_session(token2) as api: 

232 api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user.id)) 

233 

234 assert mock.call_count == 1 

235 assert email_fields(mock).recipient == user.email 

236 # very ugly 

237 # http://localhost:3000/quick-link?payload=CAEiGAoOZnJpZW5kX3JlcXVlc3QSBmFjY2VwdA==&sig=BQdk024NTATm8zlR0krSXTBhP5U9TlFv7VhJeIHZtUg= 

238 for link in re.findall(r'<a href="(.*?)"', email_fields(mock).html): 

239 if "payload" not in link: 

240 continue 

241 print(link) 

242 url_parts = urlparse(link) 

243 params = parse_qs(url_parts.query) 

244 print(params["payload"][0]) 

245 payload = unsubscribe_pb2.UnsubscribePayload.FromString(b64decode(params["payload"][0])) 

246 if payload.HasField("do_not_email"): 

247 with auth_api_session() as (auth_api, metadata_interceptor): 

248 assert ( 

249 auth_api.Unsubscribe( 

250 auth_pb2.UnsubscribeReq( 

251 payload=b64decode(params["payload"][0]), 

252 sig=b64decode(params["sig"][0]), 

253 ) 

254 ).response 

255 == "You will not receive any non-security emails, and your hosting status has been turned off. You may still receive the newsletter, and need to unsubscribe from it separately." 

256 ) 

257 break 

258 else: 

259 raise Exception("Didn't find link") 

260 

261 _, token3 = generate_user(complete_profile=True) 

262 with mock_notification_email() as mock: 

263 with api_session(token3) as api: 

264 api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user.id)) 

265 

266 assert mock.call_count == 0 

267 

268 with session_scope() as session: 

269 user_ = session.execute(select(User).where(User.id == user.id)).scalar_one() 

270 assert user_.do_not_email 

271 

272 

273def test_get_do_not_email(db): 

274 _, token = generate_user() 

275 

276 with session_scope() as session: 

277 user = session.execute(select(User)).scalar_one() 

278 user.do_not_email = False 

279 

280 with notifications_session(token) as notifications: 

281 res = notifications.GetNotificationSettings(notifications_pb2.GetNotificationSettingsReq()) 

282 assert not res.do_not_email_enabled 

283 

284 with session_scope() as session: 

285 user = session.execute(select(User)).scalar_one() 

286 user.do_not_email = True 

287 user.hosting_status = HostingStatus.cant_host 

288 user.meetup_status = MeetupStatus.does_not_want_to_meetup 

289 

290 with notifications_session(token) as notifications: 

291 res = notifications.GetNotificationSettings(notifications_pb2.GetNotificationSettingsReq()) 

292 assert res.do_not_email_enabled 

293 

294 

295def test_set_do_not_email(db): 

296 _, token = generate_user() 

297 

298 with session_scope() as session: 

299 user = session.execute(select(User)).scalar_one() 

300 user.do_not_email = False 

301 user.hosting_status = HostingStatus.can_host 

302 user.meetup_status = MeetupStatus.wants_to_meetup 

303 

304 with notifications_session(token) as notifications: 

305 notifications.SetNotificationSettings(notifications_pb2.SetNotificationSettingsReq(enable_do_not_email=False)) 

306 

307 with session_scope() as session: 

308 user = session.execute(select(User)).scalar_one() 

309 assert not user.do_not_email 

310 

311 with notifications_session(token) as notifications: 

312 notifications.SetNotificationSettings(notifications_pb2.SetNotificationSettingsReq(enable_do_not_email=True)) 

313 

314 with session_scope() as session: 

315 user = session.execute(select(User)).scalar_one() 

316 assert user.do_not_email 

317 assert user.hosting_status == HostingStatus.cant_host 

318 assert user.meetup_status == MeetupStatus.does_not_want_to_meetup 

319 

320 with notifications_session(token) as notifications: 

321 notifications.SetNotificationSettings(notifications_pb2.SetNotificationSettingsReq(enable_do_not_email=False)) 

322 

323 with session_scope() as session: 

324 user = session.execute(select(User)).scalar_one() 

325 assert not user.do_not_email 

326 

327 

328def test_list_notifications(db, push_collector): 

329 user1, token1 = generate_user() 

330 user2, token2 = generate_user() 

331 

332 with api_session(token2) as api: 

333 api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user1.id)) 

334 

335 with notifications_session(token1) as notifications: 

336 res = notifications.ListNotifications(notifications_pb2.ListNotificationsReq()) 

337 assert len(res.notifications) == 1 

338 

339 n = res.notifications[0] 

340 

341 assert n.topic == "friend_request" 

342 assert n.action == "create" 

343 assert n.key == "2" 

344 assert n.title == f"{user2.name} wants to be your friend" 

345 assert n.body == f"You've received a friend request from {user2.name}" 

346 assert n.icon.startswith("http://localhost:5001/img/thumbnail/") 

347 assert n.url == "http://localhost:3000/connections/friends/" 

348 

349 with conversations_session(token2) as c: 

350 res = c.CreateGroupChat(conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user1.id])) 

351 group_chat_id = res.group_chat_id 

352 for i in range(17): 

353 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=f"Test message {i}")) 

354 

355 process_jobs() 

356 

357 all_notifs = [] 

358 with notifications_session(token1) as notifications: 

359 page_token = None 

360 for _ in range(100): 

361 res = notifications.ListNotifications( 

362 notifications_pb2.ListNotificationsReq( 

363 page_size=5, 

364 page_token=page_token, 

365 ) 

366 ) 

367 assert len(res.notifications) == 5 or not res.next_page_token 

368 all_notifs += res.notifications 

369 page_token = res.next_page_token 

370 if not page_token: 

371 break 

372 

373 bodys = [f"Test message {16 - i}" for i in range(17)] + [f"You've received a friend request from {user2.name}"] 

374 assert bodys == [n.body for n in all_notifs] 

375 

376 

377def test_notifications_seen(db, push_collector): 

378 user1, token1 = generate_user() 

379 user2, token2 = generate_user() 

380 user3, token3 = generate_user() 

381 user4, token4 = generate_user() 

382 

383 with api_session(token2) as api: 

384 api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user1.id)) 

385 

386 with api_session(token3) as api: 

387 api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user1.id)) 

388 

389 with notifications_session(token1) as notifications, api_session(token1) as api: 

390 res = notifications.ListNotifications(notifications_pb2.ListNotificationsReq()) 

391 assert len(res.notifications) == 2 

392 assert [n.is_seen for n in res.notifications] == [False, False] 

393 notification_ids = [n.notification_id for n in res.notifications] 

394 # should be listed desc time 

395 assert notification_ids[0] > notification_ids[1] 

396 

397 assert api.Ping(api_pb2.PingReq()).unseen_notification_count == 2 

398 

399 with api_session(token4) as api: 

400 api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user1.id)) 

401 

402 with notifications_session(token1) as notifications, api_session(token1) as api: 

403 # mark everything before just the last one as seen (pretend we didn't load the last one yet in the api) 

404 notifications.MarkAllNotificationsSeen( 

405 notifications_pb2.MarkAllNotificationsSeenReq(latest_notification_id=notification_ids[0]) 

406 ) 

407 

408 # last one is still unseen 

409 assert api.Ping(api_pb2.PingReq()).unseen_notification_count == 1 

410 

411 # mark the first one unseen 

412 notifications.MarkNotificationSeen( 

413 notifications_pb2.MarkNotificationSeenReq(notification_id=notification_ids[1], set_seen=False) 

414 ) 

415 assert api.Ping(api_pb2.PingReq()).unseen_notification_count == 2 

416 

417 # mark the last one seen 

418 res = notifications.ListNotifications(notifications_pb2.ListNotificationsReq()) 

419 assert len(res.notifications) == 3 

420 assert [n.is_seen for n in res.notifications] == [False, True, False] 

421 notification_ids2 = [n.notification_id for n in res.notifications] 

422 

423 assert api.Ping(api_pb2.PingReq()).unseen_notification_count == 2 

424 

425 notifications.MarkNotificationSeen( 

426 notifications_pb2.MarkNotificationSeenReq(notification_id=notification_ids2[0], set_seen=True) 

427 ) 

428 

429 res = notifications.ListNotifications(notifications_pb2.ListNotificationsReq()) 

430 assert len(res.notifications) == 3 

431 assert [n.is_seen for n in res.notifications] == [True, True, False] 

432 

433 assert api.Ping(api_pb2.PingReq()).unseen_notification_count == 1 

434 

435 

436def test_GetVapidPublicKey(db): 

437 _, token = generate_user() 

438 

439 with notifications_session(token) as notifications: 

440 assert ( 

441 notifications.GetVapidPublicKey(empty_pb2.Empty()).vapid_public_key 

442 == "BApMo2tGuon07jv-pEaAKZmVo6E_d4HfcdDeV6wx2k9wV8EovJ0ve00bdLzZm9fizDrGZXRYJFqCcRJUfBcgA0A" 

443 ) 

444 

445 

446def test_RegisterPushNotificationSubscription(db): 

447 _, token = generate_user() 

448 

449 subscription_info = { 

450 "endpoint": "https://updates.push.services.mozilla.com/wpush/v2/gAAAAABmW2_iYKVyZRJPhAhktbkXd6Bc8zjIUvtVi5diYL7ZYn8FHka94kIdF46Mp8DwCDWlACnbKOEo97ikaa7JYowGLiGz3qsWL7Vo19LaV4I71mUDUOIKxWIsfp_kM77MlRJQKDUddv-sYyiffOyg63d1lnc_BMIyLXt69T5SEpfnfWTNb6I", 

451 "expirationTime": None, 

452 "keys": { 

453 "auth": "TnuEJ1OdfEkf6HKcUovl0Q", 

454 "p256dh": "BK7Rp8og3eFJPqm0ofR8F-l2mtNCCCWYo6f_5kSs8jPEFiKetnZHNOglvC6IrgU9vHmgFHlG7gHGtB1HM599sy0", 

455 }, 

456 } 

457 

458 with notifications_session(token) as notifications: 

459 res = notifications.RegisterPushNotificationSubscription( 

460 notifications_pb2.RegisterPushNotificationSubscriptionReq( 

461 full_subscription_json=json.dumps(subscription_info), 

462 ) 

463 ) 

464 

465 

466def test_SendTestPushNotification(db, push_collector): 

467 user, token = generate_user() 

468 

469 with notifications_session(token) as notifications: 

470 notifications.SendTestPushNotification(empty_pb2.Empty()) 

471 

472 push_collector.assert_user_has_count(user.id, 1) 

473 push_collector.assert_user_push_matches_fields( 

474 user.id, 

475 title="Checking push notifications work!", 

476 body="If you see this, then it's working :)", 

477 ) 

478 

479 # the above two are equivalent to this 

480 

481 push_collector.assert_user_has_single_matching( 

482 user.id, 

483 title="Checking push notifications work!", 

484 body="If you see this, then it's working :)", 

485 ) 

486 

487 

488def test_SendBlogPostNotification(db, push_collector): 

489 super_user, super_token = generate_user(is_superuser=True) 

490 

491 user1, user1_token = generate_user() 

492 # enabled email 

493 user2, user2_token = generate_user() 

494 # disabled push 

495 user3, user3_token = generate_user() 

496 

497 topic_action = NotificationTopicAction.general__new_blog_post 

498 

499 with notifications_session(user2_token) as notifications: 

500 notifications.SetNotificationSettings( 

501 notifications_pb2.SetNotificationSettingsReq( 

502 preferences=[ 

503 notifications_pb2.SingleNotificationPreference( 

504 topic=topic_action.topic, 

505 action=topic_action.action, 

506 delivery_method="email", 

507 enabled=True, 

508 ) 

509 ], 

510 ) 

511 ) 

512 

513 with notifications_session(user3_token) as notifications: 

514 notifications.SetNotificationSettings( 

515 notifications_pb2.SetNotificationSettingsReq( 

516 preferences=[ 

517 notifications_pb2.SingleNotificationPreference( 

518 topic=topic_action.topic, 

519 action=topic_action.action, 

520 delivery_method="push", 

521 enabled=False, 

522 ) 

523 ], 

524 ) 

525 ) 

526 

527 with mock_notification_email() as mock: 

528 with real_editor_session(super_token) as editor_api: 

529 editor_api.SendBlogPostNotification( 

530 editor_pb2.SendBlogPostNotificationReq( 

531 title="Couchers.org v0.9.9 Release Notes", 

532 blurb="Read about last major updates before v1!", 

533 url="https://couchers.org/blog/2025/05/11/v0.9.9-release", 

534 ) 

535 ) 

536 

537 process_jobs() 

538 

539 assert mock.call_count == 1 

540 assert email_fields(mock).recipient == user2.email 

541 assert "Couchers.org v0.9.9 Release Notes" in email_fields(mock).html 

542 assert "Couchers.org v0.9.9 Release Notes" in email_fields(mock).plain 

543 assert "Read about last major updates before v1!" in email_fields(mock).html 

544 assert "Read about last major updates before v1!" in email_fields(mock).plain 

545 assert "https://couchers.org/blog/2025/05/11/v0.9.9-release" in email_fields(mock).html 

546 assert "https://couchers.org/blog/2025/05/11/v0.9.9-release" in email_fields(mock).plain 

547 

548 push_collector.assert_user_has_count(user1.id, 1) 

549 push_collector.assert_user_push_matches_fields( 

550 user1.id, 

551 title="New blog post: Couchers.org v0.9.9 Release Notes", 

552 body="Read about last major updates before v1!", 

553 url="https://couchers.org/blog/2025/05/11/v0.9.9-release", 

554 ) 

555 

556 push_collector.assert_user_has_count(user2.id, 1) 

557 push_collector.assert_user_push_matches_fields( 

558 user2.id, 

559 title="New blog post: Couchers.org v0.9.9 Release Notes", 

560 body="Read about last major updates before v1!", 

561 url="https://couchers.org/blog/2025/05/11/v0.9.9-release", 

562 ) 

563 

564 push_collector.assert_user_has_count(user3.id, 0) 

565 

566 

567def test_get_topic_actions_by_delivery_type(db): 

568 user, token = generate_user() 

569 

570 # these are enabled by default 

571 assert NotificationDeliveryType.push in NotificationTopicAction.reference__receive_friend.defaults 

572 assert NotificationDeliveryType.push in NotificationTopicAction.host_request__accept.defaults 

573 

574 # these are disabled by default 

575 assert NotificationDeliveryType.push not in NotificationTopicAction.event__create_any.defaults 

576 assert NotificationDeliveryType.push not in NotificationTopicAction.discussion__create.defaults 

577 

578 with notifications_session(token) as notifications: 

579 notifications.SetNotificationSettings( 

580 notifications_pb2.SetNotificationSettingsReq( 

581 preferences=[ 

582 notifications_pb2.SingleNotificationPreference( 

583 topic=NotificationTopicAction.reference__receive_friend.topic, 

584 action=NotificationTopicAction.reference__receive_friend.action, 

585 delivery_method="push", 

586 enabled=False, 

587 ), 

588 notifications_pb2.SingleNotificationPreference( 

589 topic=NotificationTopicAction.event__create_any.topic, 

590 action=NotificationTopicAction.event__create_any.action, 

591 delivery_method="push", 

592 enabled=True, 

593 ), 

594 ], 

595 ) 

596 ) 

597 

598 with session_scope() as session: 

599 deliver = get_topic_actions_by_delivery_type(session, user.id, NotificationDeliveryType.push) 

600 assert NotificationTopicAction.reference__receive_friend not in deliver 

601 assert NotificationTopicAction.host_request__accept in deliver 

602 assert NotificationTopicAction.event__create_any in deliver 

603 assert NotificationTopicAction.discussion__create not in deliver 

604 assert NotificationTopicAction.account_deletion__start in deliver 

605 

606 

607def test_event_reminder_email_sent(db): 

608 user, token = generate_user() 

609 title = "Board Game Night" 

610 start_event_time = timestamp_pb2.Timestamp(seconds=1751690400) 

611 expected_time_str = v2timestamp(start_event_time, user) 

612 

613 with mock_notification_email() as mock: 

614 with session_scope() as session: 

615 user_in_session = session.get(User, user.id) 

616 

617 notify( 

618 session, 

619 user_id=user.id, 

620 topic_action="event:reminder", 

621 data=notification_data_pb2.EventReminder( 

622 event=events_pb2.Event( 

623 event_id=1, 

624 slug="board-game-night", 

625 title=title, 

626 start_time=start_event_time, 

627 ), 

628 user=user_model_to_pb(user_in_session, session, make_background_user_context(user_id=user.id)), 

629 ), 

630 ) 

631 

632 assert mock.call_count == 1 

633 assert email_fields(mock).recipient == user.email 

634 assert title in email_fields(mock).html 

635 assert title in email_fields(mock).plain 

636 assert expected_time_str in email_fields(mock).html 

637 assert expected_time_str in email_fields(mock).plain 

638 

639 

640def test_RegisterMobilePushNotificationSubscription(db): 

641 user, token = generate_user() 

642 

643 with notifications_session(token) as notifications: 

644 notifications.RegisterMobilePushNotificationSubscription( 

645 notifications_pb2.RegisterMobilePushNotificationSubscriptionReq( 

646 token="ExponentPushToken[xxxxxxxxxxxxxxxxxxxxxx]", 

647 device_name="My iPhone", 

648 device_type="ios", 

649 ) 

650 ) 

651 

652 # Check subscription was created 

653 with session_scope() as session: 

654 sub = session.execute( 

655 select(PushNotificationSubscription).where(PushNotificationSubscription.user_id == user.id) 

656 ).scalar_one() 

657 assert sub.platform == PushNotificationPlatform.expo 

658 assert sub.token == "ExponentPushToken[xxxxxxxxxxxxxxxxxxxxxx]" 

659 assert sub.device_name == "My iPhone" 

660 assert sub.device_type == DeviceType.ios 

661 assert sub.disabled_at == DATETIME_INFINITY 

662 

663 

664def test_RegisterMobilePushNotificationSubscription_android(db): 

665 user, token = generate_user() 

666 

667 with notifications_session(token) as notifications: 

668 notifications.RegisterMobilePushNotificationSubscription( 

669 notifications_pb2.RegisterMobilePushNotificationSubscriptionReq( 

670 token="ExponentPushToken[yyyyyyyyyyyyyyyyyyyyyy]", 

671 device_name="My Android", 

672 device_type="android", 

673 ) 

674 ) 

675 

676 with session_scope() as session: 

677 sub = session.execute( 

678 select(PushNotificationSubscription).where(PushNotificationSubscription.user_id == user.id) 

679 ).scalar_one() 

680 assert sub.platform == PushNotificationPlatform.expo 

681 assert sub.device_type == DeviceType.android 

682 

683 

684def test_RegisterMobilePushNotificationSubscription_no_device_type(db): 

685 user, token = generate_user() 

686 

687 with notifications_session(token) as notifications: 

688 notifications.RegisterMobilePushNotificationSubscription( 

689 notifications_pb2.RegisterMobilePushNotificationSubscriptionReq( 

690 token="ExponentPushToken[zzzzzzzzzzzzzzzzzzzzzz]", 

691 ) 

692 ) 

693 

694 with session_scope() as session: 

695 sub = session.execute( 

696 select(PushNotificationSubscription).where(PushNotificationSubscription.user_id == user.id) 

697 ).scalar_one() 

698 assert sub.platform == PushNotificationPlatform.expo 

699 assert sub.device_name is None 

700 assert sub.device_type is None 

701 

702 

703def test_RegisterMobilePushNotificationSubscription_re_enable(db): 

704 user, token = generate_user() 

705 

706 # Create a disabled subscription directly in the DB 

707 with session_scope() as session: 

708 sub = PushNotificationSubscription( 

709 user_id=user.id, 

710 platform=PushNotificationPlatform.expo, 

711 token="ExponentPushToken[reeeeeeeeeeeeeeeeeeeee]", 

712 device_name="Old Device", 

713 device_type=DeviceType.ios, 

714 disabled_at=now(), 

715 ) 

716 session.add(sub) 

717 session.flush() 

718 sub_id = sub.id 

719 

720 # Re-register with the same token 

721 with notifications_session(token) as notifications: 

722 notifications.RegisterMobilePushNotificationSubscription( 

723 notifications_pb2.RegisterMobilePushNotificationSubscriptionReq( 

724 token="ExponentPushToken[reeeeeeeeeeeeeeeeeeeee]", 

725 device_name="New Device Name", 

726 device_type="android", 

727 ) 

728 ) 

729 

730 # Check subscription was re-enabled and updated 

731 with session_scope() as session: 

732 sub = session.execute( 

733 select(PushNotificationSubscription).where(PushNotificationSubscription.id == sub_id) 

734 ).scalar_one() 

735 assert sub.disabled_at == DATETIME_INFINITY 

736 assert sub.device_name == "New Device Name" 

737 assert sub.device_type == DeviceType.android 

738 

739 

740def test_RegisterMobilePushNotificationSubscription_already_exists(db): 

741 user, token = generate_user() 

742 

743 # Create an active subscription directly in the DB 

744 with session_scope() as session: 

745 sub = PushNotificationSubscription( 

746 user_id=user.id, 

747 platform=PushNotificationPlatform.expo, 

748 token="ExponentPushToken[existingtoken]", 

749 device_name="Existing Device", 

750 device_type=DeviceType.ios, 

751 ) 

752 session.add(sub) 

753 

754 # Try to register with the same token - should just return without error 

755 with notifications_session(token) as notifications: 

756 notifications.RegisterMobilePushNotificationSubscription( 

757 notifications_pb2.RegisterMobilePushNotificationSubscriptionReq( 

758 token="ExponentPushToken[existingtoken]", 

759 device_name="Different Name", 

760 ) 

761 ) 

762 

763 # Check subscription was NOT modified (already active) 

764 with session_scope() as session: 

765 sub = session.execute( 

766 select(PushNotificationSubscription).where( 

767 PushNotificationSubscription.token == "ExponentPushToken[existingtoken]" 

768 ) 

769 ).scalar_one() 

770 assert sub.device_name == "Existing Device" # unchanged 

771 

772 

773def test_SendTestMobilePushNotification(db, push_collector): 

774 user, token = generate_user() 

775 

776 with notifications_session(token) as notifications: 

777 notifications.SendTestMobilePushNotification(empty_pb2.Empty()) 

778 

779 push_collector.assert_user_has_single_matching( 

780 user.id, 

781 title="Checking mobile push notifications work!", 

782 body="If you see this on your phone, everything is wired up correctly 🎉", 

783 ) 

784 

785 

786def test_get_expo_push_receipts(db): 

787 from unittest.mock import Mock, patch 

788 

789 from couchers.notifications.expo_api import get_expo_push_receipts 

790 

791 mock_response = Mock() 

792 mock_response.status_code = 200 

793 mock_response.json.return_value = { 

794 "data": { 

795 "ticket-1": {"status": "ok"}, 

796 "ticket-2": {"status": "error", "details": {"error": "DeviceNotRegistered"}}, 

797 } 

798 } 

799 

800 with patch("couchers.notifications.expo_api.requests.post", return_value=mock_response) as mock_post: 

801 result = get_expo_push_receipts(["ticket-1", "ticket-2"]) 

802 

803 mock_post.assert_called_once() 

804 call_args = mock_post.call_args 

805 assert call_args[0][0] == "https://exp.host/--/api/v2/push/getReceipts" 

806 assert call_args[1]["json"] == {"ids": ["ticket-1", "ticket-2"]} 

807 

808 assert result == { 

809 "ticket-1": {"status": "ok"}, 

810 "ticket-2": {"status": "error", "details": {"error": "DeviceNotRegistered"}}, 

811 } 

812 

813 

814def test_get_expo_push_receipts_empty(db): 

815 from couchers.notifications.expo_api import get_expo_push_receipts 

816 

817 result = get_expo_push_receipts([]) 

818 assert result == {} 

819 

820 

821def test_check_expo_push_receipts_success(db): 

822 """Test batch receipt checking with successful delivery.""" 

823 from datetime import timedelta 

824 from unittest.mock import patch 

825 

826 from google.protobuf import empty_pb2 

827 

828 from couchers.jobs.handlers import check_expo_push_receipts 

829 from couchers.models import PushNotificationDeliveryAttempt, PushNotificationDeliveryOutcome 

830 

831 user, token = generate_user() 

832 

833 # Create a push subscription and delivery attempt (old enough to be checked) 

834 with session_scope() as session: 

835 sub = PushNotificationSubscription( 

836 user_id=user.id, 

837 platform=PushNotificationPlatform.expo, 

838 token="ExponentPushToken[testtoken123]", 

839 device_name="Test Device", 

840 device_type=DeviceType.ios, 

841 ) 

842 session.add(sub) 

843 session.flush() 

844 

845 attempt = PushNotificationDeliveryAttempt( 

846 push_notification_subscription_id=sub.id, 

847 outcome=PushNotificationDeliveryOutcome.success, 

848 status_code=200, 

849 expo_ticket_id="test-ticket-id", 

850 ) 

851 session.add(attempt) 

852 session.flush() 

853 # Make the attempt old enough to be checked (>15 min) 

854 attempt.time = now() - timedelta(minutes=20) 

855 attempt_id = attempt.id 

856 sub_id = sub.id 

857 

858 # Mock the receipt API call 

859 with patch("couchers.notifications.expo_api.requests.post") as mock_post: 

860 mock_post.return_value.status_code = 200 

861 mock_post.return_value.json.return_value = {"data": {"test-ticket-id": {"status": "ok"}}} 

862 

863 check_expo_push_receipts(empty_pb2.Empty()) 

864 

865 # Verify the attempt was updated 

866 with session_scope() as session: 

867 attempt = session.execute( 

868 select(PushNotificationDeliveryAttempt).where(PushNotificationDeliveryAttempt.id == attempt_id) 

869 ).scalar_one() 

870 assert attempt.receipt_checked_at is not None 

871 assert attempt.receipt_status == "ok" 

872 assert attempt.receipt_error_code is None 

873 

874 # Subscription should still be enabled 

875 sub = session.execute( 

876 select(PushNotificationSubscription).where(PushNotificationSubscription.id == sub_id) 

877 ).scalar_one() 

878 assert sub.disabled_at == DATETIME_INFINITY 

879 

880 

881def test_check_expo_push_receipts_device_not_registered(db): 

882 """Test batch receipt checking with DeviceNotRegistered error disables subscription.""" 

883 from datetime import timedelta 

884 from unittest.mock import patch 

885 

886 from google.protobuf import empty_pb2 

887 

888 from couchers.jobs.handlers import check_expo_push_receipts 

889 from couchers.models import PushNotificationDeliveryAttempt, PushNotificationDeliveryOutcome 

890 

891 user, token = generate_user() 

892 

893 # Create a push subscription and delivery attempt 

894 with session_scope() as session: 

895 sub = PushNotificationSubscription( 

896 user_id=user.id, 

897 platform=PushNotificationPlatform.expo, 

898 token="ExponentPushToken[devicegone]", 

899 device_name="Test Device", 

900 device_type=DeviceType.android, 

901 ) 

902 session.add(sub) 

903 session.flush() 

904 

905 attempt = PushNotificationDeliveryAttempt( 

906 push_notification_subscription_id=sub.id, 

907 outcome=PushNotificationDeliveryOutcome.success, 

908 status_code=200, 

909 expo_ticket_id="ticket-device-gone", 

910 ) 

911 session.add(attempt) 

912 session.flush() 

913 # Make the attempt old enough to be checked 

914 attempt.time = now() - timedelta(minutes=15) 

915 attempt_id = attempt.id 

916 sub_id = sub.id 

917 

918 # Mock the receipt API call with DeviceNotRegistered error 

919 with patch("couchers.notifications.expo_api.requests.post") as mock_post: 

920 mock_post.return_value.status_code = 200 

921 mock_post.return_value.json.return_value = { 

922 "data": { 

923 "ticket-device-gone": { 

924 "status": "error", 

925 "details": {"error": "DeviceNotRegistered"}, 

926 } 

927 } 

928 } 

929 

930 check_expo_push_receipts(empty_pb2.Empty()) 

931 

932 # Verify the attempt was updated and subscription disabled 

933 with session_scope() as session: 

934 attempt = session.execute( 

935 select(PushNotificationDeliveryAttempt).where(PushNotificationDeliveryAttempt.id == attempt_id) 

936 ).scalar_one() 

937 assert attempt.receipt_checked_at is not None 

938 assert attempt.receipt_status == "error" 

939 assert attempt.receipt_error_code == "DeviceNotRegistered" 

940 

941 # Subscription should be disabled 

942 sub = session.execute( 

943 select(PushNotificationSubscription).where(PushNotificationSubscription.id == sub_id) 

944 ).scalar_one() 

945 assert sub.disabled_at <= now() 

946 

947 

948def test_check_expo_push_receipts_not_found(db): 

949 """Test batch receipt checking when ticket not found (expired).""" 

950 from datetime import timedelta 

951 from unittest.mock import patch 

952 

953 from google.protobuf import empty_pb2 

954 

955 from couchers.jobs.handlers import check_expo_push_receipts 

956 from couchers.models import PushNotificationDeliveryAttempt, PushNotificationDeliveryOutcome 

957 

958 user, token = generate_user() 

959 

960 with session_scope() as session: 

961 sub = PushNotificationSubscription( 

962 user_id=user.id, 

963 platform=PushNotificationPlatform.expo, 

964 token="ExponentPushToken[notfound]", 

965 ) 

966 session.add(sub) 

967 session.flush() 

968 

969 attempt = PushNotificationDeliveryAttempt( 

970 push_notification_subscription_id=sub.id, 

971 outcome=PushNotificationDeliveryOutcome.success, 

972 status_code=200, 

973 expo_ticket_id="unknown-ticket", 

974 ) 

975 session.add(attempt) 

976 session.flush() 

977 # Make the attempt old enough to be checked 

978 attempt.time = now() - timedelta(minutes=15) 

979 attempt_id = attempt.id 

980 sub_id = sub.id 

981 

982 # Mock empty receipt response (ticket not found) 

983 with patch("couchers.notifications.expo_api.requests.post") as mock_post: 

984 mock_post.return_value.status_code = 200 

985 mock_post.return_value.json.return_value = {"data": {}} 

986 

987 check_expo_push_receipts(empty_pb2.Empty()) 

988 

989 with session_scope() as session: 

990 attempt = session.execute( 

991 select(PushNotificationDeliveryAttempt).where(PushNotificationDeliveryAttempt.id == attempt_id) 

992 ).scalar_one() 

993 assert attempt.receipt_checked_at is not None 

994 assert attempt.receipt_status == "not_found" 

995 

996 # Subscription should still be enabled 

997 sub = session.execute( 

998 select(PushNotificationSubscription).where(PushNotificationSubscription.id == sub_id) 

999 ).scalar_one() 

1000 assert sub.disabled_at == DATETIME_INFINITY 

1001 

1002 

1003def test_check_expo_push_receipts_skips_already_checked(db): 

1004 """Test that already-checked receipts are not re-checked.""" 

1005 from datetime import timedelta 

1006 from unittest.mock import patch 

1007 

1008 from google.protobuf import empty_pb2 

1009 

1010 from couchers.jobs.handlers import check_expo_push_receipts 

1011 from couchers.models import PushNotificationDeliveryAttempt, PushNotificationDeliveryOutcome 

1012 

1013 user, token = generate_user() 

1014 

1015 # Create an attempt that was already checked 

1016 with session_scope() as session: 

1017 sub = PushNotificationSubscription( 

1018 user_id=user.id, 

1019 platform=PushNotificationPlatform.expo, 

1020 token="ExponentPushToken[alreadychecked]", 

1021 ) 

1022 session.add(sub) 

1023 session.flush() 

1024 

1025 attempt = PushNotificationDeliveryAttempt( 

1026 push_notification_subscription_id=sub.id, 

1027 outcome=PushNotificationDeliveryOutcome.success, 

1028 status_code=200, 

1029 expo_ticket_id="already-checked-ticket", 

1030 receipt_checked_at=now(), 

1031 receipt_status="ok", 

1032 ) 

1033 session.add(attempt) 

1034 session.flush() 

1035 # Make the attempt old enough 

1036 attempt.time = now() - timedelta(minutes=15) 

1037 

1038 # Should not call the API since the only attempt is already checked 

1039 with patch("couchers.notifications.expo_api.requests.post") as mock_post: 

1040 check_expo_push_receipts(empty_pb2.Empty()) 

1041 mock_post.assert_not_called() 

1042 

1043 

1044def test_check_expo_push_receipts_skips_too_recent(db): 

1045 """Test that too-recent receipts (<15 min) are not checked.""" 

1046 from datetime import timedelta 

1047 from unittest.mock import patch 

1048 

1049 from google.protobuf import empty_pb2 

1050 

1051 from couchers.jobs.handlers import check_expo_push_receipts 

1052 from couchers.models import PushNotificationDeliveryAttempt, PushNotificationDeliveryOutcome 

1053 

1054 user, token = generate_user() 

1055 

1056 # Create a recent attempt (not old enough to check) 

1057 with session_scope() as session: 

1058 sub = PushNotificationSubscription( 

1059 user_id=user.id, 

1060 platform=PushNotificationPlatform.expo, 

1061 token="ExponentPushToken[recent]", 

1062 ) 

1063 session.add(sub) 

1064 session.flush() 

1065 

1066 attempt = PushNotificationDeliveryAttempt( 

1067 push_notification_subscription_id=sub.id, 

1068 outcome=PushNotificationDeliveryOutcome.success, 

1069 status_code=200, 

1070 expo_ticket_id="recent-ticket", 

1071 ) 

1072 session.add(attempt) 

1073 session.flush() 

1074 # Make the attempt only 5 minutes old (too recent) 

1075 attempt.time = now() - timedelta(minutes=5) 

1076 

1077 # Should not call the API since the attempt is too recent 

1078 with patch("couchers.notifications.expo_api.requests.post") as mock_post: 

1079 check_expo_push_receipts(empty_pb2.Empty()) 

1080 mock_post.assert_not_called() 

1081 

1082 

1083def test_check_expo_push_receipts_batch(db): 

1084 """Test that multiple receipts are checked in a single batch.""" 

1085 from datetime import timedelta 

1086 from unittest.mock import patch 

1087 

1088 from google.protobuf import empty_pb2 

1089 

1090 from couchers.jobs.handlers import check_expo_push_receipts 

1091 from couchers.models import PushNotificationDeliveryAttempt, PushNotificationDeliveryOutcome 

1092 

1093 user, token = generate_user() 

1094 

1095 # Create multiple delivery attempts 

1096 attempt_ids = [] 

1097 with session_scope() as session: 

1098 sub = PushNotificationSubscription( 

1099 user_id=user.id, 

1100 platform=PushNotificationPlatform.expo, 

1101 token="ExponentPushToken[batch]", 

1102 ) 

1103 session.add(sub) 

1104 session.flush() 

1105 

1106 for i in range(3): 

1107 attempt = PushNotificationDeliveryAttempt( 

1108 push_notification_subscription_id=sub.id, 

1109 outcome=PushNotificationDeliveryOutcome.success, 

1110 status_code=200, 

1111 expo_ticket_id=f"batch-ticket-{i}", 

1112 ) 

1113 session.add(attempt) 

1114 session.flush() 

1115 attempt.time = now() - timedelta(minutes=20) 

1116 attempt_ids.append(attempt.id) 

1117 

1118 # Mock the batch receipt API call 

1119 with patch("couchers.notifications.expo_api.requests.post") as mock_post: 

1120 mock_post.return_value.status_code = 200 

1121 mock_post.return_value.json.return_value = { 

1122 "data": { 

1123 "batch-ticket-0": {"status": "ok"}, 

1124 "batch-ticket-1": {"status": "ok"}, 

1125 "batch-ticket-2": {"status": "ok"}, 

1126 } 

1127 } 

1128 

1129 check_expo_push_receipts(empty_pb2.Empty()) 

1130 

1131 # Should only call the API once for all tickets 

1132 assert mock_post.call_count == 1 

1133 

1134 # Verify all attempts were updated 

1135 with session_scope() as session: 

1136 for attempt_id in attempt_ids: 

1137 attempt = session.execute( 

1138 select(PushNotificationDeliveryAttempt).where(PushNotificationDeliveryAttempt.id == attempt_id) 

1139 ).scalar_one() 

1140 assert attempt.receipt_checked_at is not None 

1141 assert attempt.receipt_status == "ok"