Coverage for app / backend / src / tests / test_account.py: 100%

697 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1from datetime import UTC, date, datetime, timedelta 

2from unittest.mock import patch 

3 

4import grpc 

5import pytest 

6from google.protobuf import empty_pb2, wrappers_pb2 

7from sqlalchemy import select, update 

8from sqlalchemy.sql import func 

9 

10from couchers import urls 

11from couchers.crypto import hash_password, random_hex 

12from couchers.db import session_scope 

13from couchers.materialized_views import refresh_materialized_views_rapid 

14from couchers.models import ( 

15 AccountDeletionReason, 

16 AccountDeletionToken, 

17 BackgroundJob, 

18 InviteCode, 

19 PhotoGalleryItem, 

20 Upload, 

21 User, 

22) 

23from couchers.proto import account_pb2, api_pb2, auth_pb2, conversations_pb2, requests_pb2 

24from couchers.utils import now, today 

25from tests.fixtures.db import generate_user, make_volunteer 

26from tests.fixtures.misc import PushCollector, email_fields, mock_notification_email, process_jobs 

27from tests.fixtures.sessions import ( 

28 account_session, 

29 auth_api_session, 

30 public_session, 

31 real_account_session, 

32 requests_session, 

33) 

34from tests.test_requests import valid_request_text 

35 

36 

37@pytest.fixture(autouse=True) 

38def _(testconfig): 

39 pass 

40 

41 

42def test_GetAccountInfo(db, fast_passwords): 

43 # with password 

44 user1, token1 = generate_user(hashed_password=hash_password(random_hex()), email="user@couchers.invalid") 

45 

46 with account_session(token1) as account: 

47 res = account.GetAccountInfo(empty_pb2.Empty()) 

48 assert res.email == "user@couchers.invalid" 

49 assert res.username == user1.username 

50 assert not res.has_strong_verification 

51 assert res.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED 

52 assert res.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED 

53 assert not res.is_superuser 

54 assert res.ui_language_preference == "" 

55 assert not res.is_volunteer 

56 

57 

58def test_donation_banner_no_drive(db): 

59 """Test that the banner is not shown when DONATION_DRIVE_START is None""" 

60 # User has donated, but the drive is disabled, so the banner should not show 

61 user, token = generate_user() 

62 

63 with patch("couchers.servicers.account.DONATION_DRIVE_START", None): 

64 with account_session(token) as account: 

65 res = account.GetAccountInfo(empty_pb2.Empty()) 

66 assert not res.should_show_donation_banner 

67 

68 

69def test_donation_banner_never_donated(db): 

70 """Test that banner is shown when user has never donated and drive is active""" 

71 drive_start = datetime(2025, 11, 1, tzinfo=UTC) 

72 

73 # Explicitly set last_donated=None since generate_user defaults to now() 

74 user, token = generate_user(last_donated=None) 

75 

76 with patch("couchers.servicers.account.DONATION_DRIVE_START", drive_start): 

77 with account_session(token) as account: 

78 res = account.GetAccountInfo(empty_pb2.Empty()) 

79 assert res.should_show_donation_banner 

80 

81 

82def test_donation_banner_donated_before_drive(db): 

83 """Test that banner is shown when user donated before drive start""" 

84 drive_start = datetime(2025, 11, 1, tzinfo=UTC) 

85 

86 user, token = generate_user() 

87 

88 # Set donation before drive start 

89 with session_scope() as session: 

90 last_donated = datetime(2025, 10, 15, tzinfo=UTC) # Before Nov 1 

91 session.execute(update(User).where(User.id == user.id).values(last_donated=last_donated)) 

92 

93 with patch("couchers.servicers.account.DONATION_DRIVE_START", drive_start): 

94 with account_session(token) as account: 

95 res = account.GetAccountInfo(empty_pb2.Empty()) 

96 assert res.should_show_donation_banner 

97 

98 

99def test_donation_banner_donated_after_drive(db): 

100 """Test that banner is not shown when user donated after drive start""" 

101 drive_start = datetime(2025, 11, 1, tzinfo=UTC) 

102 

103 user, token = generate_user() 

104 

105 # Set donation after drive start 

106 with session_scope() as session: 

107 last_donated = datetime(2025, 11, 15, tzinfo=UTC) # After Nov 1 

108 session.execute(update(User).where(User.id == user.id).values(last_donated=last_donated)) 

109 

110 with patch("couchers.servicers.account.DONATION_DRIVE_START", drive_start): 

111 with account_session(token) as account: 

112 res = account.GetAccountInfo(empty_pb2.Empty()) 

113 assert not res.should_show_donation_banner 

114 

115 

116def test_donation_banner_donated_exactly_at_drive_start(db): 

117 """Test that banner is not shown when user donated exactly at drive start time""" 

118 drive_start = datetime(2025, 11, 1, tzinfo=UTC) 

119 

120 user, token = generate_user() 

121 

122 # Set donation exactly at drive start 

123 with session_scope() as session: 

124 session.execute(update(User).where(User.id == user.id).values(last_donated=drive_start)) 

125 

126 with patch("couchers.servicers.account.DONATION_DRIVE_START", drive_start): 

127 with account_session(token) as account: 

128 res = account.GetAccountInfo(empty_pb2.Empty()) 

129 assert not res.should_show_donation_banner 

130 

131 

132def test_GetAccountInfo_regression(db): 

133 # there was a bug in evaluating `has_completed_profile` on the backend (in python) 

134 # when about_me is None but the user has a key, it was failing because len(about_me) doesn't work on None 

135 user, token = generate_user(about_me=None, complete_profile=False) 

136 

137 # add an avatar photo to the user's profile gallery 

138 with session_scope() as session: 

139 key = random_hex(32) 

140 filename = random_hex(32) + ".jpg" 

141 session.add( 

142 Upload( 

143 key=key, 

144 filename=filename, 

145 creator_user_id=user.id, 

146 ) 

147 ) 

148 session.flush() 

149 session.add( 

150 PhotoGalleryItem( 

151 gallery_id=user.profile_gallery_id, 

152 upload_key=key, 

153 position=0, 

154 ) 

155 ) 

156 

157 with account_session(token) as account: 

158 res = account.GetAccountInfo(empty_pb2.Empty()) 

159 

160 

161def test_ChangePasswordV2_normal(db, fast_passwords, push_collector: PushCollector): 

162 # user has old password and is changing to new password 

163 old_password = random_hex() 

164 new_password = random_hex() 

165 user, token = generate_user(hashed_password=hash_password(old_password)) 

166 

167 with account_session(token) as account: 

168 with mock_notification_email() as mock: 

169 account.ChangePasswordV2( 

170 account_pb2.ChangePasswordV2Req( 

171 old_password=old_password, 

172 new_password=new_password, 

173 ) 

174 ) 

175 

176 mock.assert_called_once() 

177 assert email_fields(mock).subject == "[TEST] Your password was changed" 

178 

179 push = push_collector.pop_for_user(user.id, last=True) 

180 assert push.content.title == "Password changed" 

181 assert push.content.body == "Your password was changed." 

182 

183 with session_scope() as session: 

184 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

185 assert updated_user.hashed_password == hash_password(new_password) 

186 

187 

188def test_ChangePasswordV2_regression(db, fast_passwords): 

189 # send_password_changed_email wasn't working 

190 # user has old password and is changing to new password 

191 old_password = random_hex() 

192 new_password = random_hex() 

193 user, token = generate_user(hashed_password=hash_password(old_password)) 

194 

195 with account_session(token) as account: 

196 account.ChangePasswordV2( 

197 account_pb2.ChangePasswordV2Req( 

198 old_password=old_password, 

199 new_password=new_password, 

200 ) 

201 ) 

202 

203 with session_scope() as session: 

204 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

205 assert updated_user.hashed_password == hash_password(new_password) 

206 

207 

208def test_ChangePasswordV2_normal_short_password(db, fast_passwords): 

209 # user has old password and is changing to new password, but used short password 

210 old_password = random_hex() 

211 new_password = random_hex(length=1) 

212 user, token = generate_user(hashed_password=hash_password(old_password)) 

213 

214 with account_session(token) as account: 

215 with pytest.raises(grpc.RpcError) as e: 

216 account.ChangePasswordV2( 

217 account_pb2.ChangePasswordV2Req( 

218 old_password=old_password, 

219 new_password=new_password, 

220 ) 

221 ) 

222 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

223 assert e.value.details() == "The password must be 8 or more characters long." 

224 

225 with session_scope() as session: 

226 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

227 assert updated_user.hashed_password == hash_password(old_password) 

228 

229 

230def test_ChangePasswordV2_normal_long_password(db, fast_passwords): 

231 # user has old password and is changing to new password, but used short password 

232 old_password = random_hex() 

233 new_password = random_hex(length=1000) 

234 user, token = generate_user(hashed_password=hash_password(old_password)) 

235 

236 with account_session(token) as account: 

237 with pytest.raises(grpc.RpcError) as e: 

238 account.ChangePasswordV2( 

239 account_pb2.ChangePasswordV2Req( 

240 old_password=old_password, 

241 new_password=new_password, 

242 ) 

243 ) 

244 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

245 assert e.value.details() == "The password must be less than 256 characters." 

246 

247 with session_scope() as session: 

248 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

249 assert updated_user.hashed_password == hash_password(old_password) 

250 

251 

252def test_ChangePasswordV2_normal_insecure_password(db, fast_passwords): 

253 # user has old password and is changing to new password, but used insecure password 

254 old_password = random_hex() 

255 new_password = "12345678" 

256 user, token = generate_user(hashed_password=hash_password(old_password)) 

257 

258 with account_session(token) as account: 

259 with pytest.raises(grpc.RpcError) as e: 

260 account.ChangePasswordV2( 

261 account_pb2.ChangePasswordV2Req( 

262 old_password=old_password, 

263 new_password=new_password, 

264 ) 

265 ) 

266 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

267 assert e.value.details() == "The password is insecure. Please use one that is not easily guessable." 

268 

269 with session_scope() as session: 

270 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

271 assert updated_user.hashed_password == hash_password(old_password) 

272 

273 

274def test_ChangePasswordV2_normal_wrong_password(db, fast_passwords): 

275 # user has old password and is changing to new password, but used wrong old password 

276 old_password = random_hex() 

277 new_password = random_hex() 

278 user, token = generate_user(hashed_password=hash_password(old_password)) 

279 

280 with account_session(token) as account: 

281 with pytest.raises(grpc.RpcError) as e: 

282 account.ChangePasswordV2( 

283 account_pb2.ChangePasswordV2Req( 

284 old_password="wrong password", 

285 new_password=new_password, 

286 ) 

287 ) 

288 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

289 assert e.value.details() == "Wrong password." 

290 

291 with session_scope() as session: 

292 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

293 assert updated_user.hashed_password == hash_password(old_password) 

294 

295 

296def test_ChangePasswordV2_normal_no_passwords(db, fast_passwords): 

297 # user has old password and called with empty body 

298 old_password = random_hex() 

299 user, token = generate_user(hashed_password=hash_password(old_password)) 

300 

301 with account_session(token) as account: 

302 with pytest.raises(grpc.RpcError) as e: 

303 account.ChangePasswordV2(account_pb2.ChangePasswordV2Req(old_password=old_password)) 

304 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

305 assert e.value.details() == "The password must be 8 or more characters long." 

306 

307 with session_scope() as session: 

308 updated_user = session.execute(select(User).where(User.id == user.id)).scalar_one() 

309 assert updated_user.hashed_password == hash_password(old_password) 

310 

311 

312def test_ChangeEmailV2_wrong_password(db, fast_passwords): 

313 password = random_hex() 

314 new_email = f"{random_hex()}@couchers.org.invalid" 

315 user, token = generate_user(hashed_password=hash_password(password)) 

316 

317 with account_session(token) as account: 

318 with pytest.raises(grpc.RpcError) as e: 

319 account.ChangeEmailV2( 

320 account_pb2.ChangeEmailV2Req( 

321 password="wrong password", 

322 new_email=new_email, 

323 ) 

324 ) 

325 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

326 assert e.value.details() == "Wrong password." 

327 

328 with session_scope() as session: 

329 assert ( 

330 session.execute( 

331 select(func.count()) 

332 .select_from(User) 

333 .where(User.new_email_token_created <= func.now()) 

334 .where(User.new_email_token_expiry >= func.now()) 

335 ) 

336 ).scalar_one() == 0 

337 

338 

339def test_ChangeEmailV2_wrong_email(db, fast_passwords): 

340 password = random_hex() 

341 new_email = f"{random_hex()}@couchers.org.invalid" 

342 user, token = generate_user(hashed_password=hash_password(password)) 

343 

344 with account_session(token) as account: 

345 with pytest.raises(grpc.RpcError) as e: 

346 account.ChangeEmailV2( 

347 account_pb2.ChangeEmailV2Req( 

348 password="wrong password", 

349 new_email=new_email, 

350 ) 

351 ) 

352 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

353 assert e.value.details() == "Wrong password." 

354 

355 with session_scope() as session: 

356 assert ( 

357 session.execute( 

358 select(func.count()) 

359 .select_from(User) 

360 .where(User.new_email_token_created <= func.now()) 

361 .where(User.new_email_token_expiry >= func.now()) 

362 ) 

363 ).scalar_one() == 0 

364 

365 

366def test_ChangeEmailV2_invalid_email(db, fast_passwords): 

367 password = random_hex() 

368 user, token = generate_user(hashed_password=hash_password(password)) 

369 

370 with account_session(token) as account: 

371 with pytest.raises(grpc.RpcError) as e: 

372 account.ChangeEmailV2( 

373 account_pb2.ChangeEmailV2Req( 

374 password=password, 

375 new_email="not a real email", 

376 ) 

377 ) 

378 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

379 assert e.value.details() == "Invalid email." 

380 

381 with session_scope() as session: 

382 assert ( 

383 session.execute( 

384 select(func.count()) 

385 .select_from(User) 

386 .where(User.new_email_token_created <= func.now()) 

387 .where(User.new_email_token_expiry >= func.now()) 

388 ) 

389 ).scalar_one() == 0 

390 

391 

392def test_ChangeEmailV2_email_in_use(db, fast_passwords): 

393 password = random_hex() 

394 user, token = generate_user(hashed_password=hash_password(password)) 

395 user2, token2 = generate_user(hashed_password=hash_password(password)) 

396 

397 with account_session(token) as account: 

398 with pytest.raises(grpc.RpcError) as e: 

399 account.ChangeEmailV2( 

400 account_pb2.ChangeEmailV2Req( 

401 password=password, 

402 new_email=user2.email, 

403 ) 

404 ) 

405 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

406 assert e.value.details() == "Invalid email." 

407 

408 with session_scope() as session: 

409 assert ( 

410 session.execute( 

411 select(func.count()) 

412 .select_from(User) 

413 .where(User.new_email_token_created <= func.now()) 

414 .where(User.new_email_token_expiry >= func.now()) 

415 ) 

416 ).scalar_one() == 0 

417 

418 

419def test_ChangeEmailV2_no_change(db, fast_passwords): 

420 password = random_hex() 

421 user, token = generate_user(hashed_password=hash_password(password)) 

422 

423 with account_session(token) as account: 

424 with pytest.raises(grpc.RpcError) as e: 

425 account.ChangeEmailV2( 

426 account_pb2.ChangeEmailV2Req( 

427 password=password, 

428 new_email=user.email, 

429 ) 

430 ) 

431 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

432 assert e.value.details() == "Invalid email." 

433 

434 with session_scope() as session: 

435 assert ( 

436 session.execute( 

437 select(func.count()) 

438 .select_from(User) 

439 .where(User.new_email_token_created <= func.now()) 

440 .where(User.new_email_token_expiry >= func.now()) 

441 ) 

442 ).scalar_one() == 0 

443 

444 

445def test_ChangeEmailV2_wrong_token(db, fast_passwords): 

446 password = random_hex() 

447 new_email = f"{random_hex()}@couchers.org.invalid" 

448 user, token = generate_user(hashed_password=hash_password(password)) 

449 

450 with account_session(token) as account: 

451 account.ChangeEmailV2( 

452 account_pb2.ChangeEmailV2Req( 

453 password=password, 

454 new_email=new_email, 

455 ) 

456 ) 

457 

458 with auth_api_session() as (auth_api, metadata_interceptor): 

459 with pytest.raises(grpc.RpcError) as e: 

460 res = auth_api.ConfirmChangeEmailV2( 

461 auth_pb2.ConfirmChangeEmailV2Req( 

462 change_email_token="wrongtoken", 

463 ) 

464 ) 

465 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

466 assert e.value.details() == "Invalid token." 

467 

468 with session_scope() as session: 

469 user_updated = session.execute(select(User).where(User.id == user.id)).scalar_one() 

470 assert user_updated.email == user.email 

471 

472 

473def test_ChangeEmailV2_tokens_two_hour_window(db): 

474 def two_hours_one_minute_in_future(): 

475 return now() + timedelta(hours=2, minutes=1) 

476 

477 def one_minute_ago(): 

478 return now() - timedelta(minutes=1) 

479 

480 password = random_hex() 

481 new_email = f"{random_hex()}@couchers.org.invalid" 

482 user, token = generate_user(hashed_password=hash_password(password)) 

483 

484 with account_session(token) as account: 

485 account.ChangeEmailV2( 

486 account_pb2.ChangeEmailV2Req( 

487 password=password, 

488 new_email=new_email, 

489 ) 

490 ) 

491 

492 with session_scope() as session: 

493 new_email_token = session.execute(select(User.new_email_token).where(User.id == user.id)).scalar_one() 

494 

495 with patch("couchers.servicers.auth.now", one_minute_ago): 

496 with auth_api_session() as (auth_api, metadata_interceptor): 

497 with pytest.raises(grpc.RpcError) as e: 

498 auth_api.ConfirmChangeEmailV2(auth_pb2.ConfirmChangeEmailV2Req()) 

499 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

500 assert e.value.details() == "Invalid token." 

501 

502 with pytest.raises(grpc.RpcError) as e: 

503 auth_api.ConfirmChangeEmailV2( 

504 auth_pb2.ConfirmChangeEmailV2Req( 

505 change_email_token=new_email_token, 

506 ) 

507 ) 

508 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

509 assert e.value.details() == "Invalid token." 

510 

511 with patch("couchers.servicers.auth.now", two_hours_one_minute_in_future): 

512 with auth_api_session() as (auth_api, metadata_interceptor): 

513 with pytest.raises(grpc.RpcError) as e: 

514 auth_api.ConfirmChangeEmailV2(auth_pb2.ConfirmChangeEmailV2Req()) 

515 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

516 assert e.value.details() == "Invalid token." 

517 

518 with pytest.raises(grpc.RpcError) as e: 

519 auth_api.ConfirmChangeEmailV2( 

520 auth_pb2.ConfirmChangeEmailV2Req( 

521 change_email_token=new_email_token, 

522 ) 

523 ) 

524 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

525 assert e.value.details() == "Invalid token." 

526 

527 

528def test_ChangeEmailV2(db, fast_passwords, push_collector: PushCollector): 

529 password = random_hex() 

530 new_email = f"{random_hex()}@couchers.org.invalid" 

531 user, token = generate_user(hashed_password=hash_password(password)) 

532 user_id = user.id 

533 

534 with account_session(token) as account: 

535 account.ChangeEmailV2( 

536 account_pb2.ChangeEmailV2Req( 

537 password=password, 

538 new_email=new_email, 

539 ) 

540 ) 

541 

542 with session_scope() as session: 

543 user_updated = session.execute(select(User).where(User.id == user_id)).scalar_one() 

544 assert user_updated.email == user.email 

545 assert user_updated.new_email == new_email 

546 assert user_updated.new_email_token is not None 

547 assert user_updated.new_email_token_created 

548 assert user_updated.new_email_token_created <= now() 

549 assert user_updated.new_email_token_expiry 

550 assert user_updated.new_email_token_expiry >= now() 

551 

552 token = user_updated.new_email_token 

553 

554 process_jobs() 

555 push = push_collector.pop_for_user(user_id, last=True) 

556 assert push.content.title == "Email change requested" 

557 assert push.content.body == f"Use the link we sent to {new_email} to confirm your new address." 

558 

559 with auth_api_session() as (auth_api, metadata_interceptor): 

560 auth_api.ConfirmChangeEmailV2( 

561 auth_pb2.ConfirmChangeEmailV2Req( 

562 change_email_token=token, 

563 ) 

564 ) 

565 

566 with session_scope() as session: 

567 user = session.execute(select(User).where(User.id == user_id)).scalar_one() 

568 assert user.email == new_email 

569 assert user.new_email is None 

570 assert user.new_email_token is None 

571 assert user.new_email_token_created is None 

572 assert user.new_email_token_expiry is None 

573 

574 process_jobs() 

575 push = push_collector.pop_for_user(user_id, last=True) 

576 assert push.content.title == "Email verified" 

577 assert push.content.body == "Your new email address has been verified." 

578 

579 

580def test_ChangeEmailV2_sends_proper_emails(db, fast_passwords, push_collector: PushCollector): 

581 password = random_hex() 

582 new_email = f"{random_hex()}@couchers.org.invalid" 

583 user, token = generate_user(hashed_password=hash_password(password)) 

584 

585 with account_session(token) as account: 

586 account.ChangeEmailV2( 

587 account_pb2.ChangeEmailV2Req( 

588 password=password, 

589 new_email=new_email, 

590 ) 

591 ) 

592 

593 process_jobs() 

594 

595 with session_scope() as session: 

596 jobs = session.execute(select(BackgroundJob).where(BackgroundJob.job_type == "send_email")).scalars().all() 

597 assert len(jobs) == 2 

598 uq_str1 = b"An email change to the email" 

599 uq_str2 = ( 

600 b"You requested that your email be changed to this email address on Couchers.org. Your old email address is" 

601 ) 

602 assert (uq_str1 in jobs[0].payload and uq_str2 in jobs[1].payload) or ( 

603 uq_str2 in jobs[0].payload and uq_str1 in jobs[1].payload 

604 ) 

605 

606 push = push_collector.pop_for_user(user.id, last=True) 

607 assert push.content.title == "Email change requested" 

608 assert push.content.body == f"Use the link we sent to {new_email} to confirm your new address." 

609 

610 

611def test_ChangeLanguagePreference(db, fast_passwords): 

612 # user changes from default to ISO 639-1 language code 

613 new_lang = "zh" 

614 user, token = generate_user() 

615 

616 with real_account_session(token) as account: 

617 res = account.GetAccountInfo(empty_pb2.Empty()) 

618 assert res.ui_language_preference == "" 

619 

620 # call will have info about the request 

621 res, call = account.ChangeLanguagePreference.with_call( 

622 account_pb2.ChangeLanguagePreferenceReq(ui_language_preference=new_lang) 

623 ) 

624 

625 # cookies are sent via initial metadata, so we check for it there 

626 # the value of "set-cookie" will be the full cookie string, pull the key value from the string 

627 cookie_values = [v.split(";")[0] for k, v in call.initial_metadata() if k == "set-cookie"] 

628 assert any(val == "NEXT_LOCALE=zh" for val in cookie_values), ( 

629 f"Didn't find the right cookie, got {call.initial_metadata()}" 

630 ) 

631 

632 # the changed language preference should also be sent to the backend 

633 res = account.GetAccountInfo(empty_pb2.Empty()) 

634 assert res.ui_language_preference == "zh" 

635 

636 

637def test_contributor_form(db): 

638 user, token = generate_user() 

639 

640 with account_session(token) as account: 

641 res = account.GetContributorFormInfo(empty_pb2.Empty()) 

642 assert not res.filled_contributor_form 

643 

644 account.FillContributorForm(account_pb2.FillContributorFormReq(contributor_form=auth_pb2.ContributorForm())) 

645 

646 res = account.GetContributorFormInfo(empty_pb2.Empty()) 

647 assert res.filled_contributor_form 

648 

649 

650def test_DeleteAccount_start(db): 

651 user, token = generate_user() 

652 

653 with account_session(token) as account: 

654 with mock_notification_email() as mock: 

655 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=None)) 

656 mock.assert_called_once() 

657 assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion" 

658 

659 with session_scope() as session: 

660 deletion_token: AccountDeletionToken = session.execute( 

661 select(AccountDeletionToken).where(AccountDeletionToken.user_id == user.id) 

662 ).scalar_one() 

663 

664 assert deletion_token.is_valid 

665 assert not session.execute(select(User).where(User.id == user.id)).scalar_one().is_deleted 

666 

667 

668def test_DeleteAccount_message_storage(db): 

669 user, token = generate_user() 

670 

671 with account_session(token) as account: 

672 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=None)) # not stored 

673 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="")) # not stored 

674 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="Reason")) 

675 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="0192#(&!&#)*@//)(8")) 

676 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="\n\n\t")) # not stored 

677 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason="1337")) 

678 

679 with session_scope() as session: 

680 assert session.execute(select(func.count()).select_from(AccountDeletionReason)).scalar_one() == 3 

681 

682 

683def test_full_delete_account_with_recovery(db, push_collector: PushCollector): 

684 user, token = generate_user() 

685 user_id = user.id 

686 

687 with account_session(token) as account: 

688 with pytest.raises(grpc.RpcError) as err: 

689 account.DeleteAccount(account_pb2.DeleteAccountReq()) 

690 assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

691 assert err.value.details() == "Please confirm your account deletion." 

692 

693 # Check the right email is sent 

694 with mock_notification_email() as mock: 

695 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True)) 

696 

697 push = push_collector.pop_for_user(user_id, last=True) 

698 assert push.content.title == "Account deletion requested" 

699 assert push.content.body == "Use the link we emailed you to confirm." 

700 

701 mock.assert_called_once() 

702 e = email_fields(mock) 

703 

704 with session_scope() as session: 

705 token_o = session.execute(select(AccountDeletionToken)).scalar_one() 

706 token = token_o.token 

707 

708 user_ = session.execute(select(User).where(User.id == user_id)).scalar_one() 

709 assert token_o.user == user_ 

710 assert not user_.is_deleted 

711 assert not user_.undelete_token 

712 assert not user_.undelete_until 

713 

714 assert email_fields(mock).subject == "[TEST] Confirm your Couchers.org account deletion" 

715 assert e.recipient == user.email 

716 assert "account deletion" in e.subject.lower() 

717 assert token in e.plain 

718 assert token in e.html 

719 unique_string = "You requested that we delete your account from Couchers.org." 

720 assert unique_string in e.plain 

721 assert unique_string in e.html 

722 url = f"http://localhost:3000/delete-account?token={token}" 

723 assert url in e.plain 

724 assert url in e.html 

725 assert "support@couchers.org" in e.plain 

726 assert "support@couchers.org" in e.html 

727 

728 with mock_notification_email() as mock: 

729 with auth_api_session() as (auth_api, metadata_interceptor): 

730 auth_api.ConfirmDeleteAccount( 

731 auth_pb2.ConfirmDeleteAccountReq( 

732 token=token, 

733 ) 

734 ) 

735 

736 push = push_collector.pop_for_user(user_id, last=True) 

737 assert push.content.title == "Account deleted" 

738 assert push.content.body == "You can restore it within 7 days using the link we emailed you." 

739 

740 mock.assert_called_once() 

741 e = email_fields(mock) 

742 

743 with session_scope() as session: 

744 assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none() 

745 

746 user_ = session.execute(select(User).where(User.id == user_id)).scalar_one() 

747 assert user_.is_deleted 

748 assert user_.undelete_token 

749 assert user_.undelete_until 

750 assert user_.undelete_until > now() 

751 

752 undelete_token = user_.undelete_token 

753 

754 assert e.recipient == user.email 

755 assert "account has been deleted" in e.subject.lower() 

756 unique_string = "You have successfully deleted your account from Couchers.org." 

757 assert unique_string in e.plain 

758 assert unique_string in e.html 

759 assert "7 days" in e.plain 

760 assert "7 days" in e.html 

761 url = f"http://localhost:3000/recover-account?token={undelete_token}" 

762 assert url in e.plain 

763 assert url in e.html 

764 assert "support@couchers.org" in e.plain 

765 assert "support@couchers.org" in e.html 

766 

767 with mock_notification_email() as mock: 

768 with auth_api_session() as (auth_api, metadata_interceptor): 

769 auth_api.RecoverAccount( 

770 auth_pb2.RecoverAccountReq( 

771 token=undelete_token, 

772 ) 

773 ) 

774 

775 push = push_collector.pop_for_user(user_id, last=True) 

776 assert push.content.title == "Account restored" 

777 assert push.content.body == "Welcome back!" 

778 

779 mock.assert_called_once() 

780 e = email_fields(mock) 

781 

782 assert e.recipient == user.email 

783 assert "account has been recovered" in e.subject.lower() 

784 unique_string = "Your account on Couchers.org has been successfully recovered!" 

785 assert unique_string in e.plain 

786 assert unique_string in e.html 

787 assert "support@couchers.org" in e.plain 

788 assert "support@couchers.org" in e.html 

789 

790 with session_scope() as session: 

791 assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none() 

792 

793 user = session.execute(select(User).where(User.id == user_id)).scalar_one() 

794 assert not user.is_deleted 

795 assert not user.undelete_token 

796 assert not user.undelete_until 

797 

798 

799def test_multiple_delete_tokens(db): 

800 """ 

801 Make sure deletion tokens are deleted on delete 

802 """ 

803 user, token = generate_user() 

804 

805 with account_session(token) as account: 

806 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True)) 

807 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True)) 

808 account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True)) 

809 

810 with session_scope() as session: 

811 assert session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one() == 3 

812 token = session.execute(select(AccountDeletionToken.token).limit(1)).scalar_one() 

813 

814 with auth_api_session() as (auth_api, metadata_interceptor): 

815 auth_api.ConfirmDeleteAccount( 

816 auth_pb2.ConfirmDeleteAccountReq( 

817 token=token, 

818 ) 

819 ) 

820 

821 with session_scope() as session: 

822 assert not session.execute(select(AccountDeletionToken.token)).scalar_one_or_none() 

823 

824 

825def test_ListActiveSessions_pagination(db, fast_passwords): 

826 password = random_hex() 

827 user, token = generate_user(hashed_password=hash_password(password)) 

828 

829 with auth_api_session() as (auth_api, metadata_interceptor): 

830 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

831 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

832 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

833 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

834 

835 with real_account_session(token) as account: 

836 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_size=3)) 

837 assert len(res.active_sessions) == 3 

838 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_token=res.next_page_token, page_size=3)) 

839 assert len(res.active_sessions) == 2 

840 assert not res.next_page_token 

841 

842 

843def test_ListActiveSessions_details(db, fast_passwords): 

844 password = random_hex() 

845 user, token = generate_user(hashed_password=hash_password(password)) 

846 

847 ips_user_agents = [ 

848 ( 

849 "108.123.33.162", 

850 "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Mobile/15E148 Safari/604.1", 

851 ), 

852 ( 

853 "8.245.212.28", 

854 "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/26.0 Chrome/122.0.0.0 Mobile Safari/537.36", 

855 ), 

856 ( 

857 "95.254.140.156", 

858 "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0", 

859 ), 

860 ] 

861 

862 for ip, user_agent in ips_user_agents: 

863 options = (("grpc.primary_user_agent", user_agent),) 

864 with auth_api_session(grpc_channel_options=options) as (auth_api, metadata_interceptor): 

865 auth_api.Authenticate( 

866 auth_pb2.AuthReq(user=user.username, password=password), metadata=(("x-couchers-real-ip", ip),) 

867 ) 

868 

869 def dummy_geoip(ip_address): 

870 return { 

871 "108.123.33.162": "Chicago, United States", 

872 "8.245.212.28": "Sydney, Australia", 

873 }.get(ip_address) 

874 

875 with real_account_session(token) as account: 

876 with patch("couchers.servicers.account.geoip_approximate_location", dummy_geoip): 

877 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq()) 

878 print(res) 

879 assert len(res.active_sessions) == 4 

880 

881 # this one currently making the API call 

882 assert res.active_sessions[0].operating_system == "Other" 

883 assert res.active_sessions[0].browser == "Other" 

884 assert res.active_sessions[0].device == "Other" 

885 assert res.active_sessions[0].approximate_location == "Unknown" 

886 assert res.active_sessions[0].is_current_session 

887 

888 assert res.active_sessions[1].operating_system == "Ubuntu" 

889 assert res.active_sessions[1].browser == "Firefox" 

890 assert res.active_sessions[1].device == "Other" 

891 assert res.active_sessions[1].approximate_location == "Unknown" 

892 assert not res.active_sessions[1].is_current_session 

893 

894 assert res.active_sessions[2].operating_system == "Android" 

895 assert res.active_sessions[2].browser == "Samsung Internet" 

896 assert res.active_sessions[2].device == "K" 

897 assert res.active_sessions[2].approximate_location == "Sydney, Australia" 

898 assert not res.active_sessions[2].is_current_session 

899 

900 assert res.active_sessions[3].operating_system == "iOS" 

901 assert res.active_sessions[3].browser == "Mobile Safari" 

902 assert res.active_sessions[3].device == "iPhone" 

903 assert res.active_sessions[3].approximate_location == "Chicago, United States" 

904 assert not res.active_sessions[3].is_current_session 

905 

906 

907def test_LogOutSession(db, fast_passwords): 

908 password = random_hex() 

909 user, token = generate_user(hashed_password=hash_password(password)) 

910 

911 with auth_api_session() as (auth_api, metadata_interceptor): 

912 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

913 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

914 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

915 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

916 

917 with real_account_session(token) as account: 

918 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq()) 

919 assert len(res.active_sessions) == 5 

920 account.LogOutSession(account_pb2.LogOutSessionReq(created=res.active_sessions[3].created)) 

921 

922 res2 = account.ListActiveSessions(account_pb2.ListActiveSessionsReq()) 

923 assert len(res2.active_sessions) == 4 

924 

925 # ignore the first session as it changes 

926 assert res.active_sessions[1:3] + res.active_sessions[4:] == res2.active_sessions[1:] 

927 

928 

929def test_LogOutOtherSessions(db, fast_passwords): 

930 password = random_hex() 

931 user, token = generate_user(hashed_password=hash_password(password)) 

932 

933 with auth_api_session() as (auth_api, metadata_interceptor): 

934 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

935 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

936 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

937 auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password)) 

938 

939 with real_account_session(token) as account: 

940 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq()) 

941 assert len(res.active_sessions) == 5 

942 with pytest.raises(grpc.RpcError) as e: 

943 account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=False)) 

944 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

945 assert e.value.details() == "Please confirm you want to log out of other sessions." 

946 

947 account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=True)) 

948 res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq()) 

949 assert len(res.active_sessions) == 1 

950 

951 

952def test_CreateInviteCode(db): 

953 user, token = generate_user() 

954 

955 with account_session(token) as account: 

956 res = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()) 

957 code = res.code 

958 assert len(code) == 8 

959 

960 with session_scope() as session: 

961 invite = session.execute(select(InviteCode).where(InviteCode.id == code)).scalar_one() 

962 assert invite.creator_user_id == user.id 

963 assert invite.disabled is None 

964 assert res.url == urls.invite_code_link(code=res.code) 

965 

966 

967def test_DisableInviteCode(db): 

968 user, token = generate_user() 

969 

970 with account_session(token) as account: 

971 code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code 

972 account.DisableInviteCode(account_pb2.DisableInviteCodeReq(code=code)) 

973 

974 with session_scope() as session: 

975 invite = session.execute(select(InviteCode).where(InviteCode.id == code)).scalar_one() 

976 assert invite.disabled is not None 

977 

978 

979def test_ListInviteCodes(db): 

980 user, token = generate_user() 

981 another_user, _ = generate_user() 

982 

983 with account_session(token) as account: 

984 code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code 

985 

986 # simulate another_user having signed up with this invite code 

987 with session_scope() as session: 

988 session.execute(update(User).where(User.id == another_user.id).values(invite_code_id=code)) 

989 

990 with account_session(token) as account: 

991 res = account.ListInviteCodes(empty_pb2.Empty()) 

992 assert len(res.invite_codes) == 1 

993 assert res.invite_codes[0].code == code 

994 assert res.invite_codes[0].uses == 1 

995 assert res.invite_codes[0].url == urls.invite_code_link(code=code) 

996 

997 

998def test_reminders(db, moderator): 

999 # the strong verification reminder's absence is tested in test_strong_verification.py 

1000 # reference writing reminders tested in test_AvailableWriteReferences_and_ListPendingReferencesToWrite 

1001 # we use LiteUser, so remember to refresh materialized views 

1002 user, token = generate_user(complete_profile=False) 

1003 complete_user, complete_token = generate_user(complete_profile=True) 

1004 req_user1, req_user_token1 = generate_user(complete_profile=True) 

1005 req_user2, req_user_token2 = generate_user(complete_profile=True) 

1006 

1007 refresh_materialized_views_rapid(empty_pb2.Empty()) 

1008 with account_session(complete_token) as account: 

1009 assert [reminder.WhichOneof("reminder") for reminder in account.GetReminders(empty_pb2.Empty()).reminders] == [ 

1010 "complete_verification_reminder" 

1011 ] 

1012 with account_session(token) as account: 

1013 assert [reminder.WhichOneof("reminder") for reminder in account.GetReminders(empty_pb2.Empty()).reminders] == [ 

1014 "complete_profile_reminder", 

1015 "complete_verification_reminder", 

1016 ] 

1017 

1018 today_plus_2 = (today() + timedelta(days=2)).isoformat() 

1019 today_plus_3 = (today() + timedelta(days=3)).isoformat() 

1020 with requests_session(req_user_token1) as api: 

1021 host_request1_id = api.CreateHostRequest( 

1022 requests_pb2.CreateHostRequestReq( 

1023 host_user_id=user.id, 

1024 from_date=today_plus_2, 

1025 to_date=today_plus_3, 

1026 text=valid_request_text("Test request 1"), 

1027 ) 

1028 ).host_request_id 

1029 moderator.approve_host_request(host_request1_id) 

1030 

1031 with account_session(token) as account: 

1032 reminders = account.GetReminders(empty_pb2.Empty()).reminders 

1033 assert [reminder.WhichOneof("reminder") for reminder in reminders] == [ 

1034 "respond_to_host_request_reminder", 

1035 "complete_profile_reminder", 

1036 "complete_verification_reminder", 

1037 ] 

1038 assert reminders[0].respond_to_host_request_reminder.host_request_id == host_request1_id 

1039 assert reminders[0].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id 

1040 

1041 with requests_session(req_user_token2) as api: 

1042 host_request2_id = api.CreateHostRequest( 

1043 requests_pb2.CreateHostRequestReq( 

1044 host_user_id=user.id, 

1045 from_date=today_plus_2, 

1046 to_date=today_plus_3, 

1047 text=valid_request_text("Test request 2"), 

1048 ) 

1049 ).host_request_id 

1050 moderator.approve_host_request(host_request2_id) 

1051 

1052 refresh_materialized_views_rapid(empty_pb2.Empty()) 

1053 with account_session(token) as account: 

1054 reminders = account.GetReminders(empty_pb2.Empty()).reminders 

1055 assert [reminder.WhichOneof("reminder") for reminder in reminders] == [ 

1056 "respond_to_host_request_reminder", 

1057 "respond_to_host_request_reminder", 

1058 "complete_profile_reminder", 

1059 "complete_verification_reminder", 

1060 ] 

1061 assert reminders[0].respond_to_host_request_reminder.host_request_id == host_request1_id 

1062 assert reminders[0].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id 

1063 assert reminders[1].respond_to_host_request_reminder.host_request_id == host_request2_id 

1064 assert reminders[1].respond_to_host_request_reminder.surfer_user.user_id == req_user2.id 

1065 

1066 with requests_session(req_user_token1) as api: 

1067 host_request3_id = api.CreateHostRequest( 

1068 requests_pb2.CreateHostRequestReq( 

1069 host_user_id=user.id, 

1070 from_date=today_plus_2, 

1071 to_date=today_plus_3, 

1072 text=valid_request_text("Test request 3"), 

1073 ) 

1074 ).host_request_id 

1075 moderator.approve_host_request(host_request3_id) 

1076 

1077 refresh_materialized_views_rapid(empty_pb2.Empty()) 

1078 with account_session(token) as account: 

1079 reminders = account.GetReminders(empty_pb2.Empty()).reminders 

1080 assert [reminder.WhichOneof("reminder") for reminder in reminders] == [ 

1081 "respond_to_host_request_reminder", 

1082 "respond_to_host_request_reminder", 

1083 "respond_to_host_request_reminder", 

1084 "complete_profile_reminder", 

1085 "complete_verification_reminder", 

1086 ] 

1087 assert reminders[0].respond_to_host_request_reminder.host_request_id == host_request1_id 

1088 assert reminders[0].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id 

1089 assert reminders[1].respond_to_host_request_reminder.host_request_id == host_request2_id 

1090 assert reminders[1].respond_to_host_request_reminder.surfer_user.user_id == req_user2.id 

1091 assert reminders[2].respond_to_host_request_reminder.host_request_id == host_request3_id 

1092 assert reminders[2].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id 

1093 

1094 # accept req 

1095 with requests_session(token) as api: 

1096 api.RespondHostRequest( 

1097 requests_pb2.RespondHostRequestReq( 

1098 host_request_id=host_request1_id, status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED 

1099 ) 

1100 ) 

1101 

1102 refresh_materialized_views_rapid(empty_pb2.Empty()) 

1103 with account_session(token) as account: 

1104 reminders = account.GetReminders(empty_pb2.Empty()).reminders 

1105 assert [reminder.WhichOneof("reminder") for reminder in reminders] == [ 

1106 "respond_to_host_request_reminder", 

1107 "respond_to_host_request_reminder", 

1108 "complete_profile_reminder", 

1109 "complete_verification_reminder", 

1110 ] 

1111 assert reminders[0].respond_to_host_request_reminder.host_request_id == host_request2_id 

1112 assert reminders[0].respond_to_host_request_reminder.surfer_user.user_id == req_user2.id 

1113 assert reminders[1].respond_to_host_request_reminder.host_request_id == host_request3_id 

1114 assert reminders[1].respond_to_host_request_reminder.surfer_user.user_id == req_user1.id 

1115 

1116 

1117def test_volunteer_stuff(db): 

1118 # taken from couchers/app/backend/resources/badges.json 

1119 board_member_id = 8347 

1120 

1121 # with password 

1122 user, token = generate_user(name="Von Tester", username="tester", city="Amsterdam", id=board_member_id) 

1123 

1124 with account_session(token) as account: 

1125 res = account.GetAccountInfo(empty_pb2.Empty()) 

1126 assert not res.is_volunteer 

1127 

1128 with pytest.raises(grpc.RpcError) as e: 

1129 account.GetMyVolunteerInfo(empty_pb2.Empty()) 

1130 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

1131 assert ( 

1132 e.value.details() == "You are currently not registered as a volunteer, if this is wrong, please contact us." 

1133 ) 

1134 

1135 with pytest.raises(grpc.RpcError) as e: 

1136 account.UpdateMyVolunteerInfo(account_pb2.UpdateMyVolunteerInfoReq()) 

1137 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

1138 assert ( 

1139 e.value.details() == "You are currently not registered as a volunteer, if this is wrong, please contact us." 

1140 ) 

1141 

1142 with session_scope() as session: 

1143 session.add( 

1144 make_volunteer( 

1145 user_id=user.id, 

1146 display_name="Great Volunteer", 

1147 display_location="The Bitbucket", 

1148 role="Lead Tester", 

1149 started_volunteering=date(2020, 6, 1), 

1150 ) 

1151 ) 

1152 

1153 with account_session(token) as account: 

1154 res = account.GetAccountInfo(empty_pb2.Empty()) 

1155 assert res.is_volunteer 

1156 

1157 res = account.GetMyVolunteerInfo(empty_pb2.Empty()) 

1158 

1159 assert res.display_name == "Great Volunteer" 

1160 assert res.display_location == "The Bitbucket" 

1161 assert res.role == "Lead Tester" 

1162 assert res.started_volunteering == "2020-06-01" 

1163 assert not res.stopped_volunteering 

1164 assert res.show_on_team_page 

1165 assert res.link_type == "couchers" 

1166 assert res.link_text == "@tester" 

1167 assert res.link_url == "http://localhost:3000/user/tester" 

1168 

1169 res = account.UpdateMyVolunteerInfo( 

1170 account_pb2.UpdateMyVolunteerInfoReq( 

1171 display_name=wrappers_pb2.StringValue(value=""), 

1172 link_type=wrappers_pb2.StringValue(value="website"), 

1173 link_text=wrappers_pb2.StringValue(value="testervontester.com.invalid"), 

1174 link_url=wrappers_pb2.StringValue(value="https://www.testervontester.com.invalid/"), 

1175 ) 

1176 ) 

1177 

1178 assert res.display_name == "" 

1179 assert res.display_location == "The Bitbucket" 

1180 assert res.role == "Lead Tester" 

1181 assert res.started_volunteering == "2020-06-01" 

1182 assert not res.stopped_volunteering 

1183 assert res.show_on_team_page 

1184 assert res.link_type == "website" 

1185 assert res.link_text == "testervontester.com.invalid" 

1186 assert res.link_url == "https://www.testervontester.com.invalid/" 

1187 res = account.UpdateMyVolunteerInfo( 

1188 account_pb2.UpdateMyVolunteerInfoReq( 

1189 display_name=wrappers_pb2.StringValue(value=""), 

1190 link_type=wrappers_pb2.StringValue(value="linkedin"), 

1191 link_text=wrappers_pb2.StringValue(value="tester-vontester"), 

1192 ) 

1193 ) 

1194 assert res.display_name == "" 

1195 assert res.display_location == "The Bitbucket" 

1196 assert res.role == "Lead Tester" 

1197 assert res.started_volunteering == "2020-06-01" 

1198 assert not res.stopped_volunteering 

1199 assert res.show_on_team_page 

1200 assert res.link_type == "linkedin" 

1201 assert res.link_text == "tester-vontester" 

1202 assert res.link_url == "https://www.linkedin.com/in/tester-vontester/" 

1203 

1204 res = account.UpdateMyVolunteerInfo( 

1205 account_pb2.UpdateMyVolunteerInfoReq( 

1206 display_name=wrappers_pb2.StringValue(value="Tester"), 

1207 display_location=wrappers_pb2.StringValue(value=""), 

1208 link_type=wrappers_pb2.StringValue(value="email"), 

1209 link_text=wrappers_pb2.StringValue(value="tester@vontester.com.invalid"), 

1210 ) 

1211 ) 

1212 assert res.display_name == "Tester" 

1213 assert res.display_location == "" 

1214 assert res.role == "Lead Tester" 

1215 assert res.started_volunteering == "2020-06-01" 

1216 assert not res.stopped_volunteering 

1217 assert res.show_on_team_page 

1218 assert res.link_type == "email" 

1219 assert res.link_text == "tester@vontester.com.invalid" 

1220 assert res.link_url == "mailto:tester@vontester.com.invalid" 

1221 

1222 refresh_materialized_views_rapid(empty_pb2.Empty()) 

1223 

1224 with public_session() as public: 

1225 res = public.GetVolunteers(empty_pb2.Empty()) 

1226 assert len(res.current_volunteers) == 1 

1227 v = res.current_volunteers[0] 

1228 assert v.name == "Tester" 

1229 assert v.username == "tester" 

1230 assert v.is_board_member 

1231 assert v.role == "Lead Tester" 

1232 assert v.location == "Amsterdam" 

1233 assert v.img.startswith("http://localhost:5001/img/thumbnail/") 

1234 assert v.link_type == "email" 

1235 assert v.link_text == "tester@vontester.com.invalid" 

1236 assert v.link_url == "mailto:tester@vontester.com.invalid"