Coverage for app/backend/src/tests/test_notifications.py: 99%
781 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1import html
2import json
3import re
4from datetime import timedelta
5from unittest.mock import Mock, patch
6from urllib.parse import parse_qs, urlparse
8import grpc
9import pytest
10from google.protobuf import empty_pb2, timestamp_pb2
11from sqlalchemy import select, update
13from couchers.config import config
14from couchers.constants import DATETIME_INFINITY
15from couchers.context import make_background_user_context
16from couchers.crypto import b64decode
17from couchers.db import session_scope
18from couchers.i18n import LocalizationContext
19from couchers.jobs.handlers import check_expo_push_receipts
20from couchers.jobs.worker import process_job
21from couchers.models import (
22 DeviceType,
23 HostingStatus,
24 MeetupStatus,
25 Notification,
26 NotificationDelivery,
27 NotificationDeliveryType,
28 NotificationTopicAction,
29 PushNotificationDeliveryAttempt,
30 PushNotificationDeliveryOutcome,
31 PushNotificationPlatform,
32 PushNotificationSubscription,
33 User,
34)
35from couchers.notifications.background import handle_notification
36from couchers.notifications.expo_api import get_expo_push_receipts
37from couchers.notifications.notify import notify
38from couchers.notifications.settings import get_topic_actions_by_delivery_type
39from couchers.proto import (
40 api_pb2,
41 auth_pb2,
42 conversations_pb2,
43 editor_pb2,
44 events_pb2,
45 notification_data_pb2,
46 notifications_pb2,
47)
48from couchers.proto.internal import jobs_pb2, unsubscribe_pb2
49from couchers.servicers.api import user_model_to_pb
50from couchers.utils import not_none, now
51from tests.fixtures.db import generate_user
52from tests.fixtures.misc import EmailCollector, PushCollector, process_jobs
53from tests.fixtures.sessions import (
54 api_session,
55 auth_api_session,
56 conversations_session,
57 notifications_session,
58 real_editor_session,
59)
@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture automatically for every test in this module."""
@pytest.mark.parametrize("enabled", [True, False])
def test_SetNotificationSettings_preferences_respected_editable(db, enabled):
    """A push preference set through the API decides whether a push delivery row is recorded."""
    user, token = generate_user()

    badge_added = NotificationTopicAction.badge__add

    # Store the parametrized push preference for the badge notification.
    push_preference = notifications_pb2.SingleNotificationPreference(
        topic=badge_added.topic,
        action=badge_added.action,
        delivery_method="push",
        enabled=enabled,
    )
    with notifications_session(token) as notifications:
        notifications.SetNotificationSettings(
            notifications_pb2.SetNotificationSettingsReq(preferences=[push_preference])
        )

    # Emit one badge notification for the user.
    badge_data = notification_data_pb2.BadgeAdd(
        badge_id="volunteer",
        badge_name="Active Volunteer",
        badge_description="This user is an active volunteer for Couchers.org",
    )
    with session_scope() as session:
        notify(
            session,
            user_id=user.id,
            topic_action=badge_added,
            key="",
            data=badge_data,
        )

    process_job()

    # Look up the push delivery row (if any) recorded for that notification.
    push_delivery_query = (
        select(NotificationDelivery)
        .join(Notification, Notification.id == NotificationDelivery.notification_id)
        .where(Notification.user_id == user.id)
        .where(Notification.topic_action == badge_added)
        .where(NotificationDelivery.delivery_type == NotificationDeliveryType.push)
    )
    with session_scope() as session:
        push_delivery = session.execute(push_delivery_query).scalar_one_or_none()

        if not enabled:
            assert push_delivery is None
        else:
            assert push_delivery is not None
def test_SetNotificationSettings_preferences_not_editable(db):
    """Trying to change the password-reset push preference fails with FAILED_PRECONDITION."""
    _, token = generate_user()

    locked_topic_action = NotificationTopicAction.password_reset__start

    request = notifications_pb2.SetNotificationSettingsReq(
        preferences=[
            notifications_pb2.SingleNotificationPreference(
                topic=locked_topic_action.topic,
                action=locked_topic_action.action,
                delivery_method="push",
                enabled=False,
            )
        ],
    )

    with notifications_session(token) as notifications:
        with pytest.raises(grpc.RpcError) as exc_info:
            notifications.SetNotificationSettings(request)

        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert rpc_error.details() == "That notification preference is not user editable."
def test_unsubscribe(db, email_collector: EmailCollector):
    """Following the per-topic unsubscribe link in an email turns off email for that topic.

    Steps:
      1. Enable email (and disable digest/push) for the badge notification.
      2. Trigger a notification and pull the unsubscribe link out of the email HTML.
      3. Call ``Unsubscribe`` with the link's payload and signature.
      4. Check the settings now report email as off for that topic/action.
      5. Trigger the notification again and check that no email is sent.
    """
    user, token = generate_user()

    topic_action = NotificationTopicAction.badge__add

    # first enable email notifs
    with notifications_session(token) as notifications:
        notifications.SetNotificationSettings(
            notifications_pb2.SetNotificationSettingsReq(
                preferences=[
                    notifications_pb2.SingleNotificationPreference(
                        topic=topic_action.topic,
                        action=topic_action.action,
                        delivery_method=method,
                        enabled=enabled,
                    )
                    for method, enabled in [("email", True), ("digest", False), ("push", False)]
                ],
            )
        )

    with session_scope() as session:
        notify(
            session,
            user_id=user.id,
            topic_action=topic_action,
            key="",
            data=notification_data_pb2.BadgeAdd(
                badge_id="volunteer",
                badge_name="Active Volunteer",
                badge_description="This user is an active volunteer for Couchers.org",
            ),
        )

    email = email_collector.pop_for_recipient(user.email, last=True)

    # The unsubscribe links are scraped out of the email HTML; they look like
    # http://localhost:3000/quick-link?payload=CAEiGAoOZnJpZW5kX3JlcXVlc3QSBmFjY2VwdA==&sig=BQdk024NTATm8zlR0krSXTBhP5U9TlFv7VhJeIHZtUg=
    for link in re.findall(r'<a href="(.*?)"', email.html):
        if "payload" not in link:
            continue
        print(link)
        url_parts = urlparse(html.unescape(link))
        params = parse_qs(url_parts.query)
        print(params["payload"][0])
        payload = unsubscribe_pb2.UnsubscribePayload.FromString(b64decode(params["payload"][0]))
        if payload.HasField("topic_action"):
            with auth_api_session() as (auth_api, metadata_interceptor):
                assert (
                    auth_api.Unsubscribe(
                        auth_pb2.UnsubscribeReq(
                            payload=b64decode(params["payload"][0]),
                            sig=b64decode(params["sig"][0]),
                        )
                    ).response
                    == "You've been unsubscribed from email notifications of that type."
                )
            break
    else:
        raise Exception("Didn't find link")

    with notifications_session(token) as notifications:
        res = notifications.GetNotificationSettings(notifications_pb2.GetNotificationSettingsReq())

    # Compare the string fields of the settings messages, not the messages
    # themselves: a message never equals a plain string, so the previous
    # `topic == ... and item == ...` check silently skipped the assertion.
    # NOTE(review): assumes the topic message exposes `.topic` and the item
    # message exposes `.action` -- confirm against notifications.proto.
    matching_items = [
        item
        for group in res.groups
        for topic in group.topics
        if topic.topic == topic_action.topic
        for item in topic.items
        if item.action == topic_action.action
    ]
    assert matching_items, "expected the unsubscribed topic/action in the notification settings"
    for item in matching_items:
        assert not item.email

    with session_scope() as session:
        notify(
            session,
            user_id=user.id,
            topic_action=topic_action,
            key="",
            data=notification_data_pb2.BadgeAdd(
                badge_id="volunteer",
                badge_name="Active Volunteer",
                badge_description="This user is an active volunteer for Couchers.org",
            ),
        )

    assert email_collector.count_for_recipient(user.email) == 0
def test_unsubscribe_do_not_email(db, email_collector: EmailCollector, moderator):
    """Following the "do not email" link sets ``do_not_email`` and stops later notification emails."""
    user, token = generate_user()

    _, first_sender_token = generate_user(complete_profile=True)
    with api_session(first_sender_token) as api:
        api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user.id))
        sent_requests = api.ListFriendRequests(empty_pb2.Empty()).sent
        first_request_id = sent_requests[0].friend_request_id

    # Moderator approves the friend request, which triggers the notification email
    moderator.approve_friend_request(first_request_id)

    email = email_collector.pop_for_recipient(user.email, last=True)
    assert email.recipient == user.email

    # Scrape the quick links out of the email HTML; they look like
    # http://localhost:3000/quick-link?payload=CAEiGAoOZnJpZW5kX3JlcXVlc3QSBmFjY2VwdA==&sig=BQdk024NTATm8zlR0krSXTBhP5U9TlFv7VhJeIHZtUg=
    for href in re.findall(r'<a href="(.*?)"', email.html):
        if "payload" not in href:
            continue
        print(href)
        query = parse_qs(urlparse(html.unescape(href)).query)
        encoded_payload = query["payload"][0]
        print(encoded_payload)
        raw_payload = b64decode(encoded_payload)
        payload = unsubscribe_pb2.UnsubscribePayload.FromString(raw_payload)
        if not payload.HasField("do_not_email"):
            # Some other kind of unsubscribe link; keep looking.
            continue
        with auth_api_session() as (auth_api, metadata_interceptor):
            reply = auth_api.Unsubscribe(
                auth_pb2.UnsubscribeReq(
                    payload=raw_payload,
                    sig=b64decode(query["sig"][0]),
                )
            )
            assert (
                reply.response
                == "You will not receive any non-security emails, and your hosting status has been turned off. You may still receive the newsletter, and need to unsubscribe from it separately."
            )
        break
    else:
        raise Exception("Didn't find link")

    _, second_sender_token = generate_user(complete_profile=True)
    with api_session(second_sender_token) as api:
        api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user.id))
        sent_requests = api.ListFriendRequests(empty_pb2.Empty()).sent
        second_request_id = sent_requests[0].friend_request_id

    # Approving this friend request should NOT send an email since user has do_not_email set
    moderator.approve_friend_request(second_request_id)

    assert email_collector.count_for_recipient(user.email) == 0

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.do_not_email
def test_get_do_not_email(db):
    """GetNotificationSettings reports the user's stored ``do_not_email`` flag."""
    _, token = generate_user()

    def fetch_settings():
        """Fetch the current notification settings through the API."""
        with notifications_session(token) as notifications:
            return notifications.GetNotificationSettings(notifications_pb2.GetNotificationSettingsReq())

    # Flag off in the database -> reported as disabled.
    with session_scope() as session:
        db_user = session.execute(select(User)).scalar_one()
        db_user.do_not_email = False

    assert not fetch_settings().do_not_email_enabled

    # Flag on (with hosting and meetup statuses switched off as well) -> reported as enabled.
    with session_scope() as session:
        db_user = session.execute(select(User)).scalar_one()
        db_user.do_not_email = True
        db_user.hosting_status = HostingStatus.cant_host
        db_user.meetup_status = MeetupStatus.does_not_want_to_meetup

    assert fetch_settings().do_not_email_enabled
def test_set_do_not_email(db):
    """SetNotificationSettings toggles ``do_not_email``; enabling it also turns hosting and meetups off."""
    _, token = generate_user()

    def set_do_not_email(value):
        """Send ``enable_do_not_email=value`` through the notifications API."""
        with notifications_session(token) as notifications:
            notifications.SetNotificationSettings(
                notifications_pb2.SetNotificationSettingsReq(enable_do_not_email=value)
            )

    # Start from a user who can be emailed, can host and wants to meet up.
    with session_scope() as session:
        db_user = session.execute(select(User)).scalar_one()
        db_user.do_not_email = False
        db_user.hosting_status = HostingStatus.can_host
        db_user.meetup_status = MeetupStatus.wants_to_meetup

    set_do_not_email(False)

    with session_scope() as session:
        db_user = session.execute(select(User)).scalar_one()
        assert not db_user.do_not_email

    set_do_not_email(True)

    with session_scope() as session:
        db_user = session.execute(select(User)).scalar_one()
        assert db_user.do_not_email
        assert db_user.hosting_status == HostingStatus.cant_host
        assert db_user.meetup_status == MeetupStatus.does_not_want_to_meetup

    set_do_not_email(False)

    with session_scope() as session:
        db_user = session.execute(select(User)).scalar_one()
        assert not db_user.do_not_email
def test_list_notifications(db, push_collector: PushCollector, moderator):
    """ListNotifications returns rendered notifications and pages through them newest first."""
    user1, token1 = generate_user()
    user2, token2 = generate_user()

    with api_session(token2) as api:
        api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user1.id))
        sent_requests = api.ListFriendRequests(empty_pb2.Empty()).sent
        friend_request_id = sent_requests[0].friend_request_id

    # Moderator approves the friend request so the notification is sent
    moderator.approve_friend_request(friend_request_id)

    with notifications_session(token1) as notifications:
        listing = notifications.ListNotifications(notifications_pb2.ListNotificationsReq())
        assert len(listing.notifications) == 1

        notification = listing.notifications[0]

        assert notification.topic == "friend_request"
        assert notification.action == "create"
        assert notification.key == str(user2.id)
        assert notification.title == f"Friend request from {user2.name}"
        assert notification.body == f"{user2.name} wants to be your friend."
        assert notification.icon.startswith("http://localhost:5001/img/thumbnail/")
        assert notification.url == f"http://localhost:3000/connections/friends/?from={user2.id}"

    # Send 17 chat messages from user2 to user1.
    with conversations_session(token2) as conversations:
        chat = conversations.CreateGroupChat(conversations_pb2.CreateGroupChatReq(recipient_user_ids=[user1.id]))
        group_chat_id = chat.group_chat_id
        moderator.approve_group_chat(group_chat_id)
        for i in range(17):
            conversations.SendMessage(
                conversations_pb2.SendMessageReq(group_chat_id=group_chat_id, text=f"Test message {i}")
            )

    process_jobs()

    # Page through everything five notifications at a time (bounded at 100 pages).
    collected = []
    with notifications_session(token1) as notifications:
        page_token = None
        for _ in range(100):
            page = notifications.ListNotifications(
                notifications_pb2.ListNotificationsReq(
                    page_size=5,
                    page_token=page_token,
                )
            )
            # Every page except the last one must be full.
            assert len(page.notifications) == 5 or not page.next_page_token
            collected.extend(page.notifications)
            page_token = page.next_page_token
            if not page_token:
                break

    # Newest first: messages 16..0, then the original friend request.
    expected_bodies = [f"Test message {i}" for i in reversed(range(17))]
    expected_bodies.append(f"{user2.name} wants to be your friend.")
    assert expected_bodies == [n.body for n in collected]
def test_notifications_seen(db, push_collector: PushCollector, moderator):
    """Seen flags and the Ping unseen count follow MarkAllNotificationsSeen / MarkNotificationSeen."""
    user1, token1 = generate_user()
    user2, token2 = generate_user()
    user3, token3 = generate_user()
    user4, token4 = generate_user()

    def request_friendship(sender_token):
        """Send user1 a friend request with the given token and return the request id."""
        with api_session(sender_token) as sender_api:
            sender_api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user1.id))
            sent = sender_api.ListFriendRequests(empty_pb2.Empty()).sent
            return sent[0].friend_request_id

    def unseen_count(api):
        """Return the unseen notification count reported by Ping."""
        return api.Ping(api_pb2.PingReq()).unseen_notification_count

    fr_id2 = request_friendship(token2)
    fr_id3 = request_friendship(token3)

    # Moderator approves the friend requests so notifications are sent
    moderator.approve_friend_request(fr_id2)
    moderator.approve_friend_request(fr_id3)

    with notifications_session(token1) as notifications, api_session(token1) as api:
        listing = notifications.ListNotifications(notifications_pb2.ListNotificationsReq())
        assert len(listing.notifications) == 2
        assert [n.is_seen for n in listing.notifications] == [False, False]
        notification_ids = [n.notification_id for n in listing.notifications]
        # should be listed desc time
        assert notification_ids[0] > notification_ids[1]

        assert unseen_count(api) == 2

    fr_id4 = request_friendship(token4)

    # Moderator approves the friend request so notification is sent
    moderator.approve_friend_request(fr_id4)

    with notifications_session(token1) as notifications, api_session(token1) as api:
        # mark everything before just the last one as seen (pretend we didn't load the last one yet in the api)
        mark_all_req = notifications_pb2.MarkAllNotificationsSeenReq(latest_notification_id=notification_ids[0])
        notifications.MarkAllNotificationsSeen(mark_all_req)

        # last one is still unseen
        assert unseen_count(api) == 1

        # mark the first one unseen
        mark_unseen_req = notifications_pb2.MarkNotificationSeenReq(
            notification_id=notification_ids[1], set_seen=False
        )
        notifications.MarkNotificationSeen(mark_unseen_req)
        assert unseen_count(api) == 2

        # mark the last one seen
        listing = notifications.ListNotifications(notifications_pb2.ListNotificationsReq())
        assert len(listing.notifications) == 3
        assert [n.is_seen for n in listing.notifications] == [False, True, False]
        notification_ids2 = [n.notification_id for n in listing.notifications]

        assert unseen_count(api) == 2

        mark_seen_req = notifications_pb2.MarkNotificationSeenReq(
            notification_id=notification_ids2[0], set_seen=True
        )
        notifications.MarkNotificationSeen(mark_seen_req)

        listing = notifications.ListNotifications(notifications_pb2.ListNotificationsReq())
        assert len(listing.notifications) == 3
        assert [n.is_seen for n in listing.notifications] == [True, True, False]

        assert unseen_count(api) == 1
def test_unseen_notification_count_excludes_ums_hidden(db, moderator):
    """The unseen count stays at 0 until the friend request is approved by a moderator."""
    recipient, recipient_token = generate_user()
    _, sender_token = generate_user()

    def unseen_for_recipient():
        """Return the recipient's unseen notification count as reported by Ping."""
        with api_session(recipient_token) as recipient_api:
            return recipient_api.Ping(api_pb2.PingReq()).unseen_notification_count

    with api_session(sender_token) as sender_api:
        sender_api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=recipient.id))
        sent_requests = sender_api.ListFriendRequests(empty_pb2.Empty()).sent
        friend_request_id = sent_requests[0].friend_request_id

    # Before moderation the friend request is shadowed, so the resulting notification
    # is not visible to the recipient and must not contribute to their unseen count.
    assert unseen_for_recipient() == 0

    moderator.approve_friend_request(friend_request_id)

    assert unseen_for_recipient() == 1
def test_GetVapidPublicKey(db):
    """GetVapidPublicKey returns the expected VAPID public key."""
    _, token = generate_user()

    expected_key = "BApMo2tGuon07jv-pEaAKZmVo6E_d4HfcdDeV6wx2k9wV8EovJ0ve00bdLzZm9fizDrGZXRYJFqCcRJUfBcgA0A"

    with notifications_session(token) as notifications:
        reply = notifications.GetVapidPublicKey(empty_pb2.Empty())
        assert reply.vapid_public_key == expected_key
def test_RegisterPushNotificationSubscription(db):
    """Registering a well-formed web push subscription succeeds.

    This is a smoke test: it only checks that the RPC does not raise. The
    response is not inspected, so it is no longer bound to an unused local.
    """
    _, token = generate_user()

    subscription_info = {
        "endpoint": "https://updates.push.services.mozilla.com/wpush/v2/gAAAAABmW2_iYKVyZRJPhAhktbkXd6Bc8zjIUvtVi5diYL7ZYn8FHka94kIdF46Mp8DwCDWlACnbKOEo97ikaa7JYowGLiGz3qsWL7Vo19LaV4I71mUDUOIKxWIsfp_kM77MlRJQKDUddv-sYyiffOyg63d1lnc_BMIyLXt69T5SEpfnfWTNb6I",
        "expirationTime": None,
        "keys": {
            "auth": "TnuEJ1OdfEkf6HKcUovl0Q",
            "p256dh": "BK7Rp8og3eFJPqm0ofR8F-l2mtNCCCWYo6f_5kSs8jPEFiKetnZHNOglvC6IrgU9vHmgFHlG7gHGtB1HM599sy0",
        },
    }

    with notifications_session(token) as notifications:
        notifications.RegisterPushNotificationSubscription(
            notifications_pb2.RegisterPushNotificationSubscriptionReq(
                full_subscription_json=json.dumps(subscription_info),
            )
        )
def test_RegisterPushNotificationSubscription_invalid_endpoint(db):
    """Registering a subscription with the ``permanently-removed.invalid`` endpoint is INVALID_ARGUMENT."""
    _, token = generate_user()

    subscription_keys = {
        "auth": "TnuEJ1OdfEkf6HKcUovl0Q",
        "p256dh": "BK7Rp8og3eFJPqm0ofR8F-l2mtNCCCWYo6f_5kSs8jPEFiKetnZHNOglvC6IrgU9vHmgFHlG7gHGtB1HM599sy0",
    }
    subscription_info = {
        "endpoint": "https://permanently-removed.invalid/some-id",
        "expirationTime": None,
        "keys": subscription_keys,
    }
    request = notifications_pb2.RegisterPushNotificationSubscriptionReq(
        full_subscription_json=json.dumps(subscription_info),
    )

    with notifications_session(token) as notifications:
        with pytest.raises(grpc.RpcError) as exc_info:
            notifications.RegisterPushNotificationSubscription(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
def test_SendTestPushNotification(db, push_collector: PushCollector):
    """SendTestPushNotification produces exactly one push with the test title and body."""
    user, token = generate_user()

    with notifications_session(token) as notifications:
        notifications.SendTestPushNotification(empty_pb2.Empty())

    assert push_collector.count_for_user(user.id) == 1

    content = push_collector.pop_for_user(user.id, last=True).content
    assert content.title == "Push notifications test"
    assert content.body == "If you see this, then it's working :)"
def test_SendBlogPostNotification(db, email_collector: EmailCollector, push_collector: PushCollector):
    """A blog post notification is emailed/pushed according to each user's preferences."""
    super_user, super_token = generate_user(is_superuser=True)

    # user1 keeps the defaults, user2 enables email, user3 disables push
    user1, user1_token = generate_user()
    user2, user2_token = generate_user()
    user3, user3_token = generate_user()

    blog_post = NotificationTopicAction.general__new_blog_post

    def set_preference(user_token, delivery_method, enabled):
        """Set one delivery preference for the blog post topic/action."""
        with notifications_session(user_token) as notifications:
            notifications.SetNotificationSettings(
                notifications_pb2.SetNotificationSettingsReq(
                    preferences=[
                        notifications_pb2.SingleNotificationPreference(
                            topic=blog_post.topic,
                            action=blog_post.action,
                            delivery_method=delivery_method,
                            enabled=enabled,
                        )
                    ],
                )
            )

    set_preference(user2_token, "email", True)
    set_preference(user3_token, "push", False)

    post_title = "Couchers.org v0.9.9 Release Notes"
    post_blurb = "Read about last major updates before v1!"
    post_url = "https://couchers.org/blog/2025/05/11/v0.9.9-release"

    with real_editor_session(super_token) as editor_api:
        editor_api.SendBlogPostNotification(
            editor_pb2.SendBlogPostNotificationReq(
                title=post_title,
                blurb=post_blurb,
                url=post_url,
            )
        )

    # user2 opted into email: title, blurb and URL appear in both renderings.
    email = email_collector.pop_for_recipient(user2.email, last=True)
    assert email.recipient == user2.email
    for expected_text in (post_title, post_blurb, post_url):
        assert expected_text in email.html
        assert expected_text in email.plain

    # user1 and user2 both get the push.
    for recipient in (user1, user2):
        push = push_collector.pop_for_user(recipient.id, last=True)
        assert push.content.title == "New blog post: Couchers.org v0.9.9 Release Notes"
        assert push.content.body == post_blurb
        assert push.content.action_url == post_url

    # user3 switched push off.
    assert push_collector.count_for_user(user3.id) == 0
def test_get_topic_actions_by_delivery_type(db):
    """get_topic_actions_by_delivery_type combines the defaults with the user's overrides."""
    user, token = generate_user()

    push = NotificationDeliveryType.push
    friend_reference = NotificationTopicAction.reference__receive_friend
    request_accepted = NotificationTopicAction.host_request__accept
    any_event_created = NotificationTopicAction.event__create_any
    discussion_created = NotificationTopicAction.discussion__create

    # these are enabled by default
    assert push in friend_reference.defaults
    assert push in request_accepted.defaults

    # these are disabled by default
    assert push not in any_event_created.defaults
    assert push not in discussion_created.defaults

    # Flip one default-on action off and one default-off action on.
    overrides = [
        notifications_pb2.SingleNotificationPreference(
            topic=friend_reference.topic,
            action=friend_reference.action,
            delivery_method="push",
            enabled=False,
        ),
        notifications_pb2.SingleNotificationPreference(
            topic=any_event_created.topic,
            action=any_event_created.action,
            delivery_method="push",
            enabled=True,
        ),
    ]
    with notifications_session(token) as notifications:
        notifications.SetNotificationSettings(notifications_pb2.SetNotificationSettingsReq(preferences=overrides))

    with session_scope() as session:
        push_actions = get_topic_actions_by_delivery_type(session, user.id, push)
        assert friend_reference not in push_actions
        assert request_accepted in push_actions
        assert any_event_created in push_actions
        assert discussion_created not in push_actions
        assert NotificationTopicAction.account_deletion__start in push_actions
def test_event_reminder_email_sent(db, email_collector: EmailCollector):
    """An event reminder email contains the event title and the localized start time."""
    user, token = generate_user()
    title = "Board Game Night"
    starts_at = timestamp_pb2.Timestamp(seconds=1751690400)

    # Render the start time the same way it is expected to appear in the email.
    expected_time_str = LocalizationContext.from_user(user).localize_datetime(
        starts_at, with_year=False, with_day_of_week=True
    )

    with session_scope() as session:
        db_user = session.get_one(User, user.id)

        event_pb = events_pb2.Event(
            event_id=1,
            slug="board-game-night",
            title=title,
            start_time=starts_at,
        )
        user_pb = user_model_to_pb(db_user, session, make_background_user_context(user_id=user.id))

        notify(
            session,
            user_id=user.id,
            topic_action=NotificationTopicAction.event__reminder,
            key="",
            data=notification_data_pb2.EventReminder(event=event_pb, user=user_pb),
        )

    email = email_collector.pop_for_recipient(user.email, last=True)
    assert email.recipient == user.email
    for expected_text in (title, expected_time_str):
        assert expected_text in email.html
        assert expected_text in email.plain
def test_RegisterMobilePushNotificationSubscription(db):
    """Registering an iOS Expo token stores an enabled expo subscription with the device details."""
    user, token = generate_user()

    expo_token = "ExponentPushToken[xxxxxxxxxxxxxxxxxxxxxx]"
    request = notifications_pb2.RegisterMobilePushNotificationSubscriptionReq(
        token=expo_token,
        device_name="My iPhone",
        device_type="ios",
    )

    with notifications_session(token) as notifications:
        notifications.RegisterMobilePushNotificationSubscription(request)

    # Check subscription was created
    subscription_query = select(PushNotificationSubscription).where(PushNotificationSubscription.user_id == user.id)
    with session_scope() as session:
        stored = session.execute(subscription_query).scalar_one()
        assert stored.platform == PushNotificationPlatform.expo
        assert stored.token == expo_token
        assert stored.device_name == "My iPhone"
        assert stored.device_type == DeviceType.ios
        assert stored.disabled_at == DATETIME_INFINITY
def test_RegisterMobilePushNotificationSubscription_android(db):
    """Registering with ``device_type="android"`` stores an expo subscription typed as android."""
    user, token = generate_user()

    request = notifications_pb2.RegisterMobilePushNotificationSubscriptionReq(
        token="ExponentPushToken[yyyyyyyyyyyyyyyyyyyyyy]",
        device_name="My Android",
        device_type="android",
    )

    with notifications_session(token) as notifications:
        notifications.RegisterMobilePushNotificationSubscription(request)

    subscription_query = select(PushNotificationSubscription).where(PushNotificationSubscription.user_id == user.id)
    with session_scope() as session:
        stored = session.execute(subscription_query).scalar_one()
        assert stored.platform == PushNotificationPlatform.expo
        assert stored.device_type == DeviceType.android
def test_RegisterMobilePushNotificationSubscription_no_device_type(db):
    """Registering with only a token stores ``None`` for the device name and type."""
    user, token = generate_user()

    request = notifications_pb2.RegisterMobilePushNotificationSubscriptionReq(
        token="ExponentPushToken[zzzzzzzzzzzzzzzzzzzzzz]",
    )

    with notifications_session(token) as notifications:
        notifications.RegisterMobilePushNotificationSubscription(request)

    subscription_query = select(PushNotificationSubscription).where(PushNotificationSubscription.user_id == user.id)
    with session_scope() as session:
        stored = session.execute(subscription_query).scalar_one()
        assert stored.platform == PushNotificationPlatform.expo
        assert stored.device_name is None
        assert stored.device_type is None
def test_RegisterMobilePushNotificationSubscription_re_enable(db):
    """Re-registering a disabled token re-enables the subscription and updates its device details."""
    user, token = generate_user()

    expo_token = "ExponentPushToken[reeeeeeeeeeeeeeeeeeeee]"

    # Create a disabled subscription directly in the DB
    with session_scope() as session:
        disabled_sub = PushNotificationSubscription(
            user_id=user.id,
            platform=PushNotificationPlatform.expo,
            token=expo_token,
            device_name="Old Device",
            device_type=DeviceType.ios,
        )
        disabled_sub.disabled_at = now()
        session.add(disabled_sub)
        session.flush()
        sub_id = disabled_sub.id

    # Re-register with the same token
    request = notifications_pb2.RegisterMobilePushNotificationSubscriptionReq(
        token=expo_token,
        device_name="New Device Name",
        device_type="android",
    )
    with notifications_session(token) as notifications:
        notifications.RegisterMobilePushNotificationSubscription(request)

    # Check subscription was re-enabled and updated
    by_id = select(PushNotificationSubscription).where(PushNotificationSubscription.id == sub_id)
    with session_scope() as session:
        refreshed = session.execute(by_id).scalar_one()
        assert refreshed.disabled_at == DATETIME_INFINITY
        assert refreshed.device_name == "New Device Name"
        assert refreshed.device_type == DeviceType.android
def test_RegisterMobilePushNotificationSubscription_already_exists(db):
    """Re-registering an already active token succeeds and leaves the stored device name unchanged."""
    user, token = generate_user()

    expo_token = "ExponentPushToken[existingtoken]"

    # Create an active subscription directly in the DB
    with session_scope() as session:
        session.add(
            PushNotificationSubscription(
                user_id=user.id,
                platform=PushNotificationPlatform.expo,
                token=expo_token,
                device_name="Existing Device",
                device_type=DeviceType.ios,
            )
        )

    # Try to register with the same token - should just return without error
    request = notifications_pb2.RegisterMobilePushNotificationSubscriptionReq(
        token=expo_token,
        device_name="Different Name",
    )
    with notifications_session(token) as notifications:
        notifications.RegisterMobilePushNotificationSubscription(request)

    # Check subscription was NOT modified (already active)
    by_token = select(PushNotificationSubscription).where(PushNotificationSubscription.token == expo_token)
    with session_scope() as session:
        stored = session.execute(by_token).scalar_one()
        assert stored.device_name == "Existing Device"  # unchanged
def test_SendTestMobilePushNotification(db, push_collector: PushCollector):
    """SendTestMobilePushNotification produces a push with the mobile test title and body."""
    user, token = generate_user()

    with notifications_session(token) as notifications:
        notifications.SendTestMobilePushNotification(empty_pb2.Empty())

    content = push_collector.pop_for_user(user.id, last=True).content
    assert content.title == "Mobile notifications test"
    assert content.body == "If you see this on your phone, everything is wired up correctly 🎉"
def test_get_expo_push_receipts(db):
    """get_expo_push_receipts posts the ticket ids to Expo and returns the ``data`` mapping."""
    receipts = {
        "ticket-1": {"status": "ok"},
        "ticket-2": {"status": "error", "details": {"error": "DeviceNotRegistered"}},
    }

    fake_response = Mock()
    fake_response.status_code = 200
    # Independent copy so the expected value cannot be affected by the code under test.
    fake_response.json.return_value = {"data": {ticket: dict(receipt) for ticket, receipt in receipts.items()}}

    with patch("couchers.notifications.expo_api.requests.post", return_value=fake_response) as mock_post:
        result = get_expo_push_receipts(["ticket-1", "ticket-2"])

        mock_post.assert_called_once()
        positional, keyword = mock_post.call_args
        assert positional[0] == "https://exp.host/--/api/v2/push/getReceipts"
        assert keyword["json"] == {"ids": ["ticket-1", "ticket-2"]}

        assert result == receipts
def test_get_expo_push_receipts_empty(db):
    """An empty list of ticket ids yields an empty receipt mapping."""
    assert get_expo_push_receipts([]) == {}
def test_check_expo_push_receipts_success(db):
    """An "ok" receipt is recorded on the delivery attempt and the subscription stays enabled."""
    user, token = generate_user()

    ticket_id = "test-ticket-id"

    # Create a push subscription and delivery attempt (old enough to be checked)
    with session_scope() as session:
        subscription = PushNotificationSubscription(
            user_id=user.id,
            platform=PushNotificationPlatform.expo,
            token="ExponentPushToken[testtoken123]",
            device_name="Test Device",
            device_type=DeviceType.ios,
        )
        session.add(subscription)
        session.flush()

        delivery_attempt = PushNotificationDeliveryAttempt(
            push_notification_subscription_id=subscription.id,
            outcome=PushNotificationDeliveryOutcome.success,
            status_code=200,
            expo_ticket_id=ticket_id,
        )
        session.add(delivery_attempt)
        session.flush()
        # Make the attempt old enough to be checked (>15 min)
        delivery_attempt.time = now() - timedelta(minutes=20)
        attempt_id = delivery_attempt.id
        sub_id = subscription.id

    # Mock the receipt API call
    with patch("couchers.notifications.expo_api.requests.post") as mock_post:
        fake_response = mock_post.return_value
        fake_response.status_code = 200
        fake_response.json.return_value = {"data": {"test-ticket-id": {"status": "ok"}}}

        check_expo_push_receipts(empty_pb2.Empty())

    attempt_by_id = select(PushNotificationDeliveryAttempt).where(PushNotificationDeliveryAttempt.id == attempt_id)
    subscription_by_id = select(PushNotificationSubscription).where(PushNotificationSubscription.id == sub_id)

    with session_scope() as session:
        # Verify the attempt was updated
        checked_attempt = session.execute(attempt_by_id).scalar_one()
        assert checked_attempt.receipt_checked_at is not None
        assert checked_attempt.receipt_status == "ok"
        assert checked_attempt.receipt_error_code is None

        # Subscription should still be enabled
        stored_subscription = session.execute(subscription_by_id).scalar_one()
        assert stored_subscription.disabled_at == DATETIME_INFINITY
def test_check_expo_push_receipts_device_not_registered(db):
    """Check that a DeviceNotRegistered receipt is recorded and disables the subscription."""
    user, _ = generate_user()

    # Set up an Expo subscription with one delivery attempt awaiting its receipt
    with session_scope() as session:
        subscription = PushNotificationSubscription(
            user_id=user.id,
            platform=PushNotificationPlatform.expo,
            token="ExponentPushToken[devicegone]",
            device_name="Test Device",
            device_type=DeviceType.android,
        )
        session.add(subscription)
        session.flush()
        subscription_id = subscription.id

        delivery_attempt = PushNotificationDeliveryAttempt(
            push_notification_subscription_id=subscription_id,
            outcome=PushNotificationDeliveryOutcome.success,
            status_code=200,
            expo_ticket_id="ticket-device-gone",
        )
        session.add(delivery_attempt)
        session.flush()
        # Backdate the attempt so it is old enough to be checked
        delivery_attempt.time = now() - timedelta(minutes=15)
        delivery_attempt_id = delivery_attempt.id

    # Stub the Expo receipts endpoint so it reports the device as no longer registered
    error_receipt = {
        "status": "error",
        "details": {"error": "DeviceNotRegistered"},
    }
    with patch("couchers.notifications.expo_api.requests.post") as post_mock:
        fake_response = post_mock.return_value
        fake_response.status_code = 200
        fake_response.json.return_value = {"data": {"ticket-device-gone": error_receipt}}

        check_expo_push_receipts(empty_pb2.Empty())

    with session_scope() as session:
        # The error outcome must have been written back onto the attempt
        checked = session.execute(
            select(PushNotificationDeliveryAttempt).where(PushNotificationDeliveryAttempt.id == delivery_attempt_id)
        ).scalar_one()
        assert checked.receipt_checked_at is not None
        assert checked.receipt_status == "error"
        assert checked.receipt_error_code == "DeviceNotRegistered"

        # The subscription must now carry a disabled timestamp that is not in the future
        reloaded = session.execute(
            select(PushNotificationSubscription).where(PushNotificationSubscription.id == subscription_id)
        ).scalar_one()
        assert reloaded.disabled_at <= now()
def test_check_expo_push_receipts_not_found(db):
    """Check that a ticket missing from the receipt response is marked "not_found" without disabling the subscription."""
    user, _ = generate_user()

    with session_scope() as session:
        subscription = PushNotificationSubscription(
            user_id=user.id,
            platform=PushNotificationPlatform.expo,
            token="ExponentPushToken[notfound]",
        )
        session.add(subscription)
        session.flush()
        subscription_id = subscription.id

        delivery_attempt = PushNotificationDeliveryAttempt(
            push_notification_subscription_id=subscription_id,
            outcome=PushNotificationDeliveryOutcome.success,
            status_code=200,
            expo_ticket_id="unknown-ticket",
        )
        session.add(delivery_attempt)
        session.flush()
        # Backdate the attempt so it is old enough to be checked
        delivery_attempt.time = now() - timedelta(minutes=15)
        delivery_attempt_id = delivery_attempt.id

    # Stub the Expo receipts endpoint with a reply that contains no receipts at all
    with patch("couchers.notifications.expo_api.requests.post") as post_mock:
        fake_response = post_mock.return_value
        fake_response.status_code = 200
        fake_response.json.return_value = {"data": {}}

        check_expo_push_receipts(empty_pb2.Empty())

    with session_scope() as session:
        checked = session.execute(
            select(PushNotificationDeliveryAttempt).where(PushNotificationDeliveryAttempt.id == delivery_attempt_id)
        ).scalar_one()
        assert checked.receipt_checked_at is not None
        assert checked.receipt_status == "not_found"

        # A missing receipt must not disable the subscription
        reloaded = session.execute(
            select(PushNotificationSubscription).where(PushNotificationSubscription.id == subscription_id)
        ).scalar_one()
        assert reloaded.disabled_at == DATETIME_INFINITY
def test_check_expo_push_receipts_skips_already_checked(db):
    """Check that no receipt request is made when the only attempt already has a recorded receipt."""
    user, _ = generate_user()

    with session_scope() as session:
        subscription = PushNotificationSubscription(
            user_id=user.id,
            platform=PushNotificationPlatform.expo,
            token="ExponentPushToken[alreadychecked]",
        )
        session.add(subscription)
        session.flush()

        # This attempt already carries a receipt timestamp and status
        delivery_attempt = PushNotificationDeliveryAttempt(
            push_notification_subscription_id=subscription.id,
            outcome=PushNotificationDeliveryOutcome.success,
            status_code=200,
            expo_ticket_id="already-checked-ticket",
            receipt_checked_at=now(),
            receipt_status="ok",
        )
        session.add(delivery_attempt)
        session.flush()
        # Backdate it so that age alone would not exclude it
        delivery_attempt.time = now() - timedelta(minutes=15)

    # With nothing left to check, the HTTP client must never be invoked
    with patch("couchers.notifications.expo_api.requests.post") as post_mock:
        check_expo_push_receipts(empty_pb2.Empty())
        post_mock.assert_not_called()
def test_SendDevPushNotification_success(db, push_collector: PushCollector):
    """Check that SendDevPushNotification forwards every supplied field into the resulting push."""
    user, token = generate_user()

    # Enable dev APIs for this test
    config.ENABLE_DEV_APIS = True

    request = notifications_pb2.SendDevPushNotificationReq(
        title="Test Dev Title",
        body="Test dev notification body",
        icon="https://example.com/icon.png",
        url="https://example.com/action",
        key="test-key",
        ttl=3600,
    )
    with notifications_session(token) as client:
        client.SendDevPushNotification(request)

    sent = push_collector.pop_for_user(user.id, last=True)

    # Content fields mirror the request
    content = sent.content
    assert content.title == "Test Dev Title"
    assert content.body == "Test dev notification body"
    assert content.action_url == "https://example.com/action"
    assert content.icon_url == "https://example.com/icon.png"

    # Delivery metadata
    assert sent.topic_action == "adhoc:testing"
    assert sent.key == "test-key"
    assert sent.ttl == 3600
def test_SendDevPushNotification_minimal(db, push_collector: PushCollector):
    """Check that SendDevPushNotification works when only a title and body are given."""
    user, token = generate_user()

    config.ENABLE_DEV_APIS = True

    request = notifications_pb2.SendDevPushNotificationReq(
        title="Minimal Title",
        body="Minimal body",
    )
    with notifications_session(token) as client:
        client.SendDevPushNotification(request)

    sent = push_collector.pop_for_user(user.id, last=True)
    assert sent.content.title == "Minimal Title"
    assert sent.content.body == "Minimal body"
    assert sent.topic_action == "adhoc:testing"
def test_SendDevPushNotification_disabled(db, push_collector: PushCollector):
    """Check that SendDevPushNotification is rejected with UNAVAILABLE when ENABLE_DEV_APIS is off."""
    user, token = generate_user()

    # Ensure dev APIs are disabled (default in tests)
    config.ENABLE_DEV_APIS = False

    request = notifications_pb2.SendDevPushNotificationReq(
        title="Should Fail",
        body="This should not be sent",
    )
    with notifications_session(token) as client:
        with pytest.raises(grpc.RpcError) as exc_info:
            client.SendDevPushNotification(request)
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.UNAVAILABLE
        assert "Development APIs are not enabled" in not_none(rpc_error.details())

    # The rejected call must not have produced any push
    assert push_collector.count_for_user(user.id) == 0
def test_SendDevPushNotification_push_notifications_disabled(db, push_collector: PushCollector):
    """Check that SendDevPushNotification is rejected with UNAVAILABLE when push notifications are off."""
    user, token = generate_user()

    # Dev APIs on, push notifications off
    config.ENABLE_DEV_APIS = True
    config.PUSH_NOTIFICATIONS_ENABLED = False

    request = notifications_pb2.SendDevPushNotificationReq(
        title="Should Fail",
        body="This should not be sent",
    )
    with notifications_session(token) as client:
        with pytest.raises(grpc.RpcError) as exc_info:
            client.SendDevPushNotification(request)
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.UNAVAILABLE
        assert "Push notifications are currently disabled" in not_none(rpc_error.details())

    # The rejected call must not have produced any push
    assert push_collector.count_for_user(user.id) == 0
def test_check_expo_push_receipts_skips_too_recent(db):
    """Check that no receipt request is made for an attempt that is only five minutes old (<15 min)."""
    user, _ = generate_user()

    with session_scope() as session:
        subscription = PushNotificationSubscription(
            user_id=user.id,
            platform=PushNotificationPlatform.expo,
            token="ExponentPushToken[recent]",
        )
        session.add(subscription)
        session.flush()

        delivery_attempt = PushNotificationDeliveryAttempt(
            push_notification_subscription_id=subscription.id,
            outcome=PushNotificationDeliveryOutcome.success,
            status_code=200,
            expo_ticket_id="recent-ticket",
        )
        session.add(delivery_attempt)
        session.flush()
        # Only 5 minutes old: too recent for its receipt to be checked
        delivery_attempt.time = now() - timedelta(minutes=5)

    # With nothing old enough to check, the HTTP client must never be invoked
    with patch("couchers.notifications.expo_api.requests.post") as post_mock:
        check_expo_push_receipts(empty_pb2.Empty())
        post_mock.assert_not_called()
def test_check_expo_push_receipts_batch(db):
    """Check that three pending attempts are resolved with a single receipt request."""
    user, _ = generate_user()

    ticket_ids = [f"batch-ticket-{i}" for i in range(3)]
    attempt_ids = []

    with session_scope() as session:
        subscription = PushNotificationSubscription(
            user_id=user.id,
            platform=PushNotificationPlatform.expo,
            token="ExponentPushToken[batch]",
        )
        session.add(subscription)
        session.flush()

        # One delivery attempt per ticket, each backdated by 20 minutes
        for ticket_id in ticket_ids:
            delivery_attempt = PushNotificationDeliveryAttempt(
                push_notification_subscription_id=subscription.id,
                outcome=PushNotificationDeliveryOutcome.success,
                status_code=200,
                expo_ticket_id=ticket_id,
            )
            session.add(delivery_attempt)
            session.flush()
            delivery_attempt.time = now() - timedelta(minutes=20)
            attempt_ids.append(delivery_attempt.id)

    # Stub the Expo receipts endpoint so every ticket comes back as delivered
    with patch("couchers.notifications.expo_api.requests.post") as post_mock:
        fake_response = post_mock.return_value
        fake_response.status_code = 200
        fake_response.json.return_value = {"data": {ticket_id: {"status": "ok"} for ticket_id in ticket_ids}}

        check_expo_push_receipts(empty_pb2.Empty())

        # All tickets must be covered by one HTTP request
        assert post_mock.call_count == 1

    # Every attempt must have had its receipt recorded
    with session_scope() as session:
        for attempt_id in attempt_ids:
            checked = session.execute(
                select(PushNotificationDeliveryAttempt).where(PushNotificationDeliveryAttempt.id == attempt_id)
            ).scalar_one()
            assert checked.receipt_checked_at is not None
            assert checked.receipt_status == "ok"
def test_DebugRedeliverPushNotification_success(db, push_collector: PushCollector):
    """Check that DebugRedeliverPushNotification sends a fresh push for an existing notification."""
    user, token = generate_user()

    config.ENABLE_DEV_APIS = True

    # Give the user a badge notification to redeliver later
    badge = notification_data_pb2.BadgeAdd(
        badge_id="volunteer",
        badge_name="Active Volunteer",
        badge_description="This user is an active volunteer for Couchers.org",
    )
    with session_scope() as session:
        notify(
            session,
            user_id=user.id,
            topic_action=NotificationTopicAction.badge__add,
            key="test-badge",
            data=badge,
        )

    process_job()

    # Discard the push from the initial delivery so only the redelivery is inspected below
    push_collector.pop_for_user(user.id, last=True)

    with session_scope() as session:
        notification_id = session.execute(select(Notification).where(Notification.user_id == user.id)).scalar_one().id

    with notifications_session(token) as client:
        client.DebugRedeliverPushNotification(
            notifications_pb2.DebugRedeliverPushNotificationReq(notification_id=notification_id)
        )

    # The redelivery must have produced a new push for the same notification
    redelivered = push_collector.pop_for_user(user.id, last=True)
    assert "Active Volunteer" in redelivered.content.title
    assert redelivered.topic_action == "badge:add"
    assert redelivered.key == "test-badge"
def test_DebugRedeliverPushNotification_not_found(db, push_collector: PushCollector):
    """Check that redelivering an unknown notification id is rejected with NOT_FOUND."""
    user, token = generate_user()

    config.ENABLE_DEV_APIS = True

    request = notifications_pb2.DebugRedeliverPushNotificationReq(notification_id=999999)
    with notifications_session(token) as client:
        with pytest.raises(grpc.RpcError) as exc_info:
            client.DebugRedeliverPushNotification(request)
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.NOT_FOUND
        assert "notification not found" in not_none(rpc_error.details()).lower()

    # The rejected call must not have produced any push
    assert push_collector.count_for_user(user.id) == 0
def test_DebugRedeliverPushNotification_wrong_user(db, push_collector: PushCollector):
    """Check that a user cannot redeliver a notification owned by someone else (reported as NOT_FOUND)."""
    owner, _ = generate_user()
    intruder, intruder_token = generate_user()

    config.ENABLE_DEV_APIS = True

    # The notification belongs to the first user
    badge = notification_data_pb2.BadgeAdd(
        badge_id="volunteer",
        badge_name="Active Volunteer",
        badge_description="This user is an active volunteer for Couchers.org",
    )
    with session_scope() as session:
        notify(
            session,
            user_id=owner.id,
            topic_action=NotificationTopicAction.badge__add,
            key="test-badge",
            data=badge,
        )

    process_job()

    with session_scope() as session:
        notification_id = session.execute(select(Notification).where(Notification.user_id == owner.id)).scalar_one().id

    # The second user asks for the first user's notification to be redelivered
    request = notifications_pb2.DebugRedeliverPushNotificationReq(notification_id=notification_id)
    with notifications_session(intruder_token) as client:
        with pytest.raises(grpc.RpcError) as exc_info:
            client.DebugRedeliverPushNotification(request)
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.NOT_FOUND
        assert "notification not found" in not_none(rpc_error.details()).lower()

    # Nothing must have been pushed to the second user
    assert push_collector.count_for_user(intruder.id) == 0
def test_DebugRedeliverPushNotification_disabled(db, push_collector: PushCollector):
    """Check that DebugRedeliverPushNotification is rejected with UNAVAILABLE when ENABLE_DEV_APIS is off."""
    user, token = generate_user()

    config.ENABLE_DEV_APIS = False

    request = notifications_pb2.DebugRedeliverPushNotificationReq(notification_id=1)
    with notifications_session(token) as client:
        with pytest.raises(grpc.RpcError) as exc_info:
            client.DebugRedeliverPushNotification(request)
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.UNAVAILABLE
        assert "Development APIs are not enabled" in not_none(rpc_error.details())

    # The rejected call must not have produced any push
    assert push_collector.count_for_user(user.id) == 0
def test_DebugRedeliverPushNotification_push_notifications_disabled(db, push_collector: PushCollector):
    """Check that DebugRedeliverPushNotification is rejected with UNAVAILABLE when push notifications are off."""
    user, token = generate_user()

    # Dev APIs on, push notifications off
    config.ENABLE_DEV_APIS = True
    config.PUSH_NOTIFICATIONS_ENABLED = False

    request = notifications_pb2.DebugRedeliverPushNotificationReq(notification_id=1)
    with notifications_session(token) as client:
        with pytest.raises(grpc.RpcError) as exc_info:
            client.DebugRedeliverPushNotification(request)
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.UNAVAILABLE
        assert "Push notifications are currently disabled" in not_none(rpc_error.details())

    # The rejected call must not have produced any push
    assert push_collector.count_for_user(user.id) == 0
def test_handle_notification_email_delivery(db, email_collector: EmailCollector):
    """Check that a notification is emailed, and its email delivery recorded, once the email preference is on."""
    user, token = generate_user()

    topic_action = NotificationTopicAction.badge__add

    # Switch on email delivery for this topic/action
    email_on = notifications_pb2.SingleNotificationPreference(
        topic=topic_action.topic,
        action=topic_action.action,
        delivery_method="email",
        enabled=True,
    )
    with notifications_session(token) as client:
        client.SetNotificationSettings(notifications_pb2.SetNotificationSettingsReq(preferences=[email_on]))

    badge = notification_data_pb2.BadgeAdd(
        badge_id="volunteer",
        badge_name="Active Volunteer",
        badge_description="This user is an active volunteer",
    )
    with session_scope() as session:
        notify(
            session,
            user_id=user.id,
            topic_action=topic_action,
            key="test-badge",
            data=badge,
        )

    sent_email = email_collector.pop_for_recipient(user.email, last=True)
    assert sent_email.recipient == user.email

    # Exactly one email delivery row must exist for the user, stamped as delivered
    with session_scope() as session:
        email_delivery = session.execute(
            select(NotificationDelivery)
            .join(Notification, Notification.id == NotificationDelivery.notification_id)
            .where(
                Notification.user_id == user.id,
                NotificationDelivery.delivery_type == NotificationDeliveryType.email,
            )
        ).scalar_one()
        assert email_delivery.delivered is not None
def test_handle_notification_push_delivery(db, push_collector: PushCollector):
    """Check that a notification is pushed, and its push delivery recorded, without changing any preferences."""
    user, _ = generate_user()

    topic_action = NotificationTopicAction.badge__add

    badge = notification_data_pb2.BadgeAdd(
        badge_id="volunteer",
        badge_name="Active Volunteer",
        badge_description="This user is an active volunteer",
    )
    with session_scope() as session:
        notify(
            session,
            user_id=user.id,
            topic_action=topic_action,
            key="test-badge",
            data=badge,
        )

    process_job()

    sent = push_collector.pop_for_user(user.id, last=True)
    assert "Active Volunteer" in sent.content.title

    # Exactly one push delivery row must exist for the user, stamped as delivered
    with session_scope() as session:
        push_delivery = session.execute(
            select(NotificationDelivery)
            .join(Notification, Notification.id == NotificationDelivery.notification_id)
            .where(
                Notification.user_id == user.id,
                NotificationDelivery.delivery_type == NotificationDeliveryType.push,
            )
        ).scalar_one()
        assert push_delivery.delivered is not None
def test_handle_notification_digest_delivery(db):
    """Check that a digest delivery row is created with no delivered timestamp."""
    user, token = generate_user()

    topic_action = NotificationTopicAction.badge__add

    # Turn push off and digest on for this topic/action
    push_off = notifications_pb2.SingleNotificationPreference(
        topic=topic_action.topic,
        action=topic_action.action,
        delivery_method="push",
        enabled=False,
    )
    digest_on = notifications_pb2.SingleNotificationPreference(
        topic=topic_action.topic,
        action=topic_action.action,
        delivery_method="digest",
        enabled=True,
    )
    with notifications_session(token) as client:
        client.SetNotificationSettings(notifications_pb2.SetNotificationSettingsReq(preferences=[push_off, digest_on]))

    badge = notification_data_pb2.BadgeAdd(
        badge_id="volunteer",
        badge_name="Active Volunteer",
        badge_description="This user is an active volunteer",
    )
    with session_scope() as session:
        notify(
            session,
            user_id=user.id,
            topic_action=topic_action,
            key="test-badge",
            data=badge,
        )

    process_job()

    # The digest delivery must exist but must not yet be marked as delivered
    with session_scope() as session:
        digest_delivery = session.execute(
            select(NotificationDelivery)
            .join(Notification, Notification.id == NotificationDelivery.notification_id)
            .where(
                Notification.user_id == user.id,
                NotificationDelivery.delivery_type == NotificationDeliveryType.digest,
            )
        ).scalar_one()
        assert digest_delivery.delivered is None
def test_handle_notification_banned_user_no_email(db, email_collector: EmailCollector):
    """Check that no email goes out for a notification addressed to a banned user."""
    user, token = generate_user()

    topic_action = NotificationTopicAction.badge__add

    # Switch on email delivery so that only the ban can explain a missing email
    email_on = notifications_pb2.SingleNotificationPreference(
        topic=topic_action.topic,
        action=topic_action.action,
        delivery_method="email",
        enabled=True,
    )
    with notifications_session(token) as client:
        client.SetNotificationSettings(notifications_pb2.SetNotificationSettingsReq(preferences=[email_on]))

    # Mark the user as banned
    with session_scope() as session:
        session.execute(update(User).where(User.id == user.id).values(banned_at=now()))

    badge = notification_data_pb2.BadgeAdd(
        badge_id="volunteer",
        badge_name="Active Volunteer",
        badge_description="This user is an active volunteer",
    )
    with session_scope() as session:
        notify(
            session,
            user_id=user.id,
            topic_action=topic_action,
            key="test-badge",
            data=badge,
        )

    # Nothing must have been emailed to the banned user
    assert email_collector.count_for_recipient(user.email) == 0
def test_handle_notification_deleted_user_no_regular_email(db, email_collector: EmailCollector):
    """Check that no email goes out for an ordinary (badge) notification addressed to a deleted user."""
    user, token = generate_user()

    topic_action = NotificationTopicAction.badge__add

    # Switch on email delivery so that only the deletion can explain a missing email
    email_on = notifications_pb2.SingleNotificationPreference(
        topic=topic_action.topic,
        action=topic_action.action,
        delivery_method="email",
        enabled=True,
    )
    with notifications_session(token) as client:
        client.SetNotificationSettings(notifications_pb2.SetNotificationSettingsReq(preferences=[email_on]))

    # Mark the user as deleted
    with session_scope() as session:
        session.execute(update(User).where(User.id == user.id).values(deleted_at=now()))

    badge = notification_data_pb2.BadgeAdd(
        badge_id="volunteer",
        badge_name="Active Volunteer",
        badge_description="This user is an active volunteer",
    )
    with session_scope() as session:
        notify(
            session,
            user_id=user.id,
            topic_action=topic_action,
            key="test-badge",
            data=badge,
        )

    # Nothing must have been emailed to the deleted user
    assert email_collector.count_for_recipient(user.email) == 0
def test_handle_notification_deleted_user_receives_account_deletion_email(db, email_collector: EmailCollector):
    """Check that the account-deletion-complete notification is still emailed to a deleted user."""
    user, _ = generate_user()

    topic_action = NotificationTopicAction.account_deletion__complete

    # Mark the user as deleted
    with session_scope() as session:
        session.execute(update(User).where(User.id == user.id).values(deleted_at=now()))

    deletion_data = notification_data_pb2.AccountDeletionComplete(
        undelete_token="test-token",
        undelete_days=7,
    )
    with session_scope() as session:
        notify(
            session,
            user_id=user.id,
            topic_action=topic_action,
            key="",
            data=deletion_data,
        )

    # Unlike ordinary notifications, this one must reach the deleted user's inbox
    sent_email = email_collector.pop_for_recipient(user.email, last=True)
    assert sent_email.recipient == user.email
def test_handle_notification_do_not_email_respected(db, email_collector: EmailCollector):
    """Check that a badge notification is not emailed to a user who has do_not_email set."""
    user, token = generate_user()

    topic_action = NotificationTopicAction.badge__add

    # Switch on email delivery so that only do_not_email can explain a missing email
    email_on = notifications_pb2.SingleNotificationPreference(
        topic=topic_action.topic,
        action=topic_action.action,
        delivery_method="email",
        enabled=True,
    )
    with notifications_session(token) as client:
        client.SetNotificationSettings(notifications_pb2.SetNotificationSettingsReq(preferences=[email_on]))

    # Set do_not_email (requires hosting/meetup status to be set due to DB constraint)
    opt_out = update(User).where(User.id == user.id).values(
        hosting_status=HostingStatus.cant_host,
        meetup_status=MeetupStatus.does_not_want_to_meetup,
        do_not_email=True,
    )
    with session_scope() as session:
        session.execute(opt_out)

    badge = notification_data_pb2.BadgeAdd(
        badge_id="volunteer",
        badge_name="Active Volunteer",
        badge_description="This user is an active volunteer",
    )
    with session_scope() as session:
        notify(
            session,
            user_id=user.id,
            topic_action=topic_action,
            key="test-badge",
            data=badge,
        )

    # Nothing must have been emailed while do_not_email is True
    assert email_collector.count_for_recipient(user.email) == 0
def test_handle_notification_critical_bypasses_do_not_email(db, email_collector: EmailCollector):
    """Check that a password-change notification is emailed even when the user has do_not_email set."""
    user, _ = generate_user()

    topic_action = NotificationTopicAction.password__change

    # Set do_not_email (requires hosting/meetup status to be set due to DB constraint)
    opt_out = update(User).where(User.id == user.id).values(
        hosting_status=HostingStatus.cant_host,
        meetup_status=MeetupStatus.does_not_want_to_meetup,
        do_not_email=True,
    )
    with session_scope() as session:
        session.execute(opt_out)

    with session_scope() as session:
        notify(
            session,
            user_id=user.id,
            topic_action=topic_action,
            key="",
            data=None,
        )

    # The email must go out despite do_not_email=True
    sent_email = email_collector.pop_for_recipient(user.email, last=True)
    assert sent_email.recipient == user.email
def test_handle_notification_duplicate_delivery_skipped(db, push_collector: PushCollector):
    """Check that handling a notification that already has a push delivery sends nothing and adds no second row."""
    user, _ = generate_user()

    topic_action = NotificationTopicAction.badge__add

    serialized_badge = notification_data_pb2.BadgeAdd(
        badge_id="volunteer",
        badge_name="Active Volunteer",
        badge_description="This user is an active volunteer",
    ).SerializeToString()

    with session_scope() as session:
        # Insert the notification row directly rather than going through notify()
        notification = Notification(
            user_id=user.id,
            topic_action=topic_action,
            key="test-badge",
            data=serialized_badge,
        )
        session.add(notification)
        session.flush()
        notification_id = notification.id

        # Pre-record a push delivery, as if the push had already gone out
        existing_delivery = NotificationDelivery(
            notification_id=notification_id,
            delivery_type=NotificationDeliveryType.push,
            delivered=now(),
        )
        session.add(existing_delivery)

    # Run the handler directly against the already-delivered notification
    handle_notification(jobs_pb2.HandleNotificationPayload(notification_id=notification_id))

    # No push may be sent, because the delivery row already exists
    assert push_collector.count_for_user(user.id) == 0

    # There must still be exactly one push delivery row
    with session_scope() as session:
        push_deliveries = (
            session.execute(
                select(NotificationDelivery).where(
                    NotificationDelivery.notification_id == notification_id,
                    NotificationDelivery.delivery_type == NotificationDeliveryType.push,
                )
            )
            .scalars()
            .all()
        )
        assert len(push_deliveries) == 1
def test_handle_notification_deferred_when_content_not_visible(db, moderator):
    """Check that a friend request notification gets no deliveries while the request is unapproved."""
    recipient, _ = generate_user(complete_profile=True)
    _, sender_token = generate_user(complete_profile=True)

    # Sending a friend request creates a moderation state and queues a notification job
    with api_session(sender_token) as api:
        api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=recipient.id))

    # Run the queued handle_notification job
    process_job()

    with session_scope() as session:
        notification = session.execute(
            select(Notification).where(
                Notification.user_id == recipient.id,
                Notification.topic_action == NotificationTopicAction.friend_request__create,
            )
        ).scalar_one()

        recorded_deliveries = (
            session.execute(select(NotificationDelivery).where(NotificationDelivery.notification_id == notification.id))
            .scalars()
            .all()
        )
        # The notification exists, but nothing is delivered while the content is not yet visible (shadowed)
        assert len(recorded_deliveries) == 0
def test_handle_notification_delivered_when_content_visible(db, moderator):
    """Check that a friend request notification gets delivered once a moderator approves the request."""
    recipient, _ = generate_user(complete_profile=True)
    _, sender_token = generate_user(complete_profile=True)

    # Send the friend request and look up its id from the sender's side
    with api_session(sender_token) as api:
        api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=recipient.id))
        friend_requests = api.ListFriendRequests(empty_pb2.Empty())
        friend_request_id = friend_requests.sent[0].friend_request_id

    # First pass: the notification is deferred because the content is still shadowed
    process_job()

    # Approval makes the request visible and queues the notification job again
    moderator.approve_friend_request(friend_request_id)

    # Drain the queue, including the re-queued notification job
    process_jobs()

    with session_scope() as session:
        notification = session.execute(
            select(Notification).where(
                Notification.user_id == recipient.id,
                Notification.topic_action == NotificationTopicAction.friend_request__create,
            )
        ).scalar_one()

        recorded_deliveries = (
            session.execute(select(NotificationDelivery).where(NotificationDelivery.notification_id == notification.id))
            .scalars()
            .all()
        )
        # At least one delivery must now be recorded
        assert len(recorded_deliveries) > 0
def test_notification_serializes_shadowed_actor(db, moderator):
    """Check that a friend request from a shadowed sender stores the sender in the notification data, not as a ghost."""
    recipient, _ = generate_user(complete_profile=True)
    sender, sender_token = generate_user(complete_profile=True)

    # Shadow the sender before they act
    with session_scope() as session:
        session.execute(update(User).where(User.id == sender.id).values(shadowed_at=now()))

    with api_session(sender_token) as api:
        api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=recipient.id))

    process_job()

    with session_scope() as session:
        notification = session.execute(
            select(Notification).where(
                Notification.user_id == recipient.id,
                Notification.topic_action == NotificationTopicAction.friend_request__create,
            )
        ).scalar_one()
        payload = notification_data_pb2.FriendRequestCreate.FromString(notification.data)
        actor = payload.other_user
        assert actor.user_id == sender.id
        assert not actor.is_ghost
def test_handle_notification_multiple_delivery_types(
    db, email_collector: EmailCollector, push_collector: PushCollector
):
    """Test that multiple delivery types are processed for a single notification.

    Enables email, push and digest delivery for ``badge__add``, sends one such
    notification, and checks that a delivery row exists for each of the three
    types: email and push rows must have a ``delivered`` timestamp, while
    digest rows must not have one yet.
    """
    user, token = generate_user()

    topic_action = NotificationTopicAction.badge__add

    # Enable email, push and digest delivery for this topic/action
    with notifications_session(token) as notifications:
        notifications.SetNotificationSettings(
            notifications_pb2.SetNotificationSettingsReq(
                preferences=[
                    notifications_pb2.SingleNotificationPreference(
                        topic=topic_action.topic,
                        action=topic_action.action,
                        delivery_method=delivery_method,
                        enabled=True,
                    )
                    for delivery_method in ("email", "push", "digest")
                ],
            )
        )

    with session_scope() as session:
        notify(
            session,
            user_id=user.id,
            topic_action=topic_action,
            key="test-badge",
            data=notification_data_pb2.BadgeAdd(
                badge_id="volunteer",
                badge_name="Active Volunteer",
                badge_description="This user is an active volunteer",
            ),
        )

    # Email should be sent (return value is not inspected; presumably the
    # collector fails if nothing was sent to this recipient -- confirm in fixture)
    email_collector.pop_for_recipient(user.email, last=True)

    # Push should be sent
    push = push_collector.pop_for_user(user.id, last=True)
    assert "Active Volunteer" in push.content.title

    # All three delivery types should have deliveries
    with session_scope() as session:
        notification = session.execute(select(Notification).where(Notification.user_id == user.id)).scalar_one()

        deliveries = (
            session.execute(select(NotificationDelivery).where(NotificationDelivery.notification_id == notification.id))
            .scalars()
            .all()
        )

        delivery_types = {d.delivery_type for d in deliveries}
        assert NotificationDeliveryType.email in delivery_types
        assert NotificationDeliveryType.push in delivery_types
        assert NotificationDeliveryType.digest in delivery_types

        # Email and push should have delivered timestamps; digest should not.
        immediate_types = (NotificationDeliveryType.email, NotificationDeliveryType.push)
        for delivery in deliveries:
            if delivery.delivery_type in immediate_types:
                assert delivery.delivered is not None
            else:
                # Assert the type explicitly rather than guarding with `elif`, so
                # an unexpected delivery type fails the test instead of being
                # skipped without any check.
                assert delivery.delivery_type == NotificationDeliveryType.digest
                assert delivery.delivered is None