Coverage for src/tests/test_account.py: 100%
474 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-03-11 15:27 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-03-11 15:27 +0000
1from datetime import timedelta
2from unittest.mock import patch
4import grpc
5import pytest
6from google.protobuf import empty_pb2
7from sqlalchemy.sql import func
9from couchers import errors
10from couchers.crypto import hash_password, random_hex
11from couchers.db import session_scope
12from couchers.models import AccountDeletionReason, AccountDeletionToken, BackgroundJob, Upload, User
13from couchers.sql import couchers_select as select
14from couchers.utils import now
15from proto import account_pb2, api_pb2, auth_pb2
16from tests.test_fixtures import ( # noqa
17 account_session,
18 auth_api_session,
19 db,
20 email_fields,
21 fast_passwords,
22 generate_user,
23 mock_notification_email,
24 process_jobs,
25 push_collector,
26 real_account_session,
27 testconfig,
28)
@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture that requests ``testconfig`` for every test in this module; it does nothing itself."""
def test_GetAccountInfo(db, fast_passwords):
    """GetAccountInfo returns the expected values for a freshly generated user that has a password."""
    expected_email = "user@couchers.invalid"
    password_hash = hash_password(random_hex())
    user1, token1 = generate_user(hashed_password=password_hash, email=expected_email)

    with account_session(token1) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert info.email == expected_email
        assert info.username == user1.username
        assert not info.has_strong_verification
        assert info.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
        assert info.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED
        assert not info.is_superuser
        assert info.ui_language_preference == ""
def test_GetAccountInfo_regression(db):
    """
    Regression test: GetAccountInfo must succeed for a user with an avatar key but ``about_me=None``.

    Evaluating `has_completed_profile` on the backend used to fail in this case because ``len(about_me)``
    does not work on None.
    """
    uploader_user, _ = generate_user()

    with session_scope() as session:
        avatar_key = random_hex(32)
        avatar_filename = random_hex(32) + ".jpg"
        upload = Upload(
            key=avatar_key,
            filename=avatar_filename,
            creator_user_id=uploader_user.id,
        )
        session.add(upload)

    _, token = generate_user(about_me=None, avatar_key=avatar_key)

    with account_session(token) as account:
        # the test passes as long as this call does not raise
        account.GetAccountInfo(empty_pb2.Empty())
def test_ChangePasswordV2_normal(db, fast_passwords, push_collector):
    """A user with an existing password changes it: email and push are sent and the new hash is stored."""
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=new_password)

    with account_session(token) as account:
        with mock_notification_email() as mock:
            account.ChangePasswordV2(request)

        mock.assert_called_once()
        assert email_fields(mock).subject == "[TEST] Your password was changed"

    push_collector.assert_user_has_single_matching(
        user.id,
        title="Your password was changed",
        body="Your login password for Couchers.org was changed.",
    )

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(new_password)
def test_ChangePasswordV2_regression(db, fast_passwords):
    """
    Regression test for `send_password_changed_email` not working.

    Same scenario as a normal password change, but without mocking the notification email.
    """
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=new_password)

    with account_session(token) as account:
        account.ChangePasswordV2(request)

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(new_password)
def test_ChangePasswordV2_normal_short_password(db, fast_passwords):
    """A new password from ``random_hex(length=1)`` is rejected as too short; the stored hash is unchanged."""
    old_password = random_hex()
    short_password = random_hex(length=1)
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=short_password)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.PASSWORD_TOO_SHORT

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_long_password(db, fast_passwords):
    """A new password from ``random_hex(length=1000)`` is rejected as too long; the stored hash is unchanged."""
    old_password = random_hex()
    long_password = random_hex(length=1000)
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=long_password)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.PASSWORD_TOO_LONG

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_insecure_password(db, fast_passwords):
    """Changing to the insecure password "12345678" is rejected; the stored hash is unchanged."""
    old_password = random_hex()
    insecure_password = "12345678"
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=insecure_password)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INSECURE_PASSWORD

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_wrong_password(db, fast_passwords):
    """Supplying the wrong old password is rejected with INVALID_PASSWORD; the stored hash is unchanged."""
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(old_password="wrong password", new_password=new_password)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_PASSWORD

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_no_passwords(db, fast_passwords):
    """A request that carries only the old password (no new password) is rejected as PASSWORD_TOO_SHORT."""
    old_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    # new_password is deliberately left unset
    request = account_pb2.ChangePasswordV2Req(old_password=old_password)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.PASSWORD_TOO_SHORT

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)
def test_ChangeEmailV2_wrong_password(db, fast_passwords):
    """An email change with the wrong password is rejected and no currently valid change token exists."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password="wrong password", new_email=new_email)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_PASSWORD

    with session_scope() as session:
        # count users whose email change token window contains the current database time
        active_token_count = (
            select(func.count())
            .select_from(User)
            .where(User.new_email_token_created <= func.now())
            .where(User.new_email_token_expiry >= func.now())
        )
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_wrong_email(db, fast_passwords):
    """
    An email change with the wrong password is rejected and no currently valid change token exists.

    NOTE(review): this body performs the same steps as test_ChangeEmailV2_wrong_password (wrong password,
    INVALID_PASSWORD expected); presumably an email-specific scenario was intended -- confirm.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password="wrong password", new_email=new_email)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_PASSWORD

    with session_scope() as session:
        # count users whose email change token window contains the current database time
        active_token_count = (
            select(func.count())
            .select_from(User)
            .where(User.new_email_token_created <= func.now())
            .where(User.new_email_token_expiry >= func.now())
        )
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_invalid_email(db, fast_passwords):
    """A syntactically invalid new email is rejected with INVALID_EMAIL and no valid change token exists."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password=password, new_email="not a real email")

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_EMAIL

    with session_scope() as session:
        # count users whose email change token window contains the current database time
        active_token_count = (
            select(func.count())
            .select_from(User)
            .where(User.new_email_token_created <= func.now())
            .where(User.new_email_token_expiry >= func.now())
        )
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_email_in_use(db, fast_passwords):
    """Changing to an email that belongs to another user is rejected with INVALID_EMAIL."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))
    other_user, _other_token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password=password, new_email=other_user.email)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_EMAIL

    with session_scope() as session:
        # count users whose email change token window contains the current database time
        active_token_count = (
            select(func.count())
            .select_from(User)
            .where(User.new_email_token_created <= func.now())
            .where(User.new_email_token_expiry >= func.now())
        )
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_no_change(db, fast_passwords):
    """Changing to the email the user already has is rejected with INVALID_EMAIL."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password=password, new_email=user.email)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_EMAIL

    with session_scope() as session:
        # count users whose email change token window contains the current database time
        active_token_count = (
            select(func.count())
            .select_from(User)
            .where(User.new_email_token_created <= func.now())
            .where(User.new_email_token_expiry >= func.now())
        )
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_wrong_token(db, fast_passwords):
    """Confirming an email change with a bogus token fails with NOT_FOUND and leaves the email untouched."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(account_pb2.ChangeEmailV2Req(password=password, new_email=new_email))

    bogus_confirmation = auth_pb2.ConfirmChangeEmailV2Req(change_email_token="wrongtoken")

    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as exc_info:
            auth_api.ConfirmChangeEmailV2(bogus_confirmation)
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == errors.INVALID_TOKEN

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.email == user.email
def test_ChangeEmailV2_tokens_two_hour_window(db):
    """
    The change-email token is rejected when the auth servicer's clock is outside the token's window.

    `couchers.servicers.auth.now` is patched first to one minute in the past and then to two hours and one
    minute in the future; in both cases an empty token and the real token must fail with NOT_FOUND.
    """

    def two_hours_one_minute_in_future():
        return now() + timedelta(hours=2, minutes=1)

    def one_minute_ago():
        return now() - timedelta(minutes=1)

    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(account_pb2.ChangeEmailV2Req(password=password, new_email=new_email))

    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        new_email_token = user.new_email_token

    for fake_now in (one_minute_ago, two_hours_one_minute_in_future):
        with patch("couchers.servicers.auth.now", fake_now):
            with auth_api_session() as (auth_api, metadata_interceptor):
                # first without any token, then with the real (but currently out-of-window) token
                confirmations = (
                    auth_pb2.ConfirmChangeEmailV2Req(),
                    auth_pb2.ConfirmChangeEmailV2Req(change_email_token=new_email_token),
                )
                for confirmation in confirmations:
                    with pytest.raises(grpc.RpcError) as exc_info:
                        auth_api.ConfirmChangeEmailV2(confirmation)
                    assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
                    assert exc_info.value.details() == errors.INVALID_TOKEN
def test_ChangeEmailV2(db, fast_passwords, push_collector):
    """
    Happy path of an email change: request it, confirm it with the token, and check state at each step.

    After the request the old email is still active and the pending fields are set; after confirmation the
    new email is active and the pending fields are cleared. A push notification is checked for each step.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))
    user_id = user.id

    with account_session(token) as account:
        account.ChangeEmailV2(account_pb2.ChangeEmailV2Req(password=password, new_email=new_email))

    with session_scope() as session:
        pending_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert pending_user.email == user.email
        assert pending_user.new_email == new_email
        assert pending_user.new_email_token is not None
        assert pending_user.new_email_token_created <= now()
        assert pending_user.new_email_token_expiry >= now()

        change_email_token = pending_user.new_email_token

    process_jobs()
    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=0,
        title="An email change was initiated on your account",
        body=f"An email change to the email {new_email} was initiated on your account.",
    )

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmChangeEmailV2(auth_pb2.ConfirmChangeEmailV2Req(change_email_token=change_email_token))

    with session_scope() as session:
        confirmed_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert confirmed_user.email == new_email
        assert confirmed_user.new_email is None
        assert confirmed_user.new_email_token is None
        assert confirmed_user.new_email_token_created is None
        assert confirmed_user.new_email_token_expiry is None

    process_jobs()
    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=1,
        title="Email change completed",
        body="Your new email address has been verified.",
    )
def test_ChangeEmailV2_sends_proper_emails(db, fast_passwords, push_collector):
    """
    Requesting an email change queues exactly two `send_email` jobs and one matching push notification.

    One job payload must contain the notification text and the other the confirmation text; the order of
    the two jobs is not fixed.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(account_pb2.ChangeEmailV2Req(password=password, new_email=new_email))

    process_jobs()

    with session_scope() as session:
        email_jobs_query = select(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        jobs = session.execute(email_jobs_query).scalars().all()
        assert len(jobs) == 2
        first_payload = jobs[0].payload
        second_payload = jobs[1].payload

        notification_marker = b"An email change to the email"
        confirmation_marker = (
            b"You requested that your email be changed to this email address on Couchers.org. Your old email address is"
        )
        in_listed_order = notification_marker in first_payload and confirmation_marker in second_payload
        in_swapped_order = confirmation_marker in first_payload and notification_marker in second_payload
        assert in_listed_order or in_swapped_order

    push_collector.assert_user_has_single_matching(
        user.id,
        title="An email change was initiated on your account",
        body=f"An email change to the email {new_email} was initiated on your account.",
    )
def test_ChangePreferredLanguage(db, fast_passwords):
    """The UI language preference starts empty and reads back as "zh" after being changed to it."""
    # ISO 639-1 language code
    new_language_code = "zh"
    user, token = generate_user()

    with account_session(token) as account:
        before = account.GetAccountInfo(empty_pb2.Empty())
        assert before.ui_language_preference == ""

        change_request = account_pb2.ChangeLanguagePreferenceReq(ui_language_preference=new_language_code)
        account.ChangeLanguagePreference(change_request)

        after = account.GetAccountInfo(empty_pb2.Empty())
        assert after.ui_language_preference == "zh"
def test_contributor_form(db):
    """`filled_contributor_form` is false for a new user and true after submitting a contributor form."""
    user, token = generate_user()

    with account_session(token) as account:
        assert not account.GetContributorFormInfo(empty_pb2.Empty()).filled_contributor_form

        submission = account_pb2.FillContributorFormReq(contributor_form=auth_pb2.ContributorForm())
        account.FillContributorForm(submission)

        assert account.GetContributorFormInfo(empty_pb2.Empty()).filled_contributor_form
def test_DeleteAccount_start(db):
    """Requesting deletion sends the confirmation email and creates a valid token without deleting the user."""
    user, token = generate_user()

    delete_request = account_pb2.DeleteAccountReq(confirm=True, reason=None)

    with account_session(token) as account:
        with mock_notification_email() as mock:
            account.DeleteAccount(delete_request)
        mock.assert_called_once()
        assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"

    with session_scope() as session:
        token_query = select(AccountDeletionToken).where(AccountDeletionToken.user_id == user.id)
        deletion_token = session.execute(token_query).scalar_one()
        assert deletion_token.is_valid

        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert not stored_user.is_deleted
def test_DeleteAccount_message_storage(db):
    """Only the three non-blank deletion reasons are expected to end up as AccountDeletionReason rows."""
    user, token = generate_user()

    reasons = [
        None,  # not stored
        "",  # not stored
        "Reason",
        "0192#(&!&#)*@//)(8",
        "\n\n\t",  # not stored
        "1337",
    ]

    with account_session(token) as account:
        for reason in reasons:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=reason))

    with session_scope() as session:
        stored_reasons = session.execute(select(func.count()).select_from(AccountDeletionReason)).scalar_one()
        assert stored_reasons == 3
def test_full_delete_account_with_recovery(db, push_collector):
    """
    Walk through the whole flow: request account deletion, confirm it by token, then recover the account.

    Each of the three steps checks the push notification, the email that was sent, and the database state.
    """
    user, token = generate_user()
    user_id = user.id
    support_address = "support@couchers.org"

    # Step 1: request deletion
    with account_session(token) as account:
        # an unconfirmed request must be refused
        with pytest.raises(grpc.RpcError) as exc_info:
            account.DeleteAccount(account_pb2.DeleteAccountReq())
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.MUST_CONFIRM_ACCOUNT_DELETE

        # Check the right email is sent
        with mock_notification_email() as mock:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

        push_collector.assert_user_push_matches_fields(
            user_id,
            ix=0,
            title="Account deletion initiated",
            body="Someone initiated the deletion of your Couchers.org account. To delete your account, please follow the link in the email we sent you.",
        )

        mock.assert_called_once()
        email = email_fields(mock)

    with session_scope() as session:
        token_row = session.execute(select(AccountDeletionToken)).scalar_one()
        deletion_token = token_row.token

        stored_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert token_row.user == stored_user
        assert not stored_user.is_deleted
        assert not stored_user.undelete_token
        assert not stored_user.undelete_until

    assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"
    assert email.recipient == user.email
    assert "account deletion" in email.subject.lower()
    deletion_url = f"http://localhost:3000/delete-account?token={deletion_token}"
    for expected_text in (
        deletion_token,
        "You requested that we delete your account from Couchers.org.",
        deletion_url,
        support_address,
    ):
        assert expected_text in email.plain
        assert expected_text in email.html

    # Step 2: confirm the deletion with the emailed token
    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, metadata_interceptor):
            auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token))

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=1,
        title="Your Couchers.org account has been deleted",
        body="You can still undo this by following the link we emailed to you within 7 days.",
    )

    mock.assert_called_once()
    email = email_fields(mock)

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        stored_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert stored_user.is_deleted
        assert stored_user.undelete_token
        assert stored_user.undelete_until > now()

        undelete_token = stored_user.undelete_token

    assert email.recipient == user.email
    assert "account has been deleted" in email.subject.lower()
    recovery_url = f"http://localhost:3000/recover-account?token={undelete_token}"
    for expected_text in (
        "You have successfully deleted your account from Couchers.org.",
        "7 days",
        recovery_url,
        support_address,
    ):
        assert expected_text in email.plain
        assert expected_text in email.html

    # Step 3: recover the account with the undelete token
    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, metadata_interceptor):
            auth_api.RecoverAccount(auth_pb2.RecoverAccountReq(token=undelete_token))

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=2,
        title="Your Couchers.org account has been recovered!",
        body="We have recovered your Couchers.org account as per your request! Welcome back!",
    )

    mock.assert_called_once()
    email = email_fields(mock)

    assert email.recipient == user.email
    assert "account has been recovered" in email.subject.lower()
    for expected_text in (
        "Your account on Couchers.org has been successfully recovered!",
        support_address,
    ):
        assert expected_text in email.plain
        assert expected_text in email.html

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        recovered_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert not recovered_user.is_deleted
        assert not recovered_user.undelete_token
        assert not recovered_user.undelete_until
def test_multiple_delete_tokens(db):
    """
    Make sure deletion tokens are deleted on delete

    Three deletion requests create three tokens; confirming with one of them must leave no tokens behind.
    """
    user, token = generate_user()

    with account_session(token) as account:
        for _ in range(3):
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    with session_scope() as session:
        token_count = session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one()
        assert token_count == 3
        deletion_token = session.execute(select(AccountDeletionToken)).scalars().first().token

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token))

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()
def test_ListActiveSessions_pagination(db, fast_passwords):
    """With four extra logins, a page size of 3 yields a page of 3, then a page of 2 with no next token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        first_page = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_size=3))
        assert len(first_page.active_sessions) == 3

        second_page = account.ListActiveSessions(
            account_pb2.ListActiveSessionsReq(page_token=first_page.next_page_token, page_size=3)
        )
        assert len(second_page.active_sessions) == 2
        assert not second_page.next_page_token
def test_ListActiveSessions_details(db, fast_passwords):
    """
    ListActiveSessions reports OS, browser, device and approximate location for each session.

    Three logins are made with distinct user agents and `x-couchers-real-ip` values. The geoip lookup used by
    the account servicer is replaced with a stub that only knows two of the three addresses, so the remaining
    sessions are expected to report "Unknown" as their location.
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    ips_user_agents = [
        (
            "108.123.33.162",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Mobile/15E148 Safari/604.1",
        ),
        (
            "8.245.212.28",
            "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/26.0 Chrome/122.0.0.0 Mobile Safari/537.36",
        ),
        (
            "95.254.140.156",
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0",
        ),
    ]

    for ip, user_agent in ips_user_agents:
        options = (("grpc.primary_user_agent", user_agent),)
        with auth_api_session(grpc_channel_options=options) as (auth_api, metadata_interceptor):
            auth_api.Authenticate(
                auth_pb2.AuthReq(user=user.username, password=password), metadata=(("x-couchers-real-ip", ip),)
            )

    known_locations = {
        "108.123.33.162": "Chicago, United States",
        "8.245.212.28": "Sydney, Australia",
    }

    def dummy_geoip(ip_address):
        # returns None for any address that is not in the table
        return known_locations.get(ip_address)

    # (operating_system, browser, device, approximate_location, is_current_session), in the order the API
    # is expected to return them: the session making the call first, then the logins above in reverse order
    expected_sessions = [
        ("Other", "Other", "Other", "Unknown", True),  # this one currently making the API call
        ("Ubuntu", "Firefox", "Other", "Unknown", False),
        ("Android", "Samsung Internet", "K", "Sydney, Australia", False),
        ("iOS", "Mobile Safari", "iPhone", "Chicago, United States", False),
    ]

    with real_account_session(token) as account:
        with patch("couchers.servicers.account.geoip_approximate_location", dummy_geoip):
            res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(res.active_sessions) == 4

        for active, expected in zip(res.active_sessions, expected_sessions):
            operating_system, browser, device, location, is_current = expected
            assert active.operating_system == operating_system
            assert active.browser == browser
            assert active.device == device
            assert active.approximate_location == location
            assert active.is_current_session == is_current
def test_LogOutSession(db, fast_passwords):
    """Logging out one session (selected by its `created` value) removes exactly that session from the list."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        before = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(before.active_sessions) == 5

        target = before.active_sessions[3]
        account.LogOutSession(account_pb2.LogOutSessionReq(created=target.created))

        after = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(after.active_sessions) == 4

        # ignore the first session as it changes
        remaining = before.active_sessions[1:3] + before.active_sessions[4:]
        assert remaining == after.active_sessions[1:]
def test_LogOutOtherSessions(db, fast_passwords):
    """LogOutOtherSessions requires `confirm=True`; once confirmed, only one active session remains."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 5

        # without confirmation the call must be refused
        with pytest.raises(grpc.RpcError) as exc_info:
            account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=False))
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.MUST_CONFIRM_LOGOUT_OTHER_SESSIONS

        account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=True))
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 1