Coverage for src/tests/test_account.py: 100%

463 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-10-04 23:02 +0000

1from datetime import timedelta 

2from unittest.mock import patch 

3 

4import grpc 

5import pytest 

6from google.protobuf import empty_pb2 

7from sqlalchemy.sql import func 

8 

9from couchers import errors 

10from couchers.crypto import hash_password, random_hex 

11from couchers.db import session_scope 

12from couchers.models import AccountDeletionReason, AccountDeletionToken, BackgroundJob, Upload, User 

13from couchers.sql import couchers_select as select 

14from couchers.utils import now 

15from proto import account_pb2, api_pb2, auth_pb2 

16from tests.test_fixtures import ( # noqa 

17 account_session, 

18 auth_api_session, 

19 db, 

20 email_fields, 

21 fast_passwords, 

22 generate_user, 

23 mock_notification_email, 

24 process_jobs, 

25 push_collector, 

26 real_account_session, 

27 testconfig, 

28) 

29 

30 

31@pytest.fixture(autouse=True) 

32def _(testconfig): 

33 pass 

34 

35 

36def test_GetAccountInfo(db, fast_passwords): 

37 # with password 

38 user1, token1 = generate_user(hashed_password=hash_password(random_hex()), email="user@couchers.invalid") 

39 

40 with account_session(token1) as account: 

41 res = account.GetAccountInfo(empty_pb2.Empty()) 

42 assert res.email == "user@couchers.invalid" 

43 assert res.username == user1.username 

44 assert not res.has_strong_verification 

45 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED 

46 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED 

47 

48 

49def test_GetAccountInfo_regression(db): 

50 # there was a bug in evaluating `has_completed_profile` on the backend (in python) 

51 # when about_me is None but the user has a key, it was failing because len(about_me) doesn't work on None 

52 uploader_user, _ = generate_user() 

53 with session_scope() as session: 

54 key = random_hex(32) 

55 filename = random_hex(32) + ".jpg" 

56 session.add( 

57 Upload( 

58 key=key, 

59 filename=filename, 

60 creator_user_id=uploader_user.id, 

61 ) 

62 ) 

63 

64 user, token = generate_user(about_me=None, avatar_key=key) 

65 

66 with account_session(token) as account: 

67 res = account.GetAccountInfo(empty_pb2.Empty()) 

68 

69 

70def test_ChangePasswordV2_normal(db, fast_passwords, push_collector): 

71 # user has old password and is changing to new password 

72 old_password = random_hex() 

73 new_password = random_hex() 

74 user, token = generate_user(hashed_password=hash_password(old_password)) 

75 

76 with account_session(token) as account: 

77 with mock_notification_email() as mock: 

78 account.ChangePasswordV2( 

79 account_pb2.ChangePasswordV2Req( 

80 old_password=old_password, 

81 new_password=new_password, 

82 ) 

83 ) 

84 

85 mock.assert_called_once() 

86 assert email_fields(mock).subject == "[TEST] Your password was changed" 

87 

88 push_collector.assert_user_has_single_matching( 

89 user.id, title="Your password was changed", body="Your login password for Couchers.org was changed." 

90 ) 

91 

92 with session_scope() as session: 

93 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

94 assert updated_user.hashed_password == hash_password(new_password) 

95 

96 

97def test_ChangePasswordV2_regression(db, fast_passwords): 

98 # send_password_changed_email wasn't working 

99 # user has old password and is changing to new password 

100 old_password = random_hex() 

101 new_password = random_hex() 

102 user, token = generate_user(hashed_password=hash_password(old_password)) 

103 

104 with account_session(token) as account: 

105 account.ChangePasswordV2( 

106 account_pb2.ChangePasswordV2Req( 

107 old_password=old_password, 

108 new_password=new_password, 

109 ) 

110 ) 

111 

112 with session_scope() as session: 

113 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

114 assert updated_user.hashed_password == hash_password(new_password) 

115 

116 

117def test_ChangePasswordV2_normal_short_password(db, fast_passwords): 

118 # user has old password and is changing to new password, but used short password 

119 old_password = random_hex() 

120 new_password = random_hex(length=1) 

121 user, token = generate_user(hashed_password=hash_password(old_password)) 

122 

123 with account_session(token) as account: 

124 with pytest.raises(grpc.RpcError) as e: 

125 account.ChangePasswordV2( 

126 account_pb2.ChangePasswordV2Req( 

127 old_password=old_password, 

128 new_password=new_password, 

129 ) 

130 ) 

131 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

132 assert e.value.details() == errors.PASSWORD_TOO_SHORT 

133 

134 with session_scope() as session: 

135 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

136 assert updated_user.hashed_password == hash_password(old_password) 

137 

138 

139def test_ChangePasswordV2_normal_long_password(db, fast_passwords): 

140 # user has old password and is changing to new password, but used short password 

141 old_password = random_hex() 

142 new_password = random_hex(length=1000) 

143 user, token = generate_user(hashed_password=hash_password(old_password)) 

144 

145 with account_session(token) as account: 

146 with pytest.raises(grpc.RpcError) as e: 

147 account.ChangePasswordV2( 

148 account_pb2.ChangePasswordV2Req( 

149 old_password=old_password, 

150 new_password=new_password, 

151 ) 

152 ) 

153 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

154 assert e.value.details() == errors.PASSWORD_TOO_LONG 

155 

156 with session_scope() as session: 

157 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

158 assert updated_user.hashed_password == hash_password(old_password) 

159 

160 

161def test_ChangePasswordV2_normal_insecure_password(db, fast_passwords): 

162 # user has old password and is changing to new password, but used insecure password 

163 old_password = random_hex() 

164 new_password = "12345678" 

165 user, token = generate_user(hashed_password=hash_password(old_password)) 

166 

167 with account_session(token) as account: 

168 with pytest.raises(grpc.RpcError) as e: 

169 account.ChangePasswordV2( 

170 account_pb2.ChangePasswordV2Req( 

171 old_password=old_password, 

172 new_password=new_password, 

173 ) 

174 ) 

175 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

176 assert e.value.details() == errors.INSECURE_PASSWORD 

177 

178 with session_scope() as session: 

179 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

180 assert updated_user.hashed_password == hash_password(old_password) 

181 

182 

183def test_ChangePasswordV2_normal_wrong_password(db, fast_passwords): 

184 # user has old password and is changing to new password, but used wrong old password 

185 old_password = random_hex() 

186 new_password = random_hex() 

187 user, token = generate_user(hashed_password=hash_password(old_password)) 

188 

189 with account_session(token) as account: 

190 with pytest.raises(grpc.RpcError) as e: 

191 account.ChangePasswordV2( 

192 account_pb2.ChangePasswordV2Req( 

193 old_password="wrong password", 

194 new_password=new_password, 

195 ) 

196 ) 

197 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

198 assert e.value.details() == errors.INVALID_PASSWORD 

199 

200 with session_scope() as session: 

201 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

202 assert updated_user.hashed_password == hash_password(old_password) 

203 

204 

205def test_ChangePasswordV2_normal_no_passwords(db, fast_passwords): 

206 # user has old password and called with empty body 

207 old_password = random_hex() 

208 user, token = generate_user(hashed_password=hash_password(old_password)) 

209 

210 with account_session(token) as account: 

211 with pytest.raises(grpc.RpcError) as e: 

212 account.ChangePasswordV2(account_pb2.ChangePasswordV2Req(old_password=old_password)) 

213 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

214 assert e.value.details() == errors.PASSWORD_TOO_SHORT 

215 

216 with session_scope() as session: 

217 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

218 assert updated_user.hashed_password == hash_password(old_password) 

219 

220 

221def test_ChangeEmailV2_wrong_password(db, fast_passwords): 

222 password = random_hex() 

223 new_email = f"{random_hex()}@couchers.org.invalid" 

224 user, token = generate_user(hashed_password=hash_password(password)) 

225 

226 with account_session(token) as account: 

227 with pytest.raises(grpc.RpcError) as e: 

228 account.ChangeEmailV2( 

229 account_pb2.ChangeEmailV2Req( 

230 password="wrong password", 

231 new_email=new_email, 

232 ) 

233 ) 

234 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

235 assert e.value.details() == errors.INVALID_PASSWORD 

236 

237 with session_scope() as session: 

238 assert ( 

239 session.execute( 

240 select(func.count()) 

241 .select_from(User) 

242 .where(User.new_email_token_created <= func.now()) 

243 .where(User.new_email_token_expiry >= func.now()) 

244 ) 

245 ).scalar_one() == 0 

246 

247 

248def test_ChangeEmailV2_wrong_email(db, fast_passwords): 

249 password = random_hex() 

250 new_email = f"{random_hex()}@couchers.org.invalid" 

251 user, token = generate_user(hashed_password=hash_password(password)) 

252 

253 with account_session(token) as account: 

254 with pytest.raises(grpc.RpcError) as e: 

255 account.ChangeEmailV2( 

256 account_pb2.ChangeEmailV2Req( 

257 password="wrong password", 

258 new_email=new_email, 

259 ) 

260 ) 

261 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

262 assert e.value.details() == errors.INVALID_PASSWORD 

263 

264 with session_scope() as session: 

265 assert ( 

266 session.execute( 

267 select(func.count()) 

268 .select_from(User) 

269 .where(User.new_email_token_created <= func.now()) 

270 .where(User.new_email_token_expiry >= func.now()) 

271 ) 

272 ).scalar_one() == 0 

273 

274 

275def test_ChangeEmailV2_invalid_email(db, fast_passwords): 

276 password = random_hex() 

277 user, token = generate_user(hashed_password=hash_password(password)) 

278 

279 with account_session(token) as account: 

280 with pytest.raises(grpc.RpcError) as e: 

281 account.ChangeEmailV2( 

282 account_pb2.ChangeEmailV2Req( 

283 password=password, 

284 new_email="not a real email", 

285 ) 

286 ) 

287 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

288 assert e.value.details() == errors.INVALID_EMAIL 

289 

290 with session_scope() as session: 

291 assert ( 

292 session.execute( 

293 select(func.count()) 

294 .select_from(User) 

295 .where(User.new_email_token_created <= func.now()) 

296 .where(User.new_email_token_expiry >= func.now()) 

297 ) 

298 ).scalar_one() == 0 

299 

300 

301def test_ChangeEmailV2_email_in_use(db, fast_passwords): 

302 password = random_hex() 

303 user, token = generate_user(hashed_password=hash_password(password)) 

304 user2, token2 = generate_user(hashed_password=hash_password(password)) 

305 

306 with account_session(token) as account: 

307 with pytest.raises(grpc.RpcError) as e: 

308 account.ChangeEmailV2( 

309 account_pb2.ChangeEmailV2Req( 

310 password=password, 

311 new_email=user2.email, 

312 ) 

313 ) 

314 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

315 assert e.value.details() == errors.INVALID_EMAIL 

316 

317 with session_scope() as session: 

318 assert ( 

319 session.execute( 

320 select(func.count()) 

321 .select_from(User) 

322 .where(User.new_email_token_created <= func.now()) 

323 .where(User.new_email_token_expiry >= func.now()) 

324 ) 

325 ).scalar_one() == 0 

326 

327 

328def test_ChangeEmailV2_no_change(db, fast_passwords): 

329 password = random_hex() 

330 user, token = generate_user(hashed_password=hash_password(password)) 

331 

332 with account_session(token) as account: 

333 with pytest.raises(grpc.RpcError) as e: 

334 account.ChangeEmailV2( 

335 account_pb2.ChangeEmailV2Req( 

336 password=password, 

337 new_email=user.email, 

338 ) 

339 ) 

340 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

341 assert e.value.details() == errors.INVALID_EMAIL 

342 

343 with session_scope() as session: 

344 assert ( 

345 session.execute( 

346 select(func.count()) 

347 .select_from(User) 

348 .where(User.new_email_token_created <= func.now()) 

349 .where(User.new_email_token_expiry >= func.now()) 

350 ) 

351 ).scalar_one() == 0 

352 

353 

354def test_ChangeEmailV2_wrong_token(db, fast_passwords): 

355 password = random_hex() 

356 new_email = f"{random_hex()}@couchers.org.invalid" 

357 user, token = generate_user(hashed_password=hash_password(password)) 

358 

359 with account_session(token) as account: 

360 account.ChangeEmailV2( 

361 account_pb2.ChangeEmailV2Req( 

362 password=password, 

363 new_email=new_email, 

364 ) 

365 ) 

366 

367 with auth_api_session() as (auth_api, metadata_interceptor): 

368 with pytest.raises(grpc.RpcError) as e: 

369 res = auth_api.ConfirmChangeEmailV2( 

370 auth_pb2.ConfirmChangeEmailV2Req( 

371 change_email_token="wrongtoken", 

372 ) 

373 ) 

374 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

375 assert e.value.details() == errors.INVALID_TOKEN 

376 

377 with session_scope() as session: 

378 user_updated = session.execute(select(User).where(User.id == user.id)).scalar_one() 

379 assert user_updated.email == user.email 

380 

381 

382def test_ChangeEmailV2_tokens_two_hour_window(db): 

383 def two_hours_one_minute_in_future(): 

384 return now() + timedelta(hours=2, minutes=1) 

385 

386 def one_minute_ago(): 

387 return now() - timedelta(minutes=1) 

388 

389 password = random_hex() 

390 new_email = f"{random_hex()}@couchers.org.invalid" 

391 user, token = generate_user(hashed_password=hash_password(password)) 

392 

393 with account_session(token) as account: 

394 account.ChangeEmailV2( 

395 account_pb2.ChangeEmailV2Req( 

396 password=password, 

397 new_email=new_email, 

398 ) 

399 ) 

400 

401 with session_scope() as session: 

402 user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

403 new_email_token = user.new_email_token 

404 

405 with patch("couchers.servicers.auth.now", one_minute_ago): 

406 with auth_api_session() as (auth_api, metadata_interceptor): 

407 with pytest.raises(grpc.RpcError) as e: 

408 auth_api.ConfirmChangeEmailV2(auth_pb2.ConfirmChangeEmailV2Req()) 

409 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

410 assert e.value.details() == errors.INVALID_TOKEN 

411 

412 with pytest.raises(grpc.RpcError) as e: 

413 auth_api.ConfirmChangeEmailV2( 

414 auth_pb2.ConfirmChangeEmailV2Req( 

415 change_email_token=new_email_token, 

416 ) 

417 ) 

418 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

419 assert e.value.details() == errors.INVALID_TOKEN 

420 

421 with patch("couchers.servicers.auth.now", two_hours_one_minute_in_future): 

422 with auth_api_session() as (auth_api, metadata_interceptor): 

423 with pytest.raises(grpc.RpcError) as e: 

424 auth_api.ConfirmChangeEmailV2(auth_pb2.ConfirmChangeEmailV2Req()) 

425 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

426 assert e.value.details() == errors.INVALID_TOKEN 

427 

428 with pytest.raises(grpc.RpcError) as e: 

429 auth_api.ConfirmChangeEmailV2( 

430 auth_pb2.ConfirmChangeEmailV2Req( 

431 change_email_token=new_email_token, 

432 ) 

433 ) 

434 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

435 assert e.value.details() == errors.INVALID_TOKEN 

436 

437 

438def test_ChangeEmailV2(db, fast_passwords, push_collector): 

439 password = random_hex() 

440 new_email = f"{random_hex()}@couchers.org.invalid" 

441 user, token = generate_user(hashed_password=hash_password(password)) 

442 user_id = user.id 

443 

444 with account_session(token) as account: 

445 account.ChangeEmailV2( 

446 account_pb2.ChangeEmailV2Req( 

447 password=password, 

448 new_email=new_email, 

449 ) 

450 ) 

451 

452 with session_scope() as session: 

453 user_updated = session.execute(select(User).where(User.id == user_id)).scalar_one() 

454 assert user_updated.email == user.email 

455 assert user_updated.new_email == new_email 

456 assert user_updated.new_email_token is not None 

457 assert user_updated.new_email_token_created <= now() 

458 assert user_updated.new_email_token_expiry >= now() 

459 

460 token = user_updated.new_email_token 

461 

462 process_jobs() 

463 push_collector.assert_user_push_matches_fields( 

464 user_id, 

465 ix=0, 

466 title="An email change was initiated on your account", 

467 body=f"An email change to the email {new_email} was initiated on your account.", 

468 ) 

469 

470 with auth_api_session() as (auth_api, metadata_interceptor): 

471 res = auth_api.ConfirmChangeEmailV2( 

472 auth_pb2.ConfirmChangeEmailV2Req( 

473 change_email_token=token, 

474 ) 

475 ) 

476 

477 with session_scope() as session: 

478 user = session.execute(select(User).where(User.id == user_id)).scalar_one() 

479 assert user.email == new_email 

480 assert user.new_email is None 

481 assert user.new_email_token is None 

482 assert user.new_email_token_created is None 

483 assert user.new_email_token_expiry is None 

484 

485 process_jobs() 

486 push_collector.assert_user_push_matches_fields( 

487 user_id, 

488 ix=1, 

489 title="Email change completed", 

490 body="Your new email address has been verified.", 

491 ) 

492 

493 

494def test_ChangeEmailV2_sends_proper_emails(db, fast_passwords, push_collector): 

495 password = random_hex() 

496 new_email = f"{random_hex()}@couchers.org.invalid" 

497 user, token = generate_user(hashed_password=hash_password(password)) 

498 

499 with account_session(token) as account: 

500 account.ChangeEmailV2( 

501 account_pb2.ChangeEmailV2Req( 

502 password=password, 

503 new_email=new_email, 

504 ) 

505 ) 

506 

507 process_jobs() 

508 

509 with session_scope() as session: 

510 jobs = session.execute(select(BackgroundJob).where(BackgroundJob.job_type == "send_email")).scalars().all() 

511 assert len(jobs) == 2 

512 payload_for_notification_email = jobs[0].payload 

513 payload_for_confirmation_email_new_address = jobs[1].payload 

514 uq_str1 = b"An email change to the email" 

515 uq_str2 = ( 

516 b"You requested that your email be changed to this email address on Couchers.org. Your old email address is" 

517 ) 

518 assert (uq_str1 in jobs[0].payload and uq_str2 in jobs[1].payload) or ( 

519 uq_str2 in jobs[0].payload and uq_str1 in jobs[1].payload 

520 ) 

521 

522 push_collector.assert_user_has_single_matching( 

523 user.id, 

524 title="An email change was initiated on your account", 

525 body=f"An email change to the email {new_email} was initiated on your account.", 

526 ) 

527 

528 

529def test_contributor_form(db): 

530 user, token = generate_user() 

531 

532 with account_session(token) as account: 

533 res = account.GetContributorFormInfo(empty_pb2.Empty()) 

534 assert not res.filled_contributor_form 

535 

536 account.FillContributorForm(account_pb2.FillContributorFormReq(contributor_form=auth_pb2.ContributorForm())) 

537 

538 res = account.GetContributorFormInfo(empty_pb2.Empty()) 

539 assert res.filled_contributor_form 

540 

541 

542def test_DeleteAccount_start(db): 

543 user, token = generate_user() 

544 

545 with account_session(token) as account: 

546 with mock_notification_email() as mock: 

547 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=None)) 

548 mock.assert_called_once() 

549 assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion" 

550 

551 with session_scope() as session: 

552 deletion_token = session.execute( 

553 select(AccountDeletionToken).where(AccountDeletionToken.user_id == user.id) 

554 ).scalar_one() 

555 

556 assert deletion_token.is_valid 

557 assert not session.execute(select(User).where(User.id == user.id)).scalar_one().is_deleted 

558 

559 

560def test_DeleteAccount_message_storage(db): 

561 user, token = generate_user() 

562 

563 with account_session(token) as account: 

564 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=None)) # not stored 

565 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="")) # not stored 

566 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="Reason")) 

567 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="0192#(&!&#)*@//)(8")) 

568 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="\n\n\t")) # not stored 

569 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="1337")) 

570 

571 with session_scope() as session: 

572 assert session.execute(select(func.count()).select_from(AccountDeletionReason)).scalar_one() == 3 

573 

574 

575def test_full_delete_account_with_recovery(db, push_collector): 

576 user, token = generate_user() 

577 user_id = user.id 

578 

579 with account_session(token) as account: 

580 with pytest.raises(grpc.RpcError) as e: 

581 account.DeleteAccount(account_pb2.DeleteAccountReq()) 

582 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

583 assert e.value.details() == errors.MUST_CONFIRM_ACCOUNT_DELETE 

584 

585 # Check the right email is sent 

586 with mock_notification_email() as mock: 

587 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True)) 

588 

589 push_collector.assert_user_push_matches_fields( 

590 user_id, 

591 ix=0, 

592 title="Account deletion initiated", 

593 body="Someone initiated the deletion of your Couchers.org account. To delete your account, please follow the link in the email we sent you.", 

594 ) 

595 

596 mock.assert_called_once() 

597 e = email_fields(mock) 

598 

599 with session_scope() as session: 

600 token_o = session.execute(select(AccountDeletionToken)).scalar_one() 

601 token = token_o.token 

602 

603 user_ = session.execute(select(User).where(User.id == user_id)).scalar_one() 

604 assert token_o.user == user_ 

605 assert not user_.is_deleted 

606 assert not user_.undelete_token 

607 assert not user_.undelete_until 

608 

609 assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion" 

610 assert e.recipient == user.email 

611 assert "account deletion" in e.subject.lower() 

612 assert token in e.plain 

613 assert token in e.html 

614 unique_string = "You requested that we delete your account from Couchers.org." 

615 assert unique_string in e.plain 

616 assert unique_string in e.html 

617 url = f"http://localhost:3000/delete-account?token={token}" 

618 assert url in e.plain 

619 assert url in e.html 

620 assert "support@couchers.org" in e.plain 

621 assert "support@couchers.org" in e.html 

622 

623 with mock_notification_email() as mock: 

624 with auth_api_session() as (auth_api, metadata_interceptor): 

625 auth_api.ConfirmDeleteAccount( 

626 auth_pb2.ConfirmDeleteAccountReq( 

627 token=token, 

628 ) 

629 ) 

630 

631 push_collector.assert_user_push_matches_fields( 

632 user_id, 

633 ix=1, 

634 title="Your Couchers.org account has been deleted", 

635 body="You can still undo this by following the link we emailed to you within 7 days.", 

636 ) 

637 

638 mock.assert_called_once() 

639 e = email_fields(mock) 

640 

641 with session_scope() as session: 

642 assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none() 

643 

644 user_ = session.execute(select(User).where(User.id == user_id)).scalar_one() 

645 assert user_.is_deleted 

646 assert user_.undelete_token 

647 assert user_.undelete_until > now() 

648 

649 undelete_token = user_.undelete_token 

650 

651 assert e.recipient == user.email 

652 assert "account has been deleted" in e.subject.lower() 

653 unique_string = "You have successfully deleted your account from Couchers.org." 

654 assert unique_string in e.plain 

655 assert unique_string in e.html 

656 assert "7 days" in e.plain 

657 assert "7 days" in e.html 

658 url = f"http://localhost:3000/recover-account?token={undelete_token}" 

659 assert url in e.plain 

660 assert url in e.html 

661 assert "support@couchers.org" in e.plain 

662 assert "support@couchers.org" in e.html 

663 

664 with mock_notification_email() as mock: 

665 with auth_api_session() as (auth_api, metadata_interceptor): 

666 auth_api.RecoverAccount( 

667 auth_pb2.RecoverAccountReq( 

668 token=undelete_token, 

669 ) 

670 ) 

671 

672 push_collector.assert_user_push_matches_fields( 

673 user_id, 

674 ix=2, 

675 title="Your Couchers.org account has been recovered!", 

676 body="We have recovered your Couchers.org account as per your request! Welcome back!", 

677 ) 

678 

679 mock.assert_called_once() 

680 e = email_fields(mock) 

681 

682 assert e.recipient == user.email 

683 assert "account has been recovered" in e.subject.lower() 

684 unique_string = "Your account on Couchers.org has been successfully recovered!" 

685 assert unique_string in e.plain 

686 assert unique_string in e.html 

687 assert "support@couchers.org" in e.plain 

688 assert "support@couchers.org" in e.html 

689 

690 with session_scope() as session: 

691 assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none() 

692 

693 user = session.execute(select(User).where(User.id == user_id)).scalar_one() 

694 assert not user.is_deleted 

695 assert not user.undelete_token 

696 assert not user.undelete_until 

697 

698 

699def test_multiple_delete_tokens(db): 

700 """ 

701 Make sure deletion tokens are deleted on delete 

702 """ 

703 user, token = generate_user() 

704 

705 with account_session(token) as account: 

706 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True)) 

707 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True)) 

708 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True)) 

709 

710 with session_scope() as session: 

711 assert session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one() == 3 

712 token = session.execute(select(AccountDeletionToken)).scalars().first().token 

713 

714 with auth_api_session() as (auth_api, metadata_interceptor): 

715 auth_api.ConfirmDeleteAccount( 

716 auth_pb2.ConfirmDeleteAccountReq( 

717 token=token, 

718 ) 

719 ) 

720 

721 with session_scope() as session: 

722 assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none() 

723 

724 

725def test_ListActiveSessions_pagination(db, fast_passwords): 

726 password = random_hex() 

727 user, token = generate_user(hashed_password=hash_password(password)) 

728 

729 with auth_api_session() as (auth_api, metadata_interceptor): 

730 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

731 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

732 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

733 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

734 

735 with real_account_session(token) as account: 

736 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_size=3)) 

737 assert len(res.active_sessions) == 3 

738 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_token=res.next_page_token, page_size=3)) 

739 assert len(res.active_sessions) == 2 

740 assert not res.next_page_token 

741 

742 

743def test_ListActiveSessions_details(db, fast_passwords): 

744 password = random_hex() 

745 user, token = generate_user(hashed_password=hash_password(password)) 

746 

747 ips_user_agents = [ 

748 ( 

749 "108.123.33.162", 

750 "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Mobile/15E148 Safari/604.1", 

751 ), 

752 ( 

753 "8.245.212.28", 

754 "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/26.0 Chrome/122.0.0.0 Mobile Safari/537.36", 

755 ), 

756 ( 

757 "95.254.140.156", 

758 "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0", 

759 ), 

760 ] 

761 

762 for ip, user_agent in ips_user_agents: 

763 options = (("grpc.primary_user_agent", user_agent),) 

764 with auth_api_session(grpc_channel_options=options) as (auth_api, metadata_interceptor): 

765 auth_api.Authenticate( 

766 auth_pb2.AuthReq(user=user.username, password=password), metadata=(("x-couchers-real-ip", ip),) 

767 ) 

768 

769 def dummy_geoip(ip_address): 

770 return { 

771 "108.123.33.162": "Chicago, United States", 

772 "8.245.212.28": "Sydney, Australia", 

773 }.get(ip_address) 

774 

775 with real_account_session(token) as account: 

776 with patch("couchers.servicers.account.geoip_approximate_location", dummy_geoip): 

777 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq()) 

778 print(res) 

779 assert len(res.active_sessions) == 4 

780 

781 # this one currently making the API call 

782 assert res.active_sessions[0].operating_system == "Other" 

783 assert res.active_sessions[0].browser == "Other" 

784 assert res.active_sessions[0].device == "Other" 

785 assert res.active_sessions[0].approximate_location == "Unknown" 

786 assert res.active_sessions[0].is_current_session 

787 

788 assert res.active_sessions[1].operating_system == "Ubuntu" 

789 assert res.active_sessions[1].browser == "Firefox" 

790 assert res.active_sessions[1].device == "Other" 

791 assert res.active_sessions[1].approximate_location == "Unknown" 

792 assert not res.active_sessions[1].is_current_session 

793 

794 assert res.active_sessions[2].operating_system == "Android" 

795 assert res.active_sessions[2].browser == "Samsung Internet" 

796 assert res.active_sessions[2].device == "K" 

797 assert res.active_sessions[2].approximate_location == "Sydney, Australia" 

798 assert not res.active_sessions[2].is_current_session 

799 

800 assert res.active_sessions[3].operating_system == "iOS" 

801 assert res.active_sessions[3].browser == "Mobile Safari" 

802 assert res.active_sessions[3].device == "iPhone" 

803 assert res.active_sessions[3].approximate_location == "Chicago, United States" 

804 assert not res.active_sessions[3].is_current_session 

805 

806 

807def test_LogOutSession(db, fast_passwords): 

808 password = random_hex() 

809 user, token = generate_user(hashed_password=hash_password(password)) 

810 

811 with auth_api_session() as (auth_api, metadata_interceptor): 

812 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

813 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

814 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

815 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

816 

817 with real_account_session(token) as account: 

818 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq()) 

819 assert len(res.active_sessions) == 5 

820 account.LogOutSession(account_pb2.LogOutSessionReq(created=res.active_sessions[3].created)) 

821 

822 res2 = account.ListActiveSessions(account_pb2.ListActiveSessionsReq()) 

823 assert len(res2.active_sessions) == 4 

824 

825 # ignore the first session as it changes 

826 assert res.active_sessions[1:3] + res.active_sessions[4:] == res2.active_sessions[1:] 

827 

828 

829def test_LogOutOtherSessions(db, fast_passwords): 

830 password = random_hex() 

831 user, token = generate_user(hashed_password=hash_password(password)) 

832 

833 with auth_api_session() as (auth_api, metadata_interceptor): 

834 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

835 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

836 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

837 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

838 

839 with real_account_session(token) as account: 

840 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq()) 

841 assert len(res.active_sessions) == 5 

842 with pytest.raises(grpc.RpcError) as e: 

843 account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=False)) 

844 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

845 assert e.value.details() == errors.MUST_CONFIRM_LOGOUT_OTHER_SESSIONS 

846 

847 account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=True)) 

848 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq()) 

849 assert len(res.active_sessions) == 1