Coverage for src/tests/test_account.py: 99%
642 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-09-14 15:31 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-09-14 15:31 +0000
1from datetime import date, timedelta
2from unittest.mock import patch
4import grpc
5import pytest
6from google.protobuf import empty_pb2, wrappers_pb2
7from sqlalchemy.sql import func
9from couchers import errors, urls
10from couchers.crypto import hash_password, random_hex
11from couchers.db import session_scope
12from couchers.materialized_views import refresh_materialized_views_rapid
13from couchers.models import (
14 AccountDeletionReason,
15 AccountDeletionToken,
16 BackgroundJob,
17 InviteCode,
18 Upload,
19 User,
20 Volunteer,
21)
22from couchers.sql import couchers_select as select
23from couchers.utils import now, today
24from proto import account_pb2, api_pb2, auth_pb2, conversations_pb2, requests_pb2
25from tests.test_fixtures import ( # noqa
26 account_session,
27 auth_api_session,
28 db,
29 email_fields,
30 fast_passwords,
31 generate_user,
32 mock_notification_email,
33 process_jobs,
34 public_session,
35 push_collector,
36 real_account_session,
37 requests_session,
38 testconfig,
39)
42@pytest.fixture(autouse=True)
43def _(testconfig):
44 pass
47def test_GetAccountInfo(db, fast_passwords):
48 # with password
49 user1, token1 = generate_user(hashed_password=hash_password(random_hex()), email="user@couchers.invalid")
51 with account_session(token1) as account:
52 res = account.GetAccountInfo(empty_pb2.Empty())
53 assert res.email == "user@couchers.invalid"
54 assert res.username == user1.username
55 assert not res.has_strong_verification
56 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
57 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED
58 assert not res.is_superuser
59 assert res.ui_language_preference == ""
60 assert not res.is_volunteer
63def test_GetAccountInfo_regression(db):
64 # there was a bug in evaluating `has_completed_profile` on the backend (in python)
65 # when about_me is None but the user has a key, it was failing because len(about_me) doesn't work on None
66 uploader_user, _ = generate_user()
67 with session_scope() as session:
68 key = random_hex(32)
69 filename = random_hex(32) + ".jpg"
70 session.add(
71 Upload(
72 key=key,
73 filename=filename,
74 creator_user_id=uploader_user.id,
75 )
76 )
78 user, token = generate_user(about_me=None, avatar_key=key)
80 with account_session(token) as account:
81 res = account.GetAccountInfo(empty_pb2.Empty())
84def test_ChangePasswordV2_normal(db, fast_passwords, push_collector):
85 # user has old password and is changing to new password
86 old_password = random_hex()
87 new_password = random_hex()
88 user, token = generate_user(hashed_password=hash_password(old_password))
90 with account_session(token) as account:
91 with mock_notification_email() as mock:
92 account.ChangePasswordV2(
93 account_pb2.ChangePasswordV2Req(
94 old_password=old_password,
95 new_password=new_password,
96 )
97 )
99 mock.assert_called_once()
100 assert email_fields(mock).subject == "[TEST] Your password was changed"
102 push_collector.assert_user_has_single_matching(
103 user.id, title="Your password was changed", body="Your login password for Couchers.org was changed."
104 )
106 with session_scope() as session:
107 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
108 assert updated_user.hashed_password == hash_password(new_password)
111def test_ChangePasswordV2_regression(db, fast_passwords):
112 # send_password_changed_email wasn't working
113 # user has old password and is changing to new password
114 old_password = random_hex()
115 new_password = random_hex()
116 user, token = generate_user(hashed_password=hash_password(old_password))
118 with account_session(token) as account:
119 account.ChangePasswordV2(
120 account_pb2.ChangePasswordV2Req(
121 old_password=old_password,
122 new_password=new_password,
123 )
124 )
126 with session_scope() as session:
127 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
128 assert updated_user.hashed_password == hash_password(new_password)
131def test_ChangePasswordV2_normal_short_password(db, fast_passwords):
132 # user has old password and is changing to new password, but used short password
133 old_password = random_hex()
134 new_password = random_hex(length=1)
135 user, token = generate_user(hashed_password=hash_password(old_password))
137 with account_session(token) as account:
138 with pytest.raises(grpc.RpcError) as e:
139 account.ChangePasswordV2(
140 account_pb2.ChangePasswordV2Req(
141 old_password=old_password,
142 new_password=new_password,
143 )
144 )
145 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
146 assert e.value.details() == errors.PASSWORD_TOO_SHORT
148 with session_scope() as session:
149 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
150 assert updated_user.hashed_password == hash_password(old_password)
153def test_ChangePasswordV2_normal_long_password(db, fast_passwords):
154 # user has old password and is changing to new password, but used short password
155 old_password = random_hex()
156 new_password = random_hex(length=1000)
157 user, token = generate_user(hashed_password=hash_password(old_password))
159 with account_session(token) as account:
160 with pytest.raises(grpc.RpcError) as e:
161 account.ChangePasswordV2(
162 account_pb2.ChangePasswordV2Req(
163 old_password=old_password,
164 new_password=new_password,
165 )
166 )
167 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
168 assert e.value.details() == errors.PASSWORD_TOO_LONG
170 with session_scope() as session:
171 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
172 assert updated_user.hashed_password == hash_password(old_password)
175def test_ChangePasswordV2_normal_insecure_password(db, fast_passwords):
176 # user has old password and is changing to new password, but used insecure password
177 old_password = random_hex()
178 new_password = "12345678"
179 user, token = generate_user(hashed_password=hash_password(old_password))
181 with account_session(token) as account:
182 with pytest.raises(grpc.RpcError) as e:
183 account.ChangePasswordV2(
184 account_pb2.ChangePasswordV2Req(
185 old_password=old_password,
186 new_password=new_password,
187 )
188 )
189 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
190 assert e.value.details() == errors.INSECURE_PASSWORD
192 with session_scope() as session:
193 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
194 assert updated_user.hashed_password == hash_password(old_password)
197def test_ChangePasswordV2_normal_wrong_password(db, fast_passwords):
198 # user has old password and is changing to new password, but used wrong old password
199 old_password = random_hex()
200 new_password = random_hex()
201 user, token = generate_user(hashed_password=hash_password(old_password))
203 with account_session(token) as account:
204 with pytest.raises(grpc.RpcError) as e:
205 account.ChangePasswordV2(
206 account_pb2.ChangePasswordV2Req(
207 old_password="wrong password",
208 new_password=new_password,
209 )
210 )
211 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
212 assert e.value.details() == errors.INVALID_PASSWORD
214 with session_scope() as session:
215 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
216 assert updated_user.hashed_password == hash_password(old_password)
219def test_ChangePasswordV2_normal_no_passwords(db, fast_passwords):
220 # user has old password and called with empty body
221 old_password = random_hex()
222 user, token = generate_user(hashed_password=hash_password(old_password))
224 with account_session(token) as account:
225 with pytest.raises(grpc.RpcError) as e:
226 account.ChangePasswordV2(account_pb2.ChangePasswordV2Req(old_password=old_password))
227 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
228 assert e.value.details() == errors.PASSWORD_TOO_SHORT
230 with session_scope() as session:
231 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
232 assert updated_user.hashed_password == hash_password(old_password)
235def test_ChangeEmailV2_wrong_password(db, fast_passwords):
236 password = random_hex()
237 new_email = f"{random_hex()}@couchers.org.invalid"
238 user, token = generate_user(hashed_password=hash_password(password))
240 with account_session(token) as account:
241 with pytest.raises(grpc.RpcError) as e:
242 account.ChangeEmailV2(
243 account_pb2.ChangeEmailV2Req(
244 password="wrong password",
245 new_email=new_email,
246 )
247 )
248 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
249 assert e.value.details() == errors.INVALID_PASSWORD
251 with session_scope() as session:
252 assert (
253 session.execute(
254 select(func.count())
255 .select_from(User)
256 .where(User.new_email_token_created <= func.now())
257 .where(User.new_email_token_expiry >= func.now())
258 )
259 ).scalar_one() == 0
262def test_ChangeEmailV2_wrong_email(db, fast_passwords):
263 password = random_hex()
264 new_email = f"{random_hex()}@couchers.org.invalid"
265 user, token = generate_user(hashed_password=hash_password(password))
267 with account_session(token) as account:
268 with pytest.raises(grpc.RpcError) as e:
269 account.ChangeEmailV2(
270 account_pb2.ChangeEmailV2Req(
271 password="wrong password",
272 new_email=new_email,
273 )
274 )
275 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
276 assert e.value.details() == errors.INVALID_PASSWORD
278 with session_scope() as session:
279 assert (
280 session.execute(
281 select(func.count())
282 .select_from(User)
283 .where(User.new_email_token_created <= func.now())
284 .where(User.new_email_token_expiry >= func.now())
285 )
286 ).scalar_one() == 0
289def test_ChangeEmailV2_invalid_email(db, fast_passwords):
290 password = random_hex()
291 user, token = generate_user(hashed_password=hash_password(password))
293 with account_session(token) as account:
294 with pytest.raises(grpc.RpcError) as e:
295 account.ChangeEmailV2(
296 account_pb2.ChangeEmailV2Req(
297 password=password,
298 new_email="not a real email",
299 )
300 )
301 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
302 assert e.value.details() == errors.INVALID_EMAIL
304 with session_scope() as session:
305 assert (
306 session.execute(
307 select(func.count())
308 .select_from(User)
309 .where(User.new_email_token_created <= func.now())
310 .where(User.new_email_token_expiry >= func.now())
311 )
312 ).scalar_one() == 0
315def test_ChangeEmailV2_email_in_use(db, fast_passwords):
316 password = random_hex()
317 user, token = generate_user(hashed_password=hash_password(password))
318 user2, token2 = generate_user(hashed_password=hash_password(password))
320 with account_session(token) as account:
321 with pytest.raises(grpc.RpcError) as e:
322 account.ChangeEmailV2(
323 account_pb2.ChangeEmailV2Req(
324 password=password,
325 new_email=user2.email,
326 )
327 )
328 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
329 assert e.value.details() == errors.INVALID_EMAIL
331 with session_scope() as session:
332 assert (
333 session.execute(
334 select(func.count())
335 .select_from(User)
336 .where(User.new_email_token_created <= func.now())
337 .where(User.new_email_token_expiry >= func.now())
338 )
339 ).scalar_one() == 0
342def test_ChangeEmailV2_no_change(db, fast_passwords):
343 password = random_hex()
344 user, token = generate_user(hashed_password=hash_password(password))
346 with account_session(token) as account:
347 with pytest.raises(grpc.RpcError) as e:
348 account.ChangeEmailV2(
349 account_pb2.ChangeEmailV2Req(
350 password=password,
351 new_email=user.email,
352 )
353 )
354 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
355 assert e.value.details() == errors.INVALID_EMAIL
357 with session_scope() as session:
358 assert (
359 session.execute(
360 select(func.count())
361 .select_from(User)
362 .where(User.new_email_token_created <= func.now())
363 .where(User.new_email_token_expiry >= func.now())
364 )
365 ).scalar_one() == 0
368def test_ChangeEmailV2_wrong_token(db, fast_passwords):
369 password = random_hex()
370 new_email = f"{random_hex()}@couchers.org.invalid"
371 user, token = generate_user(hashed_password=hash_password(password))
373 with account_session(token) as account:
374 account.ChangeEmailV2(
375 account_pb2.ChangeEmailV2Req(
376 password=password,
377 new_email=new_email,
378 )
379 )
381 with auth_api_session() as (auth_api, metadata_interceptor):
382 with pytest.raises(grpc.RpcError) as e:
383 res = auth_api.ConfirmChangeEmailV2(
384 auth_pb2.ConfirmChangeEmailV2Req(
385 change_email_token="wrongtoken",
386 )
387 )
388 assert e.value.code() == grpc.StatusCode.NOT_FOUND
389 assert e.value.details() == errors.INVALID_TOKEN
391 with session_scope() as session:
392 user_updated = session.execute(select(User).where(User.id == user.id)).scalar_one()
393 assert user_updated.email == user.email
396def test_ChangeEmailV2_tokens_two_hour_window(db):
397 def two_hours_one_minute_in_future():
398 return now() + timedelta(hours=2, minutes=1)
400 def one_minute_ago():
401 return now() - timedelta(minutes=1)
403 password = random_hex()
404 new_email = f"{random_hex()}@couchers.org.invalid"
405 user, token = generate_user(hashed_password=hash_password(password))
407 with account_session(token) as account:
408 account.ChangeEmailV2(
409 account_pb2.ChangeEmailV2Req(
410 password=password,
411 new_email=new_email,
412 )
413 )
415 with session_scope() as session:
416 user = session.execute(select(User).where(User.id == user.id)).scalar_one()
417 new_email_token = user.new_email_token
419 with patch("couchers.servicers.auth.now", one_minute_ago):
420 with auth_api_session() as (auth_api, metadata_interceptor):
421 with pytest.raises(grpc.RpcError) as e:
422 auth_api.ConfirmChangeEmailV2(auth_pb2.ConfirmChangeEmailV2Req())
423 assert e.value.code() == grpc.StatusCode.NOT_FOUND
424 assert e.value.details() == errors.INVALID_TOKEN
426 with pytest.raises(grpc.RpcError) as e:
427 auth_api.ConfirmChangeEmailV2(
428 auth_pb2.ConfirmChangeEmailV2Req(
429 change_email_token=new_email_token,
430 )
431 )
432 assert e.value.code() == grpc.StatusCode.NOT_FOUND
433 assert e.value.details() == errors.INVALID_TOKEN
435 with patch("couchers.servicers.auth.now", two_hours_one_minute_in_future):
436 with auth_api_session() as (auth_api, metadata_interceptor):
437 with pytest.raises(grpc.RpcError) as e:
438 auth_api.ConfirmChangeEmailV2(auth_pb2.ConfirmChangeEmailV2Req())
439 assert e.value.code() == grpc.StatusCode.NOT_FOUND
440 assert e.value.details() == errors.INVALID_TOKEN
442 with pytest.raises(grpc.RpcError) as e:
443 auth_api.ConfirmChangeEmailV2(
444 auth_pb2.ConfirmChangeEmailV2Req(
445 change_email_token=new_email_token,
446 )
447 )
448 assert e.value.code() == grpc.StatusCode.NOT_FOUND
449 assert e.value.details() == errors.INVALID_TOKEN
452def test_ChangeEmailV2(db, fast_passwords, push_collector):
453 password = random_hex()
454 new_email = f"{random_hex()}@couchers.org.invalid"
455 user, token = generate_user(hashed_password=hash_password(password))
456 user_id = user.id
458 with account_session(token) as account:
459 account.ChangeEmailV2(
460 account_pb2.ChangeEmailV2Req(
461 password=password,
462 new_email=new_email,
463 )
464 )
466 with session_scope() as session:
467 user_updated = session.execute(select(User).where(User.id == user_id)).scalar_one()
468 assert user_updated.email == user.email
469 assert user_updated.new_email == new_email
470 assert user_updated.new_email_token is not None
471 assert user_updated.new_email_token_created <= now()
472 assert user_updated.new_email_token_expiry >= now()
474 token = user_updated.new_email_token
476 process_jobs()
477 push_collector.assert_user_push_matches_fields(
478 user_id,
479 ix=0,
480 title="An email change was initiated on your account",
481 body=f"An email change to the email {new_email} was initiated on your account.",
482 )
484 with auth_api_session() as (auth_api, metadata_interceptor):
485 res = auth_api.ConfirmChangeEmailV2(
486 auth_pb2.ConfirmChangeEmailV2Req(
487 change_email_token=token,
488 )
489 )
491 with session_scope() as session:
492 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
493 assert user.email == new_email
494 assert user.new_email is None
495 assert user.new_email_token is None
496 assert user.new_email_token_created is None
497 assert user.new_email_token_expiry is None
499 process_jobs()
500 push_collector.assert_user_push_matches_fields(
501 user_id,
502 ix=1,
503 title="Email change completed",
504 body="Your new email address has been verified.",
505 )
508def test_ChangeEmailV2_sends_proper_emails(db, fast_passwords, push_collector):
509 password = random_hex()
510 new_email = f"{random_hex()}@couchers.org.invalid"
511 user, token = generate_user(hashed_password=hash_password(password))
513 with account_session(token) as account:
514 account.ChangeEmailV2(
515 account_pb2.ChangeEmailV2Req(
516 password=password,
517 new_email=new_email,
518 )
519 )
521 process_jobs()
523 with session_scope() as session:
524 jobs = session.execute(select(BackgroundJob).where(BackgroundJob.job_type == "send_email")).scalars().all()
525 assert len(jobs) == 2
526 payload_for_notification_email = jobs[0].payload
527 payload_for_confirmation_email_new_address = jobs[1].payload
528 uq_str1 = b"An email change to the email"
529 uq_str2 = (
530 b"You requested that your email be changed to this email address on Couchers.org. Your old email address is"
531 )
532 assert (uq_str1 in jobs[0].payload and uq_str2 in jobs[1].payload) or (
533 uq_str2 in jobs[0].payload and uq_str1 in jobs[1].payload
534 )
536 push_collector.assert_user_has_single_matching(
537 user.id,
538 title="An email change was initiated on your account",
539 body=f"An email change to the email {new_email} was initiated on your account.",
540 )
543def test_ChangeLanguagePreference(db, fast_passwords):
544 # user changes from default to ISO 639-1 language code
545 newLanguageCode = "zh"
546 user, token = generate_user()
548 with real_account_session(token) as account:
549 res = account.GetAccountInfo(empty_pb2.Empty())
550 assert res.ui_language_preference == ""
552 # call will have info about the request
553 res, call = account.ChangeLanguagePreference.with_call(
554 account_pb2.ChangeLanguagePreferenceReq(ui_language_preference=newLanguageCode)
555 )
557 # cookies are sent via initial metadata, so we check for it there
558 for key, val in call.initial_metadata():
559 if key == "set-cookie":
560 # the value of "set-cookie" will be the full cookie string, pull the key value from the string
561 key_val = val.split(";")[0]
562 if key_val == "NEXT_LOCALE=zh":
563 # the changed language preference should also be sent to the backend
564 res = account.GetAccountInfo(empty_pb2.Empty())
565 assert res.ui_language_preference == "zh"
566 return
567 raise Exception(f"Didn't find right cookie, got {call.initial_metadata()}")
570def test_contributor_form(db):
571 user, token = generate_user()
573 with account_session(token) as account:
574 res = account.GetContributorFormInfo(empty_pb2.Empty())
575 assert not res.filled_contributor_form
577 account.FillContributorForm(account_pb2.FillContributorFormReq(contributor_form=auth_pb2.ContributorForm()))
579 res = account.GetContributorFormInfo(empty_pb2.Empty())
580 assert res.filled_contributor_form
583def test_DeleteAccount_start(db):
584 user, token = generate_user()
586 with account_session(token) as account:
587 with mock_notification_email() as mock:
588 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=None))
589 mock.assert_called_once()
590 assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"
592 with session_scope() as session:
593 deletion_token = session.execute(
594 select(AccountDeletionToken).where(AccountDeletionToken.user_id == user.id)
595 ).scalar_one()
597 assert deletion_token.is_valid
598 assert not session.execute(select(User).where(User.id == user.id)).scalar_one().is_deleted
601def test_DeleteAccount_message_storage(db):
602 user, token = generate_user()
604 with account_session(token) as account:
605 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=None)) # not stored
606 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="")) # not stored
607 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="Reason"))
608 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="0192#(&!&#)*@//)(8"))
609 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="\n\n\t")) # not stored
610 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="1337"))
612 with session_scope() as session:
613 assert session.execute(select(func.count()).select_from(AccountDeletionReason)).scalar_one() == 3
616def test_full_delete_account_with_recovery(db, push_collector):
617 user, token = generate_user()
618 user_id = user.id
620 with account_session(token) as account:
621 with pytest.raises(grpc.RpcError) as e:
622 account.DeleteAccount(account_pb2.DeleteAccountReq())
623 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
624 assert e.value.details() == errors.MUST_CONFIRM_ACCOUNT_DELETE
626 # Check the right email is sent
627 with mock_notification_email() as mock:
628 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))
630 push_collector.assert_user_push_matches_fields(
631 user_id,
632 ix=0,
633 title="Account deletion initiated",
634 body="Someone initiated the deletion of your Couchers.org account. To delete your account, please follow the link in the email we sent you.",
635 )
637 mock.assert_called_once()
638 e = email_fields(mock)
640 with session_scope() as session:
641 token_o = session.execute(select(AccountDeletionToken)).scalar_one()
642 token = token_o.token
644 user_ = session.execute(select(User).where(User.id == user_id)).scalar_one()
645 assert token_o.user == user_
646 assert not user_.is_deleted
647 assert not user_.undelete_token
648 assert not user_.undelete_until
650 assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"
651 assert e.recipient == user.email
652 assert "account deletion" in e.subject.lower()
653 assert token in e.plain
654 assert token in e.html
655 unique_string = "You requested that we delete your account from Couchers.org."
656 assert unique_string in e.plain
657 assert unique_string in e.html
658 url = f"http://localhost:3000/delete-account?token={token}"
659 assert url in e.plain
660 assert url in e.html
661 assert "support@couchers.org" in e.plain
662 assert "support@couchers.org" in e.html
664 with mock_notification_email() as mock:
665 with auth_api_session() as (auth_api, metadata_interceptor):
666 auth_api.ConfirmDeleteAccount(
667 auth_pb2.ConfirmDeleteAccountReq(
668 token=token,
669 )
670 )
672 push_collector.assert_user_push_matches_fields(
673 user_id,
674 ix=1,
675 title="Your Couchers.org account has been deleted",
676 body="You can still undo this by following the link we emailed to you within 7 days.",
677 )
679 mock.assert_called_once()
680 e = email_fields(mock)
682 with session_scope() as session:
683 assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()
685 user_ = session.execute(select(User).where(User.id == user_id)).scalar_one()
686 assert user_.is_deleted
687 assert user_.undelete_token
688 assert user_.undelete_until > now()
690 undelete_token = user_.undelete_token
692 assert e.recipient == user.email
693 assert "account has been deleted" in e.subject.lower()
694 unique_string = "You have successfully deleted your account from Couchers.org."
695 assert unique_string in e.plain
696 assert unique_string in e.html
697 assert "7 days" in e.plain
698 assert "7 days" in e.html
699 url = f"http://localhost:3000/recover-account?token={undelete_token}"
700 assert url in e.plain
701 assert url in e.html
702 assert "support@couchers.org" in e.plain
703 assert "support@couchers.org" in e.html
705 with mock_notification_email() as mock:
706 with auth_api_session() as (auth_api, metadata_interceptor):
707 auth_api.RecoverAccount(
708 auth_pb2.RecoverAccountReq(
709 token=undelete_token,
710 )
711 )
713 push_collector.assert_user_push_matches_fields(
714 user_id,
715 ix=2,
716 title="Your Couchers.org account has been recovered!",
717 body="We have recovered your Couchers.org account as per your request! Welcome back!",
718 )
720 mock.assert_called_once()
721 e = email_fields(mock)
723 assert e.recipient == user.email
724 assert "account has been recovered" in e.subject.lower()
725 unique_string = "Your account on Couchers.org has been successfully recovered!"
726 assert unique_string in e.plain
727 assert unique_string in e.html
728 assert "support@couchers.org" in e.plain
729 assert "support@couchers.org" in e.html
731 with session_scope() as session:
732 assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()
734 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
735 assert not user.is_deleted
736 assert not user.undelete_token
737 assert not user.undelete_until
740def test_multiple_delete_tokens(db):
741 """
742 Make sure deletion tokens are deleted on delete
743 """
744 user, token = generate_user()
746 with account_session(token) as account:
747 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))
748 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))
749 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))
751 with session_scope() as session:
752 assert session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one() == 3
753 token = session.execute(select(AccountDeletionToken).limit(1)).scalars().one_or_none().token
755 with auth_api_session() as (auth_api, metadata_interceptor):
756 auth_api.ConfirmDeleteAccount(
757 auth_pb2.ConfirmDeleteAccountReq(
758 token=token,
759 )
760 )
762 with session_scope() as session:
763 assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()
766def test_ListActiveSessions_pagination(db, fast_passwords):
767 password = random_hex()
768 user, token = generate_user(hashed_password=hash_password(password))
770 with auth_api_session() as (auth_api, metadata_interceptor):
771 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
772 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
773 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
774 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
776 with real_account_session(token) as account:
777 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_size=3))
778 assert len(res.active_sessions) == 3
779 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_token=res.next_page_token, page_size=3))
780 assert len(res.active_sessions) == 2
781 assert not res.next_page_token
784def test_ListActiveSessions_details(db, fast_passwords):
785 password = random_hex()
786 user, token = generate_user(hashed_password=hash_password(password))
788 ips_user_agents = [
789 (
790 "108.123.33.162",
791 "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Mobile/15E148 Safari/604.1",
792 ),
793 (
794 "8.245.212.28",
795 "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/26.0 Chrome/122.0.0.0 Mobile Safari/537.36",
796 ),
797 (
798 "95.254.140.156",
799 "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0",
800 ),
801 ]
803 for ip, user_agent in ips_user_agents:
804 options = (("grpc.primary_user_agent", user_agent),)
805 with auth_api_session(grpc_channel_options=options) as (auth_api, metadata_interceptor):
806 auth_api.Authenticate(
807 auth_pb2.AuthReq(user=user.username, password=password), metadata=(("x-couchers-real-ip", ip),)
808 )
810 def dummy_geoip(ip_address):
811 return {
812 "108.123.33.162": "Chicago, United States",
813 "8.245.212.28": "Sydney, Australia",
814 }.get(ip_address)
816 with real_account_session(token) as account:
817 with patch("couchers.servicers.account.geoip_approximate_location", dummy_geoip):
818 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
819 print(res)
820 assert len(res.active_sessions) == 4
822 # this one currently making the API call
823 assert res.active_sessions[0].operating_system == "Other"
824 assert res.active_sessions[0].browser == "Other"
825 assert res.active_sessions[0].device == "Other"
826 assert res.active_sessions[0].approximate_location == "Unknown"
827 assert res.active_sessions[0].is_current_session
829 assert res.active_sessions[1].operating_system == "Ubuntu"
830 assert res.active_sessions[1].browser == "Firefox"
831 assert res.active_sessions[1].device == "Other"
832 assert res.active_sessions[1].approximate_location == "Unknown"
833 assert not res.active_sessions[1].is_current_session
835 assert res.active_sessions[2].operating_system == "Android"
836 assert res.active_sessions[2].browser == "Samsung Internet"
837 assert res.active_sessions[2].device == "K"
838 assert res.active_sessions[2].approximate_location == "Sydney, Australia"
839 assert not res.active_sessions[2].is_current_session
841 assert res.active_sessions[3].operating_system == "iOS"
842 assert res.active_sessions[3].browser == "Mobile Safari"
843 assert res.active_sessions[3].device == "iPhone"
844 assert res.active_sessions[3].approximate_location == "Chicago, United States"
845 assert not res.active_sessions[3].is_current_session
848def test_LogOutSession(db, fast_passwords):
849 password = random_hex()
850 user, token = generate_user(hashed_password=hash_password(password))
852 with auth_api_session() as (auth_api, metadata_interceptor):
853 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
854 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
855 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
856 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
858 with real_account_session(token) as account:
859 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
860 assert len(res.active_sessions) == 5
861 account.LogOutSession(account_pb2.LogOutSessionReq(created=res.active_sessions[3].created))
863 res2 = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
864 assert len(res2.active_sessions) == 4
866 # ignore the first session as it changes
867 assert res.active_sessions[1:3] + res.active_sessions[4:] == res2.active_sessions[1:]
870def test_LogOutOtherSessions(db, fast_passwords):
871 password = random_hex()
872 user, token = generate_user(hashed_password=hash_password(password))
874 with auth_api_session() as (auth_api, metadata_interceptor):
875 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
876 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
877 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
878 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
880 with real_account_session(token) as account:
881 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
882 assert len(res.active_sessions) == 5
883 with pytest.raises(grpc.RpcError) as e:
884 account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=False))
885 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
886 assert e.value.details() == errors.MUST_CONFIRM_LOGOUT_OTHER_SESSIONS
888 account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=True))
889 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
890 assert len(res.active_sessions) == 1
893def test_CreateInviteCode(db):
894 user, token = generate_user()
896 with account_session(token) as account:
897 res = account.CreateInviteCode(account_pb2.CreateInviteCodeReq())
898 code = res.code
899 assert len(code) == 8
901 with session_scope() as session:
902 invite = session.execute(select(InviteCode).where(InviteCode.id == code)).scalar_one()
903 assert invite.creator_user_id == user.id
904 assert invite.disabled is None
905 assert res.url == urls.invite_code_link(code=res.code)
908def test_DisableInviteCode(db):
909 user, token = generate_user()
910 code = "TEST1234"
911 with session_scope() as session:
912 session.add(InviteCode(id=code, creator_user_id=user.id))
913 session.commit()
915 with account_session(token) as account:
916 account.DisableInviteCode(account_pb2.DisableInviteCodeReq(code=code))
918 with session_scope() as session:
919 invite = session.execute(select(InviteCode).where(InviteCode.id == code)).scalar_one()
920 assert invite.disabled is not None
923def test_ListInviteCodes(db):
924 user, token = generate_user()
925 another_user, _ = generate_user()
927 code = "LIST1234"
928 with session_scope() as session:
929 session.add(InviteCode(id=code, creator_user_id=user.id))
930 db_other_user = session.execute(select(User).where(User.id == another_user.id)).scalar_one()
931 db_other_user.invite_code_id = code
932 session.commit()
934 with account_session(token) as account:
935 res = account.ListInviteCodes(empty_pb2.Empty())
936 assert len(res.invite_codes) == 1
937 assert res.invite_codes[0].code == code
938 assert res.invite_codes[0].uses == 1
939 assert res.invite_codes[0].url == urls.invite_code_link(code=code)
942def test_reminders(db):
943 # the strong verification reminder's absence is tested in test_strong_verification.py
944 # reference writing reminders tested in test_AvailableWriteReferences_and_ListPendingReferencesToWrite
945 # we use LiteUser, so remember to refresh materialized views
946 user, token = generate_user(complete_profile=False)
947 complete_user, complete_token = generate_user(complete_profile=True)
948 req_user1, req_user_token1 = generate_user(complete_profile=True)
949 req_user2, req_user_token2 = generate_user(complete_profile=True)
951 refresh_materialized_views_rapid(None)
952 with account_session(complete_token) as account:
953 assert [reminder.WhichOneof("reminder") for reminder in account.GetReminders(empty_pb2.Empty()).reminders] == [
954 "complete_verification_reminder"
955 ]
956 with account_session(token) as account:
957 assert [reminder.WhichOneof("reminder") for reminder in account.GetReminders(empty_pb2.Empty()).reminders] == [
958 "complete_profile_reminder",
959 "complete_verification_reminder",
960 ]
962 today_plus_2 = (today() + timedelta(days=2)).isoformat()
963 today_plus_3 = (today() + timedelta(days=3)).isoformat()
964 with requests_session(req_user_token1) as api:
965 host_request1_id = api.CreateHostRequest(
966 requests_pb2.CreateHostRequestReq(
967 host_user_id=user.id, from_date=today_plus_2, to_date=today_plus_3, text="Test request 1"
968 )
969 ).host_request_id
971 with account_session(token) as account:
972 reminders = account.GetReminders(empty_pb2.Empty()).reminders
973 assert [reminder.WhichOneof("reminder") for reminder in reminders] == [
974 "respond_to_host_request_reminder",
975 "complete_profile_reminder",
976 "complete_verification_reminder",
977 ]
978 assert reminders[0].respond_to_host_request_reminder.host_request_id == host_request1_id
979 assert reminders[0].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id
981 with requests_session(req_user_token2) as api:
982 host_request2_id = api.CreateHostRequest(
983 requests_pb2.CreateHostRequestReq(
984 host_user_id=user.id, from_date=today_plus_2, to_date=today_plus_3, text="Test request 2"
985 )
986 ).host_request_id
988 refresh_materialized_views_rapid(None)
989 with account_session(token) as account:
990 reminders = account.GetReminders(empty_pb2.Empty()).reminders
991 assert [reminder.WhichOneof("reminder") for reminder in reminders] == [
992 "respond_to_host_request_reminder",
993 "respond_to_host_request_reminder",
994 "complete_profile_reminder",
995 "complete_verification_reminder",
996 ]
997 assert reminders[0].respond_to_host_request_reminder.host_request_id == host_request1_id
998 assert reminders[0].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id
999 assert reminders[1].respond_to_host_request_reminder.host_request_id == host_request2_id
1000 assert reminders[1].respond_to_host_request_reminder.surfer_user.user_id == req_user2.id
1002 with requests_session(req_user_token1) as api:
1003 host_request3_id = api.CreateHostRequest(
1004 requests_pb2.CreateHostRequestReq(
1005 host_user_id=user.id, from_date=today_plus_2, to_date=today_plus_3, text="Test request 3"
1006 )
1007 ).host_request_id
1009 refresh_materialized_views_rapid(None)
1010 with account_session(token) as account:
1011 reminders = account.GetReminders(empty_pb2.Empty()).reminders
1012 assert [reminder.WhichOneof("reminder") for reminder in reminders] == [
1013 "respond_to_host_request_reminder",
1014 "respond_to_host_request_reminder",
1015 "respond_to_host_request_reminder",
1016 "complete_profile_reminder",
1017 "complete_verification_reminder",
1018 ]
1019 assert reminders[0].respond_to_host_request_reminder.host_request_id == host_request1_id
1020 assert reminders[0].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id
1021 assert reminders[1].respond_to_host_request_reminder.host_request_id == host_request2_id
1022 assert reminders[1].respond_to_host_request_reminder.surfer_user.user_id == req_user2.id
1023 assert reminders[2].respond_to_host_request_reminder.host_request_id == host_request3_id
1024 assert reminders[2].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id
1026 # accept req
1027 with requests_session(token) as api:
1028 api.RespondHostRequest(
1029 requests_pb2.RespondHostRequestReq(
1030 host_request_id=host_request1_id, status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED
1031 )
1032 )
1034 refresh_materialized_views_rapid(None)
1035 with account_session(token) as account:
1036 reminders = account.GetReminders(empty_pb2.Empty()).reminders
1037 assert [reminder.WhichOneof("reminder") for reminder in reminders] == [
1038 "respond_to_host_request_reminder",
1039 "respond_to_host_request_reminder",
1040 "complete_profile_reminder",
1041 "complete_verification_reminder",
1042 ]
1043 assert reminders[0].respond_to_host_request_reminder.host_request_id == host_request2_id
1044 assert reminders[0].respond_to_host_request_reminder.surfer_user.user_id == req_user2.id
1045 assert reminders[1].respond_to_host_request_reminder.host_request_id == host_request3_id
1046 assert reminders[1].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id
1049def test_volunteer_stuff(db):
1050 # with password
1051 user, token = generate_user(name="Von Tester", username="tester", city="Amsterdam")
1053 with account_session(token) as account:
1054 res = account.GetAccountInfo(empty_pb2.Empty())
1055 assert not res.is_volunteer
1057 with pytest.raises(grpc.RpcError) as e:
1058 account.GetMyVolunteerInfo(empty_pb2.Empty())
1059 assert e.value.code() == grpc.StatusCode.NOT_FOUND
1060 assert e.value.details() == errors.NOT_A_VOLUNTEER
1062 with pytest.raises(grpc.RpcError) as e:
1063 account.UpdateMyVolunteerInfo(account_pb2.UpdateMyVolunteerInfoReq())
1064 assert e.value.code() == grpc.StatusCode.NOT_FOUND
1065 assert e.value.details() == errors.NOT_A_VOLUNTEER
1067 with session_scope() as session:
1068 session.add(
1069 Volunteer(
1070 user_id=user.id,
1071 display_name="Great Volunteer",
1072 display_location="The Bitbucket",
1073 role="Lead Tester",
1074 started_volunteering=date(2020, 6, 1),
1075 show_on_team_page=True,
1076 )
1077 )
1079 with account_session(token) as account:
1080 res = account.GetAccountInfo(empty_pb2.Empty())
1081 assert res.is_volunteer
1083 res = account.GetMyVolunteerInfo(empty_pb2.Empty())
1085 assert res.display_name == "Great Volunteer"
1086 assert res.display_location == "The Bitbucket"
1087 assert res.role == "Lead Tester"
1088 assert res.started_volunteering == "2020-06-01"
1089 assert not res.stopped_volunteering
1090 assert res.show_on_team_page
1091 assert res.link_type == "couchers"
1092 assert res.link_text == "@tester"
1093 assert res.link_url == "http://localhost:3000/user/tester"
1095 res = account.UpdateMyVolunteerInfo(
1096 account_pb2.UpdateMyVolunteerInfoReq(
1097 display_name=wrappers_pb2.StringValue(value=""),
1098 link_type=wrappers_pb2.StringValue(value="website"),
1099 link_text=wrappers_pb2.StringValue(value="testervontester.com.invalid"),
1100 link_url=wrappers_pb2.StringValue(value="https://www.testervontester.com.invalid/"),
1101 )
1102 )
1104 assert res.display_name == ""
1105 assert res.display_location == "The Bitbucket"
1106 assert res.role == "Lead Tester"
1107 assert res.started_volunteering == "2020-06-01"
1108 assert not res.stopped_volunteering
1109 assert res.show_on_team_page
1110 assert res.link_type == "website"
1111 assert res.link_text == "testervontester.com.invalid"
1112 assert res.link_url == "https://www.testervontester.com.invalid/"
1113 res = account.UpdateMyVolunteerInfo(
1114 account_pb2.UpdateMyVolunteerInfoReq(
1115 display_name=wrappers_pb2.StringValue(value=""),
1116 link_type=wrappers_pb2.StringValue(value="linkedin"),
1117 link_text=wrappers_pb2.StringValue(value="tester-vontester"),
1118 )
1119 )
1120 assert res.display_name == ""
1121 assert res.display_location == "The Bitbucket"
1122 assert res.role == "Lead Tester"
1123 assert res.started_volunteering == "2020-06-01"
1124 assert not res.stopped_volunteering
1125 assert res.show_on_team_page
1126 assert res.link_type == "linkedin"
1127 assert res.link_text == "tester-vontester"
1128 assert res.link_url == "https://www.linkedin.com/in/tester-vontester/"
1130 res = account.UpdateMyVolunteerInfo(
1131 account_pb2.UpdateMyVolunteerInfoReq(
1132 display_name=wrappers_pb2.StringValue(value="Tester"),
1133 display_location=wrappers_pb2.StringValue(value=""),
1134 link_type=wrappers_pb2.StringValue(value="email"),
1135 link_text=wrappers_pb2.StringValue(value="tester@vontester.com.invalid"),
1136 )
1137 )
1138 assert res.display_name == "Tester"
1139 assert res.display_location == ""
1140 assert res.role == "Lead Tester"
1141 assert res.started_volunteering == "2020-06-01"
1142 assert not res.stopped_volunteering
1143 assert res.show_on_team_page
1144 assert res.link_type == "email"
1145 assert res.link_text == "tester@vontester.com.invalid"
1146 assert res.link_url == "mailto:tester@vontester.com.invalid"
1148 refresh_materialized_views_rapid(None)
1150 with public_session() as public:
1151 res = public.GetVolunteers(empty_pb2.Empty())
1152 assert len(res.current_volunteers) == 1
1153 v = res.current_volunteers[0]
1154 assert v.name == "Tester"
1155 assert v.username == "tester"
1156 assert v.is_board_member
1157 assert v.role == "Lead Tester"
1158 assert v.location == "Amsterdam"
1159 assert v.img.startswith("http://localhost:5001/img/thumbnail/")
1160 assert v.link_type == "email"
1161 assert v.link_text == "tester@vontester.com.invalid"
1162 assert v.link_url == "mailto:tester@vontester.com.invalid"