Coverage for src/tests/test_auth.py: 100%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import http.cookies
3import grpc
4import pytest
5from google.protobuf import empty_pb2, wrappers_pb2
6from sqlalchemy.sql import delete, func
8from couchers import errors
9from couchers.crypto import hash_password, random_hex
10from couchers.db import session_scope
11from couchers.models import (
12 ContributeOption,
13 ContributorForm,
14 LoginToken,
15 PasswordResetToken,
16 SignupFlow,
17 User,
18 UserSession,
19)
20from couchers.sql import couchers_select as select
21from proto import api_pb2, auth_pb2
22from tests.test_fixtures import ( # noqa
23 api_session,
24 auth_api_session,
25 db,
26 fast_passwords,
27 generate_user,
28 real_api_session,
29 testconfig,
30)
@pytest.fixture(autouse=True)
def _(testconfig, fast_passwords):
    """Request the ``testconfig`` and ``fast_passwords`` fixtures for every test in this module."""
def get_session_cookie_token(metadata_interceptor):
    """Return the value of the ``couchers-sesh`` cookie from the most recent ``set-cookie`` header.

    The header is read from ``metadata_interceptor.latest_headers["set-cookie"]`` and parsed with
    ``http.cookies.SimpleCookie``; a missing ``couchers-sesh`` cookie raises ``KeyError``.
    """
    raw_header = metadata_interceptor.latest_headers["set-cookie"]
    jar = http.cookies.SimpleCookie(raw_header)
    return jar["couchers-sesh"].value
def test_UsernameValid(db):
    """The username "test" is reported valid; the empty string is not."""
    with auth_api_session() as (auth_api, _interceptor):
        good_reply = auth_api.UsernameValid(auth_pb2.UsernameValidReq(username="test"))
        assert good_reply.valid

    with auth_api_session() as (auth_api, _interceptor):
        empty_reply = auth_api.UsernameValid(auth_pb2.UsernameValidReq(username=""))
        assert not empty_reply.valid
def test_signup_incremental(db):
    """Complete the signup flow one step per request, checking the ``need_*`` flags after each.

    Steps, in order: basic details, a bare resume by flow token, contributor feedback,
    community guidelines, email verification, and finally the account details. Afterwards
    the created user and the stored contributor form are read back and compared.
    """

    def check_unfinished(reply, *, feedback, verify_email, guidelines):
        # Before the account step is submitted the reply never carries an auth result,
        # never asks for the basic step again, and always still needs the account step.
        # The keyword flags give the expected state of the remaining steps.
        assert not reply.HasField("auth_res")
        assert not reply.need_basic
        assert reply.need_account
        assert reply.need_feedback == feedback
        assert reply.need_verify_email == verify_email
        assert reply.need_accept_community_guidelines == guidelines

    # Start the flow with just name and email
    basic_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(basic_req)

    flow_token = res.flow_token
    assert flow_token
    check_unfinished(res, feedback=True, verify_email=True, guidelines=True)

    # read out the signup token directly from the database for now
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    # Resuming with only the flow token changes nothing
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token))

    assert res.flow_token == flow_token
    check_unfinished(res, feedback=True, verify_email=True, guidelines=True)

    # Add feedback
    feedback_form = auth_pb2.ContributorForm(
        ideas="I'm a robot, incapable of original ideation",
        features="I love all your features",
        experience="I haven't done couch surfing before",
        contribute=auth_pb2.CONTRIBUTE_OPTION_YES,
        contribute_ways=["serving", "backend"],
        expertise="I'd love to be your server: I can compute very fast, but only simple opcodes",
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, feedback=feedback_form))

    assert res.flow_token == flow_token
    check_unfinished(res, feedback=False, verify_email=True, guidelines=True)

    # Agree to community guidelines
    guidelines_req = auth_pb2.SignupFlowReq(
        flow_token=flow_token,
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(guidelines_req)

    assert res.flow_token == flow_token
    check_unfinished(res, feedback=False, verify_email=True, guidelines=False)

    # Verify email
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, email_token=email_token))

    assert res.flow_token == flow_token
    check_unfinished(res, feedback=False, verify_email=False, guidelines=False)

    # Finally finish off account info
    account_details = auth_pb2.SignupAccount(
        username="frodo",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_MAYBE,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, account=account_details))

    # The flow is finished: no token is returned, an auth result is, and nothing is pending
    assert not res.flow_token
    assert res.HasField("auth_res")
    assert res.auth_res.user_id
    assert not res.auth_res.jailed
    assert not res.need_basic
    assert not res.need_account
    assert not res.need_feedback
    assert not res.need_verify_email
    assert not res.need_accept_community_guidelines

    user_id = res.auth_res.user_id

    sess_token = get_session_cookie_token(metadata_interceptor)

    with api_session(sess_token) as api:
        profile = api.GetUser(api_pb2.GetUserReq(user=str(user_id)))

    assert profile.username == "frodo"
    assert profile.gender == "Bot"
    assert profile.hosting_status == api_pb2.HOSTING_STATUS_MAYBE
    assert profile.city == "New York City"
    assert profile.lat == 40.7331
    assert profile.lng == -73.9778
    assert profile.radius == 500

    with session_scope() as session:
        form = session.execute(select(ContributorForm)).scalar_one()

        assert form.ideas == "I'm a robot, incapable of original ideation"
        assert form.features == "I love all your features"
        assert form.experience == "I haven't done couch surfing before"
        assert form.contribute == ContributeOption.yes
        assert form.contribute_ways == ["serving", "backend"]
        assert form.expertise == "I'd love to be your server: I can compute very fast, but only simple opcodes"
def _quick_signup():
    """Sign up the user "frodo" in two requests and check the resulting session cookie.

    The first request sends every step except email verification; the second sends only the
    email token read from the database. Finally the ``couchers-sesh`` cookie from the last
    response is compared with frodo's stored ``UserSession`` token.
    """
    everything_but_email = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(everything_but_email)

    flow_token = res.flow_token

    # Only the email verification should still be outstanding
    assert flow_token
    assert not res.HasField("auth_res")
    assert not res.need_basic
    assert not res.need_account
    assert not res.need_feedback
    assert res.need_verify_email

    # read out the signup token directly from the database for now
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    assert not res.flow_token
    assert res.HasField("auth_res")
    assert res.auth_res.user_id
    assert not res.auth_res.jailed
    assert not res.need_basic
    assert not res.need_account
    assert not res.need_feedback
    assert not res.need_verify_email

    # make sure we got the right token in a cookie
    frodo_session_query = (
        select(UserSession).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
    )
    with session_scope() as session:
        token = session.execute(frodo_session_query).scalar_one().token
    assert get_session_cookie_token(metadata_interceptor) == token
def test_signup(db):
    """Run the quick signup helper; all checks are the assertions inside ``_quick_signup``."""
    _quick_signup()
def test_basic_login(db):
    """Log in with a password, check the session cookie matches a stored session, then log out."""
    # Create our test user using signup
    _quick_signup()

    with auth_api_session() as (auth_api, metadata_interceptor):
        reply = auth_api.Login(auth_pb2.LoginReq(user="frodo"))
    assert reply.next_step == auth_pb2.LoginRes.LoginStep.NEED_PASSWORD

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))

    reply_token = get_session_cookie_token(metadata_interceptor)

    with session_scope() as session:
        # scalar_one() raises a clear "no row found" error when the cookie token has no matching
        # session; scalar_one_or_none() would instead fail with an AttributeError on None before
        # the assertion below could report anything.
        stored_session = session.execute(
            select(UserSession)
            .join(User, UserSession.user_id == User.id)
            .where(User.username == "frodo")
            .where(UserSession.token == reply_token)
        ).scalar_one()
        assert stored_session.token

    # log out
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={reply_token}"),))
def test_basic_login_without_password(db):
    """Log in via an emailed login token when the user has no password, then log out."""
    # Create our test user using signup
    _quick_signup()

    # Strip the password so that Login falls back to sending a login email
    with session_scope() as session:
        user = session.execute(select(User)).scalar_one()
        user.hashed_password = None

    with auth_api_session() as (auth_api, metadata_interceptor):
        reply = auth_api.Login(auth_pb2.LoginReq(user="frodo"))
    assert reply.next_step == auth_pb2.LoginRes.LoginStep.SENT_LOGIN_EMAIL

    # backdoor to find login token
    with session_scope() as session:
        login_token = session.execute(select(LoginToken)).scalar_one().token

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.CompleteTokenLogin(auth_pb2.CompleteTokenLoginReq(login_token=login_token))

    reply_token = get_session_cookie_token(metadata_interceptor)

    with session_scope() as session:
        # scalar_one() raises a clear "no row found" error when the cookie token has no matching
        # session; scalar_one_or_none() would instead fail with an AttributeError on None before
        # the assertion below could report anything.
        stored_session = session.execute(
            select(UserSession)
            .join(User, UserSession.user_id == User.id)
            .where(User.username == "frodo")
            .where(UserSession.token == reply_token)
        ).scalar_one()
        assert stored_session.token

    # log out
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={reply_token}"),))
def test_login_tokens_invalidate_after_use(db):
    """A login token works once; a second ``CompleteTokenLogin`` with it raises an RPC error."""
    _quick_signup()

    # Strip the password so that Login sends a login email and creates a LoginToken
    with session_scope() as session:
        user = session.execute(select(User)).scalar_one()
        user.hashed_password = None

    with auth_api_session() as (auth_api, metadata_interceptor):
        reply = auth_api.Login(auth_pb2.LoginReq(user="frodo"))
    assert reply.next_step == auth_pb2.LoginRes.LoginStep.SENT_LOGIN_EMAIL

    with session_scope() as session:
        login_token = session.execute(select(LoginToken)).scalar_one().token

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.CompleteTokenLogin(auth_pb2.CompleteTokenLoginReq(login_token=login_token))
    # The first use must succeed and hand back a non-empty session cookie. The value was
    # previously bound to an unused local, so it is asserted here instead.
    assert get_session_cookie_token(metadata_interceptor)

    with auth_api_session() as (auth_api, metadata_interceptor), pytest.raises(grpc.RpcError):
        # check we can't login again
        auth_api.CompleteTokenLogin(auth_pb2.CompleteTokenLoginReq(login_token=login_token))
def test_banned_user(db):
    """Authenticating as a banned user fails with "Your account is suspended."."""
    _quick_signup()
    with auth_api_session() as (auth_api, _interceptor):
        login_reply = auth_api.Login(auth_pb2.LoginReq(user="frodo"))
    assert login_reply.next_step == auth_pb2.LoginRes.LoginStep.NEED_PASSWORD

    # Ban the only user in the database
    with session_scope() as session:
        banned = session.execute(select(User)).scalar_one()
        banned.is_banned = True

    credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as excinfo:
        auth_api.Authenticate(credentials)
    assert excinfo.value.details() == "Your account is suspended."
def test_banned_user_without_password(db):
    """Completing a token login as a banned user fails with "Your account is suspended."."""
    _quick_signup()

    # Strip the password so that Login sends a login email
    with session_scope() as session:
        session.execute(select(User)).scalar_one().hashed_password = None

    with auth_api_session() as (auth_api, _interceptor):
        login_reply = auth_api.Login(auth_pb2.LoginReq(user="frodo"))
    assert login_reply.next_step == auth_pb2.LoginRes.LoginStep.SENT_LOGIN_EMAIL

    with session_scope() as session:
        login_token = session.execute(select(LoginToken)).scalar_one().token

    # Ban the user after the token was issued but before it is redeemed
    with session_scope() as session:
        banned = session.execute(select(User)).scalar_one()
        banned.is_banned = True

    redeem_req = auth_pb2.CompleteTokenLoginReq(login_token=login_token)
    with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as excinfo:
        auth_api.CompleteTokenLogin(redeem_req)
    assert excinfo.value.details() == "Your account is suspended."
def test_deleted_user(db):
    """``Login`` for a user flagged ``is_deleted`` fails with NOT_FOUND / ``USER_NOT_FOUND``."""
    _quick_signup()

    with session_scope() as session:
        deleted = session.execute(select(User)).scalar_one()
        deleted.is_deleted = True

    with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as excinfo:
        auth_api.Login(auth_pb2.LoginReq(user="frodo"))
    assert excinfo.value.code() == grpc.StatusCode.NOT_FOUND
    assert excinfo.value.details() == errors.USER_NOT_FOUND
def test_invalid_token(db):
    """A random session token is rejected as UNAUTHENTICATED with details "Unauthorized"."""
    _first_user, _first_token = generate_user()
    target_user, _second_token = generate_user()

    bogus_token = random_hex(32)

    lookup = api_pb2.GetUserReq(user=target_user.username)
    with real_api_session(bogus_token) as api, pytest.raises(grpc.RpcError) as excinfo:
        api.GetUser(lookup)

    assert excinfo.value.code() == grpc.StatusCode.UNAUTHENTICATED
    assert excinfo.value.details() == "Unauthorized"
def test_password_reset(db):
    """Completing a password reset leaves the user with ``has_password`` falsy."""
    user, _session_token = generate_user(hashed_password=hash_password("mypassword"))

    with auth_api_session() as (auth_api, _interceptor):
        auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username))

    # Read the reset token straight from the database
    with session_scope() as session:
        reset_token = session.execute(select(PasswordResetToken)).scalar_one().token

    with auth_api_session() as (auth_api, _interceptor):
        auth_api.CompletePasswordReset(auth_pb2.CompletePasswordResetReq(password_reset_token=reset_token))

    with session_scope() as session:
        reloaded = session.execute(select(User)).scalar_one()
        assert not reloaded.has_password
def test_password_reset_no_such_user(db):
    """``ResetPassword`` for an unknown user does not raise and stores no reset token."""
    generate_user()

    with auth_api_session() as (auth_api, _interceptor):
        auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user="nonexistentuser"))

    with session_scope() as session:
        stored_token = session.execute(select(PasswordResetToken)).scalar_one_or_none()

    assert stored_token is None
def test_password_reset_invalid_token(db):
    """A wrong reset token fails with NOT_FOUND / ``INVALID_TOKEN`` and the password hash is unchanged."""
    password = random_hex()
    user, _session_token = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, _interceptor):
        auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username))

    bad_completion = auth_pb2.CompletePasswordResetReq(password_reset_token="wrongtoken")
    with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as excinfo:
        auth_api.CompletePasswordReset(bad_completion)
    assert excinfo.value.code() == grpc.StatusCode.NOT_FOUND
    assert excinfo.value.details() == errors.INVALID_TOKEN

    with session_scope() as session:
        reloaded = session.execute(select(User)).scalar_one()
        assert reloaded.hashed_password == hash_password(password)
def test_logout_invalid_token(db):
    """``Deauthenticate`` still succeeds after the token rows are deleted and sets an empty cookie."""
    # Create our test user using signup
    _quick_signup()

    with auth_api_session() as (auth_api, metadata_interceptor):
        login_reply = auth_api.Login(auth_pb2.LoginReq(user="frodo"))
    assert login_reply.next_step == auth_pb2.LoginRes.LoginStep.NEED_PASSWORD

    credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Authenticate(credentials)

    reply_token = get_session_cookie_token(metadata_interceptor)

    # delete all login tokens
    # NOTE(review): the cookie value above is a UserSession token (see _quick_signup), while this
    # clears LoginToken rows — confirm this is the table the test means to empty.
    with session_scope() as session:
        session.execute(delete(LoginToken))

    # log out with non-existent token should still return a valid result
    session_cookie = ("cookie", f"couchers-sesh={reply_token}")
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(session_cookie,))

    # make sure we set an empty cookie
    cleared_token = get_session_cookie_token(metadata_interceptor)
    assert cleared_token == ""
def test_signup_without_password(db):
    """Signing up with the password "bad" fails with INVALID_ARGUMENT / ``PASSWORD_TOO_SHORT``."""
    short_password_account = auth_pb2.SignupAccount(
        username="frodo",
        password="bad",
        city="Minas Tirith",
        birthdate="9999-12-31",  # arbitrary future birthdate
        gender="Robot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        lat=1,
        lng=1,
        radius=100,
        accept_tos=True,
    )
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"),
        account=short_password_account,
        feedback=auth_pb2.ContributorForm(),
    )

    with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as excinfo:
        auth_api.SignupFlow(signup_req)
    assert excinfo.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert excinfo.value.details() == errors.PASSWORD_TOO_SHORT
def test_signup_invalid_birthdate(db):
    """Future and under-18 birthdates are rejected; an adult birthdate starts a signup flow.

    Both rejected requests must fail with FAILED_PRECONDITION / ``INVALID_BIRTHDATE``, and
    afterwards exactly one ``SignupFlow`` row (the adult's) must exist.
    """
    # Local import: the module's top-level imports do not include datetime.
    from datetime import date, timedelta

    # A birthdate just under 17 years ago stays "under 18" whenever the test runs. The previous
    # hard-coded "2010-04-09" would have turned 18 on 2028-04-09 and broken the test.
    minor_birthdate = (date.today() - timedelta(days=17 * 365)).isoformat()

    def signup_request(name, email, username, city, birthdate, gender):
        # Build a full signup request; only the fields that differ between cases are parameters.
        return auth_pb2.SignupFlowReq(
            basic=auth_pb2.SignupBasic(name=name, email=email),
            account=auth_pb2.SignupAccount(
                username=username,
                password="a very insecure password",
                city=city,
                birthdate=birthdate,
                gender=gender,
                hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                lat=1,
                lng=1,
                radius=100,
                accept_tos=True,
            ),
            feedback=auth_pb2.ContributorForm(),
        )

    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(
                # arbitrary future birthdate
                signup_request("Räksmörgås", "a1@b.com", "frodo", "Minas Tirith", "9999-12-31", "Robot")
            )
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == errors.INVALID_BIRTHDATE

        res = auth_api.SignupFlow(
            # arbitrary birthdate older than 18 years
            signup_request("Christopher", "a2@b.com", "ceelo", "New York City", "2000-12-31", "Helicopter")
        )

        assert res.flow_token

        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(
                # birthdate < 18 yrs, computed relative to today
                signup_request("Franklin", "a3@b.com", "franklin", "Los Santos", minor_birthdate, "Male")
            )
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == errors.INVALID_BIRTHDATE

    # Only the valid (adult) request may have created a signup flow
    with session_scope() as session:
        assert session.execute(select(func.count()).select_from(SignupFlow)).scalar_one() == 1
def test_signup_invalid_email(db):
    """Each malformed email is rejected with INVALID_ARGUMENT / ``INVALID_EMAIL``."""
    malformed_emails = ("a", "a@b", "a@b.", "a@b.c")

    for bad_email in malformed_emails:
        signup_req = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=bad_email))
        # A fresh API session per address, as each case is independent of the others
        with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as excinfo:
            auth_api.SignupFlow(signup_req)
        assert excinfo.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert excinfo.value.details() == errors.INVALID_EMAIL
def test_signup_existing_email(db):
    """Starting a signup with an existing user's email fails with ``SIGNUP_FLOW_EMAIL_TAKEN``."""
    # Signed up user
    existing_user, _session_token = generate_user()

    signup_req = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=existing_user.email))
    with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as excinfo:
        auth_api.SignupFlow(signup_req)
    assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
    assert excinfo.value.details() == errors.SIGNUP_FLOW_EMAIL_TAKEN
def test_signup_continue_with_email(db):
    """Restarting a signup with an email that already has a flow fails with ``SIGNUP_FLOW_EMAIL_STARTED_SIGNUP``."""
    testing_email = f"{random_hex(12)}@couchers.org.invalid"
    basic_details = auth_pb2.SignupBasic(name="frodo", email=testing_email)

    with auth_api_session() as (auth_api, _interceptor):
        first_reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=basic_details))
    assert first_reply.flow_token

    # starting over with the same email is rejected because a flow for it already exists
    with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as excinfo:
        auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=basic_details))
    assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
    assert excinfo.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP
def test_successful_authenticate(db):
    """``Authenticate`` accepts either the username or the email together with the right password."""
    user, _session_token = generate_user(hashed_password=hash_password("password"))

    # Authenticate with username first, then with email
    for identifier in (user.username, user.email):
        with auth_api_session() as (auth_api, _interceptor):
            reply = auth_api.Authenticate(auth_pb2.AuthReq(user=identifier, password="password"))
        assert not reply.jailed
def test_unsuccessful_authenticate(db):
    """Bad credentials give NOT_FOUND; a user with no password gives FAILED_PRECONDITION / ``NO_PASSWORD``."""
    user, _session_token = generate_user(hashed_password=hash_password("password"))

    # (identifier, password) pairs that must all fail as "invalid username or password"
    rejected_credentials = [
        (user.username, "incorrectpassword"),  # Invalid password
        ("notarealusername", "password"),  # Invalid username
        (f"{random_hex(12)}@couchers.org.invalid", "password"),  # Invalid email
        ("-1", "password"),  # Invalid id
    ]
    for identifier, password in rejected_credentials:
        attempt = auth_pb2.AuthReq(user=identifier, password=password)
        with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as excinfo:
            auth_api.Authenticate(attempt)
        assert excinfo.value.code() == grpc.StatusCode.NOT_FOUND
        assert excinfo.value.details() == errors.INVALID_USERNAME_OR_PASSWORD

    # No Password
    user_without_pass, _session_token = generate_user(hashed_password=None)

    attempt = auth_pb2.AuthReq(user=user_without_pass.username, password="password")
    with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as excinfo:
        auth_api.Authenticate(attempt)
    assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
    assert excinfo.value.details() == errors.NO_PASSWORD
def test_successful_login(db):
    """``Login`` by email or by username answers NEED_PASSWORD for a generated user."""
    user, _session_token = generate_user()

    # Valid email login first, then valid username login
    for identifier in (user.email, user.username):
        with auth_api_session() as (auth_api, _interceptor):
            reply = auth_api.Login(auth_pb2.LoginReq(user=identifier))
        assert reply.next_step == auth_pb2.LoginRes.LoginStep.NEED_PASSWORD
def test_unsuccessful_login(db):
    """Unknown identifiers give NOT_FOUND / ``USER_NOT_FOUND``; a passwordless user gets a login email."""
    unknown_identifiers = (
        f"{random_hex(12)}@couchers.org.invalid",  # Invalid email, user doesn't exist
        "-1",  # Invalid id
        "notarealusername",  # Invalid username
    )
    for identifier in unknown_identifiers:
        with auth_api_session() as (auth_api, metadata_interceptor):
            with pytest.raises(grpc.RpcError) as e:
                auth_api.Login(auth_pb2.LoginReq(user=identifier))
            assert e.value.code() == grpc.StatusCode.NOT_FOUND
            assert e.value.details() == errors.USER_NOT_FOUND

    # No Password
    # (the unused ``testing_email`` local that used to be created here was removed)
    user_without_pass, _ = generate_user(hashed_password=None)

    with auth_api_session() as (auth_api, metadata_interceptor):
        reply = auth_api.Login(auth_pb2.LoginReq(user=user_without_pass.username))
    assert reply.next_step == auth_pb2.LoginRes.LoginStep.SENT_LOGIN_EMAIL
def test_complete_signup(db):
    """Invalid signup inputs are rejected with the matching status code and error message.

    Covers, in order: blank username, blank name, missing hosting status, a username that is
    already taken, and the coordinate (0, 0).
    """

    def account_with(**overrides):
        # A valid account payload, with individual fields replaced per test case.
        fields = dict(
            username="frodo",
            password="a very insecure password",
            city="Minas Tirith",
            birthdate="1980-12-31",
            gender="Robot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            lat=1,
            lng=1,
            radius=100,
            accept_tos=True,
        )
        fields.update(overrides)
        return auth_pb2.SignupAccount(**fields)

    def expect_rejection(request, code, details):
        # Send the request in its own API session and check how it fails.
        with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as excinfo:
            auth_api.SignupFlow(request)
        assert excinfo.value.code() == code
        assert excinfo.value.details() == details

    testing_email = f"{random_hex(12)}@couchers.org.invalid"
    with auth_api_session() as (auth_api, _interceptor):
        started = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email))
        )

    flow_token = started.flow_token

    # Invalid username
    expect_rejection(
        auth_pb2.SignupFlowReq(flow_token=flow_token, account=account_with(username=" ")),
        grpc.StatusCode.INVALID_ARGUMENT,
        errors.INVALID_USERNAME,
    )

    # Invalid name
    expect_rejection(
        auth_pb2.SignupFlowReq(
            basic=auth_pb2.SignupBasic(name=" ", email=f"{random_hex(12)}@couchers.org.invalid")
        ),
        grpc.StatusCode.INVALID_ARGUMENT,
        errors.INVALID_NAME,
    )

    # Hosting status required
    expect_rejection(
        auth_pb2.SignupFlowReq(flow_token=flow_token, account=account_with(hosting_status=None)),
        grpc.StatusCode.INVALID_ARGUMENT,
        errors.HOSTING_STATUS_REQUIRED,
    )

    # Username unavailable
    existing_user, _session_token = generate_user()
    expect_rejection(
        auth_pb2.SignupFlowReq(flow_token=flow_token, account=account_with(username=existing_user.username)),
        grpc.StatusCode.FAILED_PRECONDITION,
        errors.USERNAME_NOT_AVAILABLE,
    )

    # Invalid coordinate
    expect_rejection(
        auth_pb2.SignupFlowReq(flow_token=flow_token, account=account_with(lat=0, lng=0)),
        grpc.StatusCode.INVALID_ARGUMENT,
        errors.INVALID_COORDINATE,
    )
def test_signup_token_regression(db):
    """Restarting a signup after the email was confirmed fails cleanly with ``SIGNUP_FLOW_EMAIL_STARTED_SIGNUP``."""
    # Repro steps:
    # 1. Start a signup
    # 2. Confirm the email
    # 3. Start a new signup with the same email
    # Expected: send a link to the email to continue signing up.
    # Actual: `AttributeError: 'SignupFlow' object has no attribute 'token'`

    testing_email = f"{random_hex(12)}@couchers.org.invalid"
    start_req = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))

    # 1. Start a signup
    with auth_api_session() as (auth_api, _interceptor):
        started = auth_api.SignupFlow(start_req)
    flow_token = started.flow_token
    assert flow_token

    # 2. Confirm the email
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        email_token = flow.email_token

    with auth_api_session() as (auth_api, _interceptor):
        auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, email_token=email_token))

    # 3. Start a new signup with the same email
    with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as excinfo:
        auth_api.SignupFlow(start_req)
    assert excinfo.value.code() == grpc.StatusCode.FAILED_PRECONDITION
    assert excinfo.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP
# Tests for ConfirmChangeEmail are in test_account.py, within the test_ChangeEmail_* tests.