Coverage for src/tests/test_account.py: 100%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

530 statements  

1from datetime import timedelta 

2from unittest.mock import patch 

3 

4import grpc 

5import pytest 

6from google.protobuf import empty_pb2, wrappers_pb2 

7from sqlalchemy.sql import func 

8 

9from couchers import errors 

10from couchers.crypto import hash_password, random_hex 

11from couchers.db import session_scope 

12from couchers.models import AccountDeletionReason, AccountDeletionToken, BackgroundJob, Upload, User 

13from couchers.sql import couchers_select as select 

14from couchers.utils import now 

15from proto import account_pb2, auth_pb2 

16from tests.test_fixtures import account_session, auth_api_session, db, fast_passwords, generate_user, testconfig # noqa 

17 

18 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture that requests ``testconfig`` for every test in this module."""
    return None

22 

23 

def test_GetAccountInfo(db, fast_passwords):
    """GetAccountInfo reports the login method, password presence, email and username."""
    # Case 1: account created without a password -> magic-link login.
    passwordless_user, passwordless_token = generate_user(
        hashed_password=None,
        email="funkybot@couchers.invalid",
    )

    with account_session(passwordless_token) as api:
        info = api.GetAccountInfo(empty_pb2.Empty())
        assert info.login_method == account_pb2.GetAccountInfoRes.LoginMethod.MAGIC_LINK
        assert not info.has_password
        assert info.email == "funkybot@couchers.invalid"
        assert info.username == passwordless_user.username

    # Case 2: account created with a hashed password -> password login.
    hashed = hash_password(random_hex())
    password_user, password_token = generate_user(hashed_password=hashed, email="user@couchers.invalid")

    with account_session(password_token) as api:
        info = api.GetAccountInfo(empty_pb2.Empty())
        assert info.login_method == account_pb2.GetAccountInfoRes.LoginMethod.PASSWORD
        assert info.has_password
        assert info.email == "user@couchers.invalid"
        assert info.username == password_user.username

44 

45 

def test_GetAccountInfo_regression(db):
    """Regression: GetAccountInfo must succeed for a user with an avatar but ``about_me=None``.

    There was a bug in evaluating `has_completed_profile` on the backend (in python):
    when about_me is None but the user has a key, it was failing because
    len(about_me) doesn't work on None.
    """
    uploader_user, _ = generate_user()
    with session_scope() as session:
        key = random_hex(32)
        filename = random_hex(32) + ".jpg"
        session.add(
            Upload(
                key=key,
                filename=filename,
                creator_user_id=uploader_user.id,
            )
        )
        # Commit explicitly so the Upload row is persisted before a user
        # referencing its key is created.
        session.commit()
    _, token = generate_user(about_me=None, avatar_key=key)

    with account_session(token) as account:
        # Only success matters here; the response content is not inspected.
        account.GetAccountInfo(empty_pb2.Empty())

65 

66 

def test_ChangePassword_normal(db, fast_passwords):
    """A user with a password replaces it with a new one and the notification hook fires once."""
    current_pw = random_hex()
    replacement_pw = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_pw))

    request = account_pb2.ChangePasswordReq(
        old_password=wrappers_pb2.StringValue(value=current_pw),
        new_password=wrappers_pb2.StringValue(value=replacement_pw),
    )
    with account_session(token) as api:
        with patch("couchers.servicers.account.send_password_changed_email") as email_mock:
            api.ChangePassword(request)
        email_mock.assert_called_once()

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(replacement_pw)

86 

87 

def test_ChangePassword_regression(db, fast_passwords):
    """Regression: changing a password with the real (unpatched) email hook must work.

    Per the original note, send_password_changed_email wasn't working; here the
    user has an old password and changes to a new one without any mocking.
    """
    current_pw = random_hex()
    replacement_pw = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_pw))

    request = account_pb2.ChangePasswordReq(
        old_password=wrappers_pb2.StringValue(value=current_pw),
        new_password=wrappers_pb2.StringValue(value=replacement_pw),
    )
    with account_session(token) as api:
        api.ChangePassword(request)

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(replacement_pw)

106 

107 

def test_ChangePassword_normal_short_password(db, fast_passwords):
    """Changing to a one-character password is rejected and the old hash is kept."""
    current_pw = random_hex()
    too_short_pw = random_hex(length=1)
    user, token = generate_user(hashed_password=hash_password(current_pw))

    request = account_pb2.ChangePasswordReq(
        old_password=wrappers_pb2.StringValue(value=current_pw),
        new_password=wrappers_pb2.StringValue(value=too_short_pw),
    )
    with account_session(token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.ChangePassword(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.PASSWORD_TOO_SHORT

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(current_pw)

128 

129 

def test_ChangePassword_normal_long_password(db, fast_passwords):
    """Changing to an over-long password is rejected and the old hash is kept."""
    current_pw = random_hex()
    too_long_pw = random_hex(length=1000)
    user, token = generate_user(hashed_password=hash_password(current_pw))

    request = account_pb2.ChangePasswordReq(
        old_password=wrappers_pb2.StringValue(value=current_pw),
        new_password=wrappers_pb2.StringValue(value=too_long_pw),
    )
    with account_session(token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.ChangePassword(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.PASSWORD_TOO_LONG

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(current_pw)

150 

151 

def test_ChangePassword_normal_insecure_password(db, fast_passwords):
    """Changing to the insecure password "12345678" is rejected and the old hash is kept."""
    current_pw = random_hex()
    insecure_pw = "12345678"
    user, token = generate_user(hashed_password=hash_password(current_pw))

    request = account_pb2.ChangePasswordReq(
        old_password=wrappers_pb2.StringValue(value=current_pw),
        new_password=wrappers_pb2.StringValue(value=insecure_pw),
    )
    with account_session(token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.ChangePassword(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.INSECURE_PASSWORD

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(current_pw)

172 

173 

def test_ChangePassword_normal_wrong_password(db, fast_passwords):
    """Supplying an incorrect old password is rejected and the old hash is kept."""
    current_pw = random_hex()
    replacement_pw = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_pw))

    request = account_pb2.ChangePasswordReq(
        old_password=wrappers_pb2.StringValue(value="wrong password"),
        new_password=wrappers_pb2.StringValue(value=replacement_pw),
    )
    with account_session(token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.ChangePassword(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.INVALID_USERNAME_OR_PASSWORD

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(current_pw)

194 

195 

def test_ChangePassword_normal_no_password(db, fast_passwords):
    """A user with a password who omits the old password is rejected; the old hash is kept."""
    current_pw = random_hex()
    replacement_pw = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_pw))

    # Only the new password is supplied.
    request = account_pb2.ChangePasswordReq(
        new_password=wrappers_pb2.StringValue(value=replacement_pw),
    )
    with account_session(token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.ChangePassword(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.MISSING_PASSWORD

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(current_pw)

215 

216 

def test_ChangePassword_normal_no_passwords(db, fast_passwords):
    """A user with a password who sends an empty request is rejected; the old hash is kept."""
    current_pw = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_pw))

    empty_request = account_pb2.ChangePasswordReq()
    with account_session(token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.ChangePassword(empty_request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.MISSING_BOTH_PASSWORDS

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(current_pw)

231 

232 

def test_ChangePassword_add(db, fast_passwords):
    """A user without a password can add one; the notification hook fires once."""
    added_pw = random_hex()
    user, token = generate_user(hashed_password=None)

    request = account_pb2.ChangePasswordReq(
        new_password=wrappers_pb2.StringValue(value=added_pw),
    )
    with account_session(token) as api:
        with patch("couchers.servicers.account.send_password_changed_email") as email_mock:
            api.ChangePassword(request)
        email_mock.assert_called_once()

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(added_pw)

250 

251 

def test_ChangePassword_add_with_password(db, fast_passwords):
    """A passwordless user who nevertheless supplies an old password is rejected."""
    added_pw = random_hex()
    user, token = generate_user(hashed_password=None)

    request = account_pb2.ChangePasswordReq(
        old_password=wrappers_pb2.StringValue(value="wrong password"),
        new_password=wrappers_pb2.StringValue(value=added_pw),
    )
    with account_session(token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.ChangePassword(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.NO_PASSWORD

    # The account must still be passwordless afterwards.
    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert not stored.has_password

271 

272 

def test_ChangePassword_add_no_passwords(db, fast_passwords):
    """A passwordless user who sends an empty request is rejected and stays passwordless."""
    # user does not have an old password and called with empty body
    user, token = generate_user(hashed_password=None)

    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePassword(account_pb2.ChangePasswordReq())
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.MISSING_BOTH_PASSWORDS

    with session_scope() as session:
        updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        # Identity comparison is the idiomatic check for None (was `== None`).
        assert updated_user.hashed_password is None

286 

287 

def test_ChangePassword_remove(db, fast_passwords):
    """Supplying only the correct old password removes the password; the hook fires once."""
    current_pw = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_pw))

    request = account_pb2.ChangePasswordReq(
        old_password=wrappers_pb2.StringValue(value=current_pw),
    )
    with account_session(token) as api:
        with patch("couchers.servicers.account.send_password_changed_email") as email_mock:
            api.ChangePassword(request)
        email_mock.assert_called_once()

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert not stored.has_password

304 

305 

def test_ChangePassword_remove_wrong_password(db, fast_passwords):
    """Trying to remove the password with an incorrect old password is rejected."""
    current_pw = random_hex()
    user, token = generate_user(hashed_password=hash_password(current_pw))

    request = account_pb2.ChangePasswordReq(
        old_password=wrappers_pb2.StringValue(value="wrong password"),
    )
    with account_session(token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.ChangePassword(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.INVALID_USERNAME_OR_PASSWORD

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(current_pw)

323 

324 

def test_ChangeEmail_wrong_password(db, fast_passwords):
    """ChangeEmail with an incorrect password is rejected and leaves no live new-email token."""
    real_pw = random_hex()
    target_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(real_pw))

    request = account_pb2.ChangeEmailReq(
        password=wrappers_pb2.StringValue(value="wrong password"),
        new_email=target_email,
    )
    with account_session(token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.ChangeEmail(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.INVALID_USERNAME_OR_PASSWORD

    # Count users whose new-email token window contains the database's current time.
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0

350 

351 

def test_ChangeEmail_wrong_email(db, fast_passwords):
    """ChangeEmail with an incorrect password is rejected and leaves no live new-email token.

    NOTE(review): despite its name, this test submits a wrong *password* and is
    otherwise identical to test_ChangeEmail_wrong_password -- confirm which
    scenario was intended.
    """
    real_pw = random_hex()
    target_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(real_pw))

    request = account_pb2.ChangeEmailReq(
        password=wrappers_pb2.StringValue(value="wrong password"),
        new_email=target_email,
    )
    with account_session(token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.ChangeEmail(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.INVALID_USERNAME_OR_PASSWORD

    # Count users whose new-email token window contains the database's current time.
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0

377 

378 

def test_ChangeEmail_invalid_email(db, fast_passwords):
    """ChangeEmail to a malformed address is rejected and leaves no live new-email token."""
    real_pw = random_hex()
    user, token = generate_user(hashed_password=hash_password(real_pw))

    request = account_pb2.ChangeEmailReq(
        password=wrappers_pb2.StringValue(value=real_pw),
        new_email="not a real email",
    )
    with account_session(token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.ChangeEmail(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.INVALID_EMAIL

    # Count users whose new-email token window contains the database's current time.
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0

403 

404 

def test_ChangeEmail_email_in_use(db, fast_passwords):
    """ChangeEmail to another user's address is rejected and leaves no live new-email token."""
    shared_pw = random_hex()
    user, token = generate_user(hashed_password=hash_password(shared_pw))
    other_user, _other_token = generate_user(hashed_password=hash_password(shared_pw))

    request = account_pb2.ChangeEmailReq(
        password=wrappers_pb2.StringValue(value=shared_pw),
        new_email=other_user.email,
    )
    with account_session(token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.ChangeEmail(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.INVALID_EMAIL

    # Count users whose new-email token window contains the database's current time.
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0

430 

431 

def test_ChangeEmail_no_change(db, fast_passwords):
    """ChangeEmail to the user's current address is rejected and leaves no live new-email token."""
    real_pw = random_hex()
    user, token = generate_user(hashed_password=hash_password(real_pw))

    request = account_pb2.ChangeEmailReq(
        password=wrappers_pb2.StringValue(value=real_pw),
        new_email=user.email,
    )
    with account_session(token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.ChangeEmail(request)
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == errors.INVALID_EMAIL

    # Count users whose new-email token window contains the database's current time.
    live_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(live_token_count).scalar_one() == 0

456 

457 

def test_ChangeEmail_wrong_token(db, fast_passwords):
    """Confirming an email change with an unknown token fails and leaves the email unchanged."""
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=None)

    # Start an email change for a passwordless user (no password field in the request).
    with account_session(token) as account:
        account.ChangeEmail(
            account_pb2.ChangeEmailReq(
                new_email=new_email,
            )
        )

    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            # The call is expected to raise, so its result is not bound.
            auth_api.ConfirmChangeEmail(
                auth_pb2.ConfirmChangeEmailReq(
                    change_email_token="wrongtoken",
                )
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == errors.INVALID_TOKEN

    with session_scope() as session:
        user_updated = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert user_updated.email == user.email

483 

484 

def test_ChangeEmail_tokens_two_hour_window(db):
    """Both change-email tokens are rejected when the auth servicer's clock is outside their window.

    ``couchers.servicers.auth.now`` is patched first to one minute in the past
    and then to two hours and one minute in the future; in each case both the
    old-email and new-email tokens must yield NOT_FOUND / INVALID_TOKEN.
    """

    def two_hours_one_minute_in_future():
        return now() + timedelta(hours=2, minutes=1)

    def one_minute_ago():
        return now() - timedelta(minutes=1)

    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=None)

    with account_session(token) as api:
        api.ChangeEmail(account_pb2.ChangeEmailReq(new_email=new_email))

    # Read both tokens while the session is still open.
    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        old_email_token = stored.old_email_token
        new_email_token = stored.new_email_token

    for fake_now in (one_minute_ago, two_hours_one_minute_in_future):
        with patch("couchers.servicers.auth.now", fake_now):
            with auth_api_session() as (auth_api, _interceptor):
                for change_token in (old_email_token, new_email_token):
                    with pytest.raises(grpc.RpcError) as err:
                        auth_api.ConfirmChangeEmail(
                            auth_pb2.ConfirmChangeEmailReq(change_email_token=change_token)
                        )
                    assert err.value.code() == grpc.StatusCode.NOT_FOUND
                    assert err.value.details() == errors.INVALID_TOKEN

546 

547 

def test_ChangeEmail_has_password(db, fast_passwords):
    """With a password, an email change only creates a new-email token; confirming it completes the change."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, session_token = generate_user(hashed_password=hash_password(password))

    change_request = account_pb2.ChangeEmailReq(
        password=wrappers_pb2.StringValue(value=password),
        new_email=new_email,
    )
    with account_session(session_token) as api:
        api.ChangeEmail(change_request)

    # After the request: email unchanged, only the new-email confirmation is pending.
    with session_scope() as session:
        pending = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert pending.email == user.email
        assert pending.new_email == new_email
        assert pending.old_email_token is None
        assert not pending.old_email_token_created
        assert not pending.old_email_token_expiry
        assert not pending.need_to_confirm_via_old_email
        assert pending.new_email_token is not None
        assert pending.new_email_token_created <= now()
        assert pending.new_email_token_expiry >= now()
        assert pending.need_to_confirm_via_new_email

        confirm_token = pending.new_email_token

    with auth_api_session() as (auth_api, _interceptor):
        reply = auth_api.ConfirmChangeEmail(auth_pb2.ConfirmChangeEmailReq(change_email_token=confirm_token))
        assert reply.state == auth_pb2.EMAIL_CONFIRMATION_STATE_SUCCESS

    # After confirmation: email switched and all change-email state cleared.
    with session_scope() as session:
        final = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert final.email == new_email
        assert final.new_email is None
        assert final.old_email_token is None
        assert final.old_email_token_created is None
        assert final.old_email_token_expiry is None
        assert not final.need_to_confirm_via_old_email
        assert final.new_email_token is None
        assert final.new_email_token_created is None
        assert final.new_email_token_expiry is None
        assert not final.need_to_confirm_via_new_email

596 

597 

def test_ChangeEmail_no_password_confirm_with_old_email_first(db):
    """Passwordless email change: confirm via the old address first, then via the new one."""
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, session_token = generate_user(hashed_password=None)

    with account_session(session_token) as api:
        api.ChangeEmail(account_pb2.ChangeEmailReq(new_email=new_email))

    # Step 0: both confirmations are pending.
    with session_scope() as session:
        pending = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert pending.email == user.email
        assert pending.new_email == new_email
        assert pending.old_email_token is not None
        assert pending.old_email_token_created <= now()
        assert pending.old_email_token_expiry >= now()
        assert pending.need_to_confirm_via_old_email
        assert pending.new_email_token is not None
        assert pending.new_email_token_created <= now()
        assert pending.new_email_token_expiry >= now()
        assert pending.need_to_confirm_via_new_email

        old_address_token = pending.old_email_token

    # Step 1: confirm from the old address; the new address is still required.
    with auth_api_session() as (auth_api, _interceptor):
        reply = auth_api.ConfirmChangeEmail(
            auth_pb2.ConfirmChangeEmailReq(change_email_token=old_address_token)
        )
        assert reply.state == auth_pb2.EMAIL_CONFIRMATION_STATE_REQUIRES_CONFIRMATION_FROM_NEW_EMAIL

    with session_scope() as session:
        halfway = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert halfway.email == user.email
        assert halfway.new_email == new_email
        assert halfway.old_email_token is None
        assert halfway.old_email_token_created is None
        assert halfway.old_email_token_expiry is None
        assert not halfway.need_to_confirm_via_old_email
        assert halfway.new_email_token is not None
        assert halfway.new_email_token_created <= now()
        assert halfway.new_email_token_expiry >= now()
        assert halfway.need_to_confirm_via_new_email

        new_address_token = halfway.new_email_token

    # Step 2: confirm from the new address; the change completes.
    with auth_api_session() as (auth_api, _interceptor):
        reply = auth_api.ConfirmChangeEmail(
            auth_pb2.ConfirmChangeEmailReq(change_email_token=new_address_token)
        )
        assert reply.state == auth_pb2.EMAIL_CONFIRMATION_STATE_SUCCESS

    with session_scope() as session:
        final = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert final.email == new_email
        assert final.new_email is None
        assert final.old_email_token is None
        assert final.old_email_token_created is None
        assert final.old_email_token_expiry is None
        assert not final.need_to_confirm_via_old_email
        assert final.new_email_token is None
        assert final.new_email_token_created is None
        assert final.new_email_token_expiry is None
        assert not final.need_to_confirm_via_new_email

667 

668 

def test_ChangeEmail_no_password_confirm_with_new_email_first(db):
    """Passwordless email change: confirm via the new address first, then via the old one."""
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, session_token = generate_user(hashed_password=None)

    with account_session(session_token) as api:
        api.ChangeEmail(account_pb2.ChangeEmailReq(new_email=new_email))

    # Step 0: both confirmations are pending.
    with session_scope() as session:
        pending = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert pending.email == user.email
        assert pending.new_email == new_email
        assert pending.old_email_token is not None
        assert pending.old_email_token_created <= now()
        assert pending.old_email_token_expiry >= now()
        assert pending.need_to_confirm_via_old_email
        assert pending.new_email_token is not None
        assert pending.new_email_token_created <= now()
        assert pending.new_email_token_expiry >= now()
        assert pending.need_to_confirm_via_new_email

        new_address_token = pending.new_email_token

    # Step 1: confirm from the new address; the old address is still required.
    with auth_api_session() as (auth_api, _interceptor):
        reply = auth_api.ConfirmChangeEmail(
            auth_pb2.ConfirmChangeEmailReq(change_email_token=new_address_token)
        )
        assert reply.state == auth_pb2.EMAIL_CONFIRMATION_STATE_REQUIRES_CONFIRMATION_FROM_OLD_EMAIL

    with session_scope() as session:
        halfway = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert halfway.email == user.email
        assert halfway.new_email == new_email
        assert halfway.old_email_token is not None
        assert halfway.old_email_token_created <= now()
        assert halfway.old_email_token_expiry >= now()
        assert halfway.need_to_confirm_via_old_email
        assert halfway.new_email_token is None
        assert halfway.new_email_token_created is None
        assert halfway.new_email_token_expiry is None
        assert not halfway.need_to_confirm_via_new_email

        old_address_token = halfway.old_email_token

    # Step 2: confirm from the old address; the change completes.
    with auth_api_session() as (auth_api, _interceptor):
        reply = auth_api.ConfirmChangeEmail(
            auth_pb2.ConfirmChangeEmailReq(change_email_token=old_address_token)
        )
        assert reply.state == auth_pb2.EMAIL_CONFIRMATION_STATE_SUCCESS

    with session_scope() as session:
        final = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert final.email == new_email
        assert final.new_email is None
        assert final.old_email_token is None
        assert final.old_email_token_created is None
        assert final.old_email_token_expiry is None
        assert not final.need_to_confirm_via_old_email
        assert final.new_email_token is None
        assert final.new_email_token_created is None
        assert final.new_email_token_expiry is None
        assert not final.need_to_confirm_via_new_email

738 

739 

def test_ChangeEmail_sends_proper_emails_has_password(db, fast_passwords):
    """With a password, ChangeEmail queues a notification email and a new-address confirmation email."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailReq(
        password=wrappers_pb2.StringValue(value=password),
        new_email=new_email,
    )
    with account_session(token) as api:
        api.ChangeEmail(request)

    # Marker text expected in each queued job's payload (payloads are compared as bytes).
    notification_marker = b"You requested that your email on Couchers.org be changed to"
    new_address_marker = b"You requested that your email be changed to this email address on Couchers.org"

    with session_scope() as session:
        email_jobs_query = select(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        jobs = session.execute(email_jobs_query).scalars().all()
        assert len(jobs) == 2
        # NOTE(review): relies on the query returning jobs in creation order -- confirm.
        notification_job, confirmation_job = jobs
        assert notification_marker in notification_job.payload
        assert new_address_marker in confirmation_job.payload

767 

768 

def test_ChangeEmail_sends_proper_emails_no_password(db):
    """Without a password, ChangeEmail queues confirmation emails for the old and the new address."""
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=None)

    with account_session(token) as api:
        api.ChangeEmail(account_pb2.ChangeEmailReq(new_email=new_email))

    # Marker text expected in each queued job's payload (payloads are compared as bytes).
    old_address_marker = b"You requested that your email be changed on Couchers.org"
    new_address_marker = b"You requested that your email be changed to this email address on Couchers.org"

    with session_scope() as session:
        email_jobs_query = select(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        jobs = session.execute(email_jobs_query).scalars().all()
        assert len(jobs) == 2
        # NOTE(review): relies on the query returning jobs in creation order -- confirm.
        old_address_job, new_address_job = jobs
        assert old_address_marker in old_address_job.payload
        assert new_address_marker in new_address_job.payload

796 

797 

def test_contributor_form(db):
    """The contributor-form flag is false for a fresh user and true after FillContributorForm."""
    user, token = generate_user()

    with account_session(token) as api:
        before = api.GetContributorFormInfo(empty_pb2.Empty())
        assert not before.filled_contributor_form

        fill_request = account_pb2.FillContributorFormReq(contributor_form=auth_pb2.ContributorForm())
        api.FillContributorForm(fill_request)

        after = api.GetContributorFormInfo(empty_pb2.Empty())
        assert after.filled_contributor_form

809 

810 

def test_DeleteAccount_start(db):
    """
    A confirmed DeleteAccount call queues the confirmation email and creates a valid deletion
    token, but does not yet mark the user as deleted
    """
    user, token = generate_user()
    delete_req = account_pb2.DeleteAccountReq(confirm=True, reason=None)

    with account_session(token) as account, patch("couchers.email.queue_email") as mock:
        account.DeleteAccount(delete_req)

        mock.assert_called_once()
        # queue_email is expected to be called with exactly six positional args; the fourth is the subject
        positional_args, _ = mock.call_args
        _, _, _, subject, _, _ = positional_args
        assert subject == "[TEST] Confirm your Couchers.org account deletion"

    with session_scope() as session:
        token_query = select(AccountDeletionToken).where(AccountDeletionToken.user_id == user.id)
        deletion_token = session.execute(token_query).scalar_one()
        assert deletion_token.is_valid

        db_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert not db_user.is_deleted

828 

829 

def test_DeleteAccount_message_storage(db):
    """
    Only deletion reasons with actual content end up as AccountDeletionReason rows: of the six
    requests below, three are expected to be stored
    """
    _, token = generate_user()

    reasons = [
        None,  # not stored
        "",  # not stored
        "Reason",
        "0192#(&!&#)*@//)(8",
        "\n\n\t",  # not stored
        "1337",
    ]

    with account_session(token) as account:
        for reason in reasons:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=reason))

    with session_scope() as session:
        stored_reasons = session.execute(select(func.count()).select_from(AccountDeletionReason)).scalar_one()
        assert stored_reasons == 3

843 

844 

def test_full_delete_account_with_recovery(db):
    """
    Runs the whole deletion flow end to end:

    1. an unconfirmed DeleteAccount request is rejected
    2. a confirmed request sends the confirmation email and creates a deletion token
    3. confirming that token marks the user as deleted and sets an undelete token
    4. recovering with the undelete token restores the user and clears the undelete fields
    """
    user, session_token = generate_user()
    user_id = user.id

    with account_session(session_token) as account:
        # without confirm=True the request must fail
        with pytest.raises(grpc.RpcError) as exc_info:
            account.DeleteAccount(account_pb2.DeleteAccountReq())
        assert exc_info.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert exc_info.value.details() == errors.MUST_CONFIRM_ACCOUNT_DELETE

        # Check the right email is sent
        with patch("couchers.email.queue_email") as mock:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))
        mock.assert_called_once()
        positional_args, _ = mock.call_args
        _, _, _, subject, _, _ = positional_args
        assert subject == "[TEST] Confirm your Couchers.org account deletion"

    with session_scope() as session:
        deletion_token = session.execute(select(AccountDeletionToken)).scalar_one()
        deletion_token_value = deletion_token.token

        db_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert deletion_token.user == db_user
        # nothing has happened to the user yet
        assert not db_user.is_deleted
        assert not db_user.undelete_token
        assert not db_user.undelete_until

    with auth_api_session() as (auth_api, _):
        confirm_req = auth_pb2.ConfirmDeleteAccountReq(token=deletion_token_value)
        auth_api.ConfirmDeleteAccount(confirm_req)

    # the deletion token is gone once it has been used
    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

    with session_scope() as session:
        db_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert db_user.is_deleted
        assert db_user.undelete_token
        assert db_user.undelete_until > now()

        undelete_token = db_user.undelete_token

    with auth_api_session() as (auth_api, _):
        recover_req = auth_pb2.RecoverAccountReq(token=undelete_token)
        auth_api.RecoverAccount(recover_req)

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        # the user is back and the undelete fields are cleared
        db_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert not db_user.is_deleted
        assert not db_user.undelete_token
        assert not db_user.undelete_until

904 

905 

906def test_multiple_delete_tokens(db): 

907 """ 

908 Make sure deletion tokens are deleted on delete 

909 """ 

910 user, token = generate_user() 

911 user_id = user.id 

912 

913 with account_session(token) as account: 

914 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True)) 

915 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True)) 

916 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True)) 

917 

918 with session_scope() as session: 

919 assert session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one() == 3 

920 token = session.execute(select(AccountDeletionToken)).scalars().first().token 

921 

922 with auth_api_session() as (auth_api, metadata_interceptor): 

923 auth_api.ConfirmDeleteAccount( 

924 auth_pb2.ConfirmDeleteAccountReq( 

925 token=token, 

926 ) 

927 ) 

928 

929 with session_scope() as session: 

930 assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()