Coverage for src/tests/test_account.py: 100%

464 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-12-20 18:03 +0000

1from datetime import timedelta 

2from unittest.mock import patch 

3 

4import grpc 

5import pytest 

6from google.protobuf import empty_pb2 

7from sqlalchemy.sql import func 

8 

9from couchers import errors 

10from couchers.crypto import hash_password, random_hex 

11from couchers.db import session_scope 

12from couchers.models import AccountDeletionReason, AccountDeletionToken, BackgroundJob, Upload, User 

13from couchers.sql import couchers_select as select 

14from couchers.utils import now 

15from proto import account_pb2, api_pb2, auth_pb2 

16from tests.test_fixtures import ( # noqa 

17 account_session, 

18 auth_api_session, 

19 db, 

20 email_fields, 

21 fast_passwords, 

22 generate_user, 

23 mock_notification_email, 

24 process_jobs, 

25 push_collector, 

26 real_account_session, 

27 testconfig, 

28) 

29 

30 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture that requests the ``testconfig`` fixture for every test in this module."""
    pass

34 

35 

def test_GetAccountInfo(db, fast_passwords):
    """GetAccountInfo returns the user's email and username, unverified statuses and no superuser flag."""
    email = "user@couchers.invalid"
    # the user is created with a password set
    user, token = generate_user(hashed_password=hash_password(random_hex()), email=email)

    with account_session(token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert info.email == email
        assert info.username == user.username
        assert not info.has_strong_verification
        assert info.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
        assert info.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED
        assert not info.is_superuser

48 

49 

def test_GetAccountInfo_regression(db):
    """Regression test: GetAccountInfo must not raise when ``about_me`` is None but an avatar is set.

    There was a bug in evaluating ``has_completed_profile`` on the backend (in Python): when
    ``about_me`` is None but the user has an avatar key, it failed because ``len(about_me)``
    does not work on None.
    """
    uploader_user, _ = generate_user()
    with session_scope() as session:
        key = random_hex(32)
        filename = random_hex(32) + ".jpg"
        session.add(
            Upload(
                key=key,
                filename=filename,
                creator_user_id=uploader_user.id,
            )
        )

    _, token = generate_user(about_me=None, avatar_key=key)

    with account_session(token) as account:
        # nothing is asserted on the response: the test passes as long as the call does not raise
        account.GetAccountInfo(empty_pb2.Empty())

69 

70 

def test_ChangePasswordV2_normal(db, fast_passwords, push_collector):
    """A user with a password changes it: one email and one push are sent and the new hash is stored."""
    current_pw = random_hex()
    replacement_pw = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_pw))

    with account_session(token) as account, mock_notification_email() as mock:
        account.ChangePasswordV2(
            account_pb2.ChangePasswordV2Req(old_password=current_pw, new_password=replacement_pw)
        )

    # exactly one notification email, with the expected subject
    mock.assert_called_once()
    assert email_fields(mock).subject == "[TEST] Your password was changed"

    push_collector.assert_user_has_single_matching(
        user.id,
        title="Your password was changed",
        body="Your login password for Couchers.org was changed.",
    )

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(replacement_pw)

96 

97 

def test_ChangePasswordV2_regression(db, fast_passwords):
    """Regression test: change the password without mocking the notification email.

    ``send_password_changed_email`` wasn't working; here the user has an old password
    and changes it to a new one with the email path left unmocked.
    """
    current_pw = random_hex()
    replacement_pw = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_pw))

    request = account_pb2.ChangePasswordV2Req(old_password=current_pw, new_password=replacement_pw)
    with account_session(token) as account:
        account.ChangePasswordV2(request)

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(replacement_pw)

116 

117 

def test_ChangePasswordV2_normal_short_password(db, fast_passwords):
    """Changing to a too-short password fails with PASSWORD_TOO_SHORT and keeps the old hash."""
    current_pw = random_hex()
    too_short_pw = random_hex(length=1)
    user, token = generate_user(hashed_password=hash_password(current_pw))

    request = account_pb2.ChangePasswordV2Req(old_password=current_pw, new_password=too_short_pw)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as err:
            account.ChangePasswordV2(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.PASSWORD_TOO_SHORT

    # the stored password must be untouched
    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(current_pw)

138 

139 

def test_ChangePasswordV2_normal_long_password(db, fast_passwords):
    """Changing to a too-long password fails with PASSWORD_TOO_LONG and keeps the old hash."""
    # user has old password and is changing to new password, but used long password
    old_password = random_hex()
    new_password = random_hex(length=1000)
    user, token = generate_user(hashed_password=hash_password(old_password))

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(
                account_pb2.ChangePasswordV2Req(
                    old_password=old_password,
                    new_password=new_password,
                )
            )
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.PASSWORD_TOO_LONG

    # the stored password must be untouched
    with session_scope() as session:
        updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert updated_user.hashed_password == hash_password(old_password)

160 

161 

def test_ChangePasswordV2_normal_insecure_password(db, fast_passwords):
    """Changing to the insecure password "12345678" fails with INSECURE_PASSWORD and keeps the old hash."""
    current_pw = random_hex()
    insecure_pw = "12345678"
    user, token = generate_user(hashed_password=hash_password(current_pw))

    request = account_pb2.ChangePasswordV2Req(old_password=current_pw, new_password=insecure_pw)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as err:
            account.ChangePasswordV2(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.INSECURE_PASSWORD

    # the stored password must be untouched
    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(current_pw)

182 

183 

def test_ChangePasswordV2_normal_wrong_password(db, fast_passwords):
    """Supplying a wrong old password fails with INVALID_PASSWORD and keeps the old hash."""
    current_pw = random_hex()
    replacement_pw = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_pw))

    request = account_pb2.ChangePasswordV2Req(old_password="wrong password", new_password=replacement_pw)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as err:
            account.ChangePasswordV2(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.INVALID_PASSWORD

    # the stored password must be untouched
    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(current_pw)

204 

205 

def test_ChangePasswordV2_normal_no_passwords(db, fast_passwords):
    """A request carrying only the old password fails with PASSWORD_TOO_SHORT and keeps the old hash."""
    current_pw = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_pw))

    # new_password is deliberately left unset
    request = account_pb2.ChangePasswordV2Req(old_password=current_pw)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as err:
            account.ChangePasswordV2(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.PASSWORD_TOO_SHORT

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(current_pw)

220 

221 

def test_ChangeEmailV2_wrong_password(db, fast_passwords):
    """ChangeEmailV2 with a wrong password is rejected and leaves no active email-change token."""
    password = random_hex()
    target_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password="wrong password", new_email=target_email)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as err:
            account.ChangeEmailV2(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.INVALID_PASSWORD

    # count users whose new-email token window contains the current database time
    active_tokens = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_tokens).scalar_one() == 0

247 

248 

def test_ChangeEmailV2_wrong_email(db, fast_passwords):
    """ChangeEmailV2 with a wrong password is rejected and leaves no active email-change token.

    NOTE(review): this body sends the same request and checks the same error as
    test_ChangeEmailV2_wrong_password; presumably it was meant to exercise a bad
    email instead -- confirm the intent before changing it.
    """
    password = random_hex()
    target_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password="wrong password", new_email=target_email)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as err:
            account.ChangeEmailV2(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.INVALID_PASSWORD

    # count users whose new-email token window contains the current database time
    active_tokens = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_tokens).scalar_one() == 0

274 

275 

def test_ChangeEmailV2_invalid_email(db, fast_passwords):
    """ChangeEmailV2 with a malformed address fails with INVALID_EMAIL and leaves no active token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password=password, new_email="not a real email")
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as err:
            account.ChangeEmailV2(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.INVALID_EMAIL

    # count users whose new-email token window contains the current database time
    active_tokens = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_tokens).scalar_one() == 0

300 

301 

def test_ChangeEmailV2_email_in_use(db, fast_passwords):
    """ChangeEmailV2 to another user's address fails with INVALID_EMAIL and leaves no active token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))
    other_user, _ = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password=password, new_email=other_user.email)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as err:
            account.ChangeEmailV2(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.INVALID_EMAIL

    # count users whose new-email token window contains the current database time
    active_tokens = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_tokens).scalar_one() == 0

327 

328 

def test_ChangeEmailV2_no_change(db, fast_passwords):
    """ChangeEmailV2 to the user's current address fails with INVALID_EMAIL and leaves no active token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password=password, new_email=user.email)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as err:
            account.ChangeEmailV2(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.INVALID_EMAIL

    # count users whose new-email token window contains the current database time
    active_tokens = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_tokens).scalar_one() == 0

353 

354 

def test_ChangeEmailV2_wrong_token(db, fast_passwords):
    """Confirming an email change with an unknown token fails with INVALID_TOKEN and keeps the old email."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    # start a legitimate email change so that a real token exists
    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as e:
            # the call is expected to raise, so its return value is not bound
            auth_api.ConfirmChangeEmailV2(
                auth_pb2.ConfirmChangeEmailV2Req(
                    change_email_token="wrongtoken",
                )
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == errors.INVALID_TOKEN

    with session_scope() as session:
        user_updated = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert user_updated.email == user.email

381 

382 

def test_ChangeEmailV2_tokens_two_hour_window(db):
    """A change-email token is rejected when the auth servicer's clock is outside the token's window.

    ``couchers.servicers.auth.now`` is patched first to one minute in the past and then to
    two hours and one minute in the future; in both cases an empty token and the real token
    must fail with NOT_FOUND / INVALID_TOKEN.
    """

    def one_minute_ago():
        return now() - timedelta(minutes=1)

    def two_hours_one_minute_in_future():
        return now() + timedelta(hours=2, minutes=1)

    password = random_hex()
    target_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(account_pb2.ChangeEmailV2Req(password=password, new_email=target_email))

    with session_scope() as session:
        refreshed = session.execute(select(User).where(User.id == user.id)).scalar_one()
        new_email_token = refreshed.new_email_token

    for fake_now in (one_minute_ago, two_hours_one_minute_in_future):
        with patch("couchers.servicers.auth.now", fake_now):
            with auth_api_session() as (auth_api, metadata_interceptor):
                # first an empty request, then the genuine token
                for request in (
                    auth_pb2.ConfirmChangeEmailV2Req(),
                    auth_pb2.ConfirmChangeEmailV2Req(change_email_token=new_email_token),
                ):
                    with pytest.raises(grpc.RpcError) as err:
                        auth_api.ConfirmChangeEmailV2(request)
                    assert err.value.code() == grpc.StatusCode.NOT_FOUND
                    assert err.value.details() == errors.INVALID_TOKEN

437 

438 

def test_ChangeEmailV2(db, fast_passwords, push_collector):
    """Full email-change flow: request the change, confirm it with the token, and check both pushes.

    After the request the user keeps the old email and has pending ``new_email*`` fields;
    after confirmation the email is switched and the pending fields are cleared.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))
    user_id = user.id

    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    with session_scope() as session:
        user_updated = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert user_updated.email == user.email
        assert user_updated.new_email == new_email
        assert user_updated.new_email_token is not None
        assert user_updated.new_email_token_created <= now()
        assert user_updated.new_email_token_expiry >= now()

        # read the token while the session is still open
        change_email_token = user_updated.new_email_token

    process_jobs()
    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=0,
        title="An email change was initiated on your account",
        body=f"An email change to the email {new_email} was initiated on your account.",
    )

    with auth_api_session() as (auth_api, _):
        auth_api.ConfirmChangeEmailV2(
            auth_pb2.ConfirmChangeEmailV2Req(
                change_email_token=change_email_token,
            )
        )

    with session_scope() as session:
        confirmed_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert confirmed_user.email == new_email
        assert confirmed_user.new_email is None
        assert confirmed_user.new_email_token is None
        assert confirmed_user.new_email_token_created is None
        assert confirmed_user.new_email_token_expiry is None

    process_jobs()
    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=1,
        title="Email change completed",
        body="Your new email address has been verified.",
    )

493 

494 

def test_ChangeEmailV2_sends_proper_emails(db, fast_passwords, push_collector):
    """Requesting an email change queues two ``send_email`` jobs and one matching push.

    One job payload must contain the notification text and the other the confirmation
    text for the new address; the two jobs may appear in either order.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    process_jobs()

    with session_scope() as session:
        jobs = session.execute(select(BackgroundJob).where(BackgroundJob.job_type == "send_email")).scalars().all()
        assert len(jobs) == 2
        first_payload, second_payload = jobs[0].payload, jobs[1].payload
        uq_str1 = b"An email change to the email"
        uq_str2 = (
            b"You requested that your email be changed to this email address on Couchers.org. Your old email address is"
        )
        # job order is not guaranteed, so accept either assignment of the two texts
        assert (uq_str1 in first_payload and uq_str2 in second_payload) or (
            uq_str2 in first_payload and uq_str1 in second_payload
        )

    push_collector.assert_user_has_single_matching(
        user.id,
        title="An email change was initiated on your account",
        body=f"An email change to the email {new_email} was initiated on your account.",
    )

528 

529 

def test_contributor_form(db):
    """GetContributorFormInfo flips ``filled_contributor_form`` once FillContributorForm is called."""
    _user, token = generate_user()

    with account_session(token) as account:
        before = account.GetContributorFormInfo(empty_pb2.Empty())
        assert not before.filled_contributor_form

        form_request = account_pb2.FillContributorFormReq(contributor_form=auth_pb2.ContributorForm())
        account.FillContributorForm(form_request)

        after = account.GetContributorFormInfo(empty_pb2.Empty())
        assert after.filled_contributor_form

541 

542 

def test_DeleteAccount_start(db):
    """DeleteAccount sends one confirmation email and creates a valid token without deleting the user."""
    user, token = generate_user()

    with account_session(token) as account:
        with mock_notification_email() as mock:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=None))
        mock.assert_called_once()
        assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"

    with session_scope() as session:
        token_query = select(AccountDeletionToken).where(AccountDeletionToken.user_id == user.id)
        deletion_token = session.execute(token_query).scalar_one()
        assert deletion_token.is_valid

        # the account itself is still there until the deletion is confirmed
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert not stored_user.is_deleted

559 

560 

def test_DeleteAccount_message_storage(db):
    """Only the three non-blank deletion reasons out of six requests are stored."""
    _user, token = generate_user()

    reasons = (
        None,  # not stored
        "",  # not stored
        "Reason",
        "0192#(&!&#)*@//)(8",
        "\n\n\t",  # not stored
        "1337",
    )

    with account_session(token) as account:
        for reason in reasons:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=reason))

    with session_scope() as session:
        stored_count = session.execute(select(func.count()).select_from(AccountDeletionReason)).scalar_one()
        assert stored_count == 3

574 

575 

def test_full_delete_account_with_recovery(db, push_collector):
    """Walk through the whole flow: request deletion, confirm it, then recover the account.

    Each of the three steps is checked for its push notification, its email content
    and the resulting database state.
    """
    user, token = generate_user()
    user_id = user.id

    with account_session(token) as account:
        # deletion without confirm=True must be refused
        with pytest.raises(grpc.RpcError) as e:
            account.DeleteAccount(account_pb2.DeleteAccountReq())
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == errors.MUST_CONFIRM_ACCOUNT_DELETE

        # Check the right email is sent
        with mock_notification_email() as mock:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

        push_collector.assert_user_push_matches_fields(
            user_id,
            ix=0,
            title="Account deletion initiated",
            body="Someone initiated the deletion of your Couchers.org account. To delete your account, please follow the link in the email we sent you.",
        )

        mock.assert_called_once()
        # from here on `e` holds the captured email fields, not the RpcError context
        e = email_fields(mock)

    with session_scope() as session:
        token_o = session.execute(select(AccountDeletionToken)).scalar_one()
        # `token` is rebound from the session token to the deletion token
        token = token_o.token

        user_ = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert token_o.user == user_
        assert not user_.is_deleted
        assert not user_.undelete_token
        assert not user_.undelete_until

    # step 1 email: deletion confirmation request containing the token link
    assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"
    assert e.recipient == user.email
    assert "account deletion" in e.subject.lower()
    assert token in e.plain
    assert token in e.html
    unique_string = "You requested that we delete your account from Couchers.org."
    assert unique_string in e.plain
    assert unique_string in e.html
    url = f"http://localhost:3000/delete-account?token={token}"
    assert url in e.plain
    assert url in e.html
    assert "support@couchers.org" in e.plain
    assert "support@couchers.org" in e.html

    # step 2: confirm the deletion with the token
    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, metadata_interceptor):
            auth_api.ConfirmDeleteAccount(
                auth_pb2.ConfirmDeleteAccountReq(
                    token=token,
                )
            )

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=1,
        title="Your Couchers.org account has been deleted",
        body="You can still undo this by following the link we emailed to you within 7 days.",
    )

    mock.assert_called_once()
    e = email_fields(mock)

    with session_scope() as session:
        # the deletion token is gone and the user is flagged deleted with an undelete token
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        user_ = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert user_.is_deleted
        assert user_.undelete_token
        assert user_.undelete_until > now()

        undelete_token = user_.undelete_token

    # step 2 email: deletion notice containing the recovery link
    assert e.recipient == user.email
    assert "account has been deleted" in e.subject.lower()
    unique_string = "You have successfully deleted your account from Couchers.org."
    assert unique_string in e.plain
    assert unique_string in e.html
    assert "7 days" in e.plain
    assert "7 days" in e.html
    url = f"http://localhost:3000/recover-account?token={undelete_token}"
    assert url in e.plain
    assert url in e.html
    assert "support@couchers.org" in e.plain
    assert "support@couchers.org" in e.html

    # step 3: recover the account with the undelete token
    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, metadata_interceptor):
            auth_api.RecoverAccount(
                auth_pb2.RecoverAccountReq(
                    token=undelete_token,
                )
            )

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=2,
        title="Your Couchers.org account has been recovered!",
        body="We have recovered your Couchers.org account as per your request! Welcome back!",
    )

    mock.assert_called_once()
    e = email_fields(mock)

    # step 3 email: recovery notice
    assert e.recipient == user.email
    assert "account has been recovered" in e.subject.lower()
    unique_string = "Your account on Couchers.org has been successfully recovered!"
    assert unique_string in e.plain
    assert unique_string in e.html
    assert "support@couchers.org" in e.plain
    assert "support@couchers.org" in e.html

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        # the user is active again and the undelete fields are cleared
        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert not user.is_deleted
        assert not user.undelete_token
        assert not user.undelete_until

698 

699 

def test_multiple_delete_tokens(db):
    """
    Make sure every outstanding deletion token is removed once the deletion is confirmed
    """
    _user, session_token = generate_user()

    # three requests leave three deletion tokens behind
    with account_session(session_token) as account:
        for _attempt in range(3):
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    with session_scope() as session:
        token_count = session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one()
        assert token_count == 3
        deletion_token = session.execute(select(AccountDeletionToken)).scalars().first().token

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token))

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

724 

725 

def test_ListActiveSessions_pagination(db, fast_passwords):
    """With four extra logins, ListActiveSessions pages as 3 then 2 sessions and then has no next page."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, metadata_interceptor):
        for _login in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        first_page = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_size=3))
        assert len(first_page.active_sessions) == 3

        second_page = account.ListActiveSessions(
            account_pb2.ListActiveSessionsReq(page_token=first_page.next_page_token, page_size=3)
        )
        assert len(second_page.active_sessions) == 2
        assert not second_page.next_page_token

742 

743 

def test_ListActiveSessions_details(db, fast_passwords):
    """ListActiveSessions reports OS, browser, device and approximate location for each session.

    Three logins are made with distinct user agents and ``x-couchers-real-ip`` values, and
    ``geoip_approximate_location`` is patched with a lookup that knows only two of the IPs.
    The assertions expect the calling session first, followed by the logins in reverse
    order of creation.
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    ips_user_agents = [
        (
            "108.123.33.162",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Mobile/15E148 Safari/604.1",
        ),
        (
            "8.245.212.28",
            "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/26.0 Chrome/122.0.0.0 Mobile Safari/537.36",
        ),
        (
            "95.254.140.156",
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0",
        ),
    ]

    for ip, user_agent in ips_user_agents:
        options = (("grpc.primary_user_agent", user_agent),)
        with auth_api_session(grpc_channel_options=options) as (auth_api, metadata_interceptor):
            auth_api.Authenticate(
                auth_pb2.AuthReq(user=user.username, password=password), metadata=(("x-couchers-real-ip", ip),)
            )

    def dummy_geoip(ip_address):
        # returns None for any IP not listed here (the third login's address)
        return {
            "108.123.33.162": "Chicago, United States",
            "8.245.212.28": "Sydney, Australia",
        }.get(ip_address)

    with real_account_session(token) as account:
        with patch("couchers.servicers.account.geoip_approximate_location", dummy_geoip):
            res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
            assert len(res.active_sessions) == 4

            # this one currently making the API call
            assert res.active_sessions[0].operating_system == "Other"
            assert res.active_sessions[0].browser == "Other"
            assert res.active_sessions[0].device == "Other"
            assert res.active_sessions[0].approximate_location == "Unknown"
            assert res.active_sessions[0].is_current_session

            assert res.active_sessions[1].operating_system == "Ubuntu"
            assert res.active_sessions[1].browser == "Firefox"
            assert res.active_sessions[1].device == "Other"
            assert res.active_sessions[1].approximate_location == "Unknown"
            assert not res.active_sessions[1].is_current_session

            assert res.active_sessions[2].operating_system == "Android"
            assert res.active_sessions[2].browser == "Samsung Internet"
            assert res.active_sessions[2].device == "K"
            assert res.active_sessions[2].approximate_location == "Sydney, Australia"
            assert not res.active_sessions[2].is_current_session

            assert res.active_sessions[3].operating_system == "iOS"
            assert res.active_sessions[3].browser == "Mobile Safari"
            assert res.active_sessions[3].device == "iPhone"
            assert res.active_sessions[3].approximate_location == "Chicago, United States"
            assert not res.active_sessions[3].is_current_session

806 

807 

def test_LogOutSession(db, fast_passwords):
    """LogOutSession removes exactly the session identified by its ``created`` value."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, metadata_interceptor):
        for _login in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        before = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(before.active_sessions) == 5

        victim = before.active_sessions[3]
        account.LogOutSession(account_pb2.LogOutSessionReq(created=victim.created))

        after = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(after.active_sessions) == 4

        # ignore the first session as it changes
        assert before.active_sessions[1:3] + before.active_sessions[4:] == after.active_sessions[1:]

828 

829 

def test_LogOutOtherSessions(db, fast_passwords):
    """LogOutOtherSessions requires ``confirm=True`` and then leaves a single active session."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, metadata_interceptor):
        for _login in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 5

        # an unconfirmed request must be refused
        with pytest.raises(grpc.RpcError) as err:
            account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=False))
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == errors.MUST_CONFIRM_LOGOUT_OTHER_SESSIONS

        account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=True))
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 1