Coverage for src/tests/test_account.py: 99%
701 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-20 11:53 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-20 11:53 +0000
1from datetime import UTC, date, datetime, timedelta
2from unittest.mock import patch
4import grpc
5import pytest
6from google.protobuf import empty_pb2, wrappers_pb2
7from sqlalchemy import update
8from sqlalchemy.sql import func
10from couchers import constants, urls
11from couchers.crypto import hash_password, random_hex
12from couchers.db import session_scope
13from couchers.materialized_views import refresh_materialized_views_rapid
14from couchers.models import (
15 AccountDeletionReason,
16 AccountDeletionToken,
17 BackgroundJob,
18 InviteCode,
19 Upload,
20 User,
21 Volunteer,
22)
23from couchers.proto import account_pb2, api_pb2, auth_pb2, conversations_pb2, requests_pb2
24from couchers.sql import couchers_select as select
25from couchers.utils import now, today
26from tests.test_fixtures import ( # noqa
27 account_session,
28 auth_api_session,
29 email_fields,
30 generate_user,
31 mock_notification_email,
32 moderator,
33 process_jobs,
34 public_session,
35 real_account_session,
36 requests_session,
37)
38from tests.test_requests import valid_request_text
@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture automatically for every test in this module."""
def test_GetAccountInfo(db, fast_passwords):
    """GetAccountInfo echoes the user's email/username and reports unset flags for a freshly generated user."""
    email = "user@couchers.invalid"
    # this user is created with a password
    user, token = generate_user(hashed_password=hash_password(random_hex()), email=email)

    with account_session(token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())

        # identity fields come straight from the generated user
        assert info.email == email
        assert info.username == user.username

        # nothing has been verified yet
        assert not info.has_strong_verification
        assert info.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
        assert info.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED

        # no special roles and no language preference stored
        assert not info.is_superuser
        assert info.ui_language_preference == ""
        assert not info.is_volunteer
def test_donation_banner_no_drive(db):
    """Test that the banner is not shown when DONATION_DRIVE_START is None"""
    # patch.object swaps the constant in and restores the previous value on exit,
    # even when an assertion fails, replacing the manual try/finally bookkeeping
    with patch.object(constants, "DONATION_DRIVE_START", None):
        # User has donated, but the drive is disabled, so the banner should not show
        _, token = generate_user()

        with account_session(token) as account:
            res = account.GetAccountInfo(empty_pb2.Empty())
            assert not res.should_show_donation_banner
def test_donation_banner_never_donated(db):
    """Test that banner is shown when user has never donated and drive is active"""
    drive_start = datetime(2025, 11, 1, tzinfo=UTC)

    # patch.object restores the original constant on exit, even if the assertion fails
    with patch.object(constants, "DONATION_DRIVE_START", drive_start):
        # Explicitly set last_donated=None since generate_user defaults to now()
        _, token = generate_user(last_donated=None)

        with account_session(token) as account:
            res = account.GetAccountInfo(empty_pb2.Empty())
            assert res.should_show_donation_banner
def test_donation_banner_donated_before_drive(db):
    """Test that banner is shown when user donated before drive start"""
    drive_start = datetime(2025, 11, 1, tzinfo=UTC)

    # patch.object restores the original constant on exit, even if the assertion fails
    with patch.object(constants, "DONATION_DRIVE_START", drive_start):
        user, token = generate_user()

        # Set donation before drive start
        with session_scope() as session:
            last_donated = datetime(2025, 10, 15, tzinfo=UTC)  # Before Nov 1
            session.execute(update(User).where(User.id == user.id).values(last_donated=last_donated))

        with account_session(token) as account:
            res = account.GetAccountInfo(empty_pb2.Empty())
            assert res.should_show_donation_banner
def test_donation_banner_donated_after_drive(db):
    """Test that banner is not shown when user donated after drive start"""
    drive_start = datetime(2025, 11, 1, tzinfo=UTC)

    # patch.object restores the original constant on exit, even if the assertion fails
    with patch.object(constants, "DONATION_DRIVE_START", drive_start):
        user, token = generate_user()

        # Set donation after drive start
        with session_scope() as session:
            last_donated = datetime(2025, 11, 15, tzinfo=UTC)  # After Nov 1
            session.execute(update(User).where(User.id == user.id).values(last_donated=last_donated))

        with account_session(token) as account:
            res = account.GetAccountInfo(empty_pb2.Empty())
            assert not res.should_show_donation_banner
def test_donation_banner_donated_exactly_at_drive_start(db):
    """Test that banner is not shown when user donated exactly at drive start time"""
    drive_start = datetime(2025, 11, 1, tzinfo=UTC)

    # patch.object restores the original constant on exit, even if the assertion fails
    with patch.object(constants, "DONATION_DRIVE_START", drive_start):
        user, token = generate_user()

        # Set donation exactly at drive start (boundary case)
        with session_scope() as session:
            session.execute(update(User).where(User.id == user.id).values(last_donated=drive_start))

        with account_session(token) as account:
            res = account.GetAccountInfo(empty_pb2.Empty())
            assert not res.should_show_donation_banner
def test_GetAccountInfo_regression(db):
    """GetAccountInfo must not blow up for a user with an avatar but ``about_me=None``.

    Regression: evaluating `has_completed_profile` on the backend used to fail in
    this situation because len(about_me) doesn't work on None.
    """
    uploader, _ = generate_user()
    avatar_key = random_hex(32)

    # the avatar key has to reference an existing upload row
    with session_scope() as session:
        upload = Upload(
            key=avatar_key,
            filename=random_hex(32) + ".jpg",
            creator_user_id=uploader.id,
        )
        session.add(upload)

    _, token = generate_user(about_me=None, avatar_key=avatar_key)

    with account_session(token) as account:
        # the call completing without an exception is the whole test
        account.GetAccountInfo(empty_pb2.Empty())
def test_ChangePasswordV2_normal(db, fast_passwords, push_collector):
    """Changing the password with the correct old one stores the new hash and notifies by email and push."""
    current_password = random_hex()
    replacement_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_password))

    request = account_pb2.ChangePasswordV2Req(
        old_password=current_password,
        new_password=replacement_password,
    )

    with account_session(token) as account:
        with mock_notification_email() as email_mock:
            account.ChangePasswordV2(request)

        # exactly one notification email goes out
        email_mock.assert_called_once()
        assert email_fields(email_mock).subject == "[TEST] Your password was changed"

    push_collector.assert_user_has_single_matching(
        user.id, title="Your password was changed", body="Your login password for Couchers.org was changed."
    )

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(replacement_password)
def test_ChangePasswordV2_regression(db, fast_passwords):
    """Password change works end to end without the notification email being mocked.

    Regression: send_password_changed_email wasn't working.
    """
    current_password = random_hex()
    replacement_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_password))

    request = account_pb2.ChangePasswordV2Req(
        old_password=current_password,
        new_password=replacement_password,
    )
    with account_session(token) as account:
        account.ChangePasswordV2(request)

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(replacement_password)
def test_ChangePasswordV2_normal_short_password(db, fast_passwords):
    """A too-short new password is rejected and the stored hash stays on the old password."""
    current_password = random_hex()
    too_short = random_hex(length=1)
    user, token = generate_user(hashed_password=hash_password(current_password))

    request = account_pb2.ChangePasswordV2Req(old_password=current_password, new_password=too_short)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(request)
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert rpc_error.details() == "The password must be 8 or more characters long."

    # the rejected request must not have touched the stored password
    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(current_password)
def test_ChangePasswordV2_normal_long_password(db, fast_passwords):
    """An overly long new password is rejected and the stored hash stays on the old password."""
    current_password = random_hex()
    too_long = random_hex(length=1000)
    user, token = generate_user(hashed_password=hash_password(current_password))

    request = account_pb2.ChangePasswordV2Req(old_password=current_password, new_password=too_long)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(request)
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert rpc_error.details() == "The password must be less than 256 characters."

    # the rejected request must not have touched the stored password
    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(current_password)
def test_ChangePasswordV2_normal_insecure_password(db, fast_passwords):
    """An easily guessable new password is rejected and the stored hash stays on the old password."""
    current_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_password))

    # long enough to pass the length check, but trivially guessable
    request = account_pb2.ChangePasswordV2Req(old_password=current_password, new_password="12345678")

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(request)
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert rpc_error.details() == "The password is insecure. Please use one that is not easily guessable."

    # the rejected request must not have touched the stored password
    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(current_password)
def test_ChangePasswordV2_normal_wrong_password(db, fast_passwords):
    """Supplying the wrong old password is rejected and the stored hash stays on the old password."""
    current_password = random_hex()
    replacement_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_password))

    request = account_pb2.ChangePasswordV2Req(old_password="wrong password", new_password=replacement_password)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(request)
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert rpc_error.details() == "Wrong password."

    # the rejected request must not have touched the stored password
    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(current_password)
def test_ChangePasswordV2_normal_no_passwords(db, fast_passwords):
    """A request carrying only the old password (no new password) fails the minimum-length check."""
    current_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_password))

    # new_password is deliberately left unset
    request = account_pb2.ChangePasswordV2Req(old_password=current_password)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(request)
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert rpc_error.details() == "The password must be 8 or more characters long."

    # the rejected request must not have touched the stored password
    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(current_password)
def test_ChangeEmailV2_wrong_password(db, fast_passwords):
    """ChangeEmailV2 with a wrong password is rejected and no active email-change token exists afterwards."""
    password = random_hex()
    requested_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password="wrong password", new_email=requested_email)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(request)
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert rpc_error.details() == "Wrong password."

    # counts users whose email-change token window contains the current DB time
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_wrong_email(db, fast_passwords):
    """ChangeEmailV2 with a wrong password is rejected and no active email-change token exists afterwards.

    NOTE(review): despite its name, this test sends a wrong *password* and expects
    "Wrong password." -- it exercises the same path as test_ChangeEmailV2_wrong_password.
    Confirm which "wrong email" scenario was intended before changing it.
    """
    password = random_hex()
    requested_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password="wrong password", new_email=requested_email)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(request)
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert rpc_error.details() == "Wrong password."

    # counts users whose email-change token window contains the current DB time
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_invalid_email(db, fast_passwords):
    """A syntactically invalid new email is rejected and no active email-change token exists afterwards."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password=password, new_email="not a real email")

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(request)
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert rpc_error.details() == "Invalid email."

    # counts users whose email-change token window contains the current DB time
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_email_in_use(db, fast_passwords):
    """Requesting an email that already belongs to another user is rejected as "Invalid email."."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))
    other_user, _ = generate_user(hashed_password=hash_password(password))

    # try to take over the second user's address
    request = account_pb2.ChangeEmailV2Req(password=password, new_email=other_user.email)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(request)
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert rpc_error.details() == "Invalid email."

    # counts users whose email-change token window contains the current DB time
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_no_change(db, fast_passwords):
    """Requesting a change to the user's own current email is rejected as "Invalid email."."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # the "new" address is the one the user already has
    request = account_pb2.ChangeEmailV2Req(password=password, new_email=user.email)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(request)
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert rpc_error.details() == "Invalid email."

    # counts users whose email-change token window contains the current DB time
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmailV2_wrong_token(db, fast_passwords):
    """Confirming an email change with a bogus token fails with NOT_FOUND and leaves the email unchanged."""
    password = random_hex()
    requested_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    # start a legitimate change so that a real token exists
    with account_session(token) as account:
        account.ChangeEmailV2(account_pb2.ChangeEmailV2Req(password=password, new_email=requested_email))

    bogus_confirmation = auth_pb2.ConfirmChangeEmailV2Req(change_email_token="wrongtoken")
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as exc_info:
            auth_api.ConfirmChangeEmailV2(bogus_confirmation)
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.NOT_FOUND
        assert rpc_error.details() == "Invalid token."

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.email == user.email
def test_ChangeEmailV2_tokens_two_hour_window(db):
    """An email-change token is rejected when the auth servicer's clock is outside its validity window.

    The auth servicer's ``now`` is patched first to one minute in the past and then
    to two hours and one minute in the future; in both cases an empty token and the
    real token must fail with NOT_FOUND / "Invalid token.".
    """

    def one_minute_ago():
        return now() - timedelta(minutes=1)

    def two_hours_one_minute_in_future():
        return now() + timedelta(hours=2, minutes=1)

    password = random_hex()
    requested_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(account_pb2.ChangeEmailV2Req(password=password, new_email=requested_email))

    with session_scope() as session:
        change_token = session.execute(select(User.new_email_token).where(User.id == user.id)).scalar_one()

    # same two confirmation attempts under each fake clock: before the window, then after it
    for fake_now in (one_minute_ago, two_hours_one_minute_in_future):
        with patch("couchers.servicers.auth.now", fake_now):
            with auth_api_session() as (auth_api, _):
                attempts = (
                    auth_pb2.ConfirmChangeEmailV2Req(),
                    auth_pb2.ConfirmChangeEmailV2Req(change_email_token=change_token),
                )
                for confirmation in attempts:
                    with pytest.raises(grpc.RpcError) as exc_info:
                        auth_api.ConfirmChangeEmailV2(confirmation)
                    assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
                    assert exc_info.value.details() == "Invalid token."
def test_ChangeEmailV2(db, fast_passwords, push_collector):
    """Happy path: request an email change, confirm it with the stored token, and check both push notifications."""
    password = random_hex()
    requested_email = f"{random_hex()}@couchers.org.invalid"
    user, session_token = generate_user(hashed_password=hash_password(password))
    user_id = user.id

    with account_session(session_token) as account:
        account.ChangeEmailV2(account_pb2.ChangeEmailV2Req(password=password, new_email=requested_email))

    with session_scope() as session:
        pending = session.execute(select(User).where(User.id == user_id)).scalar_one()
        # before confirmation the old address is still active and the request is stored on the user row
        assert pending.email == user.email
        assert pending.new_email == requested_email
        assert pending.new_email_token is not None
        assert pending.new_email_token_created <= now()
        assert pending.new_email_token_expiry >= now()

        change_token = pending.new_email_token

    process_jobs()
    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=0,
        title="An email change was initiated on your account",
        body=f"An email change to the email {requested_email} was initiated on your account.",
    )

    with auth_api_session() as (auth_api, _):
        auth_api.ConfirmChangeEmailV2(auth_pb2.ConfirmChangeEmailV2Req(change_email_token=change_token))

    with session_scope() as session:
        confirmed = session.execute(select(User).where(User.id == user_id)).scalar_one()
        # confirmation swaps the address in and clears all pending-change fields
        assert confirmed.email == requested_email
        assert confirmed.new_email is None
        assert confirmed.new_email_token is None
        assert confirmed.new_email_token_created is None
        assert confirmed.new_email_token_expiry is None

    process_jobs()
    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=1,
        title="Email change completed",
        body="Your new email address has been verified.",
    )
def test_ChangeEmailV2_sends_proper_emails(db, fast_passwords, push_collector):
    """Starting an email change queues exactly two emails: a notification and a confirmation.

    The two ``send_email`` jobs are matched by a marker byte-string in their payloads;
    the query has no ordering, so either order is accepted.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    process_jobs()

    notification_marker = b"An email change to the email"
    confirmation_marker = (
        b"You requested that your email be changed to this email address on Couchers.org. Your old email address is"
    )

    with session_scope() as session:
        jobs = session.execute(select(BackgroundJob).where(BackgroundJob.job_type == "send_email")).scalars().all()
        assert len(jobs) == 2
        # read the payloads once, while the session is still open
        first_payload, second_payload = (job.payload for job in jobs)
        assert (notification_marker in first_payload and confirmation_marker in second_payload) or (
            confirmation_marker in first_payload and notification_marker in second_payload
        )

    push_collector.assert_user_has_single_matching(
        user.id,
        title="An email change was initiated on your account",
        body=f"An email change to the email {new_email} was initiated on your account.",
    )
def test_ChangeLanguagePreference(db, fast_passwords):
    """Changing the UI language sets the NEXT_LOCALE cookie and persists the preference.

    A missing cookie is reported as an assertion failure (with the received
    metadata in the message) rather than as a generic ``Exception``.
    """
    # user changes from default to ISO 639-1 language code
    new_language_code = "zh"
    user, token = generate_user()

    with real_account_session(token) as account:
        res = account.GetAccountInfo(empty_pb2.Empty())
        assert res.ui_language_preference == ""

        # call will have info about the request
        res, call = account.ChangeLanguagePreference.with_call(
            account_pb2.ChangeLanguagePreferenceReq(ui_language_preference=new_language_code)
        )

        # cookies are sent via initial metadata, so we check for it there;
        # the value of "set-cookie" is the full cookie string, so keep only the leading key=value part
        metadata = call.initial_metadata()
        cookies = [val.split(";")[0] for key, val in metadata if key == "set-cookie"]
        assert f"NEXT_LOCALE={new_language_code}" in cookies, f"Didn't find right cookie, got {metadata}"

        # the changed language preference should also be sent to the backend
        res = account.GetAccountInfo(empty_pb2.Empty())
        assert res.ui_language_preference == new_language_code
def test_contributor_form(db):
    """``filled_contributor_form`` flips from false to true once FillContributorForm has been called."""
    _, token = generate_user()

    with account_session(token) as account:
        before = account.GetContributorFormInfo(empty_pb2.Empty())
        assert not before.filled_contributor_form

        # an empty form is enough to count as filled
        empty_form = auth_pb2.ContributorForm()
        account.FillContributorForm(account_pb2.FillContributorFormReq(contributor_form=empty_form))

        after = account.GetContributorFormInfo(empty_pb2.Empty())
        assert after.filled_contributor_form
def test_DeleteAccount_start(db):
    """Requesting deletion sends a confirmation email and creates a valid token without deleting the user yet."""
    user, token = generate_user()

    request = account_pb2.DeleteAccountReq(confirm=True, reason=None)
    with account_session(token) as account:
        with mock_notification_email() as email_mock:
            account.DeleteAccount(request)
        email_mock.assert_called_once()
        assert email_fields(email_mock).subject == "[TEST] Confirm your Couchers.org account deletion"

    with session_scope() as session:
        token_query = select(AccountDeletionToken).where(AccountDeletionToken.user_id == user.id)
        deletion_token: AccountDeletionToken = session.execute(token_query).scalar_one()
        assert deletion_token.is_valid

        # the account is only deleted after the token has been confirmed
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert not stored_user.is_deleted
def test_DeleteAccount_message_storage(db):
    """Only the three non-blank deletion reasons end up as AccountDeletionReason rows."""
    _, token = generate_user()

    reasons = (
        None,  # not stored
        "",  # not stored
        "Reason",
        "0192#(&!&#)*@//)(8",
        "\n\n\t",  # not stored
        "1337",
    )

    with account_session(token) as account:
        for reason in reasons:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=reason))

    with session_scope() as session:
        stored_reasons = session.execute(select(func.count()).select_from(AccountDeletionReason)).scalar_one()
        assert stored_reasons == 3
def test_full_delete_account_with_recovery(db, push_collector):
    """Walk through the whole deletion flow: request, confirm, then recover the account.

    Each of the three stages is checked for its push notification, its email
    (recipient, subject, body markers and link) and the resulting database state.
    """
    user, session_token = generate_user()
    user_id = user.id

    # --- stage 1: request deletion ---
    with account_session(session_token) as account:
        # without confirm=True the request is refused
        with pytest.raises(grpc.RpcError) as exc_info:
            account.DeleteAccount(account_pb2.DeleteAccountReq())
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == "Please confirm your account deletion."

        # Check the right email is sent
        with mock_notification_email() as mock:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

        push_collector.assert_user_push_matches_fields(
            user_id,
            ix=0,
            title="Account deletion initiated",
            body="Someone initiated the deletion of your Couchers.org account. To delete your account, please follow the link in the email we sent you.",
        )

    mock.assert_called_once()
    mail = email_fields(mock)

    with session_scope() as session:
        deletion_row = session.execute(select(AccountDeletionToken)).scalar_one()
        deletion_token = deletion_row.token

        stored_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert deletion_row.user == stored_user
        # nothing is deleted until the token is confirmed
        assert not stored_user.is_deleted
        assert not stored_user.undelete_token
        assert not stored_user.undelete_until

    assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"
    assert mail.recipient == user.email
    assert "account deletion" in mail.subject.lower()
    marker = "You requested that we delete your account from Couchers.org."
    link = f"http://localhost:3000/delete-account?token={deletion_token}"
    # the plain-text and HTML variants must carry the same information
    for body in (mail.plain, mail.html):
        assert deletion_token in body
        assert marker in body
        assert link in body
        assert "support@couchers.org" in body

    # --- stage 2: confirm deletion ---
    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, _):
            auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token))

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=1,
        title="Your Couchers.org account has been deleted",
        body="You can still undo this by following the link we emailed to you within 7 days.",
    )

    mock.assert_called_once()
    mail = email_fields(mock)

    with session_scope() as session:
        # the deletion token is consumed by the confirmation
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        stored_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert stored_user.is_deleted
        assert stored_user.undelete_token
        assert stored_user.undelete_until > now()

        undelete_token = stored_user.undelete_token

    assert mail.recipient == user.email
    assert "account has been deleted" in mail.subject.lower()
    marker = "You have successfully deleted your account from Couchers.org."
    link = f"http://localhost:3000/recover-account?token={undelete_token}"
    for body in (mail.plain, mail.html):
        assert marker in body
        assert "7 days" in body
        assert link in body
        assert "support@couchers.org" in body

    # --- stage 3: recover the account ---
    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, _):
            auth_api.RecoverAccount(auth_pb2.RecoverAccountReq(token=undelete_token))

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=2,
        title="Your Couchers.org account has been recovered!",
        body="We have recovered your Couchers.org account as per your request! Welcome back!",
    )

    mock.assert_called_once()
    mail = email_fields(mock)

    assert mail.recipient == user.email
    assert "account has been recovered" in mail.subject.lower()
    marker = "Your account on Couchers.org has been successfully recovered!"
    for body in (mail.plain, mail.html):
        assert marker in body
        assert "support@couchers.org" in body

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        recovered = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert not recovered.is_deleted
        assert not recovered.undelete_token
        assert not recovered.undelete_until
def test_multiple_delete_tokens(db):
    """
    Make sure deletion tokens are deleted on delete

    Three deletion requests create three tokens; confirming any one of them
    must leave no AccountDeletionToken rows behind.
    """
    _, session_token = generate_user()

    with account_session(session_token) as account:
        for _attempt in range(3):
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    with session_scope() as session:
        token_count = session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one()
        assert token_count == 3
        # any one of the three tokens will do
        deletion_token = session.execute(select(AccountDeletionToken).limit(1)).scalars().one_or_none().token

    with auth_api_session() as (auth_api, _):
        auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token))

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()
def test_ListActiveSessions_pagination(db, fast_passwords):
    """ListActiveSessions pages through five sessions as 3 + 2 and ends with an empty next_page_token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four extra logins; the assertions below expect five sessions in total
    with auth_api_session() as (auth_api, _):
        for _login in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        first_page = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_size=3))
        assert len(first_page.active_sessions) == 3

        second_page = account.ListActiveSessions(
            account_pb2.ListActiveSessionsReq(page_token=first_page.next_page_token, page_size=3)
        )
        assert len(second_page.active_sessions) == 2
        assert not second_page.next_page_token
def test_ListActiveSessions_details(db, fast_passwords):
    """ListActiveSessions reports OS, browser, device, location and current-session flag per session.

    Three logins are made with distinct user agents and IP addresses, and the
    geoip lookup used by the account servicer is replaced with a stub that only
    knows two of the three addresses.
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    ips_user_agents = [
        (
            "108.123.33.162",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Mobile/15E148 Safari/604.1",
        ),
        (
            "8.245.212.28",
            "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/26.0 Chrome/122.0.0.0 Mobile Safari/537.36",
        ),
        (
            "95.254.140.156",
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0",
        ),
    ]

    for ip, user_agent in ips_user_agents:
        options = (("grpc.primary_user_agent", user_agent),)
        with auth_api_session(grpc_channel_options=options) as (auth_api, metadata_interceptor):
            auth_api.Authenticate(
                auth_pb2.AuthReq(user=user.username, password=password), metadata=(("x-couchers-real-ip", ip),)
            )

    def dummy_geoip(ip_address):
        # the third IP is deliberately absent, so the lookup returns None for it
        return {
            "108.123.33.162": "Chicago, United States",
            "8.245.212.28": "Sydney, Australia",
        }.get(ip_address)

    # (operating_system, browser, device, approximate_location, is_current_session);
    # the three logins are expected in reverse order of creation, after the current session
    expected_sessions = [
        # this one currently making the API call
        ("Other", "Other", "Other", "Unknown", True),
        ("Ubuntu", "Firefox", "Other", "Unknown", False),
        ("Android", "Samsung Internet", "K", "Sydney, Australia", False),
        ("iOS", "Mobile Safari", "iPhone", "Chicago, United States", False),
    ]

    with real_account_session(token) as account:
        with patch("couchers.servicers.account.geoip_approximate_location", dummy_geoip):
            res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
            assert len(res.active_sessions) == 4

            actual_sessions = [
                (s.operating_system, s.browser, s.device, s.approximate_location, s.is_current_session)
                for s in res.active_sessions
            ]
            # one comparison so that a failure shows every mismatching field at once
            assert actual_sessions == expected_sessions
def test_LogOutSession(db, fast_passwords):
    """LogOutSession removes exactly the session identified by ``created``."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four extra logins so there are several sessions to choose from
    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        before = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(before.active_sessions) == 5

        doomed = before.active_sessions[3]
        account.LogOutSession(account_pb2.LogOutSessionReq(created=doomed.created))

        after = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(after.active_sessions) == 4

        # ignore the first session as it changes
        survivors = before.active_sessions[1:3] + before.active_sessions[4:]
        assert survivors == after.active_sessions[1:]
def test_LogOutOtherSessions(db, fast_passwords):
    """LogOutOtherSessions needs ``confirm=True`` and then leaves one session."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four extra logins in addition to the session behind ``token``
    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 5

        # without confirmation the call must be rejected
        with pytest.raises(grpc.RpcError) as e:
            account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=False))
        rejection = e.value
        assert rejection.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert rejection.details() == "Please confirm you want to log out of other sessions."

        account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=True))
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 1
def test_CreateInviteCode(db):
    """CreateInviteCode returns an 8-character code stored for the caller."""
    user, token = generate_user()

    with account_session(token) as account:
        res = account.CreateInviteCode(account_pb2.CreateInviteCodeReq())
        code = res.code
        assert len(code) == 8

    # the code must exist in the database, owned by the caller and still enabled
    stored_code_query = select(InviteCode).where(InviteCode.id == code)
    with session_scope() as session:
        invite = session.execute(stored_code_query).scalar_one()
        assert invite.creator_user_id == user.id
        assert invite.disabled is None
        assert res.url == urls.invite_code_link(code=res.code)
def test_DisableInviteCode(db):
    """DisableInviteCode sets ``disabled`` on the caller's invite code."""
    user, token = generate_user()
    code = "TEST1234"

    # seed an invite code owned by the user
    with session_scope() as session:
        seeded = InviteCode(id=code, creator_user_id=user.id)
        session.add(seeded)

    with account_session(token) as account:
        account.DisableInviteCode(account_pb2.DisableInviteCodeReq(code=code))

    stored_code_query = select(InviteCode).where(InviteCode.id == code)
    with session_scope() as session:
        invite = session.execute(stored_code_query).scalar_one()
        assert invite.disabled is not None
def test_ListInviteCodes(db):
    """ListInviteCodes returns the caller's codes with use count and link."""
    user, token = generate_user()
    another_user, _ = generate_user()

    code = "LIST1234"
    with session_scope() as session:
        session.add(InviteCode(id=code, creator_user_id=user.id))
        # record that the second user signed up with this code, giving it one use
        mark_as_used = update(User).where(User.id == another_user.id).values(invite_code_id=code)
        session.execute(mark_as_used)

    with account_session(token) as account:
        res = account.ListInviteCodes(empty_pb2.Empty())
        assert len(res.invite_codes) == 1

        listed = res.invite_codes[0]
        assert listed.code == code
        assert listed.uses == 1
        assert listed.url == urls.invite_code_link(code=code)
def test_reminders(db, moderator):
    """GetReminders lists pending host requests ahead of profile/verification nags.

    Three host requests are sent to a user with an incomplete profile, then one
    is accepted. After each step the reminder kinds and the request/surfer each
    host request reminder points at are checked.
    """
    # the strong verification reminder's absence is tested in test_strong_verification.py
    # reference writing reminders tested in test_AvailableWriteReferences_and_ListPendingReferencesToWrite
    # we use LiteUser, so remember to refresh materialized views
    user, token = generate_user(complete_profile=False)
    complete_user, complete_token = generate_user(complete_profile=True)
    req_user1, req_user_token1 = generate_user(complete_profile=True)
    req_user2, req_user_token2 = generate_user(complete_profile=True)

    respond_kind = "respond_to_host_request_reminder"
    incomplete_profile_kinds = ["complete_profile_reminder", "complete_verification_reminder"]

    def kinds_of(reminders):
        # name of the populated oneof field for every reminder, in order
        return [reminder.WhichOneof("reminder") for reminder in reminders]

    def check_pending(reminders, expected):
        # the leading reminders must point at the given (request id, surfer) pairs
        for reminder, (request_id, surfer) in zip(reminders, expected):
            pending = reminder.respond_to_host_request_reminder
            assert pending.host_request_id == request_id
            assert pending.surfer_user.user_id == surfer.id

    refresh_materialized_views_rapid(None)
    with account_session(complete_token) as account:
        assert kinds_of(account.GetReminders(empty_pb2.Empty()).reminders) == ["complete_verification_reminder"]
    with account_session(token) as account:
        assert kinds_of(account.GetReminders(empty_pb2.Empty()).reminders) == incomplete_profile_kinds

    today_plus_2 = (today() + timedelta(days=2)).isoformat()
    today_plus_3 = (today() + timedelta(days=3)).isoformat()

    def send_approved_request(surfer_token, label):
        # create a host request to ``user`` and have the moderator approve it
        with requests_session(surfer_token) as api:
            request_id = api.CreateHostRequest(
                requests_pb2.CreateHostRequestReq(
                    host_user_id=user.id,
                    from_date=today_plus_2,
                    to_date=today_plus_3,
                    text=valid_request_text(label),
                )
            ).host_request_id
            moderator.approve_host_request(request_id)
        return request_id

    host_request1_id = send_approved_request(req_user_token1, "Test request 1")

    with account_session(token) as account:
        reminders = account.GetReminders(empty_pb2.Empty()).reminders
        assert kinds_of(reminders) == [respond_kind] + incomplete_profile_kinds
        check_pending(reminders, [(host_request1_id, req_user1)])

    host_request2_id = send_approved_request(req_user_token2, "Test request 2")

    refresh_materialized_views_rapid(None)
    with account_session(token) as account:
        reminders = account.GetReminders(empty_pb2.Empty()).reminders
        assert kinds_of(reminders) == [respond_kind] * 2 + incomplete_profile_kinds
        check_pending(
            reminders,
            [
                (host_request1_id, req_user1),
                (host_request2_id, req_user2),
            ],
        )

    host_request3_id = send_approved_request(req_user_token1, "Test request 3")

    refresh_materialized_views_rapid(None)
    with account_session(token) as account:
        reminders = account.GetReminders(empty_pb2.Empty()).reminders
        assert kinds_of(reminders) == [respond_kind] * 3 + incomplete_profile_kinds
        check_pending(
            reminders,
            [
                (host_request1_id, req_user1),
                (host_request2_id, req_user2),
                (host_request3_id, req_user1),
            ],
        )

    # accept req
    with requests_session(token) as api:
        acceptance = requests_pb2.RespondHostRequestReq(
            host_request_id=host_request1_id, status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED
        )
        api.RespondHostRequest(acceptance)

    # the accepted request no longer needs a response, so only two remain
    refresh_materialized_views_rapid(None)
    with account_session(token) as account:
        reminders = account.GetReminders(empty_pb2.Empty()).reminders
        assert kinds_of(reminders) == [respond_kind] * 2 + incomplete_profile_kinds
        check_pending(
            reminders,
            [
                (host_request2_id, req_user2),
                (host_request3_id, req_user1),
            ],
        )
def test_volunteer_stuff(db):
    """Volunteer info: hidden for non-volunteers, editable once registered.

    Walks through the not-registered errors, adds a Volunteer row, checks the
    derived link for the "couchers", "website", "linkedin" and "email" link
    types, and finally checks the public volunteer listing.
    """
    # taken from couchers/app/backend/resources/badges.json
    board_member_id = 8347

    user, token = generate_user(name="Von Tester", username="tester", city="Amsterdam", id=board_member_id)

    def check_info(info, *, display_name, display_location, link_type, link_text, link_url):
        # role, dates and team-page flag never change in this test; the rest varies per step
        assert info.display_name == display_name
        assert info.display_location == display_location
        assert info.role == "Lead Tester"
        assert info.started_volunteering == "2020-06-01"
        assert not info.stopped_volunteering
        assert info.show_on_team_page
        assert info.link_type == link_type
        assert info.link_text == link_text
        assert info.link_url == link_url

    not_registered = "You are currently not registered as a volunteer, if this is wrong, please contact us."

    with account_session(token) as account:
        res = account.GetAccountInfo(empty_pb2.Empty())
        assert not res.is_volunteer

        # both volunteer endpoints must refuse a user without a Volunteer row
        for rpc, request in (
            (account.GetMyVolunteerInfo, empty_pb2.Empty()),
            (account.UpdateMyVolunteerInfo, account_pb2.UpdateMyVolunteerInfoReq()),
        ):
            with pytest.raises(grpc.RpcError) as e:
                rpc(request)
            assert e.value.code() == grpc.StatusCode.NOT_FOUND
            assert e.value.details() == not_registered

    with session_scope() as session:
        volunteer = Volunteer(
            user_id=user.id,
            display_name="Great Volunteer",
            display_location="The Bitbucket",
            role="Lead Tester",
            started_volunteering=date(2020, 6, 1),
            show_on_team_page=True,
        )
        session.add(volunteer)

    with account_session(token) as account:
        res = account.GetAccountInfo(empty_pb2.Empty())
        assert res.is_volunteer

        res = account.GetMyVolunteerInfo(empty_pb2.Empty())
        check_info(
            res,
            display_name="Great Volunteer",
            display_location="The Bitbucket",
            link_type="couchers",
            link_text="@tester",
            link_url="http://localhost:3000/user/tester",
        )

        # website link: the URL is supplied explicitly
        res = account.UpdateMyVolunteerInfo(
            account_pb2.UpdateMyVolunteerInfoReq(
                display_name=wrappers_pb2.StringValue(value=""),
                link_type=wrappers_pb2.StringValue(value="website"),
                link_text=wrappers_pb2.StringValue(value="testervontester.com.invalid"),
                link_url=wrappers_pb2.StringValue(value="https://www.testervontester.com.invalid/"),
            )
        )
        check_info(
            res,
            display_name="",
            display_location="The Bitbucket",
            link_type="website",
            link_text="testervontester.com.invalid",
            link_url="https://www.testervontester.com.invalid/",
        )

        # linkedin link: no URL is sent, the response carries a profile URL
        res = account.UpdateMyVolunteerInfo(
            account_pb2.UpdateMyVolunteerInfoReq(
                display_name=wrappers_pb2.StringValue(value=""),
                link_type=wrappers_pb2.StringValue(value="linkedin"),
                link_text=wrappers_pb2.StringValue(value="tester-vontester"),
            )
        )
        check_info(
            res,
            display_name="",
            display_location="The Bitbucket",
            link_type="linkedin",
            link_text="tester-vontester",
            link_url="https://www.linkedin.com/in/tester-vontester/",
        )

        # email link: no URL is sent, the response carries a mailto: URL
        res = account.UpdateMyVolunteerInfo(
            account_pb2.UpdateMyVolunteerInfoReq(
                display_name=wrappers_pb2.StringValue(value="Tester"),
                display_location=wrappers_pb2.StringValue(value=""),
                link_type=wrappers_pb2.StringValue(value="email"),
                link_text=wrappers_pb2.StringValue(value="tester@vontester.com.invalid"),
            )
        )
        check_info(
            res,
            display_name="Tester",
            display_location="",
            link_type="email",
            link_text="tester@vontester.com.invalid",
            link_url="mailto:tester@vontester.com.invalid",
        )

    refresh_materialized_views_rapid(None)

    with public_session() as public:
        res = public.GetVolunteers(empty_pb2.Empty())
        assert len(res.current_volunteers) == 1

        listed = res.current_volunteers[0]
        assert listed.name == "Tester"
        assert listed.username == "tester"
        assert listed.is_board_member
        assert listed.role == "Lead Tester"
        # display_location was cleared above; the listing shows the user's city here
        assert listed.location == "Amsterdam"
        assert listed.img.startswith("http://localhost:5001/img/thumbnail/")
        assert listed.link_type == "email"
        assert listed.link_text == "tester@vontester.com.invalid"
        assert listed.link_url == "mailto:tester@vontester.com.invalid"