Coverage for app / backend / src / tests / test_account.py: 100%
698 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
1from datetime import UTC, date, datetime, timedelta
2from unittest.mock import patch
4import grpc
5import pytest
6from google.protobuf import empty_pb2, wrappers_pb2
7from sqlalchemy import select, update
8from sqlalchemy.sql import func
10from couchers import urls
11from couchers.crypto import hash_password, random_hex
12from couchers.db import session_scope
13from couchers.materialized_views import refresh_materialized_views_rapid
14from couchers.models import (
15 AccountDeletionReason,
16 AccountDeletionToken,
17 BackgroundJob,
18 InviteCode,
19 PhotoGalleryItem,
20 Upload,
21 User,
22)
23from couchers.proto import account_pb2, api_pb2, auth_pb2, conversations_pb2, requests_pb2
24from couchers.utils import now, today
25from tests.fixtures.db import generate_user, make_volunteer
26from tests.fixtures.misc import PushCollector, email_fields, mock_notification_email, process_jobs
27from tests.fixtures.sessions import (
28 account_session,
29 auth_api_session,
30 public_session,
31 real_account_session,
32 requests_session,
33)
34from tests.test_requests import valid_request_text
37@pytest.fixture(autouse=True)
38def _(testconfig):
39 pass
42def test_GetAccountInfo(db, fast_passwords):
43 # with password
44 user1, token1 = generate_user(hashed_password=hash_password(random_hex()), email="user@couchers.invalid")
46 with account_session(token1) as account:
47 res = account.GetAccountInfo(empty_pb2.Empty())
48 assert res.email == "user@couchers.invalid"
49 assert res.username == user1.username
50 assert not res.has_strong_verification
51 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
52 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED
53 assert not res.is_superuser
54 assert res.ui_language_preference == ""
55 assert not res.is_volunteer
58def test_donation_banner_no_drive(db):
59 """Test that the banner is not shown when DONATION_DRIVE_START is None"""
60 # User has donated, but the drive is disabled, so the banner should not show
61 user, token = generate_user()
63 with patch("couchers.servicers.account.DONATION_DRIVE_START", None):
64 with account_session(token) as account:
65 res = account.GetAccountInfo(empty_pb2.Empty())
66 assert not res.should_show_donation_banner
69def test_donation_banner_never_donated(db):
70 """Test that banner is shown when user has never donated and drive is active"""
71 drive_start = datetime(2025, 11, 1, tzinfo=UTC)
73 # Explicitly set last_donated=None since generate_user defaults to now()
74 user, token = generate_user(last_donated=None)
76 with patch("couchers.servicers.account.DONATION_DRIVE_START", drive_start):
77 with account_session(token) as account:
78 res = account.GetAccountInfo(empty_pb2.Empty())
79 assert res.should_show_donation_banner
82def test_donation_banner_donated_before_drive(db):
83 """Test that banner is shown when user donated before drive start"""
84 drive_start = datetime(2025, 11, 1, tzinfo=UTC)
86 user, token = generate_user()
88 # Set donation before drive start
89 with session_scope() as session:
90 last_donated = datetime(2025, 10, 15, tzinfo=UTC) # Before Nov 1
91 session.execute(update(User).where(User.id == user.id).values(last_donated=last_donated))
93 with patch("couchers.servicers.account.DONATION_DRIVE_START", drive_start):
94 with account_session(token) as account:
95 res = account.GetAccountInfo(empty_pb2.Empty())
96 assert res.should_show_donation_banner
99def test_donation_banner_donated_after_drive(db):
100 """Test that banner is not shown when user donated after drive start"""
101 drive_start = datetime(2025, 11, 1, tzinfo=UTC)
103 user, token = generate_user()
105 # Set donation after drive start
106 with session_scope() as session:
107 last_donated = datetime(2025, 11, 15, tzinfo=UTC) # After Nov 1
108 session.execute(update(User).where(User.id == user.id).values(last_donated=last_donated))
110 with patch("couchers.servicers.account.DONATION_DRIVE_START", drive_start):
111 with account_session(token) as account:
112 res = account.GetAccountInfo(empty_pb2.Empty())
113 assert not res.should_show_donation_banner
116def test_donation_banner_donated_exactly_at_drive_start(db):
117 """Test that banner is not shown when user donated exactly at drive start time"""
118 drive_start = datetime(2025, 11, 1, tzinfo=UTC)
120 user, token = generate_user()
122 # Set donation exactly at drive start
123 with session_scope() as session:
124 session.execute(update(User).where(User.id == user.id).values(last_donated=drive_start))
126 with patch("couchers.servicers.account.DONATION_DRIVE_START", drive_start):
127 with account_session(token) as account:
128 res = account.GetAccountInfo(empty_pb2.Empty())
129 assert not res.should_show_donation_banner
132def test_GetAccountInfo_regression(db):
133 # there was a bug in evaluating `has_completed_profile` on the backend (in python)
134 # when about_me is None but the user has a key, it was failing because len(about_me) doesn't work on None
135 user, token = generate_user(about_me=None, complete_profile=False)
137 # add an avatar photo to the user's profile gallery
138 with session_scope() as session:
139 key = random_hex(32)
140 filename = random_hex(32) + ".jpg"
141 session.add(
142 Upload(
143 key=key,
144 filename=filename,
145 creator_user_id=user.id,
146 )
147 )
148 session.flush()
149 assert user.profile_gallery_id is not None
150 session.add(
151 PhotoGalleryItem(
152 gallery_id=user.profile_gallery_id,
153 upload_key=key,
154 position=0,
155 )
156 )
158 with account_session(token) as account:
159 res = account.GetAccountInfo(empty_pb2.Empty())
162def test_ChangePasswordV2_normal(db, fast_passwords, push_collector: PushCollector):
163 # user has old password and is changing to new password
164 old_password = random_hex()
165 new_password = random_hex()
166 user, token = generate_user(hashed_password=hash_password(old_password))
168 with account_session(token) as account:
169 with mock_notification_email() as mock:
170 account.ChangePasswordV2(
171 account_pb2.ChangePasswordV2Req(
172 old_password=old_password,
173 new_password=new_password,
174 )
175 )
177 mock.assert_called_once()
178 assert email_fields(mock).subject == "[TEST] Your password was changed"
180 push = push_collector.pop_for_user(user.id, last=True)
181 assert push.content.title == "Password changed"
182 assert push.content.body == "Your password was changed."
184 with session_scope() as session:
185 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
186 assert updated_user.hashed_password == hash_password(new_password)
189def test_ChangePasswordV2_regression(db, fast_passwords):
190 # send_password_changed_email wasn't working
191 # user has old password and is changing to new password
192 old_password = random_hex()
193 new_password = random_hex()
194 user, token = generate_user(hashed_password=hash_password(old_password))
196 with account_session(token) as account:
197 account.ChangePasswordV2(
198 account_pb2.ChangePasswordV2Req(
199 old_password=old_password,
200 new_password=new_password,
201 )
202 )
204 with session_scope() as session:
205 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
206 assert updated_user.hashed_password == hash_password(new_password)
209def test_ChangePasswordV2_normal_short_password(db, fast_passwords):
210 # user has old password and is changing to new password, but used short password
211 old_password = random_hex()
212 new_password = random_hex(length=1)
213 user, token = generate_user(hashed_password=hash_password(old_password))
215 with account_session(token) as account:
216 with pytest.raises(grpc.RpcError) as e:
217 account.ChangePasswordV2(
218 account_pb2.ChangePasswordV2Req(
219 old_password=old_password,
220 new_password=new_password,
221 )
222 )
223 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
224 assert e.value.details() == "The password must be 8 or more characters long."
226 with session_scope() as session:
227 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
228 assert updated_user.hashed_password == hash_password(old_password)
231def test_ChangePasswordV2_normal_long_password(db, fast_passwords):
232 # user has old password and is changing to new password, but used short password
233 old_password = random_hex()
234 new_password = random_hex(length=1000)
235 user, token = generate_user(hashed_password=hash_password(old_password))
237 with account_session(token) as account:
238 with pytest.raises(grpc.RpcError) as e:
239 account.ChangePasswordV2(
240 account_pb2.ChangePasswordV2Req(
241 old_password=old_password,
242 new_password=new_password,
243 )
244 )
245 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
246 assert e.value.details() == "The password must be less than 256 characters."
248 with session_scope() as session:
249 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
250 assert updated_user.hashed_password == hash_password(old_password)
253def test_ChangePasswordV2_normal_insecure_password(db, fast_passwords):
254 # user has old password and is changing to new password, but used insecure password
255 old_password = random_hex()
256 new_password = "12345678"
257 user, token = generate_user(hashed_password=hash_password(old_password))
259 with account_session(token) as account:
260 with pytest.raises(grpc.RpcError) as e:
261 account.ChangePasswordV2(
262 account_pb2.ChangePasswordV2Req(
263 old_password=old_password,
264 new_password=new_password,
265 )
266 )
267 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
268 assert e.value.details() == "The password is insecure. Please use one that is not easily guessable."
270 with session_scope() as session:
271 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
272 assert updated_user.hashed_password == hash_password(old_password)
275def test_ChangePasswordV2_normal_wrong_password(db, fast_passwords):
276 # user has old password and is changing to new password, but used wrong old password
277 old_password = random_hex()
278 new_password = random_hex()
279 user, token = generate_user(hashed_password=hash_password(old_password))
281 with account_session(token) as account:
282 with pytest.raises(grpc.RpcError) as e:
283 account.ChangePasswordV2(
284 account_pb2.ChangePasswordV2Req(
285 old_password="wrong password",
286 new_password=new_password,
287 )
288 )
289 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
290 assert e.value.details() == "Wrong password."
292 with session_scope() as session:
293 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
294 assert updated_user.hashed_password == hash_password(old_password)
297def test_ChangePasswordV2_normal_no_passwords(db, fast_passwords):
298 # user has old password and called with empty body
299 old_password = random_hex()
300 user, token = generate_user(hashed_password=hash_password(old_password))
302 with account_session(token) as account:
303 with pytest.raises(grpc.RpcError) as e:
304 account.ChangePasswordV2(account_pb2.ChangePasswordV2Req(old_password=old_password))
305 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
306 assert e.value.details() == "The password must be 8 or more characters long."
308 with session_scope() as session:
309 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
310 assert updated_user.hashed_password == hash_password(old_password)
313def test_ChangeEmailV2_wrong_password(db, fast_passwords):
314 password = random_hex()
315 new_email = f"{random_hex()}@couchers.org.invalid"
316 user, token = generate_user(hashed_password=hash_password(password))
318 with account_session(token) as account:
319 with pytest.raises(grpc.RpcError) as e:
320 account.ChangeEmailV2(
321 account_pb2.ChangeEmailV2Req(
322 password="wrong password",
323 new_email=new_email,
324 )
325 )
326 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
327 assert e.value.details() == "Wrong password."
329 with session_scope() as session:
330 assert (
331 session.execute(
332 select(func.count())
333 .select_from(User)
334 .where(User.new_email_token_created <= func.now())
335 .where(User.new_email_token_expiry >= func.now())
336 )
337 ).scalar_one() == 0
340def test_ChangeEmailV2_wrong_email(db, fast_passwords):
341 password = random_hex()
342 new_email = f"{random_hex()}@couchers.org.invalid"
343 user, token = generate_user(hashed_password=hash_password(password))
345 with account_session(token) as account:
346 with pytest.raises(grpc.RpcError) as e:
347 account.ChangeEmailV2(
348 account_pb2.ChangeEmailV2Req(
349 password="wrong password",
350 new_email=new_email,
351 )
352 )
353 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
354 assert e.value.details() == "Wrong password."
356 with session_scope() as session:
357 assert (
358 session.execute(
359 select(func.count())
360 .select_from(User)
361 .where(User.new_email_token_created <= func.now())
362 .where(User.new_email_token_expiry >= func.now())
363 )
364 ).scalar_one() == 0
367def test_ChangeEmailV2_invalid_email(db, fast_passwords):
368 password = random_hex()
369 user, token = generate_user(hashed_password=hash_password(password))
371 with account_session(token) as account:
372 with pytest.raises(grpc.RpcError) as e:
373 account.ChangeEmailV2(
374 account_pb2.ChangeEmailV2Req(
375 password=password,
376 new_email="not a real email",
377 )
378 )
379 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
380 assert e.value.details() == "Invalid email."
382 with session_scope() as session:
383 assert (
384 session.execute(
385 select(func.count())
386 .select_from(User)
387 .where(User.new_email_token_created <= func.now())
388 .where(User.new_email_token_expiry >= func.now())
389 )
390 ).scalar_one() == 0
393def test_ChangeEmailV2_email_in_use(db, fast_passwords):
394 password = random_hex()
395 user, token = generate_user(hashed_password=hash_password(password))
396 user2, token2 = generate_user(hashed_password=hash_password(password))
398 with account_session(token) as account:
399 with pytest.raises(grpc.RpcError) as e:
400 account.ChangeEmailV2(
401 account_pb2.ChangeEmailV2Req(
402 password=password,
403 new_email=user2.email,
404 )
405 )
406 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
407 assert e.value.details() == "Invalid email."
409 with session_scope() as session:
410 assert (
411 session.execute(
412 select(func.count())
413 .select_from(User)
414 .where(User.new_email_token_created <= func.now())
415 .where(User.new_email_token_expiry >= func.now())
416 )
417 ).scalar_one() == 0
420def test_ChangeEmailV2_no_change(db, fast_passwords):
421 password = random_hex()
422 user, token = generate_user(hashed_password=hash_password(password))
424 with account_session(token) as account:
425 with pytest.raises(grpc.RpcError) as e:
426 account.ChangeEmailV2(
427 account_pb2.ChangeEmailV2Req(
428 password=password,
429 new_email=user.email,
430 )
431 )
432 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
433 assert e.value.details() == "Invalid email."
435 with session_scope() as session:
436 assert (
437 session.execute(
438 select(func.count())
439 .select_from(User)
440 .where(User.new_email_token_created <= func.now())
441 .where(User.new_email_token_expiry >= func.now())
442 )
443 ).scalar_one() == 0
446def test_ChangeEmailV2_wrong_token(db, fast_passwords):
447 password = random_hex()
448 new_email = f"{random_hex()}@couchers.org.invalid"
449 user, token = generate_user(hashed_password=hash_password(password))
451 with account_session(token) as account:
452 account.ChangeEmailV2(
453 account_pb2.ChangeEmailV2Req(
454 password=password,
455 new_email=new_email,
456 )
457 )
459 with auth_api_session() as (auth_api, metadata_interceptor):
460 with pytest.raises(grpc.RpcError) as e:
461 res = auth_api.ConfirmChangeEmailV2(
462 auth_pb2.ConfirmChangeEmailV2Req(
463 change_email_token="wrongtoken",
464 )
465 )
466 assert e.value.code() == grpc.StatusCode.NOT_FOUND
467 assert e.value.details() == "Invalid token."
469 with session_scope() as session:
470 user_updated = session.execute(select(User).where(User.id == user.id)).scalar_one()
471 assert user_updated.email == user.email
474def test_ChangeEmailV2_tokens_two_hour_window(db):
475 def two_hours_one_minute_in_future():
476 return now() + timedelta(hours=2, minutes=1)
478 def one_minute_ago():
479 return now() - timedelta(minutes=1)
481 password = random_hex()
482 new_email = f"{random_hex()}@couchers.org.invalid"
483 user, token = generate_user(hashed_password=hash_password(password))
485 with account_session(token) as account:
486 account.ChangeEmailV2(
487 account_pb2.ChangeEmailV2Req(
488 password=password,
489 new_email=new_email,
490 )
491 )
493 with session_scope() as session:
494 new_email_token = session.execute(select(User.new_email_token).where(User.id == user.id)).scalar_one()
496 with patch("couchers.servicers.auth.now", one_minute_ago):
497 with auth_api_session() as (auth_api, metadata_interceptor):
498 with pytest.raises(grpc.RpcError) as e:
499 auth_api.ConfirmChangeEmailV2(auth_pb2.ConfirmChangeEmailV2Req())
500 assert e.value.code() == grpc.StatusCode.NOT_FOUND
501 assert e.value.details() == "Invalid token."
503 with pytest.raises(grpc.RpcError) as e:
504 auth_api.ConfirmChangeEmailV2(
505 auth_pb2.ConfirmChangeEmailV2Req(
506 change_email_token=new_email_token,
507 )
508 )
509 assert e.value.code() == grpc.StatusCode.NOT_FOUND
510 assert e.value.details() == "Invalid token."
512 with patch("couchers.servicers.auth.now", two_hours_one_minute_in_future):
513 with auth_api_session() as (auth_api, metadata_interceptor):
514 with pytest.raises(grpc.RpcError) as e:
515 auth_api.ConfirmChangeEmailV2(auth_pb2.ConfirmChangeEmailV2Req())
516 assert e.value.code() == grpc.StatusCode.NOT_FOUND
517 assert e.value.details() == "Invalid token."
519 with pytest.raises(grpc.RpcError) as e:
520 auth_api.ConfirmChangeEmailV2(
521 auth_pb2.ConfirmChangeEmailV2Req(
522 change_email_token=new_email_token,
523 )
524 )
525 assert e.value.code() == grpc.StatusCode.NOT_FOUND
526 assert e.value.details() == "Invalid token."
529def test_ChangeEmailV2(db, fast_passwords, push_collector: PushCollector):
530 password = random_hex()
531 new_email = f"{random_hex()}@couchers.org.invalid"
532 user, token = generate_user(hashed_password=hash_password(password))
533 user_id = user.id
535 with account_session(token) as account:
536 account.ChangeEmailV2(
537 account_pb2.ChangeEmailV2Req(
538 password=password,
539 new_email=new_email,
540 )
541 )
543 with session_scope() as session:
544 user_updated = session.execute(select(User).where(User.id == user_id)).scalar_one()
545 assert user_updated.email == user.email
546 assert user_updated.new_email == new_email
547 assert user_updated.new_email_token is not None
548 assert user_updated.new_email_token_created
549 assert user_updated.new_email_token_created <= now()
550 assert user_updated.new_email_token_expiry
551 assert user_updated.new_email_token_expiry >= now()
553 token = user_updated.new_email_token
555 process_jobs()
556 push = push_collector.pop_for_user(user_id, last=True)
557 assert push.content.title == "Email change requested"
558 assert push.content.body == f"Use the link we sent to {new_email} to confirm your new address."
560 with auth_api_session() as (auth_api, metadata_interceptor):
561 auth_api.ConfirmChangeEmailV2(
562 auth_pb2.ConfirmChangeEmailV2Req(
563 change_email_token=token,
564 )
565 )
567 with session_scope() as session:
568 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
569 assert user.email == new_email
570 assert user.new_email is None
571 assert user.new_email_token is None
572 assert user.new_email_token_created is None
573 assert user.new_email_token_expiry is None
575 process_jobs()
576 push = push_collector.pop_for_user(user_id, last=True)
577 assert push.content.title == "Email verified"
578 assert push.content.body == "Your new email address has been verified."
581def test_ChangeEmailV2_sends_proper_emails(db, fast_passwords, push_collector: PushCollector):
582 password = random_hex()
583 new_email = f"{random_hex()}@couchers.org.invalid"
584 user, token = generate_user(hashed_password=hash_password(password))
586 with account_session(token) as account:
587 account.ChangeEmailV2(
588 account_pb2.ChangeEmailV2Req(
589 password=password,
590 new_email=new_email,
591 )
592 )
594 process_jobs()
596 with session_scope() as session:
597 jobs = session.execute(select(BackgroundJob).where(BackgroundJob.job_type == "send_email")).scalars().all()
598 assert len(jobs) == 2
599 uq_str1 = b"An email change to the email"
600 uq_str2 = (
601 b"You requested that your email be changed to this email address on Couchers.org. Your old email address is"
602 )
603 assert (uq_str1 in jobs[0].payload and uq_str2 in jobs[1].payload) or (
604 uq_str2 in jobs[0].payload and uq_str1 in jobs[1].payload
605 )
607 push = push_collector.pop_for_user(user.id, last=True)
608 assert push.content.title == "Email change requested"
609 assert push.content.body == f"Use the link we sent to {new_email} to confirm your new address."
612def test_ChangeLanguagePreference(db, fast_passwords):
613 # user changes from default to ISO 639-1 language code
614 new_lang = "zh"
615 user, token = generate_user()
617 with real_account_session(token) as account:
618 res = account.GetAccountInfo(empty_pb2.Empty())
619 assert res.ui_language_preference == ""
621 # call will have info about the request
622 res, call = account.ChangeLanguagePreference.with_call(
623 account_pb2.ChangeLanguagePreferenceReq(ui_language_preference=new_lang)
624 )
626 # cookies are sent via initial metadata, so we check for it there
627 # the value of "set-cookie" will be the full cookie string, pull the key value from the string
628 cookie_values = [v.split(";")[0] for k, v in call.initial_metadata() if k == "set-cookie"]
629 assert any(val == "NEXT_LOCALE=zh" for val in cookie_values), (
630 f"Didn't find the right cookie, got {call.initial_metadata()}"
631 )
633 # the changed language preference should also be sent to the backend
634 res = account.GetAccountInfo(empty_pb2.Empty())
635 assert res.ui_language_preference == "zh"
638def test_contributor_form(db):
639 user, token = generate_user()
641 with account_session(token) as account:
642 res = account.GetContributorFormInfo(empty_pb2.Empty())
643 assert not res.filled_contributor_form
645 account.FillContributorForm(account_pb2.FillContributorFormReq(contributor_form=auth_pb2.ContributorForm()))
647 res = account.GetContributorFormInfo(empty_pb2.Empty())
648 assert res.filled_contributor_form
651def test_DeleteAccount_start(db):
652 user, token = generate_user()
654 with account_session(token) as account:
655 with mock_notification_email() as mock:
656 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=None))
657 mock.assert_called_once()
658 assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"
660 with session_scope() as session:
661 deletion_token: AccountDeletionToken = session.execute(
662 select(AccountDeletionToken).where(AccountDeletionToken.user_id == user.id)
663 ).scalar_one()
665 assert deletion_token.is_valid
666 assert session.execute(select(User).where(User.id == user.id)).scalar_one().deleted_at is None
669def test_DeleteAccount_message_storage(db):
670 user, token = generate_user()
672 with account_session(token) as account:
673 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=None)) # not stored
674 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="")) # not stored
675 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="Reason"))
676 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="0192#(&!&#)*@//)(8"))
677 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="\n\n\t")) # not stored
678 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="1337"))
680 with session_scope() as session:
681 assert session.execute(select(func.count()).select_from(AccountDeletionReason)).scalar_one() == 3
684def test_full_delete_account_with_recovery(db, push_collector: PushCollector):
685 user, token = generate_user()
686 user_id = user.id
688 with account_session(token) as account:
689 with pytest.raises(grpc.RpcError) as err:
690 account.DeleteAccount(account_pb2.DeleteAccountReq())
691 assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
692 assert err.value.details() == "Please confirm your account deletion."
694 # Check the right email is sent
695 with mock_notification_email() as mock:
696 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))
698 push = push_collector.pop_for_user(user_id, last=True)
699 assert push.content.title == "Account deletion requested"
700 assert push.content.body == "Use the link we emailed you to confirm."
702 mock.assert_called_once()
703 e = email_fields(mock)
705 with session_scope() as session:
706 token_o = session.execute(select(AccountDeletionToken)).scalar_one()
707 token = token_o.token
709 user_ = session.execute(select(User).where(User.id == user_id)).scalar_one()
710 assert token_o.user == user_
711 assert user_.deleted_at is None
712 assert not user_.undelete_token
713 assert not user_.undelete_until
715 assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"
716 assert e.recipient == user.email
717 assert "account deletion" in e.subject.lower()
718 assert token in e.plain
719 assert token in e.html
720 unique_string = "You requested that we delete your account from Couchers.org."
721 assert unique_string in e.plain
722 assert unique_string in e.html
723 url = f"http://localhost:3000/delete-account?token={token}"
724 assert url in e.plain
725 assert url in e.html
726 assert "support@couchers.org" in e.plain
727 assert "support@couchers.org" in e.html
729 with mock_notification_email() as mock:
730 with auth_api_session() as (auth_api, metadata_interceptor):
731 auth_api.ConfirmDeleteAccount(
732 auth_pb2.ConfirmDeleteAccountReq(
733 token=token,
734 )
735 )
737 push = push_collector.pop_for_user(user_id, last=True)
738 assert push.content.title == "Account deleted"
739 assert push.content.body == "You can restore it within 7 days using the link we emailed you."
741 mock.assert_called_once()
742 e = email_fields(mock)
744 with session_scope() as session:
745 assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()
747 user_ = session.execute(select(User).where(User.id == user_id)).scalar_one()
748 assert user_.deleted_at is not None
749 assert user_.undelete_token
750 assert user_.undelete_until
751 assert user_.undelete_until > now()
753 undelete_token = user_.undelete_token
755 assert e.recipient == user.email
756 assert "account has been deleted" in e.subject.lower()
757 unique_string = "You have successfully deleted your account from Couchers.org."
758 assert unique_string in e.plain
759 assert unique_string in e.html
760 assert "7 days" in e.plain
761 assert "7 days" in e.html
762 url = f"http://localhost:3000/recover-account?token={undelete_token}"
763 assert url in e.plain
764 assert url in e.html
765 assert "support@couchers.org" in e.plain
766 assert "support@couchers.org" in e.html
768 with mock_notification_email() as mock:
769 with auth_api_session() as (auth_api, metadata_interceptor):
770 auth_api.RecoverAccount(
771 auth_pb2.RecoverAccountReq(
772 token=undelete_token,
773 )
774 )
776 push = push_collector.pop_for_user(user_id, last=True)
777 assert push.content.title == "Account restored"
778 assert push.content.body == "Welcome back!"
780 mock.assert_called_once()
781 e = email_fields(mock)
783 assert e.recipient == user.email
784 assert "account has been recovered" in e.subject.lower()
785 unique_string = "Your account on Couchers.org has been successfully recovered!"
786 assert unique_string in e.plain
787 assert unique_string in e.html
788 assert "support@couchers.org" in e.plain
789 assert "support@couchers.org" in e.html
791 with session_scope() as session:
792 assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()
794 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
795 assert user.deleted_at is None
796 assert not user.undelete_token
797 assert not user.undelete_until
800def test_multiple_delete_tokens(db):
801 """
802 Make sure deletion tokens are deleted on delete
803 """
804 user, token = generate_user()
806 with account_session(token) as account:
807 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))
808 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))
809 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))
811 with session_scope() as session:
812 assert session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one() == 3
813 token = session.execute(select(AccountDeletionToken.token).limit(1)).scalar_one()
815 with auth_api_session() as (auth_api, metadata_interceptor):
816 auth_api.ConfirmDeleteAccount(
817 auth_pb2.ConfirmDeleteAccountReq(
818 token=token,
819 )
820 )
822 with session_scope() as session:
823 assert not session.execute(select(AccountDeletionToken.token)).scalar_one_or_none()
826def test_ListActiveSessions_pagination(db, fast_passwords):
827 password = random_hex()
828 user, token = generate_user(hashed_password=hash_password(password))
830 with auth_api_session() as (auth_api, metadata_interceptor):
831 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
832 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
833 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
834 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
836 with real_account_session(token) as account:
837 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_size=3))
838 assert len(res.active_sessions) == 3
839 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_token=res.next_page_token, page_size=3))
840 assert len(res.active_sessions) == 2
841 assert not res.next_page_token
844def test_ListActiveSessions_details(db, fast_passwords):
845 password = random_hex()
846 user, token = generate_user(hashed_password=hash_password(password))
848 ips_user_agents = [
849 (
850 "108.123.33.162",
851 "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Mobile/15E148 Safari/604.1",
852 ),
853 (
854 "8.245.212.28",
855 "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/26.0 Chrome/122.0.0.0 Mobile Safari/537.36",
856 ),
857 (
858 "95.254.140.156",
859 "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0",
860 ),
861 ]
863 for ip, user_agent in ips_user_agents:
864 options = (("grpc.primary_user_agent", user_agent),)
865 with auth_api_session(grpc_channel_options=options) as (auth_api, metadata_interceptor):
866 auth_api.Authenticate(
867 auth_pb2.AuthReq(user=user.username, password=password), metadata=(("x-couchers-real-ip", ip),)
868 )
870 def dummy_geoip(ip_address):
871 return {
872 "108.123.33.162": "Chicago, United States",
873 "8.245.212.28": "Sydney, Australia",
874 }.get(ip_address)
876 with real_account_session(token) as account:
877 with patch("couchers.servicers.account.geoip_approximate_location", dummy_geoip):
878 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
879 print(res)
880 assert len(res.active_sessions) == 4
882 # this one currently making the API call
883 assert res.active_sessions[0].operating_system == "Other"
884 assert res.active_sessions[0].browser == "Other"
885 assert res.active_sessions[0].device == "Other"
886 assert res.active_sessions[0].approximate_location == "Unknown"
887 assert res.active_sessions[0].is_current_session
889 assert res.active_sessions[1].operating_system == "Ubuntu"
890 assert res.active_sessions[1].browser == "Firefox"
891 assert res.active_sessions[1].device == "Other"
892 assert res.active_sessions[1].approximate_location == "Unknown"
893 assert not res.active_sessions[1].is_current_session
895 assert res.active_sessions[2].operating_system == "Android"
896 assert res.active_sessions[2].browser == "Samsung Internet"
897 assert res.active_sessions[2].device == "K"
898 assert res.active_sessions[2].approximate_location == "Sydney, Australia"
899 assert not res.active_sessions[2].is_current_session
901 assert res.active_sessions[3].operating_system == "iOS"
902 assert res.active_sessions[3].browser == "Mobile Safari"
903 assert res.active_sessions[3].device == "iPhone"
904 assert res.active_sessions[3].approximate_location == "Chicago, United States"
905 assert not res.active_sessions[3].is_current_session
908def test_LogOutSession(db, fast_passwords):
909 password = random_hex()
910 user, token = generate_user(hashed_password=hash_password(password))
912 with auth_api_session() as (auth_api, metadata_interceptor):
913 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
914 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
915 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
916 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
918 with real_account_session(token) as account:
919 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
920 assert len(res.active_sessions) == 5
921 account.LogOutSession(account_pb2.LogOutSessionReq(created=res.active_sessions[3].created))
923 res2 = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
924 assert len(res2.active_sessions) == 4
926 # ignore the first session as it changes
927 assert res.active_sessions[1:3] + res.active_sessions[4:] == res2.active_sessions[1:]
930def test_LogOutOtherSessions(db, fast_passwords):
931 password = random_hex()
932 user, token = generate_user(hashed_password=hash_password(password))
934 with auth_api_session() as (auth_api, metadata_interceptor):
935 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
936 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
937 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
938 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))
940 with real_account_session(token) as account:
941 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
942 assert len(res.active_sessions) == 5
943 with pytest.raises(grpc.RpcError) as e:
944 account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=False))
945 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
946 assert e.value.details() == "Please confirm you want to log out of other sessions."
948 account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=True))
949 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
950 assert len(res.active_sessions) == 1
953def test_CreateInviteCode(db):
954 user, token = generate_user()
956 with account_session(token) as account:
957 res = account.CreateInviteCode(account_pb2.CreateInviteCodeReq())
958 code = res.code
959 assert len(code) == 8
961 with session_scope() as session:
962 invite = session.execute(select(InviteCode).where(InviteCode.id == code)).scalar_one()
963 assert invite.creator_user_id == user.id
964 assert invite.disabled is None
965 assert res.url == urls.invite_code_link(code=res.code)
968def test_DisableInviteCode(db):
969 user, token = generate_user()
971 with account_session(token) as account:
972 code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code
973 account.DisableInviteCode(account_pb2.DisableInviteCodeReq(code=code))
975 with session_scope() as session:
976 invite = session.execute(select(InviteCode).where(InviteCode.id == code)).scalar_one()
977 assert invite.disabled is not None
980def test_ListInviteCodes(db):
981 user, token = generate_user()
982 another_user, _ = generate_user()
984 with account_session(token) as account:
985 code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code
987 # simulate another_user having signed up with this invite code
988 with session_scope() as session:
989 session.execute(update(User).where(User.id == another_user.id).values(invite_code_id=code))
991 with account_session(token) as account:
992 res = account.ListInviteCodes(empty_pb2.Empty())
993 assert len(res.invite_codes) == 1
994 assert res.invite_codes[0].code == code
995 assert res.invite_codes[0].uses == 1
996 assert res.invite_codes[0].url == urls.invite_code_link(code=code)
999def test_reminders(db, moderator):
1000 # the strong verification reminder's absence is tested in test_strong_verification.py
1001 # reference writing reminders tested in test_AvailableWriteReferences_and_ListPendingReferencesToWrite
1002 # we use LiteUser, so remember to refresh materialized views
1003 user, token = generate_user(complete_profile=False)
1004 complete_user, complete_token = generate_user(complete_profile=True)
1005 req_user1, req_user_token1 = generate_user(complete_profile=True)
1006 req_user2, req_user_token2 = generate_user(complete_profile=True)
1008 refresh_materialized_views_rapid(empty_pb2.Empty())
1009 with account_session(complete_token) as account:
1010 assert [reminder.WhichOneof("reminder") for reminder in account.GetReminders(empty_pb2.Empty()).reminders] == [
1011 "complete_verification_reminder"
1012 ]
1013 with account_session(token) as account:
1014 assert [reminder.WhichOneof("reminder") for reminder in account.GetReminders(empty_pb2.Empty()).reminders] == [
1015 "complete_profile_reminder",
1016 "complete_verification_reminder",
1017 ]
1019 today_plus_2 = (today() + timedelta(days=2)).isoformat()
1020 today_plus_3 = (today() + timedelta(days=3)).isoformat()
1021 with requests_session(req_user_token1) as api:
1022 host_request1_id = api.CreateHostRequest(
1023 requests_pb2.CreateHostRequestReq(
1024 host_user_id=user.id,
1025 from_date=today_plus_2,
1026 to_date=today_plus_3,
1027 text=valid_request_text("Test request 1"),
1028 )
1029 ).host_request_id
1030 moderator.approve_host_request(host_request1_id)
1032 with account_session(token) as account:
1033 reminders = account.GetReminders(empty_pb2.Empty()).reminders
1034 assert [reminder.WhichOneof("reminder") for reminder in reminders] == [
1035 "respond_to_host_request_reminder",
1036 "complete_profile_reminder",
1037 "complete_verification_reminder",
1038 ]
1039 assert reminders[0].respond_to_host_request_reminder.host_request_id == host_request1_id
1040 assert reminders[0].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id
1042 with requests_session(req_user_token2) as api:
1043 host_request2_id = api.CreateHostRequest(
1044 requests_pb2.CreateHostRequestReq(
1045 host_user_id=user.id,
1046 from_date=today_plus_2,
1047 to_date=today_plus_3,
1048 text=valid_request_text("Test request 2"),
1049 )
1050 ).host_request_id
1051 moderator.approve_host_request(host_request2_id)
1053 refresh_materialized_views_rapid(empty_pb2.Empty())
1054 with account_session(token) as account:
1055 reminders = account.GetReminders(empty_pb2.Empty()).reminders
1056 assert [reminder.WhichOneof("reminder") for reminder in reminders] == [
1057 "respond_to_host_request_reminder",
1058 "respond_to_host_request_reminder",
1059 "complete_profile_reminder",
1060 "complete_verification_reminder",
1061 ]
1062 assert reminders[0].respond_to_host_request_reminder.host_request_id == host_request1_id
1063 assert reminders[0].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id
1064 assert reminders[1].respond_to_host_request_reminder.host_request_id == host_request2_id
1065 assert reminders[1].respond_to_host_request_reminder.surfer_user.user_id == req_user2.id
1067 with requests_session(req_user_token1) as api:
1068 host_request3_id = api.CreateHostRequest(
1069 requests_pb2.CreateHostRequestReq(
1070 host_user_id=user.id,
1071 from_date=today_plus_2,
1072 to_date=today_plus_3,
1073 text=valid_request_text("Test request 3"),
1074 )
1075 ).host_request_id
1076 moderator.approve_host_request(host_request3_id)
1078 refresh_materialized_views_rapid(empty_pb2.Empty())
1079 with account_session(token) as account:
1080 reminders = account.GetReminders(empty_pb2.Empty()).reminders
1081 assert [reminder.WhichOneof("reminder") for reminder in reminders] == [
1082 "respond_to_host_request_reminder",
1083 "respond_to_host_request_reminder",
1084 "respond_to_host_request_reminder",
1085 "complete_profile_reminder",
1086 "complete_verification_reminder",
1087 ]
1088 assert reminders[0].respond_to_host_request_reminder.host_request_id == host_request1_id
1089 assert reminders[0].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id
1090 assert reminders[1].respond_to_host_request_reminder.host_request_id == host_request2_id
1091 assert reminders[1].respond_to_host_request_reminder.surfer_user.user_id == req_user2.id
1092 assert reminders[2].respond_to_host_request_reminder.host_request_id == host_request3_id
1093 assert reminders[2].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id
1095 # accept req
1096 with requests_session(token) as api:
1097 api.RespondHostRequest(
1098 requests_pb2.RespondHostRequestReq(
1099 host_request_id=host_request1_id, status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED
1100 )
1101 )
1103 refresh_materialized_views_rapid(empty_pb2.Empty())
1104 with account_session(token) as account:
1105 reminders = account.GetReminders(empty_pb2.Empty()).reminders
1106 assert [reminder.WhichOneof("reminder") for reminder in reminders] == [
1107 "respond_to_host_request_reminder",
1108 "respond_to_host_request_reminder",
1109 "complete_profile_reminder",
1110 "complete_verification_reminder",
1111 ]
1112 assert reminders[0].respond_to_host_request_reminder.host_request_id == host_request2_id
1113 assert reminders[0].respond_to_host_request_reminder.surfer_user.user_id == req_user2.id
1114 assert reminders[1].respond_to_host_request_reminder.host_request_id == host_request3_id
1115 assert reminders[1].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id
1118def test_volunteer_stuff(db):
1119 # taken from couchers/app/backend/resources/badges.json
1120 board_member_id = 8347
1122 # with password
1123 user, token = generate_user(name="Von Tester", username="tester", city="Amsterdam", id=board_member_id)
1125 with account_session(token) as account:
1126 res = account.GetAccountInfo(empty_pb2.Empty())
1127 assert not res.is_volunteer
1129 with pytest.raises(grpc.RpcError) as e:
1130 account.GetMyVolunteerInfo(empty_pb2.Empty())
1131 assert e.value.code() == grpc.StatusCode.NOT_FOUND
1132 assert (
1133 e.value.details() == "You are currently not registered as a volunteer, if this is wrong, please contact us."
1134 )
1136 with pytest.raises(grpc.RpcError) as e:
1137 account.UpdateMyVolunteerInfo(account_pb2.UpdateMyVolunteerInfoReq())
1138 assert e.value.code() == grpc.StatusCode.NOT_FOUND
1139 assert (
1140 e.value.details() == "You are currently not registered as a volunteer, if this is wrong, please contact us."
1141 )
1143 with session_scope() as session:
1144 session.add(
1145 make_volunteer(
1146 user_id=user.id,
1147 display_name="Great Volunteer",
1148 display_location="The Bitbucket",
1149 role="Lead Tester",
1150 started_volunteering=date(2020, 6, 1),
1151 )
1152 )
1154 with account_session(token) as account:
1155 res = account.GetAccountInfo(empty_pb2.Empty())
1156 assert res.is_volunteer
1158 res = account.GetMyVolunteerInfo(empty_pb2.Empty())
1160 assert res.display_name == "Great Volunteer"
1161 assert res.display_location == "The Bitbucket"
1162 assert res.role == "Lead Tester"
1163 assert res.started_volunteering == "2020-06-01"
1164 assert not res.stopped_volunteering
1165 assert res.show_on_team_page
1166 assert res.link_type == "couchers"
1167 assert res.link_text == "@tester"
1168 assert res.link_url == "http://localhost:3000/user/tester"
1170 res = account.UpdateMyVolunteerInfo(
1171 account_pb2.UpdateMyVolunteerInfoReq(
1172 display_name=wrappers_pb2.StringValue(value=""),
1173 link_type=wrappers_pb2.StringValue(value="website"),
1174 link_text=wrappers_pb2.StringValue(value="testervontester.com.invalid"),
1175 link_url=wrappers_pb2.StringValue(value="https://www.testervontester.com.invalid/"),
1176 )
1177 )
1179 assert res.display_name == ""
1180 assert res.display_location == "The Bitbucket"
1181 assert res.role == "Lead Tester"
1182 assert res.started_volunteering == "2020-06-01"
1183 assert not res.stopped_volunteering
1184 assert res.show_on_team_page
1185 assert res.link_type == "website"
1186 assert res.link_text == "testervontester.com.invalid"
1187 assert res.link_url == "https://www.testervontester.com.invalid/"
1188 res = account.UpdateMyVolunteerInfo(
1189 account_pb2.UpdateMyVolunteerInfoReq(
1190 display_name=wrappers_pb2.StringValue(value=""),
1191 link_type=wrappers_pb2.StringValue(value="linkedin"),
1192 link_text=wrappers_pb2.StringValue(value="tester-vontester"),
1193 )
1194 )
1195 assert res.display_name == ""
1196 assert res.display_location == "The Bitbucket"
1197 assert res.role == "Lead Tester"
1198 assert res.started_volunteering == "2020-06-01"
1199 assert not res.stopped_volunteering
1200 assert res.show_on_team_page
1201 assert res.link_type == "linkedin"
1202 assert res.link_text == "tester-vontester"
1203 assert res.link_url == "https://www.linkedin.com/in/tester-vontester/"
1205 res = account.UpdateMyVolunteerInfo(
1206 account_pb2.UpdateMyVolunteerInfoReq(
1207 display_name=wrappers_pb2.StringValue(value="Tester"),
1208 display_location=wrappers_pb2.StringValue(value=""),
1209 link_type=wrappers_pb2.StringValue(value="email"),
1210 link_text=wrappers_pb2.StringValue(value="tester@vontester.com.invalid"),
1211 )
1212 )
1213 assert res.display_name == "Tester"
1214 assert res.display_location == ""
1215 assert res.role == "Lead Tester"
1216 assert res.started_volunteering == "2020-06-01"
1217 assert not res.stopped_volunteering
1218 assert res.show_on_team_page
1219 assert res.link_type == "email"
1220 assert res.link_text == "tester@vontester.com.invalid"
1221 assert res.link_url == "mailto:tester@vontester.com.invalid"
1223 refresh_materialized_views_rapid(empty_pb2.Empty())
1225 with public_session() as public:
1226 res = public.GetVolunteers(empty_pb2.Empty())
1227 assert len(res.current_volunteers) == 1
1228 v = res.current_volunteers[0]
1229 assert v.name == "Tester"
1230 assert v.username == "tester"
1231 assert v.is_board_member
1232 assert v.role == "Lead Tester"
1233 assert v.location == "Amsterdam"
1234 assert v.img.startswith("http://localhost:5001/img/thumbnail/")
1235 assert v.link_type == "email"
1236 assert v.link_text == "tester@vontester.com.invalid"
1237 assert v.link_url == "mailto:tester@vontester.com.invalid"