Coverage for src/tests/test_auth.py: 100%
601 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-06 08:02 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-06 08:02 +0000
1import http.cookies
3import grpc
4import pytest
5from google.protobuf import empty_pb2, wrappers_pb2
6from sqlalchemy import update
7from sqlalchemy.sql import delete, func
9from couchers import urls
10from couchers.crypto import hash_password, random_hex
11from couchers.db import session_scope
12from couchers.models import (
13 ContributeOption,
14 ContributorForm,
15 InviteCode,
16 LoginToken,
17 PasswordResetToken,
18 SignupFlow,
19 Upload,
20 User,
21 UserSession,
22)
23from couchers.proto import api_pb2, auth_pb2
24from couchers.sql import couchers_select as select
25from tests.test_fixtures import ( # noqa
26 api_session,
27 auth_api_session,
28 db,
29 email_fields,
30 fast_passwords,
31 generate_user,
32 mock_notification_email,
33 push_collector,
34 real_api_session,
35 testconfig,
36)
39@pytest.fixture(autouse=True)
40def _(testconfig, fast_passwords):
41 pass
44def get_session_cookie_tokens(metadata_interceptor):
45 set_cookies = [val for key, val in metadata_interceptor.latest_header_raw if key == "set-cookie"]
46 sesh = http.cookies.SimpleCookie([v for v in set_cookies if "sesh" in v][0])["couchers-sesh"].value
47 uid = http.cookies.SimpleCookie([v for v in set_cookies if "user-id" in v][0])["couchers-user-id"].value
48 return sesh, uid
51def test_UsernameValid(db):
52 with auth_api_session() as (auth_api, metadata_interceptor):
53 assert auth_api.UsernameValid(auth_pb2.UsernameValidReq(username="test")).valid
55 with auth_api_session() as (auth_api, metadata_interceptor):
56 assert not auth_api.UsernameValid(auth_pb2.UsernameValidReq(username="")).valid
59def test_signup_incremental(db):
60 with auth_api_session() as (auth_api, metadata_interceptor):
61 res = auth_api.SignupFlow(
62 auth_pb2.SignupFlowReq(
63 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
64 )
65 )
67 flow_token = res.flow_token
68 assert res.flow_token
69 assert not res.HasField("auth_res")
70 assert not res.need_basic
71 assert res.need_account
72 assert not res.need_feedback
73 assert res.need_verify_email
74 assert res.need_accept_community_guidelines
76 # read out the signup token directly from the database for now
77 with session_scope() as session:
78 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
79 assert flow.email_sent
80 assert not flow.email_verified
81 email_token = flow.email_token
83 with auth_api_session() as (auth_api, metadata_interceptor):
84 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token))
86 assert res.flow_token == flow_token
87 assert not res.HasField("auth_res")
88 assert not res.need_basic
89 assert res.need_account
90 assert not res.need_feedback
91 assert res.need_verify_email
92 assert res.need_accept_community_guidelines
94 # Add feedback
95 with auth_api_session() as (auth_api, metadata_interceptor):
96 res = auth_api.SignupFlow(
97 auth_pb2.SignupFlowReq(
98 flow_token=flow_token,
99 feedback=auth_pb2.ContributorForm(
100 ideas="I'm a robot, incapable of original ideation",
101 features="I love all your features",
102 experience="I haven't done couch surfing before",
103 contribute=auth_pb2.CONTRIBUTE_OPTION_YES,
104 contribute_ways=["serving", "backend"],
105 expertise="I'd love to be your server: I can compute very fast, but only simple opcodes",
106 ),
107 )
108 )
110 assert res.flow_token == flow_token
111 assert not res.HasField("auth_res")
112 assert not res.need_basic
113 assert res.need_account
114 assert not res.need_feedback
115 assert res.need_verify_email
116 assert res.need_accept_community_guidelines
118 # Agree to community guidelines
119 with auth_api_session() as (auth_api, metadata_interceptor):
120 res = auth_api.SignupFlow(
121 auth_pb2.SignupFlowReq(
122 flow_token=flow_token,
123 accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
124 )
125 )
127 assert res.flow_token == flow_token
128 assert not res.HasField("auth_res")
129 assert not res.need_basic
130 assert res.need_account
131 assert not res.need_feedback
132 assert res.need_verify_email
133 assert not res.need_accept_community_guidelines
135 # Verify email
136 with auth_api_session() as (auth_api, metadata_interceptor):
137 res = auth_api.SignupFlow(
138 auth_pb2.SignupFlowReq(
139 flow_token=flow_token,
140 email_token=email_token,
141 )
142 )
144 assert res.flow_token == flow_token
145 assert not res.HasField("auth_res")
146 assert not res.need_basic
147 assert res.need_account
148 assert not res.need_feedback
149 assert not res.need_verify_email
150 assert not res.need_accept_community_guidelines
152 # Finally finish off account info
153 with auth_api_session() as (auth_api, metadata_interceptor):
154 res = auth_api.SignupFlow(
155 auth_pb2.SignupFlowReq(
156 flow_token=flow_token,
157 account=auth_pb2.SignupAccount(
158 username="frodo",
159 password="a very insecure password",
160 birthdate="1970-01-01",
161 gender="Bot",
162 hosting_status=api_pb2.HOSTING_STATUS_MAYBE,
163 city="New York City",
164 lat=40.7331,
165 lng=-73.9778,
166 radius=500,
167 accept_tos=True,
168 ),
169 )
170 )
172 assert not res.flow_token
173 assert res.HasField("auth_res")
174 assert res.auth_res.user_id
175 assert not res.auth_res.jailed
176 assert not res.need_basic
177 assert not res.need_account
178 assert not res.need_feedback
179 assert not res.need_verify_email
180 assert not res.need_accept_community_guidelines
182 user_id = res.auth_res.user_id
184 sess_token, uid = get_session_cookie_tokens(metadata_interceptor)
185 assert uid == str(user_id)
187 with api_session(sess_token) as api:
188 res = api.GetUser(api_pb2.GetUserReq(user=str(user_id)))
190 assert res.username == "frodo"
191 assert res.gender == "Bot"
192 assert res.hosting_status == api_pb2.HOSTING_STATUS_MAYBE
193 assert res.city == "New York City"
194 assert res.lat == 40.7331
195 assert res.lng == -73.9778
196 assert res.radius == 500
198 with session_scope() as session:
199 form = session.execute(select(ContributorForm)).scalar_one()
201 assert form.ideas == "I'm a robot, incapable of original ideation"
202 assert form.features == "I love all your features"
203 assert form.experience == "I haven't done couch surfing before"
204 assert form.contribute == ContributeOption.yes
205 assert form.contribute_ways == ["serving", "backend"]
206 assert form.expertise == "I'd love to be your server: I can compute very fast, but only simple opcodes"
209def _quick_signup() -> int:
210 with auth_api_session() as (auth_api, metadata_interceptor):
211 res = auth_api.SignupFlow(
212 auth_pb2.SignupFlowReq(
213 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
214 account=auth_pb2.SignupAccount(
215 username="frodo",
216 password="a very insecure password",
217 birthdate="1970-01-01",
218 gender="Bot",
219 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
220 city="New York City",
221 lat=40.7331,
222 lng=-73.9778,
223 radius=500,
224 accept_tos=True,
225 ),
226 feedback=auth_pb2.ContributorForm(),
227 accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
228 )
229 )
231 flow_token = res.flow_token
233 assert res.flow_token
234 assert not res.HasField("auth_res")
235 assert not res.need_basic
236 assert not res.need_account
237 assert not res.need_feedback
238 assert res.need_verify_email
240 # read out the signup token directly from the database for now
241 with session_scope() as session:
242 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
243 assert flow.email_sent
244 assert not flow.email_verified
245 email_token = flow.email_token
247 with auth_api_session() as (auth_api, metadata_interceptor):
248 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))
250 assert not res.flow_token
251 assert res.HasField("auth_res")
252 assert res.auth_res.user_id
253 assert not res.auth_res.jailed
254 assert not res.need_basic
255 assert not res.need_account
256 assert not res.need_feedback
257 assert not res.need_verify_email
259 # make sure we got the right token in a cookie
260 with session_scope() as session:
261 token = session.execute(
262 select(UserSession.token).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
263 ).scalar_one()
264 sesh, uid = get_session_cookie_tokens(metadata_interceptor)
265 assert sesh == token
267 return res.auth_res.user_id
270def test_signup(db):
271 _quick_signup()
274def test_basic_login(db):
275 # Create our test user using signup
276 _quick_signup()
278 with auth_api_session() as (auth_api, metadata_interceptor):
279 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))
281 reply_token, _ = get_session_cookie_tokens(metadata_interceptor)
283 with session_scope() as session:
284 token = session.execute(
285 select(UserSession.token)
286 .join(User, UserSession.user_id == User.id)
287 .where(User.username == "frodo")
288 .where(UserSession.token == reply_token)
289 .where(UserSession.is_valid)
290 ).scalar_one_or_none()
291 assert token
293 # log out
294 with auth_api_session() as (auth_api, metadata_interceptor):
295 auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={reply_token}"),))
298def test_login_part_signed_up_verified_email(db):
299 """
300 If you try to log in but didn't finish singing up, we send you a new email and ask you to finish signing up.
301 """
302 with auth_api_session() as (auth_api, metadata_interceptor):
303 res = auth_api.SignupFlow(
304 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"))
305 )
307 flow_token = res.flow_token
308 assert res.need_verify_email
310 # verify the email
311 with session_scope() as session:
312 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
313 flow_token = flow.flow_token
314 email_token = flow.email_token
315 with auth_api_session() as (auth_api, metadata_interceptor):
316 auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))
318 with mock_notification_email() as mock:
319 with auth_api_session() as (auth_api, metadata_interceptor):
320 with pytest.raises(grpc.RpcError) as e:
321 auth_api.Authenticate(auth_pb2.AuthReq(user="email@couchers.org.invalid", password="wrong pwd"))
322 assert e.value.details() == "Please check your email for a link to continue signing up."
324 assert mock.call_count == 1
325 e = email_fields(mock)
326 assert e.recipient == "email@couchers.org.invalid"
327 assert flow_token in e.plain
328 assert flow_token in e.html
331def test_login_part_signed_up_not_verified_email(db):
332 with auth_api_session() as (auth_api, metadata_interceptor):
333 res = auth_api.SignupFlow(
334 auth_pb2.SignupFlowReq(
335 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
336 account=auth_pb2.SignupAccount(
337 username="frodo",
338 password="a very insecure password",
339 birthdate="1999-01-01",
340 gender="Bot",
341 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
342 city="New York City",
343 lat=40.7331,
344 lng=-73.9778,
345 radius=500,
346 accept_tos=True,
347 ),
348 )
349 )
351 flow_token = res.flow_token
352 assert res.need_verify_email
354 with mock_notification_email() as mock:
355 with auth_api_session() as (auth_api, metadata_interceptor):
356 with pytest.raises(grpc.RpcError) as e:
357 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="wrong pwd"))
358 assert e.value.details() == "Please check your email for a link to continue signing up."
360 with session_scope() as session:
361 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
362 email_token = flow.email_token
364 assert mock.call_count == 1
365 e = email_fields(mock)
366 assert e.recipient == "email@couchers.org.invalid"
367 assert email_token in e.plain
368 assert email_token in e.html
371def test_banned_user(db):
372 _quick_signup()
374 with session_scope() as session:
375 session.execute(select(User)).scalar_one().is_banned = True
377 with auth_api_session() as (auth_api, metadata_interceptor):
378 with pytest.raises(grpc.RpcError) as e:
379 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))
380 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
381 assert e.value.details() == "Your account is suspended."
384def test_deleted_user(db):
385 user_id = _quick_signup()
387 with session_scope() as session:
388 session.execute(update(User).where(User.id == user_id).values(is_deleted=True))
390 with auth_api_session() as (auth_api, metadata_interceptor):
391 with pytest.raises(grpc.RpcError) as e:
392 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))
393 assert e.value.code() == grpc.StatusCode.NOT_FOUND
394 assert e.value.details() == "An account with that username or email was not found."
397def test_invalid_token(db):
398 user1, token1 = generate_user()
399 user2, token2 = generate_user()
401 wrong_token = random_hex(32)
403 with real_api_session(wrong_token) as api, pytest.raises(grpc.RpcError) as e:
404 res = api.GetUser(api_pb2.GetUserReq(user=user2.username))
406 assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED
407 assert e.value.details() == "Unauthorized"
410def test_password_reset_v2(db, push_collector):
411 user, token = generate_user(hashed_password=hash_password("mypassword"))
413 with auth_api_session() as (auth_api, metadata_interceptor):
414 with mock_notification_email() as mock:
415 res = auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username))
417 with session_scope() as session:
418 password_reset_token = session.execute(select(PasswordResetToken.token)).scalar_one()
420 assert mock.call_count == 1
421 e = email_fields(mock)
422 assert e.recipient == user.email
423 assert "reset" in e.subject.lower()
424 assert password_reset_token in e.plain
425 assert password_reset_token in e.html
426 unique_string = "You asked for your password to be reset on Couchers.org."
427 assert unique_string in e.plain
428 assert unique_string in e.html
429 assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in e.plain
430 assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in e.html
431 assert "support@couchers.org" in e.plain
432 assert "support@couchers.org" in e.html
434 push_collector.assert_user_push_matches_fields(
435 user.id,
436 title="A password reset was initiated on your account",
437 body="Someone initiated a password change on your account.",
438 )
440 # make sure bad password are caught
441 with auth_api_session() as (auth_api, metadata_interceptor):
442 with pytest.raises(grpc.RpcError) as e:
443 auth_api.CompletePasswordResetV2(
444 auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password="password")
445 )
446 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
447 assert e.value.details() == "The password is insecure. Please use one that is not easily guessable."
449 # make sure we can set a good password
450 with auth_api_session() as (auth_api, metadata_interceptor):
451 pwd = random_hex()
452 with mock_notification_email() as mock:
453 res = auth_api.CompletePasswordResetV2(
454 auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password=pwd)
455 )
457 push_collector.assert_user_push_matches_fields(
458 user.id,
459 ix=1,
460 title="Your password was successfully reset",
461 body="Your password on Couchers.org was changed. If that was you, then no further action is needed.",
462 )
464 session_token, _ = get_session_cookie_tokens(metadata_interceptor)
466 with session_scope() as session:
467 other_session_token = session.execute(
468 select(UserSession.token)
469 .join(User, UserSession.user_id == User.id)
470 .where(User.username == user.username)
471 .where(UserSession.token == session_token)
472 .where(UserSession.is_valid)
473 ).scalar_one_or_none()
474 assert other_session_token
476 # make sure we can't set a password again
477 with auth_api_session() as (auth_api, metadata_interceptor):
478 with pytest.raises(grpc.RpcError) as e:
479 auth_api.CompletePasswordResetV2(
480 auth_pb2.CompletePasswordResetV2Req(
481 password_reset_token=password_reset_token, new_password=random_hex()
482 )
483 )
484 assert e.value.code() == grpc.StatusCode.NOT_FOUND
485 assert e.value.details() == "Invalid token."
487 with session_scope() as session:
488 user = session.execute(select(User)).scalar_one()
489 assert user.hashed_password == hash_password(pwd)
492def test_password_reset_no_such_user(db):
493 user, token = generate_user()
495 with auth_api_session() as (auth_api, metadata_interceptor):
496 res = auth_api.ResetPassword(
497 auth_pb2.ResetPasswordReq(
498 user="nonexistentuser",
499 )
500 )
502 with session_scope() as session:
503 res = session.execute(select(PasswordResetToken)).scalar_one_or_none()
505 assert res is None
508def test_password_reset_invalid_token_v2(db):
509 password = random_hex()
510 user, token = generate_user(hashed_password=hash_password(password))
512 with auth_api_session() as (auth_api, metadata_interceptor):
513 res = auth_api.ResetPassword(
514 auth_pb2.ResetPasswordReq(
515 user=user.username,
516 )
517 )
519 with auth_api_session() as (auth_api, metadata_interceptor), pytest.raises(grpc.RpcError) as e:
520 res = auth_api.CompletePasswordResetV2(auth_pb2.CompletePasswordResetV2Req(password_reset_token="wrongtoken"))
521 assert e.value.code() == grpc.StatusCode.NOT_FOUND
522 assert e.value.details() == "Invalid token."
524 with session_scope() as session:
525 user = session.execute(select(User)).scalar_one()
526 assert user.hashed_password == hash_password(password)
529def test_logout_invalid_token(db):
530 # Create our test user using signup
531 _quick_signup()
533 with auth_api_session() as (auth_api, metadata_interceptor):
534 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))
536 reply_token, _ = get_session_cookie_tokens(metadata_interceptor)
538 # delete all login tokens
539 with session_scope() as session:
540 session.execute(delete(LoginToken))
542 # log out with non-existent token should still return a valid result
543 with auth_api_session() as (auth_api, metadata_interceptor):
544 auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={reply_token}"),))
546 reply_token, _ = get_session_cookie_tokens(metadata_interceptor)
547 # make sure we set an empty cookie
548 assert reply_token == ""
551def test_signup_without_password(db):
552 with auth_api_session() as (auth_api, metadata_interceptor):
553 with pytest.raises(grpc.RpcError) as e:
554 auth_api.SignupFlow(
555 auth_pb2.SignupFlowReq(
556 basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"),
557 account=auth_pb2.SignupAccount(
558 username="frodo",
559 password="bad",
560 city="Minas Tirith",
561 birthdate="9999-12-31", # arbitrary future birthdate
562 gender="Robot",
563 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
564 lat=1,
565 lng=1,
566 radius=100,
567 accept_tos=True,
568 ),
569 feedback=auth_pb2.ContributorForm(),
570 )
571 )
572 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
573 assert e.value.details() == "The password must be 8 or more characters long."
576def test_signup_invalid_birthdate(db):
577 with auth_api_session() as (auth_api, metadata_interceptor):
578 with pytest.raises(grpc.RpcError) as e:
579 auth_api.SignupFlow(
580 auth_pb2.SignupFlowReq(
581 basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"),
582 account=auth_pb2.SignupAccount(
583 username="frodo",
584 password="a very insecure password",
585 city="Minas Tirith",
586 birthdate="9999-12-31", # arbitrary future birthdate
587 gender="Robot",
588 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
589 lat=1,
590 lng=1,
591 radius=100,
592 accept_tos=True,
593 ),
594 feedback=auth_pb2.ContributorForm(),
595 )
596 )
597 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
598 assert e.value.details() == "You must be at least 18 years old to sign up."
600 res = auth_api.SignupFlow(
601 auth_pb2.SignupFlowReq(
602 basic=auth_pb2.SignupBasic(name="Christopher", email="a2@b.com"),
603 account=auth_pb2.SignupAccount(
604 username="ceelo",
605 password="a very insecure password",
606 city="New York City",
607 birthdate="2000-12-31", # arbitrary birthdate older than 18 years
608 gender="Helicopter",
609 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
610 lat=1,
611 lng=1,
612 radius=100,
613 accept_tos=True,
614 ),
615 feedback=auth_pb2.ContributorForm(),
616 )
617 )
619 assert res.flow_token
621 with pytest.raises(grpc.RpcError) as e:
622 auth_api.SignupFlow(
623 auth_pb2.SignupFlowReq(
624 basic=auth_pb2.SignupBasic(name="Franklin", email="a3@b.com"),
625 account=auth_pb2.SignupAccount(
626 username="franklin",
627 password="a very insecure password",
628 city="Los Santos",
629 birthdate="2010-04-09", # arbitrary birthdate < 18 yrs
630 gender="Male",
631 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
632 lat=1,
633 lng=1,
634 radius=100,
635 accept_tos=True,
636 ),
637 feedback=auth_pb2.ContributorForm(),
638 )
639 )
640 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
641 assert e.value.details() == "You must be at least 18 years old to sign up."
643 with session_scope() as session:
644 assert session.execute(select(func.count()).select_from(SignupFlow)).scalar_one() == 1
647def test_signup_invalid_email(db):
648 with auth_api_session() as (auth_api, metadata_interceptor):
649 with pytest.raises(grpc.RpcError) as e:
650 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a")))
651 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
652 assert e.value.details() == "Invalid email."
654 with auth_api_session() as (auth_api, metadata_interceptor):
655 with pytest.raises(grpc.RpcError) as e:
656 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a@b")))
657 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
658 assert e.value.details() == "Invalid email."
660 with auth_api_session() as (auth_api, metadata_interceptor):
661 with pytest.raises(grpc.RpcError) as e:
662 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a@b.")))
663 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
664 assert e.value.details() == "Invalid email."
666 with auth_api_session() as (auth_api, metadata_interceptor):
667 with pytest.raises(grpc.RpcError) as e:
668 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a@b.c")))
669 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
670 assert e.value.details() == "Invalid email."
673def test_signup_existing_email(db):
674 # Signed up user
675 user, _ = generate_user()
677 with auth_api_session() as (auth_api, metadata_interceptor):
678 with pytest.raises(grpc.RpcError) as e:
679 auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=user.email)))
680 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
681 assert e.value.details() == "That email address is already associated with an account. Please log in instead!"
684def test_signup_banned_user_email(db):
685 user, _ = generate_user()
687 with session_scope() as session:
688 session.execute(update(User).where(User.id == user.id).values(is_banned=True))
690 with auth_api_session() as (auth_api, _):
691 with pytest.raises(grpc.RpcError) as e:
692 auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="NewName", email=user.email)))
693 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
694 assert e.value.details() == "You cannot sign up with that email address."
697def test_signup_deleted_user_email(db):
698 user, _ = generate_user()
700 with session_scope() as session:
701 session.execute(update(User).where(User.id == user.id).values(is_deleted=True))
703 with auth_api_session() as (auth_api, _):
704 with pytest.raises(grpc.RpcError) as e:
705 auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="NewName", email=user.email)))
706 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
707 assert e.value.details() == "You cannot sign up with that email address."
710def test_signup_continue_with_email(db):
711 testing_email = f"{random_hex(12)}@couchers.org.invalid"
712 with auth_api_session() as (auth_api, metadata_interceptor):
713 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email)))
714 flow_token = res.flow_token
715 assert flow_token
717 # continue with same email, should just send another email to the user
718 with auth_api_session() as (auth_api, metadata_interceptor):
719 with pytest.raises(grpc.RpcError) as e:
720 res = auth_api.SignupFlow(
721 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))
722 )
723 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
724 assert e.value.details() == "Please check your email for a link to continue signing up."
727def test_signup_resend_email(db):
728 with auth_api_session() as (auth_api, metadata_interceptor):
729 with mock_notification_email() as mock:
730 res = auth_api.SignupFlow(
731 auth_pb2.SignupFlowReq(
732 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
733 account=auth_pb2.SignupAccount(
734 username="frodo",
735 password="a very insecure password",
736 birthdate="1970-01-01",
737 gender="Bot",
738 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
739 city="New York City",
740 lat=40.7331,
741 lng=-73.9778,
742 radius=500,
743 accept_tos=True,
744 ),
745 feedback=auth_pb2.ContributorForm(),
746 accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
747 )
748 )
749 assert mock.call_count == 1
750 e = email_fields(mock)
751 assert e.recipient == "email@couchers.org.invalid"
753 flow_token = res.flow_token
754 assert flow_token
756 with session_scope() as session:
757 flow = session.execute(select(SignupFlow)).scalar_one()
758 assert flow.flow_token == flow_token
759 assert flow.email_sent
760 assert not flow.email_verified
761 email_token = flow.email_token
763 # ask for a new signup email
764 with auth_api_session() as (auth_api, metadata_interceptor):
765 with mock_notification_email() as mock:
766 res = auth_api.SignupFlow(
767 auth_pb2.SignupFlowReq(
768 flow_token=flow_token,
769 resend_verification_email=True,
770 )
771 )
772 assert mock.call_count == 1
773 e = email_fields(mock)
774 assert e.recipient == "email@couchers.org.invalid"
775 assert email_token in e.plain
776 assert email_token in e.html
778 with session_scope() as session:
779 flow = session.execute(select(SignupFlow)).scalar_one()
780 assert not flow.email_verified
782 with auth_api_session() as (auth_api, metadata_interceptor):
783 res = auth_api.SignupFlow(
784 auth_pb2.SignupFlowReq(
785 email_token=email_token,
786 )
787 )
789 assert not res.flow_token
790 assert res.HasField("auth_res")
793def test_successful_authenticate(db):
794 user, _ = generate_user(hashed_password=hash_password("password"))
796 # Authenticate with username
797 with auth_api_session() as (auth_api, metadata_interceptor):
798 reply = auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password="password"))
799 assert not reply.jailed
801 # Authenticate with email
802 with auth_api_session() as (auth_api, metadata_interceptor):
803 reply = auth_api.Authenticate(auth_pb2.AuthReq(user=user.email, password="password"))
804 assert not reply.jailed
807def test_unsuccessful_authenticate(db):
808 user, _ = generate_user(hashed_password=hash_password("password"))
810 # Invalid password
811 with auth_api_session() as (auth_api, metadata_interceptor):
812 with pytest.raises(grpc.RpcError) as e:
813 reply = auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password="incorrectpassword"))
814 assert e.value.code() == grpc.StatusCode.NOT_FOUND
815 assert e.value.details() == "Wrong password."
817 # Invalid username
818 with auth_api_session() as (auth_api, metadata_interceptor):
819 with pytest.raises(grpc.RpcError) as e:
820 reply = auth_api.Authenticate(auth_pb2.AuthReq(user="notarealusername", password="password"))
821 assert e.value.code() == grpc.StatusCode.NOT_FOUND
822 assert e.value.details() == "An account with that username or email was not found."
824 # Invalid email
825 with auth_api_session() as (auth_api, metadata_interceptor):
826 with pytest.raises(grpc.RpcError) as e:
827 reply = auth_api.Authenticate(
828 auth_pb2.AuthReq(user=f"{random_hex(12)}@couchers.org.invalid", password="password")
829 )
830 assert e.value.code() == grpc.StatusCode.NOT_FOUND
831 assert e.value.details() == "An account with that username or email was not found."
833 # Invalid id
834 with auth_api_session() as (auth_api, metadata_interceptor):
835 with pytest.raises(grpc.RpcError) as e:
836 reply = auth_api.Authenticate(auth_pb2.AuthReq(user="-1", password="password"))
837 assert e.value.code() == grpc.StatusCode.NOT_FOUND
838 assert e.value.details() == "An account with that username or email was not found."
841def test_complete_signup(db):
842 testing_email = f"{random_hex(12)}@couchers.org.invalid"
843 with auth_api_session() as (auth_api, metadata_interceptor):
844 reply = auth_api.SignupFlow(
845 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email))
846 )
848 flow_token = reply.flow_token
850 with auth_api_session() as (auth_api, metadata_interceptor):
851 # Invalid username
852 with pytest.raises(grpc.RpcError) as e:
853 auth_api.SignupFlow(
854 auth_pb2.SignupFlowReq(
855 flow_token=flow_token,
856 account=auth_pb2.SignupAccount(
857 username=" ",
858 password="a very insecure password",
859 city="Minas Tirith",
860 birthdate="1980-12-31",
861 gender="Robot",
862 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
863 lat=1,
864 lng=1,
865 radius=100,
866 accept_tos=True,
867 ),
868 )
869 )
870 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
871 assert e.value.details() == "Invalid username."
873 with auth_api_session() as (auth_api, metadata_interceptor):
874 # Invalid name
875 with pytest.raises(grpc.RpcError) as e:
876 auth_api.SignupFlow(
877 auth_pb2.SignupFlowReq(
878 basic=auth_pb2.SignupBasic(name=" ", email=f"{random_hex(12)}@couchers.org.invalid")
879 )
880 )
881 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
882 assert e.value.details() == "Name not supported."
884 with auth_api_session() as (auth_api, metadata_interceptor):
885 # Hosting status required
886 with pytest.raises(grpc.RpcError) as e:
887 auth_api.SignupFlow(
888 auth_pb2.SignupFlowReq(
889 flow_token=flow_token,
890 account=auth_pb2.SignupAccount(
891 username="frodo",
892 password="a very insecure password",
893 city="Minas Tirith",
894 birthdate="1980-12-31",
895 gender="Robot",
896 hosting_status=None,
897 lat=1,
898 lng=1,
899 radius=100,
900 accept_tos=True,
901 ),
902 )
903 )
904 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
905 assert e.value.details() == "Hosting status is required."
907 user, _ = generate_user()
908 with auth_api_session() as (auth_api, metadata_interceptor):
909 # Username unavailable
910 with pytest.raises(grpc.RpcError) as e:
911 auth_api.SignupFlow(
912 auth_pb2.SignupFlowReq(
913 flow_token=flow_token,
914 account=auth_pb2.SignupAccount(
915 username=user.username,
916 password="a very insecure password",
917 city="Minas Tirith",
918 birthdate="1980-12-31",
919 gender="Robot",
920 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
921 lat=1,
922 lng=1,
923 radius=100,
924 accept_tos=True,
925 ),
926 )
927 )
928 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
929 assert e.value.details() == "Sorry, that username isn't available."
931 with auth_api_session() as (auth_api, metadata_interceptor):
932 # Invalid coordinate
933 with pytest.raises(grpc.RpcError) as e:
934 auth_api.SignupFlow(
935 auth_pb2.SignupFlowReq(
936 flow_token=flow_token,
937 account=auth_pb2.SignupAccount(
938 username="frodo",
939 password="a very insecure password",
940 city="Minas Tirith",
941 birthdate="1980-12-31",
942 gender="Robot",
943 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
944 lat=0,
945 lng=0,
946 radius=100,
947 accept_tos=True,
948 ),
949 )
950 )
951 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
952 assert e.value.details() == "Invalid coordinate."
955def test_signup_token_regression(db):
956 # Repro steps:
957 # 1. Start a signup
958 # 2. Confirm the email
959 # 3. Start a new signup with the same email
960 # Expected: send a link to the email to continue signing up.
961 # Actual: `AttributeError: 'SignupFlow' object has no attribute 'token'`
963 testing_email = f"{random_hex(12)}@couchers.org.invalid"
965 # 1. Start a signup
966 with auth_api_session() as (auth_api, metadata_interceptor):
967 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email)))
968 flow_token = res.flow_token
969 assert flow_token
971 # 2. Confirm the email
972 with session_scope() as session:
973 email_token = session.execute(
974 select(SignupFlow.email_token).where(SignupFlow.flow_token == flow_token)
975 ).scalar_one()
977 with auth_api_session() as (auth_api, metadata_interceptor):
978 auth_api.SignupFlow(
979 auth_pb2.SignupFlowReq(
980 flow_token=flow_token,
981 email_token=email_token,
982 )
983 )
985 # 3. Start a new signup with the same email
986 with auth_api_session() as (auth_api, metadata_interceptor):
987 with pytest.raises(grpc.RpcError) as e:
988 auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email)))
989 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
990 assert e.value.details() == "Please check your email for a link to continue signing up."
993@pytest.mark.parametrize("opt_out", [True, False])
994def test_opt_out_of_newsletter(db, opt_out):
995 with auth_api_session() as (auth_api, metadata_interceptor):
996 res = auth_api.SignupFlow(
997 auth_pb2.SignupFlowReq(
998 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
999 account=auth_pb2.SignupAccount(
1000 username="frodo",
1001 password="a very insecure password",
1002 birthdate="1970-01-01",
1003 gender="Bot",
1004 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
1005 city="New York City",
1006 lat=40.7331,
1007 lng=-73.9778,
1008 radius=500,
1009 accept_tos=True,
1010 opt_out_of_newsletter=opt_out,
1011 ),
1012 feedback=auth_pb2.ContributorForm(),
1013 accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
1014 )
1015 )
1017 with session_scope() as session:
1018 email_token = session.execute(
1019 select(SignupFlow.email_token).where(SignupFlow.flow_token == res.flow_token)
1020 ).scalar_one()
1022 with auth_api_session() as (auth_api, metadata_interceptor):
1023 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))
1025 user_id = res.auth_res.user_id
1027 with session_scope() as session:
1028 user = session.execute(select(User).where(User.id == user_id)).scalar_one()
1029 assert not user.in_sync_with_newsletter
1030 assert user.opt_out_of_newsletter == opt_out
1033def test_GetAuthState(db):
1034 user, token = generate_user()
1035 jailed_user, jailed_token = generate_user(accepted_tos=0)
1037 with auth_api_session() as (auth_api, metadata_interceptor):
1038 res = auth_api.GetAuthState(empty_pb2.Empty())
1039 assert not res.logged_in
1040 assert not res.HasField("auth_res")
1042 with auth_api_session() as (auth_api, metadata_interceptor):
1043 res = auth_api.GetAuthState(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={token}"),))
1044 assert res.logged_in
1045 assert res.HasField("auth_res")
1046 assert res.auth_res.user_id == user.id
1047 assert not res.auth_res.jailed
1049 auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={token}"),))
1051 res = auth_api.GetAuthState(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={token}"),))
1052 assert not res.logged_in
1053 assert not res.HasField("auth_res")
1055 with auth_api_session() as (auth_api, metadata_interceptor):
1056 res = auth_api.GetAuthState(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={jailed_token}"),))
1057 assert res.logged_in
1058 assert res.HasField("auth_res")
1059 assert res.auth_res.user_id == jailed_user.id
1060 assert res.auth_res.jailed
1063def test_signup_no_feedback_regression(db):
1064 """
1065 When we first remove the feedback form, the backned was saying it's not needed but was not completing the signup,
1066 this regression test checks that.
1067 """
1068 with auth_api_session() as (auth_api, metadata_interceptor):
1069 res = auth_api.SignupFlow(
1070 auth_pb2.SignupFlowReq(
1071 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
1072 account=auth_pb2.SignupAccount(
1073 username="frodo",
1074 password="a very insecure password",
1075 birthdate="1970-01-01",
1076 gender="Bot",
1077 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
1078 city="New York City",
1079 lat=40.7331,
1080 lng=-73.9778,
1081 radius=500,
1082 accept_tos=True,
1083 ),
1084 accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
1085 )
1086 )
1088 flow_token = res.flow_token
1090 assert res.flow_token
1091 assert not res.HasField("auth_res")
1092 assert not res.need_basic
1093 assert not res.need_account
1094 assert not res.need_feedback
1095 assert res.need_verify_email
1097 # read out the signup token directly from the database for now
1098 with session_scope() as session:
1099 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
1100 assert flow.email_sent
1101 assert not flow.email_verified
1102 email_token = flow.email_token
1104 with auth_api_session() as (auth_api, metadata_interceptor):
1105 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))
1107 assert not res.flow_token
1108 assert res.HasField("auth_res")
1109 assert res.auth_res.user_id
1110 assert not res.auth_res.jailed
1111 assert not res.need_basic
1112 assert not res.need_account
1113 assert not res.need_feedback
1114 assert not res.need_verify_email
1116 # make sure we got the right token in a cookie
1117 with session_scope() as session:
1118 token = session.execute(
1119 select(UserSession.token).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
1120 ).scalar_one()
1121 sesh, uid = get_session_cookie_tokens(metadata_interceptor)
1122 assert sesh == token
1125def test_banned_username(db):
1126 testing_email = f"{random_hex(12)}@couchers.org.invalid"
1127 with auth_api_session() as (auth_api, metadata_interceptor):
1128 reply = auth_api.SignupFlow(
1129 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email))
1130 )
1132 flow_token = reply.flow_token
1134 with auth_api_session() as (auth_api, metadata_interceptor):
1135 # Banned username
1136 with pytest.raises(grpc.RpcError) as e:
1137 auth_api.SignupFlow(
1138 auth_pb2.SignupFlowReq(
1139 flow_token=flow_token,
1140 account=auth_pb2.SignupAccount(
1141 username="thecouchersadminaccount",
1142 password="a very insecure password",
1143 city="Minas Tirith",
1144 birthdate="1980-12-31",
1145 gender="Robot",
1146 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
1147 lat=1,
1148 lng=1,
1149 radius=100,
1150 accept_tos=True,
1151 ),
1152 )
1153 )
1154 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
1155 assert e.value.details() == "Sorry, that username isn't available."
1158# tests for ConfirmChangeEmail within test_account.py tests for test_ChangeEmail_*
1161def test_GetInviteCodeInfo(db):
1162 user, token = generate_user()
1163 code_id = "TST12345"
1165 with session_scope() as session:
1166 avatar = Upload(
1167 key="test_avatar.jpg",
1168 filename="test_avatar.jpg",
1169 creator_user_id=user.id,
1170 )
1171 session.add(avatar)
1172 session.flush()
1174 session.execute(update(User).where(User.id == user.id).values(avatar_key=avatar.key))
1176 code = InviteCode(id=code_id, creator_user_id=user.id)
1177 session.add(code)
1179 with auth_api_session() as (auth, _):
1180 res = auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code_id))
1181 assert res.name == user.name
1182 assert res.username == user.username
1183 assert res.avatar_url.endswith("/img/thumbnail/test_avatar.jpg")
1184 assert res.url == urls.invite_code_link(code=code_id)
1187def test_GetInviteCodeInfo_no_avatar(db):
1188 user, token = generate_user()
1189 code_id = "NOAVTR1"
1191 with session_scope() as session:
1192 session.execute(update(User).where(User.id == user.id).values(avatar_key=None))
1194 code = InviteCode(id="NOAVTR1", creator_user_id=user.id)
1195 session.add(code)
1197 with auth_api_session() as (auth, _):
1198 res = auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code_id))
1199 assert res.name == user.name
1200 assert res.username == user.username
1201 assert res.avatar_url == ""
1202 assert res.url == urls.invite_code_link(code=code_id)
1205def test_GetInviteCodeInfo_not_found(db):
1206 generate_user()
1208 with auth_api_session() as (auth, _):
1209 with pytest.raises(grpc.RpcError) as e:
1210 auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code="BADCODE"))
1211 assert e.value.code() == grpc.StatusCode.NOT_FOUND
1212 assert e.value.details() == "Invite code not found."
1215def test_SignupFlow_invite_code(db):
1216 user, token = generate_user()
1217 invite_code = "INV12345"
1218 with session_scope() as session:
1219 session.flush()
1220 invite = InviteCode(id=invite_code, creator_user_id=user.id)
1221 session.add(invite)
1223 with auth_api_session() as (auth_api, _):
1224 # Signup basic step with invite code
1225 res = auth_api.SignupFlow(
1226 auth_pb2.SignupFlowReq(
1227 basic=auth_pb2.SignupBasic(
1228 name="Test User",
1229 email="inviteuser@example.com",
1230 invite_code=invite_code,
1231 )
1232 )
1233 )
1234 flow_token = res.flow_token
1235 assert flow_token
1237 # Confirm email
1238 with session_scope() as session:
1239 email_token = session.execute(
1240 select(SignupFlow.email_token).where(SignupFlow.flow_token == flow_token)
1241 ).scalar_one()
1243 auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))
1245 # Signup account step
1246 auth_api.SignupFlow(
1247 auth_pb2.SignupFlowReq(
1248 flow_token=flow_token,
1249 account=auth_pb2.SignupAccount(
1250 username="invited_user",
1251 password="secure password",
1252 birthdate="1990-01-01",
1253 gender="Other",
1254 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
1255 city="Example City",
1256 lat=1,
1257 lng=5,
1258 radius=100,
1259 accept_tos=True,
1260 ),
1261 feedback=auth_pb2.ContributorForm(),
1262 accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
1263 )
1264 )
1266 # Check that invite_code_id is stored in the final User object
1267 with session_scope() as session:
1268 user = session.execute(select(User).where(User.username == "invited_user")).scalar_one()
1269 assert user.invite_code_id == invite_code