Coverage for src/tests/test_notifications.py: 99%
509 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-07 19:51 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-07 19:51 +0000
1import json
2import re
3from urllib.parse import parse_qs, urlparse
5import grpc
6import pytest
7from google.protobuf import empty_pb2, timestamp_pb2
9from couchers.constants import DATETIME_INFINITY
10from couchers.context import make_background_user_context
11from couchers.crypto import b64decode
12from couchers.jobs.worker import process_job
13from couchers.models import (
14 DeviceType,
15 HostingStatus,
16 MeetupStatus,
17 Notification,
18 NotificationDelivery,
19 NotificationDeliveryType,
20 NotificationTopicAction,
21 PushNotificationPlatform,
22 PushNotificationSubscription,
23 User,
24)
25from couchers.notifications.notify import notify
26from couchers.notifications.settings import get_topic_actions_by_delivery_type
27from couchers.proto import (
28 api_pb2,
29 auth_pb2,
30 conversations_pb2,
31 editor_pb2,
32 events_pb2,
33 notification_data_pb2,
34 notifications_pb2,
35)
36from couchers.proto.internal import unsubscribe_pb2
37from couchers.servicers.api import user_model_to_pb
38from couchers.sql import couchers_select as select
39from couchers.templates.v2 import v2timestamp
40from couchers.utils import now
41from tests.test_fixtures import ( # noqa
42 api_session,
43 auth_api_session,
44 conversations_session,
45 db,
46 email_fields,
47 generate_user,
48 mock_notification_email,
49 notifications_session,
50 process_jobs,
51 push_collector,
52 real_admin_session,
53 real_editor_session,
54 session_scope,
55 testconfig,
56)
59@pytest.fixture(autouse=True)
60def _(testconfig):
61 pass
64@pytest.mark.parametrize("enabled", [True, False])
65def test_SetNotificationSettings_preferences_respected_editable(db, enabled):
66 user, token = generate_user()
68 # enable a notification type and check it gets delivered
69 topic_action = NotificationTopicAction.badge__add
71 with notifications_session(token) as notifications:
72 notifications.SetNotificationSettings(
73 notifications_pb2.SetNotificationSettingsReq(
74 preferences=[
75 notifications_pb2.SingleNotificationPreference(
76 topic=topic_action.topic,
77 action=topic_action.action,
78 delivery_method="push",
79 enabled=enabled,
80 )
81 ],
82 )
83 )
85 with session_scope() as session:
86 notify(
87 session,
88 user_id=user.id,
89 topic_action=topic_action.display,
90 data=notification_data_pb2.BadgeAdd(
91 badge_id="volunteer",
92 badge_name="Active Volunteer",
93 badge_description="This user is an active volunteer for Couchers.org",
94 ),
95 )
97 process_job()
99 with session_scope() as session:
100 deliv = session.execute(
101 select(NotificationDelivery)
102 .join(Notification, Notification.id == NotificationDelivery.notification_id)
103 .where(Notification.user_id == user.id)
104 .where(Notification.topic_action == topic_action)
105 .where(NotificationDelivery.delivery_type == NotificationDeliveryType.push)
106 ).scalar_one_or_none()
108 if enabled:
109 assert deliv is not None
110 else:
111 assert deliv is None
114def test_SetNotificationSettings_preferences_not_editable(db):
115 user, token = generate_user()
117 # enable a notification type and check it gets delivered
118 topic_action = NotificationTopicAction.password_reset__start
120 with notifications_session(token) as notifications:
121 with pytest.raises(grpc.RpcError) as e:
122 notifications.SetNotificationSettings(
123 notifications_pb2.SetNotificationSettingsReq(
124 preferences=[
125 notifications_pb2.SingleNotificationPreference(
126 topic=topic_action.topic,
127 action=topic_action.action,
128 delivery_method="push",
129 enabled=False,
130 )
131 ],
132 )
133 )
134 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
135 assert e.value.details() == "That notification preference is not user editable."
138def test_unsubscribe(db):
139 # this is the ugliest test i've written
141 user, token = generate_user()
143 topic_action = NotificationTopicAction.badge__add
145 # first enable email notifs
146 with notifications_session(token) as notifications:
147 notifications.SetNotificationSettings(
148 notifications_pb2.SetNotificationSettingsReq(
149 preferences=[
150 notifications_pb2.SingleNotificationPreference(
151 topic=topic_action.topic,
152 action=topic_action.action,
153 delivery_method=method,
154 enabled=enabled,
155 )
156 for method, enabled in [("email", True), ("digest", False), ("push", False)]
157 ],
158 )
159 )
161 with mock_notification_email() as mock:
162 with session_scope() as session:
163 notify(
164 session,
165 user_id=user.id,
166 topic_action=topic_action.display,
167 data=notification_data_pb2.BadgeAdd(
168 badge_id="volunteer",
169 badge_name="Active Volunteer",
170 badge_description="This user is an active volunteer for Couchers.org",
171 ),
172 )
174 assert mock.call_count == 1
175 assert email_fields(mock).recipient == user.email
176 # very ugly
177 # http://localhost:3000/quick-link?payload=CAEiGAoOZnJpZW5kX3JlcXVlc3QSBmFjY2VwdA==&sig=BQdk024NTATm8zlR0krSXTBhP5U9TlFv7VhJeIHZtUg=
178 for link in re.findall(r'<a href="(.*?)"', email_fields(mock).html):
179 if "payload" not in link:
180 continue
181 print(link)
182 url_parts = urlparse(link)
183 params = parse_qs(url_parts.query)
184 print(params["payload"][0])
185 payload = unsubscribe_pb2.UnsubscribePayload.FromString(b64decode(params["payload"][0]))
186 if payload.HasField("topic_action"):
187 with auth_api_session() as (auth_api, metadata_interceptor):
188 assert (
189 auth_api.Unsubscribe(
190 auth_pb2.UnsubscribeReq(
191 payload=b64decode(params["payload"][0]),
192 sig=b64decode(params["sig"][0]),
193 )
194 ).response
195 == "You've been unsubscribed from email notifications of that type."
196 )
197 break
198 else:
199 raise Exception("Didn't find link")
201 with notifications_session(token) as notifications:
202 res = notifications.GetNotificationSettings(notifications_pb2.GetNotificationSettingsReq())
204 for group in res.groups:
205 for topic in group.topics:
206 for item in topic.items:
207 if topic == topic_action.topic and item == topic_action.action:
208 assert not item.email
210 with mock_notification_email() as mock:
211 with session_scope() as session:
212 notify(
213 session,
214 user_id=user.id,
215 topic_action=topic_action.display,
216 data=notification_data_pb2.BadgeAdd(
217 badge_id="volunteer",
218 badge_name="Active Volunteer",
219 badge_description="This user is an active volunteer for Couchers.org",
220 ),
221 )
223 assert mock.call_count == 0
226def test_unsubscribe_do_not_email(db):
227 user, token = generate_user()
229 _, token2 = generate_user(complete_profile=True)
230 with mock_notification_email() as mock:
231 with api_session(token2) as api:
232 api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user.id))
234 assert mock.call_count == 1
235 assert email_fields(mock).recipient == user.email
236 # very ugly
237 # http://localhost:3000/quick-link?payload=CAEiGAoOZnJpZW5kX3JlcXVlc3QSBmFjY2VwdA==&sig=BQdk024NTATm8zlR0krSXTBhP5U9TlFv7VhJeIHZtUg=
238 for link in re.findall(r'<a href="(.*?)"', email_fields(mock).html):
239 if "payload" not in link:
240 continue
241 print(link)
242 url_parts = urlparse(link)
243 params = parse_qs(url_parts.query)
244 print(params["payload"][0])
245 payload = unsubscribe_pb2.UnsubscribePayload.FromString(b64decode(params["payload"][0]))
246 if payload.HasField("do_not_email"):
247 with auth_api_session() as (auth_api, metadata_interceptor):
248 assert (
249 auth_api.Unsubscribe(
250 auth_pb2.UnsubscribeReq(
251 payload=b64decode(params["payload"][0]),
252 sig=b64decode(params["sig"][0]),
253 )
254 ).response
255 == "You will not receive any non-security emails, and your hosting status has been turned off. You may still receive the newsletter, and need to unsubscribe from it separately."
256 )
257 break
258 else:
259 raise Exception("Didn't find link")
261 _, token3 = generate_user(complete_profile=True)
262 with mock_notification_email() as mock:
263 with api_session(token3) as api:
264 api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user.id))
266 assert mock.call_count == 0
268 with session_scope() as session:
269 user_ = session.execute(select(User).where(User.id == user.id)).scalar_one()
270 assert user_.do_not_email
273def test_get_do_not_email(db):
274 _, token = generate_user()
276 with session_scope() as session:
277 user = session.execute(select(User)).scalar_one()
278 user.do_not_email = False
280 with notifications_session(token) as notifications:
281 res = notifications.GetNotificationSettings(notifications_pb2.GetNotificationSettingsReq())
282 assert not res.do_not_email_enabled
284 with session_scope() as session:
285 user = session.execute(select(User)).scalar_one()
286 user.do_not_email = True
287 user.hosting_status = HostingStatus.cant_host
288 user.meetup_status = MeetupStatus.does_not_want_to_meetup
290 with notifications_session(token) as notifications:
291 res = notifications.GetNotificationSettings(notifications_pb2.GetNotificationSettingsReq())
292 assert res.do_not_email_enabled
295def test_set_do_not_email(db):
296 _, token = generate_user()
298 with session_scope() as session:
299 user = session.execute(select(User)).scalar_one()
300 user.do_not_email = False
301 user.hosting_status = HostingStatus.can_host
302 user.meetup_status = MeetupStatus.wants_to_meetup
304 with notifications_session(token) as notifications:
305 notifications.SetNotificationSettings(notifications_pb2.SetNotificationSettingsReq(enable_do_not_email=False))
307 with session_scope() as session:
308 user = session.execute(select(User)).scalar_one()
309 assert not user.do_not_email
311 with notifications_session(token) as notifications:
312 notifications.SetNotificationSettings(notifications_pb2.SetNotificationSettingsReq(enable_do_not_email=True))
314 with session_scope() as session:
315 user = session.execute(select(User)).scalar_one()
316 assert user.do_not_email
317 assert user.hosting_status == HostingStatus.cant_host
318 assert user.meetup_status == MeetupStatus.does_not_want_to_meetup
320 with notifications_session(token) as notifications:
321 notifications.SetNotificationSettings(notifications_pb2.SetNotificationSettingsReq(enable_do_not_email=False))
323 with session_scope() as session:
324 user = session.execute(select(User)).scalar_one()
325 assert not user.do_not_email
328def test_list_notifications(db, push_collector):
329 user1, token1 = generate_user()
330 user2, token2 = generate_user()
332 with api_session(token2) as api:
333 api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user1.id))
335 with notifications_session(token1) as notifications:
336 res = notifications.ListNotifications(notifications_pb2.ListNotificationsReq())
337 assert len(res.notifications) == 1
339 n = res.notifications[0]
341 assert n.topic == "friend_request"
342 assert n.action == "create"
343 assert n.key == "2"
344 assert n.title == f"{user2.name} wants to be your friend"
345 assert n.body == f"You've received a friend request from {user2.name}"
346 assert n.icon.startswith("http://localhost:5001/img/thumbnail/")
347 assert n.url == "http://localhost:3000/connections/friends/"
349 with conversations_session(token2) as c:
350 res = c.CreateGroupChat(conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user1.id]))
351 group_chat_id = res.group_chat_id
352 for i in range(17):
353 c.SendMessage(conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=f"Test message {i}"))
355 process_jobs()
357 all_notifs = []
358 with notifications_session(token1) as notifications:
359 page_token = None
360 for _ in range(100):
361 res = notifications.ListNotifications(
362 notifications_pb2.ListNotificationsReq(
363 page_size=5,
364 page_token=page_token,
365 )
366 )
367 assert len(res.notifications) == 5 or not res.next_page_token
368 all_notifs += res.notifications
369 page_token = res.next_page_token
370 if not page_token:
371 break
373 bodys = [f"Test message {16 - i}" for i in range(17)] + [f"You've received a friend request from {user2.name}"]
374 assert bodys == [n.body for n in all_notifs]
377def test_notifications_seen(db, push_collector):
378 user1, token1 = generate_user()
379 user2, token2 = generate_user()
380 user3, token3 = generate_user()
381 user4, token4 = generate_user()
383 with api_session(token2) as api:
384 api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user1.id))
386 with api_session(token3) as api:
387 api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user1.id))
389 with notifications_session(token1) as notifications, api_session(token1) as api:
390 res = notifications.ListNotifications(notifications_pb2.ListNotificationsReq())
391 assert len(res.notifications) == 2
392 assert [n.is_seen for n in res.notifications] == [False, False]
393 notification_ids = [n.notification_id for n in res.notifications]
394 # should be listed desc time
395 assert notification_ids[0] > notification_ids[1]
397 assert api.Ping(api_pb2.PingReq()).unseen_notification_count == 2
399 with api_session(token4) as api:
400 api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user1.id))
402 with notifications_session(token1) as notifications, api_session(token1) as api:
403 # mark everything before just the last one as seen (pretend we didn't load the last one yet in the api)
404 notifications.MarkAllNotificationsSeen(
405 notifications_pb2.MarkAllNotificationsSeenReq(latest_notification_id=notification_ids[0])
406 )
408 # last one is still unseen
409 assert api.Ping(api_pb2.PingReq()).unseen_notification_count == 1
411 # mark the first one unseen
412 notifications.MarkNotificationSeen(
413 notifications_pb2.MarkNotificationSeenReq(notification_id=notification_ids[1], set_seen=False)
414 )
415 assert api.Ping(api_pb2.PingReq()).unseen_notification_count == 2
417 # mark the last one seen
418 res = notifications.ListNotifications(notifications_pb2.ListNotificationsReq())
419 assert len(res.notifications) == 3
420 assert [n.is_seen for n in res.notifications] == [False, True, False]
421 notification_ids2 = [n.notification_id for n in res.notifications]
423 assert api.Ping(api_pb2.PingReq()).unseen_notification_count == 2
425 notifications.MarkNotificationSeen(
426 notifications_pb2.MarkNotificationSeenReq(notification_id=notification_ids2[0], set_seen=True)
427 )
429 res = notifications.ListNotifications(notifications_pb2.ListNotificationsReq())
430 assert len(res.notifications) == 3
431 assert [n.is_seen for n in res.notifications] == [True, True, False]
433 assert api.Ping(api_pb2.PingReq()).unseen_notification_count == 1
436def test_GetVapidPublicKey(db):
437 _, token = generate_user()
439 with notifications_session(token) as notifications:
440 assert (
441 notifications.GetVapidPublicKey(empty_pb2.Empty()).vapid_public_key
442 == "BApMo2tGuon07jv-pEaAKZmVo6E_d4HfcdDeV6wx2k9wV8EovJ0ve00bdLzZm9fizDrGZXRYJFqCcRJUfBcgA0A"
443 )
446def test_RegisterPushNotificationSubscription(db):
447 _, token = generate_user()
449 subscription_info = {
450 "endpoint": "https://updates.push.services.mozilla.com/wpush/v2/gAAAAABmW2_iYKVyZRJPhAhktbkXd6Bc8zjIUvtVi5diYL7ZYn8FHka94kIdF46Mp8DwCDWlACnbKOEo97ikaa7JYowGLiGz3qsWL7Vo19LaV4I71mUDUOIKxWIsfp_kM77MlRJQKDUddv-sYyiffOyg63d1lnc_BMIyLXt69T5SEpfnfWTNb6I",
451 "expirationTime": None,
452 "keys": {
453 "auth": "TnuEJ1OdfEkf6HKcUovl0Q",
454 "p256dh": "BK7Rp8og3eFJPqm0ofR8F-l2mtNCCCWYo6f_5kSs8jPEFiKetnZHNOglvC6IrgU9vHmgFHlG7gHGtB1HM599sy0",
455 },
456 }
458 with notifications_session(token) as notifications:
459 res = notifications.RegisterPushNotificationSubscription(
460 notifications_pb2.RegisterPushNotificationSubscriptionReq(
461 full_subscription_json=json.dumps(subscription_info),
462 )
463 )
466def test_SendTestPushNotification(db, push_collector):
467 user, token = generate_user()
469 with notifications_session(token) as notifications:
470 notifications.SendTestPushNotification(empty_pb2.Empty())
472 push_collector.assert_user_has_count(user.id, 1)
473 push_collector.assert_user_push_matches_fields(
474 user.id,
475 title="Checking push notifications work!",
476 body="If you see this, then it's working :)",
477 )
479 # the above two are equivalent to this
481 push_collector.assert_user_has_single_matching(
482 user.id,
483 title="Checking push notifications work!",
484 body="If you see this, then it's working :)",
485 )
488def test_SendBlogPostNotification(db, push_collector):
489 super_user, super_token = generate_user(is_superuser=True)
491 user1, user1_token = generate_user()
492 # enabled email
493 user2, user2_token = generate_user()
494 # disabled push
495 user3, user3_token = generate_user()
497 topic_action = NotificationTopicAction.general__new_blog_post
499 with notifications_session(user2_token) as notifications:
500 notifications.SetNotificationSettings(
501 notifications_pb2.SetNotificationSettingsReq(
502 preferences=[
503 notifications_pb2.SingleNotificationPreference(
504 topic=topic_action.topic,
505 action=topic_action.action,
506 delivery_method="email",
507 enabled=True,
508 )
509 ],
510 )
511 )
513 with notifications_session(user3_token) as notifications:
514 notifications.SetNotificationSettings(
515 notifications_pb2.SetNotificationSettingsReq(
516 preferences=[
517 notifications_pb2.SingleNotificationPreference(
518 topic=topic_action.topic,
519 action=topic_action.action,
520 delivery_method="push",
521 enabled=False,
522 )
523 ],
524 )
525 )
527 with mock_notification_email() as mock:
528 with real_editor_session(super_token) as editor_api:
529 editor_api.SendBlogPostNotification(
530 editor_pb2.SendBlogPostNotificationReq(
531 title="Couchers.org v0.9.9 Release Notes",
532 blurb="Read about last major updates before v1!",
533 url="https://couchers.org/blog/2025/05/11/v0.9.9-release",
534 )
535 )
537 process_jobs()
539 assert mock.call_count == 1
540 assert email_fields(mock).recipient == user2.email
541 assert "Couchers.org v0.9.9 Release Notes" in email_fields(mock).html
542 assert "Couchers.org v0.9.9 Release Notes" in email_fields(mock).plain
543 assert "Read about last major updates before v1!" in email_fields(mock).html
544 assert "Read about last major updates before v1!" in email_fields(mock).plain
545 assert "https://couchers.org/blog/2025/05/11/v0.9.9-release" in email_fields(mock).html
546 assert "https://couchers.org/blog/2025/05/11/v0.9.9-release" in email_fields(mock).plain
548 push_collector.assert_user_has_count(user1.id, 1)
549 push_collector.assert_user_push_matches_fields(
550 user1.id,
551 title="New blog post: Couchers.org v0.9.9 Release Notes",
552 body="Read about last major updates before v1!",
553 url="https://couchers.org/blog/2025/05/11/v0.9.9-release",
554 )
556 push_collector.assert_user_has_count(user2.id, 1)
557 push_collector.assert_user_push_matches_fields(
558 user2.id,
559 title="New blog post: Couchers.org v0.9.9 Release Notes",
560 body="Read about last major updates before v1!",
561 url="https://couchers.org/blog/2025/05/11/v0.9.9-release",
562 )
564 push_collector.assert_user_has_count(user3.id, 0)
567def test_get_topic_actions_by_delivery_type(db):
568 user, token = generate_user()
570 # these are enabled by default
571 assert NotificationDeliveryType.push in NotificationTopicAction.reference__receive_friend.defaults
572 assert NotificationDeliveryType.push in NotificationTopicAction.host_request__accept.defaults
574 # these are disabled by default
575 assert NotificationDeliveryType.push not in NotificationTopicAction.event__create_any.defaults
576 assert NotificationDeliveryType.push not in NotificationTopicAction.discussion__create.defaults
578 with notifications_session(token) as notifications:
579 notifications.SetNotificationSettings(
580 notifications_pb2.SetNotificationSettingsReq(
581 preferences=[
582 notifications_pb2.SingleNotificationPreference(
583 topic=NotificationTopicAction.reference__receive_friend.topic,
584 action=NotificationTopicAction.reference__receive_friend.action,
585 delivery_method="push",
586 enabled=False,
587 ),
588 notifications_pb2.SingleNotificationPreference(
589 topic=NotificationTopicAction.event__create_any.topic,
590 action=NotificationTopicAction.event__create_any.action,
591 delivery_method="push",
592 enabled=True,
593 ),
594 ],
595 )
596 )
598 with session_scope() as session:
599 deliver = get_topic_actions_by_delivery_type(session, user.id, NotificationDeliveryType.push)
600 assert NotificationTopicAction.reference__receive_friend not in deliver
601 assert NotificationTopicAction.host_request__accept in deliver
602 assert NotificationTopicAction.event__create_any in deliver
603 assert NotificationTopicAction.discussion__create not in deliver
604 assert NotificationTopicAction.account_deletion__start in deliver
607def test_event_reminder_email_sent(db):
608 user, token = generate_user()
609 title = "Board Game Night"
610 start_event_time = timestamp_pb2.Timestamp(seconds=1751690400)
611 expected_time_str = v2timestamp(start_event_time, user)
613 with mock_notification_email() as mock:
614 with session_scope() as session:
615 user_in_session = session.get(User, user.id)
617 notify(
618 session,
619 user_id=user.id,
620 topic_action="event:reminder",
621 data=notification_data_pb2.EventReminder(
622 event=events_pb2.Event(
623 event_id=1,
624 slug="board-game-night",
625 title=title,
626 start_time=start_event_time,
627 ),
628 user=user_model_to_pb(user_in_session, session, make_background_user_context(user_id=user.id)),
629 ),
630 )
632 assert mock.call_count == 1
633 assert email_fields(mock).recipient == user.email
634 assert title in email_fields(mock).html
635 assert title in email_fields(mock).plain
636 assert expected_time_str in email_fields(mock).html
637 assert expected_time_str in email_fields(mock).plain
640def test_RegisterMobilePushNotificationSubscription(db):
641 user, token = generate_user()
643 with notifications_session(token) as notifications:
644 notifications.RegisterMobilePushNotificationSubscription(
645 notifications_pb2.RegisterMobilePushNotificationSubscriptionReq(
646 token="ExponentPushToken[xxxxxxxxxxxxxxxxxxxxxx]",
647 device_name="My iPhone",
648 device_type="ios",
649 )
650 )
652 # Check subscription was created
653 with session_scope() as session:
654 sub = session.execute(
655 select(PushNotificationSubscription).where(PushNotificationSubscription.user_id == user.id)
656 ).scalar_one()
657 assert sub.platform == PushNotificationPlatform.expo
658 assert sub.token == "ExponentPushToken[xxxxxxxxxxxxxxxxxxxxxx]"
659 assert sub.device_name == "My iPhone"
660 assert sub.device_type == DeviceType.ios
661 assert sub.disabled_at == DATETIME_INFINITY
664def test_RegisterMobilePushNotificationSubscription_android(db):
665 user, token = generate_user()
667 with notifications_session(token) as notifications:
668 notifications.RegisterMobilePushNotificationSubscription(
669 notifications_pb2.RegisterMobilePushNotificationSubscriptionReq(
670 token="ExponentPushToken[yyyyyyyyyyyyyyyyyyyyyy]",
671 device_name="My Android",
672 device_type="android",
673 )
674 )
676 with session_scope() as session:
677 sub = session.execute(
678 select(PushNotificationSubscription).where(PushNotificationSubscription.user_id == user.id)
679 ).scalar_one()
680 assert sub.platform == PushNotificationPlatform.expo
681 assert sub.device_type == DeviceType.android
684def test_RegisterMobilePushNotificationSubscription_no_device_type(db):
685 user, token = generate_user()
687 with notifications_session(token) as notifications:
688 notifications.RegisterMobilePushNotificationSubscription(
689 notifications_pb2.RegisterMobilePushNotificationSubscriptionReq(
690 token="ExponentPushToken[zzzzzzzzzzzzzzzzzzzzzz]",
691 )
692 )
694 with session_scope() as session:
695 sub = session.execute(
696 select(PushNotificationSubscription).where(PushNotificationSubscription.user_id == user.id)
697 ).scalar_one()
698 assert sub.platform == PushNotificationPlatform.expo
699 assert sub.device_name is None
700 assert sub.device_type is None
703def test_RegisterMobilePushNotificationSubscription_re_enable(db):
704 user, token = generate_user()
706 # Create a disabled subscription directly in the DB
707 with session_scope() as session:
708 sub = PushNotificationSubscription(
709 user_id=user.id,
710 platform=PushNotificationPlatform.expo,
711 token="ExponentPushToken[reeeeeeeeeeeeeeeeeeeee]",
712 device_name="Old Device",
713 device_type=DeviceType.ios,
714 disabled_at=now(),
715 )
716 session.add(sub)
717 session.flush()
718 sub_id = sub.id
720 # Re-register with the same token
721 with notifications_session(token) as notifications:
722 notifications.RegisterMobilePushNotificationSubscription(
723 notifications_pb2.RegisterMobilePushNotificationSubscriptionReq(
724 token="ExponentPushToken[reeeeeeeeeeeeeeeeeeeee]",
725 device_name="New Device Name",
726 device_type="android",
727 )
728 )
730 # Check subscription was re-enabled and updated
731 with session_scope() as session:
732 sub = session.execute(
733 select(PushNotificationSubscription).where(PushNotificationSubscription.id == sub_id)
734 ).scalar_one()
735 assert sub.disabled_at == DATETIME_INFINITY
736 assert sub.device_name == "New Device Name"
737 assert sub.device_type == DeviceType.android
740def test_RegisterMobilePushNotificationSubscription_already_exists(db):
741 user, token = generate_user()
743 # Create an active subscription directly in the DB
744 with session_scope() as session:
745 sub = PushNotificationSubscription(
746 user_id=user.id,
747 platform=PushNotificationPlatform.expo,
748 token="ExponentPushToken[existingtoken]",
749 device_name="Existing Device",
750 device_type=DeviceType.ios,
751 )
752 session.add(sub)
754 # Try to register with the same token - should just return without error
755 with notifications_session(token) as notifications:
756 notifications.RegisterMobilePushNotificationSubscription(
757 notifications_pb2.RegisterMobilePushNotificationSubscriptionReq(
758 token="ExponentPushToken[existingtoken]",
759 device_name="Different Name",
760 )
761 )
763 # Check subscription was NOT modified (already active)
764 with session_scope() as session:
765 sub = session.execute(
766 select(PushNotificationSubscription).where(
767 PushNotificationSubscription.token == "ExponentPushToken[existingtoken]"
768 )
769 ).scalar_one()
770 assert sub.device_name == "Existing Device" # unchanged
773def test_SendTestMobilePushNotification(db, push_collector):
774 user, token = generate_user()
776 with notifications_session(token) as notifications:
777 notifications.SendTestMobilePushNotification(empty_pb2.Empty())
779 push_collector.assert_user_has_single_matching(
780 user.id,
781 title="Checking mobile push notifications work!",
782 body="If you see this on your phone, everything is wired up correctly 🎉",
783 )
786def test_get_expo_push_receipts(db):
787 from unittest.mock import Mock, patch
789 from couchers.notifications.expo_api import get_expo_push_receipts
791 mock_response = Mock()
792 mock_response.status_code = 200
793 mock_response.json.return_value = {
794 "data": {
795 "ticket-1": {"status": "ok"},
796 "ticket-2": {"status": "error", "details": {"error": "DeviceNotRegistered"}},
797 }
798 }
800 with patch("couchers.notifications.expo_api.requests.post", return_value=mock_response) as mock_post:
801 result = get_expo_push_receipts(["ticket-1", "ticket-2"])
803 mock_post.assert_called_once()
804 call_args = mock_post.call_args
805 assert call_args[0][0] == "https://exp.host/--/api/v2/push/getReceipts"
806 assert call_args[1]["json"] == {"ids": ["ticket-1", "ticket-2"]}
808 assert result == {
809 "ticket-1": {"status": "ok"},
810 "ticket-2": {"status": "error", "details": {"error": "DeviceNotRegistered"}},
811 }
814def test_get_expo_push_receipts_empty(db):
815 from couchers.notifications.expo_api import get_expo_push_receipts
817 result = get_expo_push_receipts([])
818 assert result == {}
821def test_check_expo_push_receipts_success(db):
822 """Test batch receipt checking with successful delivery."""
823 from datetime import timedelta
824 from unittest.mock import patch
826 from google.protobuf import empty_pb2
828 from couchers.jobs.handlers import check_expo_push_receipts
829 from couchers.models import PushNotificationDeliveryAttempt, PushNotificationDeliveryOutcome
831 user, token = generate_user()
833 # Create a push subscription and delivery attempt (old enough to be checked)
834 with session_scope() as session:
835 sub = PushNotificationSubscription(
836 user_id=user.id,
837 platform=PushNotificationPlatform.expo,
838 token="ExponentPushToken[testtoken123]",
839 device_name="Test Device",
840 device_type=DeviceType.ios,
841 )
842 session.add(sub)
843 session.flush()
845 attempt = PushNotificationDeliveryAttempt(
846 push_notification_subscription_id=sub.id,
847 outcome=PushNotificationDeliveryOutcome.success,
848 status_code=200,
849 expo_ticket_id="test-ticket-id",
850 )
851 session.add(attempt)
852 session.flush()
853 # Make the attempt old enough to be checked (>15 min)
854 attempt.time = now() - timedelta(minutes=20)
855 attempt_id = attempt.id
856 sub_id = sub.id
858 # Mock the receipt API call
859 with patch("couchers.notifications.expo_api.requests.post") as mock_post:
860 mock_post.return_value.status_code = 200
861 mock_post.return_value.json.return_value = {"data": {"test-ticket-id": {"status": "ok"}}}
863 check_expo_push_receipts(empty_pb2.Empty())
865 # Verify the attempt was updated
866 with session_scope() as session:
867 attempt = session.execute(
868 select(PushNotificationDeliveryAttempt).where(PushNotificationDeliveryAttempt.id == attempt_id)
869 ).scalar_one()
870 assert attempt.receipt_checked_at is not None
871 assert attempt.receipt_status == "ok"
872 assert attempt.receipt_error_code is None
874 # Subscription should still be enabled
875 sub = session.execute(
876 select(PushNotificationSubscription).where(PushNotificationSubscription.id == sub_id)
877 ).scalar_one()
878 assert sub.disabled_at == DATETIME_INFINITY
881def test_check_expo_push_receipts_device_not_registered(db):
882 """Test batch receipt checking with DeviceNotRegistered error disables subscription."""
883 from datetime import timedelta
884 from unittest.mock import patch
886 from google.protobuf import empty_pb2
888 from couchers.jobs.handlers import check_expo_push_receipts
889 from couchers.models import PushNotificationDeliveryAttempt, PushNotificationDeliveryOutcome
891 user, token = generate_user()
893 # Create a push subscription and delivery attempt
894 with session_scope() as session:
895 sub = PushNotificationSubscription(
896 user_id=user.id,
897 platform=PushNotificationPlatform.expo,
898 token="ExponentPushToken[devicegone]",
899 device_name="Test Device",
900 device_type=DeviceType.android,
901 )
902 session.add(sub)
903 session.flush()
905 attempt = PushNotificationDeliveryAttempt(
906 push_notification_subscription_id=sub.id,
907 outcome=PushNotificationDeliveryOutcome.success,
908 status_code=200,
909 expo_ticket_id="ticket-device-gone",
910 )
911 session.add(attempt)
912 session.flush()
913 # Make the attempt old enough to be checked
914 attempt.time = now() - timedelta(minutes=15)
915 attempt_id = attempt.id
916 sub_id = sub.id
918 # Mock the receipt API call with DeviceNotRegistered error
919 with patch("couchers.notifications.expo_api.requests.post") as mock_post:
920 mock_post.return_value.status_code = 200
921 mock_post.return_value.json.return_value = {
922 "data": {
923 "ticket-device-gone": {
924 "status": "error",
925 "details": {"error": "DeviceNotRegistered"},
926 }
927 }
928 }
930 check_expo_push_receipts(empty_pb2.Empty())
932 # Verify the attempt was updated and subscription disabled
933 with session_scope() as session:
934 attempt = session.execute(
935 select(PushNotificationDeliveryAttempt).where(PushNotificationDeliveryAttempt.id == attempt_id)
936 ).scalar_one()
937 assert attempt.receipt_checked_at is not None
938 assert attempt.receipt_status == "error"
939 assert attempt.receipt_error_code == "DeviceNotRegistered"
941 # Subscription should be disabled
942 sub = session.execute(
943 select(PushNotificationSubscription).where(PushNotificationSubscription.id == sub_id)
944 ).scalar_one()
945 assert sub.disabled_at <= now()
948def test_check_expo_push_receipts_not_found(db):
949 """Test batch receipt checking when ticket not found (expired)."""
950 from datetime import timedelta
951 from unittest.mock import patch
953 from google.protobuf import empty_pb2
955 from couchers.jobs.handlers import check_expo_push_receipts
956 from couchers.models import PushNotificationDeliveryAttempt, PushNotificationDeliveryOutcome
958 user, token = generate_user()
960 with session_scope() as session:
961 sub = PushNotificationSubscription(
962 user_id=user.id,
963 platform=PushNotificationPlatform.expo,
964 token="ExponentPushToken[notfound]",
965 )
966 session.add(sub)
967 session.flush()
969 attempt = PushNotificationDeliveryAttempt(
970 push_notification_subscription_id=sub.id,
971 outcome=PushNotificationDeliveryOutcome.success,
972 status_code=200,
973 expo_ticket_id="unknown-ticket",
974 )
975 session.add(attempt)
976 session.flush()
977 # Make the attempt old enough to be checked
978 attempt.time = now() - timedelta(minutes=15)
979 attempt_id = attempt.id
980 sub_id = sub.id
982 # Mock empty receipt response (ticket not found)
983 with patch("couchers.notifications.expo_api.requests.post") as mock_post:
984 mock_post.return_value.status_code = 200
985 mock_post.return_value.json.return_value = {"data": {}}
987 check_expo_push_receipts(empty_pb2.Empty())
989 with session_scope() as session:
990 attempt = session.execute(
991 select(PushNotificationDeliveryAttempt).where(PushNotificationDeliveryAttempt.id == attempt_id)
992 ).scalar_one()
993 assert attempt.receipt_checked_at is not None
994 assert attempt.receipt_status == "not_found"
996 # Subscription should still be enabled
997 sub = session.execute(
998 select(PushNotificationSubscription).where(PushNotificationSubscription.id == sub_id)
999 ).scalar_one()
1000 assert sub.disabled_at == DATETIME_INFINITY
1003def test_check_expo_push_receipts_skips_already_checked(db):
1004 """Test that already-checked receipts are not re-checked."""
1005 from datetime import timedelta
1006 from unittest.mock import patch
1008 from google.protobuf import empty_pb2
1010 from couchers.jobs.handlers import check_expo_push_receipts
1011 from couchers.models import PushNotificationDeliveryAttempt, PushNotificationDeliveryOutcome
1013 user, token = generate_user()
1015 # Create an attempt that was already checked
1016 with session_scope() as session:
1017 sub = PushNotificationSubscription(
1018 user_id=user.id,
1019 platform=PushNotificationPlatform.expo,
1020 token="ExponentPushToken[alreadychecked]",
1021 )
1022 session.add(sub)
1023 session.flush()
1025 attempt = PushNotificationDeliveryAttempt(
1026 push_notification_subscription_id=sub.id,
1027 outcome=PushNotificationDeliveryOutcome.success,
1028 status_code=200,
1029 expo_ticket_id="already-checked-ticket",
1030 receipt_checked_at=now(),
1031 receipt_status="ok",
1032 )
1033 session.add(attempt)
1034 session.flush()
1035 # Make the attempt old enough
1036 attempt.time = now() - timedelta(minutes=15)
1038 # Should not call the API since the only attempt is already checked
1039 with patch("couchers.notifications.expo_api.requests.post") as mock_post:
1040 check_expo_push_receipts(empty_pb2.Empty())
1041 mock_post.assert_not_called()
1044def test_check_expo_push_receipts_skips_too_recent(db):
1045 """Test that too-recent receipts (<15 min) are not checked."""
1046 from datetime import timedelta
1047 from unittest.mock import patch
1049 from google.protobuf import empty_pb2
1051 from couchers.jobs.handlers import check_expo_push_receipts
1052 from couchers.models import PushNotificationDeliveryAttempt, PushNotificationDeliveryOutcome
1054 user, token = generate_user()
1056 # Create a recent attempt (not old enough to check)
1057 with session_scope() as session:
1058 sub = PushNotificationSubscription(
1059 user_id=user.id,
1060 platform=PushNotificationPlatform.expo,
1061 token="ExponentPushToken[recent]",
1062 )
1063 session.add(sub)
1064 session.flush()
1066 attempt = PushNotificationDeliveryAttempt(
1067 push_notification_subscription_id=sub.id,
1068 outcome=PushNotificationDeliveryOutcome.success,
1069 status_code=200,
1070 expo_ticket_id="recent-ticket",
1071 )
1072 session.add(attempt)
1073 session.flush()
1074 # Make the attempt only 5 minutes old (too recent)
1075 attempt.time = now() - timedelta(minutes=5)
1077 # Should not call the API since the attempt is too recent
1078 with patch("couchers.notifications.expo_api.requests.post") as mock_post:
1079 check_expo_push_receipts(empty_pb2.Empty())
1080 mock_post.assert_not_called()
1083def test_check_expo_push_receipts_batch(db):
1084 """Test that multiple receipts are checked in a single batch."""
1085 from datetime import timedelta
1086 from unittest.mock import patch
1088 from google.protobuf import empty_pb2
1090 from couchers.jobs.handlers import check_expo_push_receipts
1091 from couchers.models import PushNotificationDeliveryAttempt, PushNotificationDeliveryOutcome
1093 user, token = generate_user()
1095 # Create multiple delivery attempts
1096 attempt_ids = []
1097 with session_scope() as session:
1098 sub = PushNotificationSubscription(
1099 user_id=user.id,
1100 platform=PushNotificationPlatform.expo,
1101 token="ExponentPushToken[batch]",
1102 )
1103 session.add(sub)
1104 session.flush()
1106 for i in range(3):
1107 attempt = PushNotificationDeliveryAttempt(
1108 push_notification_subscription_id=sub.id,
1109 outcome=PushNotificationDeliveryOutcome.success,
1110 status_code=200,
1111 expo_ticket_id=f"batch-ticket-{i}",
1112 )
1113 session.add(attempt)
1114 session.flush()
1115 attempt.time = now() - timedelta(minutes=20)
1116 attempt_ids.append(attempt.id)
1118 # Mock the batch receipt API call
1119 with patch("couchers.notifications.expo_api.requests.post") as mock_post:
1120 mock_post.return_value.status_code = 200
1121 mock_post.return_value.json.return_value = {
1122 "data": {
1123 "batch-ticket-0": {"status": "ok"},
1124 "batch-ticket-1": {"status": "ok"},
1125 "batch-ticket-2": {"status": "ok"},
1126 }
1127 }
1129 check_expo_push_receipts(empty_pb2.Empty())
1131 # Should only call the API once for all tickets
1132 assert mock_post.call_count == 1
1134 # Verify all attempts were updated
1135 with session_scope() as session:
1136 for attempt_id in attempt_ids:
1137 attempt = session.execute(
1138 select(PushNotificationDeliveryAttempt).where(PushNotificationDeliveryAttempt.id == attempt_id)
1139 ).scalar_one()
1140 assert attempt.receipt_checked_at is not None
1141 assert attempt.receipt_status == "ok"