Coverage for app/backend/src/tests/test_email.py: 100%

276 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1from datetime import timedelta 

2from unittest.mock import patch 

3from urllib.parse import parse_qs, urlparse 

4 

5import pytest 

6from sqlalchemy import func, select, update 

7 

8import couchers.jobs.handlers 

9from couchers.config import config 

10from couchers.context import make_background_user_context, make_logged_out_context 

11from couchers.crypto import b64decode, random_hex, urlsafe_secure_token 

12from couchers.db import session_scope 

13from couchers.i18n import LocalizationContext 

14from couchers.models import ( 

15 ContentReport, 

16 Email, 

17 ModerationObjectType, 

18 ModerationState, 

19 ModerationVisibility, 

20 Reference, 

21 ReferenceType, 

22 SignupFlow, 

23 User, 

24) 

25from couchers.models.notifications import NotificationTopicAction 

26from couchers.notifications.notify import notify 

27from couchers.proto import api_pb2, auth_pb2, editor_pb2, events_pb2, notification_data_pb2, notifications_pb2 

28from couchers.tasks import ( 

29 enforce_community_memberships, 

30 maybe_send_reference_report_email, 

31 send_content_report_email, 

32 send_email_changed_confirmation_to_new_email, 

33 send_signup_email, 

34) 

35from couchers.utils import Timestamp_from_datetime, now 

36from tests.fixtures.db import generate_user, get_friend_relationship, make_friends 

37from tests.fixtures.misc import EmailCollector, Moderator, process_jobs 

38from tests.fixtures.sessions import ( 

39 api_session, 

40 auth_api_session, 

41 events_session, 

42 notifications_session, 

43 real_editor_session, 

44) 

45from tests.test_communities import create_community 

46 

47 

48@pytest.fixture(autouse=True) 

49def _(testconfig): 

50 pass 

51 

52 

53def test_signup_verification_email(db, email_collector: EmailCollector): 

54 request_email = f"{random_hex(12)}@couchers.org.invalid" 

55 

56 flow = SignupFlow(name="Frodo", email=request_email, flow_token="") 

57 

58 with session_scope() as session: 

59 context = make_logged_out_context(LocalizationContext.en_utc()) 

60 send_signup_email(context, session, flow) 

61 

62 email = email_collector.pop_for_recipient(request_email, last=True) 

63 assert email.recipient == request_email 

64 assert flow.email_token 

65 assert flow.email_token in email.html 

66 assert flow.email_token in email.html 

67 

68 

69def test_report_email(db, email_collector: EmailCollector): 

70 user_reporter, api_token_author = generate_user() 

71 user_author, api_token_reported = generate_user() 

72 

73 with session_scope() as session: 

74 report = ContentReport( 

75 reporting_user_id=user_reporter.id, 

76 reason="spam", 

77 description="I think this is spam and does not belong on couchers", 

78 content_ref="comment/123", 

79 author_user_id=user_author.id, 

80 user_agent="n/a", 

81 page="https://couchers.org/comment/123", 

82 ) 

83 session.add(report) 

84 session.flush() 

85 

86 send_content_report_email(session, report) 

87 

88 # Load all data before session closes 

89 author_username = report.author_user.username 

90 author_id = report.author_user.id 

91 author_email = report.author_user.email 

92 reporting_username = report.reporting_user.username 

93 reporting_id = report.reporting_user.id 

94 reporting_email = report.reporting_user.email 

95 reason = report.reason 

96 description = report.description 

97 

98 email = email_collector.pop_for_recipient("reports@couchers.org.invalid", last=True) 

99 assert email.recipient == "reports@couchers.org.invalid" 

100 assert author_username in email.plain 

101 assert str(author_id) in email.plain 

102 assert author_email in email.plain 

103 assert reporting_username in email.plain 

104 assert str(reporting_id) in email.plain 

105 assert reporting_email in email.plain 

106 assert reason in email.plain 

107 assert description in email.plain 

108 assert "report" in email.subject.lower() 

109 

110 

111def test_reference_report_email_not_sent(db, email_collector: EmailCollector): 

112 from_user, api_token_author = generate_user() 

113 to_user, api_token_reported = generate_user() 

114 

115 make_friends(from_user, to_user) 

116 

117 with session_scope() as session: 

118 moderation_state = ModerationState( 

119 object_type=ModerationObjectType.reference, 

120 object_id=0, 

121 visibility=ModerationVisibility.visible, 

122 ) 

123 session.add(moderation_state) 

124 session.flush() 

125 reference = Reference( 

126 from_user_id=from_user.id, 

127 to_user_id=to_user.id, 

128 reference_type=ReferenceType.friend, 

129 text="This person was very nice to me.", 

130 rating=0.9, 

131 was_appropriate=True, 

132 moderation_state_id=moderation_state.id, 

133 ) 

134 session.add(reference) 

135 session.flush() 

136 moderation_state.object_id = reference.id 

137 

138 # no email sent for a positive ref 

139 maybe_send_reference_report_email(session, reference) 

140 

141 assert email_collector.count_for_recipient("reports@couchers.org.invalid") == 0 

142 

143 

144def test_reference_report_email(db, email_collector: EmailCollector): 

145 from_user, api_token_author = generate_user() 

146 to_user, api_token_reported = generate_user() 

147 

148 make_friends(from_user, to_user) 

149 

150 with session_scope() as session: 

151 moderation_state = ModerationState( 

152 object_type=ModerationObjectType.reference, 

153 object_id=0, 

154 visibility=ModerationVisibility.visible, 

155 ) 

156 session.add(moderation_state) 

157 session.flush() 

158 reference = Reference( 

159 from_user_id=from_user.id, 

160 to_user_id=to_user.id, 

161 reference_type=ReferenceType.friend, 

162 text="This person was not nice to me.", 

163 rating=0.3, 

164 was_appropriate=False, 

165 private_text="This is some private text for support", 

166 moderation_state_id=moderation_state.id, 

167 ) 

168 session.add(reference) 

169 session.flush() 

170 moderation_state.object_id = reference.id 

171 

172 maybe_send_reference_report_email(session, reference) 

173 

174 reference_text = reference.text 

175 reference_private_text = reference.private_text 

176 

177 email = email_collector.pop_for_recipient("reports@couchers.org.invalid", last=True) 

178 assert email.recipient == "reports@couchers.org.invalid" 

179 assert "report" in email.subject.lower() 

180 assert "reference" in email.subject.lower() 

181 assert from_user.username in email.plain 

182 assert str(from_user.id) in email.plain 

183 assert from_user.email in email.plain 

184 assert to_user.username in email.plain 

185 assert str(to_user.id) in email.plain 

186 assert to_user.email in email.plain 

187 assert reference_text in email.plain 

188 assert "friend" in email.plain.lower() 

189 assert reference_private_text 

190 assert reference_private_text in email.plain 

191 

192 

193def test_email_patching_fails(db): 

194 """ 

195 There was a problem where the mocking wasn't happening and the email dev 

196 printing function was called instead, this makes sure the patching is 

197 actually done 

198 """ 

199 to_user, to_token = generate_user() 

200 from_user, from_token = generate_user() 

201 # Need a moderator to approve the friend request since UMS defers notification 

202 mod_user, mod_token = generate_user(is_superuser=True) 

203 moderator = Moderator(mod_user, mod_token) 

204 

205 patched_msg = random_hex(64) 

206 

207 def mock_queue_email(session, payload): 

208 raise Exception(patched_msg) 

209 

210 with api_session(from_token) as api: 

211 api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=to_user.id)) 

212 

213 friend_relationship = get_friend_relationship(from_user, to_user) 

214 assert friend_relationship is not None 

215 moderator.approve_friend_request(friend_relationship.id) 

216 

217 with patch("couchers.email.queuing._queue_email", mock_queue_email): 

218 with pytest.raises(Exception) as e: 

219 process_jobs() 

220 

221 assert str(e.value) == patched_msg 

222 

223 

224def test_email_changed_confirmation_sent_to_new_email(db, email_collector: EmailCollector): 

225 confirmation_token = urlsafe_secure_token() 

226 user, user_token = generate_user() 

227 user.new_email = f"{random_hex(12)}@couchers.org.invalid" 

228 user.new_email_token = confirmation_token 

229 

230 with session_scope() as session: 

231 user_context = make_background_user_context(user.id) 

232 send_email_changed_confirmation_to_new_email(user_context, session, user) 

233 

234 email = email_collector.pop_for_recipient(user.new_email, last=True) 

235 assert "new email" in email.subject 

236 assert email.recipient == user.new_email 

237 assert user.name in email.plain 

238 assert user.name in email.html 

239 assert user.email in email.plain 

240 assert user.email in email.html 

241 assert "Your old email address is" in email.plain 

242 assert "Your old email address is" in email.html 

243 assert f"http://localhost:3000/confirm-email?token={confirmation_token}" in email.plain 

244 assert f"http://localhost:3000/confirm-email?token={confirmation_token}" in email.html 

245 assert "support@couchers.org" in email.plain 

246 assert "support@couchers.org" in email.html 

247 

248 

249def test_do_not_email_security(db, email_collector: EmailCollector): 

250 user, token = generate_user() 

251 

252 password_reset_token = urlsafe_secure_token() 

253 

254 with notifications_session(token) as notifications: 

255 notifications.SetNotificationSettings(notifications_pb2.SetNotificationSettingsReq(enable_do_not_email=True)) 

256 

257 # make sure we still get security emails 

258 

259 with session_scope() as session: 

260 notify( 

261 session, 

262 user_id=user.id, 

263 topic_action=NotificationTopicAction.password_reset__start, 

264 key="", 

265 data=notification_data_pb2.PasswordResetStart( 

266 password_reset_token=password_reset_token, 

267 ), 

268 ) 

269 

270 email = email_collector.pop_for_recipient(user.email, last=True) 

271 assert email.recipient == user.email 

272 assert "reset" in email.subject.lower() 

273 assert password_reset_token in email.plain 

274 assert password_reset_token in email.html 

275 unique_string = "You asked for your password to be reset on Couchers.org." 

276 assert unique_string in email.plain 

277 assert unique_string in email.html 

278 assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in email.plain 

279 assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in email.html 

280 assert "support@couchers.org" in email.plain 

281 assert "support@couchers.org" in email.html 

282 

283 assert "/quick-link?payload=" not in email.plain 

284 assert "/quick-link?payload=" not in email.html 

285 

286 

287def test_do_not_email_non_security(db, email_collector: EmailCollector): 

288 user, token1 = generate_user(complete_profile=True) 

289 from_user, token2 = generate_user(complete_profile=True) 

290 # Need a moderator to approve the friend request since UMS defers notification 

291 mod_user, mod_token = generate_user(is_superuser=True) 

292 moderator = Moderator(mod_user, mod_token) 

293 

294 with notifications_session(token1) as notifications: 

295 notifications.SetNotificationSettings(notifications_pb2.SetNotificationSettingsReq(enable_do_not_email=True)) 

296 

297 with api_session(token2) as api: 

298 api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user.id)) 

299 

300 friend_relationship = get_friend_relationship(from_user, user) 

301 assert friend_relationship is not None 

302 moderator.approve_friend_request(friend_relationship.id) 

303 

304 assert email_collector.count_for_recipient(user.email) == 0 

305 

306 

307def test_do_not_email_non_security_unsublink(db, email_collector: EmailCollector): 

308 user, _ = generate_user(complete_profile=True) 

309 from_user, token2 = generate_user(complete_profile=True) 

310 # Need a moderator to approve the friend request since UMS defers notification 

311 mod_user, mod_token = generate_user(is_superuser=True) 

312 moderator = Moderator(mod_user, mod_token) 

313 

314 with api_session(token2) as api: 

315 api.SendFriendRequest(api_pb2.SendFriendRequestReq(user_id=user.id)) 

316 

317 friend_relationship = get_friend_relationship(from_user, user) 

318 assert friend_relationship is not None 

319 moderator.approve_friend_request(friend_relationship.id) 

320 

321 email = email_collector.pop_for_recipient(user.email, last=True) 

322 

323 assert "/quick-link?payload=" in email.plain 

324 assert "/quick-link?payload=" in email.html 

325 

326 

327def test_email_prefix_config(db, email_collector: EmailCollector, monkeypatch): 

328 user, _ = generate_user() 

329 

330 with session_scope() as session: 

331 notify( 

332 session, 

333 user_id=user.id, 

334 topic_action=NotificationTopicAction.donation__received, 

335 key="", 

336 data=notification_data_pb2.DonationReceived( 

337 amount=20, 

338 receipt_url="https://example.com/receipt/12345", 

339 ), 

340 ) 

341 

342 email1 = email_collector.pop_for_recipient(user.email, last=True) 

343 assert email1.sender_name == "Couchers.org" 

344 assert email1.sender_email == "notify@couchers.org.invalid" 

345 assert email1.subject == "[TEST] Thank you for your donation to Couchers.org!" 

346 

347 new_config = config.copy() 

348 new_config.NOTIFICATION_EMAIL_SENDER = "TestCo" 

349 new_config.NOTIFICATION_EMAIL_ADDRESS = "testco@testing.co.invalid" 

350 new_config.NOTIFICATION_PREFIX = "" 

351 

352 monkeypatch.setattr(couchers.notifications.render_email, "config", new_config) 

353 

354 with session_scope() as session: 

355 notify( 

356 session, 

357 user_id=user.id, 

358 topic_action=NotificationTopicAction.donation__received, 

359 key="", 

360 data=notification_data_pb2.DonationReceived( 

361 amount=20, 

362 receipt_url="https://example.com/receipt/12345", 

363 ), 

364 ) 

365 

366 email2 = email_collector.pop_for_recipient(user.email, last=True) 

367 assert email2.sender_name == "TestCo" 

368 assert email2.sender_email == "testco@testing.co.invalid" 

369 assert email2.subject == "Thank you for your donation to Couchers.org!" 

370 

371 

372def test_send_donation_email(db, monkeypatch): 

373 user, _ = generate_user(name="Testy von Test", email="testing@couchers.org.invalid") 

374 

375 new_config = config.copy() 

376 new_config.ENABLE_EMAIL = True 

377 

378 monkeypatch.setattr(couchers.jobs.handlers, "config", new_config) 

379 

380 with session_scope() as session: 

381 notify( 

382 session, 

383 user_id=user.id, 

384 topic_action=NotificationTopicAction.donation__received, 

385 key="", 

386 data=notification_data_pb2.DonationReceived( 

387 amount=20, 

388 receipt_url="https://example.com/receipt/12345", 

389 ), 

390 ) 

391 

392 with patch("couchers.email.smtp.smtplib.SMTP"): 

393 process_jobs() 

394 

395 with session_scope() as session: 

396 email = session.execute(select(Email)).scalar_one() 

397 assert email.subject == "[TEST] Thank you for your donation to Couchers.org!" 

398 assert ( 

399 email.plain 

400 == """Hi Testy von Test, 

401 

402Thank you so much for your donation of $20 to Couchers.org. 

403 

404Your contribution will go towards building and sustaining the Couchers.org platform and community, and is vital for our goal of a completely free and non-profit generation of couch surfing. 

405 

406You can download an invoice and receipt for the donation here: 

407 

408Download invoice: https://example.com/receipt/12345 

409 

410Couchers, Inc. is a 501(c)(3) nonprofit (EIN: 87-1734577) registered in the United States. No goods or services were provided in exchange for this contribution. 

411 

412If you have any questions about your donation, please email us at donations@couchers.org. 

413 

414Your generosity will help deliver the platform for everyone. 

415 

416Thank you! 

417 

418Aapeli and Itsi, 

419Couchers.org Founders 

420 

421--- 

422 

423This is a security email, you cannot unsubscribe from it. 

424""" 

425 ) 

426 

427 assert "Thank you so much for your donation of $20 to Couchers.org." in email.html 

428 assert email.sender_name == "Couchers.org" 

429 assert email.sender_email == "notify@couchers.org.invalid" 

430 assert email.recipient == "testing@couchers.org.invalid" 

431 assert "https://example.com/receipt/12345" in email.html 

432 assert not email.list_unsubscribe_header 

433 assert email.source_data and ("donation:received" in email.source_data) 

434 

435 

436def test_chat_missed_messages_list_unsubscribe_header(db, email_collector: EmailCollector): 

437 """ 

438 Regression test: chat__missed_messages has key="" (it's a summary, not tied to a single chat). 

439 The List-Unsubscribe header must use a topic_action unsubscribe link, not a topic_key link. 

440 """ 

441 user, _ = generate_user() 

442 

443 with session_scope() as session: 

444 notify( 

445 session, 

446 user_id=user.id, 

447 topic_action=NotificationTopicAction.chat__missed_messages, 

448 key="", 

449 data=notification_data_pb2.ChatMissedMessages( 

450 messages=[ 

451 notification_data_pb2.ChatMessage( 

452 author=api_pb2.User(name="Test User", user_id=2, username="testuser"), 

453 text="Hello!", 

454 group_chat_id=99, 

455 unseen_count=1, 

456 ), 

457 ], 

458 ), 

459 ) 

460 

461 email = email_collector.pop_for_recipient(user.email, last=True) 

462 

463 assert email.list_unsubscribe_header 

464 

465 # Extract the List-Unsubscribe URL and call the Unsubscribe endpoint 

466 url = email.list_unsubscribe_header.strip("<>") 

467 url_parts = urlparse(url) 

468 params = parse_qs(url_parts.query) 

469 

470 with auth_api_session() as (auth_api, metadata_interceptor): 

471 res = auth_api.Unsubscribe( 

472 auth_pb2.UnsubscribeReq( 

473 payload=b64decode(params["payload"][0]), 

474 sig=b64decode(params["sig"][0]), 

475 ) 

476 ) 

477 assert res.response 

478 

479 

480def test_email_deleted_users_regression(db, email_collector: EmailCollector, moderator: Moderator): 

481 """ 

482 We introduced a bug in notify v2 where we would email deleted/banned users. 

483 """ 

484 super_user, super_token = generate_user(is_superuser=True) 

485 creating_user, creating_token = generate_user(complete_profile=True) 

486 

487 normal_user, _ = generate_user() 

488 ban_user, _ = generate_user() 

489 delete_user, _ = generate_user() 

490 

491 with session_scope() as session: 

492 w = create_community(session, 0, 2, "Global Community", [super_user], [], None) 

493 mr = create_community(session, 0, 2, "Macroregion", [super_user], [], w) 

494 r = create_community(session, 0, 2, "Region", [super_user], [], mr) 

495 c_id = create_community( 

496 session, 

497 0, 

498 2, 

499 "Non-global Community", 

500 [super_user], 

501 [creating_user, normal_user, ban_user, delete_user], 

502 r, 

503 ).id 

504 

505 enforce_community_memberships() 

506 

507 start_time = now() + timedelta(hours=2) 

508 end_time = start_time + timedelta(hours=3) 

509 with events_session(creating_token) as api: 

510 res = api.CreateEvent( 

511 events_pb2.CreateEventReq( 

512 title="Dummy Title", 

513 content="Dummy content.", 

514 photo_key=None, 

515 parent_community_id=c_id, 

516 offline_information=events_pb2.OfflineEventInformation( 

517 address="Near Null Island", 

518 lat=0.1, 

519 lng=0.2, 

520 ), 

521 start_time=Timestamp_from_datetime(start_time), 

522 end_time=Timestamp_from_datetime(end_time), 

523 timezone="UTC", 

524 ) 

525 ) 

526 event_id = res.event_id 

527 assert not res.is_deleted 

528 

529 moderator.approve_event_occurrence(event_id) 

530 

531 with events_session(creating_token) as api: 

532 api.RequestCommunityInvite(events_pb2.RequestCommunityInviteReq(event_id=event_id)) 

533 

534 email_collector.pop_for_mods(last=True) 

535 

536 with real_editor_session(super_token) as editor: 

537 res = editor.ListEventCommunityInviteRequests(editor_pb2.ListEventCommunityInviteRequestsReq()) 

538 assert len(res.requests) == 1 

539 # this will count everyone 

540 assert res.requests[0].approx_users_to_notify == 5 

541 

542 with session_scope() as session: 

543 session.execute(update(User).where(User.id == ban_user.id).values(banned_at=func.now())) 

544 session.execute(update(User).where(User.id == delete_user.id).values(deleted_at=func.now())) 

545 

546 with real_editor_session(super_token) as editor: 

547 res = editor.ListEventCommunityInviteRequests(editor_pb2.ListEventCommunityInviteRequestsReq()) 

548 assert len(res.requests) == 1 

549 # should only notify creating_user, super_user and normal_user 

550 assert res.requests[0].approx_users_to_notify == 3 

551 

552 editor.DecideEventCommunityInviteRequest( 

553 editor_pb2.DecideEventCommunityInviteRequestReq( 

554 event_community_invite_request_id=res.requests[0].event_community_invite_request_id, 

555 approve=True, 

556 ) 

557 ) 

558 

559 assert email_collector.count() == 3