Coverage for app / backend / src / tests / test_account.py: 100%
697 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
1from datetime import UTC, date, datetime, timedelta
2from unittest.mock import patch
4import grpc
5import pytest
6from google.protobuf import empty_pb2, wrappers_pb2
7from sqlalchemy import select, update
8from sqlalchemy.sql import func
10from couchers import urls
11from couchers.crypto import hash_password, random_hex
12from couchers.db import session_scope
13from couchers.materialized_views import refresh_materialized_views_rapid
14from couchers.models import (
15 AccountDeletionReason,
16 AccountDeletionToken,
17 BackgroundJob,
18 InviteCode,
19 PhotoGalleryItem,
20 Upload,
21 User,
22)
23from couchers.proto import account_pb2, api_pb2, auth_pb2, conversations_pb2, requests_pb2
24from couchers.utils import now, today
25from tests.fixtures.db import generate_user, make_volunteer
26from tests.fixtures.misc import PushCollector, email_fields, mock_notification_email, process_jobs
27from tests.fixtures.sessions import (
28 account_session,
29 auth_api_session,
30 public_session,
31 real_account_session,
32 requests_session,
33)
34from tests.test_requests import valid_request_text
@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture: requests ``testconfig`` for every test in this module and does nothing else."""
    return None
def test_GetAccountInfo(db, fast_passwords):
    """GetAccountInfo returns the expected fields for a freshly generated user with a password."""
    # with password
    hashed = hash_password(random_hex())
    user1, token1 = generate_user(hashed_password=hashed, email="user@couchers.invalid")

    with account_session(token1) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())

        # identity fields echo what the user was created with
        assert info.email == "user@couchers.invalid"
        assert info.username == user1.username

        # nothing has been verified for this user
        assert not info.has_strong_verification
        assert info.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
        assert info.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED

        # no special roles and no language preference set
        assert not info.is_superuser
        assert info.ui_language_preference == ""
        assert not info.is_volunteer
def test_donation_banner_no_drive(db):
    """The donation banner stays hidden while DONATION_DRIVE_START is None."""
    # The drive is disabled here, so the banner must not show regardless of
    # the user's donation state.
    user, token = generate_user()

    with (
        patch("couchers.servicers.account.DONATION_DRIVE_START", None),
        account_session(token) as account,
    ):
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert not info.should_show_donation_banner
def test_donation_banner_never_donated(db):
    """The donation banner is shown to a user with no recorded donation while a drive is active."""
    # last_donated is passed as None explicitly so the user has no donation on record
    user, token = generate_user(last_donated=None)
    drive_start = datetime(2025, 11, 1, tzinfo=UTC)

    with (
        patch("couchers.servicers.account.DONATION_DRIVE_START", drive_start),
        account_session(token) as account,
    ):
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert info.should_show_donation_banner
def test_donation_banner_donated_before_drive(db):
    """The donation banner is shown when the last donation predates the drive start."""
    user, token = generate_user()
    drive_start = datetime(2025, 11, 1, tzinfo=UTC)
    donated_at = datetime(2025, 10, 15, tzinfo=UTC)  # earlier than the Nov 1 drive start

    # Move the user's last donation to before the drive
    with session_scope() as session:
        stmt = update(User).where(User.id == user.id).values(last_donated=donated_at)
        session.execute(stmt)

    with (
        patch("couchers.servicers.account.DONATION_DRIVE_START", drive_start),
        account_session(token) as account,
    ):
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert info.should_show_donation_banner
def test_donation_banner_donated_after_drive(db):
    """The donation banner is hidden when the last donation is later than the drive start."""
    user, token = generate_user()
    drive_start = datetime(2025, 11, 1, tzinfo=UTC)
    donated_at = datetime(2025, 11, 15, tzinfo=UTC)  # later than the Nov 1 drive start

    # Move the user's last donation to after the drive began
    with session_scope() as session:
        stmt = update(User).where(User.id == user.id).values(last_donated=donated_at)
        session.execute(stmt)

    with (
        patch("couchers.servicers.account.DONATION_DRIVE_START", drive_start),
        account_session(token) as account,
    ):
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert not info.should_show_donation_banner
def test_donation_banner_donated_exactly_at_drive_start(db):
    """Boundary case: a donation timestamped exactly at the drive start hides the banner."""
    user, token = generate_user()
    drive_start = datetime(2025, 11, 1, tzinfo=UTC)

    # The donation time and the drive start are the very same instant
    with session_scope() as session:
        stmt = update(User).where(User.id == user.id).values(last_donated=drive_start)
        session.execute(stmt)

    with (
        patch("couchers.servicers.account.DONATION_DRIVE_START", drive_start),
        account_session(token) as account,
    ):
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert not info.should_show_donation_banner
def test_GetAccountInfo_regression(db):
    """Regression test: GetAccountInfo must not raise when ``about_me`` is None.

    There was a bug in evaluating `has_completed_profile` on the backend (in
    python): when about_me is None but the user has a key, it was failing
    because len(about_me) doesn't work on None.

    The test has no assertions on the response; it passes as long as the RPC
    completes without raising.
    """
    user, token = generate_user(about_me=None, complete_profile=False)

    # add an avatar photo to the user's profile gallery
    with session_scope() as session:
        key = random_hex(32)
        filename = random_hex(32) + ".jpg"
        session.add(Upload(key=key, filename=filename, creator_user_id=user.id))
        # the upload is flushed before the gallery item that refers to it by key is added
        session.flush()
        session.add(PhotoGalleryItem(gallery_id=user.profile_gallery_id, upload_key=key, position=0))

    with account_session(token) as account:
        # the response is deliberately not bound: only "does not raise" is being checked
        account.GetAccountInfo(empty_pb2.Empty())
def test_ChangePasswordV2_normal(db, fast_passwords, push_collector: PushCollector):
    """Happy path: a user with a password changes it, and gets an email plus a push notification."""
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=new_password)
    with account_session(token) as account, mock_notification_email() as mock:
        account.ChangePasswordV2(request)

    # exactly one notification email went out
    mock.assert_called_once()
    assert email_fields(mock).subject == "[TEST] Your password was changed"

    # and a push notification with matching content
    notification = push_collector.pop_for_user(user.id, last=True)
    assert notification.content.title == "Password changed"
    assert notification.content.body == "Your password was changed."

    # the stored hash now corresponds to the new password
    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(new_password)
def test_ChangePasswordV2_regression(db, fast_passwords):
    """Regression test for a time when send_password_changed_email wasn't working.

    Same flow as the normal password change, but without mocking the
    notification email.
    """
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=new_password)
    with account_session(token) as account:
        account.ChangePasswordV2(request)

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(new_password)
def test_ChangePasswordV2_normal_short_password(db, fast_passwords):
    """A too-short new password is rejected and the stored password stays unchanged."""
    old_password = random_hex()
    new_password = random_hex(length=1)
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=new_password)
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangePasswordV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "The password must be 8 or more characters long."

    # the old password is still the one on record
    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_long_password(db, fast_passwords):
    """A too-long new password is rejected and the stored password stays unchanged."""
    old_password = random_hex()
    new_password = random_hex(length=1000)
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=new_password)
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangePasswordV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "The password must be less than 256 characters."

    # the old password is still the one on record
    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_insecure_password(db, fast_passwords):
    """An easily guessable new password is rejected and the stored password stays unchanged."""
    old_password = random_hex()
    new_password = "12345678"
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=new_password)
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangePasswordV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "The password is insecure. Please use one that is not easily guessable."

    # the old password is still the one on record
    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_wrong_password(db, fast_passwords):
    """Supplying the wrong old password is rejected and the stored password stays unchanged."""
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    # note the old_password sent is not the user's real one
    request = account_pb2.ChangePasswordV2Req(old_password="wrong password", new_password=new_password)
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangePasswordV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "Wrong password."

    # the old password is still the one on record
    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_no_passwords(db, fast_passwords):
    """A request that omits new_password is rejected with the minimum-length error."""
    old_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    # only old_password is set on the request; new_password is left unset
    request = account_pb2.ChangePasswordV2Req(old_password=old_password)
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangePasswordV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "The password must be 8 or more characters long."

    # the old password is still the one on record
    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(old_password)
def test_ChangeEmailV2_wrong_password(db, fast_passwords):
    """An email change with the wrong password is rejected and leaves no active change token."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password="wrong password", new_email=new_email)
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangeEmailV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "Wrong password."

    # count users whose email-change token window includes the current DB time
    active_tokens = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_tokens).scalar_one() == 0
def test_ChangeEmailV2_wrong_email(db, fast_passwords):
    """An email change sent with a wrong password is rejected and leaves no active change token.

    NOTE(review): despite the name, this sends a wrong *password* and expects
    "Wrong password." -- the body matches test_ChangeEmailV2_wrong_password.
    Confirm whether a distinct "wrong email" scenario was intended.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password="wrong password", new_email=new_email)
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangeEmailV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "Wrong password."

    # count users whose email-change token window includes the current DB time
    active_tokens = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_tokens).scalar_one() == 0
def test_ChangeEmailV2_invalid_email(db, fast_passwords):
    """A malformed new email is rejected and leaves no active change token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # correct password, but the new address is not an email at all
    request = account_pb2.ChangeEmailV2Req(password=password, new_email="not a real email")
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangeEmailV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "Invalid email."

    # count users whose email-change token window includes the current DB time
    active_tokens = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_tokens).scalar_one() == 0
def test_ChangeEmailV2_email_in_use(db, fast_passwords):
    """Changing to an email that belongs to another user is rejected as an invalid email."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))
    user2, token2 = generate_user(hashed_password=hash_password(password))

    # correct password, but the target address is already user2's
    request = account_pb2.ChangeEmailV2Req(password=password, new_email=user2.email)
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangeEmailV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "Invalid email."

    # count users whose email-change token window includes the current DB time
    active_tokens = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_tokens).scalar_one() == 0
def test_ChangeEmailV2_no_change(db, fast_passwords):
    """Changing the email to the user's own current address is rejected as an invalid email."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # correct password, but the "new" address equals the current one
    request = account_pb2.ChangeEmailV2Req(password=password, new_email=user.email)
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangeEmailV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "Invalid email."

    # count users whose email-change token window includes the current DB time
    active_tokens = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_tokens).scalar_one() == 0
def test_ChangeEmailV2_wrong_token(db, fast_passwords):
    """Confirming an email change with an unknown token fails and leaves the email unchanged.

    The change is first requested successfully, then confirmation is
    attempted with a token that was never issued. The confirm call must fail
    with NOT_FOUND / "Invalid token." and the user's email must still be the
    original one.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    # a valid change request
    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            # the call is expected to raise, so its result is not bound to a name
            auth_api.ConfirmChangeEmailV2(
                auth_pb2.ConfirmChangeEmailV2Req(
                    change_email_token="wrongtoken",
                )
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Invalid token."

    # the failed confirmation must not have switched the address
    with session_scope() as session:
        user_updated = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert user_updated.email == user.email
def test_ChangeEmailV2_tokens_two_hour_window(db):
    """An email-change token is rejected when the auth servicer's clock is outside its validity window.

    ``couchers.servicers.auth.now`` is patched to one minute in the past and
    then to two hours and one minute in the future; in both cases an empty
    token and the real token must both fail with NOT_FOUND / "Invalid token.".
    """

    def one_minute_ago():
        return now() - timedelta(minutes=1)

    def two_hours_one_minute_in_future():
        return now() + timedelta(hours=2, minutes=1)

    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(account_pb2.ChangeEmailV2Req(password=password, new_email=new_email))

    with session_scope() as session:
        token_query = select(User.new_email_token).where(User.id == user.id)
        new_email_token = session.execute(token_query).scalar_one()

    # first just before the token was created, then just after the window
    for fake_now in (one_minute_ago, two_hours_one_minute_in_future):
        with (
            patch("couchers.servicers.auth.now", fake_now),
            auth_api_session() as (auth_api, metadata_interceptor),
        ):
            attempts = (
                auth_pb2.ConfirmChangeEmailV2Req(),
                auth_pb2.ConfirmChangeEmailV2Req(change_email_token=new_email_token),
            )
            for request in attempts:
                with pytest.raises(grpc.RpcError) as e:
                    auth_api.ConfirmChangeEmailV2(request)
                assert e.value.code() == grpc.StatusCode.NOT_FOUND
                assert e.value.details() == "Invalid token."
def test_ChangeEmailV2(db, fast_passwords, push_collector: PushCollector):
    """Full email change: request it, check the pending state, confirm it, check the final state.

    A push notification is checked after each of the two steps.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))
    user_id = user.id

    # step 1: request the change
    with account_session(token) as account:
        account.ChangeEmailV2(account_pb2.ChangeEmailV2Req(password=password, new_email=new_email))

    with session_scope() as session:
        pending = session.execute(select(User).where(User.id == user_id)).scalar_one()
        # the live email is untouched; the new one is parked alongside a token
        assert pending.email == user.email
        assert pending.new_email == new_email
        assert pending.new_email_token is not None
        # the token window brackets the current time
        assert pending.new_email_token_created
        assert pending.new_email_token_created <= now()
        assert pending.new_email_token_expiry
        assert pending.new_email_token_expiry >= now()

        change_token = pending.new_email_token

    process_jobs()
    requested_push = push_collector.pop_for_user(user_id, last=True)
    assert requested_push.content.title == "Email change requested"
    assert requested_push.content.body == f"Use the link we sent to {new_email} to confirm your new address."

    # step 2: confirm with the token
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmChangeEmailV2(auth_pb2.ConfirmChangeEmailV2Req(change_email_token=change_token))

    with session_scope() as session:
        confirmed = session.execute(select(User).where(User.id == user_id)).scalar_one()
        # the address has been switched and all pending-change fields cleared
        assert confirmed.email == new_email
        assert confirmed.new_email is None
        assert confirmed.new_email_token is None
        assert confirmed.new_email_token_created is None
        assert confirmed.new_email_token_expiry is None

    process_jobs()
    verified_push = push_collector.pop_for_user(user_id, last=True)
    assert verified_push.content.title == "Email verified"
    assert verified_push.content.body == "Your new email address has been verified."
def test_ChangeEmailV2_sends_proper_emails(db, fast_passwords, push_collector: PushCollector):
    """Requesting an email change queues two distinct emails and one push notification."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(account_pb2.ChangeEmailV2Req(password=password, new_email=new_email))

    process_jobs()

    # byte markers unique to each of the two expected emails
    marker_a = b"An email change to the email"
    marker_b = (
        b"You requested that your email be changed to this email address on Couchers.org. Your old email address is"
    )

    with session_scope() as session:
        email_jobs_query = select(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        jobs = session.execute(email_jobs_query).scalars().all()
        assert len(jobs) == 2

        first, second = jobs[0].payload, jobs[1].payload
        # the two emails may appear in either order
        assert (marker_a in first and marker_b in second) or (marker_b in first and marker_a in second)

    notification = push_collector.pop_for_user(user.id, last=True)
    assert notification.content.title == "Email change requested"
    assert notification.content.body == f"Use the link we sent to {new_email} to confirm your new address."
def test_ChangeLanguagePreference(db, fast_passwords):
    """Changing the UI language sets the NEXT_LOCALE cookie and is reflected by GetAccountInfo."""
    # user changes from default to ISO 639-1 language code
    new_lang = "zh"
    user, token = generate_user()

    with real_account_session(token) as account:
        before = account.GetAccountInfo(empty_pb2.Empty())
        assert before.ui_language_preference == ""

        # with_call also returns the call object, which carries the response metadata
        request = account_pb2.ChangeLanguagePreferenceReq(ui_language_preference=new_lang)
        res, call = account.ChangeLanguagePreference.with_call(request)

        # cookies are sent via initial metadata, so we check for it there;
        # each "set-cookie" value is a full cookie string, so keep only the
        # leading key=value part before the first ";"
        cookie_values = []
        for k, v in call.initial_metadata():
            if k == "set-cookie":
                cookie_values.append(v.split(";")[0])
        assert "NEXT_LOCALE=zh" in cookie_values, f"Didn't find the right cookie, got {call.initial_metadata()}"

        # the changed language preference should also be sent to the backend
        after = account.GetAccountInfo(empty_pb2.Empty())
        assert after.ui_language_preference == "zh"
def test_contributor_form(db):
    """filled_contributor_form flips from false to true once the form is submitted."""
    user, token = generate_user()

    with account_session(token) as account:
        before = account.GetContributorFormInfo(empty_pb2.Empty())
        assert not before.filled_contributor_form

        # submit an empty contributor form
        form_request = account_pb2.FillContributorFormReq(contributor_form=auth_pb2.ContributorForm())
        account.FillContributorForm(form_request)

        after = account.GetContributorFormInfo(empty_pb2.Empty())
        assert after.filled_contributor_form
def test_DeleteAccount_start(db):
    """Requesting deletion sends a confirmation email and creates a valid token without deleting the user."""
    user, token = generate_user()

    request = account_pb2.DeleteAccountReq(confirm=True, reason=None)
    with account_session(token) as account, mock_notification_email() as mock:
        account.DeleteAccount(request)

    mock.assert_called_once()
    assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"

    with session_scope() as session:
        token_query = select(AccountDeletionToken).where(AccountDeletionToken.user_id == user.id)
        deletion_token: AccountDeletionToken = session.execute(token_query).scalar_one()
        assert deletion_token.is_valid

        # the account itself is still there until the deletion is confirmed
        db_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert not db_user.is_deleted
def test_DeleteAccount_message_storage(db):
    """Only the non-blank deletion reasons end up stored as AccountDeletionReason rows."""
    user, token = generate_user()

    reasons = (
        None,  # not stored
        "",  # not stored
        "Reason",
        "0192#(&!&#)*@//)(8",
        "\n\n\t",  # not stored
        "1337",
    )

    with account_session(token) as account:
        for reason in reasons:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=reason))

    # six requests were made, three of them carried a storable reason
    with session_scope() as session:
        stored_count = session.execute(select(func.count()).select_from(AccountDeletionReason)).scalar_one()
        assert stored_count == 3
def test_full_delete_account_with_recovery(db, push_collector: PushCollector):
    """Walk the whole lifecycle: request deletion, confirm it, then recover the account.

    Each of the three stages checks the push notification, the email that was
    sent, and the resulting database state.
    """
    user, token = generate_user()
    user_id = user.id
    support_address = "support@couchers.org"

    # --- stage 1: request deletion -------------------------------------
    with account_session(token) as account:
        # a request without `confirm` is refused
        with pytest.raises(grpc.RpcError) as err:
            account.DeleteAccount(account_pb2.DeleteAccountReq())
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "Please confirm your account deletion."

        # Check the right email is sent
        with mock_notification_email() as mock:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    requested_push = push_collector.pop_for_user(user_id, last=True)
    assert requested_push.content.title == "Account deletion requested"
    assert requested_push.content.body == "Use the link we emailed you to confirm."

    mock.assert_called_once()
    request_mail = email_fields(mock)

    with session_scope() as session:
        token_row = session.execute(select(AccountDeletionToken)).scalar_one()
        deletion_token = token_row.token

        db_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert token_row.user == db_user
        # nothing is deleted yet, and no undelete state exists
        assert not db_user.is_deleted
        assert not db_user.undelete_token
        assert not db_user.undelete_until

    assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"
    assert request_mail.recipient == user.email
    assert "account deletion" in request_mail.subject.lower()
    confirm_url = f"http://localhost:3000/delete-account?token={deletion_token}"
    request_text = "You requested that we delete your account from Couchers.org."
    assert deletion_token in request_mail.plain
    assert deletion_token in request_mail.html
    assert request_text in request_mail.plain
    assert request_text in request_mail.html
    assert confirm_url in request_mail.plain
    assert confirm_url in request_mail.html
    assert support_address in request_mail.plain
    assert support_address in request_mail.html

    # --- stage 2: confirm deletion --------------------------------------
    with mock_notification_email() as mock, auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token))

    deleted_push = push_collector.pop_for_user(user_id, last=True)
    assert deleted_push.content.title == "Account deleted"
    assert deleted_push.content.body == "You can restore it within 7 days using the link we emailed you."

    mock.assert_called_once()
    deleted_mail = email_fields(mock)

    with session_scope() as session:
        # the deletion token is gone
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        db_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert db_user.is_deleted
        assert db_user.undelete_token
        assert db_user.undelete_until
        assert db_user.undelete_until > now()

        undelete_token = db_user.undelete_token

    assert deleted_mail.recipient == user.email
    assert "account has been deleted" in deleted_mail.subject.lower()
    recover_url = f"http://localhost:3000/recover-account?token={undelete_token}"
    deleted_text = "You have successfully deleted your account from Couchers.org."
    assert deleted_text in deleted_mail.plain
    assert deleted_text in deleted_mail.html
    assert "7 days" in deleted_mail.plain
    assert "7 days" in deleted_mail.html
    assert recover_url in deleted_mail.plain
    assert recover_url in deleted_mail.html
    assert support_address in deleted_mail.plain
    assert support_address in deleted_mail.html

    # --- stage 3: recover the account -----------------------------------
    with mock_notification_email() as mock, auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.RecoverAccount(auth_pb2.RecoverAccountReq(token=undelete_token))

    restored_push = push_collector.pop_for_user(user_id, last=True)
    assert restored_push.content.title == "Account restored"
    assert restored_push.content.body == "Welcome back!"

    mock.assert_called_once()
    recovered_mail = email_fields(mock)

    assert recovered_mail.recipient == user.email
    assert "account has been recovered" in recovered_mail.subject.lower()
    recovered_text = "Your account on Couchers.org has been successfully recovered!"
    assert recovered_text in recovered_mail.plain
    assert recovered_text in recovered_mail.html
    assert support_address in recovered_mail.plain
    assert support_address in recovered_mail.html

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        restored = session.execute(select(User).where(User.id == user_id)).scalar_one()
        # back to a normal, non-deleted account with no undelete state
        assert not restored.is_deleted
        assert not restored.undelete_token
        assert not restored.undelete_until
def test_multiple_delete_tokens(db):
    """
    Confirming deletion with one token removes every outstanding deletion token
    """
    user, token = generate_user()

    # three deletion requests produce three tokens
    with account_session(token) as account:
        for _ in range(3):
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    with session_scope() as session:
        token_count = session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one()
        assert token_count == 3
        # take any one of them to confirm with
        deletion_token = session.execute(select(AccountDeletionToken.token).limit(1)).scalar_one()

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token))

    # no tokens remain afterwards
    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken.token)).scalar_one_or_none()
def test_ListActiveSessions_pagination(db, fast_passwords):
    """ListActiveSessions pages through sessions: 3 on the first page, 2 on the last, then no token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four extra logins; the assertions below expect five sessions in total
    # (presumably these four plus the one generate_user created -- confirm)
    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        first_page = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_size=3))
        assert len(first_page.active_sessions) == 3

        second_request = account_pb2.ListActiveSessionsReq(page_token=first_page.next_page_token, page_size=3)
        second_page = account.ListActiveSessions(second_request)
        assert len(second_page.active_sessions) == 2
        # an empty token marks the end of the listing
        assert not second_page.next_page_token
def test_ListActiveSessions_details(db, fast_passwords):
    """ListActiveSessions reports OS, browser, device, location and current-session flag per session.

    Three logins are made with distinct user agents and client IPs (sent via
    the ``x-couchers-real-ip`` metadata key). The geoip lookup used by the
    account servicer is replaced with a stub that knows two of the three IPs.
    The listing is expected to contain four sessions: the one making the call
    first, followed by the three logins in reverse order of creation.
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    ips_user_agents = [
        (
            "108.123.33.162",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Mobile/15E148 Safari/604.1",
        ),
        (
            "8.245.212.28",
            "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/26.0 Chrome/122.0.0.0 Mobile Safari/537.36",
        ),
        (
            "95.254.140.156",
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0",
        ),
    ]

    # one login per (ip, user agent) pair, each over its own channel so the
    # user agent can be set through the channel options
    for ip, user_agent in ips_user_agents:
        options = (("grpc.primary_user_agent", user_agent),)
        with auth_api_session(grpc_channel_options=options) as (auth_api, metadata_interceptor):
            auth_api.Authenticate(
                auth_pb2.AuthReq(user=user.username, password=password), metadata=(("x-couchers-real-ip", ip),)
            )

    def dummy_geoip(ip_address):
        # stub lookup: returns None for any IP not listed here (e.g. 95.254.140.156)
        return {
            "108.123.33.162": "Chicago, United States",
            "8.245.212.28": "Sydney, Australia",
        }.get(ip_address)

    with real_account_session(token) as account:
        with patch("couchers.servicers.account.geoip_approximate_location", dummy_geoip):
            res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())

    assert len(res.active_sessions) == 4

    # (operating_system, browser, device, approximate_location, is_current_session)
    expected = [
        # this one currently making the API call
        ("Other", "Other", "Other", "Unknown", True),
        ("Ubuntu", "Firefox", "Other", "Unknown", False),
        ("Android", "Samsung Internet", "K", "Sydney, Australia", False),
        ("iOS", "Mobile Safari", "iPhone", "Chicago, United States", False),
    ]
    for active, (operating_system, browser, device, location, is_current) in zip(res.active_sessions, expected):
        assert active.operating_system == operating_system
        assert active.browser == browser
        assert active.device == device
        assert active.approximate_location == location
        assert active.is_current_session == is_current
def test_LogOutSession(db, fast_passwords):
    """Logging out one session removes exactly that session from the list."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four extra logins on top of the session that generate_user handed back
    with auth_api_session() as (auth_api, metadata_interceptor):
        for _login in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        before = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(before.active_sessions) == 5

        target = before.active_sessions[3]
        account.LogOutSession(account_pb2.LogOutSessionReq(created=target.created))

        after = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(after.active_sessions) == 4

        # ignore the first session as it changes; index 3 is the one just logged out
        survivors = [
            entry for position, entry in enumerate(before.active_sessions) if position not in (0, 3)
        ]
        assert survivors == after.active_sessions[1:]
def test_LogOutOtherSessions(db, fast_passwords):
    """LogOutOtherSessions needs explicit confirmation and then keeps one session."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four extra logins on top of the session that generate_user handed back
    with auth_api_session() as (auth_api, metadata_interceptor):
        for _login in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 5

        # without confirm=True the call must be rejected
        unconfirmed = account_pb2.LogOutOtherSessionsReq(confirm=False)
        with pytest.raises(grpc.RpcError) as e:
            account.LogOutOtherSessions(unconfirmed)
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "Please confirm you want to log out of other sessions."

        confirmed = account_pb2.LogOutOtherSessionsReq(confirm=True)
        account.LogOutOtherSessions(confirmed)

        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 1
def test_CreateInviteCode(db):
    """Creating an invite code returns an 8-character code stored as enabled."""
    creator, creator_token = generate_user()

    with account_session(creator_token) as account:
        created = account.CreateInviteCode(account_pb2.CreateInviteCodeReq())
        new_code = created.code
        assert len(new_code) == 8

    # the row must belong to the caller and must not be disabled yet
    lookup = select(InviteCode).where(InviteCode.id == new_code)
    with session_scope() as session:
        stored = session.execute(lookup).scalar_one()
        assert stored.creator_user_id == creator.id
        assert stored.disabled is None

    expected_url = urls.invite_code_link(code=created.code)
    assert created.url == expected_url
def test_DisableInviteCode(db):
    """Disabling an invite code sets the ``disabled`` column on its row."""
    owner, owner_token = generate_user()

    with account_session(owner_token) as account:
        created = account.CreateInviteCode(account_pb2.CreateInviteCodeReq())
        invite_code = created.code
        account.DisableInviteCode(account_pb2.DisableInviteCodeReq(code=invite_code))

    lookup = select(InviteCode).where(InviteCode.id == invite_code)
    with session_scope() as session:
        stored = session.execute(lookup).scalar_one()
        assert stored.disabled is not None
def test_ListInviteCodes(db):
    """ListInviteCodes reports each code with its use count and link."""
    owner, owner_token = generate_user()
    invitee, _invitee_token = generate_user()

    with account_session(owner_token) as account:
        created = account.CreateInviteCode(account_pb2.CreateInviteCodeReq())
    invite_code = created.code

    # simulate the second user having signed up with this invite code
    mark_invited = update(User).where(User.id == invitee.id).values(invite_code_id=invite_code)
    with session_scope() as session:
        session.execute(mark_invited)

    with account_session(owner_token) as account:
        listing = account.ListInviteCodes(empty_pb2.Empty())
        assert len(listing.invite_codes) == 1

        only_entry = listing.invite_codes[0]
        assert only_entry.code == invite_code
        assert only_entry.uses == 1
        assert only_entry.url == urls.invite_code_link(code=invite_code)
def test_reminders(db, moderator):
    """Walk GetReminders through profile, verification and host-request reminders.

    Pending host requests are expected first, in creation order, followed by
    the profile and verification reminders; answering a request removes its
    reminder.
    """
    # the strong verification reminder's absence is tested in test_strong_verification.py
    # reference writing reminders tested in test_AvailableWriteReferences_and_ListPendingReferencesToWrite
    # we use LiteUser, so remember to refresh materialized views
    user, token = generate_user(complete_profile=False)
    complete_user, complete_token = generate_user(complete_profile=True)
    req_user1, req_user_token1 = generate_user(complete_profile=True)
    req_user2, req_user_token2 = generate_user(complete_profile=True)

    def fetch_reminders(session_token):
        """Return the reminders currently shown to the owner of ``session_token``."""
        with account_session(session_token) as account:
            return list(account.GetReminders(empty_pb2.Empty()).reminders)

    def kinds(reminder_list):
        """Name the populated oneof member of every reminder, in order."""
        return [entry.WhichOneof("reminder") for entry in reminder_list]

    def pending_requests(reminder_list, count):
        """Give (host_request_id, surfer user id) for the first ``count`` reminders."""
        return [
            (
                entry.respond_to_host_request_reminder.host_request_id,
                entry.respond_to_host_request_reminder.surfer_user.user_id,
            )
            for entry in reminder_list[:count]
        ]

    refresh_materialized_views_rapid(empty_pb2.Empty())
    assert kinds(fetch_reminders(complete_token)) == ["complete_verification_reminder"]
    assert kinds(fetch_reminders(token)) == [
        "complete_profile_reminder",
        "complete_verification_reminder",
    ]

    today_plus_2 = (today() + timedelta(days=2)).isoformat()
    today_plus_3 = (today() + timedelta(days=3)).isoformat()

    def request_stay(surfer_token, text):
        """Send ``user`` a host request as the given surfer and have it approved."""
        with requests_session(surfer_token) as api:
            new_request = requests_pb2.CreateHostRequestReq(
                host_user_id=user.id,
                from_date=today_plus_2,
                to_date=today_plus_3,
                text=valid_request_text(text),
            )
            created_id = api.CreateHostRequest(new_request).host_request_id
            moderator.approve_host_request(created_id)
        return created_id

    # first request: a single respond reminder appears ahead of the others
    host_request1_id = request_stay(req_user_token1, "Test request 1")

    reminders = fetch_reminders(token)
    assert kinds(reminders) == [
        "respond_to_host_request_reminder",
        "complete_profile_reminder",
        "complete_verification_reminder",
    ]
    assert pending_requests(reminders, 1) == [(host_request1_id, req_user1.id)]

    # second request, from a different surfer
    host_request2_id = request_stay(req_user_token2, "Test request 2")

    refresh_materialized_views_rapid(empty_pb2.Empty())
    reminders = fetch_reminders(token)
    assert kinds(reminders) == [
        "respond_to_host_request_reminder",
        "respond_to_host_request_reminder",
        "complete_profile_reminder",
        "complete_verification_reminder",
    ]
    assert pending_requests(reminders, 2) == [
        (host_request1_id, req_user1.id),
        (host_request2_id, req_user2.id),
    ]

    # third request, again from the first surfer
    host_request3_id = request_stay(req_user_token1, "Test request 3")

    refresh_materialized_views_rapid(empty_pb2.Empty())
    reminders = fetch_reminders(token)
    assert kinds(reminders) == [
        "respond_to_host_request_reminder",
        "respond_to_host_request_reminder",
        "respond_to_host_request_reminder",
        "complete_profile_reminder",
        "complete_verification_reminder",
    ]
    assert pending_requests(reminders, 3) == [
        (host_request1_id, req_user1.id),
        (host_request2_id, req_user2.id),
        (host_request3_id, req_user1.id),
    ]

    # accept req
    with requests_session(token) as api:
        acceptance = requests_pb2.RespondHostRequestReq(
            host_request_id=host_request1_id, status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED
        )
        api.RespondHostRequest(acceptance)

    # the answered request no longer produces a reminder
    refresh_materialized_views_rapid(empty_pb2.Empty())
    reminders = fetch_reminders(token)
    assert kinds(reminders) == [
        "respond_to_host_request_reminder",
        "respond_to_host_request_reminder",
        "complete_profile_reminder",
        "complete_verification_reminder",
    ]
    assert pending_requests(reminders, 2) == [
        (host_request2_id, req_user2.id),
        (host_request3_id, req_user1.id),
    ]
def test_volunteer_stuff(db):
    """Cover volunteer info: not registered, reading, updating, and the public list."""
    # taken from couchers/app/backend/resources/badges.json
    board_member_id = 8347

    user, token = generate_user(name="Von Tester", username="tester", city="Amsterdam", id=board_member_id)

    def assert_not_registered(call, request):
        """The RPC must fail with NOT_FOUND and the "not registered" message."""
        with pytest.raises(grpc.RpcError) as e:
            call(request)
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert (
            e.value.details() == "You are currently not registered as a volunteer, if this is wrong, please contact us."
        )

    def assert_unchanged_fields(info):
        """Fields that none of the updates below touch."""
        assert info.role == "Lead Tester"
        assert info.started_volunteering == "2020-06-01"
        assert not info.stopped_volunteering
        assert info.show_on_team_page

    def link_of(info):
        """Bundle the three link fields so they compare in one assertion."""
        return (info.link_type, info.link_text, info.link_url)

    # before a volunteer row exists, both volunteer RPCs are rejected
    with account_session(token) as account:
        account_info = account.GetAccountInfo(empty_pb2.Empty())
        assert not account_info.is_volunteer

        assert_not_registered(account.GetMyVolunteerInfo, empty_pb2.Empty())
        assert_not_registered(account.UpdateMyVolunteerInfo, account_pb2.UpdateMyVolunteerInfoReq())

    volunteer = make_volunteer(
        user_id=user.id,
        display_name="Great Volunteer",
        display_location="The Bitbucket",
        role="Lead Tester",
        started_volunteering=date(2020, 6, 1),
    )
    with session_scope() as session:
        session.add(volunteer)

    with account_session(token) as account:
        account_info = account.GetAccountInfo(empty_pb2.Empty())
        assert account_info.is_volunteer

        # initial state: the link points at the volunteer's own profile
        info = account.GetMyVolunteerInfo(empty_pb2.Empty())

        assert info.display_name == "Great Volunteer"
        assert info.display_location == "The Bitbucket"
        assert_unchanged_fields(info)
        assert link_of(info) == ("couchers", "@tester", "http://localhost:3000/user/tester")

        # website link: both text and URL are supplied by the caller
        website_update = account_pb2.UpdateMyVolunteerInfoReq(
            display_name=wrappers_pb2.StringValue(value=""),
            link_type=wrappers_pb2.StringValue(value="website"),
            link_text=wrappers_pb2.StringValue(value="testervontester.com.invalid"),
            link_url=wrappers_pb2.StringValue(value="https://www.testervontester.com.invalid/"),
        )
        info = account.UpdateMyVolunteerInfo(website_update)

        assert info.display_name == ""
        assert info.display_location == "The Bitbucket"
        assert_unchanged_fields(info)
        assert link_of(info) == (
            "website",
            "testervontester.com.invalid",
            "https://www.testervontester.com.invalid/",
        )

        # linkedin link: no URL is sent, yet one is expected in the response
        linkedin_update = account_pb2.UpdateMyVolunteerInfoReq(
            display_name=wrappers_pb2.StringValue(value=""),
            link_type=wrappers_pb2.StringValue(value="linkedin"),
            link_text=wrappers_pb2.StringValue(value="tester-vontester"),
        )
        info = account.UpdateMyVolunteerInfo(linkedin_update)

        assert info.display_name == ""
        assert info.display_location == "The Bitbucket"
        assert_unchanged_fields(info)
        assert link_of(info) == (
            "linkedin",
            "tester-vontester",
            "https://www.linkedin.com/in/tester-vontester/",
        )

        # email link: again no URL is sent; a mailto: URL is expected back
        email_update = account_pb2.UpdateMyVolunteerInfoReq(
            display_name=wrappers_pb2.StringValue(value="Tester"),
            display_location=wrappers_pb2.StringValue(value=""),
            link_type=wrappers_pb2.StringValue(value="email"),
            link_text=wrappers_pb2.StringValue(value="tester@vontester.com.invalid"),
        )
        info = account.UpdateMyVolunteerInfo(email_update)

        assert info.display_name == "Tester"
        assert info.display_location == ""
        assert_unchanged_fields(info)
        assert link_of(info) == (
            "email",
            "tester@vontester.com.invalid",
            "mailto:tester@vontester.com.invalid",
        )

    refresh_materialized_views_rapid(empty_pb2.Empty())

    # the public listing shows the volunteer with the account's city as location
    with public_session() as public:
        listing = public.GetVolunteers(empty_pb2.Empty())
        assert len(listing.current_volunteers) == 1

        v = listing.current_volunteers[0]
        assert v.name == "Tester"
        assert v.username == "tester"
        assert v.is_board_member
        assert v.role == "Lead Tester"
        assert v.location == "Amsterdam"
        assert v.img.startswith("http://localhost:5001/img/thumbnail/")
        assert link_of(v) == (
            "email",
            "tester@vontester.com.invalid",
            "mailto:tester@vontester.com.invalid",
        )