Coverage for app / backend / src / tests / test_auth.py: 100%
701 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
1import http.cookies
2from typing import cast
4import grpc
5import pytest
6from google.protobuf import empty_pb2, wrappers_pb2
7from sqlalchemy import select, update
8from sqlalchemy.sql import delete, func
10from couchers import urls
11from couchers.crypto import hash_password, random_hex
12from couchers.db import session_scope
13from couchers.models import (
14 ContributeOption,
15 ContributorForm,
16 LoginToken,
17 PasswordResetToken,
18 SignupFlow,
19 User,
20 UserSession,
21)
22from couchers.proto import account_pb2, api_pb2, auth_pb2
23from couchers.utils import now
24from tests.fixtures.db import generate_user
25from tests.fixtures.misc import PushCollector, email_fields, mock_notification_email
26from tests.fixtures.sessions import (
27 MetadataKeeperInterceptor,
28 account_session,
29 api_session,
30 auth_api_session,
31 real_api_session,
32)
@pytest.fixture(autouse=True)
def _(testconfig, fast_passwords):
    """Request the ``testconfig`` and ``fast_passwords`` fixtures for every test in this module."""
def get_session_cookie_tokens(metadata_interceptor: "MetadataKeeperInterceptor") -> tuple[str, str]:
    """
    Read the session token and user id out of the ``set-cookie`` headers kept by the interceptor.

    Iterates ``metadata_interceptor.latest_header_raw`` as ``(key, value)`` pairs, parses every
    ``set-cookie`` value and looks the cookies up by their exact names, so unrelated cookies whose
    value or attributes merely contain "sesh" or "user-id" cannot be picked up by mistake.

    Returns:
        ``(couchers-sesh value, couchers-user-id value)``; a cleared cookie yields an empty string.

    Raises:
        IndexError: if either cookie is absent from the headers.
    """
    found: dict[str, str] = {}
    for key, val in metadata_interceptor.latest_header_raw:
        if key != "set-cookie":
            continue
        for name, morsel in http.cookies.SimpleCookie(val).items():
            # keep the first occurrence of each cookie name
            found.setdefault(name, morsel.value)

    missing = [name for name in ("couchers-sesh", "couchers-user-id") if name not in found]
    if missing:
        raise IndexError(f"no set-cookie header found for: {', '.join(missing)}")
    return found["couchers-sesh"], found["couchers-user-id"]
def test_UsernameValid(db):
    """UsernameValid reports "test" as valid and the empty string as not valid."""
    with auth_api_session() as (auth_api, _):
        reply = auth_api.UsernameValid(auth_pb2.UsernameValidReq(username="test"))
        assert reply.valid

    with auth_api_session() as (auth_api, _):
        reply = auth_api.UsernameValid(auth_pb2.UsernameValidReq(username=""))
        assert not reply.valid
def test_signup_incremental(db):
    """
    Walk through the signup flow one section per request and check which sections remain after each.
    """
    need_flags = (
        "need_basic",
        "need_account",
        "need_feedback",
        "need_verify_email",
        "need_accept_community_guidelines",
        "need_motivations",
    )

    def outstanding(reply):
        # names of the need_* fields that are truthy on the reply
        return {flag for flag in need_flags if getattr(reply, flag)}

    def step(**fields):
        # every step runs in its own auth_api_session
        with auth_api_session() as (auth_api, interceptor):
            reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(**fields))
        return reply, interceptor

    def check_in_progress(reply, expected_outstanding):
        assert reply.flow_token == flow_token
        assert not reply.HasField("auth_res")
        assert outstanding(reply) == expected_outstanding

    res, _ = step(basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"))

    flow_token = res.flow_token
    assert flow_token
    everything_but_basic = {
        "need_account",
        "need_verify_email",
        "need_accept_community_guidelines",
        "need_motivations",
    }
    check_in_progress(res, everything_but_basic)

    # read out the signup token directly from the database for now
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    # Polling with only the flow token changes nothing
    res, _ = step(flow_token=flow_token)
    check_in_progress(res, everything_but_basic)

    # Add feedback
    res, _ = step(
        flow_token=flow_token,
        feedback=auth_pb2.ContributorForm(
            ideas="I'm a robot, incapable of original ideation",
            features="I love all your features",
            experience="I haven't done couch surfing before",
            contribute=auth_pb2.CONTRIBUTE_OPTION_YES,
            contribute_ways=["serving", "backend"],
            expertise="I'd love to be your server: I can compute very fast, but only simple opcodes",
        ),
    )
    check_in_progress(res, everything_but_basic)

    # Agree to community guidelines
    res, _ = step(flow_token=flow_token, accept_community_guidelines=wrappers_pb2.BoolValue(value=True))
    check_in_progress(res, {"need_account", "need_verify_email", "need_motivations"})

    # Submit motivations
    res, _ = step(flow_token=flow_token, motivations=auth_pb2.SignupMotivations(motivations=["surfing"]))
    check_in_progress(res, {"need_account", "need_verify_email"})

    # Verify email
    res, _ = step(flow_token=flow_token, email_token=email_token)
    check_in_progress(res, {"need_account"})

    # Finally finish off account info
    res, metadata_interceptor = step(
        flow_token=flow_token,
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_MAYBE,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
    )

    assert not res.flow_token
    assert res.HasField("auth_res")
    assert res.auth_res.user_id
    assert not res.auth_res.jailed
    assert outstanding(res) == set()

    user_id = res.auth_res.user_id

    sess_token, uid = get_session_cookie_tokens(metadata_interceptor)
    assert uid == str(user_id)

    with api_session(sess_token) as api:
        profile = api.GetUser(api_pb2.GetUserReq(user=str(user_id)))

    assert profile.username == "frodo"
    assert profile.gender == "Bot"
    assert profile.hosting_status == api_pb2.HOSTING_STATUS_MAYBE
    assert profile.city == "New York City"
    assert profile.lat == 40.7331
    assert profile.lng == -73.9778
    assert profile.radius == 500

    with session_scope() as session:
        form = session.execute(select(ContributorForm)).scalar_one()

        assert form.ideas == "I'm a robot, incapable of original ideation"
        assert form.features == "I love all your features"
        assert form.experience == "I haven't done couch surfing before"
        assert form.contribute == ContributeOption.yes
        assert form.contribute_ways == ["serving", "backend"]
        assert form.expertise == "I'd love to be your server: I can compute very fast, but only simple opcodes"
def _quick_signup() -> int:
    """
    Sign up the user "frodo" in two requests and return the new user's id.

    The first request submits all signup sections at once; the second confirms the email using the
    token read directly from the SignupFlow row. Also checks that the session cookie sent back
    matches the UserSession token stored for "frodo".
    """
    everything_at_once = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
        motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
    )
    cleared_flags = ("need_basic", "need_account", "need_feedback", "need_motivations")

    with auth_api_session() as (auth_api, _):
        started = auth_api.SignupFlow(everything_at_once)

    flow_token = started.flow_token

    assert flow_token
    assert not started.HasField("auth_res")
    for flag in cleared_flags:
        assert not getattr(started, flag)
    assert started.need_verify_email

    # read out the signup token directly from the database for now
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        finished = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    assert not finished.flow_token
    assert finished.HasField("auth_res")
    assert finished.auth_res.user_id
    assert not finished.auth_res.jailed
    for flag in cleared_flags:
        assert not getattr(finished, flag)
    assert not finished.need_verify_email

    # make sure we got the right token in a cookie
    stored_token_query = (
        select(UserSession.token).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
    )
    with session_scope() as session:
        stored_token = session.execute(stored_token_query).scalar_one()
    cookie_token, _ = get_session_cookie_tokens(metadata_interceptor)
    assert cookie_token == stored_token

    return cast(int, finished.auth_res.user_id)
def test_signup(db):
    """Run the quick signup helper; its own assertions are the test."""
    _quick_signup()
def test_basic_login(db):
    """Authenticate as "frodo", check the cookie token is a valid stored session, then log out."""
    # Create our test user using signup
    _quick_signup()

    credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Authenticate(credentials)

    reply_token, _ = get_session_cookie_tokens(metadata_interceptor)

    valid_session_query = (
        select(UserSession.token)
        .join(User, UserSession.user_id == User.id)
        .where(User.username == "frodo")
        .where(UserSession.token == reply_token)
        .where(UserSession.is_valid)
    )
    with session_scope() as session:
        stored_token = session.execute(valid_session_query).scalar_one_or_none()
        assert stored_token

    # log out
    session_cookie = (("cookie", f"couchers-sesh={reply_token}"),)
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=session_cookie)
def test_login_part_signed_up_verified_email(db):
    """
    If you try to log in but didn't finish signing up, we send you a new email and ask you to finish signing up.
    """
    address = "email@couchers.org.invalid"

    with auth_api_session() as (auth_api, _):
        started = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="testing", email=address))
        )

    flow_token = started.flow_token
    assert started.need_verify_email

    # verify the email
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        email_token = flow.email_token
    with auth_api_session() as (auth_api, _):
        auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    with mock_notification_email() as mock, auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as err:
            auth_api.Authenticate(auth_pb2.AuthReq(user=address, password="wrong pwd"))
        assert err.value.details() == "Please check your email for a link to continue signing up."

    assert mock.call_count == 1
    sent = email_fields(mock)
    assert sent.recipient == address
    # the reminder email carries the flow token in both renderings
    for rendering in (sent.plain, sent.html):
        assert flow_token in rendering
def test_login_part_signed_up_not_verified_email(db):
    """
    Logging in with the username of an unverified signup is refused and an email containing the
    flow's email token is sent to the signup address.
    """
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1999-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
    )
    with auth_api_session() as (auth_api, _):
        started = auth_api.SignupFlow(signup_req)

    flow_token = started.flow_token
    assert started.need_verify_email

    with mock_notification_email() as mock, auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as err:
            auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="wrong pwd"))
        assert err.value.details() == "Please check your email for a link to continue signing up."

    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        email_token = flow.email_token

    assert mock.call_count == 1
    sent = email_fields(mock)
    assert sent.recipient == "email@couchers.org.invalid"
    assert email_token
    for rendering in (sent.plain, sent.html):
        assert email_token in rendering
def test_banned_user(db):
    """A user with ``banned_at`` set cannot authenticate and is told the account is suspended."""
    _quick_signup()

    with session_scope() as session:
        only_user = session.execute(select(User)).scalar_one()
        only_user.banned_at = now()

    credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.Authenticate(credentials)
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "Your account is suspended."
def test_deleted_user(db):
    """A user with ``deleted_at`` set is treated as not found when authenticating."""
    user_id = _quick_signup()

    mark_deleted = update(User).where(User.id == user_id).values(deleted_at=func.now())
    with session_scope() as session:
        session.execute(mark_deleted)

    credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.Authenticate(credentials)
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "An account with that username or email was not found."
def test_invalid_token(db):
    """An API call made with a random, never-issued session token is rejected as unauthenticated."""
    # two users are created, but only the second one's username is looked up
    generate_user()
    target, _ = generate_user()

    bogus_token = random_hex(32)

    with real_api_session(bogus_token) as api, pytest.raises(grpc.RpcError) as e:
        api.GetUser(api_pb2.GetUserReq(user=target.username))

    assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED
    assert e.value.details() == "Unauthorized"
def test_password_reset_v2(db, push_collector: PushCollector):
    """
    Password reset round trip: request, email and push content, weak-password rejection,
    completion with a new session, and rejection of a second use of the same token.
    """
    user, _ = generate_user(hashed_password=hash_password("mypassword"))

    with auth_api_session() as (auth_api, _), mock_notification_email() as mock:
        auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username))

    with session_scope() as session:
        password_reset_token = session.execute(select(PasswordResetToken.token)).scalar_one()

    assert mock.call_count == 1
    sent = email_fields(mock)
    assert sent.recipient == user.email
    assert "reset" in sent.subject.lower()
    unique_string = "You asked for your password to be reset on Couchers.org."
    reset_link = f"http://localhost:3000/complete-password-reset?token={password_reset_token}"
    # both renderings of the email must carry the same pieces
    for rendering in (sent.plain, sent.html):
        assert password_reset_token in rendering
        assert unique_string in rendering
        assert reset_link in rendering
        assert "support@couchers.org" in rendering

    push = push_collector.pop_for_user(user.id, last=True)
    assert push.content.title == "Password reset requested"
    assert push.content.body == "Use the link we sent by email to complete it."

    # make sure bad password are caught
    weak_req = auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password="password")
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as err:
            auth_api.CompletePasswordResetV2(weak_req)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == "The password is insecure. Please use one that is not easily guessable."

    # make sure we can set a good password
    with auth_api_session() as (auth_api, metadata_interceptor):
        pwd = random_hex()
        with mock_notification_email() as mock:
            auth_api.CompletePasswordResetV2(
                auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password=pwd)
            )

    push = push_collector.pop_for_user(user.id, last=True)
    assert push.content.title == "Password reset"
    assert push.content.body == "Your password was successfully reset."

    session_token, _ = get_session_cookie_tokens(metadata_interceptor)

    new_session_query = (
        select(UserSession.token)
        .join(User, UserSession.user_id == User.id)
        .where(User.username == user.username)
        .where(UserSession.token == session_token)
        .where(UserSession.is_valid)
    )
    with session_scope() as session:
        other_session_token = session.execute(new_session_query).scalar_one_or_none()
        assert other_session_token

    # make sure we can't set a password again
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as err:
            auth_api.CompletePasswordResetV2(
                auth_pb2.CompletePasswordResetV2Req(
                    password_reset_token=password_reset_token, new_password=random_hex()
                )
            )
        assert err.value.code() == grpc.StatusCode.NOT_FOUND
        assert err.value.details() == "Invalid token."

    with session_scope() as session:
        stored_user = session.execute(select(User)).scalar_one()
        assert stored_user.hashed_password == hash_password(pwd)
def test_password_reset_no_such_user(db):
    """Requesting a reset for an unknown user returns normally and stores no PasswordResetToken."""
    generate_user()

    unknown_user_req = auth_pb2.ResetPasswordReq(user="nonexistentuser")
    with auth_api_session() as (auth_api, _):
        auth_api.ResetPassword(unknown_user_req)

    with session_scope() as session:
        stored = session.execute(select(PasswordResetToken)).scalar_one_or_none()
        assert stored is None
def test_password_reset_invalid_token_v2(db):
    """Completing a reset with a wrong token fails with NOT_FOUND and leaves the stored hash as it was."""
    password = random_hex()
    user, _ = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, _):
        auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username))

    bad_token_req = auth_pb2.CompletePasswordResetV2Req(password_reset_token="wrongtoken")
    with auth_api_session() as (auth_api, _), pytest.raises(grpc.RpcError) as e:
        auth_api.CompletePasswordResetV2(bad_token_req)
    assert e.value.code() == grpc.StatusCode.NOT_FOUND
    assert e.value.details() == "Invalid token."

    with session_scope() as session:
        stored_user = session.execute(select(User)).scalar_one()
        assert stored_user.hashed_password == hash_password(password)
def test_logout_invalid_token(db):
    """Deauthenticate still succeeds after the LoginToken rows are deleted, and clears the session cookie."""
    # Create our test user using signup
    _quick_signup()

    credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Authenticate(credentials)

    reply_token, _ = get_session_cookie_tokens(metadata_interceptor)

    # delete all login tokens
    # NOTE(review): this removes LoginToken rows, not UserSession rows; confirm that it really makes
    # the session token sent below non-existent, as the test name and the next comment suggest.
    with session_scope() as session:
        session.execute(delete(LoginToken))

    # log out with non-existent token should still return a valid result
    session_cookie = (("cookie", f"couchers-sesh={reply_token}"),)
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=session_cookie)

    cleared_token, _ = get_session_cookie_tokens(metadata_interceptor)
    # make sure we set an empty cookie
    assert cleared_token == ""
def test_signup_without_password(db):
    """
    A signup whose password is shorter than 8 characters is rejected with INVALID_ARGUMENT.

    Every other account field is valid, so the password is the only possible cause of the rejection
    and the test does not depend on the order in which the server validates fields.
    """
    too_short_password_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="bad",
            city="Minas Tirith",
            birthdate="1980-12-31",  # valid adult birthdate, keeps the password as the only invalid field
            gender="Robot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            lat=1,
            lng=1,
            radius=100,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
    )
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(too_short_password_req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == "The password must be 8 or more characters long."
def test_signup_invalid_birthdate(db):
    """
    Signups with a future or under-18 birthdate are rejected; an adult signup in between succeeds.

    At the end exactly one SignupFlow row exists, i.e. only the adult signup was stored.
    """
    from datetime import date

    # Derived from today's date so the "under 18" case stays under 18 no matter when the test runs
    # (a fixed date such as 2010-04-09 stops being under 18 in 2028).
    underage_birthdate = date(date.today().year - 10, 1, 1).isoformat()
    too_young = "You must be at least 18 years old to sign up."

    def signup_req(name, email, username, city, birthdate, gender):
        # all requests share everything except the identity fields and the birthdate
        return auth_pb2.SignupFlowReq(
            basic=auth_pb2.SignupBasic(name=name, email=email),
            account=auth_pb2.SignupAccount(
                username=username,
                password="a very insecure password",
                city=city,
                birthdate=birthdate,
                gender=gender,
                hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                lat=1,
                lng=1,
                radius=100,
                accept_tos=True,
            ),
            feedback=auth_pb2.ContributorForm(),
        )

    with auth_api_session() as (auth_api, _):
        # arbitrary future birthdate
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(signup_req("Räksmörgås", "a1@b.com", "frodo", "Minas Tirith", "9999-12-31", "Robot"))
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == too_young

        # arbitrary birthdate older than 18 years
        res = auth_api.SignupFlow(
            signup_req("Christopher", "a2@b.com", "ceelo", "New York City", "2000-12-31", "Helicopter")
        )

        assert res.flow_token

        # birthdate < 18 yrs
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(
                signup_req("Franklin", "a3@b.com", "franklin", "Los Santos", underage_birthdate, "Male")
            )
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == too_young

        with session_scope() as session:
            assert session.execute(select(func.count()).select_from(SignupFlow)).scalar_one() == 1
def test_signup_invalid_email(db):
    """Each of several malformed email addresses is rejected with INVALID_ARGUMENT / "Invalid email."."""
    malformed_addresses = ("a", "a@b", "a@b.", "a@b.c")

    for address in malformed_addresses:
        # a fresh session per address
        with auth_api_session() as (auth_api, _):
            with pytest.raises(grpc.RpcError) as e:
                auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=address)))
            assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
            assert e.value.details() == "Invalid email."
def test_signup_existing_email(db):
    """Starting a signup with the email of an existing user is refused with a hint to log in."""
    # Signed up user
    existing, _ = generate_user()

    duplicate_req = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=existing.email))
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(duplicate_req)
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "That email address is already associated with an account. Please log in instead!"
def test_signup_banned_user_email(db):
    """Starting a signup with the email of a banned user is refused."""
    banned, _ = generate_user()

    ban = update(User).where(User.id == banned.id).values(banned_at=func.now())
    with session_scope() as session:
        session.execute(ban)

    reuse_req = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="NewName", email=banned.email))
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(reuse_req)
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "You cannot sign up with that email address."
def test_signup_deleted_user_email(db):
    """Starting a signup with the email of a deleted user is refused."""
    removed, _ = generate_user()

    soft_delete = update(User).where(User.id == removed.id).values(deleted_at=func.now())
    with session_scope() as session:
        session.execute(soft_delete)

    reuse_req = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="NewName", email=removed.email))
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(reuse_req)
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "You cannot sign up with that email address."
def test_signup_continue_with_email(db):
    """Starting a second signup with an email that already has a flow is refused with a check-your-email hint."""
    testing_email = f"{random_hex(12)}@couchers.org.invalid"

    def start_signup(auth_api):
        return auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email)))

    with auth_api_session() as (auth_api, _):
        first = start_signup(auth_api)
    assert first.flow_token

    # continue with same email, should just send another email to the user
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as e:
            start_signup(auth_api)
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "Please check your email for a link to continue signing up."
def test_signup_resend_email(db):
    """
    Asking to resend the verification email sends one more email with the same email token,
    leaves the flow unverified, and the token still completes the signup afterwards.
    """
    address = "email@couchers.org.invalid"
    full_signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email=address),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
        motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
    )

    with auth_api_session() as (auth_api, _), mock_notification_email() as mock:
        started = auth_api.SignupFlow(full_signup_req)
    assert mock.call_count == 1
    first_email = email_fields(mock)
    assert first_email.recipient == address

    flow_token = started.flow_token
    assert flow_token

    with session_scope() as session:
        flow = session.execute(select(SignupFlow)).scalar_one()
        assert flow.flow_token == flow_token
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    # ask for a new signup email
    resend_req = auth_pb2.SignupFlowReq(flow_token=flow_token, resend_verification_email=True)
    with auth_api_session() as (auth_api, _), mock_notification_email() as mock:
        auth_api.SignupFlow(resend_req)
    assert mock.call_count == 1
    second_email = email_fields(mock)
    assert second_email.recipient == address
    assert email_token
    for rendering in (second_email.plain, second_email.html):
        assert email_token in rendering

    with session_scope() as session:
        flow = session.execute(select(SignupFlow)).scalar_one()
        assert not flow.email_verified

    with auth_api_session() as (auth_api, _):
        finished = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    assert not finished.flow_token
    assert finished.HasField("auth_res")
def test_successful_authenticate(db):
    """Authenticating with either the username or the email succeeds and the reply is not jailed."""
    user, _ = generate_user(hashed_password=hash_password("password"))

    # first by username, then by email; one session each
    for identifier in (user.username, user.email):
        with auth_api_session() as (auth_api, _):
            reply = auth_api.Authenticate(auth_pb2.AuthReq(user=identifier, password="password"))
        assert not reply.jailed
def test_unsuccessful_authenticate(db):
    """Wrong password, unknown username, unknown email and a numeric id all fail with NOT_FOUND."""
    user, _ = generate_user(hashed_password=hash_password("password"))

    no_such_account = "An account with that username or email was not found."
    failing_logins = [
        # (identifier, password, expected details)
        (user.username, "incorrectpassword", "Wrong password."),  # Invalid password
        ("notarealusername", "password", no_such_account),  # Invalid username
        (f"{random_hex(12)}@couchers.org.invalid", "password", no_such_account),  # Invalid email
        ("-1", "password", no_such_account),  # Invalid id
    ]

    for identifier, password, expected_details in failing_logins:
        with auth_api_session() as (auth_api, _):
            with pytest.raises(grpc.RpcError) as e:
                auth_api.Authenticate(auth_pb2.AuthReq(user=identifier, password=password))
            assert e.value.code() == grpc.StatusCode.NOT_FOUND
            assert e.value.details() == expected_details
def test_complete_signup(db):
    """
    Start a signup, then check that each invalid follow-up request is rejected with its own error:
    blank username, blank name, missing hosting status, taken username and a (0, 0) coordinate.
    """
    testing_email = f"{random_hex(12)}@couchers.org.invalid"
    with auth_api_session() as (auth_api, _):
        started = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email))
        )

    flow_token = started.flow_token

    def account_req(username="frodo", hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, lat=1, lng=1):
        # an otherwise valid account section for the started flow; callers override the field under test
        return auth_pb2.SignupFlowReq(
            flow_token=flow_token,
            account=auth_pb2.SignupAccount(
                username=username,
                password="a very insecure password",
                city="Minas Tirith",
                birthdate="1980-12-31",
                gender="Robot",
                hosting_status=hosting_status,
                lat=lat,
                lng=lng,
                radius=100,
                accept_tos=True,
            ),
        )

    def assert_rejected(req, code, details):
        # each rejected request is sent in its own session
        with auth_api_session() as (auth_api, _):
            with pytest.raises(grpc.RpcError) as e:
                auth_api.SignupFlow(req)
            assert e.value.code() == code
            assert e.value.details() == details

    # Invalid username
    assert_rejected(account_req(username=" "), grpc.StatusCode.INVALID_ARGUMENT, "Invalid username.")

    # Invalid name
    assert_rejected(
        auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name=" ", email=f"{random_hex(12)}@couchers.org.invalid")),
        grpc.StatusCode.INVALID_ARGUMENT,
        "Name not supported.",
    )

    # Hosting status required
    assert_rejected(account_req(hosting_status=None), grpc.StatusCode.INVALID_ARGUMENT, "Hosting status is required.")

    # Username unavailable
    existing, _ = generate_user()
    assert_rejected(
        account_req(username=existing.username),
        grpc.StatusCode.FAILED_PRECONDITION,
        "Sorry, that username isn't available.",
    )

    # Invalid coordinate
    assert_rejected(account_req(lat=0, lng=0), grpc.StatusCode.INVALID_ARGUMENT, "Invalid coordinate.")
def test_signup_token_regression(db):
    """
    Regression test for restarting a signup with an email whose earlier flow has already been confirmed.

    Repro steps:
      1. Start a signup
      2. Confirm the email
      3. Start a new signup with the same email
    Expected: send a link to the email to continue signing up.
    Actual (before the fix): `AttributeError: 'SignupFlow' object has no attribute 'token'`
    """
    email = f"{random_hex(12)}@couchers.org.invalid"

    def basic_request():
        # the basic signup step; issued in step 1 and again in step 3 with the same email
        return auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=email))

    # 1. Start a signup
    with auth_api_session() as (auth_api, _):
        started = auth_api.SignupFlow(basic_request())
        flow_token = started.flow_token
        assert flow_token

    # 2. Confirm the email (the email token is read straight from the database)
    with session_scope() as session:
        token_query = select(SignupFlow.email_token).where(SignupFlow.flow_token == flow_token)
        email_token = session.execute(token_query).scalar_one()

    with auth_api_session() as (auth_api, _):
        confirm_req = auth_pb2.SignupFlowReq(flow_token=flow_token, email_token=email_token)
        auth_api.SignupFlow(confirm_req)

    # 3. Start a new signup with the same email
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as err:
            auth_api.SignupFlow(basic_request())
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "Please check your email for a link to continue signing up."
@pytest.mark.parametrize("opt_out", [True, False])
def test_opt_out_of_newsletter(db, opt_out):
    """
    Submits every signup step in one request, verifies the email, and checks that the created user carries the
    `opt_out_of_newsletter` value chosen at signup and is not flagged as in sync with the newsletter.
    """
    account = auth_pb2.SignupAccount(
        username="frodo",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
        opt_out_of_newsletter=opt_out,
    )
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=account,
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
        motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
    )

    with auth_api_session() as (auth_api, _):
        signup_res = auth_api.SignupFlow(signup_req)

    # fetch the email token for this flow from the database
    with session_scope() as session:
        token_query = select(SignupFlow.email_token).where(SignupFlow.flow_token == signup_res.flow_token)
        email_token = session.execute(token_query).scalar_one()

    with auth_api_session() as (auth_api, _):
        verify_res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    new_user_id = verify_res.auth_res.user_id

    with session_scope() as session:
        new_user = session.execute(select(User).where(User.id == new_user_id)).scalar_one()
        assert not new_user.in_sync_with_newsletter
        assert new_user.opt_out_of_newsletter == opt_out
def test_GetAuthState(db):
    """
    Exercises GetAuthState without a cookie, with a valid session cookie (before and after Deauthenticate), and with
    the session cookie of a user generated with accepted_tos=0, which is expected to be reported as jailed.
    """
    normal_user, normal_token = generate_user()
    jailed_user, jailed_token = generate_user(accepted_tos=0)

    def sesh_cookie(session_token):
        # call metadata that carries `session_token` as the couchers-sesh cookie
        return (("cookie", f"couchers-sesh={session_token}"),)

    # no cookie: not logged in, no auth_res
    with auth_api_session() as (auth_api, _):
        state = auth_api.GetAuthState(empty_pb2.Empty())
        assert not state.logged_in
        assert not state.HasField("auth_res")

    with auth_api_session() as (auth_api, _):
        # valid session cookie: logged in as the normal user, not jailed
        state = auth_api.GetAuthState(empty_pb2.Empty(), metadata=sesh_cookie(normal_token))
        assert state.logged_in
        assert state.HasField("auth_res")
        assert state.auth_res.user_id == normal_user.id
        assert not state.auth_res.jailed

        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=sesh_cookie(normal_token))

        # the same cookie no longer counts as logged in after Deauthenticate
        state = auth_api.GetAuthState(empty_pb2.Empty(), metadata=sesh_cookie(normal_token))
        assert not state.logged_in
        assert not state.HasField("auth_res")

    # session cookie of the accepted_tos=0 user: logged in, but jailed
    with auth_api_session() as (auth_api, _):
        state = auth_api.GetAuthState(empty_pb2.Empty(), metadata=sesh_cookie(jailed_token))
        assert state.logged_in
        assert state.HasField("auth_res")
        assert state.auth_res.user_id == jailed_user.id
        assert state.auth_res.jailed
def test_signup_no_feedback_regression(db):
    """
    When we first removed the feedback form, the backend was saying it's not needed but was not completing the
    signup; this regression test checks that the signup completes without any feedback being submitted.
    """
    account = auth_pb2.SignupAccount(
        username="frodo",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
    )
    # note: no `feedback` field in this request
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=account,
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
        motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
    )

    with auth_api_session() as (auth_api, _):
        pending = auth_api.SignupFlow(signup_req)

    flow_token = pending.flow_token

    # only the email verification should be outstanding
    assert pending.flow_token
    assert not pending.HasField("auth_res")
    assert not pending.need_basic
    assert not pending.need_account
    assert not pending.need_feedback
    assert not pending.need_motivations
    assert pending.need_verify_email

    # read out the signup token directly from the database for now
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        done = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    # the flow is finished: we get an auth_res and nothing more is needed
    assert not done.flow_token
    assert done.HasField("auth_res")
    assert done.auth_res.user_id
    assert not done.auth_res.jailed
    assert not done.need_basic
    assert not done.need_account
    assert not done.need_feedback
    assert not done.need_motivations
    assert not done.need_verify_email

    # make sure we got the right token in a cookie
    with session_scope() as session:
        session_token_query = (
            select(UserSession.token).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
        )
        expected_token = session.execute(session_token_query).scalar_one()
    sesh, _ = get_session_cookie_tokens(metadata_interceptor)
    assert sesh == expected_token
def test_banned_username(db):
    """
    Checks that the account step of the signup flow refuses the username "thecouchersadminaccount" with the same
    "not available" error that is used for unavailable usernames.
    """
    signup_email = f"{random_hex(12)}@couchers.org.invalid"
    with auth_api_session() as (auth_api, _):
        started = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=signup_email))
        )

    flow_token = started.flow_token

    with auth_api_session() as (auth_api, _):
        # Banned username
        with pytest.raises(grpc.RpcError) as err:
            banned_account = auth_pb2.SignupAccount(
                username="thecouchersadminaccount",
                password="a very insecure password",
                city="Minas Tirith",
                birthdate="1980-12-31",
                gender="Robot",
                hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                lat=1,
                lng=1,
                radius=100,
                accept_tos=True,
            )
            auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, account=banned_account))
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "Sorry, that username isn't available."
# Tests for ConfirmChangeEmail live in test_account.py, as part of the test_ChangeEmail_* tests.
def test_GetInviteCodeInfo(db):
    """
    Creates an invite code for a user generated with complete_profile=True and checks that GetInviteCodeInfo returns
    that user's name, username, a thumbnail avatar URL and the invite link for the code.
    """
    inviter, token = generate_user(complete_profile=True)

    with account_session(token) as account:
        created = account.CreateInviteCode(account_pb2.CreateInviteCodeReq())
        code = created.code

    with auth_api_session() as (auth, _):
        info = auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code))
        assert info.name == inviter.name
        assert info.username == inviter.username
        # Avatar URL should be a thumbnail URL with a hashed filename
        assert "/img/thumbnail/" in info.avatar_url
        assert info.avatar_url.endswith(".jpg")
        # Verify the hashed filename looks correct (64 characters once ".jpg" is removed)
        avatar_filename = info.avatar_url.rsplit("/", 1)[-1]
        assert len(avatar_filename.replace(".jpg", "")) == 64
        assert info.url == urls.invite_code_link(code=code)
def test_GetInviteCodeInfo_no_avatar(db):
    """
    Same as test_GetInviteCodeInfo, but for a user generated with complete_profile=False: the avatar URL is expected
    to be the empty string.
    """
    inviter, token = generate_user(complete_profile=False)

    with account_session(token) as account:
        created = account.CreateInviteCode(account_pb2.CreateInviteCodeReq())
        code = created.code

    with auth_api_session() as (auth, _):
        info = auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code))
        assert info.name == inviter.name
        assert info.username == inviter.username
        assert info.avatar_url == ""
        assert info.url == urls.invite_code_link(code=code)
def test_GetInviteCodeInfo_not_found(db):
    """
    Looking up an invite code that was never created must fail with NOT_FOUND.
    """
    generate_user()

    with auth_api_session() as (auth, _):
        lookup = auth_pb2.GetInviteCodeInfoReq(code="BADCODE")
        with pytest.raises(grpc.RpcError) as err:
            auth.GetInviteCodeInfo(lookup)
        assert err.value.code() == grpc.StatusCode.NOT_FOUND
        assert err.value.details() == "Invite code not found."
def test_SignupFlow_invite_code(db):
    """
    Signs up with an invite code given in the basic step and checks that the code ends up in `invite_code_id` of the
    created User.
    """
    _, inviter_token = generate_user()

    with account_session(inviter_token) as account:
        invite_code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code

    with auth_api_session() as (auth_api, _):
        # Signup basic step with invite code
        basic = auth_pb2.SignupBasic(
            name="Test User",
            email="inviteuser@example.com",
            invite_code=invite_code,
        )
        started = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=basic))
        flow_token = started.flow_token
        assert flow_token

        # Confirm email, using the email token read from the database
        with session_scope() as session:
            token_query = select(SignupFlow.email_token).where(SignupFlow.flow_token == flow_token)
            email_token = session.execute(token_query).scalar_one()

        auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

        # Signup account step
        account_details = auth_pb2.SignupAccount(
            username="invited_user",
            password="secure password",
            birthdate="1990-01-01",
            gender="Other",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="Example City",
            lat=1,
            lng=5,
            radius=100,
            accept_tos=True,
        )
        auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                flow_token=flow_token,
                account=account_details,
                feedback=auth_pb2.ContributorForm(),
                accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
                motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
            )
        )

        # Check that invite_code_id is stored in the final User object
        with session_scope() as session:
            stored_query = select(User.invite_code_id).where(User.username == "invited_user")
            stored_invite_code = session.execute(stored_query).scalar_one()
            assert stored_invite_code == invite_code
def test_signup_with_motivations(db):
    """
    Signs up with the motivations step (heard_about_couchers and motivations) included, and checks that the values
    are stored on the SignupFlow first and on the User once the email has been verified.
    """
    expected_motivations = {"hosting", "surfing", "events"}

    account = auth_pb2.SignupAccount(
        username="intentuser",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
    )
    motivations = auth_pb2.SignupMotivations(
        heard_about_couchers="friend",
        motivations=["hosting", "surfing", "events"],
    )

    with auth_api_session() as (auth_api, _):
        pending = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
                account=account,
                motivations=motivations,
                accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
            )
        )

    flow_token = pending.flow_token
    assert flow_token
    assert not pending.HasField("auth_res")
    assert pending.need_verify_email

    # Verify the motivations are stored in the SignupFlow
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.heard_about_couchers == "friend"
        assert set(flow.signup_motivations) == expected_motivations
        email_token = flow.email_token

    # Complete signup by verifying email
    with auth_api_session() as (auth_api, _):
        done = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    assert done.HasField("auth_res")
    new_user_id = done.auth_res.user_id

    # Verify the motivations are transferred to the User object
    with session_scope() as session:
        new_user = session.execute(select(User).where(User.id == new_user_id)).scalar_one()
        assert new_user.heard_about_couchers == "friend"
        assert new_user.signup_motivations is not None
        assert set(new_user.signup_motivations) == expected_motivations
def test_signup_motivations_incremental(db):
    """
    Checks that the motivations can be sent as their own step, after the basic step and before the account step.
    """
    # First, basic signup
    with auth_api_session() as (auth_api, _):
        basic = auth_pb2.SignupBasic(name="testing", email="email2@couchers.org.invalid")
        after_basic = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=basic))

    flow_token = after_basic.flow_token
    assert flow_token
    assert after_basic.need_account
    assert after_basic.need_motivations  # New field

    # Submit motivations separately
    with auth_api_session() as (auth_api, _):
        motivations = auth_pb2.SignupMotivations(
            heard_about_couchers="social_media",
            motivations=["surfing"],
        )
        after_motivations = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(flow_token=flow_token, motivations=motivations)
        )

    assert after_motivations.flow_token == flow_token
    assert not after_motivations.need_motivations  # Should be filled now
    assert after_motivations.need_account  # Still need account

    # Verify motivations are stored
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.heard_about_couchers == "social_media"
        assert flow.signup_motivations == ["surfing"]
def test_signup_motivations_cannot_be_refilled(db):
    """
    Checks that a second motivations submission for the same flow is rejected with FAILED_PRECONDITION.
    """
    # first submission, together with the basic step
    with auth_api_session() as (auth_api, _):
        first = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                basic=auth_pb2.SignupBasic(name="testing", email="email3@couchers.org.invalid"),
                motivations=auth_pb2.SignupMotivations(
                    heard_about_couchers="friend",
                    motivations=["hosting"],
                ),
            )
        )

    flow_token = first.flow_token

    # Try to submit motivations again - should fail
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as err:
            second_motivations = auth_pb2.SignupMotivations(
                heard_about_couchers="different_source",
                motivations=["surfing"],
            )
            auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, motivations=second_motivations))
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "You've already told us about why you are signing up."
def test_signup_motivations_required(db):
    """
    Checks that a signup does not complete until motivations are provided: verifying the email alone yields no
    auth_res, and submitting the motivations afterwards finishes the signup.
    """
    account = auth_pb2.SignupAccount(
        username="nointents",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
    )

    with auth_api_session() as (auth_api, _):
        # No motivations provided in this request
        pending = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                basic=auth_pb2.SignupBasic(name="testing", email="email4@couchers.org.invalid"),
                account=account,
                accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
            )
        )

    flow_token = pending.flow_token
    assert not pending.HasField("auth_res")
    assert pending.need_motivations  # Motivations still required

    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        email_token = flow.email_token

    # Verify email - signup still not complete without motivations
    with auth_api_session() as (auth_api, _):
        after_verify = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    assert not after_verify.HasField("auth_res")
    assert after_verify.need_motivations

    # Now submit motivations
    with auth_api_session() as (auth_api, _):
        motivations = auth_pb2.SignupMotivations(motivations=["surfing"])
        done = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, motivations=motivations))

    assert done.HasField("auth_res")
    new_user_id = done.auth_res.user_id

    with session_scope() as session:
        new_user = session.execute(select(User).where(User.id == new_user_id)).scalar_one()
        assert new_user.signup_motivations == ["surfing"]
def test_signup_motivations_all_options(db):
    """
    Submits "hosting", "surfing" and "events" together with heard_about_couchers="other" and checks that all of
    them are stored on the SignupFlow.
    """
    motivations = auth_pb2.SignupMotivations(
        heard_about_couchers="other",
        motivations=["hosting", "surfing", "events"],
    )

    with auth_api_session() as (auth_api, _):
        started = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                basic=auth_pb2.SignupBasic(name="testing", email="email5@couchers.org.invalid"),
                motivations=motivations,
            )
        )

    flow_token = started.flow_token

    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.heard_about_couchers == "other"
        assert set(flow.signup_motivations) == {"hosting", "surfing", "events"}
def test_signup_motivations_empty_motivations_list(db):
    """
    Checks that a motivations step with heard_about_couchers set but an empty motivations list is accepted and
    stored on the SignupFlow as an empty list.
    """
    motivations = auth_pb2.SignupMotivations(
        heard_about_couchers="former_cs_member",
        motivations=[],  # No specific motivations selected
    )

    with auth_api_session() as (auth_api, _):
        started = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                basic=auth_pb2.SignupBasic(name="testing", email="email6@couchers.org.invalid"),
                motivations=motivations,
            )
        )

    flow_token = started.flow_token

    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.heard_about_couchers == "former_cs_member"
        assert flow.signup_motivations == []