Coverage for src/tests/test_account.py: 99%

701 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-20 11:53 +0000

1from datetime import UTC, date, datetime, timedelta 

2from unittest.mock import patch 

3 

4import grpc 

5import pytest 

6from google.protobuf import empty_pb2, wrappers_pb2 

7from sqlalchemy import update 

8from sqlalchemy.sql import func 

9 

10from couchers import constants, urls 

11from couchers.crypto import hash_password, random_hex 

12from couchers.db import session_scope 

13from couchers.materialized_views import refresh_materialized_views_rapid 

14from couchers.models import ( 

15 AccountDeletionReason, 

16 AccountDeletionToken, 

17 BackgroundJob, 

18 InviteCode, 

19 Upload, 

20 User, 

21 Volunteer, 

22) 

23from couchers.proto import account_pb2, api_pb2, auth_pb2, conversations_pb2, requests_pb2 

24from couchers.sql import couchers_select as select 

25from couchers.utils import now, today 

26from tests.test_fixtures import ( # noqa 

27 account_session, 

28 auth_api_session, 

29 email_fields, 

30 generate_user, 

31 mock_notification_email, 

32 moderator, 

33 process_jobs, 

34 public_session, 

35 real_account_session, 

36 requests_session, 

37) 

38from tests.test_requests import valid_request_text 

39 

40 

41@pytest.fixture(autouse=True) 

42def _(testconfig): 

43 pass 

44 

45 

46def test_GetAccountInfo(db, fast_passwords): 

47 # with password 

48 user1, token1 = generate_user(hashed_password=hash_password(random_hex()), email="user@couchers.invalid") 

49 

50 with account_session(token1) as account: 

51 res = account.GetAccountInfo(empty_pb2.Empty()) 

52 assert res.email == "user@couchers.invalid" 

53 assert res.username == user1.username 

54 assert not res.has_strong_verification 

55 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED 

56 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED 

57 assert not res.is_superuser 

58 assert res.ui_language_preference == "" 

59 assert not res.is_volunteer 

60 

61 

62def test_donation_banner_no_drive(db): 

63 """Test that the banner is not shown when DONATION_DRIVE_START is None""" 

64 

65 original_value = constants.DONATION_DRIVE_START 

66 try: 

67 constants.DONATION_DRIVE_START = None 

68 

69 # User has donated, but the drive is disabled, so the banner should not show 

70 user, token = generate_user() 

71 

72 with account_session(token) as account: 

73 res = account.GetAccountInfo(empty_pb2.Empty()) 

74 assert not res.should_show_donation_banner 

75 finally: 

76 constants.DONATION_DRIVE_START = original_value 

77 

78 

79def test_donation_banner_never_donated(db): 

80 """Test that banner is shown when user has never donated and drive is active""" 

81 

82 original_value = constants.DONATION_DRIVE_START 

83 try: 

84 drive_start = datetime(2025, 11, 1, tzinfo=UTC) 

85 constants.DONATION_DRIVE_START = drive_start 

86 

87 # Explicitly set last_donated=None since generate_user defaults to now() 

88 user, token = generate_user(last_donated=None) 

89 

90 with account_session(token) as account: 

91 res = account.GetAccountInfo(empty_pb2.Empty()) 

92 assert res.should_show_donation_banner 

93 finally: 

94 constants.DONATION_DRIVE_START = original_value 

95 

96 

97def test_donation_banner_donated_before_drive(db): 

98 """Test that banner is shown when user donated before drive start""" 

99 

100 original_value = constants.DONATION_DRIVE_START 

101 try: 

102 drive_start = datetime(2025, 11, 1, tzinfo=UTC) 

103 constants.DONATION_DRIVE_START = drive_start 

104 

105 user, token = generate_user() 

106 

107 # Set donation before drive start 

108 with session_scope() as session: 

109 last_donated = datetime(2025, 10, 15, tzinfo=UTC) # Before Nov 1 

110 session.execute(update(User).where(User.id == user.id).values(last_donated=last_donated)) 

111 

112 with account_session(token) as account: 

113 res = account.GetAccountInfo(empty_pb2.Empty()) 

114 assert res.should_show_donation_banner 

115 finally: 

116 constants.DONATION_DRIVE_START = original_value 

117 

118 

119def test_donation_banner_donated_after_drive(db): 

120 """Test that banner is not shown when user donated after drive start""" 

121 

122 original_value = constants.DONATION_DRIVE_START 

123 try: 

124 drive_start = datetime(2025, 11, 1, tzinfo=UTC) 

125 constants.DONATION_DRIVE_START = drive_start 

126 

127 user, token = generate_user() 

128 

129 # Set donation after drive start 

130 with session_scope() as session: 

131 last_donated = datetime(2025, 11, 15, tzinfo=UTC) # After Nov 1 

132 session.execute(update(User).where(User.id == user.id).values(last_donated=last_donated)) 

133 

134 with account_session(token) as account: 

135 res = account.GetAccountInfo(empty_pb2.Empty()) 

136 assert not res.should_show_donation_banner 

137 finally: 

138 constants.DONATION_DRIVE_START = original_value 

139 

140 

141def test_donation_banner_donated_exactly_at_drive_start(db): 

142 """Test that banner is not shown when user donated exactly at drive start time""" 

143 

144 original_value = constants.DONATION_DRIVE_START 

145 try: 

146 drive_start = datetime(2025, 11, 1, tzinfo=UTC) 

147 constants.DONATION_DRIVE_START = drive_start 

148 

149 user, token = generate_user() 

150 

151 # Set donation exactly at drive start 

152 with session_scope() as session: 

153 session.execute(update(User).where(User.id == user.id).values(last_donated=drive_start)) 

154 

155 with account_session(token) as account: 

156 res = account.GetAccountInfo(empty_pb2.Empty()) 

157 assert not res.should_show_donation_banner 

158 finally: 

159 constants.DONATION_DRIVE_START = original_value 

160 

161 

162def test_GetAccountInfo_regression(db): 

163 # there was a bug in evaluating `has_completed_profile` on the backend (in python) 

164 # when about_me is None but the user has a key, it was failing because len(about_me) doesn't work on None 

165 uploader_user, _ = generate_user() 

166 with session_scope() as session: 

167 key = random_hex(32) 

168 filename = random_hex(32) + ".jpg" 

169 session.add( 

170 Upload( 

171 key=key, 

172 filename=filename, 

173 creator_user_id=uploader_user.id, 

174 ) 

175 ) 

176 

177 user, token = generate_user(about_me=None, avatar_key=key) 

178 

179 with account_session(token) as account: 

180 res = account.GetAccountInfo(empty_pb2.Empty()) 

181 

182 

183def test_ChangePasswordV2_normal(db, fast_passwords, push_collector): 

184 # user has old password and is changing to new password 

185 old_password = random_hex() 

186 new_password = random_hex() 

187 user, token = generate_user(hashed_password=hash_password(old_password)) 

188 

189 with account_session(token) as account: 

190 with mock_notification_email() as mock: 

191 account.ChangePasswordV2( 

192 account_pb2.ChangePasswordV2Req( 

193 old_password=old_password, 

194 new_password=new_password, 

195 ) 

196 ) 

197 

198 mock.assert_called_once() 

199 assert email_fields(mock).subject == "[TEST] Your password was changed" 

200 

201 push_collector.assert_user_has_single_matching( 

202 user.id, title="Your password was changed", body="Your login password for Couchers.org was changed." 

203 ) 

204 

205 with session_scope() as session: 

206 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

207 assert updated_user.hashed_password == hash_password(new_password) 

208 

209 

210def test_ChangePasswordV2_regression(db, fast_passwords): 

211 # send_password_changed_email wasn't working 

212 # user has old password and is changing to new password 

213 old_password = random_hex() 

214 new_password = random_hex() 

215 user, token = generate_user(hashed_password=hash_password(old_password)) 

216 

217 with account_session(token) as account: 

218 account.ChangePasswordV2( 

219 account_pb2.ChangePasswordV2Req( 

220 old_password=old_password, 

221 new_password=new_password, 

222 ) 

223 ) 

224 

225 with session_scope() as session: 

226 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

227 assert updated_user.hashed_password == hash_password(new_password) 

228 

229 

230def test_ChangePasswordV2_normal_short_password(db, fast_passwords): 

231 # user has old password and is changing to new password, but used short password 

232 old_password = random_hex() 

233 new_password = random_hex(length=1) 

234 user, token = generate_user(hashed_password=hash_password(old_password)) 

235 

236 with account_session(token) as account: 

237 with pytest.raises(grpc.RpcError) as e: 

238 account.ChangePasswordV2( 

239 account_pb2.ChangePasswordV2Req( 

240 old_password=old_password, 

241 new_password=new_password, 

242 ) 

243 ) 

244 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

245 assert e.value.details() == "The password must be 8 or more characters long." 

246 

247 with session_scope() as session: 

248 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

249 assert updated_user.hashed_password == hash_password(old_password) 

250 

251 

252def test_ChangePasswordV2_normal_long_password(db, fast_passwords): 

253 # user has old password and is changing to new password, but used short password 

254 old_password = random_hex() 

255 new_password = random_hex(length=1000) 

256 user, token = generate_user(hashed_password=hash_password(old_password)) 

257 

258 with account_session(token) as account: 

259 with pytest.raises(grpc.RpcError) as e: 

260 account.ChangePasswordV2( 

261 account_pb2.ChangePasswordV2Req( 

262 old_password=old_password, 

263 new_password=new_password, 

264 ) 

265 ) 

266 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

267 assert e.value.details() == "The password must be less than 256 characters." 

268 

269 with session_scope() as session: 

270 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

271 assert updated_user.hashed_password == hash_password(old_password) 

272 

273 

274def test_ChangePasswordV2_normal_insecure_password(db, fast_passwords): 

275 # user has old password and is changing to new password, but used insecure password 

276 old_password = random_hex() 

277 new_password = "12345678" 

278 user, token = generate_user(hashed_password=hash_password(old_password)) 

279 

280 with account_session(token) as account: 

281 with pytest.raises(grpc.RpcError) as e: 

282 account.ChangePasswordV2( 

283 account_pb2.ChangePasswordV2Req( 

284 old_password=old_password, 

285 new_password=new_password, 

286 ) 

287 ) 

288 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

289 assert e.value.details() == "The password is insecure. Please use one that is not easily guessable." 

290 

291 with session_scope() as session: 

292 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

293 assert updated_user.hashed_password == hash_password(old_password) 

294 

295 

296def test_ChangePasswordV2_normal_wrong_password(db, fast_passwords): 

297 # user has old password and is changing to new password, but used wrong old password 

298 old_password = random_hex() 

299 new_password = random_hex() 

300 user, token = generate_user(hashed_password=hash_password(old_password)) 

301 

302 with account_session(token) as account: 

303 with pytest.raises(grpc.RpcError) as e: 

304 account.ChangePasswordV2( 

305 account_pb2.ChangePasswordV2Req( 

306 old_password="wrong password", 

307 new_password=new_password, 

308 ) 

309 ) 

310 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

311 assert e.value.details() == "Wrong password." 

312 

313 with session_scope() as session: 

314 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

315 assert updated_user.hashed_password == hash_password(old_password) 

316 

317 

318def test_ChangePasswordV2_normal_no_passwords(db, fast_passwords): 

319 # user has old password and called with empty body 

320 old_password = random_hex() 

321 user, token = generate_user(hashed_password=hash_password(old_password)) 

322 

323 with account_session(token) as account: 

324 with pytest.raises(grpc.RpcError) as e: 

325 account.ChangePasswordV2(account_pb2.ChangePasswordV2Req(old_password=old_password)) 

326 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

327 assert e.value.details() == "The password must be 8 or more characters long." 

328 

329 with session_scope() as session: 

330 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

331 assert updated_user.hashed_password == hash_password(old_password) 

332 

333 

334def test_ChangeEmailV2_wrong_password(db, fast_passwords): 

335 password = random_hex() 

336 new_email = f"{random_hex()}@couchers.org.invalid" 

337 user, token = generate_user(hashed_password=hash_password(password)) 

338 

339 with account_session(token) as account: 

340 with pytest.raises(grpc.RpcError) as e: 

341 account.ChangeEmailV2( 

342 account_pb2.ChangeEmailV2Req( 

343 password="wrong password", 

344 new_email=new_email, 

345 ) 

346 ) 

347 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

348 assert e.value.details() == "Wrong password." 

349 

350 with session_scope() as session: 

351 assert ( 

352 session.execute( 

353 select(func.count()) 

354 .select_from(User) 

355 .where(User.new_email_token_created <= func.now()) 

356 .where(User.new_email_token_expiry >= func.now()) 

357 ) 

358 ).scalar_one() == 0 

359 

360 

361def test_ChangeEmailV2_wrong_email(db, fast_passwords): 

362 password = random_hex() 

363 new_email = f"{random_hex()}@couchers.org.invalid" 

364 user, token = generate_user(hashed_password=hash_password(password)) 

365 

366 with account_session(token) as account: 

367 with pytest.raises(grpc.RpcError) as e: 

368 account.ChangeEmailV2( 

369 account_pb2.ChangeEmailV2Req( 

370 password="wrong password", 

371 new_email=new_email, 

372 ) 

373 ) 

374 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

375 assert e.value.details() == "Wrong password." 

376 

377 with session_scope() as session: 

378 assert ( 

379 session.execute( 

380 select(func.count()) 

381 .select_from(User) 

382 .where(User.new_email_token_created <= func.now()) 

383 .where(User.new_email_token_expiry >= func.now()) 

384 ) 

385 ).scalar_one() == 0 

386 

387 

388def test_ChangeEmailV2_invalid_email(db, fast_passwords): 

389 password = random_hex() 

390 user, token = generate_user(hashed_password=hash_password(password)) 

391 

392 with account_session(token) as account: 

393 with pytest.raises(grpc.RpcError) as e: 

394 account.ChangeEmailV2( 

395 account_pb2.ChangeEmailV2Req( 

396 password=password, 

397 new_email="not a real email", 

398 ) 

399 ) 

400 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

401 assert e.value.details() == "Invalid email." 

402 

403 with session_scope() as session: 

404 assert ( 

405 session.execute( 

406 select(func.count()) 

407 .select_from(User) 

408 .where(User.new_email_token_created <= func.now()) 

409 .where(User.new_email_token_expiry >= func.now()) 

410 ) 

411 ).scalar_one() == 0 

412 

413 

414def test_ChangeEmailV2_email_in_use(db, fast_passwords): 

415 password = random_hex() 

416 user, token = generate_user(hashed_password=hash_password(password)) 

417 user2, token2 = generate_user(hashed_password=hash_password(password)) 

418 

419 with account_session(token) as account: 

420 with pytest.raises(grpc.RpcError) as e: 

421 account.ChangeEmailV2( 

422 account_pb2.ChangeEmailV2Req( 

423 password=password, 

424 new_email=user2.email, 

425 ) 

426 ) 

427 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

428 assert e.value.details() == "Invalid email." 

429 

430 with session_scope() as session: 

431 assert ( 

432 session.execute( 

433 select(func.count()) 

434 .select_from(User) 

435 .where(User.new_email_token_created <= func.now()) 

436 .where(User.new_email_token_expiry >= func.now()) 

437 ) 

438 ).scalar_one() == 0 

439 

440 

441def test_ChangeEmailV2_no_change(db, fast_passwords): 

442 password = random_hex() 

443 user, token = generate_user(hashed_password=hash_password(password)) 

444 

445 with account_session(token) as account: 

446 with pytest.raises(grpc.RpcError) as e: 

447 account.ChangeEmailV2( 

448 account_pb2.ChangeEmailV2Req( 

449 password=password, 

450 new_email=user.email, 

451 ) 

452 ) 

453 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

454 assert e.value.details() == "Invalid email." 

455 

456 with session_scope() as session: 

457 assert ( 

458 session.execute( 

459 select(func.count()) 

460 .select_from(User) 

461 .where(User.new_email_token_created <= func.now()) 

462 .where(User.new_email_token_expiry >= func.now()) 

463 ) 

464 ).scalar_one() == 0 

465 

466 

467def test_ChangeEmailV2_wrong_token(db, fast_passwords): 

468 password = random_hex() 

469 new_email = f"{random_hex()}@couchers.org.invalid" 

470 user, token = generate_user(hashed_password=hash_password(password)) 

471 

472 with account_session(token) as account: 

473 account.ChangeEmailV2( 

474 account_pb2.ChangeEmailV2Req( 

475 password=password, 

476 new_email=new_email, 

477 ) 

478 ) 

479 

480 with auth_api_session() as (auth_api, metadata_interceptor): 

481 with pytest.raises(grpc.RpcError) as e: 

482 res = auth_api.ConfirmChangeEmailV2( 

483 auth_pb2.ConfirmChangeEmailV2Req( 

484 change_email_token="wrongtoken", 

485 ) 

486 ) 

487 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

488 assert e.value.details() == "Invalid token." 

489 

490 with session_scope() as session: 

491 user_updated = session.execute(select(User).where(User.id == user.id)).scalar_one() 

492 assert user_updated.email == user.email 

493 

494 

495def test_ChangeEmailV2_tokens_two_hour_window(db): 

496 def two_hours_one_minute_in_future(): 

497 return now() + timedelta(hours=2, minutes=1) 

498 

499 def one_minute_ago(): 

500 return now() - timedelta(minutes=1) 

501 

502 password = random_hex() 

503 new_email = f"{random_hex()}@couchers.org.invalid" 

504 user, token = generate_user(hashed_password=hash_password(password)) 

505 

506 with account_session(token) as account: 

507 account.ChangeEmailV2( 

508 account_pb2.ChangeEmailV2Req( 

509 password=password, 

510 new_email=new_email, 

511 ) 

512 ) 

513 

514 with session_scope() as session: 

515 new_email_token = session.execute(select(User.new_email_token).where(User.id == user.id)).scalar_one() 

516 

517 with patch("couchers.servicers.auth.now", one_minute_ago): 

518 with auth_api_session() as (auth_api, metadata_interceptor): 

519 with pytest.raises(grpc.RpcError) as e: 

520 auth_api.ConfirmChangeEmailV2(auth_pb2.ConfirmChangeEmailV2Req()) 

521 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

522 assert e.value.details() == "Invalid token." 

523 

524 with pytest.raises(grpc.RpcError) as e: 

525 auth_api.ConfirmChangeEmailV2( 

526 auth_pb2.ConfirmChangeEmailV2Req( 

527 change_email_token=new_email_token, 

528 ) 

529 ) 

530 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

531 assert e.value.details() == "Invalid token." 

532 

533 with patch("couchers.servicers.auth.now", two_hours_one_minute_in_future): 

534 with auth_api_session() as (auth_api, metadata_interceptor): 

535 with pytest.raises(grpc.RpcError) as e: 

536 auth_api.ConfirmChangeEmailV2(auth_pb2.ConfirmChangeEmailV2Req()) 

537 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

538 assert e.value.details() == "Invalid token." 

539 

540 with pytest.raises(grpc.RpcError) as e: 

541 auth_api.ConfirmChangeEmailV2( 

542 auth_pb2.ConfirmChangeEmailV2Req( 

543 change_email_token=new_email_token, 

544 ) 

545 ) 

546 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

547 assert e.value.details() == "Invalid token." 

548 

549 

550def test_ChangeEmailV2(db, fast_passwords, push_collector): 

551 password = random_hex() 

552 new_email = f"{random_hex()}@couchers.org.invalid" 

553 user, token = generate_user(hashed_password=hash_password(password)) 

554 user_id = user.id 

555 

556 with account_session(token) as account: 

557 account.ChangeEmailV2( 

558 account_pb2.ChangeEmailV2Req( 

559 password=password, 

560 new_email=new_email, 

561 ) 

562 ) 

563 

564 with session_scope() as session: 

565 user_updated = session.execute(select(User).where(User.id == user_id)).scalar_one() 

566 assert user_updated.email == user.email 

567 assert user_updated.new_email == new_email 

568 assert user_updated.new_email_token is not None 

569 assert user_updated.new_email_token_created <= now() 

570 assert user_updated.new_email_token_expiry >= now() 

571 

572 token = user_updated.new_email_token 

573 

574 process_jobs() 

575 push_collector.assert_user_push_matches_fields( 

576 user_id, 

577 ix=0, 

578 title="An email change was initiated on your account", 

579 body=f"An email change to the email {new_email} was initiated on your account.", 

580 ) 

581 

582 with auth_api_session() as (auth_api, metadata_interceptor): 

583 res = auth_api.ConfirmChangeEmailV2( 

584 auth_pb2.ConfirmChangeEmailV2Req( 

585 change_email_token=token, 

586 ) 

587 ) 

588 

589 with session_scope() as session: 

590 user = session.execute(select(User).where(User.id == user_id)).scalar_one() 

591 assert user.email == new_email 

592 assert user.new_email is None 

593 assert user.new_email_token is None 

594 assert user.new_email_token_created is None 

595 assert user.new_email_token_expiry is None 

596 

597 process_jobs() 

598 push_collector.assert_user_push_matches_fields( 

599 user_id, 

600 ix=1, 

601 title="Email change completed", 

602 body="Your new email address has been verified.", 

603 ) 

604 

605 

606def test_ChangeEmailV2_sends_proper_emails(db, fast_passwords, push_collector): 

607 password = random_hex() 

608 new_email = f"{random_hex()}@couchers.org.invalid" 

609 user, token = generate_user(hashed_password=hash_password(password)) 

610 

611 with account_session(token) as account: 

612 account.ChangeEmailV2( 

613 account_pb2.ChangeEmailV2Req( 

614 password=password, 

615 new_email=new_email, 

616 ) 

617 ) 

618 

619 process_jobs() 

620 

621 with session_scope() as session: 

622 jobs = session.execute(select(BackgroundJob).where(BackgroundJob.job_type == "send_email")).scalars().all() 

623 assert len(jobs) == 2 

624 payload_for_notification_email = jobs[0].payload 

625 payload_for_confirmation_email_new_address = jobs[1].payload 

626 uq_str1 = b"An email change to the email" 

627 uq_str2 = ( 

628 b"You requested that your email be changed to this email address on Couchers.org. Your old email address is" 

629 ) 

630 assert (uq_str1 in jobs[0].payload and uq_str2 in jobs[1].payload) or ( 

631 uq_str2 in jobs[0].payload and uq_str1 in jobs[1].payload 

632 ) 

633 

634 push_collector.assert_user_has_single_matching( 

635 user.id, 

636 title="An email change was initiated on your account", 

637 body=f"An email change to the email {new_email} was initiated on your account.", 

638 ) 

639 

640 

641def test_ChangeLanguagePreference(db, fast_passwords): 

642 # user changes from default to ISO 639-1 language code 

643 newLanguageCode = "zh" 

644 user, token = generate_user() 

645 

646 with real_account_session(token) as account: 

647 res = account.GetAccountInfo(empty_pb2.Empty()) 

648 assert res.ui_language_preference == "" 

649 

650 # call will have info about the request 

651 res, call = account.ChangeLanguagePreference.with_call( 

652 account_pb2.ChangeLanguagePreferenceReq(ui_language_preference=newLanguageCode) 

653 ) 

654 

655 # cookies are sent via initial metadata, so we check for it there 

656 for key, val in call.initial_metadata(): 

657 if key == "set-cookie": 

658 # the value of "set-cookie" will be the full cookie string, pull the key value from the string 

659 key_val = val.split(";")[0] 

660 if key_val == "NEXT_LOCALE=zh": 

661 # the changed language preference should also be sent to the backend 

662 res = account.GetAccountInfo(empty_pb2.Empty()) 

663 assert res.ui_language_preference == "zh" 

664 return 

665 raise Exception(f"Didn't find right cookie, got {call.initial_metadata()}") 

666 

667 

668def test_contributor_form(db): 

669 user, token = generate_user() 

670 

671 with account_session(token) as account: 

672 res = account.GetContributorFormInfo(empty_pb2.Empty()) 

673 assert not res.filled_contributor_form 

674 

675 account.FillContributorForm(account_pb2.FillContributorFormReq(contributor_form=auth_pb2.ContributorForm())) 

676 

677 res = account.GetContributorFormInfo(empty_pb2.Empty()) 

678 assert res.filled_contributor_form 

679 

680 

681def test_DeleteAccount_start(db): 

682 user, token = generate_user() 

683 

684 with account_session(token) as account: 

685 with mock_notification_email() as mock: 

686 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=None)) 

687 mock.assert_called_once() 

688 assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion" 

689 

690 with session_scope() as session: 

691 deletion_token: AccountDeletionToken = session.execute( 

692 select(AccountDeletionToken).where(AccountDeletionToken.user_id == user.id) 

693 ).scalar_one() 

694 

695 assert deletion_token.is_valid 

696 assert not session.execute(select(User).where(User.id == user.id)).scalar_one().is_deleted 

697 

698 

699def test_DeleteAccount_message_storage(db): 

700 user, token = generate_user() 

701 

702 with account_session(token) as account: 

703 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=None)) # not stored 

704 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="")) # not stored 

705 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="Reason")) 

706 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="0192#(&!&#)*@//)(8")) 

707 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="\n\n\t")) # not stored 

708 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="1337")) 

709 

710 with session_scope() as session: 

711 assert session.execute(select(func.count()).select_from(AccountDeletionReason)).scalar_one() == 3 

712 

713 

714def test_full_delete_account_with_recovery(db, push_collector): 

715 user, token = generate_user() 

716 user_id = user.id 

717 

718 with account_session(token) as account: 

719 with pytest.raises(grpc.RpcError) as e: 

720 account.DeleteAccount(account_pb2.DeleteAccountReq()) 

721 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

722 assert e.value.details() == "Please confirm your account deletion." 

723 

724 # Check the right email is sent 

725 with mock_notification_email() as mock: 

726 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True)) 

727 

728 push_collector.assert_user_push_matches_fields( 

729 user_id, 

730 ix=0, 

731 title="Account deletion initiated", 

732 body="Someone initiated the deletion of your Couchers.org account. To delete your account, please follow the link in the email we sent you.", 

733 ) 

734 

735 mock.assert_called_once() 

736 e = email_fields(mock) 

737 

738 with session_scope() as session: 

739 token_o = session.execute(select(AccountDeletionToken)).scalar_one() 

740 token = token_o.token 

741 

742 user_ = session.execute(select(User).where(User.id == user_id)).scalar_one() 

743 assert token_o.user == user_ 

744 assert not user_.is_deleted 

745 assert not user_.undelete_token 

746 assert not user_.undelete_until 

747 

748 assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion" 

749 assert e.recipient == user.email 

750 assert "account deletion" in e.subject.lower() 

751 assert token in e.plain 

752 assert token in e.html 

753 unique_string = "You requested that we delete your account from Couchers.org." 

754 assert unique_string in e.plain 

755 assert unique_string in e.html 

756 url = f"http://localhost:3000/delete-account?token={token}" 

757 assert url in e.plain 

758 assert url in e.html 

759 assert "support@couchers.org" in e.plain 

760 assert "support@couchers.org" in e.html 

761 

762 with mock_notification_email() as mock: 

763 with auth_api_session() as (auth_api, metadata_interceptor): 

764 auth_api.ConfirmDeleteAccount( 

765 auth_pb2.ConfirmDeleteAccountReq( 

766 token=token, 

767 ) 

768 ) 

769 

770 push_collector.assert_user_push_matches_fields( 

771 user_id, 

772 ix=1, 

773 title="Your Couchers.org account has been deleted", 

774 body="You can still undo this by following the link we emailed to you within 7 days.", 

775 ) 

776 

777 mock.assert_called_once() 

778 e = email_fields(mock) 

779 

780 with session_scope() as session: 

781 assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none() 

782 

783 user_ = session.execute(select(User).where(User.id == user_id)).scalar_one() 

784 assert user_.is_deleted 

785 assert user_.undelete_token 

786 assert user_.undelete_until > now() 

787 

788 undelete_token = user_.undelete_token 

789 

790 assert e.recipient == user.email 

791 assert "account has been deleted" in e.subject.lower() 

792 unique_string = "You have successfully deleted your account from Couchers.org." 

793 assert unique_string in e.plain 

794 assert unique_string in e.html 

795 assert "7 days" in e.plain 

796 assert "7 days" in e.html 

797 url = f"http://localhost:3000/recover-account?token={undelete_token}" 

798 assert url in e.plain 

799 assert url in e.html 

800 assert "support@couchers.org" in e.plain 

801 assert "support@couchers.org" in e.html 

802 

803 with mock_notification_email() as mock: 

804 with auth_api_session() as (auth_api, metadata_interceptor): 

805 auth_api.RecoverAccount( 

806 auth_pb2.RecoverAccountReq( 

807 token=undelete_token, 

808 ) 

809 ) 

810 

811 push_collector.assert_user_push_matches_fields( 

812 user_id, 

813 ix=2, 

814 title="Your Couchers.org account has been recovered!", 

815 body="We have recovered your Couchers.org account as per your request! Welcome back!", 

816 ) 

817 

818 mock.assert_called_once() 

819 e = email_fields(mock) 

820 

821 assert e.recipient == user.email 

822 assert "account has been recovered" in e.subject.lower() 

823 unique_string = "Your account on Couchers.org has been successfully recovered!" 

824 assert unique_string in e.plain 

825 assert unique_string in e.html 

826 assert "support@couchers.org" in e.plain 

827 assert "support@couchers.org" in e.html 

828 

829 with session_scope() as session: 

830 assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none() 

831 

832 user = session.execute(select(User).where(User.id == user_id)).scalar_one() 

833 assert not user.is_deleted 

834 assert not user.undelete_token 

835 assert not user.undelete_until 

836 

837 

838def test_multiple_delete_tokens(db): 

839 """ 

840 Make sure deletion tokens are deleted on delete 

841 """ 

842 user, token = generate_user() 

843 

844 with account_session(token) as account: 

845 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True)) 

846 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True)) 

847 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True)) 

848 

849 with session_scope() as session: 

850 assert session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one() == 3 

851 token = session.execute(select(AccountDeletionToken).limit(1)).scalars().one_or_none().token 

852 

853 with auth_api_session() as (auth_api, metadata_interceptor): 

854 auth_api.ConfirmDeleteAccount( 

855 auth_pb2.ConfirmDeleteAccountReq( 

856 token=token, 

857 ) 

858 ) 

859 

860 with session_scope() as session: 

861 assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none() 

862 

863 

864def test_ListActiveSessions_pagination(db, fast_passwords): 

865 password = random_hex() 

866 user, token = generate_user(hashed_password=hash_password(password)) 

867 

868 with auth_api_session() as (auth_api, metadata_interceptor): 

869 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

870 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

871 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

872 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

873 

874 with real_account_session(token) as account: 

875 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_size=3)) 

876 assert len(res.active_sessions) == 3 

877 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_token=res.next_page_token, page_size=3)) 

878 assert len(res.active_sessions) == 2 

879 assert not res.next_page_token 

880 

881 

882def test_ListActiveSessions_details(db, fast_passwords): 

883 password = random_hex() 

884 user, token = generate_user(hashed_password=hash_password(password)) 

885 

886 ips_user_agents = [ 

887 ( 

888 "108.123.33.162", 

889 "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Mobile/15E148 Safari/604.1", 

890 ), 

891 ( 

892 "8.245.212.28", 

893 "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/26.0 Chrome/122.0.0.0 Mobile Safari/537.36", 

894 ), 

895 ( 

896 "95.254.140.156", 

897 "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0", 

898 ), 

899 ] 

900 

901 for ip, user_agent in ips_user_agents: 

902 options = (("grpc.primary_user_agent", user_agent),) 

903 with auth_api_session(grpc_channel_options=options) as (auth_api, metadata_interceptor): 

904 auth_api.Authenticate( 

905 auth_pb2.AuthReq(user=user.username, password=password), metadata=(("x-couchers-real-ip", ip),) 

906 ) 

907 

908 def dummy_geoip(ip_address): 

909 return { 

910 "108.123.33.162": "Chicago, United States", 

911 "8.245.212.28": "Sydney, Australia", 

912 }.get(ip_address) 

913 

914 with real_account_session(token) as account: 

915 with patch("couchers.servicers.account.geoip_approximate_location", dummy_geoip): 

916 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq()) 

917 print(res) 

918 assert len(res.active_sessions) == 4 

919 

920 # this one currently making the API call 

921 assert res.active_sessions[0].operating_system == "Other" 

922 assert res.active_sessions[0].browser == "Other" 

923 assert res.active_sessions[0].device == "Other" 

924 assert res.active_sessions[0].approximate_location == "Unknown" 

925 assert res.active_sessions[0].is_current_session 

926 

927 assert res.active_sessions[1].operating_system == "Ubuntu" 

928 assert res.active_sessions[1].browser == "Firefox" 

929 assert res.active_sessions[1].device == "Other" 

930 assert res.active_sessions[1].approximate_location == "Unknown" 

931 assert not res.active_sessions[1].is_current_session 

932 

933 assert res.active_sessions[2].operating_system == "Android" 

934 assert res.active_sessions[2].browser == "Samsung Internet" 

935 assert res.active_sessions[2].device == "K" 

936 assert res.active_sessions[2].approximate_location == "Sydney, Australia" 

937 assert not res.active_sessions[2].is_current_session 

938 

939 assert res.active_sessions[3].operating_system == "iOS" 

940 assert res.active_sessions[3].browser == "Mobile Safari" 

941 assert res.active_sessions[3].device == "iPhone" 

942 assert res.active_sessions[3].approximate_location == "Chicago, United States" 

943 assert not res.active_sessions[3].is_current_session 

944 

945 

946def test_LogOutSession(db, fast_passwords): 

947 password = random_hex() 

948 user, token = generate_user(hashed_password=hash_password(password)) 

949 

950 with auth_api_session() as (auth_api, metadata_interceptor): 

951 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

952 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

953 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

954 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

955 

956 with real_account_session(token) as account: 

957 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq()) 

958 assert len(res.active_sessions) == 5 

959 account.LogOutSession(account_pb2.LogOutSessionReq(created=res.active_sessions[3].created)) 

960 

961 res2 = account.ListActiveSessions(account_pb2.ListActiveSessionsReq()) 

962 assert len(res2.active_sessions) == 4 

963 

964 # ignore the first session as it changes 

965 assert res.active_sessions[1:3] + res.active_sessions[4:] == res2.active_sessions[1:] 

966 

967 

968def test_LogOutOtherSessions(db, fast_passwords): 

969 password = random_hex() 

970 user, token = generate_user(hashed_password=hash_password(password)) 

971 

972 with auth_api_session() as (auth_api, metadata_interceptor): 

973 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

974 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

975 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

976 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

977 

978 with real_account_session(token) as account: 

979 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq()) 

980 assert len(res.active_sessions) == 5 

981 with pytest.raises(grpc.RpcError) as e: 

982 account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=False)) 

983 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

984 assert e.value.details() == "Please confirm you want to log out of other sessions." 

985 

986 account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=True)) 

987 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq()) 

988 assert len(res.active_sessions) == 1 

989 

990 

991def test_CreateInviteCode(db): 

992 user, token = generate_user() 

993 

994 with account_session(token) as account: 

995 res = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()) 

996 code = res.code 

997 assert len(code) == 8 

998 

999 with session_scope() as session: 

1000 invite = session.execute(select(InviteCode).where(InviteCode.id == code)).scalar_one() 

1001 assert invite.creator_user_id == user.id 

1002 assert invite.disabled is None 

1003 assert res.url == urls.invite_code_link(code=res.code) 

1004 

1005 

1006def test_DisableInviteCode(db): 

1007 user, token = generate_user() 

1008 code = "TEST1234" 

1009 with session_scope() as session: 

1010 session.add(InviteCode(id=code, creator_user_id=user.id)) 

1011 

1012 with account_session(token) as account: 

1013 account.DisableInviteCode(account_pb2.DisableInviteCodeReq(code=code)) 

1014 

1015 with session_scope() as session: 

1016 invite = session.execute(select(InviteCode).where(InviteCode.id == code)).scalar_one() 

1017 assert invite.disabled is not None 

1018 

1019 

1020def test_ListInviteCodes(db): 

1021 user, token = generate_user() 

1022 another_user, _ = generate_user() 

1023 

1024 code = "LIST1234" 

1025 with session_scope() as session: 

1026 session.add(InviteCode(id=code, creator_user_id=user.id)) 

1027 session.execute(update(User).where(User.id == another_user.id).values(invite_code_id=code)) 

1028 

1029 with account_session(token) as account: 

1030 res = account.ListInviteCodes(empty_pb2.Empty()) 

1031 assert len(res.invite_codes) == 1 

1032 assert res.invite_codes[0].code == code 

1033 assert res.invite_codes[0].uses == 1 

1034 assert res.invite_codes[0].url == urls.invite_code_link(code=code) 

1035 

1036 

1037def test_reminders(db, moderator): 

1038 # the strong verification reminder's absence is tested in test_strong_verification.py 

1039 # reference writing reminders tested in test_AvailableWriteReferences_and_ListPendingReferencesToWrite 

1040 # we use LiteUser, so remember to refresh materialized views 

1041 user, token = generate_user(complete_profile=False) 

1042 complete_user, complete_token = generate_user(complete_profile=True) 

1043 req_user1, req_user_token1 = generate_user(complete_profile=True) 

1044 req_user2, req_user_token2 = generate_user(complete_profile=True) 

1045 

1046 refresh_materialized_views_rapid(None) 

1047 with account_session(complete_token) as account: 

1048 assert [reminder.WhichOneof("reminder") for reminder in account.GetReminders(empty_pb2.Empty()).reminders] == [ 

1049 "complete_verification_reminder" 

1050 ] 

1051 with account_session(token) as account: 

1052 assert [reminder.WhichOneof("reminder") for reminder in account.GetReminders(empty_pb2.Empty()).reminders] == [ 

1053 "complete_profile_reminder", 

1054 "complete_verification_reminder", 

1055 ] 

1056 

1057 today_plus_2 = (today() + timedelta(days=2)).isoformat() 

1058 today_plus_3 = (today() + timedelta(days=3)).isoformat() 

1059 with requests_session(req_user_token1) as api: 

1060 host_request1_id = api.CreateHostRequest( 

1061 requests_pb2.CreateHostRequestReq( 

1062 host_user_id=user.id, 

1063 from_date=today_plus_2, 

1064 to_date=today_plus_3, 

1065 text=valid_request_text("Test request 1"), 

1066 ) 

1067 ).host_request_id 

1068 moderator.approve_host_request(host_request1_id) 

1069 

1070 with account_session(token) as account: 

1071 reminders = account.GetReminders(empty_pb2.Empty()).reminders 

1072 assert [reminder.WhichOneof("reminder") for reminder in reminders] == [ 

1073 "respond_to_host_request_reminder", 

1074 "complete_profile_reminder", 

1075 "complete_verification_reminder", 

1076 ] 

1077 assert reminders[0].respond_to_host_request_reminder.host_request_id == host_request1_id 

1078 assert reminders[0].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id 

1079 

1080 with requests_session(req_user_token2) as api: 

1081 host_request2_id = api.CreateHostRequest( 

1082 requests_pb2.CreateHostRequestReq( 

1083 host_user_id=user.id, 

1084 from_date=today_plus_2, 

1085 to_date=today_plus_3, 

1086 text=valid_request_text("Test request 2"), 

1087 ) 

1088 ).host_request_id 

1089 moderator.approve_host_request(host_request2_id) 

1090 

1091 refresh_materialized_views_rapid(None) 

1092 with account_session(token) as account: 

1093 reminders = account.GetReminders(empty_pb2.Empty()).reminders 

1094 assert [reminder.WhichOneof("reminder") for reminder in reminders] == [ 

1095 "respond_to_host_request_reminder", 

1096 "respond_to_host_request_reminder", 

1097 "complete_profile_reminder", 

1098 "complete_verification_reminder", 

1099 ] 

1100 assert reminders[0].respond_to_host_request_reminder.host_request_id == host_request1_id 

1101 assert reminders[0].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id 

1102 assert reminders[1].respond_to_host_request_reminder.host_request_id == host_request2_id 

1103 assert reminders[1].respond_to_host_request_reminder.surfer_user.user_id == req_user2.id 

1104 

1105 with requests_session(req_user_token1) as api: 

1106 host_request3_id = api.CreateHostRequest( 

1107 requests_pb2.CreateHostRequestReq( 

1108 host_user_id=user.id, 

1109 from_date=today_plus_2, 

1110 to_date=today_plus_3, 

1111 text=valid_request_text("Test request 3"), 

1112 ) 

1113 ).host_request_id 

1114 moderator.approve_host_request(host_request3_id) 

1115 

1116 refresh_materialized_views_rapid(None) 

1117 with account_session(token) as account: 

1118 reminders = account.GetReminders(empty_pb2.Empty()).reminders 

1119 assert [reminder.WhichOneof("reminder") for reminder in reminders] == [ 

1120 "respond_to_host_request_reminder", 

1121 "respond_to_host_request_reminder", 

1122 "respond_to_host_request_reminder", 

1123 "complete_profile_reminder", 

1124 "complete_verification_reminder", 

1125 ] 

1126 assert reminders[0].respond_to_host_request_reminder.host_request_id == host_request1_id 

1127 assert reminders[0].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id 

1128 assert reminders[1].respond_to_host_request_reminder.host_request_id == host_request2_id 

1129 assert reminders[1].respond_to_host_request_reminder.surfer_user.user_id == req_user2.id 

1130 assert reminders[2].respond_to_host_request_reminder.host_request_id == host_request3_id 

1131 assert reminders[2].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id 

1132 

1133 # accept req 

1134 with requests_session(token) as api: 

1135 api.RespondHostRequest( 

1136 requests_pb2.RespondHostRequestReq( 

1137 host_request_id=host_request1_id, status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED 

1138 ) 

1139 ) 

1140 

1141 refresh_materialized_views_rapid(None) 

1142 with account_session(token) as account: 

1143 reminders = account.GetReminders(empty_pb2.Empty()).reminders 

1144 assert [reminder.WhichOneof("reminder") for reminder in reminders] == [ 

1145 "respond_to_host_request_reminder", 

1146 "respond_to_host_request_reminder", 

1147 "complete_profile_reminder", 

1148 "complete_verification_reminder", 

1149 ] 

1150 assert reminders[0].respond_to_host_request_reminder.host_request_id == host_request2_id 

1151 assert reminders[0].respond_to_host_request_reminder.surfer_user.user_id == req_user2.id 

1152 assert reminders[1].respond_to_host_request_reminder.host_request_id == host_request3_id 

1153 assert reminders[1].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id 

1154 

1155 

1156def test_volunteer_stuff(db): 

1157 # taken from couchers/app/backend/resources/badges.json 

1158 board_member_id = 8347 

1159 

1160 # with password 

1161 user, token = generate_user(name="Von Tester", username="tester", city="Amsterdam", id=board_member_id) 

1162 

1163 with account_session(token) as account: 

1164 res = account.GetAccountInfo(empty_pb2.Empty()) 

1165 assert not res.is_volunteer 

1166 

1167 with pytest.raises(grpc.RpcError) as e: 

1168 account.GetMyVolunteerInfo(empty_pb2.Empty()) 

1169 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

1170 assert ( 

1171 e.value.details() == "You are currently not registered as a volunteer, if this is wrong, please contact us." 

1172 ) 

1173 

1174 with pytest.raises(grpc.RpcError) as e: 

1175 account.UpdateMyVolunteerInfo(account_pb2.UpdateMyVolunteerInfoReq()) 

1176 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

1177 assert ( 

1178 e.value.details() == "You are currently not registered as a volunteer, if this is wrong, please contact us." 

1179 ) 

1180 

1181 with session_scope() as session: 

1182 session.add( 

1183 Volunteer( 

1184 user_id=user.id, 

1185 display_name="Great Volunteer", 

1186 display_location="The Bitbucket", 

1187 role="Lead Tester", 

1188 started_volunteering=date(2020, 6, 1), 

1189 show_on_team_page=True, 

1190 ) 

1191 ) 

1192 

1193 with account_session(token) as account: 

1194 res = account.GetAccountInfo(empty_pb2.Empty()) 

1195 assert res.is_volunteer 

1196 

1197 res = account.GetMyVolunteerInfo(empty_pb2.Empty()) 

1198 

1199 assert res.display_name == "Great Volunteer" 

1200 assert res.display_location == "The Bitbucket" 

1201 assert res.role == "Lead Tester" 

1202 assert res.started_volunteering == "2020-06-01" 

1203 assert not res.stopped_volunteering 

1204 assert res.show_on_team_page 

1205 assert res.link_type == "couchers" 

1206 assert res.link_text == "@tester" 

1207 assert res.link_url == "http://localhost:3000/user/tester" 

1208 

1209 res = account.UpdateMyVolunteerInfo( 

1210 account_pb2.UpdateMyVolunteerInfoReq( 

1211 display_name=wrappers_pb2.StringValue(value=""), 

1212 link_type=wrappers_pb2.StringValue(value="website"), 

1213 link_text=wrappers_pb2.StringValue(value="testervontester.com.invalid"), 

1214 link_url=wrappers_pb2.StringValue(value="https://www.testervontester.com.invalid/"), 

1215 ) 

1216 ) 

1217 

1218 assert res.display_name == "" 

1219 assert res.display_location == "The Bitbucket" 

1220 assert res.role == "Lead Tester" 

1221 assert res.started_volunteering == "2020-06-01" 

1222 assert not res.stopped_volunteering 

1223 assert res.show_on_team_page 

1224 assert res.link_type == "website" 

1225 assert res.link_text == "testervontester.com.invalid" 

1226 assert res.link_url == "https://www.testervontester.com.invalid/" 

1227 res = account.UpdateMyVolunteerInfo( 

1228 account_pb2.UpdateMyVolunteerInfoReq( 

1229 display_name=wrappers_pb2.StringValue(value=""), 

1230 link_type=wrappers_pb2.StringValue(value="linkedin"), 

1231 link_text=wrappers_pb2.StringValue(value="tester-vontester"), 

1232 ) 

1233 ) 

1234 assert res.display_name == "" 

1235 assert res.display_location == "The Bitbucket" 

1236 assert res.role == "Lead Tester" 

1237 assert res.started_volunteering == "2020-06-01" 

1238 assert not res.stopped_volunteering 

1239 assert res.show_on_team_page 

1240 assert res.link_type == "linkedin" 

1241 assert res.link_text == "tester-vontester" 

1242 assert res.link_url == "https://www.linkedin.com/in/tester-vontester/" 

1243 

1244 res = account.UpdateMyVolunteerInfo( 

1245 account_pb2.UpdateMyVolunteerInfoReq( 

1246 display_name=wrappers_pb2.StringValue(value="Tester"), 

1247 display_location=wrappers_pb2.StringValue(value=""), 

1248 link_type=wrappers_pb2.StringValue(value="email"), 

1249 link_text=wrappers_pb2.StringValue(value="tester@vontester.com.invalid"), 

1250 ) 

1251 ) 

1252 assert res.display_name == "Tester" 

1253 assert res.display_location == "" 

1254 assert res.role == "Lead Tester" 

1255 assert res.started_volunteering == "2020-06-01" 

1256 assert not res.stopped_volunteering 

1257 assert res.show_on_team_page 

1258 assert res.link_type == "email" 

1259 assert res.link_text == "tester@vontester.com.invalid" 

1260 assert res.link_url == "mailto:tester@vontester.com.invalid" 

1261 

1262 refresh_materialized_views_rapid(None) 

1263 

1264 with public_session() as public: 

1265 res = public.GetVolunteers(empty_pb2.Empty()) 

1266 assert len(res.current_volunteers) == 1 

1267 v = res.current_volunteers[0] 

1268 assert v.name == "Tester" 

1269 assert v.username == "tester" 

1270 assert v.is_board_member 

1271 assert v.role == "Lead Tester" 

1272 assert v.location == "Amsterdam" 

1273 assert v.img.startswith("http://localhost:5001/img/thumbnail/") 

1274 assert v.link_type == "email" 

1275 assert v.link_text == "tester@vontester.com.invalid" 

1276 assert v.link_url == "mailto:tester@vontester.com.invalid"