Coverage for app/backend/src/tests/test_auth.py: 100%
733 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1import http.cookies
2from typing import cast
3from unittest.mock import DEFAULT, patch
5import grpc
6import pytest
7from google.protobuf import empty_pb2, wrappers_pb2
8from sqlalchemy import select, update
9from sqlalchemy.sql import delete, func
11from couchers import urls
12from couchers.crypto import hash_password, random_hex
13from couchers.db import session_scope
14from couchers.models import (
15 ContributeOption,
16 ContributorForm,
17 LoginToken,
18 NonvisibleUserAccess,
19 NonvisibleUserAccessType,
20 NonvisibleUserState,
21 PasswordResetToken,
22 SignupFlow,
23 User,
24 UserSession,
25)
26from couchers.proto import account_pb2, api_pb2, auth_pb2
27from couchers.utils import now
28from tests.fixtures.db import generate_user
29from tests.fixtures.misc import EmailCollector, PushCollector
30from tests.fixtures.sessions import (
31 MetadataKeeperInterceptor,
32 account_session,
33 api_session,
34 auth_api_session,
35 real_api_session,
36)
39@pytest.fixture(autouse=True)
40def _(testconfig, fast_passwords):
41 pass
44def get_session_cookie_tokens(metadata_interceptor: MetadataKeeperInterceptor) -> tuple[str, str]:
45 set_cookies = [val for key, val in metadata_interceptor.latest_header_raw if key == "set-cookie"]
46 sesh = http.cookies.SimpleCookie([v for v in set_cookies if "sesh" in v][0])["couchers-sesh"].value
47 uid = http.cookies.SimpleCookie([v for v in set_cookies if "user-id" in v][0])["couchers-user-id"].value
48 return sesh, uid
51def test_UsernameValid(db):
52 with auth_api_session() as (auth_api, metadata_interceptor):
53 assert auth_api.UsernameValid(auth_pb2.UsernameValidReq(username="test")).valid
55 with auth_api_session() as (auth_api, metadata_interceptor):
56 assert not auth_api.UsernameValid(auth_pb2.UsernameValidReq(username="")).valid
59def test_signup_incremental(db):
60 with auth_api_session() as (auth_api, metadata_interceptor):
61 res = auth_api.SignupFlow(
62 auth_pb2.SignupFlowReq(
63 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
64 )
65 )
67 flow_token = res.flow_token
68 assert res.flow_token
69 assert not res.HasField("auth_res")
70 assert not res.need_basic
71 assert res.need_account
72 assert not res.need_feedback
73 assert res.need_verify_email
74 assert res.need_accept_community_guidelines
75 assert res.need_motivations
77 # read out the signup token directly from the database for now
78 with session_scope() as session:
79 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
80 assert flow.email_sent
81 assert not flow.email_verified
82 email_token = flow.email_token
84 with auth_api_session() as (auth_api, metadata_interceptor):
85 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token))
87 assert res.flow_token == flow_token
88 assert not res.HasField("auth_res")
89 assert not res.need_basic
90 assert res.need_account
91 assert not res.need_feedback
92 assert res.need_verify_email
93 assert res.need_accept_community_guidelines
94 assert res.need_motivations
96 # Add feedback
97 with auth_api_session() as (auth_api, metadata_interceptor):
98 res = auth_api.SignupFlow(
99 auth_pb2.SignupFlowReq(
100 flow_token=flow_token,
101 feedback=auth_pb2.ContributorForm(
102 ideas="I'm a robot, incapable of original ideation",
103 features="I love all your features",
104 experience="I haven't done couch surfing before",
105 contribute=auth_pb2.CONTRIBUTE_OPTION_YES,
106 contribute_ways=["serving", "backend"],
107 expertise="I'd love to be your server: I can compute very fast, but only simple opcodes",
108 ),
109 )
110 )
112 assert res.flow_token == flow_token
113 assert not res.HasField("auth_res")
114 assert not res.need_basic
115 assert res.need_account
116 assert not res.need_feedback
117 assert res.need_verify_email
118 assert res.need_accept_community_guidelines
119 assert res.need_motivations
121 # Agree to community guidelines
122 with auth_api_session() as (auth_api, metadata_interceptor):
123 res = auth_api.SignupFlow(
124 auth_pb2.SignupFlowReq(
125 flow_token=flow_token,
126 accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
127 )
128 )
130 assert res.flow_token == flow_token
131 assert not res.HasField("auth_res")
132 assert not res.need_basic
133 assert res.need_account
134 assert not res.need_feedback
135 assert res.need_verify_email
136 assert not res.need_accept_community_guidelines
137 assert res.need_motivations
139 # Submit motivations
140 with auth_api_session() as (auth_api, metadata_interceptor):
141 res = auth_api.SignupFlow(
142 auth_pb2.SignupFlowReq(
143 flow_token=flow_token,
144 motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
145 )
146 )
148 assert res.flow_token == flow_token
149 assert not res.HasField("auth_res")
150 assert not res.need_basic
151 assert res.need_account
152 assert not res.need_feedback
153 assert res.need_verify_email
154 assert not res.need_accept_community_guidelines
155 assert not res.need_motivations
157 # Verify email
158 with auth_api_session() as (auth_api, metadata_interceptor):
159 res = auth_api.SignupFlow(
160 auth_pb2.SignupFlowReq(
161 flow_token=flow_token,
162 email_token=email_token,
163 )
164 )
166 assert res.flow_token == flow_token
167 assert not res.HasField("auth_res")
168 assert not res.need_basic
169 assert res.need_account
170 assert not res.need_feedback
171 assert not res.need_verify_email
172 assert not res.need_accept_community_guidelines
173 assert not res.need_motivations
175 # Finally finish off account info
176 with auth_api_session() as (auth_api, metadata_interceptor):
177 res = auth_api.SignupFlow(
178 auth_pb2.SignupFlowReq(
179 flow_token=flow_token,
180 account=auth_pb2.SignupAccount(
181 username="frodo",
182 password="a very insecure password",
183 birthdate="1970-01-01",
184 gender="Bot",
185 hosting_status=api_pb2.HOSTING_STATUS_MAYBE,
186 city="New York City",
187 lat=40.7331,
188 lng=-73.9778,
189 radius=500,
190 accept_tos=True,
191 ),
192 )
193 )
195 assert not res.flow_token
196 assert res.HasField("auth_res")
197 assert res.auth_res.user_id
198 assert not res.auth_res.jailed
199 assert not res.need_basic
200 assert not res.need_account
201 assert not res.need_feedback
202 assert not res.need_verify_email
203 assert not res.need_accept_community_guidelines
204 assert not res.need_motivations
206 user_id = res.auth_res.user_id
208 sess_token, uid = get_session_cookie_tokens(metadata_interceptor)
209 assert uid == str(user_id)
211 with api_session(sess_token) as api:
212 res = api.GetUser(api_pb2.GetUserReq(user=str(user_id)))
214 assert res.username == "frodo"
215 assert res.gender == "Bot"
216 assert res.hosting_status == api_pb2.HOSTING_STATUS_MAYBE
217 assert res.city == "New York City"
218 assert res.lat == 40.7331
219 assert res.lng == -73.9778
220 assert res.radius == 500
222 with session_scope() as session:
223 form = session.execute(select(ContributorForm)).scalar_one()
225 assert form.ideas == "I'm a robot, incapable of original ideation"
226 assert form.features == "I love all your features"
227 assert form.experience == "I haven't done couch surfing before"
228 assert form.contribute == ContributeOption.yes
229 assert form.contribute_ways == ["serving", "backend"]
230 assert form.expertise == "I'd love to be your server: I can compute very fast, but only simple opcodes"
233def test_signup_funnel_counters(db):
234 """Each per-step signup funnel counter should fire exactly once across an incremental signup."""
235 with patch.multiple(
236 "couchers.servicers.auth",
237 signup_initiations_counter=DEFAULT,
238 signup_account_filled_counter=DEFAULT,
239 signup_email_verified_counter=DEFAULT,
240 signup_guidelines_accepted_counter=DEFAULT,
241 signup_motivations_filled_counter=DEFAULT,
242 signup_completions_counter=DEFAULT,
243 ) as counters:
244 with auth_api_session() as (auth_api, metadata_interceptor):
245 res = auth_api.SignupFlow(
246 auth_pb2.SignupFlowReq(
247 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
248 )
249 )
250 flow_token = res.flow_token
251 counters["signup_initiations_counter"].inc.assert_called_once()
253 with session_scope() as session:
254 email_token = (
255 session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one().email_token
256 )
258 with auth_api_session() as (auth_api, metadata_interceptor):
259 auth_api.SignupFlow(
260 auth_pb2.SignupFlowReq(
261 flow_token=flow_token,
262 account=auth_pb2.SignupAccount(
263 username="frodo",
264 password="a very insecure password",
265 birthdate="1970-01-01",
266 gender="Bot",
267 hosting_status=api_pb2.HOSTING_STATUS_MAYBE,
268 city="New York City",
269 lat=40.7331,
270 lng=-73.9778,
271 radius=500,
272 accept_tos=True,
273 ),
274 )
275 )
276 counters["signup_account_filled_counter"].inc.assert_called_once()
278 # accept the guidelines twice; the counter must still only fire once
279 with auth_api_session() as (auth_api, metadata_interceptor):
280 auth_api.SignupFlow(
281 auth_pb2.SignupFlowReq(
282 flow_token=flow_token,
283 accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
284 )
285 )
286 with auth_api_session() as (auth_api, metadata_interceptor):
287 auth_api.SignupFlow(
288 auth_pb2.SignupFlowReq(
289 flow_token=flow_token,
290 accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
291 )
292 )
293 counters["signup_guidelines_accepted_counter"].inc.assert_called_once()
295 with auth_api_session() as (auth_api, metadata_interceptor):
296 auth_api.SignupFlow(
297 auth_pb2.SignupFlowReq(
298 flow_token=flow_token,
299 motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
300 )
301 )
302 counters["signup_motivations_filled_counter"].inc.assert_called_once()
304 with auth_api_session() as (auth_api, metadata_interceptor):
305 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, email_token=email_token))
306 counters["signup_email_verified_counter"].inc.assert_called_once()
308 assert res.HasField("auth_res")
309 counters["signup_completions_counter"].labels.assert_called_once_with("Bot")
312def _quick_signup() -> int:
313 with auth_api_session() as (auth_api, metadata_interceptor):
314 res = auth_api.SignupFlow(
315 auth_pb2.SignupFlowReq(
316 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
317 account=auth_pb2.SignupAccount(
318 username="frodo",
319 password="a very insecure password",
320 birthdate="1970-01-01",
321 gender="Bot",
322 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
323 city="New York City",
324 lat=40.7331,
325 lng=-73.9778,
326 radius=500,
327 accept_tos=True,
328 ),
329 feedback=auth_pb2.ContributorForm(),
330 accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
331 motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
332 )
333 )
335 flow_token = res.flow_token
337 assert res.flow_token
338 assert not res.HasField("auth_res")
339 assert not res.need_basic
340 assert not res.need_account
341 assert not res.need_feedback
342 assert not res.need_motivations
343 assert res.need_verify_email
345 # read out the signup token directly from the database for now
346 with session_scope() as session:
347 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
348 assert flow.email_sent
349 assert not flow.email_verified
350 email_token = flow.email_token
352 with auth_api_session() as (auth_api, metadata_interceptor):
353 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))
355 assert not res.flow_token
356 assert res.HasField("auth_res")
357 assert res.auth_res.user_id
358 assert not res.auth_res.jailed
359 assert not res.need_basic
360 assert not res.need_account
361 assert not res.need_feedback
362 assert not res.need_motivations
363 assert not res.need_verify_email
365 # make sure we got the right token in a cookie
366 with session_scope() as session:
367 token = session.execute(
368 select(UserSession.token).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
369 ).scalar_one()
370 sesh, uid = get_session_cookie_tokens(metadata_interceptor)
371 assert sesh == token
373 return cast(int, res.auth_res.user_id)
376def test_signup(db):
377 _quick_signup()
380def test_basic_login(db):
381 # Create our test user using signup
382 _quick_signup()
384 with auth_api_session() as (auth_api, metadata_interceptor):
385 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))
387 reply_token, _ = get_session_cookie_tokens(metadata_interceptor)
389 with session_scope() as session:
390 token = session.execute(
391 select(UserSession.token)
392 .join(User, UserSession.user_id == User.id)
393 .where(User.username == "frodo")
394 .where(UserSession.token == reply_token)
395 .where(UserSession.is_valid)
396 ).scalar_one_or_none()
397 assert token
399 # log out
400 with auth_api_session() as (auth_api, metadata_interceptor):
401 auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={reply_token}"),))
404def test_login_part_signed_up_verified_email(db):
405 """
406 If you try to log in but didn't finish singing up, we send you a new email and ask you to finish signing up.
407 """
408 with auth_api_session() as (auth_api, metadata_interceptor):
409 res = auth_api.SignupFlow(
410 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"))
411 )
413 flow_token = res.flow_token
414 assert res.need_verify_email
416 # verify the email
417 with session_scope() as session:
418 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
419 flow_token = flow.flow_token
420 email_token = flow.email_token
421 with auth_api_session() as (auth_api, metadata_interceptor):
422 auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))
424 with EmailCollector() as email_collector:
425 with auth_api_session() as (auth_api, metadata_interceptor):
426 with pytest.raises(grpc.RpcError) as err:
427 auth_api.Authenticate(auth_pb2.AuthReq(user="email@couchers.org.invalid", password="wrong pwd"))
428 assert err.value.details() == "Please check your email for a link to continue signing up."
430 email = email_collector.pop_for_recipient("email@couchers.org.invalid", last=True)
431 assert email.recipient == "email@couchers.org.invalid"
432 assert flow_token in email.plain
433 assert flow_token in email.html
436def test_login_part_signed_up_not_verified_email(db):
437 with auth_api_session() as (auth_api, metadata_interceptor):
438 res = auth_api.SignupFlow(
439 auth_pb2.SignupFlowReq(
440 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
441 account=auth_pb2.SignupAccount(
442 username="frodo",
443 password="a very insecure password",
444 birthdate="1999-01-01",
445 gender="Bot",
446 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
447 city="New York City",
448 lat=40.7331,
449 lng=-73.9778,
450 radius=500,
451 accept_tos=True,
452 ),
453 )
454 )
456 flow_token = res.flow_token
457 assert res.need_verify_email
459 with EmailCollector() as email_collector:
460 with auth_api_session() as (auth_api, metadata_interceptor):
461 with pytest.raises(grpc.RpcError) as err:
462 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="wrong pwd"))
463 assert err.value.details() == "Please check your email for a link to continue signing up."
465 with session_scope() as session:
466 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
467 email_token = flow.email_token
469 email = email_collector.pop_for_recipient("email@couchers.org.invalid", last=True)
470 assert email.recipient == "email@couchers.org.invalid"
471 assert email_token
472 assert email_token in email.plain
473 assert email_token in email.html
476def test_banned_user(db):
477 user_id = _quick_signup()
479 with session_scope() as session:
480 session.execute(select(User)).scalar_one().banned_at = now()
482 with auth_api_session() as (auth_api, metadata_interceptor):
483 with pytest.raises(grpc.RpcError) as e:
484 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))
485 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
486 assert e.value.details() == "Your account is suspended."
488 with session_scope() as session:
489 access = session.execute(select(NonvisibleUserAccess)).scalar_one()
490 assert access.access_type == NonvisibleUserAccessType.login_attempt
491 assert access.target_state == NonvisibleUserState.banned
492 assert access.target_user_id == user_id
493 assert access.actor_user_id == user_id
496def test_shadowed_user_login_logged(db):
497 user_id = _quick_signup()
499 with session_scope() as session:
500 session.execute(select(User)).scalar_one().shadowed_at = now()
502 with auth_api_session() as (auth_api, metadata_interceptor):
503 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))
505 with session_scope() as session:
506 access = session.execute(select(NonvisibleUserAccess)).scalar_one()
507 assert access.access_type == NonvisibleUserAccessType.login_attempt
508 assert access.target_state == NonvisibleUserState.shadowed
509 assert access.target_user_id == user_id
510 assert access.actor_user_id == user_id
513def test_deleted_user(db):
514 user_id = _quick_signup()
516 with session_scope() as session:
517 session.execute(update(User).where(User.id == user_id).values(deleted_at=func.now()))
519 with auth_api_session() as (auth_api, metadata_interceptor):
520 with pytest.raises(grpc.RpcError) as e:
521 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))
522 assert e.value.code() == grpc.StatusCode.NOT_FOUND
523 assert e.value.details() == "An account with that username or email was not found."
526def test_invalid_token(db):
527 user1, token1 = generate_user()
528 user2, token2 = generate_user()
530 wrong_token = random_hex(32)
532 with real_api_session(wrong_token) as api, pytest.raises(grpc.RpcError) as e:
533 res = api.GetUser(api_pb2.GetUserReq(user=user2.username))
535 assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED
536 assert e.value.details() == "Unauthorized"
539def test_password_reset_v2(db, email_collector: EmailCollector, push_collector: PushCollector):
540 user, token = generate_user(hashed_password=hash_password("mypassword"))
542 with auth_api_session() as (auth_api, metadata_interceptor):
543 res = auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username))
545 with session_scope() as session:
546 password_reset_token = session.execute(select(PasswordResetToken.token)).scalar_one()
548 email = email_collector.pop_for_recipient(user.email, last=True)
549 assert email.recipient == user.email
550 assert "reset" in email.subject.lower()
551 assert password_reset_token in email.plain
552 assert password_reset_token in email.html
553 unique_string = "You asked for your password to be reset on Couchers.org."
554 assert unique_string in email.plain
555 assert unique_string in email.html
556 assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in email.plain
557 assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in email.html
558 assert "support@couchers.org" in email.plain
559 assert "support@couchers.org" in email.html
561 push = push_collector.pop_for_user(user.id, last=True)
562 assert push.content.title == "Password reset requested"
563 assert push.content.body == "Use the link we sent by email to complete it."
565 # make sure bad password are caught
566 with auth_api_session() as (auth_api, metadata_interceptor):
567 with pytest.raises(grpc.RpcError) as err:
568 auth_api.CompletePasswordResetV2(
569 auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password="password")
570 )
571 assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
572 assert err.value.details() == "The password is insecure. Please use one that is not easily guessable."
574 # make sure we can set a good password
575 with auth_api_session() as (auth_api, metadata_interceptor):
576 pwd = random_hex()
577 auth_api.CompletePasswordResetV2(
578 auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password=pwd)
579 )
581 push = push_collector.pop_for_user(user.id, last=True)
582 assert push.content.title == "Password reset"
583 assert push.content.body == "Your password was successfully reset."
585 session_token, _ = get_session_cookie_tokens(metadata_interceptor)
587 with session_scope() as session:
588 other_session_token = session.execute(
589 select(UserSession.token)
590 .join(User, UserSession.user_id == User.id)
591 .where(User.username == user.username)
592 .where(UserSession.token == session_token)
593 .where(UserSession.is_valid)
594 ).scalar_one_or_none()
595 assert other_session_token
597 # make sure we can't set a password again
598 with auth_api_session() as (auth_api, metadata_interceptor):
599 with pytest.raises(grpc.RpcError) as err:
600 auth_api.CompletePasswordResetV2(
601 auth_pb2.CompletePasswordResetV2Req(
602 password_reset_token=password_reset_token, new_password=random_hex()
603 )
604 )
605 assert err.value.code() == grpc.StatusCode.NOT_FOUND
606 assert err.value.details() == "Invalid token."
608 with session_scope() as session:
609 user = session.execute(select(User)).scalar_one()
610 assert user.hashed_password == hash_password(pwd)
613def test_password_reset_no_such_user(db):
614 user, token = generate_user()
616 with auth_api_session() as (auth_api, metadata_interceptor):
617 res = auth_api.ResetPassword(
618 auth_pb2.ResetPasswordReq(
619 user="nonexistentuser",
620 )
621 )
623 with session_scope() as session:
624 assert session.execute(select(PasswordResetToken)).scalar_one_or_none() is None
627def test_password_reset_invalid_token_v2(db):
628 password = random_hex()
629 user, token = generate_user(hashed_password=hash_password(password))
631 with auth_api_session() as (auth_api, metadata_interceptor):
632 res = auth_api.ResetPassword(
633 auth_pb2.ResetPasswordReq(
634 user=user.username,
635 )
636 )
638 with auth_api_session() as (auth_api, metadata_interceptor), pytest.raises(grpc.RpcError) as e:
639 res = auth_api.CompletePasswordResetV2(auth_pb2.CompletePasswordResetV2Req(password_reset_token="wrongtoken"))
640 assert e.value.code() == grpc.StatusCode.NOT_FOUND
641 assert e.value.details() == "Invalid token."
643 with session_scope() as session:
644 user = session.execute(select(User)).scalar_one()
645 assert user.hashed_password == hash_password(password)
648def test_logout_invalid_token(db):
649 # Create our test user using signup
650 _quick_signup()
652 with auth_api_session() as (auth_api, metadata_interceptor):
653 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))
655 reply_token, _ = get_session_cookie_tokens(metadata_interceptor)
657 # delete all login tokens
658 with session_scope() as session:
659 session.execute(delete(LoginToken))
661 # log out with non-existent token should still return a valid result
662 with auth_api_session() as (auth_api, metadata_interceptor):
663 auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={reply_token}"),))
665 reply_token, _ = get_session_cookie_tokens(metadata_interceptor)
666 # make sure we set an empty cookie
667 assert reply_token == ""
670def test_signup_without_password(db):
671 with auth_api_session() as (auth_api, metadata_interceptor):
672 with pytest.raises(grpc.RpcError) as e:
673 auth_api.SignupFlow(
674 auth_pb2.SignupFlowReq(
675 basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"),
676 account=auth_pb2.SignupAccount(
677 username="frodo",
678 password="bad",
679 city="Minas Tirith",
680 birthdate="9999-12-31", # arbitrary future birthdate
681 gender="Robot",
682 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
683 lat=1,
684 lng=1,
685 radius=100,
686 accept_tos=True,
687 ),
688 feedback=auth_pb2.ContributorForm(),
689 )
690 )
691 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
692 assert e.value.details() == "The password must be 8 or more characters long."
695def test_signup_invalid_birthdate(db):
696 with auth_api_session() as (auth_api, metadata_interceptor):
697 with pytest.raises(grpc.RpcError) as e:
698 auth_api.SignupFlow(
699 auth_pb2.SignupFlowReq(
700 basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"),
701 account=auth_pb2.SignupAccount(
702 username="frodo",
703 password="a very insecure password",
704 city="Minas Tirith",
705 birthdate="9999-12-31", # arbitrary future birthdate
706 gender="Robot",
707 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
708 lat=1,
709 lng=1,
710 radius=100,
711 accept_tos=True,
712 ),
713 feedback=auth_pb2.ContributorForm(),
714 )
715 )
716 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
717 assert e.value.details() == "You must be at least 18 years old to sign up."
719 res = auth_api.SignupFlow(
720 auth_pb2.SignupFlowReq(
721 basic=auth_pb2.SignupBasic(name="Christopher", email="a2@b.com"),
722 account=auth_pb2.SignupAccount(
723 username="ceelo",
724 password="a very insecure password",
725 city="New York City",
726 birthdate="2000-12-31", # arbitrary birthdate older than 18 years
727 gender="Helicopter",
728 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
729 lat=1,
730 lng=1,
731 radius=100,
732 accept_tos=True,
733 ),
734 feedback=auth_pb2.ContributorForm(),
735 )
736 )
738 assert res.flow_token
740 with pytest.raises(grpc.RpcError) as e:
741 auth_api.SignupFlow(
742 auth_pb2.SignupFlowReq(
743 basic=auth_pb2.SignupBasic(name="Franklin", email="a3@b.com"),
744 account=auth_pb2.SignupAccount(
745 username="franklin",
746 password="a very insecure password",
747 city="Los Santos",
748 birthdate="2010-04-09", # arbitrary birthdate < 18 yrs
749 gender="Male",
750 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
751 lat=1,
752 lng=1,
753 radius=100,
754 accept_tos=True,
755 ),
756 feedback=auth_pb2.ContributorForm(),
757 )
758 )
759 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
760 assert e.value.details() == "You must be at least 18 years old to sign up."
762 with session_scope() as session:
763 assert session.execute(select(func.count()).select_from(SignupFlow)).scalar_one() == 1
766def test_signup_invalid_email(db):
767 with auth_api_session() as (auth_api, metadata_interceptor):
768 with pytest.raises(grpc.RpcError) as e:
769 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a")))
770 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
771 assert e.value.details() == "Invalid email."
773 with auth_api_session() as (auth_api, metadata_interceptor):
774 with pytest.raises(grpc.RpcError) as e:
775 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a@b")))
776 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
777 assert e.value.details() == "Invalid email."
779 with auth_api_session() as (auth_api, metadata_interceptor):
780 with pytest.raises(grpc.RpcError) as e:
781 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a@b.")))
782 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
783 assert e.value.details() == "Invalid email."
785 with auth_api_session() as (auth_api, metadata_interceptor):
786 with pytest.raises(grpc.RpcError) as e:
787 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a@b.c")))
788 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
789 assert e.value.details() == "Invalid email."
792def test_signup_existing_email(db):
793 # Signed up user
794 user, _ = generate_user()
796 with auth_api_session() as (auth_api, metadata_interceptor):
797 with pytest.raises(grpc.RpcError) as e:
798 auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=user.email)))
799 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
800 assert e.value.details() == "That email address is already associated with an account. Please log in instead!"
803def test_signup_banned_user_email(db):
804 user, _ = generate_user()
806 with session_scope() as session:
807 session.execute(update(User).where(User.id == user.id).values(banned_at=func.now()))
809 with auth_api_session() as (auth_api, _):
810 with pytest.raises(grpc.RpcError) as e:
811 auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="NewName", email=user.email)))
812 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
813 assert e.value.details() == "You cannot sign up with that email address."
816def test_signup_deleted_user_email(db):
817 user, _ = generate_user()
819 with session_scope() as session:
820 session.execute(update(User).where(User.id == user.id).values(deleted_at=func.now()))
822 with auth_api_session() as (auth_api, _):
823 with pytest.raises(grpc.RpcError) as e:
824 auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="NewName", email=user.email)))
825 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
826 assert e.value.details() == "You cannot sign up with that email address."
829def test_signup_continue_with_email(db):
830 testing_email = f"{random_hex(12)}@couchers.org.invalid"
831 with auth_api_session() as (auth_api, metadata_interceptor):
832 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email)))
833 flow_token = res.flow_token
834 assert flow_token
836 # continue with same email, should just send another email to the user
837 with auth_api_session() as (auth_api, metadata_interceptor):
838 with pytest.raises(grpc.RpcError) as e:
839 res = auth_api.SignupFlow(
840 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))
841 )
842 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
843 assert e.value.details() == "Please check your email for a link to continue signing up."
846def test_signup_resend_email(db, email_collector: EmailCollector):
847 with auth_api_session() as (auth_api, metadata_interceptor):
848 res = auth_api.SignupFlow(
849 auth_pb2.SignupFlowReq(
850 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
851 account=auth_pb2.SignupAccount(
852 username="frodo",
853 password="a very insecure password",
854 birthdate="1970-01-01",
855 gender="Bot",
856 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
857 city="New York City",
858 lat=40.7331,
859 lng=-73.9778,
860 radius=500,
861 accept_tos=True,
862 ),
863 feedback=auth_pb2.ContributorForm(),
864 accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
865 motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
866 )
867 )
869 email_collector.pop_for_recipient("email@couchers.org.invalid", last=True)
871 flow_token = res.flow_token
872 assert flow_token
874 with session_scope() as session:
875 flow = session.execute(select(SignupFlow)).scalar_one()
876 assert flow.flow_token == flow_token
877 assert flow.email_sent
878 assert not flow.email_verified
879 email_token = flow.email_token
881 # ask for a new signup email
882 with auth_api_session() as (auth_api, metadata_interceptor):
883 res = auth_api.SignupFlow(
884 auth_pb2.SignupFlowReq(
885 flow_token=flow_token,
886 resend_verification_email=True,
887 )
888 )
890 email = email_collector.pop_for_recipient("email@couchers.org.invalid", last=True)
891 assert email_token
892 assert email_token in email.plain
893 assert email_token in email.html
895 with session_scope() as session:
896 flow = session.execute(select(SignupFlow)).scalar_one()
897 assert not flow.email_verified
899 with auth_api_session() as (auth_api, metadata_interceptor):
900 res = auth_api.SignupFlow(
901 auth_pb2.SignupFlowReq(
902 email_token=email_token,
903 )
904 )
906 assert not res.flow_token
907 assert res.HasField("auth_res")
910def test_successful_authenticate(db):
911 user, _ = generate_user(hashed_password=hash_password("password"))
913 # Authenticate with username
914 with auth_api_session() as (auth_api, metadata_interceptor):
915 reply = auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password="password"))
916 assert not reply.jailed
918 # Authenticate with email
919 with auth_api_session() as (auth_api, metadata_interceptor):
920 reply = auth_api.Authenticate(auth_pb2.AuthReq(user=user.email, password="password"))
921 assert not reply.jailed
924def test_unsuccessful_authenticate(db):
925 user, _ = generate_user(hashed_password=hash_password("password"))
927 # Invalid password
928 with auth_api_session() as (auth_api, metadata_interceptor):
929 with pytest.raises(grpc.RpcError) as e:
930 reply = auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password="incorrectpassword"))
931 assert e.value.code() == grpc.StatusCode.NOT_FOUND
932 assert e.value.details() == "Wrong username/email or password."
934 # Invalid username
935 with auth_api_session() as (auth_api, metadata_interceptor):
936 with pytest.raises(grpc.RpcError) as e:
937 reply = auth_api.Authenticate(auth_pb2.AuthReq(user="notarealusername", password="password"))
938 assert e.value.code() == grpc.StatusCode.NOT_FOUND
939 assert e.value.details() == "An account with that username or email was not found."
941 # Invalid email
942 with auth_api_session() as (auth_api, metadata_interceptor):
943 with pytest.raises(grpc.RpcError) as e:
944 reply = auth_api.Authenticate(
945 auth_pb2.AuthReq(user=f"{random_hex(12)}@couchers.org.invalid", password="password")
946 )
947 assert e.value.code() == grpc.StatusCode.NOT_FOUND
948 assert e.value.details() == "An account with that username or email was not found."
950 # Invalid id
951 with auth_api_session() as (auth_api, metadata_interceptor):
952 with pytest.raises(grpc.RpcError) as e:
953 reply = auth_api.Authenticate(auth_pb2.AuthReq(user="-1", password="password"))
954 assert e.value.code() == grpc.StatusCode.NOT_FOUND
955 assert e.value.details() == "An account with that username or email was not found."
958def test_complete_signup(db):
959 testing_email = f"{random_hex(12)}@couchers.org.invalid"
960 with auth_api_session() as (auth_api, metadata_interceptor):
961 reply = auth_api.SignupFlow(
962 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email))
963 )
965 flow_token = reply.flow_token
967 with auth_api_session() as (auth_api, metadata_interceptor):
968 # Invalid username
969 with pytest.raises(grpc.RpcError) as e:
970 auth_api.SignupFlow(
971 auth_pb2.SignupFlowReq(
972 flow_token=flow_token,
973 account=auth_pb2.SignupAccount(
974 username=" ",
975 password="a very insecure password",
976 city="Minas Tirith",
977 birthdate="1980-12-31",
978 gender="Robot",
979 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
980 lat=1,
981 lng=1,
982 radius=100,
983 accept_tos=True,
984 ),
985 )
986 )
987 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
988 assert e.value.details() == "Invalid username."
990 with auth_api_session() as (auth_api, metadata_interceptor):
991 # Invalid name
992 with pytest.raises(grpc.RpcError) as e:
993 auth_api.SignupFlow(
994 auth_pb2.SignupFlowReq(
995 basic=auth_pb2.SignupBasic(name=" ", email=f"{random_hex(12)}@couchers.org.invalid")
996 )
997 )
998 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
999 assert e.value.details() == "Name not supported."
1001 with auth_api_session() as (auth_api, metadata_interceptor):
1002 # Hosting status required
1003 with pytest.raises(grpc.RpcError) as e:
1004 auth_api.SignupFlow(
1005 auth_pb2.SignupFlowReq(
1006 flow_token=flow_token,
1007 account=auth_pb2.SignupAccount(
1008 username="frodo",
1009 password="a very insecure password",
1010 city="Minas Tirith",
1011 birthdate="1980-12-31",
1012 gender="Robot",
1013 hosting_status=None,
1014 lat=1,
1015 lng=1,
1016 radius=100,
1017 accept_tos=True,
1018 ),
1019 )
1020 )
1021 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
1022 assert e.value.details() == "Hosting status is required."
1024 user, _ = generate_user()
1025 with auth_api_session() as (auth_api, metadata_interceptor):
1026 # Username unavailable
1027 with pytest.raises(grpc.RpcError) as e:
1028 auth_api.SignupFlow(
1029 auth_pb2.SignupFlowReq(
1030 flow_token=flow_token,
1031 account=auth_pb2.SignupAccount(
1032 username=user.username,
1033 password="a very insecure password",
1034 city="Minas Tirith",
1035 birthdate="1980-12-31",
1036 gender="Robot",
1037 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
1038 lat=1,
1039 lng=1,
1040 radius=100,
1041 accept_tos=True,
1042 ),
1043 )
1044 )
1045 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
1046 assert e.value.details() == "Sorry, that username isn't available."
1048 with auth_api_session() as (auth_api, metadata_interceptor):
1049 # Invalid coordinate
1050 with pytest.raises(grpc.RpcError) as e:
1051 auth_api.SignupFlow(
1052 auth_pb2.SignupFlowReq(
1053 flow_token=flow_token,
1054 account=auth_pb2.SignupAccount(
1055 username="frodo",
1056 password="a very insecure password",
1057 city="Minas Tirith",
1058 birthdate="1980-12-31",
1059 gender="Robot",
1060 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
1061 lat=0,
1062 lng=0,
1063 radius=100,
1064 accept_tos=True,
1065 ),
1066 )
1067 )
1068 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
1069 assert e.value.details() == "Invalid coordinate."
1072def test_signup_token_regression(db):
1073 # Repro steps:
1074 # 1. Start a signup
1075 # 2. Confirm the email
1076 # 3. Start a new signup with the same email
1077 # Expected: send a link to the email to continue signing up.
1078 # Actual: `AttributeError: 'SignupFlow' object has no attribute 'token'`
1080 testing_email = f"{random_hex(12)}@couchers.org.invalid"
1082 # 1. Start a signup
1083 with auth_api_session() as (auth_api, metadata_interceptor):
1084 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email)))
1085 flow_token = res.flow_token
1086 assert flow_token
1088 # 2. Confirm the email
1089 with session_scope() as session:
1090 email_token = session.execute(
1091 select(SignupFlow.email_token).where(SignupFlow.flow_token == flow_token)
1092 ).scalar_one()
1094 with auth_api_session() as (auth_api, metadata_interceptor):
1095 auth_api.SignupFlow(
1096 auth_pb2.SignupFlowReq(
1097 flow_token=flow_token,
1098 email_token=email_token,
1099 )
1100 )
1102 # 3. Start a new signup with the same email
1103 with auth_api_session() as (auth_api, metadata_interceptor):
1104 with pytest.raises(grpc.RpcError) as e:
1105 auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email)))
1106 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
1107 assert e.value.details() == "Please check your email for a link to continue signing up."
1110@pytest.mark.parametrize("opt_out", [True, False])
1111def test_opt_out_of_newsletter(db, opt_out):
1112 with auth_api_session() as (auth_api, metadata_interceptor):
1113 res = auth_api.SignupFlow(
1114 auth_pb2.SignupFlowReq(
1115 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
1116 account=auth_pb2.SignupAccount(
1117 username="frodo",
1118 password="a very insecure password",
1119 birthdate="1970-01-01",
1120 gender="Bot",
1121 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
1122 city="New York City",
1123 lat=40.7331,
1124 lng=-73.9778,
1125 radius=500,
1126 accept_tos=True,
1127 opt_out_of_newsletter=opt_out,
1128 ),
1129 feedback=auth_pb2.ContributorForm(),
1130 accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
1131 motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
1132 )
1133 )
1135 with session_scope() as session:
1136 email_token = session.execute(
1137 select(SignupFlow.email_token).where(SignupFlow.flow_token == res.flow_token)
1138 ).scalar_one()
1140 with auth_api_session() as (auth_api, metadata_interceptor):
1141 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))
1143 user_id = res.auth_res.user_id
1145 with session_scope() as session:
1146 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
1147 assert not user.in_sync_with_newsletter
1148 assert user.opt_out_of_newsletter == opt_out
1151def test_GetAuthState(db):
1152 user, token = generate_user()
1153 jailed_user, jailed_token = generate_user(accepted_tos=0)
1155 with auth_api_session() as (auth_api, metadata_interceptor):
1156 res = auth_api.GetAuthState(empty_pb2.Empty())
1157 assert not res.logged_in
1158 assert not res.HasField("auth_res")
1160 with auth_api_session() as (auth_api, metadata_interceptor):
1161 res = auth_api.GetAuthState(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={token}"),))
1162 assert res.logged_in
1163 assert res.HasField("auth_res")
1164 assert res.auth_res.user_id == user.id
1165 assert not res.auth_res.jailed
1167 auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={token}"),))
1169 res = auth_api.GetAuthState(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={token}"),))
1170 assert not res.logged_in
1171 assert not res.HasField("auth_res")
1173 with auth_api_session() as (auth_api, metadata_interceptor):
1174 res = auth_api.GetAuthState(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={jailed_token}"),))
1175 assert res.logged_in
1176 assert res.HasField("auth_res")
1177 assert res.auth_res.user_id == jailed_user.id
1178 assert res.auth_res.jailed
1181def test_signup_no_feedback_regression(db):
1182 """
1183 When we first remove the feedback form, the backned was saying it's not needed but was not completing the signup,
1184 this regression test checks that.
1185 """
1186 with auth_api_session() as (auth_api, metadata_interceptor):
1187 res = auth_api.SignupFlow(
1188 auth_pb2.SignupFlowReq(
1189 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
1190 account=auth_pb2.SignupAccount(
1191 username="frodo",
1192 password="a very insecure password",
1193 birthdate="1970-01-01",
1194 gender="Bot",
1195 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
1196 city="New York City",
1197 lat=40.7331,
1198 lng=-73.9778,
1199 radius=500,
1200 accept_tos=True,
1201 ),
1202 accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
1203 motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
1204 )
1205 )
1207 flow_token = res.flow_token
1209 assert res.flow_token
1210 assert not res.HasField("auth_res")
1211 assert not res.need_basic
1212 assert not res.need_account
1213 assert not res.need_feedback
1214 assert not res.need_motivations
1215 assert res.need_verify_email
1217 # read out the signup token directly from the database for now
1218 with session_scope() as session:
1219 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
1220 assert flow.email_sent
1221 assert not flow.email_verified
1222 email_token = flow.email_token
1224 with auth_api_session() as (auth_api, metadata_interceptor):
1225 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))
1227 assert not res.flow_token
1228 assert res.HasField("auth_res")
1229 assert res.auth_res.user_id
1230 assert not res.auth_res.jailed
1231 assert not res.need_basic
1232 assert not res.need_account
1233 assert not res.need_feedback
1234 assert not res.need_motivations
1235 assert not res.need_verify_email
1237 # make sure we got the right token in a cookie
1238 with session_scope() as session:
1239 token = session.execute(
1240 select(UserSession.token).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
1241 ).scalar_one()
1242 sesh, uid = get_session_cookie_tokens(metadata_interceptor)
1243 assert sesh == token
1246def test_banned_username(db):
1247 testing_email = f"{random_hex(12)}@couchers.org.invalid"
1248 with auth_api_session() as (auth_api, metadata_interceptor):
1249 reply = auth_api.SignupFlow(
1250 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email))
1251 )
1253 flow_token = reply.flow_token
1255 with auth_api_session() as (auth_api, metadata_interceptor):
1256 # Banned username
1257 with pytest.raises(grpc.RpcError) as e:
1258 auth_api.SignupFlow(
1259 auth_pb2.SignupFlowReq(
1260 flow_token=flow_token,
1261 account=auth_pb2.SignupAccount(
1262 username="thecouchersadminaccount",
1263 password="a very insecure password",
1264 city="Minas Tirith",
1265 birthdate="1980-12-31",
1266 gender="Robot",
1267 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
1268 lat=1,
1269 lng=1,
1270 radius=100,
1271 accept_tos=True,
1272 ),
1273 )
1274 )
1275 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
1276 assert e.value.details() == "Sorry, that username isn't available."
1279# tests for ConfirmChangeEmail within test_account.py tests for test_ChangeEmail_*
1282def test_GetInviteCodeInfo(db):
1283 user, token = generate_user(complete_profile=True)
1285 with account_session(token) as account:
1286 code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code
1288 with auth_api_session() as (auth, _):
1289 res = auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code))
1290 assert res.name == user.name
1291 assert res.username == user.username
1292 # Avatar URL should be a thumbnail URL with a hashed filename
1293 assert "/img/thumbnail/" in res.avatar_url
1294 assert res.avatar_url.endswith(".jpg")
1295 # Verify the hashed filename looks correct (64 char hex hash)
1296 assert len(res.avatar_url.split("/")[-1].replace(".jpg", "")) == 64
1297 assert res.url == urls.invite_code_link(code=code)
1300def test_GetInviteCodeInfo_no_avatar(db):
1301 user, token = generate_user(complete_profile=False)
1303 with account_session(token) as account:
1304 code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code
1306 with auth_api_session() as (auth, _):
1307 res = auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code))
1308 assert res.name == user.name
1309 assert res.username == user.username
1310 assert res.avatar_url == ""
1311 assert res.url == urls.invite_code_link(code=code)
1314def test_GetInviteCodeInfo_not_found(db):
1315 generate_user()
1317 with auth_api_session() as (auth, _):
1318 with pytest.raises(grpc.RpcError) as e:
1319 auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code="BADCODE"))
1320 assert e.value.code() == grpc.StatusCode.NOT_FOUND
1321 assert e.value.details() == "Invite code not found."
1324def test_SignupFlow_invite_code(db):
1325 user, token = generate_user()
1327 with account_session(token) as account:
1328 invite_code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code
1330 with auth_api_session() as (auth_api, _):
1331 # Signup basic step with invite code
1332 res = auth_api.SignupFlow(
1333 auth_pb2.SignupFlowReq(
1334 basic=auth_pb2.SignupBasic(
1335 name="Test User",
1336 email="inviteuser@example.com",
1337 invite_code=invite_code,
1338 )
1339 )
1340 )
1341 flow_token = res.flow_token
1342 assert flow_token
1344 # Confirm email
1345 with session_scope() as session:
1346 email_token = session.execute(
1347 select(SignupFlow.email_token).where(SignupFlow.flow_token == flow_token)
1348 ).scalar_one()
1350 auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))
1352 # Signup account step
1353 auth_api.SignupFlow(
1354 auth_pb2.SignupFlowReq(
1355 flow_token=flow_token,
1356 account=auth_pb2.SignupAccount(
1357 username="invited_user",
1358 password="secure password",
1359 birthdate="1990-01-01",
1360 gender="Other",
1361 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
1362 city="Example City",
1363 lat=1,
1364 lng=5,
1365 radius=100,
1366 accept_tos=True,
1367 ),
1368 feedback=auth_pb2.ContributorForm(),
1369 accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
1370 motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
1371 )
1372 )
1374 # Check that invite_code_id is stored in the final User object
1375 with session_scope() as session:
1376 invite_code_id = session.execute(
1377 select(User.invite_code_id).where(User.username == "invited_user")
1378 ).scalar_one()
1379 assert invite_code_id == invite_code
1382def test_signup_with_motivations(db):
1383 """
1384 Test signup flow with the new motivations step (heard_about_couchers and signup_motivations)
1385 """
1386 with auth_api_session() as (auth_api, metadata_interceptor):
1387 res = auth_api.SignupFlow(
1388 auth_pb2.SignupFlowReq(
1389 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
1390 account=auth_pb2.SignupAccount(
1391 username="intentuser",
1392 password="a very insecure password",
1393 birthdate="1970-01-01",
1394 gender="Bot",
1395 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
1396 city="New York City",
1397 lat=40.7331,
1398 lng=-73.9778,
1399 radius=500,
1400 accept_tos=True,
1401 ),
1402 motivations=auth_pb2.SignupMotivations(
1403 heard_about_couchers="friend",
1404 motivations=["hosting", "surfing", "events"],
1405 ),
1406 accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
1407 )
1408 )
1410 flow_token = res.flow_token
1411 assert flow_token
1412 assert not res.HasField("auth_res")
1413 assert res.need_verify_email
1415 # Verify the motivations are stored in the SignupFlow
1416 with session_scope() as session:
1417 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
1418 assert flow.heard_about_couchers == "friend"
1419 assert set(flow.signup_motivations) == {"hosting", "surfing", "events"}
1420 email_token = flow.email_token
1422 # Complete signup by verifying email
1423 with auth_api_session() as (auth_api, metadata_interceptor):
1424 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))
1426 assert res.HasField("auth_res")
1427 user_id = res.auth_res.user_id
1429 # Verify the motivations are transferred to the User object
1430 with session_scope() as session:
1431 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
1432 assert user.heard_about_couchers == "friend"
1433 assert user.signup_motivations is not None
1434 assert set(user.signup_motivations) == {"hosting", "surfing", "events"}
1437def test_signup_motivations_incremental(db):
1438 """
1439 Test that motivations can be submitted incrementally as a separate step
1440 """
1441 with auth_api_session() as (auth_api, metadata_interceptor):
1442 # First, basic signup
1443 res = auth_api.SignupFlow(
1444 auth_pb2.SignupFlowReq(
1445 basic=auth_pb2.SignupBasic(name="testing", email="email2@couchers.org.invalid"),
1446 )
1447 )
1449 flow_token = res.flow_token
1450 assert flow_token
1451 assert res.need_account
1452 assert res.need_motivations # New field
1454 # Submit motivations separately
1455 with auth_api_session() as (auth_api, metadata_interceptor):
1456 res = auth_api.SignupFlow(
1457 auth_pb2.SignupFlowReq(
1458 flow_token=flow_token,
1459 motivations=auth_pb2.SignupMotivations(
1460 heard_about_couchers="social_media",
1461 motivations=["surfing"],
1462 ),
1463 )
1464 )
1466 assert res.flow_token == flow_token
1467 assert not res.need_motivations # Should be filled now
1468 assert res.need_account # Still need account
1470 # Verify motivations are stored
1471 with session_scope() as session:
1472 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
1473 assert flow.heard_about_couchers == "social_media"
1474 assert flow.signup_motivations == ["surfing"]
1477def test_signup_motivations_cannot_be_refilled(db):
1478 """
1479 Test that motivations cannot be submitted twice
1480 """
1481 with auth_api_session() as (auth_api, metadata_interceptor):
1482 res = auth_api.SignupFlow(
1483 auth_pb2.SignupFlowReq(
1484 basic=auth_pb2.SignupBasic(name="testing", email="email3@couchers.org.invalid"),
1485 motivations=auth_pb2.SignupMotivations(
1486 heard_about_couchers="friend",
1487 motivations=["hosting"],
1488 ),
1489 )
1490 )
1492 flow_token = res.flow_token
1494 # Try to submit motivations again - should fail
1495 with auth_api_session() as (auth_api, metadata_interceptor):
1496 with pytest.raises(grpc.RpcError) as e:
1497 auth_api.SignupFlow(
1498 auth_pb2.SignupFlowReq(
1499 flow_token=flow_token,
1500 motivations=auth_pb2.SignupMotivations(
1501 heard_about_couchers="different_source",
1502 motivations=["surfing"],
1503 ),
1504 )
1505 )
1506 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
1507 assert e.value.details() == "You've already told us about why you are signing up."
1510def test_signup_motivations_required(db):
1511 """
1512 Test that signup cannot complete without providing motivations
1513 """
1514 with auth_api_session() as (auth_api, metadata_interceptor):
1515 res = auth_api.SignupFlow(
1516 auth_pb2.SignupFlowReq(
1517 basic=auth_pb2.SignupBasic(name="testing", email="email4@couchers.org.invalid"),
1518 account=auth_pb2.SignupAccount(
1519 username="nointents",
1520 password="a very insecure password",
1521 birthdate="1970-01-01",
1522 gender="Bot",
1523 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
1524 city="New York City",
1525 lat=40.7331,
1526 lng=-73.9778,
1527 radius=500,
1528 accept_tos=True,
1529 ),
1530 # No motivations provided
1531 accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
1532 )
1533 )
1535 flow_token = res.flow_token
1536 assert not res.HasField("auth_res")
1537 assert res.need_motivations # Intents still required
1539 with session_scope() as session:
1540 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
1541 email_token = flow.email_token
1543 # Verify email - signup still not complete without motivations
1544 with auth_api_session() as (auth_api, metadata_interceptor):
1545 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))
1547 assert not res.HasField("auth_res")
1548 assert res.need_motivations
1550 # Now submit motivations
1551 with auth_api_session() as (auth_api, metadata_interceptor):
1552 res = auth_api.SignupFlow(
1553 auth_pb2.SignupFlowReq(
1554 flow_token=flow_token,
1555 motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
1556 )
1557 )
1559 assert res.HasField("auth_res")
1560 user_id = res.auth_res.user_id
1562 with session_scope() as session:
1563 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
1564 assert user.signup_motivations == ["surfing"]
1567def test_signup_motivations_all_options(db):
1568 """
1569 Test all the different motivation options
1570 """
1571 with auth_api_session() as (auth_api, metadata_interceptor):
1572 res = auth_api.SignupFlow(
1573 auth_pb2.SignupFlowReq(
1574 basic=auth_pb2.SignupBasic(name="testing", email="email5@couchers.org.invalid"),
1575 motivations=auth_pb2.SignupMotivations(
1576 heard_about_couchers="other",
1577 motivations=["hosting", "surfing", "events"],
1578 ),
1579 )
1580 )
1582 flow_token = res.flow_token
1584 with session_scope() as session:
1585 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
1586 assert flow.heard_about_couchers == "other"
1587 assert set(flow.signup_motivations) == {"hosting", "surfing", "events"}
1590def test_signup_motivations_empty_motivations_list(db):
1591 """
1592 Test that providing heard_about but empty motivations list is valid
1593 """
1594 with auth_api_session() as (auth_api, metadata_interceptor):
1595 res = auth_api.SignupFlow(
1596 auth_pb2.SignupFlowReq(
1597 basic=auth_pb2.SignupBasic(name="testing", email="email6@couchers.org.invalid"),
1598 motivations=auth_pb2.SignupMotivations(
1599 heard_about_couchers="former_cs_member",
1600 motivations=[], # No specific motivations selected
1601 ),
1602 )
1603 )
1605 flow_token = res.flow_token
1607 with session_scope() as session:
1608 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
1609 assert flow.heard_about_couchers == "former_cs_member"
1610 assert flow.signup_motivations == []