Coverage for src/tests/test_account.py: 100%

474 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-03-11 15:27 +0000

1from datetime import timedelta 

2from unittest.mock import patch 

3 

4import grpc 

5import pytest 

6from google.protobuf import empty_pb2 

7from sqlalchemy.sql import func 

8 

9from couchers import errors 

10from couchers.crypto import hash_password, random_hex 

11from couchers.db import session_scope 

12from couchers.models import AccountDeletionReason, AccountDeletionToken, BackgroundJob, Upload, User 

13from couchers.sql import couchers_select as select 

14from couchers.utils import now 

15from proto import account_pb2, api_pb2, auth_pb2 

16from tests.test_fixtures import ( # noqa 

17 account_session, 

18 auth_api_session, 

19 db, 

20 email_fields, 

21 fast_passwords, 

22 generate_user, 

23 mock_notification_email, 

24 process_jobs, 

25 push_collector, 

26 real_account_session, 

27 testconfig, 

28) 

29 

30 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture that requests ``testconfig`` for every test in this module."""

34 

35 

def test_GetAccountInfo(db, fast_passwords):
    """GetAccountInfo returns the expected fields for a newly generated user with a password."""
    email = "user@couchers.invalid"
    user1, token1 = generate_user(hashed_password=hash_password(random_hex()), email=email)

    with account_session(token1) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())

        # identity fields echo what the user was generated with
        assert info.email == email
        assert info.username == user1.username

        # nothing has been verified and no preferences are set
        assert not info.has_strong_verification
        assert info.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
        assert info.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED
        assert not info.is_superuser
        assert info.ui_language_preference == ""

49 

50 

def test_GetAccountInfo_regression(db):
    """GetAccountInfo must not fail for a user with an avatar but ``about_me=None``.

    Regression: evaluating `has_completed_profile` on the backend called
    len(about_me), which doesn't work on None.
    """
    uploader_user, _ = generate_user()

    avatar_key = random_hex(32)
    avatar_filename = random_hex(32) + ".jpg"
    avatar = Upload(
        key=avatar_key,
        filename=avatar_filename,
        creator_user_id=uploader_user.id,
    )
    with session_scope() as session:
        session.add(avatar)

    _user, token = generate_user(about_me=None, avatar_key=avatar_key)

    with account_session(token) as account:
        # the call completing without an error is the whole assertion
        account.GetAccountInfo(empty_pb2.Empty())

70 

71 

def test_ChangePasswordV2_normal(db, fast_passwords, push_collector):
    """A user with a password changes it: an email and a push are sent, and the hash is updated."""
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    req = account_pb2.ChangePasswordV2Req(
        old_password=old_password,
        new_password=new_password,
    )
    with account_session(token) as account:
        with mock_notification_email() as mock:
            account.ChangePasswordV2(req)

    # exactly one notification email, with the expected subject
    mock.assert_called_once()
    assert email_fields(mock).subject == "[TEST] Your password was changed"

    push_collector.assert_user_has_single_matching(
        user.id, title="Your password was changed", body="Your login password for Couchers.org was changed."
    )

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(new_password)

97 

98 

def test_ChangePasswordV2_regression(db, fast_passwords):
    """Change a password without mocking the notification email.

    Regression: send_password_changed_email wasn't working.
    """
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    req = account_pb2.ChangePasswordV2Req(
        old_password=old_password,
        new_password=new_password,
    )
    with account_session(token) as account:
        account.ChangePasswordV2(req)

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(new_password)

117 

118 

def test_ChangePasswordV2_normal_short_password(db, fast_passwords):
    """A too-short new password is rejected with PASSWORD_TOO_SHORT and the old hash is kept."""
    old_password = random_hex()
    new_password = random_hex(length=1)
    user, token = generate_user(hashed_password=hash_password(old_password))

    req = account_pb2.ChangePasswordV2Req(
        old_password=old_password,
        new_password=new_password,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.PASSWORD_TOO_SHORT

    # the stored password must be untouched
    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)

139 

140 

def test_ChangePasswordV2_normal_long_password(db, fast_passwords):
    """A too-long new password is rejected with PASSWORD_TOO_LONG and the old hash is kept."""
    # user has old password and is changing to new password, but used a too-long password
    old_password = random_hex()
    new_password = random_hex(length=1000)
    user, token = generate_user(hashed_password=hash_password(old_password))

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(
                account_pb2.ChangePasswordV2Req(
                    old_password=old_password,
                    new_password=new_password,
                )
            )
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.PASSWORD_TOO_LONG

    # the stored password must be untouched
    with session_scope() as session:
        updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert updated_user.hashed_password == hash_password(old_password)

161 

162 

def test_ChangePasswordV2_normal_insecure_password(db, fast_passwords):
    """The new password "12345678" is rejected with INSECURE_PASSWORD and the old hash is kept."""
    old_password = random_hex()
    new_password = "12345678"
    user, token = generate_user(hashed_password=hash_password(old_password))

    req = account_pb2.ChangePasswordV2Req(
        old_password=old_password,
        new_password=new_password,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.INSECURE_PASSWORD

    # the stored password must be untouched
    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)

183 

184 

def test_ChangePasswordV2_normal_wrong_password(db, fast_passwords):
    """Supplying the wrong old password is rejected with INVALID_PASSWORD and the hash is kept."""
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    req = account_pb2.ChangePasswordV2Req(
        old_password="wrong password",
        new_password=new_password,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.INVALID_PASSWORD

    # the stored password must be untouched
    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)

205 

206 

def test_ChangePasswordV2_normal_no_passwords(db, fast_passwords):
    """A request that carries only the old password is rejected with PASSWORD_TOO_SHORT."""
    old_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    # new_password is deliberately left unset
    req = account_pb2.ChangePasswordV2Req(old_password=old_password)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.PASSWORD_TOO_SHORT

    # the stored password must be untouched
    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)

221 

222 

def test_ChangeEmailV2_wrong_password(db, fast_passwords):
    """An email change with the wrong password fails with INVALID_PASSWORD and creates no token."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    req = account_pb2.ChangeEmailV2Req(
        password="wrong password",
        new_email=new_email,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.INVALID_PASSWORD

    # count users that currently hold a live email-change token
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0

248 

249 

def test_ChangeEmailV2_wrong_email(db, fast_passwords):
    """An email change with the wrong password fails with INVALID_PASSWORD and creates no token."""
    # NOTE(review): despite its name this test sends a wrong *password* and is
    # identical to test_ChangeEmailV2_wrong_password -- confirm the intended scenario.
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    req = account_pb2.ChangeEmailV2Req(
        password="wrong password",
        new_email=new_email,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.INVALID_PASSWORD

    # count users that currently hold a live email-change token
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0

275 

276 

def test_ChangeEmailV2_invalid_email(db, fast_passwords):
    """A malformed new email is rejected with INVALID_EMAIL and no change token is created."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    req = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email="not a real email",
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.INVALID_EMAIL

    # count users that currently hold a live email-change token
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0

301 

302 

def test_ChangeEmailV2_email_in_use(db, fast_passwords):
    """Changing to another user's email is rejected with INVALID_EMAIL and creates no token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))
    user2, token2 = generate_user(hashed_password=hash_password(password))

    # request the address that already belongs to the second user
    req = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=user2.email,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.INVALID_EMAIL

    # count users that currently hold a live email-change token
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0

328 

329 

def test_ChangeEmailV2_no_change(db, fast_passwords):
    """Changing to the user's own current email is rejected with INVALID_EMAIL."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # the "new" address is the one the user already has
    req = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=user.email,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.INVALID_EMAIL

    # count users that currently hold a live email-change token
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0

354 

355 

def test_ChangeEmailV2_wrong_token(db, fast_passwords):
    """Confirming an email change with an unknown token fails and leaves the email unchanged."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    # start a legitimate email change so that a real token exists
    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            # the call raises, so its result is not bound to a name
            auth_api.ConfirmChangeEmailV2(
                auth_pb2.ConfirmChangeEmailV2Req(
                    change_email_token="wrongtoken",
                )
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == errors.INVALID_TOKEN

    # the failed confirmation must not have switched the address
    with session_scope() as session:
        user_updated = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert user_updated.email == user.email

382 

383 

def test_ChangeEmailV2_tokens_two_hour_window(db):
    """An email-change token is rejected when the auth servicer's clock is outside its window.

    The servicer's ``now`` is patched to one minute in the past and to two
    hours and one minute in the future; in both cases an empty token and the
    real token must fail with NOT_FOUND / INVALID_TOKEN.
    """

    def two_hours_one_minute_in_future():
        return now() + timedelta(hours=2, minutes=1)

    def one_minute_ago():
        return now() - timedelta(minutes=1)

    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    req = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=new_email,
    )
    with account_session(token) as account:
        account.ChangeEmailV2(req)

    # read back the token that the request generated
    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        new_email_token = user.new_email_token

    for fake_now in (one_minute_ago, two_hours_one_minute_in_future):
        with patch("couchers.servicers.auth.now", fake_now):
            with auth_api_session() as (auth_api, metadata_interceptor):
                # request without any token
                with pytest.raises(grpc.RpcError) as e:
                    auth_api.ConfirmChangeEmailV2(auth_pb2.ConfirmChangeEmailV2Req())
                assert e.value.code() == grpc.StatusCode.NOT_FOUND
                assert e.value.details() == errors.INVALID_TOKEN

                # request with the real token, but at the wrong time
                confirm_req = auth_pb2.ConfirmChangeEmailV2Req(
                    change_email_token=new_email_token,
                )
                with pytest.raises(grpc.RpcError) as e:
                    auth_api.ConfirmChangeEmailV2(confirm_req)
                assert e.value.code() == grpc.StatusCode.NOT_FOUND
                assert e.value.details() == errors.INVALID_TOKEN

438 

439 

def test_ChangeEmailV2(db, fast_passwords, push_collector):
    """Full email change: request, pending state, confirmation, final state and both pushes."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))
    user_id = user.id

    change_req = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=new_email,
    )
    with account_session(token) as account:
        account.ChangeEmailV2(change_req)

    # after the request the old email is still active and a token is pending
    with session_scope() as session:
        pending_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert pending_user.email == user.email
        assert pending_user.new_email == new_email
        assert pending_user.new_email_token is not None
        assert pending_user.new_email_token_created <= now()
        assert pending_user.new_email_token_expiry >= now()

        token = pending_user.new_email_token

    process_jobs()
    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=0,
        title="An email change was initiated on your account",
        body=f"An email change to the email {new_email} was initiated on your account.",
    )

    confirm_req = auth_pb2.ConfirmChangeEmailV2Req(
        change_email_token=token,
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmChangeEmailV2(confirm_req)

    # after confirmation the new email is active and all pending fields are cleared
    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert user.email == new_email
        assert user.new_email is None
        assert user.new_email_token is None
        assert user.new_email_token_created is None
        assert user.new_email_token_expiry is None

    process_jobs()
    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=1,
        title="Email change completed",
        body="Your new email address has been verified.",
    )

494 

495 

def test_ChangeEmailV2_sends_proper_emails(db, fast_passwords, push_collector):
    """Requesting an email change queues two ``send_email`` jobs and a single push.

    One job payload must contain the notification text and the other the
    confirmation text; the order of the two jobs is not asserted.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    process_jobs()

    with session_scope() as session:
        jobs = session.execute(select(BackgroundJob).where(BackgroundJob.job_type == "send_email")).scalars().all()
        assert len(jobs) == 2
        # neutral names: either job may be the notification or the confirmation email
        first_payload = jobs[0].payload
        second_payload = jobs[1].payload
        uq_str1 = b"An email change to the email"
        uq_str2 = (
            b"You requested that your email be changed to this email address on Couchers.org. Your old email address is"
        )
        assert (uq_str1 in first_payload and uq_str2 in second_payload) or (
            uq_str2 in first_payload and uq_str1 in second_payload
        )

    push_collector.assert_user_has_single_matching(
        user.id,
        title="An email change was initiated on your account",
        body=f"An email change to the email {new_email} was initiated on your account.",
    )

529 

530 

def test_ChangePreferredLanguage(db, fast_passwords):
    """The UI language preference starts empty and reflects the value set via the API."""
    # user changes from default to ISO 639-1 language code
    new_language_code = "zh"
    user, token = generate_user()

    with account_session(token) as account:
        res = account.GetAccountInfo(empty_pb2.Empty())
        assert res.ui_language_preference == ""

        account.ChangeLanguagePreference(
            account_pb2.ChangeLanguagePreferenceReq(ui_language_preference=new_language_code)
        )
        res = account.GetAccountInfo(empty_pb2.Empty())
        # compare against the value that was sent rather than a repeated literal
        assert res.ui_language_preference == new_language_code

545 

546 

def test_contributor_form(db):
    """``filled_contributor_form`` flips from false to true once a form is submitted."""
    user, token = generate_user()

    with account_session(token) as account:
        before = account.GetContributorFormInfo(empty_pb2.Empty())
        assert not before.filled_contributor_form

        # an empty form is enough to count as filled
        form_req = account_pb2.FillContributorFormReq(contributor_form=auth_pb2.ContributorForm())
        account.FillContributorForm(form_req)

        after = account.GetContributorFormInfo(empty_pb2.Empty())
        assert after.filled_contributor_form

558 

559 

def test_DeleteAccount_start(db):
    """Starting deletion sends a confirmation email and stores a valid token without deleting."""
    user, token = generate_user()

    delete_req = account_pb2.DeleteAccountReq(confirm=True, reason=None)
    with account_session(token) as account:
        with mock_notification_email() as mock:
            account.DeleteAccount(delete_req)
    mock.assert_called_once()
    assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"

    with session_scope() as session:
        token_query = select(AccountDeletionToken).where(AccountDeletionToken.user_id == user.id)
        deletion_token = session.execute(token_query).scalar_one()

        assert deletion_token.is_valid
        # the account itself is only deleted after confirmation
        assert not session.execute(select(User).where(User.id == user.id)).scalar_one().is_deleted

576 

577 

def test_DeleteAccount_message_storage(db):
    """Of six deletion requests, only the three with a non-blank reason store a reason row."""
    user, token = generate_user()

    reasons = (
        None,  # not stored
        "",  # not stored
        "Reason",
        "0192#(&!&#)*@//)(8",
        "\n\n\t",  # not stored
        "1337",
    )

    with account_session(token) as account:
        for reason in reasons:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=reason))

    with session_scope() as session:
        stored_reasons = session.execute(select(func.count()).select_from(AccountDeletionReason)).scalar_one()
        assert stored_reasons == 3

591 

592 

def test_full_delete_account_with_recovery(db, push_collector):
    """Walk the whole deletion flow: request, confirm, then recover the account.

    At each step the test checks the push notification, the email that was
    sent, and the resulting database state.
    """
    user, token = generate_user()
    user_id = user.id

    # --- step 1: request deletion -------------------------------------------
    with account_session(token) as account:
        # deletion without confirm=True must be refused
        with pytest.raises(grpc.RpcError) as exc_info:
            account.DeleteAccount(account_pb2.DeleteAccountReq())
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.MUST_CONFIRM_ACCOUNT_DELETE

        # Check the right email is sent
        with mock_notification_email() as mock:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=0,
        title="Account deletion initiated",
        body="Someone initiated the deletion of your Couchers.org account. To delete your account, please follow the link in the email we sent you.",
    )

    mock.assert_called_once()
    confirm_email = email_fields(mock)

    with session_scope() as session:
        deletion_token_row = session.execute(select(AccountDeletionToken)).scalar_one()
        deletion_token = deletion_token_row.token

        db_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert deletion_token_row.user == db_user
        assert not db_user.is_deleted
        assert not db_user.undelete_token
        assert not db_user.undelete_until

    assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"
    assert confirm_email.recipient == user.email
    assert "account deletion" in confirm_email.subject.lower()
    assert deletion_token in confirm_email.plain
    assert deletion_token in confirm_email.html
    unique_string = "You requested that we delete your account from Couchers.org."
    assert unique_string in confirm_email.plain
    assert unique_string in confirm_email.html
    url = f"http://localhost:3000/delete-account?token={deletion_token}"
    assert url in confirm_email.plain
    assert url in confirm_email.html
    assert "support@couchers.org" in confirm_email.plain
    assert "support@couchers.org" in confirm_email.html

    # --- step 2: confirm deletion -------------------------------------------
    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, metadata_interceptor):
            auth_api.ConfirmDeleteAccount(
                auth_pb2.ConfirmDeleteAccountReq(
                    token=deletion_token,
                )
            )

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=1,
        title="Your Couchers.org account has been deleted",
        body="You can still undo this by following the link we emailed to you within 7 days.",
    )

    mock.assert_called_once()
    deleted_email = email_fields(mock)

    with session_scope() as session:
        # the deletion token is consumed by the confirmation
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        db_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert db_user.is_deleted
        assert db_user.undelete_token
        assert db_user.undelete_until > now()

        undelete_token = db_user.undelete_token

    assert deleted_email.recipient == user.email
    assert "account has been deleted" in deleted_email.subject.lower()
    unique_string = "You have successfully deleted your account from Couchers.org."
    assert unique_string in deleted_email.plain
    assert unique_string in deleted_email.html
    assert "7 days" in deleted_email.plain
    assert "7 days" in deleted_email.html
    url = f"http://localhost:3000/recover-account?token={undelete_token}"
    assert url in deleted_email.plain
    assert url in deleted_email.html
    assert "support@couchers.org" in deleted_email.plain
    assert "support@couchers.org" in deleted_email.html

    # --- step 3: recover the account ----------------------------------------
    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, metadata_interceptor):
            auth_api.RecoverAccount(
                auth_pb2.RecoverAccountReq(
                    token=undelete_token,
                )
            )

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=2,
        title="Your Couchers.org account has been recovered!",
        body="We have recovered your Couchers.org account as per your request! Welcome back!",
    )

    mock.assert_called_once()
    recovered_email = email_fields(mock)

    assert recovered_email.recipient == user.email
    assert "account has been recovered" in recovered_email.subject.lower()
    unique_string = "Your account on Couchers.org has been successfully recovered!"
    assert unique_string in recovered_email.plain
    assert unique_string in recovered_email.html
    assert "support@couchers.org" in recovered_email.plain
    assert "support@couchers.org" in recovered_email.html

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert not user.is_deleted
        assert not user.undelete_token
        assert not user.undelete_until

715 

716 

def test_multiple_delete_tokens(db):
    """
    Make sure deletion tokens are deleted on delete
    """
    user, token = generate_user()

    # three requests leave three tokens behind
    with account_session(token) as account:
        for _ in range(3):
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    with session_scope() as session:
        token_count = session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one()
        assert token_count == 3
        token = session.execute(select(AccountDeletionToken)).scalars().first().token

    # confirming with one of them must clear all of them
    confirm_req = auth_pb2.ConfirmDeleteAccountReq(
        token=token,
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmDeleteAccount(confirm_req)

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

741 

742 

def test_ListActiveSessions_pagination(db, fast_passwords):
    """With a page size of 3, the sessions come back as a page of 3 and a final page of 2."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four extra logins on top of the generated user's own token
    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        first_page = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_size=3))
        assert len(first_page.active_sessions) == 3

        second_page = account.ListActiveSessions(
            account_pb2.ListActiveSessionsReq(page_token=first_page.next_page_token, page_size=3)
        )
        assert len(second_page.active_sessions) == 2
        assert not second_page.next_page_token

759 

760 

def test_ListActiveSessions_details(db, fast_passwords):
    """ListActiveSessions reports OS, browser, device and approximate location per session.

    Three logins are made with distinct IPs and user agents; geoip lookup is
    replaced by a stub that only knows two of the IPs.
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    ips_user_agents = [
        (
            "108.123.33.162",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Mobile/15E148 Safari/604.1",
        ),
        (
            "8.245.212.28",
            "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/26.0 Chrome/122.0.0.0 Mobile Safari/537.36",
        ),
        (
            "95.254.140.156",
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0",
        ),
    ]

    for ip, user_agent in ips_user_agents:
        options = (("grpc.primary_user_agent", user_agent),)
        with auth_api_session(grpc_channel_options=options) as (auth_api, metadata_interceptor):
            auth_api.Authenticate(
                auth_pb2.AuthReq(user=user.username, password=password), metadata=(("x-couchers-real-ip", ip),)
            )

    def dummy_geoip(ip_address):
        # returns None for any IP not listed here
        return {
            "108.123.33.162": "Chicago, United States",
            "8.245.212.28": "Sydney, Australia",
        }.get(ip_address)

    with real_account_session(token) as account:
        with patch("couchers.servicers.account.geoip_approximate_location", dummy_geoip):
            res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
            assert len(res.active_sessions) == 4

            # this one currently making the API call
            assert res.active_sessions[0].operating_system == "Other"
            assert res.active_sessions[0].browser == "Other"
            assert res.active_sessions[0].device == "Other"
            assert res.active_sessions[0].approximate_location == "Unknown"
            assert res.active_sessions[0].is_current_session

            # the remaining sessions are asserted in reverse login order
            assert res.active_sessions[1].operating_system == "Ubuntu"
            assert res.active_sessions[1].browser == "Firefox"
            assert res.active_sessions[1].device == "Other"
            assert res.active_sessions[1].approximate_location == "Unknown"
            assert not res.active_sessions[1].is_current_session

            assert res.active_sessions[2].operating_system == "Android"
            assert res.active_sessions[2].browser == "Samsung Internet"
            assert res.active_sessions[2].device == "K"
            assert res.active_sessions[2].approximate_location == "Sydney, Australia"
            assert not res.active_sessions[2].is_current_session

            assert res.active_sessions[3].operating_system == "iOS"
            assert res.active_sessions[3].browser == "Mobile Safari"
            assert res.active_sessions[3].device == "iPhone"
            assert res.active_sessions[3].approximate_location == "Chicago, United States"
            assert not res.active_sessions[3].is_current_session

823 

824 

def test_LogOutSession(db, fast_passwords):
    """Logging out one session by its ``created`` value removes exactly that session."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four extra logins on top of the generated user's own token
    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(res.active_sessions) == 5

        target = res.active_sessions[3]
        account.LogOutSession(account_pb2.LogOutSessionReq(created=target.created))

        res2 = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(res2.active_sessions) == 4

        # ignore the first session as it changes
        assert res.active_sessions[1:3] + res.active_sessions[4:] == res2.active_sessions[1:]

845 

846 

def test_LogOutOtherSessions(db, fast_passwords):
    """LogOutOtherSessions requires confirm=True and then leaves a single active session."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four extra logins on top of the generated user's own token
    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        sessions_before = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(sessions_before.active_sessions) == 5

        # without confirmation the call is refused
        with pytest.raises(grpc.RpcError) as e:
            account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=False))
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == errors.MUST_CONFIRM_LOGOUT_OTHER_SESSIONS

        account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=True))
        sessions_after = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(sessions_after.active_sessions) == 1