Coverage for src/tests/test_auth.py: 100%
527 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-12 16:45 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-12 16:45 +0000
1import http.cookies
3import grpc
4import pytest
5from google.protobuf import empty_pb2, wrappers_pb2
6from sqlalchemy.sql import delete, func
8from couchers import errors
9from couchers.crypto import hash_password, random_hex
10from couchers.db import session_scope
11from couchers.models import (
12 ContributeOption,
13 ContributorForm,
14 LoginToken,
15 PasswordResetToken,
16 SignupFlow,
17 User,
18 UserSession,
19)
20from couchers.sql import couchers_select as select
21from proto import api_pb2, auth_pb2
22from tests.test_fixtures import ( # noqa
23 api_session,
24 auth_api_session,
25 db,
26 email_fields,
27 fast_passwords,
28 generate_user,
29 mock_notification_email,
30 push_collector,
31 real_api_session,
32 testconfig,
33)
@pytest.fixture(autouse=True)
def _(testconfig, fast_passwords):
    """Request the ``testconfig`` and ``fast_passwords`` fixtures for every test in this module."""
def get_session_cookie_tokens(metadata_interceptor):
    """
    Return the session token and user id set as cookies by the latest response.

    Reads the ``set-cookie`` entries of ``metadata_interceptor.latest_header_raw`` (an iterable of
    ``(key, value)`` pairs) and returns ``(sesh, uid)``: the values of the ``couchers-sesh`` and
    ``couchers-user-id`` cookies, both as strings.

    Each header is parsed and matched on the cookie *name*. Matching on a raw substring of the
    header would pick the wrong header whenever another cookie's value or attributes happen to
    contain "sesh" or "user-id".

    Raises:
        KeyError: if either cookie was not set in the latest response headers.
    """
    cookie_values = {}
    for key, raw in metadata_interceptor.latest_header_raw:
        if key != "set-cookie":
            continue
        for name, morsel in http.cookies.SimpleCookie(raw).items():
            # if a cookie is set more than once, the first header wins
            cookie_values.setdefault(name, morsel.value)
    return cookie_values["couchers-sesh"], cookie_values["couchers-user-id"]
def test_UsernameValid(db):
    """``UsernameValid`` reports "test" as valid and the empty string as invalid."""
    expectations = (("test", True), ("", False))
    for username, should_be_valid in expectations:
        # one fresh auth API session per request
        with auth_api_session() as (client, _interceptor):
            reply = client.UsernameValid(auth_pb2.UsernameValidReq(username=username))
            assert bool(reply.valid) is should_be_valid
def test_signup_incremental(db):
    """
    Complete a signup one piece per request.

    After every request the ``need_*`` flags of the response are checked; once the flow finishes,
    the created user is fetched through the API and the stored contributor form is read back.
    """

    def send(**fields):
        # every step is sent through its own auth API session
        with auth_api_session() as (client, interceptor):
            reply = client.SignupFlow(auth_pb2.SignupFlowReq(**fields))
        return reply, interceptor

    def check_outstanding(reply, *, account, verify_email, guidelines):
        # need_basic and need_feedback are expected to be unset after every step of this test
        assert not reply.need_basic
        assert bool(reply.need_account) == account
        assert not reply.need_feedback
        assert bool(reply.need_verify_email) == verify_email
        assert bool(reply.need_accept_community_guidelines) == guidelines

    # Start the flow with only the basic info
    reply, _ = send(basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"))

    flow_token = reply.flow_token
    assert flow_token
    assert not reply.HasField("auth_res")
    check_outstanding(reply, account=True, verify_email=True, guidelines=True)

    # read out the signup token directly from the database for now
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    # Sending nothing but the flow token leaves the state as it was
    reply, _ = send(flow_token=flow_token)

    assert reply.flow_token == flow_token
    assert not reply.HasField("auth_res")
    check_outstanding(reply, account=True, verify_email=True, guidelines=True)

    # Add feedback
    reply, _ = send(
        flow_token=flow_token,
        feedback=auth_pb2.ContributorForm(
            ideas="I'm a robot, incapable of original ideation",
            features="I love all your features",
            experience="I haven't done couch surfing before",
            contribute=auth_pb2.CONTRIBUTE_OPTION_YES,
            contribute_ways=["serving", "backend"],
            expertise="I'd love to be your server: I can compute very fast, but only simple opcodes",
        ),
    )

    assert reply.flow_token == flow_token
    assert not reply.HasField("auth_res")
    check_outstanding(reply, account=True, verify_email=True, guidelines=True)

    # Agree to community guidelines
    reply, _ = send(
        flow_token=flow_token,
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )

    assert reply.flow_token == flow_token
    assert not reply.HasField("auth_res")
    check_outstanding(reply, account=True, verify_email=True, guidelines=False)

    # Verify email
    reply, _ = send(flow_token=flow_token, email_token=email_token)

    assert reply.flow_token == flow_token
    assert not reply.HasField("auth_res")
    check_outstanding(reply, account=True, verify_email=False, guidelines=False)

    # Finally finish off account info
    reply, interceptor = send(
        flow_token=flow_token,
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_MAYBE,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
    )

    # The flow is finished: no flow token any more, an auth result instead
    assert not reply.flow_token
    assert reply.HasField("auth_res")
    assert reply.auth_res.user_id
    assert not reply.auth_res.jailed
    check_outstanding(reply, account=False, verify_email=False, guidelines=False)

    user_id = reply.auth_res.user_id

    # the cookies of the final response identify the new user
    sess_token, uid = get_session_cookie_tokens(interceptor)
    assert uid == str(user_id)

    with api_session(sess_token) as api:
        profile = api.GetUser(api_pb2.GetUserReq(user=str(user_id)))

    assert profile.username == "frodo"
    assert profile.gender == "Bot"
    assert profile.hosting_status == api_pb2.HOSTING_STATUS_MAYBE
    assert profile.city == "New York City"
    assert profile.lat == 40.7331
    assert profile.lng == -73.9778
    assert profile.radius == 500

    with session_scope() as session:
        form = session.execute(select(ContributorForm)).scalar_one()

        assert form.ideas == "I'm a robot, incapable of original ideation"
        assert form.features == "I love all your features"
        assert form.experience == "I haven't done couch surfing before"
        assert form.contribute == ContributeOption.yes
        assert form.contribute_ways == ["serving", "backend"]
        assert form.expertise == "I'd love to be your server: I can compute very fast, but only simple opcodes"
def _quick_signup():
    """
    Sign up the user "frodo" in two requests.

    The first request carries the basic info, account details, an empty feedback form and the
    community guidelines acceptance; the second confirms the email with the token read from the
    database. Asserts along the way, and finally checks that the session cookie matches the
    ``UserSession`` stored for "frodo".
    """
    signup_request = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )
    with auth_api_session() as (client, interceptor):
        first_reply = client.SignupFlow(signup_request)

    flow_token = first_reply.flow_token

    # only the email verification is still outstanding
    assert flow_token
    assert not first_reply.HasField("auth_res")
    assert not first_reply.need_basic
    assert not first_reply.need_account
    assert not first_reply.need_feedback
    assert first_reply.need_verify_email

    # read out the signup token directly from the database for now
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    with auth_api_session() as (client, interceptor):
        final_reply = client.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    assert not final_reply.flow_token
    assert final_reply.HasField("auth_res")
    assert final_reply.auth_res.user_id
    assert not final_reply.auth_res.jailed
    assert not final_reply.need_basic
    assert not final_reply.need_account
    assert not final_reply.need_feedback
    assert not final_reply.need_verify_email

    # make sure we got the right token in a cookie
    with session_scope() as session:
        frodo_session = session.execute(
            select(UserSession).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
        ).scalar_one()
        stored_token = frodo_session.token
    sesh, _uid = get_session_cookie_tokens(interceptor)
    assert sesh == stored_token
def test_signup(db):
    """Run the ``_quick_signup`` helper on its own; the assertions inside it are the test."""
    _quick_signup()
def test_basic_login(db):
    """
    Authenticating with username and password sets a session cookie whose token belongs to a
    valid ``UserSession`` of that user; afterwards the session is logged out again.
    """
    # Create our test user using signup
    _quick_signup()

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))

    reply_token, _ = get_session_cookie_tokens(metadata_interceptor)

    with session_scope() as session:
        user_session = session.execute(
            select(UserSession)
            .join(User, UserSession.user_id == User.id)
            .where(User.username == "frodo")
            .where(UserSession.token == reply_token)
            .where(UserSession.is_valid)
        ).scalar_one_or_none()
        # scalar_one_or_none() returns None when nothing matches; check that explicitly so a
        # missing session fails as an assertion instead of an AttributeError on `.token`
        assert user_session is not None, "no valid UserSession matches the token from the session cookie"
        assert user_session.token

    # log out
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={reply_token}"),))
def test_login_part_signed_up_verified_email(db):
    """
    If you try to log in but didn't finish signing up, we send you a new email and ask you to finish signing up.

    Here the email address has already been verified, so the email is expected to contain the flow token.
    """
    signup_email = "email@couchers.org.invalid"

    with auth_api_session() as (client, _interceptor):
        started = client.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="testing", email=signup_email))
        )

    flow_token = started.flow_token
    assert started.need_verify_email

    # verify the email
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        flow_token = flow.flow_token
        email_token = flow.email_token
    with auth_api_session() as (client, _interceptor):
        client.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    # logging in with the half-registered email is refused and triggers exactly one email
    with mock_notification_email() as mock:
        with auth_api_session() as (client, _interceptor):
            with pytest.raises(grpc.RpcError) as excinfo:
                client.Authenticate(auth_pb2.AuthReq(user="email@couchers.org.invalid", password="wrong pwd"))
            assert excinfo.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP

    assert mock.call_count == 1
    sent = email_fields(mock)
    assert sent.recipient == "email@couchers.org.invalid"
    assert flow_token in sent.plain
    assert flow_token in sent.html
def test_login_part_signed_up_not_verified_email(db):
    """
    Logging in with the username of an unfinished signup whose email is not yet verified is
    refused, and one email containing the flow's email token is sent to the signup address.
    """
    signup_request = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1999-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
    )
    with auth_api_session() as (client, _interceptor):
        started = client.SignupFlow(signup_request)

    flow_token = started.flow_token
    assert started.need_verify_email

    with mock_notification_email() as mock:
        with auth_api_session() as (client, _interceptor):
            with pytest.raises(grpc.RpcError) as excinfo:
                client.Authenticate(auth_pb2.AuthReq(user="frodo", password="wrong pwd"))
            assert excinfo.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP

    # look up the email token the message is expected to carry
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        email_token = flow.email_token

    assert mock.call_count == 1
    sent = email_fields(mock)
    assert sent.recipient == "email@couchers.org.invalid"
    assert email_token in sent.plain
    assert email_token in sent.html
def test_banned_user(db):
    """A user flagged ``is_banned`` cannot authenticate: FAILED_PRECONDITION / ACCOUNT_SUSPENDED."""
    _quick_signup()

    # flag the only user in the database as banned
    with session_scope() as session:
        only_user = session.execute(select(User)).scalar_one()
        only_user.is_banned = True

    with auth_api_session() as (client, _interceptor):
        with pytest.raises(grpc.RpcError) as excinfo:
            client.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))
        failure = excinfo.value
        assert failure.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert failure.details() == errors.ACCOUNT_SUSPENDED
def test_deleted_user(db):
    """A user flagged ``is_deleted`` cannot authenticate: NOT_FOUND / ACCOUNT_NOT_FOUND."""
    _quick_signup()

    # flag the only user in the database as deleted
    with session_scope() as session:
        only_user = session.execute(select(User)).scalar_one()
        only_user.is_deleted = True

    with auth_api_session() as (client, _interceptor):
        with pytest.raises(grpc.RpcError) as excinfo:
            client.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))
        failure = excinfo.value
        assert failure.code() == grpc.StatusCode.NOT_FOUND
        assert failure.details() == errors.ACCOUNT_NOT_FOUND
def test_invalid_token(db):
    """An API call made with a random, unknown session token fails with UNAUTHENTICATED."""
    # two users are created; only the second one's username is used in the request
    _first_user, _first_token = generate_user()
    second_user, _second_token = generate_user()

    bogus_token = random_hex(32)

    with real_api_session(bogus_token) as api:
        with pytest.raises(grpc.RpcError) as excinfo:
            api.GetUser(api_pb2.GetUserReq(user=second_user.username))

    failure = excinfo.value
    assert failure.code() == grpc.StatusCode.UNAUTHENTICATED
    assert failure.details() == "Unauthorized"
def test_password_reset_v2(db, push_collector):
    """
    Full password reset round trip.

    Requesting a reset sends an email and a push notification; completing it with an insecure
    password is rejected; completing it with a good password logs the user in and sends a second
    push notification; the reset token cannot be used a second time.
    """
    user, token = generate_user(hashed_password=hash_password("mypassword"))

    with auth_api_session() as (auth_api, metadata_interceptor):
        with mock_notification_email() as mock:
            auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username))

    with session_scope() as session:
        password_reset_token = session.execute(select(PasswordResetToken)).scalar_one().token

    assert mock.call_count == 1
    e = email_fields(mock)
    assert e.recipient == user.email
    assert "reset" in e.subject.lower()
    assert password_reset_token in e.plain
    assert password_reset_token in e.html
    unique_string = "You asked for your password to be reset on Couchers.org."
    assert unique_string in e.plain
    assert unique_string in e.html
    assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in e.plain
    assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in e.html
    assert "support@couchers.org" in e.plain
    assert "support@couchers.org" in e.html

    push_collector.assert_user_push_matches_fields(
        user.id,
        title="A password reset was initiated on your account",
        body="Someone initiated a password change on your account.",
    )

    # make sure bad password are caught
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.CompletePasswordResetV2(
                auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password="password")
            )
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.INSECURE_PASSWORD

    # make sure we can set a good password
    with auth_api_session() as (auth_api, metadata_interceptor):
        pwd = random_hex()
        with mock_notification_email() as mock:
            auth_api.CompletePasswordResetV2(
                auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password=pwd)
            )

    push_collector.assert_user_push_matches_fields(
        user.id,
        ix=1,
        title="Your password was successfully reset",
        body="Your password on Couchers.org was changed. If that was you, then no further action is needed.",
    )

    # completing the reset sets session cookies; the token must belong to a valid session of this user
    session_token, _ = get_session_cookie_tokens(metadata_interceptor)

    with session_scope() as session:
        user_session = session.execute(
            select(UserSession)
            .join(User, UserSession.user_id == User.id)
            .where(User.username == user.username)
            .where(UserSession.token == session_token)
            .where(UserSession.is_valid)
        ).scalar_one_or_none()
        # scalar_one_or_none() returns None when nothing matches; check that explicitly so a
        # missing session fails as an assertion instead of an AttributeError on `.token`
        assert user_session is not None, "no valid UserSession matches the token from the session cookie"
        assert user_session.token

    # make sure we can't set a password again
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.CompletePasswordResetV2(
                auth_pb2.CompletePasswordResetV2Req(
                    password_reset_token=password_reset_token, new_password=random_hex()
                )
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == errors.INVALID_TOKEN

    # the stored hash must be that of the good password, not of the rejected or the later attempt
    with session_scope() as session:
        user = session.execute(select(User)).scalar_one()
        assert user.hashed_password == hash_password(pwd)
def test_password_reset_no_such_user(db):
    """Requesting a reset for an unknown username does not raise and creates no ``PasswordResetToken``."""
    # an unrelated user exists, so the table of users is not empty
    _existing_user, _existing_token = generate_user()

    unknown_user_request = auth_pb2.ResetPasswordReq(user="nonexistentuser")
    with auth_api_session() as (client, _interceptor):
        client.ResetPassword(unknown_user_request)

    with session_scope() as session:
        stored_token = session.execute(select(PasswordResetToken)).scalar_one_or_none()

    assert stored_token is None
def test_password_reset_invalid_token_v2(db):
    """Completing a reset with a wrong token fails with NOT_FOUND / INVALID_TOKEN and keeps the old password."""
    original_password = random_hex()
    account, _token = generate_user(hashed_password=hash_password(original_password))

    # a real reset is started first, so a genuine token exists alongside the bogus one
    with auth_api_session() as (client, _interceptor):
        client.ResetPassword(auth_pb2.ResetPasswordReq(user=account.username))

    with auth_api_session() as (client, _interceptor):
        with pytest.raises(grpc.RpcError) as excinfo:
            client.CompletePasswordResetV2(auth_pb2.CompletePasswordResetV2Req(password_reset_token="wrongtoken"))
    failure = excinfo.value
    assert failure.code() == grpc.StatusCode.NOT_FOUND
    assert failure.details() == errors.INVALID_TOKEN

    with session_scope() as session:
        stored_user = session.execute(select(User)).scalar_one()
        assert stored_user.hashed_password == hash_password(original_password)
def test_logout_invalid_token(db):
    """``Deauthenticate`` succeeds after all ``LoginToken`` rows are deleted and replies with an empty session cookie."""
    # Create our test user using signup
    _quick_signup()

    login_request = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (client, interceptor):
        client.Authenticate(login_request)

    issued_token, _uid = get_session_cookie_tokens(interceptor)

    # delete all login tokens
    # NOTE(review): this deletes LoginToken rows, while test_basic_login looks the cookie token up
    # in UserSession - confirm this really makes the token non-existent
    with session_scope() as session:
        session.execute(delete(LoginToken))

    # log out with non-existent token should still return a valid result
    logout_metadata = (("cookie", f"couchers-sesh={issued_token}"),)
    with auth_api_session() as (client, interceptor):
        client.Deauthenticate(empty_pb2.Empty(), metadata=logout_metadata)

    cleared_token, _uid = get_session_cookie_tokens(interceptor)
    # make sure we set an empty cookie
    assert cleared_token == ""
def test_signup_without_password(db):
    """
    Signing up with a too-short password is rejected with INVALID_ARGUMENT / PASSWORD_TOO_SHORT.

    Despite the test's name, a password is supplied ("bad"); it is simply too short. Every other
    account field uses a value that other tests in this module submit successfully, so the
    password is the only thing the server can reject and the result does not depend on the order
    in which the server validates fields.
    """
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(
                auth_pb2.SignupFlowReq(
                    basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"),
                    account=auth_pb2.SignupAccount(
                        username="frodo",
                        password="bad",
                        city="Minas Tirith",
                        birthdate="1980-12-31",  # valid adult birthdate, so only the password is wrong
                        gender="Robot",
                        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                        lat=1,
                        lng=1,
                        radius=100,
                        accept_tos=True,
                    ),
                    feedback=auth_pb2.ContributorForm(),
                )
            )
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.PASSWORD_TOO_SHORT
def test_signup_invalid_birthdate(db):
    """
    Birthdate validation during signup.

    A future birthdate and a birthdate less than 18 years ago are both rejected with
    FAILED_PRECONDITION / INVALID_BIRTHDATE; a birthdate in 2000 is accepted. Only the accepted
    attempt leaves a ``SignupFlow`` row behind.
    """

    def build_request(name, email, username, city, birthdate, gender):
        # the three attempts differ only in these fields
        return auth_pb2.SignupFlowReq(
            basic=auth_pb2.SignupBasic(name=name, email=email),
            account=auth_pb2.SignupAccount(
                username=username,
                password="a very insecure password",
                city=city,
                birthdate=birthdate,
                gender=gender,
                hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                lat=1,
                lng=1,
                radius=100,
                accept_tos=True,
            ),
            feedback=auth_pb2.ContributorForm(),
        )

    # all three requests share one auth API session
    with auth_api_session() as (client, _interceptor):
        # arbitrary future birthdate
        with pytest.raises(grpc.RpcError) as excinfo:
            client.SignupFlow(build_request("Räksmörgås", "a1@b.com", "frodo", "Minas Tirith", "9999-12-31", "Robot"))
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.INVALID_BIRTHDATE

        # arbitrary birthdate older than 18 years
        accepted = client.SignupFlow(
            build_request("Christopher", "a2@b.com", "ceelo", "New York City", "2000-12-31", "Helicopter")
        )

        assert accepted.flow_token

        # arbitrary birthdate < 18 yrs
        with pytest.raises(grpc.RpcError) as excinfo:
            client.SignupFlow(build_request("Franklin", "a3@b.com", "franklin", "Los Santos", "2010-04-09", "Male"))
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.INVALID_BIRTHDATE

        with session_scope() as session:
            flow_count = session.execute(select(func.count()).select_from(SignupFlow)).scalar_one()
            assert flow_count == 1
def test_signup_invalid_email(db):
    """Starting a signup with a malformed email address fails with INVALID_ARGUMENT / INVALID_EMAIL."""
    malformed_addresses = ("a", "a@b", "a@b.", "a@b.c")

    for address in malformed_addresses:
        # each attempt goes through its own auth API session
        with auth_api_session() as (client, _interceptor):
            with pytest.raises(grpc.RpcError) as excinfo:
                client.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=address)))
            assert excinfo.value.code() == grpc.StatusCode.INVALID_ARGUMENT
            assert excinfo.value.details() == errors.INVALID_EMAIL
def test_signup_existing_email(db):
    """Starting a signup with the email of an existing user fails with SIGNUP_FLOW_EMAIL_TAKEN."""
    # Signed up user
    existing_user, _token = generate_user()

    duplicate_request = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=existing_user.email))
    with auth_api_session() as (client, _interceptor):
        with pytest.raises(grpc.RpcError) as excinfo:
            client.SignupFlow(duplicate_request)
        failure = excinfo.value
        assert failure.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert failure.details() == errors.SIGNUP_FLOW_EMAIL_TAKEN
def test_signup_continue_with_email(db):
    """
    Starting a second signup with an email that already has a signup in progress fails with
    FAILED_PRECONDITION / SIGNUP_FLOW_EMAIL_STARTED_SIGNUP.
    """
    testing_email = f"{random_hex(12)}@couchers.org.invalid"

    def start_signup(client):
        return client.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email)))

    with auth_api_session() as (client, _interceptor):
        first_reply = start_signup(client)
    assert first_reply.flow_token

    # continue with same email: the request itself is answered with an error
    with auth_api_session() as (client, _interceptor):
        with pytest.raises(grpc.RpcError) as excinfo:
            start_signup(client)
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP
def test_signup_resend_email(db):
    """
    Asking for the verification email again sends one more email carrying the same email token
    and leaves the flow unverified; using that token afterwards completes the signup.
    """
    recipient = "email@couchers.org.invalid"
    signup_request = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )

    # the initial request sends the first verification email
    with auth_api_session() as (client, _interceptor):
        with mock_notification_email() as mock:
            started = client.SignupFlow(signup_request)
        assert mock.call_count == 1
        first_email = email_fields(mock)
        assert first_email.recipient == recipient

    flow_token = started.flow_token
    assert flow_token

    with session_scope() as session:
        flow = session.execute(select(SignupFlow)).scalar_one()
        assert flow.flow_token == flow_token
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    # ask for a new signup email
    resend_request = auth_pb2.SignupFlowReq(flow_token=flow_token, resend_verification_email=True)
    with auth_api_session() as (client, _interceptor):
        with mock_notification_email() as mock:
            client.SignupFlow(resend_request)
        assert mock.call_count == 1
        second_email = email_fields(mock)
        assert second_email.recipient == recipient
        assert email_token in second_email.plain
        assert email_token in second_email.html

    # resending alone must not verify anything
    with session_scope() as session:
        flow = session.execute(select(SignupFlow)).scalar_one()
        assert not flow.email_verified

    with auth_api_session() as (client, _interceptor):
        finished = client.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    assert not finished.flow_token
    assert finished.HasField("auth_res")
def test_successful_authenticate(db):
    """Authentication succeeds with either the username or the email, and the reply is not jailed."""
    account, _token = generate_user(hashed_password=hash_password("password"))

    # Authenticate with username, then with email
    for identifier in (account.username, account.email):
        with auth_api_session() as (client, _interceptor):
            reply = client.Authenticate(auth_pb2.AuthReq(user=identifier, password="password"))
        assert not reply.jailed
def test_unsuccessful_authenticate(db):
    """
    Authentication failures all report NOT_FOUND: INVALID_PASSWORD for a wrong password, and
    ACCOUNT_NOT_FOUND for an unknown username, email or id.
    """
    account, _token = generate_user(hashed_password=hash_password("password"))

    attempts = (
        # Invalid password
        (account.username, "incorrectpassword", errors.INVALID_PASSWORD),
        # Invalid username
        ("notarealusername", "password", errors.ACCOUNT_NOT_FOUND),
        # Invalid email
        (f"{random_hex(12)}@couchers.org.invalid", "password", errors.ACCOUNT_NOT_FOUND),
        # Invalid id
        ("-1", "password", errors.ACCOUNT_NOT_FOUND),
    )

    for identifier, password, expected_details in attempts:
        with auth_api_session() as (client, _interceptor):
            with pytest.raises(grpc.RpcError) as excinfo:
                client.Authenticate(auth_pb2.AuthReq(user=identifier, password=password))
            assert excinfo.value.code() == grpc.StatusCode.NOT_FOUND
            assert excinfo.value.details() == expected_details
def test_complete_signup(db):
    """
    Validation errors while filling in a signup flow.

    Covers a blank username, a blank name, a missing hosting status, a username that is already
    taken and the coordinate (0, 0), each with its expected status code and error detail.
    """
    testing_email = f"{random_hex(12)}@couchers.org.invalid"
    with auth_api_session() as (client, _interceptor):
        started = client.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email))
        )

    flow_token = started.flow_token

    def account(**overrides):
        # a complete set of account details; each case overrides the one field under test
        fields = dict(
            username="frodo",
            password="a very insecure password",
            city="Minas Tirith",
            birthdate="1980-12-31",
            gender="Robot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            lat=1,
            lng=1,
            radius=100,
            accept_tos=True,
        )
        fields.update(overrides)
        return auth_pb2.SignupAccount(**fields)

    def expect_rejected(request, code, details):
        # send the request in a fresh auth API session and check how it fails
        with auth_api_session() as (client, _interceptor):
            with pytest.raises(grpc.RpcError) as excinfo:
                client.SignupFlow(request)
            assert excinfo.value.code() == code
            assert excinfo.value.details() == details

    # Invalid username
    expect_rejected(
        auth_pb2.SignupFlowReq(flow_token=flow_token, account=account(username=" ")),
        grpc.StatusCode.INVALID_ARGUMENT,
        errors.INVALID_USERNAME,
    )

    # Invalid name
    expect_rejected(
        auth_pb2.SignupFlowReq(
            basic=auth_pb2.SignupBasic(name=" ", email=f"{random_hex(12)}@couchers.org.invalid")
        ),
        grpc.StatusCode.INVALID_ARGUMENT,
        errors.INVALID_NAME,
    )

    # Hosting status required
    expect_rejected(
        auth_pb2.SignupFlowReq(flow_token=flow_token, account=account(hosting_status=None)),
        grpc.StatusCode.INVALID_ARGUMENT,
        errors.HOSTING_STATUS_REQUIRED,
    )

    # Username unavailable
    existing_user, _token = generate_user()
    expect_rejected(
        auth_pb2.SignupFlowReq(flow_token=flow_token, account=account(username=existing_user.username)),
        grpc.StatusCode.FAILED_PRECONDITION,
        errors.USERNAME_NOT_AVAILABLE,
    )

    # Invalid coordinate
    expect_rejected(
        auth_pb2.SignupFlowReq(flow_token=flow_token, account=account(lat=0, lng=0)),
        grpc.StatusCode.INVALID_ARGUMENT,
        errors.INVALID_COORDINATE,
    )
def test_signup_token_regression(db):
    """
    Regression test: a new signup on an email whose earlier signup is already confirmed.

    Repro steps:
    1. Start a signup
    2. Confirm the email
    3. Start a new signup with the same email
    Previously this raised `AttributeError: 'SignupFlow' object has no attribute 'token'`; the
    request is now expected to fail cleanly with FAILED_PRECONDITION / SIGNUP_FLOW_EMAIL_STARTED_SIGNUP.
    """
    testing_email = f"{random_hex(12)}@couchers.org.invalid"
    basic_request = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))

    # 1. Start a signup
    with auth_api_session() as (client, _interceptor):
        started = client.SignupFlow(basic_request)
    flow_token = started.flow_token
    assert flow_token

    # 2. Confirm the email
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        email_token = flow.email_token

    with auth_api_session() as (client, _interceptor):
        client.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, email_token=email_token))

    # 3. Start a new signup with the same email
    with auth_api_session() as (client, _interceptor):
        with pytest.raises(grpc.RpcError) as excinfo:
            client.SignupFlow(
                auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))
            )
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP
@pytest.mark.parametrize("opt_out", [True, False])
def test_opt_out_of_newsletter(db, opt_out):
    """
    Check that the `opt_out_of_newsletter` choice made during signup is stored on the created user.

    The whole signup (basic, account, feedback, community guidelines) is submitted in one request,
    the email is then confirmed, and the resulting user row is inspected.
    """
    with auth_api_session() as (auth_api, _interceptor):
        account = auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
            opt_out_of_newsletter=opt_out,
        )
        signup_req = auth_pb2.SignupFlowReq(
            basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
            account=account,
            feedback=auth_pb2.ContributorForm(),
            accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
        )
        signup_res = auth_api.SignupFlow(signup_req)

    # fetch the email token for this flow directly from the database
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == signup_res.flow_token)).scalar_one()
        email_token = flow.email_token

    # confirming the email completes the signup and returns the new user's id
    with auth_api_session() as (auth_api, _interceptor):
        confirm_res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    user_id = confirm_res.auth_res.user_id

    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert not user.in_sync_with_newsletter
        assert user.opt_out_of_newsletter == opt_out
def test_GetAuthState(db):
    """
    Exercise GetAuthState in four situations:

    * no session cookie: not logged in, no `auth_res`
    * valid session cookie: logged in as that user, not jailed
    * same cookie after Deauthenticate: not logged in any more
    * cookie of a user generated with `accepted_tos=0`: logged in and jailed
    """
    user, token = generate_user()
    jailed_user, jailed_token = generate_user(accepted_tos=0)

    def sesh_cookie(session_token):
        # gRPC call metadata carrying the session token as the `couchers-sesh` cookie
        return (("cookie", f"couchers-sesh={session_token}"),)

    # anonymous call
    with auth_api_session() as (auth_api, _interceptor):
        state = auth_api.GetAuthState(empty_pb2.Empty())
        assert not state.logged_in
        assert not state.HasField("auth_res")

    # normal user, before and after logging out
    with auth_api_session() as (auth_api, _interceptor):
        state = auth_api.GetAuthState(empty_pb2.Empty(), metadata=sesh_cookie(token))
        assert state.logged_in
        assert state.HasField("auth_res")
        assert state.auth_res.user_id == user.id
        assert not state.auth_res.jailed

        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=sesh_cookie(token))

        state = auth_api.GetAuthState(empty_pb2.Empty(), metadata=sesh_cookie(token))
        assert not state.logged_in
        assert not state.HasField("auth_res")

    # jailed user
    with auth_api_session() as (auth_api, _interceptor):
        state = auth_api.GetAuthState(empty_pb2.Empty(), metadata=sesh_cookie(jailed_token))
        assert state.logged_in
        assert state.HasField("auth_res")
        assert state.auth_res.user_id == jailed_user.id
        assert state.auth_res.jailed
def test_signup_no_feedback_regression(db):
    """
    Regression test for signups submitted without the feedback form.

    When the feedback form was first removed, the backend was saying feedback is not needed but was
    not completing the signup. This test submits a flow with no `feedback` field, checks that only
    email verification is outstanding, confirms the email, and checks that the signup completes and
    the session cookie matches the stored session token.
    """
    with auth_api_session() as (auth_api, metadata_interceptor):
        account = auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        )
        res = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
                account=account,
                accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
            )
        )

    flow_token = res.flow_token

    # the flow exists, is not finished, and is only waiting on the email verification
    assert res.flow_token
    assert not res.HasField("auth_res")
    assert not (res.need_basic or res.need_account or res.need_feedback)
    assert res.need_verify_email

    # read out the signup token directly from the database for now
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    # signup is complete: no flow token any more, an auth result instead, nothing left to do
    assert not res.flow_token
    assert res.HasField("auth_res")
    assert res.auth_res.user_id
    assert not res.auth_res.jailed
    assert not (res.need_basic or res.need_account or res.need_feedback or res.need_verify_email)

    # make sure we got the right token in a cookie
    with session_scope() as session:
        user_session = session.execute(
            select(UserSession).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
        ).scalar_one()
        expected_token = user_session.token
    sesh, _uid = get_session_cookie_tokens(metadata_interceptor)
    assert sesh == expected_token
def test_banned_username(db):
    """
    Check that continuing a signup with the username "thecouchersadminaccount" is rejected with
    FAILED_PRECONDITION and the USERNAME_NOT_AVAILABLE error message.
    """
    email = f"{random_hex(12)}@couchers.org.invalid"

    # start a flow with the basic step so there is a flow token to attach the account step to
    with auth_api_session() as (auth_api, _interceptor):
        basic = auth_pb2.SignupBasic(name="Tester", email=email)
        flow_token = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=basic)).flow_token

    with auth_api_session() as (auth_api, _interceptor):
        # Banned username
        with pytest.raises(grpc.RpcError) as exc_info:
            banned_account = auth_pb2.SignupAccount(
                username="thecouchersadminaccount",
                password="a very insecure password",
                city="Minas Tirith",
                birthdate="1980-12-31",
                gender="Robot",
                hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                lat=1,
                lng=1,
                radius=100,
                accept_tos=True,
            )
            auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, account=banned_account))
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert rpc_error.details() == errors.USERNAME_NOT_AVAILABLE
1139# tests for ConfirmChangeEmail within test_account.py tests for test_ChangeEmail_*