Coverage for app/backend/src/tests/test_email.py: 100%
276 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1from datetime import timedelta
2from unittest.mock import patch
3from urllib.parse import parse_qs, urlparse
5import pytest
6from sqlalchemy import func, select, update
8import couchers.jobs.handlers
9from couchers.config import config
10from couchers.context import make_background_user_context, make_logged_out_context
11from couchers.crypto import b64decode, random_hex, urlsafe_secure_token
12from couchers.db import session_scope
13from couchers.i18n import LocalizationContext
14from couchers.models import (
15 ContentReport,
16 Email,
17 ModerationObjectType,
18 ModerationState,
19 ModerationVisibility,
20 Reference,
21 ReferenceType,
22 SignupFlow,
23 User,
24)
25from couchers.models.notifications import NotificationTopicAction
26from couchers.notifications.notify import notify
27from couchers.proto import api_pb2, auth_pb2, editor_pb2, events_pb2, notification_data_pb2, notifications_pb2
28from couchers.tasks import (
29 enforce_community_memberships,
30 maybe_send_reference_report_email,
31 send_content_report_email,
32 send_email_changed_confirmation_to_new_email,
33 send_signup_email,
34)
35from couchers.utils import Timestamp_from_datetime, now
36from tests.fixtures.db import generate_user, get_friend_relationship, make_friends
37from tests.fixtures.misc import EmailCollector, Moderator, process_jobs
38from tests.fixtures.sessions import (
39 api_session,
40 auth_api_session,
41 events_session,
42 notifications_session,
43 real_editor_session,
44)
45from tests.test_communities import create_community
48@pytest.fixture(autouse=True)
49def _(testconfig):
50 pass
53def test_signup_verification_email(db, email_collector: EmailCollector):
54 request_email = f"{random_hex(12)}@couchers.org.invalid"
56 flow = SignupFlow(name="Frodo", email=request_email, flow_token="")
58 with session_scope() as session:
59 context = make_logged_out_context(LocalizationContext.en_utc())
60 send_signup_email(context, session, flow)
62 email = email_collector.pop_for_recipient(request_email, last=True)
63 assert email.recipient == request_email
64 assert flow.email_token
65 assert flow.email_token in email.html
66 assert flow.email_token in email.html
69def test_report_email(db, email_collector: EmailCollector):
70 user_reporter, api_token_author = generate_user()
71 user_author, api_token_reported = generate_user()
73 with session_scope() as session:
74 report = ContentReport(
75 reporting_user_id=user_reporter.id,
76 reason="spam",
77 description="I think this is spam and does not belong on couchers",
78 content_ref="comment/123",
79 author_user_id=user_author.id,
80 user_agent="n/a",
81 page="https://couchers.org/comment/123",
82 )
83 session.add(report)
84 session.flush()
86 send_content_report_email(session, report)
88 # Load all data before session closes
89 author_username = report.author_user.username
90 author_id = report.author_user.id
91 author_email = report.author_user.email
92 reporting_username = report.reporting_user.username
93 reporting_id = report.reporting_user.id
94 reporting_email = report.reporting_user.email
95 reason = report.reason
96 description = report.description
98 email = email_collector.pop_for_recipient("reports@couchers.org.invalid", last=True)
99 assert email.recipient == "reports@couchers.org.invalid"
100 assert author_username in email.plain
101 assert str(author_id) in email.plain
102 assert author_email in email.plain
103 assert reporting_username in email.plain
104 assert str(reporting_id) in email.plain
105 assert reporting_email in email.plain
106 assert reason in email.plain
107 assert description in email.plain
108 assert "report" in email.subject.lower()
111def test_reference_report_email_not_sent(db, email_collector: EmailCollector):
112 from_user, api_token_author = generate_user()
113 to_user, api_token_reported = generate_user()
115 make_friends(from_user, to_user)
117 with session_scope() as session:
118 moderation_state = ModerationState(
119 object_type=ModerationObjectType.reference,
120 object_id=0,
121 visibility=ModerationVisibility.visible,
122 )
123 session.add(moderation_state)
124 session.flush()
125 reference = Reference(
126 from_user_id=from_user.id,
127 to_user_id=to_user.id,
128 reference_type=ReferenceType.friend,
129 text="This person was very nice to me.",
130 rating=0.9,
131 was_appropriate=True,
132 moderation_state_id=moderation_state.id,
133 )
134 session.add(reference)
135 session.flush()
136 moderation_state.object_id = reference.id
138 # no email sent for a positive ref
139 maybe_send_reference_report_email(session, reference)
141 assert email_collector.count_for_recipient("reports@couchers.org.invalid") == 0
144def test_reference_report_email(db, email_collector: EmailCollector):
145 from_user, api_token_author = generate_user()
146 to_user, api_token_reported = generate_user()
148 make_friends(from_user, to_user)
150 with session_scope() as session:
151 moderation_state = ModerationState(
152 object_type=ModerationObjectType.reference,
153 object_id=0,
154 visibility=ModerationVisibility.visible,
155 )
156 session.add(moderation_state)
157 session.flush()
158 reference = Reference(
159 from_user_id=from_user.id,
160 to_user_id=to_user.id,
161 reference_type=ReferenceType.friend,
162 text="This person was not nice to me.",
163 rating=0.3,
164 was_appropriate=False,
165 private_text="This is some private text for support",
166 moderation_state_id=moderation_state.id,
167 )
168 session.add(reference)
169 session.flush()
170 moderation_state.object_id = reference.id
172 maybe_send_reference_report_email(session, reference)
174 reference_text = reference.text
175 reference_private_text = reference.private_text
177 email = email_collector.pop_for_recipient("reports@couchers.org.invalid", last=True)
178 assert email.recipient == "reports@couchers.org.invalid"
179 assert "report" in email.subject.lower()
180 assert "reference" in email.subject.lower()
181 assert from_user.username in email.plain
182 assert str(from_user.id) in email.plain
183 assert from_user.email in email.plain
184 assert to_user.username in email.plain
185 assert str(to_user.id) in email.plain
186 assert to_user.email in email.plain
187 assert reference_text in email.plain
188 assert "friend" in email.plain.lower()
189 assert reference_private_text
190 assert reference_private_text in email.plain
193def test_email_patching_fails(db):
194 """
195 There was a problem where the mocking wasn't happening and the email dev
196 printing function was called instead, this makes sure the patching is
197 actually done
198 """
199 to_user, to_token = generate_user()
200 from_user, from_token = generate_user()
201 # Need a moderator to approve the friend request since UMS defers notification
202 mod_user, mod_token = generate_user(is_superuser=True)
203 moderator = Moderator(mod_user, mod_token)
205 patched_msg = random_hex(64)
207 def mock_queue_email(session, payload):
208 raise Exception(patched_msg)
210 with api_session(from_token) as api:
211 api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=to_user.id))
213 friend_relationship = get_friend_relationship(from_user, to_user)
214 assert friend_relationship is not None
215 moderator.approve_friend_request(friend_relationship.id)
217 with patch("couchers.email.queuing._queue_email", mock_queue_email):
218 with pytest.raises(Exception) as e:
219 process_jobs()
221 assert str(e.value) == patched_msg
224def test_email_changed_confirmation_sent_to_new_email(db, email_collector: EmailCollector):
225 confirmation_token = urlsafe_secure_token()
226 user, user_token = generate_user()
227 user.new_email = f"{random_hex(12)}@couchers.org.invalid"
228 user.new_email_token = confirmation_token
230 with session_scope() as session:
231 user_context = make_background_user_context(user.id)
232 send_email_changed_confirmation_to_new_email(user_context, session, user)
234 email = email_collector.pop_for_recipient(user.new_email, last=True)
235 assert "new email" in email.subject
236 assert email.recipient == user.new_email
237 assert user.name in email.plain
238 assert user.name in email.html
239 assert user.email in email.plain
240 assert user.email in email.html
241 assert "Your old email address is" in email.plain
242 assert "Your old email address is" in email.html
243 assert f"http://localhost:3000/confirm-email?token={confirmation_token}" in email.plain
244 assert f"http://localhost:3000/confirm-email?token={confirmation_token}" in email.html
245 assert "support@couchers.org" in email.plain
246 assert "support@couchers.org" in email.html
249def test_do_not_email_security(db, email_collector: EmailCollector):
250 user, token = generate_user()
252 password_reset_token = urlsafe_secure_token()
254 with notifications_session(token) as notifications:
255 notifications.SetNotificationSettings(notifications_pb2.SetNotificationSettingsReq(enable_do_not_email=True))
257 # make sure we still get security emails
259 with session_scope() as session:
260 notify(
261 session,
262 user_id=user.id,
263 topic_action=NotificationTopicAction.password_reset__start,
264 key="",
265 data=notification_data_pb2.PasswordResetStart(
266 password_reset_token=password_reset_token,
267 ),
268 )
270 email = email_collector.pop_for_recipient(user.email, last=True)
271 assert email.recipient == user.email
272 assert "reset" in email.subject.lower()
273 assert password_reset_token in email.plain
274 assert password_reset_token in email.html
275 unique_string = "You asked for your password to be reset on Couchers.org."
276 assert unique_string in email.plain
277 assert unique_string in email.html
278 assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in email.plain
279 assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in email.html
280 assert "support@couchers.org" in email.plain
281 assert "support@couchers.org" in email.html
283 assert "/quick-link?payload=" not in email.plain
284 assert "/quick-link?payload=" not in email.html
287def test_do_not_email_non_security(db, email_collector: EmailCollector):
288 user, token1 = generate_user(complete_profile=True)
289 from_user, token2 = generate_user(complete_profile=True)
290 # Need a moderator to approve the friend request since UMS defers notification
291 mod_user, mod_token = generate_user(is_superuser=True)
292 moderator = Moderator(mod_user, mod_token)
294 with notifications_session(token1) as notifications:
295 notifications.SetNotificationSettings(notifications_pb2.SetNotificationSettingsReq(enable_do_not_email=True))
297 with api_session(token2) as api:
298 api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user.id))
300 friend_relationship = get_friend_relationship(from_user, user)
301 assert friend_relationship is not None
302 moderator.approve_friend_request(friend_relationship.id)
304 assert email_collector.count_for_recipient(user.email) == 0
307def test_do_not_email_non_security_unsublink(db, email_collector: EmailCollector):
308 user, _ = generate_user(complete_profile=True)
309 from_user, token2 = generate_user(complete_profile=True)
310 # Need a moderator to approve the friend request since UMS defers notification
311 mod_user, mod_token = generate_user(is_superuser=True)
312 moderator = Moderator(mod_user, mod_token)
314 with api_session(token2) as api:
315 api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user.id))
317 friend_relationship = get_friend_relationship(from_user, user)
318 assert friend_relationship is not None
319 moderator.approve_friend_request(friend_relationship.id)
321 email = email_collector.pop_for_recipient(user.email, last=True)
323 assert "/quick-link?payload=" in email.plain
324 assert "/quick-link?payload=" in email.html
327def test_email_prefix_config(db, email_collector: EmailCollector, monkeypatch):
328 user, _ = generate_user()
330 with session_scope() as session:
331 notify(
332 session,
333 user_id=user.id,
334 topic_action=NotificationTopicAction.donation__received,
335 key="",
336 data=notification_data_pb2.DonationReceived(
337 amount=20,
338 receipt_url="https://example.com/receipt/12345",
339 ),
340 )
342 email1 = email_collector.pop_for_recipient(user.email, last=True)
343 assert email1.sender_name == "Couchers.org"
344 assert email1.sender_email == "notify@couchers.org.invalid"
345 assert email1.subject == "[TEST] Thank you for your donation to Couchers.org!"
347 new_config = config.copy()
348 new_config.NOTIFICATION_EMAIL_SENDER = "TestCo"
349 new_config.NOTIFICATION_EMAIL_ADDRESS = "testco@testing.co.invalid"
350 new_config.NOTIFICATION_PREFIX = ""
352 monkeypatch.setattr(couchers.notifications.render_email, "config", new_config)
354 with session_scope() as session:
355 notify(
356 session,
357 user_id=user.id,
358 topic_action=NotificationTopicAction.donation__received,
359 key="",
360 data=notification_data_pb2.DonationReceived(
361 amount=20,
362 receipt_url="https://example.com/receipt/12345",
363 ),
364 )
366 email2 = email_collector.pop_for_recipient(user.email, last=True)
367 assert email2.sender_name == "TestCo"
368 assert email2.sender_email == "testco@testing.co.invalid"
369 assert email2.subject == "Thank you for your donation to Couchers.org!"
372def test_send_donation_email(db, monkeypatch):
373 user, _ = generate_user(name="Testy von Test", email="testing@couchers.org.invalid")
375 new_config = config.copy()
376 new_config.ENABLE_EMAIL = True
378 monkeypatch.setattr(couchers.jobs.handlers, "config", new_config)
380 with session_scope() as session:
381 notify(
382 session,
383 user_id=user.id,
384 topic_action=NotificationTopicAction.donation__received,
385 key="",
386 data=notification_data_pb2.DonationReceived(
387 amount=20,
388 receipt_url="https://example.com/receipt/12345",
389 ),
390 )
392 with patch("couchers.email.smtp.smtplib.SMTP"):
393 process_jobs()
395 with session_scope() as session:
396 email = session.execute(select(Email)).scalar_one()
397 assert email.subject == "[TEST] Thank you for your donation to Couchers.org!"
398 assert (
399 email.plain
400 == """Hi Testy von Test,
402Thank you so much for your donation of $20 to Couchers.org.
404Your contribution will go towards building and sustaining the Couchers.org platform and community, and is vital for our goal of a completely free and non-profit generation of couch surfing.
406You can download an invoice and receipt for the donation here:
408Download invoice: https://example.com/receipt/12345
410Couchers, Inc. is a 501(c)(3) nonprofit (EIN: 87-1734577) registered in the United States. No goods or services were provided in exchange for this contribution.
412If you have any questions about your donation, please email us at donations@couchers.org.
414Your generosity will help deliver the platform for everyone.
416Thank you!
418Aapeli and Itsi,
419Couchers.org Founders
421---
423This is a security email, you cannot unsubscribe from it.
424"""
425 )
427 assert "Thank you so much for your donation of $20 to Couchers.org." in email.html
428 assert email.sender_name == "Couchers.org"
429 assert email.sender_email == "notify@couchers.org.invalid"
430 assert email.recipient == "testing@couchers.org.invalid"
431 assert "https://example.com/receipt/12345" in email.html
432 assert not email.list_unsubscribe_header
433 assert email.source_data and ("donation:received" in email.source_data)
436def test_chat_missed_messages_list_unsubscribe_header(db, email_collector: EmailCollector):
437 """
438 Regression test: chat__missed_messages has key="" (it's a summary, not tied to a single chat).
439 The List-Unsubscribe header must use a topic_action unsubscribe link, not a topic_key link.
440 """
441 user, _ = generate_user()
443 with session_scope() as session:
444 notify(
445 session,
446 user_id=user.id,
447 topic_action=NotificationTopicAction.chat__missed_messages,
448 key="",
449 data=notification_data_pb2.ChatMissedMessages(
450 messages=[
451 notification_data_pb2.ChatMessage(
452 author=api_pb2.User(name="Test User", user_id=2, username="testuser"),
453 text="Hello!",
454 group_chat_id=99,
455 unseen_count=1,
456 ),
457 ],
458 ),
459 )
461 email = email_collector.pop_for_recipient(user.email, last=True)
463 assert email.list_unsubscribe_header
465 # Extract the List-Unsubscribe URL and call the Unsubscribe endpoint
466 url = email.list_unsubscribe_header.strip("<>")
467 url_parts = urlparse(url)
468 params = parse_qs(url_parts.query)
470 with auth_api_session() as (auth_api, metadata_interceptor):
471 res = auth_api.Unsubscribe(
472 auth_pb2.UnsubscribeReq(
473 payload=b64decode(params["payload"][0]),
474 sig=b64decode(params["sig"][0]),
475 )
476 )
477 assert res.response
480def test_email_deleted_users_regression(db, email_collector: EmailCollector, moderator: Moderator):
481 """
482 We introduced a bug in notify v2 where we would email deleted/banned users.
483 """
484 super_user, super_token = generate_user(is_superuser=True)
485 creating_user, creating_token = generate_user(complete_profile=True)
487 normal_user, _ = generate_user()
488 ban_user, _ = generate_user()
489 delete_user, _ = generate_user()
491 with session_scope() as session:
492 w = create_community(session, 0, 2, "Global Community", [super_user], [], None)
493 mr = create_community(session, 0, 2, "Macroregion", [super_user], [], w)
494 r = create_community(session, 0, 2, "Region", [super_user], [], mr)
495 c_id = create_community(
496 session,
497 0,
498 2,
499 "Non-global Community",
500 [super_user],
501 [creating_user, normal_user, ban_user, delete_user],
502 r,
503 ).id
505 enforce_community_memberships()
507 start_time = now() + timedelta(hours=2)
508 end_time = start_time + timedelta(hours=3)
509 with events_session(creating_token) as api:
510 res = api.CreateEvent(
511 events_pb2.CreateEventReq(
512 title="Dummy Title",
513 content="Dummy content.",
514 photo_key=None,
515 parent_community_id=c_id,
516 offline_information=events_pb2.OfflineEventInformation(
517 address="Near Null Island",
518 lat=0.1,
519 lng=0.2,
520 ),
521 start_time=Timestamp_from_datetime(start_time),
522 end_time=Timestamp_from_datetime(end_time),
523 timezone="UTC",
524 )
525 )
526 event_id = res.event_id
527 assert not res.is_deleted
529 moderator.approve_event_occurrence(event_id)
531 with events_session(creating_token) as api:
532 api.RequestCommunityInvite(events_pb2.RequestCommunityInviteReq(event_id=event_id))
534 email_collector.pop_for_mods(last=True)
536 with real_editor_session(super_token) as editor:
537 res = editor.ListEventCommunityInviteRequests(editor_pb2.ListEventCommunityInviteRequestsReq())
538 assert len(res.requests) == 1
539 # this will count everyone
540 assert res.requests[0].approx_users_to_notify == 5
542 with session_scope() as session:
543 session.execute(update(User).where(User.id == ban_user.id).values(banned_at=func.now()))
544 session.execute(update(User).where(User.id == delete_user.id).values(deleted_at=func.now()))
546 with real_editor_session(super_token) as editor:
547 res = editor.ListEventCommunityInviteRequests(editor_pb2.ListEventCommunityInviteRequestsReq())
548 assert len(res.requests) == 1
549 # should only notify creating_user, super_user and normal_user
550 assert res.requests[0].approx_users_to_notify == 3
552 editor.DecideEventCommunityInviteRequest(
553 editor_pb2.DecideEventCommunityInviteRequestReq(
554 event_community_invite_request_id=res.requests[0].event_community_invite_request_id,
555 approve=True,
556 )
557 )
559 assert email_collector.count() == 3