Coverage for src/tests/test_account.py: 100%
479 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-04-16 15:13 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-04-16 15:13 +0000
1from datetime import timedelta
2from unittest.mock import patch
4import grpc
5import pytest
6from google.protobuf import empty_pb2
7from sqlalchemy.sql import func
9from couchers import errors
10from couchers.crypto import hash_password, random_hex
11from couchers.db import session_scope
12from couchers.models import (
13 AccountDeletionReason,
14 AccountDeletionToken,
15 BackgroundJob,
16 Upload,
17 User,
18)
19from couchers.sql import couchers_select as select
20from couchers.utils import now
21from proto import account_pb2, api_pb2, auth_pb2
22from tests.test_fixtures import ( # noqa
23 account_session,
24 auth_api_session,
25 db,
26 email_fields,
27 fast_passwords,
28 generate_user,
29 mock_notification_email,
30 process_jobs,
31 push_collector,
32 real_account_session,
33 testconfig,
34)
@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture whose only job is to request ``testconfig`` for every test in this module."""
def test_GetAccountInfo(db, fast_passwords):
    """GetAccountInfo returns the expected fields for a freshly generated user that has a password."""
    password_hash = hash_password(random_hex())
    user1, token1 = generate_user(hashed_password=password_hash, email="user@couchers.invalid")

    with account_session(token1) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())

        # identity fields
        assert info.email == "user@couchers.invalid"
        assert info.username == user1.username

        # verification / privilege flags
        assert not info.has_strong_verification
        assert info.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
        assert info.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED
        assert not info.is_superuser

        # no UI language has been chosen
        assert info.ui_language_preference == ""
def test_GetAccountInfo_regression(db):
    """
    Regression test for evaluating `has_completed_profile` on the backend (in python).

    When about_me is None but the user has an avatar key, the evaluation used to fail because
    len(about_me) doesn't work on None. The final call only has to complete without raising.
    """
    uploader_user, _ = generate_user()

    avatar_key = random_hex(32)
    avatar_filename = random_hex(32) + ".jpg"
    avatar_upload = Upload(
        key=avatar_key,
        filename=avatar_filename,
        creator_user_id=uploader_user.id,
    )
    with session_scope() as session:
        session.add(avatar_upload)

    _, token = generate_user(about_me=None, avatar_key=avatar_key)

    with account_session(token) as account:
        account.GetAccountInfo(empty_pb2.Empty())
def test_ChangePasswordV2_normal(db, fast_passwords, push_collector):
    """Changing the password with the correct old password stores the new hash and notifies the user."""
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(
        old_password=old_password,
        new_password=new_password,
    )
    with account_session(token) as account, mock_notification_email() as mock:
        account.ChangePasswordV2(request)

    # exactly one notification email, with the expected subject
    mock.assert_called_once()
    assert email_fields(mock).subject == "[TEST] Your password was changed"

    # and a matching push notification
    push_collector.assert_user_has_single_matching(
        user.id, title="Your password was changed", body="Your login password for Couchers.org was changed."
    )

    with session_scope() as session:
        db_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert db_user.hashed_password == hash_password(new_password)
def test_ChangePasswordV2_regression(db, fast_passwords):
    """
    Regression test: send_password_changed_email wasn't working.

    Same scenario as a normal password change (old password -> new password), but without mocking
    the notification email.
    """
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(
        old_password=old_password,
        new_password=new_password,
    )
    with account_session(token) as account:
        account.ChangePasswordV2(request)

    with session_scope() as session:
        db_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert db_user.hashed_password == hash_password(new_password)
def test_ChangePasswordV2_normal_short_password(db, fast_passwords):
    """A one-character new password is refused with PASSWORD_TOO_SHORT and the old hash is kept."""
    old_password = random_hex()
    new_password = random_hex(length=1)
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(
        old_password=old_password,
        new_password=new_password,
    )
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangePasswordV2(request)
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == errors.PASSWORD_TOO_SHORT

    # the stored password must be untouched
    with session_scope() as session:
        db_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert db_user.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_long_password(db, fast_passwords):
    """A 1000-character new password is refused with PASSWORD_TOO_LONG and the old hash is kept."""
    old_password = random_hex()
    new_password = random_hex(length=1000)
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(
        old_password=old_password,
        new_password=new_password,
    )
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangePasswordV2(request)
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == errors.PASSWORD_TOO_LONG

    # the stored password must be untouched
    with session_scope() as session:
        db_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert db_user.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_insecure_password(db, fast_passwords):
    """An insecure new password ("12345678") is refused with INSECURE_PASSWORD and the old hash is kept."""
    old_password = random_hex()
    new_password = "12345678"
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(
        old_password=old_password,
        new_password=new_password,
    )
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangePasswordV2(request)
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == errors.INSECURE_PASSWORD

    # the stored password must be untouched
    with session_scope() as session:
        db_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert db_user.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_wrong_password(db, fast_passwords):
    """Supplying the wrong old password is refused with INVALID_PASSWORD and the old hash is kept."""
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(
        old_password="wrong password",
        new_password=new_password,
    )
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangePasswordV2(request)
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == errors.INVALID_PASSWORD

    # the stored password must be untouched
    with session_scope() as session:
        db_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert db_user.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_no_passwords(db, fast_passwords):
    """A request carrying only the old password (no new one) is refused with PASSWORD_TOO_SHORT."""
    old_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    # new_password is deliberately left unset
    request = account_pb2.ChangePasswordV2Req(old_password=old_password)
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangePasswordV2(request)
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == errors.PASSWORD_TOO_SHORT

    # the stored password must be untouched
    with session_scope() as session:
        db_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert db_user.hashed_password == hash_password(old_password)
def test_ChangeEmailV2_wrong_password(db, fast_passwords):
    """An email change with the wrong password is refused and leaves no currently valid email token."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(
        password="wrong password",
        new_email=new_email,
    )
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangeEmailV2(request)
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == errors.INVALID_PASSWORD

    # count users whose new-email token window contains the current database time
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0
def test_ChangeEmailV2_wrong_email(db, fast_passwords):
    """
    An email change with the wrong password is refused and leaves no currently valid email token.

    NOTE(review): the request and assertions here match test_ChangeEmailV2_wrong_password exactly
    (wrong password, INVALID_PASSWORD); the test name suggests a different scenario may have been
    intended -- confirm with the author.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(
        password="wrong password",
        new_email=new_email,
    )
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangeEmailV2(request)
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == errors.INVALID_PASSWORD

    # count users whose new-email token window contains the current database time
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0
def test_ChangeEmailV2_invalid_email(db, fast_passwords):
    """A malformed new email is refused with INVALID_EMAIL and leaves no currently valid email token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email="not a real email",
    )
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangeEmailV2(request)
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == errors.INVALID_EMAIL

    # count users whose new-email token window contains the current database time
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0
def test_ChangeEmailV2_email_in_use(db, fast_passwords):
    """Changing to an email that belongs to another user is refused with INVALID_EMAIL."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))
    user2, token2 = generate_user(hashed_password=hash_password(password))

    # try to take over the second user's address
    request = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=user2.email,
    )
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangeEmailV2(request)
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == errors.INVALID_EMAIL

    # count users whose new-email token window contains the current database time
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0
def test_ChangeEmailV2_no_change(db, fast_passwords):
    """Changing the email to the user's own current address is refused with INVALID_EMAIL."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # "new" email is the one already on the account
    request = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=user.email,
    )
    with account_session(token) as account, pytest.raises(grpc.RpcError) as e:
        account.ChangeEmailV2(request)
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == errors.INVALID_EMAIL

    # count users whose new-email token window contains the current database time
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0
def test_ChangeEmailV2_wrong_token(db, fast_passwords):
    """Confirming an email change with a bogus token fails with INVALID_TOKEN and keeps the old email."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    # start a legitimate email change
    change_request = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=new_email,
    )
    with account_session(token) as account:
        account.ChangeEmailV2(change_request)

    # ...then try to confirm it with a token that was never issued
    confirm_request = auth_pb2.ConfirmChangeEmailV2Req(
        change_email_token="wrongtoken",
    )
    with auth_api_session() as (auth_api, _interceptor):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.ConfirmChangeEmailV2(confirm_request)
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == errors.INVALID_TOKEN

    with session_scope() as session:
        db_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert db_user.email == user.email
def test_ChangeEmailV2_tokens_two_hour_window(db):
    """
    A change-email token is rejected when the auth servicer's clock is outside its validity window.

    The servicer's ``now`` is patched to one minute in the past and to two hours and one minute in
    the future; under both clocks an empty token and the real token must fail with INVALID_TOKEN.
    """

    def two_hours_one_minute_in_future():
        return now() + timedelta(hours=2, minutes=1)

    def one_minute_ago():
        return now() - timedelta(minutes=1)

    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    # grab the token that was issued for the pending change
    with session_scope() as session:
        db_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        new_email_token = db_user.new_email_token

    for fake_clock in (one_minute_ago, two_hours_one_minute_in_future):
        with patch("couchers.servicers.auth.now", fake_clock), auth_api_session() as (auth_api, _interceptor):
            confirm_requests = (
                # no token at all
                auth_pb2.ConfirmChangeEmailV2Req(),
                # the real token, but at the wrong time
                auth_pb2.ConfirmChangeEmailV2Req(
                    change_email_token=new_email_token,
                ),
            )
            for confirm_request in confirm_requests:
                with pytest.raises(grpc.RpcError) as e:
                    auth_api.ConfirmChangeEmailV2(confirm_request)
                assert e.value.code() == grpc.StatusCode.NOT_FOUND
                assert e.value.details() == errors.INVALID_TOKEN
def test_ChangeEmailV2(db, fast_passwords, push_collector):
    """
    Happy path for an email change: request it, confirm it with the issued token, and check the
    user row and the push notifications after each of the two steps.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))
    user_id = user.id

    # step 1: request the change
    change_request = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=new_email,
    )
    with account_session(token) as account:
        account.ChangeEmailV2(change_request)

    with session_scope() as session:
        pending_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        # the old address stays in place until the change is confirmed
        assert pending_user.email == user.email
        assert pending_user.new_email == new_email
        assert pending_user.new_email_token is not None
        assert pending_user.new_email_token_created <= now()
        assert pending_user.new_email_token_expiry >= now()

        change_email_token = pending_user.new_email_token

    process_jobs()
    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=0,
        title="An email change was initiated on your account",
        body=f"An email change to the email {new_email} was initiated on your account.",
    )

    # step 2: confirm the change with the issued token
    confirm_request = auth_pb2.ConfirmChangeEmailV2Req(
        change_email_token=change_email_token,
    )
    with auth_api_session() as (auth_api, _interceptor):
        auth_api.ConfirmChangeEmailV2(confirm_request)

    with session_scope() as session:
        confirmed_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert confirmed_user.email == new_email
        # all pending-change bookkeeping is cleared
        assert confirmed_user.new_email is None
        assert confirmed_user.new_email_token is None
        assert confirmed_user.new_email_token_created is None
        assert confirmed_user.new_email_token_expiry is None

    process_jobs()
    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=1,
        title="Email change completed",
        body="Your new email address has been verified.",
    )
def test_ChangeEmailV2_sends_proper_emails(db, fast_passwords, push_collector):
    """
    Requesting an email change queues exactly two ``send_email`` background jobs -- one whose
    payload contains the notification text and one whose payload contains the confirmation text --
    and produces a single matching push notification.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    process_jobs()

    # byte strings that identify each of the two emails inside a job payload
    notification_marker = b"An email change to the email"
    confirmation_marker = (
        b"You requested that your email be changed to this email address on Couchers.org. Your old email address is"
    )

    with session_scope() as session:
        jobs = session.execute(select(BackgroundJob).where(BackgroundJob.job_type == "send_email")).scalars().all()
        assert len(jobs) == 2
        first_payload = jobs[0].payload
        second_payload = jobs[1].payload
        # the order of the two jobs is not asserted: each payload just has to match a different email
        assert (notification_marker in first_payload and confirmation_marker in second_payload) or (
            confirmation_marker in first_payload and notification_marker in second_payload
        )

    push_collector.assert_user_has_single_matching(
        user.id,
        title="An email change was initiated on your account",
        body=f"An email change to the email {new_email} was initiated on your account.",
    )
def test_ChangeLanguagePreference(db, fast_passwords):
    """
    Changing the UI language from the default to an ISO 639-1 code sets the preference cookie and
    is reflected by a subsequent GetAccountInfo call.
    """
    language_code = "zh"
    user, token = generate_user()

    with real_account_session(token) as account:
        # starts out with no preference
        before = account.GetAccountInfo(empty_pb2.Empty())
        assert before.ui_language_preference == ""

        # with_call also returns the call object, which carries info about the request
        _, call = account.ChangeLanguagePreference.with_call(
            account_pb2.ChangeLanguagePreferenceReq(ui_language_preference=language_code)
        )

        # cookies are sent via initial metadata, so we check for it there
        metadata = dict(call.initial_metadata())
        assert "set-cookie" in metadata, "expected 'set-cookie' in initial metadata"

        # "set-cookie" holds the full cookie string; the key=value pair is its first ';'-separated part
        cookie_pair = metadata["set-cookie"].split(";")[0]
        assert (
            cookie_pair == "couchers-preferred-language=zh"
        ), f"expected 'couchers-preferred-language=zh', got {cookie_pair}"

        # the changed language preference should also be sent to the backend
        after = account.GetAccountInfo(empty_pb2.Empty())
        assert after.ui_language_preference == "zh"
def test_contributor_form(db):
    """`filled_contributor_form` flips from false to true once FillContributorForm has been called."""
    user, token = generate_user()

    with account_session(token) as account:
        before = account.GetContributorFormInfo(empty_pb2.Empty())
        assert not before.filled_contributor_form

        # submit an empty contributor form
        form_request = account_pb2.FillContributorFormReq(contributor_form=auth_pb2.ContributorForm())
        account.FillContributorForm(form_request)

        after = account.GetContributorFormInfo(empty_pb2.Empty())
        assert after.filled_contributor_form
def test_DeleteAccount_start(db):
    """Starting a deletion sends the confirmation email and creates a valid token without deleting the user."""
    user, token = generate_user()

    delete_request = account_pb2.DeleteAccountReq(confirm=True, reason=None)
    with account_session(token) as account, mock_notification_email() as mock:
        account.DeleteAccount(delete_request)
    mock.assert_called_once()
    assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"

    with session_scope() as session:
        token_query = select(AccountDeletionToken).where(AccountDeletionToken.user_id == user.id)
        deletion_token = session.execute(token_query).scalar_one()
        assert deletion_token.is_valid

        # the account itself is still alive until the deletion is confirmed
        db_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert not db_user.is_deleted
def test_DeleteAccount_message_storage(db):
    """Only the three non-blank deletion reasons out of six requests end up as AccountDeletionReason rows."""
    user, token = generate_user()

    reasons = [
        None,  # not stored
        "",  # not stored
        "Reason",
        "0192#(&!&#)*@//)(8",
        "\n\n\t",  # not stored
        "1337",
    ]

    with account_session(token) as account:
        for reason in reasons:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=reason))

    with session_scope() as session:
        stored_reasons = session.execute(select(func.count()).select_from(AccountDeletionReason)).scalar_one()
        assert stored_reasons == 3
def test_full_delete_account_with_recovery(db, push_collector):
    """
    Walk one account through the whole deletion flow and back again.

    Three steps are exercised: requesting deletion, confirming it with the emailed token, and
    recovering the account with the undelete token. After each step the notification email, the
    push notification and the database state are checked.
    """
    user, token = generate_user()
    user_id = user.id

    # ---- step 1: request deletion --------------------------------------------------------------
    with account_session(token) as account:
        # without confirm=True the request is refused
        with pytest.raises(grpc.RpcError) as exc_info:
            account.DeleteAccount(account_pb2.DeleteAccountReq())
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.MUST_CONFIRM_ACCOUNT_DELETE

        # Check the right email is sent
        with mock_notification_email() as mock:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=0,
        title="Account deletion initiated",
        body="Someone initiated the deletion of your Couchers.org account. To delete your account, please follow the link in the email we sent you.",
    )

    mock.assert_called_once()
    confirm_email = email_fields(mock)

    with session_scope() as session:
        token_row = session.execute(select(AccountDeletionToken)).scalar_one()
        deletion_token = token_row.token

        db_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert token_row.user == db_user
        # nothing has actually been deleted yet
        assert not db_user.is_deleted
        assert not db_user.undelete_token
        assert not db_user.undelete_until

    assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"
    assert confirm_email.recipient == user.email
    assert "account deletion" in confirm_email.subject.lower()
    assert deletion_token in confirm_email.plain
    assert deletion_token in confirm_email.html
    unique_string = "You requested that we delete your account from Couchers.org."
    assert unique_string in confirm_email.plain
    assert unique_string in confirm_email.html
    url = f"http://localhost:3000/delete-account?token={deletion_token}"
    assert url in confirm_email.plain
    assert url in confirm_email.html
    assert "support@couchers.org" in confirm_email.plain
    assert "support@couchers.org" in confirm_email.html

    # ---- step 2: confirm deletion with the emailed token ---------------------------------------
    with mock_notification_email() as mock, auth_api_session() as (auth_api, _interceptor):
        auth_api.ConfirmDeleteAccount(
            auth_pb2.ConfirmDeleteAccountReq(
                token=deletion_token,
            )
        )

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=1,
        title="Your Couchers.org account has been deleted",
        body="You can still undo this by following the link we emailed to you within 7 days.",
    )

    mock.assert_called_once()
    deleted_email = email_fields(mock)

    with session_scope() as session:
        # the deletion token is gone once it has been used
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        db_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert db_user.is_deleted
        assert db_user.undelete_token
        assert db_user.undelete_until > now()

        undelete_token = db_user.undelete_token

    assert deleted_email.recipient == user.email
    assert "account has been deleted" in deleted_email.subject.lower()
    unique_string = "You have successfully deleted your account from Couchers.org."
    assert unique_string in deleted_email.plain
    assert unique_string in deleted_email.html
    assert "7 days" in deleted_email.plain
    assert "7 days" in deleted_email.html
    url = f"http://localhost:3000/recover-account?token={undelete_token}"
    assert url in deleted_email.plain
    assert url in deleted_email.html
    assert "support@couchers.org" in deleted_email.plain
    assert "support@couchers.org" in deleted_email.html

    # ---- step 3: recover the account with the undelete token -----------------------------------
    with mock_notification_email() as mock, auth_api_session() as (auth_api, _interceptor):
        auth_api.RecoverAccount(
            auth_pb2.RecoverAccountReq(
                token=undelete_token,
            )
        )

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=2,
        title="Your Couchers.org account has been recovered!",
        body="We have recovered your Couchers.org account as per your request! Welcome back!",
    )

    mock.assert_called_once()
    recovered_email = email_fields(mock)

    assert recovered_email.recipient == user.email
    assert "account has been recovered" in recovered_email.subject.lower()
    unique_string = "Your account on Couchers.org has been successfully recovered!"
    assert unique_string in recovered_email.plain
    assert unique_string in recovered_email.html
    assert "support@couchers.org" in recovered_email.plain
    assert "support@couchers.org" in recovered_email.html

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        recovered_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert not recovered_user.is_deleted
        assert not recovered_user.undelete_token
        assert not recovered_user.undelete_until
def test_multiple_delete_tokens(db):
    """
    Make sure deletion tokens are deleted on delete

    Three deletion requests create three tokens; confirming with the first of them must leave no
    AccountDeletionToken rows behind.
    """
    user, token = generate_user()

    with account_session(token) as account:
        for _ in range(3):
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    with session_scope() as session:
        token_count = session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one()
        assert token_count == 3
        deletion_token = session.execute(select(AccountDeletionToken)).scalars().first().token

    confirm_request = auth_pb2.ConfirmDeleteAccountReq(
        token=deletion_token,
    )
    with auth_api_session() as (auth_api, _interceptor):
        auth_api.ConfirmDeleteAccount(confirm_request)

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()
def test_ListActiveSessions_pagination(db, fast_passwords):
    """With a page size of 3, ListActiveSessions returns 3 sessions, then 2, then no further page token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # log in four more times through the auth API
    with auth_api_session() as (auth_api, _interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        first_page = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_size=3))
        assert len(first_page.active_sessions) == 3

        second_page = account.ListActiveSessions(
            account_pb2.ListActiveSessionsReq(page_token=first_page.next_page_token, page_size=3)
        )
        assert len(second_page.active_sessions) == 2
        assert not second_page.next_page_token
def test_ListActiveSessions_details(db, fast_passwords):
    """
    ListActiveSessions reports operating system, browser, device and approximate location for
    each session, and flags which one is the current session.

    Three logins are made with distinct user agents and ``x-couchers-real-ip`` values, and the
    geoip lookup used by the account servicer is patched with a stub that only knows two of the
    three IPs. The expected listing has the calling session first, followed by the three logins
    in reverse order of creation.
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    ips_user_agents = [
        (
            "108.123.33.162",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Mobile/15E148 Safari/604.1",
        ),
        (
            "8.245.212.28",
            "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/26.0 Chrome/122.0.0.0 Mobile Safari/537.36",
        ),
        (
            "95.254.140.156",
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0",
        ),
    ]

    # one login per (ip, user agent) pair; the user agent is set through a channel option
    for ip, user_agent in ips_user_agents:
        options = (("grpc.primary_user_agent", user_agent),)
        with auth_api_session(grpc_channel_options=options) as (auth_api, metadata_interceptor):
            auth_api.Authenticate(
                auth_pb2.AuthReq(user=user.username, password=password), metadata=(("x-couchers-real-ip", ip),)
            )

    def dummy_geoip(ip_address):
        # only two of the three IPs resolve; anything else yields None
        return {
            "108.123.33.162": "Chicago, United States",
            "8.245.212.28": "Sydney, Australia",
        }.get(ip_address)

    with real_account_session(token) as account:
        with patch("couchers.servicers.account.geoip_approximate_location", dummy_geoip):
            res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())

    # (operating_system, browser, device, approximate_location, is_current_session)
    expected_sessions = [
        # this one currently making the API call
        ("Other", "Other", "Other", "Unknown", True),
        ("Ubuntu", "Firefox", "Other", "Unknown", False),
        ("Android", "Samsung Internet", "K", "Sydney, Australia", False),
        ("iOS", "Mobile Safari", "iPhone", "Chicago, United States", False),
    ]

    # length is checked first because zip() below would silently truncate on a mismatch
    assert len(res.active_sessions) == 4

    for active_session, expected in zip(res.active_sessions, expected_sessions):
        operating_system, browser, device, approximate_location, is_current = expected
        assert active_session.operating_system == operating_system
        assert active_session.browser == browser
        assert active_session.device == device
        assert active_session.approximate_location == approximate_location
        assert bool(active_session.is_current_session) is is_current
def test_LogOutSession(db, fast_passwords):
    """Logging out one session by its `created` value removes exactly that session from the listing."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four extra logins on top of the session that belongs to `token`
    with auth_api_session() as (auth_api, _interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        before = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(before.active_sessions) == 5

        # log out the session at index 3
        logout_request = account_pb2.LogOutSessionReq(created=before.active_sessions[3].created)
        account.LogOutSession(logout_request)

        after = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(after.active_sessions) == 4

        # ignore the first session as it changes
        survivors = before.active_sessions[1:3] + before.active_sessions[4:]
        assert survivors == after.active_sessions[1:]
def test_LogOutOtherSessions(db, fast_passwords):
    """LogOutOtherSessions requires confirm=True and then leaves a single active session."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four extra logins on top of the session that belongs to `token`
    with auth_api_session() as (auth_api, _interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 5

        # unconfirmed request is refused
        with pytest.raises(grpc.RpcError) as e:
            account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=False))
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == errors.MUST_CONFIRM_LOGOUT_OTHER_SESSIONS

        # confirmed request goes through
        account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=True))
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 1