Coverage for app / backend / src / tests / test_account.py: 100%

698 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 14:14 +0000

1from datetime import UTC, date, datetime, timedelta 

2from unittest.mock import patch 

3 

4import grpc 

5import pytest 

6from google.protobuf import empty_pb2, wrappers_pb2 

7from sqlalchemy import select, update 

8from sqlalchemy.sql import func 

9 

10from couchers import urls 

11from couchers.crypto import hash_password, random_hex 

12from couchers.db import session_scope 

13from couchers.materialized_views import refresh_materialized_views_rapid 

14from couchers.models import ( 

15 AccountDeletionReason, 

16 AccountDeletionToken, 

17 BackgroundJob, 

18 InviteCode, 

19 PhotoGalleryItem, 

20 Upload, 

21 User, 

22) 

23from couchers.proto import account_pb2, api_pb2, auth_pb2, conversations_pb2, requests_pb2 

24from couchers.utils import now, today 

25from tests.fixtures.db import generate_user, make_volunteer 

26from tests.fixtures.misc import PushCollector, email_fields, mock_notification_email, process_jobs 

27from tests.fixtures.sessions import ( 

28 account_session, 

29 auth_api_session, 

30 public_session, 

31 real_account_session, 

32 requests_session, 

33) 

34from tests.test_requests import valid_request_text 

35 

36 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture on behalf of every test in this module."""

40 

41 

def test_GetAccountInfo(db, fast_passwords):
    """GetAccountInfo returns the expected values for a freshly generated user with a password."""
    email = "user@couchers.invalid"
    user, token = generate_user(hashed_password=hash_password(random_hex()), email=email)

    with account_session(token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())

    # identity fields echo what the user was created with
    assert info.email == email
    assert info.username == user.username

    # everything else is still at its unverified / unset state
    assert not info.has_strong_verification
    assert info.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
    assert info.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED
    assert not info.is_superuser
    assert info.ui_language_preference == ""
    assert not info.is_volunteer

56 

57 

def test_donation_banner_no_drive(db):
    """The donation banner is hidden while DONATION_DRIVE_START is None."""
    # The user's donation status is left at the generate_user default; with the drive
    # disabled the banner must not show.
    _, token = generate_user()

    with patch("couchers.servicers.account.DONATION_DRIVE_START", None), account_session(token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert not info.should_show_donation_banner

67 

68 

def test_donation_banner_never_donated(db):
    """The banner is shown during an active drive to a user with no donation on record."""
    # last_donated is passed explicitly so the user has no donation on record
    _, token = generate_user(last_donated=None)
    active_drive_start = datetime(2025, 11, 1, tzinfo=UTC)

    with patch("couchers.servicers.account.DONATION_DRIVE_START", active_drive_start), account_session(
        token
    ) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert info.should_show_donation_banner

80 

81 

def test_donation_banner_donated_before_drive(db):
    """The banner is shown when the user's last donation predates the drive start."""
    drive_start = datetime(2025, 11, 1, tzinfo=UTC)
    donated_at = datetime(2025, 10, 15, tzinfo=UTC)  # two weeks before the drive begins

    user, token = generate_user()

    # move the user's last donation to a point before the drive
    with session_scope() as session:
        session.execute(update(User).where(User.id == user.id).values(last_donated=donated_at))

    with patch("couchers.servicers.account.DONATION_DRIVE_START", drive_start), account_session(token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert info.should_show_donation_banner

97 

98 

def test_donation_banner_donated_after_drive(db):
    """The banner is hidden when the user's last donation falls after the drive start."""
    drive_start = datetime(2025, 11, 1, tzinfo=UTC)
    donated_at = datetime(2025, 11, 15, tzinfo=UTC)  # two weeks into the drive

    user, token = generate_user()

    # move the user's last donation to a point inside the drive
    with session_scope() as session:
        session.execute(update(User).where(User.id == user.id).values(last_donated=donated_at))

    with patch("couchers.servicers.account.DONATION_DRIVE_START", drive_start), account_session(token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert not info.should_show_donation_banner

114 

115 

def test_donation_banner_donated_exactly_at_drive_start(db):
    """Boundary case: a donation made exactly at the drive start hides the banner."""
    drive_start = datetime(2025, 11, 1, tzinfo=UTC)

    user, token = generate_user()

    # last_donated coincides with the drive start to the instant
    with session_scope() as session:
        session.execute(update(User).where(User.id == user.id).values(last_donated=drive_start))

    with patch("couchers.servicers.account.DONATION_DRIVE_START", drive_start), account_session(token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert not info.should_show_donation_banner

130 

131 

def test_GetAccountInfo_regression(db):
    """GetAccountInfo must not crash for a user with a gallery photo but no ``about_me``.

    Regression: evaluating ``has_completed_profile`` in Python used to fail with
    ``about_me=None`` because ``len(about_me)`` does not work on ``None``. The
    test passes as long as the RPC returns without raising.
    """
    user, token = generate_user(about_me=None, complete_profile=False)

    upload_key = random_hex(32)
    upload_filename = random_hex(32) + ".jpg"

    # give the user an avatar photo in their profile gallery
    with session_scope() as session:
        session.add(Upload(key=upload_key, filename=upload_filename, creator_user_id=user.id))
        # presumably flushed so the upload row exists before the gallery item references it
        session.flush()
        assert user.profile_gallery_id is not None
        session.add(PhotoGalleryItem(gallery_id=user.profile_gallery_id, upload_key=upload_key, position=0))

    with account_session(token) as account:
        # the response itself is irrelevant; only "does not raise" is being checked
        account.GetAccountInfo(empty_pb2.Empty())

160 

161 

def test_ChangePasswordV2_normal(db, fast_passwords, push_collector: PushCollector):
    """Changing the password with the correct old password stores the new hash and notifies the user."""
    old_password, new_password = random_hex(), random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=new_password)
    with account_session(token) as account, mock_notification_email() as mock:
        account.ChangePasswordV2(request)

    # exactly one notification email, with the expected subject
    mock.assert_called_once()
    assert email_fields(mock).subject == "[TEST] Your password was changed"

    # and a matching push notification
    push = push_collector.pop_for_user(user.id, last=True)
    assert push.content.title == "Password changed"
    assert push.content.body == "Your password was changed."

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(new_password)

187 

188 

def test_ChangePasswordV2_regression(db, fast_passwords):
    """Password change succeeds with the notification email path left unmocked.

    Regression: ``send_password_changed_email`` wasn't working.
    """
    old_password, new_password = random_hex(), random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=new_password)
    with account_session(token) as account:
        account.ChangePasswordV2(request)

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(new_password)

207 

208 

def test_ChangePasswordV2_normal_short_password(db, fast_passwords):
    """A too-short new password is rejected and the stored password stays unchanged."""
    old_password = random_hex()
    too_short = random_hex(length=1)
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=too_short)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "The password must be 8 or more characters long."

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(old_password)

229 

230 

def test_ChangePasswordV2_normal_long_password(db, fast_passwords):
    """An over-long new password is rejected and the stored password stays unchanged."""
    old_password = random_hex()
    too_long = random_hex(length=1000)
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=too_long)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "The password must be less than 256 characters."

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(old_password)

251 

252 

def test_ChangePasswordV2_normal_insecure_password(db, fast_passwords):
    """An easily guessable new password is rejected and the stored password stays unchanged."""
    old_password = random_hex()
    guessable = "12345678"
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(old_password=old_password, new_password=guessable)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "The password is insecure. Please use one that is not easily guessable."

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(old_password)

273 

274 

def test_ChangePasswordV2_normal_wrong_password(db, fast_passwords):
    """Supplying the wrong old password is rejected and the stored password stays unchanged."""
    old_password, new_password = random_hex(), random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    request = account_pb2.ChangePasswordV2Req(old_password="wrong password", new_password=new_password)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "Wrong password."

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(old_password)

295 

296 

def test_ChangePasswordV2_normal_no_passwords(db, fast_passwords):
    """A request that omits ``new_password`` is rejected with the minimum-length error."""
    old_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    # only the old password is filled in; new_password is left unset
    request = account_pb2.ChangePasswordV2Req(old_password=old_password)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "The password must be 8 or more characters long."

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.hashed_password == hash_password(old_password)

311 

312 

def test_ChangeEmailV2_wrong_password(db, fast_passwords):
    """An email change with the wrong password is rejected and leaves no active change token."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password="wrong password", new_email=new_email)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "Wrong password."

    # count users whose email-change token window contains the current database time
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0

338 

339 

def test_ChangeEmailV2_wrong_email(db, fast_passwords):
    """An email change with the wrong password is rejected and leaves no active change token.

    NOTE(review): despite its name, this test sends a wrong *password* and is
    a duplicate of ``test_ChangeEmailV2_wrong_password`` -- confirm what the
    "wrong email" scenario was meant to cover.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password="wrong password", new_email=new_email)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "Wrong password."

    # count users whose email-change token window contains the current database time
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0

365 

366 

def test_ChangeEmailV2_invalid_email(db, fast_passwords):
    """A syntactically invalid new email is rejected and leaves no active change token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password=password, new_email="not a real email")
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "Invalid email."

    # count users whose email-change token window contains the current database time
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0

391 

392 

def test_ChangeEmailV2_email_in_use(db, fast_passwords):
    """Changing to an email that belongs to another user is rejected as an invalid email."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))
    other_user, _ = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password=password, new_email=other_user.email)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "Invalid email."

    # count users whose email-change token window contains the current database time
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0

418 

419 

def test_ChangeEmailV2_no_change(db, fast_passwords):
    """Changing to the email the user already has is rejected as an invalid email."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    request = account_pb2.ChangeEmailV2Req(password=password, new_email=user.email)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(request)

    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "Invalid email."

    # count users whose email-change token window contains the current database time
    active_token_count = (
        select(func.count())
        .select_from(User)
        .where(User.new_email_token_created <= func.now())
        .where(User.new_email_token_expiry >= func.now())
    )
    with session_scope() as session:
        assert session.execute(active_token_count).scalar_one() == 0

444 

445 

def test_ChangeEmailV2_wrong_token(db, fast_passwords):
    """Confirming an email change with a bogus token fails and the email stays unchanged."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    # start a legitimate email change so that a real token exists
    with account_session(token) as account:
        account.ChangeEmailV2(account_pb2.ChangeEmailV2Req(password=password, new_email=new_email))

    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as e:
            # the call is expected to raise, so its return value is not bound
            auth_api.ConfirmChangeEmailV2(auth_pb2.ConfirmChangeEmailV2Req(change_email_token="wrongtoken"))

    assert e.value.code() == grpc.StatusCode.NOT_FOUND
    assert e.value.details() == "Invalid token."

    with session_scope() as session:
        stored = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored.email == user.email

472 

473 

def test_ChangeEmailV2_tokens_two_hour_window(db):
    """An email-change token is rejected both before it was created and after it has expired.

    The auth servicer's clock is patched to one minute in the past and then to
    two hours and one minute in the future; in both cases an empty token and
    the real token must be answered with NOT_FOUND.
    """

    def one_minute_ago():
        return now() - timedelta(minutes=1)

    def two_hours_one_minute_in_future():
        return now() + timedelta(hours=2, minutes=1)

    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(account_pb2.ChangeEmailV2Req(password=password, new_email=new_email))

    with session_scope() as session:
        new_email_token = session.execute(select(User.new_email_token).where(User.id == user.id)).scalar_one()

    for fake_clock in (one_minute_ago, two_hours_one_minute_in_future):
        with patch("couchers.servicers.auth.now", fake_clock), auth_api_session() as (auth_api, _):
            attempts = (
                auth_pb2.ConfirmChangeEmailV2Req(),
                auth_pb2.ConfirmChangeEmailV2Req(change_email_token=new_email_token),
            )
            for request in attempts:
                with pytest.raises(grpc.RpcError) as e:
                    auth_api.ConfirmChangeEmailV2(request)
                assert e.value.code() == grpc.StatusCode.NOT_FOUND
                assert e.value.details() == "Invalid token."

527 

528 

def test_ChangeEmailV2(db, fast_passwords, push_collector: PushCollector):
    """Happy path: request an email change, then confirm it with the generated token."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))
    user_id = user.id

    change_request = account_pb2.ChangeEmailV2Req(password=password, new_email=new_email)
    with account_session(token) as account:
        account.ChangeEmailV2(change_request)

    # the request only stages the change: the current email is kept until confirmation
    with session_scope() as session:
        pending = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert pending.email == user.email
        assert pending.new_email == new_email
        assert pending.new_email_token is not None
        assert pending.new_email_token_created
        assert pending.new_email_token_created <= now()
        assert pending.new_email_token_expiry
        assert pending.new_email_token_expiry >= now()

        change_token = pending.new_email_token

    process_jobs()
    requested_push = push_collector.pop_for_user(user_id, last=True)
    assert requested_push.content.title == "Email change requested"
    assert requested_push.content.body == f"Use the link we sent to {new_email} to confirm your new address."

    confirm_request = auth_pb2.ConfirmChangeEmailV2Req(change_email_token=change_token)
    with auth_api_session() as (auth_api, _):
        auth_api.ConfirmChangeEmailV2(confirm_request)

    # confirmation swaps the email in and clears all staging columns
    with session_scope() as session:
        confirmed = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert confirmed.email == new_email
        assert confirmed.new_email is None
        assert confirmed.new_email_token is None
        assert confirmed.new_email_token_created is None
        assert confirmed.new_email_token_expiry is None

    process_jobs()
    verified_push = push_collector.pop_for_user(user_id, last=True)
    assert verified_push.content.title == "Email verified"
    assert verified_push.content.body == "Your new email address has been verified."

579 

580 

def test_ChangeEmailV2_sends_proper_emails(db, fast_passwords, push_collector: PushCollector):
    """Requesting an email change queues exactly two emails, one per expected wording."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        account.ChangeEmailV2(account_pb2.ChangeEmailV2Req(password=password, new_email=new_email))

    process_jobs()

    # one distinctive fragment for each of the two expected emails
    fragment_a = b"An email change to the email"
    fragment_b = (
        b"You requested that your email be changed to this email address on Couchers.org. Your old email address is"
    )

    with session_scope() as session:
        email_jobs = session.execute(select(BackgroundJob).where(BackgroundJob.job_type == "send_email")).scalars().all()
        assert len(email_jobs) == 2
        first, second = (job.payload for job in email_jobs)
        # the two emails may have been queued in either order
        assert any(fragment_a in x and fragment_b in y for x, y in ((first, second), (second, first)))

    push = push_collector.pop_for_user(user.id, last=True)
    assert push.content.title == "Email change requested"
    assert push.content.body == f"Use the link we sent to {new_email} to confirm your new address."

610 

611 

def test_ChangeLanguagePreference(db, fast_passwords):
    """Changing the UI language sets the NEXT_LOCALE cookie and persists the preference."""
    # moving from the default (empty) preference to an ISO 639-1 code
    new_lang = "zh"
    _, token = generate_user()

    with real_account_session(token) as account:
        before = account.GetAccountInfo(empty_pb2.Empty())
        assert before.ui_language_preference == ""

        # with_call also returns the call object, which carries the response metadata
        request = account_pb2.ChangeLanguagePreferenceReq(ui_language_preference=new_lang)
        _, call = account.ChangeLanguagePreference.with_call(request)

        # Cookies travel in the initial metadata. Each "set-cookie" value is a full cookie
        # string, so keep only the leading "key=value" part.
        cookie_values = [value.split(";")[0] for key, value in call.initial_metadata() if key == "set-cookie"]
        assert "NEXT_LOCALE=zh" in cookie_values, f"Didn't find the right cookie, got {call.initial_metadata()}"

        # the preference must also be visible on the next account info fetch
        after = account.GetAccountInfo(empty_pb2.Empty())
        assert after.ui_language_preference == "zh"

636 

637 

def test_contributor_form(db):
    """Filling in the contributor form flips ``filled_contributor_form`` from false to true."""
    _, token = generate_user()

    with account_session(token) as account:
        before = account.GetContributorFormInfo(empty_pb2.Empty())
        assert not before.filled_contributor_form

        # an empty form is enough to count as filled
        form_request = account_pb2.FillContributorFormReq(contributor_form=auth_pb2.ContributorForm())
        account.FillContributorForm(form_request)

        after = account.GetContributorFormInfo(empty_pb2.Empty())
        assert after.filled_contributor_form

649 

650 

def test_DeleteAccount_start(db):
    """Requesting deletion sends a confirmation email and creates a valid token without deleting the user."""
    user, token = generate_user()

    with account_session(token) as account, mock_notification_email() as mock:
        account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=None))

    mock.assert_called_once()
    assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"

    with session_scope() as session:
        token_query = select(AccountDeletionToken).where(AccountDeletionToken.user_id == user.id)
        deletion_token: AccountDeletionToken = session.execute(token_query).scalar_one()
        assert deletion_token.is_valid

        # the account itself is untouched until the deletion is confirmed
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.deleted_at is None

667 

668 

def test_DeleteAccount_message_storage(db):
    """Only deletion reasons with real content are stored; missing, empty and blank ones are not."""
    _, token = generate_user()

    reasons = (
        None,  # not stored
        "",  # not stored
        "Reason",
        "0192#(&!&#)*@//)(8",
        "\n\n\t",  # not stored
        "1337",
    )

    with account_session(token) as account:
        for reason in reasons:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=reason))

    with session_scope() as session:
        stored_reasons = session.execute(select(func.count()).select_from(AccountDeletionReason)).scalar_one()
        assert stored_reasons == 3

682 

683 

def test_full_delete_account_with_recovery(db, push_collector: PushCollector):
    """Walk the whole flow: request deletion, confirm it, then recover the account.

    After each step the push notification, the outgoing email and the database
    state are checked.
    """
    support_address = "support@couchers.org"
    user, token = generate_user()
    user_id = user.id

    # --- step 1: request deletion ---
    with account_session(token) as account:
        # a request without confirm=True is refused
        with pytest.raises(grpc.RpcError) as err:
            account.DeleteAccount(account_pb2.DeleteAccountReq())
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "Please confirm your account deletion."

        with mock_notification_email() as mock:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    push = push_collector.pop_for_user(user_id, last=True)
    assert push.content.title == "Account deletion requested"
    assert push.content.body == "Use the link we emailed you to confirm."

    mock.assert_called_once()
    e = email_fields(mock)

    with session_scope() as session:
        token_row = session.execute(select(AccountDeletionToken)).scalar_one()
        deletion_token = token_row.token

        stored_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert token_row.user == stored_user
        # nothing is deleted yet, so no undelete state exists either
        assert stored_user.deleted_at is None
        assert not stored_user.undelete_token
        assert not stored_user.undelete_until

    assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion"
    assert e.recipient == user.email
    assert "account deletion" in e.subject.lower()
    confirm_url = f"http://localhost:3000/delete-account?token={deletion_token}"
    for expected in (
        deletion_token,
        "You requested that we delete your account from Couchers.org.",
        confirm_url,
        support_address,
    ):
        assert expected in e.plain
        assert expected in e.html

    # --- step 2: confirm deletion ---
    with mock_notification_email() as mock, auth_api_session() as (auth_api, _):
        auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token))

    push = push_collector.pop_for_user(user_id, last=True)
    assert push.content.title == "Account deleted"
    assert push.content.body == "You can restore it within 7 days using the link we emailed you."

    mock.assert_called_once()
    e = email_fields(mock)

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        stored_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert stored_user.deleted_at is not None
        assert stored_user.undelete_token
        assert stored_user.undelete_until
        assert stored_user.undelete_until > now()

        undelete_token = stored_user.undelete_token

    assert e.recipient == user.email
    assert "account has been deleted" in e.subject.lower()
    recover_url = f"http://localhost:3000/recover-account?token={undelete_token}"
    for expected in (
        "You have successfully deleted your account from Couchers.org.",
        "7 days",
        recover_url,
        support_address,
    ):
        assert expected in e.plain
        assert expected in e.html

    # --- step 3: recover the account ---
    with mock_notification_email() as mock, auth_api_session() as (auth_api, _):
        auth_api.RecoverAccount(auth_pb2.RecoverAccountReq(token=undelete_token))

    push = push_collector.pop_for_user(user_id, last=True)
    assert push.content.title == "Account restored"
    assert push.content.body == "Welcome back!"

    mock.assert_called_once()
    e = email_fields(mock)

    assert e.recipient == user.email
    assert "account has been recovered" in e.subject.lower()
    for expected in (
        "Your account on Couchers.org has been successfully recovered!",
        support_address,
    ):
        assert expected in e.plain
        assert expected in e.html

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        recovered = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert recovered.deleted_at is None
        assert not recovered.undelete_token
        assert not recovered.undelete_until

798 

799 

def test_multiple_delete_tokens(db):
    """
    Make sure that confirming a deletion removes every outstanding deletion token
    """
    _, token = generate_user()

    # three separate deletion requests, one token each
    with account_session(token) as account:
        for _ in range(3):
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    with session_scope() as session:
        token_count = session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one()
        assert token_count == 3
        # any one of the tokens will do for the confirmation
        deletion_token = session.execute(select(AccountDeletionToken.token).limit(1)).scalar_one()

    with auth_api_session() as (auth_api, _):
        auth_api.ConfirmDeleteAccount(auth_pb2.ConfirmDeleteAccountReq(token=deletion_token))

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken.token)).scalar_one_or_none()

824 

825 

def test_ListActiveSessions_pagination(db, fast_passwords):
    """ListActiveSessions pages through the sessions: three on the first page, two on the last."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # log in four more times
    with auth_api_session() as (auth_api, _):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        first_page = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_size=3))
        assert len(first_page.active_sessions) == 3

        second_page = account.ListActiveSessions(
            account_pb2.ListActiveSessionsReq(page_token=first_page.next_page_token, page_size=3)
        )
        assert len(second_page.active_sessions) == 2
        # an empty next_page_token marks the final page
        assert not second_page.next_page_token

842 

843 

def test_ListActiveSessions_details(db, fast_passwords):
    """
    ListActiveSessions reports operating system, browser, device and approximate location for each session

    Three logins are made with distinct user agents and IPs, then the listing is fetched with the geoip lookup
    patched out so the expected locations are deterministic.
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # (ip, user agent) for each extra login, in the order the logins are made
    ips_user_agents = [
        (
            "108.123.33.162",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Mobile/15E148 Safari/604.1",
        ),
        (
            "8.245.212.28",
            "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/26.0 Chrome/122.0.0.0 Mobile Safari/537.36",
        ),
        (
            "95.254.140.156",
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0",
        ),
    ]

    for ip, user_agent in ips_user_agents:
        options = (("grpc.primary_user_agent", user_agent),)
        with auth_api_session(grpc_channel_options=options) as (auth_api, _interceptor):
            auth_api.Authenticate(
                auth_pb2.AuthReq(user=user.username, password=password), metadata=(("x-couchers-real-ip", ip),)
            )

    def dummy_geoip(ip_address):
        # the third IP is deliberately missing, so its lookup yields None
        return {
            "108.123.33.162": "Chicago, United States",
            "8.245.212.28": "Sydney, Australia",
        }.get(ip_address)

    with real_account_session(token) as account:
        with patch("couchers.servicers.account.geoip_approximate_location", dummy_geoip):
            res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())

    # (operating_system, browser, device, approximate_location, is_current_session), in listing order;
    # the logins above show up in reverse order after the current session
    expected_sessions = [
        # this one currently making the API call
        ("Other", "Other", "Other", "Unknown", True),
        ("Ubuntu", "Firefox", "Other", "Unknown", False),
        ("Android", "Samsung Internet", "K", "Sydney, Australia", False),
        ("iOS", "Mobile Safari", "iPhone", "Chicago, United States", False),
    ]

    assert len(res.active_sessions) == 4

    for active_session, expected in zip(res.active_sessions, expected_sessions):
        operating_system, browser, device, approximate_location, is_current_session = expected
        assert active_session.operating_system == operating_system
        assert active_session.browser == browser
        assert active_session.device == device
        assert active_session.approximate_location == approximate_location
        assert active_session.is_current_session == is_current_session

906 

907 

def test_LogOutSession(db, fast_passwords):
    """
    LogOutSession removes the session identified by its creation time and leaves the other sessions listed
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four extra logins; the listing below is expected to hold five sessions in total
    with auth_api_session() as (auth_api, _interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        before = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(before.active_sessions) == 5

        logged_out = before.active_sessions[3]
        account.LogOutSession(account_pb2.LogOutSessionReq(created=logged_out.created))

        after = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(after.active_sessions) == 4

        # ignore the first session as it changes
        assert before.active_sessions[1:3] + before.active_sessions[4:] == after.active_sessions[1:]

928 

929 

def test_LogOutOtherSessions(db, fast_passwords):
    """
    LogOutOtherSessions refuses to act without confirmation and otherwise leaves a single session listed
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four extra logins; the listing below is expected to hold five sessions in total
    with auth_api_session() as (auth_api, _interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 5

        # without confirm=True the call must be rejected
        with pytest.raises(grpc.RpcError) as e:
            account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=False))
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "Please confirm you want to log out of other sessions."

        account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=True))
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 1

951 

952 

def test_CreateInviteCode(db):
    """
    CreateInviteCode returns an 8 character code with its link and stores an enabled InviteCode for the caller
    """
    creator, token = generate_user()

    with account_session(token) as account:
        created = account.CreateInviteCode(account_pb2.CreateInviteCodeReq())
        assert len(created.code) == 8

    with session_scope() as session:
        # the code doubles as the primary key of the stored row
        stored = session.execute(select(InviteCode).where(InviteCode.id == created.code)).scalar_one()
        assert stored.creator_user_id == creator.id
        assert stored.disabled is None
        assert created.url == urls.invite_code_link(code=created.code)

966 

967 

def test_DisableInviteCode(db):
    """
    DisableInviteCode sets the disabled marker on the stored invite code
    """
    user, token = generate_user()

    with account_session(token) as account:
        created = account.CreateInviteCode(account_pb2.CreateInviteCodeReq())
        code = created.code
        account.DisableInviteCode(account_pb2.DisableInviteCodeReq(code=code))

    with session_scope() as session:
        stored = session.execute(select(InviteCode).where(InviteCode.id == code)).scalar_one()
        assert stored.disabled is not None

978 

979 

def test_ListInviteCodes(db):
    """
    ListInviteCodes returns the caller's invite codes together with their link and number of uses
    """
    user, token = generate_user()
    invitee, _invitee_token = generate_user()

    with account_session(token) as account:
        code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code

    # simulate the invitee having signed up with this invite code
    with session_scope() as session:
        session.execute(update(User).where(User.id == invitee.id).values(invite_code_id=code))

    with account_session(token) as account:
        listed = account.ListInviteCodes(empty_pb2.Empty()).invite_codes
        assert len(listed) == 1

        only_code = listed[0]
        assert only_code.code == code
        assert only_code.uses == 1
        assert only_code.url == urls.invite_code_link(code=code)

997 

998 

def test_reminders(db, moderator):
    """
    Follow the list returned by GetReminders while host requests arrive for a user and one of them is accepted

    Not covered here: the strong verification reminder's absence (see test_strong_verification.py) and the
    reference writing reminders (see test_AvailableWriteReferences_and_ListPendingReferencesToWrite).
    """
    # we use LiteUser, so remember to refresh materialized views
    host, host_token = generate_user(complete_profile=False)
    _complete_user, complete_token = generate_user(complete_profile=True)
    surfer1, surfer1_token = generate_user(complete_profile=True)
    surfer2, surfer2_token = generate_user(complete_profile=True)

    respond = "respond_to_host_request_reminder"
    profile_and_verification = ["complete_profile_reminder", "complete_verification_reminder"]

    def fetch_reminders(token):
        # returns the reminders currently shown to the owner of `token`
        with account_session(token) as account:
            return account.GetReminders(empty_pb2.Empty()).reminders

    def kinds(reminders):
        # names of the oneof field set on each reminder, in order
        return [reminder.WhichOneof("reminder") for reminder in reminders]

    def assert_pending_requests(reminders, expected):
        # the leading reminders must point at the given (host request id, surfer) pairs, in order
        for reminder, (host_request_id, surfer) in zip(reminders, expected):
            pending = reminder.respond_to_host_request_reminder
            assert pending.host_request_id == host_request_id
            assert pending.surfer_user.user_id == surfer.id

    refresh_materialized_views_rapid(empty_pb2.Empty())
    assert kinds(fetch_reminders(complete_token)) == ["complete_verification_reminder"]
    assert kinds(fetch_reminders(host_token)) == profile_and_verification

    from_date = (today() + timedelta(days=2)).isoformat()
    to_date = (today() + timedelta(days=3)).isoformat()

    def send_approved_request(surfer_token, label):
        # sends a host request to `host` and has the moderator approve it; returns the request id
        with requests_session(surfer_token) as api:
            host_request_id = api.CreateHostRequest(
                requests_pb2.CreateHostRequestReq(
                    host_user_id=host.id,
                    from_date=from_date,
                    to_date=to_date,
                    text=valid_request_text(label),
                )
            ).host_request_id
            moderator.approve_host_request(host_request_id)
        return host_request_id

    host_request1_id = send_approved_request(surfer1_token, "Test request 1")

    # note: no materialized view refresh happens before this first check
    reminders = fetch_reminders(host_token)
    assert kinds(reminders) == [respond, *profile_and_verification]
    assert_pending_requests(reminders, [(host_request1_id, surfer1)])

    host_request2_id = send_approved_request(surfer2_token, "Test request 2")

    refresh_materialized_views_rapid(empty_pb2.Empty())
    reminders = fetch_reminders(host_token)
    assert kinds(reminders) == [respond, respond, *profile_and_verification]
    assert_pending_requests(reminders, [(host_request1_id, surfer1), (host_request2_id, surfer2)])

    host_request3_id = send_approved_request(surfer1_token, "Test request 3")

    refresh_materialized_views_rapid(empty_pb2.Empty())
    reminders = fetch_reminders(host_token)
    assert kinds(reminders) == [respond, respond, respond, *profile_and_verification]
    assert_pending_requests(
        reminders,
        [(host_request1_id, surfer1), (host_request2_id, surfer2), (host_request3_id, surfer1)],
    )

    # accept req
    with requests_session(host_token) as api:
        api.RespondHostRequest(
            requests_pb2.RespondHostRequestReq(
                host_request_id=host_request1_id, status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED
            )
        )

    # the accepted request no longer needs a response, so only the other two remain
    refresh_materialized_views_rapid(empty_pb2.Empty())
    reminders = fetch_reminders(host_token)
    assert kinds(reminders) == [respond, respond, *profile_and_verification]
    assert_pending_requests(reminders, [(host_request2_id, surfer2), (host_request3_id, surfer1)])

1116 

1117 

def test_volunteer_stuff(db):
    """
    Exercise the volunteer endpoints: rejection for non-volunteers, reading and updating the volunteer info,
    and the public volunteer listing
    """
    # taken from couchers/app/backend/resources/badges.json
    board_member_id = 8347
    not_registered = "You are currently not registered as a volunteer, if this is wrong, please contact us."

    user, token = generate_user(name="Von Tester", username="tester", city="Amsterdam", id=board_member_id)

    def text(value):
        # wraps a plain string for the optional fields of UpdateMyVolunteerInfoReq
        return wrappers_pb2.StringValue(value=value)

    def assert_untouched_fields(info):
        # none of the updates below touch these fields
        assert info.role == "Lead Tester"
        assert info.started_volunteering == "2020-06-01"
        assert not info.stopped_volunteering
        assert info.show_on_team_page

    with account_session(token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert not info.is_volunteer

        # both volunteer endpoints reject users without a volunteer entry
        for endpoint, request in (
            (account.GetMyVolunteerInfo, empty_pb2.Empty()),
            (account.UpdateMyVolunteerInfo, account_pb2.UpdateMyVolunteerInfoReq()),
        ):
            with pytest.raises(grpc.RpcError) as e:
                endpoint(request)
            assert e.value.code() == grpc.StatusCode.NOT_FOUND
            assert e.value.details() == not_registered

    with session_scope() as session:
        volunteer = make_volunteer(
            user_id=user.id,
            display_name="Great Volunteer",
            display_location="The Bitbucket",
            role="Lead Tester",
            started_volunteering=date(2020, 6, 1),
        )
        session.add(volunteer)

    with account_session(token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert info.is_volunteer

        res = account.GetMyVolunteerInfo(empty_pb2.Empty())

        assert res.display_name == "Great Volunteer"
        assert res.display_location == "The Bitbucket"
        assert_untouched_fields(res)
        assert res.link_type == "couchers"
        assert res.link_text == "@tester"
        assert res.link_url == "http://localhost:3000/user/tester"

        # website link: text and url are both supplied by the caller
        res = account.UpdateMyVolunteerInfo(
            account_pb2.UpdateMyVolunteerInfoReq(
                display_name=text(""),
                link_type=text("website"),
                link_text=text("testervontester.com.invalid"),
                link_url=text("https://www.testervontester.com.invalid/"),
            )
        )

        assert res.display_name == ""
        assert res.display_location == "The Bitbucket"
        assert_untouched_fields(res)
        assert res.link_type == "website"
        assert res.link_text == "testervontester.com.invalid"
        assert res.link_url == "https://www.testervontester.com.invalid/"

        # linkedin link: only the text is sent, the url comes back filled in
        res = account.UpdateMyVolunteerInfo(
            account_pb2.UpdateMyVolunteerInfoReq(
                display_name=text(""),
                link_type=text("linkedin"),
                link_text=text("tester-vontester"),
            )
        )

        assert res.display_name == ""
        assert res.display_location == "The Bitbucket"
        assert_untouched_fields(res)
        assert res.link_type == "linkedin"
        assert res.link_text == "tester-vontester"
        assert res.link_url == "https://www.linkedin.com/in/tester-vontester/"

        # email link: only the text is sent, the url comes back as a mailto link
        res = account.UpdateMyVolunteerInfo(
            account_pb2.UpdateMyVolunteerInfoReq(
                display_name=text("Tester"),
                display_location=text(""),
                link_type=text("email"),
                link_text=text("tester@vontester.com.invalid"),
            )
        )

        assert res.display_name == "Tester"
        assert res.display_location == ""
        assert_untouched_fields(res)
        assert res.link_type == "email"
        assert res.link_text == "tester@vontester.com.invalid"
        assert res.link_url == "mailto:tester@vontester.com.invalid"

    refresh_materialized_views_rapid(empty_pb2.Empty())

    with public_session() as public:
        team = public.GetVolunteers(empty_pb2.Empty())
        assert len(team.current_volunteers) == 1

        v = team.current_volunteers[0]
        assert v.name == "Tester"
        assert v.username == "tester"
        assert v.is_board_member
        assert v.role == "Lead Tester"
        # the display location was cleared above; the listing shows the user's city here
        assert v.location == "Amsterdam"
        assert v.img.startswith("http://localhost:5001/img/thumbnail/")
        assert v.link_type == "email"
        assert v.link_text == "tester@vontester.com.invalid"
        assert v.link_url == "mailto:tester@vontester.com.invalid"