Coverage for src/tests/test_account.py: 100%
463 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-10-04 23:02 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-10-04 23:02 +0000
1from datetime import timedelta
2from unittest.mock import patch
4import grpc
5import pytest
6from google.protobuf import empty_pb2
7from sqlalchemy.sql import func
9from couchers import errors
10from couchers.crypto import hash_password, random_hex
11from couchers.db import session_scope
12from couchers.models import AccountDeletionReason, AccountDeletionToken, BackgroundJob, Upload, User
13from couchers.sql import couchers_select as select
14from couchers.utils import now
15from proto import account_pb2, api_pb2, auth_pb2
16from tests.test_fixtures import ( # noqa
17 account_session,
18 auth_api_session,
19 db,
20 email_fields,
21 fast_passwords,
22 generate_user,
23 mock_notification_email,
24 process_jobs,
25 push_collector,
26 real_account_session,
27 testconfig,
28)
@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture: requests ``testconfig`` for every test in this module."""
def test_GetAccountInfo(db, fast_passwords):
    """GetAccountInfo reports email, username and unverified statuses for a password user."""
    # with password
    password_hash = hash_password(random_hex())
    user1, token1 = generate_user(hashed_password=password_hash, email="user@couchers.invalid")

    with account_session(token1) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert info.email == "user@couchers.invalid"
        assert info.username == user1.username
        assert not info.has_strong_verification
        assert info.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
        assert info.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED
def test_GetAccountInfo_regression(db):
    """Regression test: GetAccountInfo must succeed when ``about_me`` is None and an avatar is set.

    There was a bug in evaluating ``has_completed_profile`` on the backend (in python):
    when about_me is None but the user has an avatar key, it failed because
    ``len(about_me)`` doesn't work on None. The test passes if the call does not raise.
    """
    uploader_user, _ = generate_user()

    with session_scope() as session:
        key = random_hex(32)
        filename = random_hex(32) + ".jpg"
        avatar_upload = Upload(
            key=key,
            filename=filename,
            creator_user_id=uploader_user.id,
        )
        session.add(avatar_upload)

    user, token = generate_user(about_me=None, avatar_key=key)

    with account_session(token) as account:
        res = account.GetAccountInfo(empty_pb2.Empty())
def test_ChangePasswordV2_normal(db, fast_passwords, push_collector):
    """Changing the password with the correct old password sends one email, one push, and stores the new hash."""
    # user has old password and is changing to new password
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    change_request = account_pb2.ChangePasswordV2Req(
        old_password=old_password,
        new_password=new_password,
    )

    with account_session(token) as account:
        with mock_notification_email() as email_mock:
            account.ChangePasswordV2(change_request)

        email_mock.assert_called_once()
        assert email_fields(email_mock).subject == "[TEST] Your password was changed"

    push_collector.assert_user_has_single_matching(
        user.id, title="Your password was changed", body="Your login password for Couchers.org was changed."
    )

    with session_scope() as session:
        lookup = select(User).where(User.id == user.id)
        updated_user = session.execute(lookup).scalar_one()
        assert updated_user.hashed_password == hash_password(new_password)
def test_ChangePasswordV2_regression(db, fast_passwords):
    """Regression test: a password change without the notification email mocked still succeeds.

    send_password_changed_email wasn't working; here the user has an old password
    and is changing to a new one with the real email path in place.
    """
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    change_request = account_pb2.ChangePasswordV2Req(
        old_password=old_password,
        new_password=new_password,
    )

    with account_session(token) as account:
        account.ChangePasswordV2(change_request)

    with session_scope() as session:
        lookup = select(User).where(User.id == user.id)
        updated_user = session.execute(lookup).scalar_one()
        assert updated_user.hashed_password == hash_password(new_password)
def test_ChangePasswordV2_normal_short_password(db, fast_passwords):
    """A too-short new password is rejected with PASSWORD_TOO_SHORT and the old hash is kept."""
    # user has old password and is changing to new password, but used short password
    old_password = random_hex()
    new_password = random_hex(length=1)
    user, token = generate_user(hashed_password=hash_password(old_password))

    change_request = account_pb2.ChangePasswordV2Req(
        old_password=old_password,
        new_password=new_password,
    )

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(change_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.PASSWORD_TOO_SHORT

    # the stored hash must be untouched
    with session_scope() as session:
        lookup = select(User).where(User.id == user.id)
        updated_user = session.execute(lookup).scalar_one()
        assert updated_user.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_long_password(db, fast_passwords):
    """A too-long new password is rejected with PASSWORD_TOO_LONG and the old hash is kept."""
    # user has old password and is changing to new password, but used a too-long password
    old_password = random_hex()
    new_password = random_hex(length=1000)
    user, token = generate_user(hashed_password=hash_password(old_password))

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(
                account_pb2.ChangePasswordV2Req(
                    old_password=old_password,
                    new_password=new_password,
                )
            )
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.PASSWORD_TOO_LONG

    # the rejected request must not have modified the stored hash
    with session_scope() as session:
        updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert updated_user.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_insecure_password(db, fast_passwords):
    """The new password "12345678" is rejected with INSECURE_PASSWORD and the old hash is kept."""
    # user has old password and is changing to new password, but used insecure password
    old_password = random_hex()
    new_password = "12345678"
    user, token = generate_user(hashed_password=hash_password(old_password))

    change_request = account_pb2.ChangePasswordV2Req(
        old_password=old_password,
        new_password=new_password,
    )

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(change_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INSECURE_PASSWORD

    # the stored hash must be untouched
    with session_scope() as session:
        lookup = select(User).where(User.id == user.id)
        updated_user = session.execute(lookup).scalar_one()
        assert updated_user.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_wrong_password(db, fast_passwords):
    """Supplying a wrong old password is rejected with INVALID_PASSWORD and the old hash is kept."""
    # user has old password and is changing to new password, but used wrong old password
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    change_request = account_pb2.ChangePasswordV2Req(
        old_password="wrong password",
        new_password=new_password,
    )

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(change_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_PASSWORD

    # the stored hash must be untouched
    with session_scope() as session:
        lookup = select(User).where(User.id == user.id)
        updated_user = session.execute(lookup).scalar_one()
        assert updated_user.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_no_passwords(db, fast_passwords):
    """A request that omits ``new_password`` is rejected with PASSWORD_TOO_SHORT and the old hash is kept."""
    # user has old password and called without a new password
    old_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    change_request = account_pb2.ChangePasswordV2Req(old_password=old_password)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(change_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.PASSWORD_TOO_SHORT

    # the stored hash must be untouched
    with session_scope() as session:
        lookup = select(User).where(User.id == user.id)
        updated_user = session.execute(lookup).scalar_one()
        assert updated_user.hashed_password == hash_password(old_password)
def test_ChangeEmailV2_wrong_password(db, fast_passwords):
    """An email change with a wrong password fails with INVALID_PASSWORD and creates no active token."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    change_request = account_pb2.ChangeEmailV2Req(
        password="wrong password",
        new_email=new_email,
    )

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(change_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_PASSWORD

    with session_scope() as session:
        # count users holding an email-change token whose validity window covers "now"
        active_token_count = (
            select(func.count())
            .select_from(User)
            .where(User.new_email_token_created <= func.now())
            .where(User.new_email_token_expiry >= func.now())
        )
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_wrong_email(db, fast_passwords):
    """An email change with a wrong password fails with INVALID_PASSWORD and creates no active token.

    NOTE(review): despite its name, this test sends the same request and makes the
    same assertions as ``test_ChangeEmailV2_wrong_password``. Confirm whether a
    different scenario was intended.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    change_request = account_pb2.ChangeEmailV2Req(
        password="wrong password",
        new_email=new_email,
    )

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(change_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_PASSWORD

    with session_scope() as session:
        # count users holding an email-change token whose validity window covers "now"
        active_token_count = (
            select(func.count())
            .select_from(User)
            .where(User.new_email_token_created <= func.now())
            .where(User.new_email_token_expiry >= func.now())
        )
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_invalid_email(db, fast_passwords):
    """A malformed new email address fails with INVALID_EMAIL and creates no active token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    change_request = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email="not a real email",
    )

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(change_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_EMAIL

    with session_scope() as session:
        # count users holding an email-change token whose validity window covers "now"
        active_token_count = (
            select(func.count())
            .select_from(User)
            .where(User.new_email_token_created <= func.now())
            .where(User.new_email_token_expiry >= func.now())
        )
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_email_in_use(db, fast_passwords):
    """Changing to another user's email address fails with INVALID_EMAIL and creates no active token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))
    user2, token2 = generate_user(hashed_password=hash_password(password))

    change_request = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=user2.email,
    )

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(change_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_EMAIL

    with session_scope() as session:
        # count users holding an email-change token whose validity window covers "now"
        active_token_count = (
            select(func.count())
            .select_from(User)
            .where(User.new_email_token_created <= func.now())
            .where(User.new_email_token_expiry >= func.now())
        )
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_no_change(db, fast_passwords):
    """Changing to the user's own current email fails with INVALID_EMAIL and creates no active token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    change_request = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=user.email,
    )

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(change_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_EMAIL

    with session_scope() as session:
        # count users holding an email-change token whose validity window covers "now"
        active_token_count = (
            select(func.count())
            .select_from(User)
            .where(User.new_email_token_created <= func.now())
            .where(User.new_email_token_expiry >= func.now())
        )
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_wrong_token(db, fast_passwords):
    """Confirming an email change with an unknown token fails and leaves the email unchanged.

    A valid change is initiated first, so a real token exists; the confirmation is
    then attempted with a different token and must be rejected with
    NOT_FOUND / INVALID_TOKEN.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            # the call is expected to raise, so its result is deliberately not bound
            auth_api.ConfirmChangeEmailV2(
                auth_pb2.ConfirmChangeEmailV2Req(
                    change_email_token="wrongtoken",
                )
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == errors.INVALID_TOKEN

    # the failed confirmation must not have switched the address
    with session_scope() as session:
        user_updated = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert user_updated.email == user.email
def test_ChangeEmailV2_tokens_two_hour_window(db):
    """An email-change token is rejected when the auth servicer's clock is outside its window.

    The auth servicer's ``now`` is patched to one minute in the past and to two
    hours and one minute in the future. In both cases an empty token and the real
    token must fail with NOT_FOUND / INVALID_TOKEN.
    """

    def two_hours_one_minute_in_future():
        return now() + timedelta(hours=2, minutes=1)

    def one_minute_ago():
        return now() - timedelta(minutes=1)

    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        new_email_token = user.new_email_token

    # same checks for both out-of-window clocks, in the original order
    for shifted_clock in (one_minute_ago, two_hours_one_minute_in_future):
        with patch("couchers.servicers.auth.now", shifted_clock):
            with auth_api_session() as (auth_api, metadata_interceptor):
                # first with no token at all, then with the real token
                confirm_requests = (
                    auth_pb2.ConfirmChangeEmailV2Req(),
                    auth_pb2.ConfirmChangeEmailV2Req(
                        change_email_token=new_email_token,
                    ),
                )
                for confirm_request in confirm_requests:
                    with pytest.raises(grpc.RpcError) as exc_info:
                        auth_api.ConfirmChangeEmailV2(confirm_request)
                    assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
                    assert exc_info.value.details() == errors.INVALID_TOKEN
def test_ChangeEmailV2(db, fast_passwords, push_collector):
    """Happy path: initiate an email change, confirm it with the token, and check state and pushes."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))
    user_id = user.id

    change_request = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=new_email,
    )
    with account_session(token) as account:
        account.ChangeEmailV2(change_request)

    # after initiating: old email still active, new email and token pending
    with session_scope() as session:
        user_updated = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert user_updated.email == user.email
        assert user_updated.new_email == new_email
        assert user_updated.new_email_token is not None
        assert user_updated.new_email_token_created <= now()
        assert user_updated.new_email_token_expiry >= now()

        token = user_updated.new_email_token

    process_jobs()
    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=0,
        title="An email change was initiated on your account",
        body=f"An email change to the email {new_email} was initiated on your account.",
    )

    confirm_request = auth_pb2.ConfirmChangeEmailV2Req(
        change_email_token=token,
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.ConfirmChangeEmailV2(confirm_request)

    # after confirming: new email active, all pending-change fields cleared
    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert user.email == new_email
        assert user.new_email is None
        assert user.new_email_token is None
        assert user.new_email_token_created is None
        assert user.new_email_token_expiry is None

    process_jobs()
    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=1,
        title="Email change completed",
        body="Your new email address has been verified.",
    )
def test_ChangeEmailV2_sends_proper_emails(db, fast_passwords, push_collector):
    """Initiating an email change queues exactly two emails and one push notification.

    One ``send_email`` job must carry the notification text and the other the
    confirmation text for the new address; the order of the two jobs is not
    asserted.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    process_jobs()

    with session_scope() as session:
        jobs = session.execute(select(BackgroundJob).where(BackgroundJob.job_type == "send_email")).scalars().all()
        assert len(jobs) == 2
        first_payload = jobs[0].payload
        second_payload = jobs[1].payload
        uq_str1 = b"An email change to the email"
        uq_str2 = (
            b"You requested that your email be changed to this email address on Couchers.org. Your old email address is"
        )
        # either ordering of the two emails is acceptable
        assert (uq_str1 in first_payload and uq_str2 in second_payload) or (
            uq_str2 in first_payload and uq_str1 in second_payload
        )

    push_collector.assert_user_has_single_matching(
        user.id,
        title="An email change was initiated on your account",
        body=f"An email change to the email {new_email} was initiated on your account.",
    )
def test_contributor_form(db):
    """``filled_contributor_form`` is false before and true after calling FillContributorForm."""
    user, token = generate_user()

    with account_session(token) as account:
        before = account.GetContributorFormInfo(empty_pb2.Empty())
        assert not before.filled_contributor_form

        fill_request = account_pb2.FillContributorFormReq(contributor_form=auth_pb2.ContributorForm())
        account.FillContributorForm(fill_request)

        after = account.GetContributorFormInfo(empty_pb2.Empty())
        assert after.filled_contributor_form
def test_DeleteAccount_start(db):
    """Requesting deletion sends one confirmation email and creates a valid token without deleting the user."""
    user, token = generate_user()

    delete_request = account_pb2.DeleteAccountReq(confirm=True, reason=None)

    with account_session(token) as account:
        with mock_notification_email() as email_mock:
            account.DeleteAccount(delete_request)
        email_mock.assert_called_once()
        assert email_fields(email_mock).subject == "[TEST] Confirm your Couchers.org account deletion"

    with session_scope() as session:
        token_query = select(AccountDeletionToken).where(AccountDeletionToken.user_id == user.id)
        deletion_token = session.execute(token_query).scalar_one()

        assert deletion_token.is_valid
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert not stored_user.is_deleted
def test_DeleteAccount_message_storage(db):
    """Of six deletion requests, only the three with a non-blank reason store an AccountDeletionReason."""
    user, token = generate_user()

    reasons = [
        None,  # not stored
        "",  # not stored
        "Reason",
        "0192#(&!&#)*@//)(8",
        "\n\n\t",  # not stored
        "1337",
    ]

    with account_session(token) as account:
        for reason in reasons:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=reason))

    with session_scope() as session:
        stored_reasons = session.execute(select(func.count()).select_from(AccountDeletionReason)).scalar_one()
        assert stored_reasons == 3
def test_full_delete_account_with_recovery(db, push_collector):
    """End-to-end flow: request deletion, confirm it, then recover the account.

    Each of the three steps is checked for its push notification, its single
    notification email (recipient, subject, body snippets, links) and the
    resulting database state.
    """
    user, token = generate_user()
    user_id = user.id

    # --- step 1: request deletion -------------------------------------------------
    with account_session(token) as account:
        # deletion without confirm=True is refused
        with pytest.raises(grpc.RpcError) as exc_info:
            account.DeleteAccount(account_pb2.DeleteAccountReq())
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.MUST_CONFIRM_ACCOUNT_DELETE

        # Check the right email is sent
        with mock_notification_email() as mock:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

        push_collector.assert_user_push_matches_fields(
            user_id,
            ix=0,
            title="Account deletion initiated",
            body="Someone initiated the deletion of your Couchers.org account. To delete your account, please follow the link in the email we sent you.",
        )

        mock.assert_called_once()
        request_email = email_fields(mock)

    with session_scope() as session:
        token_row = session.execute(select(AccountDeletionToken)).scalar_one()
        deletion_token = token_row.token

        stored_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert token_row.user == stored_user
        assert not stored_user.is_deleted
        assert not stored_user.undelete_token
        assert not stored_user.undelete_until

    assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"
    assert request_email.recipient == user.email
    assert "account deletion" in request_email.subject.lower()
    assert deletion_token in request_email.plain
    assert deletion_token in request_email.html
    unique_string = "You requested that we delete your account from Couchers.org."
    assert unique_string in request_email.plain
    assert unique_string in request_email.html
    url = f"http://localhost:3000/delete-account?token={deletion_token}"
    assert url in request_email.plain
    assert url in request_email.html
    assert "support@couchers.org" in request_email.plain
    assert "support@couchers.org" in request_email.html

    # --- step 2: confirm deletion -------------------------------------------------
    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, metadata_interceptor):
            auth_api.ConfirmDeleteAccount(
                auth_pb2.ConfirmDeleteAccountReq(
                    token=deletion_token,
                )
            )

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=1,
        title="Your Couchers.org account has been deleted",
        body="You can still undo this by following the link we emailed to you within 7 days.",
    )

    mock.assert_called_once()
    deleted_email = email_fields(mock)

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        stored_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert stored_user.is_deleted
        assert stored_user.undelete_token
        assert stored_user.undelete_until > now()

        undelete_token = stored_user.undelete_token

    assert deleted_email.recipient == user.email
    assert "account has been deleted" in deleted_email.subject.lower()
    unique_string = "You have successfully deleted your account from Couchers.org."
    assert unique_string in deleted_email.plain
    assert unique_string in deleted_email.html
    assert "7 days" in deleted_email.plain
    assert "7 days" in deleted_email.html
    url = f"http://localhost:3000/recover-account?token={undelete_token}"
    assert url in deleted_email.plain
    assert url in deleted_email.html
    assert "support@couchers.org" in deleted_email.plain
    assert "support@couchers.org" in deleted_email.html

    # --- step 3: recover the account ----------------------------------------------
    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, metadata_interceptor):
            auth_api.RecoverAccount(
                auth_pb2.RecoverAccountReq(
                    token=undelete_token,
                )
            )

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=2,
        title="Your Couchers.org account has been recovered!",
        body="We have recovered your Couchers.org account as per your request! Welcome back!",
    )

    mock.assert_called_once()
    recovered_email = email_fields(mock)

    assert recovered_email.recipient == user.email
    assert "account has been recovered" in recovered_email.subject.lower()
    unique_string = "Your account on Couchers.org has been successfully recovered!"
    assert unique_string in recovered_email.plain
    assert unique_string in recovered_email.html
    assert "support@couchers.org" in recovered_email.plain
    assert "support@couchers.org" in recovered_email.html

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert not user.is_deleted
        assert not user.undelete_token
        assert not user.undelete_until
def test_multiple_delete_tokens(db):
    """
    Make sure deletion tokens are deleted on delete

    Three deletion requests create three tokens; confirming with one of them
    must leave no AccountDeletionToken rows behind.
    """
    user, token = generate_user()

    with account_session(token) as account:
        for _ in range(3):
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    with session_scope() as session:
        token_count = session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one()
        assert token_count == 3
        token = session.execute(select(AccountDeletionToken)).scalars().first().token

    confirm_request = auth_pb2.ConfirmDeleteAccountReq(
        token=token,
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmDeleteAccount(confirm_request)

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()
def test_ListActiveSessions_pagination(db, fast_passwords):
    """With a page size of 3, the sessions come back as a page of 3 and a final page of 2."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four extra logins on top of the session behind `token`
    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        first_page = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_size=3))
        assert len(first_page.active_sessions) == 3

        second_page = account.ListActiveSessions(
            account_pb2.ListActiveSessionsReq(page_token=first_page.next_page_token, page_size=3)
        )
        assert len(second_page.active_sessions) == 2
        assert not second_page.next_page_token
def test_ListActiveSessions_details(db, fast_passwords):
    """ListActiveSessions reports OS, browser, device, location and the current-session flag.

    Three logins are made with distinct IPs and user agents. The geoip lookup is
    replaced by a stub that knows only two of the IPs, and the listing is expected
    to show the calling session first, followed by the logins in reverse order.
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    ips_user_agents = [
        (
            "108.123.33.162",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Mobile/15E148 Safari/604.1",
        ),
        (
            "8.245.212.28",
            "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/26.0 Chrome/122.0.0.0 Mobile Safari/537.36",
        ),
        (
            "95.254.140.156",
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0",
        ),
    ]

    for ip, user_agent in ips_user_agents:
        options = (("grpc.primary_user_agent", user_agent),)
        with auth_api_session(grpc_channel_options=options) as (auth_api, metadata_interceptor):
            auth_api.Authenticate(
                auth_pb2.AuthReq(user=user.username, password=password), metadata=(("x-couchers-real-ip", ip),)
            )

    known_locations = {
        "108.123.33.162": "Chicago, United States",
        "8.245.212.28": "Sydney, Australia",
    }

    def dummy_geoip(ip_address):
        # returns None for any IP not listed above
        return known_locations.get(ip_address)

    with real_account_session(token) as account:
        with patch("couchers.servicers.account.geoip_approximate_location", dummy_geoip):
            res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        print(res)
        assert len(res.active_sessions) == 4

        # (operating_system, browser, device, approximate_location, is_current_session)
        expected_sessions = [
            # this one currently making the API call
            ("Other", "Other", "Other", "Unknown", True),
            ("Ubuntu", "Firefox", "Other", "Unknown", False),
            ("Android", "Samsung Internet", "K", "Sydney, Australia", False),
            ("iOS", "Mobile Safari", "iPhone", "Chicago, United States", False),
        ]
        for listed, expected in zip(res.active_sessions, expected_sessions):
            operating_system, browser, device, location, is_current = expected
            assert listed.operating_system == operating_system
            assert listed.browser == browser
            assert listed.device == device
            assert listed.approximate_location == location
            assert listed.is_current_session == is_current
def test_LogOutSession(db, fast_passwords):
    """Logging out one session by its ``created`` value removes exactly that session from the list."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four extra logins on top of the session behind `token`
    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        before = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(before.active_sessions) == 5

        logged_out = before.active_sessions[3]
        account.LogOutSession(account_pb2.LogOutSessionReq(created=logged_out.created))

        after = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(after.active_sessions) == 4

        # ignore the first session as it changes
        assert before.active_sessions[1:3] + before.active_sessions[4:] == after.active_sessions[1:]
def test_LogOutOtherSessions(db, fast_passwords):
    """LogOutOtherSessions requires ``confirm=True`` and then leaves a single active session."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four extra logins on top of the session behind `token`
    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 5

        # without confirmation the call is refused
        with pytest.raises(grpc.RpcError) as exc_info:
            account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=False))
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.MUST_CONFIRM_LOGOUT_OTHER_SESSIONS

        account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=True))
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 1