Coverage for src/tests/test_auth.py: 100%
588 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
1import http.cookies
3import grpc
4import pytest
5from google.protobuf import empty_pb2, wrappers_pb2
6from sqlalchemy.sql import delete, func
8from couchers import errors, urls
9from couchers.crypto import hash_password, random_hex
10from couchers.db import session_scope
11from couchers.models import (
12 ContributeOption,
13 ContributorForm,
14 InviteCode,
15 LoginToken,
16 PasswordResetToken,
17 SignupFlow,
18 Upload,
19 User,
20 UserSession,
21)
22from couchers.sql import couchers_select as select
23from proto import api_pb2, auth_pb2
24from tests.test_fixtures import ( # noqa
25 api_session,
26 auth_api_session,
27 db,
28 email_fields,
29 fast_passwords,
30 generate_user,
31 mock_notification_email,
32 push_collector,
33 real_api_session,
34 testconfig,
35)
@pytest.fixture(autouse=True)
def _(testconfig, fast_passwords):
    """Request the ``testconfig`` and ``fast_passwords`` fixtures for every test in this module."""
def get_session_cookie_tokens(metadata_interceptor):
    """Extract the session token and user id from the most recent response cookies.

    Reads every ``set-cookie`` entry from ``metadata_interceptor.latest_header_raw`` (an iterable of
    ``(key, value)`` pairs), parses each one as a cookie header and looks the cookies up by *name*.

    Returns:
        A ``(session_token, user_id)`` tuple holding the string values of the ``couchers-sesh`` and
        ``couchers-user-id`` cookies.

    Raises:
        KeyError: if either cookie is absent from the headers.
    """
    values_by_name = {}
    for key, raw_header in metadata_interceptor.latest_header_raw:
        if key != "set-cookie":
            continue
        # Parse the header instead of substring-matching it, so that an attribute or value of an
        # unrelated cookie that happens to contain "sesh" or "user-id" cannot be picked by mistake.
        for name, morsel in http.cookies.SimpleCookie(raw_header).items():
            # Keep the first occurrence of each cookie name.
            values_by_name.setdefault(name, morsel.value)
    return values_by_name["couchers-sesh"], values_by_name["couchers-user-id"]
def test_UsernameValid(db):
    """``UsernameValid`` accepts the username "test" and rejects the empty string."""
    expectations = (("test", True), ("", False))
    for username, should_be_valid in expectations:
        with auth_api_session() as (auth_api, metadata_interceptor):
            reply = auth_api.UsernameValid(auth_pb2.UsernameValidReq(username=username))
            assert bool(reply.valid) == should_be_valid
def test_signup_incremental(db):
    """Drive the signup flow one field group per request and check the reported state after each step.

    Order of steps: basic info, re-fetch by flow token, feedback, community guidelines, email
    verification, and finally the account details. The last response must carry ``auth_res`` and a
    session cookie; the created user and the stored contributor form are then checked.
    """

    def advance(**fields):
        # Send a single SignupFlow request on a fresh auth session.
        with auth_api_session() as (auth_api, interceptor):
            reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(**fields))
        return reply, interceptor

    def assert_pending(reply, token, *, verify_email, guidelines):
        # Expectations shared by every response before the account step; only the two flags vary.
        assert reply.flow_token == token
        assert not reply.HasField("auth_res")
        assert not reply.need_basic
        assert reply.need_account
        assert not reply.need_feedback
        assert bool(reply.need_verify_email) == verify_email
        assert bool(reply.need_accept_community_guidelines) == guidelines

    started, _ = advance(basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"))
    flow_token = started.flow_token
    assert flow_token
    assert_pending(started, flow_token, verify_email=True, guidelines=True)

    # read out the signup token directly from the database for now
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    # Asking for the flow again by token reports the same state
    refetched, _ = advance(flow_token=flow_token)
    assert_pending(refetched, flow_token, verify_email=True, guidelines=True)

    # Add feedback
    feedback_form = auth_pb2.ContributorForm(
        ideas="I'm a robot, incapable of original ideation",
        features="I love all your features",
        experience="I haven't done couch surfing before",
        contribute=auth_pb2.CONTRIBUTE_OPTION_YES,
        contribute_ways=["serving", "backend"],
        expertise="I'd love to be your server: I can compute very fast, but only simple opcodes",
    )
    after_feedback, _ = advance(flow_token=flow_token, feedback=feedback_form)
    assert_pending(after_feedback, flow_token, verify_email=True, guidelines=True)

    # Agree to community guidelines
    after_guidelines, _ = advance(
        flow_token=flow_token,
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )
    assert_pending(after_guidelines, flow_token, verify_email=True, guidelines=False)

    # Verify email
    after_verify, _ = advance(flow_token=flow_token, email_token=email_token)
    assert_pending(after_verify, flow_token, verify_email=False, guidelines=False)

    # Finally finish off account info
    account = auth_pb2.SignupAccount(
        username="frodo",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_MAYBE,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
    )
    finished, metadata_interceptor = advance(flow_token=flow_token, account=account)

    assert not finished.flow_token
    assert finished.HasField("auth_res")
    assert finished.auth_res.user_id
    assert not finished.auth_res.jailed
    for flag in (
        "need_basic",
        "need_account",
        "need_feedback",
        "need_verify_email",
        "need_accept_community_guidelines",
    ):
        assert not getattr(finished, flag)

    user_id = finished.auth_res.user_id

    sess_token, uid = get_session_cookie_tokens(metadata_interceptor)
    assert uid == str(user_id)

    with api_session(sess_token) as api:
        profile = api.GetUser(api_pb2.GetUserReq(user=str(user_id)))

    assert profile.username == "frodo"
    assert profile.gender == "Bot"
    assert profile.hosting_status == api_pb2.HOSTING_STATUS_MAYBE
    assert profile.city == "New York City"
    assert profile.lat == 40.7331
    assert profile.lng == -73.9778
    assert profile.radius == 500

    with session_scope() as session:
        form = session.execute(select(ContributorForm)).scalar_one()

        assert form.ideas == "I'm a robot, incapable of original ideation"
        assert form.features == "I love all your features"
        assert form.experience == "I haven't done couch surfing before"
        assert form.contribute == ContributeOption.yes
        assert form.contribute_ways == ["serving", "backend"]
        assert form.expertise == "I'd love to be your server: I can compute very fast, but only simple opcodes"
def _quick_signup():
    """Sign up the user "frodo" in two requests and check the resulting session cookie.

    The first request submits basic info, account details, an empty feedback form and acceptance of
    the community guidelines at once, leaving only email verification pending. The second request
    verifies the email using the token read from the database, which completes the signup.
    """
    signup_request = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        pending = auth_api.SignupFlow(signup_request)

    flow_token = pending.flow_token

    assert flow_token
    assert not pending.HasField("auth_res")
    assert not pending.need_basic
    assert not pending.need_account
    assert not pending.need_feedback
    assert pending.need_verify_email

    # read out the signup token directly from the database for now
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        completed = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    assert not completed.flow_token
    assert completed.HasField("auth_res")
    assert completed.auth_res.user_id
    assert not completed.auth_res.jailed
    assert not completed.need_basic
    assert not completed.need_account
    assert not completed.need_feedback
    assert not completed.need_verify_email

    # make sure we got the right token in a cookie
    with session_scope() as session:
        frodo_session_query = (
            select(UserSession).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
        )
        token = session.execute(frodo_session_query).scalar_one().token
    sesh, uid = get_session_cookie_tokens(metadata_interceptor)
    assert sesh == token
def test_signup(db):
    """Run the quick signup helper, which carries its own assertions."""
    _quick_signup()
def test_basic_login(db):
    """Log in with username and password, check the session cookie matches a valid session, then log out."""
    # Create our test user using signup
    _quick_signup()

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))

    reply_token, _ = get_session_cookie_tokens(metadata_interceptor)

    with session_scope() as session:
        # scalar_one() fails with a clear "no row found" error if the session is missing, rather
        # than an AttributeError from dereferencing None.
        token = session.execute(
            select(UserSession)
            .join(User, UserSession.user_id == User.id)
            .where(User.username == "frodo")
            .where(UserSession.token == reply_token)
            .where(UserSession.is_valid)
        ).scalar_one().token
        assert token

    # log out
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={reply_token}"),))
def test_login_part_signed_up_verified_email(db):
    """
    If you try to log in but didn't finish signing up, we send you a new email and ask you to finish signing up.
    """
    basic_info = auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid")
    with auth_api_session() as (auth_api, metadata_interceptor):
        started = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=basic_info))

    flow_token = started.flow_token
    assert started.need_verify_email

    # verify the email
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        email_token = flow.email_token
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, metadata_interceptor):
            with pytest.raises(grpc.RpcError) as excinfo:
                auth_api.Authenticate(auth_pb2.AuthReq(user="email@couchers.org.invalid", password="wrong pwd"))
            assert excinfo.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP

    # the login attempt must have triggered exactly one email pointing back at the signup flow
    assert mock.call_count == 1
    sent = email_fields(mock)
    assert sent.recipient == "email@couchers.org.invalid"
    assert flow_token in sent.plain
    assert flow_token in sent.html
def test_login_part_signed_up_not_verified_email(db):
    """Logging in while the signup email is still unverified fails and sends an email with the email token."""
    account = auth_pb2.SignupAccount(
        username="frodo",
        password="a very insecure password",
        birthdate="1999-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
    )
    basic_info = auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid")
    with auth_api_session() as (auth_api, metadata_interceptor):
        started = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=basic_info, account=account))

    flow_token = started.flow_token
    assert started.need_verify_email

    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, metadata_interceptor):
            with pytest.raises(grpc.RpcError) as excinfo:
                auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="wrong pwd"))
            assert excinfo.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP

    # the email token is read only after the login attempt, matching the order of the checks below
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        email_token = flow.email_token

    assert mock.call_count == 1
    sent = email_fields(mock)
    assert sent.recipient == "email@couchers.org.invalid"
    assert email_token in sent.plain
    assert email_token in sent.html
def test_banned_user(db):
    """A user flagged ``is_banned`` cannot authenticate and gets ``ACCOUNT_SUSPENDED``."""
    _quick_signup()

    with session_scope() as session:
        only_user = session.execute(select(User)).scalar_one()
        only_user.is_banned = True

    credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as excinfo:
            auth_api.Authenticate(credentials)
    failure = excinfo.value
    assert failure.code() == grpc.StatusCode.FAILED_PRECONDITION
    assert failure.details() == errors.ACCOUNT_SUSPENDED
def test_deleted_user(db):
    """A user flagged ``is_deleted`` cannot authenticate and gets ``ACCOUNT_NOT_FOUND``."""
    _quick_signup()

    with session_scope() as session:
        only_user = session.execute(select(User)).scalar_one()
        only_user.is_deleted = True

    credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as excinfo:
            auth_api.Authenticate(credentials)
    failure = excinfo.value
    assert failure.code() == grpc.StatusCode.NOT_FOUND
    assert failure.details() == errors.ACCOUNT_NOT_FOUND
def test_invalid_token(db):
    """A request made with a random, unknown session token is rejected as ``UNAUTHENTICATED``."""
    # two users are created, as before; only the second one's username is needed
    generate_user()
    target_user, _ = generate_user()

    wrong_token = random_hex(32)

    with real_api_session(wrong_token) as api:
        with pytest.raises(grpc.RpcError) as excinfo:
            api.GetUser(api_pb2.GetUserReq(user=target_user.username))

    failure = excinfo.value
    assert failure.code() == grpc.StatusCode.UNAUTHENTICATED
    assert failure.details() == "Unauthorized"
def test_password_reset_v2(db, push_collector):
    """Exercise the password reset flow end to end.

    Covers: requesting a reset (email and push notification content), rejecting an insecure new
    password, completing the reset with a good password (push notification and a valid session
    cookie), rejecting reuse of the same reset token, and the stored password hash afterwards.
    """
    user, _ = generate_user(hashed_password=hash_password("mypassword"))

    with auth_api_session() as (auth_api, metadata_interceptor):
        with mock_notification_email() as mock:
            auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username))

    with session_scope() as session:
        password_reset_token = session.execute(select(PasswordResetToken)).scalar_one().token

    assert mock.call_count == 1
    email = email_fields(mock)
    assert email.recipient == user.email
    assert "reset" in email.subject.lower()
    unique_string = "You asked for your password to be reset on Couchers.org."
    reset_url = f"http://localhost:3000/complete-password-reset?token={password_reset_token}"
    # every fragment must show up in both the plain-text and the HTML rendering
    for fragment in (password_reset_token, unique_string, reset_url, "support@couchers.org"):
        assert fragment in email.plain
        assert fragment in email.html

    push_collector.assert_user_push_matches_fields(
        user.id,
        title="A password reset was initiated on your account",
        body="Someone initiated a password change on your account.",
    )

    # make sure bad password are caught
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as excinfo:
            auth_api.CompletePasswordResetV2(
                auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password="password")
            )
        assert excinfo.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert excinfo.value.details() == errors.INSECURE_PASSWORD

    # make sure we can set a good password
    with auth_api_session() as (auth_api, metadata_interceptor):
        pwd = random_hex()
        with mock_notification_email():
            auth_api.CompletePasswordResetV2(
                auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password=pwd)
            )

    push_collector.assert_user_push_matches_fields(
        user.id,
        ix=1,
        title="Your password was successfully reset",
        body="Your password on Couchers.org was changed. If that was you, then no further action is needed.",
    )

    session_token, _ = get_session_cookie_tokens(metadata_interceptor)

    with session_scope() as session:
        # scalar_one() fails with a clear "no row found" error if the session is missing, rather
        # than an AttributeError from dereferencing None.
        other_session_token = session.execute(
            select(UserSession)
            .join(User, UserSession.user_id == User.id)
            .where(User.username == user.username)
            .where(UserSession.token == session_token)
            .where(UserSession.is_valid)
        ).scalar_one().token
        assert other_session_token

    # make sure we can't set a password again
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as excinfo:
            auth_api.CompletePasswordResetV2(
                auth_pb2.CompletePasswordResetV2Req(
                    password_reset_token=password_reset_token, new_password=random_hex()
                )
            )
        assert excinfo.value.code() == grpc.StatusCode.NOT_FOUND
        assert excinfo.value.details() == errors.INVALID_TOKEN

    with session_scope() as session:
        stored_user = session.execute(select(User)).scalar_one()
        assert stored_user.hashed_password == hash_password(pwd)
def test_password_reset_no_such_user(db):
    """Requesting a reset for an unknown username succeeds but stores no ``PasswordResetToken``."""
    generate_user()

    reset_request = auth_pb2.ResetPasswordReq(user="nonexistentuser")
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ResetPassword(reset_request)

    with session_scope() as session:
        stored_token = session.execute(select(PasswordResetToken)).scalar_one_or_none()

    assert stored_token is None
def test_password_reset_invalid_token_v2(db):
    """Completing a reset with a wrong token fails with ``INVALID_TOKEN`` and leaves the password unchanged."""
    password = random_hex()
    user, _ = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username))

    bogus_completion = auth_pb2.CompletePasswordResetV2Req(password_reset_token="wrongtoken")
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as excinfo:
            auth_api.CompletePasswordResetV2(bogus_completion)
    assert excinfo.value.code() == grpc.StatusCode.NOT_FOUND
    assert excinfo.value.details() == errors.INVALID_TOKEN

    with session_scope() as session:
        stored_user = session.execute(select(User)).scalar_one()
        assert stored_user.hashed_password == hash_password(password)
def test_logout_invalid_token(db):
    """Logging out after the login tokens were deleted still succeeds and clears the session cookie."""
    # Create our test user using signup
    _quick_signup()

    login_request = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Authenticate(login_request)

    issued_token, _ = get_session_cookie_tokens(metadata_interceptor)

    # delete all login tokens
    # NOTE(review): this removes LoginToken rows, while the other tests in this file look sessions up
    # via UserSession -- confirm this really makes the session token sent below "non-existent".
    with session_scope() as session:
        session.execute(delete(LoginToken))

    # log out with non-existent token should still return a valid result
    session_cookie = (("cookie", f"couchers-sesh={issued_token}"),)
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=session_cookie)

    cleared_token, _ = get_session_cookie_tokens(metadata_interceptor)
    # make sure we set an empty cookie
    assert cleared_token == ""
def test_signup_without_password(db):
    """Signing up with the password "bad" is rejected with ``PASSWORD_TOO_SHORT``."""
    account = auth_pb2.SignupAccount(
        username="frodo",
        password="bad",
        city="Minas Tirith",
        birthdate="9999-12-31",  # arbitrary future birthdate
        gender="Robot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        lat=1,
        lng=1,
        radius=100,
        accept_tos=True,
    )
    signup_request = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"),
        account=account,
        feedback=auth_pb2.ContributorForm(),
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as excinfo:
            auth_api.SignupFlow(signup_request)
        assert excinfo.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert excinfo.value.details() == errors.PASSWORD_TOO_SHORT
def test_signup_invalid_birthdate(db):
    """Future and under-18 birthdates are rejected with ``INVALID_BIRTHDATE``; an adult birthdate is accepted.

    Only the accepted signup may leave a ``SignupFlow`` row behind.
    """

    def signup_request(name, email, username, city, birthdate, gender):
        # The three requests differ only in these fields; the remaining account fields are fixed.
        return auth_pb2.SignupFlowReq(
            basic=auth_pb2.SignupBasic(name=name, email=email),
            account=auth_pb2.SignupAccount(
                username=username,
                password="a very insecure password",
                city=city,
                birthdate=birthdate,
                gender=gender,
                hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                lat=1,
                lng=1,
                radius=100,
                accept_tos=True,
            ),
            feedback=auth_pb2.ContributorForm(),
        )

    # arbitrary future birthdate
    from_the_future = signup_request("Räksmörgås", "a1@b.com", "frodo", "Minas Tirith", "9999-12-31", "Robot")
    # arbitrary birthdate older than 18 years
    adult = signup_request("Christopher", "a2@b.com", "ceelo", "New York City", "2000-12-31", "Helicopter")
    # arbitrary birthdate < 18 yrs
    minor = signup_request("Franklin", "a3@b.com", "franklin", "Los Santos", "2010-04-09", "Male")

    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as excinfo:
            auth_api.SignupFlow(from_the_future)
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.INVALID_BIRTHDATE

        accepted = auth_api.SignupFlow(adult)
        assert accepted.flow_token

        with pytest.raises(grpc.RpcError) as excinfo:
            auth_api.SignupFlow(minor)
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.INVALID_BIRTHDATE

    with session_scope() as session:
        flow_count = session.execute(select(func.count()).select_from(SignupFlow)).scalar_one()
        assert flow_count == 1
def test_signup_invalid_email(db):
    """Each of several malformed email addresses is rejected with ``INVALID_EMAIL``."""
    malformed_addresses = ("a", "a@b", "a@b.", "a@b.c")
    for address in malformed_addresses:
        signup_request = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=address))
        with auth_api_session() as (auth_api, metadata_interceptor):
            with pytest.raises(grpc.RpcError) as excinfo:
                auth_api.SignupFlow(signup_request)
            assert excinfo.value.code() == grpc.StatusCode.INVALID_ARGUMENT
            assert excinfo.value.details() == errors.INVALID_EMAIL
def test_signup_existing_email(db):
    """Starting a signup with the email of an existing user fails with ``SIGNUP_FLOW_EMAIL_TAKEN``."""
    # Signed up user
    existing_user, _ = generate_user()

    signup_request = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=existing_user.email))
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as excinfo:
            auth_api.SignupFlow(signup_request)
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.SIGNUP_FLOW_EMAIL_TAKEN
def test_signup_continue_with_email(db):
    """Starting a second signup with an email that already has a flow fails with ``SIGNUP_FLOW_EMAIL_STARTED_SIGNUP``."""
    testing_email = f"{random_hex(12)}@couchers.org.invalid"

    def start_signup(auth_api):
        # Both attempts send exactly the same basic info.
        return auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))
        )

    with auth_api_session() as (auth_api, metadata_interceptor):
        first = start_signup(auth_api)
    assert first.flow_token

    # continue with same email, should just send another email to the user
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as excinfo:
            start_signup(auth_api)
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP
def test_signup_resend_email(db):
    """Asking for the verification email again re-sends the same email token, which then completes the signup."""
    signup_request = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        with mock_notification_email() as mock:
            started = auth_api.SignupFlow(signup_request)
        assert mock.call_count == 1
        first_email = email_fields(mock)
        assert first_email.recipient == "email@couchers.org.invalid"

    flow_token = started.flow_token
    assert flow_token

    with session_scope() as session:
        flow = session.execute(select(SignupFlow)).scalar_one()
        assert flow.flow_token == flow_token
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    # ask for a new signup email
    resend_request = auth_pb2.SignupFlowReq(flow_token=flow_token, resend_verification_email=True)
    with auth_api_session() as (auth_api, metadata_interceptor):
        with mock_notification_email() as mock:
            auth_api.SignupFlow(resend_request)
        assert mock.call_count == 1
        second_email = email_fields(mock)
        assert second_email.recipient == "email@couchers.org.invalid"
        assert email_token in second_email.plain
        assert email_token in second_email.html

    with session_scope() as session:
        flow = session.execute(select(SignupFlow)).scalar_one()
        assert not flow.email_verified

    with auth_api_session() as (auth_api, metadata_interceptor):
        completed = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    assert not completed.flow_token
    assert completed.HasField("auth_res")
def test_successful_authenticate(db):
    """Authentication succeeds, and is not jailed, with either the username or the email as identifier."""
    user, _ = generate_user(hashed_password=hash_password("password"))

    # Authenticate with username, then with email
    for identifier in (user.username, user.email):
        with auth_api_session() as (auth_api, metadata_interceptor):
            reply = auth_api.Authenticate(auth_pb2.AuthReq(user=identifier, password="password"))
        assert not reply.jailed
def test_unsuccessful_authenticate(db):
    """A wrong password or an unknown username, email or id all fail with ``NOT_FOUND`` and the matching detail."""
    user, _ = generate_user(hashed_password=hash_password("password"))

    # (identifier, password, expected error detail)
    failing_logins = (
        # Invalid password
        (user.username, "incorrectpassword", errors.INVALID_PASSWORD),
        # Invalid username
        ("notarealusername", "password", errors.ACCOUNT_NOT_FOUND),
        # Invalid email
        (f"{random_hex(12)}@couchers.org.invalid", "password", errors.ACCOUNT_NOT_FOUND),
        # Invalid id
        ("-1", "password", errors.ACCOUNT_NOT_FOUND),
    )

    for identifier, password, expected_detail in failing_logins:
        with auth_api_session() as (auth_api, metadata_interceptor):
            with pytest.raises(grpc.RpcError) as excinfo:
                auth_api.Authenticate(auth_pb2.AuthReq(user=identifier, password=password))
            assert excinfo.value.code() == grpc.StatusCode.NOT_FOUND
            assert excinfo.value.details() == expected_detail
def test_complete_signup(db):
    """Invalid signup input is rejected with the matching status code and error detail.

    Cases, in order: blank username, blank name, missing hosting status, a username already taken by
    an existing user, and the coordinate (0, 0).
    """

    def account_with(**overrides):
        # A valid account payload; each case overrides only the field(s) under test.
        fields = {
            "username": "frodo",
            "password": "a very insecure password",
            "city": "Minas Tirith",
            "birthdate": "1980-12-31",
            "gender": "Robot",
            "hosting_status": api_pb2.HOSTING_STATUS_CAN_HOST,
            "lat": 1,
            "lng": 1,
            "radius": 100,
            "accept_tos": True,
        }
        fields.update(overrides)
        return auth_pb2.SignupAccount(**fields)

    def expect_rejection(request, code, detail):
        # Send the request on a fresh auth session and check how it fails.
        with auth_api_session() as (auth_api, metadata_interceptor):
            with pytest.raises(grpc.RpcError) as excinfo:
                auth_api.SignupFlow(request)
            assert excinfo.value.code() == code
            assert excinfo.value.details() == detail

    testing_email = f"{random_hex(12)}@couchers.org.invalid"
    with auth_api_session() as (auth_api, metadata_interceptor):
        started = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email))
        )

    flow_token = started.flow_token

    # Invalid username
    expect_rejection(
        auth_pb2.SignupFlowReq(flow_token=flow_token, account=account_with(username=" ")),
        grpc.StatusCode.INVALID_ARGUMENT,
        errors.INVALID_USERNAME,
    )

    # Invalid name
    expect_rejection(
        auth_pb2.SignupFlowReq(
            basic=auth_pb2.SignupBasic(name=" ", email=f"{random_hex(12)}@couchers.org.invalid")
        ),
        grpc.StatusCode.INVALID_ARGUMENT,
        errors.INVALID_NAME,
    )

    # Hosting status required
    expect_rejection(
        auth_pb2.SignupFlowReq(flow_token=flow_token, account=account_with(hosting_status=None)),
        grpc.StatusCode.INVALID_ARGUMENT,
        errors.HOSTING_STATUS_REQUIRED,
    )

    # Username unavailable
    existing_user, _ = generate_user()
    expect_rejection(
        auth_pb2.SignupFlowReq(flow_token=flow_token, account=account_with(username=existing_user.username)),
        grpc.StatusCode.FAILED_PRECONDITION,
        errors.USERNAME_NOT_AVAILABLE,
    )

    # Invalid coordinate
    expect_rejection(
        auth_pb2.SignupFlowReq(flow_token=flow_token, account=account_with(lat=0, lng=0)),
        grpc.StatusCode.INVALID_ARGUMENT,
        errors.INVALID_COORDINATE,
    )
def test_signup_token_regression(db):
    """Regression test: restarting a signup with an already-confirmed email must fail cleanly.

    Repro steps:
      1. Start a signup
      2. Confirm the email
      3. Start a new signup with the same email
    Expected: send a link to the email to continue signing up.
    Actual (before the fix): `AttributeError: 'SignupFlow' object has no attribute 'token'`
    """
    testing_email = f"{random_hex(12)}@couchers.org.invalid"
    basic_only = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))

    # 1. Start a signup
    with auth_api_session() as (auth_api, metadata_interceptor):
        started = auth_api.SignupFlow(basic_only)
    flow_token = started.flow_token
    assert flow_token

    # 2. Confirm the email
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        email_token = flow.email_token

    confirmation = auth_pb2.SignupFlowReq(flow_token=flow_token, email_token=email_token)
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.SignupFlow(confirmation)

    # 3. Start a new signup with the same email
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as excinfo:
            auth_api.SignupFlow(basic_only)
        assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert excinfo.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP
@pytest.mark.parametrize("opt_out", [True, False])
def test_opt_out_of_newsletter(db, opt_out):
    """
    Sign up with `opt_out_of_newsletter` set to `opt_out`, verify the email, and check
    that the created user stores that choice and is not yet marked as in sync with the
    newsletter.
    """
    account = auth_pb2.SignupAccount(
        username="frodo",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
        opt_out_of_newsletter=opt_out,
    )
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=account,
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )

    with auth_api_session() as (auth_api, _interceptor):
        signup_res = auth_api.SignupFlow(signup_req)

    # read the email verification token directly from the database
    with session_scope() as session:
        flow_query = select(SignupFlow).where(SignupFlow.flow_token == signup_res.flow_token)
        email_token = session.execute(flow_query).scalar_one().email_token

    with auth_api_session() as (auth_api, _interceptor):
        verify_res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    user_id = verify_res.auth_res.user_id

    with session_scope() as session:
        created = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert not created.in_sync_with_newsletter
        assert created.opt_out_of_newsletter == opt_out
def test_GetAuthState(db):
    """
    Check GetAuthState for four situations: no session cookie, a valid session,
    the same session after Deauthenticate, and a session belonging to a user
    generated with `accepted_tos=0` (reported as jailed).
    """
    user, token = generate_user()
    jailed_user, jailed_token = generate_user(accepted_tos=0)

    def sesh_cookie(session_token):
        # request metadata carrying the session token as the `couchers-sesh` cookie
        return (("cookie", f"couchers-sesh={session_token}"),)

    # no cookie at all: not logged in
    with auth_api_session() as (auth_api, _interceptor):
        state = auth_api.GetAuthState(empty_pb2.Empty())
        assert not state.logged_in
        assert not state.HasField("auth_res")

    with auth_api_session() as (auth_api, _interceptor):
        # valid session: logged in and not jailed
        state = auth_api.GetAuthState(empty_pb2.Empty(), metadata=sesh_cookie(token))
        assert state.logged_in
        assert state.HasField("auth_res")
        assert state.auth_res.user_id == user.id
        assert not state.auth_res.jailed

        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=sesh_cookie(token))

        # the same token no longer counts as logged in
        state = auth_api.GetAuthState(empty_pb2.Empty(), metadata=sesh_cookie(token))
        assert not state.logged_in
        assert not state.HasField("auth_res")

    # user that has not accepted the TOS: logged in but jailed
    with auth_api_session() as (auth_api, _interceptor):
        state = auth_api.GetAuthState(empty_pb2.Empty(), metadata=sesh_cookie(jailed_token))
        assert state.logged_in
        assert state.HasField("auth_res")
        assert state.auth_res.user_id == jailed_user.id
        assert state.auth_res.jailed
def test_signup_no_feedback_regression(db):
    """
    Regression test for signing up without submitting the feedback form.

    When the feedback form was first removed, the backend reported that feedback was not
    needed but did not complete the signup. This test sends no `feedback`, verifies the
    email, and checks that the signup finishes and a session cookie is issued.
    """
    account = auth_pb2.SignupAccount(
        username="frodo",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
    )
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=account,
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )

    with auth_api_session() as (auth_api, interceptor):
        first = auth_api.SignupFlow(signup_req)

    flow_token = first.flow_token

    # only the email verification step should remain
    assert first.flow_token
    assert not first.HasField("auth_res")
    assert not first.need_basic
    assert not first.need_account
    assert not first.need_feedback
    assert first.need_verify_email

    # read out the signup token directly from the database for now
    with session_scope() as session:
        pending = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert pending.email_sent
        assert not pending.email_verified
        email_token = pending.email_token

    with auth_api_session() as (auth_api, interceptor):
        final = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    # the flow is finished: we are authenticated and nothing else is needed
    assert not final.flow_token
    assert final.HasField("auth_res")
    assert final.auth_res.user_id
    assert not final.auth_res.jailed
    assert not final.need_basic
    assert not final.need_account
    assert not final.need_feedback
    assert not final.need_verify_email

    # make sure we got the right token in a cookie
    with session_scope() as session:
        session_query = (
            select(UserSession).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
        )
        expected_token = session.execute(session_query).scalar_one().token
    sesh, _uid = get_session_cookie_tokens(interceptor)
    assert sesh == expected_token
def test_banned_username(db):
    """
    Start a signup flow, then submit the account step with the username
    "thecouchersadminaccount" and check it is rejected with FAILED_PRECONDITION and
    USERNAME_NOT_AVAILABLE.
    """
    address = f"{random_hex(12)}@couchers.org.invalid"

    with auth_api_session() as (auth_api, _interceptor):
        started = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=address))
        )

    flow_token = started.flow_token

    banned_account = auth_pb2.SignupAccount(
        username="thecouchersadminaccount",
        password="a very insecure password",
        city="Minas Tirith",
        birthdate="1980-12-31",
        gender="Robot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        lat=1,
        lng=1,
        radius=100,
        accept_tos=True,
    )
    account_req = auth_pb2.SignupFlowReq(flow_token=flow_token, account=banned_account)

    with auth_api_session() as (auth_api, _interceptor):
        # Banned username
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(account_req)
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == errors.USERNAME_NOT_AVAILABLE
# Tests for ConfirmChangeEmail live in test_account.py; see the test_ChangeEmail_* tests there.
def test_GetInviteCodeInfo(db):
    """
    Create a user with an uploaded avatar and an invite code, then check that
    GetInviteCodeInfo returns the creator's name, username, avatar thumbnail URL and
    the invite link for that code.
    """
    user, _token = generate_user()
    code_id = "TST12345"

    with session_scope() as session:
        upload = Upload(
            key="test_avatar.jpg",
            filename="test_avatar.jpg",
            creator_user_id=user.id,
        )
        session.add(upload)
        session.flush()

        # attach the upload as the creator's avatar
        creator = session.execute(select(User).where(User.id == user.id)).scalar_one()
        creator.avatar_key = upload.key

        session.add(InviteCode(id=code_id, creator_user_id=user.id))
        session.commit()

    with auth_api_session() as (auth, _):
        info = auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code_id))
        assert info.name == user.name
        assert info.username == user.username
        assert info.avatar_url.endswith("/img/thumbnail/test_avatar.jpg")
        assert info.url == urls.invite_code_link(code=code_id)
def test_GetInviteCodeInfo_no_avatar(db):
    """
    Create an invite code for a user whose avatar is explicitly cleared, then check that
    GetInviteCodeInfo returns an empty `avatar_url` alongside the creator's name,
    username and the invite link.
    """
    user, _ = generate_user()
    code_id = "NOAVTR1"

    with session_scope() as session:
        db_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        db_user.avatar_key = None

        # reuse code_id so the stored code and the queried code cannot drift apart
        code = InviteCode(id=code_id, creator_user_id=user.id)
        session.add(code)
        session.commit()

    with auth_api_session() as (auth, _):
        res = auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code_id))
        assert res.name == user.name
        assert res.username == user.username
        assert res.avatar_url == ""
        assert res.url == urls.invite_code_link(code=code_id)
def test_GetInviteCodeInfo_not_found(db):
    """
    Check that looking up a code that was never created fails with NOT_FOUND and
    INVITE_CODE_NOT_FOUND.
    """
    # a user is generated, but no invite code is created
    generate_user()

    lookup = auth_pb2.GetInviteCodeInfoReq(code="BADCODE")

    with auth_api_session() as (auth, _):
        with pytest.raises(grpc.RpcError) as e:
            auth.GetInviteCodeInfo(lookup)
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == errors.INVITE_CODE_NOT_FOUND
def test_SignupFlow_invite_code(db):
    """
    Run a signup flow whose basic step carries an invite code and check that the
    resulting user row records that code in `invite_code_id`.
    """
    inviter, _ = generate_user()
    invite_code = "INV12345"
    with session_scope() as session:
        session.add(InviteCode(id=invite_code, creator_user_id=inviter.id))
        session.commit()

    with auth_api_session() as (auth_api, _):
        # Signup basic step with invite code
        res = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                basic=auth_pb2.SignupBasic(
                    name="Test User",
                    email="inviteuser@example.com",
                    invite_code=invite_code,
                )
            )
        )
        flow_token = res.flow_token
        assert flow_token

        # Confirm email, reading the token straight from the database
        with session_scope() as session:
            email_token = (
                session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one().email_token
            )

        auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

        # Signup account step
        auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                flow_token=flow_token,
                account=auth_pb2.SignupAccount(
                    username="invited_user",
                    password="secure password",
                    birthdate="1990-01-01",
                    gender="Other",
                    hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                    city="Example City",
                    lat=1,
                    lng=5,
                    radius=100,
                    accept_tos=True,
                ),
                feedback=auth_pb2.ContributorForm(),
                accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
            )
        )

    # Check that invite_code_id is stored in the final User object
    with session_scope() as session:
        invited = session.execute(select(User).where(User.username == "invited_user")).scalar_one()
        assert invited.invite_code_id == invite_code