Coverage for src/tests/test_account.py: 100%
464 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-20 18:03 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-20 18:03 +0000
1from datetime import timedelta
2from unittest.mock import patch
4import grpc
5import pytest
6from google.protobuf import empty_pb2
7from sqlalchemy.sql import func
9from couchers import errors
10from couchers.crypto import hash_password, random_hex
11from couchers.db import session_scope
12from couchers.models import AccountDeletionReason, AccountDeletionToken, BackgroundJob, Upload, User
13from couchers.sql import couchers_select as select
14from couchers.utils import now
15from proto import account_pb2, api_pb2, auth_pb2
16from tests.test_fixtures import ( # noqa
17 account_session,
18 auth_api_session,
19 db,
20 email_fields,
21 fast_passwords,
22 generate_user,
23 mock_notification_email,
24 process_jobs,
25 push_collector,
26 real_account_session,
27 testconfig,
28)
@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture automatically for every test in this module."""
def test_GetAccountInfo(db, fast_passwords):
    """GetAccountInfo returns the email/username and unverified, non-superuser defaults."""
    # user that has a password set
    email = "user@couchers.invalid"
    hashed = hash_password(random_hex())
    user1, token1 = generate_user(hashed_password=hashed, email=email)

    with account_session(token1) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert info.email == "user@couchers.invalid"
        assert info.username == user1.username
        assert not info.has_strong_verification
        assert info.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
        assert info.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED
        assert not info.is_superuser
def test_GetAccountInfo_regression(db):
    """Regression test: GetAccountInfo must not fail for a user with an avatar but no about_me.

    There was a bug in evaluating `has_completed_profile` on the backend (in python):
    when about_me is None but the user has an avatar key, it failed because
    len(about_me) doesn't work on None.
    """
    uploader_user, _ = generate_user()

    # the avatar key has to refer to an existing upload, so create one first
    key = random_hex(32)
    filename = random_hex(32) + ".jpg"
    with session_scope() as session:
        session.add(Upload(key=key, filename=filename, creator_user_id=uploader_user.id))

    _, token = generate_user(about_me=None, avatar_key=key)

    with account_session(token) as account:
        # nothing to assert on the response: the test passes if the call does not raise
        account.GetAccountInfo(empty_pb2.Empty())
def test_ChangePasswordV2_normal(db, fast_passwords, push_collector):
    """Changing the password with the correct old password notifies the user and stores the new hash."""
    # user has old password and is changing to new password
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))
    req = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=new_password)

    with account_session(token) as account:
        with mock_notification_email() as mock:
            account.ChangePasswordV2(req)

        # checked only after the email mock context has been left
        mock.assert_called_once()
        assert email_fields(mock).subject == "[TEST] Your password was changed"

    push_collector.assert_user_has_single_matching(
        user.id,
        title="Your password was changed",
        body="Your login password for Couchers.org was changed.",
    )

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(new_password)
def test_ChangePasswordV2_regression(db, fast_passwords):
    """Regression test: ChangePasswordV2 succeeds without the notification email being mocked.

    send_password_changed_email wasn't working.
    """
    # user has old password and is changing to new password
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))
    req = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=new_password)

    with account_session(token) as account:
        account.ChangePasswordV2(req)

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(new_password)
def test_ChangePasswordV2_normal_short_password(db, fast_passwords):
    """A one-character new password is rejected with PASSWORD_TOO_SHORT and the hash is unchanged."""
    # user has old password and is changing to new password, but used short password
    old_password = random_hex()
    new_password = random_hex(length=1)
    user, token = generate_user(hashed_password=hash_password(old_password))
    req = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=new_password)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.PASSWORD_TOO_SHORT

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_long_password(db, fast_passwords):
    """A new password built with random_hex(length=1000) is rejected with PASSWORD_TOO_LONG."""
    # user has old password and is changing to new password, but used a too-long password
    old_password = random_hex()
    new_password = random_hex(length=1000)
    user, token = generate_user(hashed_password=hash_password(old_password))
    req = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=new_password)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.PASSWORD_TOO_LONG

    # the stored hash must still be the old one
    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_insecure_password(db, fast_passwords):
    """The new password "12345678" is rejected with INSECURE_PASSWORD and the hash is unchanged."""
    # user has old password and is changing to new password, but used insecure password
    old_password = random_hex()
    new_password = "12345678"
    user, token = generate_user(hashed_password=hash_password(old_password))
    req = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=new_password)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.INSECURE_PASSWORD

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_wrong_password(db, fast_passwords):
    """Supplying the wrong old password is rejected with INVALID_PASSWORD and the hash is unchanged."""
    # user has old password and is changing to new password, but used wrong old password
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))
    req = account_pb2.ChangePasswordV2Req(old_password="wrong password", new_password=new_password)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.INVALID_PASSWORD

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_no_passwords(db, fast_passwords):
    """A request carrying only the old password (no new password) fails with PASSWORD_TOO_SHORT."""
    # user has old password and the request leaves new_password unset
    old_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))
    req = account_pb2.ChangePasswordV2Req(old_password=old_password)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.PASSWORD_TOO_SHORT

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)
def test_ChangeEmailV2_wrong_password(db, fast_passwords):
    """ChangeEmailV2 with a wrong password fails with INVALID_PASSWORD and leaves no active email token."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))
    req = account_pb2.ChangeEmailV2Req(password="wrong password", new_email=new_email)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.INVALID_PASSWORD

    # count users whose email-change token window contains the database's current time
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_wrong_email(db, fast_passwords):
    """ChangeEmailV2 with a wrong password fails with INVALID_PASSWORD and leaves no active email token.

    NOTE(review): despite its name, this test sends a wrong *password* together with a
    well-formed new email, i.e. it exercises the same path as
    test_ChangeEmailV2_wrong_password -- confirm whether a different scenario was intended.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))
    req = account_pb2.ChangeEmailV2Req(password="wrong password", new_email=new_email)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.INVALID_PASSWORD

    # count users whose email-change token window contains the database's current time
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_invalid_email(db, fast_passwords):
    """A new email of "not a real email" is rejected with INVALID_EMAIL and leaves no active email token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))
    req = account_pb2.ChangeEmailV2Req(password=password, new_email="not a real email")

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.INVALID_EMAIL

    # count users whose email-change token window contains the database's current time
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_email_in_use(db, fast_passwords):
    """Changing to another user's email is rejected with INVALID_EMAIL and leaves no active email token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))
    other_user, _ = generate_user(hashed_password=hash_password(password))
    req = account_pb2.ChangeEmailV2Req(password=password, new_email=other_user.email)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.INVALID_EMAIL

    # count users whose email-change token window contains the database's current time
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_no_change(db, fast_passwords):
    """Changing to the user's own current email is rejected with INVALID_EMAIL and leaves no active token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))
    req = account_pb2.ChangeEmailV2Req(password=password, new_email=user.email)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.INVALID_EMAIL

    # count users whose email-change token window contains the database's current time
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_wrong_token(db, fast_passwords):
    """Confirming an email change with a bogus token fails with NOT_FOUND and keeps the old email."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    # start a legitimate email change so that a real token exists
    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            # the call is expected to raise, so there is no response to keep
            auth_api.ConfirmChangeEmailV2(
                auth_pb2.ConfirmChangeEmailV2Req(
                    change_email_token="wrongtoken",
                )
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == errors.INVALID_TOKEN

    with session_scope() as session:
        user_updated = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert user_updated.email == user.email
def test_ChangeEmailV2_tokens_two_hour_window(db):
    """Email-change confirmation fails when the auth servicer's clock is outside the token window.

    With `couchers.servicers.auth.now` patched to one minute in the past, and then to two
    hours and one minute in the future, both an empty request and a request carrying the
    real token must fail with NOT_FOUND / INVALID_TOKEN.
    """

    def one_minute_ago():
        return now() - timedelta(minutes=1)

    def two_hours_one_minute_in_future():
        return now() + timedelta(hours=2, minutes=1)

    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(account_pb2.ChangeEmailV2Req(password=password, new_email=new_email))

    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        new_email_token = user.new_email_token

    # same sequence for both clocks: first without a token, then with the real one
    for patched_now in (one_minute_ago, two_hours_one_minute_in_future):
        with patch("couchers.servicers.auth.now", patched_now):
            with auth_api_session() as (auth_api, metadata_interceptor):
                for req_fields in ({}, {"change_email_token": new_email_token}):
                    with pytest.raises(grpc.RpcError) as e:
                        auth_api.ConfirmChangeEmailV2(auth_pb2.ConfirmChangeEmailV2Req(**req_fields))
                    assert e.value.code() == grpc.StatusCode.NOT_FOUND
                    assert e.value.details() == errors.INVALID_TOKEN
def test_ChangeEmailV2(db, fast_passwords, push_collector):
    """Full email change flow: initiate, check the pending state, confirm, check the final state.

    Also checks the push notification produced by each of the two steps.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))
    user_id = user.id

    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    with session_scope() as session:
        user_updated = session.execute(select(User).where(User.id == user_id)).scalar_one()
        # the old email stays in place until the change is confirmed
        assert user_updated.email == user.email
        assert user_updated.new_email == new_email
        assert user_updated.new_email_token is not None
        assert user_updated.new_email_token_created <= now()
        assert user_updated.new_email_token_expiry >= now()

        # kept under its own name instead of rebinding the session `token`
        change_email_token = user_updated.new_email_token

    process_jobs()
    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=0,
        title="An email change was initiated on your account",
        body=f"An email change to the email {new_email} was initiated on your account.",
    )

    with auth_api_session() as (auth_api, metadata_interceptor):
        # the response is not inspected; the effect is verified through the database below
        auth_api.ConfirmChangeEmailV2(
            auth_pb2.ConfirmChangeEmailV2Req(
                change_email_token=change_email_token,
            )
        )

    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert user.email == new_email
        assert user.new_email is None
        assert user.new_email_token is None
        assert user.new_email_token_created is None
        assert user.new_email_token_expiry is None

    process_jobs()
    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=1,
        title="Email change completed",
        body="Your new email address has been verified.",
    )
def test_ChangeEmailV2_sends_proper_emails(db, fast_passwords, push_collector):
    """Initiating an email change queues exactly two `send_email` jobs and one push.

    One payload must contain the notification text and the other the confirmation
    text for the new address; the assertion accepts them in either order.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    process_jobs()

    with session_scope() as session:
        jobs = session.execute(select(BackgroundJob).where(BackgroundJob.job_type == "send_email")).scalars().all()
        assert len(jobs) == 2
        # named by position only: which job holds which email is not assumed here
        first_payload = jobs[0].payload
        second_payload = jobs[1].payload
        uq_str1 = b"An email change to the email"
        uq_str2 = (
            b"You requested that your email be changed to this email address on Couchers.org. Your old email address is"
        )
        assert (uq_str1 in first_payload and uq_str2 in second_payload) or (
            uq_str2 in first_payload and uq_str1 in second_payload
        )

    push_collector.assert_user_has_single_matching(
        user.id,
        title="An email change was initiated on your account",
        body=f"An email change to the email {new_email} was initiated on your account.",
    )
def test_contributor_form(db):
    """GetContributorFormInfo flips from unfilled to filled after FillContributorForm is called."""
    user, token = generate_user()
    form_req = account_pb2.FillContributorFormReq(contributor_form=auth_pb2.ContributorForm())

    with account_session(token) as account:
        before = account.GetContributorFormInfo(empty_pb2.Empty())
        assert not before.filled_contributor_form

        account.FillContributorForm(form_req)

        after = account.GetContributorFormInfo(empty_pb2.Empty())
        assert after.filled_contributor_form
def test_DeleteAccount_start(db):
    """DeleteAccount sends one confirmation email and creates a valid token without deleting the user."""
    user, token = generate_user()
    req = account_pb2.DeleteAccountReq(confirm=True, reason=None)

    with account_session(token) as account:
        with mock_notification_email() as mock:
            account.DeleteAccount(req)
        # checked only after the email mock context has been left
        mock.assert_called_once()
        assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"

    with session_scope() as session:
        token_query = select(AccountDeletionToken).where(AccountDeletionToken.user_id == user.id)
        deletion_token = session.execute(token_query).scalar_one()

        assert deletion_token.is_valid
        assert not session.execute(select(User).where(User.id == user.id)).scalar_one().is_deleted
def test_DeleteAccount_message_storage(db):
    """Of six DeleteAccount calls, only the three with a non-blank reason leave an AccountDeletionReason row."""
    user, token = generate_user()

    reasons = (
        None,  # not stored
        "",  # not stored
        "Reason",
        "0192#(&!&#)*@//)(8",
        "\n\n\t",  # not stored
        "1337",
    )

    with account_session(token) as account:
        for reason in reasons:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=reason))

    with session_scope() as session:
        stored_reasons = session.execute(select(func.count()).select_from(AccountDeletionReason)).scalar_one()
        assert stored_reasons == 3
def test_full_delete_account_with_recovery(db, push_collector):
    """Walk through the whole deletion lifecycle: request, confirm, then recover.

    Each of the three steps is checked for its push notification, the content of the
    email that was sent, and the resulting database state of the user and tokens.
    """
    user, token = generate_user()
    user_id = user.id

    # --- step 1: request deletion ---
    with account_session(token) as account:
        # deletion must be explicitly confirmed
        with pytest.raises(grpc.RpcError) as e:
            account.DeleteAccount(account_pb2.DeleteAccountReq())
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == errors.MUST_CONFIRM_ACCOUNT_DELETE

        # Check the right email is sent
        with mock_notification_email() as mock:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

        push_collector.assert_user_push_matches_fields(
            user_id,
            ix=0,
            title="Account deletion initiated",
            body="Someone initiated the deletion of your Couchers.org account. To delete your account, please follow the link in the email we sent you.",
        )

        mock.assert_called_once()
        email = email_fields(mock)

    with session_scope() as session:
        token_row = session.execute(select(AccountDeletionToken)).scalar_one()
        deletion_token = token_row.token

        db_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert token_row.user == db_user
        assert not db_user.is_deleted
        assert not db_user.undelete_token
        assert not db_user.undelete_until

    assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"
    assert email.recipient == user.email
    assert "account deletion" in email.subject.lower()
    unique_string = "You requested that we delete your account from Couchers.org."
    url = f"http://localhost:3000/delete-account?token={deletion_token}"
    # every snippet has to show up in both the plain-text and the HTML body
    for snippet in (deletion_token, unique_string, url, "support@couchers.org"):
        assert snippet in email.plain
        assert snippet in email.html

    # --- step 2: confirm deletion with the emailed token ---
    with mock_notification_email() as mock, auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token))

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=1,
        title="Your Couchers.org account has been deleted",
        body="You can still undo this by following the link we emailed to you within 7 days.",
    )

    mock.assert_called_once()
    email = email_fields(mock)

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        db_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert db_user.is_deleted
        assert db_user.undelete_token
        assert db_user.undelete_until > now()

        undelete_token = db_user.undelete_token

    assert email.recipient == user.email
    assert "account has been deleted" in email.subject.lower()
    unique_string = "You have successfully deleted your account from Couchers.org."
    url = f"http://localhost:3000/recover-account?token={undelete_token}"
    for snippet in (unique_string, "7 days", url, "support@couchers.org"):
        assert snippet in email.plain
        assert snippet in email.html

    # --- step 3: recover the account with the undelete token ---
    with mock_notification_email() as mock, auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.RecoverAccount(auth_pb2.RecoverAccountReq(token=undelete_token))

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=2,
        title="Your Couchers.org account has been recovered!",
        body="We have recovered your Couchers.org account as per your request! Welcome back!",
    )

    mock.assert_called_once()
    email = email_fields(mock)

    assert email.recipient == user.email
    assert "account has been recovered" in email.subject.lower()
    unique_string = "Your account on Couchers.org has been successfully recovered!"
    for snippet in (unique_string, "support@couchers.org"):
        assert snippet in email.plain
        assert snippet in email.html

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        recovered = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert not recovered.is_deleted
        assert not recovered.undelete_token
        assert not recovered.undelete_until
def test_multiple_delete_tokens(db):
    """
    Three deletion requests create three tokens; confirming with one of them leaves none behind
    """
    user, token = generate_user()

    with account_session(token) as account:
        for _ in range(3):
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    with session_scope() as session:
        token_count = session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one()
        assert token_count == 3
        token = session.execute(select(AccountDeletionToken)).scalars().first().token

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=token))

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()
def test_ListActiveSessions_pagination(db, fast_passwords):
    """With four extra logins, ListActiveSessions pages as 3 + 2 sessions and then has no next page token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four additional logins on top of the session that comes with `token`
    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        first_page = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_size=3))
        assert len(first_page.active_sessions) == 3

        second_page = account.ListActiveSessions(
            account_pb2.ListActiveSessionsReq(page_token=first_page.next_page_token, page_size=3)
        )
        assert len(second_page.active_sessions) == 2
        assert not second_page.next_page_token
def test_ListActiveSessions_details(db, fast_passwords):
    """ListActiveSessions reports OS, browser, device, location and the current-session flag per session.

    Three logins are made with distinct user agents and `x-couchers-real-ip` values, and
    geoip lookup is replaced with a stub that only knows two of the three addresses.
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    ips_user_agents = [
        (
            "108.123.33.162",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Mobile/15E148 Safari/604.1",
        ),
        (
            "8.245.212.28",
            "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/26.0 Chrome/122.0.0.0 Mobile Safari/537.36",
        ),
        (
            "95.254.140.156",
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0",
        ),
    ]

    for ip, user_agent in ips_user_agents:
        options = (("grpc.primary_user_agent", user_agent),)
        login_req = auth_pb2.AuthReq(user=user.username, password=password)
        with auth_api_session(grpc_channel_options=options) as (auth_api, metadata_interceptor):
            auth_api.Authenticate(login_req, metadata=(("x-couchers-real-ip", ip),))

    known_locations = {
        "108.123.33.162": "Chicago, United States",
        "8.245.212.28": "Sydney, Australia",
    }

    def dummy_geoip(ip_address):
        # addresses missing from the table yield None
        return known_locations.get(ip_address)

    # expected (operating_system, browser, device, approximate_location, is_current_session),
    # in the order the sessions are returned
    expected_sessions = [
        # this one currently making the API call
        ("Other", "Other", "Other", "Unknown", True),
        ("Ubuntu", "Firefox", "Other", "Unknown", False),
        ("Android", "Samsung Internet", "K", "Sydney, Australia", False),
        ("iOS", "Mobile Safari", "iPhone", "Chicago, United States", False),
    ]

    with real_account_session(token) as account:
        with patch("couchers.servicers.account.geoip_approximate_location", dummy_geoip):
            res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
            print(res)
            assert len(res.active_sessions) == 4

            for active, expected in zip(res.active_sessions, expected_sessions):
                operating_system, browser, device, location, is_current = expected
                assert active.operating_system == operating_system
                assert active.browser == browser
                assert active.device == device
                assert active.approximate_location == location
                if is_current:
                    assert active.is_current_session
                else:
                    assert not active.is_current_session
def test_LogOutSession(db, fast_passwords):
    """LogOutSession removes exactly the session identified by its `created` value."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four additional logins on top of the session that comes with `token`
    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        before = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(before.active_sessions) == 5

        target = before.active_sessions[3]
        account.LogOutSession(account_pb2.LogOutSessionReq(created=target.created))

        after = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(after.active_sessions) == 4

        # ignore the first session as it changes
        untouched = before.active_sessions[1:3] + before.active_sessions[4:]
        assert untouched == after.active_sessions[1:]
def test_LogOutOtherSessions(db, fast_passwords):
    """LogOutOtherSessions requires confirm=True and then leaves a single active session."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four additional logins on top of the session that comes with `token`
    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 5

        # without confirmation the call is refused
        unconfirmed = account_pb2.LogOutOtherSessionsReq(confirm=False)
        with pytest.raises(grpc.RpcError) as e:
            account.LogOutOtherSessions(unconfirmed)
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == errors.MUST_CONFIRM_LOGOUT_OTHER_SESSIONS

        account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=True))
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 1