Coverage for app/backend/src/tests/test_account.py: 100%

742 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1from datetime import UTC, date, datetime, timedelta 

2from unittest.mock import patch 

3 

4import grpc 

5import pytest 

6from google.protobuf import empty_pb2, wrappers_pb2 

7from sqlalchemy import select, update 

8from sqlalchemy.sql import func 

9 

10from couchers import urls 

11from couchers.crypto import hash_password, random_hex 

12from couchers.db import session_scope 

13from couchers.materialized_views import refresh_materialized_views_rapid 

14from couchers.models import ( 

15 AccountDeletionReason, 

16 AccountDeletionToken, 

17 BackgroundJob, 

18 HostingStatus, 

19 InviteCode, 

20 PhotoGalleryItem, 

21 SleepingArrangement, 

22 Upload, 

23 User, 

24) 

25from couchers.proto import account_pb2, api_pb2, auth_pb2, conversations_pb2, requests_pb2 

26from couchers.utils import now, today 

27from tests.fixtures.db import generate_user, make_volunteer 

28from tests.fixtures.misc import EmailCollector, PushCollector, process_jobs 

29from tests.fixtures.sessions import ( 

30 account_session, 

31 auth_api_session, 

32 public_session, 

33 real_account_session, 

34 requests_session, 

35) 

36from tests.test_requests import valid_request_text 

37 

38 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Autouse fixture: request ``testconfig`` for every test in this module."""

42 

43 

def test_GetAccountInfo(db, fast_passwords):
    """GetAccountInfo reports the expected fields for a newly generated user that has a password."""
    expected_email = "user@couchers.invalid"
    hashed = hash_password(random_hex())
    user1, token1 = generate_user(hashed_password=hashed, email=expected_email)

    with account_session(token1) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())

        # identity fields
        assert info.email == expected_email
        assert info.username == user1.username

        # nothing has been verified yet
        assert not info.has_strong_verification
        assert info.birthdate_verification_status == api_pb2.BIRTHDATE_VERIFICATION_STATUS_UNVERIFIED
        assert info.gender_verification_status == api_pb2.GENDER_VERIFICATION_STATUS_UNVERIFIED

        # no special roles, no language preference stored
        assert not info.is_superuser
        assert info.ui_language_preference == ""
        assert not info.is_volunteer

58 

59 

def test_donation_banner_no_drive(db):
    """Test that the banner is not shown when no drive is configured (flag unset)"""
    # The user has donated, but without a drive the banner must stay hidden
    _, token = generate_user()

    with account_session(token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert not info.should_show_donation_banner

68 

69 

def test_donation_banner_never_donated(db, feature_flags):
    """Test that banner is shown when user has never donated and drive is active"""
    # last_donated must be passed explicitly as None because generate_user defaults it to now()
    _, token = generate_user(last_donated=None)

    drive_start_ts = int(datetime(2025, 11, 1, tzinfo=UTC).timestamp())
    feature_flags.set("donation_drive_start", drive_start_ts)

    with account_session(token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert info.should_show_donation_banner

80 

81 

def test_donation_banner_donated_before_drive(db, feature_flags):
    """Test that banner is shown when user donated before drive start"""
    user, token = generate_user()

    # Move the user's last donation to a date before the drive start (Nov 1)
    with session_scope() as session:
        donated_at = datetime(2025, 10, 15, tzinfo=UTC)
        set_last_donated = update(User).where(User.id == user.id).values(last_donated=donated_at)
        session.execute(set_last_donated)

    drive_start_ts = int(datetime(2025, 11, 1, tzinfo=UTC).timestamp())
    feature_flags.set("donation_drive_start", drive_start_ts)

    with account_session(token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert info.should_show_donation_banner

96 

97 

def test_donation_banner_donated_after_drive(db, feature_flags):
    """Test that banner is not shown when user donated after drive start"""
    user, token = generate_user()

    # Move the user's last donation to a date after the drive start (Nov 1)
    with session_scope() as session:
        donated_at = datetime(2025, 11, 15, tzinfo=UTC)
        set_last_donated = update(User).where(User.id == user.id).values(last_donated=donated_at)
        session.execute(set_last_donated)

    drive_start_ts = int(datetime(2025, 11, 1, tzinfo=UTC).timestamp())
    feature_flags.set("donation_drive_start", drive_start_ts)

    with account_session(token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert not info.should_show_donation_banner

112 

113 

def test_donation_banner_donated_exactly_at_drive_start(db, feature_flags):
    """Test that banner is not shown when user donated exactly at drive start time"""
    drive_start = datetime(2025, 11, 1, tzinfo=UTC)

    user, token = generate_user()

    # The donation timestamp coincides with the drive start
    with session_scope() as session:
        set_last_donated = update(User).where(User.id == user.id).values(last_donated=drive_start)
        session.execute(set_last_donated)

    drive_start_ts = int(drive_start.timestamp())
    feature_flags.set("donation_drive_start", drive_start_ts)

    with account_session(token) as account:
        info = account.GetAccountInfo(empty_pb2.Empty())
        assert not info.should_show_donation_banner

128 

129 

def test_GetAccountInfo_regression(db):
    """Regression test: GetAccountInfo must not raise when ``about_me`` is None.

    There was a bug in evaluating ``has_completed_profile`` on the backend (in python):
    when about_me is None but the user has a key, it was failing because
    ``len(about_me)`` doesn't work on None.
    """
    user, token = generate_user(about_me=None, complete_profile=False)

    # add an avatar photo to the user's profile gallery
    with session_scope() as session:
        key = random_hex(32)
        filename = random_hex(32) + ".jpg"
        session.add(
            Upload(
                key=key,
                filename=filename,
                creator_user_id=user.id,
            )
        )
        # flush so the upload row exists before the gallery item that references its key is added
        session.flush()
        assert user.profile_gallery_id is not None
        session.add(
            PhotoGalleryItem(
                gallery_id=user.profile_gallery_id,
                upload_key=key,
                position=0,
            )
        )

    with account_session(token) as account:
        # the call completing without an error is the whole check, so the response is not kept
        account.GetAccountInfo(empty_pb2.Empty())

158 

159 

def test_ChangePasswordV2_normal(db, fast_passwords, email_collector: EmailCollector, push_collector: PushCollector):
    """A user with a password changes it; an email and a push are sent and the new hash is stored."""
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    change_req = account_pb2.ChangePasswordV2Req(
        old_password=old_password,
        new_password=new_password,
    )
    with account_session(token) as account:
        account.ChangePasswordV2(change_req)

    # notification email
    email = email_collector.pop_for_recipient(user.email, last=True)
    assert email.subject == "[TEST] Your password was changed"

    # push notification
    push = push_collector.pop_for_user(user.id, last=True)
    assert push.content.title == "Password changed"
    assert push.content.body == "Your password was changed."

    # the stored hash now matches the new password
    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(new_password)

184 

185 

def test_ChangePasswordV2_regression(db, fast_passwords):
    """Regression test: send_password_changed_email wasn't working.

    The user has an old password and changes it to a new one; the stored hash must be updated.
    """
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    change_req = account_pb2.ChangePasswordV2Req(
        old_password=old_password,
        new_password=new_password,
    )
    with account_session(token) as account:
        account.ChangePasswordV2(change_req)

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(new_password)

204 

205 

def test_ChangePasswordV2_normal_short_password(db, fast_passwords):
    """Changing to a too-short password is rejected and the stored password stays unchanged."""
    old_password = random_hex()
    new_password = random_hex(length=1)
    user, token = generate_user(hashed_password=hash_password(old_password))

    change_req = account_pb2.ChangePasswordV2Req(
        old_password=old_password,
        new_password=new_password,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(change_req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == "The password must be 8 or more characters long."

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)

226 

227 

def test_ChangePasswordV2_normal_long_password(db, fast_passwords):
    """Changing to an overly long password is rejected and the stored password stays unchanged."""
    old_password = random_hex()
    new_password = random_hex(length=1000)
    user, token = generate_user(hashed_password=hash_password(old_password))

    change_req = account_pb2.ChangePasswordV2Req(
        old_password=old_password,
        new_password=new_password,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(change_req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == "The password must be less than 256 characters."

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)

248 

249 

def test_ChangePasswordV2_normal_insecure_password(db, fast_passwords):
    """Changing to an insecure password is rejected and the stored password stays unchanged."""
    old_password = random_hex()
    new_password = "12345678"
    user, token = generate_user(hashed_password=hash_password(old_password))

    change_req = account_pb2.ChangePasswordV2Req(
        old_password=old_password,
        new_password=new_password,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(change_req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == "The password is insecure. Please use one that is not easily guessable."

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)

270 

271 

def test_ChangePasswordV2_normal_wrong_password(db, fast_passwords):
    """Supplying a wrong old password is rejected and the stored password stays unchanged."""
    old_password = random_hex()
    new_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    change_req = account_pb2.ChangePasswordV2Req(
        old_password="Wrong password",
        new_password=new_password,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(change_req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == "Wrong password."

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)

292 

293 

def test_ChangePasswordV2_normal_no_passwords(db, fast_passwords):
    """A request that omits ``new_password`` is rejected as too short; the stored password is unchanged."""
    old_password = random_hex()
    user, token = generate_user(hashed_password=hash_password(old_password))

    # only the old password is supplied
    change_req = account_pb2.ChangePasswordV2Req(old_password=old_password)
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangePasswordV2(change_req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == "The password must be 8 or more characters long."

    with session_scope() as session:
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.hashed_password == hash_password(old_password)

308 

309 

def test_ChangeEmailV2_wrong_password(db, fast_passwords):
    """An email change with the wrong password is rejected and leaves no active email-change token."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    change_req = account_pb2.ChangeEmailV2Req(
        password="Wrong password",
        new_email=new_email,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(change_req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == "Wrong password."

    with session_scope() as session:
        # count users whose email-change token window contains the current DB time
        active_tokens = (
            select(func.count())
            .select_from(User)
            .where(User.new_email_token_created <= func.now())
            .where(User.new_email_token_expiry >= func.now())
        )
        assert session.execute(active_tokens).scalar_one() == 0

335 

336 

def test_ChangeEmailV2_wrong_email(db, fast_passwords):
    """An email change with the wrong password is rejected and leaves no active email-change token.

    NOTE(review): despite its name, this test sends a wrong *password* together with a
    well-formed new email, so it exercises the same path as the wrong-password test.
    Confirm which scenario was intended.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    change_req = account_pb2.ChangeEmailV2Req(
        password="Wrong password",
        new_email=new_email,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(change_req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == "Wrong password."

    with session_scope() as session:
        # count users whose email-change token window contains the current DB time
        active_tokens = (
            select(func.count())
            .select_from(User)
            .where(User.new_email_token_created <= func.now())
            .where(User.new_email_token_expiry >= func.now())
        )
        assert session.execute(active_tokens).scalar_one() == 0

362 

363 

def test_ChangeEmailV2_invalid_email(db, fast_passwords):
    """An email change to a malformed address is rejected and leaves no active email-change token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    change_req = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email="not a real email",
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(change_req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == "Invalid email."

    with session_scope() as session:
        # count users whose email-change token window contains the current DB time
        active_tokens = (
            select(func.count())
            .select_from(User)
            .where(User.new_email_token_created <= func.now())
            .where(User.new_email_token_expiry >= func.now())
        )
        assert session.execute(active_tokens).scalar_one() == 0

388 

389 

def test_ChangeEmailV2_email_in_use(db, fast_passwords):
    """An email change to another user's address is rejected and leaves no active email-change token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))
    other_user, _ = generate_user(hashed_password=hash_password(password))

    change_req = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=other_user.email,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(change_req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == "Invalid email."

    with session_scope() as session:
        # count users whose email-change token window contains the current DB time
        active_tokens = (
            select(func.count())
            .select_from(User)
            .where(User.new_email_token_created <= func.now())
            .where(User.new_email_token_expiry >= func.now())
        )
        assert session.execute(active_tokens).scalar_one() == 0

415 

416 

def test_ChangeEmailV2_no_change(db, fast_passwords):
    """An email change to the user's current address is rejected and leaves no active email-change token."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    change_req = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=user.email,
    )
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as e:
            account.ChangeEmailV2(change_req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == "Invalid email."

    with session_scope() as session:
        # count users whose email-change token window contains the current DB time
        active_tokens = (
            select(func.count())
            .select_from(User)
            .where(User.new_email_token_created <= func.now())
            .where(User.new_email_token_expiry >= func.now())
        )
        assert session.execute(active_tokens).scalar_one() == 0

441 

442 

def test_ChangeEmailV2_wrong_token(db, fast_passwords):
    """Confirming an email change with an unknown token fails and the user's email is unchanged."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    # start a legitimate email change so that a real token exists
    with account_session(token) as account:
        account.ChangeEmailV2(
            account_pb2.ChangeEmailV2Req(
                password=password,
                new_email=new_email,
            )
        )

    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            # the call raises, so its result is deliberately not bound to a name
            auth_api.ConfirmChangeEmailV2(
                auth_pb2.ConfirmChangeEmailV2Req(
                    change_email_token="wrongtoken",
                )
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Invalid token."

    with session_scope() as session:
        user_updated = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert user_updated.email == user.email

469 

470 

def test_ChangeEmailV2_tokens_two_hour_window(db):
    """An email-change token is rejected when the auth servicer's clock is outside the token window.

    The auth servicer's ``now`` is patched first to one minute in the past and then to
    two hours and one minute in the future. In both cases an empty request and a
    request carrying the real token must fail with NOT_FOUND / "Invalid token.".
    """

    def one_minute_ago():
        return now() - timedelta(minutes=1)

    def two_hours_one_minute_in_future():
        return now() + timedelta(hours=2, minutes=1)

    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    with account_session(token) as account:
        change_req = account_pb2.ChangeEmailV2Req(
            password=password,
            new_email=new_email,
        )
        account.ChangeEmailV2(change_req)

    with session_scope() as session:
        token_query = select(User.new_email_token).where(User.id == user.id)
        new_email_token = session.execute(token_query).scalar_one()

    # same order as before: too early first, then too late
    for patched_now in (one_minute_ago, two_hours_one_minute_in_future):
        with patch("couchers.servicers.auth.now", patched_now):
            with auth_api_session() as (auth_api, metadata_interceptor):
                confirm_requests = (
                    auth_pb2.ConfirmChangeEmailV2Req(),
                    auth_pb2.ConfirmChangeEmailV2Req(change_email_token=new_email_token),
                )
                for confirm_req in confirm_requests:
                    with pytest.raises(grpc.RpcError) as e:
                        auth_api.ConfirmChangeEmailV2(confirm_req)
                    assert e.value.code() == grpc.StatusCode.NOT_FOUND
                    assert e.value.details() == "Invalid token."

524 

525 

def test_ChangeEmailV2(db, fast_passwords, push_collector: PushCollector):
    """Happy path: request an email change, then confirm it with the generated token.

    After the request the old email is still active and the pending-change fields are
    populated; after confirmation the new email is active and those fields are cleared.
    A push notification is checked after each step.
    """
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))
    user_id = user.id

    change_req = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=new_email,
    )
    with account_session(token) as account:
        account.ChangeEmailV2(change_req)

    with session_scope() as session:
        pending = session.execute(select(User).where(User.id == user_id)).scalar_one()
        # the old address stays until the change is confirmed
        assert pending.email == user.email
        assert pending.new_email == new_email
        assert pending.new_email_token is not None
        assert pending.new_email_token_created
        assert pending.new_email_token_created <= now()
        assert pending.new_email_token_expiry
        assert pending.new_email_token_expiry >= now()

        change_email_token = pending.new_email_token

    process_jobs()
    push = push_collector.pop_for_user(user_id, last=True)
    assert push.content.title == "Email change requested"
    assert push.content.body == f"Use the link we sent to {new_email} to confirm your new address."

    confirm_req = auth_pb2.ConfirmChangeEmailV2Req(change_email_token=change_email_token)
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ConfirmChangeEmailV2(confirm_req)

    with session_scope() as session:
        confirmed = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert confirmed.email == new_email
        # all pending-change fields are reset
        assert confirmed.new_email is None
        assert confirmed.new_email_token is None
        assert confirmed.new_email_token_created is None
        assert confirmed.new_email_token_expiry is None

    process_jobs()
    push = push_collector.pop_for_user(user_id, last=True)
    assert push.content.title == "Email verified"
    assert push.content.body == "Your new email address has been verified."

576 

577 

def test_ChangeEmailV2_sends_proper_emails(db, fast_passwords, push_collector: PushCollector):
    """Requesting an email change queues two ``send_email`` jobs with the expected texts, in either order."""
    password = random_hex()
    new_email = f"{random_hex()}@couchers.org.invalid"
    user, token = generate_user(hashed_password=hash_password(password))

    change_req = account_pb2.ChangeEmailV2Req(
        password=password,
        new_email=new_email,
    )
    with account_session(token) as account:
        account.ChangeEmailV2(change_req)

    process_jobs()

    with session_scope() as session:
        email_jobs_query = select(BackgroundJob).where(BackgroundJob.job_type == "send_email")
        jobs = session.execute(email_jobs_query).scalars().all()
        assert len(jobs) == 2

        uq_str1 = b"An email change to the email"
        uq_str2 = (
            b"You requested that your email be changed to this email address on Couchers.org. Your old email address is"
        )
        # the two payloads must contain one marker each; the job order is not fixed
        accepted_orders = ((uq_str1, uq_str2), (uq_str2, uq_str1))
        assert any(
            first in jobs[0].payload and second in jobs[1].payload for first, second in accepted_orders
        )

    push = push_collector.pop_for_user(user.id, last=True)
    assert push.content.title == "Email change requested"
    assert push.content.body == f"Use the link we sent to {new_email} to confirm your new address."

607 

608 

def test_ChangeLanguagePreference(db, fast_passwords):
    """Changing the UI language sets the ``NEXT_LOCALE`` cookie and is reflected by GetAccountInfo."""
    # the user switches from the default to an ISO 639-1 language code
    new_lang = "zh"
    user, token = generate_user()

    with real_account_session(token) as account:
        before = account.GetAccountInfo(empty_pb2.Empty())
        assert before.ui_language_preference == ""

        # with_call also returns the call object, which carries info about the request
        change_req = account_pb2.ChangeLanguagePreferenceReq(ui_language_preference=new_lang)
        res, call = account.ChangeLanguagePreference.with_call(change_req)

        # cookies are sent via initial metadata, so we look for them there;
        # each "set-cookie" value is the full cookie string, of which we keep the leading key=value part
        cookie_values = []
        for key, value in call.initial_metadata():
            if key == "set-cookie":
                cookie_values.append(value.split(";")[0])
        assert "NEXT_LOCALE=zh" in cookie_values, (
            f"Didn't find the right cookie, got {call.initial_metadata()}"
        )

        # the changed language preference should also be stored by the backend
        after = account.GetAccountInfo(empty_pb2.Empty())
        assert after.ui_language_preference == "zh"

633 

634 

def test_contributor_form(db):
    """``filled_contributor_form`` flips from false to true once the contributor form is submitted."""
    user, token = generate_user()

    with account_session(token) as account:
        before = account.GetContributorFormInfo(empty_pb2.Empty())
        assert not before.filled_contributor_form

        form_req = account_pb2.FillContributorFormReq(contributor_form=auth_pb2.ContributorForm())
        account.FillContributorForm(form_req)

        after = account.GetContributorFormInfo(empty_pb2.Empty())
        assert after.filled_contributor_form

646 

647 

def test_DeleteAccount_start(db, email_collector: EmailCollector):
    """Requesting deletion sends a confirmation email and creates a valid token without deleting the user."""
    user, token = generate_user()

    with account_session(token) as account:
        delete_req = account_pb2.DeleteAccountReq(confirm=True, reason=None)
        account.DeleteAccount(delete_req)
        email = email_collector.pop_for_recipient(user.email, last=True)
        assert email.subject == "[TEST] Confirm your Couchers.org account deletion"

    with session_scope() as session:
        token_query = select(AccountDeletionToken).where(AccountDeletionToken.user_id == user.id)
        deletion_token: AccountDeletionToken = session.execute(token_query).scalar_one()

        assert deletion_token.is_valid
        # the account itself is not deleted until the request is confirmed
        stored_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        assert stored_user.deleted_at is None

663 

664 

def test_DeleteAccount_message_storage(db):
    """Only non-blank deletion reasons are stored: three of the six requests leave a row."""
    user, token = generate_user()

    reasons = (
        None,  # not stored
        "",  # not stored
        "Reason",
        "0192#(&!&#)*@//)(8",
        "\n\n\t",  # not stored
        "1337",
    )
    with account_session(token) as account:
        for reason in reasons:
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True, reason=reason))

    with session_scope() as session:
        stored_reasons = session.execute(select(func.count()).select_from(AccountDeletionReason)).scalar_one()
        assert stored_reasons == 3

678 

679 

def test_full_delete_account_with_recovery(db, email_collector: EmailCollector, push_collector: PushCollector):
    """Walk through the full flow: request deletion, confirm it, then recover the account.

    After every step the outgoing email, the push notification and the database state are checked.
    """
    user, token = generate_user()
    user_id = user.id
    support_address = "support@couchers.org"

    # step 1: request deletion; an unconfirmed request is refused
    with account_session(token) as account:
        with pytest.raises(grpc.RpcError) as err:
            account.DeleteAccount(account_pb2.DeleteAccountReq())
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "Please confirm your account deletion."

        account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    email = email_collector.pop_for_recipient(user.email, last=True)
    assert email.subject == "[TEST] Confirm your Couchers.org account deletion"
    assert email.recipient == user.email
    assert "account deletion" in email.subject.lower()
    unique_string = "You requested that we delete your account from Couchers.org."
    for needle in (unique_string, support_address):
        assert needle in email.plain
        assert needle in email.html

    push = push_collector.pop_for_user(user_id, last=True)
    assert push.content.title == "Account deletion requested"
    assert push.content.body == "Use the link we emailed you to confirm."

    with session_scope() as session:
        token_o = session.execute(select(AccountDeletionToken)).scalar_one()
        delete_token = token_o.token

        user_ = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert token_o.user == user_
        # nothing has been deleted yet
        assert user_.deleted_at is None
        assert not user_.undelete_token
        assert not user_.undelete_until

    delete_url = f"http://localhost:3000/delete-account?token={delete_token}"
    for needle in (delete_token, delete_url):
        assert needle in email.plain
        assert needle in email.html

    # step 2: confirm the deletion with the emailed token
    with auth_api_session() as (auth_api, metadata_interceptor):
        confirm_req = auth_pb2.ConfirmDeleteAccountReq(token=delete_token)
        auth_api.ConfirmDeleteAccount(confirm_req)

    email = email_collector.pop_for_recipient(user.email, last=True)
    assert email.recipient == user.email
    assert "account has been deleted" in email.subject.lower()
    unique_string = "You have successfully deleted your account from Couchers.org."
    for needle in (unique_string, "7 days", support_address):
        assert needle in email.plain
        assert needle in email.html

    push = push_collector.pop_for_user(user_id, last=True)
    assert push.content.title == "Account deleted"
    assert push.content.body == "You can restore it within 7 days using the link we emailed you."

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        user_ = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert user_.deleted_at is not None
        assert user_.undelete_token
        assert user_.undelete_until
        assert user_.undelete_until > now()

        undelete_token = user_.undelete_token

    undelete_url = f"http://localhost:3000/recover-account?token={undelete_token}"
    assert undelete_url in email.plain
    assert undelete_url in email.html

    # step 3: recover the account with the undelete token
    with auth_api_session() as (auth_api, metadata_interceptor):
        recover_req = auth_pb2.RecoverAccountReq(token=undelete_token)
        auth_api.RecoverAccount(recover_req)

    email = email_collector.pop_for_recipient(user.email, last=True)
    assert email.recipient == user.email
    assert "account has been recovered" in email.subject.lower()
    unique_string = "Your account on Couchers.org has been successfully recovered!"
    for needle in (unique_string, support_address):
        assert needle in email.plain
        assert needle in email.html

    push = push_collector.pop_for_user(user_id, last=True)
    assert push.content.title == "Account restored"
    assert push.content.body == "Welcome back!"

    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken)).scalar_one_or_none()

        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert user.deleted_at is None
        assert not user.undelete_token
        assert not user.undelete_until

786 

787 

def test_multiple_delete_tokens(db):
    """
    Make sure deletion tokens are deleted on delete
    """
    user, token = generate_user()

    # three deletion requests produce three tokens
    with account_session(token) as account:
        for _ in range(3):
            account.DeleteAccount(account_pb2.DeleteAccountReq(confirm=True))

    with session_scope() as session:
        token_count = session.execute(select(func.count()).select_from(AccountDeletionToken)).scalar_one()
        assert token_count == 3
        deletion_token = session.execute(select(AccountDeletionToken.token).limit(1)).scalar_one()

    # confirming with any one of them ...
    with auth_api_session() as (auth_api, metadata_interceptor):
        confirm_req = auth_pb2.ConfirmDeleteAccountReq(token=deletion_token)
        auth_api.ConfirmDeleteAccount(confirm_req)

    # ... leaves no deletion token behind
    with session_scope() as session:
        assert not session.execute(select(AccountDeletionToken.token)).scalar_one_or_none()

812 

813 

def test_ListActiveSessions_pagination(db, fast_passwords):
    """
    With page_size=3, five active sessions come back as a page of 3 followed by a final page of 2
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # four password logins; together with the session behind `token` that makes the five listed below
    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        first_page = account.ListActiveSessions(account_pb2.ListActiveSessionsReq(page_size=3))
        assert len(first_page.active_sessions) == 3

        second_page = account.ListActiveSessions(
            account_pb2.ListActiveSessionsReq(page_token=first_page.next_page_token, page_size=3)
        )
        assert len(second_page.active_sessions) == 2
        # an empty token marks the last page
        assert not second_page.next_page_token

830 

831 

def test_ListActiveSessions_details(db, fast_passwords):
    """
    ListActiveSessions exposes OS, browser, device and approximate location for every session

    Three logins are made with distinct user agents and client IPs; the listing is expected to start with the
    session making the call, followed by those logins in reverse order of creation.
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    # (client ip, user agent) for each extra login, in the order they are performed
    logins = [
        (
            "108.123.33.162",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Mobile/15E148 Safari/604.1",
        ),
        (
            "8.245.212.28",
            "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/26.0 Chrome/122.0.0.0 Mobile Safari/537.36",
        ),
        (
            "95.254.140.156",
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0",
        ),
    ]

    for ip, user_agent in logins:
        channel_options = (("grpc.primary_user_agent", user_agent),)
        with auth_api_session(grpc_channel_options=channel_options) as (auth_api, metadata_interceptor):
            auth_api.Authenticate(
                auth_pb2.AuthReq(user=user.username, password=password),
                metadata=(("x-couchers-real-ip", ip),),
            )

    # the third IP is deliberately missing so that the "Unknown" fallback is exercised
    known_locations = {
        "108.123.33.162": "Chicago, United States",
        "8.245.212.28": "Sydney, Australia",
    }

    def dummy_geoip(ip_address):
        return known_locations.get(ip_address)

    with real_account_session(token) as account:
        with patch("couchers.servicers.account.geoip_approximate_location", dummy_geoip):
            res = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())

    # (operating_system, browser, device, approximate_location, is_current_session)
    expected_sessions = [
        # this one currently making the API call
        ("Other", "Other", "Other", "Unknown", True),
        ("Ubuntu", "Firefox", "Other", "Unknown", False),
        ("Android", "Samsung Internet", "K", "Sydney, Australia", False),
        ("iOS", "Mobile Safari", "iPhone", "Chicago, United States", False),
    ]

    assert len(res.active_sessions) == len(expected_sessions)

    for active_session, expected in zip(res.active_sessions, expected_sessions):
        operating_system, browser, device, location, is_current = expected
        assert active_session.operating_system == operating_system
        assert active_session.browser == browser
        assert active_session.device == device
        assert active_session.approximate_location == location
        assert active_session.is_current_session == is_current

894 

895 

def test_LogOutSession(db, fast_passwords):
    """
    Logging out a session identified by its `created` timestamp removes exactly that session
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        listing_before = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        sessions_before = listing_before.active_sessions
        assert len(sessions_before) == 5

        logged_out = sessions_before[3]
        account.LogOutSession(account_pb2.LogOutSessionReq(created=logged_out.created))

        listing_after = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        sessions_after = listing_after.active_sessions
        assert len(sessions_after) == 4

        # ignore the first session as it changes
        expected_remaining = sessions_before[1:3] + sessions_before[4:]
        assert expected_remaining == sessions_after[1:]

916 

917 

def test_LogOutOtherSessions(db, fast_passwords):
    """
    LogOutOtherSessions requires explicit confirmation and then leaves a single active session
    """
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, metadata_interceptor):
        for _ in range(4):
            auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password=password))

    with real_account_session(token) as account:
        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 5

        # without confirm=True the call must be rejected
        with pytest.raises(grpc.RpcError) as exc_info:
            account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=False))
        rpc_error = exc_info.value
        assert rpc_error.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert rpc_error.details() == "Please confirm you want to log out of other sessions."

        account.LogOutOtherSessions(account_pb2.LogOutOtherSessionsReq(confirm=True))

        listing = account.ListActiveSessions(account_pb2.ListActiveSessionsReq())
        assert len(listing.active_sessions) == 1

939 

940 

def test_CreateInviteCode(db):
    """
    CreateInviteCode returns an 8 character code plus its link, and stores an enabled invite owned by the caller
    """
    user, token = generate_user()

    with account_session(token) as account:
        created = account.CreateInviteCode(account_pb2.CreateInviteCodeReq())

    invite_code = created.code
    assert len(invite_code) == 8

    with session_scope() as session:
        stored_invite = session.execute(select(InviteCode).where(InviteCode.id == invite_code)).scalar_one()
        assert stored_invite.creator_user_id == user.id
        assert stored_invite.disabled is None

    assert created.url == urls.invite_code_link(code=invite_code)

954 

955 

def test_DisableInviteCode(db):
    """
    DisableInviteCode sets the `disabled` column on the stored invite
    """
    user, token = generate_user()

    with account_session(token) as account:
        created = account.CreateInviteCode(account_pb2.CreateInviteCodeReq())
        invite_code = created.code
        account.DisableInviteCode(account_pb2.DisableInviteCodeReq(code=invite_code))

    with session_scope() as session:
        stored_invite = session.execute(select(InviteCode).where(InviteCode.id == invite_code)).scalar_one()
        assert stored_invite.disabled is not None

966 

967 

def test_ListInviteCodes(db):
    """
    ListInviteCodes returns the caller's codes together with their link and the number of users who used them
    """
    inviter, inviter_token = generate_user()
    invitee, _ = generate_user()

    with account_session(inviter_token) as account:
        invite_code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code

    # simulate the invitee having signed up with this invite code
    with session_scope() as session:
        mark_invited = update(User).where(User.id == invitee.id).values(invite_code_id=invite_code)
        session.execute(mark_invited)

    with account_session(inviter_token) as account:
        listing = account.ListInviteCodes(empty_pb2.Empty())

        assert len(listing.invite_codes) == 1
        listed = listing.invite_codes[0]
        assert listed.code == invite_code
        assert listed.uses == 1
        assert listed.url == urls.invite_code_link(code=invite_code)

985 

986 

def test_reminders(db, moderator):
    """
    Walk a host with an incomplete profile through incoming requests and check GetReminders at every step

    Respond-to-request reminders are expected ahead of the complete-profile reminder, and to disappear once
    the host accepts the request or sends a message on it; a message from the surfer must not clear them.
    """
    # reference writing reminders tested in test_AvailableWriteReferences_and_ListPendingReferencesToWrite
    # we use LiteUser, so remember to refresh materialized views
    host, host_token = generate_user(complete_profile=False)
    complete_user, complete_token = generate_user(complete_profile=True)
    surfer1, surfer1_token = generate_user(complete_profile=True)
    surfer2, surfer2_token = generate_user(complete_profile=True)

    respond_kind = "respond_to_host_request_reminder"
    profile_kind = "complete_profile_reminder"

    def fetch_reminders(session_token):
        # reminders currently shown to the owner of session_token
        with account_session(session_token) as account:
            return account.GetReminders(empty_pb2.Empty()).reminders

    def kinds_of(reminders):
        # which oneof branch each reminder carries, in order
        return [reminder.WhichOneof("reminder") for reminder in reminders]

    def respond_targets(reminders):
        # (host_request_id, surfer user id) of every respond-to-request reminder, in order
        return [
            (
                reminder.respond_to_host_request_reminder.host_request_id,
                reminder.respond_to_host_request_reminder.surfer_user.user_id,
            )
            for reminder in reminders
            if reminder.WhichOneof("reminder") == respond_kind
        ]

    refresh_materialized_views_rapid(empty_pb2.Empty())
    assert kinds_of(fetch_reminders(complete_token)) == []
    assert kinds_of(fetch_reminders(host_token)) == [profile_kind]

    stay_from = (today() + timedelta(days=2)).isoformat()
    stay_to = (today() + timedelta(days=3)).isoformat()

    def request_stay(surfer_token, label):
        # create a request to the host, get it approved by the moderator and return its id
        with requests_session(surfer_token) as api:
            created = api.CreateHostRequest(
                requests_pb2.CreateHostRequestReq(
                    host_user_id=host.id,
                    from_date=stay_from,
                    to_date=stay_to,
                    text=valid_request_text(label),
                )
            )
        moderator.approve_host_request(created.host_request_id)
        return created.host_request_id

    host_request1_id = request_stay(surfer1_token, "Test request 1")

    reminders = fetch_reminders(host_token)
    assert kinds_of(reminders) == [respond_kind, profile_kind]
    assert respond_targets(reminders) == [(host_request1_id, surfer1.id)]

    host_request2_id = request_stay(surfer2_token, "Test request 2")

    refresh_materialized_views_rapid(empty_pb2.Empty())
    reminders = fetch_reminders(host_token)
    assert kinds_of(reminders) == [respond_kind, respond_kind, profile_kind]
    assert respond_targets(reminders) == [
        (host_request1_id, surfer1.id),
        (host_request2_id, surfer2.id),
    ]

    host_request3_id = request_stay(surfer1_token, "Test request 3")

    refresh_materialized_views_rapid(empty_pb2.Empty())
    reminders = fetch_reminders(host_token)
    assert kinds_of(reminders) == [respond_kind, respond_kind, respond_kind, profile_kind]
    assert respond_targets(reminders) == [
        (host_request1_id, surfer1.id),
        (host_request2_id, surfer2.id),
        (host_request3_id, surfer1.id),
    ]

    # accept req
    with requests_session(host_token) as api:
        api.RespondHostRequest(
            requests_pb2.RespondHostRequestReq(
                host_request_id=host_request1_id,
                status=conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
            )
        )

    refresh_materialized_views_rapid(empty_pb2.Empty())
    reminders = fetch_reminders(host_token)
    assert kinds_of(reminders) == [respond_kind, respond_kind, profile_kind]
    assert respond_targets(reminders) == [
        (host_request2_id, surfer2.id),
        (host_request3_id, surfer1.id),
    ]

    # host replies to req2 with a message: reminder should clear even though it's still pending
    with requests_session(host_token) as api:
        api.SendHostRequestMessage(
            requests_pb2.SendHostRequestMessageReq(host_request_id=host_request2_id, text="Let me think about it")
        )

    refresh_materialized_views_rapid(empty_pb2.Empty())
    reminders = fetch_reminders(host_token)
    assert kinds_of(reminders) == [respond_kind, profile_kind]
    assert respond_targets(reminders) == [(host_request3_id, surfer1.id)]

    # surfer sending a message should not clear the reminder
    with requests_session(surfer1_token) as api:
        api.SendHostRequestMessage(
            requests_pb2.SendHostRequestMessageReq(host_request_id=host_request3_id, text="Any update?")
        )

    refresh_materialized_views_rapid(empty_pb2.Empty())
    reminders = fetch_reminders(host_token)
    assert kinds_of(reminders) == [respond_kind, profile_kind]
    assert respond_targets(reminders) == [(host_request3_id, surfer1.id)]

1128 

1129 

def test_confirm_host_request_reminder(db, moderator):
    """
    The surfer sees a confirm reminder only between the host accepting and the surfer confirming
    """
    host, host_token = generate_user(complete_profile=True)
    surfer, surfer_token = generate_user(complete_profile=True)

    arrival = (today() + timedelta(days=10)).isoformat()
    departure = (today() + timedelta(days=12)).isoformat()

    def surfer_reminders():
        # refresh the materialized views, then fetch what the surfer is currently reminded about
        refresh_materialized_views_rapid(empty_pb2.Empty())
        with account_session(surfer_token) as account:
            return account.GetReminders(empty_pb2.Empty()).reminders

    def set_status(session_token, status):
        # move the host request to `status` on behalf of the owner of session_token
        with requests_session(session_token) as api:
            api.RespondHostRequest(
                requests_pb2.RespondHostRequestReq(host_request_id=host_request_id, status=status)
            )

    with requests_session(surfer_token) as api:
        created = api.CreateHostRequest(
            requests_pb2.CreateHostRequestReq(
                host_user_id=host.id,
                from_date=arrival,
                to_date=departure,
                text=valid_request_text("Please host me"),
            )
        )
    host_request_id = created.host_request_id
    moderator.approve_host_request(host_request_id)

    # nothing to confirm while the host has not answered yet
    assert [reminder.WhichOneof("reminder") for reminder in surfer_reminders()] == []

    set_status(host_token, conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED)

    reminders = surfer_reminders()
    assert [reminder.WhichOneof("reminder") for reminder in reminders] == ["confirm_host_request_reminder"]
    confirm_reminder = reminders[0].confirm_host_request_reminder
    assert confirm_reminder.host_request_id == host_request_id
    assert confirm_reminder.host_user.user_id == host.id

    # after surfer confirms, reminder should clear
    set_status(surfer_token, conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED)

    assert [reminder.WhichOneof("reminder") for reminder in surfer_reminders()] == []

1177 

1178 

def test_my_home_reminder(db):
    """
    The complete-my-home reminder is shown only to users who may host and have not filled in their home
    """
    # can_host with incomplete my home (max_guests not set) → reminder shown
    can_host_incomplete, can_host_incomplete_token = generate_user(hosting_status=HostingStatus.can_host)
    # maybe with incomplete my home → reminder shown
    maybe_incomplete, maybe_incomplete_token = generate_user(hosting_status=HostingStatus.maybe)
    # cant_host → no reminder regardless of my home completion
    cant_host, cant_host_token = generate_user(hosting_status=HostingStatus.cant_host)
    # can_host with fully completed my home → no reminder
    can_host_complete, can_host_complete_token = generate_user(
        hosting_status=HostingStatus.can_host,
        max_guests=2,
        sleeping_arrangement=SleepingArrangement.private,
        # about_place is set by default in make_user
    )

    # (session token, reminder kinds expected for that user), checked in creation order
    expectations = [
        (can_host_incomplete_token, ["complete_my_home_reminder"]),
        (maybe_incomplete_token, ["complete_my_home_reminder"]),
        (cant_host_token, []),
        (can_host_complete_token, []),
    ]

    for session_token, expected_kinds in expectations:
        with account_session(session_token) as account:
            reminders = account.GetReminders(empty_pb2.Empty()).reminders
            assert [reminder.WhichOneof("reminder") for reminder in reminders] == expected_kinds

1209 

1210 

def test_volunteer_stuff(db):
    """
    Volunteer info is NOT_FOUND for non-volunteers, editable once registered, and listed by GetVolunteers
    """
    # taken from couchers/app/backend/resources/badges.json
    board_member_id = 8347
    not_registered = "You are currently not registered as a volunteer, if this is wrong, please contact us."

    user, token = generate_user(name="Von Tester", username="tester", city="Amsterdam", id=board_member_id)

    def wrapped(value):
        # the update request takes its optional string fields as StringValue wrappers
        return wrappers_pb2.StringValue(value=value)

    def check_info(info, *, display_name, display_location, link_type, link_text, link_url):
        # role, dates and team-page visibility are never edited in this test, so they are pinned here
        assert info.display_name == display_name
        assert info.display_location == display_location
        assert info.role == "Lead Tester"
        assert info.started_volunteering == "2020-06-01"
        assert not info.stopped_volunteering
        assert info.show_on_team_page
        assert info.link_type == link_type
        assert info.link_text == link_text
        assert info.link_url == link_url

    with account_session(token) as account:
        account_info = account.GetAccountInfo(empty_pb2.Empty())
        assert not account_info.is_volunteer

        with pytest.raises(grpc.RpcError) as get_error:
            account.GetMyVolunteerInfo(empty_pb2.Empty())
        assert get_error.value.code() == grpc.StatusCode.NOT_FOUND
        assert get_error.value.details() == not_registered

        with pytest.raises(grpc.RpcError) as update_error:
            account.UpdateMyVolunteerInfo(account_pb2.UpdateMyVolunteerInfoReq())
        assert update_error.value.code() == grpc.StatusCode.NOT_FOUND
        assert update_error.value.details() == not_registered

    # register the user as a volunteer directly in the database
    with session_scope() as session:
        volunteer = make_volunteer(
            user_id=user.id,
            display_name="Great Volunteer",
            display_location="The Bitbucket",
            role="Lead Tester",
            started_volunteering=date(2020, 6, 1),
        )
        session.add(volunteer)

    with account_session(token) as account:
        account_info = account.GetAccountInfo(empty_pb2.Empty())
        assert account_info.is_volunteer

        # initial state: the link points at the user's own profile
        check_info(
            account.GetMyVolunteerInfo(empty_pb2.Empty()),
            display_name="Great Volunteer",
            display_location="The Bitbucket",
            link_type="couchers",
            link_text="@tester",
            link_url="http://localhost:3000/user/tester",
        )

        # website link: text and url are both supplied by the volunteer
        updated = account.UpdateMyVolunteerInfo(
            account_pb2.UpdateMyVolunteerInfoReq(
                display_name=wrapped(""),
                link_type=wrapped("website"),
                link_text=wrapped("testervontester.com.invalid"),
                link_url=wrapped("https://www.testervontester.com.invalid/"),
            )
        )
        check_info(
            updated,
            display_name="",
            display_location="The Bitbucket",
            link_type="website",
            link_text="testervontester.com.invalid",
            link_url="https://www.testervontester.com.invalid/",
        )

        # linkedin link: no url is sent, the response carries one built from the handle
        updated = account.UpdateMyVolunteerInfo(
            account_pb2.UpdateMyVolunteerInfoReq(
                display_name=wrapped(""),
                link_type=wrapped("linkedin"),
                link_text=wrapped("tester-vontester"),
            )
        )
        check_info(
            updated,
            display_name="",
            display_location="The Bitbucket",
            link_type="linkedin",
            link_text="tester-vontester",
            link_url="https://www.linkedin.com/in/tester-vontester/",
        )

        # email link: no url is sent, the response carries a mailto: url
        updated = account.UpdateMyVolunteerInfo(
            account_pb2.UpdateMyVolunteerInfoReq(
                display_name=wrapped("Tester"),
                display_location=wrapped(""),
                link_type=wrapped("email"),
                link_text=wrapped("tester@vontester.com.invalid"),
            )
        )
        check_info(
            updated,
            display_name="Tester",
            display_location="",
            link_type="email",
            link_text="tester@vontester.com.invalid",
            link_url="mailto:tester@vontester.com.invalid",
        )

    refresh_materialized_views_rapid(empty_pb2.Empty())

    with public_session() as public:
        team = public.GetVolunteers(empty_pb2.Empty())
        assert len(team.current_volunteers) == 1

        listed = team.current_volunteers[0]
        assert listed.name == "Tester"
        assert listed.username == "tester"
        assert listed.is_board_member
        assert listed.role == "Lead Tester"
        # display_location was cleared above; the public listing shows the user's city
        assert listed.location == "Amsterdam"
        assert listed.img.startswith("http://localhost:5001/img/thumbnail/")
        assert listed.link_type == "email"
        assert listed.link_text == "tester@vontester.com.invalid"
        assert listed.link_url == "mailto:tester@vontester.com.invalid"