Coverage for src/tests/test_account.py: 100%

479 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-04-16 15:13 +0000

1from datetime import timedelta 

2from unittest.mock import patch 

3 

4import grpc 

5import pytest 

6from google.protobuf import empty_pb2 

7from sqlalchemy.sql import func 

8 

9from couchers import errors 

10from couchers.crypto import hash_password, random_hex 

11from couchers.db import session_scope 

12from couchers.models import ( 

13 AccountDeletionReason, 

14 AccountDeletionToken, 

15 BackgroundJob, 

16 Upload, 

17 User, 

18) 

19from couchers.sql import couchers_select as select 

20from couchers.utils import now 

21from proto import account_pb2, api_pb2, auth_pb2 

22from tests.test_fixtures import ( # noqa 

23 account_session, 

24 auth_api_session, 

25 db, 

26 email_fields, 

27 fast_passwords, 

28 generate_user, 

29 mock_notification_email, 

30 process_jobs, 

31 push_collector, 

32 real_account_session, 

33 testconfig, 

34) 

35 

36 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture: requests the `testconfig` fixture for every test in this module."""

40 

41 

def test_GetAccountInfo(db, fast_passwords):
    """GetAccountInfo reports the expected fields for a newly generated user that has a password."""
    user1, token1 = generate_user(hashed_password=hash_password(random_hex()), email="user@couchers.invalid")

    with account_session(token1) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())

        # identity fields echo what the user was generated with
        assert info.email == "user@couchers.invalid"
        assert info.username == user1.username

        # nothing has been verified or elevated for this user
        assert not info.has_strong_verification
        assert info.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
        assert info.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED
        assert not info.is_superuser

        # no UI language has been chosen
        assert info.ui_language_preference == ""

55 

56 

def test_GetAccountInfo_regression(db):
    """
    Regression test: GetAccountInfo must not fail for a user with an avatar but `about_me=None`.

    There was a bug in evaluating `has_completed_profile` on the backend (in python): when about_me is None
    but the user has an avatar key, it failed because len(about_me) doesn't work on None.
    """
    uploader_user, _ = generate_user()
    key = random_hex(32)
    filename = random_hex(32) + ".jpg"
    with session_scope() as session:
        session.add(
            Upload(
                key=key,
                filename=filename,
                creator_user_id=uploader_user.id,
            )
        )

    # only the token is needed; the user object itself is never inspected
    _, token = generate_user(about_me=None, avatar_key=key)

    with account_session(token) as account:
        # the response is deliberately not inspected: the test passes if the call does not raise
        account.GetAccountInfo(empty_pb2.Empty())

76 

77 

def test_ChangePasswordV2_normal(db, fast_passwords, push_collector):
    """
    Changing the password with the correct old password succeeds.

    Checks that the notification email mock is called exactly once with the expected subject, that the user
    has a single matching push notification, and that the stored hash corresponds to the new password.
    """
    current_password = random_hex()
    replacement_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_password))
    request = account_pb2.ChangePasswordV2Req(
        old_password=current_password,
        new_password=replacement_password,
    )

    with account_session(token) as account, mock_notification_email() as mock:
        account.ChangePasswordV2(request)

    mock.assert_called_once()
    assert email_fields(mock).subject == "[TEST] Your password was changed"

    push_collector.assert_user_has_single_matching(
        user.id, title="Your password was changed", body="Your login password for Couchers.org was changed."
    )

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(replacement_password)

103 

104 

def test_ChangePasswordV2_regression(db, fast_passwords):
    """
    Regression test for send_password_changed_email not working.

    Same flow as a normal password change, but without mocking the notification email: the call must
    succeed and the stored hash must correspond to the new password.
    """
    current_password = random_hex()
    replacement_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_password))
    request = account_pb2.ChangePasswordV2Req(
        old_password=current_password,
        new_password=replacement_password,
    )

    with account_session(token) as account:
        account.ChangePasswordV2(request)

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(replacement_password)

123 

124 

def test_ChangePasswordV2_normal_short_password(db, fast_passwords):
    """
    A new password generated with `random_hex(length=1)` is rejected as too short.

    Expects INVALID_ARGUMENT / PASSWORD_TOO_SHORT and that the stored hash still matches the old password.
    """
    current_password = random_hex()
    rejected_password = random_hex(length=1)
    user, token = generate_user(hashed_password=hash_password(current_password))
    request = account_pb2.ChangePasswordV2Req(
        old_password=current_password,
        new_password=rejected_password,
    )

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(request)
    assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert exc_info.value.details() == errors.PASSWORD_TOO_SHORT

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(current_password)

145 

146 

def test_ChangePasswordV2_normal_long_password(db, fast_passwords):
    """
    A new password generated with `random_hex(length=1000)` is rejected as too long.

    Expects INVALID_ARGUMENT / PASSWORD_TOO_LONG and that the stored hash still matches the old password.
    """
    current_password = random_hex()
    rejected_password = random_hex(length=1000)
    user, token = generate_user(hashed_password=hash_password(current_password))
    request = account_pb2.ChangePasswordV2Req(
        old_password=current_password,
        new_password=rejected_password,
    )

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(request)
    assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert exc_info.value.details() == errors.PASSWORD_TOO_LONG

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(current_password)

167 

168 

def test_ChangePasswordV2_normal_insecure_password(db, fast_passwords):
    """
    The new password "12345678" is rejected as insecure.

    Expects INVALID_ARGUMENT / INSECURE_PASSWORD and that the stored hash still matches the old password.
    """
    current_password = random_hex()
    rejected_password = "12345678"
    user, token = generate_user(hashed_password=hash_password(current_password))
    request = account_pb2.ChangePasswordV2Req(
        old_password=current_password,
        new_password=rejected_password,
    )

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(request)
    assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert exc_info.value.details() == errors.INSECURE_PASSWORD

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(current_password)

189 

190 

def test_ChangePasswordV2_normal_wrong_password(db, fast_passwords):
    """
    Supplying an incorrect old password is rejected.

    Expects INVALID_ARGUMENT / INVALID_PASSWORD and that the stored hash still matches the old password.
    """
    current_password = random_hex()
    replacement_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_password))
    request = account_pb2.ChangePasswordV2Req(
        old_password="wrong password",
        new_password=replacement_password,
    )

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(request)
    assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert exc_info.value.details() == errors.INVALID_PASSWORD

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(current_password)

211 

212 

def test_ChangePasswordV2_normal_no_passwords(db, fast_passwords):
    """
    A request that carries only the old password (no new password) is rejected.

    Expects INVALID_ARGUMENT / PASSWORD_TOO_SHORT and that the stored hash still matches the old password.
    """
    current_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_password))
    # new_password is intentionally left unset
    request = account_pb2.ChangePasswordV2Req(old_password=current_password)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangePasswordV2(request)
    assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert exc_info.value.details() == errors.PASSWORD_TOO_SHORT

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(current_password)

227 

228 

def test_ChangeEmailV2_wrong_password(db, fast_passwords):
    """
    ChangeEmailV2 with an incorrect password is rejected with INVALID_ARGUMENT / INVALID_PASSWORD.

    Afterwards no User row may have an email-change token window that contains the database's current time.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))
    request = account_pb2.ChangeEmailV2Req(
        password="wrong password",
        new_email=new_email,
    )

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(request)
    assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert exc_info.value.details() == errors.INVALID_PASSWORD

    # count users whose token was created in the past and expires in the future
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0

254 

255 

def test_ChangeEmailV2_wrong_email(db, fast_passwords):
    """
    ChangeEmailV2 with an incorrect password is rejected with INVALID_ARGUMENT / INVALID_PASSWORD.

    Afterwards no User row may have an email-change token window that contains the database's current time.
    """
    # NOTE(review): despite its name, this test sends a wrong *password* and expects INVALID_PASSWORD, exactly
    # like test_ChangeEmailV2_wrong_password - confirm whether a distinct "wrong email" scenario was intended.
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))
    request = account_pb2.ChangeEmailV2Req(
        password="wrong password",
        new_email=new_email,
    )

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(request)
    assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert exc_info.value.details() == errors.INVALID_PASSWORD

    # count users whose token was created in the past and expires in the future
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0

281 

282 

def test_ChangeEmailV2_invalid_email(db, fast_passwords):
    """
    ChangeEmailV2 with the correct password but the address "not a real email" is rejected.

    Expects INVALID_ARGUMENT / INVALID_EMAIL, and afterwards no User row may have an email-change token
    window that contains the database's current time.
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))
    request = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email="not a real email",
    )

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(request)
    assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert exc_info.value.details() == errors.INVALID_EMAIL

    # count users whose token was created in the past and expires in the future
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0

307 

308 

def test_ChangeEmailV2_email_in_use(db, fast_passwords):
    """
    ChangeEmailV2 to an address that already belongs to another user is rejected.

    Expects INVALID_ARGUMENT / INVALID_EMAIL, and afterwards no User row may have an email-change token
    window that contains the database's current time.
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))
    # second user whose email the first user will try to take over
    user2, token2 = generate_user(hashed_password=hash_password(password))
    request = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=user2.email,
    )

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(request)
    assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert exc_info.value.details() == errors.INVALID_EMAIL

    # count users whose token was created in the past and expires in the future
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0

334 

335 

def test_ChangeEmailV2_no_change(db, fast_passwords):
    """
    ChangeEmailV2 to the user's own current address is rejected.

    Expects INVALID_ARGUMENT / INVALID_EMAIL, and afterwards no User row may have an email-change token
    window that contains the database's current time.
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))
    request = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=user.email,
    )

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.ChangeEmailV2(request)
    assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert exc_info.value.details() == errors.INVALID_EMAIL

    # count users whose token was created in the past and expires in the future
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0

360 

361 

def test_ChangeEmailV2_wrong_token(db, fast_passwords):
    """
    Confirming an email change with a token that was never issued fails and leaves the email unchanged.

    An email change is started with the correct password, then ConfirmChangeEmailV2 is called with the
    literal token "wrongtoken". Expects NOT_FOUND / INVALID_TOKEN and that the stored email still equals
    the email the user was generated with.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            # the call is expected to raise, so its return value is not bound
            auth_api.ConfirmChangeEmailV2(
                auth_pb2.ConfirmChangeEmailV2Req(
                    change_email_token="wrongtoken",
                )
            )
    assert e.value.code() == grpc.StatusCode.NOT_FOUND
    assert e.value.details() == errors.INVALID_TOKEN

    with session_scope() as session:
        user_updated = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert user_updated.email == user.email

388 

389 

def test_ChangeEmailV2_tokens_two_hour_window(db):
    """
    ConfirmChangeEmailV2 rejects tokens when the auth servicer's clock is outside the token window.

    With `couchers.servicers.auth.now` patched first to one minute in the past and then to two hours and one
    minute in the future, both an empty request and a request carrying the real token must fail with
    NOT_FOUND / INVALID_TOKEN.
    """

    def clock_one_minute_behind():
        return now() - timedelta(minutes=1)

    def clock_past_two_hours():
        return now() + timedelta(hours=2, minutes=1)

    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(account_pb2.ChangeEmailV2Req(password=password, new_email=new_email))

    # read back the token that the change request stored on the user row
    with session_scope() as session:
        new_email_token = session.execute(select(User).where(User.id == user.id)).scalar_one().new_email_token

    for shifted_clock in (clock_one_minute_behind, clock_past_two_hours):
        with patch("couchers.servicers.auth.now", shifted_clock):
            with auth_api_session() as (auth_api, metadata_interceptor):
                attempts = (
                    auth_pb2.ConfirmChangeEmailV2Req(),
                    auth_pb2.ConfirmChangeEmailV2Req(change_email_token=new_email_token),
                )
                for request in attempts:
                    with pytest.raises(grpc.RpcError) as exc_info:
                        auth_api.ConfirmChangeEmailV2(request)
                    assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
                    assert exc_info.value.details() == errors.INVALID_TOKEN

444 

445 

def test_ChangeEmailV2(db, fast_passwords, push_collector):
    """
    Full email-change flow: request the change, then confirm it with the stored token.

    After the request the user row must keep its old email and carry the pending new email plus a token
    whose created/expiry timestamps bracket the current time. After confirmation the email is switched and
    all pending-change fields are None. A push notification is checked after each step (indices 0 and 1).
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))
    user_id = user.id

    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    with session_scope() as session:
        user_updated = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert user_updated.email == user.email
        assert user_updated.new_email == new_email
        assert user_updated.new_email_token is not None
        assert user_updated.new_email_token_created <= now()
        assert user_updated.new_email_token_expiry >= now()

        # kept under its own name so the session token from generate_user is not overwritten
        change_email_token = user_updated.new_email_token

    process_jobs()
    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=0,
        title="An email change was initiated on your account",
        body=f"An email change to the email {new_email} was initiated on your account.",
    )

    with auth_api_session() as (auth_api, metadata_interceptor):
        # the response is not inspected; the effect is verified through the database below
        auth_api.ConfirmChangeEmailV2(
            auth_pb2.ConfirmChangeEmailV2Req(
                change_email_token=change_email_token,
            )
        )

    with session_scope() as session:
        user_confirmed = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert user_confirmed.email == new_email
        assert user_confirmed.new_email is None
        assert user_confirmed.new_email_token is None
        assert user_confirmed.new_email_token_created is None
        assert user_confirmed.new_email_token_expiry is None

    process_jobs()
    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=1,
        title="Email change completed",
        body="Your new email address has been verified.",
    )

500 

501 

def test_ChangeEmailV2_sends_proper_emails(db, fast_passwords, push_collector):
    """
    Requesting an email change queues exactly two `send_email` background jobs.

    One payload must contain the notification text and the other the confirmation text; the jobs may come
    back in either order. The user must also have a single matching push notification.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    process_jobs()

    with session_scope() as session:
        jobs = session.execute(select(BackgroundJob).where(BackgroundJob.job_type == "send_email")).scalars().all()
        assert len(jobs) == 2
        # the query has no ordering, so the payloads are not labelled as "notification" / "confirmation"
        first_payload = jobs[0].payload
        second_payload = jobs[1].payload
        uq_str1 = b"An email change to the email"
        uq_str2 = (
            b"You requested that your email be changed to this email address on Couchers.org. Your old email address is"
        )
        assert (uq_str1 in first_payload and uq_str2 in second_payload) or (
            uq_str2 in first_payload and uq_str1 in second_payload
        )

    push_collector.assert_user_has_single_matching(
        user.id,
        title="An email change was initiated on your account",
        body=f"An email change to the email {new_email} was initiated on your account.",
    )

535 

536 

def test_ChangeLanguagePreference(db, fast_passwords):
    """
    Changing the UI language sets a cookie and is reflected by GetAccountInfo.

    The user starts with an empty preference, switches to an ISO 639-1 language code, and the test checks
    both the `set-cookie` entry in the call's initial metadata and the value returned by a later
    GetAccountInfo call.
    """
    new_language_code = "zh"
    expected_cookie = f"couchers-preferred-language={new_language_code}"
    user, token = generate_user()

    with real_account_session(token) as account:
        res = account.GetAccountInfo(empty_pb2.Empty())
        assert res.ui_language_preference == ""

        request = account_pb2.ChangeLanguagePreferenceReq(ui_language_preference=new_language_code)

        # call will have info about the request
        res, call = account.ChangeLanguagePreference.with_call(request)

        # cookies are sent via initial metadata, so we check for it there
        metadata = dict(call.initial_metadata())

        assert "set-cookie" in metadata, "expected 'set-cookie' in initial metadata"

        # the value of "set-cookie" will be the full cookie string, pull the key value from the string
        key_val = metadata["set-cookie"].split(";")[0]
        assert key_val == expected_cookie, f"expected '{expected_cookie}', got {key_val}"

        # the changed language preference should also be sent to the backend
        res = account.GetAccountInfo(empty_pb2.Empty())
        assert res.ui_language_preference == new_language_code

563 

564 

def test_contributor_form(db):
    """`filled_contributor_form` is false for a new user and true after FillContributorForm is called."""
    user, token = generate_user()

    with account_session(token) as account:
        before = account.GetContributorFormInfo(empty_pb2.Empty())
        assert not before.filled_contributor_form

        # an empty ContributorForm message is enough to count as filled
        submission = account_pb2.FillContributorFormReq(contributor_form=auth_pb2.ContributorForm())
        account.FillContributorForm(submission)

        after = account.GetContributorFormInfo(empty_pb2.Empty())
        assert after.filled_contributor_form

576 

577 

def test_DeleteAccount_start(db):
    """
    A confirmed DeleteAccount request sends the confirmation email and creates a valid deletion token.

    The user itself must not be marked as deleted at this stage.
    """
    user, token = generate_user()
    request = account_pb2.DeleteAccountReq(confirm=True, reason=None)

    with account_session(token) as account, mock_notification_email() as mock:
        account.DeleteAccount(request)

    mock.assert_called_once()
    assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"

    with session_scope() as session:
        token_query = select(AccountDeletionToken).where(AccountDeletionToken.user_id == user.id)
        deletion_token = session.execute(token_query).scalar_one()
        assert deletion_token.is_valid

        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert not stored_user.is_deleted

594 

595 

def test_DeleteAccount_message_storage(db):
    """
    Only non-blank deletion reasons are stored.

    Six confirmed DeleteAccount calls are made; the three with a None, empty or whitespace-only reason are
    expected not to be stored, leaving exactly three AccountDeletionReason rows.
    """
    user, token = generate_user()

    reasons = (
        None,  # not stored
        "",  # not stored
        "Reason",
        "0192#(&!&#)*@//)(8",
        "\n\n\t",  # not stored
        "1337",
    )

    with account_session(token) as account:
        for reason in reasons:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=reason))

    with session_scope() as session:
        stored_reasons = session.execute(select(func.count()).select_from(AccountDeletionReason)).scalar_one()
        assert stored_reasons == 3

609 

610 

def test_full_delete_account_with_recovery(db, push_collector):
    """
    End-to-end account deletion followed by recovery.

    Steps, each checked through the database, the mocked notification email and the collected pushes:
    1. DeleteAccount without `confirm` fails with FAILED_PRECONDITION / MUST_CONFIRM_ACCOUNT_DELETE.
    2. DeleteAccount with `confirm=True` creates a deletion token and emails a confirmation link.
    3. ConfirmDeleteAccount marks the user deleted, removes the token and emails a recovery link.
    4. RecoverAccount clears the deleted state and emails a recovery notice.
    """
    user, token = generate_user()
    user_id = user.id

    # --- steps 1 and 2: unconfirmed request is refused, confirmed request starts the deletion ---
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as exc_info:
            account.DeleteAccount(account_pb2.DeleteAccountReq())
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.MUST_CONFIRM_ACCOUNT_DELETE

        # Check the right email is sent
        with mock_notification_email() as initiation_mock:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=0,
        title="Account deletion initiated",
        body="Someone initiated the deletion of your Couchers.org account. To delete your account, please follow the link in the email we sent you.",
    )

    initiation_mock.assert_called_once()
    initiation_email = email_fields(initiation_mock)

    with session_scope() as session:
        token_row = session.execute(select(AccountDeletionToken)).scalar_one()
        deletion_token = token_row.token

        pending_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert token_row.user == pending_user
        assert not pending_user.is_deleted
        assert not pending_user.undelete_token
        assert not pending_user.undelete_until

    assert email_fields(initiation_mock).subject == "[TEST] Confirm your Couchers.org account deletion"
    assert initiation_email.recipient == user.email
    assert "account deletion" in initiation_email.subject.lower()
    assert deletion_token in initiation_email.plain
    assert deletion_token in initiation_email.html
    initiation_text = "You requested that we delete your account from Couchers.org."
    assert initiation_text in initiation_email.plain
    assert initiation_text in initiation_email.html
    confirm_url = f"http://localhost:3000/delete-account?token={deletion_token}"
    assert confirm_url in initiation_email.plain
    assert confirm_url in initiation_email.html
    assert "support@couchers.org" in initiation_email.plain
    assert "support@couchers.org" in initiation_email.html

    # --- step 3: confirming with the token deletes the account ---
    with mock_notification_email() as deletion_mock:
        with auth_api_session() as (auth_api, metadata_interceptor):
            auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token))

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=1,
        title="Your Couchers.org account has been deleted",
        body="You can still undo this by following the link we emailed to you within 7 days.",
    )

    deletion_mock.assert_called_once()
    deletion_email = email_fields(deletion_mock)

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        deleted_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert deleted_user.is_deleted
        assert deleted_user.undelete_token
        assert deleted_user.undelete_until > now()

        undelete_token = deleted_user.undelete_token

    assert deletion_email.recipient == user.email
    assert "account has been deleted" in deletion_email.subject.lower()
    deletion_text = "You have successfully deleted your account from Couchers.org."
    assert deletion_text in deletion_email.plain
    assert deletion_text in deletion_email.html
    assert "7 days" in deletion_email.plain
    assert "7 days" in deletion_email.html
    recover_url = f"http://localhost:3000/recover-account?token={undelete_token}"
    assert recover_url in deletion_email.plain
    assert recover_url in deletion_email.html
    assert "support@couchers.org" in deletion_email.plain
    assert "support@couchers.org" in deletion_email.html

    # --- step 4: the undelete token restores the account ---
    with mock_notification_email() as recovery_mock:
        with auth_api_session() as (auth_api, metadata_interceptor):
            auth_api.RecoverAccount(auth_pb2.RecoverAccountReq(token=undelete_token))

    push_collector.assert_user_push_matches_fields(
        user_id,
        ix=2,
        title="Your Couchers.org account has been recovered!",
        body="We have recovered your Couchers.org account as per your request! Welcome back!",
    )

    recovery_mock.assert_called_once()
    recovery_email = email_fields(recovery_mock)

    assert recovery_email.recipient == user.email
    assert "account has been recovered" in recovery_email.subject.lower()
    recovery_text = "Your account on Couchers.org has been successfully recovered!"
    assert recovery_text in recovery_email.plain
    assert recovery_text in recovery_email.html
    assert "support@couchers.org" in recovery_email.plain
    assert "support@couchers.org" in recovery_email.html

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        recovered_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert not recovered_user.is_deleted
        assert not recovered_user.undelete_token
        assert not recovered_user.undelete_until

733 

734 

def test_multiple_delete_tokens(db):
    """
    Make sure all of a user's deletion tokens are removed once one of them is used.

    Three confirmed DeleteAccount calls must leave three AccountDeletionToken rows; confirming the deletion
    with the first of them must leave none.
    """
    user, token = generate_user()

    with account_session(token) as account:
        for _ in range(3):
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    with session_scope() as session:
        token_count = session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one()
        assert token_count == 3
        first_token = session.execute(select(AccountDeletionToken)).scalars().first().token

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=first_token))

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

759 

760 

def test_ListActiveSessions_pagination(db, fast_passwords):
    """
    ListActiveSessions pages through five sessions with a page size of three.

    Four extra logins are made on top of the session from generate_user; the first page must hold three
    sessions, and the second page two sessions and no further page token.
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        first_page = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_size=3))
        assert len(first_page.active_sessions) == 3

        second_page = account.ListActiveSessions(
            account_pb2.ListActiveSessionsReq(page_token=first_page.next_page_token, page_size=3)
        )
        assert len(second_page.active_sessions) == 2
        assert not second_page.next_page_token

777 

778 

def test_ListActiveSessions_details(db, fast_passwords):
    """
    ListActiveSessions reports OS, browser, device and approximate location for every session.

    Three logins are made with distinct user agents and `x-couchers-real-ip` values, and
    `couchers.servicers.account.geoip_approximate_location` is patched with a lookup that knows only two of
    the IPs. The expected listing is the session making the call followed by the three logins in reverse
    order of creation, with "Unknown" wherever the patched lookup returns None.
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    ips_user_agents = [
        (
            "108.123.33.162",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Mobile/15E148 Safari/604.1",
        ),
        (
            "8.245.212.28",
            "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/26.0 Chrome/122.0.0.0 Mobile Safari/537.36",
        ),
        (
            "95.254.140.156",
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0",
        ),
    ]

    for ip, user_agent in ips_user_agents:
        options = (("grpc.primary_user_agent", user_agent),)
        with auth_api_session(grpc_channel_options=options) as (auth_api, metadata_interceptor):
            auth_api.Authenticate(
                auth_pb2.AuthReq(user=user.username, password=password), metadata=(("x-couchers-real-ip", ip),)
            )

    def dummy_geoip(ip_address):
        # the third IP is deliberately absent, so the lookup yields None for it
        return {
            "108.123.33.162": "Chicago, United States",
            "8.245.212.28": "Sydney, Australia",
        }.get(ip_address)

    with real_account_session(token) as account:
        with patch("couchers.servicers.account.geoip_approximate_location", dummy_geoip):
            res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())

    assert len(res.active_sessions) == 4

    observed = [
        (
            s.operating_system,
            s.browser,
            s.device,
            s.approximate_location,
            s.is_current_session,
        )
        for s in res.active_sessions
    ]
    assert observed == [
        # this one currently making the API call
        ("Other", "Other", "Other", "Unknown", True),
        ("Ubuntu", "Firefox", "Other", "Unknown", False),
        ("Android", "Samsung Internet", "K", "Sydney, Australia", False),
        ("iOS", "Mobile Safari", "iPhone", "Chicago, United States", False),
    ]

841 

842 

def test_LogOutSession(db, fast_passwords):
    """
    LogOutSession removes exactly the session identified by its `created` timestamp.

    With five active sessions, the one at index 3 is logged out; the listing afterwards must have four
    entries and, apart from the first entry, equal the earlier listing with index 3 removed.
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        before = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(before.active_sessions) == 5

        target = before.active_sessions[3]
        account.LogOutSession(account_pb2.LogOutSessionReq(created=target.created))

        after = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(after.active_sessions) == 4

        # ignore the first session as it changes
        assert before.active_sessions[1:3] + before.active_sessions[4:] == after.active_sessions[1:]

863 

864 

def test_LogOutOtherSessions(db, fast_passwords):
    """
    LogOutOtherSessions requires confirmation and then leaves a single active session.

    With `confirm=False` the call must fail with FAILED_PRECONDITION / MUST_CONFIRM_LOGOUT_OTHER_SESSIONS;
    with `confirm=True` the five active sessions are reduced to one.
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 5

        with pytest.raises(grpc.RpcError) as exc_info:
            account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=False))
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.MUST_CONFIRM_LOGOUT_OTHER_SESSIONS

        account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=True))
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 1