Coverage for app / backend / src / tests / test_auth.py: 100%
598 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
1import http.cookies
2from typing import cast
4import grpc
5import pytest
6from google.protobuf import empty_pb2, wrappers_pb2
7from sqlalchemy import select, update
8from sqlalchemy.sql import delete, func
10from couchers import urls
11from couchers.crypto import hash_password, random_hex
12from couchers.db import session_scope
13from couchers.models import (
14 ContributeOption,
15 ContributorForm,
16 LoginToken,
17 PasswordResetToken,
18 SignupFlow,
19 User,
20 UserSession,
21)
22from couchers.proto import account_pb2, api_pb2, auth_pb2
23from tests.fixtures.db import generate_user
24from tests.fixtures.misc import PushCollector, email_fields, mock_notification_email
25from tests.fixtures.sessions import (
26 MetadataKeeperInterceptor,
27 account_session,
28 api_session,
29 auth_api_session,
30 real_api_session,
31)
@pytest.fixture(autouse=True)
def _(testconfig, fast_passwords):
    """Autouse fixture: pulls ``testconfig`` and ``fast_passwords`` into every test of this module."""
def get_session_cookie_tokens(metadata_interceptor: MetadataKeeperInterceptor) -> tuple[str, str]:
    """
    Return the session token and user id carried by the latest response's cookies.

    Every ``set-cookie`` entry of ``metadata_interceptor.latest_header_raw`` is parsed and the cookies are looked
    up by *name* (``couchers-sesh`` and ``couchers-user-id``), so an unrelated cookie whose text merely contains
    "sesh" or "user-id" can no longer be picked by mistake. When several headers set the same cookie, the first
    one wins.

    Returns:
        ``(session_token, user_id)``, both as the raw string cookie values (an emptied cookie yields ``""``).

    Raises:
        IndexError: if no ``set-cookie`` header sets one of the two cookies.
    """
    set_cookies = [val for key, val in metadata_interceptor.latest_header_raw if key == "set-cookie"]

    def cookie_value(name: str) -> str:
        # parse each header on its own so attributes (Path, Expires, ...) never bleed between cookies
        for header in set_cookies:
            parsed = http.cookies.SimpleCookie(header)
            if name in parsed:
                return parsed[name].value
        raise IndexError(f"no set-cookie header sets {name!r}")

    return cookie_value("couchers-sesh"), cookie_value("couchers-user-id")
def test_UsernameValid(db):
    """``UsernameValid`` reports the username "test" as valid and the empty username as invalid."""
    for username, expected_valid in (("test", True), ("", False)):
        with auth_api_session() as (auth_api, _):
            reply = auth_api.UsernameValid(auth_pb2.UsernameValidReq(username=username))
            assert bool(reply.valid) is expected_valid
def test_signup_incremental(db):
    """
    Drive the signup flow one step per request and check the outstanding steps reported after each one.

    Order exercised: basic details, a bare request with only the flow token, the feedback form, the community
    guidelines, email verification and finally the account details. The last response carries ``auth_res`` and
    session cookies, which are then used to fetch the new user; the stored contributor form is checked last.
    """

    def submit(request):
        # each step runs in its own auth session; the interceptor is handed back so cookies can be read afterwards
        with auth_api_session() as (auth_api, interceptor):
            return auth_api.SignupFlow(request), interceptor

    def assert_still_signing_up(reply, *, verify_email_pending, guidelines_pending):
        # common expectations for every response before the account details are submitted
        assert not reply.HasField("auth_res")
        assert not reply.need_basic
        assert reply.need_account
        assert not reply.need_feedback
        assert bool(reply.need_verify_email) is verify_email_pending
        assert bool(reply.need_accept_community_guidelines) is guidelines_pending

    started, _ = submit(
        auth_pb2.SignupFlowReq(
            basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        )
    )
    flow_token = started.flow_token
    assert flow_token
    assert_still_signing_up(started, verify_email_pending=True, guidelines_pending=True)

    # read out the signup token directly from the database for now
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    # Sending only the flow token reports the same outstanding steps
    polled, _ = submit(auth_pb2.SignupFlowReq(flow_token=flow_token))
    assert polled.flow_token == flow_token
    assert_still_signing_up(polled, verify_email_pending=True, guidelines_pending=True)

    # Add feedback
    feedback_form = auth_pb2.ContributorForm(
        ideas="I'm a robot, incapable of original ideation",
        features="I love all your features",
        experience="I haven't done couch surfing before",
        contribute=auth_pb2.CONTRIBUTE_OPTION_YES,
        contribute_ways=["serving", "backend"],
        expertise="I'd love to be your server: I can compute very fast, but only simple opcodes",
    )
    after_feedback, _ = submit(auth_pb2.SignupFlowReq(flow_token=flow_token, feedback=feedback_form))
    assert after_feedback.flow_token == flow_token
    assert_still_signing_up(after_feedback, verify_email_pending=True, guidelines_pending=True)

    # Agree to community guidelines
    after_guidelines, _ = submit(
        auth_pb2.SignupFlowReq(
            flow_token=flow_token,
            accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
        )
    )
    assert after_guidelines.flow_token == flow_token
    assert_still_signing_up(after_guidelines, verify_email_pending=True, guidelines_pending=False)

    # Verify email
    after_verify, _ = submit(auth_pb2.SignupFlowReq(flow_token=flow_token, email_token=email_token))
    assert after_verify.flow_token == flow_token
    assert_still_signing_up(after_verify, verify_email_pending=False, guidelines_pending=False)

    # Finally finish off account info
    account_details = auth_pb2.SignupAccount(
        username="frodo",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_MAYBE,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
    )
    finished, final_interceptor = submit(auth_pb2.SignupFlowReq(flow_token=flow_token, account=account_details))

    assert not finished.flow_token
    assert finished.HasField("auth_res")
    assert finished.auth_res.user_id
    assert not finished.auth_res.jailed
    assert not finished.need_basic
    assert not finished.need_account
    assert not finished.need_feedback
    assert not finished.need_verify_email
    assert not finished.need_accept_community_guidelines

    user_id = finished.auth_res.user_id

    sess_token, uid = get_session_cookie_tokens(final_interceptor)
    assert uid == str(user_id)

    with api_session(sess_token) as api:
        profile = api.GetUser(api_pb2.GetUserReq(user=str(user_id)))

    assert profile.username == "frodo"
    assert profile.gender == "Bot"
    assert profile.hosting_status == api_pb2.HOSTING_STATUS_MAYBE
    assert profile.city == "New York City"
    assert profile.lat == 40.7331
    assert profile.lng == -73.9778
    assert profile.radius == 500

    with session_scope() as session:
        form = session.execute(select(ContributorForm)).scalar_one()

        # attribute reads stay inside the session scope that loaded the row
        assert form.ideas == "I'm a robot, incapable of original ideation"
        assert form.features == "I love all your features"
        assert form.experience == "I haven't done couch surfing before"
        assert form.contribute == ContributeOption.yes
        assert form.contribute_ways == ["serving", "backend"]
        assert form.expertise == "I'd love to be your server: I can compute very fast, but only simple opcodes"
def _quick_signup() -> int:
    """
    Sign up the user "frodo" in two requests and return the new user's id.

    The first request submits basic details, account details, an empty feedback form and the community
    guidelines acceptance at once; the second verifies the email with the token read from the database. The
    session cookie set by the second response is checked against the stored ``UserSession`` token.
    """
    everything_but_email = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )
    with auth_api_session() as (auth_api, _):
        started = auth_api.SignupFlow(everything_but_email)

    flow_token = started.flow_token

    assert flow_token
    assert not started.HasField("auth_res")
    assert not started.need_basic
    assert not started.need_account
    assert not started.need_feedback
    assert started.need_verify_email

    # read out the signup token directly from the database for now
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    with auth_api_session() as (auth_api, interceptor):
        finished = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    assert not finished.flow_token
    assert finished.HasField("auth_res")
    assert finished.auth_res.user_id
    assert not finished.auth_res.jailed
    assert not finished.need_basic
    assert not finished.need_account
    assert not finished.need_feedback
    assert not finished.need_verify_email

    # make sure we got the right token in a cookie
    stored_token_query = (
        select(UserSession.token).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
    )
    with session_scope() as session:
        stored_token = session.execute(stored_token_query).scalar_one()
    cookie_token, _ = get_session_cookie_tokens(interceptor)
    assert cookie_token == stored_token

    return cast(int, finished.auth_res.user_id)
def test_signup(db):
    """A full signup through ``_quick_signup`` completes; that helper's own assertions are the checks."""
    _quick_signup()
def test_basic_login(db):
    """Authenticating after signup sets a cookie token that matches a valid ``UserSession``; logout then succeeds."""
    # Create our test user using signup
    _quick_signup()

    credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (auth_api, interceptor):
        auth_api.Authenticate(credentials)

    cookie_token, _ = get_session_cookie_tokens(interceptor)

    valid_session_for_cookie = (
        select(UserSession.token)
        .join(User, UserSession.user_id == User.id)
        .where(User.username == "frodo")
        .where(UserSession.token == cookie_token)
        .where(UserSession.is_valid)
    )
    with session_scope() as session:
        assert session.execute(valid_session_for_cookie).scalar_one_or_none()

    # log out
    logout_metadata = (("cookie", f"couchers-sesh={cookie_token}"),)
    with auth_api_session() as (auth_api, interceptor):
        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=logout_metadata)
def test_login_part_signed_up_verified_email(db):
    """
    If you try to log in but didn't finish signing up, we send you a new email and ask you to finish signing up.

    Here the email address has already been verified; the new email must contain the flow token.
    """
    signup_email = "email@couchers.org.invalid"

    with auth_api_session() as (auth_api, _):
        started = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="testing", email=signup_email))
        )

    assert started.need_verify_email

    # verify the email
    with session_scope() as session:
        flow = session.execute(
            select(SignupFlow).where(SignupFlow.flow_token == started.flow_token)
        ).scalar_one()
        flow_token = flow.flow_token
        email_token = flow.email_token
    with auth_api_session() as (auth_api, _):
        auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    login_attempt = auth_pb2.AuthReq(user=signup_email, password="wrong pwd")
    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, _):
            with pytest.raises(grpc.RpcError) as err:
                auth_api.Authenticate(login_attempt)
            assert err.value.details() == "Please check your email for a link to continue signing up."

    assert mock.call_count == 1
    sent = email_fields(mock)
    assert sent.recipient == signup_email
    assert flow_token in sent.plain
    assert flow_token in sent.html
def test_login_part_signed_up_not_verified_email(db):
    """
    Logging in during an unfinished signup whose email is not yet verified re-sends the signup email.

    The login attempt fails with the "continue signing up" message and the email sent contains the email token.
    """
    signup_email = "email@couchers.org.invalid"
    signup_request = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email=signup_email),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1999-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
    )
    with auth_api_session() as (auth_api, _):
        started = auth_api.SignupFlow(signup_request)

    flow_token = started.flow_token
    assert started.need_verify_email

    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, _):
            with pytest.raises(grpc.RpcError) as err:
                auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="wrong pwd"))
            assert err.value.details() == "Please check your email for a link to continue signing up."

    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        email_token = flow.email_token

    assert mock.call_count == 1
    sent = email_fields(mock)
    assert sent.recipient == signup_email
    assert email_token
    assert email_token in sent.plain
    assert email_token in sent.html
def test_banned_user(db):
    """A banned user cannot authenticate: FAILED_PRECONDITION with the "suspended" message."""
    _quick_signup()

    with session_scope() as session:
        # the signup above created the only user row
        only_user = session.execute(select(User)).scalar_one()
        only_user.is_banned = True

    credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.Authenticate(credentials)
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "Your account is suspended."
def test_deleted_user(db):
    """A user flagged ``is_deleted`` cannot authenticate: NOT_FOUND, as if the account never existed."""
    user_id = _quick_signup()

    mark_deleted = update(User).where(User.id == user_id).values(is_deleted=True)
    with session_scope() as session:
        session.execute(mark_deleted)

    credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.Authenticate(credentials)
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "An account with that username or email was not found."
def test_invalid_token(db):
    """Calling the API with a random, unknown session token is rejected as UNAUTHENTICATED."""
    # two users are created, as in the original setup; only the second one's username is used
    generate_user()
    target_user, _ = generate_user()

    bogus_token = random_hex(32)

    with real_api_session(bogus_token) as api:
        with pytest.raises(grpc.RpcError) as e:
            api.GetUser(api_pb2.GetUserReq(user=target_user.username))

    assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED
    assert e.value.details() == "Unauthorized"
def test_password_reset_v2(db, push_collector: PushCollector):
    """
    Full password reset round trip.

    Requesting a reset sends an email and a push notification; completing it rejects an insecure password,
    accepts a good one (logging the user in and sending a second push), and the token cannot be used twice.
    """
    user, _ = generate_user(hashed_password=hash_password("mypassword"))

    with auth_api_session() as (auth_api, _), mock_notification_email() as mock:
        auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username))

    with session_scope() as session:
        password_reset_token = session.execute(select(PasswordResetToken.token)).scalar_one()

    assert mock.call_count == 1
    sent = email_fields(mock)
    assert sent.recipient == user.email
    assert "reset" in sent.subject.lower()
    unique_string = "You asked for your password to be reset on Couchers.org."
    reset_link = f"http://localhost:3000/complete-password-reset?token={password_reset_token}"
    # every expected fragment must appear in both the plain-text and the HTML rendering
    for fragment in (password_reset_token, unique_string, reset_link, "support@couchers.org"):
        assert fragment in sent.plain
        assert fragment in sent.html

    push = push_collector.pop_for_user(user.id, last=True)
    assert push.content.title == "Password reset requested"
    assert push.content.body == "Use the link we sent by email to complete it."

    # make sure bad password are caught
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as err:
            auth_api.CompletePasswordResetV2(
                auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password="password")
            )
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == "The password is insecure. Please use one that is not easily guessable."

    # make sure we can set a good password
    with auth_api_session() as (auth_api, login_interceptor):
        pwd = random_hex()
        with mock_notification_email():
            auth_api.CompletePasswordResetV2(
                auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password=pwd)
            )

    push = push_collector.pop_for_user(user.id, last=True)
    assert push.content.title == "Password reset"
    assert push.content.body == "Your password was successfully reset."

    # completing the reset also set session cookies; they must match a valid stored session
    session_token, _ = get_session_cookie_tokens(login_interceptor)

    valid_session_for_cookie = (
        select(UserSession.token)
        .join(User, UserSession.user_id == User.id)
        .where(User.username == user.username)
        .where(UserSession.token == session_token)
        .where(UserSession.is_valid)
    )
    with session_scope() as session:
        assert session.execute(valid_session_for_cookie).scalar_one_or_none()

    # make sure we can't set a password again
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as err:
            auth_api.CompletePasswordResetV2(
                auth_pb2.CompletePasswordResetV2Req(
                    password_reset_token=password_reset_token, new_password=random_hex()
                )
            )
        assert err.value.code() == grpc.StatusCode.NOT_FOUND
        assert err.value.details() == "Invalid token."

    with session_scope() as session:
        stored_user = session.execute(select(User)).scalar_one()
        assert stored_user.hashed_password == hash_password(pwd)
def test_password_reset_no_such_user(db):
    """Requesting a reset for an unknown username does not raise and creates no ``PasswordResetToken``."""
    generate_user()

    reset_request = auth_pb2.ResetPasswordReq(
        user="nonexistentuser",
    )
    with auth_api_session() as (auth_api, _):
        auth_api.ResetPassword(reset_request)

    with session_scope() as session:
        leftover_token = session.execute(select(PasswordResetToken)).scalar_one_or_none()
        assert leftover_token is None
def test_password_reset_invalid_token_v2(db):
    """Completing a reset with a wrong token fails with NOT_FOUND and leaves the stored password hash unchanged."""
    password = random_hex()
    user, _ = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, _):
        auth_api.ResetPassword(
            auth_pb2.ResetPasswordReq(
                user=user.username,
            )
        )

    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.CompletePasswordResetV2(auth_pb2.CompletePasswordResetV2Req(password_reset_token="wrongtoken"))
    assert e.value.code() == grpc.StatusCode.NOT_FOUND
    assert e.value.details() == "Invalid token."

    with session_scope() as session:
        stored_user = session.execute(select(User)).scalar_one()
        assert stored_user.hashed_password == hash_password(password)
def test_logout_invalid_token(db):
    """Logging out after the login tokens were deleted still succeeds and blanks the session cookie."""
    # Create our test user using signup
    _quick_signup()

    with auth_api_session() as (auth_api, interceptor):
        auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))

    login_cookie_token, _ = get_session_cookie_tokens(interceptor)

    # delete all login tokens
    # NOTE(review): this removes LoginToken rows while the cookie token above comes from the session cookie
    # (other tests match it against UserSession) — confirm this really makes the token "non-existent".
    with session_scope() as session:
        session.execute(delete(LoginToken))

    # log out with non-existent token should still return a valid result
    logout_metadata = (("cookie", f"couchers-sesh={login_cookie_token}"),)
    with auth_api_session() as (auth_api, interceptor):
        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=logout_metadata)

    logout_cookie_token, _ = get_session_cookie_tokens(interceptor)
    # make sure we set an empty cookie
    assert logout_cookie_token == ""
def test_signup_without_password(db):
    """
    Signup with a too-short password ("bad") is rejected with INVALID_ARGUMENT.

    Every other field is valid. Previously this test also submitted a future birthdate (copied from the
    birthdate test), so it only passed because the password happens to be validated first; with a valid
    birthdate the password is the single reason for the rejection.
    """
    request = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="bad",
            city="Minas Tirith",
            birthdate="1970-01-01",  # valid adult birthdate, so only the password is wrong
            gender="Robot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            lat=1,
            lng=1,
            radius=100,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(request)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == "The password must be 8 or more characters long."
def test_signup_invalid_birthdate(db):
    """
    Signup rejects a future birthdate and an under-18 birthdate, and accepts an adult birthdate.

    The under-18 birthdate is computed relative to today: the previous hard-coded "2010-04-09" would turn 18 on
    2028-04-09 and make this test fail from then on. Only the accepted request leaves a ``SignupFlow`` row.
    """
    from datetime import date, timedelta

    # 17 * 365 days ago is always slightly less than 17 years ago, i.e. safely below the 18-year limit
    underage_birthdate = (date.today() - timedelta(days=17 * 365)).isoformat()
    too_young_message = "You must be at least 18 years old to sign up."

    def signup_request(*, name, email, username, city, birthdate, gender):
        # the requests below differ only in these identity fields; everything else is shared
        return auth_pb2.SignupFlowReq(
            basic=auth_pb2.SignupBasic(name=name, email=email),
            account=auth_pb2.SignupAccount(
                username=username,
                password="a very insecure password",
                city=city,
                birthdate=birthdate,
                gender=gender,
                hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                lat=1,
                lng=1,
                radius=100,
                accept_tos=True,
            ),
            feedback=auth_pb2.ContributorForm(),
        )

    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(
                signup_request(
                    name="Räksmörgås",
                    email="a1@b.com",
                    username="frodo",
                    city="Minas Tirith",
                    birthdate="9999-12-31",  # arbitrary future birthdate
                    gender="Robot",
                )
            )
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == too_young_message

        res = auth_api.SignupFlow(
            signup_request(
                name="Christopher",
                email="a2@b.com",
                username="ceelo",
                city="New York City",
                birthdate="2000-12-31",  # arbitrary birthdate older than 18 years
                gender="Helicopter",
            )
        )

        assert res.flow_token

        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(
                signup_request(
                    name="Franklin",
                    email="a3@b.com",
                    username="franklin",
                    city="Los Santos",
                    birthdate=underage_birthdate,  # always < 18 yrs, whenever the test runs
                    gender="Male",
                )
            )
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == too_young_message

    with session_scope() as session:
        assert session.execute(select(func.count()).select_from(SignupFlow)).scalar_one() == 1
def test_signup_invalid_email(db):
    """Each malformed address below is rejected with INVALID_ARGUMENT and the "Invalid email." message."""
    for malformed_email in ("a", "a@b", "a@b.", "a@b.c"):
        request = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=malformed_email))
        # a fresh auth session per address, as each attempt is independent
        with auth_api_session() as (auth_api, _):
            with pytest.raises(grpc.RpcError) as e:
                auth_api.SignupFlow(request)
            assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
            assert e.value.details() == "Invalid email."
def test_signup_existing_email(db):
    """Starting a signup with the email of an existing user fails and tells the caller to log in instead."""
    # Signed up user
    existing_user, _ = generate_user()

    request = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=existing_user.email))
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(request)
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "That email address is already associated with an account. Please log in instead!"
def test_signup_banned_user_email(db):
    """The email of a banned user cannot be used to start a new signup."""
    banned_user, _ = generate_user()

    ban = update(User).where(User.id == banned_user.id).values(is_banned=True)
    with session_scope() as session:
        session.execute(ban)

    request = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="NewName", email=banned_user.email))
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(request)
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "You cannot sign up with that email address."
def test_signup_deleted_user_email(db):
    """The email of a deleted user cannot be used to start a new signup."""
    deleted_user, _ = generate_user()

    mark_deleted = update(User).where(User.id == deleted_user.id).values(is_deleted=True)
    with session_scope() as session:
        session.execute(mark_deleted)

    request = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="NewName", email=deleted_user.email))
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(request)
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "You cannot sign up with that email address."
def test_signup_continue_with_email(db):
    """Starting a second signup with an email that already has a flow fails and points the user to their inbox."""
    testing_email = f"{random_hex(12)}@couchers.org.invalid"

    def basic_request():
        # both attempts send exactly the same basic details
        return auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))

    with auth_api_session() as (auth_api, _):
        first = auth_api.SignupFlow(basic_request())
    assert first.flow_token

    # continue with same email, should just send another email to the user
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(basic_request())
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "Please check your email for a link to continue signing up."
def test_signup_resend_email(db):
    """
    Asking for the verification email again re-sends it with the same email token.

    The flow stays unverified after the resend, and the token from the email then completes the signup.
    """
    recipient = "email@couchers.org.invalid"
    signup_request = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email=recipient),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )
    with auth_api_session() as (auth_api, _):
        with mock_notification_email() as mock:
            started = auth_api.SignupFlow(signup_request)
            assert mock.call_count == 1
            first_mail = email_fields(mock)
            assert first_mail.recipient == recipient

    flow_token = started.flow_token
    assert flow_token

    with session_scope() as session:
        flow = session.execute(select(SignupFlow)).scalar_one()
        assert flow.flow_token == flow_token
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    # ask for a new signup email
    resend_request = auth_pb2.SignupFlowReq(
        flow_token=flow_token,
        resend_verification_email=True,
    )
    with auth_api_session() as (auth_api, _):
        with mock_notification_email() as mock:
            auth_api.SignupFlow(resend_request)
            assert mock.call_count == 1
            second_mail = email_fields(mock)
            assert second_mail.recipient == recipient
            assert email_token
            assert email_token in second_mail.plain
            assert email_token in second_mail.html

    with session_scope() as session:
        flow = session.execute(select(SignupFlow)).scalar_one()
        assert not flow.email_verified

    with auth_api_session() as (auth_api, _):
        finished = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                email_token=email_token,
            )
        )

    assert not finished.flow_token
    assert finished.HasField("auth_res")
def test_successful_authenticate(db):
    """A user can authenticate with either their username or their email, and is not jailed."""
    user, _ = generate_user(hashed_password=hash_password("password"))

    # Authenticate with username, then with email
    for identifier in (user.username, user.email):
        with auth_api_session() as (auth_api, _):
            reply = auth_api.Authenticate(auth_pb2.AuthReq(user=identifier, password="password"))
        assert not reply.jailed
def test_unsuccessful_authenticate(db):
    """Wrong credentials are all rejected with NOT_FOUND; only the message differs between the cases."""
    user, _ = generate_user(hashed_password=hash_password("password"))

    account_not_found = "An account with that username or email was not found."
    attempts = (
        # Invalid password
        (user.username, "incorrectpassword", "Wrong password."),
        # Invalid username
        ("notarealusername", "password", account_not_found),
        # Invalid email
        (f"{random_hex(12)}@couchers.org.invalid", "password", account_not_found),
        # Invalid id
        ("-1", "password", account_not_found),
    )

    for identifier, password, expected_details in attempts:
        with auth_api_session() as (auth_api, _):
            with pytest.raises(grpc.RpcError) as e:
                auth_api.Authenticate(auth_pb2.AuthReq(user=identifier, password=password))
            assert e.value.code() == grpc.StatusCode.NOT_FOUND
            assert e.value.details() == expected_details
def test_complete_signup(db):
    """
    Invalid signup details are rejected with a specific status code and message.

    Covers, in order: a blank username, a blank name, a missing hosting status, a username that is already
    taken, and the coordinate (0, 0).
    """
    testing_email = f"{random_hex(12)}@couchers.org.invalid"
    with auth_api_session() as (auth_api, _):
        reply = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email))
        )

    flow_token = reply.flow_token

    def account(**overrides):
        # baseline account details shared by the cases below; each case overrides the field under test
        fields = {
            "username": "frodo",
            "password": "a very insecure password",
            "city": "Minas Tirith",
            "birthdate": "1980-12-31",
            "gender": "Robot",
            "hosting_status": api_pb2.HOSTING_STATUS_CAN_HOST,
            "lat": 1,
            "lng": 1,
            "radius": 100,
            "accept_tos": True,
        }
        fields.update(overrides)
        return auth_pb2.SignupAccount(**fields)

    def assert_rejected(request, code, details):
        # every case runs in its own auth session
        with auth_api_session() as (auth_api, _):
            with pytest.raises(grpc.RpcError) as e:
                auth_api.SignupFlow(request)
            assert e.value.code() == code
            assert e.value.details() == details

    # Invalid username
    assert_rejected(
        auth_pb2.SignupFlowReq(flow_token=flow_token, account=account(username=" ")),
        grpc.StatusCode.INVALID_ARGUMENT,
        "Invalid username.",
    )

    # Invalid name
    assert_rejected(
        auth_pb2.SignupFlowReq(
            basic=auth_pb2.SignupBasic(name=" ", email=f"{random_hex(12)}@couchers.org.invalid")
        ),
        grpc.StatusCode.INVALID_ARGUMENT,
        "Name not supported.",
    )

    # Hosting status required
    assert_rejected(
        auth_pb2.SignupFlowReq(flow_token=flow_token, account=account(hosting_status=None)),
        grpc.StatusCode.INVALID_ARGUMENT,
        "Hosting status is required.",
    )

    # Username unavailable
    user, _ = generate_user()
    assert_rejected(
        auth_pb2.SignupFlowReq(flow_token=flow_token, account=account(username=user.username)),
        grpc.StatusCode.FAILED_PRECONDITION,
        "Sorry, that username isn't available.",
    )

    # Invalid coordinate
    assert_rejected(
        auth_pb2.SignupFlowReq(flow_token=flow_token, account=account(lat=0, lng=0)),
        grpc.StatusCode.INVALID_ARGUMENT,
        "Invalid coordinate.",
    )
def test_signup_token_regression(db):
    """
    Regression test for starting a second signup with an email whose first flow already confirmed it.

    Repro steps:
    1. Start a signup
    2. Confirm the email
    3. Start a new signup with the same email

    Expected: the second attempt fails with FAILED_PRECONDITION and a message pointing at the emailed link.
    Actual (when the bug was present): `AttributeError: 'SignupFlow' object has no attribute 'token'`
    """
    testing_email = f"{random_hex(12)}@couchers.org.invalid"

    def basic_signup_req():
        # both signup attempts send the same name and email, each with a freshly built request
        return auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))

    # 1. Start a signup
    with auth_api_session() as (auth_api, _):
        started = auth_api.SignupFlow(basic_signup_req())
        flow_token = started.flow_token
        assert flow_token

    # 2. Confirm the email, reading the email token straight from the database
    email_token_query = select(SignupFlow.email_token).where(SignupFlow.flow_token == flow_token)
    with session_scope() as session:
        email_token = session.execute(email_token_query).scalar_one()

    confirm_req = auth_pb2.SignupFlowReq(
        flow_token=flow_token,
        email_token=email_token,
    )
    with auth_api_session() as (auth_api, _):
        auth_api.SignupFlow(confirm_req)

    # 3. Start a new signup with the same email
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as exc_info:
            auth_api.SignupFlow(basic_signup_req())
        error = exc_info.value
        assert error.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert error.details() == "Please check your email for a link to continue signing up."
@pytest.mark.parametrize("opt_out", [True, False])
def test_opt_out_of_newsletter(db, opt_out):
    """
    Signs up in a single request with `opt_out_of_newsletter` set to `opt_out`, verifies the email, and checks
    that the created user carries the same opt-out value and is not flagged as in sync with the newsletter.
    """
    account_details = auth_pb2.SignupAccount(
        username="frodo",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
        opt_out_of_newsletter=opt_out,
    )
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=account_details,
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )

    with auth_api_session() as (auth_api, _):
        signup_res = auth_api.SignupFlow(signup_req)

    # read the email token directly from the database
    email_token_query = select(SignupFlow.email_token).where(SignupFlow.flow_token == signup_res.flow_token)
    with session_scope() as session:
        email_token = session.execute(email_token_query).scalar_one()

    with auth_api_session() as (auth_api, _):
        verify_res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    user_id = verify_res.auth_res.user_id

    with session_scope() as session:
        created_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert not created_user.in_sync_with_newsletter
        assert created_user.opt_out_of_newsletter == opt_out
def test_GetAuthState(db):
    """
    Exercises GetAuthState without a cookie, with a valid session cookie (before and after Deauthenticate),
    and with the session cookie of a user generated with `accepted_tos=0`, which must be reported as jailed.
    """
    user, token = generate_user()
    jailed_user, jailed_token = generate_user(accepted_tos=0)

    def session_cookie(session_token):
        # request metadata carrying the session token as a cookie
        return (("cookie", f"couchers-sesh={session_token}"),)

    # no cookie at all: not logged in
    with auth_api_session() as (auth_api, _):
        state = auth_api.GetAuthState(empty_pb2.Empty())
        assert not state.logged_in
        assert not state.HasField("auth_res")

    with auth_api_session() as (auth_api, _):
        # valid session cookie: logged in as the normal user
        state = auth_api.GetAuthState(empty_pb2.Empty(), metadata=session_cookie(token))
        assert state.logged_in
        assert state.HasField("auth_res")
        assert state.auth_res.user_id == user.id
        assert not state.auth_res.jailed

        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=session_cookie(token))

        # the same cookie no longer counts as logged in after deauthenticating
        state = auth_api.GetAuthState(empty_pb2.Empty(), metadata=session_cookie(token))
        assert not state.logged_in
        assert not state.HasField("auth_res")

    # jailed user: logged in, but flagged as jailed
    with auth_api_session() as (auth_api, _):
        state = auth_api.GetAuthState(empty_pb2.Empty(), metadata=session_cookie(jailed_token))
        assert state.logged_in
        assert state.HasField("auth_res")
        assert state.auth_res.user_id == jailed_user.id
        assert state.auth_res.jailed
def test_signup_no_feedback_regression(db):
    """
    When the feedback form was first removed, the backend reported that feedback was not needed but did not
    complete the signup. This regression test signs up without a feedback form and checks the signup finishes
    once the email is verified.
    """
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(signup_req)

    flow_token = res.flow_token

    # only the email verification step should be outstanding
    assert res.flow_token
    assert not res.HasField("auth_res")
    assert not res.need_basic
    assert not res.need_account
    assert not res.need_feedback
    assert res.need_verify_email

    # read out the signup token directly from the database for now
    flow_query = select(SignupFlow).where(SignupFlow.flow_token == flow_token)
    with session_scope() as session:
        flow = session.execute(flow_query).scalar_one()
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    # the flow is finished: no flow token, nothing more needed, and an auth result is returned
    assert not res.flow_token
    assert res.HasField("auth_res")
    assert res.auth_res.user_id
    assert not res.auth_res.jailed
    assert not res.need_basic
    assert not res.need_account
    assert not res.need_feedback
    assert not res.need_verify_email

    # make sure we got the right token in a cookie
    session_token_query = (
        select(UserSession.token).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
    )
    with session_scope() as session:
        token = session.execute(session_token_query).scalar_one()
    sesh, _uid = get_session_cookie_tokens(metadata_interceptor)
    assert sesh == token
def test_banned_username(db):
    """
    Starts a signup flow, then submits the account step with the username "thecouchersadminaccount" and checks
    it is rejected with FAILED_PRECONDITION and the "username isn't available" message.
    """
    testing_email = f"{random_hex(12)}@couchers.org.invalid"
    basic_req = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email))
    with auth_api_session() as (auth_api, _):
        reply = auth_api.SignupFlow(basic_req)

    flow_token = reply.flow_token

    # Banned username
    account_req = auth_pb2.SignupFlowReq(
        flow_token=flow_token,
        account=auth_pb2.SignupAccount(
            username="thecouchersadminaccount",
            password="a very insecure password",
            city="Minas Tirith",
            birthdate="1980-12-31",
            gender="Robot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            lat=1,
            lng=1,
            radius=100,
            accept_tos=True,
        ),
    )
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as exc_info:
            auth_api.SignupFlow(account_req)
        error = exc_info.value
        assert error.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert error.details() == "Sorry, that username isn't available."
# Tests for ConfirmChangeEmail are in test_account.py, as part of the test_ChangeEmail_* tests.
def test_GetInviteCodeInfo(db):
    """
    Creates an invite code for a user generated with `complete_profile=True` and checks that GetInviteCodeInfo
    returns that user's name and username, a thumbnail avatar URL, and the invite link for the code.
    """
    user, token = generate_user(complete_profile=True)

    with account_session(token) as account:
        created = account.CreateInviteCode(account_pb2.CreateInviteCodeReq())
        code = created.code

    with auth_api_session() as (auth, _):
        info = auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code))

    assert info.name == user.name
    assert info.username == user.username

    # Avatar URL should be a thumbnail URL with a hashed filename
    avatar_url = info.avatar_url
    assert "/img/thumbnail/" in avatar_url
    assert avatar_url.endswith(".jpg")

    # Verify the hashed filename looks correct (64 char hex hash)
    filename = avatar_url.split("/")[-1]
    hashed_part = filename.replace(".jpg", "")
    assert len(hashed_part) == 64

    assert info.url == urls.invite_code_link(code=code)
def test_GetInviteCodeInfo_no_avatar(db):
    """
    Creates an invite code for a user generated with `complete_profile=False` and checks that GetInviteCodeInfo
    returns an empty avatar URL together with the user's name, username, and the invite link.
    """
    user, token = generate_user(complete_profile=False)

    with account_session(token) as account:
        created = account.CreateInviteCode(account_pb2.CreateInviteCodeReq())
        code = created.code

    with auth_api_session() as (auth, _):
        info = auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code))

    assert info.name == user.name
    assert info.username == user.username
    assert info.avatar_url == ""
    assert info.url == urls.invite_code_link(code=code)
def test_GetInviteCodeInfo_not_found(db):
    """
    Looks up the code "BADCODE", which no one created, and checks that GetInviteCodeInfo fails with NOT_FOUND.
    """
    generate_user()

    bad_code_req = auth_pb2.GetInviteCodeInfoReq(code="BADCODE")
    with auth_api_session() as (auth, _):
        with pytest.raises(grpc.RpcError) as exc_info:
            auth.GetInviteCodeInfo(bad_code_req)
        error = exc_info.value
        assert error.code() == grpc.StatusCode.NOT_FOUND
        assert error.details() == "Invite code not found."
def test_SignupFlow_invite_code(db):
    """
    Runs a signup flow whose basic step includes an invite code created by an existing user, and checks that
    the code ends up in `invite_code_id` on the newly created user.
    """
    user, token = generate_user()

    with account_session(token) as account:
        created = account.CreateInviteCode(account_pb2.CreateInviteCodeReq())
        invite_code = created.code

    # the three requests sent during the flow, in order: basic, email confirmation (built later), account
    basic_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(
            name="Test User",
            email="inviteuser@example.com",
            invite_code=invite_code,
        )
    )
    account_details = auth_pb2.SignupAccount(
        username="invited_user",
        password="secure password",
        birthdate="1990-01-01",
        gender="Other",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="Example City",
        lat=1,
        lng=5,
        radius=100,
        accept_tos=True,
    )

    with auth_api_session() as (auth_api, _):
        # Signup basic step with invite code
        basic_res = auth_api.SignupFlow(basic_req)
        flow_token = basic_res.flow_token
        assert flow_token

        # Confirm email
        email_token_query = select(SignupFlow.email_token).where(SignupFlow.flow_token == flow_token)
        with session_scope() as session:
            email_token = session.execute(email_token_query).scalar_one()

        auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

        # Signup account step
        account_req = auth_pb2.SignupFlowReq(
            flow_token=flow_token,
            account=account_details,
            feedback=auth_pb2.ContributorForm(),
            accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
        )
        auth_api.SignupFlow(account_req)

        # Check that invite_code_id is stored in the final User object
        stored_code_query = select(User.invite_code_id).where(User.username == "invited_user")
        with session_scope() as session:
            invite_code_id = session.execute(stored_code_query).scalar_one()
            assert invite_code_id == invite_code