Coverage for src/tests/test_auth.py: 100%

601 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-06 08:02 +0000

1import http.cookies 

2 

3import grpc 

4import pytest 

5from google.protobuf import empty_pb2, wrappers_pb2 

6from sqlalchemy import update 

7from sqlalchemy.sql import delete, func 

8 

9from couchers import urls 

10from couchers.crypto import hash_password, random_hex 

11from couchers.db import session_scope 

12from couchers.models import ( 

13 ContributeOption, 

14 ContributorForm, 

15 InviteCode, 

16 LoginToken, 

17 PasswordResetToken, 

18 SignupFlow, 

19 Upload, 

20 User, 

21 UserSession, 

22) 

23from couchers.proto import api_pb2, auth_pb2 

24from couchers.sql import couchers_select as select 

25from tests.test_fixtures import ( # noqa 

26 api_session, 

27 auth_api_session, 

28 db, 

29 email_fields, 

30 fast_passwords, 

31 generate_user, 

32 mock_notification_email, 

33 push_collector, 

34 real_api_session, 

35 testconfig, 

36) 

37 

38 

@pytest.fixture(autouse=True)
def _(testconfig, fast_passwords):
    """Autouse fixture that requests ``testconfig`` and ``fast_passwords`` for every test here.

    It has no body of its own; listing the two fixtures as parameters is the
    whole point, so each test in this module runs with both of them active.
    """
    return None

42 

43 

def get_session_cookie_tokens(metadata_interceptor):
    """Return ``(session_token, user_id)`` from the latest ``set-cookie`` response headers.

    ``metadata_interceptor.latest_header_raw`` is iterated as ``(key, value)``
    pairs; every value whose key is ``"set-cookie"`` is parsed as a cookie.

    Cookies are selected by their parsed *name* (``couchers-sesh`` and
    ``couchers-user-id``), not by searching the raw header text, so an
    unrelated cookie whose value or attributes merely contain "sesh" or
    "user-id" can no longer be picked by mistake.

    Returns:
        A ``(sesh, uid)`` tuple of cookie values, both as strings. An empty
        cookie value is returned as ``""``.

    Raises:
        IndexError: if no ``set-cookie`` header carries the requested cookie.
    """
    set_cookies = [val for key, val in metadata_interceptor.latest_header_raw if key == "set-cookie"]

    def first_cookie_value(name):
        # The first header that defines the cookie wins, as with the old [0] lookup.
        for raw_header in set_cookies:
            jar = http.cookies.SimpleCookie(raw_header)
            if name in jar:
                return jar[name].value
        raise IndexError(f"no set-cookie header sets {name!r}")

    sesh = first_cookie_value("couchers-sesh")
    uid = first_cookie_value("couchers-user-id")
    return sesh, uid

49 

50 

def test_UsernameValid(db):
    """``UsernameValid`` reports "test" as valid and the empty string as invalid."""
    with auth_api_session() as (auth_api, metadata_interceptor):
        reply = auth_api.UsernameValid(auth_pb2.UsernameValidReq(username="test"))
        assert reply.valid

    with auth_api_session() as (auth_api, metadata_interceptor):
        reply = auth_api.UsernameValid(auth_pb2.UsernameValidReq(username=""))
        assert not reply.valid

57 

58 

def test_signup_incremental(db):
    """Drive the signup flow one piece per request and check the ``need_*`` flags each time.

    Steps, in order: basic info, a poll carrying only the flow token, contributor
    feedback, community guidelines, email verification, and finally the account
    details. Afterwards the new user is fetched through the API and the stored
    contributor form is read back from the database.
    """

    def assert_flow_pending(reply, *, verify_email, guidelines):
        # Flags shared by every intermediate response; only the two keyword
        # arguments vary between steps.
        assert not reply.HasField("auth_res")
        assert not reply.need_basic
        assert reply.need_account
        assert not reply.need_feedback
        assert bool(reply.need_verify_email) is verify_email
        assert bool(reply.need_accept_community_guidelines) is guidelines

    # Start the flow with just name + email
    with auth_api_session() as (auth_api, metadata_interceptor):
        basic = auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid")
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=basic))

    flow_token = res.flow_token
    assert res.flow_token
    assert_flow_pending(res, verify_email=True, guidelines=True)

    # read out the signup token directly from the database for now
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    # Poll with only the flow token: nothing should change
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token))

    assert res.flow_token == flow_token
    assert_flow_pending(res, verify_email=True, guidelines=True)

    # Add feedback
    with auth_api_session() as (auth_api, metadata_interceptor):
        feedback = auth_pb2.ContributorForm(
            ideas="I'm a robot, incapable of original ideation",
            features="I love all your features",
            experience="I haven't done couch surfing before",
            contribute=auth_pb2.CONTRIBUTE_OPTION_YES,
            contribute_ways=["serving", "backend"],
            expertise="I'd love to be your server: I can compute very fast, but only simple opcodes",
        )
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, feedback=feedback))

    assert res.flow_token == flow_token
    assert_flow_pending(res, verify_email=True, guidelines=True)

    # Agree to community guidelines
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                flow_token=flow_token,
                accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
            )
        )

    assert res.flow_token == flow_token
    assert_flow_pending(res, verify_email=True, guidelines=False)

    # Verify email
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, email_token=email_token))

    assert res.flow_token == flow_token
    assert_flow_pending(res, verify_email=False, guidelines=False)

    # Finally finish off account info
    with auth_api_session() as (auth_api, metadata_interceptor):
        account = auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_MAYBE,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        )
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, account=account))

    # The flow is complete: no flow token any more, an auth result instead
    assert not res.flow_token
    assert res.HasField("auth_res")
    assert res.auth_res.user_id
    assert not res.auth_res.jailed
    assert not res.need_basic
    assert not res.need_account
    assert not res.need_feedback
    assert not res.need_verify_email
    assert not res.need_accept_community_guidelines

    user_id = res.auth_res.user_id

    sess_token, uid = get_session_cookie_tokens(metadata_interceptor)
    assert uid == str(user_id)

    with api_session(sess_token) as api:
        res = api.GetUser(api_pb2.GetUserReq(user=str(user_id)))

    assert res.username == "frodo"
    assert res.gender == "Bot"
    assert res.hosting_status == api_pb2.HOSTING_STATUS_MAYBE
    assert res.city == "New York City"
    assert res.lat == 40.7331
    assert res.lng == -73.9778
    assert res.radius == 500

    with session_scope() as session:
        form = session.execute(select(ContributorForm)).scalar_one()

        assert form.ideas == "I'm a robot, incapable of original ideation"
        assert form.features == "I love all your features"
        assert form.experience == "I haven't done couch surfing before"
        assert form.contribute == ContributeOption.yes
        assert form.contribute_ways == ["serving", "backend"]
        assert form.expertise == "I'd love to be your server: I can compute very fast, but only simple opcodes"

207 

208 

def _quick_signup() -> int:
    """Sign up the user "frodo" in two requests and return the new user's id.

    The first request supplies basic info, account details, an empty feedback
    form and the community-guidelines acceptance; the second supplies the email
    token read from the database. Also checks that the session cookie set by
    the final response matches the session token stored for "frodo".
    """
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(signup_req)

    flow_token = res.flow_token

    # Everything except the email verification is done at this point
    assert res.flow_token
    assert not res.HasField("auth_res")
    assert not res.need_basic
    assert not res.need_account
    assert not res.need_feedback
    assert res.need_verify_email

    # read out the signup token directly from the database for now
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    assert not res.flow_token
    assert res.HasField("auth_res")
    assert res.auth_res.user_id
    assert not res.auth_res.jailed
    assert not res.need_basic
    assert not res.need_account
    assert not res.need_feedback
    assert not res.need_verify_email

    # make sure we got the right token in a cookie
    with session_scope() as session:
        stored_token_query = (
            select(UserSession.token).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
        )
        token = session.execute(stored_token_query).scalar_one()
    sesh, uid = get_session_cookie_tokens(metadata_interceptor)
    assert sesh == token

    return res.auth_res.user_id

268 

269 

def test_signup(db):
    """Run the full quick signup once; all checks live inside ``_quick_signup``."""
    _quick_signup()

272 

273 

def test_basic_login(db):
    """Log in as a freshly signed-up user, check the session cookie is a valid session, then log out."""
    # Create our test user using signup
    _quick_signup()

    credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Authenticate(credentials)

    reply_token, _ = get_session_cookie_tokens(metadata_interceptor)

    # The token from the cookie must belong to a valid session of "frodo"
    with session_scope() as session:
        matching_session = (
            select(UserSession.token)
            .join(User, UserSession.user_id == User.id)
            .where(User.username == "frodo")
            .where(UserSession.token == reply_token)
            .where(UserSession.is_valid)
        )
        token = session.execute(matching_session).scalar_one_or_none()
        assert token

    # log out
    with auth_api_session() as (auth_api, metadata_interceptor):
        cookie_metadata = (("cookie", f"couchers-sesh={reply_token}"),)
        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=cookie_metadata)

296 

297 

def test_login_part_signed_up_verified_email(db):
    """
    If you try to log in but didn't finish signing up, we send you a new email and ask you to finish signing up.

    Here the email was already verified, and the email sent on the login attempt
    is expected to contain the flow token.
    """
    email = "email@couchers.org.invalid"

    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="testing", email=email)))

    flow_token = res.flow_token
    assert res.need_verify_email

    # verify the email
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        flow_token, email_token = flow.flow_token, flow.email_token
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    # A login attempt on the unfinished signup fails and triggers one email
    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, metadata_interceptor), pytest.raises(grpc.RpcError) as e:
            auth_api.Authenticate(auth_pb2.AuthReq(user=email, password="wrong pwd"))
        assert e.value.details() == "Please check your email for a link to continue signing up."

    assert mock.call_count == 1
    mail = email_fields(mock)
    assert mail.recipient == "email@couchers.org.invalid"
    assert flow_token in mail.plain
    assert flow_token in mail.html

329 

330 

def test_login_part_signed_up_not_verified_email(db):
    """Logging in during an unfinished signup with an unverified email re-sends the email token.

    The login attempt must fail with the "continue signing up" message, and
    exactly one email containing the flow's email token must go out.
    """
    account = auth_pb2.SignupAccount(
        username="frodo",
        password="a very insecure password",
        birthdate="1999-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
                account=account,
            )
        )

    flow_token = res.flow_token
    assert res.need_verify_email

    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, metadata_interceptor), pytest.raises(grpc.RpcError) as e:
            auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="wrong pwd"))
        assert e.value.details() == "Please check your email for a link to continue signing up."

    # Read the email token only after the login attempt
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        email_token = flow.email_token

    assert mock.call_count == 1
    mail = email_fields(mock)
    assert mail.recipient == "email@couchers.org.invalid"
    assert email_token in mail.plain
    assert email_token in mail.html

369 

370 

def test_banned_user(db):
    """A banned user cannot authenticate: FAILED_PRECONDITION, "Your account is suspended."."""
    _quick_signup()

    # Flag the only user in the database as banned
    with session_scope() as session:
        only_user = session.execute(select(User)).scalar_one()
        only_user.is_banned = True

    with auth_api_session() as (auth_api, metadata_interceptor), pytest.raises(grpc.RpcError) as e:
        auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))
    assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
    assert e.value.details() == "Your account is suspended."

382 

383 

def test_deleted_user(db):
    """A deleted user cannot authenticate: the API answers NOT_FOUND as if the account did not exist."""
    user_id = _quick_signup()

    with session_scope() as session:
        mark_deleted = update(User).where(User.id == user_id).values(is_deleted=True)
        session.execute(mark_deleted)

    with auth_api_session() as (auth_api, metadata_interceptor), pytest.raises(grpc.RpcError) as e:
        auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))
    assert e.value.code() == grpc.StatusCode.NOT_FOUND
    assert e.value.details() == "An account with that username or email was not found."

395 

396 

def test_invalid_token(db):
    """An API call made with a random, unknown session token is rejected as UNAUTHENTICATED."""
    # Two users are created; only the second one is looked up below.
    generate_user()
    looked_up_user, _ = generate_user()

    wrong_token = random_hex(32)

    with real_api_session(wrong_token) as api:
        with pytest.raises(grpc.RpcError) as e:
            api.GetUser(api_pb2.GetUserReq(user=looked_up_user.username))

    assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED
    assert e.value.details() == "Unauthorized"

408 

409 

def test_password_reset_v2(db, push_collector):
    """End-to-end password reset: request, reject a weak password, accept a good one, reject token reuse.

    Also checks the reset email contents, the two push notifications, that
    completing the reset sets a valid session cookie, and that the stored
    password hash matches the new password.
    """
    user, token = generate_user(hashed_password=hash_password("mypassword"))

    with auth_api_session() as (auth_api, metadata_interceptor), mock_notification_email() as mock:
        auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username))

    with session_scope() as session:
        password_reset_token = session.execute(select(PasswordResetToken.token)).scalar_one()

    # Exactly one email, carrying the token and the reset link
    assert mock.call_count == 1
    mail = email_fields(mock)
    assert mail.recipient == user.email
    assert "reset" in mail.subject.lower()
    assert password_reset_token in mail.plain
    assert password_reset_token in mail.html
    unique_string = "You asked for your password to be reset on Couchers.org."
    assert unique_string in mail.plain
    assert unique_string in mail.html
    reset_link = f"http://localhost:3000/complete-password-reset?token={password_reset_token}"
    assert reset_link in mail.plain
    assert reset_link in mail.html
    assert "support@couchers.org" in mail.plain
    assert "support@couchers.org" in mail.html

    push_collector.assert_user_push_matches_fields(
        user.id,
        title="A password reset was initiated on your account",
        body="Someone initiated a password change on your account.",
    )

    # make sure bad password are caught
    with auth_api_session() as (auth_api, metadata_interceptor):
        weak_req = auth_pb2.CompletePasswordResetV2Req(
            password_reset_token=password_reset_token, new_password="password"
        )
        with pytest.raises(grpc.RpcError) as e:
            auth_api.CompletePasswordResetV2(weak_req)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == "The password is insecure. Please use one that is not easily guessable."

    # make sure we can set a good password
    with auth_api_session() as (auth_api, metadata_interceptor):
        pwd = random_hex()
        with mock_notification_email() as mock:
            auth_api.CompletePasswordResetV2(
                auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password=pwd)
            )

    push_collector.assert_user_push_matches_fields(
        user.id,
        ix=1,
        title="Your password was successfully reset",
        body="Your password on Couchers.org was changed. If that was you, then no further action is needed.",
    )

    session_token, _ = get_session_cookie_tokens(metadata_interceptor)

    # The cookie set on completion must correspond to a valid session of this user
    with session_scope() as session:
        valid_session_query = (
            select(UserSession.token)
            .join(User, UserSession.user_id == User.id)
            .where(User.username == user.username)
            .where(UserSession.token == session_token)
            .where(UserSession.is_valid)
        )
        other_session_token = session.execute(valid_session_query).scalar_one_or_none()
        assert other_session_token

    # make sure we can't set a password again
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.CompletePasswordResetV2(
                auth_pb2.CompletePasswordResetV2Req(
                    password_reset_token=password_reset_token, new_password=random_hex()
                )
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == "Invalid token."

    with session_scope() as session:
        user = session.execute(select(User)).scalar_one()
        assert user.hashed_password == hash_password(pwd)

490 

491 

def test_password_reset_no_such_user(db):
    """Requesting a reset for an unknown username succeeds but creates no ``PasswordResetToken`` row."""
    user, token = generate_user()

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user="nonexistentuser"))

    with session_scope() as session:
        stored_token = session.execute(select(PasswordResetToken)).scalar_one_or_none()

    assert stored_token is None

506 

507 

def test_password_reset_invalid_token_v2(db):
    """Completing a reset with a wrong token fails with NOT_FOUND and leaves the password hash unchanged."""
    password = random_hex()
    user, token = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username))

    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.CompletePasswordResetV2(auth_pb2.CompletePasswordResetV2Req(password_reset_token="wrongtoken"))
    assert e.value.code() == grpc.StatusCode.NOT_FOUND
    assert e.value.details() == "Invalid token."

    # The stored hash must still be the one for the original password
    with session_scope() as session:
        user = session.execute(select(User)).scalar_one()
        assert user.hashed_password == hash_password(password)

527 

528 

def test_logout_invalid_token(db):
    """Logging out after all ``LoginToken`` rows were deleted still succeeds and sets an empty session cookie."""
    # Create our test user using signup
    _quick_signup()

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))

    reply_token, _ = get_session_cookie_tokens(metadata_interceptor)

    # delete all login tokens
    with session_scope() as session:
        session.execute(delete(LoginToken))

    # log out with non-existent token should still return a valid result
    with auth_api_session() as (auth_api, metadata_interceptor):
        session_cookie = ("cookie", f"couchers-sesh={reply_token}")
        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(session_cookie,))

    reply_token, _ = get_session_cookie_tokens(metadata_interceptor)
    # make sure we set an empty cookie
    assert reply_token == ""

549 

550 

def test_signup_without_password(db):
    """A signup with the too-short password "bad" is rejected with INVALID_ARGUMENT.

    The request also carries a future birthdate, yet the error asserted here is
    the password-length one.
    """
    request = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="bad",
            city="Minas Tirith",
            birthdate="9999-12-31",  # arbitrary future birthdate
            gender="Robot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            lat=1,
            lng=1,
            radius=100,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(request)
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == "The password must be 8 or more characters long."

574 

575 

def test_signup_invalid_birthdate(db):
    """Signups with a future or under-18 birthdate are rejected; an adult birthdate is accepted.

    Three signups are attempted on the same API session: a future birthdate
    (rejected), an adult birthdate (accepted), and an under-18 birthdate
    (rejected). Finally exactly one ``SignupFlow`` row must exist.

    The under-18 birthdate is derived from today's date. A fixed date such as
    "2010-04-09" stops being under 18 once that person's 18th birthday passes,
    at which point the test would start failing.
    """
    # Local import so this block does not depend on the file's import section.
    from datetime import date, timedelta

    # 17 * 365 days ago is always slightly less than 17 years, so well under 18.
    underage_birthdate = (date.today() - timedelta(days=17 * 365)).isoformat()

    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(
                auth_pb2.SignupFlowReq(
                    basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"),
                    account=auth_pb2.SignupAccount(
                        username="frodo",
                        password="a very insecure password",
                        city="Minas Tirith",
                        birthdate="9999-12-31",  # arbitrary future birthdate
                        gender="Robot",
                        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                        lat=1,
                        lng=1,
                        radius=100,
                        accept_tos=True,
                    ),
                    feedback=auth_pb2.ContributorForm(),
                )
            )
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "You must be at least 18 years old to sign up."

        res = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                basic=auth_pb2.SignupBasic(name="Christopher", email="a2@b.com"),
                account=auth_pb2.SignupAccount(
                    username="ceelo",
                    password="a very insecure password",
                    city="New York City",
                    birthdate="2000-12-31",  # arbitrary birthdate older than 18 years
                    gender="Helicopter",
                    hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                    lat=1,
                    lng=1,
                    radius=100,
                    accept_tos=True,
                ),
                feedback=auth_pb2.ContributorForm(),
            )
        )

        assert res.flow_token

        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(
                auth_pb2.SignupFlowReq(
                    basic=auth_pb2.SignupBasic(name="Franklin", email="a3@b.com"),
                    account=auth_pb2.SignupAccount(
                        username="franklin",
                        password="a very insecure password",
                        city="Los Santos",
                        birthdate=underage_birthdate,  # always < 18 yrs, relative to today
                        gender="Male",
                        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                        lat=1,
                        lng=1,
                        radius=100,
                        accept_tos=True,
                    ),
                    feedback=auth_pb2.ContributorForm(),
                )
            )
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "You must be at least 18 years old to sign up."

    # Only the accepted (adult) signup may have created a flow
    with session_scope() as session:
        assert session.execute(select(func.count()).select_from(SignupFlow)).scalar_one() == 1

645 

646 

def test_signup_invalid_email(db):
    """Each malformed email address is rejected with INVALID_ARGUMENT and "Invalid email."."""
    malformed_emails = ("a", "a@b", "a@b.", "a@b.c")

    # One fresh API session per address, in the listed order
    for bad_email in malformed_emails:
        with auth_api_session() as (auth_api, metadata_interceptor):
            with pytest.raises(grpc.RpcError) as e:
                auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=bad_email)))
            assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
            assert e.value.details() == "Invalid email."

671 

672 

def test_signup_existing_email(db):
    """Starting a signup with the email of an existing user fails with FAILED_PRECONDITION."""
    # Signed up user
    user, _ = generate_user()

    request = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=user.email))
    with auth_api_session() as (auth_api, metadata_interceptor), pytest.raises(grpc.RpcError) as e:
        auth_api.SignupFlow(request)
    assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
    assert e.value.details() == "That email address is already associated with an account. Please log in instead!"

682 

683 

def test_signup_banned_user_email(db):
    """Signing up with the email of a banned user fails with FAILED_PRECONDITION."""
    user, _ = generate_user()

    with session_scope() as session:
        ban_user = update(User).where(User.id == user.id).values(is_banned=True)
        session.execute(ban_user)

    request = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="NewName", email=user.email))
    with auth_api_session() as (auth_api, _), pytest.raises(grpc.RpcError) as e:
        auth_api.SignupFlow(request)
    assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
    assert e.value.details() == "You cannot sign up with that email address."

695 

696 

def test_signup_deleted_user_email(db):
    """Signing up with the email of a deleted user fails with FAILED_PRECONDITION."""
    user, _ = generate_user()

    with session_scope() as session:
        delete_user = update(User).where(User.id == user.id).values(is_deleted=True)
        session.execute(delete_user)

    request = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="NewName", email=user.email))
    with auth_api_session() as (auth_api, _), pytest.raises(grpc.RpcError) as e:
        auth_api.SignupFlow(request)
    assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
    assert e.value.details() == "You cannot sign up with that email address."

708 

709 

def test_signup_continue_with_email(db):
    """Starting a second signup with an email that already has a flow fails with FAILED_PRECONDITION.

    The error message tells the user to check their email for a link to
    continue signing up.
    """
    testing_email = f"{random_hex(12)}@couchers.org.invalid"

    def start_signup(auth_api):
        # Same request both times: name "frodo" plus the random test email.
        return auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email)))

    with auth_api_session() as (auth_api, metadata_interceptor):
        res = start_signup(auth_api)
    flow_token = res.flow_token
    assert flow_token

    # continue with same email, should just send another email to the user
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            res = start_signup(auth_api)
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == "Please check your email for a link to continue signing up."

725 

726 

def test_signup_resend_email(db):
    """Asking to resend the verification email sends one email with the same email token.

    The flow stays unverified after the resend, and can still be completed
    afterwards by submitting that email token.
    """
    recipient = "email@couchers.org.invalid"
    initial_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )

    with auth_api_session() as (auth_api, metadata_interceptor):
        with mock_notification_email() as mock:
            res = auth_api.SignupFlow(initial_req)
        assert mock.call_count == 1
        mail = email_fields(mock)
        assert mail.recipient == recipient

    flow_token = res.flow_token
    assert flow_token

    with session_scope() as session:
        flow = session.execute(select(SignupFlow)).scalar_one()
        assert flow.flow_token == flow_token
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    # ask for a new signup email
    with auth_api_session() as (auth_api, metadata_interceptor):
        with mock_notification_email() as mock:
            res = auth_api.SignupFlow(
                auth_pb2.SignupFlowReq(flow_token=flow_token, resend_verification_email=True)
            )
        assert mock.call_count == 1
        mail = email_fields(mock)
        assert mail.recipient == recipient
        assert email_token in mail.plain
        assert email_token in mail.html

    # Resending must not mark the email as verified
    with session_scope() as session:
        flow = session.execute(select(SignupFlow)).scalar_one()
        assert not flow.email_verified

    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    assert not res.flow_token
    assert res.HasField("auth_res")

791 

792 

def test_successful_authenticate(db):
    """A user can authenticate with either username or email, and the reply is not jailed."""
    user, _ = generate_user(hashed_password=hash_password("password"))

    # Authenticate with username first, then with email
    for identifier in (user.username, user.email):
        with auth_api_session() as (auth_api, metadata_interceptor):
            reply = auth_api.Authenticate(auth_pb2.AuthReq(user=identifier, password="password"))
        assert not reply.jailed

805 

806 

def test_unsuccessful_authenticate(db):
    """Wrong password and unknown username, email or id all fail with NOT_FOUND and the matching message."""
    user, _ = generate_user(hashed_password=hash_password("password"))

    unknown_account = "An account with that username or email was not found."
    # (user identifier, password, expected error details)
    failing_attempts = [
        # Invalid password
        (user.username, "incorrectpassword", "Wrong password."),
        # Invalid username
        ("notarealusername", "password", unknown_account),
        # Invalid email
        (f"{random_hex(12)}@couchers.org.invalid", "password", unknown_account),
        # Invalid id
        ("-1", "password", unknown_account),
    ]

    for identifier, password, expected_details in failing_attempts:
        with auth_api_session() as (auth_api, metadata_interceptor):
            with pytest.raises(grpc.RpcError) as e:
                auth_api.Authenticate(auth_pb2.AuthReq(user=identifier, password=password))
            assert e.value.code() == grpc.StatusCode.NOT_FOUND
            assert e.value.details() == expected_details

839 

840 

def test_complete_signup(db):
    """
    Exercises the validation errors of SignupFlow: invalid username, unsupported name, missing hosting status,
    unavailable username and invalid coordinate.
    """
    signup_email = f"{random_hex(12)}@couchers.org.invalid"
    with auth_api_session() as (auth_api, metadata_interceptor):
        basic_res = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=signup_email))
        )

    flow_token = basic_res.flow_token

    # account fields that are identical in every account step attempted below
    shared_account_fields = {
        "password": "a very insecure password",
        "city": "Minas Tirith",
        "birthdate": "1980-12-31",
        "gender": "Robot",
        "radius": 100,
        "accept_tos": True,
    }

    def account_step(**varying_fields):
        # builds an account step request for the flow started above
        return auth_pb2.SignupFlowReq(
            flow_token=flow_token,
            account=auth_pb2.SignupAccount(**shared_account_fields, **varying_fields),
        )

    with auth_api_session() as (auth_api, metadata_interceptor):
        # Invalid username
        with pytest.raises(grpc.RpcError) as err:
            auth_api.SignupFlow(
                account_step(username=" ", hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, lat=1, lng=1)
            )
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == "Invalid username."

    with auth_api_session() as (auth_api, metadata_interceptor):
        # Invalid name (this is a brand new basic step, so no flow token is sent)
        with pytest.raises(grpc.RpcError) as err:
            auth_api.SignupFlow(
                auth_pb2.SignupFlowReq(
                    basic=auth_pb2.SignupBasic(name=" ", email=f"{random_hex(12)}@couchers.org.invalid")
                )
            )
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == "Name not supported."

    with auth_api_session() as (auth_api, metadata_interceptor):
        # Hosting status required
        with pytest.raises(grpc.RpcError) as err:
            auth_api.SignupFlow(account_step(username="frodo", hosting_status=None, lat=1, lng=1))
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == "Hosting status is required."

    existing_user, _ = generate_user()
    with auth_api_session() as (auth_api, metadata_interceptor):
        # Username unavailable: it already belongs to the user generated just above
        with pytest.raises(grpc.RpcError) as err:
            auth_api.SignupFlow(
                account_step(
                    username=existing_user.username,
                    hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                    lat=1,
                    lng=1,
                )
            )
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "Sorry, that username isn't available."

    with auth_api_session() as (auth_api, metadata_interceptor):
        # Invalid coordinate
        with pytest.raises(grpc.RpcError) as err:
            auth_api.SignupFlow(
                account_step(username="frodo", hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, lat=0, lng=0)
            )
        assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert err.value.details() == "Invalid coordinate."

953 

954 

def test_signup_token_regression(db):
    """
    Regression test for restarting a signup with an email that was already confirmed.

    Repro steps:
    1. Start a signup
    2. Confirm the email
    3. Start a new signup with the same email

    Expected: send a link to the email to continue signing up.
    Actual (before the fix): `AttributeError: 'SignupFlow' object has no attribute 'token'`
    """
    signup_email = f"{random_hex(12)}@couchers.org.invalid"
    # the very same basic step is submitted in step 1 and again in step 3
    start_signup_req = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=signup_email))

    # 1. Start a signup
    with auth_api_session() as (auth_api, metadata_interceptor):
        first_res = auth_api.SignupFlow(start_signup_req)
        flow_token = first_res.flow_token
        assert flow_token

    # 2. Confirm the email, reading the email token straight from the database
    with session_scope() as session:
        token_query = select(SignupFlow.email_token).where(SignupFlow.flow_token == flow_token)
        email_token = session.execute(token_query).scalar_one()

    with auth_api_session() as (auth_api, metadata_interceptor):
        confirm_req = auth_pb2.SignupFlowReq(flow_token=flow_token, email_token=email_token)
        auth_api.SignupFlow(confirm_req)

    # 3. Start a new signup with the same email
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as err:
            auth_api.SignupFlow(start_signup_req)
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "Please check your email for a link to continue signing up."

991 

992 

@pytest.mark.parametrize("opt_out", [True, False])
def test_opt_out_of_newsletter(db, opt_out):
    """
    The newsletter opt-out chosen in the account step ends up on the created user, and the new user is not yet
    marked as in sync with the newsletter.
    """
    account = auth_pb2.SignupAccount(
        username="frodo",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
        opt_out_of_newsletter=opt_out,
    )
    # everything except the email verification is submitted in a single request
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=account,
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )

    with auth_api_session() as (auth_api, metadata_interceptor):
        signup_res = auth_api.SignupFlow(signup_req)

    # read the email token straight from the database
    with session_scope() as session:
        token_query = select(SignupFlow.email_token).where(SignupFlow.flow_token == signup_res.flow_token)
        email_token = session.execute(token_query).scalar_one()

    with auth_api_session() as (auth_api, metadata_interceptor):
        verify_res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    new_user_id = verify_res.auth_res.user_id

    with session_scope() as session:
        created = session.execute(select(User).where(User.id == new_user_id)).scalar_one()
        assert not created.in_sync_with_newsletter
        assert created.opt_out_of_newsletter == opt_out

1031 

1032 

def test_GetAuthState(db):
    """
    GetAuthState reports logged out without a session cookie and after Deauthenticate, logged in with a valid
    session cookie, and logged in but jailed for a user generated with accepted_tos=0.
    """
    user, token = generate_user()
    jailed_user, jailed_token = generate_user(accepted_tos=0)

    def session_cookie(sesh_token):
        # call metadata carrying the given session token as the couchers-sesh cookie
        return (("cookie", f"couchers-sesh={sesh_token}"),)

    # no cookie at all
    with auth_api_session() as (auth_api, metadata_interceptor):
        state = auth_api.GetAuthState(empty_pb2.Empty())
        assert not state.logged_in
        assert not state.HasField("auth_res")

    # valid session, which is then ended and checked once more
    with auth_api_session() as (auth_api, metadata_interceptor):
        state = auth_api.GetAuthState(empty_pb2.Empty(), metadata=session_cookie(token))
        assert state.logged_in
        assert state.HasField("auth_res")
        assert state.auth_res.user_id == user.id
        assert not state.auth_res.jailed

        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=session_cookie(token))

        state = auth_api.GetAuthState(empty_pb2.Empty(), metadata=session_cookie(token))
        assert not state.logged_in
        assert not state.HasField("auth_res")

    # valid session of a jailed user
    with auth_api_session() as (auth_api, metadata_interceptor):
        state = auth_api.GetAuthState(empty_pb2.Empty(), metadata=session_cookie(jailed_token))
        assert state.logged_in
        assert state.HasField("auth_res")
        assert state.auth_res.user_id == jailed_user.id
        assert state.auth_res.jailed

1061 

1062 

def test_signup_no_feedback_regression(db):
    """
    When the feedback form was first removed, the backend reported that feedback was not needed but still did not
    complete the signup. This regression test checks that a signup without feedback completes.
    """
    account = auth_pb2.SignupAccount(
        username="frodo",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
    )
    # note: no feedback field is set on this request
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=account,
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )

    with auth_api_session() as (auth_api, metadata_interceptor):
        signup_res = auth_api.SignupFlow(signup_req)

    flow_token = signup_res.flow_token

    # only the email verification is still outstanding
    assert signup_res.flow_token
    assert not signup_res.HasField("auth_res")
    assert not signup_res.need_basic
    assert not signup_res.need_account
    assert not signup_res.need_feedback
    assert signup_res.need_verify_email

    # read out the signup token directly from the database for now
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        verify_res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    # the flow is finished and the reply carries the auth result
    assert not verify_res.flow_token
    assert verify_res.HasField("auth_res")
    assert verify_res.auth_res.user_id
    assert not verify_res.auth_res.jailed
    assert not verify_res.need_basic
    assert not verify_res.need_account
    assert not verify_res.need_feedback
    assert not verify_res.need_verify_email

    # make sure we got the right token in a cookie
    with session_scope() as session:
        session_token_query = (
            select(UserSession.token).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
        )
        expected_token = session.execute(session_token_query).scalar_one()
    sesh_cookie, _ = get_session_cookie_tokens(metadata_interceptor)
    assert sesh_cookie == expected_token

1123 

1124 

def test_banned_username(db):
    """
    Signing up with the username "thecouchersadminaccount" is rejected with the same error as an unavailable
    username.
    """
    signup_email = f"{random_hex(12)}@couchers.org.invalid"
    with auth_api_session() as (auth_api, metadata_interceptor):
        basic_res = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=signup_email))
        )

    flow_token = basic_res.flow_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        # Banned username
        with pytest.raises(grpc.RpcError) as err:
            banned_account = auth_pb2.SignupAccount(
                username="thecouchersadminaccount",
                password="a very insecure password",
                city="Minas Tirith",
                birthdate="1980-12-31",
                gender="Robot",
                hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                lat=1,
                lng=1,
                radius=100,
                accept_tos=True,
            )
            auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, account=banned_account))
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "Sorry, that username isn't available."

1156 

1157 

# ConfirmChangeEmail is covered in test_account.py by the test_ChangeEmail_* tests.

1159 

1160 

def test_GetInviteCodeInfo(db):
    """
    GetInviteCodeInfo returns the name, username, avatar thumbnail URL and invite link for the code's creator.
    """
    creator, _ = generate_user()
    invite_code_id = "TST12345"

    with session_scope() as session:
        upload = Upload(key="test_avatar.jpg", filename="test_avatar.jpg", creator_user_id=creator.id)
        session.add(upload)
        # flush the upload before the user row is pointed at its key
        # (presumably avatar_key references the uploads table; verify against the models)
        session.flush()

        session.execute(update(User).where(User.id == creator.id).values(avatar_key=upload.key))

        session.add(InviteCode(id=invite_code_id, creator_user_id=creator.id))

    with auth_api_session() as (auth_api, _):
        info = auth_api.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=invite_code_id))
        assert info.name == creator.name
        assert info.username == creator.username
        assert info.avatar_url.endswith("/img/thumbnail/test_avatar.jpg")
        assert info.url == urls.invite_code_link(code=invite_code_id)

1185 

1186 

def test_GetInviteCodeInfo_no_avatar(db):
    """
    GetInviteCodeInfo returns an empty avatar URL when the creator of the invite code has no avatar.
    """
    user, _ = generate_user()
    code_id = "NOAVTR1"

    with session_scope() as session:
        # make sure the creator has no avatar, whatever generate_user set up
        session.execute(update(User).where(User.id == user.id).values(avatar_key=None))

        # create the code from code_id, so the setup and the lookup below cannot drift apart
        code = InviteCode(id=code_id, creator_user_id=user.id)
        session.add(code)

    with auth_api_session() as (auth, _):
        res = auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code_id))
        assert res.name == user.name
        assert res.username == user.username
        assert res.avatar_url == ""
        assert res.url == urls.invite_code_link(code=code_id)

1203 

1204 

def test_GetInviteCodeInfo_not_found(db):
    """
    Looking up an invite code that does not exist fails with NOT_FOUND.
    """
    generate_user()

    unknown_code_req = auth_pb2.GetInviteCodeInfoReq(code="BADCODE")
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as err:
            auth_api.GetInviteCodeInfo(unknown_code_req)
        assert err.value.code() == grpc.StatusCode.NOT_FOUND
        assert err.value.details() == "Invite code not found."

1213 

1214 

def test_SignupFlow_invite_code(db):
    """
    An invite code given in the basic signup step is stored as invite_code_id on the user created by the flow.
    """
    inviter, _ = generate_user()
    invite_code = "INV12345"
    with session_scope() as session:
        # nothing is pending before this point, so no flush is needed ahead of adding the code
        session.add(InviteCode(id=invite_code, creator_user_id=inviter.id))

    with auth_api_session() as (auth_api, _):
        # Signup basic step with invite code
        res = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                basic=auth_pb2.SignupBasic(
                    name="Test User",
                    email="inviteuser@example.com",
                    invite_code=invite_code,
                )
            )
        )
        flow_token = res.flow_token
        assert flow_token

        # Confirm email, reading the email token straight from the database
        with session_scope() as session:
            email_token = session.execute(
                select(SignupFlow.email_token).where(SignupFlow.flow_token == flow_token)
            ).scalar_one()

        auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

        # Signup account step
        auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                flow_token=flow_token,
                account=auth_pb2.SignupAccount(
                    username="invited_user",
                    password="secure password",
                    birthdate="1990-01-01",
                    gender="Other",
                    hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                    city="Example City",
                    lat=1,
                    lng=5,
                    radius=100,
                    accept_tos=True,
                ),
                feedback=auth_pb2.ContributorForm(),
                accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
            )
        )

        # Check that invite_code_id is stored in the final User object
        # (named separately from the inviter, so the two users are not confused)
        with session_scope() as session:
            invited = session.execute(select(User).where(User.username == "invited_user")).scalar_one()
            assert invited.invite_code_id == invite_code

1269 assert user.invite_code_id == invite_code