Coverage for src/tests/test_account.py: 100%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from datetime import timedelta
2from unittest.mock import patch
4import grpc
5import pytest
6from google.protobuf import empty_pb2, wrappers_pb2
7from sqlalchemy.sql import func
9from couchers import errors
10from couchers.crypto import hash_password, random_hex
11from couchers.db import session_scope
12from couchers.models import AccountDeletionReason, AccountDeletionToken, BackgroundJob, BackgroundJobType, Upload, User
13from couchers.sql import couchers_select as select
14from couchers.utils import now
15from proto import account_pb2, auth_pb2
16from tests.test_fixtures import account_session, auth_api_session, db, fast_passwords, generate_user, testconfig # noqa
@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture whose only job is to request ``testconfig`` for every test here."""
def test_GetAccountInfo(db, fast_passwords):
    """Check GetAccountInfo for a user created without a password and for one with a password."""
    login_methods = account_pb2.GetAccountInfoRes.LoginMethod

    # user generated with hashed_password=None
    passwordless_email = "funkybot@couchers.invalid"
    passwordless_user, passwordless_token = generate_user(hashed_password=None, email=passwordless_email)

    with account_session(passwordless_token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert info.login_method == login_methods.MAGIC_LINK
        assert not info.has_password
        assert info.email == passwordless_email
        assert info.username == passwordless_user.username

    # user generated with a hashed random password
    password_email = "user@couchers.invalid"
    password_user, password_token = generate_user(
        hashed_password=hash_password(random_hex()),
        email=password_email,
    )

    with account_session(password_token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert info.login_method == login_methods.PASSWORD
        assert info.has_password
        assert info.email == password_email
        assert info.username == password_user.username
def test_GetAccountInfo_regression(db):
    """GetAccountInfo must not raise for a user with an avatar key and ``about_me=None``.

    Regression: evaluating ``has_completed_profile`` on the backend (in python)
    failed when about_me was None but the user had a key, because
    ``len(about_me)`` doesn't work on None.
    """
    uploader_user, _ = generate_user()
    with session_scope() as session:
        key = random_hex(32)
        filename = random_hex(32) + ".jpg"
        session.add(
            Upload(
                key=key,
                filename=filename,
                creator_user_id=uploader_user.id,
            )
        )
        session.commit()
    # only the token is needed; the user object itself is never inspected
    _, token = generate_user(about_me=None, avatar_key=key)

    with account_session(token) as account:
        # the call completing without an exception is the whole assertion
        account.GetAccountInfo(empty_pb2.Empty())
def test_ChangePassword_normal(db, fast_passwords):
    """A user with a password changes it by supplying the old and the new password."""
    current_password = random_hex()
    replacement_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_password))

    request = account_pb2.ChangePasswordReq(
        old_password=wrappers_pb2.StringValue(value=current_password),
        new_password=wrappers_pb2.StringValue(value=replacement_password),
    )
    with account_session(token) as account:
        with patch("couchers.servicers.account.send_password_changed_email") as email_mock:
            account.ChangePassword(request)
        # the (patched) password-changed email must have been requested exactly once
        email_mock.assert_called_once()

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(replacement_password)
def test_ChangePassword_regression(db, fast_passwords):
    """Same flow as the normal password change, but without patching the email sender.

    Regression: send_password_changed_email wasn't working.
    """
    current_password = random_hex()
    replacement_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_password))

    request = account_pb2.ChangePasswordReq(
        old_password=wrappers_pb2.StringValue(value=current_password),
        new_password=wrappers_pb2.StringValue(value=replacement_password),
    )
    with account_session(token) as account:
        account.ChangePassword(request)

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(replacement_password)
def test_ChangePassword_normal_short_password(db, fast_passwords):
    """Changing to a one-character password fails with PASSWORD_TOO_SHORT and keeps the old hash."""
    current_password = random_hex()
    too_short_password = random_hex(length=1)
    user, token = generate_user(hashed_password=hash_password(current_password))

    request = account_pb2.ChangePasswordReq(
        old_password=wrappers_pb2.StringValue(value=current_password),
        new_password=wrappers_pb2.StringValue(value=too_short_password),
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePassword(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.PASSWORD_TOO_SHORT

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(current_password)
def test_ChangePassword_normal_long_password(db, fast_passwords):
    """Changing to a 1000-length random password fails with PASSWORD_TOO_LONG and keeps the old hash."""
    current_password = random_hex()
    too_long_password = random_hex(length=1000)
    user, token = generate_user(hashed_password=hash_password(current_password))

    request = account_pb2.ChangePasswordReq(
        old_password=wrappers_pb2.StringValue(value=current_password),
        new_password=wrappers_pb2.StringValue(value=too_long_password),
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePassword(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.PASSWORD_TOO_LONG

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(current_password)
def test_ChangePassword_normal_insecure_password(db, fast_passwords):
    """Changing to the password "12345678" fails with INSECURE_PASSWORD and keeps the old hash."""
    current_password = random_hex()
    insecure_password = "12345678"
    user, token = generate_user(hashed_password=hash_password(current_password))

    request = account_pb2.ChangePasswordReq(
        old_password=wrappers_pb2.StringValue(value=current_password),
        new_password=wrappers_pb2.StringValue(value=insecure_password),
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePassword(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INSECURE_PASSWORD

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(current_password)
def test_ChangePassword_normal_wrong_password(db, fast_passwords):
    """Supplying a wrong old password fails with INVALID_USERNAME_OR_PASSWORD and keeps the old hash."""
    current_password = random_hex()
    replacement_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_password))

    request = account_pb2.ChangePasswordReq(
        old_password=wrappers_pb2.StringValue(value="wrong password"),
        new_password=wrappers_pb2.StringValue(value=replacement_password),
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePassword(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_USERNAME_OR_PASSWORD

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(current_password)
def test_ChangePassword_normal_no_password(db, fast_passwords):
    """Omitting the old password for a user who has one fails with MISSING_PASSWORD."""
    current_password = random_hex()
    replacement_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_password))

    # request deliberately carries only the new password
    request = account_pb2.ChangePasswordReq(
        new_password=wrappers_pb2.StringValue(value=replacement_password),
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePassword(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.MISSING_PASSWORD

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(current_password)
def test_ChangePassword_normal_no_passwords(db, fast_passwords):
    """An empty ChangePasswordReq from a user with a password fails with MISSING_BOTH_PASSWORDS."""
    current_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_password))

    empty_request = account_pb2.ChangePasswordReq()
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePassword(empty_request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.MISSING_BOTH_PASSWORDS

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(current_password)
def test_ChangePassword_add(db, fast_passwords):
    """A user created without a password can add one by sending only ``new_password``."""
    added_password = random_hex()
    user, token = generate_user(hashed_password=None)

    request = account_pb2.ChangePasswordReq(
        new_password=wrappers_pb2.StringValue(value=added_password),
    )
    with account_session(token) as account:
        with patch("couchers.servicers.account.send_password_changed_email") as email_mock:
            account.ChangePassword(request)
        # the (patched) password-changed email must have been requested exactly once
        email_mock.assert_called_once()

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(added_password)
def test_ChangePassword_add_with_password(db, fast_passwords):
    """Sending an old password for a user who has none fails with NO_PASSWORD; no password is set."""
    added_password = random_hex()
    user, token = generate_user(hashed_password=None)

    request = account_pb2.ChangePasswordReq(
        old_password=wrappers_pb2.StringValue(value="wrong password"),
        new_password=wrappers_pb2.StringValue(value=added_password),
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePassword(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.NO_PASSWORD

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert not stored_user.has_password
def test_ChangePassword_add_no_passwords(db, fast_passwords):
    """An empty ChangePasswordReq from a user without a password is rejected.

    The call must fail with INVALID_ARGUMENT / MISSING_BOTH_PASSWORDS and the
    stored ``hashed_password`` must still be ``None`` afterwards.
    """
    # user does not have an old password and called with empty body
    user, token = generate_user(hashed_password=None)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePassword(account_pb2.ChangePasswordReq())
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.MISSING_BOTH_PASSWORDS

    with session_scope() as session:
        updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        # identity check: comparisons to the None singleton use `is`, not `==`
        assert updated_user.hashed_password is None
def test_ChangePassword_remove(db, fast_passwords):
    """Sending only the (correct) old password leaves the user without a password."""
    current_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_password))

    # request carries the old password and no new one
    request = account_pb2.ChangePasswordReq(
        old_password=wrappers_pb2.StringValue(value=current_password),
    )
    with account_session(token) as account:
        with patch("couchers.servicers.account.send_password_changed_email") as email_mock:
            account.ChangePassword(request)
        # the (patched) password-changed email must have been requested exactly once
        email_mock.assert_called_once()

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert not stored_user.has_password
def test_ChangePassword_remove_wrong_password(db, fast_passwords):
    """Sending only a wrong old password fails with INVALID_USERNAME_OR_PASSWORD and keeps the hash."""
    current_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_password))

    request = account_pb2.ChangePasswordReq(
        old_password=wrappers_pb2.StringValue(value="wrong password"),
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePassword(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_USERNAME_OR_PASSWORD

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(current_password)
def test_ChangeEmail_wrong_password(db, fast_passwords):
    """ChangeEmail with a wrong password fails and leaves no active new-email token."""
    password = random_hex()
    requested_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailReq(
        password=wrappers_pb2.StringValue(value="wrong password"),
        new_email=requested_email,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmail(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_USERNAME_OR_PASSWORD

    # counts users whose new-email token created/expiry window contains the database's now()
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmail_wrong_email(db, fast_passwords):
    """ChangeEmail with a wrong password fails and leaves no active new-email token.

    NOTE(review): despite its name, this test sends a wrong *password* and
    expects INVALID_USERNAME_OR_PASSWORD, exactly like
    test_ChangeEmail_wrong_password -- confirm whether a distinct "wrong email"
    scenario was intended.
    """
    password = random_hex()
    requested_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailReq(
        password=wrappers_pb2.StringValue(value="wrong password"),
        new_email=requested_email,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmail(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_USERNAME_OR_PASSWORD

    # counts users whose new-email token created/expiry window contains the database's now()
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmail_invalid_email(db, fast_passwords):
    """ChangeEmail to the string "not a real email" fails with INVALID_EMAIL; no active token remains."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailReq(
        password=wrappers_pb2.StringValue(value=password),
        new_email="not a real email",
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmail(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_EMAIL

    # counts users whose new-email token created/expiry window contains the database's now()
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmail_email_in_use(db, fast_passwords):
    """ChangeEmail to another user's email fails with INVALID_EMAIL; no active token remains."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))
    other_user, other_token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailReq(
        password=wrappers_pb2.StringValue(value=password),
        new_email=other_user.email,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmail(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_EMAIL

    # counts users whose new-email token created/expiry window contains the database's now()
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmail_no_change(db, fast_passwords):
    """ChangeEmail to the user's own current email fails with INVALID_EMAIL; no active token remains."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailReq(
        password=wrappers_pb2.StringValue(value=password),
        new_email=user.email,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmail(request)
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_EMAIL

    # counts users whose new-email token created/expiry window contains the database's now()
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0
def test_ChangeEmail_wrong_token(db, fast_passwords):
    """Confirming an email change with an unknown token fails and keeps the old email.

    A user without a password requests an email change, then
    ConfirmChangeEmail is called with the literal token "wrongtoken". The call
    must fail with NOT_FOUND / INVALID_TOKEN and the stored email must still
    equal the original one.
    """
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=None)

    with account_session(token) as account:
        account.ChangeEmail(
            account_pb2.ChangeEmailReq(
                new_email=new_email,
            )
        )

    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            # the call is expected to raise, so its result is not bound
            auth_api.ConfirmChangeEmail(
                auth_pb2.ConfirmChangeEmailReq(
                    change_email_token="wrongtoken",
                )
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == errors.INVALID_TOKEN

    with session_scope() as session:
        user_updated = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert user_updated.email == user.email
def test_ChangeEmail_tokens_two_hour_window(db):
    """Both change-email tokens are rejected when the auth servicer's clock is shifted.

    ``couchers.servicers.auth.now`` is patched first to one minute in the past
    and then to two hours and one minute in the future. Under each clock,
    confirming with the old-email token and then the new-email token must fail
    with NOT_FOUND / INVALID_TOKEN.
    """

    def a_minute_back():
        return now() - timedelta(minutes=1)

    def two_hours_and_a_minute_ahead():
        return now() + timedelta(hours=2, minutes=1)

    requested_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=None)

    with account_session(token) as account:
        account.ChangeEmail(account_pb2.ChangeEmailReq(new_email=requested_email))

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        old_email_token = stored_user.old_email_token
        new_email_token = stored_user.new_email_token

    # same order as spelled out originally: past clock first, old-email token first
    for shifted_clock in (a_minute_back, two_hours_and_a_minute_ahead):
        with patch("couchers.servicers.auth.now", shifted_clock):
            with auth_api_session() as (auth_api, metadata_interceptor):
                for change_token in (old_email_token, new_email_token):
                    with pytest.raises(grpc.RpcError) as exc_info:
                        auth_api.ConfirmChangeEmail(
                            auth_pb2.ConfirmChangeEmailReq(change_email_token=change_token)
                        )
                    assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
                    assert exc_info.value.details() == errors.INVALID_TOKEN
def test_ChangeEmail_has_password(db, fast_passwords):
    """Email change for a user with a password: only the new-email token is issued.

    After ChangeEmail the old-email token fields are unset and the new-email
    ones are set; confirming with the new-email token yields
    EMAIL_CONFIRMATION_STATE_SUCCESS, switches the email and clears all token
    fields.
    """
    password = random_hex()
    requested_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))
    user_query = select(User).where(User.id == user.id)

    request = account_pb2.ChangeEmailReq(
        password=wrappers_pb2.StringValue(value=password),
        new_email=requested_email,
    )
    with account_session(token) as account:
        account.ChangeEmail(request)

    with session_scope() as session:
        pending = session.execute(user_query).scalar_one()
        assert pending.email == user.email
        assert pending.new_email == requested_email
        assert pending.old_email_token is None
        assert not pending.old_email_token_created
        assert not pending.old_email_token_expiry
        assert not pending.need_to_confirm_via_old_email
        assert pending.new_email_token is not None
        assert pending.new_email_token_created <= now()
        assert pending.new_email_token_expiry >= now()
        assert pending.need_to_confirm_via_new_email

        confirmation_token = pending.new_email_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.ConfirmChangeEmail(auth_pb2.ConfirmChangeEmailReq(change_email_token=confirmation_token))
        assert res.state == auth_pb2.EMAIL_CONFIRMATION_STATE_SUCCESS

    with session_scope() as session:
        confirmed = session.execute(user_query).scalar_one()
        assert confirmed.email == requested_email
        assert confirmed.new_email is None
        assert confirmed.old_email_token is None
        assert confirmed.old_email_token_created is None
        assert confirmed.old_email_token_expiry is None
        assert not confirmed.need_to_confirm_via_old_email
        assert confirmed.new_email_token is None
        assert confirmed.new_email_token_created is None
        assert confirmed.new_email_token_expiry is None
        assert not confirmed.need_to_confirm_via_new_email
def test_ChangeEmail_no_password_confirm_with_old_email_first(db):
    """Email change without a password, confirming via the old address first.

    ChangeEmail issues both tokens. Confirming the old-email token returns
    REQUIRES_CONFIRMATION_FROM_NEW_EMAIL and clears only the old-email fields;
    confirming the new-email token then returns SUCCESS, switches the email and
    clears everything.
    """
    requested_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=None)
    user_query = select(User).where(User.id == user.id)

    with account_session(token) as account:
        account.ChangeEmail(account_pb2.ChangeEmailReq(new_email=requested_email))

    # stage 1: both confirmations outstanding
    with session_scope() as session:
        pending = session.execute(user_query).scalar_one()
        assert pending.email == user.email
        assert pending.new_email == requested_email
        assert pending.old_email_token is not None
        assert pending.old_email_token_created <= now()
        assert pending.old_email_token_expiry >= now()
        assert pending.need_to_confirm_via_old_email
        assert pending.new_email_token is not None
        assert pending.new_email_token_created <= now()
        assert pending.new_email_token_expiry >= now()
        assert pending.need_to_confirm_via_new_email

        old_address_token = pending.old_email_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.ConfirmChangeEmail(auth_pb2.ConfirmChangeEmailReq(change_email_token=old_address_token))
        assert res.state == auth_pb2.EMAIL_CONFIRMATION_STATE_REQUIRES_CONFIRMATION_FROM_NEW_EMAIL

    # stage 2: old address confirmed, new address still outstanding
    with session_scope() as session:
        half_confirmed = session.execute(user_query).scalar_one()
        assert half_confirmed.email == user.email
        assert half_confirmed.new_email == requested_email
        assert half_confirmed.old_email_token is None
        assert half_confirmed.old_email_token_created is None
        assert half_confirmed.old_email_token_expiry is None
        assert not half_confirmed.need_to_confirm_via_old_email
        assert half_confirmed.new_email_token is not None
        assert half_confirmed.new_email_token_created <= now()
        assert half_confirmed.new_email_token_expiry >= now()
        assert half_confirmed.need_to_confirm_via_new_email

        new_address_token = half_confirmed.new_email_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.ConfirmChangeEmail(auth_pb2.ConfirmChangeEmailReq(change_email_token=new_address_token))
        assert res.state == auth_pb2.EMAIL_CONFIRMATION_STATE_SUCCESS

    # stage 3: change applied, all token state cleared
    with session_scope() as session:
        confirmed = session.execute(user_query).scalar_one()
        assert confirmed.email == requested_email
        assert confirmed.new_email is None
        assert confirmed.old_email_token is None
        assert confirmed.old_email_token_created is None
        assert confirmed.old_email_token_expiry is None
        assert not confirmed.need_to_confirm_via_old_email
        assert confirmed.new_email_token is None
        assert confirmed.new_email_token_created is None
        assert confirmed.new_email_token_expiry is None
        assert not confirmed.need_to_confirm_via_new_email
def test_ChangeEmail_no_password_confirm_with_new_email_first(db):
    """Email change without a password, confirming via the new address first.

    ChangeEmail issues both tokens. Confirming the new-email token returns
    REQUIRES_CONFIRMATION_FROM_OLD_EMAIL and clears only the new-email fields;
    confirming the old-email token then returns SUCCESS, switches the email and
    clears everything.
    """
    requested_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=None)
    user_query = select(User).where(User.id == user.id)

    with account_session(token) as account:
        account.ChangeEmail(account_pb2.ChangeEmailReq(new_email=requested_email))

    # stage 1: both confirmations outstanding
    with session_scope() as session:
        pending = session.execute(user_query).scalar_one()
        assert pending.email == user.email
        assert pending.new_email == requested_email
        assert pending.old_email_token is not None
        assert pending.old_email_token_created <= now()
        assert pending.old_email_token_expiry >= now()
        assert pending.need_to_confirm_via_old_email
        assert pending.new_email_token is not None
        assert pending.new_email_token_created <= now()
        assert pending.new_email_token_expiry >= now()
        assert pending.need_to_confirm_via_new_email

        new_address_token = pending.new_email_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.ConfirmChangeEmail(auth_pb2.ConfirmChangeEmailReq(change_email_token=new_address_token))
        assert res.state == auth_pb2.EMAIL_CONFIRMATION_STATE_REQUIRES_CONFIRMATION_FROM_OLD_EMAIL

    # stage 2: new address confirmed, old address still outstanding
    with session_scope() as session:
        half_confirmed = session.execute(user_query).scalar_one()
        assert half_confirmed.email == user.email
        assert half_confirmed.new_email == requested_email
        assert half_confirmed.old_email_token is not None
        assert half_confirmed.old_email_token_created <= now()
        assert half_confirmed.old_email_token_expiry >= now()
        assert half_confirmed.need_to_confirm_via_old_email
        assert half_confirmed.new_email_token is None
        assert half_confirmed.new_email_token_created is None
        assert half_confirmed.new_email_token_expiry is None
        assert not half_confirmed.need_to_confirm_via_new_email

        old_address_token = half_confirmed.old_email_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.ConfirmChangeEmail(auth_pb2.ConfirmChangeEmailReq(change_email_token=old_address_token))
        assert res.state == auth_pb2.EMAIL_CONFIRMATION_STATE_SUCCESS

    # stage 3: change applied, all token state cleared
    with session_scope() as session:
        confirmed = session.execute(user_query).scalar_one()
        assert confirmed.email == requested_email
        assert confirmed.new_email is None
        assert confirmed.old_email_token is None
        assert confirmed.old_email_token_created is None
        assert confirmed.old_email_token_expiry is None
        assert not confirmed.need_to_confirm_via_old_email
        assert confirmed.new_email_token is None
        assert confirmed.new_email_token_created is None
        assert confirmed.new_email_token_expiry is None
        assert not confirmed.need_to_confirm_via_new_email
def test_ChangeEmail_sends_proper_emails_has_password(db, fast_passwords):
    """ChangeEmail with a password queues two send_email jobs with the expected texts.

    The first job's payload must contain the notification wording and the
    second the new-address confirmation wording.
    """
    password = random_hex()
    requested_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailReq(
        password=wrappers_pb2.StringValue(value=password),
        new_email=requested_email,
    )
    with account_session(token) as account:
        account.ChangeEmail(request)

    notification_marker = b"You requested that your email on Couchers.org be changed to"
    new_address_marker = b"You requested that your email be changed to this email address on Couchers.org"

    # NOTE(review): the query has no ORDER BY, so the positional checks rely on row order -- confirm
    email_jobs_query = select(BackgroundJob).where(BackgroundJob.job_type == BackgroundJobType.send_email)
    with session_scope() as session:
        email_jobs = session.execute(email_jobs_query).scalars().all()
        assert len(email_jobs) == 2
        notification_payload = email_jobs[0].payload
        new_address_payload = email_jobs[1].payload
        assert notification_marker in notification_payload
        assert new_address_marker in new_address_payload
def test_ChangeEmail_sends_proper_emails_no_password(db):
    """ChangeEmail without a password queues two send_email jobs with the expected texts.

    The first job's payload must contain the old-address confirmation wording
    and the second the new-address confirmation wording.
    """
    requested_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=None)

    with account_session(token) as account:
        account.ChangeEmail(account_pb2.ChangeEmailReq(new_email=requested_email))

    old_address_marker = b"You requested that your email be changed on Couchers.org"
    new_address_marker = b"You requested that your email be changed to this email address on Couchers.org"

    # NOTE(review): the query has no ORDER BY, so the positional checks rely on row order -- confirm
    email_jobs_query = select(BackgroundJob).where(BackgroundJob.job_type == BackgroundJobType.send_email)
    with session_scope() as session:
        email_jobs = session.execute(email_jobs_query).scalars().all()
        assert len(email_jobs) == 2
        old_address_payload = email_jobs[0].payload
        new_address_payload = email_jobs[1].payload
        assert old_address_marker in old_address_payload
        assert new_address_marker in new_address_payload
def test_contributor_form(db):
    """``filled_contributor_form`` is false before FillContributorForm and true after it."""
    user, token = generate_user()
    nothing = empty_pb2.Empty()

    with account_session(token) as account:
        before = account.GetContributorFormInfo(nothing)
        assert not before.filled_contributor_form

        fill_request = account_pb2.FillContributorFormReq(contributor_form=auth_pb2.ContributorForm())
        account.FillContributorForm(fill_request)

        after = account.GetContributorFormInfo(empty_pb2.Empty())
        assert after.filled_contributor_form
def test_DeleteAccount_start(db):
    """DeleteAccount(confirm=True) queues the confirmation email and creates a valid token.

    The user must not yet be marked deleted.
    """
    user, token = generate_user()
    request = account_pb2.DeleteAccountReq(confirm=True, reason=None)

    with account_session(token) as account:
        with patch("couchers.email.queue_email") as queue_mock:
            account.DeleteAccount(request)
        queue_mock.assert_called_once()
        # queue_email is expected to have been called with six positional args; subject is the fourth
        positional_args, _ = queue_mock.call_args
        (_, _, _, subject, _, _) = positional_args
        assert subject == "[TEST] Confirm your Couchers.org account deletion"

    token_query = select(AccountDeletionToken).where(AccountDeletionToken.user_id == user.id)
    with session_scope() as session:
        deletion_token = session.execute(token_query).scalar_one()

        assert deletion_token.is_valid
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert not stored_user.is_deleted
def test_DeleteAccount_message_storage(db):
    """Of six DeleteAccount calls, only the three with non-blank reasons store a reason row."""
    user, token = generate_user()

    submitted_reasons = (
        None,  # not stored
        "",  # not stored
        "Reason",
        "0192#(&!&#)*@//)(8",
        "\n\n\t",  # not stored
        "1337",
    )
    with account_session(token) as account:
        for reason in submitted_reasons:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=reason))

    reason_count = select(func.count()).select_from(AccountDeletionReason)
    with session_scope() as session:
        assert session.execute(reason_count).scalar_one() == 3
def test_full_delete_account_with_recovery(db):
    """Walk the whole delete flow: request, confirm, then recover the account.

    Covers the unconfirmed request being rejected, the confirmation email
    subject, the user state after ConfirmDeleteAccount, and the state after
    RecoverAccount.
    """
    user, token = generate_user()
    user_id = user.id
    user_query = select(User).where(User.id == user_id)
    all_deletion_tokens = select(AccountDeletionToken)

    with account_session(token) as account:
        # a request without confirm=True must be refused
        with pytest.raises(grpc.RpcError) as exc_info:
            account.DeleteAccount(account_pb2.DeleteAccountReq())
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.MUST_CONFIRM_ACCOUNT_DELETE

        # Check the right email is sent
        with patch("couchers.email.queue_email") as queue_mock:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))
        queue_mock.assert_called_once()
        positional_args, _ = queue_mock.call_args
        (_, _, _, subject, _, _) = positional_args
        assert subject == "[TEST] Confirm your Couchers.org account deletion"

    with session_scope() as session:
        deletion_token_row = session.execute(all_deletion_tokens).scalar_one()
        deletion_token = deletion_token_row.token

        stored_user = session.execute(user_query).scalar_one()
        assert deletion_token_row.user == stored_user
        assert not stored_user.is_deleted
        assert not stored_user.undelete_token
        assert not stored_user.undelete_until

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token))

    with session_scope() as session:
        assert not session.execute(all_deletion_tokens).scalar_one_or_none()

    with session_scope() as session:
        deleted_user = session.execute(user_query).scalar_one()
        assert deleted_user.is_deleted
        assert deleted_user.undelete_token
        assert deleted_user.undelete_until > now()

        undelete_token = deleted_user.undelete_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.RecoverAccount(auth_pb2.RecoverAccountReq(token=undelete_token))

    with session_scope() as session:
        assert not session.execute(all_deletion_tokens).scalar_one_or_none()

        recovered_user = session.execute(user_query).scalar_one()
        assert not recovered_user.is_deleted
        assert not recovered_user.undelete_token
        assert not recovered_user.undelete_until
def test_multiple_delete_tokens(db):
    """
    Make sure deletion tokens are deleted on delete

    Three DeleteAccount requests create three tokens; confirming the deletion
    with one of them must leave no AccountDeletionToken rows behind.
    """
    # only the session token is needed; the user object is never inspected
    _, token = generate_user()

    with account_session(token) as account:
        account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))
        account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))
        account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    with session_scope() as session:
        assert session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one() == 3
        # any one of the three tokens will do for the confirmation step
        token = session.execute(select(AccountDeletionToken)).scalars().first().token

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmDeleteAccount(
            auth_pb2.ConfirmDeleteAccountReq(
                token=token,
            )
        )

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()