Coverage for app/backend/src/tests/test_account.py: 100%
742 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1from datetime import UTC, date, datetime, timedelta
2from unittest.mock import patch
4import grpc
5import pytest
6from google.protobuf import empty_pb2, wrappers_pb2
7from sqlalchemy import select, update
8from sqlalchemy.sql import func
10from couchers import urls
11from couchers.crypto import hash_password, random_hex
12from couchers.db import session_scope
13from couchers.materialized_views import refresh_materialized_views_rapid
14from couchers.models import (
15 AccountDeletionReason,
16 AccountDeletionToken,
17 BackgroundJob,
18 HostingStatus,
19 InviteCode,
20 PhotoGalleryItem,
21 SleepingArrangement,
22 Upload,
23 User,
24)
25from couchers.proto import account_pb2, api_pb2, auth_pb2, conversations_pb2, requests_pb2
26from couchers.utils import now, today
27from tests.fixtures.db import generate_user, make_volunteer
28from tests.fixtures.misc import EmailCollector, PushCollector, process_jobs
29from tests.fixtures.sessions import (
30 account_session,
31 auth_api_session,
32 public_session,
33 real_account_session,
34 requests_session,
35)
36from tests.test_requests import valid_request_text
@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture automatically for every test in this module."""
def test_GetAccountInfo(db, fast_passwords):
    """GetAccountInfo returns the expected values for a freshly generated user with a password."""
    member, session_token = generate_user(
        hashed_password=hash_password(random_hex()),
        email="user@couchers.invalid",
    )

    with account_session(session_token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())

        # identity fields echo what the user was created with
        assert info.email == "user@couchers.invalid"
        assert info.username == member.username

        # nothing has been verified or elevated for this user
        assert not info.has_strong_verification
        assert info.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
        assert info.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED
        assert not info.is_superuser
        assert info.ui_language_preference == ""
        assert not info.is_volunteer
def test_donation_banner_no_drive(db):
    """The donation banner is not shown when no drive is configured (flag unset)."""
    # The user has donated, but without a drive there is nothing to advertise
    _, session_token = generate_user()

    with account_session(session_token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert not info.should_show_donation_banner
def test_donation_banner_never_donated(db, feature_flags):
    """The donation banner is shown when a drive is active and the user has never donated."""
    # generate_user defaults last_donated to now(), so it has to be cleared explicitly
    _, session_token = generate_user(last_donated=None)

    campaign_start = datetime(2025, 11, 1, tzinfo=UTC)
    feature_flags.set("donation_drive_start", int(campaign_start.timestamp()))

    with account_session(session_token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert info.should_show_donation_banner
def test_donation_banner_donated_before_drive(db, feature_flags):
    """The donation banner is shown when the user's last donation predates the drive start."""
    member, session_token = generate_user()

    # 15 Oct 2025 is earlier than the 1 Nov 2025 drive start configured below
    donated_at = datetime(2025, 10, 15, tzinfo=UTC)
    with session_scope() as session:
        session.execute(update(User).where(User.id == member.id).values(last_donated=donated_at))

    campaign_start = datetime(2025, 11, 1, tzinfo=UTC)
    feature_flags.set("donation_drive_start", int(campaign_start.timestamp()))

    with account_session(session_token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert info.should_show_donation_banner
def test_donation_banner_donated_after_drive(db, feature_flags):
    """The donation banner is hidden when the user's last donation is later than the drive start."""
    member, session_token = generate_user()

    # 15 Nov 2025 is later than the 1 Nov 2025 drive start configured below
    donated_at = datetime(2025, 11, 15, tzinfo=UTC)
    with session_scope() as session:
        session.execute(update(User).where(User.id == member.id).values(last_donated=donated_at))

    campaign_start = datetime(2025, 11, 1, tzinfo=UTC)
    feature_flags.set("donation_drive_start", int(campaign_start.timestamp()))

    with account_session(session_token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert not info.should_show_donation_banner
def test_donation_banner_donated_exactly_at_drive_start(db, feature_flags):
    """The donation banner is hidden when the last donation coincides exactly with the drive start."""
    campaign_start = datetime(2025, 11, 1, tzinfo=UTC)

    member, session_token = generate_user()

    # boundary case: the donation timestamp equals the drive start
    with session_scope() as session:
        session.execute(update(User).where(User.id == member.id).values(last_donated=campaign_start))

    feature_flags.set("donation_drive_start", int(campaign_start.timestamp()))

    with account_session(session_token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert not info.should_show_donation_banner
def test_GetAccountInfo_regression(db):
    """Regression: GetAccountInfo must not crash for a user with an avatar but ``about_me=None``.

    There was a bug in evaluating ``has_completed_profile`` on the backend (in python): when
    ``about_me`` is None but the user has a key, it failed because ``len(about_me)`` doesn't work
    on None. The test passes as long as the RPC returns without raising.
    """
    user, token = generate_user(about_me=None, complete_profile=False)

    # add an avatar photo to the user's profile gallery
    with session_scope() as session:
        key = random_hex(32)
        filename = random_hex(32) + ".jpg"
        session.add(
            Upload(
                key=key,
                filename=filename,
                creator_user_id=user.id,
            )
        )
        # flush before adding the gallery item that refers to the upload by key
        session.flush()
        assert user.profile_gallery_id is not None
        session.add(
            PhotoGalleryItem(
                gallery_id=user.profile_gallery_id,
                upload_key=key,
                position=0,
            )
        )

    with account_session(token) as account:
        # the response content is irrelevant here; only the absence of an exception matters
        account.GetAccountInfo(empty_pb2.Empty())
def test_ChangePasswordV2_normal(db, fast_passwords, email_collector: EmailCollector, push_collector: PushCollector):
    """A user with a password changes it; an email and a push are sent and the new hash is stored."""
    current_pw = random_hex()
    replacement_pw = random_hex()
    member, session_token = generate_user(hashed_password=hash_password(current_pw))

    change_request = account_pb2.ChangePasswordV2Req(
        old_password=current_pw,
        new_password=replacement_pw,
    )
    with account_session(session_token) as account:
        account.ChangePasswordV2(change_request)

    # email notification
    email = email_collector.pop_for_recipient(member.email, last=True)
    assert email.subject == "[TEST] Your password was changed"

    # push notification
    push = push_collector.pop_for_user(member.id, last=True)
    assert push.content.title == "Password changed"
    assert push.content.body == "Your password was changed."

    # the stored hash now corresponds to the new password
    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == member.id)).scalar_one()
        assert stored.hashed_password == hash_password(replacement_pw)
def test_ChangePasswordV2_regression(db, fast_passwords):
    """Regression for send_password_changed_email not working: the change must go through."""
    current_pw = random_hex()
    replacement_pw = random_hex()
    member, session_token = generate_user(hashed_password=hash_password(current_pw))

    change_request = account_pb2.ChangePasswordV2Req(
        old_password=current_pw,
        new_password=replacement_pw,
    )
    with account_session(session_token) as account:
        account.ChangePasswordV2(change_request)

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == member.id)).scalar_one()
        assert stored.hashed_password == hash_password(replacement_pw)
def test_ChangePasswordV2_normal_short_password(db, fast_passwords):
    """A too-short new password is rejected with INVALID_ARGUMENT and the old hash is kept."""
    current_pw = random_hex()
    too_short_pw = random_hex(length=1)
    member, session_token = generate_user(hashed_password=hash_password(current_pw))

    change_request = account_pb2.ChangePasswordV2Req(
        old_password=current_pw,
        new_password=too_short_pw,
    )
    with account_session(session_token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(change_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == "The password must be 8 or more characters long."

    # the rejected request must not have touched the stored hash
    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == member.id)).scalar_one()
        assert stored.hashed_password == hash_password(current_pw)
def test_ChangePasswordV2_normal_long_password(db, fast_passwords):
    """A too-long new password is rejected with INVALID_ARGUMENT and the old hash is kept."""
    # user has old password and is changing to new password, but used an overly long password
    old_password = random_hex()
    new_password = random_hex(length=1000)
    user, token = generate_user(hashed_password=hash_password(old_password))

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(
                account_pb2.ChangePasswordV2Req(
                    old_password=old_password,
                    new_password=new_password,
                )
            )
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == "The password must be less than 256 characters."

    # the rejected request must not have touched the stored hash
    with session_scope() as session:
        updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert updated_user.hashed_password == hash_password(old_password)
def test_ChangePasswordV2_normal_insecure_password(db, fast_passwords):
    """An easily guessable new password is rejected with INVALID_ARGUMENT and the old hash is kept."""
    current_pw = random_hex()
    guessable_pw = "12345678"
    member, session_token = generate_user(hashed_password=hash_password(current_pw))

    change_request = account_pb2.ChangePasswordV2Req(
        old_password=current_pw,
        new_password=guessable_pw,
    )
    with account_session(session_token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(change_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == "The password is insecure. Please use one that is not easily guessable."

    # the rejected request must not have touched the stored hash
    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == member.id)).scalar_one()
        assert stored.hashed_password == hash_password(current_pw)
def test_ChangePasswordV2_normal_wrong_password(db, fast_passwords):
    """Supplying the wrong old password is rejected with INVALID_ARGUMENT and the old hash is kept."""
    current_pw = random_hex()
    replacement_pw = random_hex()
    member, session_token = generate_user(hashed_password=hash_password(current_pw))

    change_request = account_pb2.ChangePasswordV2Req(
        old_password="Wrong password",
        new_password=replacement_pw,
    )
    with account_session(session_token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(change_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == "Wrong password."

    # the rejected request must not have touched the stored hash
    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == member.id)).scalar_one()
        assert stored.hashed_password == hash_password(current_pw)
def test_ChangePasswordV2_normal_no_passwords(db, fast_passwords):
    """A request that carries only the old password (no new one) is rejected as too short."""
    current_pw = random_hex()
    member, session_token = generate_user(hashed_password=hash_password(current_pw))

    # new_password is left unset on purpose
    change_request = account_pb2.ChangePasswordV2Req(old_password=current_pw)
    with account_session(session_token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(change_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == "The password must be 8 or more characters long."

    # the rejected request must not have touched the stored hash
    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == member.id)).scalar_one()
        assert stored.hashed_password == hash_password(current_pw)
def test_ChangeEmailV2_wrong_password(db, fast_passwords):
    """ChangeEmailV2 with a wrong password fails and no user ends up with a live change token."""
    real_password = random_hex()
    target_email = f"{random_hex()}@couchers.org.invalid"
    _, session_token = generate_user(hashed_password=hash_password(real_password))

    bad_request = account_pb2.ChangeEmailV2Req(
        password="Wrong password",
        new_email=target_email,
    )
    with account_session(session_token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(bad_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == "Wrong password."

    # count users whose email-change token window currently contains "now"
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0
def test_ChangeEmailV2_wrong_email(db, fast_passwords):
    """ChangeEmailV2 with a wrong password fails and no user ends up with a live change token.

    NOTE(review): despite its name, this test sends a wrong *password* and expects
    "Wrong password." -- the same scenario as test_ChangeEmailV2_wrong_password. Confirm what
    "wrong email" was meant to cover before changing it.
    """
    real_password = random_hex()
    target_email = f"{random_hex()}@couchers.org.invalid"
    _, session_token = generate_user(hashed_password=hash_password(real_password))

    bad_request = account_pb2.ChangeEmailV2Req(
        password="Wrong password",
        new_email=target_email,
    )
    with account_session(session_token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(bad_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == "Wrong password."

    # count users whose email-change token window currently contains "now"
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0
def test_ChangeEmailV2_invalid_email(db, fast_passwords):
    """A malformed new address is rejected with "Invalid email." and no live change token exists."""
    real_password = random_hex()
    _, session_token = generate_user(hashed_password=hash_password(real_password))

    bad_request = account_pb2.ChangeEmailV2Req(
        password=real_password,
        new_email="not a real email",
    )
    with account_session(session_token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(bad_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == "Invalid email."

    # count users whose email-change token window currently contains "now"
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0
def test_ChangeEmailV2_email_in_use(db, fast_passwords):
    """Requesting another user's address is rejected with "Invalid email." and leaves no live token."""
    shared_password = random_hex()
    _, session_token = generate_user(hashed_password=hash_password(shared_password))
    other_member, _ = generate_user(hashed_password=hash_password(shared_password))

    bad_request = account_pb2.ChangeEmailV2Req(
        password=shared_password,
        new_email=other_member.email,
    )
    with account_session(session_token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(bad_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == "Invalid email."

    # count users whose email-change token window currently contains "now"
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0
def test_ChangeEmailV2_no_change(db, fast_passwords):
    """Requesting the user's own current address is rejected with "Invalid email."."""
    real_password = random_hex()
    member, session_token = generate_user(hashed_password=hash_password(real_password))

    bad_request = account_pb2.ChangeEmailV2Req(
        password=real_password,
        new_email=member.email,
    )
    with account_session(session_token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(bad_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == "Invalid email."

    # count users whose email-change token window currently contains "now"
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0
def test_ChangeEmailV2_wrong_token(db, fast_passwords):
    """Confirming an email change with an unknown token fails with NOT_FOUND and keeps the old email."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    # start a legitimate email change so that a real token exists
    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            # the call is expected to raise, so its result is deliberately not bound
            auth_api.ConfirmChangeEmailV2(
                auth_pb2.ConfirmChangeEmailV2Req(
                    change_email_token="wrongtoken",
                )
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Invalid token."

    # the failed confirmation must not have switched the address
    with session_scope() as session:
        user_updated = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert user_updated.email == user.email
def test_ChangeEmailV2_tokens_two_hour_window(db):
    """An email-change token is rejected when the auth servicer's clock is outside its window.

    ``couchers.servicers.auth.now`` is patched first to one minute in the past and then to two
    hours and one minute in the future; under both clocks an empty request and a request
    carrying the real token must fail with NOT_FOUND / "Invalid token.".
    """

    def two_hours_one_minute_in_future():
        return now() + timedelta(hours=2, minutes=1)

    def one_minute_ago():
        return now() - timedelta(minutes=1)

    def expect_invalid_token(auth_api, request):
        # one confirmation attempt that has to be refused
        with pytest.raises(grpc.RpcError) as exc_info:
            auth_api.ConfirmChangeEmailV2(request)
        assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
        assert exc_info.value.details() == "Invalid token."

    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    member, session_token = generate_user(hashed_password=hash_password(password))

    with account_session(session_token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    with session_scope() as session:
        issued_token = session.execute(select(User.new_email_token).where(User.id == member.id)).scalar_one()

    for shifted_clock in (one_minute_ago, two_hours_one_minute_in_future):
        with patch("couchers.servicers.auth.now", shifted_clock):
            with auth_api_session() as (auth_api, _):
                expect_invalid_token(auth_api, auth_pb2.ConfirmChangeEmailV2Req())
                expect_invalid_token(
                    auth_api,
                    auth_pb2.ConfirmChangeEmailV2Req(change_email_token=issued_token),
                )
def test_ChangeEmailV2(db, fast_passwords, push_collector: PushCollector):
    """Happy path of an email change: request, push, confirmation by token, second push."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    member, session_token = generate_user(hashed_password=hash_password(password))
    member_id = member.id

    change_request = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=new_email,
    )
    with account_session(session_token) as account:
        account.ChangeEmailV2(change_request)

    # after the request: the old address is still active, the new one is pending with a live token
    with session_scope() as session:
        pending = session.execute(select(User).where(User.id == member_id)).scalar_one()
        assert pending.email == member.email
        assert pending.new_email == new_email
        assert pending.new_email_token is not None
        assert pending.new_email_token_created
        assert pending.new_email_token_created <= now()
        assert pending.new_email_token_expiry
        assert pending.new_email_token_expiry >= now()

        confirmation_token = pending.new_email_token

    process_jobs()
    push = push_collector.pop_for_user(member_id, last=True)
    assert push.content.title == "Email change requested"
    assert push.content.body == f"Use the link we sent to {new_email} to confirm your new address."

    with auth_api_session() as (auth_api, _):
        auth_api.ConfirmChangeEmailV2(
            auth_pb2.ConfirmChangeEmailV2Req(
                change_email_token=confirmation_token,
            )
        )

    # after confirmation: the address has switched and all pending-change columns are cleared
    with session_scope() as session:
        confirmed = session.execute(select(User).where(User.id == member_id)).scalar_one()
        assert confirmed.email == new_email
        assert confirmed.new_email is None
        assert confirmed.new_email_token is None
        assert confirmed.new_email_token_created is None
        assert confirmed.new_email_token_expiry is None

    process_jobs()
    push = push_collector.pop_for_user(member_id, last=True)
    assert push.content.title == "Email verified"
    assert push.content.body == "Your new email address has been verified."
def test_ChangeEmailV2_sends_proper_emails(db, fast_passwords, push_collector: PushCollector):
    """An email change queues two send_email jobs with distinct texts, plus a push notification."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    member, session_token = generate_user(hashed_password=hash_password(password))

    change_request = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=new_email,
    )
    with account_session(session_token) as account:
        account.ChangeEmailV2(change_request)

    process_jobs()

    with session_scope() as session:
        email_jobs = (
            session.execute(select(BackgroundJob).where(BackgroundJob.job_type == "send_email")).scalars().all()
        )
        assert len(email_jobs) == 2
        first_job, second_job = email_jobs

        # one distinguishing snippet per email
        snippet_a = b"An email change to the email"
        snippet_b = (
            b"You requested that your email be changed to this email address on Couchers.org. Your old email address is"
        )

        # the order of the two jobs is not fixed, so accept either pairing
        a_then_b = snippet_a in first_job.payload and snippet_b in second_job.payload
        b_then_a = snippet_b in first_job.payload and snippet_a in second_job.payload
        assert a_then_b or b_then_a

    push = push_collector.pop_for_user(member.id, last=True)
    assert push.content.title == "Email change requested"
    assert push.content.body == f"Use the link we sent to {new_email} to confirm your new address."
def test_ChangeLanguagePreference(db, fast_passwords):
    """Changing the UI language sets the NEXT_LOCALE cookie and shows up in GetAccountInfo."""
    # an ISO 639-1 language code that replaces the default (empty) preference
    requested_lang = "zh"
    _, session_token = generate_user()

    with real_account_session(session_token) as account:
        before = account.GetAccountInfo(empty_pb2.Empty())
        assert before.ui_language_preference == ""

        # with_call also returns the call object, which carries information about the request
        _, call = account.ChangeLanguagePreference.with_call(
            account_pb2.ChangeLanguagePreferenceReq(ui_language_preference=requested_lang)
        )

        # cookies are sent via initial metadata; each "set-cookie" value is the full cookie
        # string, so only the leading "key=value" part is kept
        cookie_pairs = [raw.split(";")[0] for name, raw in call.initial_metadata() if name == "set-cookie"]
        assert "NEXT_LOCALE=zh" in cookie_pairs, f"Didn't find the right cookie, got {call.initial_metadata()}"

        # the changed preference is also reported back by the backend
        after = account.GetAccountInfo(empty_pb2.Empty())
        assert after.ui_language_preference == "zh"
def test_contributor_form(db):
    """filled_contributor_form flips from false to true once FillContributorForm is called."""
    _, session_token = generate_user()

    with account_session(session_token) as account:
        before = account.GetContributorFormInfo(empty_pb2.Empty())
        assert not before.filled_contributor_form

        blank_form = auth_pb2.ContributorForm()
        account.FillContributorForm(account_pb2.FillContributorFormReq(contributor_form=blank_form))

        after = account.GetContributorFormInfo(empty_pb2.Empty())
        assert after.filled_contributor_form
def test_DeleteAccount_start(db, email_collector: EmailCollector):
    """Requesting deletion sends a confirmation email and stores a valid token without deleting yet."""
    member, session_token = generate_user()

    with account_session(session_token) as account:
        account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=None))
        confirmation_mail = email_collector.pop_for_recipient(member.email, last=True)
        assert confirmation_mail.subject == "[TEST] Confirm your Couchers.org account deletion"

    with session_scope() as session:
        token_query = select(AccountDeletionToken).where(AccountDeletionToken.user_id == member.id)
        deletion_token: AccountDeletionToken = session.execute(token_query).scalar_one()

        assert deletion_token.is_valid

        # the account itself stays alive until the deletion is confirmed
        stored_member = session.execute(select(User).where(User.id == member.id)).scalar_one()
        assert stored_member.deleted_at is None
def test_DeleteAccount_message_storage(db):
    """Only non-blank deletion reasons are stored: three of the six requests below leave a row."""
    _, session_token = generate_user()

    submitted_reasons = (
        None,  # not stored
        "",  # not stored
        "Reason",
        "0192#(&!&#)*@//)(8",
        "\n\n\t",  # not stored
        "1337",
    )

    with account_session(session_token) as account:
        for reason in submitted_reasons:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=reason))

    with session_scope() as session:
        stored_count = session.execute(select(func.count()).select_from(AccountDeletionReason)).scalar_one()
        assert stored_count == 3
def test_full_delete_account_with_recovery(db, email_collector: EmailCollector, push_collector: PushCollector):
    """Walk the whole deletion lifecycle: request, confirm by token, then recover by token.

    Each of the three steps is checked for its email, its push notification and the resulting
    database state.
    """
    support_address = "support@couchers.org"
    member, session_token = generate_user()
    member_id = member.id

    with account_session(session_token) as account:
        # a request without confirm=True is refused
        with pytest.raises(grpc.RpcError) as exc_info:
            account.DeleteAccount(account_pb2.DeleteAccountReq())
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == "Please confirm your account deletion."

        account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    # step 1: deletion requested
    email = email_collector.pop_for_recipient(member.email, last=True)
    assert email.subject == "[TEST] Confirm your Couchers.org account deletion"
    assert email.recipient == member.email
    assert "account deletion" in email.subject.lower()
    marker = "You requested that we delete your account from Couchers.org."
    for rendering in (email.plain, email.html):
        assert marker in rendering
        assert support_address in rendering

    push = push_collector.pop_for_user(member_id, last=True)
    assert push.content.title == "Account deletion requested"
    assert push.content.body == "Use the link we emailed you to confirm."

    with session_scope() as session:
        token_row = session.execute(select(AccountDeletionToken)).scalar_one()
        delete_token = token_row.token

        stored_member = session.execute(select(User).where(User.id == member_id)).scalar_one()
        assert token_row.user == stored_member
        # nothing is deleted yet, so no recovery data exists either
        assert stored_member.deleted_at is None
        assert not stored_member.undelete_token
        assert not stored_member.undelete_until

    delete_url = f"http://localhost:3000/delete-account?token={delete_token}"
    for rendering in (email.plain, email.html):
        assert delete_token in rendering
        assert delete_url in rendering

    # step 2: deletion confirmed
    with auth_api_session() as (auth_api, _):
        auth_api.ConfirmDeleteAccount(
            auth_pb2.ConfirmDeleteAccountReq(
                token=delete_token,
            )
        )

    email = email_collector.pop_for_recipient(member.email, last=True)
    assert email.recipient == member.email
    assert "account has been deleted" in email.subject.lower()
    marker = "You have successfully deleted your account from Couchers.org."
    for rendering in (email.plain, email.html):
        assert marker in rendering
        assert "7 days" in rendering
        assert support_address in rendering

    push = push_collector.pop_for_user(member_id, last=True)
    assert push.content.title == "Account deleted"
    assert push.content.body == "You can restore it within 7 days using the link we emailed you."

    with session_scope() as session:
        # the deletion token is gone once it has been used
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        stored_member = session.execute(select(User).where(User.id == member_id)).scalar_one()
        assert stored_member.deleted_at is not None
        assert stored_member.undelete_token
        assert stored_member.undelete_until
        assert stored_member.undelete_until > now()

        undelete_token = stored_member.undelete_token

    undelete_url = f"http://localhost:3000/recover-account?token={undelete_token}"
    for rendering in (email.plain, email.html):
        assert undelete_url in rendering

    # step 3: account recovered
    with auth_api_session() as (auth_api, _):
        auth_api.RecoverAccount(
            auth_pb2.RecoverAccountReq(
                token=undelete_token,
            )
        )

    email = email_collector.pop_for_recipient(member.email, last=True)
    assert email.recipient == member.email
    assert "account has been recovered" in email.subject.lower()
    marker = "Your account on Couchers.org has been successfully recovered!"
    for rendering in (email.plain, email.html):
        assert marker in rendering
        assert support_address in rendering

    push = push_collector.pop_for_user(member_id, last=True)
    assert push.content.title == "Account restored"
    assert push.content.body == "Welcome back!"

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        restored = session.execute(select(User).where(User.id == member_id)).scalar_one()
        assert restored.deleted_at is None
        assert not restored.undelete_token
        assert not restored.undelete_until
def test_multiple_delete_tokens(db):
    """
    Make sure all outstanding deletion tokens are removed once one of them is used
    """
    _, session_token = generate_user()

    # three separate requests produce three tokens
    with account_session(session_token) as account:
        for _attempt in range(3):
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    with session_scope() as session:
        token_count = session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one()
        assert token_count == 3
        any_token = session.execute(select(AccountDeletionToken.token).limit(1)).scalar_one()

    with auth_api_session() as (auth_api, _):
        auth_api.ConfirmDeleteAccount(
            auth_pb2.ConfirmDeleteAccountReq(
                token=any_token,
            )
        )

    with session_scope() as session:
        leftover = session.execute(select(AccountDeletionToken.token)).scalar_one_or_none()
        assert not leftover
def test_ListActiveSessions_pagination(db, fast_passwords):
    """ListActiveSessions pages with page_size=3: a full first page, then a final page of two."""
    password = random_hex()
    member, session_token = generate_user(hashed_password=hash_password(password))

    # log in four more times
    with auth_api_session() as (auth_api, _):
        for _attempt in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=member.username, password=password))

    with real_account_session(session_token) as account:
        first_page = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_size=3))
        assert len(first_page.active_sessions) == 3

        second_page = account.ListActiveSessions(
            account_pb2.ListActiveSessionsReq(page_token=first_page.next_page_token, page_size=3)
        )
        assert len(second_page.active_sessions) == 2
        # an empty token marks the last page
        assert not second_page.next_page_token
def test_ListActiveSessions_details(db, fast_passwords):
    """ListActiveSessions reports OS, browser, device, location and the current-session flag.

    Three logins are made with distinct IPs and user agents. The listing is expected to hold
    those three plus the session making the call, with the calling session first and the three
    logins in reverse order of creation.
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    ips_user_agents = [
        (
            "108.123.33.162",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Mobile/15E148 Safari/604.1",
        ),
        (
            "8.245.212.28",
            "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/26.0 Chrome/122.0.0.0 Mobile Safari/537.36",
        ),
        (
            "95.254.140.156",
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0",
        ),
    ]

    for ip, user_agent in ips_user_agents:
        # the user agent is set on the channel, the client IP through request metadata
        options = (("grpc.primary_user_agent", user_agent),)
        with auth_api_session(grpc_channel_options=options) as (auth_api, metadata_interceptor):
            auth_api.Authenticate(
                auth_pb2.AuthReq(user=user.username, password=password), metadata=(("x-couchers-real-ip", ip),)
            )

    def dummy_geoip(ip_address):
        # only two of the three IPs resolve; any other address yields None
        return {
            "108.123.33.162": "Chicago, United States",
            "8.245.212.28": "Sydney, Australia",
        }.get(ip_address)

    with real_account_session(token) as account:
        with patch("couchers.servicers.account.geoip_approximate_location", dummy_geoip):
            res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())

    # (operating_system, browser, device, approximate_location, is_current_session)
    expected = [
        # this one is currently making the API call
        ("Other", "Other", "Other", "Unknown", True),
        ("Ubuntu", "Firefox", "Other", "Unknown", False),
        ("Android", "Samsung Internet", "K", "Sydney, Australia", False),
        ("iOS", "Mobile Safari", "iPhone", "Chicago, United States", False),
    ]
    actual = [
        (
            active.operating_system,
            active.browser,
            active.device,
            active.approximate_location,
            active.is_current_session,
        )
        for active in res.active_sessions
    ]
    # a single comparison covers the session count and every field
    assert actual == expected
def test_LogOutSession(db, fast_passwords):
    """Log out one session, selected by its ``created`` value, and check the others remain listed."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, _):
        for _attempt in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        before = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(before.active_sessions) == 5

        target = before.active_sessions[3]
        account.LogOutSession(account_pb2.LogOutSessionReq(created=target.created))

        after = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(after.active_sessions) == 4

        # ignore the first session as it changes; index 3 is the one that was logged out
        survivors = [
            active for position, active in enumerate(before.active_sessions) if position not in (0, 3)
        ]
        assert survivors == after.active_sessions[1:]
def test_LogOutOtherSessions(db, fast_passwords):
    """LogOutOtherSessions must be explicitly confirmed, and then leaves a single session listed."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, _):
        for _attempt in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    def count_sessions(account):
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        return len(listing.active_sessions)

    with real_account_session(token) as account:
        assert count_sessions(account) == 5

        # without confirm=True the call is rejected
        with pytest.raises(grpc.RpcError) as excinfo:
            account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=False))
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == "Please confirm you want to log out of other sessions."

        account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=True))
        assert count_sessions(account) == 1
def test_CreateInviteCode(db):
    """Creating an invite code returns an 8-character code and its link, and stores an enabled row."""
    user, token = generate_user()

    with account_session(token) as account:
        res = account.CreateInviteCode(account_pb2.CreateInviteCodeReq())

    code = res.code
    assert len(code) == 8
    assert res.url == urls.invite_code_link(code=res.code)

    with session_scope() as session:
        lookup = select(InviteCode).where(InviteCode.id == code)
        invite = session.execute(lookup).scalar_one()
        assert invite.creator_user_id == user.id
        assert invite.disabled is None
def test_DisableInviteCode(db):
    """Disabling an invite code sets ``disabled`` on its database row."""
    _, token = generate_user()

    with account_session(token) as account:
        created = account.CreateInviteCode(account_pb2.CreateInviteCodeReq())
        code = created.code
        account.DisableInviteCode(account_pb2.DisableInviteCodeReq(code=code))

    with session_scope() as session:
        lookup = select(InviteCode).where(InviteCode.id == code)
        invite = session.execute(lookup).scalar_one()
        assert invite.disabled is not None
def test_ListInviteCodes(db):
    """ListInviteCodes reports the caller's code with its link and the number of users who used it."""
    user, token = generate_user()
    another_user, _ = generate_user()

    with account_session(token) as account:
        created = account.CreateInviteCode(account_pb2.CreateInviteCodeReq())
    code = created.code

    # simulate another_user having signed up with this invite code
    with session_scope() as session:
        mark_invited = update(User).where(User.id == another_user.id).values(invite_code_id=code)
        session.execute(mark_invited)

    with account_session(token) as account:
        res = account.ListInviteCodes(empty_pb2.Empty())

    assert len(res.invite_codes) == 1
    listed = res.invite_codes[0]
    assert listed.code == code
    assert listed.uses == 1
    assert listed.url == urls.invite_code_link(code=code)
def test_reminders(db, moderator):
    """Walk through the reminders a host with an incomplete profile gets as host requests arrive and are handled.

    Covers: the profile-completion reminder, one respond reminder per approved
    pending request, and the respond reminder disappearing once the host
    accepts or sends a message (but not when the surfer sends one).
    """
    # reference writing reminders tested in test_AvailableWriteReferences_and_ListPendingReferencesToWrite
    # we use LiteUser, so remember to refresh materialized views
    user, token = generate_user(complete_profile=False)
    complete_user, complete_token = generate_user(complete_profile=True)
    req_user1, req_user_token1 = generate_user(complete_profile=True)
    req_user2, req_user_token2 = generate_user(complete_profile=True)

    def fetch_reminders(session_token):
        # reminders as seen by the owner of session_token
        with account_session(session_token) as account:
            return account.GetReminders(empty_pb2.Empty()).reminders

    def kinds(reminders):
        # which oneof member is set on each reminder, in order
        return [reminder.WhichOneof("reminder") for reminder in reminders]

    def assert_host_reminders(expected_requests):
        # expected_requests: (host_request_id, surfer) pairs, in the expected order;
        # they must be followed by the profile-completion reminder
        reminders = fetch_reminders(token)
        assert kinds(reminders) == ["respond_to_host_request_reminder"] * len(expected_requests) + [
            "complete_profile_reminder",
        ]
        for reminder, (expected_id, expected_surfer) in zip(reminders, expected_requests):
            respond = reminder.respond_to_host_request_reminder
            assert respond.host_request_id == expected_id
            assert respond.surfer_user.user_id == expected_surfer.id

    refresh_materialized_views_rapid(empty_pb2.Empty())
    assert kinds(fetch_reminders(complete_token)) == []
    assert kinds(fetch_reminders(token)) == [
        "complete_profile_reminder",
    ]

    today_plus_2 = (today() + timedelta(days=2)).isoformat()
    today_plus_3 = (today() + timedelta(days=3)).isoformat()

    def create_approved_request(surfer_token, label):
        # surfer requests to stay with `user`; the moderator then approves the request
        with requests_session(surfer_token) as api:
            new_id = api.CreateHostRequest(
                requests_pb2.CreateHostRequestReq(
                    host_user_id=user.id,
                    from_date=today_plus_2,
                    to_date=today_plus_3,
                    text=valid_request_text(label),
                )
            ).host_request_id
            moderator.approve_host_request(new_id)
        return new_id

    host_request1_id = create_approved_request(req_user_token1, "Test request 1")

    assert_host_reminders([(host_request1_id, req_user1)])

    host_request2_id = create_approved_request(req_user_token2, "Test request 2")

    refresh_materialized_views_rapid(empty_pb2.Empty())
    assert_host_reminders(
        [
            (host_request1_id, req_user1),
            (host_request2_id, req_user2),
        ]
    )

    host_request3_id = create_approved_request(req_user_token1, "Test request 3")

    refresh_materialized_views_rapid(empty_pb2.Empty())
    assert_host_reminders(
        [
            (host_request1_id, req_user1),
            (host_request2_id, req_user2),
            (host_request3_id, req_user1),
        ]
    )

    # accept req
    with requests_session(token) as api:
        api.RespondHostRequest(
            requests_pb2.RespondHostRequestReq(
                host_request_id=host_request1_id, status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED
            )
        )

    refresh_materialized_views_rapid(empty_pb2.Empty())
    assert_host_reminders(
        [
            (host_request2_id, req_user2),
            (host_request3_id, req_user1),
        ]
    )

    # host replies to req2 with a message: reminder should clear even though it's still pending
    with requests_session(token) as api:
        api.SendHostRequestMessage(
            requests_pb2.SendHostRequestMessageReq(host_request_id=host_request2_id, text="Let me think about it")
        )

    refresh_materialized_views_rapid(empty_pb2.Empty())
    assert_host_reminders([(host_request3_id, req_user1)])

    # surfer sending a message should not clear the reminder
    with requests_session(req_user_token1) as api:
        api.SendHostRequestMessage(
            requests_pb2.SendHostRequestMessageReq(host_request_id=host_request3_id, text="Any update?")
        )

    refresh_materialized_views_rapid(empty_pb2.Empty())
    assert_host_reminders([(host_request3_id, req_user1)])
def test_confirm_host_request_reminder(db, moderator):
    """The surfer gets a confirm reminder only between the host accepting and the surfer confirming."""
    host, host_token = generate_user(complete_profile=True)
    surfer, surfer_token = generate_user(complete_profile=True)

    today_plus_10 = (today() + timedelta(days=10)).isoformat()
    today_plus_12 = (today() + timedelta(days=12)).isoformat()

    with requests_session(surfer_token) as api:
        created = api.CreateHostRequest(
            requests_pb2.CreateHostRequestReq(
                host_user_id=host.id,
                from_date=today_plus_10,
                to_date=today_plus_12,
                text=valid_request_text("Please host me"),
            )
        )
        host_request_id = created.host_request_id
        moderator.approve_host_request(host_request_id)

    def surfer_reminders():
        # refresh the materialized views, then read the surfer's reminders
        refresh_materialized_views_rapid(empty_pb2.Empty())
        with account_session(surfer_token) as account:
            return account.GetReminders(empty_pb2.Empty()).reminders

    def respond(as_token, status):
        # set the request's status on behalf of the owner of as_token
        with requests_session(as_token) as api:
            api.RespondHostRequest(
                requests_pb2.RespondHostRequestReq(host_request_id=host_request_id, status=status)
            )

    # nothing to confirm while the request is still awaiting the host
    assert [reminder.WhichOneof("reminder") for reminder in surfer_reminders()] == []

    respond(host_token, conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED)

    reminders = surfer_reminders()
    assert [reminder.WhichOneof("reminder") for reminder in reminders] == ["confirm_host_request_reminder"]
    confirm = reminders[0].confirm_host_request_reminder
    assert confirm.host_request_id == host_request_id
    assert confirm.host_user.user_id == host.id

    # after surfer confirms, reminder should clear
    respond(surfer_token, conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED)

    assert [reminder.WhichOneof("reminder") for reminder in surfer_reminders()] == []
def test_my_home_reminder(db):
    """The my-home reminder is shown only to users who may host and have not completed their home details."""
    # can_host with incomplete my home (max_guests not set) → reminder shown
    can_host_incomplete, token1 = generate_user(hosting_status=HostingStatus.can_host)
    # maybe with incomplete my home → reminder shown
    maybe_incomplete, token2 = generate_user(hosting_status=HostingStatus.maybe)
    # cant_host → no reminder regardless of my home completion
    cant_host, token3 = generate_user(hosting_status=HostingStatus.cant_host)
    # can_host with fully completed my home → no reminder
    can_host_complete, token4 = generate_user(
        hosting_status=HostingStatus.can_host,
        max_guests=2,
        sleeping_arrangement=SleepingArrangement.private,
        # about_place is set by default in make_user
    )

    # session token -> reminder kinds that user is expected to see
    expectations = [
        (token1, ["complete_my_home_reminder"]),
        (token2, ["complete_my_home_reminder"]),
        (token3, []),
        (token4, []),
    ]

    for session_token, expected_kinds in expectations:
        with account_session(session_token) as account:
            reminders = account.GetReminders(empty_pb2.Empty()).reminders
            assert [reminder.WhichOneof("reminder") for reminder in reminders] == expected_kinds
def test_volunteer_stuff(db):
    """Exercise the volunteer endpoints: rejected for non-volunteers, then read, update, and the public team listing."""
    # taken from couchers/app/backend/resources/badges.json
    board_member_id = 8347

    user, token = generate_user(name="Von Tester", username="tester", city="Amsterdam", id=board_member_id)

    def assert_volunteer_info(info, *, display_name, display_location, link_type, link_text, link_url):
        # the keyword arguments are the fields that vary between steps; the rest stay fixed throughout
        assert info.display_name == display_name
        assert info.display_location == display_location
        assert info.role == "Lead Tester"
        assert info.started_volunteering == "2020-06-01"
        assert not info.stopped_volunteering
        assert info.show_on_team_page
        assert info.link_type == link_type
        assert info.link_text == link_text
        assert info.link_url == link_url

    with account_session(token) as account:
        res = account.GetAccountInfo(empty_pb2.Empty())
        assert not res.is_volunteer

        # both volunteer endpoints are rejected with the same error until a volunteer row exists
        with pytest.raises(grpc.RpcError) as e:
            account.GetMyVolunteerInfo(empty_pb2.Empty())
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert (
            e.value.details() == "You are currently not registered as a volunteer, if this is wrong, please contact us."
        )

        with pytest.raises(grpc.RpcError) as e:
            account.UpdateMyVolunteerInfo(account_pb2.UpdateMyVolunteerInfoReq())
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert (
            e.value.details() == "You are currently not registered as a volunteer, if this is wrong, please contact us."
        )

    with session_scope() as session:
        volunteer = make_volunteer(
            user_id=user.id,
            display_name="Great Volunteer",
            display_location="The Bitbucket",
            role="Lead Tester",
            started_volunteering=date(2020, 6, 1),
        )
        session.add(volunteer)

    with account_session(token) as account:
        res = account.GetAccountInfo(empty_pb2.Empty())
        assert res.is_volunteer

        res = account.GetMyVolunteerInfo(empty_pb2.Empty())
        assert_volunteer_info(
            res,
            display_name="Great Volunteer",
            display_location="The Bitbucket",
            link_type="couchers",
            link_text="@tester",
            link_url="http://localhost:3000/user/tester",
        )

        # website link: the URL is supplied explicitly
        res = account.UpdateMyVolunteerInfo(
            account_pb2.UpdateMyVolunteerInfoReq(
                display_name=wrappers_pb2.StringValue(value=""),
                link_type=wrappers_pb2.StringValue(value="website"),
                link_text=wrappers_pb2.StringValue(value="testervontester.com.invalid"),
                link_url=wrappers_pb2.StringValue(value="https://www.testervontester.com.invalid/"),
            )
        )
        assert_volunteer_info(
            res,
            display_name="",
            display_location="The Bitbucket",
            link_type="website",
            link_text="testervontester.com.invalid",
            link_url="https://www.testervontester.com.invalid/",
        )

        # linkedin link: no URL is sent in the request
        res = account.UpdateMyVolunteerInfo(
            account_pb2.UpdateMyVolunteerInfoReq(
                display_name=wrappers_pb2.StringValue(value=""),
                link_type=wrappers_pb2.StringValue(value="linkedin"),
                link_text=wrappers_pb2.StringValue(value="tester-vontester"),
            )
        )
        assert_volunteer_info(
            res,
            display_name="",
            display_location="The Bitbucket",
            link_type="linkedin",
            link_text="tester-vontester",
            link_url="https://www.linkedin.com/in/tester-vontester/",
        )

        # email link: no URL is sent in the request
        res = account.UpdateMyVolunteerInfo(
            account_pb2.UpdateMyVolunteerInfoReq(
                display_name=wrappers_pb2.StringValue(value="Tester"),
                display_location=wrappers_pb2.StringValue(value=""),
                link_type=wrappers_pb2.StringValue(value="email"),
                link_text=wrappers_pb2.StringValue(value="tester@vontester.com.invalid"),
            )
        )
        assert_volunteer_info(
            res,
            display_name="Tester",
            display_location="",
            link_type="email",
            link_text="tester@vontester.com.invalid",
            link_url="mailto:tester@vontester.com.invalid",
        )

    refresh_materialized_views_rapid(empty_pb2.Empty())

    with public_session() as public:
        res = public.GetVolunteers(empty_pb2.Empty())

    assert len(res.current_volunteers) == 1
    v = res.current_volunteers[0]
    assert v.name == "Tester"
    assert v.username == "tester"
    assert v.is_board_member
    assert v.role == "Lead Tester"
    assert v.location == "Amsterdam"
    assert v.img.startswith("http://localhost:5001/img/thumbnail/")
    assert v.link_type == "email"
    assert v.link_text == "tester@vontester.com.invalid"
    assert v.link_url == "mailto:tester@vontester.com.invalid"